From 5446096ae3e0caaff465b9cca540236371104dda Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 20 Nov 2025 12:35:49 +0100 Subject: [PATCH 01/10] refact output.go --- cmd/crowdsec/output.go | 49 ++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/cmd/crowdsec/output.go b/cmd/crowdsec/output.go index fcb372d1d57..b479f955370 100644 --- a/cmd/crowdsec/output.go +++ b/cmd/crowdsec/output.go @@ -20,18 +20,15 @@ func dedupAlerts(alerts []pipeline.RuntimeAlert) []*models.Alert { for idx, alert := range alerts { log.Tracef("alert %d/%d", idx, len(alerts)) - // if we have more than one source, we need to dedup - if len(alert.Sources) == 0 || len(alert.Sources) == 1 { + if len(alert.Sources) <= 1 { dedupCache = append(dedupCache, alert.Alert) continue } - for k := range alert.Sources { - refsrc := *alert.Alert // copy - + // if we have more than one source, we need to dedup + for k, src := range alert.Sources { log.Tracef("source[%s]", k) - - src := alert.Sources[k] + refsrc := *alert.Alert // copy refsrc.Source = &src dedupCache = append(dedupCache, &refsrc) } @@ -57,8 +54,15 @@ func PushAlerts(ctx context.Context, alerts []pipeline.RuntimeAlert, client *api var bucketOverflows []pipeline.Event -func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pipeline.Event, buckets *leaky.Buckets, postOverflowCTX parser.UnixParserCtx, - postOverflowNodes []parser.Node, client *apiclient.ApiClient) error { +func runOutput( + ctx context.Context, + input chan pipeline.Event, + overflow chan pipeline.Event, + buckets *leaky.Buckets, + postOverflowCTX parser.UnixParserCtx, + postOverflowNodes []parser.Node, + client *apiclient.ApiClient, +) error { var ( cache []pipeline.RuntimeAlert cacheMutex sync.Mutex @@ -73,8 +77,7 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip if len(cache) > 0 { cacheMutex.Lock() cachecopy := cache - newcache := make([]pipeline.RuntimeAlert, 0) - cache = newcache + cache = nil cacheMutex.Unlock() /* This loop needs to block as little as possible as scenarios directly write to the input chan @@ -103,35 +106,34 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip } return nil case event := <-overflow: + ov := event.Overflow + // if alert is empty and mapKey is present, the overflow is just to cleanup bucket - if event.Overflow.Alert == nil && event.Overflow.Mapkey != "" { - buckets.Bucket_map.Delete(event.Overflow.Mapkey) + if ov.Alert == nil && ov.Mapkey != "" { + buckets.Bucket_map.Delete(ov.Mapkey) break } + /* process post overflow parser nodes */ event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes) if err != nil { return fmt.Errorf("postoverflow failed: %w", err) } - log.Info(*event.Overflow.Alert.Message) + log.Info(*ov.Alert.Message) // if the Alert is nil, it's to signal bucket is ready for GC, don't track this // dump after postoveflow processing to avoid missing whitelist info - if dumpStates && event.Overflow.Alert != nil { - if bucketOverflows == nil { - bucketOverflows = make([]pipeline.Event, 0) - } - + if dumpStates && ov.Alert != nil { bucketOverflows = append(bucketOverflows, event) } - if event.Overflow.Whitelisted { - log.Infof("[%s] is whitelisted, skip.", *event.Overflow.Alert.Message) + if ov.Whitelisted { + log.Infof("[%s] is whitelisted, skip.", *ov.Alert.Message) continue } - if event.Overflow.Reprocess { + if ov.Reprocess { log.Debugf("Overflow being reprocessed.") select { case input <- event: @@ -140,12 +142,13 @@ func runOutput(ctx context.Context, input chan pipeline.Event, overflow chan pip log.Debugf("parsing is dead, skipping") } } + if dumpStates { continue } cacheMutex.Lock() - cache = append(cache, event.Overflow) + cache = append(cache, ov) cacheMutex.Unlock() } } From 23b76848d66c6691cbc74f6cd57527196153acd6 Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 20 Nov 2025 12:48:41 +0100 Subject: [PATCH 02/10] refact pour.go --- cmd/crowdsec/pour.go | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/cmd/crowdsec/pour.go b/cmd/crowdsec/pour.go index f48de97fc41..c21ad7c715f 100644 --- a/cmd/crowdsec/pour.go +++ b/cmd/crowdsec/pour.go @@ -13,25 +13,27 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) -func maybeGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) error { +func shouldTriggerGC(count int) bool { + return count % 5000 == 0 +} + +func triggerGC(parsed pipeline.Event, buckets *leaky.Buckets, cConfig *csconfig.Config) error { log.Infof("%d existing buckets", leaky.LeakyRoutineCount) + // when in forensics mode, garbage collect buckets - if cConfig.Crowdsec.BucketsGCEnabled { - if parsed.MarshaledTime != "" { - z := &time.Time{} - if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil { - log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err) - } else { - log.Warning("Starting buckets garbage collection ...") + if !cConfig.Crowdsec.BucketsGCEnabled || parsed.MarshaledTime == "" { + return nil + } - if err = leaky.GarbageCollectBuckets(*z, buckets); err != nil { - return err - } - } - } + z := &time.Time{} + if err := z.UnmarshalText([]byte(parsed.MarshaledTime)); err != nil { + log.Warningf("Failed to parse time from event '%s': %s", parsed.MarshaledTime, err) + return nil } - return nil + log.Warning("Starting buckets garbage collection...") + + return leaky.GarbageCollectBuckets(*z, buckets) } func runPour(input chan pipeline.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) error { @@ -47,8 +49,8 @@ func runPour(input chan pipeline.Event, holders []leaky.BucketFactory, buckets * startTime := time.Now() count++ - if count%5000 == 0 { - if err := maybeGC(parsed, buckets, cConfig); err != nil { + if shouldTriggerGC(count) { + if err := triggerGC(parsed, buckets, cConfig); err != nil { return fmt.Errorf("failed to start bucket GC: %w", err) } } From 61e74006617d93c016b9503d57a6bac3fd75b7ae Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 20 Nov 2025 13:09:53 +0100 Subject: [PATCH 03/10] refact parse.go --- cmd/crowdsec/parse.go | 84 ++++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 33 deletions(-) diff --git a/cmd/crowdsec/parse.go b/cmd/crowdsec/parse.go index d6c4e199103..4b64f49da70 100644 --- a/cmd/crowdsec/parse.go +++ b/cmd/crowdsec/parse.go @@ -11,6 +11,54 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) +func parseEvent( + event pipeline.Event, + parserCTX parser.UnixParserCtx, + nodes []parser.Node, +) *pipeline.Event { + if !event.Process { + return nil + } + + // Application security engine is going to generate 2 events: + // - one that is treated as a log and can go to scenarios + // - another one that will go directly to LAPI + if event.Type == pipeline.APPSEC { + outputEventChan <- event + return nil + } + + if event.Line.Module == "" { + log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line) + return nil + } + + metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc() + + startParsing := time.Now() + /* parse the log using magic */ + parsed, err := parser.Parse(parserCTX, event, nodes) + if err != nil { + log.Errorf("failed parsing: %v", err) + } + + elapsed := time.Since(startParsing) + metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds()) + if !parsed.Process { + metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() + log.Debugf("Discarding line %+v", parsed) + return nil + } + + metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() + if parsed.Whitelisted { + log.Debugf("event whitelisted, discard") + return nil + } + + return &parsed +} + func runParse(input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) error { for { select { @@ -18,41 +66,11 @@ func runParse(input chan pipeline.Event, output chan pipeline.Event, parserCTX p log.Infof("Killing parser routines") return nil case event := <-input: - if !event.Process { - continue - } - /*Application security engine is going to generate 2 events: - - one that is treated as a log and can go to scenarios - - another one that will go directly to LAPI*/ - if event.Type == pipeline.APPSEC { - outputEventChan <- event - continue - } - if event.Line.Module == "" { - log.Errorf("empty event.Line.Module field, the acquisition module must set it ! : %+v", event.Line) - continue - } - metrics.GlobalParserHits.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Inc() - - startParsing := time.Now() - /* parse the log using magic */ - parsed, err := parser.Parse(parserCTX, event, nodes) - if err != nil { - log.Errorf("failed parsing: %v", err) - } - elapsed := time.Since(startParsing) - metrics.GlobalParsingHistogram.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module}).Observe(elapsed.Seconds()) - if !parsed.Process { - metrics.GlobalParserHitsKo.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() - log.Debugf("Discarding line %+v", parsed) - continue - } - metrics.GlobalParserHitsOk.With(prometheus.Labels{"source": event.Line.Src, "type": event.Line.Module, "acquis_type": event.Line.Labels["type"]}).Inc() - if parsed.Whitelisted { - log.Debugf("event whitelisted, discard") + parsed := parseEvent(event, parserCTX, nodes) + if parsed == nil { continue } - output <- parsed + output <- *parsed } } } From 0b9ae8524ddc616fe0ee64b466e03dc59da3fa5f Mon Sep 17 00:00:00 2001 From: marco Date: Fri, 7 Nov 2025 15:22:59 +0100 Subject: [PATCH 04/10] unused/unparam --- .golangci.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 0b91c4f0639..70c6d4478a0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -29,7 +29,7 @@ linters: - funcorder - exhaustive - errname # Checks that sentinel errors are prefixed with the `Err` and error types are suffixed with the `Error`. - - unparam # Reports unused function parameters +# - unparam # Reports unused function parameters - errchkjson # Checks types passed to the json encoding functions. Reports unsupported types and reports occasions, where the check for the returned error can be omitted. # we allow named returns (to improve readability, in the function signatures) @@ -322,7 +322,7 @@ linters: - name: unsecure-url-scheme disabled: true - name: unused-parameter - disabled: true + disabled: false - name: use-any disabled: true - name: use-waitgroup-go @@ -488,6 +488,10 @@ linters: path: pkg/acquisition/modules/s3/source.go text: found a struct that contains a context.Context field + - linters: + - unused + text: 'var _ is unused' + # migrate over time - linters: From d555702e26fe1b3554ad7d0db326257e4b190381 Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 27 Nov 2025 14:49:43 +0100 Subject: [PATCH 05/10] unused parameters/errors --- cmd/crowdsec/appsec_stub.go | 2 +- cmd/crowdsec/crowdsec.go | 6 +----- cmd/crowdsec/lpmetrics.go | 5 ++--- cmd/crowdsec/parse.go | 4 ++-- cmd/crowdsec/win_service_manage.go | 2 -- 5 files changed, 6 insertions(+), 13 deletions(-) diff --git a/cmd/crowdsec/appsec_stub.go b/cmd/crowdsec/appsec_stub.go index 4a65b32a9ad..2e4c8d753c6 100644 --- a/cmd/crowdsec/appsec_stub.go +++ b/cmd/crowdsec/appsec_stub.go @@ -6,6 +6,6 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/cwhub" ) -func LoadAppsecRules(hub *cwhub.Hub) error { +func LoadAppsecRules(_ *cwhub.Hub) error { return nil } diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index 6ea50ff2329..9e83bb2c1df 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -79,10 +79,7 @@ func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers) { parsersTomb.Go(func() error { defer trace.CatchPanic("crowdsec/runParse") - if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { - // this error will never happen as parser.Parse is not able to return errors - return err - } + runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes) return nil }) @@ -147,7 +144,6 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap apiClient, lpMetricsDefaultInterval, log.WithField("service", "lpmetrics"), - []string{}, datasources, hub, ) diff --git a/cmd/crowdsec/lpmetrics.go b/cmd/crowdsec/lpmetrics.go index 4cb46792043..579c14b0970 100644 --- a/cmd/crowdsec/lpmetrics.go +++ b/cmd/crowdsec/lpmetrics.go @@ -88,7 +88,7 @@ func getHubState(hub *cwhub.Hub) models.HubItems { } // newStaticMetrics is called when the process starts, or reloads the configuration -func newStaticMetrics(consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub) staticMetrics { +func newStaticMetrics(datasources []acquisition.DataSource, hub *cwhub.Hub) staticMetrics { datasourceMap := map[string]int64{} for _, ds := range datasources { @@ -112,11 +112,10 @@ func NewMetricsProvider( apic *apiclient.ApiClient, interval time.Duration, logger *logrus.Entry, - consoleOptions []string, datasources []acquisition.DataSource, hub *cwhub.Hub, ) *MetricsProvider { - static := newStaticMetrics(consoleOptions, datasources, hub) + static := newStaticMetrics(datasources, hub) logger.Debugf("Detected %s %s (family: %s)", static.osName, static.osVersion, static.osFamily) diff --git a/cmd/crowdsec/parse.go b/cmd/crowdsec/parse.go index 4b64f49da70..7b6b92baf54 100644 --- a/cmd/crowdsec/parse.go +++ b/cmd/crowdsec/parse.go @@ -59,12 +59,12 @@ func parseEvent( return &parsed } -func runParse(input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) error { +func runParse(input chan pipeline.Event, output chan pipeline.Event, parserCTX parser.UnixParserCtx, nodes []parser.Node) { for { select { case <-parsersTomb.Dying(): log.Infof("Killing parser routines") - return nil + return case event := <-input: parsed := parseEvent(event, parserCTX, nodes) if parsed == nil { diff --git a/cmd/crowdsec/win_service_manage.go b/cmd/crowdsec/win_service_manage.go index 4e31dc019af..be3556cba51 100644 --- a/cmd/crowdsec/win_service_manage.go +++ b/cmd/crowdsec/win_service_manage.go @@ -10,8 +10,6 @@ import ( "fmt" "time" - //"time" - "golang.org/x/sys/windows/svc" "golang.org/x/sys/windows/svc/mgr" ) From b63e73f5059d7371072fe2e3a006e94164538dad Mon Sep 17 00:00:00 2001 From: marco Date: Mon, 24 Nov 2025 13:44:56 +0100 Subject: [PATCH 06/10] refact dump.go --- cmd/crowdsec/crowdsec.go | 4 +++- cmd/crowdsec/dump.go | 40 +++++++++++++--------------------------- cmd/crowdsec/main.go | 1 - pkg/parser/runtime.go | 5 +---- 4 files changed, 17 insertions(+), 33 deletions(-) diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index 9e83bb2c1df..9ba3de216ba 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -228,7 +228,9 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf log.Debugf("everything is dead, return crowdsecTomb") if dumpStates { - if err := dumpAllStates(); err != nil { + log.Debugf("Dumping parser+bucket states to %s", dumpFolder) + + if err := dumpAllStates(dumpFolder); err != nil { log.Fatal(err) } diff --git a/cmd/crowdsec/dump.go b/cmd/crowdsec/dump.go index 33c65878b11..1c7f1d35a96 100644 --- a/cmd/crowdsec/dump.go +++ b/cmd/crowdsec/dump.go @@ -5,52 +5,38 @@ import ( "os" "path/filepath" - log "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket" "github.com/crowdsecurity/crowdsec/pkg/parser" ) -func dumpAllStates() error { - log.Debugf("Dumping parser+bucket states to %s", parser.DumpFolder) +func dumpAllStates(dir string) error { + err := os.MkdirAll(dir, 0o755) + if err != nil { + return err + } - if err := dumpState( - filepath.Join(parser.DumpFolder, "parser-dump.yaml"), - parser.StageParseCache, - ); err != nil { - return fmt.Errorf("while dumping parser state: %w", err) + if err := dumpState(dir, "parser-dump.yaml", parser.StageParseCache); err != nil { + return fmt.Errorf("dumping parser state: %w", err) } - if err := dumpState( - filepath.Join(parser.DumpFolder, "bucket-dump.yaml"), - bucketOverflows, - ); err != nil { - return fmt.Errorf("while dumping bucket overflow state: %w", err) + if err := dumpState(dir, "bucket-dump.yaml", bucketOverflows); err != nil { + return fmt.Errorf("dumping bucket overflow state: %w", err) } - if err := dumpState( - filepath.Join(parser.DumpFolder, "bucketpour-dump.yaml"), - leaky.BucketPourCache, - ); err != nil { - return fmt.Errorf("while dumping bucket pour state: %w", err) + if err := dumpState(dir, "bucketpour-dump.yaml", leaky.BucketPourCache); err != nil { + return fmt.Errorf("dumping bucket pour state: %w", err) } return nil } -func dumpState(destPath string, obj any) error { - dir := filepath.Dir(destPath) - - err := os.MkdirAll(dir, 0o755) - if err != nil { - return err - } - +func dumpState(dir, name string, obj any) error { out, err := yaml.Marshal(obj) if err != nil { return err } - return os.WriteFile(destPath, out, 0o666) + return os.WriteFile(filepath.Join(dir, name), out, 0o666) } diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 6ca465f0508..6be7b6fdb48 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -129,7 +129,6 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo if dumpFolder != "" { parser.ParseDump = true - parser.DumpFolder = dumpFolder leakybucket.BucketPourTrack = true dumpStates = true } diff --git a/pkg/parser/runtime.go b/pkg/parser/runtime.go index cfe71b512ab..603c90b283e 100644 --- a/pkg/parser/runtime.go +++ b/pkg/parser/runtime.go @@ -242,10 +242,7 @@ func stageidx(stage string, stages []string) int { return -1 } -var ( - ParseDump bool - DumpFolder string -) +var ParseDump bool var ( StageParseCache dumps.ParserResults = make(dumps.ParserResults) From e784b29d427160b1f5ced005842f42e992a997be Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 27 Nov 2025 15:09:44 +0100 Subject: [PATCH 07/10] break -> return --- cmd/crowdsec/serve.go | 10 +++------- cmd/crowdsec/win_service.go | 3 +-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index 1fc9ab366ab..a0e2b2a0949 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -246,7 +246,6 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { go func() { defer trace.CatchPanic("crowdsec/HandleSignals") - Loop: for { s := <-signalChan switch s { @@ -256,14 +255,12 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { if err = shutdown(s, cConfig); err != nil { exitChan <- fmt.Errorf("failed shutdown: %w", err) - - break Loop + return } if newConfig, err = reloadHandler(ctx, s); err != nil { exitChan <- fmt.Errorf("reload handler failure: %w", err) - - break Loop + return } if newConfig != nil { @@ -275,8 +272,7 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { if err = shutdown(s, cConfig); err != nil { exitChan <- fmt.Errorf("failed shutdown: %w", err) - - break Loop + return } exitChan <- nil diff --git a/cmd/crowdsec/win_service.go b/cmd/crowdsec/win_service.go index 1e63e5c7626..8342638f16a 100644 --- a/cmd/crowdsec/win_service.go +++ b/cmd/crowdsec/win_service.go @@ -31,7 +31,6 @@ func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest, changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted} go func() { - loop: for { select { case <-tick: @@ -47,7 +46,7 @@ func (m *crowdsec_winservice) Execute(args []string, r <-chan svc.ChangeRequest, log.Errorf("Error while shutting down: %s", err) // don't return, we still want to notify windows that we are stopped ? } - break loop + return default: log.Errorf("unexpected control request #%d", c) } From 027e894870f472f69dd467e0263c7a1dd2b7d0ed Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 27 Nov 2025 15:23:07 +0100 Subject: [PATCH 08/10] extract unregisterWatcher() --- cmd/crowdsec/serve.go | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index a0e2b2a0949..2e1e20375e9 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -222,6 +222,24 @@ func drainChan(c chan pipeline.Event) { } } +func unregisterWatcher(ctx context.Context, cConfig *csconfig.Config) (bool, error) { + if cConfig.API == nil || cConfig.API.Client == nil || !cConfig.API.Client.UnregisterOnExit { + return false, nil + } + + lapiClient, err := apiclient.GetLAPIClient() + if err != nil { + return false, err + } + + _, err = lapiClient.Auth.UnregisterWatcher(ctx) + if err != nil { + return false, err + } + + return true, nil +} + func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { var ( newConfig *csconfig.Config @@ -285,20 +303,11 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { log.Warning("Crowdsec service shutting down") } - if cConfig.API != nil && cConfig.API.Client != nil && cConfig.API.Client.UnregisterOnExit { - log.Warning("Unregistering watcher") - - lapiClient, err := apiclient.GetLAPIClient() - if err != nil { - return err - } - - _, err = lapiClient.Auth.UnregisterWatcher(ctx) - if err != nil { - return fmt.Errorf("failed to unregister watcher: %w", err) + if ok, werr := unregisterWatcher(ctx, cConfig); werr != nil { + log.WithError(werr).Warning("unregistering watcher") + if ok { + log.Warning("Watcher unregistered") } - - log.Warning("Watcher unregistered") } return err From 59073c72ad13561e005a362c60bf06bcb713cec0 Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 27 Nov 2025 15:36:56 +0100 Subject: [PATCH 09/10] lint --- .golangci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 70c6d4478a0..4dba7922d09 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -29,7 +29,7 @@ linters: - funcorder - exhaustive - errname # Checks that sentinel errors are prefixed with the `Err` and error types are suffixed with the `Error`. -# - unparam # Reports unused function parameters + - unparam # Reports unused function parameters - errchkjson # Checks types passed to the json encoding functions. Reports unsupported types and reports occasions, where the check for the returned error can be omitted. # we allow named returns (to improve readability, in the function signatures) @@ -322,7 +322,7 @@ linters: - name: unsecure-url-scheme disabled: true - name: unused-parameter - disabled: false + disabled: true - name: use-any disabled: true - name: use-waitgroup-go From e45bd57959123c86930fbe3a04fb022cfc171cd4 Mon Sep 17 00:00:00 2001 From: marco Date: Thu, 27 Nov 2025 16:53:02 +0100 Subject: [PATCH 10/10] remove global inputLineChan --- cmd/crowdsec/crowdsec.go | 17 ++++++++--------- cmd/crowdsec/main.go | 5 +++-- cmd/crowdsec/run_in_svc.go | 5 +++-- cmd/crowdsec/run_in_svc_windows.go | 9 +++++---- cmd/crowdsec/serve.go | 18 +++++++++--------- 5 files changed, 28 insertions(+), 26 deletions(-) diff --git a/cmd/crowdsec/crowdsec.go b/cmd/crowdsec/crowdsec.go index 9ba3de216ba..3832c15831d 100644 --- a/cmd/crowdsec/crowdsec.go +++ b/cmd/crowdsec/crowdsec.go @@ -68,7 +68,7 @@ func initCrowdsec(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub, return csParsers, datasources, nil } -func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers) { +func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers, logLines chan pipeline.Event) { // start go-routines for parsing, buckets pour and outputs. parserWg := &sync.WaitGroup{} @@ -79,7 +79,7 @@ func startParserRoutines(cConfig *csconfig.Config, parsers *parser.Parsers) { parsersTomb.Go(func() error { defer trace.CatchPanic("crowdsec/runParse") - runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes) + runParse(logLines, inputEventChan, *parsers.Ctx, parsers.Nodes) return nil }) @@ -167,11 +167,10 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap } // runCrowdsec starts the log processor service -func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error { +func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource, logLines chan pipeline.Event) error { inputEventChan = make(chan pipeline.Event) - inputLineChan = make(chan pipeline.Event) - startParserRoutines(cConfig, parsers) + startParserRoutines(cConfig, parsers, logLines) startBucketRoutines(cConfig) @@ -190,7 +189,7 @@ func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser. log.Info("Starting processing data") - if err := acquisition.StartAcquisition(ctx, dataSources, inputLineChan, &acquisTomb); err != nil { + if err := acquisition.StartAcquisition(ctx, dataSources, logLines, &acquisTomb); err != nil { return fmt.Errorf("starting acquisition error: %w", err) } @@ -198,7 +197,7 @@ func runCrowdsec(ctx context.Context, cConfig *csconfig.Config, parsers *parser. } // serveCrowdsec wraps the log processor service -func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) { +func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool, logLines chan pipeline.Event) { crowdsecTomb.Go(func() error { defer trace.CatchPanic("crowdsec/serveCrowdsec") @@ -209,7 +208,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf agentReady <- true - if err := runCrowdsec(ctx, cConfig, parsers, hub, datasources); err != nil { + if err := runCrowdsec(ctx, cConfig, parsers, hub, datasources, logLines); err != nil { log.Fatalf("unable to start crowdsec routines: %s", err) } }() @@ -221,7 +220,7 @@ func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconf waitOnTomb() log.Debugf("Shutting down crowdsec routines") - if err := ShutdownCrowdsecRoutines(); err != nil { + if err := ShutdownCrowdsecRoutines(logLines); err != nil { return fmt.Errorf("unable to shutdown crowdsec routines: %w", err) } diff --git a/cmd/crowdsec/main.go b/cmd/crowdsec/main.go index 6be7b6fdb48..6392a60d0c1 100644 --- a/cmd/crowdsec/main.go +++ b/cmd/crowdsec/main.go @@ -47,7 +47,6 @@ var ( holders []leakybucket.BucketFactory buckets *leakybucket.Buckets - inputLineChan chan pipeline.Event inputEventChan chan pipeline.Event outputEventChan chan pipeline.Event // the buckets init returns its own chan that is used for multiplexing // settings @@ -230,7 +229,9 @@ func run() error { return err } - return StartRunSvc(ctx, cConfig) + logLines := make(chan pipeline.Event) + + return StartRunSvc(ctx, cConfig, logLines) } func main() { diff --git a/cmd/crowdsec/run_in_svc.go b/cmd/crowdsec/run_in_svc.go index 996dc3a897a..c637e5899af 100644 --- a/cmd/crowdsec/run_in_svc.go +++ b/cmd/crowdsec/run_in_svc.go @@ -16,13 +16,14 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/database" "github.com/crowdsecurity/crowdsec/pkg/fflag" + "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) func isWindowsService() (bool, error) { return false, nil } -func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { +func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, logLines chan pipeline.Event) error { defer trace.CatchPanic("crowdsec/StartRunSvc") // Always try to stop CPU profiling to avoid passing flags around @@ -63,5 +64,5 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { }() } - return Serve(ctx, cConfig, agentReady) + return Serve(ctx, cConfig, agentReady, logLines) } diff --git a/cmd/crowdsec/run_in_svc_windows.go b/cmd/crowdsec/run_in_svc_windows.go index 9df909483d2..4c581e72cfd 100644 --- a/cmd/crowdsec/run_in_svc_windows.go +++ b/cmd/crowdsec/run_in_svc_windows.go @@ -15,13 +15,14 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/csconfig" "github.com/crowdsecurity/crowdsec/pkg/database" "github.com/crowdsecurity/crowdsec/pkg/fflag" + "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) func isWindowsService() (bool, error) { return svc.IsWindowsService() } -func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { +func StartRunSvc(ctx context.Context, cConfig *csconfig.Config, logLines chan pipeline.Event) error { const svcName = "CrowdSec" const svcDescription = "Crowdsec IPS/IDS" @@ -61,7 +62,7 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { return fmt.Errorf("failed to %s %s: %w", flags.WinSvc, svcName, err) } case "": - return WindowsRun(ctx, cConfig) + return WindowsRun(ctx, cConfig, logLines) default: return fmt.Errorf("Invalid value for winsvc parameter: %s", flags.WinSvc) } @@ -69,7 +70,7 @@ func StartRunSvc(ctx context.Context, cConfig *csconfig.Config) error { return nil } -func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error { +func WindowsRun(ctx context.Context, cConfig *csconfig.Config, logLines chan pipeline.Event) error { if fflag.PProfBlockProfile.IsEnabled() { runtime.SetBlockProfileRate(1) runtime.SetMutexProfileFraction(1) @@ -96,5 +97,5 @@ func WindowsRun(ctx context.Context, cConfig *csconfig.Config) error { registerPrometheus(cConfig.Prometheus) go servePrometheus(cConfig.Prometheus, dbClient, agentReady) } - return Serve(ctx, cConfig, agentReady) + return Serve(ctx, cConfig, agentReady, logLines) } diff --git a/cmd/crowdsec/serve.go b/cmd/crowdsec/serve.go index 2e1e20375e9..3c4cd24c50f 100644 --- a/cmd/crowdsec/serve.go +++ b/cmd/crowdsec/serve.go @@ -24,7 +24,7 @@ import ( "github.com/crowdsecurity/crowdsec/pkg/pipeline" ) -func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) { +func reloadHandler(ctx context.Context, _ os.Signal, logLines chan pipeline.Event) (*csconfig.Config, error) { // re-initialize tombs acquisTomb = tomb.Tomb{} parsersTomb = tomb.Tomb{} @@ -79,7 +79,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) { } agentReady := make(chan bool, 1) - serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady) + serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, logLines) } log.Info("Reload is finished") @@ -87,7 +87,7 @@ func reloadHandler(ctx context.Context, _ os.Signal) (*csconfig.Config, error) { return cConfig, nil } -func ShutdownCrowdsecRoutines() error { +func ShutdownCrowdsecRoutines(logLines chan pipeline.Event) error { var reterr error log.Debugf("Shutting down crowdsec sub-routines") @@ -95,7 +95,7 @@ func ShutdownCrowdsecRoutines() error { if len(dataSources) > 0 { acquisTomb.Kill(nil) log.Debugf("waiting for acquisition to finish") - drainChan(inputLineChan) + drainChan(logLines) if err := acquisTomb.Wait(); err != nil { log.Warningf("Acquisition returned error : %s", err) @@ -240,7 +240,7 @@ func unregisterWatcher(ctx context.Context, cConfig *csconfig.Config) (bool, err return true, nil } -func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { +func HandleSignals(ctx context.Context, cConfig *csconfig.Config, logLines chan pipeline.Event) error { var ( newConfig *csconfig.Config err error @@ -276,7 +276,7 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { return } - if newConfig, err = reloadHandler(ctx, s); err != nil { + if newConfig, err = reloadHandler(ctx, s, logLines); err != nil { exitChan <- fmt.Errorf("reload handler failure: %w", err) return } @@ -313,7 +313,7 @@ func HandleSignals(ctx context.Context, cConfig *csconfig.Config) error { return err } -func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) error { +func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool, logLines chan pipeline.Event) error { acquisTomb = tomb.Tomb{} parsersTomb = tomb.Tomb{} bucketsTomb = tomb.Tomb{} @@ -389,7 +389,7 @@ func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) // if it's just linting, we're done if !flags.TestMode { - serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady) + serveCrowdsec(ctx, csParsers, cConfig, hub, datasources, agentReady, logLines) } else { agentReady <- true } @@ -412,7 +412,7 @@ func Serve(ctx context.Context, cConfig *csconfig.Config, agentReady chan bool) if cConfig.Common != nil && !flags.haveTimeMachine() && !isWindowsSvc { _ = csdaemon.Notify(csdaemon.Ready, log.StandardLogger()) // wait for signals - return HandleSignals(ctx, cConfig) + return HandleSignals(ctx, cConfig, logLines) } waitChans := make([]<-chan struct{}, 0)