From 0ccbefb13faef3321af0f0ce21b80959f753d022 Mon Sep 17 00:00:00 2001
From: FHU-yezi
Date: Fri, 1 Nov 2024 15:09:51 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E8=BF=81=E7=A7=BB=E7=AE=80=E4=B9=A6?=
 =?UTF-8?q?=E7=A7=AF=E5=88=86=E5=85=91=E6=8D=A2=E5=B9=B3=E5=8F=B0=E7=94=A8?=
 =?UTF-8?q?=E6=88=B7=E4=B8=8E=E4=BF=A1=E7=94=A8=E8=AE=B0=E5=BD=95=E6=95=B0?=
 =?UTF-8?q?=E6=8D=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Reviewer notes (text between the "---" line and the first "diff --git" line
is ignored by `git am` and is not part of the commit message).

Decoded subject: "feat: migrate Jianshu points exchange platform user and
credit record data".

What this patch does:
  * jobs/jpep/ftn_trade.py: process_item() calls User.upsert() and
    CreditRecord instead of UserDocument / CreditHistoryDocument.
  * main.py, models/__init__.py: per-model initialisation is gathered in
    models.init_db().
  * models/jpep/credit_record.py, models/jpep/user.py: the two models are
    rewritten on sshared.postgres.Table with hand-written SQL;
    models/jpep/credit_history.py is deleted.
  * migrate_credit_history.py, migrate_jpep_user.py: scripts that copy the
    JPEP_DB.credit_history / JPEP_DB.users collections into the new tables.
  * utils/postgres.py: the second assignment re-bound get_jianshu_conn to
    the JPEP connection manager; it now defines get_jpep_conn.

Changes made during review:
  * process_item(): "not latest_credit" became "latest_credit is None".
    credit is a NonNegativeInt and get_latest_credit() returns None only
    when no row exists, so with the truthiness test a stored credit of 0
    counted as "no record" and a new row was inserted on every run even
    though the value had not changed.
  * Comments on added lines are written in English. Removed lines, context
    lines and all runtime strings are untouched, so the pre-image still
    matches and hunk line counts are unchanged.
  * The post-image blob ids on the "index" lines of jobs/jpep/ftn_trade.py
    and models/jpep/user.py predate these edits.

 jobs/jpep/ftn_trade.py        |  25 +++---
 main.py                       |  19 +---
 migrate_credit_history.py     |  26 ++++++
 migrate_jpep_user.py          |  30 +++++++
 models/__init__.py            |  19 ++++
 models/jpep/credit_history.py |  25 ------
 models/jpep/credit_record.py  |  55 ++++++++++++
 models/jpep/user.py           | 159 +++++++++++++++++++---------------
 utils/postgres.py             |   2 +-
 9 files changed, 234 insertions(+), 126 deletions(-)
 create mode 100644 migrate_credit_history.py
 create mode 100644 migrate_jpep_user.py
 create mode 100644 models/__init__.py
 delete mode 100644 models/jpep/credit_history.py
 create mode 100644 models/jpep/credit_record.py

diff --git a/jobs/jpep/ftn_trade.py b/jobs/jpep/ftn_trade.py
index 8dbd852..5daff8c 100644
--- a/jobs/jpep/ftn_trade.py
+++ b/jobs/jpep/ftn_trade.py
@@ -4,12 +4,9 @@
 from jkit.jpep.ftn_macket import FTNMacket, FTNMacketOrderRecord
 from prefect import flow
 
