diff --git a/syncer/syncer.go b/syncer/syncer.go index 55ec6de554..5c20b52624 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -188,7 +188,7 @@ func (s *syncer) Sync(callback func(*types.FullBlock) bool) error { } // fetch block from the peer - lastNumber, shouldTerminate, err := s.bulkSyncWithPeer(bestPeer.ID, callback) + lastNumber, shouldTerminate, err := s.bulkSyncWithPeer(bestPeer.ID, bestPeer.Number, callback) if err != nil { s.logger.Warn("failed to complete bulk sync with peer, try to next one", "peer ID", "error", bestPeer.ID, err) } @@ -209,7 +209,8 @@ func (s *syncer) Sync(callback func(*types.FullBlock) bool) error { } // bulkSyncWithPeer syncs block with a given peer -func (s *syncer) bulkSyncWithPeer(peerID peer.ID, newBlockCallback func(*types.FullBlock) bool) (uint64, bool, error) { +func (s *syncer) bulkSyncWithPeer(peerID peer.ID, peerLatestBlock uint64, + newBlockCallback func(*types.FullBlock) bool) (uint64, bool, error) { localLatest := s.blockchain.Header().Number shouldTerminate := false @@ -218,11 +219,20 @@ func (s *syncer) bulkSyncWithPeer(peerID peer.ID, newBlockCallback func(*types.F return 0, false, err } + // Create a blockchain subscription for the sync progression and start tracking + subscription := s.blockchain.SubscribeEvents() + s.syncProgression.StartProgression(localLatest+1, subscription) + s.syncProgression.UpdateHighestProgression(peerLatestBlock) + defer func() { err := s.syncPeerClient.CloseStream(peerID) if err != nil { s.logger.Error("Failed to close stream: ", err) } + + // Stop monitoring the sync progression upon exit + s.syncProgression.StopProgression() + s.blockchain.UnsubscribeEvents(subscription) }() var lastReceivedNumber uint64 diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index c7720c762e..9f87643169 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -546,11 +546,9 @@ func TestSync(t *testing.T) { return &types.FullBlock{Block: b}, nil } }, - blocks: blocks[:10], - //nolint:godox - // TODO: need to fix implementation? (to be fixed in EVM-529) - progressionStart: 0, - progressionHighest: 0, + blocks: blocks[:10], + progressionStart: 1, + progressionHighest: 10, err: nil, }, { @@ -593,11 +591,9 @@ func TestSync(t *testing.T) { return &types.FullBlock{Block: b}, nil } }, - blocks: blocks[:10], - //nolint:godox - // TODO: need to fix implementation? (to be fixed in EVM-529) - progressionStart: 0, - progressionHighest: 0, + blocks: blocks[:10], + progressionStart: 1, + progressionHighest: 10, err: nil, }, } @@ -853,7 +849,7 @@ func Test_bulkSyncWithPeer(t *testing.T) { ) ) - lastSynced, shouldTerminate, err := syncer.bulkSyncWithPeer(peer.ID("X"), test.blockCallback) + lastSynced, shouldTerminate, err := syncer.bulkSyncWithPeer(peer.ID("X"), test.lastSyncedBlockNumber, test.blockCallback) assert.Equal(t, test.lastSyncedBlockNumber, lastSynced) assert.Equal(t, test.shouldTerminate, shouldTerminate)