Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
Track progression while syncing blocks (#1985)
Browse files Browse the repository at this point in the history
  • Loading branch information
stana-miric authored Oct 19, 2023
1 parent 72a7e3a commit 73469c6
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
14 changes: 12 additions & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (s *syncer) Sync(callback func(*types.FullBlock) bool) error {
}

// fetch block from the peer
lastNumber, shouldTerminate, err := s.bulkSyncWithPeer(bestPeer.ID, callback)
lastNumber, shouldTerminate, err := s.bulkSyncWithPeer(bestPeer.ID, bestPeer.Number, callback)
if err != nil {
s.logger.Warn("failed to complete bulk sync with peer, try to next one", "peer ID", "error", bestPeer.ID, err)
}
Expand All @@ -209,7 +209,8 @@ func (s *syncer) Sync(callback func(*types.FullBlock) bool) error {
}

// bulkSyncWithPeer syncs block with a given peer
func (s *syncer) bulkSyncWithPeer(peerID peer.ID, newBlockCallback func(*types.FullBlock) bool) (uint64, bool, error) {
func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64,
newBlockCallback func(*types.FullBlock) bool) (uint64, bool, error) {
localLatest := s.blockchain.Header().Number
shouldTerminate := false

Expand All @@ -218,11 +219,20 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, newBlockCallback func(*types.F
return 0, false, err
}

// Create a blockchain subscription for the sync progression and start tracking
subscription := s.blockchain.SubscribeEvents()
s.syncProgression.StartProgression(localLatest+1, subscription)
s.syncProgression.UpdateHighestProgression(peerLatestBlock)

defer func() {
err := s.syncPeerClient.CloseStream(peerID)
if err != nil {
s.logger.Error("Failed to close stream: ", err)
}

// Stop monitoring the sync progression upon exit
s.syncProgression.StopProgression()
s.blockchain.UnsubscribeEvents(subscription)
}()

var lastReceivedNumber uint64
Expand Down
18 changes: 7 additions & 11 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,11 +546,9 @@ func TestSync(t *testing.T) {
return &types.FullBlock{Block: b}, nil
}
},
blocks: blocks[:10],
//nolint:godox
// TODO: need to fix implementation? (to be fixed in EVM-529)
progressionStart: 0,
progressionHighest: 0,
blocks: blocks[:10],
progressionStart: 1,
progressionHighest: 10,
err: nil,
},
{
Expand Down Expand Up @@ -593,11 +591,9 @@ func TestSync(t *testing.T) {
return &types.FullBlock{Block: b}, nil
}
},
blocks: blocks[:10],
//nolint:godox
// TODO: need to fix implementation? (to be fixed in EVM-529)
progressionStart: 0,
progressionHighest: 0,
blocks: blocks[:10],
progressionStart: 1,
progressionHighest: 10,
err: nil,
},
}
Expand Down Expand Up @@ -853,7 +849,7 @@ func Test_bulkSyncWithPeer(t *testing.T) {
)
)

lastSynced, shouldTerminate, err := syncer.bulkSyncWithPeer(peer.ID("X"), test.blockCallback)
lastSynced, shouldTerminate, err := syncer.bulkSyncWithPeer(peer.ID("X"), test.lastSyncedBlockNumber, test.blockCallback)

assert.Equal(t, test.lastSyncedBlockNumber, lastSynced)
assert.Equal(t, test.shouldTerminate, shouldTerminate)
Expand Down

0 comments on commit 73469c6

Please sign in to comment.