Skip to content

Commit 3bb5a15

Browse files
authored
Merge pull request #193 from mashhurs/elastic-transport-support-3.x
Elastic transport support 3.x
2 parents e01e64d + d857116 commit 3bb5a15

File tree

5 files changed

+67
-11
lines changed

5 files changed

+67
-11
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.17.1
2+
- Add elastic-transport client support used in elasticsearch-ruby 8.x [#193](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/193)
3+
14
## 3.17.0
25
- Added support for custom headers [#190](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/190)
36

lib/logstash/filters/elasticsearch.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
require "monitor"
88

99
require_relative "elasticsearch/client"
10-
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
1110

1211
class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
1312
config_name "elasticsearch"
@@ -183,6 +182,9 @@ def register
183182

184183
test_connection!
185184
setup_serverless
185+
if get_client.es_transport_client_type == "elasticsearch_transport"
186+
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
187+
end
186188
end # def register
187189

188190
def filter(event)

lib/logstash/filters/elasticsearch/client.rb

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
# encoding: utf-8
22
require "elasticsearch"
33
require "base64"
4-
require "elasticsearch/transport/transport/http/manticore"
54

65

76
module LogStash
87
module Filters
98
class ElasticsearchClient
109

1110
attr_reader :client
11+
attr_reader :es_transport_client_type
1212

1313
BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
1414
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
@@ -44,7 +44,7 @@ def initialize(logger, hosts, options = {})
4444

4545
client_options = {
4646
hosts: hosts,
47-
transport_class: ::Elasticsearch::Transport::Transport::HTTP::Manticore,
47+
transport_class: get_transport_client_class,
4848
transport_options: transport_options,
4949
ssl: ssl_options,
5050
retry_on_failure: options[:retry_on_failure],
@@ -98,6 +98,20 @@ def setup_api_key(api_key)
9898
token = ::Base64.strict_encode64(api_key.value)
9999
{ 'Authorization' => "ApiKey #{token}" }
100100
end
101+
102+
def get_transport_client_class
103+
# LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport`
104+
# And now `elasticsearch-transport` is old, instead we have `elastic-transport`.
105+
# LS-core updated `elasticsearch` > 8: https://github.com/elastic/logstash/pull/17161
106+
# Following source bits are for the compatibility to support both `elasticsearch-transport` and `elastic-transport` gems
107+
require "elasticsearch/transport/transport/http/manticore"
108+
es_transport_client_type = "elasticsearch_transport"
109+
::Elasticsearch::Transport::Transport::HTTP::Manticore
110+
rescue ::LoadError
111+
require "elastic/transport/transport/http/manticore"
112+
es_transport_client_type = "elastic_transport"
113+
::Elastic::Transport::Transport::HTTP::Manticore
114+
end
101115
end
102116
end
103117
end

logstash-filter-elasticsearch.gemspec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-filter-elasticsearch'
4-
s.version = '3.17.0'
4+
s.version = '3.17.1'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -21,7 +21,7 @@ Gem::Specification.new do |s|
2121

2222
# Gem dependencies
2323
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
24-
s.add_runtime_dependency 'elasticsearch', ">= 7.14.9" # LS >= 6.7 and < 7.14 all used version 5.0.5
24+
s.add_runtime_dependency 'elasticsearch', ">= 7.14.9", '< 9'
2525
s.add_runtime_dependency 'manticore', ">= 0.7.1"
2626
s.add_runtime_dependency 'logstash-mixin-ca_trusted_fingerprint_support', '~> 1.0'
2727
s.add_runtime_dependency 'logstash-mixin-normalize_config_support', '~>1.0'

spec/filters/elasticsearch_spec.rb

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,18 @@
6060
allow(plugin).to receive(:get_client).and_return(filter_client)
6161
allow(filter_client).to receive(:serverless?).and_return(true)
6262
allow(filter_client).to receive(:client).and_return(es_client)
63-
allow(es_client).to receive(:info).with(a_hash_including(:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER)).and_raise(
64-
Elasticsearch::Transport::Transport::Errors::BadRequest.new
65-
)
63+
64+
if elastic_ruby_v8_client_available?
65+
allow(es_client).to receive(:info)
66+
.with(a_hash_including(
67+
:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
68+
.and_raise(Elastic::Transport::Transport::Errors::BadRequest.new)
69+
else
70+
allow(es_client).to receive(:info)
71+
.with(a_hash_including(
72+
:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER))
73+
.and_raise(Elasticsearch::Transport::Transport::Errors::BadRequest.new)
74+
end
6675
end
6776

6877
it "raises an exception when Elastic Api Version is not supported" do
@@ -103,6 +112,11 @@
103112

104113
before(:each) do
105114
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
115+
if elastic_ruby_v8_client_available?
116+
allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
117+
else
118+
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
119+
end
106120
allow(client).to receive(:search).and_return(response)
107121
allow(plugin).to receive(:test_connection!)
108122
allow(plugin).to receive(:setup_serverless)
@@ -347,6 +361,11 @@
347361

348362
before do
349363
allow(plugin).to receive(:get_client).and_return(client_double)
364+
if elastic_ruby_v8_client_available?
365+
allow(client_double).to receive(:es_transport_client_type).and_return('elastic_transport')
366+
else
367+
allow(client_double).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
368+
end
350369
allow(client_double).to receive(:client).and_return(transport_double)
351370
end
352371

@@ -506,7 +525,12 @@ def wait_receive_request
506525
# this spec is a safeguard to trigger an assessment of thread-safety should
507526
# we choose a different transport adapter in the future.
508527
transport_class = extract_transport(client).options.fetch(:transport_class)
509-
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
528+
if elastic_ruby_v8_client_available?
529+
allow(client).to receive(:es_transport_client_type).and_return("elastic_transport")
530+
expect(transport_class).to equal ::Elastic::Transport::Transport::HTTP::Manticore
531+
else
532+
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
533+
end
510534
end
511535

512536
it 'uses a client with sufficient connection pool size' do
@@ -821,6 +845,11 @@ def wait_receive_request
821845

822846
before(:each) do
823847
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
848+
if elastic_ruby_v8_client_available?
849+
allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
850+
else
851+
allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
852+
end
824853
allow(plugin).to receive(:test_connection!)
825854
allow(plugin).to receive(:setup_serverless)
826855
plugin.register
@@ -835,11 +864,19 @@ def wait_receive_request
835864
end
836865
end
837866

838-
# @note can be removed once gem depends on elasticsearch >= 6.x
839-
def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client
867+
def extract_transport(client)
868+
# on 7x: client.transport.transport
869+
# on >=8.x: client.transport
840870
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
841871
end
842872

873+
def elastic_ruby_v8_client_available?
874+
Elasticsearch::Transport
875+
false
876+
rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available
877+
true
878+
end
879+
843880
class MockResponse
844881
attr_reader :code, :headers
845882

0 commit comments

Comments
 (0)