Skip to content

Commit

Permalink
feat: 调整部分数据库结构
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Oct 24, 2024
1 parent 8f7ab2c commit 7fe4484
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 58 deletions.
2 changes: 1 addition & 1 deletion jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def import_deployment(path: str) -> DeploymentType:

DEPLOYMENT_PATHS: set[str] = {
"jobs.jianshu.article_earning_ranking:deployment",
"jobs.jianshu.assets_ranking:deployment",
"jobs.jianshu.user_assets_ranking:deployment",
"jobs.jianshu.daily_update_ranking:deployment",
"jobs.jpep.ftn_trade:buy_deployment",
"jobs.jpep.ftn_trade:sell_deployment",
Expand Down
8 changes: 4 additions & 4 deletions jobs/jianshu/article_earning_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ def transform_to_new_db_model(
NewDbArticleEarningRankingRecord(
date=item.date.date(),
ranking=item.ranking,
article_slug=item.article.slug,
article_title=item.article.title,
slug=item.article.slug,
title=item.article.title,
author_slug=item.author_slug,
earning_to_author=item.earning.to_author,
earning_to_voter=item.earning.to_voter,
author_earning=item.earning.to_author,
voter_earning=item.earning.to_voter,
)
)

Expand Down
2 changes: 1 addition & 1 deletion jobs/jianshu/daily_update_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def transform_to_new_db_model(
NewDbDailyUpateRankingRecord(
date=item.date.date(),
ranking=item.ranking,
user_slug=item.user_slug,
slug=item.user_slug,
days=item.days,
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
AssetsRankingRecordDocument,
)
from models.jianshu.user import UserDocument
from models.new.jianshu.assets_ranking_record import (
AssetsRankingRecord as NewDbAssetsRankingRecord,
from models.new.jianshu.user_assets_ranking_record import (
UserAssetsRankingRecord as NewDbAssetsRankingRecord,
)
from utils.log import (
get_flow_run_name,
Expand Down Expand Up @@ -96,7 +96,7 @@ def transform_to_new_db_model(
NewDbAssetsRankingRecord(
date=item.date.date(),
ranking=item.ranking,
user_slug=item.user_slug,
slug=item.user_slug,
fp=item.amount.fp,
ftn=item.amount.ftn,
assets=item.amount.assets,
Expand All @@ -108,7 +108,7 @@ def transform_to_new_db_model(

@flow(
**generate_flow_config(
name="采集资产排行榜记录",
name="采集用户资产排行榜记录",
)
)
async def main() -> None:
Expand All @@ -134,7 +134,7 @@ async def main() -> None:

deployment = main.to_deployment(
**generate_deployment_config(
name="采集资产排行榜记录",
name="采集用户资产排行榜记录",
cron="0 1 * * *",
)
)
4 changes: 2 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from models.new.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecord,
)
from models.new.jianshu.assets_ranking_record import AssetsRankingRecord
from models.new.jianshu.daily_update_ranking_record import DailyUpdateRankingRecord
from models.new.jianshu.user import User
from models.new.jianshu.user_assets_ranking_record import UserAssetsRankingRecord
from utils.log import logger


Expand All @@ -20,7 +20,7 @@ async def main() -> None:
logger.info("索引创建完成")
logger.info("正在初始化新版数据库")
await ArticleEarningRankingRecord.init()
await AssetsRankingRecord.init()
await UserAssetsRankingRecord.init()
await DailyUpdateRankingRecord.init()
await User.init()
logger.info("初始化新版数据库完成")
Expand Down
8 changes: 4 additions & 4 deletions migrate_article_earning_ranking_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ async def main() -> None:
ArticleEarningRankingRecord(
date=item["date"].date(),
ranking=item["ranking"],
article_slug=item["article"]["slug"],
article_title=item["article"]["title"],
slug=item["article"]["slug"],
title=item["article"]["title"],
author_slug=item["authorSlug"],
earning_to_author=item["earning"]["toAuthor"],
earning_to_voter=item["earning"]["toVoter"],
author_earning=item["earning"]["toAuthor"],
voter_earning=item["earning"]["toVoter"],
)
)
if len(batch) == 5000:
Expand Down
2 changes: 1 addition & 1 deletion migrate_daily_update_ranking_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def main() -> None:
DailyUpdateRankingRecord(
date=item["date"].date(),
ranking=item["ranking"],
user_slug=item["userSlug"],
slug=item["userSlug"],
days=item["days"],
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,42 @@
from models.jianshu.assets_ranking_record import (
AssetsRankingRecordDocument,
)
from models.new.jianshu.assets_ranking_record import (
AssetsRankingRecord,
from models.new.jianshu.user_assets_ranking_record import (
UserAssetsRankingRecord,
)

logger = Logger()


async def main() -> None:
await AssetsRankingRecord.init()
await UserAssetsRankingRecord.init()
logger.debug("初始化数据库表完成")

batch: list[AssetsRankingRecord] = []
batch: list[UserAssetsRankingRecord] = []

async for item in AssetsRankingRecordDocument.Meta.collection.find().sort(
{"date": 1, "ranking": 1}
):
batch.append(
AssetsRankingRecord(
UserAssetsRankingRecord(
date=item["date"].date(),
ranking=item["ranking"],
user_slug=item["userSlug"],
slug=item["userSlug"],
fp=item["amount"]["fp"],
ftn=item["amount"]["ftn"],
assets=item["amount"]["assets"],
)
)
if len(batch) == 5000:
await AssetsRankingRecord.insert_many(batch)
await UserAssetsRankingRecord.insert_many(batch)
logger.debug(
f"迁移 {batch[0].date}/{batch[0].ranking} - "
f"{batch[-1].date}/{batch[-1].ranking} 完成"
)
batch.clear()

if batch:
await AssetsRankingRecord.insert_many(batch)
await UserAssetsRankingRecord.insert_many(batch)
logger.debug(
f"迁移 {batch[0].date}/{batch[0].ranking} - "
f"{batch[-1].date}/{batch[-1].ranking} 完成"
Expand Down
28 changes: 14 additions & 14 deletions models/new/jianshu/article_earning_ranking_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
class ArticleEarningRankingRecord(Table, frozen=True):
date: date
ranking: PositiveInt
article_slug: Optional[NonEmptyStr]
article_title: Optional[NonEmptyStr]
slug: Optional[NonEmptyStr]
title: Optional[NonEmptyStr]
author_slug: Optional[NonEmptyStr]

earning_to_author: PositiveFloat
earning_to_voter: PositiveFloat
author_earning: PositiveFloat
voter_earning: PositiveFloat

@classmethod
async def _create_table(cls) -> None:
Expand All @@ -24,11 +24,11 @@ async def _create_table(cls) -> None:
CREATE TABLE IF NOT EXISTS article_earning_ranking_records (
date DATE NOT NULL,
ranking SMALLINT NOT NULL,
article_slug VARCHAR(12),
article_title TEXT,
slug VARCHAR(12),
title TEXT,
author_slug VARCHAR(12),
earning_to_author NUMERIC NOT NULL,
earning_to_voter NUMERIC NOT NULL,
author_earning NUMERIC NOT NULL,
voter_earning NUMERIC NOT NULL,
CONSTRAINT pk_article_earning_ranking_records_date_ranking PRIMARY KEY (date, ranking)
);
""" # noqa: E501
Expand All @@ -41,17 +41,17 @@ async def insert_many(cls, data: list["ArticleEarningRankingRecord"]) -> None:

await jianshu_conn.cursor().executemany(
"INSERT INTO article_earning_ranking_records (date, ranking, "
"article_slug, article_title, author_slug, earning_to_author, "
"earning_to_voter) VALUES (%s, %s, %s, %s, %s, %s, %s);",
"slug, title, author_slug, author_earning, voter_earning) "
"VALUES (%s, %s, %s, %s, %s, %s, %s);",
[
(
item.date,
item.ranking,
item.article_slug,
item.article_title,
item.slug,
item.title,
item.author_slug,
item.earning_to_author,
item.earning_to_voter,
item.author_earning,
item.voter_earning,
)
for item in data
],
Expand Down
10 changes: 5 additions & 5 deletions models/new/jianshu/daily_update_ranking_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class DailyUpdateRankingRecord(Table, frozen=True):
date: date
ranking: PositiveInt
user_slug: NonEmptyStr
slug: NonEmptyStr
days: PositiveInt

@classmethod
Expand All @@ -19,9 +19,9 @@ async def _create_table(cls) -> None:
CREATE TABLE IF NOT EXISTS daily_update_ranking_records (
date DATE NOT NULL,
ranking SMALLINT NOT NULL,
user_slug VARCHAR(12),
slug VARCHAR(12),
days SMALLINT,
CONSTRAINT pk_daily_update_ranking_records_date_user_slug PRIMARY KEY (date, user_slug)
CONSTRAINT pk_daily_update_ranking_records_date_slug PRIMARY KEY (date, slug)
);
""" # noqa: E501
)
Expand All @@ -33,12 +33,12 @@ async def insert_many(cls, data: list["DailyUpdateRankingRecord"]) -> None:

await jianshu_conn.cursor().executemany(
"INSERT INTO daily_update_ranking_records (date, ranking, "
"user_slug, days) VALUES (%s, %s, %s, %s);",
"slug, days) VALUES (%s, %s, %s, %s);",
[
(
item.date,
item.ranking,
item.user_slug,
item.slug,
item.days,
)
for item in data
Expand Down
8 changes: 4 additions & 4 deletions models/new/jianshu/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ class StatusEnum(Enum):
class User(Table, frozen=True):
slug: NonEmptyStr
status: StatusEnum
update_time: datetime
id: Optional[PositiveInt]
name: Optional[NonEmptyStr]
update_time: datetime
history_names: list[NonEmptyStr]
avatar_url: Optional[NonEmptyStr]

Expand All @@ -35,9 +35,9 @@ async def _create_table(cls) -> None:
CREATE TABLE IF NOT EXISTS users (
slug VARCHAR(12) CONSTRAINT pk_users_slug PRIMARY KEY,
status enum_users_status NOT NULL,
update_time TIMESTAMP NOT NULL,
id INTEGER,
name VARCHAR(15),
update_time TIMESTAMP NOT NULL,
history_names VARCHAR(15)[] NOT NULL,
avatar_url TEXT
);
Expand All @@ -47,14 +47,14 @@ async def _create_table(cls) -> None:
async def create(self) -> None:
self.validate()
await jianshu_conn.execute(
"INSERT INTO users (slug, status, id, name, update_time, "
"INSERT INTO users (slug, status, update_time, id, name, "
"history_names, avatar_url) VALUES (%s, %s, %s, %s, %s, %s, %s);",
(
self.slug,
self.status,
self.update_time,
self.id,
self.name,
self.update_time,
self.history_names,
self.avatar_url,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from utils.postgres import jianshu_conn


class AssetsRankingRecord(Table, frozen=True):
class UserAssetsRankingRecord(Table, frozen=True):
date: date
ranking: PositiveInt
user_slug: Optional[NonEmptyStr]
slug: Optional[NonEmptyStr]

fp: Optional[NonNegativeFloat]
ftn: Optional[NonNegativeFloat]
Expand All @@ -20,31 +20,31 @@ class AssetsRankingRecord(Table, frozen=True):
async def _create_table(cls) -> None:
await jianshu_conn.execute(
"""
CREATE TABLE IF NOT EXISTS assets_ranking_records (
CREATE TABLE IF NOT EXISTS user_assets_ranking_records (
date DATE NOT NULL,
ranking SMALLINT NOT NULL,
user_slug VARCHAR(12),
slug VARCHAR(12),
fp NUMERIC,
ftn NUMERIC,
assets NUMERIC,
CONSTRAINT pk_assets_ranking_records_date_ranking PRIMARY KEY (date, ranking)
CONSTRAINT pk_user_assets_ranking_records_date_ranking PRIMARY KEY (date, ranking)
);
""" # noqa: E501
)

@classmethod
async def insert_many(cls, data: list["AssetsRankingRecord"]) -> None:
async def insert_many(cls, data: list["UserAssetsRankingRecord"]) -> None:
for item in data:
item.validate()

await jianshu_conn.cursor().executemany(
"INSERT INTO assets_ranking_records (date, ranking, "
"user_slug, fp, ftn, assets) VALUES (%s, %s, %s, %s, %s, %s);",
"INSERT INTO user_assets_ranking_records (date, ranking, "
"slug, fp, ftn, assets) VALUES (%s, %s, %s, %s, %s, %s);",
[
(
item.date,
item.ranking,
item.user_slug,
item.slug,
item.fp,
item.ftn,
item.assets,
Expand Down

0 comments on commit 7fe4484

Please sign in to comment.