Skip to content

Commit

Permalink
Factor message-handling loop into invididual functions.
Browse files Browse the repository at this point in the history
This change makes the loop much easier to read and edit. However, the
functions need to take an inordinate number of "global" variables, so
they could use some further factoring.
  • Loading branch information
kfindeisen committed Oct 15, 2024
1 parent 88872c4 commit 3fe4916
Showing 1 changed file with 194 additions and 106 deletions.
300 changes: 194 additions & 106 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,40 @@ class NextVisitModel:
totalCheckpoints: int
private_sndStamp: float

@staticmethod
def from_raw_message(message: dict[str, typing.Any]) -> typing.Self:
"""Factory creating a NextVisitModel from an unpacked message.
Parameters
----------
message : `dict` [`str`]
A mapping containing message fields.
Returns
-------
model : `NextVisitModel`
An object containing the fields in the message.
"""
# Message may contain fields that aren't in NextVisitModel
return NextVisitModel(
salIndex=message["salIndex"],
scriptSalIndex=message["scriptSalIndex"],
instrument=message["instrument"],
groupId=message["groupId"],
coordinateSystem=message["coordinateSystem"],
position=message["position"],
startTime=message["startTime"],
rotationSystem=message["rotationSystem"],
cameraAngle=message["cameraAngle"],
filters=message["filters"],
dome=message["dome"],
duration=message["duration"],
nimages=message["nimages"],
survey=message["survey"],
totalCheckpoints=message["totalCheckpoints"],
private_sndStamp=message["private_sndStamp"],
)

