diff --git a/config/config.json b/config/config.json index 9906ca1803..e16f1e509d 100644 --- a/config/config.json +++ b/config/config.json @@ -131,7 +131,10 @@ "config": { "networkId": "otp::testnet", "hubContractAddress": "0x707233a55bD035C6Bc732196CA4dbffa63CbA169", - "rpcEndpoints": ["https://lofar-tm-rpc.origin-trail.network"], + "rpcEndpoints": [ + "https://lofar-tm-rpc.origin-trail.network", + "https://lofar.origintrail.network/" + ], "initialStakeAmount": 50000, "initialAskAmount": 2 } @@ -166,17 +169,25 @@ } } } + }, + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": false, + "signalingServerUrl": "null" + } + } + } } }, "maximumAssertionSizeInKb": 2500, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "info", - "telemetry": { - "enabled": true, - "sendTelemetryData": false, - "signalingServerUrl": "null" - }, "auth": { "ipBasedAuthEnabled": true, "tokenBasedAuthEnabled": false, @@ -301,17 +312,25 @@ "config": {} } } + }, + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": false, + "signalingServerUrl": "null" + } + } + } } }, "maximumAssertionSizeInKb": 2500, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", - "telemetry": { - "enabled": true, - "sendTelemetryData": false, - "signalingServerUrl": "null" - }, "auth": { "ipBasedAuthEnabled": true, "tokenBasedAuthEnabled": false, @@ -448,17 +467,25 @@ "config": {} } } + }, + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": true, + "signalingServerUrl": "https://testnet-signaling.origin-trail.network/signal" + } + } + } } }, "maximumAssertionSizeInKb": 2500, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", - "telemetry": { - "enabled": true, - "sendTelemetryData": true, - "signalingServerUrl": "https://testnet-signaling.origin-trail.network/signal" - }, "auth": { "ipBasedAuthEnabled": true, "tokenBasedAuthEnabled": false, @@ -561,7 +588,8 @@ "hubContractAddress": "0x5fA7916c48Fe6D5F1738d12Ad234b78c90B4cAdA", "rpcEndpoints": [ "https://astrosat-parachain-rpc.origin-trail.network", - "https://astrosat.origintrail.network/" + "https://astrosat.origintrail.network/", + "https://astrosat-2.origintrail.network/" ] } } @@ -596,17 +624,25 @@ "config": {} } } + }, + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": true, + "signalingServerUrl": "https://mainnet-signaling.origin-trail.network/signal" + } + } + } } }, "maximumAssertionSizeInKb": 2500, "commandExecutorVerboseLoggingEnabled": false, "appDataPath": "data", "logLevel": "trace", - "telemetry": { - "enabled": true, - "sendTelemetryData": true, - "signalingServerUrl": "https://mainnet-signaling.origin-trail.network/signal" - }, "auth": { "ipBasedAuthEnabled": true, "tokenBasedAuthEnabled": false, diff --git a/ot-node.js b/ot-node.js index fa68cfc2af..716669d52d 100644 --- a/ot-node.js +++ b/ot-node.js @@ -11,6 +11,7 @@ import OtnodeUpdateCommand from './src/commands/common/otnode-update-command.js' import OtAutoUpdater from './src/modules/auto-updater/implementation/ot-auto-updater.js'; import PullBlockchainShardingTableMigration from './src/migration/pull-sharding-table-migration.js'; import TripleStoreUserConfigurationMigration from './src/migration/triple-store-user-configuration-migration.js'; +import TelemetryModuleUserConfigurationMigration from './src/migration/telemetry-module-user-configuration-migration.js'; import PrivateAssetsMetadataMigration from './src/migration/private-assets-metadata-migration.js'; import ServiceAgreementsMetadataMigration from './src/migration/service-agreements-metadata-migration.js'; import RemoveAgreementStartEndTimeMigration from './src/migration/remove-agreement-start-end-time-migration.js'; @@ -36,6 +37,7 @@ class OTNode { await this.checkForUpdate(); await this.removeUpdateFile(); await this.executeTripleStoreUserConfigurationMigration(); + await this.executeTelemetryModuleUserConfigurationMigration(); this.logger.info(' ██████╗ ████████╗███╗ ██╗ ██████╗ ██████╗ ███████╗'); this.logger.info('██╔═══██╗╚══██╔══╝████╗ ██║██╔═══██╗██╔══██╗██╔════╝'); this.logger.info('██║ ██║ ██║ ██╔██╗ ██║██║ ██║██║ ██║█████╗'); @@ -65,11 +67,11 @@ class OTNode { await this.initializeCommandExecutor(); await this.initializeShardingTableService(); - await this.initializeTelemetryInjectionService(); await this.initializeBlockchainEventListenerService(); await this.initializeRouters(); await this.startNetworkModule(); + this.startTelemetryModule(); this.resumeCommandExecutor(); this.logger.info('Node is up and running!'); } @@ -275,6 +277,21 @@ class OTNode { await networkModuleManager.start(); } + startTelemetryModule() { + const telemetryModuleManager = this.container.resolve('telemetryModuleManager'); + const repositoryModuleManager = this.container.resolve('repositoryModuleManager'); + telemetryModuleManager.listenOnEvents((eventData) => { + repositoryModuleManager.createEventRecord( + eventData.operationId, + eventData.lastEvent, + eventData.timestamp, + eventData.value1, + eventData.value2, + eventData.value3, + ); + }); + } + async executePrivateAssetsMetadataMigration() { if ( process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || @@ -305,6 +322,25 @@ class OTNode { } } + async executeTelemetryModuleUserConfigurationMigration() { + if ( + process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || + process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST + ) + return; + + const migration = new TelemetryModuleUserConfigurationMigration( + 'telemetryModuleUserConfigurationMigration', + this.logger, + this.config, + ); + if (!(await migration.migrationAlreadyExecuted())) { + await migration.migrate(); + this.logger.info('Node will now restart!'); + this.stop(1); + } + } + async executeTripleStoreUserConfigurationMigration() { if ( process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT || @@ -474,22 +510,6 @@ class OTNode { } } - async initializeTelemetryInjectionService() { - if (this.config.telemetry.enabled) { - try { - const telemetryHubModuleManager = this.container.resolve( - 'telemetryInjectionService', - ); - telemetryHubModuleManager.initialize(); - this.logger.info('Telemetry Injection Service initialized successfully'); - } catch (e) { - this.logger.error( - `Telemetry hub module initialization failed. Error message: ${e.message}`, - ); - } - } - } - async removeUpdateFile() { const updateFilePath = this.fileService.getUpdateFilePath(); await this.fileService.removeFile(updateFilePath).catch((error) => { diff --git a/package-lock.json b/package-lock.json index 4fdac68a57..c4bc3b26fd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "6.0.16", + "version": "6.0.18", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "6.0.16", + "version": "6.0.18", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^2.4.3", diff --git a/package.json b/package.json index 307fadd425..11eec6e27d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "6.0.16", + "version": "6.0.18", "description": "OTNode V6", "main": "index.js", "type": "module", diff --git a/src/commands/common/send-telemetry-command.js b/src/commands/common/send-telemetry-command.js index 03fa2ff128..ada1aab635 100644 --- a/src/commands/common/send-telemetry-command.js +++ b/src/commands/common/send-telemetry-command.js @@ -1,4 +1,3 @@ -import axios from 'axios'; import { createRequire } from 'module'; import Command from '../command.js'; import { SEND_TELEMETRY_COMMAND_FREQUENCY_MINUTES } from '../../constants/constants.js'; @@ -11,9 +10,10 @@ class SendTelemetryCommand extends Command { super(ctx); this.logger = ctx.logger; this.config = ctx.config; - this.telemetryInjectionService = ctx.telemetryInjectionService; this.networkModuleManager = ctx.networkModuleManager; this.blockchainModuleManager = ctx.blockchainModuleManager; + this.repositoryModuleManager = ctx.repositoryModuleManager; + this.telemetryModuleManager = ctx.telemetryModuleManager; } /** @@ -21,35 +21,31 @@ class SendTelemetryCommand extends Command { * @param command */ async execute() { - if (!this.config.telemetry.enabled || !this.config.telemetry.sendTelemetryData) { + if ( + !this.config.modules.telemetry.enabled || + !this.telemetryModuleManager.getModuleConfiguration().sendTelemetryData + ) { return Command.empty(); } + try { - const events = await this.telemetryInjectionService.getUnpublishedEvents(); - const signalingMessage = { - nodeData: { - version: pjson.version, - identity: this.networkModuleManager.getPeerId().toB58String(), - hostname: this.config.hostname, - operational_wallet: this.blockchainModuleManager.getPublicKey(), - management_wallet: this.blockchainModuleManager.getManagementKey(), - triple_store: this.config.modules.tripleStore.defaultImplementation, - auto_update_enabled: this.config.modules.autoUpdater.enabled, - multiaddresses: this.networkModuleManager.getMultiaddrs(), - }, - events: events || [], - }; - const config = { - method: 'post', - url: this.config.telemetry.signalingServerUrl, - headers: { - 'Content-Type': 'application/json', - }, - data: JSON.stringify(signalingMessage), + const events = (await this.getUnpublishedEvents()) || []; + const nodeData = { + version: pjson.version, + identity: this.networkModuleManager.getPeerId().toB58String(), + hostname: this.config.hostname, + operational_wallet: this.blockchainModuleManager.getPublicKey(), + management_wallet: this.blockchainModuleManager.getManagementKey(), + triple_store: this.config.modules.tripleStore.defaultImplementation, + auto_update_enabled: this.config.modules.autoUpdater.enabled, + multiaddresses: this.networkModuleManager.getMultiaddrs(), }; - const response = await axios(config); - if (response.status === 200 && events?.length > 0) { - await this.telemetryInjectionService.removePublishedEvents(events); + const isDataSuccessfullySent = await this.telemetryModuleManager.sendTelemetryData( + nodeData, + events, + ); + if (isDataSuccessfullySent && events?.length > 0) { + await this.removePublishedEvents(events); } } catch (e) { await this.handleError(e); @@ -83,6 +79,16 @@ class SendTelemetryCommand extends Command { Object.assign(command, map); return command; } + + async getUnpublishedEvents() { + return this.repositoryModuleManager.getUnpublishedEvents(); + } + + async removePublishedEvents(events) { + const ids = events.map((event) => event.id); + + await this.repositoryModuleManager.destroyEvents(ids); + } } export default SendTelemetryCommand; diff --git a/src/commands/common/validate-asset-command.js b/src/commands/common/validate-asset-command.js index 458396d630..317a45cdb6 100644 --- a/src/commands/common/validate-asset-command.js +++ b/src/commands/common/validate-asset-command.js @@ -1,5 +1,10 @@ import Command from '../command.js'; -import { ERROR_TYPE, OPERATION_ID_STATUS, LOCAL_STORE_TYPES } from '../../constants/constants.js'; +import { + ERROR_TYPE, + OPERATION_ID_STATUS, + LOCAL_STORE_TYPES, + ZERO_BYTES32, +} from '../../constants/constants.js'; class ValidateAssetCommand extends Command { constructor(ctx) { @@ -43,7 +48,7 @@ class ValidateAssetCommand extends Command { tokenId, ); } - if (!blockchainAssertionId) { + if (!blockchainAssertionId || blockchainAssertionId === ZERO_BYTES32) { return Command.retry(); } const cachedData = await this.operationIdService.getCachedOperationIdData(operationId); diff --git a/src/commands/protocols/common/submit-commit-command.js b/src/commands/protocols/common/submit-commit-command.js index 7c9cfba13a..e8534b0ee2 100644 --- a/src/commands/protocols/common/submit-commit-command.js +++ b/src/commands/protocols/common/submit-commit-command.js @@ -49,8 +49,14 @@ class SubmitCommitCommand extends Command { stateIndex, ); if (alreadySubmitted) { - this.logger.trace( - `Commit already submitted for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}`, + const errorMessage = `Commit already submitted for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}`; + this.logger.trace(errorMessage); + + this.operationIdService.emitChangeEvent( + OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_COMMIT_END, + operationId, + agreementId, + epoch, ); return Command.empty(); } @@ -65,7 +71,10 @@ class SubmitCommitCommand extends Command { epoch, stateIndex, (result) => { - if (result?.error) { + if ( + result?.error && + !result.error.message.includes('NodeAlreadySubmittedCommit') + ) { reject(result.error); } resolve(); diff --git a/src/commands/protocols/common/submit-proofs-command.js b/src/commands/protocols/common/submit-proofs-command.js index 8348480f84..7e26edf5c5 100644 --- a/src/commands/protocols/common/submit-proofs-command.js +++ b/src/commands/protocols/common/submit-proofs-command.js @@ -66,7 +66,10 @@ class SubmitProofsCommand extends Command { ); if (!assertion.length) { - this.logger.trace(`Assertion with id: ${assertionId} not found in triple store.`); + const errorMessage = `Assertion with id: ${assertionId} not found in triple store.`; + this.logger.trace(errorMessage); + + await this.handleError(operationId, errorMessage, this.errorType, true); return Command.empty(); } @@ -97,12 +100,26 @@ class SubmitProofsCommand extends Command { stateIndex, ); if (alreadySubmitted) { - this.logger.trace( - `Proofs already submitted for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}`, + const errorMessage = `Proofs already submitted for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}`; + this.logger.trace(errorMessage); + + this.operationIdService.emitChangeEvent( + OPERATION_ID_STATUS.COMMIT_PROOF.SUBMIT_PROOFS_END, + operationId, + agreementId, + epoch, ); return Command.empty(); } + if (proof.length === 0) { + const errorMessage = `Error during Merkle Proof calculation for blockchain: ${blockchain} agreement id: ${agreementId}, epoch: ${epoch}, state index: ${stateIndex}, proof cannot be empty`; + this.logger.trace(errorMessage); + + await this.handleError(operationId, errorMessage, this.errorType, true); + return Command.empty(); + } + const transactionCompletePromise = new Promise((resolve, reject) => { this.blockchainModuleManager.sendProof( blockchain, @@ -115,7 +132,7 @@ class SubmitProofsCommand extends Command { leaf, stateIndex, (result) => { - if (result?.error) { + if (result?.error && !result.error.message.includes('NodeAlreadyRewarded')) { reject(result.error); } resolve(); diff --git a/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js b/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js index 0caa031c63..5e82bd6178 100644 --- a/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js +++ b/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js @@ -83,22 +83,6 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand { stateIndex, ); - await this.repositoryModuleManager.updateServiceAgreementRecord( - blockchain, - contract, - tokenId, - agreementId, - agreementData.startTime, - agreementData.epochsNumber, - agreementData.epochLength, - agreementData.scoreFunctionId, - agreementData.proofWindowOffsetPerc, - hashFunctionId, - keyword, - assertionId, - stateIndex, - ); - await this.operationIdService.updateOperationIdStatus( operationId, OPERATION_ID_STATUS.PUBLISH.PUBLISH_LOCAL_STORE_END, diff --git a/src/constants/constants.js b/src/constants/constants.js index 3fff5854e4..ba1d328b3c 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -1,5 +1,13 @@ import { BigNumber } from 'ethers'; +export const WS_RPC_PROVIDER_PRIORITY = 2; + +export const HTTP_RPC_PROVIDER_PRIORITY = 1; + +export const FALLBACK_PROVIDER_QUORUM = 1; + +export const RPC_PROVIDER_STALL_TIMEOUT = 60 * 1000; + export const UINT256_MAX_BN = BigNumber.from(2).pow(256).sub(1); export const UINT32_MAX_BN = BigNumber.from(2).pow(32).sub(1); @@ -21,7 +29,7 @@ export const COMMIT_BLOCK_DURATION_IN_BLOCKS = 5; export const COMMITS_DELAY_BETWEEN_NODES_IN_BLOCKS = 2; -export const TRANSACTION_POLLING_TIMEOUT_MILLIS = 50 * 1000; +export const TRANSACTION_POLLING_TIMEOUT_MILLIS = 120 * 1000; export const SOLIDITY_ERROR_STRING_PREFIX = '0x08c379a0'; @@ -167,9 +175,9 @@ export const DEFAULT_COMMAND_REPEAT_INTERVAL_IN_MILLS = 5000; // 5 seconds export const DEFAULT_COMMAND_DELAY_IN_MILLS = 60 * 1000; // 60 seconds export const COMMAND_RETRIES = { - SUBMIT_COMMIT: 3, - SUBMIT_UPDATE_COMMIT: 3, - SUBMIT_PROOFS: 3, + SUBMIT_COMMIT: 0, + SUBMIT_UPDATE_COMMIT: 0, + SUBMIT_PROOFS: 0, }; export const WEBSOCKET_PROVIDER_OPTIONS = { diff --git a/src/migration/telemetry-module-user-configuration-migration.js b/src/migration/telemetry-module-user-configuration-migration.js new file mode 100644 index 0000000000..a49cc393af --- /dev/null +++ b/src/migration/telemetry-module-user-configuration-migration.js @@ -0,0 +1,45 @@ +import appRootPath from 'app-root-path'; +import path from 'path'; +import BaseMigration from './base-migration.js'; + +class TelemetryModuleUserConfigurationMigration extends BaseMigration { + async executeMigration() { + const configurationFolderPath = path.join(appRootPath.path, '..'); + const configurationFilePath = path.join( + configurationFolderPath, + this.config.configFilename, + ); + + const userConfiguration = await this.fileService.readFile(configurationFilePath, true); + + let newTelemetryConfig; + + if ('telemetry' in userConfiguration) { + const oldConfigTelemetry = userConfiguration.telemetry; + newTelemetryConfig = { + enabled: oldConfigTelemetry.enabled, + implementation: { + 'ot-telemetry': { + enabled: oldConfigTelemetry.enabled, + package: './telemetry/implementation/ot-telemetry.js', + config: { + sendTelemetryData: oldConfigTelemetry.sendTelemetryData, + signalingServerUrl: oldConfigTelemetry.signalingServerUrl, + }, + }, + }, + }; + + delete userConfiguration.telemetry; + userConfiguration.modules.telemetry = newTelemetryConfig; + + await this.fileService.writeContentsToFile( + configurationFolderPath, + this.config.configFilename, + JSON.stringify(userConfiguration, null, 4), + ); + } + } +} + +export default TelemetryModuleUserConfigurationMigration; diff --git a/src/modules/base-module-manager.js b/src/modules/base-module-manager.js index e96161999c..2bdc901d6a 100644 --- a/src/modules/base-module-manager.js +++ b/src/modules/base-module-manager.js @@ -91,7 +91,7 @@ class BaseModuleManager { delete this.handlers[name]; } - getModuleConfiguration(name) { + getModuleConfiguration(name = null) { return this.getImplementation(name).config; } } diff --git a/src/modules/blockchain/implementation/web3-service.js b/src/modules/blockchain/implementation/web3-service.js index 49632f426f..63545eedf2 100644 --- a/src/modules/blockchain/implementation/web3-service.js +++ b/src/modules/blockchain/implementation/web3-service.js @@ -15,6 +15,10 @@ import { TRANSACTION_POLLING_TIMEOUT_MILLIS, TRANSACTION_CONFIRMATIONS, BLOCK_TIME_MILLIS, + WS_RPC_PROVIDER_PRIORITY, + HTTP_RPC_PROVIDER_PRIORITY, + FALLBACK_PROVIDER_QUORUM, + RPC_PROVIDER_STALL_TIMEOUT, } from '../../../constants/constants.js'; const require = createRequire(import.meta.url); @@ -53,11 +57,11 @@ class Web3Service { this.config = config; this.logger = logger; - this.rpcNumber = 0; this.initializeTransactionQueue(TRANSACTION_QUEUE_CONCURRENCY); await this.initializeWeb3(); this.startBlock = await this.getBlockNumber(); await this.initializeContracts(); + // this.initializeProviderDebugging(); } initializeTransactionQueue(concurrency) { @@ -89,41 +93,52 @@ class Web3Service { } async initializeWeb3() { - let tries = 0; - let isRpcConnected = false; - while (!isRpcConnected) { - if (tries >= this.config.rpcEndpoints.length) { - throw Error('RPC initialization failed'); - } + const providers = []; + for (const rpcEndpoint of this.config.rpcEndpoints) { + const isWebSocket = rpcEndpoint.startsWith('ws'); + const Provider = isWebSocket + ? ethers.providers.WebSocketProvider + : ethers.providers.JsonRpcProvider; + const priority = isWebSocket ? WS_RPC_PROVIDER_PRIORITY : HTTP_RPC_PROVIDER_PRIORITY; try { - if (this.config.rpcEndpoints[this.rpcNumber].startsWith('ws')) { - this.provider = new ethers.providers.WebSocketProvider( - this.config.rpcEndpoints[this.rpcNumber], - ); - } else { - this.provider = new ethers.providers.JsonRpcProvider( - this.config.rpcEndpoints[this.rpcNumber], - ); - } + const provider = new Provider(rpcEndpoint); // eslint-disable-next-line no-await-in-loop - await this.providerReady(); - isRpcConnected = true; + await provider.getNetwork(); + + providers.push({ + provider, + priority, + weight: 1, + stallTimeout: RPC_PROVIDER_STALL_TIMEOUT, + }); + + this.logger.debug(`Connected to the blockchain RPC: ${rpcEndpoint}.`); } catch (e) { - this.logger.warn( - `Unable to connect to blockchain rpc : ${ - this.config.rpcEndpoints[this.rpcNumber] - }.`, - ); - tries += 1; - this.rpcNumber = (this.rpcNumber + 1) % this.config.rpcEndpoints.length; + this.logger.warn(`Unable to connect to the blockchain RPC: ${rpcEndpoint}.`); } } + try { + this.provider = new ethers.providers.FallbackProvider( + providers, + FALLBACK_PROVIDER_QUORUM, + ); + + // eslint-disable-next-line no-await-in-loop + await this.providerReady(); + } catch (e) { + throw Error( + `RPC Fallback Provider initialization failed. Fallback Provider quorum: ${FALLBACK_PROVIDER_QUORUM}. Error: ${e.message}.`, + ); + } + this.wallet = new ethers.Wallet(this.getPrivateKey(), this.provider); } async initializeContracts() { + this.contractAddresses = {}; + this.logger.info( `Initializing contracts with hub contract address: ${this.config.hubContractAddress}`, ); @@ -132,6 +147,7 @@ class Web3Service { ABIs.Hub, this.wallet, ); + this.contractAddresses[this.config.hubContractAddress] = this.HubContract; const contractsArray = await this.callContractFunction( this.HubContract, @@ -164,19 +180,59 @@ class Web3Service { }); this.logger.info(`Contracts initialized`); - this.logger.debug( - `Connected to blockchain rpc : ${this.config.rpcEndpoints[this.rpcNumber]}.`, - ); await this.logBalances(); } + initializeProviderDebugging() { + this.provider.on('debug', (info) => { + const { method } = info.request; + + if (['call', 'estimateGas'].includes(method)) { + const contractInstance = this.contractAddresses[info.request.params.to]; + const inputData = info.request.params.data; + const decodedInputData = this._decodeInputData( + inputData, + contractInstance.interface, + ); + + if (info.backend.error) { + const decodedErrorData = this._decodeErrorData( + info.backend.error, + contractInstance.interface, + ); + this.logger.debug( + `${decodedInputData} ${method} has failed; Error: ${decodedErrorData}; ` + + `RPC: ${info.backend.provider.connection.url}.`, + ); + } else if (info.backend.result !== undefined) { + let message = `${decodedInputData} ${method} has been successfully executed; `; + + if (info.backend.result !== null) { + const decodedResultData = this._decodeResultData( + inputData.slice(0, 10), + info.backend.result, + contractInstance.interface, + ); + message += `Result: ${decodedResultData} `; + } + + message += `RPC: ${info.backend.provider.connection.url}.`; + + this.logger.debug(message); + } + } + }); + } + initializeAssetStorageContract(assetStorageAddress) { this.assetStorageContracts[assetStorageAddress.toLowerCase()] = new ethers.Contract( assetStorageAddress, ABIs.ContentAssetStorage, this.wallet, ); + this.contractAddresses[assetStorageAddress] = + this.assetStorageContracts[assetStorageAddress.toLowerCase()]; } initializeScoringContract(id, contractAddress) { @@ -188,6 +244,7 @@ class Web3Service { ABIs[contractName], this.wallet, ); + this.contractAddresses[contractAddress] = this.scoringFunctionsContracts[id]; } else { this.logger.trace( `Skipping initialisation of contract with id: ${id}, address: ${contractAddress}`, @@ -202,6 +259,7 @@ class Web3Service { ABIs[contractName], this.wallet, ); + this.contractAddresses[contractAddress] = this[`${contractName}Contract`]; } else { this.logger.trace( `Skipping initialisation of contract: ${contractName}, address: ${contractAddress}`, @@ -336,8 +394,12 @@ class Web3Service { // eslint-disable-next-line no-await-in-loop result = await contractInstance[functionName](...args); } catch (error) { + const decodedErrorData = this._decodeErrorData(error, contractInstance.interface); // eslint-disable-next-line no-await-in-loop - await this.handleError(error, functionName); + await this.handleError( + Error(`Call failed, reason: ${decodedErrorData}`), + functionName, + ); } } @@ -356,19 +418,19 @@ class Web3Service { /* eslint-disable no-await-in-loop */ gasLimit = await contractInstance.estimateGas[functionName](...args); } catch (error) { - const decodedReturnData = this._decodeReturnData(error, contractInstance.interface); + const decodedErrorData = this._decodeErrorData(error, contractInstance.interface); await this.handleError( - Error(`gas estimation failed, reason: ${decodedReturnData}`), + Error(`Gas estimation failed, reason: ${decodedErrorData}`), functionName, ); } gasLimit = gasLimit ?? this.convertToWei(900, 'kwei'); - this.logger.info( - 'Sending signed transaction to blockchain, calling method: ' + - `${functionName} with gas limit: ${gasLimit.toString()} and gasPrice ${gasPrice.toString()}. ` + - `Transaction queue length: ${this.getTransactionQueueLength()}`, + this.logger.debug( + `Sending signed transaction ${functionName} to the blockchain ` + + `with gas limit: ${gasLimit.toString()} and gasPrice ${gasPrice.toString()}. ` + + `Transaction queue length: ${this.getTransactionQueueLength()}.`, ); try { @@ -386,23 +448,17 @@ class Web3Service { await this.provider.call(tx, tx.blockNumber); } } catch (error) { - const decodedReturnData = this._decodeReturnData(error, contractInstance.interface); - this.logger.warn( - `Failed executing smart contract function ${functionName}. Error: ${decodedReturnData}`, - ); + const decodedErrorData = this._decodeErrorData(error, contractInstance.interface); if ( !transactionRetried && (error.message.includes(`timeout exceeded`) || error.message.includes(`Pool(TooLowPriority`)) ) { gasPrice = Math.ceil(gasPrice * 1.2); - this.logger.warn( - `Retrying to execute smart contract function ${functionName} with gasPrice: ${gasPrice}`, - ); transactionRetried = true; } else { await this.handleError( - Error(`transaction reverted, reason: ${decodedReturnData}`), + Error(`Transaction reverted, reason: ${decodedErrorData}`), functionName, ); } @@ -411,7 +467,7 @@ class Web3Service { return result; } - _getReturnData(error) { + _getErrorData(error) { let nestedError = error; while (nestedError && nestedError.error) { nestedError = nestedError.error; @@ -435,23 +491,31 @@ class Web3Service { return returnData; } - _decodeReturnData(evmError, contractInterface) { - let returnData; + _decodeInputData(inputData, contractInterface) { + if (inputData === ZERO_PREFIX) { + return 'Empty input data.'; + } + + return contractInterface.decodeFunctionData(inputData.slice(0, 10), inputData); + } + + _decodeErrorData(evmError, contractInterface) { + let errorData; try { - returnData = this._getReturnData(evmError); + errorData = this._getErrorData(evmError); } catch (error) { return error.message; } // Handle empty error data - if (returnData === ZERO_PREFIX) { + if (errorData === ZERO_PREFIX) { return 'Empty error data.'; } // Handle standard solidity string error - if (returnData.startsWith(SOLIDITY_ERROR_STRING_PREFIX)) { - const encodedReason = returnData.slice(SOLIDITY_ERROR_STRING_PREFIX.length); + if (errorData.startsWith(SOLIDITY_ERROR_STRING_PREFIX)) { + const encodedReason = errorData.slice(SOLIDITY_ERROR_STRING_PREFIX.length); try { return ethers.utils.defaultAbiCoder.decode(['string'], `0x${encodedReason}`)[0]; } catch (error) { @@ -460,8 +524,8 @@ class Web3Service { } // Handle solidity panic code - if (returnData.startsWith(SOLIDITY_PANIC_CODE_PREFIX)) { - const encodedReason = returnData.slice(SOLIDITY_PANIC_CODE_PREFIX.length); + if (errorData.startsWith(SOLIDITY_PANIC_CODE_PREFIX)) { + const encodedReason = errorData.slice(SOLIDITY_PANIC_CODE_PREFIX.length); let code; try { [code] = ethers.utils.defaultAbiCoder.decode(['uint256'], `0x${encodedReason}`); @@ -474,7 +538,7 @@ class Web3Service { // Try parsing a custom error using the contract ABI try { - const decodedCustomError = contractInterface.parseError(returnData); + const decodedCustomError = contractInterface.parseError(errorData); const formattedArgs = decodedCustomError.errorFragment.inputs .map((input, i) => { const argName = input.name; @@ -488,6 +552,14 @@ class Web3Service { } } + _decodeResultData(fragment, resultData, contractInterface) { + if (resultData === ZERO_PREFIX) { + return 'Empty input data.'; + } + + return contractInterface.decodeFunctionResult(fragment, resultData); + } + _formatCustomErrorArgument(value) { if (value === null || value === undefined) { return 'null'; @@ -925,12 +997,6 @@ class Web3Service { } async restartService() { - this.rpcNumber = (this.rpcNumber + 1) % this.config.rpcEndpoints.length; - this.logger.warn( - `There was an issue with current blockchain rpc. Connecting to ${ - this.config.rpcEndpoints[this.rpcNumber] - }`, - ); await this.initializeWeb3(); await this.initializeContracts(); } @@ -942,9 +1008,7 @@ class Web3Service { } catch (rpcError) { isRpcError = true; this.logger.warn( - `Unable to execute smart contract function ${functionName} using blockchain rpc : ${ - this.config.rpcEndpoints[this.rpcNumber] - }.`, + `Unable to execute smart contract function ${functionName} using Fallback RPC Provider.`, ); await this.restartService(); } diff --git a/src/modules/module-config-validation.js b/src/modules/module-config-validation.js index 4eaaf02b42..fc2cd00619 100644 --- a/src/modules/module-config-validation.js +++ b/src/modules/module-config-validation.js @@ -8,9 +8,12 @@ class ModuleConfigValidation { validateModule(name, config) { this.validateRequiredModule(name, config); - const capitalizedName = name.charAt(0).toUpperCase() + name.slice(1); - this[`validate${capitalizedName}`](config); + if (typeof this[`validate${capitalizedName}`] === 'function') { + this[`validate${capitalizedName}`](config); + } else { + throw Error(`Missing validation for ${capitalizedName}`); + } } validateAutoUpdater() { @@ -68,6 +71,10 @@ class ModuleConfigValidation { this.logger.warn(message); } } + + validateTelemetry() { + return true; + } } export default ModuleConfigValidation; diff --git a/src/modules/telemetry/implementation/ot-telemetry.js b/src/modules/telemetry/implementation/ot-telemetry.js new file mode 100644 index 0000000000..c4460c7572 --- /dev/null +++ b/src/modules/telemetry/implementation/ot-telemetry.js @@ -0,0 +1,29 @@ +import axios from 'axios'; + +class OTTelemetry { + async initialize(config, logger) { + this.config = config; + this.logger = logger; + } + + listenOnEvents(eventEmitter, onEventReceived) { + return eventEmitter.on('operation_status_changed', onEventReceived); + } + + async sendTelemetryData(nodeData, events) { + const signalingMessage = { nodeData, events }; + const config = { + method: 'post', + url: this.config.signalingServerUrl, + headers: { + 'Content-Type': 'application/json', + }, + data: JSON.stringify(signalingMessage), + }; + const response = await axios(config); + const isSuccess = response.status === 200; + return isSuccess; + } +} + +export default OTTelemetry; diff --git a/src/modules/telemetry/telemetry-module-manager.js b/src/modules/telemetry/telemetry-module-manager.js new file mode 100644 index 0000000000..f5ef04377e --- /dev/null +++ b/src/modules/telemetry/telemetry-module-manager.js @@ -0,0 +1,29 @@ +import BaseModuleManager from '../base-module-manager.js'; + +class TelemetryModuleManager extends BaseModuleManager { + constructor(ctx) { + super(ctx); + this.eventEmitter = ctx.eventEmitter; + } + + getName() { + return 'telemetry'; + } + + listenOnEvents(onEventReceived) { + if (this.config.modules.telemetry.enabled && this.initialized) { + return this.getImplementation().module.listenOnEvents( + this.eventEmitter, + onEventReceived, + ); + } + } + + async sendTelemetryData(nodeData, events) { + if (this.initialized) { + return this.getImplementation().module.sendTelemetryData(nodeData, events); + } + } +} + +export default TelemetryModuleManager; diff --git a/src/service/telemetry-injection-service.js b/src/service/telemetry-injection-service.js deleted file mode 100644 index be59e6ca4f..0000000000 --- a/src/service/telemetry-injection-service.js +++ /dev/null @@ -1,37 +0,0 @@ -class TelemetryInjectionService { - constructor(ctx) { - this.logger = ctx.logger; - this.config = ctx.config; - this.eventEmitter = ctx.eventEmitter; - this.repositoryModuleManager = ctx.repositoryModuleManager; - } - - initialize() { - this.listenOnEvents(); - } - - listenOnEvents() { - this.eventEmitter.on('operation_status_changed', (eventData) => { - this.repositoryModuleManager.createEventRecord( - eventData.operationId, - eventData.lastEvent, - eventData.timestamp, - eventData.value1, - eventData.value2, - eventData.value3, - ); - }); - } - - async getUnpublishedEvents() { - return this.repositoryModuleManager.getUnpublishedEvents(); - } - - async removePublishedEvents(events) { - const ids = events.map((event) => event.id); - - await this.repositoryModuleManager.destroyEvents(ids); - } -} - -export default TelemetryInjectionService; diff --git a/test/modules/telemetry/config.json b/test/modules/telemetry/config.json new file mode 100644 index 0000000000..66c5eddea5 --- /dev/null +++ b/test/modules/telemetry/config.json @@ -0,0 +1,17 @@ +{ + "modules": { + "telemetry": { + "enabled": true, + "implementation": { + "ot-telemetry": { + "enabled": true, + "package": "./telemetry/implementation/ot-telemetry.js", + "config": { + "sendTelemetryData": false, + "signalingServerUrl": "null" + } + } + } + } + } +} diff --git a/test/modules/telemetry/telemetry.js b/test/modules/telemetry/telemetry.js new file mode 100644 index 0000000000..6a7c037fa7 --- /dev/null +++ b/test/modules/telemetry/telemetry.js @@ -0,0 +1,51 @@ +import { readFile } from 'fs/promises'; +import { describe, it, before } from 'mocha'; +import { expect, assert } from 'chai'; +import Logger from '../../../src/logger/logger.js'; +import TelemetryModuleManager from '../../../src/modules/telemetry/telemetry-module-manager.js'; + +let logger; +let telemetryModuleManager; +const config = JSON.parse(await readFile('./test/modules/telemetry/config.json')); + +describe('Telemetry module', () => { + before('Initialize logger', () => { + logger = new Logger('trace'); + logger.info = () => {}; + }); + + describe('Handle received events', () => { + it('should call onEventReceived when event is emitted', async () => { + const eventEmitter = { + eventListeners: {}, + + on(eventName, callback) { + if (!this.eventListeners[eventName]) { + this.eventListeners[eventName] = []; + } + this.eventListeners[eventName].push(callback); + }, + + emit(eventName, ...args) { + if (this.eventListeners[eventName]) { + this.eventListeners[eventName].forEach((callback) => callback(...args)); + } + }, + }; + + let callbackCalled = false; + + function onEventReceived() { + callbackCalled = true; + } + + telemetryModuleManager = new TelemetryModuleManager({ config, logger, eventEmitter }); + await telemetryModuleManager.initialize(); + telemetryModuleManager.listenOnEvents(onEventReceived); + + eventEmitter.emit('operation_status_changed'); + + assert(expect(callbackCalled).to.be.true); + }); + }); +});