Skip to content

Commit 5abbe49

Browse files
mashhursyaauie
andauthored
Add a target field support. (#196)
* Introduces a target field as a field reference (mixin validated) where if set result is placed into the target. * Simplifies the set extracted values to the event with target logic. Applies setting to target with aggregations similarly with es-input. * Mention to target in each fields which can be placed in the target. Docs info fields are placed in target field. --------- Co-authored-by: Rye Biesemeyer <yaauie@users.noreply.github.com>
1 parent 244c691 commit 5abbe49

File tree

6 files changed

+121
-12
lines changed

6 files changed

+121
-12
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.2.0
2+
- Add `target` configuration option to store the result into it [#196](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/196)
3+
14
## 4.1.1
25
- Add elastic-transport client support used in elasticsearch-ruby 8.x [#191](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/191)
36

docs/index.asciidoc

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ NOTE: As of version `4.0.0` of this plugin, a number of previously deprecated se
162162
| <<plugins-{type}s-{plugin}-ssl_truststore_type>> |<<string,string>>|No
163163
| <<plugins-{type}s-{plugin}-ssl_verification_mode>> |<<string,string>>, one of `["full", "none"]`|No
164164
| <<plugins-{type}s-{plugin}-tag_on_failure>> |<<array,array>>|No
165+
| <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No
165166
| <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
166167
|=======================================================================
167168

@@ -175,8 +176,11 @@ filter plugins.
175176

176177
* Value type is <<hash,hash>>
177178
* Default value is `{}`
179+
* Format: `"aggregation_name" => "[path][on][event]"`:
180+
** `aggregation_name`: aggregation name in result from {es}
181+
** `[path][on][event]`: path for where to place the value on the current event, using field-reference notation
178182

179-
Hash of aggregation names to copy from elasticsearch response into Logstash event fields
183+
A mapping of aggregations to copy into the <<plugins-{type}s-{plugin}-target>> of the current event.
180184

181185
Example:
182186
[source,ruby]
@@ -246,8 +250,11 @@ These custom headers will override any headers previously set by the plugin such
246250

247251
* Value type is <<hash,hash>>
248252
* Default value is `{}`
253+
* Format: `"path.in.source" => "[path][on][event]"`:
254+
** `path.in.source`: field path in document source of result from {es}, using dot-notation
255+
** `[path][on][event]`: path for where to place the value on the current event, using field-reference notation
249256

250-
Hash of docinfo fields to copy from old event (found via elasticsearch) into new event
257+
A mapping of docinfo (`_source`) fields to copy into the <<plugins-{type}s-{plugin}-target>> of the current event.
251258

252259
Example:
253260
[source,ruby]
@@ -273,9 +280,11 @@ Whether results should be sorted or not
273280

274281
* Value type is <<array,array>>
275282
* Default value is `{}`
283+
* Format: `"path.in.result" => "[path][on][event]"`:
284+
** `path.in.result`: field path in indexed result from {es}, using dot-notation
285+
** `[path][on][event]`: path for where to place the value on the current event, using field-reference notation
276286

277-
An array of fields to copy from the old event (found via elasticsearch) into the
278-
new event, currently being processed.
287+
A mapping of indexed fields to copy into the <<plugins-{type}s-{plugin}-target>> of the current event.
279288

280289
In the following example, the values of `@timestamp` and `event_id` on the event
281290
found via elasticsearch are copied to the current event's
@@ -523,6 +532,43 @@ WARNING: Setting certificate verification to `none` disables many security benef
523532

524533
Tags the event on failure to look up previous log event information. This can be used in later analysis.
525534

535+
[id="plugins-{type}s-{plugin}-target"]
536+
===== `target`
537+
538+
* Value type is <<string,string>>
539+
* There is no default value for this setting.
540+
541+
Define the target field for placing the result data.
542+
If this setting is omitted, the target will be the root (top level) of the event.
543+
544+
The destination fields specified in <<plugins-{type}s-{plugin}-fields>>, <<plugins-{type}s-{plugin}-aggregation_fields>>, and <<plugins-{type}s-{plugin}-docinfo_fields>> are relative to this target.
545+
546+
For example, if you want the data to be put in the `operation` field:
547+
[source,ruby]
548+
if [type] == "end" {
549+
filter {
550+
query => "type:start AND transaction:%{[transactionId]}"
551+
elasticsearch {
552+
target => "transaction"
553+
fields => {
554+
"@timestamp" => "started"
555+
"transaction_id" => "id"
556+
}
557+
}
558+
}
559+
}
560+
561+
`fields` fields will be expanded into a data structure in the `target` field, overall shape looks like this:
562+
[source,ruby]
563+
{
564+
"transaction" => {
565+
"started" => "2025-04-29T12:01:46.263Z"
566+
"id" => "1234567890"
567+
}
568+
}
569+
570+
NOTE: when writing to a field that already exists on the event, the previous value will be overwritten.
571+
526572
[id="plugins-{type}s-{plugin}-user"]
527573
===== `user`
528574

lib/logstash/filters/elasticsearch.rb

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,19 @@
22
require "logstash/filters/base"
33
require "logstash/namespace"
44
require "logstash/json"
5+
require 'logstash/plugin_mixins/ecs_compatibility_support'
6+
require 'logstash/plugin_mixins/ecs_compatibility_support/target_check'
57
require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
8+
require 'logstash/plugin_mixins/validator_support/field_reference_validation_adapter'
69
require "monitor"
710

811
require_relative "elasticsearch/client"
912

1013
class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
14+
15+
include LogStash::PluginMixins::ECSCompatibilitySupport
16+
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
17+
1118
config_name "elasticsearch"
1219

1320
# List of elasticsearch hosts to use for querying.
@@ -118,6 +125,9 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
118125
# Tags the event on failure to look up geo information. This can be used in later analysis.
119126
config :tag_on_failure, :validate => :array, :default => ["_elasticsearch_lookup_failure"]
120127

128+
# If set, the the result set will be nested under the target field
129+
config :target, :validate => :field_reference
130+
121131
# How many times to retry on failure?
122132
config :retry_on_failure, :validate => :number, :default => 0
123133

@@ -205,25 +215,27 @@ def filter(event)
205215
matched = true
206216
@fields.each do |old_key, new_key|
207217
old_key_path = extract_path(old_key)
208-
set = resultsHits.map do |doc|
218+
extracted_hit_values = resultsHits.map do |doc|
209219
extract_value(doc["_source"], old_key_path)
210220
end
211-
event.set(new_key, set.count > 1 ? set : set.first)
221+
value_to_set = extracted_hit_values.count > 1 ? extracted_hit_values : extracted_hit_values.first
222+
set_to_event_target(event, new_key, value_to_set)
212223
end
213224
@docinfo_fields.each do |old_key, new_key|
214225
old_key_path = extract_path(old_key)
215-
set = resultsHits.map do |doc|
226+
extracted_docs_info = resultsHits.map do |doc|
216227
extract_value(doc, old_key_path)
217228
end
218-
event.set(new_key, set.count > 1 ? set : set.first)
229+
value_to_set = extracted_docs_info.count > 1 ? extracted_docs_info : extracted_docs_info.first
230+
set_to_event_target(event, new_key, value_to_set)
219231
end
220232
end
221233

222234
resultsAggs = results["aggregations"]
223235
if !resultsAggs.nil? && !resultsAggs.empty?
224236
matched = true
225237
@aggregation_fields.each do |agg_name, ls_field|
226-
event.set(ls_field, resultsAggs[agg_name])
238+
set_to_event_target(event, ls_field, resultsAggs[agg_name])
227239
end
228240
end
229241

@@ -256,6 +268,18 @@ def prepare_user_agent
256268

257269
private
258270

271+
# if @target is defined, creates a nested structure to inject result into target field
272+
# if not defined, directly sets to the top-level event field
273+
# @param event [LogStash::Event]
274+
# @param new_key [String] name of the field to set
275+
# @param value_to_set [Array] values to set
276+
# @return [void]
277+
def set_to_event_target(event, new_key, value_to_set)
278+
key_to_set = target ? "[#{target}][#{new_key}]" : new_key
279+
280+
event.set(key_to_set, value_to_set)
281+
end
282+
259283
def client_options
260284
@client_options ||= {
261285
:user => @user,

logstash-filter-elasticsearch.gemspec

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-filter-elasticsearch'
4-
s.version = '4.1.1'
4+
s.version = '4.2.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -23,7 +23,9 @@ Gem::Specification.new do |s|
2323
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
2424
s.add_runtime_dependency 'elasticsearch', ">= 7.14.9", '< 9'
2525
s.add_runtime_dependency 'manticore', ">= 0.7.1"
26+
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.3'
2627
s.add_runtime_dependency 'logstash-mixin-ca_trusted_fingerprint_support', '~> 1.0'
28+
s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'
2729
s.add_development_dependency 'cabin', ['~> 0.6']
2830
s.add_development_dependency 'webrick'
2931
s.add_development_dependency 'logstash-devutils'

spec/filters/elasticsearch_spec.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,33 @@ def wait_receive_request
864864
end
865865
end
866866

867+
describe "#set_to_event_target" do
868+
869+
context "when `@target` is nil, default behavior" do
870+
let(:config) {{ }}
871+
872+
it "sets the value directly to the top-level event field" do
873+
plugin.send(:set_to_event_target, event, "new_field", %w[value1 value2])
874+
expect(event.get("new_field")).to eq(%w[value1 value2])
875+
end
876+
end
877+
878+
context "when @target is defined" do
879+
let(:config) {{ "target" => "nested" }}
880+
881+
it "creates a nested structure under the target field" do
882+
plugin.send(:set_to_event_target, event, "new_field", %w[value1 value2])
883+
expect(event.get("nested")).to eq({ "new_field" => %w[value1 value2] })
884+
end
885+
886+
it "overwrites existing target field with new data" do
887+
event.set("nested", { "existing_field" => "existing_value", "new_field" => "value0" })
888+
plugin.send(:set_to_event_target, event, "new_field", ["value1"])
889+
expect(event.get("nested")).to eq({ "existing_field" => "existing_value", "new_field" => ["value1"] })
890+
end
891+
end
892+
end
893+
867894
def extract_transport(client)
868895
# on 7x: client.transport.transport
869896
# on >=8.x: client.transport

spec/filters/integration/elasticsearch_spec.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@
8484
end
8585

8686
it "fails to register plugin" do
87-
expect { plugin.register }.to raise_error Elasticsearch::Transport::Transport::Errors::Unauthorized
87+
expect { plugin.register }.to raise_error elastic_ruby_v8_client_available? ?
88+
Elastic::Transport::Transport::Errors::Unauthorized :
89+
Elasticsearch::Transport::Transport::Errors::Unauthorized
8890
end
8991

9092
end if ELASTIC_SECURITY_ENABLED
@@ -150,5 +152,10 @@
150152
end
151153
end
152154
end
153-
155+
def elastic_ruby_v8_client_available?
156+
Elasticsearch::Transport
157+
false
158+
rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available
159+
true
160+
end
154161
end

0 commit comments

Comments
 (0)