Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream: "core/rawdb: freezer batch write" #1908

Merged
merged 6 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 80 additions & 104 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -1123,27 +1122,6 @@ const (
SideStatTy
)

// truncateAncient rewinds the blockchain to the specified header and deletes all
// data in the ancient store that exceeds the specified header.
func (bc *BlockChain) truncateAncient(head uint64) error {
frozen, err := bc.db.Ancients()
if err != nil {
return err
}
// Short circuit if there is no data to truncate in ancient store.
if frozen <= head+1 {
return nil
}
// Truncate all the data in the freezer beyond the specified head
if err := bc.db.TruncateAncients(head + 1); err != nil {
return err
}
bc.purge()

log.Info("Rewind ancient data", "number", head)
return nil
}

// numberHash is just a container for a number and a hash, to represent a block
type numberHash struct {
number uint64
Expand Down Expand Up @@ -1182,8 +1160,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
size = 0
size = int64(0)
)

// updateHead updates the head fast sync block if the inserted blocks are better
// and returns an indicator whether the inserted blocks are canonical.
updateHead := func(head *types.Block) bool {
Expand All @@ -1204,120 +1183,116 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
return false
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR makes significant changes to InsertReceiptChain but it is also missing a test that was included in the upstream PR TestInsertReceiptChainRollback. Looking at the results of coverage I can see that none of the error cases of insertReceiptChain are executed so I think it would be good to include a test here that exercises some of the error cases for InsertReceiptChain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new test uses sidechains, which is why we can't run it. But the old test uses a test field/test mechanism "testInsert" to provoke the tests to be tested, which was removed in this refactor. The errors are tested using sidechains, which we don't have.

Re-adding the mechanism to make it testeable was probably a change that should be made in a different PR for the same reason you mentioned the redundant parameter one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// writeAncient writes blockchain and corresponding receipt chain into ancient store.
//
// this function only accepts canonical chain data. All side chain will be reverted
// eventually.
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
var (
previous = bc.CurrentFastBlock()
batch = bc.db.NewBatch()
)
// If any error occurs before updating the head or we are inserting a side chain,
// all the data written this time wll be rolled back.
defer func() {
if previous != nil {
if err := bc.truncateAncient(previous.NumberU64()); err != nil {
log.Crit("Truncate ancient store failed", "err", err)
}
}
}()
var deleted []*numberHash
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit insertion if it is required(used in testing only)
if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
return i, errors.New("insertion is terminated for testing purpose")
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4])
}
if block.NumberU64() == 1 {
// Make sure to write the genesis into the freezer
if frozen, _ := bc.db.Ancients(); frozen == 0 {
h := rawdb.ReadCanonicalHash(bc.db, 0)
b := rawdb.ReadBlock(bc.db, h, 0)
size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0))
log.Info("Wrote genesis to ancients")
first := blockChain[0]
last := blockChain[len(blockChain)-1]

// Ensure genesis is in ancients.
if first.NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 {
b := bc.genesisBlock
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, big.NewInt(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that WriteAncientBlocks is ignoring the td parameter I think we should just remove it from the parameter list, otherwise providing values here is quite confusing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hbandura Is there an issue for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is now #1918

size += writeSize
if err != nil {
log.Error("Error writing genesis to ancients", "err", err)
return 0, err
}
log.Info("Wrote genesis to ancients")
}
// Flush data into ancient database.
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))

// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
}
// Before writing the blocks to the ancients, we need to ensure that
// they correspond to the what the headerchain 'expects'.
// We only check the last block/header, since it's a contiguous chain.
if !bc.HasHeader(last.Hash(), last.NumberU64()) {
return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4])
}

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
size += writeSize
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
}

// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
var batch = bc.db.NewBatch()
for _, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
}
stats.processed++
}

// Flush all tx-lookup index data.
size += batch.ValueSize()
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
// The tx index data could not be written.
// Roll back the ancient store update.
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
}
batch.Reset()

// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
return 0, err
}
if !updateHead(blockChain[len(blockChain)-1]) {
return 0, errors.New("side blocks can't be accepted as the ancient chain data")
}
previous = nil // disable rollback explicitly

