
Commit 688751f

ES|QL PoC for es-input.
1 parent 7f8120c

2 files changed, +94 -4 lines changed

lib/logstash/inputs/elasticsearch.rb

Lines changed: 27 additions & 4 deletions
@@ -73,6 +73,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
 
   require 'logstash/inputs/elasticsearch/paginated_search'
   require 'logstash/inputs/elasticsearch/aggregation'
+  require 'logstash/inputs/elasticsearch/esql'
 
   include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
   include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -253,6 +254,11 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   # If set, the _source of each hit will be added nested under the target instead of at the top-level
   config :target, :validate => :field_reference
 
+  # A mode to switch between DSL and ES|QL, defaults to DSL
+  config :query_mode, :validate => %w[dsl esql], :default => 'dsl'
+
+  config :esql, :validate => :hash
+
   # Obsolete Settings
   config :ssl, :obsolete => "Set 'ssl_enabled' instead."
   config :ca_file, :obsolete => "Set 'ssl_certificate_authorities' instead."
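
For reference, a minimal sketch of how the two new options would surface in the plugin's params hash; the hosts and the ES|QL statement below are invented examples, not part of this commit:

# Hypothetical params exercising the new options (illustrative values only).
{
  "hosts"      => ["localhost:9200"],
  "query_mode" => "esql",                    # switch from DSL to ES|QL execution
  "query"      => "FROM logs-* | LIMIT 10",  # reused as the ES|QL statement
  "esql"       => { "params" => {} }         # optional hash; only "params" is read so far
}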
@@ -283,10 +289,13 @@ def register
     fill_hosts_from_cloud_id
     setup_ssl_params!
 
-    @base_query = LogStash::Json.load(@query)
-    if @slices
-      @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
-      @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
+    puts "Query mode: #{@query_mode}"
+    if @query_mode == 'dsl'
+      @base_query = LogStash::Json.load(@query)
+      if @slices
+        @base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
+        @slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
+      end
     end
 
     @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
@@ -360,6 +369,15 @@ def event_from_hit(hit, root_field)
       return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
     end
 
+    # hit: {columns: [], values: []}
+    def decorate_and_push_to_queue(output_queue, mapped_entry)
+      puts "mapped_entry class: #{mapped_entry.class}"
+      puts "mapped_entry value: #{mapped_entry.inspect}"
+      event = targeted_event_factory.new_event mapped_entry
+      decorate(event)
+      output_queue << event
+    end
+
     def set_docinfo_fields(hit, event)
       # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
       docinfo_target = event.get(@docinfo_target) || {}
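
As the new comment above notes, each ES|QL hit arrives as parallel columns/values arrays; by the time decorate_and_push_to_queue is called, the Esql executor (added in the second file) has already flattened one row into a plain column-name => value hash. A hedged sketch of the expected argument, with invented field names:

# Hypothetical mapped_entry as produced by Esql#map_column_and_values (example data).
mapped_entry = { "@timestamp" => "2024-01-01T00:00:00Z", "message" => "hello" }
# decorate_and_push_to_queue wraps this into an event via targeted_event_factory
# (so the `target` option is honored), applies decorations, and pushes it to the queue.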
@@ -627,6 +645,11 @@ def setup_search_api
   end
 
   def setup_query_executor
+    if @query_mode == 'esql'
+      @query_executor = LogStash::Inputs::Elasticsearch::Esql.new(@client, self)
+      return @query_executor
+    end
+
     @query_executor = case @response_type
     when 'hits'
       if @resolved_search_api == "search_after"
lib/logstash/inputs/elasticsearch/esql.rb

Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
require 'logstash/helpers/loggable_try'

module LogStash
  module Inputs
    class Elasticsearch
      class Esql
        include LogStash::Util::Loggable

        ESQL_JOB = "ES|QL job"

        def initialize(client, plugin)
          @client = client
          @plugin_params = plugin.params
          @plugin = plugin
          @retries = @plugin_params["retries"]

          @query = @plugin_params["query"]
          esql_options = @plugin_params["esql"] ? @plugin_params["esql"] : {}
          @esql_params = esql_options["params"] ? esql_options["params"] : {}
          # TODO: add filter as well
          # @esql_params = esql_options["filter"] | []
        end

        def retryable(job_name, &block)
          stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
          stud_try.try((@retries + 1).times) { yield }
        rescue => e
          error_details = {:message => e.message, :cause => e.cause}
          error_details[:backtrace] = e.backtrace if logger.debug?
          logger.error("#{job_name} failed with ", error_details)
          false
        end

        def do_run(output_queue)
          logger.info("ES|QL executor starting")
          response = retryable(ESQL_JOB) do
            @client.esql.query({ body: { query: @query }, format: 'json' })
            # TODO: make sure to add params, filters, etc...
            # @client.esql.query({ body: { query: @query }, format: 'json' }.merge(@esql_params))
          end
          puts "Response: #{response.inspect}"
          if response && response['values']
            response['values'].each do |value|
              mapped_data = map_column_and_values(response['columns'], value)
              puts "Mapped Data: #{mapped_data}"
              @plugin.decorate_and_push_to_queue(output_queue, mapped_data)
            end
          end
        end

        def map_column_and_values(columns, values)
          puts "columns class: #{columns.class}"
          puts "values class: #{values.class}"
          puts "columns: #{columns.inspect}"
          puts "values: #{values.inspect}"
          mapped_data = {}
          columns.each_with_index do |column, index|
            mapped_data[column["name"]] = values[index]
          end
          puts "values: #{mapped_data.inspect}"
          mapped_data
        end
      end
    end
  end
end
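
For context, a hedged sketch of how map_column_and_values pairs an ES|QL JSON response's columns with a single row of values; the response shape shown here (columns carrying "name"/"type", values as row arrays) is what this code assumes from the ES|QL REST API, and the concrete data is invented:

# Illustrative ES|QL-style response fragment (invented data).
columns = [
  { "name" => "host",  "type" => "keyword" },
  { "name" => "count", "type" => "long" }
]
values = [["web-01", 42], ["web-02", 7]]

# Pairing the first row the same way Esql#map_column_and_values does:
row = values.first
mapped = {}
columns.each_with_index { |column, index| mapped[column["name"]] = row[index] }
# mapped => { "host" => "web-01", "count" => 42 }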
