Skip to content

Commit 058494d

Browse files
committed
[bxrabbitmq] + Publisher Confirm
1 parent e6116cf commit 058494d

File tree

8 files changed

+81
-36
lines changed

8 files changed

+81
-36
lines changed

examples/tutorials/t1_receive.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ void receive ()
3232
{
3333
std::clog << "\nTUTORIAL 1 : 'Hello World' - receive\n\n" ;
3434
rabbitmq::connection_parameters c_par;
35-
c_par.host = "caerabbitmq.in2p3.fr";
36-
c_par.port = 5671;
37-
c_par.login = "guest;
35+
c_par.host = "localhost";
36+
c_par.port = 5672;
37+
c_par.login = "guest";
3838
c_par.passwd = "guest";
3939
rabbitmq::connection con (c_par);
4040
if (con.is_ok ()) {

examples/tutorials/t5_emit_log_topic.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ int main (int argc, char** argv)
3333
void emit_log_topic (const std::string routing_key_, const std::string message_)
3434
{
3535
std::clog << "\nTUTORIAL 5 : 'Topics' - emit log topic\n\n" ;
36+
const bool publisher_confirm = true;
3637
rabbitmq::connection_parameters c_par;
37-
rabbitmq::connection con (c_par);
38+
rabbitmq::connection con (c_par, publisher_confirm);
3839
if (con.is_ok ()) {
3940
rabbitmq::channel & chan = con.grab_channel ();
4041
rabbitmq::exchange_parameters x_par;

examples/tutorials/t6_rpc_client.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ int main (int argc, char** argv)
3333
void rpc_client (const unsigned int num)
3434
{
3535
std::clog << "\nTUTORIAL 6 : 'RPC' - rpc client\n\n" ;
36+
const bool publisher_confirm = true;
3637
rabbitmq::connection_parameters c_par;
37-
rabbitmq::connection con (c_par);
38+
rabbitmq::connection con (c_par, publisher_confirm);
3839
if (con.is_ok ()) {
3940
rabbitmq::channel & chan = con.grab_channel ();
4041
std::string routing_key;

src/rabbitmq/channel.cc

Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ namespace rabbitmq {
2929
static const basic_properties props_from_amqp (const amqp_basic_properties_t & props_);
3030

3131
public:
32-
impl ();
32+
impl (const bool publisher_confirm = false);
3333
~impl (); // TODO
3434

3535
void connect (const connection_parameters & params, const unsigned int num);
@@ -47,9 +47,7 @@ namespace rabbitmq {
4747
void basic_publish (const std::string & exchange_,
4848
const std::string & routing_key_,
4949
const std::string & body_,
50-
const basic_properties & props_,
51-
const bool & mandatory_,
52-
const bool & immediate_);
50+
const basic_properties & props_);
5351

5452
void basic_consume (const std::string & queue_,
5553
const std::string & consumer_tag_,
@@ -80,14 +78,17 @@ namespace rabbitmq {
8078
bool _channel_ok_;
8179
pid_t _consuming_;
8280
bool _acknowledge_;
81+
bool _publisher_confirm_;
8382
};
8483

8584
/*******************************************************************************/
8685

8786

88-
channel::channel (const connection & con_, const unsigned int num_)
87+
channel::channel (const connection & con_,
88+
const unsigned int num_,
89+
const bool publisher_confirm_)
8990
{
90-
_pimpl_ = new impl ();
91+
_pimpl_ = new impl (publisher_confirm_);
9192
_pimpl_->connect (con_.get_connection_parameters (), num_);
9293
return;
9394
}
@@ -123,11 +124,9 @@ namespace rabbitmq {
123124
void channel::basic_publish (const std::string exchange_,
124125
const std::string routing_key_,
125126
const std::string body_,
126-
const basic_properties & props_,
127-
const bool mandatory_,
128-
const bool immediate_)
127+
const basic_properties & props_)
129128
{
130-
_pimpl_->basic_publish (exchange_, routing_key_, body_, props_, mandatory_, immediate_);
129+
_pimpl_->basic_publish (exchange_, routing_key_, body_, props_);
131130
}
132131

133132
void channel::basic_consume (const std::string queue_,
@@ -324,11 +323,12 @@ namespace rabbitmq {
324323
return props;
325324
}
326325

327-
channel::impl::impl ()
326+
channel::impl::impl (const bool publisher_confirm_)
328327
{
329-
_login_ok_ = false;
330-
_channel_ok_ = false;
331-
_consuming_ = 0;
328+
_login_ok_ = false;
329+
_channel_ok_ = false;
330+
_consuming_ = 0;
331+
_publisher_confirm_ = publisher_confirm_;
332332
}
333333

334334
void channel::impl::connect (const connection_parameters & params_, const unsigned int num_)
@@ -379,6 +379,12 @@ namespace rabbitmq {
379379
if (reply.reply_type != AMQP_RESPONSE_NORMAL) {
380380
throw ::rabbitmq::exception ("Unable to create Rabbit channel");
381381
}
382+
if (_publisher_confirm_) {
383+
amqp_confirm_select_ok_t* ok = amqp_confirm_select (_amqp_con_, _amqp_ch_);
384+
if (ok == NULL) {
385+
throw ::rabbitmq::exception ("Unable to select publisher confirm mode");
386+
}
387+
}
382388
_channel_ok_ = true;
383389
}
384390

@@ -469,28 +475,63 @@ namespace rabbitmq {
469475
void channel::impl::basic_publish (const std::string & exchange_,
470476
const std::string & routing_key_,
471477
const std::string & body_,
472-
const basic_properties & props_,
473-
const bool & mandatory_,
474-
const bool & immediate_)
478+
const basic_properties & props_)
475479
{
476480
if (_channel_ok_) {
477481
amqp_basic_properties_t props = props_to_amqp (props_);
478482
int err = amqp_basic_publish (_amqp_con_,
479483
_amqp_ch_,
480484
str_to_amqp (exchange_),
481485
str_to_amqp (routing_key_),
482-
mandatory_,
483-
immediate_,
486+
_publisher_confirm_, // 'mandatory'
487+
0, // 'immediate' no more supported by amqp
484488
& props,
485489
str_to_amqp (body_));
486490
if (err != AMQP_STATUS_OK) {
487-
throw ::rabbitmq::exception ("basic_publish failure");
488-
// todo precisions
491+
throw ::rabbitmq::exception ("basic_publish failure with code : " + err);
492+
}
493+
if (_publisher_confirm_) {
494+
bool basic_return = false;
495+
amqp_frame_t frame;
496+
while (1) {
497+
// std::clog << "--- confirm steps ---" << std::endl;
498+
err = amqp_simple_wait_frame (_amqp_con_, &frame);
499+
if (err != AMQP_STATUS_OK) {
500+
throw ::rabbitmq::exception ("basic_publish confirm failure with code : " + err);
501+
} else {
502+
if (frame.frame_type == AMQP_FRAME_METHOD) {
503+
if (frame.payload.method.id == AMQP_BASIC_ACK_METHOD) {
504+
// std::clog << "basic_publish confirm1 ACK : channel=" << frame.channel << std::endl;
505+
if (basic_return) {
506+
throw ::rabbitmq::exception ("basic_publish confirm 'return'");
507+
}
508+
break;
509+
} else if (frame.payload.method.id == AMQP_BASIC_RETURN_METHOD) {
510+
basic_return = true;
511+
// std::clog << "basic_publish confirm1 RETURN : channel=" << frame.channel << std::endl;
512+
} else if (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD) {
513+
// std::clog << "basic_publish confirm1 CLOSE CHANNEL : channel=" << frame.channel << std::endl;
514+
throw ::rabbitmq::exception ("basic_publish confirm 'close channel'");
515+
} else if (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD) {
516+
// std::clog << "basic_publish confirm1 CLOSE CONNECTION : channel=" << frame.channel << std::endl;
517+
throw ::rabbitmq::exception ("basic_publish confirm 'close connection'");
518+
} else {
519+
// std::clog << "basic_publish confirm1 METHOD : channel=" << frame.channel << " method_id=" << frame.payload.method.id << std::endl;
520+
throw ::rabbitmq::exception ("basic_publish confirm methode code : " + frame.payload.method.id);
521+
}
522+
}
523+
if (frame.frame_type == AMQP_FRAME_HEADER) {
524+
// std::clog << "basic_publish confirm2 HEADER" << std::endl;
525+
}
526+
if (frame.frame_type == AMQP_FRAME_BODY) {
527+
// std::clog << "basic_publish confirm3 BODY" << std::endl;
528+
}
529+
}
530+
}
489531
}
490532
} else {
491-
throw ::rabbitmq::exception ("no channel for basic_publish");
533+
throw ::rabbitmq::exception ("basic_publish channel isn't ok");
492534
}
493-
// todo return
494535
}
495536

496537
void channel::impl::basic_consume (const std::string & queue_,

src/rabbitmq/channel.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ namespace rabbitmq {
2222

2323
public:
2424
/// Constructor & co
25-
channel (const connection & con_, const unsigned int num_);
25+
channel (const connection & con_,
26+
const unsigned int num_,
27+
const bool publisher_confirm_ = false);
2628
// Ctor & co : forbidden
2729
~channel ();
2830
channel () = delete;
@@ -42,9 +44,7 @@ namespace rabbitmq {
4244
void basic_publish (const std::string exchange_,
4345
const std::string routing_key_,
4446
const std::string body_,
45-
const basic_properties & props_ = basic_properties::default_basic_properties (),
46-
const bool mandatory_ = false,
47-
const bool immediate_ = false);
47+
const basic_properties & props_ = basic_properties::default_basic_properties ());
4848

4949
void basic_consume (const std::string queue_,
5050
const std::string consumer_tag_ = "",

src/rabbitmq/connection.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
namespace rabbitmq {
99

1010

11-
connection::connection (const connection_parameters & params_)
11+
connection::connection (const connection_parameters & params_,
12+
const bool publisher_confirm_)
1213
{
1314
_params_ = params_;
14-
_chan1_.reset (new channel (*this, 1));
15+
_chan1_.reset (new channel (*this, 1, publisher_confirm_));
1516
return;
1617
}
1718

src/rabbitmq/connection.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ namespace rabbitmq {
2525

2626
public:
2727
/// Constructor & co
28-
connection (const connection_parameters & params_);
28+
connection (const connection_parameters & params_,
29+
const bool publisher_confirm_ = false);
2930
// forbidden
3031
connection () = delete;
3132
connection (const connection & con_) = delete;

src/rabbitmq/testing/test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ namespace rabbitmq {
6363
props.set_content_type ("text/plain");
6464
props.set_delivery_mode (2); // persistant
6565
for (int i=0; i<10; i++) {
66-
chan.basic_publish ("", "q_hello", "Hello_World_" + std::to_string (num + i), props, false, false);
66+
chan.basic_publish ("", "q_hello", "Hello_World_" + std::to_string (num + i), props);
6767
}
6868
std::clog << "publish to q_hello ..." << std::endl;
6969
} else {

0 commit comments

Comments
 (0)