Skip to content

Commit

Permalink
Remove special-casing of LSSTComCam/LSSTCam.
Browse files Browse the repository at this point in the history
The original behavior was to crash and restart the entire fan-out
service if it got a message it didn't know how to handle. If the
message loop instead catches the corresponding exception, then there is
no need to specifically exempt the LSST cameras.

With this commit, it's now safe to configure the SUPPORTED_INSTRUMENTS
env variable.
  • Loading branch information
kfindeisen committed Oct 15, 2024
1 parent 3fe4916 commit 8d93f83
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,6 @@ def make_fanned_out_messages(message: NextVisitModel,
Raised if ``message`` cannot be fanned-out or sent.
"""
match message.instrument:
case "LSSTComCam" | "LSSTCam" as instrument:
raise UnsupportedMessageError(f"Ignore {instrument} message {message}"
" as the prompt service for this is not yet deployed.")
case "HSC":
# HSC has extra active detector configurations just for the
# upload.py test.
Expand Down Expand Up @@ -496,25 +493,27 @@ async def main() -> None:

while True: # run continously
async for msg in consumer:

next_visit_message_initial = await deserializer.deserialize(
data=msg.value
)
logging.info(f"message deserialized {next_visit_message_initial}")

if not is_handleable(next_visit_message_initial["message"], expire):
continue

next_visit_message_updated = NextVisitModel.from_raw_message(
next_visit_message_initial["message"]
)

send_info = make_fanned_out_messages(next_visit_message_updated,
instruments,
gauges,
hsc_upload_detectors,
)
dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges)
try:
next_visit_message_initial = await deserializer.deserialize(
data=msg.value
)
logging.info(f"message deserialized {next_visit_message_initial}")

if not is_handleable(next_visit_message_initial["message"], expire):
continue

next_visit_message_updated = NextVisitModel.from_raw_message(
next_visit_message_initial["message"]
)

send_info = make_fanned_out_messages(next_visit_message_updated,
instruments,
gauges,
hsc_upload_detectors,
)
dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges)
except UnsupportedMessageError:
logging.exception("Could not process message, continuing.")
finally:
await consumer.stop()

Expand Down

0 comments on commit 8d93f83

Please sign in to comment.