Skip to content

Commit

Permalink
Implement RabbitMQ dead-letter queue handling and retry mechanism; re…
Browse files Browse the repository at this point in the history
…factor callback function to explicitly acknowledge completion
  • Loading branch information
mdrxy committed Nov 7, 2024
1 parent 766d215 commit 313051e
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 9 deletions.
75 changes: 66 additions & 9 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

import os
import threading
import logging
import json
from datetime import datetime, timezone
Expand Down Expand Up @@ -73,7 +74,7 @@ def connect_to_postgres():
return None


def callback(_ch, _method, _properties, body):
def callback(ch, method, _properties, body):
"""Callback function to process messages from the RabbitMQ queue."""
logger.info("Callback triggered.")
try:
Expand All @@ -92,7 +93,7 @@ def callback(_ch, _method, _properties, body):
location_values = []
for loc_type in ["FromCity", "FromState", "FromCountry", "FromZip"]:
if message.get(loc_type):
location_columns.append(f"\"{loc_type}\"")
location_columns.append(f'"{loc_type}"')
location_values.append(message.get(loc_type))

# Prepare additional columns and values for media items
Expand All @@ -103,11 +104,11 @@ def callback(_ch, _method, _properties, body):
media_url_key = f"MediaUrl{i}"

if message.get(media_type_key):
media_columns.append(f"\"MediaContentType{i}\"")
media_columns.append(f'"MediaContentType{i}"')
media_values.append(message.get(media_type_key))

if message.get(media_url_key):
media_columns.append(f"\"MediaUrl{i}\"")
media_columns.append(f'"MediaUrl{i}"')
media_values.append(message.get(media_url_key))

# Combine static columns with dynamic columns
Expand Down Expand Up @@ -158,6 +159,7 @@ def callback(_ch, _method, _properties, body):

conn.commit()
logger.info("Inserted message into Postgres.")
ch.basic_ack(delivery_tag=method.delivery_tag)
except psycopg.errors.DatabaseError as db_error:
logger.error("Database error during insertion: %s", db_error)
finally:
Expand All @@ -179,15 +181,63 @@ def consume_messages():
try:
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=POSTGRES_QUEUE, durable=True)
channel.basic_consume(
queue=POSTGRES_QUEUE, on_message_callback=callback, auto_ack=True

# Declare the dead-letter exchange and queue
channel.exchange_declare(
exchange="dead_letter_exchange", exchange_type="direct"
)
channel.queue_declare(queue="dead_letter_queue", durable=True)
channel.queue_bind(
exchange="dead_letter_exchange", queue="dead_letter_queue"
)

channel.queue_declare(
queue=POSTGRES_QUEUE,
durable=True,
arguments={
"x-message-ttl": 60000, # TTL of 60 seconds
"x-dead-letter-exchange": "dead_letter_exchange",
},
)
channel.basic_consume(
queue=POSTGRES_QUEUE, on_message_callback=callback, auto_ack=False
) # auto_ack=False to ensure messages are not lost if the consumer crashes
logger.info("Now ready to consume messages.")
channel.start_consuming()
except pika.exceptions.AMQPConnectionError as e:
logger.error("Failed to connect to RabbitMQ: %s", e)
logger.info("Retrying in 5 seconds...")
logger.debug("Retrying in 5 seconds...")
time.sleep(5)


def retry_dead_messages():
"""Retry messages from the dead-letter queue."""
while True:
logger.debug("Attempting to connect to RabbitMQ for retry...")
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST, credentials=credentials
)
try:
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# Consume messages from the dead-letter queue
def retry_callback(ch, method, properties, body):
logger.info("Retrying message from dead-letter queue.")
ch.basic_publish(exchange="", routing_key=POSTGRES_QUEUE, body=body)
ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
queue="dead_letter_queue",
on_message_callback=retry_callback,
auto_ack=False,
)
logger.info("Now ready to retry messages from dead-letter queue.")
channel.start_consuming()
except pika.exceptions.AMQPConnectionError as e:
logger.error("Failed to connect to RabbitMQ for retry: %s", e)
logger.debug("Retrying in 5 seconds...")
time.sleep(5)


Expand All @@ -199,5 +249,12 @@ def hello_world():

if __name__ == "__main__":
logger.info("Starting Flask app and RabbitMQ consumer...")
consume_messages()

main_consumer_thread = threading.Thread(target=consume_messages)
main_consumer_thread.start()

# Start the retry consumer in a separate thread
retry_consumer_thread = threading.Thread(target=retry_dead_messages)
retry_consumer_thread.start()

app.run(host="0.0.0.0", port=APP_PORT)
37 changes: 37 additions & 0 deletions diagrams/sequence.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
@startuml Sequence Diagram
participant FlaskApp
participant Logger
participant Postgres
participant RabbitMQ

User -> FlaskApp: Start Application
activate FlaskApp
FlaskApp -> FlaskApp: consume_messages()
activate RabbitMQ

FlaskApp -> RabbitMQ: connect to RabbitMQ
alt Connection Successful
RabbitMQ -> RabbitMQ: start_consuming()
loop While Consuming
RabbitMQ -> RabbitMQ: callback()
activate RabbitMQ
RabbitMQ -> RabbitMQ: json.loads(body)
RabbitMQ -> Postgres: connect_to_postgres()
activate Postgres
alt Connection Successful
Postgres -> Postgres: Insert data into Postgres
Postgres -> Postgres: commit()
else Connection Failed
RabbitMQ -> RabbitMQ: Send to dead letter exchange to be requeued
end
deactivate Postgres
RabbitMQ -> RabbitMQ: ack()
RabbitMQ -> Postgres: close connection
deactivate RabbitMQ
end
else Connection Failed
RabbitMQ -> RabbitMQ: time.sleep(5)
end
deactivate RabbitMQ
deactivate FlaskApp
@enduml

0 comments on commit 313051e

Please sign in to comment.