Skip to content

Commit 2bde888

Browse files
committed
ES|QL support: ESQL executor implementation, response type to accept esql option, validations to make sure both LS and ES support the ESQL execution.
1 parent 688751f commit 2bde888

File tree

5 files changed

+197
-42
lines changed

5 files changed

+197
-42
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.1.0
2+
- ES|QL support [#N](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/N)
3+
14
## 5.0.2
25
- Add elastic-transport client support used in elasticsearch-ruby 8.x [#223](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/223)
36

lib/logstash/inputs/elasticsearch.rb

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,17 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
9696
# The index or alias to search.
9797
config :index, :validate => :string, :default => "logstash-*"
9898

99-
# The query to be executed. Read the Elasticsearch query DSL documentation
100-
# for more info
101-
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
99+
# The query to be executed.
100+
# It accepts DSL and ES|QL query shapes.
101+
# Read the Elasticsearch query documentations for more info:
102+
# - DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
103+
# - ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html
102104
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'
103105

104106
# This allows you to speccify the response type: either hits or aggregations
105107
# where hits: normal search request
106108
# aggregations: aggregation request
107-
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
109+
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits'
108110

109111
# This allows you to set the maximum number of hits returned per scroll.
110112
config :size, :validate => :number, :default => 1000
@@ -254,11 +256,6 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
254256
# If set, the _source of each hit will be added nested under the target instead of at the top-level
255257
config :target, :validate => :field_reference
256258

257-
# A mode to switch between DSL and ES|QL, defaults to DSL
258-
config :query_mode, :validate => %w[dsl, esql], :default => 'dsl'
259-
260-
config :esql, :validate => :hash
261-
262259
# Obsolete Settings
263260
config :ssl, :obsolete => "Set 'ssl_enabled' instead."
264261
config :ca_file, :obsolete => "Set 'ssl_certificate_authorities' instead."
@@ -289,8 +286,12 @@ def register
289286
fill_hosts_from_cloud_id
290287
setup_ssl_params!
291288

292-
puts "Query mode: #{@query_mode}"
293-
if @query_mode == 'dsl'
289+
if @response_type == 'esql'
290+
validate_ls_version_for_esql_support!
291+
validate_esql_query!
292+
inform_ineffective_esql_params
293+
else
294+
# for the ES|QL, plugin accepts raw string query but JSON for others
294295
@base_query = LogStash::Json.load(@query)
295296
if @slices
296297
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@@ -331,6 +332,9 @@ def register
331332

332333
test_connection!
333334

335+
# make sure connected ES supports ES|QL (8.11+)
336+
validate_es_for_esql_support! if @response_type == 'esql'
337+
334338
setup_serverless
335339

336340
setup_search_api
@@ -369,10 +373,7 @@ def event_from_hit(hit, root_field)
369373
return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
370374
end
371375

372-
# hit: {columns: [], values: []}
373376
def decorate_and_push_to_queue(output_queue, mapped_entry)
374-
puts "mapped_entry class: #{mapped_entry.class}"
375-
puts "mapped_entry value: #{mapped_entry.inspect}"
376377
event = targeted_event_factory.new_event mapped_entry
377378
decorate(event)
378379
output_queue << event
@@ -645,11 +646,6 @@ def setup_search_api
645646
end
646647

647648
def setup_query_executor
648-
if @query_mode == 'esql'
649-
@query_executor = LogStash::Inputs::Elasticsearch::Esql.new(@client, self)
650-
return @query_executor
651-
end
652-
653649
@query_executor = case @response_type
654650
when 'hits'
655651
if @resolved_search_api == "search_after"
@@ -660,6 +656,8 @@ def setup_query_executor
660656
end
661657
when 'aggregations'
662658
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
659+
when 'esql'
660+
LogStash::Inputs::Elasticsearch::Esql.new(@client, self)
663661
end
664662
end
665663

@@ -677,6 +675,31 @@ def get_transport_client_class
677675
::Elastic::Transport::Transport::HTTP::Manticore
678676
end
679677

678+
def validate_ls_version_for_esql_support!
679+
# LS 8.17.4+ has elasticsearch-ruby 8.17 client
680+
# elasticsearch-ruby 8.11+ supports ES|QL
681+
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create("8.17.4")
682+
fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4")
683+
end
684+
end
685+
686+
def validate_esql_query!
687+
fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty?
688+
source_commands = %w[FROM ROW SHOW]
689+
contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) }
690+
fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
691+
end
692+
693+
def inform_ineffective_esql_params
694+
ineffective_options = original_params.keys & %w(target size slices search_api)
695+
@logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.size > 1
696+
end
697+
698+
def validate_es_for_esql_support!
699+
es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create("8.11")
700+
fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. Please upgrade it.") unless es_supports_esql
701+
end
702+
680703
module URIOrEmptyValidator
681704
##
682705
# @override to provide :uri_or_empty validator

