
Commit cac3680

Drop null columns by default. Set limit to 1 by default if a FROM query is used. Add more debug and warn logs.
1 parent f2077ef commit cac3680

6 files changed

+238
-77
lines changed

docs/index.asciidoc

Lines changed: 6 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -135,7 +135,7 @@ To configure ES|QL query in the plugin, set your ES|QL query in the `query` para
135135

136136
IMPORTANT: We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments.
137137

138-
The following is a basic ES|QL query that sets food name to transaction event based on event's food ID:
138+
The following is a basic ES|QL query that sets the food name on the transaction event based on the upstream event's food ID:
139139
[source, ruby]
140140
filter {
141141
elasticsearch {
@@ -145,27 +145,20 @@ The following is a basic ES|QL query that sets food name to transaction event ba
145145
FROM food-index
146146
| WHERE id = "?food_id"
147147
'
148-
fields => { "name" => "food_name" }
149148
query_params => {
150-
named_params => ["food_id" => "%{[food_id]}"]
151-
drop_null_columns => true
149+
named_params => ["food_id" => "[food][id]"]
152150
}
151+
fields => { "food.name" => "food_name" }
153152
}
154153
}
155154

156155
Set `config.support_escapes: true` in `logstash.yml` if you need to escape special chars in the query.
157156

158-
NOTE: With ES|QL query, {ls} doesn't generate `event.original`.
157+
In the result event, the plugin sets the total result size in the `[@metadata][total_hits]` field. It also limits the result size to 1 when a `FROM` query is used without an explicit `LIMIT`.
159158

160-
In the result event, the plugin sets total result size in `[@metadata][total_values]` field. It also limits the result size to 1 when `FROM` query is used.
159+
NOTE: If the `FROM` source command is used and no `LIMIT` is set, the plugin appends `| LIMIT 1` to the query.
161160

162-
Consider the following caveat scenarios:
163-
164-
- ES|QL by default returns entire columns even if their values are `null`. The plugin provides a `drop_null_columns` option via <<plugins-{type}s-{plugin}-query_params>>. Enabling this parameter instructs {es} to automatically exclude columns with null values from query results.
165-
- If your {es} index uses https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] mapping(s), ES|QL query fetches all parent and sub-fields fields. Since {ls} events cannot contain parent field's concrete value and sub-field values together, we recommend using the `DROP` keyword in your ES|QL query explicitly remove sub-fields.
166-
- If your {es} index contains top level `tags` field, this will conflict with {ls} event's reserved `tags` field. {ls} moves `tags` field values to the `_tags` and populates `tags` with `["_tagsparsefailure"]`.
167-
168-
For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation].
161+
For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{es} ES|QL documentation].
169162

170163
[id="plugins-{type}s-{plugin}-options"]
171164
==== Elasticsearch Filter Configuration Options
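Review note on the `| LIMIT 1` behavior documented above: the following is a minimal sketch of the check, assuming it works the way `EsqlExecutor#initialize` does in this commit. `with_default_limit` is a hypothetical helper name, and unlike the commit (which mutates `@query` with `concat`) it returns a new string.

```ruby
# Hypothetical helper; mirrors the condition used in EsqlExecutor#initialize.
def with_default_limit(query)
  # Only queries that start with the FROM source command are considered.
  # The comparison is case-sensitive, exactly as in the commit.
  return query unless query.strip.start_with?("FROM")

  # A query that already has a `| LIMIT` stage is left untouched.
  return query if query.match?(/\|\s*LIMIT/)

  "#{query} | LIMIT 1"
end

puts with_default_limit("FROM food-index | WHERE id == ?food_id")
puts with_default_limit("FROM food-index | LIMIT 5")
puts with_default_limit("ROW a = 1")
```

Because both checks are case-sensitive, a query written as `from food-index` or with `| limit 5` would not be recognized by this condition. Whether that matters depends on how queries are written in practice.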

lib/logstash/filters/elasticsearch.rb

Lines changed: 18 additions & 10 deletions
@@ -136,7 +136,6 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
136136
# named params can be applied as following via query_params:
137137
# query_params => {
138138
# "named_params" => [ {"type" => "%{[type]}"}]
139-
# "drop_null_columns" => true
140139
# }
141140
config :query_params, :validate => :hash, :default => {}
142141

@@ -177,8 +176,11 @@ def register
177176
query_type = resolve_query_type
178177
case query_type
179178
when "esql"
179+
invalid_params_with_esql = original_params.keys & %w(index query_template sort docinfo_fields aggregation_fields enable_sort result_size)
180+
raise LogStash::ConfigurationError, "Configured #{invalid_params_with_esql} params cannot be used with ES|QL query" if invalid_params_with_esql.any?
181+
180182
validate_ls_version_for_esql_support!
181-
validate_params_with_esql_query!
183+
validate_esql_query_and_params!
182184
@esql_executor ||= LogStash::Filters::Elasticsearch::EsqlExecutor.new(self, @logger)
183185
else # dsl
184186
validate_dsl_query_settings!
@@ -448,19 +450,25 @@ def validate_ls_version_for_esql_support!
448450
end
449451
end
450452

451-
def validate_params_with_esql_query!
452-
invalid_params_with_esql = original_params.keys & %w(index query_template sort docinfo_fields aggregation_fields enable_sort result_size)
453-
fail("Configured #{invalid_params_with_esql} params cannot be used with ES|QL query") if invalid_params_with_esql.any?
454-
455-
accepted_query_params = %w(named_params drop_null_columns)
453+
def validate_esql_query_and_params!
454+
accepted_query_params = %w(named_params)
456455
original_query_params = original_params["query_params"] ||= {}
457456
invalid_query_params = original_query_params.keys - accepted_query_params
458-
fail("#{accepted_query_params} options are accepted in `query_params`, but found #{invalid_query_params} invalid option(s)") if invalid_query_params.any?
457+
raise LogStash::ConfigurationError, "#{accepted_query_params} option(s) accepted in `query_params`, but found #{invalid_query_params} invalid option(s)" if invalid_query_params.any?
459458

460459
is_named_params_array = original_query_params["named_params"] ? original_query_params["named_params"].class.eql?(Array) : true
461-
fail("`query_params => named_params` is required to be array") unless is_named_params_array
460+
raise LogStash::ConfigurationError, "`query_params => named_params` is required to be array" unless is_named_params_array
461+
462+
named_params = original_query_params["named_params"] ||= []
463+
named_params_keys = named_params.map(&:keys).flatten
462464

463-
# TODO: validate that placeholders in query should match the named_params
465+
placeholders = @query.scan(/\?(\w+)/).flatten
466+
raise LogStash::ConfigurationError, "Number of placeholders in `query` and `named_params` do not match" unless placeholders.size == named_params_keys.size
467+
468+
placeholders.each do |placeholder|
469+
placeholder.delete_prefix!("?")
470+
raise LogStash::ConfigurationError, "Placeholder #{placeholder} not found in query" unless named_params_keys.include?(placeholder)
471+
end
464472
end
465473

466474
def validate_es_for_esql_support!
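Review note on the placeholder validation added in `validate_esql_query_and_params!`: the sketch below reproduces the same two checks outside Logstash so they can be tried in isolation. `validate_placeholders!` is a hypothetical name, and `ArgumentError` stands in for `LogStash::ConfigurationError`. The wording of the second error message is also my own.

```ruby
# Hypothetical standalone version of the placeholder checks from this commit.
# ArgumentError stands in for LogStash::ConfigurationError.
def validate_placeholders!(query, named_params)
  keys = named_params.flat_map(&:keys)

  # The capture group returns names without the leading "?".
  placeholders = query.scan(/\?(\w+)/).flatten

  unless placeholders.size == keys.size
    raise ArgumentError, "Number of placeholders in `query` and `named_params` do not match"
  end

  missing = placeholders - keys
  unless missing.empty?
    raise ArgumentError, "Placeholder #{missing.first} has no entry in `named_params`"
  end

  true
end

validate_placeholders!("FROM my-index | WHERE type == ?type", [{ "type" => "[type]" }])

begin
  # The same placeholder used twice counts as two placeholders.
  validate_placeholders!("FROM my-index | WHERE a == ?x OR b == ?x", [{ "x" => "[x]" }])
rescue ArgumentError => e
  puts e.message
end
```

The second call shows a consequence of comparing counts: a query that reuses one placeholder in two places is rejected even though a single `named_params` entry would cover it. Comparing unique placeholder names instead would be one way to allow that, if it is intended to be supported.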

lib/logstash/filters/elasticsearch/esql_executor.rb

Lines changed: 37 additions & 14 deletions
@@ -7,14 +7,19 @@ class EsqlExecutor
77

88
def initialize(plugin, logger)
99
@plugin = plugin
10+
@logger = logger
1011

11-
params = plugin.params["query_params"] || {}
12-
@drop_null_columns = params["drop_null_columns"] || false
13-
@named_params = params["named_params"] || []
1412
@query = plugin.params["query"]
13+
if @query.strip.start_with?("FROM") && !@query.match?(/\|\s*LIMIT/)
14+
@logger.warn("ES|QL query doesn't contain LIMIT, adding `| LIMIT 1` to optimize the performance")
15+
@query.concat(' | LIMIT 1')
16+
end
17+
18+
query_params = plugin.params["query_params"] || {}
19+
@named_params = query_params["named_params"] || []
1520
@fields = plugin.params["fields"]
1621
@tag_on_failure = plugin.params["tag_on_failure"]
17-
@logger = logger
22+
@logger.debug("ES|QL query executor initialized with ", query: @query, named_params: @named_params)
1823
end
1924

2025
def process(client, event)
@@ -34,8 +39,11 @@ def resolve_parameters(event)
3439
@named_params.map do |entry|
3540
entry.each_with_object({}) do |(key, value), new_entry|
3641
begin
37-
new_entry[key] = event.sprintf(value)
42+
resolved_value = event.get(value)
43+
@logger.debug("Resolved value for #{key}: #{resolved_value}, its class: #{resolved_value.class}")
44+
new_entry[key] = resolved_value
3845
rescue => e
46+
# catches invalid field reference
3947
@logger.error("Failed to resolve parameter", key: key, value: value, error: e.message)
4048
raise
4149
end
@@ -44,30 +52,45 @@ def resolve_parameters(event)
4452
end
4553

4654
def execute_query(client, params)
55+
# debug logs may help to check what query shape the plugin is sending to ES
4756
@logger.debug("Executing ES|QL query", query: @query, params: params)
48-
client.search({ body: { query: @query, params: params }, format: 'json', drop_null_columns: @drop_null_columns }, 'esql')
57+
client.search({ body: { query: @query, params: params }, format: 'json', drop_null_columns: true }, 'esql')
4958
end
5059

5160
def process_response(event, response)
52-
return unless response['values'] && response['columns']
61+
columns = response['columns'].freeze
62+
values = response['values'].freeze
63+
if values.nil? || values.size == 0
64+
@logger.debug("Empty ES|QL query result", columns: columns, values: values)
65+
return
66+
end
67+
68+
# this should never happen, but guard against it so the plugin doesn't crash
69+
if columns.nil? || columns.size == 0
70+
@logger.error("No columns exist but received values", columns: columns, values: values)
71+
return
72+
end
5373

54-
# TODO: set to the target field once target support is added
55-
event.set("[@metadata][total_values]", response['values'].size)
56-
add_requested_fields(event, response)
74+
# TODO: do we need to set `total_hits` to target?
75+
# if not, how do we resolve conflict with existing es-input total_hits field?
76+
# FYI: with DSL it stores in `[@metadata][total_hits]`
77+
event.set("[@metadata][total_hits]", values.size)
78+
add_requested_fields(event, columns, values)
5779
end
5880

5981
def inform_warning(response)
6082
return unless (warning = response&.headers&.dig('warning'))
6183
@logger.warn("ES|QL executor received warning", { message: warning })
6284
end
6385

64-
def add_requested_fields(event, response)
86+
def add_requested_fields(event, columns, values)
6587
@fields.each do |old_key, new_key|
66-
column_index = response['columns'].find_index { |col| col['name'] == old_key }
88+
column_index = columns.find_index { |col| col['name'] == old_key }
6789
next unless column_index
6890

69-
values = response['values'].map { |entry| entry[column_index] }
70-
event.set(new_key, values.one? ? values.first : values) if values&.size > 0
91+
row_values = values.map { |entry| entry[column_index] }&.compact # remove non-exist field values with compact
92+
# TODO: set to the target field once target support is added
93+
event.set(new_key, row_values.one? ? row_values.first : row_values) if row_values&.size > 0
7194
end
7295
end
7396
end
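Review note on `add_requested_fields`: the sketch below shows the column-to-field mapping with `nil` values compacted away, using plain hashes and arrays in place of the ES|QL response and the Logstash event. `extract_fields` is a hypothetical helper that returns a hash instead of calling `event.set`.

```ruby
# Hypothetical helper; returns the fields that would be set on the event.
def extract_fields(columns, values, fields)
  fields.each_with_object({}) do |(column_name, target), out|
    index = columns.find_index { |col| col["name"] == column_name }
    next unless index # requested column is not in the response

    # Collect this column across all rows and drop nil cells.
    row_values = values.map { |row| row[index] }.compact
    next if row_values.empty?

    # A single value is stored as a scalar, several values as an array.
    out[target] = row_values.one? ? row_values.first : row_values
  end
end

columns = [{ "name" => "id" }, { "name" => "val" }, { "name" => "odd" }]
values  = [["foo", "bar", nil]]
p extract_fields(columns, values, { "val" => "val_new", "odd" => "new_odd" })
```

With the data from the spec, only `val_new` is produced, because the `odd` column holds nothing but `nil`. That matches the `expect(event.get("new_odd")).to be_nil` assertion in the updated spec.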

spec/filters/elasticsearch_esql_spec.rb

Lines changed: 22 additions & 15 deletions
@@ -13,43 +13,44 @@
1313
end
1414
let(:esql_executor) { described_class.new(plugin, logger) }
1515

16-
describe "when initializes" do
16+
context "when initializes" do
1717
it "sets up the ESQL client with correct parameters" do
18+
allow(logger).to receive(:debug)
1819
expect(esql_executor.instance_variable_get(:@query)).to eq(plugin_config["query"])
1920
expect(esql_executor.instance_variable_get(:@named_params)).to eq([])
20-
expect(esql_executor.instance_variable_get(:@drop_null_columns)).to eq(false)
2121
expect(esql_executor.instance_variable_get(:@fields)).to eq({})
2222
expect(esql_executor.instance_variable_get(:@tag_on_failure)).to eq(["_elasticsearch_lookup_failure"])
2323
end
2424
end
2525

26-
describe "when processes" do
26+
context "when processes" do
2727
let(:plugin_config) {
2828
super()
2929
.merge(
3030
{
3131
"query" => "FROM my-index | WHERE field = ?foo | LIMIT 5",
32-
"query_params" => { "named_params" => [{ "foo" => "%{bar}" }] },
33-
"fields" => { "val" => "val_new" }
32+
"query_params" => { "named_params" => [{ "foo" => "[bar]" }] },
33+
"fields" => { "val" => "val_new", "odd" => "new_odd" }
3434
})
3535
}
3636
let(:event) { LogStash::Event.new({}) }
37-
let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'id' }, { 'name' => 'val' }] } }
37+
let(:response) { { 'values' => [["foo", "bar", nil]], 'columns' => [{ 'name' => 'id' }, { 'name' => 'val' }, { 'name' => 'odd' }] } }
3838

3939
before do
40-
allow(event).to receive(:sprintf).and_return("resolved_value")
40+
allow(logger).to receive(:debug)
4141
end
4242

4343
it "resolves parameters" do
44-
expect(event).to receive(:sprintf).with("%{bar}").and_return("resolved_value")
44+
expect(event).to receive(:get).with("[bar]").and_return("resolved_value")
4545
resolved_params = esql_executor.send(:resolve_parameters, event)
4646
expect(resolved_params).to include("foo" => "resolved_value")
4747
end
4848

4949
it "executes the query with resolved parameters" do
5050
allow(logger).to receive(:debug)
51+
expect(event).to receive(:get).with("[bar]").and_return("resolved_value")
5152
expect(client).to receive(:search).with(
52-
{ body: { query: plugin_config["query"], params: [{ "foo" => "resolved_value" }] }, format: 'json', drop_null_columns: false, },
53+
{ body: { query: plugin_config["query"], params: [{ "foo" => "resolved_value" }] }, format: 'json', drop_null_columns: true, },
5354
'esql')
5455
resolved_params = esql_executor.send(:resolve_parameters, event)
5556
esql_executor.send(:execute_query, client, resolved_params)
@@ -62,7 +63,7 @@
6263
end
6364

6465
it "processes the response and adds metadata" do
65-
expect(event).to receive(:set).with("[@metadata][total_values]", 1)
66+
expect(event).to receive(:set).with("[@metadata][total_hits]", 1)
6667
expect(event).to receive(:set).with("val_new", "bar")
6768
esql_executor.send(:process_response, event, response)
6869
end
@@ -73,19 +74,25 @@
7374
allow(response).to receive(:headers).and_return({})
7475
expect(client).to receive(:search).with(
7576
{
76-
body: { query: plugin_config["query"], params: plugin_config["query_params"]["named_params"] },
77+
body: { query: plugin_config["query"], params: [{"foo"=>"resolve_me"}] },
7778
format: 'json',
78-
drop_null_columns: false,
79+
drop_null_columns: true,
7980
},
8081
'esql'
8182
).and_return(response)
82-
expect { esql_executor.process(client, LogStash::Event.new({ "hello" => "world" })) }.to_not raise_error
83+
84+
event = LogStash::Event.new({ "hello" => "world", "bar" => "resolve_me" })
85+
expect { esql_executor.process(client, event) }.to_not raise_error
86+
expect(event.get("[@metadata][total_hits]")).to eq(1)
87+
expect(event.get("hello")).to eq("world")
88+
expect(event.get("val_new")).to eq("bar")
89+
expect(event.get("new_odd")).to be_nil # filters out non-exist fields
8390
end
8491

8592
it "tags on plugin failures" do
86-
expect(event).to receive(:sprintf).with("%{bar}").and_raise("Event#sprintf error")
93+
expect(event).to receive(:get).with("[bar]").and_raise("Event#get Invalid FieldReference error")
8794

88-
expect(logger).to receive(:error).with("Failed to resolve parameter", {:error=>"Event#sprintf error", :key=>"foo", :value=>"%{bar}"})
95+
expect(logger).to receive(:error).with("Failed to resolve parameter", {:error=>"Event#get Invalid FieldReference error", :key=>"foo", :value=>"[bar]"})
8996
expect(logger).to receive(:error).with("Failed to process ES|QL filter", exception: instance_of(RuntimeError))
9097
expect(event).to receive(:tag).with("_elasticsearch_lookup_failure")
9198
esql_executor.process(client, event)
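Review note on the switch from `event.sprintf` to `event.get` in `resolve_parameters`: the sketch below imitates field-reference resolution against a plain nested hash. `get_field` is a hypothetical, heavily simplified stand-in for `LogStash::Event#get` that only understands `[a][b]` and bare `a` references. It is not the real field-reference parser.

```ruby
# Hypothetical stand-in for LogStash::Event#get; supports "[a][b]" and bare "a" only.
def get_field(event_hash, reference)
  path = if reference.start_with?("[")
           reference.scan(/\[([^\[\]]+)\]/).flatten
         else
           [reference]
         end
  event_hash.dig(*path)
end

# Same shape as EsqlExecutor#resolve_parameters: each entry maps a
# placeholder name to a field reference, which is replaced by the field's value.
def resolve_parameters(named_params, event_hash)
  named_params.map do |entry|
    entry.each_with_object({}) do |(key, reference), resolved|
      resolved[key] = get_field(event_hash, reference)
    end
  end
end

event = { "food" => { "id" => 42 }, "bar" => "resolve_me" }
p resolve_parameters([{ "food_id" => "[food][id]" }, { "foo" => "[bar]" }], event)
```

One visible difference from string interpolation is that the resolved value keeps its original type (the integer `42` stays an integer), which is presumably why the commit also logs the resolved value's class. A reference to a missing field resolves to `nil` in this sketch.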

spec/filters/elasticsearch_spec.rb

Lines changed: 102 additions & 0 deletions
@@ -864,6 +864,108 @@ def wait_receive_request
864864
end
865865
end
866866

867+
describe "ES|QL" do
868+
869+
describe "compatibility" do
870+
let(:config) {{ "hosts" => ["localhost:9200"], "query" => "FROM my-index" }}
871+
872+
context "when LS doesn't support ES|QL" do
873+
let(:ls_version) { LogStash::Filters::Elasticsearch::LS_ESQL_SUPPORT_VERSION }
874+
before(:each) do
875+
stub_const("LOGSTASH_VERSION", "8.17.0")
876+
end
877+
878+
it "raises a runtime error" do
879+
expect { plugin.send(:validate_ls_version_for_esql_support!) }
880+
.to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{ls_version}/)
881+
end
882+
end
883+
884+
context "when ES doesn't support ES|QL" do
885+
let(:es_version) { LogStash::Filters::Elasticsearch::ES_ESQL_SUPPORT_VERSION }
886+
let(:client) { double(:client) }
887+
888+
it "raises a runtime error" do
889+
allow(plugin).to receive(:get_client).twice.and_return(client)
890+
allow(client).to receive(:es_version).and_return("8.8.0")
891+
892+
expect { plugin.send(:validate_es_for_esql_support!) }
893+
.to raise_error(RuntimeError, /Connected Elasticsearch 8.8.0 version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{es_version} version./)
894+
end
895+
end
896+
end
897+
898+
context "when non-ES|QL params applied" do
899+
let(:config) do
900+
{
901+
"hosts" => ["localhost:9200"],
902+
"query" => "FROM my-index",
903+
"index" => "some-index",
904+
"docinfo_fields" => { "_index" => "es_index" },
905+
"sort" => "@timestamp:desc",
906+
"enable_sort" => true,
907+
"aggregation_fields" => { "bytes_avg" => "bytes_avg_ls_field" }
908+
}
909+
end
910+
it "raises a config error" do
911+
invalid_params_with_esql = %w(index docinfo_fields sort enable_sort aggregation_fields)
912+
error_text = /Configured #{invalid_params_with_esql} params cannot be used with ES|QL query/i
913+
expect { plugin.register }.to raise_error LogStash::ConfigurationError, error_text
914+
end
915+
end
916+
917+
context "when `named_params` isn't array" do
918+
let(:config) do
919+
{
920+
"hosts" => ["localhost:9200"],
921+
"query" => "FROM my-index",
922+
"query_params" => { "named_params" => {"a" => "b"} },
923+
}
924+
end
925+
it "raises a config error" do
926+
expect { plugin.register }.to raise_error LogStash::ConfigurationError, /`query_params => named_params` is required to be array/
927+
end
928+
end
929+
930+
context "when `named_params` exists but not placeholder in the query" do
931+
let(:config) do
932+
{
933+
"hosts" => ["localhost:9200"],
934+
"query" => "FROM my-index",
935+
"query_params" => { "named_params" => [{"a" => "b"}] },
936+
}
937+
end
938+
it "raises a config error" do
939+
expect { plugin.register }.to raise_error LogStash::ConfigurationError, /Number of placeholders in `query` and `named_params` do not match/
940+
end
941+
end
942+
943+
context "when `named_params` doesn't exist but placeholder found" do
944+
let(:config) do
945+
{
946+
"hosts" => ["localhost:9200"],
947+
"query" => "FROM my-index | WHERE a = ?a"
948+
}
949+
end
950+
it "raises a config error" do
951+
expect { plugin.register }.to raise_error LogStash::ConfigurationError, /Number of placeholders in `query` and `named_params` do not match/
952+
end
953+
end
954+
955+
context "when placeholder and `named_params` do not match" do
956+
let(:config) do
957+
{
958+
"hosts" => ["localhost:9200"],
959+
"query" => "FROM my-index | WHERE type = ?type",
960+
"query_params" => { "named_params" => [{"b" => "c"}] },
961+
}
962+
end
963+
it "raises a config error" do
964+
expect { plugin.register }.to raise_error LogStash::ConfigurationError, /Placeholder type not found in query/
965+
end
966+
end
967+
end
968+
867969
def extract_transport(client)
868970
# on 7x: client.transport.transport
869971
# on >=8.x: client.transport
