Skip to content

Commit

Permalink
feat: 将初始化数据库拆分为单独函数
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Feb 29, 2024
1 parent 0cc8f06 commit cc4cf6c
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 16 deletions.
2 changes: 1 addition & 1 deletion jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def import_deployment(path: str) -> DeploymentType:
"jobs.fetch_jianshu_lottery_win_records:deployment",
"jobs.fetch_jpep_ftn_trade_orders:buy_deployment",
"jobs.fetch_jpep_ftn_trade_orders:sell_deployment",
"jobs.fetch_lp_recommended_article_records:deployment"
"jobs.fetch_lp_recommended_article_records:deployment",
}


Expand Down
15 changes: 10 additions & 5 deletions jobs/fetch_article_earning_ranking_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from jkit.ranking.article_earning import ArticleEarningRanking, RecordField
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pymongo import ASCENDING, IndexModel
from pymongo import IndexModel

from utils.config_generators import generate_deployment_config, generate_flow_config
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Expand Down Expand Up @@ -71,6 +74,10 @@ async def get_author_id_and_slug(
return None, None


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel([("date", "ranking")], unique=True)])


async def process_item(
item: RecordField, /, *, target_date: date
) -> ArticleEarningRankingRecordDocument:
Expand Down Expand Up @@ -101,9 +108,7 @@ async def process_item(
)
)
async def flow_func() -> State:
await COLLECTION.create_indexes(
[IndexModel([("date", ASCENDING), ("ranking", ASCENDING)], unique=True)]
)
await init_db()

target_date = datetime.now().date() - timedelta(days=1)

Expand Down
12 changes: 11 additions & 1 deletion jobs/fetch_assets_ranking_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
from jkit.user import User
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pymongo import IndexModel

from utils.config_generators import generate_deployment_config, generate_flow_config
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Expand Down Expand Up @@ -66,6 +70,10 @@ async def get_fp_ftn_amount(
return None, None


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel(("date", "ranking"), unique=True)])


async def process_item(
item: AssetsRankingRecord, /, *, target_date: date
) -> AssetsRankingRecordDocument:
Expand All @@ -91,6 +99,8 @@ async def process_item(
)
)
async def flow_func() -> State:
await init_db()

target_date = datetime.now().date()

data: List[AssetsRankingRecordDocument] = []
Expand Down
14 changes: 13 additions & 1 deletion jobs/fetch_daily_update_ranking_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
from jkit.ranking.daily_update import DailyUpdateRanking, DailyUpdateRankingRecord
from prefect import flow
from prefect.states import Completed, State
from pymongo import IndexModel

from utils.config_generators import generate_deployment_config, generate_flow_config
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Expand All @@ -30,6 +34,12 @@ class DailyUpdateRankingRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
user_info: UserInfoField


async def init_db() -> None:
await COLLECTION.create_indexes(
[IndexModel(("date", "user_info.slug"), unique=True)]
)


def process_item(
item: DailyUpdateRankingRecord, /, *, current_date: date
) -> DailyUpdateRankingRecordDocument:
Expand All @@ -50,6 +60,8 @@ def process_item(
)
)
async def flow_func() -> State:
await init_db()

current_date = datetime.now().date()

data: List[DailyUpdateRankingRecordDocument] = []
Expand Down
17 changes: 11 additions & 6 deletions jobs/fetch_jianshu_lottery_win_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
from jkit.jianshu_lottery import JianshuLottery, JianshuLotteryWinRecord
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pymongo import ASCENDING, DESCENDING, IndexModel
from pymongo import IndexModel

from utils.config_generators import generate_deployment_config, generate_flow_config
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Expand Down Expand Up @@ -35,14 +38,18 @@ class JianshuLotteryWinRecordDocument(Documemt, **DOCUMENT_OBJECT_CONFIG):
async def get_latest_stored_record_id() -> int:
try:
latest_data = JianshuLotteryWinRecordDocument.from_dict(
await COLLECTION.find().sort("_id", DESCENDING).__anext__()
await COLLECTION.find().sort("_id", -1).__anext__()
)
except StopAsyncIteration:
return 0

return latest_data.record_id


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel(("recordId",), unique=True)])


def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordDocument:
return JianshuLotteryWinRecordDocument(
record_id=item.id,
Expand All @@ -62,9 +69,7 @@ def process_item(item: JianshuLotteryWinRecord, /) -> JianshuLotteryWinRecordDoc
)
)
async def flow_func() -> State:
await COLLECTION.create_indexes(
[IndexModel([("recordId", ASCENDING)], unique=True)]
)
await init_db()

logger = get_run_logger()

Expand Down
12 changes: 11 additions & 1 deletion jobs/fetch_jpep_ftn_trade_orders.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
from jkit.jpep.ftn_macket import FTNMacket, FTNMacketOrderRecord
from prefect import flow
from prefect.states import Completed, State
from pymongo import IndexModel

from utils.config_generators import generate_deployment_config, generate_flow_config
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Expand Down Expand Up @@ -54,6 +58,10 @@ def get_fetch_time() -> datetime:
return current_dt.replace(minute=current_dt.minute // 10, second=0, microsecond=0)


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel(("fetchTime", "orderId"), unique=True)])


def process_item(
item: FTNMacketOrderRecord,
/,
Expand Down Expand Up @@ -88,6 +96,8 @@ def process_item(
),
)
async def flow_func(type: Literal["buy", "sell"]) -> State: # noqa: A002
await init_db()

fetch_time = get_fetch_time()

data: List[JPEPFTNTradeOrderDocument] = []
Expand Down
12 changes: 11 additions & 1 deletion jobs/fetch_lp_recommended_article_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
from msgspec import field
from prefect import flow, get_run_logger
from prefect.states import Completed, State
from pymongo import IndexModel

from utils.config_generators import generate_deployment_config, generate_flow_config
from utils.config_generators import (
generate_deployment_config,
generate_flow_config,
)
from utils.db import DB
from utils.document_model import (
DOCUMENT_OBJECT_CONFIG,
Expand Down Expand Up @@ -62,6 +66,10 @@ async def is_stored(item: CollectionArticleInfo) -> bool:
return False


async def init_db() -> None:
await COLLECTION.create_indexes([IndexModel(("date", "slug"), unique=True)])


async def process_item(
item: CollectionArticleInfo, /, *, current_date: date
) -> Optional[LPRecommendedArticleRecord]:
Expand Down Expand Up @@ -99,6 +107,8 @@ async def process_item(
)
)
async def flow_func() -> State:
await init_db()

logger = get_run_logger()

current_date = datetime.now().date()
Expand Down

0 comments on commit cc4cf6c

Please sign in to comment.