diff --git a/jobs/jianshu/article_earning_ranking.py b/jobs/jianshu/article_earning_ranking.py index 08de95c..450138d 100644 --- a/jobs/jianshu/article_earning_ranking.py +++ b/jobs/jianshu/article_earning_ranking.py @@ -15,6 +15,9 @@ EarningField, ) from models.jianshu.user import UserDocument +from models.new.jianshu.article_earning_ranking_record import ( + ArticleEarningRankingRecord as NewDbArticleEarningRankingRecord, +) from utils.log import ( get_flow_run_name, log_flow_run_start, @@ -88,6 +91,26 @@ async def process_item( ) +def transform_to_new_db_model( + data: list[ArticleEarningRankingRecordDocument], +) -> list[NewDbArticleEarningRankingRecord]: + result: list[NewDbArticleEarningRankingRecord] = [] + for item in data: + result.append( # noqa: PERF401 + NewDbArticleEarningRankingRecord( + date=item.date.date(), + ranking=item.ranking, + article_slug=item.article.slug, + article_title=item.article.title, + author_slug=item.author_slug, + earning_to_author=item.earning.to_author, + earning_to_voter=item.earning.to_voter, + ) + ) + + return result + + @flow( **generate_flow_config( name="采集文章收益排行榜记录", @@ -105,6 +128,9 @@ async def main() -> None: await ArticleEarningRankingRecordDocument.insert_many(data) + new_data = transform_to_new_db_model(data) + await NewDbArticleEarningRankingRecord.insert_many(new_data) + log_flow_run_success(logger, data_count=len(data)) diff --git a/jobs/jianshu/assets_ranking.py b/jobs/jianshu/assets_ranking.py index 4e2a8a6..e946485 100644 --- a/jobs/jianshu/assets_ranking.py +++ b/jobs/jianshu/assets_ranking.py @@ -14,6 +14,9 @@ AssetsRankingRecordDocument, ) from models.jianshu.user import UserDocument +from models.new.jianshu.assets_ranking_record import ( + AssetsRankingRecord as NewDbAssetsRankingRecord, +) from utils.log import ( get_flow_run_name, log_flow_run_start, @@ -84,6 +87,25 @@ async def process_item( ) +def transform_to_new_db_model( + data: list[AssetsRankingRecordDocument], +) -> list[NewDbAssetsRankingRecord]: + result: list[NewDbAssetsRankingRecord] = [] + for item in data: + result.append( # noqa: PERF401 + NewDbAssetsRankingRecord( + date=item.date.date(), + ranking=item.ranking, + user_slug=item.user_slug, + fp=item.amount.fp, + ftn=item.amount.ftn, + assets=item.amount.assets, + ) + ) + + return result + + @flow( **generate_flow_config( name="采集资产排行榜记录", @@ -104,6 +126,9 @@ async def main() -> None: await AssetsRankingRecordDocument.insert_many(data) + new_data = transform_to_new_db_model(data) + await NewDbAssetsRankingRecord.insert_many(new_data) + log_flow_run_success(logger, data_count=len(data)) diff --git a/jobs/jianshu/daily_update_ranking.py b/jobs/jianshu/daily_update_ranking.py index de7d3b6..60f2756 100644 --- a/jobs/jianshu/daily_update_ranking.py +++ b/jobs/jianshu/daily_update_ranking.py @@ -11,6 +11,9 @@ DailyUpdateRankingRecordDocument, ) from models.jianshu.user import UserDocument +from models.new.jianshu.daily_update_ranking_record import ( + DailyUpdateRankingRecord as NewDbDailyUpateRankingRecord, +) from utils.log import log_flow_run_start, log_flow_run_success, logger from utils.prefect_helper import ( generate_deployment_config, @@ -35,6 +38,23 @@ async def process_item( ) +def transform_to_new_db_model( + data: list[DailyUpdateRankingRecordDocument], +) -> list[NewDbDailyUpateRankingRecord]: + result: list[NewDbDailyUpateRankingRecord] = [] + for item in data: + result.append( # noqa: PERF401 + NewDbDailyUpateRankingRecord( + date=item.date.date(), + ranking=item.ranking, + user_slug=item.user_slug, + days=item.days, + ) + ) + + return result + + @flow( **generate_flow_config( name="采集日更排行榜记录", @@ -52,6 +72,9 @@ async def main() -> None: await DailyUpdateRankingRecordDocument.insert_many(data) + new_data = transform_to_new_db_model(data) + await NewDbDailyUpateRankingRecord.insert_many(new_data) + log_flow_run_success(logger, data_count=len(data)) diff --git a/models/new/jianshu/daily_update_ranking_record.py b/models/new/jianshu/daily_update_ranking_record.py index 9d20820..57b47ca 100644 --- a/models/new/jianshu/daily_update_ranking_record.py +++ b/models/new/jianshu/daily_update_ranking_record.py @@ -21,7 +21,7 @@ async def _create_table(cls) -> None: ranking SMALLINT NOT NULL, user_slug VARCHAR(12), days SMALLINT, - CONSTRAINT pk_daily_update_ranking_records_date_ranking_user_slug PRIMARY KEY (date, ranking, user_slug) + CONSTRAINT pk_daily_update_ranking_records_date_user_slug PRIMARY KEY (date, user_slug) ); """ # noqa: E501 )