Skip to content

Commit

Permalink
feat: 使用 PostgreSQL 存储日志,使用 Gotify 发送工作流运行失败通知
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Oct 7, 2024
1 parent d4b2319 commit 5b1c3d3
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 80 deletions.
29 changes: 12 additions & 17 deletions config.example.toml
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
# MongoDB database
# NOTE: the section name must match the `mongo` field of `_Config` in utils/config.py.
[mongo]
# Host
host = "localhost"
# Port
port = 27017
# Database name
database = "jfetcher"

# 日志
[postgres]
host = "localhost"
port = 5432
user = "postgres"
password = "postgres"
database = "jfetcher"

[logging]
# Whether to save logs
enable_save = true
# Minimum level of logs to display
display_level = "DEBUG"
# Minimum level of logs to save
save_level = "DEBUG"

# 飞书消息推送
[feishu_notification]
# 是否启用消息推送
enabled = true
# 消息推送 Webhook URL
webhook_url = ""
# 任务运行失败消息卡片 ID
failure_card_id = ""
# Gotify notifications for failed flow runs
[notify]
# TOML key/value pairs use `=`; `enabled: true` is a parse error.
enabled = true
host = "localhost"
port = 8701
token = ""
14 changes: 4 additions & 10 deletions utils/config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
from sshared.config import ConfigBase
from sshared.config.blocks import ConfigBlock, LoggingBlock, MongoBlock
from sshared.strict_struct import NonEmptyStr


class FeishuNotificationBlock(ConfigBlock, frozen=True):
enabled: bool
webhook_url: NonEmptyStr
failure_card_id: NonEmptyStr
from sshared.config.blocks import GotifyBlock, LoggingBlock, MongoBlock, PostgresBlock


class _Config(ConfigBase, frozen=True):
mongodb: MongoBlock
mongo: MongoBlock
postgres: PostgresBlock
logging: LoggingBlock
feishu_notification: FeishuNotificationBlock
notify: GotifyBlock


CONFIG = _Config.load_from_file("config.toml")
2 changes: 1 addition & 1 deletion utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from utils.config import CONFIG

_CLIENT = AsyncIOMotorClient(CONFIG.mongodb.host, CONFIG.mongodb.port)
_CLIENT = AsyncIOMotorClient(CONFIG.mongo.host, CONFIG.mongo.port)

JIANSHU_DB = _CLIENT.jianshu
JPEP_DB = _CLIENT.jpep
13 changes: 5 additions & 8 deletions utils/log.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
from prefect import runtime
from pymongo import MongoClient
from sshared.logging import Logger, _ExtraType
from sshared.logging import Logger
from sshared.logging.types import ExtraType

from utils.config import CONFIG

logger = Logger(
# TODO
save_level=CONFIG.logging.save_level,
display_level=CONFIG.logging.display_level,
save_collection=MongoClient()[CONFIG.mongodb.database].log
if CONFIG.logging.enable_save
else None,
save_level=CONFIG.logging.save_level,
connection_string=CONFIG.postgres.connection_string
)


Expand All @@ -30,7 +27,7 @@ def log_flow_run_start(logger: Logger) -> str:
return get_flow_run_name() # type: ignore


def log_flow_run_success(logger: Logger, **kwargs: _ExtraType) -> None:
def log_flow_run_success(logger: Logger, **kwargs: ExtraType) -> None:
logger.info(
"工作流执行成功",
flow_run_name=runtime.flow_run.name, # type: ignore
Expand Down
73 changes: 32 additions & 41 deletions utils/prefect_helper.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,53 @@
from functools import partial
from typing import Any, Literal, Union

from httpx import AsyncClient, HTTPStatusError
from prefect.client.schemas.objects import Flow, FlowRun, State
from prefect.client.schemas.schedules import CronSchedule
from sshared.notifier import Notifier

from utils.config import CONFIG
from utils.log import logger

_CLIENT = AsyncClient(http2=True)
# Build the notifier once at import time, and only when notifications are
# enabled in the config; otherwise keep it as None so the failure handler
# can detect that and skip sending.
if CONFIG.notify.enabled:
notifier = Notifier(
host=CONFIG.notify.host, port=CONFIG.notify.port, token=CONFIG.notify.token
)
else:
notifier = None


async def failed_flow_run_handler(
status: Literal["Failed", "Crashed"], flow: Flow, flow_run: FlowRun, state: State
) -> None:
if not CONFIG.feishu_notification.enabled:
logger.warn("未开启失败飞书通知,跳过消息发送", flow_run_name=flow_run.name)
if not notifier:
logger.warn("未开启 Gotify 通知,跳过失败通知发送", flow_run_name=flow_run.name)
return

response = await _CLIENT.post(
url=CONFIG.feishu_notification.webhook_url,
json={
"msg_type": "interactive",
"card": {
"type": "template",
"data": {
"template_id": CONFIG.feishu_notification.failure_card_id,
"template_variable": {
"status": status,
"flow_name": flow.name,
"run_name": flow_run.name,
"parameters": str(flow_run.parameters),
"start_time": flow_run.start_time.in_timezone(
"Asia/Shanghai"
).strftime(r"%Y-%m-%d %H:%M:%S")
if flow_run.start_time
else "[未知]",
"retries": flow_run.empirical_policy.retries,
"error_message": state.message.replace(
"Flow run encountered an exception. ", ""
)
if state.message
else "[未知]",
"details_url": f"http://prod-server:4200/flow-runs/flow-run/{flow_run.id}",
},
},
},
},
# Human-readable start time in Asia/Shanghai, or a placeholder when the flow
# run has no recorded start time.
# NOTE: there must be no trailing comma after the conditional expression;
# with one, the parentheses build a 1-tuple and the notification would
# render it as "('2024-10-07 12:00:00',)" instead of the plain string.
start_time = (
    flow_run.start_time.in_timezone("Asia/Shanghai").strftime(r"%Y-%m-%d %H:%M:%S")
    if flow_run.start_time
    else "[未知]"
)
error_message = (
state.message.replace("Flow run encountered an exception. ", "")
if state.message
else "[未知]"
)

await notifier.send_message(
title=f"工作流 {flow.name} 运行失败({status})",
message=f"""
运行名称:{flow_run.name}\n
参数:{flow_run.parameters!s}\n
开始时间:{start_time}\n
重试次数:{flow_run.empirical_policy.retries}\n
错误信息:{error_message}\n\n
[在 Prefect 中查看](http://prod-server:4200/flow-runs/flow-run/{flow_run.id}/)
""",
markdown=True,
)

try:
response.raise_for_status()
except HTTPStatusError as e:
logger.error(
"发送失败飞书通知时发生异常", exception=e, flow_run_name=flow_run.name
)
else:
logger.info("发送失败飞书通知成功", flow_run_name=flow_run.name)
logger.info("发送失败通知成功", flow_run_name=flow_run.name)


def generate_flow_config(
Expand Down
6 changes: 3 additions & 3 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5b1c3d3

Please sign in to comment.