Skip to content

Commit

Permalink
feat: 添加数据双写逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Oct 24, 2024
1 parent 10d0849 commit 75d206c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 1 deletion.
26 changes: 26 additions & 0 deletions jobs/jianshu/article_earning_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
EarningField,
)
from models.jianshu.user import UserDocument
from models.new.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecord as NewDbArticleEarningRankingRecord,
)
from utils.log import (
get_flow_run_name,
log_flow_run_start,
Expand Down Expand Up @@ -88,6 +91,26 @@ async def process_item(
)


def transform_to_new_db_model(
data: list[ArticleEarningRankingRecordDocument],
) -> list[NewDbArticleEarningRankingRecord]:
result: list[NewDbArticleEarningRankingRecord] = []
for item in data:
result.append( # noqa: PERF401
NewDbArticleEarningRankingRecord(
date=item.date.date(),
ranking=item.ranking,
article_slug=item.article.slug,
article_title=item.article.title,
author_slug=item.author_slug,
earning_to_author=item.earning.to_author,
earning_to_voter=item.earning.to_voter,
)
)

return result


@flow(
**generate_flow_config(
name="采集文章收益排行榜记录",
Expand All @@ -105,6 +128,9 @@ async def main() -> None:

await ArticleEarningRankingRecordDocument.insert_many(data)

new_data = transform_to_new_db_model(data)
await NewDbArticleEarningRankingRecord.insert_many(new_data)

log_flow_run_success(logger, data_count=len(data))


Expand Down
25 changes: 25 additions & 0 deletions jobs/jianshu/assets_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
AssetsRankingRecordDocument,
)
from models.jianshu.user import UserDocument
from models.new.jianshu.assets_ranking_record import (
AssetsRankingRecord as NewDbAssetsRankingRecord,
)
from utils.log import (
get_flow_run_name,
log_flow_run_start,
Expand Down Expand Up @@ -84,6 +87,25 @@ async def process_item(
)


def transform_to_new_db_model(
data: list[AssetsRankingRecordDocument],
) -> list[NewDbAssetsRankingRecord]:
result: list[NewDbAssetsRankingRecord] = []
for item in data:
result.append( # noqa: PERF401
NewDbAssetsRankingRecord(
date=item.date.date(),
ranking=item.ranking,
user_slug=item.user_slug,
fp=item.amount.fp,
ftn=item.amount.ftn,
assets=item.amount.assets,
)
)

return result


@flow(
**generate_flow_config(
name="采集资产排行榜记录",
Expand All @@ -104,6 +126,9 @@ async def main() -> None:

await AssetsRankingRecordDocument.insert_many(data)

new_data = transform_to_new_db_model(data)
await NewDbAssetsRankingRecord.insert_many(new_data)

log_flow_run_success(logger, data_count=len(data))


Expand Down
23 changes: 23 additions & 0 deletions jobs/jianshu/daily_update_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
DailyUpdateRankingRecordDocument,
)
from models.jianshu.user import UserDocument
from models.new.jianshu.daily_update_ranking_record import (
DailyUpdateRankingRecord as NewDbDailyUpateRankingRecord,
)
from utils.log import log_flow_run_start, log_flow_run_success, logger
from utils.prefect_helper import (
generate_deployment_config,
Expand All @@ -35,6 +38,23 @@ async def process_item(
)


def transform_to_new_db_model(
data: list[DailyUpdateRankingRecordDocument],
) -> list[NewDbDailyUpateRankingRecord]:
result: list[NewDbDailyUpateRankingRecord] = []
for item in data:
result.append( # noqa: PERF401
NewDbDailyUpateRankingRecord(
date=item.date.date(),
ranking=item.ranking,
user_slug=item.user_slug,
days=item.days,
)
)

return result


@flow(
**generate_flow_config(
name="采集日更排行榜记录",
Expand All @@ -52,6 +72,9 @@ async def main() -> None:

await DailyUpdateRankingRecordDocument.insert_many(data)

new_data = transform_to_new_db_model(data)
await NewDbDailyUpateRankingRecord.insert_many(new_data)

log_flow_run_success(logger, data_count=len(data))


Expand Down
2 changes: 1 addition & 1 deletion models/new/jianshu/daily_update_ranking_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def _create_table(cls) -> None:
ranking SMALLINT NOT NULL,
user_slug VARCHAR(12),
days SMALLINT,
CONSTRAINT pk_daily_update_ranking_records_date_ranking_user_slug PRIMARY KEY (date, ranking, user_slug)
CONSTRAINT pk_daily_update_ranking_records_date_user_slug PRIMARY KEY (date, user_slug)
);
""" # noqa: E501
)
Expand Down

0 comments on commit 75d206c

Please sign in to comment.