Skip to content

Commit

Permalink
wip (missing retry column)
Browse files Browse the repository at this point in the history
  • Loading branch information
brkagithub committed Dec 16, 2024
1 parent 64664ab commit e73f648
Show file tree
Hide file tree
Showing 8 changed files with 453 additions and 170 deletions.
341 changes: 190 additions & 151 deletions src/commands/paranet/paranet-sync-command.js

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
export async function up({ context: { queryInterface, Sequelize } }) {
await queryInterface.createTable('paranet_assets', {
id: {
type: Sequelize.INTEGER,
primaryKey: true,
autoIncrement: true,
},
blockchain_id: {
type: Sequelize.STRING,
allowNull: false,
},
ual: {
type: Sequelize.STRING,
allowNull: false,
},
paranet_ual: {
type: Sequelize.STRING,
allowNull: false,
},
public_assertion_id: {
type: Sequelize.STRING,
allowNull: true,
},
private_assertion_id: {
type: Sequelize.STRING,
allowNull: true,
},
sender: {
type: Sequelize.STRING,
allowNull: true,
},
transaction_hash: {
type: Sequelize.STRING,
allowNull: true,
},
error_message: {
type: Sequelize.TEXT,
allowNull: true,
},
is_synced: {
type: Sequelize.BOOLEAN,
allowNull: false,
defaultValue: false,
},
created_at: {
allowNull: false,
type: Sequelize.DATE,
defaultValue: Sequelize.literal('NOW()'),
},
updated_at: {
allowNull: false,
type: Sequelize.DATE,
defaultValue: Sequelize.literal('NOW()'),
},
});
}