lib/logstash/inputs/elasticsearch/esql.rb

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,21 @@ class Esql
88

99
ESQL_JOB = "ES|QL job"
1010

11+
# Initialize the ESQL query executor
12+
# @param client [Elasticsearch::Client] The Elasticsearch client instance
13+
# @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance
1114
def initialize(client, plugin)
1215
@client = client
1316
@plugin_params = plugin.params
1417
@plugin = plugin
1518
@retries = @plugin_params["retries"]
16-
1719
@query = @plugin_params["query"]
18-
esql_options = @plugin_params["esql"] ? @plugin_params["esql"]: {}
19-
@esql_params = esql_options["params"] ? esql_options["params"] : {}
20-
# TODO: add filter as well
21-
# @esql_params = esql_options["filter"] | []
2220
end
2321

22+
# Execute a retryable operation with proper error handling
23+
# @param job_name [String] Name of the job for logging purposes
24+
# @yield The block to execute
25+
# @return [Boolean] true if successful, false otherwise
2426
def retryable(job_name, &block)
2527
stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
2628
stud_try.try((@retries + 1).times) { yield }
@@ -31,35 +33,47 @@ def retryable(job_name, &block)
3133
false
3234
end
3335

36+
# Execute the ESQL query and process results
37+
# @param output_queue [Queue] The queue to push processed events to
3438
def do_run(output_queue)
3539
logger.info("ES|QL executor starting")
3640
response = retryable(ESQL_JOB) do
3741
@client.esql.query({ body: { query: @query }, format: 'json' })
38-
# TODO: make sure to add params, filters, etc...
39-
# @client.esql.query({ body: { query: @query }, format: 'json' }.merge(@esql_params))
42+
end
43+
# retriable already printed error details
44+
return if response == false
4045

46+
puts "response class: #{response.class}"
47+
puts "response: #{response.inspect}"
48+
unless response&.headers&.dig("warning")
49+
logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]})
50+
end
51+
if response['values'] && response['columns']
52+
process_response(response['values'], response['columns'], output_queue)
4153
end
42-
puts "Response: #{response.inspect}"
43-
if response && response['values']
44-
response['values'].each do |value|
45-
mapped_data = map_column_and_values(response['columns'], value)
46-
puts "Mapped Data: #{mapped_data}"
47-
@plugin.decorate_and_push_to_queue(output_queue, mapped_data)
48-
end
54+
end
55+
56+
private
57+
58+
# Process the ESQL response and push events to the output queue
59+
# @param values [Array[Array]] The ESQL query response hits
60+
# @param columns [Array[Hash]] The ESQL query response columns
61+
# @param output_queue [Queue] The queue to push processed events to
62+
def process_response(values, columns, output_queue)
63+
values.each do |value|
64+
mapped_data = map_column_and_values(columns, value)
65+
@plugin.decorate_and_push_to_queue(output_queue, mapped_data)
4966
end
5067
end
5168

69+
# Map column names to their corresponding values
70+
# @param columns [Array] Array of column definitions
71+
# @param values [Array] Array of values for the current row
72+
# @return [Hash] Mapped data with column names as keys
5273
def map_column_and_values(columns, values)
53-
puts "columns class: #{columns.class}"
54-
puts "values class: #{values.class}"
55-
puts "columns: #{columns.inspect}"
56-
puts "values: #{values.inspect}"
57-
mapped_data = {}
58-
columns.each_with_index do |column, index|
74+
columns.each_with_index.with_object({}) do |(column, index), mapped_data|
5975
mapped_data[column["name"]] = values[index]
6076
end
61-
puts "values: #{mapped_data.inspect}"
62-
mapped_data
6377
end
6478
end
6579
end

