Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Paranet sync rework #3518

Open
wants to merge 1 commit into
base: release/v8.0.0-sigma
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
334 changes: 176 additions & 158 deletions src/commands/paranet/paranet-sync-command.js

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
export async function up({ context: { queryInterface, Sequelize } }) {
await queryInterface.createTable('paranet_asset', {
id: {
type: Sequelize.INTEGER,
primaryKey: true,
autoIncrement: true,
},
blockchain_id: {
type: Sequelize.STRING,
allowNull: false,
},
ual: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make ual and paranet_ual as unique keys unique you can do it like this in raw SQL
ALTER TABLE `paranet_asset` ADD UNIQUE `unique_index`(`ual`, `paranet_ual`);

To prevent creation of duplicates

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

```await queryInterface.sequelize.query(`...`);```

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Added this using queryInterface.addConstraint, like we had in finality_status

type: Sequelize.STRING,
allowNull: false,
},
paranet_ual: {
type: Sequelize.STRING,
allowNull: false,
},
public_assertion_id: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Deleted

type: Sequelize.STRING,
allowNull: true,
},
private_assertion_id: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Deleted

type: Sequelize.STRING,
allowNull: true,
},
sender: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't get this I think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Deleted

type: Sequelize.STRING,
allowNull: true,
},
transaction_hash: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't get this atm

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Deleted

type: Sequelize.STRING,
allowNull: true,
},
error_message: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was used when there was multiple records for failed, do you think we should need it

type: Sequelize.TEXT,
allowNull: true,
},
is_synced: {
type: Sequelize.BOOLEAN,
allowNull: false,
defaultValue: false,
},
retries: {
allowNull: false,
type: Sequelize.INTEGER,
defaultValue: 0,
},
created_at: {
allowNull: false,
type: Sequelize.DATE,
defaultValue: Sequelize.literal('NOW()'),
},
updated_at: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There needs to be a trigger for this, in raw SQL it is created like this:

        CREATE TRIGGER before_update_paranet_asset
        BEFORE UPDATE ON paranet_asset
        FOR EACH ROW
        SET NEW.updated_at = NOW();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add indexes on columns we do queries on:

Used to find missed assets

        CREATE INDEX idx_paranet_asset_updated_at_retries
        ON paranet_asset(updated_at, retries, isSynced);

Used to find all paranet assets

        CREATE INDEX idx_paranet_asset_paranet_ual_ual
        ON paranet_asset(ual, paranet_ual);

Not sure if we need any other indexes atm

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CREATE INDEX idx_paranet_asset_paranet_ual_syneced
        ON paranet_asset(synced, paranet_ual);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for first trigger, its done automatically by sequelize 99%, but will test it when i can

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to be done like this https://sequelize.org/docs/v6/core-concepts/model-basics/#timestamps - its meant to be done like this, on sequelize level and then sequelize will add timestamps. Do you think this is ok? @Mihajlo-Pavlovic

allowNull: false,
type: Sequelize.DATE,
defaultValue: Sequelize.literal('NOW()'),
},
});
}

