diff --git a/jobs/jianshu/lottery.py b/jobs/jianshu/lottery.py deleted file mode 100644 index 79dded7..0000000 --- a/jobs/jianshu/lottery.py +++ /dev/null @@ -1,70 +0,0 @@ -from jkit.lottery import Lottery, LotteryWinRecord -from prefect import flow - -from models.jianshu.lottery_win_record import ( - LotteryWinRecordDocument, -) -from models.jianshu.user import UserDocument -from utils.log import log_flow_run_start, log_flow_run_success, logger -from utils.prefect_helper import ( - generate_deployment_config, - generate_flow_config, -) - - -async def process_item(item: LotteryWinRecord) -> LotteryWinRecordDocument: - await UserDocument.insert_or_update_one( - slug=item.user_info.slug, - updated_at=item.time, - id=item.user_info.id, - name=item.user_info.name, - avatar_url=item.user_info.avatar_url, - ) - - return LotteryWinRecordDocument( - id=item.id, - time=item.time, - award_name=item.award_name, - user_slug=item.user_info.slug, - ) - - -@flow( - **generate_flow_config( - name="采集简书大转盘抽奖中奖记录", - ) -) -async def main() -> None: - flow_run_name = log_flow_run_start(logger) - - stop_id = await LotteryWinRecordDocument.get_latest_record_id() - logger.info("已获取到最新的记录 ID", flow_run_name=flow_run_name, stop_id=stop_id) - if stop_id == 0: - logger.warn("数据库中没有记录", flow_run_name=flow_run_name) - - data: list[LotteryWinRecordDocument] = [] - async for item in Lottery().iter_win_records(): - if item.id == stop_id: - break - - processed_item = await process_item(item) - data.append(processed_item) - - if len(data) == 500: - logger.warn("采集数据量达到上限", flow_run_name=flow_run_name) - break - - if data: - await LotteryWinRecordDocument.insert_many(data) - else: - logger.info("无数据,不执行保存操作", flow_run_name=flow_run_name) - - log_flow_run_success(logger, data_count=len(data)) - - -deployment = main.to_deployment( - **generate_deployment_config( - name="采集简书大转盘抽奖中奖记录", - cron="*/10 * * * *", - ) -) diff --git a/main.py b/main.py index 42b8ae9..57e456f 100644 --- a/main.py +++ b/main.py @@ -4,6 +4,12 @@ from jobs import DEPLOYMENTS from models import MODELS +from models.new.jianshu.article_earning_ranking_record import ( + ArticleEarningRankingRecord, +) +from models.new.jianshu.assets_ranking_record import AssetsRankingRecord +from models.new.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord +from models.new.jianshu.user import User from utils.log import logger @@ -12,6 +18,12 @@ async def main() -> None: for model in MODELS: await model.ensure_indexes() logger.info("索引创建完成") + logger.info("正在初始化新版数据库") + await ArticleEarningRankingRecord.init() + await AssetsRankingRecord.init() + await DailyUpdateRankingRecord.init() + await User.init() + logger.info("初始化新版数据库完成") logger.info("启动工作流") await serve(*DEPLOYMENTS, print_starting_message=False) # type: ignore diff --git a/models/__init__.py b/models/__init__.py index 86b84c0..52b6a8c 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -3,7 +3,6 @@ from .jianshu.article_earning_ranking_record import ArticleEarningRankingRecordDocument from .jianshu.assets_ranking_record import AssetsRankingRecordDocument from .jianshu.daily_update_ranking_record import DailyUpdateRankingRecordDocument -from .jianshu.lottery_win_record import LotteryWinRecordDocument from .jianshu.user import UserDocument as JianshuUserDocument from .jpep.credit_history import CreditHistoryDocument from .jpep.ftn_trade_order import FTNTradeOrderDocument @@ -13,7 +12,6 @@ ArticleEarningRankingRecordDocument, AssetsRankingRecordDocument, DailyUpdateRankingRecordDocument, - LotteryWinRecordDocument, JianshuUserDocument, CreditHistoryDocument, FTNTradeOrderDocument,