From 0bccf1b110e53b1954f9af890f19fb05689df32c Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 17 Jan 2025 10:37:32 -0500 Subject: [PATCH] fix: tx client concurrency test (backport #4104) (#4239) This PR makes two small tweaks: - Fixes `TestConcurrentTxSubmission` by adding a capacity of 1 to the errCh. Currently errors were being ignored because the wait group meant that there wasn't a process to read to the channel as it was being written to. This fixes this - Catches the case where a user cancels the context when calling `ConfirmTx` **This test is broken until https://github.com/celestiaorg/celestia-core/pull/1553 is resolved**
This is an automatic backport of pull request #4104 done by [Mergify](https://mergify.com). Co-authored-by: Callum Waters --- pkg/user/e2e_test.go | 88 +++++++++++++++++++++++-------------------- pkg/user/tx_client.go | 3 ++ 2 files changed, 51 insertions(+), 40 deletions(-) diff --git a/pkg/user/e2e_test.go b/pkg/user/e2e_test.go index e42710c22f..dcf15554b1 100644 --- a/pkg/user/e2e_test.go +++ b/pkg/user/e2e_test.go @@ -3,6 +3,7 @@ package user_test import ( "context" "errors" + "fmt" "sync" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/celestiaorg/celestia-app/v3/test/util/testnode" "github.com/celestiaorg/go-square/v2/share" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/config" tmrand "github.com/tendermint/tendermint/libs/rand" ) @@ -21,50 +23,56 @@ func TestConcurrentTxSubmission(t *testing.T) { t.Skip("skipping in short mode") } - // Setup network - tmConfig := testnode.DefaultTendermintConfig() - tmConfig.Consensus.TimeoutCommit = 10 * time.Second - ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig)) - _, err := ctx.WaitForHeight(1) - require.NoError(t, err) + // Iterate over all mempool versions + mempools := []string{config.MempoolV0, config.MempoolV1, config.MempoolV2} + for _, mempool := range mempools { + t.Run(fmt.Sprintf("mempool %s", mempool), func(t *testing.T) { + // Setup network + tmConfig := testnode.DefaultTendermintConfig() + tmConfig.Mempool.Version = mempool + tmConfig.Consensus.TimeoutCommit = 10 * time.Second + ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig)) + _, err := ctx.WaitForHeight(1) + require.NoError(t, err) - // Setup signer - txClient, err := testnode.NewTxClientFromContext(ctx) - require.NoError(t, err) + // Setup signer + txClient, err := testnode.NewTxClientFromContext(ctx) + require.NoError(t, err) - // Pregenerate all the blobs - numTxs := 10 - blobs := blobfactory.ManyRandBlobs(tmrand.NewRand(), blobfactory.Repeat(2048, numTxs)...) + // Pregenerate all the blobs + numTxs := 100 + blobs := blobfactory.ManyRandBlobs(tmrand.NewRand(), blobfactory.Repeat(2048, numTxs)...) - // Prepare transactions - var ( - wg sync.WaitGroup - errCh = make(chan error) - ) + // Prepare transactions + var ( + wg sync.WaitGroup + errCh = make(chan error, 1) + ) - subCtx, cancel := context.WithCancel(ctx.GoContext()) - defer cancel() - time.AfterFunc(time.Minute, cancel) - for i := 0; i < numTxs; i++ { - wg.Add(1) - go func(b *share.Blob) { - defer wg.Done() - _, err := txClient.SubmitPayForBlob(subCtx, []*share.Blob{b}, user.SetGasLimitAndGasPrice(500_000, appconsts.DefaultMinGasPrice)) - if err != nil && !errors.Is(err, context.Canceled) { - // only catch the first error - select { - case errCh <- err: - cancel() - default: - } + subCtx, cancel := context.WithCancel(ctx.GoContext()) + defer cancel() + time.AfterFunc(time.Minute, cancel) + for i := 0; i < numTxs; i++ { + wg.Add(1) + go func(b *share.Blob) { + defer wg.Done() + _, err := txClient.SubmitPayForBlob(subCtx, []*share.Blob{b}, user.SetGasLimitAndGasPrice(500_000, appconsts.DefaultMinGasPrice)) + if err != nil && !errors.Is(err, context.Canceled) { + // only catch the first error + select { + case errCh <- err: + cancel() + default: + } + } + }(blobs[i]) } - }(blobs[i]) - } - wg.Wait() - - select { - case err := <-errCh: - require.NoError(t, err) - default: + wg.Wait() + select { + case err := <-errCh: + require.NoError(t, err) + default: + } + }) } } diff --git a/pkg/user/tx_client.go b/pkg/user/tx_client.go index ce2f7b8933..d4032b814a 100644 --- a/pkg/user/tx_client.go +++ b/pkg/user/tx_client.go @@ -463,6 +463,9 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon return nil, client.handleEvictions(txHash) default: client.deleteFromTxTracker(txHash) + if ctx.Err() != nil { + return nil, ctx.Err() + } return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash) } }