export async function down({ context: { queryInterface } }) {
await queryInterface.dropTable('paranet_assets');
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// NOT USED ANYMORE

export default (sequelize, DataTypes) => {
const blockchain = sequelize.define(
'missed_paranet_asset',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
export default (sequelize, DataTypes) => {
const paranetAsset = sequelize.define(
'paranet_asset',
{
id: {
autoIncrement: true,
primaryKey: true,
type: DataTypes.INTEGER,
},
blockchainId: {
allowNull: false,
type: DataTypes.STRING,
},
ual: {
allowNull: false,
type: DataTypes.STRING,
},
paranetUal: {
allowNull: false,
type: DataTypes.STRING,
},
publicAssertionId: {
allowNull: true,
type: DataTypes.STRING,
},
privateAssertionId: {
allowNull: true,
type: DataTypes.STRING,
},
sender: {
allowNull: true,
type: DataTypes.STRING,
},
transactionHash: {
allowNull: true,
type: DataTypes.STRING,
},
errorMessage: {
allowNull: true,
type: DataTypes.TEXT,
},
isSynced: {
allowNull: false,
type: DataTypes.BOOLEAN,
defaultValue: false,
},
retries: {
allowNull: false,
type: DataTypes.INTEGER,
defaultValue: 0,
},
createdAt: {
type: DataTypes.DATE,
},
updatedAt: {
type: DataTypes.DATE,
},
},
{
underscored: true,
indexes: [
{
unique: true,
fields: ['ual', 'paranetUal'], // Composite unique constraint on `ual` and `paranetUal`
},
{
fields: ['updatedAt', 'retries', 'isSynced'],
},
{
fields: ['ual', 'paranetUal'],
},
{
fields: ['isSynced', 'paranetUal'],
},
],
},
);

paranetAsset.associate = () => {
// Define associations here if needed
};

return paranetAsset;
};
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { PARANET_SYNC_SOURCES } from '../../../../../constants/constants.js';

// NOT USED ANYMORE
export default (sequelize, DataTypes) => {
const blockchain = sequelize.define(
'paranet_synced_asset',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import Sequelize from 'sequelize';

class ParanetAssetRepository {
constructor(models) {
this.sequelize = models.sequelize;
this.model = models.paranet_asset;
}

async createParanetAssetRecord(missedParanetAsset, options) {
return this.model.create({ ...missedParanetAsset, isSynced: false }, options);
}

async getCountOfMissedAssetsOfParanet(paranetUal, options = {}) {
return this.model.count({
where: {
paranetUal,
isSynced: false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also take into account retry to be less than constant and that updateAt + delay > current time

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like that is laready done on line 58

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use both queries for different reasons i think

},
...options,
});
}

async getParanetSyncedAssetRecordsCount(paranetUal, options = {}) {
return this.model.count({
where: {
paranet_ual: paranetUal,
isSynced: true,
},
...options,
});
}

// TODO: remove
// async getFilteredCountOfMissedAssetsOfParanet(
// paranetUal,
// retryCountLimit,
// retryDelayInMs,
// options = {},
// ) {
// const now = new Date();
// const delayDate = new Date(now.getTime() - retryDelayInMs);

// const records = await this.model.findAll({
// where: {
// paranetUal,
// isSynced: false, // Only unsynced assets
// retries: {
// [Sequelize.Op.lt]: retryCountLimit, // Filter by retries count
// },
// created_at: {
// [Sequelize.Op.lte]: delayDate, // Filter by created_at date
// },
// },
// ...options,
// });

// return records.length; // Return the count of matching records
// }

async getMissedParanetAssetsRecordsWithRetryCount(
paranetUal,
retryCountLimit,
retryDelayInMs,
limit = null,
options = {},
) {
const now = new Date();
const delayDate = new Date(now.getTime() - retryDelayInMs);

const queryOptions = {
where: {
paranetUal,
isSynced: false,
retries: {
[Sequelize.Op.lt]: retryCountLimit,
},
updated_at: {
[Sequelize.Op.lte]: delayDate,
},
},
...options,
};

if (limit !== null) {
queryOptions.limit = limit;
}

return this.model.findAll(queryOptions);
}

async missedParanetAssetRecordExists(ual, paranetUal, options = {}) {
const missedParanetAssetRecord = await this.model.findOne({
where: { ual, isSynced: false },
...options,
});

return !!missedParanetAssetRecord;
}

async paranetSyncedAssetRecordExists(ual, paranetUal, options = {}) {
const paranetSyncedAssetRecord = await this.model.getParanetSyncedAssetRecordByUAL(
ual,
paranetUal,
options,
);

return !!paranetSyncedAssetRecord;
}

async updateAssetToBeSynced(ual, paranetUal, options = {}) {
const [affectedRows] = await this.model.update(
{ isSynced: true },
{
where: {
ual,
paranetUal,
},
...options,
},
);

return affectedRows;
}

async incrementRetriesForUalAndParanetUal(ual, paranetUal, options = {}) {
const [affectedRows] = await this.model.increment('retries', {
by: 1,
where: {
ual,
paranetUal,
},
...options,
});

return affectedRows;
}
}

export default ParanetAssetRepository;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// DEPRECATED
class ParanetSyncedAssetRepository {
constructor(models) {
this.sequelize = models.sequelize;
Expand Down
Loading
Loading