Skip to content

Commit 0b80688

Browse files
committed
Add confirmed publishing.
1 parent 2e9e00b commit 0b80688

File tree

10 files changed

+37
-21
lines changed

10 files changed

+37
-21
lines changed

.github/workflows/downstream.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ name: Downstream
22

33
on:
44
workflow_dispatch:
5-
branches:
6-
- trunk
75
push:
86
branches:
97
- trunk

.github/workflows/tests.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ jobs:
99
- uses: actions/checkout@v1
1010
- uses: ruby/setup-ruby@v1
1111
with:
12-
ruby-version: 2.6.3
12+
ruby-version: 2.6.9
1313
- name: Cache Gems
14-
uses: actions/cache@v1
14+
uses: actions/cache@v4
1515
with:
1616
path: vendor/bundle
17-
key: ${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }}
17+
key: ${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }}
1818
restore-keys: |
19-
${{ runner.os }}-acapi-2.6.3-${{ hashFiles('**/Gemfile.lock') }}
19+
${{ runner.os }}-acapi-2.6.9-${{ hashFiles('**/Gemfile.lock') }}
2020
- name: Install Gems
2121
run: |
2222
sudo gem install bundler -v '1.17.3'

.ruby-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.6.6
1+
2.6.9

Gemfile.lock

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ GEM
5555
timers (~> 4.0.0)
5656
coderay (1.1.0)
5757
concurrent-ruby (1.1.6)
58-
crass (1.0.4)
58+
crass (1.0.6)
5959
diff-lcs (1.3)
6060
erubis (2.7.0)
6161
ffi (1.10.0)
@@ -85,20 +85,21 @@ GEM
8585
celluloid (~> 0.16.0)
8686
rb-fsevent (>= 0.9.3)
8787
rb-inotify (>= 0.9)
88-
loofah (2.2.2)
88+
loofah (2.25.0)
8989
crass (~> 1.0.2)
90-
nokogiri (>= 1.5.9)
90+
nokogiri (>= 1.12.0)
9191
lumberjack (1.0.9)
9292
mail (2.6.3)
9393
mime-types (>= 1.16, < 3)
9494
method_source (0.8.2)
9595
mime-types (2.99.3)
96-
mini_portile2 (2.4.0)
96+
mini_portile2 (2.8.9)
9797
minitest (5.14.0)
9898
multi_json (1.13.1)
9999
nenv (0.2.0)
100-
nokogiri (1.9.1)
101-
mini_portile2 (~> 2.4.0)
100+
nokogiri (1.13.10)
101+
mini_portile2 (~> 2.8.0)
102+
racc (~> 1.4)
102103
notiffany (0.0.6)
103104
nenv (~> 0.1)
104105
shellany (~> 0.0)
@@ -111,6 +112,7 @@ GEM
111112
pry-remote (0.1.8)
112113
pry (~> 0.9)
113114
slop (~> 3.0)
115+
racc (1.8.1)
114116
rack (1.6.9)
115117
rack-test (0.6.3)
116118
rack (>= 1.0)

lib/acapi/amqp/client.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ def initialize(chan, q)
1212
@argument_errors = []
1313
@bad_argument_queue = "acapi.error.middleware.service.bad_arguments"
1414
@processing_failed_queue = "acapi.error.middleware.service.processing_failed"
15+
@republish_channel = @channel.connection.create_channel
16+
@republish_channel.confirm_select
17+
@republish_queue = @republish_channel.queue(@queue.name, @queue.options)
1518
@exit_after_work = false
1619
end
1720

@@ -102,7 +105,8 @@ def subscribe(opts = {})
102105
publish_processing_failed(delivery_info, properties, payload, e)
103106
else
104107
new_properties = redelivery_properties(existing_retry_count, delivery_info, properties)
105-
queue.publish(payload, new_properties)
108+
@republish_queue.publish(payload, new_properties)
109+
@republish_channel.wait_for_confirms
106110
channel.acknowledge(delivery_info.delivery_tag, false)
107111
end
108112
rescue => e

