diff --git a/src/main.py b/src/main.py index f3fa603..b114bf3 100644 --- a/src/main.py +++ b/src/main.py @@ -1,10 +1,12 @@ import asyncio import dataclasses +import datetime import json import logging import os from pathlib import Path import sys +import time import typing from aiokafka import AIOKafkaConsumer # type:ignore @@ -160,6 +162,7 @@ async def main() -> None: group_id = os.environ["CONSUMER_GROUP"] topic = os.environ["NEXT_VISIT_TOPIC"] offset = os.environ["OFFSET"] + expire = os.environ["MESSAGE_EXPIRATION"] kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"] latiss_knative_serving_url = os.environ["LATISS_KNATIVE_SERVING_URL"] lsstcomcam_knative_serving_url = os.environ["LSSTCOMCAM_KNATIVE_SERVING_URL"] @@ -273,6 +276,19 @@ async def main() -> None: "it's not an observation.") continue + # efdStamp is visit publication, in seconds since 1970-01-01 UTC + if next_visit_message_initial["message"]["private_efdStamp"]: + published = next_visit_message_initial["message"]["private_efdStamp"] + age = time.time() - published + if age > expire: + logging.warning("Message published at %s is %s old, ignoring.", + time.ctime(published), + datetime.timedelta(seconds=age) + ) + continue + else: + logging.warning("Message does not have private_efdStamp, can't determine age.") + next_visit_message_updated = NextVisitModel( salIndex=next_visit_message_initial["message"]["salIndex"], scriptSalIndex=next_visit_message_initial["message"][