-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
102 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
import asyncio | ||
from datetime import datetime | ||
from typing import List | ||
|
||
from jkit.identifier_convert import article_url_to_slug, user_url_to_slug | ||
from sspeedup.logging.run_logger import RunLogger | ||
|
||
from models.article_earning_ranking_record import ( | ||
ArticleEarningRankingRecordDocument, | ||
ArticleField, | ||
EarningField, | ||
) | ||
from models.jianshu_user import JianshuUserDocument | ||
from old_models.article_fp_rank import OldArticleFPRank | ||
|
||
logger = RunLogger() | ||
|
||
|
||
async def ensure_all_old_collection_indexes() -> None: | ||
await OldArticleFPRank.ensure_indexes() | ||
|
||
|
||
async def ensure_all_new_collection_indexes() -> None: | ||
await JianshuUserDocument.ensure_indexes() | ||
await ArticleEarningRankingRecordDocument.ensure_indexes() | ||
|
||
|
||
async def insert_or_update_user(item: OldArticleFPRank) -> None: | ||
if item.author.url: | ||
await JianshuUserDocument.insert_or_update_one( | ||
slug=user_url_to_slug(item.author.url), | ||
updated_at=datetime.fromisoformat(item.date.isoformat()), | ||
id=item.author.id, | ||
name=item.author.name, | ||
) | ||
|
||
|
||
async def convert_item(item: OldArticleFPRank) -> ArticleEarningRankingRecordDocument: | ||
return ArticleEarningRankingRecordDocument( | ||
date=item.date, | ||
ranking=item.ranking, | ||
article=ArticleField( | ||
slug=article_url_to_slug(item.article.url) if item.article.url else None, | ||
title=item.article.title, | ||
), | ||
author_slug=user_url_to_slug(item.author.url) if item.author.url else None, | ||
earning=EarningField( | ||
to_author=item.reward.to_author, to_voter=item.reward.to_voter | ||
), | ||
) | ||
|
||
|
||
async def main() -> None: | ||
await ensure_all_old_collection_indexes() | ||
logger.info("已为旧版数据构建索引") | ||
await ensure_all_new_collection_indexes() | ||
logger.info("已为新版数据构建索引") | ||
|
||
old_collection_data_count = await OldArticleFPRank.Meta.collection.count_documents( | ||
{} | ||
) | ||
logger.info(f"旧集合数据量:{old_collection_data_count}") | ||
|
||
data_to_save: List[ArticleEarningRankingRecordDocument] = [] | ||
async for item in OldArticleFPRank.Meta.collection.find().sort( | ||
{"date": 1, "ranking": 1} | ||
): | ||
item = OldArticleFPRank.from_dict(item) | ||
await insert_or_update_user(item) | ||
data_to_save.append(await convert_item(item)) | ||
|
||
if len(data_to_save) == 1000: | ||
await ArticleEarningRankingRecordDocument.insert_many(data_to_save) | ||
logger.debug( | ||
f"已转换 {len(data_to_save)} 条数据," | ||
f"最新数据的日期为 {data_to_save[-1].date}" | ||
) | ||
data_to_save.clear() | ||
|
||
if len(data_to_save): | ||
await ArticleEarningRankingRecordDocument.insert_many(data_to_save) | ||
logger.debug( | ||
f"已转换 {len(data_to_save)} 条数据," | ||
f"最新数据的日期为 {data_to_save[-1].date}" | ||
) | ||
data_to_save.clear() | ||
|
||
logger.info("数据转换完成,开始进行校验") | ||
new_collection_data_count = ( | ||
await ArticleEarningRankingRecordDocument.Meta.collection.count_documents({}) | ||
) | ||
if old_collection_data_count != new_collection_data_count: | ||
logger.critical( | ||
f"数据量不匹配(迁移前 {old_collection_data_count}," | ||
f"迁移后 {new_collection_data_count})" | ||
) | ||
exit() | ||
|
||
logger.info("数据校验成功,迁移流程结束") | ||
|
||
|
||
asyncio.run(main()) |