diff --git a/output/elasticsearch/elasticsearch.go b/output/elasticsearch/elasticsearch.go index 6a54d3e..46c0ad3 100644 --- a/output/elasticsearch/elasticsearch.go +++ b/output/elasticsearch/elasticsearch.go @@ -32,6 +32,7 @@ type Indexer struct { indexType string RateCounter *ratecounter.RateCounter lastDisplayUpdate time.Time + fields map[string]string } type Config struct { @@ -58,7 +59,7 @@ func init() { output.Register("elasticsearch", New) } -func New() (output.Output) { +func New() output.Output { return &ESServer{ host: fmt.Sprintf("%s:%d", defaultHost, time.Now().Unix()), term: make(chan bool, 1), @@ -102,10 +103,22 @@ func (i *Indexer) flush() error { } func (i *Indexer) index(ev *buffer.Event) error { + for key, value := range i.fields { + val, ok := (*ev.Fields)[key] + if !ok { + //early out if no match + return nil + } + strval, isstr := val.(string) + if !isstr || value != strval { + //early out if no match + return nil + } + } + doc := *ev.Text idx := indexName(i.indexPrefix) typ := i.indexType - request := elastic.NewBulkIndexRequest().Index(idx).Type(typ).Doc(doc) i.bulkService.Add(request) i.RateCounter.Incr(1) @@ -193,7 +206,7 @@ func (es *ESServer) insertIndexTemplate(client *elastic.Client) error { } func (es *ESServer) Start() error { - if (es.b == nil) { + if es.b == nil { log.Printf("[%s] No Route is specified for this output", es.name) return nil } @@ -256,7 +269,7 @@ func (es *ESServer) Start() error { rateCounter := ratecounter.NewRateCounter(1 * time.Second) // Create indexer - idx := &Indexer{service, es.config.IndexPrefix, es.config.IndexType, rateCounter, time.Now()} + idx := &Indexer{service, es.config.IndexPrefix, es.config.IndexType, rateCounter, time.Now(), es.fields} // Loop events and publish to elasticsearch tick := time.NewTicker(time.Duration(esFlushInterval) * time.Second) diff --git a/output/redis/redis.go b/output/redis/redis.go index a1c1d6a..b7aa3a0 100644 --- a/output/redis/redis.go +++ b/output/redis/redis.go @@ -147,7 +147,7 @@ func (redisServer *RedisServer) Init(name string, config yaml.MapSlice, sender b } func (redisServer *RedisServer) Start() error { - if (redisServer.sender == nil) { + if redisServer.sender == nil { log.Printf("[%s] No Route is specified for this output", redisServer.name) return nil } @@ -173,16 +173,22 @@ func (redisServer *RedisServer) Start() error { for { select { case ev := <-receiveChan: - rateCounter.Incr(1) var allowed bool allowed = true - for key, value := range redisServer.fields { - if ((*ev.Fields)[key] == nil || ((*ev.Fields)[key] != nil && value != (*ev.Fields)[key].(string))) { + for key, value := range redisServer.fields { + val, ok := (*ev.Fields)[key] + if !ok { + allowed = false + break + } + strval, isstr := val.(string) + if !isstr || value != strval { allowed = false break } } if allowed { + rateCounter.Incr(1) text := *ev.Text for _, queue := range allQueues { queue.data <- text diff --git a/output/s3/s3.go b/output/s3/s3.go index 6d5ca65..ddd252a 100644 --- a/output/s3/s3.go +++ b/output/s3/s3.go @@ -41,10 +41,10 @@ func uuid() string { } type Config struct { - AwsKeyIdLoc string `yaml:"aws_key_id_loc"` + AwsKeyIdLoc string `yaml:"aws_key_id_loc"` AwsSecKeyLoc string `yaml:"aws_sec_key_loc"` - AwsS3Bucket string `yaml:"aws_s3_bucket"` - AwsS3Region string `yaml:"aws_s3_region"` + AwsS3Bucket string `yaml:"aws_s3_bucket"` + AwsS3Region string `yaml:"aws_s3_region"` LocalPath string `yaml:"local_path"` Path string `yaml:"s3_path"` @@ -181,7 +181,7 @@ func init() { output.Register("s3", New) } -func New() (output.Output) { +func New() output.Output { return &S3Writer{term: make(chan bool, 1)} } @@ -265,7 +265,7 @@ func (s3Writer *S3Writer) Init(name string, config yaml.MapSlice, sender buffer. } func (s3Writer *S3Writer) Start() error { - if (s3Writer.Sender == nil) { + if s3Writer.Sender == nil { log.Printf("[%s] No route is specified for this output", s3Writer.name) return nil } @@ -273,7 +273,6 @@ func (s3Writer *S3Writer) Start() error { fileSaver := new(FileSaver) fileSaver.Config = s3Writer.Config fileSaver.RateCounter = ratecounter.NewRateCounter(1 * time.Second) - id := "s3_output" // Add the client as a subscriber receiveChan := make(chan *buffer.Event, recvBuffer) @@ -290,8 +289,14 @@ func (s3Writer *S3Writer) Start() error { case ev := <-receiveChan: var allowed bool allowed = true - for key, value := range s3Writer.fields { - if ((*ev.Fields)[key] == nil || ((*ev.Fields)[key] != nil && value != (*ev.Fields)[key].(string))) { + for key, value := range s3Writer.fields { + val, ok := (*ev.Fields)[key] + if !ok { + allowed = false + break + } + strval, isstr := val.(string) + if !isstr || value != strval { allowed = false break } diff --git a/output/tcp/tcp.go b/output/tcp/tcp.go index 9f1cc7a..1bce2f2 100644 --- a/output/tcp/tcp.go +++ b/output/tcp/tcp.go @@ -20,18 +20,18 @@ type Config struct { } type TCPServer struct { - name string + name string fields map[string]string - host string - b buffer.Sender - term chan bool + host string + b buffer.Sender + term chan bool } func init() { output.Register("tcp", New) } -func New() (output.Output) { +func New() output.Output { return &TCPServer{term: make(chan bool, 1)} } @@ -54,13 +54,19 @@ func (s *TCPServer) accept(c net.Conn) { case ev := <-r: var allowed bool allowed = true - for key, value := range s.fields { - if ((*ev.Fields)[key] == nil || ((*ev.Fields)[key] != nil && value != (*ev.Fields)[key].(string))) { + for key, value := range s.fields { + val, ok := (*ev.Fields)[key] + if !ok { allowed = false break } - } - if allowed { + strval, isstr := val.(string) + if !isstr || value != strval { + allowed = false + break + } + } + if allowed { _, err := c.Write([]byte(fmt.Sprintf("%s %s\n", ev.Source, *ev.Text))) if err != nil { log.Printf("[%s - %s] error sending event to tcp connection: %v", s.name, c.RemoteAddr().String(), err) @@ -91,7 +97,7 @@ func (s *TCPServer) Init(name string, config yaml.MapSlice, b buffer.Sender, rou } func (s *TCPServer) Start() error { - if (s.b == nil) { + if s.b == nil { log.Printf("[%s] No Route is specified for this output", s.name) return nil } diff --git a/output/websocket/websocket.go b/output/websocket/websocket.go index 6cd4eda..22cce66 100644 --- a/output/websocket/websocket.go +++ b/output/websocket/websocket.go @@ -25,11 +25,11 @@ type Config struct { } type WebSocketServer struct { - name string + name string fields map[string]string - host string - b buffer.Sender - term chan bool + host string + b buffer.Sender + term chan bool mtx sync.RWMutex logs map[string]time.Time @@ -44,7 +44,7 @@ func init() { output.Register("websocket", New) } -func New() (output.Output) { +func New() output.Output { return &WebSocketServer{ logs: make(map[string]time.Time), term: make(chan bool, 1), @@ -74,11 +74,27 @@ func (ws *WebSocketServer) wslogsHandler(w *websocket.Conn) { continue } } + var allowed bool + allowed = true + for key, value := range ws.fields { + val, ok := (*ev.Fields)[key] + if !ok { + allowed = false + break + } + strval, isstr := val.(string) + if !isstr || value != strval { + allowed = false + break + } + } + if allowed { - err := websocket.Message.Send(w, *ev.Text) - if err != nil { - log.Printf("[%s] error sending ws message: %v", w.RemoteAddr().String(), err.Error()) - return + err := websocket.Message.Send(w, *ev.Text) + if err != nil { + log.Printf("[%s] error sending ws message: %v", w.RemoteAddr().String(), err.Error()) + return + } } } } @@ -108,7 +124,7 @@ func (ws *WebSocketServer) logListMaintainer() { }() r := make(chan *buffer.Event, recvBuffer) - ws.b.AddSubscriber(ws.name + "_logList", r) + ws.b.AddSubscriber(ws.name+"_logList", r) ticker := time.NewTicker(time.Duration(600) * time.Second) @@ -150,7 +166,7 @@ func (ws *WebSocketServer) Init(name string, config yaml.MapSlice, b buffer.Send } func (ws *WebSocketServer) Start() error { - if (ws.b == nil) { + if ws.b == nil { log.Printf("[%s] No route is specified for this output", ws.name) return nil }