Skip to content

Commit

Permalink
JFetcher v3.9.0
Browse files Browse the repository at this point in the history
功能变动:

- 完全移除 MongoDB 相关逻辑和迁移脚本
  • Loading branch information
FHU-yezi committed Nov 2, 2024
2 parents 22ad2da + 9868b4b commit ba9feaf
Show file tree
Hide file tree
Showing 19 changed files with 260 additions and 518 deletions.
7 changes: 0 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
## 环境

- Python 3.9+
- MongoDB
- PostgreSQL
- Prefect 2

Expand Down Expand Up @@ -33,7 +32,6 @@ cp config.example.toml config.toml

如果您使用 Docker 进行部署:

- mongo.host 填写 `mongodb`
- jianshu_postgres.host 填写 `postgres`
- jpep_postgres.host 填写 `postgres`
- logging.host 填写 `postgres`
Expand All @@ -47,15 +45,12 @@ cp config.example.toml config.toml

```shell
docker network create gotify
docker network create mongodb
docker network create postgres
docker network create prefect
```

您需要在 `gotify` 网络的 `27017` 端口上运行一个 Gotify 服务。

您需要在 `mongodb` 网络的 `27017` 端口上运行一个 MongoDB 服务,该服务不开启身份验证。

您需要在 `postgres` 网络的 `5173` 端口上运行一个 PostgreSQL 服务,身份验证相关信息请参考 `部署 - 数据库准备` 一节。

您需要在 `prefect` 网络的 `4200` 端口上运行一个 Prefect 服务。
Expand Down Expand Up @@ -84,8 +79,6 @@ uv install

您需要在 `8701` 端口上运行一个 Gotify 服务。

您需要在 `27017` 端口上运行一个 MongoDB 服务,该服务不开启身份验证。

您需要在 `5173` 端口上运行一个 PostgreSQL 服务,身份验证相关信息请参考 `部署 - 数据库准备` 一节。

您需要在 `4200` 端口上运行一个 Prefect 服务。
Expand Down
5 changes: 0 additions & 5 deletions config.example.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
[mongo]
host = "localhost"
port = 27017
database = "jfetcher"

[jianshu_postgres]
host = "localhost"
port = 5432
Expand Down
8 changes: 2 additions & 6 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
networks:
gotify:
external: true
mongodb:
external: true
postgres:
external: true
prefect:
external: true

services:
main:
image: jfetcher:3.8.0
image: jfetcher:3.9.0
container_name: jfetcher
build: .
volumes:
- ./config.toml:/app/config.toml:ro
networks:
- gotify
- mongodb
- postgres
- prefect
environment:
- PYTHONUNBUFFERED=1
- PREFECT_API_URL=http://prefect:4200/api
deploy:
resources:
deploy: resources:
limits:
memory: 1G
restart_policy:
Expand Down
48 changes: 48 additions & 0 deletions fix_article_earning_ranking_missing_author_slug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from asyncio import run as asyncio_run
from datetime import date

from jkit.article import Article
from jkit.config import CONFIG
from jkit.exceptions import ResourceUnavailableError
from sshared.logging import Logger

from utils.postgres import get_jianshu_conn

START_DATE = date(2024, 10, 30)

CONFIG.data_validation.enabled = False
logger = Logger()


async def main() -> None:
logger.info(f"准备修复 {START_DATE} 至今的数据")

conn = await get_jianshu_conn()

cursor = await conn.execute(
"SELECT slug FROM article_earning_ranking_records "
"WHERE date >= %s AND slug IS NOT NULL AND author_slug IS NULL "
"ORDER BY date, ranking;",
(START_DATE,),
)

async for item in cursor:
article = Article.from_slug(item[0])

try:
author_slug = (await article.info).author_info.slug
except ResourceUnavailableError:
logger.warn(f"文章 {article.slug} 已删除,跳过数据获取")
continue

await conn.execute(
"UPDATE article_earning_ranking_records SET author_slug = %s "
"WHERE slug = %s;",
(author_slug, article.slug),
)
logger.debug(f"已成功更新 {article.slug} 的 author_slug 字段为 {author_slug}")

logger.info("数据修复完成")


asyncio_run(main())
73 changes: 22 additions & 51 deletions jobs/jpep/ftn_trade.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
from prefect import flow

