From 81439c61be949ebf46ff31d2199a38f7a35d3265 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Wed, 4 Jan 2023 18:31:56 -0700 Subject: [PATCH 01/12] Sketch out a client side muxer with a wildcard subscription --- lib/protobuf/nats/client.rb | 97 +++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index 4291618..01b12d7 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -250,6 +250,103 @@ def nats_request_with_two_responses(subject, data, opts) end end + if false + + def nats_request_with_two_responses(subject, data, opts) + # Wait for the ACK from the server + ack_timeout = opts[:ack_timeout] || 5 + # Wait for the protobuf response + timeout = opts[:timeout] || 60 + + nats = Protobuf::Nats.client_nats_connection + + # Cheap check first before synchronize + unless @resp_sub_prefix + synchronize do + # TODO: This needs to be global, yo. + start_request_muxer! unless @resp_sub_prefix + end + end + + # Publish message with the reply topic pointed at the response muxer. + token = nats.new_inbox + signal = @resp_sub.new_cond + @resp_sub.synchronize do + @resp_map[token][:signal] = signal + end + reply_to = "#{@resp_inbox_prefix}.#{token}" + nats.publish(subject, data, reply_to) + + # Wait for reply + + # Receive the first message + ::MonotonicTime::with_nats_timeout(ack_timeout) do + @resp_sub.synchronize do + signal.wait(ack_timeout) + end + rescue ::NATS::Timeout => e + return :ack_timeout + end + + # Check for a NACK + first_message = @resp_sub.synchronize { @resp_map[token][:response].shift } + return :nack if first_message.data == ::Protobuf::Nats::Messages::NACK + + # Receive the second message + ::MonotonicTime::with_nats_timeout(timeout) do + @resp_sub.synchronize do + signal.wait(timeout) + end + rescue ::NATS::Timeout => e + fail ::Protobuf::Nats::Errors::ResponseTimeout, formatted_service_and_method_name + end + + second_message = @resp_sub.synchronize { @resp_map[token][:response].shift } + + # Check messages + response = case ::Protobuf::Nats::Messages::ACK + when first_message.data then second_message.data + when second_message.data then first_message.data + else return :ack_timeout + end + + response + ensure + cleanup_muxer_topic(topic) if topic + end + + def cleanup_muxer_topic(topic) + @resp_sub.synchronize do + @resp_map.delete(token) + end + end + + def start_request_muxer! + nats = Protobuf::Nats.client_nats_connection + @resp_inbox_prefix = nats.new_inbox + @resp_map = Hash.new { |h,k| h[k] = { } } + @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*") + + Thread.new do + loop do + msg = @resp_sub.pending_queue.pop + next if msg.nil? + @resp_sub.synchronize do + # Decrease pending size since consumed already + @resp_sub.pending_size -= msg.data.size + end + token = msg.subject.split('.').last + + @resp_sub.synchronize do + future = @resp_map[token][:signal] + @resp_map[token][:response] ||= [] + @resp_map[token][:response] << msg + future.signal + end + end + end + end + else def nats_request_with_two_responses(subject, data, opts) From ee14969c52b00091f8fa12531a783e85ce0d8980 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Wed, 4 Jan 2023 19:19:59 -0700 Subject: [PATCH 02/12] Fix up a few things with the new ruby pure client --- lib/protobuf/nats/client.rb | 17 +++++++++----- lib/protobuf/nats/server.rb | 45 ++++++++++++++++++++++++++++++++----- protobuf-nats.gemspec | 2 +- spec/fake_nats_client.rb | 1 + 4 files changed, 53 insertions(+), 12 deletions(-) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index 01b12d7..2cf12ef 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -6,6 +6,9 @@ module Protobuf module Nats class Client < ::Protobuf::Rpc::Connectors::Base + + CLIENT_MUTEX = ::Mutex.new + # Structure to hold subscription and inbox to use within pool SubscriptionInbox = ::Struct.new(:subscription, :inbox) do def swap(sub_inbox) @@ -250,7 +253,7 @@ def nats_request_with_two_responses(subject, data, opts) end end - if false + elsif true def nats_request_with_two_responses(subject, data, opts) # Wait for the ACK from the server @@ -262,7 +265,7 @@ def nats_request_with_two_responses(subject, data, opts) # Cheap check first before synchronize unless @resp_sub_prefix - synchronize do + CLIENT_MUTEX.synchronize do # TODO: This needs to be global, yo. start_request_muxer! unless @resp_sub_prefix end @@ -303,6 +306,9 @@ def nats_request_with_two_responses(subject, data, opts) second_message = @resp_sub.synchronize { @resp_map[token][:response].shift } + require "pry"; binding.pry + + # Check messages response = case ::Protobuf::Nats::Messages::ACK when first_message.data then second_message.data @@ -312,10 +318,9 @@ def nats_request_with_two_responses(subject, data, opts) response ensure - cleanup_muxer_topic(topic) if topic - end + return if @resp_sub.nil? - def cleanup_muxer_topic(topic) + # Clean up @resp_sub.synchronize do @resp_map.delete(token) end @@ -349,6 +354,8 @@ def start_request_muxer! else + fail "barf" + def nats_request_with_two_responses(subject, data, opts) nats = Protobuf::Nats.client_nats_connection inbox = nats.new_inbox diff --git a/lib/protobuf/nats/server.rb b/lib/protobuf/nats/server.rb index da231be..bf4349c 100644 --- a/lib/protobuf/nats/server.rb +++ b/lib/protobuf/nats/server.rb @@ -6,11 +6,45 @@ module Protobuf module Nats + class SuperSubscriptionManager + def initialize(nats, &cb) + # Central queue used by all subscriptions + @pending_queue = ::SizedQueue.new(::NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT) + @subscriptions = [] + @nats = nats + + Thread.new do + loop do + msg = @pending_queue.pop + cb.call(msg.data, msg.reply) + end + end + end + + def queue_subscribe(name) + sub = @nats.subscribe(name, :queue => name) + + # Create a subscription but reset the pending queue to use a central pending queue. + # NOTE: This is a potential race condition. Chances of the round-trip message to an + # existing queue before this queue swap happens seems extremely low, but possible. + sub.pending_queue = @pending_queue + + @subscriptions << sub + + sub + end + + def unsubscribe_all + subscriptions.each { |sub| sub.unsubscribe } + end + end + + class Server include ::Protobuf::Rpc::Server include ::Protobuf::Logging - attr_reader :nats, :thread_pool, :subscriptions + attr_reader :nats, :thread_pool, :subscription_manager MILLISECOND = 1000 @@ -25,7 +59,8 @@ def initialize(options) @thread_pool = ::Protobuf::Nats::ThreadPool.new(@options[:threads], :max_queue => max_queue_size) - @subscriptions = [] + @subscription_manager = SuperSubscriptionManager.new(@nats) do + end @server = options.fetch(:server, ::Socket.gethostname) end @@ -114,7 +149,7 @@ def print_subscription_keys def subscribe_to_services_once with_each_subscription_key do |subscription_key_and_queue| - subscriptions << nats.subscribe(subscription_key_and_queue, :queue => subscription_key_and_queue) do |request_data, reply_id, _subject| + subscription_manager.queue_subscribe(subscription_key_and_queue) do |request_data, reply_id| unless enqueue_request(request_data, reply_id) logger.error { "Thread pool is full! Dropping message for: #{subscription_key_and_queue}" } end @@ -233,9 +268,7 @@ def subscribe def unsubscribe logger.info "Unsubscribing from rpc routes..." - subscriptions.each do |subscription_id| - nats.unsubscribe(subscription_id) - end + subscription_manager.unsubscribe_all end end end diff --git a/protobuf-nats.gemspec b/protobuf-nats.gemspec index 1fff921..61a4b9d 100644 --- a/protobuf-nats.gemspec +++ b/protobuf-nats.gemspec @@ -33,7 +33,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "activesupport", ">= 3.2" spec.add_runtime_dependency "connection_pool" spec.add_runtime_dependency "protobuf", "~> 3.7", ">= 3.7.2" - spec.add_runtime_dependency "nats-pure", "~> 0.3", "< 0.4" + spec.add_runtime_dependency "nats-pure", "~> 2" spec.add_development_dependency "bundler" spec.add_development_dependency "rake", "~> 10.0" diff --git a/spec/fake_nats_client.rb b/spec/fake_nats_client.rb index d514541..2d70aa1 100644 --- a/spec/fake_nats_client.rb +++ b/spec/fake_nats_client.rb @@ -26,6 +26,7 @@ def flush def subscribe(subject, args, &block) subscriptions[subject] = block + ::NATS::Subscription.new end def unsubscribe(*) From f85d4dc42f58c7617bec94dad3cd4f631fb3249f Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Wed, 4 Jan 2023 20:48:05 -0700 Subject: [PATCH 03/12] Fix up tests.. mocks are not great --- lib/protobuf/nats/client.rb | 37 ++++++++++++++++++++++--------------- spec/fake_nats_client.rb | 28 ++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 21 deletions(-) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index 2cf12ef..be7162b 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -283,9 +283,11 @@ def nats_request_with_two_responses(subject, data, opts) # Wait for reply # Receive the first message - ::MonotonicTime::with_nats_timeout(ack_timeout) do - @resp_sub.synchronize do - signal.wait(ack_timeout) + begin + ::NATS::MonotonicTime::with_nats_timeout(ack_timeout) do + @resp_sub.synchronize do + signal.wait(ack_timeout) + end end rescue ::NATS::Timeout => e return :ack_timeout @@ -296,26 +298,28 @@ def nats_request_with_two_responses(subject, data, opts) return :nack if first_message.data == ::Protobuf::Nats::Messages::NACK # Receive the second message - ::MonotonicTime::with_nats_timeout(timeout) do - @resp_sub.synchronize do - signal.wait(timeout) + begin + ::NATS::MonotonicTime::with_nats_timeout(timeout) do + @resp_sub.synchronize do + signal.wait(timeout) + end end - rescue ::NATS::Timeout => e - fail ::Protobuf::Nats::Errors::ResponseTimeout, formatted_service_and_method_name + rescue ::NATS::Timeout + # ignore to raise a repsonse timeout below end - second_message = @resp_sub.synchronize { @resp_map[token][:response].shift } - - require "pry"; binding.pry - + # NOTE: This might be nil, so be careful checking the data value + second_message_data = @resp_sub.synchronize { @resp_map[token][:response].shift }&.data # Check messages response = case ::Protobuf::Nats::Messages::ACK - when first_message.data then second_message.data - when second_message.data then first_message.data + when first_message.data then second_message_data + when second_message_data then first_message.data else return :ack_timeout end + fail(::Protobuf::Nats::Errors::ResponseTimeout, formatted_service_and_method_name) unless response + response ensure return if @resp_sub.nil? @@ -343,6 +347,9 @@ def start_request_muxer! token = msg.subject.split('.').last @resp_sub.synchronize do + # Reject if the token is missing from the request map + next unless @resp_map.key?(token) + future = @resp_map[token][:signal] @resp_map[token][:response] ||= [] @resp_map[token][:response] << msg @@ -354,7 +361,7 @@ def start_request_muxer! else - fail "barf" + fail "no longer using this impl for MRI" def nats_request_with_two_responses(subject, data, opts) nats = Protobuf::Nats.client_nats_connection diff --git a/spec/fake_nats_client.rb b/spec/fake_nats_client.rb index 2d70aa1..1c44527 100644 --- a/spec/fake_nats_client.rb +++ b/spec/fake_nats_client.rb @@ -24,9 +24,13 @@ def publish(*) def flush end - def subscribe(subject, args, &block) - subscriptions[subject] = block - ::NATS::Subscription.new + def subscribe(subject, args = {}, &block) + s = ::NATS::Subscription.new + s.pending_queue = ::SizedQueue.new(1024) + + subscriptions[subject] = {:block => block, :subscription => s } + + s end def unsubscribe(*) @@ -48,9 +52,15 @@ def schedule_messages(messages) Thread.new do begin sleep message.seconds_in_future - block = subscriptions[message.subject] + + sub = subscriptions[message.subject] || + subscriptions[message.subject.split(".").first + ".*"] + + block = sub[:block] block.call(message.data) if block @next_message = message + s = sub[:subscription] + s.pending_queue.push(message) if s.pending_queue rescue => error puts error end @@ -60,8 +70,14 @@ def schedule_messages(messages) end class FakeNackClient < FakeNatsClient - def subscribe(subject, args, &block) - Thread.new { block.call(::Protobuf::Nats::Messages::NACK) } + def subscribe(subject, args = {}, &block) + s = super + + Thread.new { block.call(::Protobuf::Nats::Messages::NACK) } if block + + s.pending_queue.push(NATS::Msg.new(:data => ::Protobuf::Nats::Messages::NACK, :subject => "BASE.#{@inbox}")) + + s end def next_message(_sub, _timeout) From bec0a799092161d53c83009f8b83b0adb2be1c07 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Wed, 4 Jan 2023 22:51:20 -0700 Subject: [PATCH 04/12] Add a working global response muxer --- lib/protobuf/nats/client.rb | 178 +++++++++++++++++++++++------------- lib/protobuf/nats/server.rb | 11 +-- spec/fake_nats_client.rb | 13 ++- 3 files changed, 129 insertions(+), 73 deletions(-) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index be7162b..9f4a60f 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -5,9 +5,114 @@ module Protobuf module Nats + class ResponseMuxerRequest + def initialize(muxer, token, signal) + @muxer = muxer + @token = token + @signal = signal + end + + def publish(subject, data) + @muxer.publish(subject, data, @token) + end + + def next_message(timeout) + @muxer.next_message(@token, timeout) + end + + def cleanup + @muxer.cleanup(@token) + end + end + + class ResponseMuxer + LOCK = ::Mutex.new + + def initialize + @resp_map = Hash.new { |h,k| h[k] = { } } + end + + def cleanup(token) + @resp_sub.synchronize { @resp_map.delete(token) } + end + + def next_message(token, timeout) + ::NATS::MonotonicTime::with_nats_timeout(timeout) do + @resp_sub.synchronize do + break if @resp_map[token].key?(:response) && + !@resp_map[token][:response].empty? + + @resp_map[token][:signal].wait(timeout) + end + end + + @resp_sub.synchronize { @resp_map[token][:response].shift } + end + + def new_request + nats = Protobuf::Nats.client_nats_connection + token = nats.new_inbox.split('.').last + signal = @resp_sub.new_cond + @resp_sub.synchronize do + @resp_map[token][:signal] = signal + end + + ResponseMuxerRequest.new(self, token, signal) + end + + def publish(subject, data, token) + nats = Protobuf::Nats.client_nats_connection + reply_to = "#{@resp_inbox_prefix}.#{token}" + nats.publish(subject, data, reply_to) + end + + def start + return if started? + LOCK.synchronize do + # We check this twice in case another thread was waiting for the lock to + # start this party. + return if started? + + nats = ::Protobuf::Nats.client_nats_connection + return if nats.nil? + + @resp_inbox_prefix = nats.new_inbox + @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*") + @started = true + end + + Thread.new do + loop do + msg = @resp_sub.pending_queue.pop + next if msg.nil? + @resp_sub.synchronize do + # Decrease pending size since consumed already + @resp_sub.pending_size -= msg.data.size + end + token = msg.subject.split('.').last + + @resp_sub.synchronize do + # Reject if the token is missing from the request map + break unless @resp_map.key?(token) + + signal = @resp_map[token][:signal] + @resp_map[token][:response] ||= [] + @resp_map[token][:response] << msg + signal.signal + end + end + end + end + + def started? + !!@started + end + end + class Client < ::Protobuf::Rpc::Connectors::Base CLIENT_MUTEX = ::Mutex.new + RESPONSE_MUXER = ResponseMuxer.new # Structure to hold subscription and inbox to use within pool SubscriptionInbox = ::Struct.new(:subscription, :inbox) do @@ -39,6 +144,9 @@ def initialize(options) # This will ensure the client is started. ::Protobuf::Nats.start_client_nats_connection + + # Ensure the response muxer is started + RESPONSE_MUXER.start end def new_subscription_inbox @@ -263,53 +371,29 @@ def nats_request_with_two_responses(subject, data, opts) nats = Protobuf::Nats.client_nats_connection - # Cheap check first before synchronize - unless @resp_sub_prefix - CLIENT_MUTEX.synchronize do - # TODO: This needs to be global, yo. - start_request_muxer! unless @resp_sub_prefix - end - end - # Publish message with the reply topic pointed at the response muxer. - token = nats.new_inbox - signal = @resp_sub.new_cond - @resp_sub.synchronize do - @resp_map[token][:signal] = signal - end - reply_to = "#{@resp_inbox_prefix}.#{token}" - nats.publish(subject, data, reply_to) - - # Wait for reply + req = RESPONSE_MUXER.new_request + req.publish(subject, data) # Receive the first message begin - ::NATS::MonotonicTime::with_nats_timeout(ack_timeout) do - @resp_sub.synchronize do - signal.wait(ack_timeout) - end - end + first_message = req.next_message(ack_timeout) rescue ::NATS::Timeout => e return :ack_timeout end # Check for a NACK - first_message = @resp_sub.synchronize { @resp_map[token][:response].shift } return :nack if first_message.data == ::Protobuf::Nats::Messages::NACK # Receive the second message begin - ::NATS::MonotonicTime::with_nats_timeout(timeout) do - @resp_sub.synchronize do - signal.wait(timeout) - end - end + second_message = req.next_message(timeout) rescue ::NATS::Timeout # ignore to raise a repsonse timeout below end # NOTE: This might be nil, so be careful checking the data value - second_message_data = @resp_sub.synchronize { @resp_map[token][:response].shift }&.data + second_message_data = second_message&.data # Check messages response = case ::Protobuf::Nats::Messages::ACK @@ -322,41 +406,7 @@ def nats_request_with_two_responses(subject, data, opts) response ensure - return if @resp_sub.nil? - - # Clean up - @resp_sub.synchronize do - @resp_map.delete(token) - end - end - - def start_request_muxer! - nats = Protobuf::Nats.client_nats_connection - @resp_inbox_prefix = nats.new_inbox - @resp_map = Hash.new { |h,k| h[k] = { } } - @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*") - - Thread.new do - loop do - msg = @resp_sub.pending_queue.pop - next if msg.nil? - @resp_sub.synchronize do - # Decrease pending size since consumed already - @resp_sub.pending_size -= msg.data.size - end - token = msg.subject.split('.').last - - @resp_sub.synchronize do - # Reject if the token is missing from the request map - next unless @resp_map.key?(token) - - future = @resp_map[token][:signal] - @resp_map[token][:response] ||= [] - @resp_map[token][:response] << msg - future.signal - end - end - end + req.cleanup if req end else diff --git a/lib/protobuf/nats/server.rb b/lib/protobuf/nats/server.rb index bf4349c..bed48d9 100644 --- a/lib/protobuf/nats/server.rb +++ b/lib/protobuf/nats/server.rb @@ -59,7 +59,10 @@ def initialize(options) @thread_pool = ::Protobuf::Nats::ThreadPool.new(@options[:threads], :max_queue => max_queue_size) - @subscription_manager = SuperSubscriptionManager.new(@nats) do + @subscription_manager = SuperSubscriptionManager.new(@nats) do |request_data, reply_id| + unless enqueue_request(request_data, reply_id) + logger.error { "Thread pool is full! Dropping message for: #{subscription_key_and_queue}" } + end end @server = options.fetch(:server, ::Socket.gethostname) end @@ -149,11 +152,7 @@ def print_subscription_keys def subscribe_to_services_once with_each_subscription_key do |subscription_key_and_queue| - subscription_manager.queue_subscribe(subscription_key_and_queue) do |request_data, reply_id| - unless enqueue_request(request_data, reply_id) - logger.error { "Thread pool is full! Dropping message for: #{subscription_key_and_queue}" } - end - end + subscription_manager.queue_subscribe(subscription_key_and_queue) end end diff --git a/spec/fake_nats_client.rb b/spec/fake_nats_client.rb index 1c44527..a7b390c 100644 --- a/spec/fake_nats_client.rb +++ b/spec/fake_nats_client.rb @@ -70,12 +70,19 @@ def schedule_messages(messages) end class FakeNackClient < FakeNatsClient + def publish(*) + subscriptions.each do |_key, sub| + s = sub[:subscription] + s.pending_queue.push(NATS::Msg.new(:data => ::Protobuf::Nats::Messages::NACK, :subject => "BASE.#{@inbox}")) + end + end + def subscribe(subject, args = {}, &block) s = super - Thread.new { block.call(::Protobuf::Nats::Messages::NACK) } if block - - s.pending_queue.push(NATS::Msg.new(:data => ::Protobuf::Nats::Messages::NACK, :subject => "BASE.#{@inbox}")) + Thread.new do + block.call(::Protobuf::Nats::Messages::NACK) if block + end s end From 73f0fa0de4393851d28b5091b93fb868c2aa04a7 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Thu, 5 Jan 2023 06:52:10 -0700 Subject: [PATCH 05/12] Reset the muxer between each test (as the client changes) --- lib/protobuf/nats/client.rb | 21 +++++++++++++++------ spec/spec_helper.rb | 4 +++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index 9f4a60f..c8235df 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -6,10 +6,9 @@ module Protobuf module Nats class ResponseMuxerRequest - def initialize(muxer, token, signal) + def initialize(muxer, token) @muxer = muxer @token = token - @signal = signal end def publish(subject, data) @@ -52,12 +51,11 @@ def next_message(token, timeout) def new_request nats = Protobuf::Nats.client_nats_connection token = nats.new_inbox.split('.').last - signal = @resp_sub.new_cond @resp_sub.synchronize do - @resp_map[token][:signal] = signal + @resp_map[token][:signal] = @resp_sub.new_cond end - ResponseMuxerRequest.new(self, token, signal) + ResponseMuxerRequest.new(self, token) end def publish(subject, data, token) @@ -66,6 +64,17 @@ def publish(subject, data, token) nats.publish(subject, data, reply_to) end + def restart + start unless started? + + LOCK.synchronize do + @resp_handler&.kill + @started = false + end + + start + end + def start return if started? LOCK.synchronize do @@ -81,7 +90,7 @@ def start @started = true end - Thread.new do + @resp_handler = Thread.new do loop do msg = @resp_sub.pending_queue.pop next if msg.nil? diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6a0f2e3..40afe57 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -17,6 +17,8 @@ end config.before(:each) do - allow(Protobuf::Nats).to receive(:start_client_nats_connection) + allow(::Protobuf::Nats).to receive(:start_client_nats_connection) + + ::Protobuf::Nats::Client::RESPONSE_MUXER.restart end end From fbec8360fc62b49b084f8e627157da4a374c24c9 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Thu, 5 Jan 2023 06:55:26 -0700 Subject: [PATCH 06/12] Add error handler when client response muxer handler fails --- lib/protobuf/nats/client.rb | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index c8235df..c54a945 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -91,23 +91,28 @@ def start end @resp_handler = Thread.new do - loop do - msg = @resp_sub.pending_queue.pop - next if msg.nil? - @resp_sub.synchronize do - # Decrease pending size since consumed already - @resp_sub.pending_size -= msg.data.size - end - token = msg.subject.split('.').last + begin + loop do + msg = @resp_sub.pending_queue.pop + next if msg.nil? + @resp_sub.synchronize do + # Decrease pending size since consumed already + @resp_sub.pending_size -= msg.data.size + end + token = msg.subject.split('.').last - @resp_sub.synchronize do - # Reject if the token is missing from the request map - break unless @resp_map.key?(token) + @resp_sub.synchronize do + # Reject if the token is missing from the request map + break unless @resp_map.key?(token) - signal = @resp_map[token][:signal] - @resp_map[token][:response] ||= [] - @resp_map[token][:response] << msg - signal.signal + signal = @resp_map[token][:signal] + @resp_map[token][:response] ||= [] + @resp_map[token][:response] << msg + signal.signal + end + rescue => error + ::Protobuf::Nats.notify_error_callbacks(error) + LOCK.synchronize { @started = false } end end end From 010028da45ed436ffa4c1078d5fe1e04c644648f Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Thu, 5 Jan 2023 07:19:00 -0700 Subject: [PATCH 07/12] Fork jruby/mri code as needed and remove old mri client impl --- lib/protobuf/nats/client.rb | 55 +------------------------------------ lib/protobuf/nats/server.rb | 35 +++++++++++++++-------- 2 files changed, 25 insertions(+), 65 deletions(-) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index c54a945..d913ccb 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -375,7 +375,7 @@ def nats_request_with_two_responses(subject, data, opts) end end - elsif true + else def nats_request_with_two_responses(subject, data, opts) # Wait for the ACK from the server @@ -423,59 +423,6 @@ def nats_request_with_two_responses(subject, data, opts) req.cleanup if req end - else - - fail "no longer using this impl for MRI" - - def nats_request_with_two_responses(subject, data, opts) - nats = Protobuf::Nats.client_nats_connection - inbox = nats.new_inbox - lock = ::Monitor.new - received = lock.new_cond - messages = [] - first_message = nil - second_message = nil - response = nil - - sid = nats.subscribe(inbox, :max => 2) do |message, _, _| - lock.synchronize do - messages << message - received.signal - end - end - - lock.synchronize do - # Publish to server - nats.publish(subject, data, inbox) - - # Wait for the ACK from the server - ack_timeout = opts[:ack_timeout] || 5 - received.wait(ack_timeout) if messages.empty? - first_message = messages.shift - - return :ack_timeout if first_message.nil? - return :nack if first_message == ::Protobuf::Nats::Messages::NACK - - # Wait for the protobuf response - timeout = opts[:timeout] || 60 - received.wait(timeout) if messages.empty? - second_message = messages.shift - end - - response = case ::Protobuf::Nats::Messages::ACK - when first_message then second_message - when second_message then first_message - else return :ack_timeout - end - - fail(::Protobuf::Nats::Errors::ResponseTimeout, formatted_service_and_method_name) unless response - - response - ensure - # Ensure we don't leave a subscription sitting around. - nats.unsubscribe(sid) if response.nil? - end - end end diff --git a/lib/protobuf/nats/server.rb b/lib/protobuf/nats/server.rb index bed48d9..edb0369 100644 --- a/lib/protobuf/nats/server.rb +++ b/lib/protobuf/nats/server.rb @@ -12,34 +12,47 @@ def initialize(nats, &cb) @pending_queue = ::SizedQueue.new(::NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT) @subscriptions = [] @nats = nats + @callback = cb - Thread.new do + # For MRI, reroute the pending queue to the callback + @pending_queue_handler = Thread.new do loop do msg = @pending_queue.pop - cb.call(msg.data, msg.reply) + @callback.call(msg.data, msg.reply) end end end def queue_subscribe(name) - sub = @nats.subscribe(name, :queue => name) + if defined? JRUBY_VERSION + @subscriptions << @nats.subscribe(name, :queue => name) do |request_data, reply_id| + @callback.call(request_data, reply_id) + end + else + sub = @nats.subscribe(name, :queue => name) - # Create a subscription but reset the pending queue to use a central pending queue. - # NOTE: This is a potential race condition. Chances of the round-trip message to an - # existing queue before this queue swap happens seems extremely low, but possible. - sub.pending_queue = @pending_queue + # Create a subscription but reset the pending queue to use a central pending queue. + # NOTE: This is a potential race condition. Chances of the round-trip message to an + # existing queue before this queue swap happens seems extremely low, but possible. + sub.pending_queue = @pending_queue - @subscriptions << sub + @subscriptions << sub - sub + sub + end end def unsubscribe_all - subscriptions.each { |sub| sub.unsubscribe } + if defined? JRUBY_VERSION + subscriptions.each do |subscription_id| + @nats.unsubscribe(subscription_id) + end + else + subscriptions.each { |sub| sub.unsubscribe } + end end end - class Server include ::Protobuf::Rpc::Server include ::Protobuf::Logging From 349744b617e45e5f00e80307d12849f0fdcbe141 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Thu, 5 Jan 2023 07:41:34 -0700 Subject: [PATCH 08/12] Fix bugs + move platform to specific file The MRI client is working great for both jruby and mri --- lib/protobuf/nats.rb | 3 ++- lib/protobuf/nats/client.rb | 3 ++- lib/protobuf/nats/platform.rb | 14 ++++++++++++++ lib/protobuf/nats/server.rb | 8 ++++---- spec/protobuf/nats/jnats_spec.rb | 2 +- 5 files changed, 23 insertions(+), 7 deletions(-) create mode 100644 lib/protobuf/nats/platform.rb diff --git a/lib/protobuf/nats.rb b/lib/protobuf/nats.rb index 82c0e9c..84475e2 100644 --- a/lib/protobuf/nats.rb +++ b/lib/protobuf/nats.rb @@ -6,6 +6,7 @@ require "nats/io/client" +require "protobuf/nats/platform" require "protobuf/nats/errors" require "protobuf/nats/client" require "protobuf/nats/server" @@ -23,7 +24,7 @@ module Messages NACK = "\2".freeze end - NatsClient = if defined? JRUBY_VERSION + NatsClient = if jruby? require "protobuf/nats/jnats" ::Protobuf::Nats::JNats else diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index d913ccb..fb0dadb 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -1,5 +1,6 @@ require "connection_pool" require "protobuf/nats" +require "protobuf/nats/platform" require "protobuf/rpc/connectors/base" require "monitor" @@ -320,7 +321,7 @@ def formatted_service_and_method_name # The Java nats client offers better message queueing so we're going to use # that over locking ourselves. This split in code isn't great, but we can # refactor this later. - if defined? JRUBY_VERSION + if ::Protobuf::Nats.jruby? # This is a request that expects two responses. # 1. An ACK from the server. We use a shorter timeout. diff --git a/lib/protobuf/nats/platform.rb b/lib/protobuf/nats/platform.rb new file mode 100644 index 0000000..c606c32 --- /dev/null +++ b/lib/protobuf/nats/platform.rb @@ -0,0 +1,14 @@ +module Protobuf + module Nats + def self.jruby? + return false if jnats_disabled? + + defined? JRUBY_VERSION + end + + def self.jnats_disabled? + !!ENV["PB_NATS_DISABLE_JNATS"] + end + end +end + diff --git a/lib/protobuf/nats/server.rb b/lib/protobuf/nats/server.rb index edb0369..922bb8d 100644 --- a/lib/protobuf/nats/server.rb +++ b/lib/protobuf/nats/server.rb @@ -24,7 +24,7 @@ def initialize(nats, &cb) end def queue_subscribe(name) - if defined? JRUBY_VERSION + if ::Protobuf::Nats.jruby? @subscriptions << @nats.subscribe(name, :queue => name) do |request_data, reply_id| @callback.call(request_data, reply_id) end @@ -43,12 +43,12 @@ def queue_subscribe(name) end def unsubscribe_all - if defined? JRUBY_VERSION - subscriptions.each do |subscription_id| + if ::Protobuf::Nats.jruby? + @subscriptions.each do |subscription_id| @nats.unsubscribe(subscription_id) end else - subscriptions.each { |sub| sub.unsubscribe } + @subscriptions.each { |sub| sub.unsubscribe } end end end diff --git a/spec/protobuf/nats/jnats_spec.rb b/spec/protobuf/nats/jnats_spec.rb index 7a02e27..6d8579c 100644 --- a/spec/protobuf/nats/jnats_spec.rb +++ b/spec/protobuf/nats/jnats_spec.rb @@ -1,6 +1,6 @@ require "rspec" -if defined?(JRUBY_VERSION) +if ::Protobuf::Nats.jruby? require "protobuf/nats/jnats" describe ::Protobuf::Nats::JNats do From 3e7ced5d1be449538d7ba2b0eba3876800943098 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Thu, 5 Jan 2023 07:51:55 -0700 Subject: [PATCH 09/12] Add new flag to readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 7b60f9d..4fc4479 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ used to allow JVM based servers to warm-up slowly to prevent jolts in runtime pe `PB_NATS_CLIENT_SUBSCRIPTION_POOL_SIZE` - If subscription pooling is desired for the request/response cycle then the pool size maximum should be set; the pool is lazy and therefore will only start new subscriptions as necessary (default: 0) +`PB_NATS_DISABLE_JNATS` - Disable the default jruby jnats client on the jruby platform, use the nats-pure.rb client instead (default: false). + `PROTOBUF_NATS_CONFIG_PATH` - Custom path to the config yaml (default: "config/protobuf_nats.yml"). ### YAML Config From 4684de8523fcfae5ea5060ed57240ff299fd479e Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Thu, 5 Jan 2023 09:58:58 -0700 Subject: [PATCH 10/12] Remove dead code --- lib/protobuf/nats/client.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index fb0dadb..c1a2a23 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -126,7 +126,6 @@ def started? class Client < ::Protobuf::Rpc::Connectors::Base - CLIENT_MUTEX = ::Mutex.new RESPONSE_MUXER = ResponseMuxer.new # Structure to hold subscription and inbox to use within pool From dafddfcd8662ef1376a782db4f7ef2e8fa4b158a Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Fri, 6 Jan 2023 16:17:30 -0700 Subject: [PATCH 11/12] Add a pre-release to the branch --- lib/protobuf/nats/version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/protobuf/nats/version.rb b/lib/protobuf/nats/version.rb index 582b4f5..b79b36a 100644 --- a/lib/protobuf/nats/version.rb +++ b/lib/protobuf/nats/version.rb @@ -1,5 +1,5 @@ module Protobuf module Nats - VERSION = "0.10.4" + VERSION = "0.11.0.pre0" end end From 1f2e9a0db328930db1b37ea5e3b0df0bb25fca3d Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Fri, 6 Jan 2023 16:19:06 -0700 Subject: [PATCH 12/12] Bump to an unused pre-release version --- lib/protobuf/nats/version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/protobuf/nats/version.rb b/lib/protobuf/nats/version.rb index b79b36a..07d55b9 100644 --- a/lib/protobuf/nats/version.rb +++ b/lib/protobuf/nats/version.rb @@ -1,5 +1,5 @@ module Protobuf module Nats - VERSION = "0.11.0.pre0" + VERSION = "0.12.0.pre0" end end