Skip to content

Commit

Permalink
feat: 修改工作流运行失败通知推送方式
Browse files Browse the repository at this point in the history
  • Loading branch information
FHU-yezi committed Feb 25, 2024
1 parent 6a758f6 commit aedb2cc
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 1 deletion.
6 changes: 5 additions & 1 deletion config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@ mongodb:
database: jfetcher
log:
save_level: DEBUG
print_level: DEBUG
print_level: DEBUG
feishu_notification:
enabled: false
webhook_url: <webhook-url>
failure_card_id: <card-id>
7 changes: 7 additions & 0 deletions utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@
)


class _FeishuNotification(Struct, **CONFIG_STRUCT_CONFIG):
enabled: bool = False
webhook_url: str = "<webhook-url>"
failure_card_id: str = "<card-id>"


class _Config(Struct, **CONFIG_STRUCT_CONFIG):
version: str = "v3.0.0"
mongodb: MongoDBConfig = MongoDBConfig()
log: LoggingConfig = LoggingConfig()
feishu_notification: _FeishuNotification = _FeishuNotification()


CONFIG = load_or_save_default_config(_Config)
3 changes: 3 additions & 0 deletions utils/config_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from prefect.client.schemas.schedules import CronSchedule

from utils.config import CONFIG
from utils.event_handlers import on_failure_or_crashed


def generate_flow_config(
Expand All @@ -17,6 +18,8 @@ def generate_flow_config(
"retries": retries,
"retry_delay_seconds": retry_delay_seconds,
"timeout_seconds": timeout,
"on_failure": [on_failure_or_crashed],
"on_crashed": [on_failure_or_crashed],
}


Expand Down
37 changes: 37 additions & 0 deletions utils/event_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from httpx import AsyncClient
from prefect.client.schemas.objects import Flow, FlowRun, State

from utils.config import CONFIG

CLIENT = AsyncClient(http2=True)


async def on_failure_or_crashed(flow: Flow, flow_run: FlowRun, state: State) -> None:
if not CONFIG.feishu_notification.enabled:
return

response = await CLIENT.post(
url=CONFIG.feishu_notification.webhook_url,
json={
"msg_type": "interactive",
"card": {
"type": "template",
"data": {
"template_id": CONFIG.feishu_notification.failure_card_id,
"template_variable": {
"status": flow_run.state_name,
"flow_name": flow.name,
"run_name": flow_run.name,
"deployment_name": "[To-Do]",
"start_time": flow_run.start_time.strftime(r"%Y-%m-%d %H:%M:%S")
if flow_run.start_time
else "[未知]",
"error_message": state.message,
"details_url": f"http://prod-server:4200/flow-runs/flow-run/{flow_run.id}",
},
},
},
},
)

response.raise_for_status()

0 comments on commit aedb2cc

Please sign in to comment.