-from models.jpep.credit_history import CreditHistoryDocument
-from models.jpep.ftn_trade_order import (
-    AmountField,
-    FTNTradeOrderDocument,
-)
-from models.jpep.user import UserDocument
+from models.jpep.credit_record import CreditRecord
+from models.jpep.ftn_trade_order import AmountField, FTNTradeOrderDocument
+from models.jpep.user import User
 from utils.log import log_flow_run_start, log_flow_run_success, logger
 from utils.prefect_helper import (
     generate_deployment_config,
@@ -38,23 +35,21 @@ async def process_item(
     type: Literal["buy", "sell"],  # noqa: A002
 ) -> FTNTradeOrderDocument:
     if item.publisher_info.id:
-        await UserDocument.insert_or_update_one(
-            updated_at=time,
+        await User.upsert(
             id=item.publisher_info.id,
             name=item.publisher_info.name,
             hashed_name=item.publisher_info.hashed_name,
             avatar_url=item.publisher_info.avatar_url,
         )
 
-        latest_credit_value = await CreditHistoryDocument.get_latest_value(
-            item.publisher_info.id
-        )
-        if not latest_credit_value or latest_credit_value != item.publisher_info.credit:
-            await CreditHistoryDocument(
+        latest_credit = await CreditRecord.get_latest_credit(item.publisher_info.id)
+        # Add a record if this user has none yet or the credit changed (0 is valid)
+        if latest_credit is None or latest_credit != item.publisher_info.credit:
+            await CreditRecord(
                 time=time,
                 user_id=item.publisher_info.id,
-                value=item.publisher_info.credit,
-            ).save()
+                credit=item.publisher_info.credit,
+            ).create()
 
     return FTNTradeOrderDocument(
         fetch_time=time,
diff --git a/main.py b/main.py
index ae108a4..4d6178e 100644
--- a/main.py
+++ b/main.py
@@ -3,27 +3,12 @@
 from prefect import serve
 
 from jobs import DEPLOYMENTS
-from models.jianshu.article_earning_ranking_record import (
-    ArticleEarningRankingRecord,
-)
-from models.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord
-from models.jianshu.user import User
-from models.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
-from models.jpep.credit_history import CreditHistoryDocument
-from models.jpep.ftn_trade_order import FTNTradeOrderDocument
-from models.jpep.user import UserDocument
+from models import init_db
 from utils.log import logger
 
 
 async def main() -> None:
-    await CreditHistoryDocument.ensure_indexes()
-    await FTNTradeOrderDocument.ensure_indexes()
-    await UserDocument.ensure_indexes()
-
-    await ArticleEarningRankingRecord.init()
-    await UserAssetsRankingRecord.init()
-    await DailyUpdateRankingRecord.init()
-    await User.init()
+    await init_db()
     logger.info("初始化数据库完成")
 
     logger.info("启动工作流")
diff --git a/migrate_credit_history.py b/migrate_credit_history.py
new file mode 100644
index 0000000..b6602ef
--- /dev/null
+++ b/migrate_credit_history.py
@@ -0,0 +1,26 @@
+from asyncio import run as asyncio_run
+
+from sshared.logging import Logger
+
+from models.jpep.credit_record import CreditRecord
+from utils.mongo import JPEP_DB
+
+OLD_COLLECTION = JPEP_DB.credit_history
+logger = Logger()
+
+
+async def main() -> None:
+    await CreditRecord.init()
+
+    logger.info("开始执行数据迁移")
+    async for item in OLD_COLLECTION.find().sort({"time": 1, "userId": 1}):
+        await CreditRecord(
+            time=item["time"], user_id=item["userId"], credit=item["value"]
+        ).create()
+
+        logger.debug(f"已迁移 {item['time']} - {item['userId']}")
+
+    logger.info("数据迁移完成")
+
+
+asyncio_run(main())
diff --git a/migrate_jpep_user.py b/migrate_jpep_user.py
new file mode 100644
index 0000000..3620f21
--- /dev/null
+++ b/migrate_jpep_user.py
@@ -0,0 +1,30 @@
+from asyncio import run as asyncio_run
+
+from sshared.logging import Logger
+
+from models.jpep.user import User
+from utils.mongo import JPEP_DB
+
+OLD_COLLECTION = JPEP_DB.users
+logger = Logger()
+
+
+async def main() -> None:
+    await User.init()
+
+    logger.info("开始执行数据迁移")
+    async for item in OLD_COLLECTION.find().sort({"id": 1}):
+        await User(
+            id=item["id"],
+            update_time=item["updatedAt"],
+            name=item["name"],
+            hashed_name=item["hashedName"],
+            avatar_url=item["avatarUrl"],
+        ).create()
+
+        logger.debug(f"已迁移 {item['id']}")
+
+    logger.info("数据迁移完成")
+
+
+asyncio_run(main())
diff --git a/models/__init__.py b/models/__init__.py
new file mode 100644
index 0000000..ce66423
--- /dev/null
+++ b/models/__init__.py
@@ -0,0 +1,19 @@
+from models.jianshu.article_earning_ranking_record import (
+    ArticleEarningRankingRecord,
+)
+from models.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord
+from models.jianshu.user import User as JianshuUser
+from models.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
+from models.jpep.credit_record import CreditRecord
+from models.jpep.ftn_trade_order import FTNTradeOrderDocument
+from models.jpep.user import User as JPEPUser
+
+
+async def init_db() -> None:
+    await ArticleEarningRankingRecord.init()
+    await DailyUpdateRankingRecord.init()
+    await JianshuUser.init()
+    await UserAssetsRankingRecord.init()
+    await CreditRecord.init()
+    await FTNTradeOrderDocument.ensure_indexes()
+    await JPEPUser.init()
diff --git a/models/jpep/credit_history.py b/models/jpep/credit_history.py
deleted file mode 100644
index e8281cd..0000000
--- a/models/jpep/credit_history.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from datetime import datetime
-from typing import Optional
-
-from jkit.msgspec_constraints import NonNegativeInt, PositiveInt
-from sshared.mongo import Document, Index
-
-from utils.mongo import JPEP_DB
-
-
-class CreditHistoryDocument(Document, frozen=True):
-    time: datetime
-    user_id: PositiveInt
-    value: NonNegativeInt
-
-    class Meta:  # type: ignore
-        collection = JPEP_DB.credit_history
-        indexes = (Index(keys=("time", "userId"), unique=True),)
-
-    @classmethod
-    async def get_latest_value(cls, user_id: int) -> Optional[int]:
-        latest_record = await cls.find_one({"userId": user_id}, sort={"time": "DESC"})
-        if not latest_record:
-            return None
-
-        return latest_record.value
diff --git a/models/jpep/credit_record.py b/models/jpep/credit_record.py
new file mode 100644
index 0000000..18fb0c8
--- /dev/null
+++ b/models/jpep/credit_record.py
@@ -0,0 +1,55 @@
+from datetime import datetime
+from typing import Optional
+
+from sshared.postgres import Table
+from sshared.strict_struct import (
+    NonNegativeInt,
+    PositiveInt,
+)
+
+from utils.postgres import get_jpep_conn
+
+
+class CreditRecord(Table, frozen=True):
+    time: datetime
+    user_id: PositiveInt
+    credit: NonNegativeInt
+
+    @classmethod
+    async def _create_table(cls) -> None:
+        conn = await get_jpep_conn()
+        await conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS credit_records (
+                time TIMESTAMP NOT NULL,
+                user_id INTEGER NOT NULL,
+                credit INTEGER NOT NULL,
+                CONSTRAINT pk_credit_records_time_user_id PRIMARY KEY (time, user_id)
+            );
+            """
+        )
+
+    async def create(self) -> None:
+        self.validate()
+
+        conn = await get_jpep_conn()
+        await conn.execute(
+            "INSERT INTO credit_records (time, user_id, credit) VALUES "
+            "(%s, %s, %s);",
+            (self.time, self.user_id, self.credit),
+        )
+
+    @classmethod
+    async def get_latest_credit(cls, user_id: int) -> Optional[int]:
+        conn = await get_jpep_conn()
+        cursor = await conn.execute(
+            "SELECT credit FROM credit_records WHERE user_id = %s "
+            "ORDER BY time DESC LIMIT 1;",
+            (user_id,),
+        )
+
+        data = await cursor.fetchone()
+        if not data:
+            return None
+
+        return data[0]
diff --git a/models/jpep/user.py b/models/jpep/user.py
index 2c72067..42e902e 100644
--- a/models/jpep/user.py
+++ b/models/jpep/user.py
@@ -1,96 +1,119 @@
 from datetime import datetime
-from typing import Any, Optional
+from typing import Optional
 
-from jkit.msgspec_constraints import NonNegativeInt, PositiveInt
-from sshared.mongo import Document, Field, Index
+from sshared.postgres import Table
+from sshared.strict_struct import (
+    NonEmptyStr,
+    PositiveInt,
+)
 
-from utils.mongo import JPEP_DB
+from utils.postgres import get_jpep_conn
 
 
-class CreditHistoryFieldItem(Field, frozen=True):
-    time: datetime
-    value: NonNegativeInt
+class User(Table, frozen=True):
+    id: PositiveInt
+    update_time: datetime
+    name: NonEmptyStr
+    hashed_name: NonEmptyStr
+    avatar_url: Optional[NonEmptyStr]
 
+    @classmethod
+    async def _create_table(cls) -> None:
+        conn = await get_jpep_conn()
+        await conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS users (
+                id INTEGER NOT NULL CONSTRAINT pk_users_id PRIMARY KEY,
+                update_time TIMESTAMP NOT NULL,
+                name TEXT NOT NULL,
+                hashed_name VARCHAR(9) NOT NULL,
+                avatar_url TEXT
+            );
+            """
+        )
 
-class UserDocument(Document, frozen=True):
-    updated_at: datetime
-    id: PositiveInt
-    name: str
-    hashed_name: Optional[str]
-    avatar_url: Optional[str]
-
-    class Meta:  # type: ignore
-        collection = JPEP_DB.users
-        indexes = (
-            Index(keys=("id",), unique=True),
-            Index(keys=("updatedAt",)),
+    async def create(self) -> None:
+        self.validate()
+
+        conn = await get_jpep_conn()
+        await conn.execute(
+            "INSERT INTO users (id, update_time, name, hashed_name, avatar_url) VALUES "
+            "(%s, %s, %s, %s, %s);",
+            (self.id, self.update_time, self.name, self.hashed_name, self.avatar_url),
         )
 
     @classmethod
-    async def is_record_exist(cls, id: int) -> bool:  # noqa: A002
-        return await cls.find_one({"id": id}) is not None
+    async def get_by_id(cls, id: int) -> Optional["User"]:  # noqa: A002
+        conn = await get_jpep_conn()
+        cursor = await conn.execute(
+            "SELECT update_time, name, hashed_name, avatar_url "
+            "FROM users WHERE id = %s",
+            (id,),
+        )
+
+        data = await cursor.fetchone()
+        if not data:
+            return None
+
+        return cls(
+            id=id,
+            update_time=data[0],
+            name=data[1],
+            hashed_name=data[2],
+            avatar_url=data[3],
+        )
 
     @classmethod
-    async def insert_or_update_one(
+    async def upsert(
         cls,
-        *,
-        updated_at: Optional[datetime] = None,
         id: int,  # noqa: A002
         name: str,
         hashed_name: str,
-        avatar_url: Optional[str],
+        avatar_url: Optional[str] = None,
     ) -> None:
-        if not updated_at:
-            updated_at = datetime.now()
-
-        if not await cls.is_record_exist(id):
-            await UserDocument(
-                updated_at=updated_at,
+        user = await cls.get_by_id(id)
+        # Create the user if it does not exist
+        if not user:
+            await cls(
                 id=id,
+                update_time=datetime.now(),
                 name=name,
                 hashed_name=hashed_name,
                 avatar_url=avatar_url,
-            ).save()
+            ).create()
             return
 
-        db_data = await cls.find_one({"id": id})
-        if not db_data:
-            raise AssertionError("意外的空值")
-        # 如果数据库中数据的更新时间晚于本次更新时间,则本次数据已不是最新
-        # 此时跳过更新
-        if updated_at < db_data.updated_at:
+        # Skip the update if the current data is not the latest
+        if user.update_time > datetime.now():
             return
 
-        update_data: dict[str, Any] = {
-            "$set": {
-                # 即使没有要更新的数据,也要刷新更新时间
-                "updatedAt": updated_at,
-            }
-        }
-        # 如果昵称不一致,更新之
-        if (name is not None and db_data.name is not None) and name != db_data.name:
-            update_data["$set"]["name"] = name
+        # Update all fields at once within a single transaction
+        conn = await get_jpep_conn()
+        async with conn.transaction():
+            # Refresh the update time
+            await conn.execute(
+                "UPDATE users SET update_time = %s WHERE id = %s",
+                (datetime.now(), id),
+            )
 
-        # 如果头像链接有变动,更新之
-        if avatar_url is not None and avatar_url != db_data.avatar_url:
-            update_data["$set"]["avatarUrl"] = avatar_url
+            # Update the name and the hashed name
+            if user.name and name and user.name != name:
+                # The hashed name always changes together with the name, so update both
+                await conn.execute(
+                    "UPDATE users SET name = %s, hashed_name = %s WHERE id = %s",
+                    (name, hashed_name, id),
+                )
 
-        await cls.get_collection().update_one({"id": id}, update_data)
+            # Add the avatar URL if none is stored yet
+            if not user.avatar_url and avatar_url:
+                await conn.execute(
+                    "UPDATE users SET avatar_url = %s WHERE id = %s",
+                    (avatar_url, id),
+                )
 
-    @classmethod
-    async def insert_one_if_not_exist(
-        cls,
-        *,
-        updated_at: datetime,
-        id: int,  # noqa: A002
-        name: str,
-        hashed_name: Optional[str],
-    ) -> None:
-        if not await cls.is_record_exist(id):
-            await UserDocument(
-                updated_at=updated_at,
-                id=id,
-                name=name,
-                hashed_name=hashed_name,
-                avatar_url=None,
-            ).save()
+            # Update the avatar URL
+            if user.avatar_url and avatar_url and user.avatar_url != avatar_url:
+                await conn.execute(
+                    "UPDATE users SET avatar_url = %s WHERE id = %s",
+                    (avatar_url, id),
+                )
diff --git a/utils/postgres.py b/utils/postgres.py
index 6c2f243..f7f477a 100644
--- a/utils/postgres.py
+++ b/utils/postgres.py
@@ -11,4 +11,4 @@
 _jpep_connection_manager = ConnectionManager(CONFIG.jpep_postgres.connection_string)
 
 get_jianshu_conn = _jianshu_connection_manager.get_conn
-get_jianshu_conn = _jpep_connection_manager.get_conn
+get_jpep_conn = _jpep_connection_manager.get_conn