From 69f739092b5a4cda14471b9068f88f4fd7efd729 Mon Sep 17 00:00:00 2001 From: dspeck1 Date: Thu, 21 Nov 2024 09:05:44 -0600 Subject: [PATCH] Add fan out delivery time to track message arrival --- src/main.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/main.py b/src/main.py index 7450ac4..45bf7e1 100644 --- a/src/main.py +++ b/src/main.py @@ -46,6 +46,7 @@ def add_detectors( self, message: dict, active_detectors: list, + fan_out_delivery_time: str ) -> list[dict[str, str]]: """Adds and duplicates next visit messages for fanout. @@ -55,6 +56,8 @@ def add_detectors( The next visit message. active_detectors: `list` The active detectors for an instrument. + fan_out_delivery_Time: `str` + The time that fan out sends the message in system time. Yields ------ message_list : `list` @@ -103,6 +106,7 @@ async def fan_out_msg( await producer.start() logging.info(f"sending msg {data}") await producer.send_and_wait(fan_out_topic, data) + # TODO Review if flush needed. await producer.flush() await producer.stop() @@ -269,6 +273,8 @@ async def main() -> None: "it's not an observation.") continue + fan_out_delivery_time = time.time() + ''' # Temporary disable so we can see older messages for testing. @@ -323,6 +329,7 @@ async def main() -> None: next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), latiss_active_detectors, + fan_out_delivery_time, ) ) fan_out_topic = fan_out_latiss_topic @@ -334,6 +341,7 @@ async def main() -> None: dataclasses.asdict(next_visit_message_updated), # Just use ComCam active detector config. lsstcomcam_active_detectors, + fan_out_delivery_time, ) ) fan_out_topic = fan_out_comcamsim_topic @@ -356,6 +364,7 @@ async def main() -> None: next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_active_detectors, + fan_out_delivery_time, ) ) fan_out_topic = fan_out_hsc_topic @@ -366,6 +375,7 @@ async def main() -> None: next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_active_detectors_59134, + fan_out_delivery_time, ) ) fan_out_topic = fan_out_hsc_topic @@ -376,6 +386,7 @@ async def main() -> None: next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_active_detectors_59142, + fan_out_delivery_time, ) ) fan_out_topic = fan_out_hsc_topic @@ -386,6 +397,7 @@ async def main() -> None: next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_active_detectors_59150, + fan_out_delivery_time, ) ) fan_out_topic = fan_out_hsc_topic @@ -396,6 +408,7 @@ async def main() -> None: next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_active_detectors_59160, + fan_out_delivery_time, ) ) fan_out_topic = fan_out_hsc_topic