diff --git a/jobs/jpep/ftn_trade.py b/jobs/jpep/ftn_trade.py
index 5daff8c..154e287 100644
--- a/jobs/jpep/ftn_trade.py
+++ b/jobs/jpep/ftn_trade.py
@@ -6,6 +6,8 @@
 
 from models.jpep.credit_record import CreditRecord
 from models.jpep.ftn_trade_order import AmountField, FTNTradeOrderDocument
+from models.jpep.new.ftn_macket_record import FTNMacketRecord
+from models.jpep.new.ftn_order import FTNOrder, TypeEnum
 from models.jpep.user import User
 from utils.log import log_flow_run_start, log_flow_run_success, logger
 from utils.prefect_helper import (
@@ -68,6 +70,58 @@ async def process_item(
     )
 
 
+async def transform_and_write_new_data_source(
+    type: Literal["buy", "sell"],  # noqa: A002
+    old_data: list[FTNTradeOrderDocument],
+) -> None:
+    """Mirror collected trade orders into the new order / record tables.
+
+    For every document, the matching ``FTNOrder`` row is created when
+    ``FTNOrder.get_by_id`` finds none, otherwise only its ``last_seen_time``
+    is updated. One ``FTNMacketRecord`` per document is then written with a
+    single ``FTNMacketRecord.insert_many`` call.
+
+    Args:
+        type: Side of the collected orders, ``"buy"`` or ``"sell"``.
+        old_data: Documents collected in the current run.
+    """
+    if not old_data:
+        # Nothing was collected, so there is nothing to mirror.
+        return
+
+    # Every document in one call shares the same side; resolve it once.
+    order_type = {"buy": TypeEnum.BUY, "sell": TypeEnum.SELL}[type]
+
+    new_data: list[FTNMacketRecord] = []
+    for item in old_data:
+        order = await FTNOrder.get_by_id(item.id)
+        if not order:
+            await FTNOrder(
+                id=item.id,
+                type=order_type,
+                publisher_id=item.publisher_id,
+                publish_time=item.published_at,
+                last_seen_time=item.fetch_time,
+            ).create()
+        else:
+            await FTNOrder.update_last_seen_time(order.id, item.fetch_time)
+
+        new_data.append(
+            FTNMacketRecord(
+                fetch_time=item.fetch_time,
+                id=item.id,
+                price=item.price,
+                traded_count=item.traded_count,
+                total_amount=item.amount.total,
+                traded_amount=item.amount.traded,
+                remaining_amount=item.amount.tradable,
+                minimum_trade_amount=item.amount.minimum_trade,
+            )
+        )
+
+    await FTNMacketRecord.insert_many(new_data)
+
+
 @flow(
     **generate_flow_config(
         name="采集简书积分兑换平台简书贝交易挂单",
@@ -85,6 +139,7 @@ async def main(type: Literal["buy", "sell"]) -> None:  # noqa: A002
 
     if data:
         await FTNTradeOrderDocument.insert_many(data)
+        await transform_and_write_new_data_source(type, data)
     else:
         logger.warn("没有可采集的挂单信息,跳过数据写入")
 
diff --git a/migrate_ftn_macket_records.py b/migrate_ftn_macket_records.py
new file mode 100644
index 0000000..10f873f
--- /dev/null
+++ b/migrate_ftn_macket_records.py
@@ -0,0 +1,60 @@
+"""Copy every ``ftn_trade_orders`` document into ``FTNMacketRecord`` rows."""
+
+from asyncio import run as asyncio_run
+
+from sshared.logging import Logger
+
+from models.jpep.new.ftn_macket_record import FTNMacketRecord
+from utils.mongo import JPEP_DB
+
+OLD_COLLECTION = JPEP_DB.ftn_trade_orders
+# Number of records buffered in memory before each bulk insert.
+BATCH_SIZE = 10000
+logger = Logger()
+
+
+async def _flush(batch: list[FTNMacketRecord]) -> None:
+    """Insert ``batch``, log the range it covers, then empty it in place."""
+    if not batch:
+        return
+
+    await FTNMacketRecord.insert_many(batch)
+    logger.debug(
+        f"已迁移 {batch[0].fetch_time}/{batch[0].id} - "
+        f"{batch[-1].fetch_time}/{batch[-1].id}"
+    )
+    batch.clear()
+
+
+async def main() -> None:
+    """Stream the old collection in ``fetchTime`` order and insert it in batches."""
+    await FTNMacketRecord.init()
+
+    batch: list[FTNMacketRecord] = []
+
+    logger.info("开始执行数据迁移")
+    async for item in OLD_COLLECTION.find().sort({"fetchTime": 1}):
+        batch.append(
+            FTNMacketRecord(
+                fetch_time=item["fetchTime"],
+                id=item["id"],
+                price=item["price"],
+                traded_count=item["tradedCount"],
+                total_amount=item["amount"]["total"],
+                traded_amount=item["amount"]["traded"],
+                remaining_amount=item["amount"]["tradable"],
+                minimum_trade_amount=item["amount"]["minimumTrade"],
+            )
+        )
+
+        if len(batch) >= BATCH_SIZE:
+            await _flush(batch)
+
+    # Write whatever is left over after the last full batch.
+    await _flush(batch)
+
+    logger.info("数据迁移完成")
+
+
+if __name__ == "__main__":
+    asyncio_run(main())
diff --git a/migrate_ftn_order.py b/migrate_ftn_order.py
new file mode 100644
index 0000000..760c01c
--- /dev/null
+++ b/migrate_ftn_order.py
@@ -0,0 +1,41 @@
+"""Create one ``FTNOrder`` row per order id found in ``ftn_trade_orders``."""
+
+from asyncio import run as asyncio_run
+
+from sshared.logging import Logger
+
+from models.jpep.new.ftn_order import FTNOrder, TypeEnum
+from utils.mongo import JPEP_DB
+
+OLD_COLLECTION = JPEP_DB.ftn_trade_orders
+logger = Logger()
+
+
+async def main() -> None:
+    """Migrate each distinct order, using its most recently fetched document."""
+    await FTNOrder.init()
+
+    logger.info("开始执行数据迁移")
+    for order_id in await OLD_COLLECTION.distinct("id"):
+        # Sorting by fetchTime descending picks the latest snapshot, whose
+        # fetchTime becomes the order's last_seen_time.
+        last_record = await OLD_COLLECTION.find_one(
+            {"id": order_id}, sort={"fetchTime": -1}
+        )
+        if not last_record:
+            raise ValueError(f"no document found for order id {order_id}")
+
+        await FTNOrder(
+            id=last_record["id"],
+            type={"buy": TypeEnum.BUY, "sell": TypeEnum.SELL}[last_record["type"]],
+            publisher_id=last_record["publisherId"],
+            publish_time=last_record["publishedAt"],
+            last_seen_time=last_record["fetchTime"],
+        ).create()
+        logger.debug(f"已迁移订单 {order_id}")
+
+    logger.info("数据迁移完成")
+
+
+if __name__ == "__main__":
+    asyncio_run(main())
diff --git a/models/__init__.py b/models/__init__.py
index ce66423..7b28f96 100644
--- a/models/__init__.py
+++ b/models/__init__.py
@@ -5,7 +5,8 @@
 from models.jianshu.user import User as JianshuUser
 from models.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
 from models.jpep.credit_record import CreditRecord
-from models.jpep.ftn_trade_order import FTNTradeOrderDocument
+from models.jpep.new.ftn_macket_record import FTNMacketRecord
+from models.jpep.new.ftn_order import FTNOrder
 from models.jpep.user import User as JPEPUser
 
 
@@ -15,5 +16,6 @@ async def init_db() -> None:
     await JianshuUser.init()
     await UserAssetsRankingRecord.init()
     await CreditRecord.init()
-    await FTNTradeOrderDocument.ensure_indexes()
+    await FTNMacketRecord.init()
+    await FTNOrder.init()
     await JPEPUser.init()
diff --git a/models/jpep/new/ftn_macket_record.py b/models/jpep/new/ftn_macket_record.py
new file mode 100644
index 0000000..4400b2b
--- /dev/null
+++ b/models/jpep/new/ftn_macket_record.py
@@ -0,0 +1,72 @@
+from datetime import datetime
+
+from sshared.postgres import Table
+from sshared.strict_struct import NonNegativeInt, PositiveFloat, PositiveInt
+
+from utils.postgres import get_jpep_conn
+
+
+class FTNMacketRecord(Table, frozen=True):
+    """A row of the ``ftn_macket_records`` table; key is ``(fetch_time, id)``."""
+
+    fetch_time: datetime
+    id: PositiveInt
+    price: PositiveFloat
+    traded_count: NonNegativeInt
+    total_amount: PositiveInt
+    traded_amount: NonNegativeInt
+    remaining_amount: int
+    minimum_trade_amount: PositiveInt
+
+    @classmethod
+    async def _create_table(cls) -> None:
+        """Create the ``ftn_macket_records`` table if it does not exist yet."""
+        conn = await get_jpep_conn()
+        await conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS ftn_macket_records (
+                fetch_time TIMESTAMP NOT NULL,
+                id INTEGER NOT NULL,
+                price NUMERIC NOT NULL,
+                traded_count SMALLINT NOT NULL,
+                total_amount INTEGER NOT NULL,
+                traded_amount INTEGER NOT NULL,
+                remaining_amount INTEGER NOT NULL,
+                minimum_trade_amount INTEGER NOT NULL,
+                CONSTRAINT pk_ftn_macket_records_fetch_time_id PRIMARY KEY (fetch_time, id)
+            );
+            """  # noqa: E501
+        )
+
+    @classmethod
+    async def insert_many(cls, data: list["FTNMacketRecord"]) -> None:
+        """Validate every record, then insert them all with one ``executemany``.
+
+        Args:
+            data: Records to insert. An empty list is a no-op.
+        """
+        if not data:
+            return
+
+        for item in data:
+            item.validate()
+
+        conn = await get_jpep_conn()
+        await conn.cursor().executemany(
+            "INSERT INTO ftn_macket_records (fetch_time, id, price, traded_count, "
+            "total_amount, traded_amount, remaining_amount, minimum_trade_amount) "
+            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s);",
+            [
+                (
+                    item.fetch_time,
+                    item.id,
+                    item.price,
+                    item.traded_count,
+                    item.total_amount,
+                    item.traded_amount,
+                    item.remaining_amount,
+                    item.minimum_trade_amount,
+                )
+                for item in data
+            ],
+        )
diff --git a/models/jpep/new/ftn_order.py b/models/jpep/new/ftn_order.py
new file mode 100644
index 0000000..f758921
--- /dev/null
+++ b/models/jpep/new/ftn_order.py
@@ -0,0 +1,95 @@
+from datetime import datetime
+from enum import Enum
+from typing import Optional
+
+from sshared.postgres import Table, create_enum
+from sshared.strict_struct import PositiveInt
+
+from utils.postgres import get_jpep_conn
+
+
+class TypeEnum(Enum):
+    """Allowed values of ``FTNOrder.type``."""
+
+    BUY = "BUY"
+    SELL = "SELL"
+
+
+class FTNOrder(Table, frozen=True):
+    """A row of the ``ftn_orders`` table; primary key is ``id``."""
+
+    id: PositiveInt
+    type: TypeEnum
+    publisher_id: PositiveInt
+    publish_time: datetime
+    last_seen_time: Optional[datetime]
+
+    @classmethod
+    async def _create_enum(cls) -> None:
+        """Pass ``TypeEnum`` to ``create_enum`` as ``enum_ftn_orders_type``."""
+        conn = await get_jpep_conn()
+        await create_enum(conn=conn, name="enum_ftn_orders_type", enum_class=TypeEnum)
+
+    @classmethod
+    async def _create_table(cls) -> None:
+        """Create the ``ftn_orders`` table if it does not exist yet."""
+        conn = await get_jpep_conn()
+        await conn.execute(
+            """
+            CREATE TABLE IF NOT EXISTS ftn_orders (
+                id INTEGER CONSTRAINT pk_ftn_orders_id PRIMARY KEY,
+                type enum_ftn_orders_type NOT NULL,
+                publisher_id INTEGER NOT NULL,
+                publish_time TIMESTAMP NOT NULL,
+                last_seen_time TIMESTAMP
+            );
+            """
+        )
+
+    async def create(self) -> None:
+        """Insert this order as a new ``ftn_orders`` row."""
+        conn = await get_jpep_conn()
+        await conn.execute(
+            "INSERT INTO ftn_orders (id, type, publisher_id, publish_time, "
+            "last_seen_time) VALUES (%s, %s, %s, %s, %s);",
+            (
+                self.id,
+                self.type,
+                self.publisher_id,
+                self.publish_time,
+                self.last_seen_time,
+            ),
+        )
+
+    @classmethod
+    async def get_by_id(cls, id: int) -> Optional["FTNOrder"]:  # noqa: A002
+        """Return the order stored under ``id``, or ``None`` if there is none."""
+        conn = await get_jpep_conn()
+        cursor = await conn.execute(
+            "SELECT type, publisher_id, publish_time, last_seen_time "
+            "FROM ftn_orders WHERE id = %s;",
+            (id,),
+        )
+
+        data = await cursor.fetchone()
+        if not data:
+            return None
+
+        return cls(
+            id=id,
+            # Coerce so the field is a TypeEnum whether the driver hands back
+            # an enum member or the raw label; TypeEnum(member) is a no-op.
+            type=TypeEnum(data[0]),
+            publisher_id=data[1],
+            publish_time=data[2],
+            last_seen_time=data[3],
+        )
+
+    @classmethod
+    async def update_last_seen_time(cls, id: int, last_seen_time: datetime) -> None:  # noqa: A002
+        """Set ``last_seen_time`` on the row whose primary key is ``id``."""
+        conn = await get_jpep_conn()
+        await conn.execute(
+            "UPDATE ftn_orders SET last_seen_time = %s WHERE id = %s;",
+            (last_seen_time, id),
+        )