Skip to content

Commit 35d8d43

Browse files
guyboertjejordansissel
authored andcommitted
fix typo in comment and whitespace
add identity_map_codec and tests tidy up comments and typos remove commented out require "concurrent" update to v2.0.3 and amend changelog closes 10 Fixes #13
1 parent 7dada4e commit 35d8d43

File tree

5 files changed

+467
-5
lines changed

5 files changed

+467
-5
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
## 2.0.3
2+
- Add pseudo codec IdentityMapCodec. Support class for identity based multiline processing.
3+
14
## 2.0.0
2-
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
5+
- Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully,
36
instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895
47
- Dependency on logstash-core update to 2.0
58

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
# encoding: utf-8
2+
require "logstash/namespace"
3+
require "thread_safe"
4+
5+
# This class is a Codec duck type
6+
# Using Composition, it maps from a stream identity to
7+
# a cloned codec instance via the same API as a Codec
8+
# it implements the codec public API
9+
10+
module LogStash module Codecs class IdentityMapCodec
11+
# subclass of Exception, LS has more than limit (20000) active streams
12+
class IdentityMapUpperLimitException < Exception; end
13+
14+
module EightyPercentWarning
15+
extend self
16+
def visit(imc)
17+
current_size, limit = imc.current_size_and_limit
18+
return if current_size < (limit * 0.8)
19+
imc.logger.warn("IdentityMapCodec has reached 80% capacity",
20+
:current_size => current_size, :upper_limit => limit)
21+
end
22+
end
23+
24+
module UpperLimitReached
25+
extend self
26+
def visit(imc)
27+
current_size, limit = imc.current_size_and_limit
28+
return if current_size < limit
29+
# we hit the limit
30+
# try to clean out stale streams
31+
current_size, limit = imc.map_cleanup
32+
return if current_size < limit
33+
# we are still at the limit and all streams are in use
34+
imc.logger.error("IdentityMapCodec has reached 100% capacity",
35+
:current_size => current_size, :upper_limit => limit)
36+
raise IdentityMapUpperLimitException.new
37+
end
38+
end
39+
40+
class MapCleaner
41+
def initialize(imc, interval)
42+
@imc, @interval = imc, interval
43+
@running = false
44+
end
45+
46+
def start
47+
return self if running?
48+
@running = true
49+
@thread = Thread.new(@imc) do |imc|
50+
loop do
51+
sleep @interval
52+
break if !@running
53+
imc.map_cleanup
54+
end
55+
end
56+
self
57+
end
58+
59+
def running?
60+
@running
61+
end
62+
63+
def stop
64+
return if !running?
65+
@running = false
66+
@thread.wakeup
67+
end
68+
end
69+
70+
# A composite class to hold both the codec and the eviction_timeout
71+
# instances of this Value Object are stored in the mapping hash
72+
class CodecValue
73+
attr_reader :codec
74+
attr_accessor :timeout
75+
76+
def initialize(codec)
77+
@codec = codec
78+
end
79+
end
80+
81+
#maximum size of the mapping hash
82+
MAX_IDENTITIES = 20_000
83+
84+
# time after which a stream is
85+
# considered stale
86+
# each time a stream is accessed
87+
# it is given a new timeout
88+
EVICT_TIMEOUT = 60 * 60 * 1 # 1 hour
89+
90+
# time that the cleaner thread sleeps for
91+
# before it tries to clean out stale mappings
92+
CLEANER_INTERVAL = 60 * 5 # 5 minutes
93+
94+
attr_reader :identity_map
95+
attr_accessor :base_codec, :cleaner
96+
97+
def initialize(codec)
98+
@base_codec = codec
99+
@base_codecs = [codec]
100+
@identity_map = ThreadSafe::Hash.new &method(:codec_builder)
101+
@max_identities = MAX_IDENTITIES
102+
@evict_timeout = EVICT_TIMEOUT
103+
@cleaner = MapCleaner.new(self, CLEANER_INTERVAL)
104+
@decode_block = lambda {|*| }
105+
end
106+
107+
# ==============================================
108+
# Constructional/builder methods
109+
# chain this method off of new
110+
#
111+
# used to add a non-default maximum identities
112+
def max_identities(max)
113+
@max_identities = max.to_i
114+
self
115+
end
116+
117+
# used to add a non-default evict timeout
118+
def evict_timeout(timeout)
119+
@evict_timeout = timeout.to_i
120+
self
121+
end
122+
123+
# used to add a non-default cleaner interval
124+
def cleaner_interval(interval)
125+
@cleaner.stop
126+
@cleaner = MapCleaner.new(self, interval.to_i)
127+
self
128+
end
129+
# end Constructional/builder methods
130+
# ==============================================
131+
132+
# ==============================================
133+
# Codec API
134+
def decode(data, identity = nil, &block)
135+
@decode_block = block if @decode_block != block
136+
stream_codec(identity).decode(data, &block)
137+
end
138+
139+
alias_method :<<, :decode
140+
141+
def encode(event, identity = nil)
142+
stream_codec(identity).encode(event)
143+
end
144+
145+
# this method will not be called from
146+
# the input or the pipeline unless
147+
# we implement codec flush on shutdown
148+
# problematic, because we may not have
149+
# received all the multiline parts yet.
150+
# but if we don't flush we will lose data
151+
def flush(&block)
152+
all_codecs.each do |codec|
153+
#let ruby do its default args thing
154+
block.nil? ? codec.flush : codec.flush(&block)
155+
end
156+
end
157+
158+
def close()
159+
cleaner.stop
160+
all_codecs.each(&:close)
161+
end
162+
# end Codec API
163+
# ==============================================
164+
165+
def all_codecs
166+
no_streams? ? @base_codecs : identity_map.values.map(&:codec)
167+
end
168+
169+
def max_limit
170+
@max_identities
171+
end
172+
173+
def identity_count
174+
identity_map.size
175+
end
176+
177+
# support cleaning of stale stream/codecs
178+
# a stream is considered stale if it has not
179+
# been accessed in the last @evict_timeout
180+
# period (default 1 hour)
181+
def map_cleanup
182+
cut_off = Time.now.to_i
183+
# delete_if is atomic
184+
# contents should not mutate during this call
185+
identity_map.delete_if do |identity, compo|
186+
if (flag = compo.timeout <= cut_off)
187+
compo.codec.flush(&@decode_block)
188+
end
189+
flag
190+
end
191+
current_size_and_limit
192+
end
193+
194+
def current_size_and_limit
195+
[identity_count, max_limit]
196+
end
197+
198+
def logger
199+
# we 'borrow' the codec's logger as we don't have our own
200+
@base_codec.logger
201+
end
202+
203+
def codec_without_usage_update(identity)
204+
find_codec_value(identity).codec
205+
end
206+
207+
def eviction_timestamp_for(identity)
208+
find_codec_value(identity).timeout
209+
end
210+
211+
private
212+
213+
def stream_codec(identity)
214+
return base_codec if identity.nil?
215+
record_codec_usage(identity) # returns codec
216+
end
217+
218+
def find_codec_value(identity)
219+
identity_map[identity]
220+
end
221+
222+
# for nil stream this method is not called
223+
def record_codec_usage(identity)
224+
check_map_limits
225+
# only start the cleaner if streams are in use
226+
# continuous calls to start are OK
227+
cleaner.start
228+
compo = find_codec_value(identity)
229+
compo.timeout = eviction_timestamp
230+
compo.codec
231+
end
232+
233+
def eviction_timestamp
234+
Time.now.to_i + @evict_timeout
235+
end
236+
237+
def check_map_limits
238+
UpperLimitReached.visit(self)
239+
EightyPercentWarning.visit(self)
240+
end
241+
242+
def codec_builder(hash, k)
243+
codec = hash.empty? ? @base_codec : @base_codec.clone
244+
compo = CodecValue.new(codec)
245+
hash.store(k, compo)
246+
end
247+
248+
def no_streams?
249+
identity_map.empty?
250+
end
251+
end end end

lib/logstash/codecs/multiline.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def buffer(text)
181181
end
182182

183183
def flush(&block)
184-
if @buffer.any?
184+
if @buffer.any?
185185
yield merge_events
186186
reset_buffer
187187
end
@@ -211,7 +211,7 @@ def do_previous(text, matched, &block)
211211
end
212212

213213
def over_maximun_lines?
214-
@buffer.size > @max_lines
214+
@buffer.size > @max_lines
215215
end
216216

217217
def over_maximun_bytes?
@@ -227,4 +227,4 @@ def encode(event)
227227
@on_event.call(event, event)
228228
end # def encode
229229

230-
end # class LogStash::Codecs::Plain
230+
end # class LogStash::Codecs::Multiline

logstash-codec-multiline.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-codec-multiline'
4-
s.version = '2.0.2'
4+
s.version = '2.0.3'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "The multiline codec will collapse multiline messages and merge them into a single event."
77
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"

0 commit comments

Comments
 (0)