-
Notifications
You must be signed in to change notification settings - Fork 74
use PortManagementSupport mixin to reserve port in #register #238
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |||||||||||||||
| require "logstash/util/socket_peer" | ||||||||||||||||
| require "logstash-input-tcp_jars" | ||||||||||||||||
| require 'logstash/plugin_mixins/ecs_compatibility_support' | ||||||||||||||||
| require 'logstash/plugin_mixins/port_management_support' | ||||||||||||||||
|
|
||||||||||||||||
| require "socket" | ||||||||||||||||
| require "openssl" | ||||||||||||||||
|
|
@@ -68,6 +69,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base | |||||||||||||||
| # ecs_compatibility option, provided by Logstash core or the support adapter. | ||||||||||||||||
| include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) | ||||||||||||||||
|
|
||||||||||||||||
| include LogStash::PluginMixins::PortManagementSupport | ||||||||||||||||
|
|
||||||||||||||||
| config_name "tcp" | ||||||||||||||||
|
|
||||||||||||||||
| default :codec, "line" | ||||||||||||||||
|
|
@@ -177,15 +180,20 @@ def register | |||||||||||||||
| validate_ssl_config! | ||||||||||||||||
|
|
||||||||||||||||
| if server? | ||||||||||||||||
| @loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context) | ||||||||||||||||
| @port_reservation = port_management.reserve(addr: @host, port: @port) do |reserved_addr, reserved_port| | ||||||||||||||||
| @loop = InputLoop.new(@id, reserved_addr, reserved_port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context) | ||||||||||||||||
| end | ||||||||||||||||
| end | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| def run(output_queue) | ||||||||||||||||
| @output_queue = output_queue | ||||||||||||||||
| if server? | ||||||||||||||||
| @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}", :ssl_enabled => @ssl_enabled) | ||||||||||||||||
| @loop.run | ||||||||||||||||
| @port_reservation.convert do |reserved_addr, reserved_port| | ||||||||||||||||
| @logger.info("Starting tcp input listener", :address => "#{reserved_addr}:#{reserved_port}", :ssl_enabled => @ssl_enabled) | ||||||||||||||||
| @loop.start | ||||||||||||||||
| end | ||||||||||||||||
| @loop.wait_until_closed | ||||||||||||||||
|
Comment on lines
+192
to
+196
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we just do:
Suggested change
Essentially tell the global manager:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In The point of allowing the caller to execute a block while the global lock is held is to ensure that some other plugin can't use this library to create another reservation in the window between the dummy server being shut down and the caller standing up something to replace it. If we were to add a second layer of locking (e.g., each reservation having its own mutex), we could do the bind outside of the global lock but we would introduce complexity around the race conditions. |
||||||||||||||||
| else | ||||||||||||||||
| run_client() | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,19 +21,27 @@ | |
| #Cabin::Channel.get(LogStash).level = :debug | ||
| describe LogStash::Inputs::Tcp, :ecs_compatibility_support do | ||
|
|
||
| def get_port | ||
| begin | ||
| # Start high to better avoid common services | ||
| port = rand(10000..65535) | ||
| s = TCPServer.new("127.0.0.1", port) | ||
| s.close | ||
| return port | ||
| rescue Errno::EADDRINUSE | ||
| retry | ||
| end | ||
| ## | ||
| # yield the block with a port that is available | ||
| # @return [Integer]: a port that is available | ||
| def find_available_port | ||
| with_bound_port(&:itself) | ||
| end | ||
|
|
||
| ## | ||
| # Yields block with a port that is unavailable | ||
| # @yieldparam port [Integer] | ||
| # @yieldreturn [Object] | ||
| # @return [Object] | ||
| def with_bound_port(port=0, &block) | ||
| server = TCPServer.new("::", port) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tests used to focus on ipv4 and while I'm sure we also want to move tests to ipv6 not sure if this PR is the right place to do it. |
||
|
|
||
| return yield(server.local_address.ip_port) | ||
| ensure | ||
| server.close | ||
| end | ||
|
|
||
| let(:port) { get_port } | ||
| let(:port) { find_available_port } | ||
|
|
||
| context "codec (PR #1372)" do | ||
| it "switches from plain to line" do | ||
|
|
@@ -373,6 +381,14 @@ def get_port | |
| expect { subject.register }.to_not raise_error | ||
| end | ||
|
|
||
| context "when the port is unavailable" do | ||
| it 'raises a helpful exception' do | ||
| with_bound_port(port) do |unavailable_port| | ||
| expect { subject.register }.to raise_error(Errno::EADDRINUSE) | ||
| end | ||
| end | ||
| end | ||
|
|
||
| context "when using ssl" do | ||
| let(:config) do | ||
| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |||||
| /** | ||||||
| * Plain TCP Server Implementation. | ||||||
| */ | ||||||
| public final class InputLoop implements Runnable, Closeable { | ||||||
| public final class InputLoop implements Closeable { | ||||||
|
|
||||||
| // historically this class was passing around the plugin's logger | ||||||
| private static final Logger logger = LogManager.getLogger("logstash.inputs.tcp"); | ||||||
|
|
@@ -46,6 +46,11 @@ public final class InputLoop implements Runnable, Closeable { | |||||
| */ | ||||||
| private final ServerBootstrap serverBootstrap; | ||||||
|
|
||||||
| /** | ||||||
| * The channel after starting | ||||||
| */ | ||||||
| private volatile Channel channel; | ||||||
|
|
||||||
| /** | ||||||
| * SSL configuration. | ||||||
| */ | ||||||
|
|
@@ -82,11 +87,26 @@ public InputLoop(final String id, final String host, final int port, final Decod | |||||
| .childHandler(new InputLoop.InputHandler(decoder, sslContext)); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void run() { | ||||||
| public synchronized void start() { | ||||||
| if (channel != null) { | ||||||
| throw new IllegalStateException("Already started"); | ||||||
| } | ||||||
| try { | ||||||
| serverBootstrap.bind(host, port).sync().channel().closeFuture().sync(); | ||||||
| } catch (final InterruptedException ex) { | ||||||
| channel = serverBootstrap.bind(host, port).sync().channel(); | ||||||
| }catch (final InterruptedException ex) { | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| throw new IllegalStateException(ex); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| public void waitUntilClosed() { | ||||||
| synchronized (this) { | ||||||
| if (channel == null) { | ||||||
| throw new IllegalStateException("not started"); | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| } | ||||||
| } | ||||||
| try { | ||||||
| channel.closeFuture().sync(); | ||||||
| }catch (final InterruptedException ex) { | ||||||
| throw new IllegalStateException(ex); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to use blocks here since we're not wrapping behavior:
Also we should set the reservation scope for the port alone. Not sure if it's worth differentiating the addr, we can be conservative here and allow the port to be reserved regardless of the addr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are.
PortManagementSupport::Reservation#initializereleases the reservation if an exception is raised by the block.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to spawn the server that effectively holds the reservation, we need to know the addr to bind to, so really we are reserving an addr:port pair, not just the port.