Skip to content

Commit

Permalink
feat: 实现新版数据库 users 表插入与更新机制
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Oct 24, 2024
1 parent 7fe4484 commit fa3bc48
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 0 deletions.
7 changes: 7 additions & 0 deletions jobs/jianshu/article_earning_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from models.new.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecord as NewDbArticleEarningRankingRecord,
)
from models.new.jianshu.user import User as NewDbUser
from utils.log import (
get_flow_run_name,
log_flow_run_start,
Expand Down Expand Up @@ -75,6 +76,12 @@ async def process_item(
name=author_info.name,
avatar_url=author_info.avatar_url,
)
await NewDbUser.upsert(
slug=author_slug,
id=author_info.id,
name=author_info.name,
avatar_url=author_info.avatar_url,
)

return ArticleEarningRankingRecordDocument(
date=date,
Expand Down
6 changes: 6 additions & 0 deletions jobs/jianshu/daily_update_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from models.new.jianshu.daily_update_ranking_record import (
DailyUpdateRankingRecord as NewDbDailyUpateRankingRecord,
)
from models.new.jianshu.user import User as NewDbUser
from utils.log import log_flow_run_start, log_flow_run_success, logger
from utils.prefect_helper import (
generate_deployment_config,
Expand All @@ -29,6 +30,11 @@ async def process_item(
name=item.user_info.name,
avatar_url=item.user_info.avatar_url,
)
await NewDbUser.upsert(
slug=item.user_info.slug,
name=item.user_info.name,
avatar_url=item.user_info.avatar_url,
)

return DailyUpdateRankingRecordDocument(
date=date,
Expand Down
7 changes: 7 additions & 0 deletions jobs/jianshu/user_assets_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
AssetsRankingRecordDocument,
)
from models.jianshu.user import UserDocument
from models.new.jianshu.user import User as NewDbUser
from models.new.jianshu.user_assets_ranking_record import (
UserAssetsRankingRecord as NewDbAssetsRankingRecord,
)
Expand Down Expand Up @@ -74,6 +75,12 @@ async def process_item(
name=item.user_info.name,
avatar_url=item.user_info.avatar_url,
)
await NewDbUser.upsert(
slug=item.user_info.slug,
id=item.user_info.id,
name=item.user_info.name,
avatar_url=item.user_info.avatar_url,
)

return AssetsRankingRecordDocument(
date=date,
Expand Down
100 changes: 100 additions & 0 deletions models/new/jianshu/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,103 @@ async def create(self) -> None:
self.avatar_url,
),
)

@classmethod
async def get_by_slug(cls, slug: str) -> Optional["User"]:
cursor = await jianshu_conn.execute(
"SELECT status, update_time, id, name, history_names, "
"avatar_url FROM users WHERE slug = %s;",
(slug,),
)

data = await cursor.fetchone()
if not data:
return None

return cls(
slug=slug,
status=data[0],
update_time=data[1],
id=data[2],
name=data[3],
history_names=data[4],
avatar_url=data[5],
)

@classmethod
async def upsert(
cls,
slug: str,
id: Optional[int] = None, # noqa: A002
name: Optional[str] = None,
avatar_url: Optional[str] = None,
) -> None:
user = await cls.get_by_slug(slug)
# 如果不存在,创建用户
if not user:
await cls(
slug=slug,
status=StatusEnum.NORMAL,
update_time=datetime.now(),
id=id,
name=name,
history_names=[],
avatar_url=avatar_url,
).create()
return

# 如果当前数据不是最新,跳过更新
if user.update_time > datetime.now():
return

# 在一个事务中一次性完成全部字段的更新
async with jianshu_conn.transaction():
# 更新更新时间
await jianshu_conn.execute(
"UPDATE users SET update_time = %s WHERE slug = %s",
(datetime.now(), slug),
)

# ID 无法被修改,如果异常则抛出错误
if user.id and id and user.id != id:
raise ValueError(f"用户 ID 不一致:{user.id} != {id}")

# 如果没有存储 ID,进行添加
if not user.id and id:
await jianshu_conn.execute(
"UPDATE users SET id = %s WHERE slug = %s",
(id, slug),
)

# 如果没有存储昵称,进行添加
if not user.name and name:
await jianshu_conn.execute(
"UPDATE users SET name = %s WHERE slug = %s",
(name, slug),
)

# 更新昵称
if user.name and name and user.name != name:
await jianshu_conn.execute(
"UPDATE users SET name = %s WHERE slug = %s",
(name, slug),
)
await jianshu_conn.execute(
"UPDATE users SET history_names = array_append(history_names, %s) "
"WHERE slug = %s;",
(user.name, slug),
)

# 如果没有存储头像链接,进行添加
if not user.avatar_url and avatar_url:
await jianshu_conn.execute(
"UPDATE users SET avatar_url = %s WHERE slug = %s",
(avatar_url, slug),
)

# 更新头像链接
if user.avatar_url and avatar_url and user.avatar_url != avatar_url:
await jianshu_conn.execute(
"UPDATE users SET avatar_url = %s WHERE slug = %s",
(avatar_url, slug),
)

0 comments on commit fa3bc48

Please sign in to comment.