Skip to content

Commit

Permalink
feat: 数据库模型与工作流逻辑分离
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Mar 3, 2024
1 parent c63c514 commit 88a430f
Show file tree
Hide file tree
Showing 16 changed files with 381 additions and 265 deletions.
49 changes: 9 additions & 40 deletions jobs/fetch_article_earning_ranking_records.py
Original file line number Diff line number Diff line change
@@ -1,51 +1,24 @@
from datetime import date, datetime, timedelta
from typing import List, Optional, Tuple

from jkit._constraints import PositiveFloat, PositiveInt
from jkit.config import CONFIG
from jkit.exceptions import ResourceUnavailableError
from jkit.ranking.article_earning import ArticleEarningRanking, RecordField
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pymongo import IndexModel

from models.article_earning_ranking_record import (
ArticleEarningRankingRecordDocument,
ArticleField,
AuthorField,
EarningField,
init_db,
insert_many,
)
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)

COLLECTION = DB.article_earning_ranking_records


class ArticleField(Field, **FIELD_OBJECT_CONFIG):
title: Optional[str]
slug: Optional[str]


class AuthorField(Field, **FIELD_OBJECT_CONFIG):
id: Optional[PositiveInt]
slug: Optional[str]
name: Optional[str]


class EarningField(Field, **FIELD_OBJECT_CONFIG):
to_author: PositiveFloat
to_voter: PositiveFloat


class ArticleEarningRankingRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
date: date
ranking: PositiveInt
article: ArticleField
author: AuthorField
earning: EarningField


async def get_author_id_and_slug(
Expand Down Expand Up @@ -74,10 +47,6 @@ async def get_author_id_and_slug(
return None, None


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel([("date", "ranking")], unique=True)])


async def process_item(
item: RecordField, /, *, target_date: date
) -> ArticleEarningRankingRecordDocument:
Expand Down Expand Up @@ -117,7 +86,7 @@ async def flow_func() -> State:
processed_item = await process_item(item, target_date=target_date)
data.append(processed_item)

await COLLECTION.insert_many(x.to_dict() for x in data)
await insert_many(data)

return Completed(message=f"target_date={target_date}, data_count={len(data)}")

Expand Down
42 changes: 7 additions & 35 deletions jobs/fetch_assets_ranking_records.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,23 @@
from datetime import date, datetime
from typing import List, Optional, Tuple

from jkit._constraints import NonNegativeFloat, PositiveFloat, PositiveInt
from jkit.config import CONFIG
from jkit.exceptions import ResourceUnavailableError
from jkit.ranking.assets import AssetsRanking, AssetsRankingRecord
from jkit.user import User
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pymongo import IndexModel

from models.assets_ranking_record import (
AssetsRankingRecordDocument,
UserInfoField,
init_db,
insert_many,
)
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)

COLLECTION = DB.assets_ranking_records


class UserInfoField(Field, **FIELD_OBJECT_CONFIG):
id: Optional[PositiveInt]
slug: Optional[str]
name: Optional[str]


class AssetsRankingRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
date: date
ranking: PositiveInt

fp_amount: Optional[NonNegativeFloat]
ftn_amount: Optional[NonNegativeFloat]
assets_amount: PositiveFloat

user_info: UserInfoField


async def get_fp_ftn_amount(
Expand Down Expand Up @@ -70,10 +48,6 @@ async def get_fp_ftn_amount(
return None, None


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel(("date", "ranking"), unique=True)])


async def process_item(
item: AssetsRankingRecord, /, *, target_date: date
) -> AssetsRankingRecordDocument:
Expand Down Expand Up @@ -104,16 +78,14 @@ async def flow_func() -> State:
target_date = datetime.now().date()

data: List[AssetsRankingRecordDocument] = []
# FIXME: jkit.exceptions.ValidationError: Expected `str` matching regex
# '^https?:\\/\\/.*\\.jianshu\\.io\\/[\\w%-\\/]*\\/?$' - at `$.user_info.avatar_url`
async for item in AssetsRanking():
processed_item = await process_item(item, target_date=target_date)
data.append(processed_item)

if len(data) == 1000:
break

await COLLECTION.insert_many(x.to_dict() for x in data)
await insert_many(data)

return Completed(message=f"target_date={target_date}, data_count={len(data)}")

Expand Down
42 changes: 11 additions & 31 deletions jobs/fetch_daily_update_ranking_records.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,23 @@
from datetime import date, datetime
from typing import List

from jkit._constraints import PositiveInt
from jkit.ranking.daily_update import DailyUpdateRanking, DailyUpdateRankingRecord
from jkit.ranking.daily_update import (
DailyUpdateRanking,
DailyUpdateRankingRecord,
)
from prefect import flow
from prefect.states import Completed, State
from pymongo import IndexModel

