From 6dcabac0d960ca4d6855306e8d00e9af401f82c5 Mon Sep 17 00:00:00 2001 From: brkagithub Date: Mon, 16 Dec 2024 17:15:13 +0100 Subject: [PATCH] paranet sync rework (wip) --- src/commands/paranet/paranet-sync-command.js | 334 +++++++++--------- .../20241215122200-create-paranet-assets.js | 65 ++++ .../sequelize/models/missed-paranet-asset.js | 2 + .../sequelize/models/paranet-asset.js | 84 +++++ .../sequelize/models/paranet-synced-asset.js | 1 + .../repositories/paranet-asset-repository.js | 139 ++++++++ .../paranet-synced-asset-repository.js | 1 + .../repository/repository-module-manager.js | 68 ++-- 8 files changed, 503 insertions(+), 191 deletions(-) create mode 100644 src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js create mode 100644 src/modules/repository/implementation/sequelize/models/paranet-asset.js create mode 100644 src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js diff --git a/src/commands/paranet/paranet-sync-command.js b/src/commands/paranet/paranet-sync-command.js index 07d619de47..6b86b17094 100644 --- a/src/commands/paranet/paranet-sync-command.js +++ b/src/commands/paranet/paranet-sync-command.js @@ -11,10 +11,10 @@ import { PARANET_SYNC_RETRY_DELAY_MS, OPERATION_STATUS, PARANET_NODES_ACCESS_POLICIES, - PARANET_SYNC_SOURCES, - TRIPLE_STORE_REPOSITORIES, - LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS, - LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY, + // PARANET_SYNC_SOURCES, + // TRIPLE_STORE_REPOSITORIES, + // LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS, + // LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY, } from '../../constants/constants.js'; class ParanetSyncCommand extends Command { @@ -38,7 +38,7 @@ class ParanetSyncCommand extends Command { PARANET_NODES_ACCESS_POLICIES[paranetMetadata.nodesAccessPolicy]; this.logger.info( - `Paranet sync: Starting paranet sync for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}`, + `Paranet sync: Starting paranet sync for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}, access policy ${paranetNodesAccessPolicy}`, ); // Fetch counts from blockchain and database @@ -50,20 +50,12 @@ class ParanetSyncCommand extends Command { ).toNumber(); const syncedAssetsCount = - await this.repositoryModuleManager.getParanetSyncedAssetRecordsCountByDataSource( - paranetUAL, - PARANET_SYNC_SOURCES.SYNC, - ); - const localStoredAssetsCount = - await this.repositoryModuleManager.getParanetSyncedAssetRecordsCountByDataSource( - paranetUAL, - PARANET_SYNC_SOURCES.LOCAL_STORE, - ); + await this.repositoryModuleManager.getParanetSyncedAssetRecordsCount(paranetUAL); const totalMissedAssetsCount = await this.repositoryModuleManager.getCountOfMissedAssetsOfParanet(paranetUAL); const missedAssetsCount = - await this.repositoryModuleManager.getFilteredCountOfMissedAssetsOfParanet( + await this.repositoryModuleManager.getMissedParanetAssetsRecordsWithRetryCount( paranetUAL, PARANET_SYNC_RETRIES_LIMIT, PARANET_SYNC_RETRY_DELAY_MS, @@ -72,7 +64,7 @@ class ParanetSyncCommand extends Command { const paranetRepository = this.paranetService.getParanetRepositoryName(paranetUAL); this.logger.info( - `Paranet sync: Paranet: ${paranetUAL} (${paranetId}) Total count of Paranet KAs in the contract: ${contractKaCount}; Synced KAs count: ${syncedAssetsCount}; Local Stored KAs count: ${localStoredAssetsCount}; Total count of missed KAs: ${totalMissedAssetsCount}`, + `Paranet sync: Paranet: ${paranetUAL} (${paranetId}) Total count of Paranet KAs in the contract: ${contractKaCount}; Synced KAs count: ${syncedAssetsCount}; Total count of missed KAs: ${totalMissedAssetsCount}`, ); // First, attempt to sync missed KAs if any exist @@ -87,7 +79,7 @@ class ParanetSyncCommand extends Command { OPERATION_ID_STATUS.PARANET.PARANET_SYNC_MISSED_KAS_SYNC_START, ); - const [successulMissedSyncsCount, failedMissedSyncsCount] = await this.syncMissedKAs( + const [successfulMissedSyncsCount, failedMissedSyncsCount] = await this.syncMissedKAs( paranetUAL, paranetId, paranetNodesAccessPolicy, @@ -96,7 +88,7 @@ class ParanetSyncCommand extends Command { ); this.logger.info( - `Paranet sync: Successful missed assets syncs: ${successulMissedSyncsCount}; ` + + `Paranet sync: Successful missed assets syncs: ${successfulMissedSyncsCount}; ` + `Failed missed assets syncs: ${failedMissedSyncsCount} for paranet: ${paranetUAL} ` + `(${paranetId}), operation ID: ${operationId}!`, ); @@ -105,17 +97,16 @@ class ParanetSyncCommand extends Command { operationId, blockchain, OPERATION_ID_STATUS.PARANET.PARANET_SYNC_MISSED_KAS_SYNC_END, - successulMissedSyncsCount, + successfulMissedSyncsCount, failedMissedSyncsCount, ); } // Then, check for new KAs on the blockchain - if (syncedAssetsCount + localStoredAssetsCount + totalMissedAssetsCount < contractKaCount) { + if (syncedAssetsCount + totalMissedAssetsCount < contractKaCount) { this.logger.info( `Paranet sync: Syncing ${ - contractKaCount - - (syncedAssetsCount + localStoredAssetsCount + totalMissedAssetsCount) + contractKaCount - (syncedAssetsCount + totalMissedAssetsCount) } new assets for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}`, ); @@ -131,9 +122,9 @@ class ParanetSyncCommand extends Command { contractKaCount, paranetUAL, paranetId, - paranetNodesAccessPolicy, - paranetRepository, - operationId, + // paranetNodesAccessPolicy, + // paranetRepository, + // operationId, ); this.logger.info( @@ -204,27 +195,53 @@ class ParanetSyncCommand extends Command { this.logger.debug( `Paranet sync: Get for ${ual} with operation id ${getOperationId} initiated.`, ); - if (paranetNodesAccessPolicy === 'OPEN') { - await this.commandExecutor.add({ - name: 'networkGetCommand', - sequence: [], - delay: 0, - data: { - operationId: getOperationId, - id: ual, - blockchain, - contract, - tokenId, - state: assertionId, - assertionId, - paranetId, - paranetUAL, - }, - transactional: false, - }); - } else if (paranetNodesAccessPolicy === 'CURATED') { + + const maxAttempts = PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_MAX_ATTEMPTS; + const pollingInterval = PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_INTERVAL_MILLIS; + + let attempt = 0; + let getResult; + + await this.commandExecutor.add({ + name: 'localGetCommand', + sequence: [], + delay: 0, + data: { + operationId: getOperationId, + id: ual, + blockchain, + contract, + tokenId, + state: assertionId, + assertionId, + paranetId, + paranetUAL, + }, + transactional: false, + }); + + do { + await setTimeout(pollingInterval); + getResult = await this.operationIdService.getOperationIdRecord(getOperationId); + attempt += 1; + } while ( + attempt < maxAttempts && + getResult?.status !== OPERATION_ID_STATUS.FAILED && + getResult?.status !== OPERATION_ID_STATUS.COMPLETED + ); + + if (getResult?.status !== OPERATION_ID_STATUS.COMPLETED) { + this.logger.info( + `Local GET failed for tokenId: ${tokenId}, attempting network GET.`, + ); + + const networkCommandName = + paranetNodesAccessPolicy === 'OPEN' + ? 'networkGetCommand' + : 'curatedParanetNetworkGetCommand'; + await this.commandExecutor.add({ - name: 'curatedParanetNetworkGetCommand', + name: networkCommandName, sequence: [], delay: 0, data: { @@ -240,38 +257,31 @@ class ParanetSyncCommand extends Command { }, transactional: false, }); - } - await this.operationIdService.updateOperationIdStatus( - getOperationId, - blockchain, - OPERATION_ID_STATUS.GET.GET_INIT_END, - ); - - let attempt = 0; - let getResult; - do { - await setTimeout(PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_INTERVAL_MILLIS); - getResult = await this.operationIdService.getOperationIdRecord(getOperationId); - attempt += 1; - } while ( - attempt < PARANET_SYNC_PARAMETERS.GET_RESULT_POLLING_MAX_ATTEMPTS && - getResult?.status !== OPERATION_ID_STATUS.FAILED && - getResult?.status !== OPERATION_ID_STATUS.COMPLETED - ); + attempt = 0; + do { + await setTimeout(pollingInterval); + getResult = await this.operationIdService.getOperationIdRecord(getOperationId); + attempt += 1; + } while ( + attempt < maxAttempts && + getResult?.status !== OPERATION_ID_STATUS.FAILED && + getResult?.status !== OPERATION_ID_STATUS.COMPLETED + ); + } if (getResult?.status !== OPERATION_ID_STATUS.COMPLETED) { this.logger.warn( - `Paranet sync: Unable to sync tokenId: ${tokenId}, for contract: ${contract} state index: ${stateIndex} blockchain: ${blockchain}, GET result: ${JSON.stringify( + `Paranet sync: Unable to sync tokenId: ${tokenId}, for contract: ${contract}, state index: ${stateIndex}, blockchain: ${blockchain}, GET result: ${JSON.stringify( getResult, )}`, ); - await this.repositoryModuleManager.createMissedParanetAssetRecord({ - blockchainId: blockchain, + // TODO: if exists, increase retry count + await this.repositoryModuleManager.incrementRetriesForUalAndParanetUal( ual, - paranetUal: paranetUAL, - }); + paranetUAL, + ); return false; } @@ -281,53 +291,55 @@ class ParanetSyncCommand extends Command { `Paranet sync: ${data.assertion.length} nquads found for asset with ual: ${ual}, state index: ${stateIndex}, assertionId: ${assertionId}`, ); - let repository; - if (latestState) { - repository = paranetRepository; - } else if (paranetNodesAccessPolicy === 'OPEN') { - repository = TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY; - } else if (paranetNodesAccessPolicy === 'CURATED') { - repository = TRIPLE_STORE_REPOSITORIES.PRIVATE_HISTORY; - } else { - throw new Error('Unsupported access policy'); - } + const repository = paranetRepository; - await this.tripleStoreService.localStoreAsset( - repository, - assertionId, - data.assertion, - blockchain, - contract, - tokenId, - LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS, - LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY, - ); - if (paranetNodesAccessPolicy === 'CURATED' && data.privateAssertion) { - await this.tripleStoreService.localStoreAsset( - repository, - data.syncedAssetRecord.privateAssertionId, - data.privateAssertion, - blockchain, - contract, - tokenId, - ); + const assertions = [data.public, data.private]; + + const storePromises = []; + + for (const assertionData of assertions) { + if (assertionData?.assertion && assertionData?.assertionId) { + storePromises.push( + this.tripleStoreService.insertKnowledgeCollection( + repository, + ual, + assertionData.assertion, + ), + ); + } } - const privateAssertionId = - paranetNodesAccessPolicy === 'CURATED' - ? data.syncedAssetRecord?.privateAssertionId - : null; + + await Promise.all(storePromises); + + // this doesnt work for v8 + // await this.tripleStoreService.localStoreAsset( + // repository, + // assertionId, + // data.assertion, + // blockchain, + // contract, + // tokenId, + // LOCAL_INSERT_FOR_CURATED_PARANET_MAX_ATTEMPTS, + // LOCAL_INSERT_FOR_CURATED_PARANET_RETRY_DELAY, + // ); + // if (paranetNodesAccessPolicy === 'CURATED' && data.privateAssertion) { + // await this.tripleStoreService.localStoreAsset( + // repository, + // data.syncedAssetRecord.privateAssertionId, + // data.privateAssertion, + // blockchain, + // contract, + // tokenId, + // ); + // } + // const privateAssertionId = + // paranetNodesAccessPolicy === 'CURATED' + // ? data.syncedAssetRecord?.privateAssertionId + // : null; await this.repositoryModuleManager.incrementParanetKaCount(paranetId, blockchain); - await this.repositoryModuleManager.createParanetSyncedAssetRecord( - blockchain, - ual, - paranetUAL, - assertionId, - privateAssertionId, - data.syncedAssetRecord?.sender, - data.syncedAssetRecord?.transactionHash, - PARANET_SYNC_SOURCES.SYNC, - ); + + await this.repositoryModuleManager.updateAssetToBeSynced(ual, paranetUAL); return true; } catch (error) { @@ -335,11 +347,8 @@ class ParanetSyncCommand extends Command { `Paranet sync: Unable to sync tokenId: ${tokenId}, for contract: ${contract} state index: ${stateIndex} blockchain: ${blockchain}, error: ${error}`, ); - await this.repositoryModuleManager.createMissedParanetAssetRecord({ - blockchainId: blockchain, - ual, - paranetUal: paranetUAL, - }); + // TODO: probably dont need to do anything here and just leave it unsynced, maybe increase retry count + await this.repositoryModuleManager.incrementRetriesForUalAndParanetUal(ual, paranetUAL); return false; } @@ -355,7 +364,6 @@ class ParanetSyncCommand extends Command { paranetNodesAccessPolicy, paranetRepository, operationId, - removeMissingAssetRecord = false, ) { try { this.logger.info( @@ -388,20 +396,17 @@ class ParanetSyncCommand extends Command { )); } - if (isSuccessful && removeMissingAssetRecord) { - await this.repositoryModuleManager.removeMissedParanetAssetRecordsByUAL(ual); - } + // if (isSuccessful && removeMissingAssetRecord) { + // await this.repositoryModuleManager.syncKnowledgeAssetsByUAL(ual); + // } return isSuccessful; } catch (error) { this.logger.warn( `Paranet sync: Failed to sync asset: ${ual} for paranet: ${paranetId}, error: ${error}`, ); - await this.repositoryModuleManager.createMissedParanetAssetRecord({ - blockchain, - ual, - paranetUAL, - }); + // TODO: increase retry count + await this.repositoryModuleManager.incrementRetriesForUalAndParanetUal(ual, paranetUAL); return false; } @@ -446,7 +451,6 @@ class ParanetSyncCommand extends Command { paranetNodesAccessPolicy, paranetRepository, operationId, - true, // removeMissingAssetRecord ); }); @@ -468,13 +472,13 @@ class ParanetSyncCommand extends Command { contractKaCount, paranetUAL, paranetId, - paranetNodesAccessPolicy, - paranetRepository, - operationId, + // paranetNodesAccessPolicy, + // paranetRepository, + // operationId, ) { let i = Number(startIndex); - const results = []; + // const results = []; while (i <= contractKaCount) { const nextKaArray = await this.blockchainModuleManager.getParanetKnowledgeCollectionsWithPagination( @@ -490,7 +494,7 @@ class ParanetSyncCommand extends Command { i += nextKaArray.length; - const filteredKAs = []; + // const filteredKAs = []; // NOTE: This could also be processed in parallel if needed for (const knowledgeAssetId of nextKaArray) { const { knowledgeAssetStorageContract, tokenId: knowledgeAssetTokenId } = @@ -505,6 +509,7 @@ class ParanetSyncCommand extends Command { knowledgeAssetTokenId, ); + // TODO: can do these two queries in one and just get if asset exists in table const isAlreadySynced = await this.repositoryModuleManager.paranetSyncedAssetRecordExists(ual); @@ -513,46 +518,59 @@ class ParanetSyncCommand extends Command { continue; } + // TODO: can do these two queries in one and just get if asset exists in table const isMissedAsset = - await this.repositoryModuleManager.missedParanetAssetRecordExists(ual); + await this.repositoryModuleManager.missedParanetAssetRecordExists( + ual, + paranetUAL, + ); // Skip missed KAs as they are synced in the other function if (isMissedAsset) { continue; } - filteredKAs.push([ + await this.repositoryModuleManager.createParanetAssetRecord({ + blockchainId: blockchain, ual, - blockchain, - knowledgeAssetStorageContract, - knowledgeAssetTokenId, - ]); - } + paranetUal: paranetUAL, + }); - if (filteredKAs.length > 0) { - const promises = filteredKAs.map( - ([syncKAUal, syncKABlockchain, syncKAContract, syncKATokenId]) => - this.syncAsset( - syncKAUal, - syncKABlockchain, - syncKAContract, - syncKATokenId, - paranetUAL, - paranetId, - paranetNodesAccessPolicy, - paranetRepository, - operationId, - false, // removeMissingAssetRecord - ), - ); + // so instead of pushing to filtered KAs and syncing + // just add them to DB as missed and let the other loop catch them? - const batchResults = await Promise.all(promises); - results.push(...batchResults); + // filteredKAs.push([ + // ual, + // blockchain, + // knowledgeAssetStorageContract, + // knowledgeAssetTokenId, + // ]); } + + // if (filteredKAs.length > 0) { + // const promises = filteredKAs.map( + // ([syncKAUal, syncKABlockchain, syncKAContract, syncKATokenId]) => + // this.syncAsset( + // syncKAUal, + // syncKABlockchain, + // syncKAContract, + // syncKATokenId, + // paranetUAL, + // paranetId, + // paranetNodesAccessPolicy, + // paranetRepository, + // operationId, + // false, + // ), + // ); + + // const batchResults = await Promise.all(promises); + // results.push(...batchResults); + // } } - const successfulCount = results.filter(Boolean).length; - return [successfulCount, results.length - successfulCount]; + // const successfulCount = results.filter(Boolean).length; + // return [successfulCount, results.length - successfulCount]; } /** diff --git a/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js b/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js new file mode 100644 index 0000000000..8949803f8f --- /dev/null +++ b/src/modules/repository/implementation/sequelize/migrations/20241215122200-create-paranet-assets.js @@ -0,0 +1,65 @@ +export async function up({ context: { queryInterface, Sequelize } }) { + await queryInterface.createTable('paranet_asset', { + id: { + type: Sequelize.INTEGER, + primaryKey: true, + autoIncrement: true, + }, + blockchain_id: { + type: Sequelize.STRING, + allowNull: false, + }, + ual: { + type: Sequelize.STRING, + allowNull: false, + }, + paranet_ual: { + type: Sequelize.STRING, + allowNull: false, + }, + public_assertion_id: { + type: Sequelize.STRING, + allowNull: true, + }, + private_assertion_id: { + type: Sequelize.STRING, + allowNull: true, + }, + sender: { + type: Sequelize.STRING, + allowNull: true, + }, + transaction_hash: { + type: Sequelize.STRING, + allowNull: true, + }, + error_message: { + type: Sequelize.TEXT, + allowNull: true, + }, + is_synced: { + type: Sequelize.BOOLEAN, + allowNull: false, + defaultValue: false, + }, + retries: { + allowNull: false, + type: Sequelize.INTEGER, + defaultValue: 0, + }, + created_at: { + allowNull: false, + type: Sequelize.DATE, + defaultValue: Sequelize.literal('NOW()'), + }, + updated_at: { + allowNull: false, + type: Sequelize.DATE, + defaultValue: Sequelize.literal('NOW()'), + }, + }); +} + +export async function down({ context: { queryInterface } }) { + await queryInterface.dropTable('paranet_assets'); +} diff --git a/src/modules/repository/implementation/sequelize/models/missed-paranet-asset.js b/src/modules/repository/implementation/sequelize/models/missed-paranet-asset.js index c7788955df..e7e1f088d3 100644 --- a/src/modules/repository/implementation/sequelize/models/missed-paranet-asset.js +++ b/src/modules/repository/implementation/sequelize/models/missed-paranet-asset.js @@ -1,3 +1,5 @@ +// NOT USED ANYMORE + export default (sequelize, DataTypes) => { const blockchain = sequelize.define( 'missed_paranet_asset', diff --git a/src/modules/repository/implementation/sequelize/models/paranet-asset.js b/src/modules/repository/implementation/sequelize/models/paranet-asset.js new file mode 100644 index 0000000000..1ad30e3db7 --- /dev/null +++ b/src/modules/repository/implementation/sequelize/models/paranet-asset.js @@ -0,0 +1,84 @@ +export default (sequelize, DataTypes) => { + const paranetAsset = sequelize.define( + 'paranet_asset', + { + id: { + autoIncrement: true, + primaryKey: true, + type: DataTypes.INTEGER, + }, + blockchainId: { + allowNull: false, + type: DataTypes.STRING, + }, + ual: { + allowNull: false, + type: DataTypes.STRING, + }, + paranetUal: { + allowNull: false, + type: DataTypes.STRING, + }, + publicAssertionId: { + allowNull: true, + type: DataTypes.STRING, + }, + privateAssertionId: { + allowNull: true, + type: DataTypes.STRING, + }, + sender: { + allowNull: true, + type: DataTypes.STRING, + }, + transactionHash: { + allowNull: true, + type: DataTypes.STRING, + }, + errorMessage: { + allowNull: true, + type: DataTypes.TEXT, + }, + isSynced: { + allowNull: false, + type: DataTypes.BOOLEAN, + defaultValue: false, + }, + retries: { + allowNull: false, + type: DataTypes.INTEGER, + defaultValue: 0, + }, + createdAt: { + type: DataTypes.DATE, + }, + updatedAt: { + type: DataTypes.DATE, + }, + }, + { + underscored: true, + indexes: [ + { + unique: true, + fields: ['ual', 'paranetUal'], // Composite unique constraint on `ual` and `paranetUal` + }, + { + fields: ['updatedAt', 'retries', 'isSynced'], + }, + { + fields: ['ual', 'paranetUal'], + }, + { + fields: ['isSynced', 'paranetUal'], + }, + ], + }, + ); + + paranetAsset.associate = () => { + // Define associations here if needed + }; + + return paranetAsset; +}; diff --git a/src/modules/repository/implementation/sequelize/models/paranet-synced-asset.js b/src/modules/repository/implementation/sequelize/models/paranet-synced-asset.js index 97eb6c0a90..45fc8cad0b 100644 --- a/src/modules/repository/implementation/sequelize/models/paranet-synced-asset.js +++ b/src/modules/repository/implementation/sequelize/models/paranet-synced-asset.js @@ -1,5 +1,6 @@ import { PARANET_SYNC_SOURCES } from '../../../../../constants/constants.js'; +// NOT USED ANYMORE export default (sequelize, DataTypes) => { const blockchain = sequelize.define( 'paranet_synced_asset', diff --git a/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js b/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js new file mode 100644 index 0000000000..d57dba5682 --- /dev/null +++ b/src/modules/repository/implementation/sequelize/repositories/paranet-asset-repository.js @@ -0,0 +1,139 @@ +import Sequelize from 'sequelize'; + +class ParanetAssetRepository { + constructor(models) { + this.sequelize = models.sequelize; + this.model = models.paranet_asset; + } + + async createParanetAssetRecord(missedParanetAsset, options) { + return this.model.create({ ...missedParanetAsset, isSynced: false }, options); + } + + async getCountOfMissedAssetsOfParanet(paranetUal, options = {}) { + return this.model.count({ + where: { + paranetUal, + isSynced: false, + }, + ...options, + }); + } + + async getParanetSyncedAssetRecordsCount(paranetUal, options = {}) { + return this.model.count({ + where: { + paranet_ual: paranetUal, + isSynced: true, + }, + ...options, + }); + } + + // TODO: remove + // async getFilteredCountOfMissedAssetsOfParanet( + // paranetUal, + // retryCountLimit, + // retryDelayInMs, + // options = {}, + // ) { + // const now = new Date(); + // const delayDate = new Date(now.getTime() - retryDelayInMs); + + // const records = await this.model.findAll({ + // where: { + // paranetUal, + // isSynced: false, // Only unsynced assets + // retries: { + // [Sequelize.Op.lt]: retryCountLimit, // Filter by retries count + // }, + // created_at: { + // [Sequelize.Op.lte]: delayDate, // Filter by created_at date + // }, + // }, + // ...options, + // }); + + // return records.length; // Return the count of matching records + // } + + async getMissedParanetAssetsRecordsWithRetryCount( + paranetUal, + retryCountLimit, + retryDelayInMs, + limit = null, + options = {}, + ) { + const now = new Date(); + const delayDate = new Date(now.getTime() - retryDelayInMs); + + const queryOptions = { + where: { + paranetUal, + isSynced: false, + retries: { + [Sequelize.Op.lt]: retryCountLimit, + }, + updated_at: { + [Sequelize.Op.lte]: delayDate, + }, + }, + ...options, + }; + + if (limit !== null) { + queryOptions.limit = limit; + } + + return this.model.findAll(queryOptions); + } + + async missedParanetAssetRecordExists(ual, paranetUal, options = {}) { + const missedParanetAssetRecord = await this.model.findOne({ + where: { ual, isSynced: false }, + ...options, + }); + + return !!missedParanetAssetRecord; + } + + async paranetSyncedAssetRecordExists(ual, paranetUal, options = {}) { + const paranetSyncedAssetRecord = await this.model.getParanetSyncedAssetRecordByUAL( + ual, + paranetUal, + options, + ); + + return !!paranetSyncedAssetRecord; + } + + async updateAssetToBeSynced(ual, paranetUal, options = {}) { + const [affectedRows] = await this.model.update( + { isSynced: true }, + { + where: { + ual, + paranetUal, + }, + ...options, + }, + ); + + return affectedRows; + } + + async incrementRetriesForUalAndParanetUal(ual, paranetUal, options = {}) { + const [affectedRows] = await this.model.increment('retries', { + by: 1, + where: { + ual, + paranetUal, + }, + ...options, + }); + + return affectedRows; + } +} + +export default ParanetAssetRepository; diff --git a/src/modules/repository/implementation/sequelize/repositories/paranet-synced-asset-repository.js b/src/modules/repository/implementation/sequelize/repositories/paranet-synced-asset-repository.js index c9cde1c422..ce00ecb414 100644 --- a/src/modules/repository/implementation/sequelize/repositories/paranet-synced-asset-repository.js +++ b/src/modules/repository/implementation/sequelize/repositories/paranet-synced-asset-repository.js @@ -1,3 +1,4 @@ +// DEPRECATED class ParanetSyncedAssetRepository { constructor(models) { this.sequelize = models.sequelize; diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js index 92f4793043..55be3e8986 100644 --- a/src/modules/repository/repository-module-manager.js +++ b/src/modules/repository/repository-module-manager.js @@ -373,8 +373,8 @@ class RepositoryModuleManager extends BaseModuleManager { ); } - async createMissedParanetAssetRecord(missedParanetAssset, options = {}) { - return this.getRepository('missed_paranet_asset').createMissedParanetAssetRecord( + async createParanetAssetRecord(missedParanetAssset, options = {}) { + return this.getRepository('paranet_asset').createParanetAssetRecord( missedParanetAssset, options, ); @@ -388,8 +388,13 @@ class RepositoryModuleManager extends BaseModuleManager { } async missedParanetAssetRecordExists(ual, options = {}) { - return this.getRepository('missed_paranet_asset').missedParanetAssetRecordExists( + return this.getRepository('paranet_asset').missedParanetAssetRecordExists(ual, options); + } + + async incrementRetriesForUalAndParanetUal(ual, paranetUal, options = {}) { + return this.getRepository('paranet_asset').incrementRetriesForUalAndParanetUal( ual, + paranetUal, options, ); } @@ -408,9 +413,7 @@ class RepositoryModuleManager extends BaseModuleManager { limit = null, options = {}, ) { - return this.getRepository( - 'missed_paranet_asset', - ).getMissedParanetAssetsRecordsWithRetryCount( + return this.getRepository('paranet_asset').getMissedParanetAssetsRecordsWithRetryCount( paranetUal, retryCountLimit, retryDelayInMs, @@ -419,26 +422,27 @@ class RepositoryModuleManager extends BaseModuleManager { ); } - async getCountOfMissedAssetsOfParanet(ual, options = {}) { - return this.getRepository('missed_paranet_asset').getCountOfMissedAssetsOfParanet( - ual, + async getCountOfMissedAssetsOfParanet(paranetUal, options = {}) { + return this.getRepository('paranet_asset').getCountOfMissedAssetsOfParanet( + paranetUal, options, ); } - async getFilteredCountOfMissedAssetsOfParanet( - ual, - retryCountLimit, - retryDelayInMs, - options = {}, - ) { - return this.getRepository('missed_paranet_asset').getFilteredCountOfMissedAssetsOfParanet( - ual, - retryCountLimit, - retryDelayInMs, - options, - ); - } + // TODO: remove + // async getFilteredCountOfMissedAssetsOfParanet( + // ual, + // retryCountLimit, + // retryDelayInMs, + // options = {}, + // ) { + // return this.getRepository('paranet_asset').getFilteredCountOfMissedAssetsOfParanet( + // ual, + // retryCountLimit, + // retryDelayInMs, + // options, + // ); + // } async getParanetsBlockchains(options = {}) { return this.getRepository('paranet').getParanetsBlockchains(options); @@ -455,7 +459,7 @@ class RepositoryModuleManager extends BaseModuleManager { dataSource, options = {}, ) { - return this.getRepository('paranet_synced_asset').createParanetSyncedAssetRecord( + return this.getRepository('paranet_asset').createParanetSyncedAssetRecord( blockchainId, ual, paranetUal, @@ -469,23 +473,21 @@ class RepositoryModuleManager extends BaseModuleManager { } async getParanetSyncedAssetRecordByUAL(ual, options = {}) { - return this.getRepository('paranet_synced_asset').getParanetSyncedAssetRecordByUAL( - ual, - options, - ); + return this.getRepository('paranet_asset').getParanetSyncedAssetRecordByUAL(ual, options); } - async getParanetSyncedAssetRecordsCountByDataSource(paranetUal, dataSource, options = {}) { + async getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options = {}) { return this.getRepository( 'paranet_synced_asset', - ).getParanetSyncedAssetRecordsCountByDataSource(paranetUal, dataSource, options); + ).getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options); } async paranetSyncedAssetRecordExists(ual, options = {}) { - return this.getRepository('paranet_synced_asset').paranetSyncedAssetRecordExists( - ual, - options, - ); + return this.getRepository('paranet_asset').paranetSyncedAssetRecordExists(ual, options); + } + + async updateAssetToBeSynced(ual, asset, options = {}) { + return this.getRepository('paranet_asset').updateAssetToBeSynced(ual, asset, options); } async incrementParanetKaCount(paranetId, blockchainId, options = {}) {