logstash-input-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-elasticsearch'
4-
s.version = '5.0.2'
4+
s.version = '5.1.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads query results from an Elasticsearch cluster"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# encoding: utf-8
2+
require "logstash/devutils/rspec/spec_helper"
3+
require "logstash/inputs/elasticsearch"
4+
require "elasticsearch"
5+
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
6+
7+
describe LogStash::Inputs::Elasticsearch, :ecs_compatibility_support do
8+
let(:plugin) { described_class.new(config) }
9+
let(:client) { instance_double(Elasticsearch::Client) }
10+
let(:queue) { Queue.new }
11+
let(:cluster_info) { { "version" => { "number" => "8.11.0", "build_flavor" => "default" } } }
12+
13+
let(:config) do
14+
{
15+
"query" => "FROM test-index | STATS count() BY field",
16+
"response_type" => "esql",
17+
"retries" => 3
18+
}
19+
end
20+
21+
describe "#initialize" do
22+
it "sets up the ESQL client with correct parameters" do
23+
expect(plugin.instance_variable_get(:@query)).to eq(config["query"])
24+
expect(plugin.instance_variable_get(:@response_type)).to eq(config["response_type"])
25+
expect(plugin.instance_variable_get(:@retries)).to eq(config["retries"])
26+
end
27+
end
28+
29+
describe "#register" do
30+
before(:each) do
31+
Elasticsearch::Client.send(:define_method, :ping) { }
32+
allow_any_instance_of(Elasticsearch::Client).to receive(:info).and_return(cluster_info)
33+
end
34+
it "creates ES|QL executor" do
35+
plugin.register
36+
expect(plugin.instance_variable_get(:@query_executor)).to be_an_instance_of(LogStash::Inputs::Elasticsearch::Esql)
37+
end
38+
end
39+
40+
describe "#validation" do
41+
42+
describe "LS version" do
43+
context "when compatible" do
44+
before(:each) do
45+
stub_const("LogStash::VERSION", "8.11.0")
46+
end
47+
48+
it "does not raise an error" do
49+
expect { plugin.send(:validate_ls_version_for_esql_support!) }.not_to raise_error
50+
end
51+
end
52+
53+
context "when incompatible" do
54+
before(:each) do
55+
stub_const("LOGSTASH_VERSION", "8.10.0")
56+
end
57+
58+
it "raises a runtime error" do
59+
expect { plugin.send(:validate_ls_version_for_esql_support!) }
60+
.to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4/)
61+
end
62+
end
63+
end
64+
65+
describe "ES version" do
66+
before(:each) do
67+
allow(plugin).to receive(:es_version).and_return("8.10.5")
68+
end
69+
70+
context "when incompatible" do
71+
it "raises a runtime error" do
72+
expect { plugin.send(:validate_es_for_esql_support!) }
73+
.to raise_error(RuntimeError, /Connected Elasticsearch 8.10.5 version does not supports ES|QL. Please upgrade it./)
74+
end
75+
end
76+
end
77+
78+
describe "ES|QL query" do
79+
context "when query is valid" do
80+
it "does not raise an error" do
81+
expect { plugin.send(:validate_esql_query!) }.not_to raise_error
82+
end
83+
end
84+
85+
context "when query is empty" do
86+
let(:config) do
87+
{
88+
"query" => " "
89+
}
90+
end
91+
92+
# TODO: make shared spec
93+
it "raises a configuration error" do
94+
expect { plugin.send(:validate_esql_query!) }
95+
.to raise_error(LogStash::ConfigurationError, /`query` cannot be empty/)
96+
end
97+
end
98+
99+
context "when query doesn't align with ES syntax" do
100+
let(:config) do
101+
{
102+
"query" => "RANDOM query"
103+
}
104+
end
105+
106+
it "raises a configuration error" do
107+
source_commands = %w[FROM ROW SHOW]
108+
expect { plugin.send(:validate_esql_query!) }
109+
.to raise_error(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}")
110+
end
111+
end
112+
end
113+
end
114+
115+
end

0 commit comments

Comments
 (0)