diff --git a/utils/config_generators.py b/utils/config_generators.py index d83758a..145c61e 100644 --- a/utils/config_generators.py +++ b/utils/config_generators.py @@ -1,3 +1,4 @@ +from functools import partial from typing import Any, Dict, Union from prefect.client.schemas.schedules import CronSchedule @@ -18,8 +19,8 @@ def generate_flow_config( "retries": retries, "retry_delay_seconds": retry_delay_seconds, "timeout_seconds": timeout, - "on_failure": [on_failure_or_crashed], - "on_crashed": [on_failure_or_crashed], + "on_failure": [partial(on_failure_or_crashed, status="Failed")], + "on_crashed": [partial(on_failure_or_crashed, status="Crashed")], } diff --git a/utils/event_handlers.py b/utils/event_handlers.py index 531a813..bbc9776 100644 --- a/utils/event_handlers.py +++ b/utils/event_handlers.py @@ -1,3 +1,5 @@ +from typing import Literal + from httpx import AsyncClient from prefect.client.schemas.objects import Flow, FlowRun, State @@ -6,7 +8,9 @@ CLIENT = AsyncClient(http2=True) -async def on_failure_or_crashed(flow: Flow, flow_run: FlowRun, state: State) -> None: +async def on_failure_or_crashed( + status: Literal["Failed", "Crashed"], flow: Flow, flow_run: FlowRun, state: State +) -> None: if not CONFIG.feishu_notification.enabled: return @@ -19,14 +23,19 @@ async def on_failure_or_crashed(flow: Flow, flow_run: FlowRun, state: State) -> "data": { "template_id": CONFIG.feishu_notification.failure_card_id, "template_variable": { - "status": flow_run.state_name, + "status": status, "flow_name": flow.name, "run_name": flow_run.name, - "deployment_name": "[To-Do]", + "parameters": str(flow_run.parameters), "start_time": flow_run.start_time.strftime(r"%Y-%m-%d %H:%M:%S") if flow_run.start_time else "[未知]", - "error_message": state.message, + "retries": flow_run.empirical_policy.retries, + "error_message": state.message.replace( + "Flow run encountered an exception. ", "" + ) + if state.message + else "[未知]", "details_url": f"http://prod-server:4200/flow-runs/flow-run/{flow_run.id}", }, },