def add_detectors(
self,
active_detectors: list,
Expand Down Expand Up @@ -161,6 +195,102 @@ class Submission:
"""The messages to send to ``url`` (collection [`dict`])."""


class UnsupportedMessageError(RuntimeError):
"""Exception raised if there is no Prompt Processing instance for a given
nextVisit message.
"""
pass


def is_handleable(message: dict[str, typing.Any], expire: float) -> bool:
"""Test whether a nextVisit message has enough data to be handled by
fan-out.
This function emits explanatory logs as a side effect.
Parameters
----------
message : `dict` [`str`]
An unpacked mapping of message fields.
expire : `float`
The maximum age, in seconds, that a message can still be handled.
Returns
-------
handleable : `bool`
`True` is the message can be processed, `False` otherwise.
"""
if not message["instrument"]:
logging.info("Message does not have an instrument. Assuming it's not an observation.")
return False

# efdStamp is visit publication, in seconds since 1970-01-01 UTC
if message["private_efdStamp"]:
published = message["private_efdStamp"]
age = round(time.time() - published) # Microsecond precision is distracting
if age > expire:
logging.warning("Message published on %s UTC is %s old, ignoring.",
time.ctime(published),
datetime.timedelta(seconds=age)
)
return False
else:
logging.warning("Message does not have private_efdStamp, can't determine age.")
return True


def make_fanned_out_messages(message: NextVisitModel,
instruments: collections.abc.Mapping[str, InstrumentConfig],
gauges: collections.abc.Mapping[str, Metrics],
hsc_upload_detectors: collections.abc.Mapping[int,
collections.abc.Collection[int]]
) -> Submission:
"""Create appropriate fanned-out messages for an incoming message.
Parameters
----------
message : `NextVisitModel`
The message to fan out.
instruments : mapping [`str`, `InstrumentConfig`]
A mapping from instrument name to configuration information.
gauges : mapping [`str`, `Metrics`]
A mapping from instrument name to metrics for that instrument.
hsc_upload_detectors : mapping [`int`, collection [`int`]]
A mapping from HSC-Cosmos visit to the supported detectors for that visit.
Returns
-------
send_info : `Submission`
The fanned out messages, along with where to send them.
Raises
------
UnsupportedMessageError
Raised if ``message`` cannot be fanned-out or sent.
"""
match message.instrument:
case "LSSTComCam" | "LSSTCam" as instrument:
raise UnsupportedMessageError(f"Ignore {instrument} message {message}"
" as the prompt service for this is not yet deployed.")
case "HSC":
# HSC has extra active detector configurations just for the
# upload.py test.
match message.salIndex:
case 999: # HSC datasets from using upload_from_repo.py
gauges["HSC"].total_received.inc()
return fan_out(message, instruments["HSC"])
case visit if visit in hsc_upload_detectors: # upload.py test datasets
gauges["HSC"].total_received.inc()
return fan_out_hsc(message, instruments["HSC"], hsc_upload_detectors[visit])
case _:
raise UnsupportedMessageError(f"No matching case for HSC salIndex {message.salIndex}")
case instrument if instrument in instruments:
gauges[instrument].total_received.inc()
return fan_out(message, instruments[instrument])
case _:
raise UnsupportedMessageError(f"no matching case for instrument {message.instrument}.")


def fan_out(next_visit, inst_config):
"""Prepare fanned-out messages for sending to the Prompt Processing service.
Expand Down Expand Up @@ -199,6 +329,61 @@ def fan_out_hsc(next_visit, inst_config, detectors):
return Submission(inst_config.url, next_visit.add_detectors(detectors))


def dispatch_fanned_out_messages(client: httpx.AsyncClient,
topic: str,
tasks: collections.abc.MutableSet[asyncio.Task],
send_info: Submission,
gauges: collections.abc.Mapping[str, Metrics],
):
"""Package and send the fanned-out messages to Prompt Processing.
Parameters
----------
client : `httpx.AsyncClient`
The client to which to upload the messages.
topic : `str`
The topic to which to upload the messages.
tasks : set [`asyncio.Task`]
Collection for holding the requests.
send_info : `Submission`
The data and address to submit.
gauges : mapping [`str`, `Metrics`]
A mapping from instrument name to metrics for that instrument.
"""
try:
attributes = {
"type": "com.example.kafka",
"source": topic,
}

for fan_out_message in send_info.fan_out_messages:
data = fan_out_message
data_json = json.dumps(data)

logging.info(f"data after json dump {data_json}")
event = CloudEvent(attributes, data_json)
headers, body = to_structured(event)
info = {
key: data[key] for key in ["instrument", "groupId", "detector"]
}

task = asyncio.create_task(
knative_request(
gauges[fan_out_message["instrument"]].in_process,
client,
send_info.url,
headers,
body,
str(info),
)
)
tasks.add(task)
task.add_done_callback(tasks.discard)

except ValueError:
logging.exception("Error while sending fanned-out messages.")


@REQUEST_TIME.time()
async def knative_request(
in_process_requests_gauge,
Expand Down Expand Up @@ -315,118 +500,21 @@ async def main() -> None:
next_visit_message_initial = await deserializer.deserialize(
data=msg.value
)

logging.info(f"message deserialized {next_visit_message_initial}")

if not next_visit_message_initial["message"]["instrument"]:
logging.info("Message does not have an instrument. Assuming "
"it's not an observation.")
if not is_handleable(next_visit_message_initial["message"], expire):
continue

# efdStamp is visit publication, in seconds since 1970-01-01 UTC
if next_visit_message_initial["message"]["private_efdStamp"]:
published = next_visit_message_initial["message"]["private_efdStamp"]
age = round(time.time() - published) # Microsecond precision is distracting
if age > expire:
logging.warning("Message published on %s UTC is %s old, ignoring.",
time.ctime(published),
datetime.timedelta(seconds=age)
)
continue
else:
logging.warning("Message does not have private_efdStamp, can't determine age.")

next_visit_message_updated = NextVisitModel(
salIndex=next_visit_message_initial["message"]["salIndex"],
scriptSalIndex=next_visit_message_initial["message"][
"scriptSalIndex"
],
instrument=next_visit_message_initial["message"]["instrument"],
groupId=next_visit_message_initial["message"]["groupId"],
coordinateSystem=next_visit_message_initial["message"][
"coordinateSystem"
],
position=next_visit_message_initial["message"]["position"],
startTime=next_visit_message_initial["message"]["startTime"],
rotationSystem=next_visit_message_initial["message"][
"rotationSystem"
],
cameraAngle=next_visit_message_initial["message"][
"cameraAngle"
],
filters=next_visit_message_initial["message"]["filters"],
dome=next_visit_message_initial["message"]["dome"],
duration=next_visit_message_initial["message"]["duration"],
nimages=next_visit_message_initial["message"]["nimages"],
survey=next_visit_message_initial["message"]["survey"],
totalCheckpoints=next_visit_message_initial["message"][
"totalCheckpoints"
],
private_sndStamp=next_visit_message_initial["message"][
"private_sndStamp"
],
next_visit_message_updated = NextVisitModel.from_raw_message(
next_visit_message_initial["message"]
)

match next_visit_message_updated.instrument:
case "LSSTComCam" | "LSSTCam" as instrument:
logging.info(f"Ignore {instrument} message {next_visit_message_updated}"
" as the prompt service for this is not yet deployed.")
continue
case "HSC":
# HSC has extra active detector configurations just for the
# upload.py test.
match next_visit_message_updated.salIndex:
case 999: # HSC datasets from using upload_from_repo.py
gauges["HSC"].total_received.inc()
send_info = fan_out(next_visit_message_updated, instruments["HSC"])
case visit if visit in hsc_upload_detectors: # upload.py test datasets
gauges["HSC"].total_received.inc()
send_info = fan_out_hsc(next_visit_message_updated, instruments["HSC"],
hsc_upload_detectors[visit])
case _:
raise RuntimeError("No matching case for HSC "
f"salIndex {next_visit_message_updated.salIndex}")
case instrument if instrument in supported_instruments:
gauges[instrument].total_received.inc()
send_info = fan_out(next_visit_message_updated, instruments[instrument])
case _:
raise RuntimeError(
f"no matching case for instrument {next_visit_message_updated.instrument}."
)

try:
attributes = {
"type": "com.example.kafka",
"source": topic,
}

for fan_out_message in send_info.fan_out_messages:
data = fan_out_message
data_json = json.dumps(data)

logging.info(f"data after json dump {data_json}")
event = CloudEvent(attributes, data_json)
headers, body = to_structured(event)
info = {
key: data[key] for key in ["instrument", "groupId", "detector"]
}

task = asyncio.create_task(
knative_request(
gauges[fan_out_message["instrument"]].in_process,
client,
send_info.url,
headers,
body,
str(info),
)
)
tasks.add(task)
task.add_done_callback(tasks.discard)

except ValueError as e:
logging.info("Error ", e)

send_info = make_fanned_out_messages(next_visit_message_updated,
instruments,
gauges,
hsc_upload_detectors,
)
dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges)
finally:
await consumer.stop()

Expand Down

0 comments on commit 3fe4916

Please sign in to comment.