Skip to content

Commit

Permalink
feat: 移除双写逻辑与迁移脚本
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Oct 26, 2024
1 parent 2e8b30d commit b19c313
Show file tree
Hide file tree
Showing 21 changed files with 275 additions and 901 deletions.
65 changes: 12 additions & 53 deletions jobs/jianshu/article_earning_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,8 @@
from sshared.retry.asyncio import retry
from sshared.time import get_today_as_datetime

from models.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecordDocument,
ArticleField,
EarningField,
)
from models.jianshu.user import UserDocument
from models.new.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecord as NewDbArticleEarningRankingRecord,
)
from models.new.jianshu.user import User as NewDbUser
from models.jianshu.article_earning_ranking_record import ArticleEarningRankingRecord
from models.jianshu.user import User
from utils.log import (
get_flow_run_name,
log_flow_run_start,
Expand Down Expand Up @@ -66,58 +58,28 @@ async def get_author_slug_and_info(

async def process_item(
item: RecordField, date: datetime
) -> ArticleEarningRankingRecordDocument:
) -> ArticleEarningRankingRecord:
author_slug, author_info = await get_author_slug_and_info(item)

if author_slug is not None and author_info is not None:
await UserDocument.insert_or_update_one(
slug=author_slug,
id=author_info.id,
name=author_info.name,
avatar_url=author_info.avatar_url,
)
await NewDbUser.upsert(
await User.upsert(
slug=author_slug,
id=author_info.id,
name=author_info.name,
avatar_url=author_info.avatar_url,
)

return ArticleEarningRankingRecordDocument(
date=date,
return ArticleEarningRankingRecord(
date=date.date(),
ranking=item.ranking,
article=ArticleField(
title=item.title,
slug=item.slug,
),
slug=item.slug,
title=item.title,
author_slug=author_slug,
earning=EarningField(
to_author=item.fp_to_author_anount,
to_voter=item.fp_to_voter_amount,
),
author_earning=item.fp_to_author_anount,
voter_earning=item.fp_to_voter_amount,
)


def transform_to_new_db_model(
data: list[ArticleEarningRankingRecordDocument],
) -> list[NewDbArticleEarningRankingRecord]:
result: list[NewDbArticleEarningRankingRecord] = []
for item in data:
result.append( # noqa: PERF401
NewDbArticleEarningRankingRecord(
date=item.date.date(),
ranking=item.ranking,
slug=item.article.slug,
title=item.article.title,
author_slug=item.author_slug,
author_earning=item.earning.to_author,
voter_earning=item.earning.to_voter,
)
)

return result


@flow(
**generate_flow_config(
name="采集文章收益排行榜记录",
Expand All @@ -128,15 +90,12 @@ async def main() -> None:

date = get_today_as_datetime() - timedelta(days=1)

data: list[ArticleEarningRankingRecordDocument] = []
data: list[ArticleEarningRankingRecord] = []
async for item in ArticleEarningRanking(date.date()):
processed_item = await process_item(item, date=date)
data.append(processed_item)

await ArticleEarningRankingRecordDocument.insert_many(data)

new_data = transform_to_new_db_model(data)
await NewDbArticleEarningRankingRecord.insert_many(new_data)
await ArticleEarningRankingRecord.insert_many(data)

log_flow_run_success(logger, data_count=len(data))

Expand Down
47 changes: 9 additions & 38 deletions jobs/jianshu/daily_update_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@
from sshared.time import get_today_as_datetime

from models.jianshu.daily_update_ranking_record import (
DailyUpdateRankingRecordDocument,
DailyUpdateRankingRecord as DbDailyUpdateRankingRecord,
)
from models.jianshu.user import UserDocument
from models.new.jianshu.daily_update_ranking_record import (
DailyUpdateRankingRecord as NewDbDailyUpateRankingRecord,
)
from models.new.jianshu.user import User as NewDbUser
from models.jianshu.user import User
from utils.log import log_flow_run_start, log_flow_run_success, logger
from utils.prefect_helper import (
generate_deployment_config,
Expand All @@ -24,43 +20,21 @@

async def process_item(
item: DailyUpdateRankingRecord, date: datetime
) -> DailyUpdateRankingRecordDocument:
await UserDocument.insert_or_update_one(
slug=item.user_info.slug,
name=item.user_info.name,
avatar_url=item.user_info.avatar_url,
)
await NewDbUser.upsert(
) -> DbDailyUpdateRankingRecord:
await User.upsert(
slug=item.user_info.slug,
name=item.user_info.name,
avatar_url=item.user_info.avatar_url,
)

return DailyUpdateRankingRecordDocument(
date=date,
return DbDailyUpdateRankingRecord(
date=date.date(),
ranking=item.ranking,
slug=item.user_info.slug,
days=item.days,
user_slug=item.user_info.slug,
)


def transform_to_new_db_model(
data: list[DailyUpdateRankingRecordDocument],
) -> list[NewDbDailyUpateRankingRecord]:
result: list[NewDbDailyUpateRankingRecord] = []
for item in data:
result.append( # noqa: PERF401
NewDbDailyUpateRankingRecord(
date=item.date.date(),
ranking=item.ranking,
slug=item.user_slug,
days=item.days,
)
)

return result


@flow(
**generate_flow_config(
name="采集日更排行榜记录",
Expand All @@ -71,15 +45,12 @@ async def main() -> None:

date = get_today_as_datetime()

data: list[DailyUpdateRankingRecordDocument] = []
data: list[DbDailyUpdateRankingRecord] = []
async for item in DailyUpdateRanking():
processed_item = await process_item(item, date=date)
data.append(processed_item)

await DailyUpdateRankingRecordDocument.insert_many(data)

new_data = transform_to_new_db_model(data)
await NewDbDailyUpateRankingRecord.insert_many(new_data)
await DbDailyUpdateRankingRecord.insert_many(data)

log_flow_run_success(logger, data_count=len(data))

Expand Down
61 changes: 13 additions & 48 deletions jobs/jianshu/user_assets_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,9 @@
from sshared.retry.asyncio import retry
from sshared.time import get_today_as_datetime

from models.jianshu.assets_ranking_record import (
AmountField,
AssetsRankingRecordDocument,
)
from models.jianshu.user import UserDocument
from models.new.jianshu.user import User as NewDbUser
from models.new.jianshu.user_assets_ranking_record import (
UserAssetsRankingRecord as NewDbAssetsRankingRecord,
from models.jianshu.user import User as DbUser
from models.jianshu.user_assets_ranking_record import (
UserAssetsRankingRecord as DbAssetsRankingRecord,
)
from utils.log import (
get_flow_run_name,
Expand Down Expand Up @@ -65,54 +60,27 @@ async def get_fp_ftn_amount(

async def process_item(
item: AssetsRankingRecord, date: datetime
) -> AssetsRankingRecordDocument:
) -> DbAssetsRankingRecord:
fp_amount, ftn_amount = await get_fp_ftn_amount(item)

if item.user_info.slug:
await UserDocument.insert_or_update_one(
slug=item.user_info.slug,
id=item.user_info.id,
name=item.user_info.name,
avatar_url=item.user_info.avatar_url,
)
await NewDbUser.upsert(
await DbUser.upsert(
slug=item.user_info.slug,
id=item.user_info.id,
name=item.user_info.name,
avatar_url=item.user_info.avatar_url,
)

return AssetsRankingRecordDocument(
date=date,
return DbAssetsRankingRecord(
date=date.date(),
ranking=item.ranking,
amount=AmountField(
fp=fp_amount,
ftn=ftn_amount,
assets=item.assets_amount,
),
user_slug=item.user_info.slug,
slug=item.user_info.slug,
fp=fp_amount,
ftn=ftn_amount,
assets=item.assets_amount,
)


def transform_to_new_db_model(
data: list[AssetsRankingRecordDocument],
) -> list[NewDbAssetsRankingRecord]:
result: list[NewDbAssetsRankingRecord] = []
for item in data:
result.append( # noqa: PERF401
NewDbAssetsRankingRecord(
date=item.date.date(),
ranking=item.ranking,
slug=item.user_slug,
fp=item.amount.fp,
ftn=item.amount.ftn,
assets=item.amount.assets,
)
)

return result


@flow(
**generate_flow_config(
name="采集用户资产排行榜记录",
Expand All @@ -123,18 +91,15 @@ async def main() -> None:

date = get_today_as_datetime()

data: list[AssetsRankingRecordDocument] = []
data: list[DbAssetsRankingRecord] = []
async for item in AssetsRanking():
processed_item = await process_item(item, date=date)
data.append(processed_item)

if len(data) == 1000:
break

await AssetsRankingRecordDocument.insert_many(data)

new_data = transform_to_new_db_model(data)
await NewDbAssetsRankingRecord.insert_many(new_data)
await DbAssetsRankingRecord.insert_many(data)

log_flow_run_success(logger, data_count=len(data))

Expand Down
23 changes: 12 additions & 11 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,28 @@
from prefect import serve

from jobs import DEPLOYMENTS
from models import MODELS
from models.new.jianshu.article_earning_ranking_record import (
from models.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecord,
)
from models.new.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord
from models.new.jianshu.user import User
from models.new.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
from models.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord
from models.jianshu.user import User
from models.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
from models.jpep.credit_history import CreditHistoryDocument
from models.jpep.ftn_trade_order import FTNTradeOrderDocument
from models.jpep.user import UserDocument
from utils.log import logger


async def main() -> None:
logger.info("正在为数据库创建索引")
for model in MODELS:
await model.ensure_indexes()
logger.info("索引创建完成")
logger.info("正在初始化新版数据库")
await CreditHistoryDocument.ensure_indexes()
await FTNTradeOrderDocument.ensure_indexes()
await UserDocument.ensure_indexes()

await ArticleEarningRankingRecord.init()
await UserAssetsRankingRecord.init()
await DailyUpdateRankingRecord.init()
await User.init()
logger.info("初始化新版数据库完成")
logger.info("初始化数据库完成")

logger.info("启动工作流")
await serve(*DEPLOYMENTS, print_starting_message=False) # type: ignore
Expand Down
54 changes: 0 additions & 54 deletions migrate_article_earning_ranking_record.py

This file was deleted.

Loading

0 comments on commit b19c313

Please sign in to comment.