Skip to content

Commit

Permalink
feat: 优化迁移脚本
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Mar 11, 2024
1 parent 0e35ac9 commit 26616c5
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 11 deletions.
37 changes: 27 additions & 10 deletions migrate_01_article_fp_rank.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
)
from models.jianshu_user import JianshuUserDocument
from old_models.article_fp_rank import OldArticleFPRank
from utils.migrate_helper import (
get_collection_data_count,
get_collection_data_time_range,
)

logger = RunLogger()

Expand Down Expand Up @@ -56,10 +60,14 @@ async def main() -> None:
await ensure_all_new_collection_indexes()
logger.info("已为新版数据构建索引")

old_collection_data_count = await OldArticleFPRank.Meta.collection.count_documents(
{}
old_data_count = await get_collection_data_count(OldArticleFPRank)
old_start_time, old_end_time = await get_collection_data_time_range(
OldArticleFPRank, "date"
)
logger.info(
f"旧集合数据量:{old_data_count},"
f"数据时间范围:{old_start_time} - {old_end_time}"
)
logger.info(f"旧集合数据量:{old_collection_data_count}")

data_to_save: List[ArticleEarningRankingRecordDocument] = []
async for item in OldArticleFPRank.Meta.collection.find().sort(
Expand All @@ -85,18 +93,27 @@ async def main() -> None:
)
data_to_save.clear()

logger.info("数据转换完成,开始进行校验")
new_collection_data_count = (
await ArticleEarningRankingRecordDocument.Meta.collection.count_documents({})
logger.info("数据转换完成,开始校验")
new_data_count = await get_collection_data_count(
ArticleEarningRankingRecordDocument
)
if old_data_count != new_data_count:
logger.critical(
f"数据量不匹配(迁移前 {old_data_count}," f"迁移后 {new_data_count})"
)
exit()
new_start_time, new_end_time = await get_collection_data_time_range(
ArticleEarningRankingRecordDocument, "date"
)
if old_collection_data_count != new_collection_data_count:
if old_start_time != new_start_time or old_end_time != new_end_time:
logger.critical(
f"数据量不匹配(迁移前 {old_collection_data_count},"
f"迁移后 {new_collection_data_count})"
"数据时间范围不匹配"
f"(迁移前 {old_start_time} - {old_end_time},"
f"迁移后 {new_start_time} - {new_end_time})"
)
exit()

logger.info("数据校验成功,迁移流程结束")
logger.info("校验成功,迁移流程结束")


asyncio.run(main())
111 changes: 111 additions & 0 deletions migrate_02_assets_rank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import asyncio
from datetime import datetime
from typing import List

from jkit.identifier_convert import user_url_to_slug
from sspeedup.logging.run_logger import RunLogger

from models.assets_ranking_record import AmountField, AssetsRankingRecordDocument
from models.jianshu_user import JianshuUserDocument
from old_models.assets_rank import OldAssetsRank
from utils.migrate_helper import (
get_collection_data_count,
get_collection_data_time_range,
)

logger = RunLogger()


async def ensure_all_old_collection_indexes() -> None:
    """Ensure indexes exist on the old (source) assets-rank collection."""
    await OldAssetsRank.ensure_indexes()


async def ensure_all_new_collection_indexes() -> None:
    """Ensure indexes exist on the new (target) collections written by this script."""
    await JianshuUserDocument.ensure_indexes()
    await AssetsRankingRecordDocument.ensure_indexes()


async def insert_or_update_user(item: OldAssetsRank) -> None:
    """Upsert the Jianshu user referenced by an old ranking record.

    Records without a user URL are skipped, since no slug can be derived
    from them.
    """
    user_url = item.user.url
    if not user_url:
        return

    await JianshuUserDocument.insert_or_update_one(
        slug=user_url_to_slug(user_url),
        updated_at=datetime.fromisoformat(item.date.isoformat()),
        id=item.user.id,
        name=item.user.name,
    )


async def convert_item(item: OldAssetsRank) -> AssetsRankingRecordDocument:
    """Map one old assets-rank record onto the new document schema.

    A record without a user URL gets user_slug=None.
    """
    user_url = item.user.url
    user_slug = user_url_to_slug(user_url) if user_url else None

    amount = AmountField(
        fp=item.assets.fp,
        ftn=item.assets.ftn,
        assets=item.assets.total,
    )

    return AssetsRankingRecordDocument(
        date=item.date,
        ranking=item.ranking,
        amount=amount,
        user_slug=user_slug,
    )


async def main() -> None:
    """Migrate OldAssetsRank records into AssetsRankingRecordDocument.

    Steps: build indexes on both sides, snapshot the old collection's
    count and date range, copy data in batches (also upserting the
    referenced Jianshu users), then verify the new collection matches
    the old one's count and date range.
    """
    # Number of converted records buffered before each bulk insert.
    batch_size = 1000

    async def _flush(batch: List[AssetsRankingRecordDocument]) -> None:
        # Bulk-insert the buffered records, log progress, reset the buffer.
        await AssetsRankingRecordDocument.insert_many(batch)
        logger.debug(
            f"已转换 {len(batch)} 条数据,"
            f"最新数据的日期为 {batch[-1].date}"
        )
        batch.clear()

    await ensure_all_old_collection_indexes()
    logger.info("已为旧版数据构建索引")
    await ensure_all_new_collection_indexes()
    logger.info("已为新版数据构建索引")

    old_data_count = await get_collection_data_count(OldAssetsRank)
    old_start_time, old_end_time = await get_collection_data_time_range(
        OldAssetsRank, "date"
    )
    logger.info(
        f"旧集合数据量:{old_data_count},"
        f"数据时间范围:{old_start_time} - {old_end_time}"
    )

    data_to_save: List[AssetsRankingRecordDocument] = []
    async for item in OldAssetsRank.Meta.collection.find().sort(
        {"date": 1, "ranking": 1}
    ):
        item = OldAssetsRank.from_dict(item)
        await insert_or_update_user(item)
        data_to_save.append(await convert_item(item))

        if len(data_to_save) == batch_size:
            await _flush(data_to_save)

    # Flush the final partial batch, if any.
    if data_to_save:
        await _flush(data_to_save)

    logger.info("数据转换完成,开始校验")
    new_data_count = await get_collection_data_count(AssetsRankingRecordDocument)
    if old_data_count != new_data_count:
        logger.critical(
            f"数据量不匹配(迁移前 {old_data_count},迁移后 {new_data_count})"
        )
        # `exit()` comes from the `site` module and can be absent under
        # `python -S`; raising SystemExit is the robust equivalent.
        raise SystemExit

    new_start_time, new_end_time = await get_collection_data_time_range(
        AssetsRankingRecordDocument, "date"
    )
    if old_start_time != new_start_time or old_end_time != new_end_time:
        logger.critical(
            "数据时间范围不匹配"
            f"(迁移前 {old_start_time} - {old_end_time},"
            f"迁移后 {new_start_time} - {new_end_time})"
        )
        raise SystemExit

    logger.info("校验成功,迁移流程结束")


asyncio.run(main())
106 changes: 106 additions & 0 deletions migrate_03_daily_update_rank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import asyncio
from datetime import datetime
from typing import List

from jkit.identifier_convert import user_url_to_slug
from sspeedup.logging.run_logger import RunLogger

from models.daily_update_ranking_record import DailyUpdateRankingRecordDocument
from models.jianshu_user import JianshuUserDocument
from old_models.daily_update_rank import OldDailyUpdateRank
from utils.migrate_helper import (
get_collection_data_count,
get_collection_data_time_range,
)

logger = RunLogger()


async def ensure_all_old_collection_indexes() -> None:
    """Ensure indexes exist on the old (source) daily-update-rank collection."""
    await OldDailyUpdateRank.ensure_indexes()


async def ensure_all_new_collection_indexes() -> None:
    """Ensure indexes exist on the new (target) collections written by this script."""
    await JianshuUserDocument.ensure_indexes()
    await DailyUpdateRankingRecordDocument.ensure_indexes()


async def insert_or_update_user(item: OldDailyUpdateRank) -> None:
    """Upsert the Jianshu user referenced by an old ranking record.

    Records without a user URL are skipped, since no slug can be derived
    from them.
    """
    user_url = item.user.url
    if not user_url:
        return

    # NOTE(review): unlike migrate_02, no id= is passed here — presumably
    # the old daily-update data carries no user id; confirm against the model.
    await JianshuUserDocument.insert_or_update_one(
        slug=user_url_to_slug(user_url),
        updated_at=datetime.fromisoformat(item.date.isoformat()),
        name=item.user.name,
    )


async def convert_item(item: OldDailyUpdateRank) -> DailyUpdateRankingRecordDocument:
    """Map one old daily-update-rank record onto the new document schema.

    Fix: guard against a missing user URL. insert_or_update_user in this
    same script already treats the URL as optional, and migrate_02's
    convert_item applies the same `if ... else None` guard — without it,
    a URL-less record would crash inside user_url_to_slug.
    """
    return DailyUpdateRankingRecordDocument(
        date=item.date,
        ranking=item.ranking,
        days=item.days,
        # NOTE(review): assumes the new model accepts user_slug=None, as
        # AssetsRankingRecordDocument does — confirm.
        user_slug=user_url_to_slug(item.user.url) if item.user.url else None,
    )


async def main() -> None:
    """Migrate OldDailyUpdateRank records into DailyUpdateRankingRecordDocument.

    Steps: build indexes on both sides, snapshot the old collection's
    count and date range, copy data in batches (also upserting the
    referenced Jianshu users), then verify the new collection matches
    the old one's count and date range.
    """
    # Number of converted records buffered before each bulk insert.
    batch_size = 1000

    async def _flush(batch: List[DailyUpdateRankingRecordDocument]) -> None:
        # Bulk-insert the buffered records, log progress, reset the buffer.
        await DailyUpdateRankingRecordDocument.insert_many(batch)
        logger.debug(
            f"已转换 {len(batch)} 条数据,"
            f"最新数据的日期为 {batch[-1].date}"
        )
        batch.clear()

    await ensure_all_old_collection_indexes()
    logger.info("已为旧版数据构建索引")
    await ensure_all_new_collection_indexes()
    logger.info("已为新版数据构建索引")

    old_data_count = await get_collection_data_count(OldDailyUpdateRank)
    old_start_time, old_end_time = await get_collection_data_time_range(
        OldDailyUpdateRank, "date"
    )
    logger.info(
        f"旧集合数据量:{old_data_count},"
        f"数据时间范围:{old_start_time} - {old_end_time}"
    )

    data_to_save: List[DailyUpdateRankingRecordDocument] = []
    async for item in OldDailyUpdateRank.Meta.collection.find().sort(
        {"date": 1, "ranking": 1}
    ):
        item = OldDailyUpdateRank.from_dict(item)
        await insert_or_update_user(item)
        data_to_save.append(await convert_item(item))

        if len(data_to_save) == batch_size:
            await _flush(data_to_save)

    # Flush the final partial batch, if any.
    if data_to_save:
        await _flush(data_to_save)

    logger.info("数据转换完成,开始校验")
    new_data_count = await get_collection_data_count(DailyUpdateRankingRecordDocument)
    if old_data_count != new_data_count:
        logger.critical(
            f"数据量不匹配(迁移前 {old_data_count},迁移后 {new_data_count})"
        )
        # `exit()` comes from the `site` module and can be absent under
        # `python -S`; raising SystemExit is the robust equivalent.
        raise SystemExit

    new_start_time, new_end_time = await get_collection_data_time_range(
        DailyUpdateRankingRecordDocument, "date"
    )
    if old_start_time != new_start_time or old_end_time != new_end_time:
        logger.critical(
            "数据时间范围不匹配"
            f"(迁移前 {old_start_time} - {old_end_time},"
            f"迁移后 {new_start_time} - {new_end_time})"
        )
        raise SystemExit

    logger.info("校验成功,迁移流程结束")


asyncio.run(main())
2 changes: 1 addition & 1 deletion models/assets_ranking_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
class AmountField(Field, **FIELD_OBJECT_CONFIG):
fp: Optional[NonNegativeFloat]
ftn: Optional[NonNegativeFloat]
assets: PositiveFloat
assets: Optional[PositiveFloat]


class AssetsRankingRecordDocument(Document, **DOCUMENT_OBJECT_CONFIG):
Expand Down
21 changes: 21 additions & 0 deletions utils/migrate_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from datetime import datetime
from typing import Tuple, Type

from utils.document_model import Document


async def get_collection_data_count(document: Type[Document]) -> int:
    """Return how many documents are stored in *document*'s collection."""
    collection = document.Meta.collection
    return await collection.count_documents({})


async def get_collection_data_time_range(
    document: Type[Document], key: str
) -> Tuple[datetime, datetime]:
    """Return the (earliest, latest) value of *key* across the collection.

    Bug fix: the previous version returned the two whole documents from
    ``__anext__()`` instead of their *key* values, so the declared
    ``Tuple[datetime, datetime]`` return type was wrong and callers
    comparing old/new ranges could never see them as equal (the
    documents differ in ``_id`` and other fields at minimum).

    Raises StopAsyncIteration if the collection is empty.
    """
    earliest_doc = await (
        document.Meta.collection.find().sort({key: 1}).limit(1).__anext__()
    )
    latest_doc = await (
        document.Meta.collection.find().sort({key: -1}).limit(1).__anext__()
    )

    return (earliest_doc[key], latest_doc[key])

0 comments on commit 26616c5

Please sign in to comment.