From e9e4b8ff1c531da221e705722edb03e01e21996f Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 14 Jun 2024 14:48:06 -0700 Subject: [PATCH 1/2] Organize imports. This makes it easier to figure out what is imported (and fixes a duplicate import that had slipped through the cracks). --- src/main.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main.py b/src/main.py index 51d7e9f..f3fa603 100644 --- a/src/main.py +++ b/src/main.py @@ -1,26 +1,26 @@ +import asyncio +import dataclasses import json import logging import os +from pathlib import Path import sys -import asyncio -import httpx -import yaml import typing -import dataclasses + from aiokafka import AIOKafkaConsumer # type:ignore from cloudevents.conversion import to_structured from cloudevents.http import CloudEvent -from dataclasses import dataclass -from pathlib import Path -from kafkit.registry.httpx import RegistryApi +import httpx from kafkit.registry import Deserializer +from kafkit.registry.httpx import RegistryApi from prometheus_client import start_http_server, Summary # type:ignore from prometheus_client import Gauge +import yaml REQUEST_TIME = Summary("request_processing_seconds", "Time spent processing request") -@dataclass +@dataclasses.dataclass class NextVisitModel: "Next Visit Message" salIndex: int From bf44a1b17b0b48ebf46ea10c3c0b064cab08c907 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 14 Jun 2024 14:52:18 -0700 Subject: [PATCH 2/2] Check for and filter old messages. The MESSAGE_EXPIRATION envvar sets the maximum allowed age of a message, in seconds. Ignoring old messages makes Prompt Processing more robust against network problems that cause large volumes of messages to be delivered at once. (The messages are not lost permanently; they can be processed in catch-up.) --- src/main.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/main.py b/src/main.py index f3fa603..bd30b71 100644 --- a/src/main.py +++ b/src/main.py @@ -1,10 +1,12 @@ import asyncio import dataclasses +import datetime import json import logging import os from pathlib import Path import sys +import time import typing from aiokafka import AIOKafkaConsumer # type:ignore @@ -160,6 +162,7 @@ async def main() -> None: group_id = os.environ["CONSUMER_GROUP"] topic = os.environ["NEXT_VISIT_TOPIC"] offset = os.environ["OFFSET"] + expire = float(os.environ["MESSAGE_EXPIRATION"]) kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"] latiss_knative_serving_url = os.environ["LATISS_KNATIVE_SERVING_URL"] lsstcomcam_knative_serving_url = os.environ["LSSTCOMCAM_KNATIVE_SERVING_URL"] @@ -273,6 +276,19 @@ async def main() -> None: "it's not an observation.") continue + # efdStamp is visit publication, in seconds since 1970-01-01 UTC + if next_visit_message_initial["message"]["private_efdStamp"]: + published = next_visit_message_initial["message"]["private_efdStamp"] + age = time.time() - published + if age > expire: + logging.warning("Message published at %s is %s old, ignoring.", + time.ctime(published), + datetime.timedelta(seconds=age) + ) + continue + else: + logging.warning("Message does not have private_efdStamp, can't determine age.") + next_visit_message_updated = NextVisitModel( salIndex=next_visit_message_initial["message"]["salIndex"], scriptSalIndex=next_visit_message_initial["message"][