Skip to content

How to Handle Automatic Reconnection in Lapin When Connection is Lost? #420

@nitsarut1

Description

@nitsarut1

I'm new to Rust and currently using lapin version 2.5.0 to connect to RabbitMQ for message consumption. I've successfully set up connections and channels, but I'm encountering issues with maintaining the connection and handling automatic reconnection when the connection drops (e.g., due to network instability or a RabbitMQ restart).

One of the error logs I see is:

2024-11-14T10:35:33.300071Z  WARN job::services::amqp: 301: Error receiving message: ProtocolError(AMQPError { kind: Hard(CONNECTIONFORCED), message: ShortString("CONNECTION_FORCED - Closed via Web management") })
main.rs:
let amqp_url = env::var("AMQP_URL").expect("AMQP_URL must be set")
let amqp_conn = amqp::create_connection(&amqp_url).await
let channel_set = amqp::create_channel(&amqp_conn).await
tokio::spawn(async move {
    amqp::setup_consumer(
        &channel_set,
        &"test".to_string(),
        &"next".to_string(),
        &queue_trigger,
    )
    .await
})
amqp.rs:
pub async fn create_connection(url: &str) -> Connection {
    Connection::connect(url, ConnectionProperties::default())
        .await
        .expect("Failed to create connection")
}

pub async fn create_channel(connection: &Connection) -> Channel {
    connection
        .create_channel()
        .await
        .expect("Failed to create channel")
}

pub async fn setup_consumer(channel: &Channel, exchange: &String, queue_name: &String) {
    let consumer = channel
        .basic_consume(
            queue_name,
            queue_name,
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await
        .expect("Failed to start consumer");

    consumer
        .for_each(|delivery| async {
            match delivery {
                Ok(delivery) => {
                    let message = String::from_utf8_lossy(&delivery.data);
                    info!("Render received payload for upload: {}", message);
                    acknowledge_message(&delivery, "Render Consumer".to_string()).await;
                }
                Err(error) => {
                    warn!("Error receiving message: {:?}", error);
                }
            }
        })
        .await;
}

Questions
What’s the best way to implement automatic reconnection in lapin?

I'd like to avoid having to manually reinitialize Connection and Channels every time the connection drops. Is there a recommended way to handle this within lapin?
Is there an option to set up a handler to automatically reconnect and recreate channels and consumers?

I found that on_error can detect errors, but it still requires custom logic for managing reconnections. Is there a pattern or approach that the team recommends for handling reconnections in lapin?
Since I'm new to Rust, I’d really appreciate any advice or examples you can provide. Thank you very much for your time and assistance!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions