Skip to content

Commit d3a1701

Browse files
mashhursyaauie
andauthored
Add a target field support. (#196) (#197)
* Introduces a target field as a field reference (mixin validated) where if set result is placed into the target. * Simplifies the set extracted values to the event with target logic. Applies setting to target with aggregations similarly with es-input. * Mention to target in each fields which can be placed in the target. Docs info fields are placed in target field. --------- (cherry picked from commit 5abbe49) Co-authored-by: Rye Biesemeyer <yaauie@users.noreply.github.com>
1 parent 3bb5a15 commit d3a1701

File tree

6 files changed

+121
-12
lines changed

6 files changed

+121
-12
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.18.0
2+
- Add `target` configuration option to store the result into it [#197](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/197)
3+
14
## 3.17.1
25
- Add elastic-transport client support used in elasticsearch-ruby 8.x [#193](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/193)
36

docs/index.asciidoc

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
160160
| <<plugins-{type}s-{plugin}-ssl_truststore_type>> |<<string,string>>|No
161161
| <<plugins-{type}s-{plugin}-ssl_verification_mode>> |<<string,string>>, one of `["full", "none"]`|No
162162
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<array,array>>|No
163+
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
163164
| <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
164165
|=======================================================================
165166

@@ -173,8 +174,11 @@ filter plugins.
173174

174175
* Value type is <<hash,hash>>
175176
* Default value is `{}`
177+
* Format: `"aggregation_name" => "[path][on][event]"`:
178+
** `aggregation_name`: aggregation name in result from {es}
179+
** `[path][on][event]`: path for where to place the value on the current event, using field-reference notation
176180

177-
Hash of aggregation names to copy from elasticsearch response into Logstash event fields
181+
A mapping of aggregations to copy into the <<plugins-{type}s-{plugin}-target>> of the current event.
178182

179183
Example:
180184
[source,ruby]
@@ -244,8 +248,11 @@ These custom headers will override any headers previously set by the plugin such
244248

245249
* Value type is <<hash,hash>>
246250
* Default value is `{}`
251+
* Format: `"path.in.source" => "[path][on][event]"`:
252+
** `path.in.source`: field path in document source of result from {es}, using dot-notation
253+
** `[path][on][event]`: path for where to place the value on the current event, using field-reference notation
247254

248-
Hash of docinfo fields to copy from old event (found via elasticsearch) into new event
255+
A mapping of docinfo (`_source`) fields to copy into the <<plugins-{type}s-{plugin}-target>> of the current event.
249256

250257
Example:
251258
[source,ruby]
@@ -271,9 +278,11 @@ Whether results should be sorted or not
271278

272279
* Value type is <<array,array>>
273280
* Default value is `{}`
281+
* Format: `"path.in.result" => "[path][on][event]"`:
282+
** `path.in.result`: field path in indexed result from {es}, using dot-notation
283+
** `[path][on][event]`: path for where to place the value on the current event, using field-reference notation
274284

275-
An array of fields to copy from the old event (found via elasticsearch) into the
276-
new event, currently being processed.
285+
A mapping of indexed fields to copy into the <<plugins-{type}s-{plugin}-target>> of the current event.
277286

278287
In the following example, the values of `@timestamp` and `event_id` on the event
279288
found via elasticsearch are copied to the current event's
@@ -521,6 +530,43 @@ WARNING: Setting certificate verification to `none` disables many security benef
521530

522531
Tags the event on failure to look up previous log event information. This can be used in later analysis.
523532

533+
[id="plugins-{type}s-{plugin}-target"]
534+
===== `target`
535+
536+
* Value type is <<string,string>>
537+
* There is no default value for this setting.
538+
539+
Define the target field for placing the result data.
540+
If this setting is omitted, the target will be the root (top level) of the event.
541+
542+
The destination fields specified in <<plugins-{type}s-{plugin}-fields>>, <<plugins-{type}s-{plugin}-aggregation_fields>>, and <<plugins-{type}s-{plugin}-docinfo_fields>> are relative to this target.
543+
544+
For example, if you want the data to be put in the `operation` field:
545+
[source,ruby]
546+
if [type] == "end" {
547+
filter {
548+
query => "type:start AND transaction:%{[transactionId]}"
549+
elasticsearch {
550+
target => "transaction"
551+
fields => {
552+
"@timestamp" => "started"
553+
"transaction_id" => "id"
554+
}
555+
}
556+
}
557+
}
558+
559+
`fields` fields will be expanded into a data structure in the `target` field, overall shape looks like this:
560+
[source,ruby]
561+
{
562+
"transaction" => {
563+
"started" => "2025-04-29T12:01:46.263Z"
564+
"id" => "1234567890"
565+
}
566+
}
567+
568+
NOTE: when writing to a field that already exists on the event, the previous value will be overwritten.
569+
524570
[id="plugins-{type}s-{plugin}-user"]
525571
===== `user`
526572

lib/logstash/filters/elasticsearch.rb

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,20 @@
22
require "logstash/filters/base"
33
require "logstash/namespace"
44
require "logstash/json"
5+
require 'logstash/plugin_mixins/ecs_compatibility_support'
6+
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
57
require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
68
require "logstash/plugin_mixins/normalize_config_support"
9+
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
710
require "monitor"
811

912
require_relative "elasticsearch/client"
1013

1114
class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
15+
16+
include LogStash::PluginMixins::ECSCompatibilitySupport
17+
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
18+
1219
config_name "elasticsearch"
1320

1421
# List of elasticsearch hosts to use for querying.
@@ -131,6 +138,9 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
131138
# Tags the event on failure to look up geo information. This can be used in later analysis.
132139
config :tag_on_failure, :validate => :array, :default => ["_elasticsearch_lookup_failure"]
133140

141+
# If set, the the result set will be nested under the target field
142+
config :target, :validate => :field_reference
143+
134144
# How many times to retry on failure?
135145
config :retry_on_failure, :validate => :number, :default => 0
136146

@@ -214,25 +224,27 @@ def filter(event)
214224
matched = true
215225
@fields.each do |old_key, new_key|
216226
old_key_path = extract_path(old_key)
217-
set = resultsHits.map do |doc|
227+
extracted_hit_values = resultsHits.map do |doc|
218228
extract_value(doc["_source"], old_key_path)
219229
end
220-
event.set(new_key, set.count > 1 ? set : set.first)
230+
value_to_set = extracted_hit_values.count > 1 ? extracted_hit_values : extracted_hit_values.first
231+
set_to_event_target(event, new_key, value_to_set)
221232
end
222233
@docinfo_fields.each do |old_key, new_key|
223234
old_key_path = extract_path(old_key)
224-
set = resultsHits.map do |doc|
235+
extracted_docs_info = resultsHits.map do |doc|
225236
extract_value(doc, old_key_path)
226237
end
227-
event.set(new_key, set.count > 1 ? set : set.first)
238+
value_to_set = extracted_docs_info.count > 1 ? extracted_docs_info : extracted_docs_info.first
239+
set_to_event_target(event, new_key, value_to_set)
228240
end
229241
end
230242

231243
resultsAggs = results["aggregations"]
232244
if !resultsAggs.nil? && !resultsAggs.empty?
233245
matched = true
234246
@aggregation_fields.each do |agg_name, ls_field|
235-
event.set(ls_field, resultsAggs[agg_name])
247+
set_to_event_target(event, ls_field, resultsAggs[agg_name])
236248
end
237249
end
238250

@@ -265,6 +277,18 @@ def prepare_user_agent
265277

266278
private
267279

280+
# if @target is defined, creates a nested structure to inject result into target field
281+
# if not defined, directly sets to the top-level event field
282+
# @param event [LogStash::Event]
283+
# @param new_key [String] name of the field to set
284+
# @param value_to_set [Array] values to set
285+
# @return [void]
286+
def set_to_event_target(event, new_key, value_to_set)
287+
key_to_set = target ? "[#{target}][#{new_key}]" : new_key
288+
289+
event.set(key_to_set, value_to_set)
290+
end
291+
268292
def client_options
269293
@client_options ||= {
270294
:user => @user,

logstash-filter-elasticsearch.gemspec

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-filter-elasticsearch'
4-
s.version = '3.17.1'
4+
s.version = '3.18.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -23,8 +23,10 @@ Gem::Specification.new do |s|
2323
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
2424
s.add_runtime_dependency 'elasticsearch', ">= 7.14.9", '< 9'
2525
s.add_runtime_dependency 'manticore', ">= 0.7.1"
26+
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.3'
2627
s.add_runtime_dependency 'logstash-mixin-ca_trusted_fingerprint_support', '~> 1.0'
2728
s.add_runtime_dependency 'logstash-mixin-normalize_config_support', '~>1.0'
29+
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'
2830
s.add_development_dependency 'cabin', ['~> 0.6']
2931
s.add_development_dependency 'webrick'
3032
s.add_development_dependency 'logstash-devutils'

spec/filters/elasticsearch_spec.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,33 @@ def wait_receive_request
864864
end
865865
end
866866

867+
describe "#set_to_event_target" do
868+
869+
context "when `@target` is nil, default behavior" do
870+
let(:config) {{ }}
871+
872+
it "sets the value directly to the top-level event field" do
873+
plugin.send(:set_to_event_target, event, "new_field", %w[value1 value2])
874+
expect(event.get("new_field")).to eq(%w[value1 value2])
875+
end
876+
end
877+
878+
context "when @target is defined" do
879+
let(:config) {{ "target" => "nested" }}
880+
881+
it "creates a nested structure under the target field" do
882+
plugin.send(:set_to_event_target, event, "new_field", %w[value1 value2])
883+
expect(event.get("nested")).to eq({ "new_field" => %w[value1 value2] })
884+
end
885+
886+
it "overwrites existing target field with new data" do
887+
event.set("nested", { "existing_field" => "existing_value", "new_field" => "value0" })
888+
plugin.send(:set_to_event_target, event, "new_field", ["value1"])
889+
expect(event.get("nested")).to eq({ "existing_field" => "existing_value", "new_field" => ["value1"] })
890+
end
891+
end
892+
end
893+
867894
def extract_transport(client)
868895
# on 7x: client.transport.transport
869896
# on >=8.x: client.transport

spec/filters/integration/elasticsearch_spec.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@
8484
end
8585

8686
it "fails to register plugin" do
87-
expect { plugin.register }.to raise_error Elasticsearch::Transport::Transport::Errors::Unauthorized
87+
expect { plugin.register }.to raise_error elastic_ruby_v8_client_available? ?
88+
Elastic::Transport::Transport::Errors::Unauthorized :
89+
Elasticsearch::Transport::Transport::Errors::Unauthorized
8890
end
8991

9092
end if ELASTIC_SECURITY_ENABLED
@@ -150,5 +152,10 @@
150152
end
151153
end
152154
end
153-
155+
def elastic_ruby_v8_client_available?
156+
Elasticsearch::Transport
157+
false
158+
rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available
159+
true
160+
end
154161
end

0 commit comments

Comments
 (0)