-
-
Notifications
You must be signed in to change notification settings - Fork 109
Description
I'm new to Rust and currently using lapin version 2.5.0 to connect to RabbitMQ for message consumption. I've successfully set up connections and channels, but I'm encountering issues with maintaining the connection and handling automatic reconnection when the connection drops (e.g., due to network instability or a RabbitMQ restart).
One of the error logs I see is:
2024-11-14T10:35:33.300071Z WARN job::services::amqp: 301: Error receiving message: ProtocolError(AMQPError { kind: Hard(CONNECTIONFORCED), message: ShortString("CONNECTION_FORCED - Closed via Web management") })
main.rs:
let amqp_url = env::var("AMQP_URL").expect("AMQP_URL must be set")
let amqp_conn = amqp::create_connection(&amqp_url).await
let channel_set = amqp::create_channel(&amqp_conn).await
tokio::spawn(async move {
amqp::setup_consumer(
&channel_set,
&"test".to_string(),
&"next".to_string(),
&queue_trigger,
)
.await
})
amqp.rs:
pub async fn create_connection(url: &str) -> Connection {
Connection::connect(url, ConnectionProperties::default())
.await
.expect("Failed to create connection")
}
pub async fn create_channel(connection: &Connection) -> Channel {
connection
.create_channel()
.await
.expect("Failed to create channel")
}
pub async fn setup_consumer(channel: &Channel, exchange: &String, queue_name: &String) {
let consumer = channel
.basic_consume(
queue_name,
queue_name,
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("Failed to start consumer");
consumer
.for_each(|delivery| async {
match delivery {
Ok(delivery) => {
let message = String::from_utf8_lossy(&delivery.data);
info!("Render received payload for upload: {}", message);
acknowledge_message(&delivery, "Render Consumer".to_string()).await;
}
Err(error) => {
warn!("Error receiving message: {:?}", error);
}
}
})
.await;
}
Questions
What’s the best way to implement automatic reconnection in lapin?
I'd like to avoid having to manually reinitialize Connection and Channels every time the connection drops. Is there a recommended way to handle this within lapin?
Is there an option to set up a handler to automatically reconnect and recreate channels and consumers?
I found that on_error can detect errors, but it still requires custom logic for managing reconnections. Is there a pattern or approach that the team recommends for handling reconnections in lapin?
Since I'm new to Rust, I’d really appreciate any advice or examples you can provide. Thank you very much for your time and assistance!