from models.jpep.credit_record import CreditRecord
from models.jpep.ftn_trade_order import AmountField, FTNTradeOrderDocument
from models.jpep.new.ftn_macket_record import FTNMacketRecord
from models.jpep.new.ftn_order import FTNOrder, TypeEnum
from models.jpep.ftn_macket_record import FTNMacketRecord
from models.jpep.ftn_order import FTNOrder, TypeEnum
from models.jpep.user import User
from utils.log import log_flow_run_start, log_flow_run_success, logger
from utils.prefect_helper import (
Expand Down Expand Up @@ -35,7 +34,7 @@ async def process_item(
item: FTNMacketOrderRecord,
time: datetime,
type: Literal["buy", "sell"], # noqa: A002
) -> FTNTradeOrderDocument:
) -> FTNMacketRecord:
if item.publisher_info.id:
await User.upsert(
id=item.publisher_info.id,
Expand All @@ -53,57 +52,30 @@ async def process_item(
credit=item.publisher_info.credit,
).create()

return FTNTradeOrderDocument(
order = await FTNOrder.get_by_id(item.id)
if not order:
await FTNOrder(
id=item.id,
type={"buy": TypeEnum.BUY, "sell": TypeEnum.SELL}[type],
publisher_id=item.publisher_info.id,
publish_time=item.publish_time,
last_seen_time=time,
).create()
else:
await FTNOrder.update_last_seen_time(order.id, time)

return FTNMacketRecord(
fetch_time=time,
id=item.id,
published_at=item.publish_time,
type=type,
price=item.price,
traded_count=item.traded_count,
amount=AmountField(
total=item.total_amount,
traded=item.traded_amount,
tradable=item.tradable_amount,
minimum_trade=item.minimum_trade_amount,
),
publisher_id=item.publisher_info.id,
total_amount=item.total_amount,
traded_amount=item.traded_amount,
remaining_amount=item.tradable_amount,
minimum_trade_amount=item.minimum_trade_amount,
)


async def transform_and_write_new_data_source(
type: Literal["buy", "sell"], # noqa: A002
old_data: list[FTNTradeOrderDocument],
) -> None:
new_data: list[FTNMacketRecord] = []
for item in old_data:
order = await FTNOrder.get_by_id(item.id)
if not order:
await FTNOrder(
id=item.id,
type={"buy": TypeEnum.BUY, "sell": TypeEnum.SELL}[type],
publisher_id=item.publisher_id,
publish_time=item.published_at,
last_seen_time=item.fetch_time,
).create()
else:
await FTNOrder.update_last_seen_time(order.id, item.fetch_time)

new_data.append(
FTNMacketRecord(
fetch_time=item.fetch_time,
id=item.id,
price=item.price,
traded_count=item.traded_count,
total_amount=item.amount.total,
traded_amount=item.amount.traded,
remaining_amount=item.amount.tradable,
minimum_trade_amount=item.amount.minimum_trade,
)
)

await FTNMacketRecord.insert_many(new_data)


@flow(
**generate_flow_config(
name="采集简书积分兑换平台简书贝交易挂单",
Expand All @@ -114,14 +86,13 @@ async def main(type: Literal["buy", "sell"]) -> None: # noqa: A002

fetch_time = get_fetch_time()

data: list[FTNTradeOrderDocument] = []
data: list[FTNMacketRecord] = []
async for item in FTNMacket().iter_orders(type=type):
processed_item = await process_item(item, time=fetch_time, type=type)
data.append(processed_item)

if data:
await FTNTradeOrderDocument.insert_many(data)
await transform_and_write_new_data_source(type, data)
await FTNMacketRecord.insert_many(data)
else:
logger.warn("没有可采集的挂单信息,跳过数据写入")

Expand Down
26 changes: 0 additions & 26 deletions migrate_credit_history.py

This file was deleted.

51 changes: 0 additions & 51 deletions migrate_ftn_macket_records.py

This file was deleted.

35 changes: 0 additions & 35 deletions migrate_ftn_order.py

This file was deleted.

30 changes: 0 additions & 30 deletions migrate_jpep_user.py

This file was deleted.

Loading

0 comments on commit ba9feaf

Please sign in to comment.