diff --git a/app.py b/app.py index f0a8af0..99362da 100644 --- a/app.py +++ b/app.py @@ -4,6 +4,7 @@ """ import os +import threading import logging import json from datetime import datetime, timezone @@ -73,7 +74,7 @@ def connect_to_postgres(): return None -def callback(_ch, _method, _properties, body): +def callback(ch, method, _properties, body): """Callback function to process messages from the RabbitMQ queue.""" logger.info("Callback triggered.") try: @@ -92,7 +93,7 @@ def callback(_ch, _method, _properties, body): location_values = [] for loc_type in ["FromCity", "FromState", "FromCountry", "FromZip"]: if message.get(loc_type): - location_columns.append(f"\"{loc_type}\"") + location_columns.append(f'"{loc_type}"') location_values.append(message.get(loc_type)) # Prepare additional columns and values for media items @@ -103,11 +104,11 @@ def callback(_ch, _method, _properties, body): media_url_key = f"MediaUrl{i}" if message.get(media_type_key): - media_columns.append(f"\"MediaContentType{i}\"") + media_columns.append(f'"MediaContentType{i}"') media_values.append(message.get(media_type_key)) if message.get(media_url_key): - media_columns.append(f"\"MediaUrl{i}\"") + media_columns.append(f'"MediaUrl{i}"') media_values.append(message.get(media_url_key)) # Combine static columns with dynamic columns @@ -158,6 +159,7 @@ def callback(_ch, _method, _properties, body): conn.commit() logger.info("Inserted message into Postgres.") + ch.basic_ack(delivery_tag=method.delivery_tag) except psycopg.errors.DatabaseError as db_error: logger.error("Database error during insertion: %s", db_error) finally: @@ -179,15 +181,63 @@ def consume_messages(): try: connection = pika.BlockingConnection(parameters) channel = connection.channel() - channel.queue_declare(queue=POSTGRES_QUEUE, durable=True) - channel.basic_consume( - queue=POSTGRES_QUEUE, on_message_callback=callback, auto_ack=True + + # Declare the dead-letter exchange and queue + channel.exchange_declare( + exchange="dead_letter_exchange", exchange_type="direct" ) + channel.queue_declare(queue="dead_letter_queue", durable=True) + channel.queue_bind( + exchange="dead_letter_exchange", queue="dead_letter_queue" + ) + + channel.queue_declare( + queue=POSTGRES_QUEUE, + durable=True, + arguments={ + "x-message-ttl": 60000, # TTL of 60 seconds + "x-dead-letter-exchange": "dead_letter_exchange", + }, + ) + channel.basic_consume( + queue=POSTGRES_QUEUE, on_message_callback=callback, auto_ack=False + ) # auto_ack=False to ensure messages are not lost if the consumer crashes logger.info("Now ready to consume messages.") channel.start_consuming() except pika.exceptions.AMQPConnectionError as e: logger.error("Failed to connect to RabbitMQ: %s", e) - logger.info("Retrying in 5 seconds...") + logger.debug("Retrying in 5 seconds...") + time.sleep(5) + + +def retry_dead_messages(): + """Retry messages from the dead-letter queue.""" + while True: + logger.debug("Attempting to connect to RabbitMQ for retry...") + credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) + parameters = pika.ConnectionParameters( + host=RABBITMQ_HOST, credentials=credentials + ) + try: + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + # Consume messages from the dead-letter queue + def retry_callback(ch, method, properties, body): + logger.info("Retrying message from dead-letter queue.") + ch.basic_publish(exchange="", routing_key=POSTGRES_QUEUE, body=body) + ch.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_consume( + queue="dead_letter_queue", + on_message_callback=retry_callback, + auto_ack=False, + ) + logger.info("Now ready to retry messages from dead-letter queue.") + channel.start_consuming() + except pika.exceptions.AMQPConnectionError as e: + logger.error("Failed to connect to RabbitMQ for retry: %s", e) + logger.debug("Retrying in 5 seconds...") time.sleep(5) @@ -199,5 +249,12 @@ def hello_world(): if __name__ == "__main__": logger.info("Starting Flask app and RabbitMQ consumer...") - consume_messages() + + main_consumer_thread = threading.Thread(target=consume_messages) + main_consumer_thread.start() + + # Start the retry consumer in a separate thread + retry_consumer_thread = threading.Thread(target=retry_dead_messages) + retry_consumer_thread.start() + app.run(host="0.0.0.0", port=APP_PORT) diff --git a/diagrams/sequence.puml b/diagrams/sequence.puml new file mode 100644 index 0000000..c8a6211 --- /dev/null +++ b/diagrams/sequence.puml @@ -0,0 +1,37 @@ +@startuml Sequence Diagram +participant FlaskApp +participant Logger +participant Postgres +participant RabbitMQ + +User -> FlaskApp: Start Application +activate FlaskApp +FlaskApp -> FlaskApp: consume_messages() +activate RabbitMQ + +FlaskApp -> RabbitMQ: connect to RabbitMQ +alt Connection Successful + RabbitMQ -> RabbitMQ: start_consuming() + loop While Consuming + RabbitMQ -> RabbitMQ: callback() + activate RabbitMQ + RabbitMQ -> RabbitMQ: json.loads(body) + RabbitMQ -> Postgres: connect_to_postgres() + activate Postgres + alt Connection Successful + Postgres -> Postgres: Insert data into Postgres + Postgres -> Postgres: commit() + else Connection Failed + RabbitMQ -> RabbitMQ: Send to dead letter exchange to be requeued + end + deactivate Postgres + RabbitMQ -> RabbitMQ: ack() + RabbitMQ -> Postgres: close connection + deactivate RabbitMQ + end +else Connection Failed + RabbitMQ -> RabbitMQ: time.sleep(5) +end +deactivate RabbitMQ +deactivate FlaskApp +@enduml \ No newline at end of file