Commit 4ed69ff

Introduce query_params option to accept drop_null_columns, set default timestamp converter to LogStash::Timestamp, dotted fields extended to nested fields.
1 parent 4ce6fa4 commit 4ed69ff

5 files changed: +113 -9 lines changed

docs/index.asciidoc

Lines changed: 33 additions & 1 deletion
@@ -266,7 +266,13 @@ The following is a basic scheduled ES|QL query that runs hourly:

 Set `config.support_escapes: true` in `logstash.yml` if you need to escape special chars in the query.

-NOTE: With ES|QL query, {ls} doesn't generate `event.original`
+NOTE: With ES|QL query, {ls} doesn't generate `event.original`.
+
+Consider the following caveat scenarios:
+
+- ES|QL by default returns entire columns even if their values are `null`. The plugin provides a `drop_null_columns` option via <<plugins-{type}s-{plugin}-query_params>>. Enabling this parameter instructs {es} to automatically exclude columns whose values are all `null` from query results.
+- If your {es} index uses https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] mapping(s), ES|QL query fetches all parent fields and sub-fields. Since {ls} events cannot contain a parent field's concrete value and sub-field values together, we recommend using the `DROP` keyword in your ES|QL query to explicitly remove sub-fields.
+- If your {es} index contains a top-level `tags` field, this will conflict with the {ls} event's reserved `tags` field. {ls} moves the `tags` field values to `_tags` and populates `tags` with `["_tagsparsefailure"]`.

 For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation].

@@ -297,6 +303,7 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
 | <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
 | <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
 | <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-query_params>> |<<hash,hash>>|No
 | <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations","esql"]`|No
 | <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
 | <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
@@ -544,6 +551,31 @@ documentation] for more information.
 When <<plugins-{type}s-{plugin}-search_api>> resolves to `search_after` and the query does not specify `sort`,
 the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query. Please refer to the {ref}/paginate-search-results.html#search-after[Elasticsearch search_after] parameter to know more.

+[id="plugins-{type}s-{plugin}-query_params"]
+===== `query_params`
+Parameters to send to {es} together with <<plugins-{type}s-{plugin}-query>>.
+
+Accepted options:
+[cols="2,1,3",options="header"]
+|===
+|Option name |Default value | Description
+
+|`drop_null_columns` |`false` | Requests {es} to filter out `null` columns
+|===
+
+Example
+[source, ruby]
+input {
+  elasticsearch {
+    response_type => 'esql'
+    query => 'FROM access-logs* | WHERE type=="apache"'
+    query_params => {
+      drop_null_columns => true
+    }
+  }
+}
+
+
 [id="plugins-{type}s-{plugin}-response_type"]
 ===== `response_type`

lib/logstash/inputs/elasticsearch.rb

Lines changed: 16 additions & 0 deletions
@@ -276,6 +276,10 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
   # If set, the _source of each hit will be added nested under the target instead of at the top-level
   config :target, :validate => :field_reference

+  # Parameters query or query APIs can use
+  # current acceptable params: drop_null_columns => true|false (for ES|QL)
+  config :query_params, :validate => :hash, :default => {}
+
   # Obsolete Settings
   config :ssl, :obsolete => "Set 'ssl_enabled' instead."
   config :ca_file, :obsolete => "Set 'ssl_certificate_authorities' instead."
@@ -323,6 +327,7 @@ def register

     @retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")

+    validate_query_params!
     validate_authentication
     fill_user_password_from_cloud_auth

@@ -751,6 +756,17 @@ def validate_esql_query!
     fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
   end

+  def validate_query_params!
+    # keep the original, remove ES|QL accepted params and validate
+    cloned_query_params = @query_params.clone
+    if @response_type == 'esql'
+      cloned_query_params.delete("drop_null_columns")
+      fail(LogStash::ConfigurationError, "#{cloned_query_params} not accepted when `response_type => 'esql'`") if cloned_query_params.any?
+    else
+      fail(LogStash::ConfigurationError, "#{@query_params} not accepted when `response_type => #{@response_type}`") if @query_params.any?
+    end
+  end
+
   def inform_ineffective_esql_params
     ineffective_options = original_params.keys & %w(index target size slices search_api, docinfo, docinfo_target, docinfo_fields)
     @logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.size > 1
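
Note on `validate_query_params!`: the rule added above can be exercised outside the plugin with a minimal sketch. `ConfigError`, `ESQL_ACCEPTED_PARAMS`, and the two-argument signature are hypothetical stand-ins introduced for illustration; the plugin itself raises `LogStash::ConfigurationError` and reads `@query_params` and `@response_type`.

```ruby
# Standalone sketch of the validation rule. ConfigError and the explicit
# arguments are hypothetical; they replace the plugin's instance state.
class ConfigError < StandardError; end

# Only parameter the diff accepts in ES|QL mode.
ESQL_ACCEPTED_PARAMS = %w[drop_null_columns].freeze

def validate_query_params!(query_params, response_type)
  if response_type == 'esql'
    # Anything other than the accepted ES|QL params is rejected.
    unknown = query_params.reject { |key, _| ESQL_ACCEPTED_PARAMS.include?(key) }
    raise ConfigError, "#{unknown} not accepted when `response_type => 'esql'`" if unknown.any?
  elsif query_params.any?
    # Non-ES|QL response types accept no query_params at all.
    raise ConfigError, "#{query_params} not accepted when `response_type => #{response_type}`"
  end
end

validate_query_params!({}, 'hits')                              # passes
validate_query_params!({ 'drop_null_columns' => true }, 'esql') # passes

begin
  validate_query_params!({ 'drop_null_columns' => true, 'test' => 'hi' }, 'esql')
rescue ConfigError => e
  puts e.message
end
```

Filtering with `reject` instead of cloning and deleting keeps the caller's hash untouched, which is the same goal as the `clone` in the diff.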

lib/logstash/inputs/elasticsearch/esql.rb

Lines changed: 28 additions & 3 deletions
@@ -8,6 +8,10 @@ class Esql

         ESQL_JOB = "ES|QL job"

+        ESQL_PARSERS_BY_TYPE = Hash.new(lambda { |x| x }).merge(
+          'date' => ->(value) { value && LogStash::Timestamp.new(value) },
+        )
+
         # Initialize the ESQL query executor
         # @param client [Elasticsearch::Client] The Elasticsearch client instance
         # @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance
# @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance
@@ -20,6 +24,9 @@ def initialize(client, plugin)
2024
unless @query.include?('METADATA')
2125
logger.warn("The query doesn't have METADATA keyword. Including it makes _id and _version available in the documents", {:query => @query})
2226
end
27+
28+
params = plugin.params["query_params"] || {}
29+
@drop_null_columns = params["drop_null_columns"] || false
2330
end
2431

2532
# Execute the ESQL query and process results
@@ -28,7 +35,7 @@ def initialize(client, plugin)
2835
def do_run(output_queue, query)
2936
logger.info("ES|QL executor starting")
3037
response = retryable(ESQL_JOB) do
31-
@client.esql.query({ body: { query: @query }, format: 'json' })
38+
@client.esql.query({ body: { query: @query }, format: 'json', drop_null_columns: @drop_null_columns })
3239
end
3340
# retriable already printed error details
3441
return if response == false
@@ -64,7 +71,13 @@ def retryable(job_name, &block)
         def process_response(values, columns, output_queue)
           values.each do |value|
             mapped_data = map_column_and_values(columns, value)
-            @plugin.decorate_and_push_to_queue(output_queue, mapped_data)
+            nest_structured_data = nest_keys(mapped_data)
+            @plugin.decorate_and_push_to_queue(output_queue, nest_structured_data)
+          rescue => e
+            # if event creation fails with whatever reason, inform user and tag with failure and return entry as it is
+            logger.warn("Event creation error, ", message: e.message, exception: e.class, data: { "columns" => columns, "values" => [value] })
+            failed_event = LogStash::Event.new("columns" => columns, "values" => [value], "tags" => ['_elasticsearch_input_failure'])
+            output_queue << failed_event
           end
         end

@@ -74,7 +87,19 @@ def process_response(values, columns, output_queue)
         # @return [Hash] Mapped data with column names as keys
         def map_column_and_values(columns, values)
           columns.each_with_index.with_object({}) do |(column, index), mapped_data|
-            mapped_data[column["name"]] = values[index]
+            mapped_data[column["name"]] = ESQL_PARSERS_BY_TYPE[column["type"]].call(values[index])
+          end
+        end
+
+        # Transforms dotted keys to nested JSON shape
+        # @param dot_keyed_hash [Hash] whose keys are dotted (example 'a.b.c.d': 'val')
+        # @return [Hash] whose keys are nested with value mapped ({'a':{'b':{'c':{'d':'val'}}}})
+        def nest_keys(dot_keyed_hash)
+          dot_keyed_hash.each_with_object({}) do |(key, value), result|
+            key_parts = key.to_s.split('.')
+            *path, leaf = key_parts
+            leaf_scope = path.inject(result) { |scope, part| scope[part] ||= {} }
+            leaf_scope[leaf] = value
           end
         end
       end
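
Note on `nest_keys`: the sketch below copies the method from the diff into a free function to show what it produces, and what happens in the multi-fields case that the documentation caveat warns about. The sample field names and values are made up for illustration.

```ruby
# Same logic as nest_keys in the diff, extracted as a free function.
def nest_keys(dot_keyed_hash)
  dot_keyed_hash.each_with_object({}) do |(key, value), result|
    *path, leaf = key.to_s.split('.')
    # Walk (and create) one nested hash per path segment.
    leaf_scope = path.inject(result) { |scope, part| scope[part] ||= {} }
    leaf_scope[leaf] = value
  end
end

nested = nest_keys('host.name' => 'web-1', 'host.ip' => '10.0.0.5', 'status' => 200)
p nested

# Multi-field style pair: the parent already holds a scalar, so the
# sub-field has no hash to nest into and the assignment raises.
begin
  nest_keys('message' => 'hello', 'message.keyword' => 'hello')
rescue StandardError => e
  puts "conflict: #{e.class}"
end
```

This appears to be the situation the `rescue` in `process_response` guards against: the row is emitted as a raw event tagged `_elasticsearch_input_failure` instead of being lost. If the keys arrive in the opposite order (sub-field first, parent second), no error is raised and the parent value simply overwrites the nested hash, which is another reason to `DROP` sub-fields in the query.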

spec/inputs/elasticsearch_esql_spec.rb

Lines changed: 4 additions & 5 deletions
@@ -44,7 +44,7 @@

   describe "when executing chain of processes" do
     let(:output_queue) { Queue.new }
-    let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'id'}, { 'name' => 'val'}] } }
+    let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'a.b.1.d'}, { 'name' => 'h_g.k$l.m.0'}] } }

     before do
       allow(esql_executor).to receive(:retryable).and_yield
@@ -55,7 +55,7 @@
     it "executes the ESQL query and processes the results" do
       allow(response).to receive(:headers).and_return({})
       esql_executor.do_run(output_queue, plugin_config["query"])
-      expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'})
+      expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {"a"=>{"b"=>{"1"=>{"d"=>"foo"}}}, "h_g"=>{"k$l"=>{"m"=>{"0"=>"bar"}}}})
     end

     it "logs a warning if the response contains a warning header" do
@@ -71,16 +71,15 @@
     end
   end

-
   describe "when starts processing the response" do
     let(:output_queue) { Queue.new }
     let(:values) { [%w[foo bar]] }
-    let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] }
+    let(:columns) { [{'name' => 'some.id'}, {'name' => 'some.val'}] }

     it "processes the ESQL response and pushes events to the output queue" do
       allow(plugin).to receive(:decorate_and_push_to_queue)
       esql_executor.send(:process_response, values, columns, output_queue)
-      expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'})
+      expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {"some"=>{"id"=>"foo", "val"=>"bar"}})
     end
   end
end
8685

spec/inputs/elasticsearch_spec.rb

Lines changed: 32 additions & 0 deletions
@@ -1473,6 +1473,38 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie
           end
         end
       end
+
+      describe "with extra params" do
+        context "empty `query_params`" do
+          let(:config) {
+            super().merge('query_params' => {})
+          }
+
+          it "does not raise a configuration error" do
+            expect { plugin.send(:validate_query_params!) }.not_to raise_error
+          end
+        end
+
+        context "with actual `drop_null_columns` value" do
+          let(:config) {
+            super().merge('query_params' => { 'drop_null_columns' => true })
+          }
+
+          it "does not raise a configuration error" do
+            expect { plugin.send(:validate_query_params!) }.not_to raise_error
+          end
+        end
+
+        context "with extra non ES|QL params" do
+          let(:config) {
+            super().merge('query_params' => { 'drop_null_columns' => true, 'test' => 'hi'})
+          }
+
+          it "raises a configuration error" do
+            expect { plugin.send(:validate_query_params!) }.to raise_error(LogStash::ConfigurationError, "{\"test\"=>\"hi\"} not accepted when `response_type => 'esql'`")
+          end
+        end
+      end
     end
   end
 end
