Skip to content

Commit

Permalink
add serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeck1 committed Oct 1, 2024
1 parent eca3cfc commit e9e9857
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,17 @@ def detector_load(conf: dict, instrument: str) -> list[int]:
active_detectors.append(k)
return active_detectors

def serializer(value):
return json.dumps(value).encode()

async def fan_out_msg(
producer,
fan_out_topic,
data
):
await producer.start()
logging.info(f"sending msg {data}")
await producer.send_and_wait(fan_out_topic, b(json.dumps(data)))
await producer.send_and_wait(fan_out_topic, data)
await producer.stop()


Expand Down Expand Up @@ -369,7 +372,8 @@ async def main() -> None:
try:
# https://aiokafka.readthedocs.io/en/stable/producer.html
producer = AIOKafkaProducer(
bootstrap_servers=prompt_processing_kafka_cluster
bootstrap_servers=prompt_processing_kafka_cluster,
value_serializer=serializer
)
await producer.start()
logging.info ("started kafka producer")
Expand Down

0 comments on commit e9e9857

Please sign in to comment.