@@ -3,6 +3,8 @@ package ru.d10xa.jsonlogviewer
33import cats .effect .IO
44import cats .effect .Ref
55import fs2 .*
6+ import fs2 .Pull
7+ import ru .d10xa .jsonlogviewer .csv .CsvLogLineParser
68import ru .d10xa .jsonlogviewer .decline .yaml .ConfigYaml
79import ru .d10xa .jsonlogviewer .decline .yaml .Feed
810import ru .d10xa .jsonlogviewer .decline .Config
@@ -35,81 +37,97 @@ object LogViewerStream {
3537 val feedStreams = feeds.zipWithIndex.map { (feed, index) =>
3638 val feedStream : Stream [IO , String ] =
3739 commandsAndInlineInputToStream(feed.commands, feed.inlineInput)
38- processStream(
39- config,
40- feedStream,
41- configYamlRef,
42- index
40+
41+ createProcessStream(
42+ config = config,
43+ lines = feedStream,
44+ configYamlRef = configYamlRef,
45+ index = index,
46+ initialFormatIn = feed.formatIn.orElse(config.formatIn)
4347 )
4448 }
4549 Stream .emits(feedStreams).parJoin(feedStreams.size)
4650 case None =>
47- processStream(config, stdinLinesStream, configYamlRef, - 1 )
51+ createProcessStream(
52+ config = config,
53+ lines = stdinLinesStream,
54+ configYamlRef = configYamlRef,
55+ index = - 1 ,
56+ initialFormatIn = config.formatIn
57+ )
4858 }
4959
5060 finalStream
5161 .intersperse(" \n " )
5262 .append(Stream .emit(" \n " ))
5363 }
5464
55- private def commandsAndInlineInputToStream (
56- commands : List [String ],
57- inlineInput : Option [String ]
58- ): Stream [IO , String ] =
59- new ShellImpl ().mergeCommandsAndInlineInput(commands, inlineInput)
60-
61- def makeLogLineParser (
65+ private def createProcessStream (
6266 config : Config ,
63- optFormatIn : Option [FormatIn ]
64- ): LogLineParser = {
65- val jsonPrefixPostfix = JsonPrefixPostfix (JsonDetector ())
66- optFormatIn match {
67- case Some (FormatIn .Logfmt ) => LogfmtLogLineParser (config)
68- case _ => JsonLogLineParser (config, jsonPrefixPostfix)
67+ lines : Stream [IO , String ],
68+ configYamlRef : Ref [IO , Option [ConfigYaml ]],
69+ index : Int ,
70+ initialFormatIn : Option [FormatIn ]
71+ ): Stream [IO , String ] =
72+ if (initialFormatIn.contains(FormatIn .Csv )) {
73+ lines.pull.uncons1.flatMap {
74+ case Some ((headerLine, rest)) =>
75+ val csvHeaderParser = CsvLogLineParser (config, headerLine)
76+ processStreamWithEffectiveConfig(
77+ config = config,
78+ lines = rest,
79+ configYamlRef = configYamlRef,
80+ index = index,
81+ parser = Some (csvHeaderParser)
82+ ).pull.echo
83+ case None =>
84+ Pull .done
85+ }.stream
86+ } else {
87+ processStreamWithEffectiveConfig(
88+ config = config,
89+ lines = lines,
90+ configYamlRef = configYamlRef,
91+ index = index,
92+ parser = None
93+ )
6994 }
70- }
7195
72- private def processStream (
73- baseConfig : Config ,
96+ private def processStreamWithEffectiveConfig (
97+ config : Config ,
7498 lines : Stream [IO , String ],
7599 configYamlRef : Ref [IO , Option [ConfigYaml ]],
76- index : Int
100+ index : Int ,
101+ parser : Option [LogLineParser ]
77102 ): Stream [IO , String ] =
78103 for {
79104 line <- lines
80105 optConfigYaml <- Stream .eval(configYamlRef.get)
81- formatIn = optConfigYaml
82- .flatMap(_.feeds)
83- .flatMap(_.lift(index).flatMap(_.formatIn))
84- .orElse(baseConfig.formatIn)
85- filter = optConfigYaml
86- .flatMap(_.feeds)
87- .flatMap(_.lift(index).flatMap(_.filter))
88- .orElse(baseConfig.filter)
89- rawInclude = optConfigYaml
90- .flatMap(_.feeds)
91- .flatMap(_.lift(index).flatMap(_.rawInclude))
92- rawExclude = optConfigYaml
93- .flatMap(_.feeds)
94- .flatMap(_.lift(index).flatMap(_.rawExclude))
95- feedName = optConfigYaml
96- .flatMap(_.feeds)
97- .flatMap(_.lift(index).flatMap(_.name))
98- effectiveConfig = baseConfig.copy(
99- filter = filter,
100- formatIn = formatIn
106+
107+ feedConfig = extractFeedConfig(optConfigYaml, index)
108+
109+ effectiveConfig = config.copy(
110+ filter = feedConfig.filter.orElse(config.filter),
111+ formatIn = feedConfig.formatIn.orElse(config.formatIn)
101112 )
113+
102114 timestampFilter = TimestampFilter ()
103115 parseResultKeys = ParseResultKeys (effectiveConfig)
104116 logLineFilter = LogLineFilter (effectiveConfig, parseResultKeys)
105- logLineParser = makeLogLineParser(effectiveConfig, formatIn)
106- outputLineFormatter = effectiveConfig.formatOut match
117+
118+ logLineParser = parser.getOrElse(
119+ makeNonCsvLogLineParser(effectiveConfig, feedConfig.formatIn)
120+ )
121+
122+ outputLineFormatter = effectiveConfig.formatOut match {
107123 case Some (Config .FormatOut .Raw ) => RawFormatter ()
108124 case Some (Config .FormatOut .Pretty ) | None =>
109- ColorLineFormatter (effectiveConfig, feedName)
125+ ColorLineFormatter (effectiveConfig, feedConfig.feedName, feedConfig.excludeFields)
126+ }
127+
110128 evaluatedLine <- Stream
111129 .emit(line)
112- .filter(rawFilter(_, rawInclude, rawExclude))
130+ .filter(rawFilter(_, feedConfig. rawInclude, feedConfig. rawExclude))
113131 .map(logLineParser.parse)
114132 .filter(logLineFilter.grep)
115133 .filter(logLineFilter.logLineQueryPredicate)
@@ -121,27 +139,80 @@ object LogViewerStream {
121139 effectiveConfig.timestamp.before
122140 )
123141 )
124- .map(pr =>
125- Try (outputLineFormatter.formatLine(pr)) match {
126- case Success (formatted) => formatted.toString
127- case Failure (_) => pr.raw
128- }
129- )
130- .map(_.toString)
142+ .map(formatWithSafety(_, outputLineFormatter))
131143 } yield evaluatedLine
132144
145+ private def formatWithSafety (
146+ parseResult : ParseResult ,
147+ formatter : OutputLineFormatter
148+ ): String =
149+ Try (formatter.formatLine(parseResult)) match {
150+ case Success (formatted) => formatted.toString
151+ case Failure (_) => parseResult.raw
152+ }
153+
154+ // TODO
155+ private case class FeedConfig (
156+ feedName : Option [String ],
157+ filter : Option [ru.d10xa.jsonlogviewer.query.QueryAST ],
158+ formatIn : Option [FormatIn ],
159+ rawInclude : Option [List [String ]],
160+ rawExclude : Option [List [String ]],
161+ excludeFields : Option [List [String ]]
162+ )
163+
164+ private def extractFeedConfig (
165+ optConfigYaml : Option [ConfigYaml ],
166+ index : Int
167+ ): FeedConfig = {
168+ val feedOpt = optConfigYaml
169+ .flatMap(_.feeds)
170+ .flatMap(_.lift(index))
171+
172+ FeedConfig (
173+ feedName = feedOpt.flatMap(_.name),
174+ filter = feedOpt.flatMap(_.filter),
175+ formatIn = feedOpt.flatMap(_.formatIn),
176+ rawInclude = feedOpt.flatMap(_.rawInclude),
177+ rawExclude = feedOpt.flatMap(_.rawExclude),
178+ excludeFields = feedOpt.flatMap(_.excludeFields)
179+ )
180+ }
181+
182+ private def commandsAndInlineInputToStream (
183+ commands : List [String ],
184+ inlineInput : Option [String ]
185+ ): Stream [IO , String ] =
186+ new ShellImpl ().mergeCommandsAndInlineInput(commands, inlineInput)
187+
188+ def makeNonCsvLogLineParser (
189+ config : Config ,
190+ optFormatIn : Option [FormatIn ]
191+ ): LogLineParser = {
192+ val jsonPrefixPostfix = JsonPrefixPostfix (JsonDetector ())
193+ optFormatIn match {
194+ case Some (FormatIn .Logfmt ) => LogfmtLogLineParser (config)
195+ case Some (FormatIn .Csv ) =>
196+ throw new IllegalStateException (
197+ " method makeNonCsvLogLineParser does not support csv"
198+ )
199+ case _ => JsonLogLineParser (config, jsonPrefixPostfix)
200+ }
201+ }
202+
133203 def rawFilter (
134204 str : String ,
135205 include : Option [List [String ]],
136206 exclude : Option [List [String ]]
137207 ): Boolean = {
138- val includeRegexes : List [Regex ] = include.getOrElse(Nil ).map(_.r)
139- val excludeRegexes : List [Regex ] = exclude.getOrElse(Nil ).map(_.r)
208+ val includeRegexes : List [Regex ] =
209+ include.getOrElse(Nil ).map(_.r)
210+ val excludeRegexes : List [Regex ] =
211+ exclude.getOrElse(Nil ).map(_.r)
140212 val includeMatches = includeRegexes.isEmpty || includeRegexes.exists(
141213 _.findFirstIn(str).isDefined
142214 )
143215 val excludeMatches = excludeRegexes.forall(_.findFirstIn(str).isEmpty)
144216 includeMatches && excludeMatches
145217 }
146-
147218}
0 commit comments