From 26616c5100dc0a76d5d584fa6af4ed5949b992d6 Mon Sep 17 00:00:00 2001
From: yezi
Date: Tue, 12 Mar 2024 06:14:53 +0800
Subject: [PATCH] feat: optimize the migration scripts

---
 migrate_01_article_fp_rank.py   |  37 ++++++++---
 migrate_02_assets_rank.py       | 111 ++++++++++++++++++++++++++++++++
 migrate_03_daily_update_rank.py | 106 ++++++++++++++++++++++++++++++
 models/assets_ranking_record.py |   2 +-
 utils/migrate_helper.py         |  21 ++++++
 5 files changed, 266 insertions(+), 11 deletions(-)
 create mode 100644 migrate_02_assets_rank.py
 create mode 100644 migrate_03_daily_update_rank.py
 create mode 100644 utils/migrate_helper.py

diff --git a/migrate_01_article_fp_rank.py b/migrate_01_article_fp_rank.py
index 8b4dbbc..82c37cf 100644
--- a/migrate_01_article_fp_rank.py
+++ b/migrate_01_article_fp_rank.py
@@ -12,6 +12,10 @@
 )
 from models.jianshu_user import JianshuUserDocument
 from old_models.article_fp_rank import OldArticleFPRank
+from utils.migrate_helper import (
+    get_collection_data_count,
+    get_collection_data_time_range,
+)
 
 logger = RunLogger()
 
@@ -56,10 +60,14 @@ async def main() -> None:
     await ensure_all_new_collection_indexes()
     logger.info("Indexes ensured for the new collections")
 
-    old_collection_data_count = await OldArticleFPRank.Meta.collection.count_documents(
-        {}
+    old_data_count = await get_collection_data_count(OldArticleFPRank)
+    old_start_time, old_end_time = await get_collection_data_time_range(
+        OldArticleFPRank, "date"
+    )
+    logger.info(
+        f"Old collection record count: {old_data_count}, "
+        f"time range: {old_start_time} - {old_end_time}"
     )
-    logger.info(f"Old collection record count: {old_collection_data_count}")
 
     data_to_save: List[ArticleEarningRankingRecordDocument] = []
     async for item in OldArticleFPRank.Meta.collection.find().sort(
@@ -85,18 +93,27 @@ async def main() -> None:
         )
         data_to_save.clear()
 
-    logger.info("Data conversion complete, starting the validation process")
-    new_collection_data_count = (
-        await ArticleEarningRankingRecordDocument.Meta.collection.count_documents({})
+    logger.info("Data conversion complete, starting validation")
+    new_data_count = await get_collection_data_count(
+        ArticleEarningRankingRecordDocument
+    )
+    if old_data_count != new_data_count:
+        logger.critical(
+            f"Record count mismatch (before migration: {old_data_count}, " f"after migration: {new_data_count})"
+        )
+        exit()
+    new_start_time, new_end_time = await get_collection_data_time_range(
+        ArticleEarningRankingRecordDocument, "date"
     )
-    if old_collection_data_count != new_collection_data_count:
+    if old_start_time != new_start_time or old_end_time != new_end_time:
         logger.critical(
-            f"Record count mismatch (before migration: {old_collection_data_count}, "
-            f"after migration: {new_collection_data_count})"
+            "Time range mismatch"
+            f" (before migration: {old_start_time} - {old_end_time}, "
+            f"after migration: {new_start_time} - {new_end_time})"
         )
         exit()
 
-    logger.info("Data validation passed, migration finished")
+    logger.info("Validation passed, migration finished")
 
 
 asyncio.run(main())
diff --git a/migrate_02_assets_rank.py b/migrate_02_assets_rank.py
new file mode 100644
index 0000000..30d9486
--- /dev/null
+++ b/migrate_02_assets_rank.py
@@ -0,0 +1,111 @@
+import asyncio
+from datetime import datetime
+from typing import List
+
+from jkit.identifier_convert import user_url_to_slug
+from sspeedup.logging.run_logger import RunLogger
+
+from models.assets_ranking_record import AmountField, AssetsRankingRecordDocument
+from models.jianshu_user import JianshuUserDocument
+from old_models.assets_rank import OldAssetsRank
+from utils.migrate_helper import (
+    get_collection_data_count,
+    get_collection_data_time_range,
+)
+
+logger = RunLogger()
+
+
+async def ensure_all_old_collection_indexes() -> None:
+    await OldAssetsRank.ensure_indexes()
+
+
+async def ensure_all_new_collection_indexes() -> None:
+    await JianshuUserDocument.ensure_indexes()
+    await AssetsRankingRecordDocument.ensure_indexes()
+
+
+async def insert_or_update_user(item: OldAssetsRank) -> None:
+    if item.user.url:
+        await JianshuUserDocument.insert_or_update_one(
+            slug=user_url_to_slug(item.user.url),
+            updated_at=datetime.fromisoformat(item.date.isoformat()),
+            id=item.user.id,
+            name=item.user.name,
+        )
+
+
+async def convert_item(item: OldAssetsRank) -> AssetsRankingRecordDocument:
+    return AssetsRankingRecordDocument(
+        date=item.date,
+        ranking=item.ranking,
+        amount=AmountField(
+            fp=item.assets.fp,
+            ftn=item.assets.ftn,
+            assets=item.assets.total,
+        ),
+        user_slug=user_url_to_slug(item.user.url) if item.user.url else None,
+    )
+
+
+async def main() -> None:
+    await ensure_all_old_collection_indexes()
+    logger.info("Indexes ensured for the old collections")
+    await ensure_all_new_collection_indexes()
+    logger.info("Indexes ensured for the new collections")
+
+    old_data_count = await get_collection_data_count(OldAssetsRank)
+    old_start_time, old_end_time = await get_collection_data_time_range(
+        OldAssetsRank, "date"
+    )
+    logger.info(
+        f"Old collection record count: {old_data_count}, "
+        f"time range: {old_start_time} - {old_end_time}"
+    )
+
+    data_to_save: List[AssetsRankingRecordDocument] = []
+    async for item in OldAssetsRank.Meta.collection.find().sort(
+        {"date": 1, "ranking": 1}
+    ):
+        item = OldAssetsRank.from_dict(item)
+        await insert_or_update_user(item)
+        data_to_save.append(await convert_item(item))
+
+        if len(data_to_save) == 1000:
+            await AssetsRankingRecordDocument.insert_many(data_to_save)
+            logger.debug(
+                f"Converted {len(data_to_save)} records, "
+                f"latest record date: {data_to_save[-1].date}"
+            )
+            data_to_save.clear()
+
+    if len(data_to_save):
+        await AssetsRankingRecordDocument.insert_many(data_to_save)
+        logger.debug(
+            f"Converted {len(data_to_save)} records, "
+            f"latest record date: {data_to_save[-1].date}"
+        )
+        data_to_save.clear()
+
+    logger.info("Data conversion complete, starting validation")
+    new_data_count = await get_collection_data_count(AssetsRankingRecordDocument)
+    if old_data_count != new_data_count:
+        logger.critical(
+            f"Record count mismatch (before migration: {old_data_count}, " f"after migration: {new_data_count})"
+        )
+        exit()
+    new_start_time, new_end_time = await get_collection_data_time_range(
+        AssetsRankingRecordDocument, "date"
+    )
+    if old_start_time != new_start_time or old_end_time != new_end_time:
+        logger.critical(
+            "Time range mismatch"
+            f" (before migration: {old_start_time} - {old_end_time}, "
+            f"after migration: {new_start_time} - {new_end_time})"
+        )
+        exit()
+
+    logger.info("Validation passed, migration finished")
+
+
+asyncio.run(main())
diff --git a/migrate_03_daily_update_rank.py b/migrate_03_daily_update_rank.py
new file mode 100644
index 0000000..1dca503
--- /dev/null
+++ b/migrate_03_daily_update_rank.py
@@ -0,0 +1,106 @@
+import asyncio
+from datetime import datetime
+from typing import List
+
+from jkit.identifier_convert import user_url_to_slug
+from sspeedup.logging.run_logger import RunLogger
+
+from models.daily_update_ranking_record import DailyUpdateRankingRecordDocument
+from models.jianshu_user import JianshuUserDocument
+from old_models.daily_update_rank import OldDailyUpdateRank
+from utils.migrate_helper import (
+    get_collection_data_count,
+    get_collection_data_time_range,
+)
+
+logger = RunLogger()
+
+
+async def ensure_all_old_collection_indexes() -> None:
+    await OldDailyUpdateRank.ensure_indexes()
+
+
+async def ensure_all_new_collection_indexes() -> None:
+    await JianshuUserDocument.ensure_indexes()
+    await DailyUpdateRankingRecordDocument.ensure_indexes()
+
+
+async def insert_or_update_user(item: OldDailyUpdateRank) -> None:
+    if item.user.url:
+        await JianshuUserDocument.insert_or_update_one(
+            slug=user_url_to_slug(item.user.url),
+            updated_at=datetime.fromisoformat(item.date.isoformat()),
+            name=item.user.name,
+        )
+
+
+async def convert_item(item: OldDailyUpdateRank) -> DailyUpdateRankingRecordDocument:
+    return DailyUpdateRankingRecordDocument(
+        date=item.date,
+        ranking=item.ranking,
+        days=item.days,
+        user_slug=user_url_to_slug(item.user.url),
+    )
+
+
+async def main() -> None:
+    await ensure_all_old_collection_indexes()
+    logger.info("Indexes ensured for the old collections")
+    await ensure_all_new_collection_indexes()
+    logger.info("Indexes ensured for the new collections")
+
+    old_data_count = await get_collection_data_count(OldDailyUpdateRank)
+    old_start_time, old_end_time = await get_collection_data_time_range(
+        OldDailyUpdateRank, "date"
+    )
+    logger.info(
+        f"Old collection record count: {old_data_count}, "
+        f"time range: {old_start_time} - {old_end_time}"
+    )
+
+    data_to_save: List[DailyUpdateRankingRecordDocument] = []
+    async for item in OldDailyUpdateRank.Meta.collection.find().sort(
+        {"date": 1, "ranking": 1}
+    ):
+        item = OldDailyUpdateRank.from_dict(item)
+        await insert_or_update_user(item)
+        data_to_save.append(await convert_item(item))
+
+        if len(data_to_save) == 1000:
+            await DailyUpdateRankingRecordDocument.insert_many(data_to_save)
+            logger.debug(
+                f"Converted {len(data_to_save)} records, "
+                f"latest record date: {data_to_save[-1].date}"
+            )
+            data_to_save.clear()
+
+    if len(data_to_save):
+        await DailyUpdateRankingRecordDocument.insert_many(data_to_save)
+        logger.debug(
+            f"Converted {len(data_to_save)} records, "
+            f"latest record date: {data_to_save[-1].date}"
+        )
+        data_to_save.clear()
+
+    logger.info("Data conversion complete, starting validation")
+    new_data_count = await get_collection_data_count(DailyUpdateRankingRecordDocument)
+    if old_data_count != new_data_count:
+        logger.critical(
+            f"Record count mismatch (before migration: {old_data_count}, " f"after migration: {new_data_count})"
+        )
+        exit()
+    new_start_time, new_end_time = await get_collection_data_time_range(
+        DailyUpdateRankingRecordDocument, "date"
+    )
+    if old_start_time != new_start_time or old_end_time != new_end_time:
+        logger.critical(
+            "Time range mismatch"
+            f" (before migration: {old_start_time} - {old_end_time}, "
+            f"after migration: {new_start_time} - {new_end_time})"
+        )
+        exit()
+
+    logger.info("Validation passed, migration finished")
+
+
+asyncio.run(main())
diff --git a/models/assets_ranking_record.py b/models/assets_ranking_record.py
index e757f11..bb07c05 100644
--- a/models/assets_ranking_record.py
+++ b/models/assets_ranking_record.py
@@ -21,7 +21,7 @@
 class AmountField(Field, **FIELD_OBJECT_CONFIG):
     fp: Optional[NonNegativeFloat]
     ftn: Optional[NonNegativeFloat]
-    assets: PositiveFloat
+    assets: Optional[PositiveFloat]
 
 
 class AssetsRankingRecordDocument(Document, **DOCUMENT_OBJECT_CONFIG):
diff --git a/utils/migrate_helper.py b/utils/migrate_helper.py
new file mode 100644
index 0000000..a6d59e1
--- /dev/null
+++ b/utils/migrate_helper.py
@@ -0,0 +1,21 @@
+from datetime import datetime
+from typing import Tuple, Type
+
+from utils.document_model import Document
+
+
+async def get_collection_data_count(document: Type[Document]) -> int:
+    return await document.Meta.collection.count_documents({})
+
+
+async def get_collection_data_time_range(
+    document: Type[Document], key: str
+) -> Tuple[datetime, datetime]:
+    start_time = (
+        await document.Meta.collection.find().sort({key: 1}).limit(1).__anext__()
+    )[key]
+    end_time = (
+        await document.Meta.collection.find().sort({key: -1}).limit(1).__anext__()
+    )[key]
+
+    return (start_time, end_time)
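
For reference, a minimal usage sketch of the two helpers added in utils/migrate_helper.py. This is a hypothetical standalone script, not part of the patch; it assumes this repository's models package is importable:

# check_migration.py - hypothetical usage example for utils/migrate_helper.py.
# Prints the record count and date range of one migrated collection.
import asyncio

from models.assets_ranking_record import AssetsRankingRecordDocument
from utils.migrate_helper import (
    get_collection_data_count,
    get_collection_data_time_range,
)


async def main() -> None:
    count = await get_collection_data_count(AssetsRankingRecordDocument)
    start_time, end_time = await get_collection_data_time_range(
        AssetsRankingRecordDocument, "date"
    )
    print(f"{count} records, time range: {start_time} - {end_time}")


asyncio.run(main())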