Skip to content

Commit

Permalink
feat: 调整工作流结构与数据存储结构
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Mar 11, 2024
1 parent 26616c5 commit 3541bed
Show file tree
Hide file tree
Showing 19 changed files with 67 additions and 62 deletions.
14 changes: 7 additions & 7 deletions jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ def import_deployment(path: str) -> DeploymentType:


DEPLOYMENT_PATHS: Set[str] = {
"jobs.article_earning_ranking:deployment",
"jobs.assets_ranking:deployment",
"jobs.daily_update_ranking:deployment",
"jobs.jianshu_lottery:deployment",
"jobs.jpep_ftn_trade:buy_deployment",
"jobs.jpep_ftn_trade:sell_deployment",
"jobs.lp_recommended_articles:deployment",
"jobs.jianshu.article_earning_ranking:deployment",
"jobs.jianshu.assets_ranking:deployment",
"jobs.jianshu.daily_update_ranking:deployment",
"jobs.jianshu.lottery:deployment",
"jobs.jianshu.lp_recommend:deployment",
"jobs.jpep.ftn_trade:buy_deployment",
"jobs.jpep.ftn_trade:sell_deployment",
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from prefect import flow, get_run_logger
from prefect.states import Completed, State

from models.article_earning_ranking_record import (
from models.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecordDocument,
ArticleField,
EarningField,
)
from models.jianshu_user import JianshuUserDocument
from models.jianshu.user import JianshuUserDocument
from utils.async_retry import async_retry
from utils.config_generators import (
generate_deployment_config,
Expand Down
4 changes: 2 additions & 2 deletions jobs/assets_ranking.py → jobs/jianshu/assets_ranking.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
from prefect import flow, get_run_logger
from prefect.states import Completed, State

from models.assets_ranking_record import (
from models.jianshu.assets_ranking_record import (
AmountField,
AssetsRankingRecordDocument,
)
from models.jianshu_user import JianshuUserDocument
from models.jianshu.user import JianshuUserDocument
from utils.async_retry import async_retry
from utils.config_generators import (
generate_deployment_config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from prefect import flow
from prefect.states import Completed, State

from models.daily_update_ranking_record import (
from models.jianshu.daily_update_ranking_record import (
DailyUpdateRankingRecordDocument,
)
from models.jianshu_user import JianshuUserDocument
from models.jianshu.user import JianshuUserDocument
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
Expand Down
18 changes: 9 additions & 9 deletions jobs/jianshu_lottery.py → jobs/jianshu/lottery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from prefect import flow, get_run_logger
from prefect.states import Completed, State

from models.jianshu_lottery_win_record import (
JianshuLotteryWinRecordDocument,
from models.jianshu.lottery_win_record import (
LotteryWinRecordDocument,
)
from models.jianshu_user import JianshuUserDocument
from models.jianshu.user import JianshuUserDocument
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
Expand All @@ -16,7 +16,7 @@

async def process_item(
item: JianshuLotteryWinRecord, /
) -> JianshuLotteryWinRecordDocument:
) -> LotteryWinRecordDocument:
await JianshuUserDocument.insert_or_update_one(
slug=item.user_info.slug,
updated_at=item.time,
Expand All @@ -25,7 +25,7 @@ async def process_item(
avatar_url=item.user_info.avatar_url,
)

return JianshuLotteryWinRecordDocument(
return LotteryWinRecordDocument(
id=item.id,
time=item.time,
award_name=item.award_name,
Expand All @@ -39,17 +39,17 @@ async def process_item(
)
)
async def flow_func() -> State:
await JianshuLotteryWinRecordDocument.ensure_indexes()
await LotteryWinRecordDocument.ensure_indexes()
await JianshuUserDocument.ensure_indexes()

logger = get_run_logger()

stop_id = await JianshuLotteryWinRecordDocument.get_latest_record_id()
stop_id = await LotteryWinRecordDocument.get_latest_record_id()
logger.info(f"获取到最新的记录 ID:{stop_id}")
if stop_id == 0:
logger.warning("数据库中没有记录")

data: List[JianshuLotteryWinRecordDocument] = []
data: List[LotteryWinRecordDocument] = []
async for item in JianshuLottery().iter_win_records():
if item.id == stop_id:
break
Expand All @@ -60,7 +60,7 @@ async def flow_func() -> State:
logger.warning("采集数据量达到上限")

if data:
await JianshuLotteryWinRecordDocument.insert_many(data)
await LotteryWinRecordDocument.insert_many(data)
else:
logger.info("无数据,不执行保存操作")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
from prefect import flow, get_run_logger
from prefect.states import Completed, State

from models.jianshu_user import JianshuUserDocument
from models.lp_recommend_article_record import (
from models.jianshu.lp_recommend_article_record import (
LPRecommendedArticleRecordDocument,
)
from models.jianshu.user import JianshuUserDocument
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
Expand Down
18 changes: 9 additions & 9 deletions jobs/jpep_ftn_trade.py → jobs/jpep/ftn_trade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
from prefect import flow
from prefect.states import Completed, State

from models.jpep_ftn_trade_order import (
from models.jpep.ftn_trade_order import (
AmountField,
JPEPFTNTradeOrderDocument,
FTNTradeOrderDocument,
)
from models.jpep_user import JPEPUserDocument
from models.jpep.user import UserDocument
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
Expand Down Expand Up @@ -37,9 +37,9 @@ async def process_item(
*,
fetch_time: datetime,
type: Literal["buy", "sell"], # noqa: A002
) -> JPEPFTNTradeOrderDocument:
) -> FTNTradeOrderDocument:
if item.publisher_info.id:
await JPEPUserDocument.insert_or_update_one(
await UserDocument.insert_or_update_one(
updated_at=fetch_time,
id=item.publisher_info.id,
name=item.publisher_info.name, # type: ignore
Expand All @@ -48,7 +48,7 @@ async def process_item(
credit=item.publisher_info.credit, # type: ignore
)

return JPEPFTNTradeOrderDocument(
return FTNTradeOrderDocument(
fetch_time=fetch_time,
id=item.id,
published_at=item.publish_time,
Expand All @@ -71,16 +71,16 @@ async def process_item(
),
)
async def flow_func(type: Literal["buy", "sell"]) -> State: # noqa: A002
await JPEPFTNTradeOrderDocument.ensure_indexes()
await FTNTradeOrderDocument.ensure_indexes()

fetch_time = get_fetch_time()

data: List[JPEPFTNTradeOrderDocument] = []
data: List[FTNTradeOrderDocument] = []
async for item in FTNMacket().iter_orders(type=type):
processed_item = await process_item(item, fetch_time=fetch_time, type=type)
data.append(processed_item)

await JPEPFTNTradeOrderDocument.insert_many(data)
await FTNTradeOrderDocument.insert_many(data)

return Completed(message=f"fetch_time={fetch_time}, data_count={len(data)}")

Expand Down
4 changes: 2 additions & 2 deletions migrate_01_article_fp_rank.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from jkit.identifier_convert import article_url_to_slug, user_url_to_slug
from sspeedup.logging.run_logger import RunLogger

from models.article_earning_ranking_record import (
from models.jianshu.article_earning_ranking_record import (
ArticleEarningRankingRecordDocument,
ArticleField,
EarningField,
)
from models.jianshu_user import JianshuUserDocument
from models.jianshu.user import JianshuUserDocument
from old_models.article_fp_rank import OldArticleFPRank
from utils.migrate_helper import (
get_collection_data_count,
Expand Down
7 changes: 5 additions & 2 deletions migrate_02_assets_rank.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
from jkit.identifier_convert import user_url_to_slug
from sspeedup.logging.run_logger import RunLogger

from models.assets_ranking_record import AmountField, AssetsRankingRecordDocument
from models.jianshu_user import JianshuUserDocument
from models.jianshu.assets_ranking_record import (
AmountField,
AssetsRankingRecordDocument,
)
from models.jianshu.user import JianshuUserDocument
from old_models.assets_rank import OldAssetsRank
from utils.migrate_helper import (
get_collection_data_count,
Expand Down
4 changes: 2 additions & 2 deletions migrate_03_daily_update_rank.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from jkit.identifier_convert import user_url_to_slug
from sspeedup.logging.run_logger import RunLogger

from models.daily_update_ranking_record import DailyUpdateRankingRecordDocument
from models.jianshu_user import JianshuUserDocument
from models.jianshu.daily_update_ranking_record import DailyUpdateRankingRecordDocument
from models.jianshu.user import JianshuUserDocument
from old_models.daily_update_rank import OldDailyUpdateRank
from utils.migrate_helper import (
get_collection_data_count,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)
from pymongo import IndexModel

from utils.db import DB
from utils.db import JIANSHU_DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Expand Down Expand Up @@ -38,7 +38,7 @@ class ArticleEarningRankingRecordDocument(Document, **DOCUMENT_OBJECT_CONFIG):
earning: EarningField

class Meta: # type: ignore
collection = DB.article_earning_ranking_records
collection = JIANSHU_DB.article_earning_ranking_records
indexes: ClassVar[List[IndexModel]] = [
IndexModel(["date", "ranking"], unique=True),
]
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)
from pymongo import IndexModel

from utils.db import DB
from utils.db import JIANSHU_DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Expand All @@ -32,7 +32,7 @@ class AssetsRankingRecordDocument(Document, **DOCUMENT_OBJECT_CONFIG):
user_slug: Optional[UserSlug]

class Meta: # type: ignore
collection = DB.assets_ranking_records
collection = JIANSHU_DB.assets_ranking_records
indexes: ClassVar[List[IndexModel]] = [
IndexModel(["date", "ranking"], unique=True),
]
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
)
from pymongo import IndexModel

from utils.db import DB
from utils.db import JIANSHU_DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Document,
Expand All @@ -22,7 +22,7 @@ class DailyUpdateRankingRecordDocument(Document, **DOCUMENT_OBJECT_CONFIG):
user_slug: UserSlug

class Meta: # type: ignore
collection = DB.daily_update_ranking_records
collection = JIANSHU_DB.daily_update_ranking_records
indexes: ClassVar[List[IndexModel]] = [
IndexModel(["date", "userSlug"], unique=True),
]
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,30 @@
)
from pymongo import IndexModel

from utils.db import DB
from utils.db import JIANSHU_DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Document,
)


class JianshuLotteryWinRecordDocument(Document, **DOCUMENT_OBJECT_CONFIG):
class LotteryWinRecordDocument(Document, **DOCUMENT_OBJECT_CONFIG):
id: PositiveInt
time: datetime
award_name: NonEmptyStr

user_slug: UserSlug

class Meta: # type: ignore
collection = DB.jianshu_lottery_win_records
collection = JIANSHU_DB.lottery_win_records
indexes: ClassVar[List[IndexModel]] = [
IndexModel(["id"], unique=True),
]

@classmethod
async def get_latest_record_id(cls) -> int:
try:
latest_data = JianshuLotteryWinRecordDocument.from_dict(
latest_data = LotteryWinRecordDocument.from_dict(
await cls.Meta.collection.find().sort("id", -1).__anext__()
)
except StopAsyncIteration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from msgspec import field
from pymongo import IndexModel

from utils.db import DB
from utils.db import JIANSHU_DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Document,
Expand All @@ -39,7 +39,7 @@ class LPRecommendedArticleRecordDocument(Document, **DOCUMENT_OBJECT_CONFIG):
author_slug: UserSlug

class Meta: # type: ignore
collection = DB.lp_recommended_article_records
collection = JIANSHU_DB.lp_recommended_article_records
indexes: ClassVar[List[IndexModel]] = [
IndexModel(["date", "slug"], unique=True),
]
Expand Down
4 changes: 2 additions & 2 deletions models/jianshu_user.py → models/jianshu/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from jkit._constraints import PositiveInt, UserName, UserSlug, UserUploadedUrl
from pymongo import IndexModel

from utils.db import DB
from utils.db import JIANSHU_DB
from utils.document_model import Document


Expand All @@ -24,7 +24,7 @@ class JianshuUserDocument(Document):
avatar_url: Optional[UserUploadedUrl]

class Meta: # type: ignore
collection = DB.jianshu_users
collection = JIANSHU_DB.users
indexes: ClassVar[List[IndexModel]] = [
IndexModel(["slug"], unique=True),
IndexModel(["updatedAt"]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
)
from pymongo import IndexModel

from utils.db import DB
from utils.db import JPEP_DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
FIELD_OBJECT_CONFIG,
Expand All @@ -24,7 +24,7 @@ class AmountField(Field, **FIELD_OBJECT_CONFIG):
minimum_trade: PositiveInt


class JPEPFTNTradeOrderDocument(Document, **DOCUMENT_OBJECT_CONFIG):
class FTNTradeOrderDocument(Document, **DOCUMENT_OBJECT_CONFIG):
fetch_time: datetime
id: PositiveInt
published_at: datetime
Expand All @@ -36,7 +36,7 @@ class JPEPFTNTradeOrderDocument(Document, **DOCUMENT_OBJECT_CONFIG):
publisher_id: PositiveInt

class Meta: # type: ignore
collection = DB.jpep_ftn_trade_orders
collection = JPEP_DB.ftn_trade_orders
indexes: ClassVar[List[IndexModel]] = [
IndexModel(["fetchTime", "id"], unique=True),
]
Loading

0 comments on commit 3541bed

Please sign in to comment.