diff --git a/lib/logstash/codecs/multiline.rb b/lib/logstash/codecs/multiline.rb index 5a28b90..503b351 100644 --- a/lib/logstash/codecs/multiline.rb +++ b/lib/logstash/codecs/multiline.rb @@ -4,6 +4,10 @@ require "logstash/timestamp" require "logstash/codecs/auto_flush" +require "grok-pure" +require 'logstash/patterns/core' +require "logstash/util/buftok" + # The multiline codec will collapse multiline messages and merge them into a # single event. # @@ -111,6 +115,13 @@ module LogStash module Codecs class Multiline < LogStash::Codecs::Base # This only affects "plain" format logs since JSON is `UTF-8` already. config :charset, :validate => ::Encoding.name_list, :default => "UTF-8" + # Change the delimiter that separates lines + config :delimiter, :validate => :string, :default => "\n" + + # Assume data received from input plugin as line based. For some input plugins + # like stdin or tcp/udp data is not line based and this option should be set to false. + config :line_based_input, :validate => :boolean, :default => true + # Tag multiline events with a given tag. This tag will only be added # to events that actually have multiple lines in them. config :multiline_tag, :validate => :string, :default => "multiline" @@ -132,19 +143,10 @@ module LogStash module Codecs class Multiline < LogStash::Codecs::Base # auto_flush_interval. No default. If unset, no auto_flush. Units: seconds config :auto_flush_interval, :validate => :number - public - def register - require "grok-pure" # rubygem 'jls-grok' - require 'logstash/patterns/core' - - # Detect if we are running from a jarfile, pick the right path. 
- patterns_path = [] - patterns_path += [LogStash::Patterns::Core.path] - + @tokenizer = FileWatch::BufferedTokenizer.new(@delimiter) @grok = Grok.new - - @patterns_dir = patterns_path.to_a + @patterns_dir + @patterns_dir = [LogStash::Patterns::Core.path] + @patterns_dir @patterns_dir.each do |path| if ::File.directory?(path) path = ::File.join(path, "*") @@ -165,12 +167,67 @@ def register @converter = LogStash::Util::Charset.new(@charset) @converter.logger = @logger + + # TODO: (colin) I don't really understand this @last_seen_listener poutine. I needed to create + # this lambda to DRY across the close & auto_flush methods to pass the closure to the new + # @tokenizer which I figured needs to be flushed upon close. there does not seem to be + # explicit tests for this closing logic. + # what is not clear here is the initialization of @last_seen_listener which gets initialized + # in the accept method but the close method systematically calls auto_flush which assumes the + # existence of @last_seen_listener. this whole logic is confusing and should be either made + # more explicit and self documenting OR documentation should be provided. + @auto_flush_block = lambda do |event| + @last_seen_listener.process_event(event) + end + if @auto_flush_interval # will start on first decode @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval) end - end # def register + end + + def decode(data, &block) + data = data + @delimiter if @line_based_input + @tokenizer.extract(data.force_encoding("ASCII-8BIT")).each do |line| + match_line(@converter.convert(line), &block) + end + end + + def encode(event) + # Nothing to do. + @on_event.call(event, event) + end + + # this is the termination or final flush API + # + # @param block [Proc] the closure that will be called for all events that need to be flushed. + def flush(&block) + remainder = @tokenizer.flush + match_line(@converter.convert(remainder), &block) unless remainder.empty? 
+ flush_buffered_events(&block) + end + + # TODO: (colin) I believe there is a problem here in calling auto_flush. auto_flush depends on + # @auto_flush_block which references @last_seen_listener which is only initialized in the context of + # IdentityMapCodec which in turn I believe cannot be assumed. the multiline codec could run without + # IdentityMapCodec. + def close + if auto_flush_runner.pending? + #will cancel task if necessary + auto_flush_runner.stop + end + + remainder = @tokenizer.flush + match_line(@converter.convert(remainder), &@auto_flush_block) unless remainder.empty? + + auto_flush + end + + # TODO: (colin) what is the purpose of this accept method? AFAICT it is only used if this codec is used + # within the IdentityMapCodec. it is not clear when & in which context this method is used. + # I believe the codec should still be able to live outside the context of an IdentityMapCodec but there + # are usages of ivars like @last_seen_listener only initialized in the context of IdentityMapCodec. def accept(listener) # memoize references to listener that holds upstream state @previous_listener = @last_seen_listener || listener @@ -180,45 +237,62 @@ def accept(listener) end end - def decode(text, &block) - text = @converter.convert(text) - text.split("\n").each do |line| - match = @grok.match(line) - @logger.debug("Multiline", :pattern => @pattern, :text => line, - :match => !match.nil?, :negate => @negate) + def buffer(line) + @buffer_bytes += line.bytesize + @buffer.push(line) + end - # Add negate option - match = (match and !@negate) || (!match and @negate) - @handler.call(line, match, &block) - end - end # def decode + def reset_buffer + @buffer = [] + @buffer_bytes = 0 + end + + # TODO: (colin) this method is not clearly documented as being required by the AutoFlush class & tasks. 
+ I believe there is a problem here with the usage of @auto_flush_block which assumes it is in the + # context of an IdentityMapCodec but the multiline codec could run without IdentityMapCodec + def auto_flush + flush_buffered_events(&@auto_flush_block) + end - def buffer(text) - @buffer_bytes += text.bytesize - @buffer.push(text) + # TODO: (colin) auto_flush_active? doesn't seem to be used anywhere. any reason to keep this api? + def auto_flush_active? + !@auto_flush_interval.nil? end - def flush(&block) + # private + + # merge all currently buffered events and call the passed block for the resulting merged event + # + # @param block [Proc] the closure that will be called for the resulting merged event + def flush_buffered_events(&block) if block_given? && @buffer.any? no_error = true - events = merge_events + event = merge_events begin - yield events + yield event rescue ::Exception => e # need to rescue everything # likliest cause: backpressure or timeout by exception # can't really do anything but leave the data in the buffer for next time if there is one - @logger.error("Multiline: flush downstream error", :exception => e) + @logger.error("Multiline: buffered events flush downstream error", :exception => e) no_error = false end reset_buffer if no_error end end - def auto_flush - flush do |event| - @last_seen_listener.process_event(event) - end + # evaluate if a given line matches the configured pattern and call the appropriate do_next or do_previous + # handler given the match state. + # + # @param line [String] the string to match against the pattern + # @param block [Proc] the closure that will be called for each event that might result after processing this line + def match_line(line, &block) + match = @grok.match(line) + @logger.debug? 
&& @logger.debug("Multiline", :pattern => @pattern, :line => line, :match => !match.nil?, :negate => @negate) + + # Add negate option + match = (match and !@negate) || (!match and @negate) + @handler.call(line, match, &block) end def merge_events @@ -229,11 +303,6 @@ def merge_events event end - def reset_buffer - @buffer = [] - @buffer_bytes = 0 - end - def doing_previous? @what == "previous" end @@ -242,16 +311,16 @@ def what_based_listener doing_previous? ? @previous_listener : @last_seen_listener end - def do_next(text, matched, &block) - buffer(text) + def do_next(line, matched, &block) + buffer(line) auto_flush_runner.start - flush(&block) if !matched || buffer_over_limits? + flush_buffered_events(&block) if !matched || buffer_over_limits? end - def do_previous(text, matched, &block) - flush(&block) if !matched || buffer_over_limits? + def do_previous(line, matched, &block) + flush_buffered_events(&block) if !matched || buffer_over_limits? auto_flush_runner.start - buffer(text) + buffer(line) end def over_maximum_lines? @@ -266,24 +335,7 @@ def buffer_over_limits? over_maximum_lines? || over_maximum_bytes? end - def encode(event) - # Nothing to do. - @on_event.call(event, event) - end # def encode - - def close - if auto_flush_runner.pending? - #will cancel task if necessary - auto_flush_runner.stop - end - auto_flush - end - - def auto_flush_active? - !@auto_flush_interval.nil? - end - def auto_flush_runner @auto_flush_runner || AutoFlushUnset.new(nil, nil) end -end end end # class LogStash::Codecs::Multiline +end end end diff --git a/spec/codecs/multiline_spec.rb b/spec/codecs/multiline_spec.rb index a046f49..5058743 100644 --- a/spec/codecs/multiline_spec.rb +++ b/spec/codecs/multiline_spec.rb @@ -58,6 +58,22 @@ expect(events[1]["message"]).to eq "0987654321" end + it "should handle message continuation across decode calls (i.e. 
use buftok)" do + config.update( + "pattern" => '\D', + "what" => "previous", + "line_based_input" => false, + ) + lineio = StringIO.new("1234567890\nA234567890\nB234567890\n0987654321\n") + until lineio.eof + line = lineio.read(5) + codec.decode(line) {|evt| events.push(evt)} + end + codec.flush { |e| events << e } + expect(events[0]["message"]).to eq "1234567890\nA234567890\nB234567890" + expect(events[1]["message"]).to eq "0987654321" + end + it "should allow grok patterns to be used" do config.update( "pattern" => "^%{NUMBER} %{TIME}", @@ -220,9 +236,11 @@ let(:codec) { Mlc::MultilineRspec.new(config).tap {|c| c.register} } let(:events) { [] } let(:lines) do - { "en.log" => ["hello world", " second line", " third line"], + { + "en.log" => ["hello world", " second line", " third line"], "fr.log" => ["Salut le Monde", " deuxième ligne", " troisième ligne"], - "de.log" => ["Hallo Welt"] } + "de.log" => ["Hallo Welt"] + } end let(:listener_class) { Mlc::LineListener } let(:auto_flush_interval) { 0.5 } @@ -252,13 +270,17 @@ let(:listener_class) { Mlc::LineErrorListener } it "does not build any events, logs an error and the buffer data remains" do - config.update("pattern" => "^\\s", "what" => "previous", - "auto_flush_interval" => auto_flush_interval) + config.update( + "pattern" => "^\\s", + "what" => "previous", + "auto_flush_interval" => auto_flush_interval + ) + codec.logger = Mlc::MultilineLogTracer.new line_producer.call("en.log") sleep(auto_flush_interval + 0.1) msg, args = codec.logger.trace_for(:error) - expect(msg).to eq("Multiline: flush downstream error") + expect(msg).to eq("Multiline: buffered events flush downstream error") expect(args[:exception].message).to eq(errmsg) expect(events.size).to eq(0) expect(codec.buffer_size).to eq(3) @@ -274,8 +296,11 @@ def assert_produced_events(key, sleeping) context "mode: previous, when there are pauses between multiline file writes" do it "auto-flushes events from the accumulated lines to the queue" do - 
config.update("pattern" => "^\\s", "what" => "previous", - "auto_flush_interval" => auto_flush_interval) + config.update( + "pattern" => "^\\s", + "what" => "previous", + "auto_flush_interval" => auto_flush_interval + ) assert_produced_events("en.log", auto_flush_interval + 0.1) do expect(events[0]).to match_path_and_line("en.log", lines["en.log"])