JFetcher v3.6.0
Feature changes:

- **Implement dual-write logic for data** (see the sketch below)
- **Add new database models and migration scripts**
- Update the deployment configuration
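
Dual-write here means each collection flow now persists every record twice in the same run: once to the existing MongoDB document models and once to the new database models under `models/new/`. A minimal, self-contained sketch of the idea, using hypothetical stand-in classes rather than the project's actual models:

```python
from asyncio import run
from dataclasses import dataclass
from datetime import date


@dataclass
class OldRecord:
    """Stand-in for an existing MongoDB document model (hypothetical)."""

    date: date
    ranking: int
    slug: str


@dataclass
class NewRecord:
    """Stand-in for the corresponding new database model (hypothetical)."""

    date: date
    ranking: int
    slug: str


async def insert_old(records: list[OldRecord]) -> None:
    ...  # placeholder: write to the old datastore


async def insert_new(records: list[NewRecord]) -> None:
    ...  # placeholder: write to the new datastore


def transform_to_new_db_model(records: list[OldRecord]) -> list[NewRecord]:
    # Map the old document shape onto the new model, field by field.
    return [NewRecord(date=r.date, ranking=r.ranking, slug=r.slug) for r in records]


async def save(records: list[OldRecord]) -> None:
    await insert_old(records)  # keep the existing write path unchanged
    await insert_new(transform_to_new_db_model(records))  # also write to the new database


if __name__ == "__main__":
    run(save([OldRecord(date=date(2024, 10, 24), ranking=1, slug="abc123")]))
```

The flows changed in this commit (`jobs/jianshu/article_earning_ranking.py`, `jobs/jianshu/daily_update_ranking.py`) follow this shape: the old `*Document` models are inserted as before, then a `transform_to_new_db_model` helper maps each document onto the new model for a second `insert_many`.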
FHU-yezi committed Oct 24, 2024
2 parents b063521 + d57c272 commit 3ee5066
Showing 35 changed files with 1,237 additions and 663 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -10,7 +10,7 @@ networks:
 
 services:
   main:
-    image: jfetcher:3.5.0
+    image: jfetcher:3.6.0
     container_name: jfetcher
     build: .
     volumes:
@@ -26,7 +26,7 @@ services:
     deploy:
      resources:
        limits:
-          memory: 768M
+          memory: 1G
      restart_policy:
        condition: on-failure
        delay: 5s
37 changes: 37 additions & 0 deletions fix_article_earning_ranking_missing_author_slug.py
@@ -0,0 +1,37 @@
from asyncio import run as asyncio_run
from asyncio import sleep
from datetime import datetime

from jkit.article import Article
from jkit.exceptions import ResourceUnavailableError

from models.jianshu.article_earning_ranking_record import (
    ArticleEarningRankingRecordDocument,
)


async def main() -> None:
    async for item in ArticleEarningRankingRecordDocument.find_many(
        {"date": {"$gte": datetime(2024, 10, 23)}, "authorSlug": None},
        sort={"date": "ASC", "ranking": "ASC"},
    ):
        if not item.article.slug:
            print("Skipping because article slug is missing")
            continue

        try:
            article = Article.from_slug(item.article.slug)
            author_slug = (await article.info).author_info.slug

            await ArticleEarningRankingRecordDocument.update_one(
                {"date": item.date, "ranking": item.ranking},
                {"$set": {"authorSlug": author_slug}},
            )
            print(f"Fixed author slug for {item.article.slug} to {author_slug}")
        except ResourceUnavailableError:
            print("Skipping because article is deleted")

        await sleep(0.5)


asyncio_run(main())
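
This one-off script backfills the missing `authorSlug` field: it walks article earning ranking records dated 2024-10-23 or later whose `authorSlug` is unset, resolves each author through jkit, writes the slug back with `update_one`, skips records whose article slug is missing or whose article has been deleted, and sleeps 0.5 s between requests. Presumably it is meant to be run once by hand (e.g. `python fix_article_earning_ranking_missing_author_slug.py` in the project environment) rather than as a scheduled Prefect flow.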
3 changes: 1 addition & 2 deletions jobs/__init__.py
@@ -24,9 +24,8 @@ def import_deployment(path: str) -> DeploymentType:
 
 DEPLOYMENT_PATHS: set[str] = {
     "jobs.jianshu.article_earning_ranking:deployment",
-    "jobs.jianshu.assets_ranking:deployment",
+    "jobs.jianshu.user_assets_ranking:deployment",
     "jobs.jianshu.daily_update_ranking:deployment",
-    "jobs.jianshu.lp_recommend:deployment",
     "jobs.jpep.ftn_trade:buy_deployment",
     "jobs.jpep.ftn_trade:sell_deployment",
 }
33 changes: 33 additions & 0 deletions jobs/jianshu/article_earning_ranking.py
@@ -15,6 +15,10 @@
     EarningField,
 )
 from models.jianshu.user import UserDocument
+from models.new.jianshu.article_earning_ranking_record import (
+    ArticleEarningRankingRecord as NewDbArticleEarningRankingRecord,
+)
+from models.new.jianshu.user import User as NewDbUser
 from utils.log import (
     get_flow_run_name,
     log_flow_run_start,
@@ -72,6 +76,12 @@ async def process_item(
         name=author_info.name,
         avatar_url=author_info.avatar_url,
     )
+    await NewDbUser.upsert(
+        slug=author_slug,
+        id=author_info.id,
+        name=author_info.name,
+        avatar_url=author_info.avatar_url,
+    )
 
     return ArticleEarningRankingRecordDocument(
         date=date,
@@ -88,6 +98,26 @@ async def process_item(
     )
 
 
+def transform_to_new_db_model(
+    data: list[ArticleEarningRankingRecordDocument],
+) -> list[NewDbArticleEarningRankingRecord]:
+    result: list[NewDbArticleEarningRankingRecord] = []
+    for item in data:
+        result.append(  # noqa: PERF401
+            NewDbArticleEarningRankingRecord(
+                date=item.date.date(),
+                ranking=item.ranking,
+                slug=item.article.slug,
+                title=item.article.title,
+                author_slug=item.author_slug,
+                author_earning=item.earning.to_author,
+                voter_earning=item.earning.to_voter,
+            )
+        )
+
+    return result
+
+
 @flow(
     **generate_flow_config(
         name="采集文章收益排行榜记录",
@@ -105,6 +135,9 @@ async def main() -> None:
 
     await ArticleEarningRankingRecordDocument.insert_many(data)
 
+    new_data = transform_to_new_db_model(data)
+    await NewDbArticleEarningRankingRecord.insert_many(new_data)
+
     log_flow_run_success(logger, data_count=len(data))
 
 
29 changes: 29 additions & 0 deletions jobs/jianshu/daily_update_ranking.py
@@ -11,6 +11,10 @@
     DailyUpdateRankingRecordDocument,
 )
 from models.jianshu.user import UserDocument
+from models.new.jianshu.daily_update_ranking_record import (
+    DailyUpdateRankingRecord as NewDbDailyUpateRankingRecord,
+)
+from models.new.jianshu.user import User as NewDbUser
 from utils.log import log_flow_run_start, log_flow_run_success, logger
 from utils.prefect_helper import (
     generate_deployment_config,
@@ -26,6 +30,11 @@ async def process_item(
         name=item.user_info.name,
         avatar_url=item.user_info.avatar_url,
     )
+    await NewDbUser.upsert(
+        slug=item.user_info.slug,
+        name=item.user_info.name,
+        avatar_url=item.user_info.avatar_url,
+    )
 
     return DailyUpdateRankingRecordDocument(
         date=date,
@@ -35,6 +44,23 @@ async def process_item(
     )
 
 
+def transform_to_new_db_model(
+    data: list[DailyUpdateRankingRecordDocument],
+) -> list[NewDbDailyUpateRankingRecord]:
+    result: list[NewDbDailyUpateRankingRecord] = []
+    for item in data:
+        result.append(  # noqa: PERF401
+            NewDbDailyUpateRankingRecord(
+                date=item.date.date(),
+                ranking=item.ranking,
+                slug=item.user_slug,
+                days=item.days,
+            )
+        )
+
+    return result
+
+
 @flow(
     **generate_flow_config(
         name="采集日更排行榜记录",
@@ -52,6 +78,9 @@ async def main() -> None:
 
     await DailyUpdateRankingRecordDocument.insert_many(data)
 
+    new_data = transform_to_new_db_model(data)
+    await NewDbDailyUpateRankingRecord.insert_many(new_data)
+
     log_flow_run_success(logger, data_count=len(data))
 
 
70 changes: 0 additions & 70 deletions jobs/jianshu/lottery.py

This file was deleted.

99 changes: 0 additions & 99 deletions jobs/jianshu/lp_recommend.py

This file was deleted.
