Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: tx client concurrency test #4104

Merged
merged 8 commits into from
Jan 17, 2025
88 changes: 48 additions & 40 deletions pkg/user/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package user_test
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/celestiaorg/celestia-app/v3/test/util/testnode"
"github.com/celestiaorg/go-square/v2/share"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/config"
tmrand "github.com/tendermint/tendermint/libs/rand"
)

Expand All @@ -21,50 +23,56 @@ func TestConcurrentTxSubmission(t *testing.T) {
t.Skip("skipping in short mode")
}

// Setup network
tmConfig := testnode.DefaultTendermintConfig()
tmConfig.Consensus.TimeoutCommit = 10 * time.Second
ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig))
_, err := ctx.WaitForHeight(1)
require.NoError(t, err)
// Iterate over all mempool versions
mempools := []string{config.MempoolV0, config.MempoolV1, config.MempoolV2}
for _, mempool := range mempools {
t.Run(fmt.Sprintf("mempool %s", mempool), func(t *testing.T) {
// Setup network
tmConfig := testnode.DefaultTendermintConfig()
tmConfig.Mempool.Version = mempool
tmConfig.Consensus.TimeoutCommit = 10 * time.Second
ctx, _, _ := testnode.NewNetwork(t, testnode.DefaultConfig().WithTendermintConfig(tmConfig))
_, err := ctx.WaitForHeight(1)
require.NoError(t, err)

// Setup signer
txClient, err := testnode.NewTxClientFromContext(ctx)
require.NoError(t, err)
// Setup signer
txClient, err := testnode.NewTxClientFromContext(ctx)
require.NoError(t, err)

// Pregenerate all the blobs
numTxs := 10
blobs := blobfactory.ManyRandBlobs(tmrand.NewRand(), blobfactory.Repeat(2048, numTxs)...)
// Pregenerate all the blobs
numTxs := 100
blobs := blobfactory.ManyRandBlobs(tmrand.NewRand(), blobfactory.Repeat(2048, numTxs)...)

// Prepare transactions
var (
wg sync.WaitGroup
errCh = make(chan error)
)
// Prepare transactions
var (
wg sync.WaitGroup
errCh = make(chan error, 1)
)

subCtx, cancel := context.WithCancel(ctx.GoContext())
defer cancel()
time.AfterFunc(time.Minute, cancel)
for i := 0; i < numTxs; i++ {
wg.Add(1)
go func(b *share.Blob) {
defer wg.Done()
_, err := txClient.SubmitPayForBlob(subCtx, []*share.Blob{b}, user.SetGasLimitAndGasPrice(500_000, appconsts.DefaultMinGasPrice))
if err != nil && !errors.Is(err, context.Canceled) {
// only catch the first error
select {
case errCh <- err:
cancel()
default:
}
subCtx, cancel := context.WithCancel(ctx.GoContext())
defer cancel()
time.AfterFunc(time.Minute, cancel)
for i := 0; i < numTxs; i++ {
wg.Add(1)
go func(b *share.Blob) {
defer wg.Done()
_, err := txClient.SubmitPayForBlob(subCtx, []*share.Blob{b}, user.SetGasLimitAndGasPrice(500_000, appconsts.DefaultMinGasPrice))
if err != nil && !errors.Is(err, context.Canceled) {
// only catch the first error
select {
case errCh <- err:
cancel()
default:
}
}
}(blobs[i])
}
}(blobs[i])
}
wg.Wait()

select {
case err := <-errCh:
require.NoError(t, err)
default:
wg.Wait()
select {
case err := <-errCh:
require.NoError(t, err)
default:
}
})
}
}
3 changes: 3 additions & 0 deletions pkg/user/tx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon
return nil, client.handleEvictions(txHash)
default:
client.deleteFromTxTracker(txHash)
if ctx.Err() != nil {
return nil, ctx.Err()
}
return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash)
}
}
Expand Down
Loading