lib/acapi/amqp/requestor.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@ def initialize(conn)
1111
def request(properties, payload, timeout = 15)
1212
delivery_info, r_props, r_payload = [nil, nil, nil]
1313
channel = @connection.create_channel
14+
p_channel = @connection.create_channel
1415
temp_queue = channel.queue("", :exclusive => true)
1516
channel.prefetch(1)
16-
request_exchange = channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true)
17+
p_channel.confirm_select
18+
request_exchange = p_channel.fanout(Rails.application.config.acapi.remote_request_exchange, :durable => true)
1719
request_exchange.publish(payload, properties.dup.merge({ :reply_to => temp_queue.name, :persistent => true }))
20+
p_channel.wait_for_confirms
1821
delivery_info, r_props, r_payload = [nil, nil, nil]
1922
begin
2023
Timeout::timeout(timeout) do
@@ -26,6 +29,7 @@ def request(properties, payload, timeout = 15)
2629
end
2730
ensure
2831
temp_queue.delete
32+
p_channel.close
2933
channel.close
3034
end
3135
[delivery_info, r_props, r_payload]

lib/acapi/amqp/responder.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ module Amqp
33
module Responder
44
def with_response_exchange(connection)
55
channel = connection.create_channel
6+
channel.confirm_select
67
publish_exchange = channel.default_exchange
78
yield publish_exchange
9+
channel.wait_for_confirms
810
channel.close
911
end
1012
end

lib/acapi/local_amqp_publisher.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ def log(name, started, finished, unique_id, data = {})
6161
end
6262
msg = Acapi::Amqp::OutMessage.new(@app_id, name, finished, finished, unique_id, data)
6363
@exchange.publish(*msg.to_message_properties)
64+
@p_channel.wait_for_confirms
6465
end
6566

6667
def open_connection_if_needed
@@ -69,7 +70,9 @@ def open_connection_if_needed
6970
@connection.start
7071
@channel = @connection.create_channel
7172
@queue = @channel.queue(QUEUE_NAME, {:durable => true})
72-
@exchange = @channel.fanout(EXCHANGE_NAME, {:durable => true})
73+
@p_channel = @connection.create_channel
74+
@p_channel.confirm_select
75+
@exchange = @p_channel.fanout(EXCHANGE_NAME, {:durable => true})
7376
@queue.bind(@exchange, {})
7477
end
7578

lib/acapi/requestor.rb

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ def request(req_name, payload,timeout=1)
3232
end
3333

3434
def open_connection_for_request
35-
if !@connection
36-
@connection = Bunny.new(@uri, :heartbeat => 15)
37-
@connection.start
38-
end
35+
return if @connection.present? && @connection.connected?
36+
@connection = Bunny.new(@uri, :heartbeat => 15)
37+
@connection.start
3938
end
4039

4140
def reconnect!

spec/lib/acapi/local_amqp_publisher_spec.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
allow(session).to receive(:create_channel).and_return(channel)
3434
allow(channel).to receive(:queue).with(forwarding_queue_name, {:durable => true}).and_return(queue)
3535
allow(channel).to receive(:fanout).with(forwarding_exchange_name, {:durable => true}).and_return(exchange)
36+
allow(channel).to receive(:confirm_select)
37+
allow(channel).to receive(:wait_for_confirms)
3638
allow(queue).to receive(:bind).with(exchange, {})
3739
end
3840

@@ -123,10 +125,12 @@
123125
it "supports reconnection for after_fork" do
124126
#publish to force the connection
125127
allow(exchange).to receive(:publish)
128+
allow(channel).to receive(:confirm_select)
129+
allow(channel).to receive(:wait_for_confirms)
126130
expect(session).to receive(:close)
127131
expect(Bunny).to receive(:new).and_return(session)
128132
expect(session).to receive(:start)
129-
expect(session).to receive(:create_channel).and_return(channel)
133+
allow(session).to receive(:create_channel).and_return(channel)
130134
expect(channel).to receive(:queue).with(forwarding_queue_name, {:durable=> true}).and_return(queue)
131135
expect(channel).to receive(:fanout).with(forwarding_exchange_name, {:durable => true}).and_return(exchange)
132136
expect(queue).to receive(:bind).with(exchange, {})

0 commit comments

Comments
 (0)