Skip to content

Commit 87e8d67

Browse files
authored
Feat: ECS support + review dependencies (#20)
The plugin is setting `host` on each event (as host's name). In ECS mode we expect to use `host.hostname` and we're also set `event.original` to be a good ECS citizen. Besides concurrent dependency was dropped (hasn't been used),
1 parent 15d5943 commit 87e8d67

File tree

5 files changed

+137
-22
lines changed

5 files changed

+137
-22
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.3.0
2+
- Feat: ECS support + review dependencies [#20](https://github.com/logstash-plugins/logstash-input-stdin/pull/20)
3+
14
## 3.2.6
25
- Docs: Set the default_codec doc attribute.
36

docs/index.asciidoc

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,38 @@ want to join lines, you'll want to use the multiline codec.
2929
[id="plugins-{type}s-{plugin}-options"]
3030
==== Stdin Input Configuration Options
3131

32-
There are no special configuration options for this plugin,
33-
but it does support the <<plugins-{type}s-{plugin}-common-options>>.
32+
This plugin supports the following configuration options.
33+
34+
[cols="<,<,<",options="header",]
35+
|=======================================================================
36+
|Setting |Input type|Required
37+
| <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No
38+
|=======================================================================
39+
40+
Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
41+
input plugins.
42+
43+
44+
&nbsp;
45+
46+
[id="plugins-{type}s-{plugin}-ecs_compatibility"]
47+
===== `ecs_compatibility`
48+
49+
* Value type is <<string,string>>
50+
* Supported values are:
51+
** `disabled`: does not use ECS-compatible field names (using `host` field to store host name)
52+
** `v1`: uses fields that are compatible with Elastic Common Schema (using `[host][hostname]`)
53+
* Default value depends on which version of Logstash is running:
54+
** When Logstash provides a `pipeline.ecs_compatibility` setting, its value is used as the default
55+
** Otherwise, the default value is `disabled`.
56+
57+
Controls this plugin's compatibility with the
58+
{ecs-ref}[Elastic Common Schema (ECS)].
59+
60+
61+
3462

3563
[id="plugins-{type}s-{plugin}-common-options"]
3664
include::{include_path}/{type}.asciidoc[]
3765

38-
:default_codec!:
66+
:default_codec!:

lib/logstash/inputs/stdin.rb

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# encoding: utf-8
22
require "logstash/inputs/base"
33
require "logstash/namespace"
4-
require "concurrent/atomics"
4+
require 'logstash/plugin_mixins/ecs_compatibility_support'
55
require "socket" # for Socket.gethostname
66
require "jruby-stdin-channel"
77

@@ -10,12 +10,27 @@
1010
# By default, each event is assumed to be one line. If you
1111
# want to join lines, you'll want to use the multiline codec.
1212
class LogStash::Inputs::Stdin < LogStash::Inputs::Base
13+
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1)
14+
1315
config_name "stdin"
1416

1517
default :codec, "line"
1618

1719
READ_SIZE = 16384
1820

21+
# When a configuration is using this plugin
22+
# We are defining a blocking pipeline which cannot be reloaded
23+
def self.reloadable?
24+
false
25+
end
26+
27+
def initialize(*params)
28+
super
29+
30+
@host_key = ecs_select[disabled: 'host', v1: '[host][hostname]']
31+
@event_original_key = ecs_select[disabled: nil, v1: '[event][original]']
32+
end
33+
1934
def register
2035
begin
2136
@stdin = StdinChannel::Reader.new
@@ -35,23 +50,24 @@ def run(queue)
3550
puts "The stdin plugin is now waiting for input:" if $stdin.tty?
3651
while !stop?
3752
if data = stdin_read
38-
@codec.decode(data) do |event|
39-
decorate(event)
40-
event.set("host", @host) if !event.include?("host")
41-
queue << event
42-
end
53+
process(data, queue)
4354
end
4455
end
4556
end
4657

47-
# When a configuration is using this plugin
48-
# We are defining a blocking pipeline which cannot be reloaded
49-
def self.reloadable?
50-
false
51-
end
52-
5358
private
5459

60+
def process(data, queue)
61+
@codec.decode(data) do |event|
62+
decorate(event)
63+
if @event_original_key && !event.include?(@event_original_key)
64+
event.set(@event_original_key, data)
65+
end
66+
event.set(@host_key, @host) if !event.include?(@host_key)
67+
queue << event
68+
end
69+
end
70+
5571
def default_stop
5672
$stdin.close rescue nil
5773
end

logstash-input-stdin.gemspec

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-stdin'
4-
s.version = '3.2.6'
4+
s.version = '3.3.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads events from standard input"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -21,13 +21,12 @@ Gem::Specification.new do |s|
2121

2222
# Gem dependencies
2323
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
24+
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.1'
2425
s.add_runtime_dependency "logstash-codec-line"
25-
s.add_runtime_dependency "concurrent-ruby"
2626
s.add_runtime_dependency "jruby-stdin-channel"
2727

2828
s.add_development_dependency "logstash-codec-plain"
2929
s.add_development_dependency "logstash-codec-json"
3030
s.add_development_dependency "logstash-codec-json_lines"
3131
s.add_development_dependency "logstash-devutils"
32-
s.add_development_dependency "insist"
3332
end

spec/inputs/stdin_spec.rb

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
# encoding: utf-8
22
require "logstash/devutils/rspec/spec_helper"
3-
require "insist"
4-
require "socket"
3+
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
54
require "logstash/inputs/stdin"
65

76
describe LogStash::Inputs::Stdin do
@@ -19,15 +18,85 @@
1918
require "logstash/codecs/line"
2019
plugin = LogStash::Inputs::Stdin.new("codec" => LogStash::Codecs::Plain.new)
2120
plugin.register
22-
insist { plugin.codec }.is_a?(LogStash::Codecs::Line)
21+
expect( plugin.codec ).is_a?(LogStash::Codecs::Line)
2322
end
2423

2524
it "switches from json to json_lines" do
2625
require "logstash/codecs/json"
2726
require "logstash/codecs/json_lines"
2827
plugin = LogStash::Inputs::Stdin.new("codec" => LogStash::Codecs::JSON.new)
2928
plugin.register
30-
insist { plugin.codec }.is_a?(LogStash::Codecs::JSONLines)
29+
expect( plugin.codec ).is_a?(LogStash::Codecs::JSONLines)
3130
end
3231
end
32+
33+
context 'ECS behavior', :ecs_compatibility_support do
34+
35+
subject { LogStash::Inputs::Stdin.new }
36+
37+
ecs_compatibility_matrix(:v1) do
38+
39+
before(:each) do
40+
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
41+
42+
subject.register
43+
44+
subject.send :process, stdin_data, queue
45+
46+
expect( queue.size ).to eql 1
47+
end
48+
49+
let(:queue) { Queue.new }
50+
51+
let(:stdin_data) { "a foo bar\n" }
52+
53+
after { subject.close }
54+
55+
it "sets message" do
56+
event = queue.pop
57+
expect( event.get('message') ).to eql 'a foo bar'
58+
end
59+
60+
it "sets hostname" do
61+
event = queue.pop
62+
expect( event.get('host') ).to eql 'hostname' => `hostname`.strip
63+
end
64+
65+
it "sets event.original" do
66+
event = queue.pop
67+
expect( event.get('event') ).to eql 'original' => stdin_data
68+
end
69+
70+
end
71+
end
72+
73+
context 'ECS disabled' do
74+
75+
subject { LogStash::Inputs::Stdin.new('ecs_compatibility' => 'disabled') }
76+
77+
before(:each) do
78+
subject.register
79+
80+
subject.send :process, stdin_data, queue
81+
82+
expect( queue.size ).to eql 1
83+
end
84+
85+
let(:queue) { Queue.new }
86+
87+
let(:stdin_data) { "a bar foo\n" }
88+
89+
after { subject.close }
90+
91+
it "sets message" do
92+
event = queue.pop
93+
expect( event.get('message') ).to eql 'a bar foo'
94+
end
95+
96+
it "sets hostname" do
97+
event = queue.pop
98+
expect( event.get('host') ).to eql `hostname`.strip
99+
end
100+
101+
end
33102
end

0 commit comments

Comments
 (0)