from models.daily_update_ranking_record import (
DailyUpdateRankingRecordDocument,
UserInfoField,
init_db,
insert_many,
)
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)

COLLECTION = DB.daily_update_ranking_records


class UserInfoField(Field, **FIELD_OBJECT_CONFIG):
slug: str
name: str


class DailyUpdateRankingRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
date: date
ranking: PositiveInt
days: PositiveInt
user_info: UserInfoField


async def init_db() -> None:
await COLLECTION.create_indexes(
[IndexModel(("date", "user_info.slug"), unique=True)]
)


def process_item(
Expand Down Expand Up @@ -69,7 +49,7 @@ async def flow_func() -> State:
processed_item = process_item(item, current_date=current_date)
data.append(processed_item)

await COLLECTION.insert_many(x.to_dict() for x in data)
await insert_many(data)

return Completed(message=f"data_count={len(data)}")

Expand Down
53 changes: 10 additions & 43 deletions jobs/fetch_jianshu_lottery_win_records.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,20 @@
from datetime import datetime
from typing import List

from jkit._constraints import PositiveInt
from jkit.jianshu_lottery import JianshuLottery, JianshuLotteryWinRecord
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pymongo import IndexModel

from models.jianshu_lottery_win_record import (
JianshuLotteryWinRecordDocument,
UserInfoField,
get_latest_record_id,
init_db,
insert_many,
)
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)

COLLECTION = DB.jianshu_lottery_win_records


class UserInfoField(Field, **FIELD_OBJECT_CONFIG):
id: PositiveInt
slug: str
name: str


class JianshuLotteryWinRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
record_id: PositiveInt
time: datetime
award_name: str
user_info: UserInfoField


async def get_latest_stored_record_id() -> int:
try:
latest_data = JianshuLotteryWinRecordDocument.from_dict(
await COLLECTION.find().sort("_id", -1).__anext__()
)
except StopAsyncIteration:
return 0

return latest_data.record_id


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel(("recordId",), unique=True)])


def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordDocument:
Expand All @@ -73,8 +40,8 @@ async def flow_func() -> State:

logger = get_run_logger()

stop_id = await get_latest_stored_record_id()
logger.info(f"获取到最新的已存储 ID:{stop_id}")
stop_id = await get_latest_record_id()
logger.info(f"获取到最新的记录 ID:{stop_id}")
if stop_id == 0:
logger.warning("数据库中没有记录")

Expand All @@ -89,7 +56,7 @@ async def flow_func() -> State:
logger.warning("采集数据量达到上限")

if data:
await COLLECTION.insert_many(x.to_dict() for x in data)
await insert_many(data)
else:
logger.info("无数据,不执行保存操作")

Expand Down
54 changes: 8 additions & 46 deletions jobs/fetch_jpep_ftn_trade_orders.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,20 @@
from datetime import datetime
from typing import List, Literal, Optional
from typing import List, Literal

from jkit._constraints import (
NonNegativeInt,
PositiveFloat,
PositiveInt,
)
from jkit.jpep.ftn_macket import FTNMacket, FTNMacketOrderRecord
from prefect import flow
from prefect.states import Completed, State
from pymongo import IndexModel

from models.jpep_ftn_trade_order import (
JPEPFTNTradeOrderDocument,
PublisherInfoField,
init_db,
insert_many,
)
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Documemt,
Field,
)

COLLECTION = DB.jpep_ftn_trade_orders


class PublisherInfoField(Field, **FIELD_OBJECT_CONFIG):
is_anonymous: bool
id: Optional[PositiveInt]
name: Optional[str]
hashed_name: Optional[str]
credit: Optional[NonNegativeInt]


class JPEPFTNTradeOrderDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
fetch_time: datetime
order_id: PositiveInt
type: Literal["buy", "sell"]
price: PositiveFloat

total_amount: PositiveInt
traded_amount: NonNegativeInt
tradable_amount: NonNegativeInt
minimum_trade_amount: PositiveInt

traded_count: NonNegativeInt
publish_time: datetime

publisher_info: PublisherInfoField


def get_fetch_time() -> datetime:
Expand All @@ -58,10 +24,6 @@ def get_fetch_time() -> datetime:
return current_dt.replace(minute=current_dt.minute // 10, second=0, microsecond=0)


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel(("fetchTime", "orderId"), unique=True)])


def process_item(
item: FTNMacketOrderRecord,
/,
Expand Down Expand Up @@ -105,7 +67,7 @@ async def flow_func(type: Literal["buy", "sell"]) -> State: # noqa: A002
processed_item = process_item(item, fetch_time=fetch_time, type=type)
data.append(processed_item)

await COLLECTION.insert_many(x.to_dict() for x in data)
await insert_many(data)

return Completed(message=f"fetch_time={fetch_time}, data_count={len(data)}")

Expand Down
Loading

0 comments on commit 88a430f

Please sign in to comment.