Skip to content

Commit

Permalink
feat: 迁移简书积分兑换平台用户与信用记录数据
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Nov 1, 2024
1 parent 22a7b2b commit 0ccbefb
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 126 deletions.
25 changes: 10 additions & 15 deletions jobs/jpep/ftn_trade.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
from jkit.jpep.ftn_macket import FTNMacket, FTNMacketOrderRecord
from prefect import flow

from models.jpep.credit_history import CreditHistoryDocument
from models.jpep.ftn_trade_order import (
AmountField,
FTNTradeOrderDocument,
)
from models.jpep.user import UserDocument
from models.jpep.credit_record import CreditRecord
from models.jpep.ftn_trade_order import AmountField, FTNTradeOrderDocument
from models.jpep.user import User
from utils.log import log_flow_run_start, log_flow_run_success, logger
from utils.prefect_helper import (
generate_deployment_config,
Expand Down Expand Up @@ -38,23 +35,21 @@ async def process_item(
type: Literal["buy", "sell"], # noqa: A002
) -> FTNTradeOrderDocument:
if item.publisher_info.id:
await UserDocument.insert_or_update_one(
updated_at=time,
await User.upsert(
id=item.publisher_info.id,
name=item.publisher_info.name,
hashed_name=item.publisher_info.hashed_name,
avatar_url=item.publisher_info.avatar_url,
)

latest_credit_value = await CreditHistoryDocument.get_latest_value(
item.publisher_info.id
)
if not latest_credit_value or latest_credit_value != item.publisher_info.credit:
await CreditHistoryDocument(
latest_credit = await CreditRecord.get_latest_credit(item.publisher_info.id)
# 如果没有记录过这个用户的信用值,或信用值已修改,增加新的记录
if not latest_credit or latest_credit != item.publisher_info.credit:
await CreditRecord(
time=time,
user_id=item.publisher_info.id,
value=item.publisher_info.credit,
).save()
credit=item.publisher_info.credit,
).create()

return FTNTradeOrderDocument(
fetch_time=time,
Expand Down
19 changes: 2 additions & 17 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,12 @@
from prefect import serve

from jobs import DEPLOYMENTS
from models.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecord,
)
from models.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord
from models.jianshu.user import User
from models.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
from models.jpep.credit_history import CreditHistoryDocument
from models.jpep.ftn_trade_order import FTNTradeOrderDocument
from models.jpep.user import UserDocument
from models import init_db
from utils.log import logger


async def main() -> None:
await CreditHistoryDocument.ensure_indexes()
await FTNTradeOrderDocument.ensure_indexes()
await UserDocument.ensure_indexes()

await ArticleEarningRankingRecord.init()
await UserAssetsRankingRecord.init()
await DailyUpdateRankingRecord.init()
await User.init()
await init_db()
logger.info("初始化数据库完成")

logger.info("启动工作流")
Expand Down
26 changes: 26 additions & 0 deletions migrate_credit_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from asyncio import run as asyncio_run

from sshared.logging import Logger

from models.jpep.credit_record import CreditRecord
from utils.mongo import JPEP_DB

OLD_COLLECTION = JPEP_DB.credit_history
logger = Logger()


async def main() -> None:
await CreditRecord.init()

logger.info("开始执行数据迁移")
async for item in OLD_COLLECTION.find().sort({"time": 1, "userId": 1}):
await CreditRecord(
time=item["time"], user_id=item["userId"], credit=item["value"]
).create()

logger.debug(f"已迁移 {item['time']} - {item['userId']}")

logger.info("数据迁移完成")


asyncio_run(main())
30 changes: 30 additions & 0 deletions migrate_jpep_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from asyncio import run as asyncio_run

from sshared.logging import Logger

from models.jpep.user import User
from utils.mongo import JPEP_DB

OLD_COLLECTION = JPEP_DB.users
logger = Logger()


async def main() -> None:
await User.init()

logger.info("开始执行数据迁移")
async for item in OLD_COLLECTION.find().sort({"id": 1}):
await User(
id=item["id"],
update_time=item["updatedAt"],
name=item["name"],
hashed_name=item["hashedName"],
avatar_url=item["avatarUrl"],
).create()

logger.debug(f"已迁移 {item['id']}")

logger.info("数据迁移完成")


asyncio_run(main())
19 changes: 19 additions & 0 deletions models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from models.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecord,
)
from models.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord
from models.jianshu.user import User as JianshuUser
from models.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
from models.jpep.credit_record import CreditRecord
from models.jpep.ftn_trade_order import FTNTradeOrderDocument
from models.jpep.user import User as JPEPUser


async def init_db() -> None:
await ArticleEarningRankingRecord.init()
await DailyUpdateRankingRecord.init()
await JianshuUser.init()
await UserAssetsRankingRecord.init()
await CreditRecord.init()
await FTNTradeOrderDocument.ensure_indexes()
await JPEPUser.init()
25 changes: 0 additions & 25 deletions models/jpep/credit_history.py

This file was deleted.

55 changes: 55 additions & 0 deletions models/jpep/credit_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from datetime import datetime
from typing import Optional

from sshared.postgres import Table
from sshared.strict_struct import (
NonNegativeInt,
PositiveInt,
)

from utils.postgres import get_jpep_conn


class CreditRecord(Table, frozen=True):
time: datetime
user_id: PositiveInt
credit: NonNegativeInt

@classmethod
async def _create_table(cls) -> None:
conn = await get_jpep_conn()
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS credit_records (
time TIMESTAMP NOT NULL,
user_id INTEGER NOT NULL,
credit INTEGER NOT NULL,
CONSTRAINT pk_credit_records_time_user_id PRIMARY KEY (time, user_id)
);
"""
)

async def create(self) -> None:
self.validate()

conn = await get_jpep_conn()
await conn.execute(
"INSERT INTO credit_records (time, user_id, credit) VALUES "
"(%s, %s, %s);",
(self.time, self.user_id, self.credit),
)

@classmethod
async def get_latest_credit(cls, user_id: int) -> Optional[int]:
conn = await get_jpep_conn()
cursor = await conn.execute(
"SELECT credit FROM credit_records WHERE user_id = %s "
"ORDER BY time DESC LIMIT 1;",
(user_id,),
)

data = await cursor.fetchone()
if not data:
return None

return data[0]
Loading

0 comments on commit 0ccbefb

Please sign in to comment.