"""Migrate article earning ranking data from the old collection to the new one.

Reads every record from ``OldArticleFPRank`` (sorted by date, then ranking),
upserts each record's author into the Jianshu user collection, converts the
record into an ``ArticleEarningRankingRecordDocument``, and bulk-inserts the
results in batches. Finally verifies that the old and new collections hold
the same number of documents, exiting with a non-zero status on mismatch.
"""

import asyncio
import sys
from datetime import datetime
from typing import List

from jkit.identifier_convert import article_url_to_slug, user_url_to_slug
from sspeedup.logging.run_logger import RunLogger

from models.article_earning_ranking_record import (
    ArticleEarningRankingRecordDocument,
    ArticleField,
    EarningField,
)
from models.jianshu_user import JianshuUserDocument
from old_models.article_fp_rank import OldArticleFPRank

# Flush the conversion buffer to the new collection once it reaches this size.
BATCH_SIZE = 1000

logger = RunLogger()


async def ensure_all_old_collection_indexes() -> None:
    """Create indexes on the legacy collection."""
    await OldArticleFPRank.ensure_indexes()


async def ensure_all_new_collection_indexes() -> None:
    """Create indexes on the collections written by this migration."""
    await JianshuUserDocument.ensure_indexes()
    await ArticleEarningRankingRecordDocument.ensure_indexes()


async def insert_or_update_user(item: OldArticleFPRank) -> None:
    """Upsert the record's author into the Jianshu user collection.

    Records without an author URL are skipped — there is no slug to key on.
    """
    if not item.author.url:
        return
    await JianshuUserDocument.insert_or_update_one(
        slug=user_url_to_slug(item.author.url),
        # item.date round-trips through its ISO string, yielding a datetime
        # at midnight of that day (presumably item.date is a date — the old
        # model is not visible here; confirm against old_models).
        updated_at=datetime.fromisoformat(item.date.isoformat()),
        id=item.author.id,
        name=item.author.name,
    )


async def convert_item(item: OldArticleFPRank) -> ArticleEarningRankingRecordDocument:
    """Convert one legacy ranking record into the new document model.

    Article and author slugs are ``None`` when the corresponding URL is
    missing from the legacy record.
    """
    article_slug = article_url_to_slug(item.article.url) if item.article.url else None
    author_slug = user_url_to_slug(item.author.url) if item.author.url else None
    return ArticleEarningRankingRecordDocument(
        date=item.date,
        ranking=item.ranking,
        article=ArticleField(slug=article_slug, title=item.article.title),
        author_slug=author_slug,
        earning=EarningField(
            to_author=item.reward.to_author,
            to_voter=item.reward.to_voter,
        ),
    )


async def _flush_buffer(
    buffer: List[ArticleEarningRankingRecordDocument],
) -> None:
    """Insert the buffered documents into the new collection and clear it.

    No-op when the buffer is empty, so it is safe to call unconditionally
    after the main loop.
    """
    if not buffer:
        return
    await ArticleEarningRankingRecordDocument.insert_many(buffer)
    logger.debug(
        f"已转换 {len(buffer)} 条数据,"
        f"最新数据的日期为 {buffer[-1].date}"
    )
    buffer.clear()


async def main() -> None:
    """Run the full migration: build indexes, convert in batches, verify counts."""
    await ensure_all_old_collection_indexes()
    logger.info("已为旧版数据构建索引")
    await ensure_all_new_collection_indexes()
    logger.info("已为新版数据构建索引")

    old_collection_data_count = await OldArticleFPRank.Meta.collection.count_documents(
        {}
    )
    logger.info(f"旧集合数据量:{old_collection_data_count}")

    data_to_save: List[ArticleEarningRankingRecordDocument] = []
    async for raw_doc in OldArticleFPRank.Meta.collection.find().sort(
        {"date": 1, "ranking": 1}
    ):
        item = OldArticleFPRank.from_dict(raw_doc)
        await insert_or_update_user(item)
        data_to_save.append(await convert_item(item))

        # >= is defensive: equivalent to == here because items are appended
        # one at a time, but safe should that ever change.
        if len(data_to_save) >= BATCH_SIZE:
            await _flush_buffer(data_to_save)

    # Flush whatever remains after the last full batch.
    await _flush_buffer(data_to_save)

    logger.info("数据转换完成,开始进行校验")
    new_collection_data_count = (
        await ArticleEarningRankingRecordDocument.Meta.collection.count_documents({})
    )
    if old_collection_data_count != new_collection_data_count:
        logger.critical(
            f"数据量不匹配(迁移前 {old_collection_data_count},"
            f"迁移后 {new_collection_data_count})"
        )
        # BUG FIX: the original called bare exit(), which terminates with
        # status 0 and silently signals success to callers/CI even though
        # the migration failed validation. Exit non-zero instead.
        sys.exit(1)

    logger.info("数据校验成功,迁移流程结束")


if __name__ == "__main__":
    # Guarded entry point: the original ran asyncio.run(main()) at module
    # top level, which would start the migration on a mere import.
    asyncio.run(main())