Skip to content

Commit

Permalink
Check for and filter old messages.
Browse files Browse the repository at this point in the history
The MESSAGE_EXPIRATION envvar sets the maximum allowed age of a
message, in seconds. Ignoring old messages makes Prompt Processing more
robust against network problems that cause large volumes of messages to
be delivered at once. (The messages are not lost permanently; they can
be processed in catch-up.)
  • Loading branch information
kfindeisen committed Jun 14, 2024
1 parent e9e4b8f commit 6adb8c0
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import dataclasses
import datetime
import json
import logging
import os
from pathlib import Path
import sys
import time
import typing

from aiokafka import AIOKafkaConsumer # type:ignore
Expand Down Expand Up @@ -160,6 +162,7 @@ async def main() -> None:
group_id = os.environ["CONSUMER_GROUP"]
topic = os.environ["NEXT_VISIT_TOPIC"]
offset = os.environ["OFFSET"]
expire = os.environ["MESSAGE_EXPIRATION"]
kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"]
latiss_knative_serving_url = os.environ["LATISS_KNATIVE_SERVING_URL"]
lsstcomcam_knative_serving_url = os.environ["LSSTCOMCAM_KNATIVE_SERVING_URL"]
Expand Down Expand Up @@ -273,6 +276,19 @@ async def main() -> None:
"it's not an observation.")
continue

# efdStamp is visit publication, in seconds since 1970-01-01 UTC
if next_visit_message_initial["message"]["private_efdStamp"]:
published = next_visit_message_initial["message"]["private_efdStamp"]
age = time.time() - published
if age > expire:
logging.warning("Message published at %s is %s old, ignoring.",
time.ctime(published),
datetime.timedelta(seconds=age)
)
continue
else:
logging.warning("Message does not have private_efdStamp, can't determine age.")

next_visit_message_updated = NextVisitModel(
salIndex=next_visit_message_initial["message"]["salIndex"],
scriptSalIndex=next_visit_message_initial["message"][
Expand Down

0 comments on commit 6adb8c0

Please sign in to comment.