export async function down({ context: { queryInterface } }) {
await queryInterface.dropTable('paranet_assets');
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// NOT USED ANYMORE

export default (sequelize, DataTypes) => {
const blockchain = sequelize.define(
'missed_paranet_asset',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
export default (sequelize, DataTypes) => {
const paranetAsset = sequelize.define(
'paranet_asset',
{
id: {
autoIncrement: true,
primaryKey: true,
type: DataTypes.INTEGER,
},
blockchainId: {
allowNull: false,
type: DataTypes.STRING,
},
ual: {
allowNull: false,
type: DataTypes.STRING,
},
paranetUal: {
allowNull: false,
type: DataTypes.STRING,
},
publicAssertionId: {
allowNull: true,
type: DataTypes.STRING,
},
privateAssertionId: {
allowNull: true,
type: DataTypes.STRING,
},
sender: {
allowNull: true,
type: DataTypes.STRING,
},
transactionHash: {
allowNull: true,
type: DataTypes.STRING,
},
errorMessage: {
allowNull: true,
type: DataTypes.TEXT,
},
isSynced: {
allowNull: false,
type: DataTypes.BOOLEAN,
defaultValue: false,
},
createdAt: {
type: DataTypes.DATE,
},
updatedAt: {
type: DataTypes.DATE,
},
},
{ underscored: true },
);

paranetAsset.associate = () => {
// Define associations here if needed
};

return paranetAsset;
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { PARANET_SYNC_SOURCES } from '../../../../../constants/constants.js';

// NOT USED ANYMORE
export default (sequelize, DataTypes) => {
const blockchain = sequelize.define(
'paranet_synced_asset',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import Sequelize from 'sequelize';

class ParanetAssetRepository {
constructor(models) {
this.sequelize = models.sequelize;
this.model = models.paranet_asset;
}

async createMissedParanetAssetRecord(missedParanetAsset, options) {
return this.model.create({ ...missedParanetAsset, isSynced: false }, options);
}

async createParanetSyncedAssetRecord(
blockchainId,
ual,
paranetUal,
publicAssertionId,
privateAssertionId,
sender,
transactionHash,
options,
) {
return this.model.create(
{
blockchainId,
ual,
paranetUal,
publicAssertionId,
privateAssertionId,
sender,
transactionHash,
isSynced: true,
},
options,
);
}

async getCountOfMissedAssetsOfParanet(paranetUal, options = {}) {
return this.model.count({
where: {
paranetUal,
isSynced: false,
},
...options,
});
}

async getParanetSyncedAssetRecordsCount(paranetUal, options = {}) {
return this.model.count({
where: {
paranet_ual: paranetUal,
isSynced: true,
},
...options,
});
}

async getFilteredCountOfMissedAssetsOfParanet(
paranetUal,
retryCountLimit,
retryDelayInMs,
options = {},
) {
const now = new Date();
const delayDate = new Date(now.getTime() - retryDelayInMs);

const records = await this.model.findAll({
attributes: [
[Sequelize.fn('MAX', Sequelize.col('created_at')), 'latestCreatedAt'],
[Sequelize.fn('COUNT', Sequelize.col('ual')), 'retryCount'],
],
where: {
paranet_ual: paranetUal,
isSynced: false,
},
group: ['paranet_ual', 'ual'],
having: {
retryCount: {
[Sequelize.Op.lt]: retryCountLimit,
},
latestCreatedAt: {
[Sequelize.Op.lte]: delayDate,
},
},
...options,
});

return records.length;
}

async missedParanetAssetRecordExists(ual, options = {}) {
const missedParanetAssetRecord = await this.model.findOne({
where: { ual, isSynced: false },
...options,
});

return !!missedParanetAssetRecord;
}

async paranetSyncedAssetRecordExists(ual, options = {}) {
const paranetSyncedAssetRecord = await this.getParanetSyncedAssetRecordByUAL(ual, options);

return !!paranetSyncedAssetRecord;
}

async getParanetSyncedAssetRecordByUAL(ual, options = {}) {
return this.model.findOne({
where: { ual, isSynced: true },
...options,
});
}

async updateParanetAssetByUAL(ual, asset, options = {}) {
const [affectedRows] = await this.model.update(asset, {
where: { ual },
...options,
});

return affectedRows;
}
}

export default ParanetAssetRepository;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// DEPRECATED
class ParanetSyncedAssetRepository {
constructor(models) {
this.sequelize = models.sequelize;
Expand Down
33 changes: 14 additions & 19 deletions src/modules/repository/repository-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,7 @@ class RepositoryModuleManager extends BaseModuleManager {
}

async missedParanetAssetRecordExists(ual, options = {}) {
return this.getRepository('missed_paranet_asset').missedParanetAssetRecordExists(
ual,
options,
);
return this.getRepository('paranet_asset').missedParanetAssetRecordExists(ual, options);
}

async removeMissedParanetAssetRecordsByUAL(ual, options = {}) {
Expand Down Expand Up @@ -419,9 +416,9 @@ class RepositoryModuleManager extends BaseModuleManager {
);
}

async getCountOfMissedAssetsOfParanet(ual, options = {}) {
return this.getRepository('missed_paranet_asset').getCountOfMissedAssetsOfParanet(
ual,
async getCountOfMissedAssetsOfParanet(paranetUal, options = {}) {
return this.getRepository('paranet_asset').getCountOfMissedAssetsOfParanet(
paranetUal,
options,
);
}
Expand All @@ -432,7 +429,7 @@ class RepositoryModuleManager extends BaseModuleManager {
retryDelayInMs,
options = {},
) {
return this.getRepository('missed_paranet_asset').getFilteredCountOfMissedAssetsOfParanet(
return this.getRepository('paranet_asset').getFilteredCountOfMissedAssetsOfParanet(
ual,
retryCountLimit,
retryDelayInMs,
Expand All @@ -455,7 +452,7 @@ class RepositoryModuleManager extends BaseModuleManager {
dataSource,
options = {},
) {
return this.getRepository('paranet_synced_asset').createParanetSyncedAssetRecord(
return this.getRepository('paranet_asset').createParanetSyncedAssetRecord(
blockchainId,
ual,
paranetUal,
Expand All @@ -469,23 +466,21 @@ class RepositoryModuleManager extends BaseModuleManager {
}

async getParanetSyncedAssetRecordByUAL(ual, options = {}) {
return this.getRepository('paranet_synced_asset').getParanetSyncedAssetRecordByUAL(
ual,
options,
);
return this.getRepository('paranet_asset').getParanetSyncedAssetRecordByUAL(ual, options);
}

async getParanetSyncedAssetRecordsCountByDataSource(paranetUal, dataSource, options = {}) {
async getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options = {}) {
return this.getRepository(
'paranet_synced_asset',
).getParanetSyncedAssetRecordsCountByDataSource(paranetUal, dataSource, options);
).getParanetSyncedAssetRecordsCountByDataSource(paranetUal, options);
}

async paranetSyncedAssetRecordExists(ual, options = {}) {
return this.getRepository('paranet_synced_asset').paranetSyncedAssetRecordExists(
ual,
options,
);
return this.getRepository('paranet_asset').paranetSyncedAssetRecordExists(ual, options);
}

async updateParanetAssetByUAL(ual, asset, options = {}) {
return this.getRepository('paranet_asset').syncKnowledgeAssetsByUAL(ual, asset, options);
}

async incrementParanetKaCount(paranetId, blockchainId, options = {}) {
Expand Down

0 comments on commit e73f648

Please sign in to comment.