From 75d206c0960c88088ec2e92c1a5bc4d07c5fa768 Mon Sep 17 00:00:00 2001 From: FHU-yezi Date: Thu, 24 Oct 2024 15:05:48 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=8F=8C=E5=86=99=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jobs/jianshu/article_earning_ranking.py | 26 +++++++++++++++++++ jobs/jianshu/assets_ranking.py | 25 ++++++++++++++++++ jobs/jianshu/daily_update_ranking.py | 23 ++++++++++++++++ .../jianshu/daily_update_ranking_record.py | 2 +- 4 files changed, 75 insertions(+), 1 deletion(-) diff --git a/jobs/jianshu/article_earning_ranking.py b/jobs/jianshu/article_earning_ranking.py index 08de95c..450138d 100644 --- a/jobs/jianshu/article_earning_ranking.py +++ b/jobs/jianshu/article_earning_ranking.py @@ -15,6 +15,9 @@ EarningField, ) from models.jianshu.user import UserDocument +from models.new.jianshu.article_earning_ranking_record import ( + ArticleEarningRankingRecord as NewDbArticleEarningRankingRecord, +) from utils.log import ( get_flow_run_name, log_flow_run_start, @@ -88,6 +91,26 @@ async def process_item( ) +def transform_to_new_db_model( + data: list[ArticleEarningRankingRecordDocument], +) -> list[NewDbArticleEarningRankingRecord]: + result: list[NewDbArticleEarningRankingRecord] = [] + for item in data: + result.append( # noqa: PERF401 + NewDbArticleEarningRankingRecord( + date=item.date.date(), + ranking=item.ranking, + article_slug=item.article.slug, + article_title=item.article.title, + author_slug=item.author_slug, + earning_to_author=item.earning.to_author, + earning_to_voter=item.earning.to_voter, + ) + ) + + return result + + @flow( **generate_flow_config( name="采集文章收益排行榜记录", @@ -105,6 +128,9 @@ async def main() -> None: await ArticleEarningRankingRecordDocument.insert_many(data) + new_data = transform_to_new_db_model(data) + await NewDbArticleEarningRankingRecord.insert_many(new_data) + log_flow_run_success(logger, data_count=len(data)) diff --git a/jobs/jianshu/assets_ranking.py b/jobs/jianshu/assets_ranking.py index 4e2a8a6..e946485 100644 --- a/jobs/jianshu/assets_ranking.py +++ b/jobs/jianshu/assets_ranking.py @@ -14,6 +14,9 @@ AssetsRankingRecordDocument, ) from models.jianshu.user import UserDocument +from models.new.jianshu.assets_ranking_record import ( + AssetsRankingRecord as NewDbAssetsRankingRecord, +) from utils.log import ( get_flow_run_name, log_flow_run_start, @@ -84,6 +87,25 @@ async def process_item( ) +def transform_to_new_db_model( + data: list[AssetsRankingRecordDocument], +) -> list[NewDbAssetsRankingRecord]: + result: list[NewDbAssetsRankingRecord] = [] + for item in data: + result.append( # noqa: PERF401 + NewDbAssetsRankingRecord( + date=item.date.date(), + ranking=item.ranking, + user_slug=item.user_slug, + fp=item.amount.fp, + ftn=item.amount.ftn, + assets=item.amount.assets, + ) + ) + + return result + + @flow( **generate_flow_config( name="采集资产排行榜记录", @@ -104,6 +126,9 @@ async def main() -> None: await AssetsRankingRecordDocument.insert_many(data) + new_data = transform_to_new_db_model(data) + await NewDbAssetsRankingRecord.insert_many(new_data) + log_flow_run_success(logger, data_count=len(data)) diff --git a/jobs/jianshu/daily_update_ranking.py b/jobs/jianshu/daily_update_ranking.py index de7d3b6..60f2756 100644 --- a/jobs/jianshu/daily_update_ranking.py +++ b/jobs/jianshu/daily_update_ranking.py @@ -11,6 +11,9 @@ DailyUpdateRankingRecordDocument, ) from models.jianshu.user import UserDocument +from models.new.jianshu.daily_update_ranking_record import ( + DailyUpdateRankingRecord as NewDbDailyUpateRankingRecord, +) from utils.log import log_flow_run_start, log_flow_run_success, logger from utils.prefect_helper import ( generate_deployment_config, @@ -35,6 +38,23 @@ async def process_item( ) +def transform_to_new_db_model( + data: list[DailyUpdateRankingRecordDocument], +) -> list[NewDbDailyUpateRankingRecord]: + result: list[NewDbDailyUpateRankingRecord] = [] + for item in data: + result.append( # noqa: PERF401 + NewDbDailyUpateRankingRecord( + date=item.date.date(), + ranking=item.ranking, + user_slug=item.user_slug, + days=item.days, + ) + ) + + return result + + @flow( **generate_flow_config( name="采集日更排行榜记录", @@ -52,6 +72,9 @@ async def main() -> None: await DailyUpdateRankingRecordDocument.insert_many(data) + new_data = transform_to_new_db_model(data) + await NewDbDailyUpateRankingRecord.insert_many(new_data) + log_flow_run_success(logger, data_count=len(data)) diff --git a/models/new/jianshu/daily_update_ranking_record.py b/models/new/jianshu/daily_update_ranking_record.py index 9d20820..57b47ca 100644 --- a/models/new/jianshu/daily_update_ranking_record.py +++ b/models/new/jianshu/daily_update_ranking_record.py @@ -21,7 +21,7 @@ async def _create_table(cls) -> None: ranking SMALLINT NOT NULL, user_slug VARCHAR(12), days SMALLINT, - CONSTRAINT pk_daily_update_ranking_records_date_ranking_user_slug PRIMARY KEY (date, ranking, user_slug) + CONSTRAINT pk_daily_update_ranking_records_date_user_slug PRIMARY KEY (date, user_slug) ); """ # noqa: E501 )