// Wipe out canonical block data.
for _, nh := range deleted {
rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number)
rawdb.DeleteCanonicalHash(batch, nh.number)
}
for _, block := range blockChain {
// Always keep genesis block in active database.
if block.NumberU64() != 0 {
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
// Update the current fast block because all block data is now present in DB.
previousFastBlock := bc.CurrentFastBlock().NumberU64()
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
}
if err := batch.Write(); err != nil {
return 0, err
}
batch.Reset()

// Wipe out side chain too.
for _, nh := range deleted {
for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
rawdb.DeleteBlock(batch, hash, nh.number)
// Delete block data from the main database.
batch.Reset()
canonHashes := make(map[common.Hash]struct{})
for _, block := range blockChain {
canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 {
continue
}
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
}
for _, block := range blockChain {
// Always keep genesis block in active database.
if block.NumberU64() != 0 {
for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
rawdb.DeleteBlock(batch, hash, block.NumberU64())
}
// Delete side chain hash-to-number mappings.
for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
if _, canon := canonHashes[nh.Hash]; !canon {
rawdb.DeleteHeader(batch, nh.Hash, nh.Number)
}
}
if err := batch.Write(); err != nil {
return 0, err
}
return 0, nil
}

// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
skipPresenceCheck := false
Expand Down Expand Up @@ -1355,7 +1330,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := batch.Write(); err != nil {
return 0, err
}
size += batch.ValueSize()
size += int64(batch.ValueSize())
batch.Reset()
}
stats.processed++
Expand All @@ -1364,14 +1339,15 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
if batch.ValueSize() > 0 {
size += batch.ValueSize()
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
return 0, err
}
}
updateHead(blockChain[len(blockChain)-1])
return 0, nil
}

// Write downloaded chain data and corresponding receipt chain data
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
Expand Down
77 changes: 19 additions & 58 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ func TestFastVsFullChains(t *testing.T) {
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}

// Iterate over all chain data components, and cross reference
for i := 0; i < len(blocks); i++ {
num, hash := blocks[i].NumberU64(), blocks[i].Hash()
Expand All @@ -682,10 +683,27 @@ func TestFastVsFullChains(t *testing.T) {
} else if types.DeriveSha(fblock.Transactions(), trie.NewStackTrie(nil)) != types.DeriveSha(arblock.Transactions(), trie.NewStackTrie(nil)) || types.DeriveSha(anblock.Transactions(), trie.NewStackTrie(nil)) != types.DeriveSha(arblock.Transactions(), trie.NewStackTrie(nil)) {
t.Errorf("block #%d [%x]: transactions mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Transactions(), anblock.Transactions(), arblock.Transactions())
}
if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) {

// Check receipts.
freceipts := rawdb.ReadReceipts(fastDb, hash, num, fast.Config())
anreceipts := rawdb.ReadReceipts(ancientDb, hash, num, fast.Config())
areceipts := rawdb.ReadReceipts(archiveDb, hash, num, fast.Config())
if types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) {
t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts)
}

// Check that hash-to-number mappings are present in all databases.
if m := rawdb.ReadHeaderNumber(fastDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in fastdb: %v", num, hash, m)
}
if m := rawdb.ReadHeaderNumber(ancientDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in ancientdb: %v", num, hash, m)
}
if m := rawdb.ReadHeaderNumber(archiveDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in archivedb: %v", num, hash, m)
}
}

// Check that the canonical chains are the same between the databases
for i := 0; i < len(blocks)+1; i++ {
if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash {
Expand Down Expand Up @@ -1630,63 +1648,6 @@ func TestBlockchainRecovery(t *testing.T) {
}
}

func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
// Configure and generate a sample block chain
var (
gendb = rawdb.NewMemoryDatabase()
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(1000000000)
gspec = &Genesis{Config: params.IstanbulTestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
genesis = gspec.MustCommit(gendb)
)
height := uint64(1024)
blocks, receipts := GenerateChain(gspec.Config, genesis, mockEngine.NewFaker(), gendb, int(height), nil)

// Import the chain as a ancient-first node and ensure all pointers are updated
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
gspec.MustCommit(ancientDb)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, mockEngine.NewFaker(), vm.Config{}, nil, nil)
defer ancient.Stop()

headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := ancient.InsertHeaderChain(headers, 1, true); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
// Abort ancient receipt chain insertion deliberately
ancient.terminateInsert = func(hash common.Hash, number uint64) bool {
return number == blocks[len(blocks)/2].NumberU64()
}
previousFastBlock := ancient.CurrentFastBlock()
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() {
t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64())
}
if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 {
t.Fatalf("failed to truncate ancient data")
}
ancient.terminateInsert = nil
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() {
t.Fatalf("failed to insert ancient recept chain after rollback")
}
}

// Tests that importing a very large side fork, which is larger than the canon chain,
// but where the difficulty per block is kept low: this means that it will not
// overtake the 'canon' chain until after it's passed canon by about 200 blocks.
Expand Down
2 changes: 2 additions & 0 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (

// ErrNoGenesis is returned when there is no Genesis Block.
ErrNoGenesis = errors.New("genesis not found in chain")

errSideChainReceipts = errors.New("side blocks can't be accepted as ancient chain data")
)

// List of evm-call-message pre-checking errors. All state transition messages will
Expand Down
Loading