diff --git a/jobs/fetch_jianshu_lottery_win_records.py b/jobs/fetch_jianshu_lottery_win_records.py index 02acdd2..70d70bd 100644 --- a/jobs/fetch_jianshu_lottery_win_records.py +++ b/jobs/fetch_jianshu_lottery_win_records.py @@ -5,7 +5,7 @@ from jkit.jianshu_lottery import JianshuLottery, JianshuLotteryWinRecord from prefect import flow, get_run_logger from prefect.states import Completed, State -from pymongo import DESCENDING +from pymongo import ASCENDING, DESCENDING, IndexModel from utils.config_generators import generate_deployment_config, generate_flow_config from utils.db import DB @@ -26,7 +26,7 @@ class UserInfoField(Field, **FIELD_OBJECT_CONFIG): class JianshuLotteryWinRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG): - _id: PositiveInt # type: ignore + record_id: PositiveInt time: datetime award_name: str user_info: UserInfoField @@ -40,12 +40,12 @@ async def get_latest_stored_record_id() -> int: except StopAsyncIteration: return 0 - return latest_data._id + return latest_data.record_id def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordDocument: return JianshuLotteryWinRecordDocument( - _id=item.id, + record_id=item.id, time=item.time, award_name=item.award_name, user_info=UserInfoField( @@ -62,6 +62,10 @@ def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordDoc ) ) async def flow_func() -> State: + await COLLECTION.create_indexes( + [IndexModel([("recordId", ASCENDING)], unique=True)] + ) + logger = get_run_logger() stop_id = await get_latest_stored_record_id()