From 4c81c2559894133de8a8354b390591ed544cba48 Mon Sep 17 00:00:00 2001 From: ptrus Date: Wed, 27 Nov 2024 21:10:59 +0100 Subject: [PATCH] storage: split runtime events related accounts into a separate table --- analyzer/aggregate_stats/aggregate_stats.go | 6 +- analyzer/consensus/consensus.go | 5 +- analyzer/consensus/messages.go | 12 +- analyzer/queries/queries.go | 14 +- analyzer/runtime/extract.go | 152 ++++++++++-------- analyzer/runtime/runtime.go | 12 +- analyzer/runtime/runtime_test.go | 4 +- analyzer/runtime/visitors.go | 44 ++--- analyzer/util/addresses/registration.go | 72 +-------- storage/client/client.go | 7 +- storage/client/queries/queries.go | 8 +- storage/migrations/01_runtimes.up.sql | 47 ++++-- .../07_runtime_events_related_accounts.up.sql | 25 +++ storage/oasis/nodeapi/api.go | 6 +- storage/oasis/nodeapi/file/runtime.go | 32 +++- storage/oasis/nodeapi/universal_runtime.go | 5 +- 16 files changed, 256 insertions(+), 195 deletions(-) create mode 100644 storage/migrations/07_runtime_events_related_accounts.up.sql diff --git a/analyzer/aggregate_stats/aggregate_stats.go b/analyzer/aggregate_stats/aggregate_stats.go index af1fa8379..885ed23e2 100644 --- a/analyzer/aggregate_stats/aggregate_stats.go +++ b/analyzer/aggregate_stats/aggregate_stats.go @@ -202,14 +202,14 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) { switch { case err == nil: // Continues below. - case errors.Is(pgx.ErrNoRows, err): + case errors.Is(err, pgx.ErrNoRows): // No stats yet. Start at the earliest indexed block. var earliestBlockTs *time.Time earliestBlockTs, err = a.earliestBlockTs(statCtx, statsComputation.layer) switch { case err == nil: latestComputed = floorWindow(earliestBlockTs) - case errors.Is(pgx.ErrNoRows, err): + case errors.Is(err, pgx.ErrNoRows): // No data log a debug only log. logger.Debug("no stats available yet, skipping iteration") cancel() @@ -230,7 +230,7 @@ func (a *aggregateStatsAnalyzer) aggregateStatsWorker(ctx context.Context) { switch { case err == nil: // Continues below. - case errors.Is(pgx.ErrNoRows, err): + case errors.Is(err, pgx.ErrNoRows): logger.Debug("no stats available yet, skipping iteration") cancel() continue diff --git a/analyzer/consensus/consensus.go b/analyzer/consensus/consensus.go index 5b3abb4a5..ecbc52ccb 100644 --- a/analyzer/consensus/consensus.go +++ b/analyzer/consensus/consensus.go @@ -797,12 +797,15 @@ func (m *processor) queueRootHashMessageUpserts(batch *storage.QueryBatch, data // The runtime has its own staking account, which is what // performs these actions, e.g. when sending or receiving the // consensus token. Register that as related to the message. - if _, err := addresses.RegisterRelatedRuntimeAddress(messageData.addressPreimages, messageData.relatedAddresses, event.RoothashExecutorCommitted.RuntimeID); err != nil { + if runtimeAddr, err := addresses.RegisterRuntimeAddress(messageData.addressPreimages, event.RoothashExecutorCommitted.RuntimeID); err != nil { logger.Info("register runtime address failed", "runtime_id", event.RoothashExecutorCommitted.RuntimeID, "err", err, ) + } else { + messageData.relatedAddresses[runtimeAddr] = struct{}{} } + for addr, preimageData := range messageData.addressPreimages { batch.Queue(queries.AddressPreimageInsert, addr, diff --git a/analyzer/consensus/messages.go b/analyzer/consensus/messages.go index 5a9c43d39..4c1e0cfa8 100644 --- a/analyzer/consensus/messages.go +++ b/analyzer/consensus/messages.go @@ -35,13 +35,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData { break } messageData.body = body - _, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Transfer.To) + to, err := addresses.FromOCSAddress(m.Staking.Transfer.To) if err != nil { logger.Info("register related address 'to' failed", "message_type", messageData.messageType, "err", err, ) } + messageData.relatedAddresses[to] = struct{}{} case m.Staking.Withdraw != nil: messageData.messageType = apiTypes.RoothashMessageTypeStakingWithdraw body, err := json.Marshal(m.Staking.Withdraw) @@ -53,13 +54,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData { break } messageData.body = body - _, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.Withdraw.From) + from, err := addresses.FromOCSAddress(m.Staking.Withdraw.From) if err != nil { logger.Info("register related address 'from' failed", "message_type", messageData.messageType, "err", err, ) } + messageData.relatedAddresses[from] = struct{}{} case m.Staking.AddEscrow != nil: messageData.messageType = apiTypes.RoothashMessageTypeStakingAddEscrow body, err := json.Marshal(m.Staking.AddEscrow) @@ -71,13 +73,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData { break } messageData.body = body - _, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.AddEscrow.Account) + account, err := addresses.FromOCSAddress(m.Staking.AddEscrow.Account) if err != nil { logger.Info("register related address 'account' failed", "message_type", messageData.messageType, "err", err, ) } + messageData.relatedAddresses[account] = struct{}{} case m.Staking.ReclaimEscrow != nil: messageData.messageType = apiTypes.RoothashMessageTypeStakingReclaimEscrow body, err := json.Marshal(m.Staking.ReclaimEscrow) @@ -89,13 +92,14 @@ func extractMessageData(logger *log.Logger, m message.Message) MessageData { break } messageData.body = body - _, err = addresses.RegisterRelatedOCSAddress(messageData.relatedAddresses, m.Staking.ReclaimEscrow.Account) + account, err := addresses.FromOCSAddress(m.Staking.ReclaimEscrow.Account) if err != nil { logger.Info("register related address 'account' failed", "message_type", messageData.messageType, "err", err, ) } + messageData.relatedAddresses[account] = struct{}{} default: logger.Info("unhandled staking message", "staking_message", m.Staking, diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go index de5473a93..1d4fcc86f 100644 --- a/analyzer/queries/queries.go +++ b/analyzer/queries/queries.go @@ -515,10 +515,10 @@ var ( SELECT epochs.id, epochs.start_height, prev_epoch.validators FROM chain.epochs as epochs LEFT JOIN history.validators as history - ON epochs.id = history.epoch + ON epochs.id = history.epoch LEFT JOIN chain.epochs as prev_epoch ON epochs.id = prev_epoch.id + 1 - WHERE + WHERE history.epoch IS NULL AND epochs.id >= $1 ORDER BY epochs.id @@ -531,7 +531,7 @@ var ( ValidatorStakingRewardUpdate = ` UPDATE history.validators SET staking_rewards = $3 - WHERE + WHERE id = $1 AND epoch = $2` @@ -545,7 +545,7 @@ var ( FROM chain.epochs AS epochs LEFT JOIN history.validators AS history ON epochs.id = history.epoch - WHERE + WHERE history.epoch IS NULL AND epochs.id >= $1` @@ -636,9 +636,13 @@ var ( tx_hash = $2` RuntimeEventInsert = ` - INSERT INTO chain.runtime_events (runtime, round, tx_index, tx_hash, tx_eth_hash, timestamp, type, body, related_accounts, evm_log_name, evm_log_params, evm_log_signature) + INSERT INTO chain.runtime_events (runtime, round, event_index, tx_index, tx_hash, tx_eth_hash, timestamp, type, body, evm_log_name, evm_log_params, evm_log_signature) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)` + RuntimeEventRelatedAccountsInsert = ` + INSERT INTO chain.runtime_events_related_accounts (runtime, round, event_index, related_account) + SELECT $1, $2, $3, unnest($4::text[])` + // We use COALESCE here to avoid overwriting existing data with null values. RuntimeEventEvmParsedFieldsUpdate = ` UPDATE chain.runtime_events diff --git a/analyzer/runtime/extract.go b/analyzer/runtime/extract.go index 28b710f0b..cfdaa3478 100644 --- a/analyzer/runtime/extract.go +++ b/analyzer/runtime/extract.go @@ -29,7 +29,6 @@ import ( "github.com/oasisprotocol/nexus/analyzer/runtime/encryption" evm "github.com/oasisprotocol/nexus/analyzer/runtime/evm" uncategorized "github.com/oasisprotocol/nexus/analyzer/uncategorized" - "github.com/oasisprotocol/nexus/analyzer/util" "github.com/oasisprotocol/nexus/analyzer/util/addresses" "github.com/oasisprotocol/nexus/analyzer/util/eth" apiTypes "github.com/oasisprotocol/nexus/api/v1/types" @@ -91,6 +90,7 @@ type TxError struct { type EventBody interface{} type EventData struct { + EventIndex int // Index of the event in the block, as returned by the GetEventsRaw RPC. TxIndex *int // nil for non-tx events TxHash *string // nil for non-tx events TxEthHash *string // nil for non-evm-tx events @@ -230,25 +230,23 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime SwapSyncs: map[apiTypes.Address]*PossibleSwapSync{}, } - // Extract info from non-tx events. - rawNonTxEvents := []nodeapi.RuntimeEvent{} - for _, e := range rawEvents { - if e.TxHash.String() == util.ZeroTxHash { - rawNonTxEvents = append(rawNonTxEvents, e) - } - } - nonTxEvents, err := extractEvents(&blockData, map[apiTypes.Address]struct{}{}, rawNonTxEvents) + extractedEvents, err := extractEvents(&blockData, rawEvents) if err != nil { return nil, fmt.Errorf("extract non-tx events: %w", err) } - blockData.EventData = nonTxEvents + for _, event := range extractedEvents { + if event.TxIndex == nil { + blockData.EventData = append(blockData.EventData, event) + } + } // Extract info from transactions. for txIndex, txr := range txrs { txr := txr // For safe usage of `&txr` inside this long loop. var blockTransactionData BlockTransactionData blockTransactionData.Index = txIndex - blockTransactionData.Hash = txr.Tx.Hash().Hex() + txHash := txr.Tx.Hash() + blockTransactionData.Hash = txHash.Hex() if len(txr.Tx.AuthProofs) == 1 && txr.Tx.AuthProofs[0].Module == "evm.ethereum.v0" { ethHash := hex.EncodeToString(eth.Keccak256(txr.Tx.Body)) blockTransactionData.EthHash = ðHash @@ -275,10 +273,11 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime si := si // we have no dangerous uses of &si, but capture the variable just in case (and to make the linter happy) var blockTransactionSignerData BlockTransactionSignerData blockTransactionSignerData.Index = j - addr, err1 := addresses.RegisterRelatedAddressSpec(blockData.AddressPreimages, blockTransactionData.RelatedAccountAddresses, &si.AddressSpec) + addr, err1 := addresses.RegisterAddressSpec(blockData.AddressPreimages, &si.AddressSpec) if err1 != nil { return nil, fmt.Errorf("tx %d signer %d visit address spec: %w", txIndex, j, err1) } + blockTransactionData.RelatedAccountAddresses[addr] = struct{}{} blockTransactionSignerData.Address = addr blockTransactionSignerData.Nonce = int(si.Nonce) blockTransactionData.SignerData = append(blockTransactionData.SignerData, &blockTransactionSignerData) @@ -321,9 +320,10 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime blockTransactionData.Body = body amount = body.Amount.Amount blockTransactionData.AmountSymbol = common.Ptr(stringifyDenomination(sdkPT, body.Amount.Denomination)) - if to, err = addresses.RegisterRelatedSdkAddress(blockTransactionData.RelatedAccountAddresses, &body.To); err != nil { + if to, err = addresses.FromSdkAddress(&body.To); err != nil { return fmt.Errorf("to: %w", err) } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} return nil }, ConsensusAccountsDeposit: func(body *consensusaccounts.Deposit) error { @@ -331,7 +331,7 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime amount = body.Amount.Amount blockTransactionData.AmountSymbol = common.Ptr(stringifyDenomination(sdkPT, body.Amount.Denomination)) if body.To != nil { - if to, err = addresses.RegisterRelatedSdkAddress(blockTransactionData.RelatedAccountAddresses, body.To); err != nil { + if to, err = addresses.FromSdkAddress(body.To); err != nil { return fmt.Errorf("to: %w", err) } } else { @@ -340,6 +340,7 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime // Ref: https://github.com/oasisprotocol/oasis-sdk/blob/runtime-sdk/v0.8.4/runtime-sdk/src/modules/consensus_accounts/mod.rs#L418 to = blockTransactionData.SignerData[0].Address } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} return nil }, ConsensusAccountsWithdraw: func(body *consensusaccounts.Withdraw) error { @@ -399,9 +400,10 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime // In Undelegate semantics, the inexistent `body.To` is implicitly the account that created this tx, i.e. the delegator R. // Ref: https://github.com/oasisprotocol/oasis-sdk/blob/eb97a8162f84ae81d11d805e6dceeeb016841c27/runtime-sdk/src/modules/consensus_accounts/mod.rs#L465-L465 // However, we instead expose `body.From` as the DB/API `to` for consistency with `Delegate`, and because it is more useful: the delegator R is already indexed in the tx sender field. - if to, err = addresses.RegisterRelatedSdkAddress(blockTransactionData.RelatedAccountAddresses, &body.From); err != nil { + if to, err = addresses.FromSdkAddress(&body.From); err != nil { return fmt.Errorf("from: %w", err) } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} // The `amount` (of tokens) is not contained in the body, only `shares` is. There isn't sufficient information // to convert `shares` to `amount` until the undelegation actually happens (= UndelegateDone event); in the meantime, // the validator's token pool might change, e.g. because of slashing. @@ -415,9 +417,10 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime if !txr.Result.IsUnknown() && txr.Result.IsSuccess() && len(*ok) == 20 { // Decode address of newly-created contract // todo: is this rigorous enough? - if to, err = addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, blockTransactionData.RelatedAccountAddresses, *ok); err != nil { + if to, err = addresses.RegisterEthAddress(blockData.AddressPreimages, *ok); err != nil { return fmt.Errorf("created contract: %w", err) } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} blockTransactionData.EVMContract = &evm.EVMContractData{ Address: to, CreationBytecode: body.InitCode, @@ -458,9 +461,10 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime EVMCall: func(body *sdkEVM.Call, ok *[]byte) error { blockTransactionData.Body = body amount = uncategorized.QuantityFromBytes(body.Value) - if to, err = addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, blockTransactionData.RelatedAccountAddresses, body.Address); err != nil { + if to, err = addresses.RegisterEthAddress(blockData.AddressPreimages, body.Address); err != nil { return fmt.Errorf("address: %w", err) } + blockTransactionData.RelatedAccountAddresses[to] = struct{}{} if evmEncrypted, failedCallResult, err2 := evm.EVMMaybeUnmarshalEncryptedData(body.Data, ok); err2 == nil { blockTransactionData.EVMEncrypted = evmEncrypted // For non-evm txs as well as older Sapphire txs, the outer CallResult may @@ -517,13 +521,17 @@ func ExtractRound(blockHeader nodeapi.RuntimeBlockHeader, txrs []nodeapi.Runtime } blockTransactionData.Amount = common.Ptr(common.BigIntFromQuantity(amount)) } - txEvents := make([]nodeapi.RuntimeEvent, len(txr.Events)) - for i, e := range txr.Events { - txEvents[i] = (nodeapi.RuntimeEvent)(*e) - } - extractedTxEvents, err := extractEvents(&blockData, blockTransactionData.RelatedAccountAddresses, txEvents) - if err != nil { - return nil, fmt.Errorf("tx %d: %w", txIndex, err) + + // Find extracted events for this tx. + var extractedTxEvents []*EventData + for _, event := range extractedEvents { + if event.TxIndex != nil && *event.TxIndex == txIndex { + extractedTxEvents = append(extractedTxEvents, event) + // Register related addresses found in the event for the transaction as well. + for addr := range event.RelatedAddresses { + blockTransactionData.RelatedAccountAddresses[addr] = struct{}{} + } + } } txGasUsed, foundGasUsedEvent := sumGasUsed(extractedTxEvents) // Populate eventData with tx-specific data. @@ -661,31 +669,33 @@ func tryParseErrorMessage(errorModule string, errorCode uint32, msg string) *str return &sanitizedMsg } -func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Address]struct{}, eventsRaw []nodeapi.RuntimeEvent) ([]*EventData, error) { //nolint:gocyclo +func extractEvents(blockData *BlockData, eventsRaw []nodeapi.RuntimeEvent) ([]*EventData, error) { //nolint:gocyclo extractedEvents := []*EventData{} if err := VisitSdkEvents(eventsRaw, &SdkEventHandler{ - Core: func(event *core.Event) error { + Core: func(event *core.Event, eventIdx int) error { if event.GasUsed != nil { eventData := EventData{ - Type: apiTypes.RuntimeEventTypeCoreGasUsed, - Body: event.GasUsed, - WithScope: ScopedSdkEvent{Core: event}, + EventIndex: eventIdx, + Type: apiTypes.RuntimeEventTypeCoreGasUsed, + Body: event.GasUsed, + WithScope: ScopedSdkEvent{Core: event}, } extractedEvents = append(extractedEvents, &eventData) } return nil }, - Accounts: func(event *accounts.Event) error { + Accounts: func(event *accounts.Event, eventIdx int) error { if event.Transfer != nil { - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Transfer.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.Transfer.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Transfer.To) + toAddr, err1 := addresses.FromSdkAddress(&event.Transfer.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ + EventIndex: eventIdx, Type: apiTypes.RuntimeEventTypeAccountsTransfer, Body: event.Transfer, WithScope: ScopedSdkEvent{Accounts: event}, @@ -694,11 +704,12 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.Burn != nil { - ownerAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Burn.Owner) + ownerAddr, err1 := addresses.FromSdkAddress(&event.Burn.Owner) if err1 != nil { return fmt.Errorf("owner: %w", err1) } eventData := EventData{ + EventIndex: eventIdx, Type: apiTypes.RuntimeEventTypeAccountsBurn, Body: event.Burn, WithScope: ScopedSdkEvent{Accounts: event}, @@ -707,11 +718,12 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.Mint != nil { - ownerAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Mint.Owner) + ownerAddr, err1 := addresses.FromSdkAddress(&event.Mint.Owner) if err1 != nil { return fmt.Errorf("owner: %w", err1) } eventData := EventData{ + EventIndex: eventIdx, Type: apiTypes.RuntimeEventTypeAccountsMint, Body: event.Mint, WithScope: ScopedSdkEvent{Accounts: event}, @@ -721,18 +733,19 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad } return nil }, - ConsensusAccounts: func(event *consensusaccounts.Event) error { + ConsensusAccounts: func(event *consensusaccounts.Event, eventIndex int) error { if event.Deposit != nil { // NOTE: .From is a _consensus_ addr (not runtime). It's still related though. - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Deposit.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.Deposit.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Deposit.To) + toAddr, err1 := addresses.FromSdkAddress(&event.Deposit.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ + EventIndex: eventIndex, Type: apiTypes.RuntimeEventTypeConsensusAccountsDeposit, Body: event.Deposit, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, @@ -741,16 +754,17 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.Withdraw != nil { - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Withdraw.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.Withdraw.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } // NOTE: .To is a _consensus_ addr (not runtime). It's still related though. - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Withdraw.To) + toAddr, err1 := addresses.FromSdkAddress(&event.Withdraw.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ + EventIndex: eventIndex, Type: apiTypes.RuntimeEventTypeConsensusAccountsWithdraw, Body: event.Withdraw, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, @@ -761,15 +775,16 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad if event.Delegate != nil { // No dead reckoning needed; balance changes are signalled by other, co-emitted events. // See "LESSON" comment in the code that handles the Delegate tx. - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Delegate.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.Delegate.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.Delegate.To) + toAddr, err1 := addresses.FromSdkAddress(&event.Delegate.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ + EventIndex: eventIndex, Type: apiTypes.RuntimeEventTypeConsensusAccountsDelegate, Body: event.Delegate, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, @@ -778,15 +793,16 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.UndelegateStart != nil { - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.UndelegateStart.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.UndelegateStart.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.UndelegateStart.To) + toAddr, err1 := addresses.FromSdkAddress(&event.UndelegateStart.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ + EventIndex: eventIndex, Type: apiTypes.RuntimeEventTypeConsensusAccountsUndelegateStart, Body: event.UndelegateStart, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, @@ -796,15 +812,16 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad extractedEvents = append(extractedEvents, &eventData) } if event.UndelegateDone != nil { - fromAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.UndelegateDone.From) + fromAddr, err1 := addresses.FromSdkAddress(&event.UndelegateDone.From) if err1 != nil { return fmt.Errorf("from: %w", err1) } - toAddr, err1 := addresses.RegisterRelatedSdkAddress(relatedAccountAddresses, &event.UndelegateDone.To) + toAddr, err1 := addresses.FromSdkAddress(&event.UndelegateDone.To) if err1 != nil { return fmt.Errorf("to: %w", err1) } eventData := EventData{ + EventIndex: eventIndex, Type: apiTypes.RuntimeEventTypeConsensusAccountsUndelegateDone, Body: event.UndelegateDone, WithScope: ScopedSdkEvent{ConsensusAccounts: event}, @@ -814,12 +831,13 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad } return nil }, - EVM: func(event *sdkEVM.Event) error { - eventAddr, err1 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, event.Address) + EVM: func(event *sdkEVM.Event, eventIndex int) error { + eventAddr, err1 := addresses.RegisterEthAddress(blockData.AddressPreimages, event.Address) if err1 != nil { return fmt.Errorf("event address: %w", err1) } eventData := EventData{ + EventIndex: eventIndex, Type: apiTypes.RuntimeEventTypeEvmLog, Body: event, WithScope: ScopedSdkEvent{EVM: event}, @@ -830,7 +848,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad fromZero := bytes.Equal(fromECAddr.Bytes(), eth.ZeroEthAddr) toZero := bytes.Equal(toECAddr.Bytes(), eth.ZeroEthAddr) if !fromZero { - fromAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, fromECAddr.Bytes()) + fromAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, fromECAddr.Bytes()) if err2 != nil { return fmt.Errorf("from: %w", err2) } @@ -838,7 +856,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad registerTokenDecrease(blockData.TokenBalanceChanges, eventAddr, fromAddr, value) } if !toZero { - toAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, toECAddr.Bytes()) + toAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, toECAddr.Bytes()) if err2 != nil { return fmt.Errorf("to: %w", err2) } @@ -882,14 +900,14 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad }, ERC20Approval: func(ownerECAddr ethCommon.Address, spenderECAddr ethCommon.Address, value *big.Int) error { if !bytes.Equal(ownerECAddr.Bytes(), eth.ZeroEthAddr) { - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } eventData.RelatedAddresses[ownerAddr] = struct{}{} } if !bytes.Equal(spenderECAddr.Bytes(), eth.ZeroEthAddr) { - spenderAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, spenderECAddr.Bytes()) + spenderAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, spenderECAddr.Bytes()) if err2 != nil { return fmt.Errorf("spender: %w", err2) } @@ -927,7 +945,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad var fromAddr, toAddr apiTypes.Address if !fromZero { var err2 error - fromAddr, err2 = addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, fromECAddr.Bytes()) + fromAddr, err2 = addresses.RegisterEthAddress(blockData.AddressPreimages, fromECAddr.Bytes()) if err2 != nil { return fmt.Errorf("from: %w", err2) } @@ -936,7 +954,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad } if !toZero { var err2 error - toAddr, err2 = addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, toECAddr.Bytes()) + toAddr, err2 = addresses.RegisterEthAddress(blockData.AddressPreimages, toECAddr.Bytes()) if err2 != nil { return fmt.Errorf("to: %w", err2) } @@ -990,14 +1008,14 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad }, ERC721Approval: func(ownerECAddr ethCommon.Address, approvedECAddr ethCommon.Address, tokenID *big.Int) error { if !bytes.Equal(ownerECAddr.Bytes(), eth.ZeroEthAddr) { - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } eventData.RelatedAddresses[ownerAddr] = struct{}{} } if !bytes.Equal(approvedECAddr.Bytes(), eth.ZeroEthAddr) { - approvedAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, approvedECAddr.Bytes()) + approvedAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, approvedECAddr.Bytes()) if err2 != nil { return fmt.Errorf("approved: %w", err2) } @@ -1032,14 +1050,14 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad }, ERC721ApprovalForAll: func(ownerECAddr ethCommon.Address, operatorECAddr ethCommon.Address, approved bool) error { if !bytes.Equal(ownerECAddr.Bytes(), eth.ZeroEthAddr) { - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } eventData.RelatedAddresses[ownerAddr] = struct{}{} } if !bytes.Equal(operatorECAddr.Bytes(), eth.ZeroEthAddr) { - operatorAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, operatorECAddr.Bytes()) + operatorAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, operatorECAddr.Bytes()) if err2 != nil { return fmt.Errorf("operator: %w", err2) } @@ -1070,17 +1088,17 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad return nil }, IUniswapV2FactoryPairCreated: func(token0ECAddr ethCommon.Address, token1ECAddr ethCommon.Address, pairECAddr ethCommon.Address, allPairsLength *big.Int) error { - token0Addr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, token0ECAddr.Bytes()) + token0Addr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, token0ECAddr.Bytes()) if err != nil { return fmt.Errorf("token0: %w", err) } eventData.RelatedAddresses[token0Addr] = struct{}{} - token1Addr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, token1ECAddr.Bytes()) + token1Addr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, token1ECAddr.Bytes()) if err != nil { return fmt.Errorf("token1: %w", err) } eventData.RelatedAddresses[token1Addr] = struct{}{} - pairAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, pairECAddr.Bytes()) + pairAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, pairECAddr.Bytes()) if err != nil { return fmt.Errorf("pair: %w", err) } @@ -1121,7 +1139,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad return nil }, IUniswapV2PairMint: func(senderECAddr ethCommon.Address, amount0 *big.Int, amount1 *big.Int) error { - senderAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, senderECAddr.Bytes()) + senderAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, senderECAddr.Bytes()) if err != nil { return fmt.Errorf("sender: %w", err) } @@ -1152,12 +1170,12 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad return nil }, IUniswapV2PairBurn: func(senderECAddr ethCommon.Address, amount0 *big.Int, amount1 *big.Int, toECAddr ethCommon.Address) error { - senderAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, senderECAddr.Bytes()) + senderAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, senderECAddr.Bytes()) if err != nil { return fmt.Errorf("sender: %w", err) } eventData.RelatedAddresses[senderAddr] = struct{}{} - toAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, toECAddr.Bytes()) + toAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, toECAddr.Bytes()) if err != nil { return fmt.Errorf("to: %w", err) } @@ -1193,12 +1211,12 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad return nil }, IUniswapV2PairSwap: func(senderECAddr ethCommon.Address, amount0In *big.Int, amount1In *big.Int, amount0Out *big.Int, amount1Out *big.Int, toECAddr ethCommon.Address) error { - senderAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, senderECAddr.Bytes()) + senderAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, senderECAddr.Bytes()) if err != nil { return fmt.Errorf("sender: %w", err) } eventData.RelatedAddresses[senderAddr] = struct{}{} - toAddr, err := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, toECAddr.Bytes()) + toAddr, err := addresses.RegisterEthAddress(blockData.AddressPreimages, toECAddr.Bytes()) if err != nil { return fmt.Errorf("to: %w", err) } @@ -1275,7 +1293,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad WROSEDeposit: func(ownerECAddr ethCommon.Address, amount *big.Int) error { wrapperAddr := eventAddr // the WROSE wrapper contract is implicitly the address that emitted the contract - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } @@ -1322,7 +1340,7 @@ func extractEvents(blockData *BlockData, relatedAccountAddresses map[apiTypes.Ad WROSEWithdrawal: func(ownerECAddr ethCommon.Address, amount *big.Int) error { wrapperAddr := eventAddr // the WROSE wrapper contract is implicitly the address that emitted the contract - ownerAddr, err2 := addresses.RegisterRelatedEthAddress(blockData.AddressPreimages, relatedAccountAddresses, ownerECAddr.Bytes()) + ownerAddr, err2 := addresses.RegisterEthAddress(blockData.AddressPreimages, ownerECAddr.Bytes()) if err2 != nil { return fmt.Errorf("owner: %w", err2) } diff --git a/analyzer/runtime/runtime.go b/analyzer/runtime/runtime.go index 25c457447..bf12e559f 100644 --- a/analyzer/runtime/runtime.go +++ b/analyzer/runtime/runtime.go @@ -401,22 +401,30 @@ func (m *processor) queueDbUpdates(batch *storage.QueryBatch, data *BlockData) { // Insert events. for _, eventData := range data.EventData { - eventRelatedAddresses := addresses.SliceFromSet(eventData.RelatedAddresses) batch.Queue( queries.RuntimeEventInsert, m.runtime, data.Header.Round, + eventData.EventIndex, eventData.TxIndex, eventData.TxHash, eventData.TxEthHash, data.Header.Timestamp, eventData.Type, eventData.Body, - eventRelatedAddresses, eventData.EvmLogName, eventData.EvmLogParams, eventData.EvmLogSignature, ) + + eventRelatedAddresses := addresses.SliceFromSet(eventData.RelatedAddresses) + batch.Queue( + queries.RuntimeEventRelatedAccountsInsert, + m.runtime, + data.Header.Round, + eventData.EventIndex, + eventRelatedAddresses, + ) } // Insert address preimages. diff --git a/analyzer/runtime/runtime_test.go b/analyzer/runtime/runtime_test.go index ae201d42e..cccdd4d64 100644 --- a/analyzer/runtime/runtime_test.go +++ b/analyzer/runtime/runtime_test.go @@ -97,9 +97,9 @@ func (mock *mockNode) GetEventsRaw(ctx context.Context, round uint64) ([]nodeapi // Include events that were part of transactions. txrs := mock.Txs[round] - for _, tx := range txrs { + for i, tx := range txrs { for _, ev := range tx.Events { - events = append(events, nodeapi.RuntimeEvent(*ev)) + events = append(events, nodeapi.RuntimeEvent{Event: ev, Index: uint64(i)}) } } diff --git a/analyzer/runtime/visitors.go b/analyzer/runtime/visitors.go index 710433e42..6354c469f 100644 --- a/analyzer/runtime/visitors.go +++ b/analyzer/runtime/visitors.go @@ -130,63 +130,69 @@ func VisitCall(call *sdkTypes.Call, result *sdkTypes.CallResult, handler *CallHa } type SdkEventHandler struct { - Core func(event *core.Event) error - Accounts func(event *accounts.Event) error - ConsensusAccounts func(event *consensusaccounts.Event) error - EVM func(event *evm.Event) error + Core func(event *core.Event, eventIdx int) error + Accounts func(event *accounts.Event, eventIdx int) error + ConsensusAccounts func(event *consensusaccounts.Event, eventIdx int) error + EVM func(event *evm.Event, eventIdx int) error } -func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler) error { +func VisitSdkEvent(event *nodeapi.RuntimeEvent, handler *SdkEventHandler, currIdx int) (int, error) { if handler.Core != nil { coreEvents, err := DecodeCoreEvent(event) if err != nil { - return fmt.Errorf("decode core: %w", err) + return currIdx, fmt.Errorf("decode core: %w", err) } for i := range coreEvents { - if err = handler.Core(&coreEvents[i]); err != nil { - return fmt.Errorf("decoded event %d core: %w", i, err) + if err = handler.Core(&coreEvents[i], currIdx); err != nil { + return currIdx, fmt.Errorf("decoded event %d core: %w", i, err) } + currIdx++ } } if handler.Accounts != nil { accountEvents, err := DecodeAccountsEvent(event) if err != nil { - return fmt.Errorf("decode accounts: %w", err) + return currIdx, fmt.Errorf("decode accounts: %w", err) } for i := range accountEvents { - if err = handler.Accounts(&accountEvents[i]); err != nil { - return fmt.Errorf("decoded event %d accounts: %w", i, err) + if err = handler.Accounts(&accountEvents[i], currIdx); err != nil { + return currIdx, fmt.Errorf("decoded event %d accounts: %w", i, err) } + currIdx++ } } if handler.ConsensusAccounts != nil { consensusAccountsEvents, err := DecodeConsensusAccountsEvent(event) if err != nil { - return fmt.Errorf("decode consensus accounts: %w", err) + return currIdx, fmt.Errorf("decode consensus accounts: %w", err) } for i := range consensusAccountsEvents { - if err = handler.ConsensusAccounts(&consensusAccountsEvents[i]); err != nil { - return fmt.Errorf("decoded event %d consensus accounts: %w", i, err) + if err = handler.ConsensusAccounts(&consensusAccountsEvents[i], currIdx); err != nil { + return currIdx, fmt.Errorf("decoded event %d consensus accounts: %w", i, err) } + currIdx++ } } if handler.EVM != nil { evmEvents, err := DecodeEVMEvent(event) if err != nil { - return fmt.Errorf("decode evm: %w", err) + return currIdx, fmt.Errorf("decode evm: %w", err) } for i := range evmEvents { - if err = handler.EVM(&evmEvents[i]); err != nil { - return fmt.Errorf("decoded event %d evm: %w", i, err) + if err = handler.EVM(&evmEvents[i], currIdx); err != nil { + return currIdx, fmt.Errorf("decoded event %d evm: %w", i, err) } + currIdx++ } } - return nil + return currIdx, nil } func VisitSdkEvents(events []nodeapi.RuntimeEvent, handler *SdkEventHandler) error { + var currIdx int + var err error for i := range events { - if err := VisitSdkEvent(&events[i], handler); err != nil { + if currIdx, err = VisitSdkEvent(&events[i], handler, currIdx); err != nil { return fmt.Errorf("event %d: %w; raw event: %+v", i, err, events[i]) } } diff --git a/analyzer/util/addresses/registration.go b/analyzer/util/addresses/registration.go index 979aedefa..2575d86ec 100644 --- a/analyzer/util/addresses/registration.go +++ b/analyzer/util/addresses/registration.go @@ -58,7 +58,7 @@ func extractAddressPreimage(as *sdkTypes.AddressSpec) (*PreimageData, error) { }, nil } -func registerAddressSpec(addressPreimages map[apiTypes.Address]*PreimageData, as *sdkTypes.AddressSpec) (apiTypes.Address, error) { +func RegisterAddressSpec(addressPreimages map[apiTypes.Address]*PreimageData, as *sdkTypes.AddressSpec) (apiTypes.Address, error) { addr, err := FromAddressSpec(as) if err != nil { return "", err @@ -75,7 +75,7 @@ func registerAddressSpec(addressPreimages map[apiTypes.Address]*PreimageData, as return addr, nil } -func registerEthAddress(addressPreimages map[apiTypes.Address]*PreimageData, ethAddr []byte) (apiTypes.Address, error) { +func RegisterEthAddress(addressPreimages map[apiTypes.Address]*PreimageData, ethAddr []byte) (apiTypes.Address, error) { addr, err := FromEthAddress(ethAddr) if err != nil { return "", err @@ -92,7 +92,7 @@ func registerEthAddress(addressPreimages map[apiTypes.Address]*PreimageData, eth return addr, nil } -func registerRuntimeAddress(addressPreimages map[apiTypes.Address]*PreimageData, id coreCommon.Namespace) (apiTypes.Address, error) { +func RegisterRuntimeAddress(addressPreimages map[apiTypes.Address]*PreimageData, id coreCommon.Namespace) (apiTypes.Address, error) { addr, err := FromRuntimeID(id) if err != nil { return "", err @@ -112,69 +112,3 @@ func registerRuntimeAddress(addressPreimages map[apiTypes.Address]*PreimageData, return addr, nil } - -func RegisterRelatedSdkAddress(relatedAddresses map[apiTypes.Address]struct{}, sdkAddr *sdkTypes.Address) (apiTypes.Address, error) { - addr, err := FromSdkAddress(sdkAddr) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedAddressSpec(addressPreimages map[apiTypes.Address]*PreimageData, relatedAddresses map[apiTypes.Address]struct{}, as *sdkTypes.AddressSpec) (apiTypes.Address, error) { - addr, err := registerAddressSpec(addressPreimages, as) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedOCAddress(relatedAddresses map[apiTypes.Address]struct{}, ocAddr address.Address) (apiTypes.Address, error) { - addr, err := FromOCAddress(ocAddr) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedOCSAddress(relatedAddresses map[apiTypes.Address]struct{}, ocsAddr staking.Address) (apiTypes.Address, error) { - addr, err := FromOCSAddress(ocsAddr) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedEthAddress(addressPreimages map[apiTypes.Address]*PreimageData, relatedAddresses map[apiTypes.Address]struct{}, ethAddr []byte) (apiTypes.Address, error) { - addr, err := registerEthAddress(addressPreimages, ethAddr) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} - -func RegisterRelatedRuntimeAddress(addressPreimages map[apiTypes.Address]*PreimageData, relatedAddresses map[apiTypes.Address]struct{}, id coreCommon.Namespace) (apiTypes.Address, error) { - addr, err := registerRuntimeAddress(addressPreimages, id) - if err != nil { - return "", err - } - - relatedAddresses[addr] = struct{}{} - - return addr, nil -} diff --git a/storage/client/client.go b/storage/client/client.go index 93196f0b7..1028d06d0 100644 --- a/storage/client/client.go +++ b/storage/client/client.go @@ -1583,7 +1583,12 @@ func (c *StorageClient) RuntimeEvents(ctx context.Context, p apiTypes.GetRuntime evmLogSignature = &h } if p.NftId != nil && p.ContractAddress == nil { - return nil, fmt.Errorf("must specify contract_address with nft_id") + return nil, fmt.Errorf("'nft_id' can only be used in combination with 'contract_address'") + } + if p.ContractAddress != nil { + if p.NftId == nil && p.EvmLogSignature == nil { + return nil, fmt.Errorf("'contract_address' can only be used in combination with either 'nft_id' or 'evm_log_signature'") + } } ocAddrContract, err := apiTypes.UnmarshalToOcAddress(p.ContractAddress) if err != nil { diff --git a/storage/client/queries/queries.go b/storage/client/queries/queries.go index e3f0f1abb..c9221be9d 100644 --- a/storage/client/queries/queries.go +++ b/storage/client/queries/queries.go @@ -586,6 +586,12 @@ const ( (evs.runtime=tokens.runtime) AND (preimages.address=tokens.token_address) AND (tokens.token_type IS NOT NULL) -- exclude token _candidates_ that we haven't inspected yet; we have no info about them (name, decimals, etc) + LEFT JOIN chain.runtime_events_related_accounts as rel ON + (evs.runtime = rel.runtime) AND + (evs.round = rel.round) AND + (evs.event_index = rel.event_index) AND + -- When related_address ($7) is NULL and hence we do no filtering on it, avoid the join altogether. + ($7::text IS NOT NULL) WHERE (evs.runtime = $1) AND ($2::bigint IS NULL OR evs.round = $2::bigint) AND @@ -593,7 +599,7 @@ const ( ($4::text IS NULL OR evs.tx_hash = $4::text OR evs.tx_eth_hash = $4::text) AND ($5::text IS NULL OR evs.type = $5::text) AND ($6::bytea IS NULL OR evs.evm_log_signature = $6::bytea) AND - ($7::text IS NULL OR evs.related_accounts @> ARRAY[$7::text]) AND + ($7::text IS NULL OR rel.account_address = $7::text) AND ($8::text IS NULL OR ( -- Currently this only supports EVM smart contracts. evs.type = 'evm.log' AND diff --git a/storage/migrations/01_runtimes.up.sql b/storage/migrations/01_runtimes.up.sql index 5d23d13ce..190552c50 100644 --- a/storage/migrations/01_runtimes.up.sql +++ b/storage/migrations/01_runtimes.up.sql @@ -90,15 +90,15 @@ CREATE TABLE chain.runtime_transactions error_module TEXT, error_code UINT63, error_message TEXT, - -- The unparsed transaction error message. The "parsed" version will be + -- The unparsed transaction error message. The "parsed" version will be -- identical in the majority of cases. One notable exception are txs that -- were reverted inside the EVM; for those, the raw msg is abi-encoded. error_message_raw TEXT, -- Custom errors may be arbitrarily defined by the contract abi. This field - -- stores the full abi-decoded error object. Note that the error name is + -- stores the full abi-decoded error object. Note that the error name is -- stored separately in the existing error_message column. For example, if we -- have an error like `InsufficientBalance{available: 4, required: 10}`. - -- the error_message column would hold `InsufficientBalance`, and + -- the error_message column would hold `InsufficientBalance`, and -- the error_params column would store `{available: 4, required: 10}`. error_params JSONB, -- Internal tracking for parsing evm.Call transactions using the contract @@ -144,18 +144,19 @@ CREATE TABLE chain.runtime_events ( runtime runtime NOT NULL, round UINT63 NOT NULL, + event_index UINT31 NOT NULL, -- The index of the event within the block - as returned by the GetEventsRaw RPC. + PRIMARY KEY (runtime, round, event_index), + tx_index UINT31, - FOREIGN KEY (runtime, round, tx_index) REFERENCES chain.runtime_transactions(runtime, round, tx_index) DEFERRABLE INITIALLY DEFERRED, tx_hash HEX64, tx_eth_hash HEX64, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, - + -- TODO: add link to openapi spec section with runtime event types. type TEXT NOT NULL, -- The raw event, as returned by the oasis-sdk runtime client. body JSONB NOT NULL, - related_accounts TEXT[], -- The events of type `evm.log` are further parsed into known event types, e.g. (ERC20) Transfer, -- to populate the `evm_log_name`, `evm_log_params`, and `evm_log_signature` fields. @@ -167,12 +168,13 @@ CREATE TABLE chain.runtime_events -- Internal tracking for parsing evm.Call events using the contract -- abi when available. - abi_parsed_at TIMESTAMP WITH TIME ZONE + abi_parsed_at TIMESTAMP WITH TIME ZONE, + + related_accounts TEXT[] -- Removed in 07_runtime_events_related_accounts.up.sql. ); CREATE INDEX ix_runtime_events_round ON chain.runtime_events(runtime, round); -- for sorting by round, when there are no filters applied CREATE INDEX ix_runtime_events_tx_hash ON chain.runtime_events USING hash (tx_hash); CREATE INDEX ix_runtime_events_tx_eth_hash ON chain.runtime_events USING hash (tx_eth_hash); -CREATE INDEX ix_runtime_events_related_accounts ON chain.runtime_events USING gin(related_accounts); -- for fetching account activity for a given account CREATE INDEX ix_runtime_events_evm_log_signature ON chain.runtime_events(runtime, evm_log_signature, round); -- for fetching a certain event type, eg Transfers CREATE INDEX ix_runtime_events_evm_log_params ON chain.runtime_events USING gin(evm_log_params); CREATE INDEX ix_runtime_events_type ON chain.runtime_events (runtime, type); @@ -181,6 +183,17 @@ CREATE INDEX ix_runtime_events_nft_transfers ON chain.runtime_events (runtime, ( type = 'evm.log' AND evm_log_signature = '\xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef' AND jsonb_array_length(body -> 'topics') = 4; +-- CREATE INDEX ix_runtime_events_related_accounts ON chain.runtime_events USING gin(related_accounts); -- for fetching account activity for a given account -- Deleted in 08_runtime_events_related_accounts.up.sql. + +-- Added in 07_runtime_events_related_accounts.up.sql. +-- CREATE TABLE chain.runtime_events_related_accounts +-- ( +-- runtime runtime NOT NULL, +-- round UINT63 NOT NULL, +-- event_index UINT31 NOT NULL, +-- related_account TEXT NOT NULL +-- PRIMARY KEY (runtime, round, event_index, related_account), +-- ); -- Roothash messages are small structures that a runtime can send to -- communicate with the consensus layer. They are agreed upon for each runtime @@ -230,13 +243,13 @@ CREATE TABLE chain.runtime_accounts -- we've encountered the preimage before to find out what the address was -- derived from. -- --- If you need to go the other way, from context + data to address, you'd +-- If you need to go the other way, from context + data to address, you'd -- normally just run the derivation. See oasis-core/go/common/crypto/address/address.go --- for details. Consider inserting the preimage here if you're ingesting new +-- for details. Consider inserting the preimage here if you're ingesting new -- blockchain data. -- --- However, we do provide an index going the other way because certain queries --- require computing the derivation within Postgres and implementing/importing +-- However, we do provide an index going the other way because certain queries +-- require computing the derivation within Postgres and implementing/importing -- the right hash function will take some work. -- TODO: import keccak hash into postgres. -- @@ -256,7 +269,7 @@ CREATE TABLE chain.address_preimages -- Ethereum address. For a "staking" context, this is the ed25519 pubkey. address_data BYTEA NOT NULL ); -CREATE INDEX ix_address_preimages_address_data ON chain.address_preimages (address_data) +CREATE INDEX ix_address_preimages_address_data ON chain.address_preimages (address_data) WHERE context_identifier = 'oasis-runtime-sdk/address: secp256k1eth' AND context_version = 0; -- -- -- -- -- -- -- -- -- -- -- -- -- Module evm -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- @@ -285,13 +298,13 @@ CREATE TABLE chain.evm_tokens -- NOT an uint because a non-conforming token contract could issue a fake burn event, -- causing a negative dead-reckoned total_supply. total_supply NUMERIC(1000,0), - + num_transfers UINT63 NOT NULL DEFAULT 0, - + -- Block analyzer bumps this when it sees the mutable fields of the token -- change (e.g. total supply) based on dead reckoning. last_mutate_round UINT63 NOT NULL DEFAULT 0, - + -- Token analyzer bumps this when it downloads info about the token. last_download_round UINT63 ); @@ -304,7 +317,7 @@ CREATE TABLE analysis.evm_token_balances token_address oasis_addr NOT NULL, account_address oasis_addr NOT NULL, PRIMARY KEY (runtime, token_address, account_address), - + last_mutate_round UINT63 NOT NULL, last_download_round UINT63 ); diff --git a/storage/migrations/07_runtime_events_related_accounts.up.sql b/storage/migrations/07_runtime_events_related_accounts.up.sql new file mode 100644 index 000000000..79bdc7cd6 --- /dev/null +++ b/storage/migrations/07_runtime_events_related_accounts.up.sql @@ -0,0 +1,25 @@ +BEGIN; + +CREATE TABLE chain.runtime_events_related_accounts +( + runtime runtime NOT NULL, + round UINT63 NOT NULL, + event_index UINT31 NOT NULL, + related_account TEXT NOT NULL, + PRIMARY KEY (runtime, round, event_index, related_account) +); + +-- Used for fetching all events related to an account (sorted by round). +CREATE INDEX ix_runtime_events_related_accounts_related_account_round ON chain.runtime_events_related_accounts(runtime, related_account, round); + +DROP INDEX IF EXISTS chain.ix_runtime_events_related_accounts; + +-- TODO: we need a more high-level ("go") migration for this. Basically do a re-index but just of the runtime_events table. +-- This is not something our migration framework currently supports. +-- If we plan to avoid the need for many reindexing in the future, we should consider implementing support for this. +-- Alternatively, we plan on doing more reindexing in future, then we can just wait with this change until we do the next reindexing. + +ALTER TABLE chain.runtime_events DROP COLUMN related_accounts; + + +COMMIT; diff --git a/storage/oasis/nodeapi/api.go b/storage/oasis/nodeapi/api.go index 49f72da77..4b78429c2 100644 --- a/storage/oasis/nodeapi/api.go +++ b/storage/oasis/nodeapi/api.go @@ -284,10 +284,14 @@ type RuntimeApiLite interface { } type ( - RuntimeEvent sdkTypes.Event RuntimeTransactionWithResults sdkClient.TransactionWithResults ) +type RuntimeEvent struct { + *sdkTypes.Event + Index uint64 +} + // Derived from oasis-core: roothash/api/block/header.go // Expanded to include the precomputed hash of the header; // we mustn't compute it on the fly because depending on the diff --git a/storage/oasis/nodeapi/file/runtime.go b/storage/oasis/nodeapi/file/runtime.go index a5f7e3fc1..664b6d89a 100644 --- a/storage/oasis/nodeapi/file/runtime.go +++ b/storage/oasis/nodeapi/file/runtime.go @@ -70,12 +70,40 @@ func (r *FileRuntimeApiLite) GetTransactionsWithResults(ctx context.Context, rou ) } +// nodeapi.RuntimeEvent changed from being an alias of sdkTypes.Event, to a struct which additionally contains the index of the event. +// To avoid invalidating the cache (causing the need to re-index all events), we keep using the old type in the caching backend and convert between +// the types on the fly. This is possible since the index of the event is the order in which the event is returned by the GetEventsRaw method. func (r *FileRuntimeApiLite) GetEventsRaw(ctx context.Context, round uint64) ([]nodeapi.RuntimeEvent, error) { - return kvstore.GetSliceFromCacheOrCall( + type CachedRuntimeEvent = sdkTypes.Event + + cachedEvents, err := kvstore.GetSliceFromCacheOrCall( r.db, round == roothash.RoundLatest, kvstore.GenerateCacheKey("GetEventsRaw", r.runtime, round), - func() ([]nodeapi.RuntimeEvent, error) { return r.runtimeApi.GetEventsRaw(ctx, round) }, + func() ([]CachedRuntimeEvent, error) { + rawEvents, err := r.runtimeApi.GetEventsRaw(ctx, round) + if err != nil { + return nil, err + } + cachedEvents := make([]CachedRuntimeEvent, len(rawEvents)) + for i, ev := range rawEvents { + cachedEvents[i] = *ev.Event + } + return cachedEvents, nil + }, ) + if err != nil { + return nil, err + } + + // Convert to nexus-internal type. + evs := make([]nodeapi.RuntimeEvent, len(cachedEvents)) + for i, ev := range cachedEvents { + evs[i] = nodeapi.RuntimeEvent{ + Event: &ev, + Index: uint64(i), + } + } + return evs, nil } func (r *FileRuntimeApiLite) GetBalances(ctx context.Context, round uint64, addr nodeapi.Address) (map[sdkTypes.Denomination]common.BigInt, error) { diff --git a/storage/oasis/nodeapi/universal_runtime.go b/storage/oasis/nodeapi/universal_runtime.go index 56e0c017e..fb66986ea 100644 --- a/storage/oasis/nodeapi/universal_runtime.go +++ b/storage/oasis/nodeapi/universal_runtime.go @@ -130,7 +130,10 @@ func (rc *UniversalRuntimeApiLite) GetEventsRaw(ctx context.Context, round uint6 // Convert to nexus-internal type. evs := make([]RuntimeEvent, len(rsp)) for i, ev := range rsp { - evs[i] = (RuntimeEvent)(*ev) + evs[i] = RuntimeEvent{ + Event: ev, + Index: uint64(i), + } } return evs, nil