Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions output/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Indexer struct {
indexType string
RateCounter *ratecounter.RateCounter
lastDisplayUpdate time.Time
fields map[string]string
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the value is not a string (e.g. float, integer, etc.)?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What indeed. :)

I was just following the established convention that it routes only by strings. If there is a need to support routing for all JSON datatypes, it could be done with a type switch in each output plugin. IMO that's overkill unless someone has a compelling reason.

}

type Config struct {
Expand All @@ -58,7 +59,7 @@ func init() {
output.Register("elasticsearch", New)
}

func New() (output.Output) {
func New() output.Output {
return &ESServer{
host: fmt.Sprintf("%s:%d", defaultHost, time.Now().Unix()),
term: make(chan bool, 1),
Expand Down Expand Up @@ -102,10 +103,22 @@ func (i *Indexer) flush() error {
}

func (i *Indexer) index(ev *buffer.Event) error {
for key, value := range i.fields {
val, ok := (*ev.Fields)[key]
if !ok {
//early out if no match
return nil
}
strval, isstr := val.(string)
if !isstr || value != strval {
//early out if no match
return nil
}
}

doc := *ev.Text
idx := indexName(i.indexPrefix)
typ := i.indexType

request := elastic.NewBulkIndexRequest().Index(idx).Type(typ).Doc(doc)
i.bulkService.Add(request)
i.RateCounter.Incr(1)
Expand Down Expand Up @@ -193,7 +206,7 @@ func (es *ESServer) insertIndexTemplate(client *elastic.Client) error {
}

func (es *ESServer) Start() error {
if (es.b == nil) {
if es.b == nil {
log.Printf("[%s] No Route is specified for this output", es.name)
return nil
}
Expand Down Expand Up @@ -256,7 +269,7 @@ func (es *ESServer) Start() error {
rateCounter := ratecounter.NewRateCounter(1 * time.Second)

// Create indexer
idx := &Indexer{service, es.config.IndexPrefix, es.config.IndexType, rateCounter, time.Now()}
idx := &Indexer{service, es.config.IndexPrefix, es.config.IndexType, rateCounter, time.Now(), es.fields}

// Loop events and publish to elasticsearch
tick := time.NewTicker(time.Duration(esFlushInterval) * time.Second)
Expand Down
14 changes: 10 additions & 4 deletions output/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (redisServer *RedisServer) Init(name string, config yaml.MapSlice, sender b
}

func (redisServer *RedisServer) Start() error {
if (redisServer.sender == nil) {
if redisServer.sender == nil {
log.Printf("[%s] No Route is specified for this output", redisServer.name)
return nil
}
Expand All @@ -173,16 +173,22 @@ func (redisServer *RedisServer) Start() error {
for {
select {
case ev := <-receiveChan:
rateCounter.Incr(1)
var allowed bool
allowed = true
for key, value := range redisServer.fields {
if ((*ev.Fields)[key] == nil || ((*ev.Fields)[key] != nil && value != (*ev.Fields)[key].(string))) {
for key, value := range redisServer.fields {
val, ok := (*ev.Fields)[key]
if !ok {
allowed = false
break
}
strval, isstr := val.(string)
if !isstr || value != strval {
allowed = false
break
}
}
if allowed {
rateCounter.Incr(1)
text := *ev.Text
for _, queue := range allQueues {
queue.data <- text
Expand Down
21 changes: 13 additions & 8 deletions output/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func uuid() string {
}

type Config struct {
AwsKeyIdLoc string `yaml:"aws_key_id_loc"`
AwsKeyIdLoc string `yaml:"aws_key_id_loc"`
AwsSecKeyLoc string `yaml:"aws_sec_key_loc"`
AwsS3Bucket string `yaml:"aws_s3_bucket"`
AwsS3Region string `yaml:"aws_s3_region"`
AwsS3Bucket string `yaml:"aws_s3_bucket"`
AwsS3Region string `yaml:"aws_s3_region"`

LocalPath string `yaml:"local_path"`
Path string `yaml:"s3_path"`
Expand Down Expand Up @@ -181,7 +181,7 @@ func init() {
output.Register("s3", New)
}

func New() (output.Output) {
func New() output.Output {
return &S3Writer{term: make(chan bool, 1)}
}

Expand Down Expand Up @@ -265,15 +265,14 @@ func (s3Writer *S3Writer) Init(name string, config yaml.MapSlice, sender buffer.
}

func (s3Writer *S3Writer) Start() error {
if (s3Writer.Sender == nil) {
if s3Writer.Sender == nil {
log.Printf("[%s] No route is specified for this output", s3Writer.name)
return nil
}
// Create file saver
fileSaver := new(FileSaver)
fileSaver.Config = s3Writer.Config
fileSaver.RateCounter = ratecounter.NewRateCounter(1 * time.Second)

id := "s3_output"
// Add the client as a subscriber
receiveChan := make(chan *buffer.Event, recvBuffer)
Expand All @@ -290,8 +289,14 @@ func (s3Writer *S3Writer) Start() error {
case ev := <-receiveChan:
var allowed bool
allowed = true
for key, value := range s3Writer.fields {
if ((*ev.Fields)[key] == nil || ((*ev.Fields)[key] != nil && value != (*ev.Fields)[key].(string))) {
for key, value := range s3Writer.fields {
val, ok := (*ev.Fields)[key]
if !ok {
allowed = false
break
}
strval, isstr := val.(string)
if !isstr || value != strval {
allowed = false
break
}
Expand Down
26 changes: 16 additions & 10 deletions output/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ type Config struct {
}

type TCPServer struct {
name string
name string
fields map[string]string
host string
b buffer.Sender
term chan bool
host string
b buffer.Sender
term chan bool
}

func init() {
output.Register("tcp", New)
}

func New() (output.Output) {
func New() output.Output {
return &TCPServer{term: make(chan bool, 1)}
}

Expand All @@ -54,13 +54,19 @@ func (s *TCPServer) accept(c net.Conn) {
case ev := <-r:
var allowed bool
allowed = true
for key, value := range s.fields {
if ((*ev.Fields)[key] == nil || ((*ev.Fields)[key] != nil && value != (*ev.Fields)[key].(string))) {
for key, value := range s.fields {
val, ok := (*ev.Fields)[key]
if !ok {
allowed = false
break
}
}
if allowed {
strval, isstr := val.(string)
if !isstr || value != strval {
allowed = false
break
}
}
if allowed {
_, err := c.Write([]byte(fmt.Sprintf("%s %s\n", ev.Source, *ev.Text)))
if err != nil {
log.Printf("[%s - %s] error sending event to tcp connection: %v", s.name, c.RemoteAddr().String(), err)
Expand Down Expand Up @@ -91,7 +97,7 @@ func (s *TCPServer) Init(name string, config yaml.MapSlice, b buffer.Sender, rou
}

func (s *TCPServer) Start() error {
if (s.b == nil) {
if s.b == nil {
log.Printf("[%s] No Route is specified for this output", s.name)
return nil
}
Expand Down
38 changes: 27 additions & 11 deletions output/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ type Config struct {
}

type WebSocketServer struct {
name string
name string
fields map[string]string
host string
b buffer.Sender
term chan bool
host string
b buffer.Sender
term chan bool

mtx sync.RWMutex
logs map[string]time.Time
Expand All @@ -44,7 +44,7 @@ func init() {
output.Register("websocket", New)
}

func New() (output.Output) {
func New() output.Output {
return &WebSocketServer{
logs: make(map[string]time.Time),
term: make(chan bool, 1),
Expand Down Expand Up @@ -74,11 +74,27 @@ func (ws *WebSocketServer) wslogsHandler(w *websocket.Conn) {
continue
}
}
var allowed bool
allowed = true
for key, value := range ws.fields {
val, ok := (*ev.Fields)[key]
if !ok {
allowed = false
break
}
strval, isstr := val.(string)
if !isstr || value != strval {
allowed = false
break
}
}
if allowed {

err := websocket.Message.Send(w, *ev.Text)
if err != nil {
log.Printf("[%s] error sending ws message: %v", w.RemoteAddr().String(), err.Error())
return
err := websocket.Message.Send(w, *ev.Text)
if err != nil {
log.Printf("[%s] error sending ws message: %v", w.RemoteAddr().String(), err.Error())
return
}
}
}
}
Expand Down Expand Up @@ -108,7 +124,7 @@ func (ws *WebSocketServer) logListMaintainer() {
}()

r := make(chan *buffer.Event, recvBuffer)
ws.b.AddSubscriber(ws.name + "_logList", r)
ws.b.AddSubscriber(ws.name+"_logList", r)

ticker := time.NewTicker(time.Duration(600) * time.Second)

Expand Down Expand Up @@ -150,7 +166,7 @@ func (ws *WebSocketServer) Init(name string, config yaml.MapSlice, b buffer.Send
}

func (ws *WebSocketServer) Start() error {
if (ws.b == nil) {
if ws.b == nil {
log.Printf("[%s] No route is specified for this output", ws.name)
return nil
}
Expand Down