Skip to content

Commit

Permalink
feat: 添加简书积分兑换平台挂单信息双写流程及迁移脚本
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Nov 1, 2024
1 parent 0ccbefb commit 25ad018
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 2 deletions.
37 changes: 37 additions & 0 deletions jobs/jpep/ftn_trade.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from models.jpep.credit_record import CreditRecord
from models.jpep.ftn_trade_order import AmountField, FTNTradeOrderDocument
from models.jpep.new.ftn_macket_record import FTNMacketRecord
from models.jpep.new.ftn_order import FTNOrder, TypeEnum
from models.jpep.user import User
from utils.log import log_flow_run_start, log_flow_run_success, logger
from utils.prefect_helper import (
Expand Down Expand Up @@ -68,6 +70,40 @@ async def process_item(
)


async def transform_and_write_new_data_source(
type: Literal["buy", "sell"], # noqa: A002
old_data: list[FTNTradeOrderDocument],
) -> None:
new_data: list[FTNMacketRecord] = []
for item in old_data:
order = await FTNOrder.get_by_id(item.id)
if not order:
await FTNOrder(
id=item.id,
type={"buy": TypeEnum.BUY, "sell": TypeEnum.SELL}[type],
publisher_id=item.publisher_id,
publish_time=item.published_at,
last_seen_time=item.fetch_time,
).create()
else:
await FTNOrder.update_last_seen_time(order.id, item.fetch_time)

new_data.append(
FTNMacketRecord(
fetch_time=item.fetch_time,
id=item.id,
price=item.price,
traded_count=item.traded_count,
total_amount=item.amount.total,
traded_amount=item.amount.traded,
remaining_amount=item.amount.tradable,
minimum_trade_amount=item.amount.minimum_trade,
)
)

await FTNMacketRecord.insert_many(new_data)


@flow(
**generate_flow_config(
name="采集简书积分兑换平台简书贝交易挂单",
Expand All @@ -85,6 +121,7 @@ async def main(type: Literal["buy", "sell"]) -> None: # noqa: A002

if data:
await FTNTradeOrderDocument.insert_many(data)
await transform_and_write_new_data_source(type, data)
else:
logger.warn("没有可采集的挂单信息,跳过数据写入")

Expand Down
51 changes: 51 additions & 0 deletions migrate_ftn_macket_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from asyncio import run as asyncio_run

from sshared.logging import Logger

from models.jpep.new.ftn_macket_record import FTNMacketRecord
from utils.mongo import JPEP_DB

OLD_COLLECTION = JPEP_DB.ftn_trade_orders
logger = Logger()


async def main() -> None:
await FTNMacketRecord.init()

batch: list[FTNMacketRecord] = []

logger.info("开始执行数据迁移")
async for item in OLD_COLLECTION.find().sort({"fetchTime": 1}):
batch.append(
FTNMacketRecord(
fetch_time=item["fetchTime"],
id=item["id"],
price=item["price"],
traded_count=item["tradedCount"],
total_amount=item["amount"]["total"],
traded_amount=item["amount"]["traded"],
remaining_amount=item["amount"]["tradable"],
minimum_trade_amount=item["amount"]["minimumTrade"],
)
)

if len(batch) == 10000:
await FTNMacketRecord.insert_many(batch)
logger.debug(
f"已迁移 {batch[0].fetch_time}/{batch[0].id} - "
f"{batch[-1].fetch_time}/{batch[-1].id}"
)
batch.clear()

if batch:
await FTNMacketRecord.insert_many(batch)
logger.debug(
f"已迁移 {batch[0].fetch_time}/{batch[0].id} - "
f"{batch[-1].fetch_time}/{batch[-1].id}"
)
batch.clear()

logger.info("数据迁移完成")


asyncio_run(main())
35 changes: 35 additions & 0 deletions migrate_ftn_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from asyncio import run as asyncio_run

from sshared.logging import Logger

from models.jpep.new.ftn_order import FTNOrder, TypeEnum
from utils.mongo import JPEP_DB

OLD_COLLECTION = JPEP_DB.ftn_trade_orders
logger = Logger()


async def main() -> None:
await FTNOrder.init()

logger.info("开始执行数据迁移")
for order_id in await OLD_COLLECTION.distinct("id"):
last_record = await OLD_COLLECTION.find_one(
{"id": order_id}, sort={"fetchTime": -1}
)
if not last_record:
raise ValueError

await FTNOrder(
id=last_record["id"],
type={"buy": TypeEnum.BUY, "sell": TypeEnum.SELL}[last_record["type"]],
publisher_id=last_record["publisherId"],
publish_time=last_record["publishedAt"],
last_seen_time=last_record["fetchTime"],
).create()
logger.debug(f"已迁移订单 {order_id}")

logger.info("数据迁移完成")


asyncio_run(main())
6 changes: 4 additions & 2 deletions models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from models.jianshu.user import User as JianshuUser
from models.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
from models.jpep.credit_record import CreditRecord
from models.jpep.ftn_trade_order import FTNTradeOrderDocument
from models.jpep.new.ftn_macket_record import FTNMacketRecord
from models.jpep.new.ftn_order import FTNOrder
from models.jpep.user import User as JPEPUser


Expand All @@ -15,5 +16,6 @@ async def init_db() -> None:
await JianshuUser.init()
await UserAssetsRankingRecord.init()
await CreditRecord.init()
await FTNTradeOrderDocument.ensure_indexes()
await FTNMacketRecord.init()
await FTNOrder.init()
await JPEPUser.init()
61 changes: 61 additions & 0 deletions models/jpep/new/ftn_macket_record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from datetime import datetime

from sshared.postgres import Table
from sshared.strict_struct import NonNegativeInt, PositiveFloat, PositiveInt

from utils.postgres import get_jpep_conn


class FTNMacketRecord(Table, frozen=True):
fetch_time: datetime
id: PositiveInt
price: PositiveFloat
traded_count: NonNegativeInt
total_amount: PositiveInt
traded_amount: NonNegativeInt
remaining_amount: int
minimum_trade_amount: PositiveInt

@classmethod
async def _create_table(cls) -> None:
conn = await get_jpep_conn()
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS ftn_macket_records (
fetch_time TIMESTAMP NOT NULL,
id INTEGER NOT NULL,
price NUMERIC NOT NULL,
traded_count SMALLINT NOT NULL,
total_amount INTEGER NOT NULL,
traded_amount INTEGER NOT NULL,
remaining_amount INTEGER NOT NULL,
minimum_trade_amount INTEGER NOT NULL,
CONSTRAINT pk_ftn_macket_records_fetch_time_id PRIMARY KEY (fetch_time, id)
);
""" # noqa: E501
)

@classmethod
async def insert_many(cls, data: list["FTNMacketRecord"]) -> None:
for item in data:
item.validate()

conn = await get_jpep_conn()
await conn.cursor().executemany(
"INSERT INTO ftn_macket_records (fetch_time, id, price, traded_count, "
"total_amount, traded_amount, remaining_amount, minimum_trade_amount) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s);",
[
(
item.fetch_time,
item.id,
item.price,
item.traded_count,
item.total_amount,
item.traded_amount,
item.remaining_amount,
item.minimum_trade_amount,
)
for item in data
],
)
84 changes: 84 additions & 0 deletions models/jpep/new/ftn_order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from datetime import datetime
from enum import Enum
from typing import Optional

from sshared.postgres import Table, create_enum
from sshared.strict_struct import PositiveInt

from utils.postgres import get_jpep_conn


class TypeEnum(Enum):
BUY = "BUY"
SELL = "SELL"


class FTNOrder(Table, frozen=True):
id: PositiveInt
type: TypeEnum
publisher_id: PositiveInt
publish_time: datetime
last_seen_time: Optional[datetime]

@classmethod
async def _create_enum(cls) -> None:
conn = await get_jpep_conn()
await create_enum(conn=conn, name="enum_ftn_orders_type", enum_class=TypeEnum)

@classmethod
async def _create_table(cls) -> None:
conn = await get_jpep_conn()
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS ftn_orders (
id INTEGER CONSTRAINT pk_ftn_orders_id PRIMARY KEY,
type enum_ftn_orders_type NOT NULL,
publisher_id INTEGER NOT NULL,
publish_time TIMESTAMP NOT NULL,
last_seen_time TIMESTAMP
);
"""
)

async def create(self) -> None:
conn = await get_jpep_conn()
await conn.execute(
"INSERT INTO ftn_orders (id, type, publisher_id, publish_time, "
"last_seen_time) VALUES (%s, %s, %s, %s, %s);",
(
self.id,
self.type,
self.publisher_id,
self.publish_time,
self.last_seen_time,
),
)

@classmethod
async def get_by_id(cls, id: int) -> Optional["FTNOrder"]: # noqa: A002
conn = await get_jpep_conn()
cursor = await conn.execute(
"SELECT type, publisher_id, publish_time, last_seen_time "
"FROM ftn_orders WHERE id = %s;",
(id,),
)

data = await cursor.fetchone()
if not data:
return None

return cls(
id=id,
type=data[0],
publisher_id=data[1],
publish_time=data[2],
last_seen_time=data[3],
)

@classmethod
async def update_last_seen_time(cls, id: int, last_seen_time: datetime) -> None: # noqa: A002
conn = await get_jpep_conn()
await conn.execute(
"UPDATE ftn_orders SET last_seen_time = %s WHERE id = %s;",
(last_seen_time, id),
)

0 comments on commit 25ad018

Please sign in to comment.