-
Notifications
You must be signed in to change notification settings - Fork 206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upstream: "core/rawdb: freezer batch write" #1908
Changes from 5 commits
794c613
f63cf5e
b668875
5e9c096
5c37351
62ca7b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -213,8 +213,7 @@ type BlockChain struct { | |
processor Processor // Block transaction processor interface | ||
vmConfig vm.Config | ||
|
||
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. | ||
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. | ||
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. | ||
} | ||
|
||
// NewBlockChain returns a fully initialised block chain using information | ||
|
@@ -1123,27 +1122,6 @@ const ( | |
SideStatTy | ||
) | ||
|
||
// truncateAncient rewinds the blockchain to the specified header and deletes all | ||
// data in the ancient store that exceeds the specified header. | ||
func (bc *BlockChain) truncateAncient(head uint64) error { | ||
frozen, err := bc.db.Ancients() | ||
if err != nil { | ||
return err | ||
} | ||
// Short circuit if there is no data to truncate in ancient store. | ||
if frozen <= head+1 { | ||
return nil | ||
} | ||
// Truncate all the data in the freezer beyond the specified head | ||
if err := bc.db.TruncateAncients(head + 1); err != nil { | ||
return err | ||
} | ||
bc.purge() | ||
|
||
log.Info("Rewind ancient data", "number", head) | ||
return nil | ||
} | ||
|
||
// numberHash is just a container for a number and a hash, to represent a block | ||
type numberHash struct { | ||
number uint64 | ||
|
@@ -1182,8 +1160,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ | |
var ( | ||
stats = struct{ processed, ignored int32 }{} | ||
start = time.Now() | ||
size = 0 | ||
size = int64(0) | ||
) | ||
|
||
// updateHead updates the head fast sync block if the inserted blocks are better | ||
// and returns an indicator whether the inserted blocks are canonical. | ||
updateHead := func(head *types.Block) bool { | ||
|
@@ -1204,120 +1183,116 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ | |
} | ||
return false | ||
} | ||
|
||
// writeAncient writes blockchain and corresponding receipt chain into ancient store. | ||
// | ||
// this function only accepts canonical chain data. All side chain will be reverted | ||
// eventually. | ||
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { | ||
var ( | ||
previous = bc.CurrentFastBlock() | ||
batch = bc.db.NewBatch() | ||
) | ||
// If any error occurs before updating the head or we are inserting a side chain, | ||
// all the data written this time wll be rolled back. | ||
defer func() { | ||
if previous != nil { | ||
if err := bc.truncateAncient(previous.NumberU64()); err != nil { | ||
log.Crit("Truncate ancient store failed", "err", err) | ||
} | ||
} | ||
}() | ||
var deleted []*numberHash | ||
for i, block := range blockChain { | ||
// Short circuit insertion if shutting down or processing failed | ||
if bc.insertStopped() { | ||
return 0, errInsertionInterrupted | ||
} | ||
// Short circuit insertion if it is required(used in testing only) | ||
if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) { | ||
return i, errors.New("insertion is terminated for testing purpose") | ||
} | ||
// Short circuit if the owner header is unknown | ||
if !bc.HasHeader(block.Hash(), block.NumberU64()) { | ||
return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4]) | ||
} | ||
if block.NumberU64() == 1 { | ||
// Make sure to write the genesis into the freezer | ||
if frozen, _ := bc.db.Ancients(); frozen == 0 { | ||
h := rawdb.ReadCanonicalHash(bc.db, 0) | ||
b := rawdb.ReadBlock(bc.db, h, 0) | ||
size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0)) | ||
log.Info("Wrote genesis to ancients") | ||
first := blockChain[0] | ||
last := blockChain[len(blockChain)-1] | ||
|
||
// Ensure genesis is in ancients. | ||
if first.NumberU64() == 1 { | ||
if frozen, _ := bc.db.Ancients(); frozen == 0 { | ||
b := bc.genesisBlock | ||
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, big.NewInt(1)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @hbandura Is there an issue for this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is now #1918 |
||
size += writeSize | ||
if err != nil { | ||
log.Error("Error writing genesis to ancients", "err", err) | ||
return 0, err | ||
} | ||
log.Info("Wrote genesis to ancients") | ||
} | ||
// Flush data into ancient database. | ||
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64())) | ||
|
||
// Write tx indices if any condition is satisfied: | ||
// * If user requires to reserve all tx indices(txlookuplimit=0) | ||
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) | ||
// * If block number is large enough to be regarded as a recent block | ||
// It means blocks below the ancientLimit-txlookupLimit won't be indexed. | ||
// | ||
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with | ||
// an external ancient database, during the setup, blockchain will start | ||
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) | ||
// range. In this case, all tx indices of newly imported blocks should be | ||
// generated. | ||
} | ||
// Before writing the blocks to the ancients, we need to ensure that | ||
// they correspond to the what the headerchain 'expects'. | ||
// We only check the last block/header, since it's a contiguous chain. | ||
if !bc.HasHeader(last.Hash(), last.NumberU64()) { | ||
return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4]) | ||
} | ||
|
||
// Write all chain data to ancients. | ||
td := bc.GetTd(first.Hash(), first.NumberU64()) | ||
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td) | ||
size += writeSize | ||
if err != nil { | ||
log.Error("Error importing chain data to ancients", "err", err) | ||
return 0, err | ||
} | ||
|
||
// Write tx indices if any condition is satisfied: | ||
// * If user requires to reserve all tx indices(txlookuplimit=0) | ||
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit) | ||
// * If block number is large enough to be regarded as a recent block | ||
// It means blocks below the ancientLimit-txlookupLimit won't be indexed. | ||
// | ||
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with | ||
// an external ancient database, during the setup, blockchain will start | ||
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients) | ||
// range. In this case, all tx indices of newly imported blocks should be | ||
// generated. | ||
var batch = bc.db.NewBatch() | ||
for _, block := range blockChain { | ||
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { | ||
rawdb.WriteTxLookupEntriesByBlock(batch, block) | ||
} else if rawdb.ReadTxIndexTail(bc.db) != nil { | ||
rawdb.WriteTxLookupEntriesByBlock(batch, block) | ||
} | ||
stats.processed++ | ||
} | ||
|
||
// Flush all tx-lookup index data. | ||
size += batch.ValueSize() | ||
size += int64(batch.ValueSize()) | ||
if err := batch.Write(); err != nil { | ||
// The tx index data could not be written. | ||
// Roll back the ancient store update. | ||
fastBlock := bc.CurrentFastBlock().NumberU64() | ||
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil { | ||
log.Error("Can't truncate ancient store after failed insert", "err", err) | ||
} | ||
return 0, err | ||
} | ||
batch.Reset() | ||
|
||
// Sync the ancient store explicitly to ensure all data has been flushed to disk. | ||
if err := bc.db.Sync(); err != nil { | ||
return 0, err | ||
} | ||
if !updateHead(blockChain[len(blockChain)-1]) { | ||
return 0, errors.New("side blocks can't be accepted as the ancient chain data") | ||
} | ||
previous = nil // disable rollback explicitly | ||
|
||
// Wipe out canonical block data. | ||
for _, nh := range deleted { | ||
rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number) | ||
rawdb.DeleteCanonicalHash(batch, nh.number) | ||
} | ||
for _, block := range blockChain { | ||
// Always keep genesis block in active database. | ||
if block.NumberU64() != 0 { | ||
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) | ||
rawdb.DeleteCanonicalHash(batch, block.NumberU64()) | ||
// Update the current fast block because all block data is now present in DB. | ||
previousFastBlock := bc.CurrentFastBlock().NumberU64() | ||
if !updateHead(blockChain[len(blockChain)-1]) { | ||
// We end up here if the header chain has reorg'ed, and the blocks/receipts | ||
// don't match the canonical chain. | ||
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil { | ||
log.Error("Can't truncate ancient store after failed insert", "err", err) | ||
} | ||
return 0, errSideChainReceipts | ||
} | ||
if err := batch.Write(); err != nil { | ||
return 0, err | ||
} | ||
batch.Reset() | ||
|
||
// Wipe out side chain too. | ||
for _, nh := range deleted { | ||
for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) { | ||
rawdb.DeleteBlock(batch, hash, nh.number) | ||
// Delete block data from the main database. | ||
batch.Reset() | ||
canonHashes := make(map[common.Hash]struct{}) | ||
for _, block := range blockChain { | ||
canonHashes[block.Hash()] = struct{}{} | ||
if block.NumberU64() == 0 { | ||
continue | ||
} | ||
rawdb.DeleteCanonicalHash(batch, block.NumberU64()) | ||
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64()) | ||
} | ||
for _, block := range blockChain { | ||
// Always keep genesis block in active database. | ||
if block.NumberU64() != 0 { | ||
for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) { | ||
rawdb.DeleteBlock(batch, hash, block.NumberU64()) | ||
} | ||
// Delete side chain hash-to-number mappings. | ||
for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) { | ||
if _, canon := canonHashes[nh.Hash]; !canon { | ||
rawdb.DeleteHeader(batch, nh.Hash, nh.Number) | ||
} | ||
} | ||
if err := batch.Write(); err != nil { | ||
return 0, err | ||
} | ||
return 0, nil | ||
} | ||
|
||
// writeLive writes blockchain and corresponding receipt chain into active store. | ||
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { | ||
skipPresenceCheck := false | ||
|
@@ -1355,7 +1330,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ | |
if err := batch.Write(); err != nil { | ||
return 0, err | ||
} | ||
size += batch.ValueSize() | ||
size += int64(batch.ValueSize()) | ||
batch.Reset() | ||
} | ||
stats.processed++ | ||
|
@@ -1364,14 +1339,15 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ | |
// we can ensure all components of body is completed(body, receipts, | ||
// tx indexes) | ||
if batch.ValueSize() > 0 { | ||
size += batch.ValueSize() | ||
size += int64(batch.ValueSize()) | ||
if err := batch.Write(); err != nil { | ||
return 0, err | ||
} | ||
} | ||
updateHead(blockChain[len(blockChain)-1]) | ||
return 0, nil | ||
} | ||
|
||
// Write downloaded chain data and corresponding receipt chain data | ||
if len(ancientBlocks) > 0 { | ||
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR makes significant changes to
InsertReceiptChain
but it is also missing a test that was included in the upstream PRTestInsertReceiptChainRollback
. Looking at the results of coverage I can see that none of the error cases ofinsertReceiptChain
are executed so I think it would be good to include a test here that exercises some of the error cases forInsertReceiptChain
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new test uses sidechains, which is why we can't run it. But the old test uses a test field/test mechanism "testInsert" to provoke the tests to be tested, which was removed in this refactor. The errors are tested using sidechains, which we don't have.
Re-adding the mechanism to make it testeable was probably a change that should be made in a different PR for the same reason you mentioned the redundant parameter one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#1917