From 314b2c5aa5f3fa2ec5a8565d56ea174a126a9230 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Wed, 6 May 2020 19:03:40 -0400 Subject: [PATCH 1/6] support decoding multiple lines --- lib/logstash/codecs/csv.rb | 58 +++++++++++++++++++++++++------------- spec/codecs/csv_spec.rb | 44 ++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 33 deletions(-) diff --git a/lib/logstash/codecs/csv.rb b/lib/logstash/codecs/csv.rb index 07d6416..6c92139 100644 --- a/lib/logstash/codecs/csv.rb +++ b/lib/logstash/codecs/csv.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/codecs/base" require "logstash/util/charset" +require "logstash/util/buftok" require "csv" class LogStash::Codecs::CSV < LogStash::Codecs::Base @@ -42,6 +43,9 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base # Defaults to false. config :autodetect_column_names, :validate => :boolean, :default => false + # Define the line delimiter + config :delimiter, :validate => :string, :default => "\n" + # Define a set of datatype conversions to be applied to columns. # Possible conversions are integer, float, date, date_time, boolean # @@ -88,6 +92,7 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base CONVERTERS.freeze def register + @buffer = FileWatch::BufferedTokenizer.new(@delimiter) @converter = LogStash::Util::Charset.new(@charset) @converter.logger = @logger @@ -108,10 +113,38 @@ def register @logger.debug? && @logger.debug("CSV parsing options", :col_sep => @separator, :quote_char => @quote_char) end - def decode(data) - data = @converter.convert(data) + def decode(data, &block) + @buffer.extract(data).each do |line| + parse(@converter.convert(line), &block) + end + end + + def encode(event) + if @include_headers + csv_data = CSV.generate_line(select_keys(event), :col_sep => @separator, :quote_char => @quote_char, :headers => true) + @on_event.call(event, csv_data) + + # output headers only once per codec lifecycle + @include_headers = false + end + + csv_data = CSV.generate_line(select_values(event), :col_sep => @separator, :quote_char => @quote_char) + @on_event.call(event, csv_data) + end + + def flush(&block) + remainder = @buffer.flush + if !remainder.empty? + parse(@converter.convert(remainder), &block) + end + end + + private + + def parse(line, &block) begin - values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char) + values = CSV.parse_line(line, :col_sep => @separator, :quote_char => @quote_char) + return if values.nil? if (@autodetect_column_names && @columns.empty?) @columns = values @@ -131,26 +164,11 @@ def decode(data) yield LogStash::Event.new(decoded) rescue CSV::MalformedCSVError => e - @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data) - yield LogStash::Event.new("message" => data, "tags" => ["_csvparsefailure"]) - end - end - - def encode(event) - if @include_headers - csv_data = CSV.generate_line(select_keys(event), :col_sep => @separator, :quote_char => @quote_char, :headers => true) - @on_event.call(event, csv_data) - - # output headers only once per codec lifecycle - @include_headers = false + @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => line) + yield LogStash::Event.new("message" => line, "tags" => ["_csvparsefailure"]) end - - csv_data = CSV.generate_line(select_values(event), :col_sep => @separator, :quote_char => @quote_char) - @on_event.call(event, csv_data) end - private - def select_values(event) if @columns.empty? event.to_hash.values diff --git a/spec/codecs/csv_spec.rb b/spec/codecs/csv_spec.rb index 898b3a7..c385b8c 100644 --- a/spec/codecs/csv_spec.rb +++ b/spec/codecs/csv_spec.rb @@ -22,6 +22,32 @@ end end + describe "multiple lines" do + let(:data) { "big,bird\nsesame,street\nfoo,bar\n" } + + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(3) + expect(events[0].get("column1")).to eq("big") + expect(events[0].get("column2")).to eq("bird") + expect(events[1].get("column1")).to eq("sesame") + expect(events[1].get("column2")).to eq("street") + expect(events[2].get("column1")).to eq("foo") + expect(events[2].get("column2")).to eq("bar") + end + end + + describe "empty lines" do + let(:data) { "big,bird\n\n\nsesame,street\nfoo,bar\n\n\n" } + + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(3) + end + end + describe "given column names" do let(:doc) { "big,bird,sesame street" } let(:config) do @@ -37,9 +63,7 @@ end context "parse csv skipping empty columns" do - let(:data) { "val1,,val3" } - let(:config) do { "skip_empty_columns" => true, "columns" => ["custom1", "custom2", "custom3"] } @@ -55,11 +79,12 @@ end context "parse csv without autogeneration of names" do - let(:data) { "val1,val2,val3" } let(:config) do - { "autogenerate_column_names" => false, - "columns" => ["custom1", "custom2"] } + { + "autogenerate_column_names" => false, + "columns" => ["custom1", "custom2"] + } end it "extract all the values" do @@ -70,12 +95,10 @@ end end end - end describe "custom separator" do let(:data) { "big,bird;sesame street" } - let(:config) do { "separator" => ";" } end @@ -92,7 +115,7 @@ let(:data) { "big,bird,'sesame street'" } let(:config) do - { "quote_char" => "'"} + { "quote_char" => "'" } end it "return an event from CSV data" do @@ -133,15 +156,12 @@ end describe "having headers" do - let(:data) do [ "size,animal,movie", "big,bird,sesame street"] end - let(:new_data) do [ "host,country,city", "example.com,germany,berlin"] end - let(:config) do { "autodetect_column_names" => true } end @@ -157,7 +177,6 @@ end describe "using field convertion" do - let(:config) do { "convert" => { "column1" => "integer", "column3" => "boolean" } } end @@ -172,7 +191,6 @@ end context "when using column names" do - let(:config) do { "convert" => { "custom1" => "integer", "custom3" => "boolean" }, "columns" => ["custom1", "custom2", "custom3"] } From 5063f9566279ca6bca3434b101880dff339898d7 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Wed, 6 May 2020 19:08:33 -0400 Subject: [PATCH 2/6] add docs --- docs/index.asciidoc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index a2aba1e..67eb268 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -33,6 +33,7 @@ The csv codec takes CSV data, parses it and passes it along. | <> |<>, one of `["ASCII-8BIT", "UTF-8", "US-ASCII", "Big5", "Big5-HKSCS", "Big5-UAO", "CP949", "Emacs-Mule", "EUC-JP", "EUC-KR", "EUC-TW", "GB2312", "GB18030", "GBK", "ISO-8859-1", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "KOI8-R", "KOI8-U", "Shift_JIS", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE", "Windows-31J", "Windows-1250", "Windows-1251", "Windows-1252", "IBM437", "IBM737", "IBM775", "CP850", "IBM852", "CP852", "IBM855", "CP855", "IBM857", "IBM860", "IBM861", "IBM862", "IBM863", "IBM864", "IBM865", "IBM866", "IBM869", "Windows-1258", "GB1988", "macCentEuro", "macCroatian", "macCyrillic", "macGreek", "macIceland", "macRoman", "macRomania", "macThai", "macTurkish", "macUkraine", "CP950", "CP951", "IBM037", "stateless-ISO-2022-JP", "eucJP-ms", "CP51932", "EUC-JIS-2004", "GB12345", "ISO-2022-JP", "ISO-2022-JP-2", "CP50220", "CP50221", "Windows-1256", "Windows-1253", "Windows-1255", "Windows-1254", "TIS-620", "Windows-874", "Windows-1257", "MacJapanese", "UTF-7", "UTF8-MAC", "UTF-16", "UTF-32", "UTF8-DoCoMo", "SJIS-DoCoMo", "UTF8-KDDI", "SJIS-KDDI", "ISO-2022-JP-KDDI", "stateless-ISO-2022-JP-KDDI", "UTF8-SoftBank", "SJIS-SoftBank", "BINARY", "CP437", "CP737", "CP775", "IBM850", "CP857", "CP860", "CP861", "CP862", "CP863", "CP864", "CP865", "CP866", "CP869", "CP1258", "Big5-HKSCS:2008", "ebcdic-cp-us", "eucJP", "euc-jp-ms", "EUC-JISX0213", "eucKR", "eucTW", "EUC-CN", "eucCN", "CP936", "ISO2022-JP", "ISO2022-JP2", "ISO8859-1", "ISO8859-2", "ISO8859-3", "ISO8859-4", "ISO8859-5", "ISO8859-6", "CP1256", "ISO8859-7", "CP1253", "ISO8859-8", "CP1255", "ISO8859-9", "CP1254", "ISO8859-10", "ISO8859-11", "CP874", "ISO8859-13", "CP1257", "ISO8859-14", "ISO8859-15", "ISO8859-16", "CP878", "MacJapan", "ASCII", "ANSI_X3.4-1968", "646", "CP65000", "CP65001", "UTF-8-MAC", "UTF-8-HFS", "UCS-2BE", "UCS-4BE", "UCS-4LE", "CP932", "csWindows31J", "SJIS", "PCK", "CP1250", "CP1251", "CP1252", "external", "locale"]`|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -102,6 +103,14 @@ Possible conversions are: `integer`, `float`, `date`, `date_time`, `boolean` } } +[id="plugins-{type}s-{plugin}-delimiter"] +===== `delimiter` + + * Value type is <> + * Default value is `"\n"` + +Define the line delimiter. + [id="plugins-{type}s-{plugin}-include_headers"] ===== `include_headers` From cfeb539695e1f9cae2a77c3276b1abebe7a45ffa Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Wed, 6 May 2020 19:13:56 -0400 Subject: [PATCH 3/6] bump version to 1.0.1 --- CHANGELOG.md | 2 ++ logstash-codec-csv.gemspec | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7b229f..2166042 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +## 1.0.1 + - Fixed parsing of line delimited data [#8](https://github.com/logstash-plugins/logstash-codec-csv/pull/8) ## 1.0.0 - Fixed dependencies to work with logstash v6 and up. Overhauled to match features of the CSV Filter. Improved spec coverage [#4](https://github.com/logstash-plugins/logstash-codec-csv/pull/4) ## 0.1.5 diff --git a/logstash-codec-csv.gemspec b/logstash-codec-csv.gemspec index 6418a21..6c86eda 100644 --- a/logstash-codec-csv.gemspec +++ b/logstash-codec-csv.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-codec-csv' - s.version = '1.0.0' + s.version = '1.0.1' s.licenses = ['Apache License (2.0)'] s.summary = "The csv codec take CSV data, parses it and passes it away" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From 7ea51e5015ab0309cf455e5fb40da9ae6f3003de Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Thu, 7 May 2020 12:43:15 -0400 Subject: [PATCH 4/6] flush spec --- lib/logstash/codecs/csv.rb | 3 +++ spec/codecs/csv_spec.rb | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/lib/logstash/codecs/csv.rb b/lib/logstash/codecs/csv.rb index 6c92139..a45f9fb 100644 --- a/lib/logstash/codecs/csv.rb +++ b/lib/logstash/codecs/csv.rb @@ -114,7 +114,9 @@ def register end def decode(data, &block) + puts("*** data=#{data.inspect}") @buffer.extract(data).each do |line| + puts("*** line=#{line.inspect}") parse(@converter.convert(line), &block) end end @@ -135,6 +137,7 @@ def encode(event) def flush(&block) remainder = @buffer.flush if !remainder.empty? + puts("flush: remainder=#{remainder}") parse(@converter.convert(remainder), &block) end end diff --git a/spec/codecs/csv_spec.rb b/spec/codecs/csv_spec.rb index c385b8c..fcd20df 100644 --- a/spec/codecs/csv_spec.rb +++ b/spec/codecs/csv_spec.rb @@ -48,6 +48,18 @@ end end + describe "flush" do + let(:data) { "big,bird\nsesame,street" } + + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(1) + codec.flush {|event| events << event} + expect(events.size).to eq(2) + end + end + describe "given column names" do let(:doc) { "big,bird,sesame street" } let(:config) do From fa041f319375bd6374c8f8eb9f561647600c39e5 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Thu, 7 May 2020 14:09:28 -0400 Subject: [PATCH 5/6] remove debug traces --- lib/logstash/codecs/csv.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/logstash/codecs/csv.rb b/lib/logstash/codecs/csv.rb index a45f9fb..6c92139 100644 --- a/lib/logstash/codecs/csv.rb +++ b/lib/logstash/codecs/csv.rb @@ -114,9 +114,7 @@ def register end def decode(data, &block) - puts("*** data=#{data.inspect}") @buffer.extract(data).each do |line| - puts("*** line=#{line.inspect}") parse(@converter.convert(line), &block) end end @@ -137,7 +135,6 @@ def encode(event) def flush(&block) remainder = @buffer.flush if !remainder.empty? - puts("flush: remainder=#{remainder}") parse(@converter.convert(remainder), &block) end end From 9d56a62214955156f9ac01b3cd91050e911a93ab Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Sat, 9 May 2020 12:01:34 -0400 Subject: [PATCH 6/6] add input_type config option --- lib/logstash/codecs/csv.rb | 64 ++++--- spec/codecs/csv_spec.rb | 362 ++++++++++++++++++++++--------------- 2 files changed, 260 insertions(+), 166 deletions(-) diff --git a/lib/logstash/codecs/csv.rb b/lib/logstash/codecs/csv.rb index 6c92139..0509aaf 100644 --- a/lib/logstash/codecs/csv.rb +++ b/lib/logstash/codecs/csv.rb @@ -62,6 +62,12 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base # "CP1252". config :charset, :validate => ::Encoding.name_list, :default => "UTF-8" + # The input type the associated input plugin is providing. Inputs such as the file or http input + # plugins which provide complete data chunks such as lines or documents to the codec need the `line` + # input type, while other inputs such as stdin or tcp where data can be incomplete across data + # chunks need to use the 'stream' input type. + config :input_type, :validate => ["line", "stream"], :default => "line" + CONVERTERS = { :integer => lambda do |value| CSV::Converters[:integer].call(value) @@ -92,7 +98,10 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base CONVERTERS.freeze def register - @buffer = FileWatch::BufferedTokenizer.new(@delimiter) + @streaming = @input_type == "stream" + if @streaming + @buffer = FileWatch::BufferedTokenizer.new(@delimiter) + end @converter = LogStash::Util::Charset.new(@charset) @converter.logger = @logger @@ -114,8 +123,12 @@ def register end def decode(data, &block) - @buffer.extract(data).each do |line| - parse(@converter.convert(line), &block) + if @streaming + @buffer.extract(data).each do |line| + parse(@converter.convert(line), &block) + end + else + parse(@converter.convert(data), &block) end end @@ -133,9 +146,11 @@ def encode(event) end def flush(&block) - remainder = @buffer.flush - if !remainder.empty? - parse(@converter.convert(remainder), &block) + if @streaming + remainder = @buffer.flush + if !remainder.empty? + parse(@converter.convert(remainder), &block) + end end end @@ -143,29 +158,30 @@ def flush(&block) def parse(line, &block) begin - values = CSV.parse_line(line, :col_sep => @separator, :quote_char => @quote_char) - return if values.nil? + CSV.parse(line, :col_sep => @separator, :quote_char => @quote_char).each do |values| + next if values.nil? || values.empty? - if (@autodetect_column_names && @columns.empty?) - @columns = values - @logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect) - return - end + if (@autodetect_column_names && @columns.empty?) + @columns = values + @logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect) + next + end - decoded = {} - values.each_index do |i| - unless (@skip_empty_columns && (values[i].nil? || values[i].empty?)) - unless ignore_field?(i) - field_name = @columns[i] || "column#{i + 1}" - decoded[field_name] = transform(field_name, values[i]) + decoded = {} + values.each_index do |i| + unless (@skip_empty_columns && (values[i].nil? || values[i].empty?)) + unless ignore_field?(i) + field_name = @columns[i] || "column#{i + 1}" + decoded[field_name] = transform(field_name, values[i]) + end end end - end - yield LogStash::Event.new(decoded) - rescue CSV::MalformedCSVError => e - @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => line) - yield LogStash::Event.new("message" => line, "tags" => ["_csvparsefailure"]) + yield LogStash::Event.new(decoded) + rescue CSV::MalformedCSVError => e + @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => line) + yield LogStash::Event.new("message" => line, "tags" => ["_csvparsefailure"]) + end end end diff --git a/spec/codecs/csv_spec.rb b/spec/codecs/csv_spec.rb index fcd20df..e3bde58 100644 --- a/spec/codecs/csv_spec.rb +++ b/spec/codecs/csv_spec.rb @@ -4,222 +4,300 @@ describe LogStash::Codecs::CSV do - subject(:codec) { LogStash::Codecs::CSV.new(config) } - let(:config) { Hash.new } + subject(:codec) { LogStash::Codecs::CSV.new(base_config.merge(config)) } + let(:config) { Hash.new } before(:each) do codec.register end - describe "decode" do - let(:data) { "big,bird,sesame street" } + shared_examples "decoding tests" do + describe "decode" do - it "return an event from CSV data" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq("big") - expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq("sesame street") - end - end + describe "single line" do + let(:data) { "big,bird,sesame street" } - describe "multiple lines" do - let(:data) { "big,bird\nsesame,street\nfoo,bar\n" } - - it "return events from CSV data" do - events = [] - codec.decode(data) {|event| events << event} - expect(events.size).to eq(3) - expect(events[0].get("column1")).to eq("big") - expect(events[0].get("column2")).to eq("bird") - expect(events[1].get("column1")).to eq("sesame") - expect(events[1].get("column2")).to eq("street") - expect(events[2].get("column1")).to eq("foo") - expect(events[2].get("column2")).to eq("bar") + it "return an event from CSV data" do + codec.decode(data) do |event| + expect(event.get("column1")).to eq("big") + expect(event.get("column2")).to eq("bird") + expect(event.get("column3")).to eq("sesame street") + end + end end - end - describe "empty lines" do - let(:data) { "big,bird\n\n\nsesame,street\nfoo,bar\n\n\n" } - - it "return events from CSV data" do - events = [] - codec.decode(data) {|event| events << event} - expect(events.size).to eq(3) + describe "multiple lines" do + let(:data) { "big,bird\nsesame,street\nfoo,bar\n" } + + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(3) + expect(events[0].get("column1")).to eq("big") + expect(events[0].get("column2")).to eq("bird") + expect(events[1].get("column1")).to eq("sesame") + expect(events[1].get("column2")).to eq("street") + expect(events[2].get("column1")).to eq("foo") + expect(events[2].get("column2")).to eq("bar") + end end - end - describe "flush" do - let(:data) { "big,bird\nsesame,street" } + describe "empty lines" do + let(:data) { "big,bird\n\n\nsesame,street\nfoo,bar\n\n\n" } - it "return events from CSV data" do - events = [] - codec.decode(data) {|event| events << event} - expect(events.size).to eq(1) - codec.flush {|event| events << event} - expect(events.size).to eq(2) + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(3) + end end - end - describe "given column names" do - let(:doc) { "big,bird,sesame street" } - let(:config) do - { "columns" => ["first", "last", "address" ] } - end + describe "flush" do + let(:data) { "big,bird\nsesame,street" } - it "extract all the values" do - codec.decode(data) do |event| - expect(event.get("first")).to eq("big") - expect(event.get("last")).to eq("bird") - expect(event.get("address")).to eq("sesame street") + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + codec.flush {|event| events << event} + expect(events.size).to eq(2) end end - context "parse csv skipping empty columns" do - let(:data) { "val1,,val3" } + describe "given column names" do + let(:data) { "big,bird,sesame street" } let(:config) do - { "skip_empty_columns" => true, - "columns" => ["custom1", "custom2", "custom3"] } + { "columns" => ["first", "last", "address" ] } end it "extract all the values" do codec.decode(data) do |event| - expect(event.get("custom1")).to eq("val1") - expect(event.to_hash).not_to include("custom2") - expect(event.get("custom3")).to eq("val3") + expect(event.get("first")).to eq("big") + expect(event.get("last")).to eq("bird") + expect(event.get("address")).to eq("sesame street") + end + end + + context "parse csv skipping empty columns" do + let(:data) { "val1,,val3" } + let(:config) do + { "skip_empty_columns" => true, + "columns" => ["custom1", "custom2", "custom3"] } + end + + it "extract all the values" do + codec.decode(data) do |event| + expect(event.get("custom1")).to eq("val1") + expect(event.to_hash).not_to include("custom2") + expect(event.get("custom3")).to eq("val3") + end + end + end + + context "parse csv without autogeneration of names" do + let(:data) { "val1,val2,val3" } + let(:config) do + { + "autogenerate_column_names" => false, + "columns" => ["custom1", "custom2"] + } + end + + it "extract all the values" do + codec.decode(data) do |event| + expect(event.get("custom1")).to eq("val1") + expect(event.get("custom2")).to eq("val2") + expect(event.get("column3")).to be_falsey + end end end end - context "parse csv without autogeneration of names" do - let(:data) { "val1,val2,val3" } + describe "custom separator" do + let(:data) { "big,bird;sesame street" } let(:config) do - { - "autogenerate_column_names" => false, - "columns" => ["custom1", "custom2"] - } + { "separator" => ";" } end - it "extract all the values" do + it "return an event from CSV data" do codec.decode(data) do |event| - expect(event.get("custom1")).to eq("val1") - expect(event.get("custom2")).to eq("val2") - expect(event.get("column3")).to be_falsey + expect(event.get("column1")).to eq("big,bird") + expect(event.get("column2")).to eq("sesame street") end end end - end - describe "custom separator" do - let(:data) { "big,bird;sesame street" } - let(:config) do - { "separator" => ";" } - end + describe "quote char" do + let(:data) { "big,bird,'sesame street'" } - it "return an event from CSV data" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq("big,bird") - expect(event.get("column2")).to eq("sesame street") + let(:config) do + { "quote_char" => "'" } end - end - end - describe "quote char" do - let(:data) { "big,bird,'sesame street'" } + it "return an event from CSV data" do + codec.decode(data) do |event| + expect(event.get("column1")).to eq("big") + expect(event.get("column2")).to eq("bird") + expect(event.get("column3")).to eq("sesame street") + end + end - let(:config) do - { "quote_char" => "'" } - end + context "using the default one" do + let(:data) { 'big,bird,"sesame, street"' } + let(:config) { Hash.new } - it "return an event from CSV data" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq("big") - expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq("sesame street") + it "return an event from CSV data" do + codec.decode(data) do |event| + expect(event.get("column1")).to eq("big") + expect(event.get("column2")).to eq("bird") + expect(event.get("column3")).to eq("sesame, street") + end + end + end + + context "using a null" do + let(:data) { 'big,bird,"sesame" street' } + let(:config) do + { "quote_char" => "\x00" } + end + + it "return an event from CSV data" do + codec.decode(data) do |event| + expect(event.get("column1")).to eq("big") + expect(event.get("column2")).to eq("bird") + expect(event.get("column3")).to eq('"sesame" street') + end + end end end - context "using the default one" do - let(:data) { 'big,bird,"sesame, street"' } - let(:config) { Hash.new } + describe "having headers" do + let(:data) do + [ "size,animal,movie", "big,bird,sesame street"] + end + let(:new_data) do + [ "host,country,city", "example.com,germany,berlin"] + end + let(:config) do + { "autodetect_column_names" => true } + end - it "return an event from CSV data" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq("big") - expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq("sesame, street") + it "include header information when requested" do + codec.decode(data[0]) # Read the headers + codec.decode(data[1]) do |event| + expect(event.get("size")).to eq("big") + expect(event.get("animal")).to eq("bird") + expect(event.get("movie")).to eq("sesame street") end end end - context "using a null" do - let(:data) { 'big,bird,"sesame" street' } + describe "using field convertion" do let(:config) do - { "quote_char" => "\x00" } + { "convert" => { "column1" => "integer", "column3" => "boolean" } } end + let(:data) { "1234,bird,false" } - it "return an event from CSV data" do + it "get converted values to the expected type" do codec.decode(data) do |event| - expect(event.get("column1")).to eq("big") + expect(event.get("column1")).to eq(1234) expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq('"sesame" street') + expect(event.get("column3")).to eq(false) + end + end + + context "when using column names" do + let(:config) do + { "convert" => { "custom1" => "integer", "custom3" => "boolean" }, + "columns" => ["custom1", "custom2", "custom3"] } + end + + it "get converted values to the expected type" do + codec.decode(data) do |event| + expect(event.get("custom1")).to eq(1234) + expect(event.get("custom2")).to eq("bird") + expect(event.get("custom3")).to eq(false) + end end end end end + end # shared_examples - describe "having headers" do - let(:data) do - [ "size,animal,movie", "big,bird,sesame street"] - end - let(:new_data) do - [ "host,country,city", "example.com,germany,berlin"] - end - let(:config) do - { "autodetect_column_names" => true } + describe "line input_type" do + let(:base_config) { { "input_type" => "line" } } + + include_examples "decoding tests" + + context "line break in column" do + let(:data) { "\"a\",\"b\",\"c\"\n1,\"text\nwith line break\",2\n2,\"foo\",3\n" } + let(:config) { { "autodetect_column_names" => true } } + + it "is supported and return events" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(2) + expect(events[0].get("a")).to eq("1") + expect(events[0].get("b")).to eq("text\nwith line break") end + end + end + + describe "stream input_type" do + let(:base_config) { { "input_type" => "stream" } } - it "include header information when requested" do - codec.decode(data[0]) # Read the headers - codec.decode(data[1]) do |event| - expect(event.get("size")).to eq("big") - expect(event.get("animal")).to eq("bird") - expect(event.get("movie")).to eq("sesame street") + include_examples "decoding tests" + + context "incomplete chunks with final line break" do + let(:chunks) { ["big,bi", "rd,sesame street\n"] } + + it "return an event from CSV data" do + events = [] + chunks.each do |chunk| + codec.decode(chunk) { |event| events << event } end + + expect(events.size).to eq(1) + expect(events[0].get("column1")).to eq("big") + expect(events[0].get("column2")).to eq("bird") + expect(events[0].get("column3")).to eq("sesame street") end end - describe "using field convertion" do - let(:config) do - { "convert" => { "column1" => "integer", "column3" => "boolean" } } - end - let(:data) { "1234,bird,false" } + context "incomplete chunks without final line break with flush" do + let(:chunks) { ["aaa,b", "bb,ccc\ndd", "d,ee", "e,fff"] } - it "get converted values to the expected type" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq(1234) - expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq(false) + it "return an event from CSV data" do + events = [] + chunks.each do |chunk| + codec.decode(chunk) { |event| events << event } end + expect(events.size).to eq(1) + + codec.flush { |event| events << event } + expect(events.size).to eq(2) + + expect(events[0].get("column1")).to eq("aaa") + expect(events[0].get("column2")).to eq("bbb") + expect(events[0].get("column3")).to eq("ccc") + expect(events[1].get("column1")).to eq("ddd") + expect(events[1].get("column2")).to eq("eee") + expect(events[1].get("column3")).to eq("fff") end + end - context "when using column names" do - let(:config) do - { "convert" => { "custom1" => "integer", "custom3" => "boolean" }, - "columns" => ["custom1", "custom2", "custom3"] } - end + context "line break in column" do + let(:data) { "\"a\",\"b\",\"c\"\n1,\"text\nwith line break\",2\n2,\"foo\",3\n" } + let(:config) { { "autodetect_column_names" => true } } - it "get converted values to the expected type" do - codec.decode(data) do |event| - expect(event.get("custom1")).to eq(1234) - expect(event.get("custom2")).to eq("bird") - expect(event.get("custom3")).to eq(false) - end - end + it "is unsupported" do + expect{codec.decode(data)}.to raise_error(CSV::MalformedCSVError) end end end describe "encode" do + let(:base_config) { Hash.new } + context "not including headers" do let(:event) { LogStash::Event.new({"f1" => "v1", "f2" => "v2"}) }