diff --git a/.github/workflows/release_images.yml b/.github/workflows/release_images.yml index 6a6f5ce3..7d8e2611 100644 --- a/.github/workflows/release_images.yml +++ b/.github/workflows/release_images.yml @@ -97,6 +97,15 @@ jobs: echo "Updated values.yaml content:" cat operator/documentdb-helm-chart/values.yaml + - name: Inject telemetry connection string + if: ${{ secrets.APPINSIGHTS_CONNECTION_STRING != '' }} + env: + APPINSIGHTS_CONNECTION_STRING: ${{ secrets.APPINSIGHTS_CONNECTION_STRING }} + run: | + echo "Injecting Application Insights connection string for telemetry" + # Use sed to update the connectionString field in values.yaml; the secret is passed via env instead of being expanded directly into the script to avoid script injection + sed -i "s|connectionString: \"\"|connectionString: \"${APPINSIGHTS_CONNECTION_STRING}\"|g" operator/documentdb-helm-chart/values.yaml + + - name: Set chart version run: | echo "CHART_VERSION=${{ github.event.inputs.version }}" >> $GITHUB_ENV diff --git a/docs/designs/appinsights-metrics.md b/docs/designs/appinsights-metrics.md index 65ddbe0d..66872bd1 100644 --- a/docs/designs/appinsights-metrics.md +++ b/docs/designs/appinsights-metrics.md @@ -21,7 +21,7 @@ This document specifies all telemetry data points to be collected by Application - **Metric**: `operator.health.status` - **Value**: `1` (healthy) or `0` (unhealthy) - **Frequency**: Every 60 seconds -- **Dimensions**: `pod_name`, `namespace` +- **Dimensions**: `pod_name`, `namespace_hash` --- diff --git a/docs/designs/telemetry-configuration.md b/docs/designs/telemetry-configuration.md new file mode 100644 index 00000000..d6b03646 --- /dev/null +++ b/docs/designs/telemetry-configuration.md @@ -0,0 +1,134 @@ +# Application Insights Telemetry Configuration + +This document describes how to configure Application Insights telemetry collection for the DocumentDB Kubernetes Operator. + +## Overview + +The DocumentDB Operator can send telemetry data to Azure Application Insights to help monitor operator health, track cluster lifecycle events, and diagnose issues. 
All telemetry is designed with privacy in mind - no personally identifiable information (PII) is collected. + +## Configuration + +### Environment Variables + +Configure telemetry by setting these environment variables in the operator deployment: + +| Variable | Description | Required | +|----------|-------------|----------| +| `APPINSIGHTS_INSTRUMENTATIONKEY` | Application Insights instrumentation key | Yes (or connection string) | +| `APPLICATIONINSIGHTS_CONNECTION_STRING` | Application Insights connection string (alternative to instrumentation key) | Yes (or instrumentation key) | +| `DOCUMENTDB_TELEMETRY_ENABLED` | Set to `false` to disable telemetry collection | No (default: `true`) | + +### Helm Chart Configuration + +When installing via Helm, you can configure telemetry in your values.yaml: + +```yaml +# values.yaml +telemetry: + enabled: true + instrumentationKey: "YOUR-INSTRUMENTATION-KEY-HERE" + # Or use connection string: + # connectionString: "InstrumentationKey=xxx;IngestionEndpoint=https://..." 
+ # Or use an existing secret containing APPINSIGHTS_INSTRUMENTATIONKEY / APPLICATIONINSIGHTS_CONNECTION_STRING: + # existingSecret: "documentdb-operator-telemetry" +``` + +### Kubernetes Secret + +For production deployments, store the instrumentation key in a Kubernetes secret: + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: documentdb-operator-telemetry + namespace: documentdb-system +type: Opaque +stringData: + APPINSIGHTS_INSTRUMENTATIONKEY: "YOUR-INSTRUMENTATION-KEY-HERE" +``` + +Then reference it in the operator deployment: + +```yaml +envFrom: + - secretRef: + name: documentdb-operator-telemetry +``` + +## Privacy & Data Collection + +### What We Collect + +The operator collects anonymous, aggregated telemetry data including: + +- **Operator lifecycle**: Startup events, health status, version information +- **Cluster operations**: Create, update, delete events (with timing metrics) +- **Backup operations**: Backup creation, completion, and expiration events +- **Error tracking**: Categorized errors (no raw error messages with sensitive data) +- **Performance metrics**: Reconciliation duration, API call latency + +### What We DON'T Collect + +To protect your privacy, we explicitly do NOT collect: + +- Cluster names, namespace names, or any user-provided resource names +- Connection strings, passwords, or credentials +- IP addresses or hostnames +- Storage class names (may contain organizational information) +- Raw error messages (only categorized error types) +- Container image names + +### Privacy Protection Mechanisms + +1. **GUIDs Instead of Names**: All resources are identified by auto-generated GUIDs stored in annotations (`telemetry.documentdb.io/cluster-id`) +2. **Hashed Namespaces**: Namespace names are SHA-256 hashed before transmission +3. **Categorized Data**: Values like PVC sizes are categorized (small/medium/large) instead of exact values +4. 
**Error Sanitization**: Error messages are stripped of potential PII and truncated + +## Disabling Telemetry + +To completely disable telemetry collection: + +1. **Via environment variable**: + ```yaml + env: + - name: DOCUMENTDB_TELEMETRY_ENABLED + value: "false" + ``` + +2. **Via Helm**: + ```yaml + telemetry: + enabled: false + ``` + +3. **Don't provide instrumentation key**: If no `APPINSIGHTS_INSTRUMENTATIONKEY` or `APPLICATIONINSIGHTS_CONNECTION_STRING` is set, telemetry is automatically disabled. + +## Telemetry Events Reference + +See [appinsights-metrics.md](appinsights-metrics.md) for the complete specification of all telemetry events and metrics collected. + +## Troubleshooting + +### Telemetry Not Being Sent + +1. Verify the instrumentation key is correctly configured: + ```bash + kubectl get deployment documentdb-operator -n documentdb-system -o yaml | grep -A5 APPINSIGHTS + ``` + +2. Check operator logs for telemetry initialization: + ```bash + kubectl logs -n documentdb-system -l app=documentdb-operator | grep -i telemetry + ``` + +3. Verify network connectivity to Application Insights endpoint (`dc.services.visualstudio.com`) + +### High Cardinality Warnings + +If you see warnings about high cardinality dimensions, this indicates too many unique values for a dimension. The telemetry system automatically samples high-frequency events to mitigate this. + +## Support + +For issues related to telemetry collection, please open an issue on the [GitHub repository](https://github.com/documentdb/documentdb-kubernetes-operator/issues). 
diff --git a/operator/documentdb-helm-chart/templates/05_clusterrole.yaml b/operator/documentdb-helm-chart/templates/05_clusterrole.yaml index 29189c69..469d6648 100644 --- a/operator/documentdb-helm-chart/templates/05_clusterrole.yaml +++ b/operator/documentdb-helm-chart/templates/05_clusterrole.yaml @@ -56,6 +56,10 @@ rules: - apiGroups: ["snapshot.storage.k8s.io"] resources: ["volumesnapshotclasses"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] +# Node read permissions for telemetry cloud provider detection +- apiGroups: [""] + resources: ["nodes"] + verbs: ["get", "list"] # PersistentVolume permissions for PV controller - apiGroups: [""] resources: ["persistentvolumes"] diff --git a/operator/documentdb-helm-chart/templates/09_documentdb_operator.yaml b/operator/documentdb-helm-chart/templates/09_documentdb_operator.yaml index adf34cfe..d1b36960 100644 --- a/operator/documentdb-helm-chart/templates/09_documentdb_operator.yaml +++ b/operator/documentdb-helm-chart/templates/09_documentdb_operator.yaml @@ -25,10 +25,41 @@ spec: env: - name: GATEWAY_PORT value: "10260" + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace {{- if .Values.documentDbVersion | default .Chart.AppVersion }} - name: DOCUMENTDB_VERSION value: "{{ .Values.documentDbVersion | default .Chart.AppVersion }}" {{- end }} + # Telemetry configuration + {{- if not .Values.telemetry.enabled }} + - name: DOCUMENTDB_TELEMETRY_ENABLED + value: "false" + {{- else }} + {{- if .Values.telemetry.existingSecret }} + # Pull credentials from the existing secret as individual env vars (envFrom is not valid inside the env list) + - name: APPINSIGHTS_INSTRUMENTATIONKEY + valueFrom: + secretKeyRef: + name: {{ .Values.telemetry.existingSecret }} + key: APPINSIGHTS_INSTRUMENTATIONKEY + optional: true + - name: APPLICATIONINSIGHTS_CONNECTION_STRING + valueFrom: + secretKeyRef: + name: {{ .Values.telemetry.existingSecret }} + key: APPLICATIONINSIGHTS_CONNECTION_STRING + optional: true + {{- else if .Values.telemetry.connectionString }} + - name: APPLICATIONINSIGHTS_CONNECTION_STRING + value: {{ .Values.telemetry.connectionString | quote }} + {{- else if .Values.telemetry.instrumentationKey }} + - name: APPINSIGHTS_INSTRUMENTATIONKEY + value: {{ .Values.telemetry.instrumentationKey | quote }} + {{- end }} + {{- end }} {{- if .Values.gatewayImagePullPolicy }} - name: 
GATEWAY_IMAGE_PULL_POLICY value: "{{ .Values.gatewayImagePullPolicy }}" diff --git a/operator/documentdb-helm-chart/values.yaml b/operator/documentdb-helm-chart/values.yaml index 4fd85c6b..d3bc94d7 100644 --- a/operator/documentdb-helm-chart/values.yaml +++ b/operator/documentdb-helm-chart/values.yaml @@ -6,6 +6,17 @@ replicaCount: 1 # Defaults to Chart.appVersion if not specified documentDbVersion: "" +# Telemetry configuration for Application Insights +telemetry: + # Enable or disable telemetry collection + enabled: true + # Application Insights instrumentation key (provide either this or connectionString) + instrumentationKey: "" + # Application Insights connection string (alternative to instrumentationKey) + connectionString: "" + # Name of existing secret containing telemetry credentials + # Secret should have keys: APPINSIGHTS_INSTRUMENTATIONKEY or APPLICATIONINSIGHTS_CONNECTION_STRING + existingSecret: "" # Gateway image pull policy for the gateway sidecar container. # Valid values: Always, IfNotPresent, Never. Defaults to IfNotPresent if not set. 
gatewayImagePullPolicy: "" diff --git a/operator/src/cmd/main.go b/operator/src/cmd/main.go index bf5f2ddf..e5486d1a 100644 --- a/operator/src/cmd/main.go +++ b/operator/src/cmd/main.go @@ -4,6 +4,7 @@ package main import ( + "context" "crypto/tls" "flag" "os" @@ -29,10 +30,17 @@ import ( cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" dbpreview "github.com/documentdb/documentdb-operator/api/preview" "github.com/documentdb/documentdb-operator/internal/controller" + "github.com/documentdb/documentdb-operator/internal/telemetry" fleetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" // +kubebuilder:scaffold:imports ) +// Version information - set via ldflags at build time +var ( + version = "dev" + helmChartVersion = "" +) + var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") @@ -211,29 +219,52 @@ func main() { os.Exit(1) } + // Initialize telemetry + telemetryMgr, err := telemetry.NewManager( + context.Background(), + telemetry.ManagerConfig{ + OperatorVersion: version, + HelmChartVersion: helmChartVersion, + Logger: setupLog, + }, + mgr.GetClient(), + clientset, + ) + if err != nil { + setupLog.Error(err, "unable to initialize telemetry manager") + // Continue without telemetry - it's not critical + } else { + telemetryMgr.Start() + defer telemetryMgr.Stop() + setupLog.Info("Telemetry initialized", "enabled", telemetryMgr.IsEnabled()) + } + if err = (&controller.DocumentDBReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Config: mgr.GetConfig(), - Clientset: clientset, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Config: mgr.GetConfig(), + Clientset: clientset, + TelemetryMgr: telemetryMgr, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "DocumentDB") os.Exit(1) } if err = (&controller.BackupReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("backup-controller"), + Client: mgr.GetClient(), + Scheme: 
mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("backup-controller"), + TelemetryMgr: telemetryMgr, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Backup") os.Exit(1) } if err = (&controller.ScheduledBackupReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("scheduled-backup-controller"), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("scheduled-backup-controller"), + TelemetryMgr: telemetryMgr, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ScheduledBackup") os.Exit(1) diff --git a/operator/src/go.mod b/operator/src/go.mod index f001b0f1..981c8406 100644 --- a/operator/src/go.mod +++ b/operator/src/go.mod @@ -9,6 +9,8 @@ require ( github.com/cloudnative-pg/cloudnative-pg v1.28.1 github.com/cloudnative-pg/machinery v0.3.3 github.com/go-logr/logr v1.4.3 + github.com/google/uuid v1.6.0 + github.com/microsoft/ApplicationInsights-Go v0.4.4 github.com/onsi/ginkgo/v2 v2.28.1 github.com/onsi/gomega v1.39.1 github.com/stretchr/testify v1.11.1 @@ -21,7 +23,14 @@ require ( ) require ( + code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c // indirect + github.com/gofrs/uuid v3.3.0+incompatible // indirect +) + +require ( + cel.dev/expr v0.24.0 // indirect github.com/Masterminds/semver/v3 v3.4.0 // indirect + github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cloudnative-pg/cnpg-i v0.3.1 // indirect github.com/go-openapi/swag/cmdutils v0.25.4 // indirect @@ -45,8 +54,6 @@ require ( ) require ( - cel.dev/expr v0.24.0 // indirect - github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -68,7 +75,6 @@ require ( github.com/google/gnostic-models v0.7.1 // indirect 
github.com/google/go-cmp v0.7.0 // indirect github.com/google/pprof v0.0.0-20260115054156-294ebfa9ad83 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/operator/src/go.sum b/operator/src/go.sum index e032565b..d5b110cd 100644 --- a/operator/src/go.sum +++ b/operator/src/go.sum @@ -1,5 +1,7 @@ cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY= cel.dev/expr v0.24.0/go.mod h1:hLPLo1W4QUmuYdA72RBX06QTs6MXw941piREPl3Yfiw= +code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c h1:5eeuG0BHx1+DHeT3AP+ISKZ2ht1UjGhm581ljqYpVeQ= +code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= @@ -37,6 +39,7 @@ github.com/evanphx/json-patch/v5 v5.9.11 h1:/8HVnzMq13/3x9TPvjG08wUGqBTmZBsCWzjT github.com/evanphx/json-patch/v5 v5.9.11/go.mod h1:3j+LviiESTElxA4p3EMKAB9HXj3/XEtnUf6OZxqIQTM= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= @@ -92,6 +95,9 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v 
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw= github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA= +github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84= +github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= @@ -113,6 +119,7 @@ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5T github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/joshdk/go-junit v1.0.0 h1:S86cUKIdwBHWwA6xCmFlf3RTLfVXYQfvanM5Uh+K6GE= @@ -121,8 +128,11 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kubernetes-csi/external-snapshotter/client/v8 v8.4.0 h1:bMqrb3UHgHbP+PW9VwiejfDJU1R0PpXVZNMdeH8WYKI= @@ -135,6 +145,8 @@ github.com/maruel/natural v1.1.1 h1:Hja7XhhmvEFhcByqDoHz9QZbkWey+COd9xWfCfn1ioo= github.com/maruel/natural v1.1.1/go.mod h1:v+Rfd79xlw1AgVBjbO0BEQmptqb5HvL/k9GRHB7ZKEg= github.com/mfridman/tparse v0.18.0 h1:wh6dzOKaIwkUGyKgOntDW4liXSo37qg5AXbIhkMV3vE= github.com/mfridman/tparse v0.18.0/go.mod h1:gEvqZTuCgEhPbYk/2lS3Kcxg1GmTxxU7kTC8DvP0i/A= +github.com/microsoft/ApplicationInsights-Go v0.4.4 h1:G4+H9WNs6ygSCe6sUyxRc2U81TI5Es90b2t/MwX5KqY= +github.com/microsoft/ApplicationInsights-Go v0.4.4/go.mod h1:fKRUseBqkw6bDiXTs3ESTiU/4YTIHsQS4W3fP2ieF4U= github.com/moby/spdystream v0.5.0 h1:7r0J1Si3QO/kjRitvSLVVFUjxMEb/YLj6S9FF62JBCU= github.com/moby/spdystream v0.5.0/go.mod h1:xBAYlnt/ay+11ShkdFKNAG7LsyK/tmNBVvVOwrfMgdI= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -147,8 +159,11 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/onsi/ginkgo v1.6.0/go.mod 
h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo/v2 v2.28.1 h1:S4hj+HbZp40fNKuLUQOYLDgZLwNUVn19N3Atb98NCyI= github.com/onsi/ginkgo/v2 v2.28.1/go.mod h1:CLtbVInNckU3/+gC8LzkGUb9oF+e8W8TdUsxPwvdOgE= +github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.39.1 h1:1IJLAad4zjPn2PsnhH70V4DKRFlrCzGBNrNaru+Vf28= github.com/onsi/gomega v1.39.1/go.mod h1:hL6yVALoTOxeWudERyfppUcZXjMwIMLnuSfruD2lcfg= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -189,6 +204,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0= github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= @@ -237,16 +253,20 @@ golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGc golang.org/x/exp v0.0.0-20250718183923-645b1fa84792/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc= golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= 
golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY= golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= @@ -266,12 +286,16 @@ google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpW google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/evanphx/json-patch.v4 v4.13.0 h1:czT3CmqEaQ1aanPc5SdlgQrrEIb8w/wwCvWWnfEbYzo= gopkg.in/evanphx/json-patch.v4 v4.13.0/go.mod 
h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/operator/src/internal/controller/backup_controller.go b/operator/src/internal/controller/backup_controller.go index d32707ef..905c80fe 100644 --- a/operator/src/internal/controller/backup_controller.go +++ b/operator/src/internal/controller/backup_controller.go @@ -19,14 +19,16 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" dbpreview "github.com/documentdb/documentdb-operator/api/preview" + "github.com/documentdb/documentdb-operator/internal/telemetry" util "github.com/documentdb/documentdb-operator/internal/utils" ) // BackupReconciler reconciles a Backup object type BackupReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder + Scheme *runtime.Scheme + Recorder record.EventRecorder + TelemetryMgr *telemetry.Manager } // Reconcile handles the reconciliation loop for Backup resources. 
@@ -47,6 +49,8 @@ func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // Delete the Backup resource if it has expired if backup.Status.IsExpired() { r.Recorder.Event(backup, "Normal", "BackupExpired", "Backup has expired and will be deleted") + // Track backup deletion telemetry before deleting + r.trackBackupDeleted(ctx, backup, "expired") if err := r.Delete(ctx, backup); err != nil { r.Recorder.Event(backup, "Warning", "BackupDeleteFailed", "Failed to delete expired Backup: "+err.Error()) return ctrl.Result{}, err @@ -192,6 +196,16 @@ func (r *BackupReconciler) createCNPGBackup(ctx context.Context, backup *dbprevi r.Recorder.Event(backup, "Normal", "BackupInitialized", "Successfully initialized backup") + // Track backup creation telemetry + // Determine backup type from labels - scheduled backups have the "scheduledbackup" label + backupType := "on-demand" + if backup.Labels != nil { + if v, ok := backup.Labels["scheduledbackup"]; ok && v != "" { + backupType = "scheduled" + } + } + r.trackBackupCreated(ctx, backup, cluster, backupType) + // Requeue to check status return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } @@ -263,6 +277,70 @@ func (r *BackupReconciler) SetBackupPhaseSkipped(ctx context.Context, backup *db return ctrl.Result{RequeueAfter: requeueAfter}, nil } +// trackBackupCreated tracks backup creation telemetry. 
+func (r *BackupReconciler) trackBackupCreated(ctx context.Context, backup *dbpreview.Backup, cluster *dbpreview.DocumentDB, backupType string) { + if r.TelemetryMgr == nil || !r.TelemetryMgr.IsEnabled() { + return + } + + // Get or create backup ID + backupID, err := r.TelemetryMgr.GUIDs.GetOrCreateBackupID(ctx, backup) + if err != nil { + log.FromContext(ctx).V(1).Info("Failed to get backup telemetry ID", "error", err) + return + } + + clusterID := r.TelemetryMgr.GUIDs.GetClusterID(cluster) + + // Determine if this is from primary cluster + replicationContext, _ := util.GetReplicationContext(ctx, r.Client, *cluster) + isPrimary := replicationContext == nil || replicationContext.IsPrimary() + + retentionDays := 30 // default + if cluster.Spec.Backup != nil && cluster.Spec.Backup.RetentionDays > 0 { + retentionDays = cluster.Spec.Backup.RetentionDays + } + // Check if backup has its own retention override + if backup.Spec.RetentionDays != nil && *backup.Spec.RetentionDays > 0 { + retentionDays = *backup.Spec.RetentionDays + } + + r.TelemetryMgr.Events.TrackBackupCreated(telemetry.BackupCreatedEvent{ + BackupID: backupID, + ClusterID: clusterID, + NamespaceHash: telemetry.HashNamespace(backup.Namespace), + BackupType: backupType, + BackupMethod: "VolumeSnapshot", + BackupPhase: "starting", + RetentionDays: retentionDays, + CloudProvider: telemetry.MapCloudProviderToString(cluster.Spec.Environment), + IsPrimaryCluster: isPrimary, + }) +} + +// trackBackupDeleted tracks backup deletion telemetry. 
+func (r *BackupReconciler) trackBackupDeleted(ctx context.Context, backup *dbpreview.Backup, reason string) { + if r.TelemetryMgr == nil || !r.TelemetryMgr.IsEnabled() { + return + } + + backupID := r.TelemetryMgr.GUIDs.GetBackupID(backup) + if backupID == "" { + return + } + + ageDays := 0 + if backup.CreationTimestamp.Time.Year() > 1 { + ageDays = int(time.Since(backup.CreationTimestamp.Time).Hours() / 24) + } + + r.TelemetryMgr.Events.TrackBackupDeleted(telemetry.BackupDeletedEvent{ + BackupID: backupID, + DeletionReason: reason, + BackupAgeDays: ageDays, + }) +} + // SetupWithManager sets up the controller with the Manager. func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error { // Register VolumeSnapshotClass with the scheme diff --git a/operator/src/internal/controller/documentdb_controller.go b/operator/src/internal/controller/documentdb_controller.go index b5a7633c..7d3fccc2 100644 --- a/operator/src/internal/controller/documentdb_controller.go +++ b/operator/src/internal/controller/documentdb_controller.go @@ -37,6 +37,7 @@ import ( dbpreview "github.com/documentdb/documentdb-operator/api/preview" cnpg "github.com/documentdb/documentdb-operator/internal/cnpg" + "github.com/documentdb/documentdb-operator/internal/telemetry" util "github.com/documentdb/documentdb-operator/internal/utils" ) @@ -55,9 +56,10 @@ const ( // DocumentDBReconciler reconciles a DocumentDB object type DocumentDBReconciler struct { client.Client - Scheme *runtime.Scheme - Config *rest.Config - Clientset kubernetes.Interface + Scheme *runtime.Scheme + Config *rest.Config + Clientset kubernetes.Interface + TelemetryMgr *telemetry.Manager // Recorder emits Kubernetes events for this controller, including PV retention warnings during deletion. Recorder record.EventRecorder // SQLExecutor executes SQL commands against a CNPG cluster's primary pod. 
@@ -74,31 +76,49 @@ var reconcileMutex sync.Mutex // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;create;delete // +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get;list;watch;update;patch -func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) { + reconcileStart := time.Now() reconcileMutex.Lock() defer reconcileMutex.Unlock() logger := log.FromContext(ctx) + // Track reconciliation duration at the end using named return value + defer func() { + r.trackReconcileDuration(ctx, "DocumentDB", "reconcile", time.Since(reconcileStart).Seconds(), retErr == nil) + }() + // Fetch the DocumentDB instance documentdb := &dbpreview.DocumentDB{} - err := r.Get(ctx, req.NamespacedName, documentdb) - if err != nil { + if err := r.Get(ctx, req.NamespacedName, documentdb); err != nil { if errors.IsNotFound(err) { // DocumentDB resource not found, handle cleanup logger.Info("DocumentDB resource not found. 
Cleaning up associated resources.") - if err := r.cleanupResources(ctx, req); err != nil { - return ctrl.Result{}, err + if cleanupErr := r.cleanupResources(ctx, req); cleanupErr != nil { + retErr = cleanupErr + return result, retErr } - return ctrl.Result{}, nil + return result, nil } logger.Error(err, "Failed to get DocumentDB resource") - return ctrl.Result{}, err + r.trackReconcileError(ctx, "DocumentDB", req.Name, req.Namespace, "get-resource", err) + retErr = err + return result, retErr + } + + // Ensure cluster has telemetry ID + if r.TelemetryMgr != nil && r.TelemetryMgr.IsEnabled() { + if _, err := r.TelemetryMgr.GUIDs.GetOrCreateClusterID(ctx, documentdb); err != nil { + logger.V(1).Info("Failed to create telemetry ID for cluster", "error", err) + } } // Handle finalizer lifecycle (add on create, remove on delete) - if done, result, err := r.reconcileFinalizer(ctx, documentdb); done || err != nil { - return result, err + if done, res, err := r.reconcileFinalizer(ctx, documentdb); done || err != nil { + if err != nil { + retErr = err + } + return res, retErr } replicationContext, err := util.GetReplicationContext(ctx, r.Client, *documentdb) @@ -175,11 +195,14 @@ func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err := r.Client.Get(ctx, types.NamespacedName{Name: desiredCnpgCluster.Name, Namespace: req.Namespace}, currentCnpgCluster); err != nil { if errors.IsNotFound(err) { + clusterCreateStart := time.Now() if err := r.Client.Create(ctx, desiredCnpgCluster); err != nil { logger.Error(err, "Failed to create CNPG Cluster") return ctrl.Result{RequeueAfter: RequeueAfterShort}, nil } logger.Info("CNPG Cluster created successfully", "Cluster.Name", desiredCnpgCluster.Name, "Namespace", desiredCnpgCluster.Namespace) + // Track cluster creation telemetry + r.TrackClusterCreated(ctx, documentdb, time.Since(clusterCreateStart).Seconds()) return ctrl.Result{RequeueAfter: RequeueAfterLong}, nil } logger.Error(err, "Failed to get CNPG 
Cluster") @@ -187,11 +210,14 @@ func (r *DocumentDBReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // Check if anything has changed in the generated cnpg spec + updateStart := time.Now() err, requeueTime := r.TryUpdateCluster(ctx, currentCnpgCluster, desiredCnpgCluster, documentdb, replicationContext) if err != nil { logger.Error(err, "Failed to update CNPG Cluster") } if requeueTime > 0 { + // Track cluster update if something changed + r.trackClusterUpdated(ctx, documentdb, "configuration", time.Since(updateStart).Seconds()) return ctrl.Result{RequeueAfter: requeueTime}, nil } @@ -698,6 +724,134 @@ func (r *DocumentDBReconciler) executeSQLCommand(ctx context.Context, cluster *c return stdout.String(), nil } +// trackReconcileError tracks reconciliation errors to telemetry. +// Note: ResourceID is omitted to avoid PII - errors are tracked by namespace hash and error type only. +func (r *DocumentDBReconciler) trackReconcileError(ctx context.Context, resourceType, resourceName, namespace, errorType string, err error) { + if r.TelemetryMgr == nil || !r.TelemetryMgr.IsEnabled() { + return + } + + // Do not include resourceName as it may contain PII (user-provided names) + // Errors can be correlated by namespace_hash + error_type + timestamp + r.TelemetryMgr.Events.TrackReconciliationError(telemetry.ReconciliationErrorEvent{ + ResourceType: resourceType, + ResourceID: "", // Omitted to avoid PII - use namespace_hash for correlation + NamespaceHash: telemetry.HashNamespace(namespace), + ErrorType: errorType, + ErrorMessage: sanitizeError(err), + ResolutionStatus: "pending", + }) +} + +// trackReconcileDuration tracks reconciliation duration to telemetry. 
+func (r *DocumentDBReconciler) trackReconcileDuration(ctx context.Context, resourceType, operation string, durationSeconds float64, success bool) {
+	if r.TelemetryMgr == nil || !r.TelemetryMgr.IsEnabled() {
+		return
+	}
+
+	status := "success"
+	if !success {
+		status = "error"
+	}
+
+	r.TelemetryMgr.Metrics.TrackReconciliationDuration(telemetry.ReconciliationDurationMetric{
+		ResourceType:    resourceType,
+		Operation:       operation,
+		Status:          status,
+		DurationSeconds: durationSeconds,
+	})
+}
+
+// trackClusterUpdated emits a ClusterUpdated event. No-op when telemetry is disabled or when the cluster has no telemetry GUID yet (this helper never creates one). Note: ctx is currently unused.
+func (r *DocumentDBReconciler) trackClusterUpdated(ctx context.Context, documentdb *dbpreview.DocumentDB, updateType string, durationSeconds float64) {
+	if r.TelemetryMgr == nil || !r.TelemetryMgr.IsEnabled() {
+		return
+	}
+
+	clusterID := r.TelemetryMgr.GUIDs.GetClusterID(documentdb)
+	if clusterID == "" {
+		return
+	}
+
+	r.TelemetryMgr.Events.TrackClusterUpdated(telemetry.ClusterUpdatedEvent{
+		ClusterID:             clusterID,
+		NamespaceHash:         telemetry.HashNamespace(documentdb.Namespace),
+		UpdateType:            updateType,
+		UpdateDurationSeconds: durationSeconds,
+	})
+}
+
+// TrackClusterCreated tracks when a new cluster is created. NOTE(review): unlike trackClusterUpdated it does not skip when the cluster ID is empty — confirm emitting events with cluster_id "" is acceptable.
+func (r *DocumentDBReconciler) TrackClusterCreated(ctx context.Context, documentdb *dbpreview.DocumentDB, durationSeconds float64) {
+	if r.TelemetryMgr == nil || !r.TelemetryMgr.IsEnabled() {
+		return
+	}
+
+	clusterID := r.TelemetryMgr.GUIDs.GetClusterID(documentdb)
+	bootstrapType := "new"
+	if documentdb.Spec.Bootstrap != nil && documentdb.Spec.Bootstrap.Recovery != nil {
+		bootstrapType = "recovery"
+	}
+
+	r.TelemetryMgr.Events.TrackClusterCreated(telemetry.ClusterCreatedEvent{
+		ClusterID:               clusterID,
+		NamespaceHash:           telemetry.HashNamespace(documentdb.Namespace),
+		CreationDurationSeconds: durationSeconds,
+		NodeCount:               documentdb.Spec.NodeCount,
+		InstancesPerNode:        documentdb.Spec.InstancesPerNode,
+		StorageSize:             documentdb.Spec.Resource.Storage.PvcSize,
+		CloudProvider:           telemetry.MapCloudProviderToString(documentdb.Spec.Environment),
+		TLSEnabled:              documentdb.Spec.TLS != nil,
+		BootstrapType:           bootstrapType,
+		SidecarInjectorPlugin:   documentdb.Spec.SidecarInjectorPluginName != "",
+		ServiceType:             documentdb.Spec.ExposeViaService.ServiceType,
+	})
+
+	// Also emit a point-in-time cluster configuration metric alongside the creation event.
+	r.TelemetryMgr.Metrics.TrackClusterConfiguration(telemetry.ClusterConfigurationMetric{
+		ClusterID:         clusterID,
+		NamespaceHash:     telemetry.HashNamespace(documentdb.Namespace),
+		NodeCount:         documentdb.Spec.NodeCount,
+		InstancesPerNode:  documentdb.Spec.InstancesPerNode,
+		TotalInstances:    documentdb.Spec.NodeCount * documentdb.Spec.InstancesPerNode,
+		PVCSizeCategory:   telemetry.CategorizePVCSize(documentdb.Spec.Resource.Storage.PvcSize),
+		DocumentDBVersion: documentdb.Spec.DocumentDBVersion,
+	})
+}
+
+// sanitizeError returns a coarse, non-PII classification of the error.
+// Per telemetry spec, we do not include raw error text to avoid leaking PII or sensitive data.
+func sanitizeError(err error) string {
+	if err == nil {
+		return ""
+	}
+
+	// Map well-known Kubernetes/API error types to generic, non-PII messages; anything unrecognized collapses to "unknown error".
+	switch {
+	case errors.IsNotFound(err):
+		return "resource not found"
+	case errors.IsAlreadyExists(err):
+		return "resource already exists"
+	case errors.IsForbidden(err):
+		return "forbidden"
+	case errors.IsUnauthorized(err):
+		return "unauthorized"
+	case errors.IsConflict(err):
+		return "conflict"
+	case errors.IsTimeout(err):
+		return "timeout"
+	case errors.IsInvalid(err):
+		return "invalid resource"
+	case errors.IsServerTimeout(err):
+		return "server timeout"
+	case errors.IsServiceUnavailable(err):
+		return "service unavailable"
+	default:
+		// Do not include the raw error text to avoid leaking PII or sensitive data.
+		return "unknown error"
+	}
+}
+
 // reconcilePVRecovery handles recovery from a retained PersistentVolume.
 //
 // CNPG only supports recovery from PVC (via VolumeSnapshots.Storage with Kind: PersistentVolumeClaim),
diff --git a/operator/src/internal/controller/scheduledbackup_controller.go b/operator/src/internal/controller/scheduledbackup_controller.go
index 0086eedf..a091d226 100644
--- a/operator/src/internal/controller/scheduledbackup_controller.go
+++ b/operator/src/internal/controller/scheduledbackup_controller.go
@@ -18,13 +18,15 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/log"
 
 	dbpreview "github.com/documentdb/documentdb-operator/api/preview"
+	"github.com/documentdb/documentdb-operator/internal/telemetry"
 )
 
 // ScheduledBackupReconciler reconciles a ScheduledBackup object
 type ScheduledBackupReconciler struct {
 	client.Client
-	Scheme   *runtime.Scheme
-	Recorder record.EventRecorder
+	Scheme       *runtime.Scheme
+	Recorder     record.EventRecorder
+	TelemetryMgr *telemetry.Manager
 }
 
 // Reconcile handles the reconciliation loop for ScheduledBackup resources.
@@ -82,6 +84,9 @@ func (r *ScheduledBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		return ctrl.Result{}, err
 	}
 
+	// Track scheduled backup telemetry. NOTE(review): this fires on every schedule trigger yet ultimately emits a "ScheduledBackupCreated" event — see trackScheduledBackupTriggered below.
+	r.trackScheduledBackupTriggered(ctx, scheduledBackup)
+
 	scheduledBackup.Status.LastScheduledTime = &metav1.Time{Time: now}
 
 	// Calculate next run time
@@ -139,6 +144,53 @@ func (r *ScheduledBackupReconciler) ensureOwnerReference(ctx context.Context, sc
 	return nil
 }
 
+// trackScheduledBackupCreated tracks scheduled backup creation telemetry.
+func (r *ScheduledBackupReconciler) trackScheduledBackupCreated(ctx context.Context, scheduledBackup *dbpreview.ScheduledBackup, cluster *dbpreview.DocumentDB) {
+	if r.TelemetryMgr == nil || !r.TelemetryMgr.IsEnabled() {
+		return
+	}
+
+	// Get or create the scheduled backup's stable telemetry GUID (annotation-backed); skip tracking if it cannot be obtained.
+	scheduledBackupID, err := r.TelemetryMgr.GUIDs.GetOrCreateScheduledBackupID(ctx, scheduledBackup)
+	if err != nil {
+		log.FromContext(ctx).V(1).Info("Failed to get scheduled backup telemetry ID", "error", err)
+		return
+	}
+
+	clusterID := r.TelemetryMgr.GUIDs.GetClusterID(cluster)
+
+	retentionDays := 30 // fallback when the cluster does not specify retention
+	if cluster.Spec.Backup != nil && cluster.Spec.Backup.RetentionDays > 0 {
+		retentionDays = cluster.Spec.Backup.RetentionDays
+	}
+
+	r.TelemetryMgr.Events.TrackScheduledBackupCreated(telemetry.ScheduledBackupCreatedEvent{
+		ScheduledBackupID: scheduledBackupID,
+		ClusterID:         clusterID,
+		ScheduleFrequency: string(telemetry.CategorizeScheduleFrequency(scheduledBackup.Spec.Schedule)),
+		RetentionDays:     retentionDays,
+	})
+}
+
+// trackScheduledBackupTriggered tracks when a scheduled backup actually triggers. NOTE(review): it delegates to trackScheduledBackupCreated, so a "Created" event is re-emitted on each trigger; a dedicated "Triggered" event would avoid inflating creation counts.
+func (r *ScheduledBackupReconciler) trackScheduledBackupTriggered(ctx context.Context, scheduledBackup *dbpreview.ScheduledBackup) {
+	if r.TelemetryMgr == nil || !r.TelemetryMgr.IsEnabled() {
+		return
+	}
+
+	// Fetch the referenced cluster to resolve telemetry IDs; lookup failures are silently ignored (best-effort telemetry).
+	cluster := &dbpreview.DocumentDB{}
+	clusterKey := client.ObjectKey{
+		Name:      scheduledBackup.Spec.Cluster.Name,
+		Namespace: scheduledBackup.Namespace,
+	}
+	if err := r.Get(ctx, clusterKey, cluster); err != nil {
+		return
+	}
+
+	r.trackScheduledBackupCreated(ctx, scheduledBackup, cluster)
+}
+
 // SetupWithManager sets up the controller with the Manager.
 func (r *ScheduledBackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	// Register field index for spec.cluster so we can query Backups by cluster name
diff --git a/operator/src/internal/telemetry/client.go b/operator/src/internal/telemetry/client.go
new file mode 100644
index 00000000..94a3fdb1
--- /dev/null
+++ b/operator/src/internal/telemetry/client.go
@@ -0,0 +1,246 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+package telemetry
+
+import (
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/go-logr/logr"
+	"github.com/microsoft/ApplicationInsights-Go/appinsights"
+)
+
+const (
+	// EnvAppInsightsKey is the environment variable for the Application Insights instrumentation key.
+	EnvAppInsightsKey = "APPINSIGHTS_INSTRUMENTATIONKEY"
+	// EnvAppInsightsConnectionString is the environment variable for the Application Insights connection string.
+	EnvAppInsightsConnectionString = "APPLICATIONINSIGHTS_CONNECTION_STRING"
+	// EnvTelemetryEnabled is the environment variable to enable/disable telemetry.
+	EnvTelemetryEnabled = "DOCUMENTDB_TELEMETRY_ENABLED"
+)
+
+// TelemetryClient handles sending telemetry to Application Insights using the official SDK.
+type TelemetryClient struct {
+	client          appinsights.TelemetryClient
+	enabled         bool
+	operatorContext *OperatorContext
+	logger          logr.Logger
+}
+
+// ClientOption configures the TelemetryClient.
+type ClientOption func(*TelemetryClient)
+
+// WithLogger sets the logger for the telemetry client.
+func WithLogger(logger logr.Logger) ClientOption {
+	return func(c *TelemetryClient) {
+		c.logger = logger
+	}
+}
+
+// NewTelemetryClient creates a new TelemetryClient using the official Application Insights SDK.
+func NewTelemetryClient(ctx *OperatorContext, opts ...ClientOption) *TelemetryClient {
+	tc := &TelemetryClient{
+		operatorContext: ctx,
+		enabled:         true,
+	}
+
+	// Apply options
+	for _, opt := range opts {
+		opt(tc)
+	}
+
+	// Opt-out check: only the exact string "false" disables telemetry; any other value (or unset) leaves it on.
+	if enabled := os.Getenv(EnvTelemetryEnabled); enabled == "false" {
+		tc.enabled = false
+		if tc.logger.GetSink() != nil {
+			tc.logger.Info("Telemetry collection is disabled via environment variable")
+		}
+		return tc
+	}
+
+	// Get the instrumentation key: the dedicated env var wins, otherwise parse it out of the connection string.
+	instrumentationKey := os.Getenv(EnvAppInsightsKey)
+	if instrumentationKey == "" {
+		// Try connection string
+		connStr := os.Getenv(EnvAppInsightsConnectionString)
+		instrumentationKey = parseInstrumentationKeyFromConnectionString(connStr)
+	}
+
+	if instrumentationKey == "" {
+		tc.enabled = false
+		if tc.logger.GetSink() != nil {
+			tc.logger.Info("No Application Insights instrumentation key found, telemetry disabled")
+		}
+		return tc
+	}
+
+	// Create telemetry configuration
+	telemetryConfig := appinsights.NewTelemetryConfiguration(instrumentationKey)
+
+	// Configure batching - send every 30 seconds or when batch reaches 100 items
+	telemetryConfig.MaxBatchSize = 100
+	telemetryConfig.MaxBatchInterval = 30 * time.Second
+
+	// Check for a custom ingestion endpoint; this re-reads the connection string env var (could reuse the read above when the key came from the connection string).
+	connStr := os.Getenv(EnvAppInsightsConnectionString)
+	if endpoint := parseIngestionEndpointFromConnectionString(connStr); endpoint != "" {
+		telemetryConfig.EndpointUrl = strings.TrimSuffix(endpoint, "/") + "/v2/track"
+	}
+
+	// Create the client
+	tc.client = appinsights.NewTelemetryClientFromConfig(telemetryConfig)
+
+	// Set common context tags
+	tc.client.Context().Tags.Cloud().SetRole("documentdb-operator")
+	tc.client.Context().Tags.Cloud().SetRoleInstance(ctx.OperatorNamespaceHash)
+	tc.client.Context().Tags.Application().SetVer(ctx.OperatorVersion)
+
+	// Set common properties that will be added to all telemetry
+	tc.client.Context().CommonProperties["kubernetes_distribution"] = string(ctx.KubernetesDistribution)
+	tc.client.Context().CommonProperties["kubernetes_version"] = ctx.KubernetesVersion
+	tc.client.Context().CommonProperties["operator_version"] = ctx.OperatorVersion
+	if ctx.Region != "" {
+		tc.client.Context().CommonProperties["region"] = ctx.Region
+	}
+
+	// Enable diagnostics logging if a logger is available. NOTE(review): the listener is registered globally and never removed — fine for a process-lifetime singleton; confirm the client is constructed only once.
+	if tc.logger.GetSink() != nil {
+		appinsights.NewDiagnosticsMessageListener(func(msg string) error {
+			tc.logger.V(1).Info("Application Insights diagnostic", "message", msg)
+			return nil
+		})
+	}
+
+	return tc
+}
+
+// Start begins the telemetry client (no-op for SDK-based client as it handles this internally).
+func (c *TelemetryClient) Start() {
+	// The official SDK handles background processing internally
+}
+
+// Stop gracefully stops the telemetry client and flushes remaining events.
+func (c *TelemetryClient) Stop() {
+	if !c.enabled || c.client == nil {
+		return
+	}
+
+	// Close the channel, allowing up to 10s for in-flight retries; give up entirely after a 30s hard deadline.
+	select {
+	case <-c.client.Channel().Close(10 * time.Second):
+		if c.logger.GetSink() != nil {
+			c.logger.Info("Telemetry channel closed successfully")
+		}
+	case <-time.After(30 * time.Second):
+		if c.logger.GetSink() != nil {
+			c.logger.Info("Telemetry channel close timed out")
+		}
+	}
+}
+
+// IsEnabled returns whether telemetry is enabled.
+func (c *TelemetryClient) IsEnabled() bool {
+	return c.enabled
+}
+
+// TrackEvent sends a custom event to Application Insights.
+func (c *TelemetryClient) TrackEvent(eventName string, properties map[string]interface{}) {
+	if !c.enabled || c.client == nil {
+		return
+	}
+
+	event := appinsights.NewEventTelemetry(eventName)
+
+	// Stringify every property value via %v (App Insights properties are string-valued).
+	for k, v := range properties {
+		event.Properties[k] = fmt.Sprintf("%v", v)
+	}
+
+	c.client.Track(event)
+}
+
+// TrackMetric sends a metric to Application Insights.
+func (c *TelemetryClient) TrackMetric(metricName string, value float64, properties map[string]interface{}) {
+	if !c.enabled || c.client == nil {
+		return
+	}
+
+	metric := appinsights.NewMetricTelemetry(metricName, value)
+
+	// Add properties
+	for k, v := range properties {
+		metric.Properties[k] = fmt.Sprintf("%v", v)
+	}
+
+	c.client.Track(metric)
+}
+
+// TrackException sends an exception/error to Application Insights.
+func (c *TelemetryClient) TrackException(err error, properties map[string]interface{}) {
+	if !c.enabled || c.client == nil {
+		return
+	}
+
+	// Sanitize the error message (truncation only for now — see sanitizeErrorMessage).
+	sanitizedMessage := sanitizeErrorMessage(err.Error())
+
+	exception := appinsights.NewExceptionTelemetry(sanitizedMessage)
+
+	// Add properties
+	for k, v := range properties {
+		exception.Properties[k] = fmt.Sprintf("%v", v)
+	}
+
+	c.client.Track(exception)
+}
+
+// parseInstrumentationKeyFromConnectionString extracts the instrumentation key from a connection string.
+func parseInstrumentationKeyFromConnectionString(connStr string) string {
+	if connStr == "" {
+		return ""
+	}
+
+	// Connection string format: InstrumentationKey=xxx;IngestionEndpoint=xxx;...
+	for _, part := range strings.Split(connStr, ";") {
+		// Trim whitespace to handle cases like "; InstrumentationKey=..." or copy-paste errors
+		part = strings.TrimSpace(part)
+		if strings.HasPrefix(part, "InstrumentationKey=") {
+			return strings.TrimPrefix(part, "InstrumentationKey=")
+		}
+	}
+
+	return ""
+}
+
+// parseIngestionEndpointFromConnectionString extracts the ingestion endpoint from a connection string.
+func parseIngestionEndpointFromConnectionString(connStr string) string {
+	if connStr == "" {
+		return ""
+	}
+
+	// Connection string format: InstrumentationKey=xxx;IngestionEndpoint=xxx;...
+	for _, part := range strings.Split(connStr, ";") {
+		// Trim whitespace to handle cases like "; IngestionEndpoint=..." or copy-paste errors
+		part = strings.TrimSpace(part)
+		if strings.HasPrefix(part, "IngestionEndpoint=") {
+			return strings.TrimPrefix(part, "IngestionEndpoint=")
+		}
+	}
+
+	return ""
+}
+
+// sanitizeErrorMessage removes potential PII from error messages.
+func sanitizeErrorMessage(msg string) string {
+	// Basic sanitization only: currently just truncates; paths/IPs are NOT yet stripped.
+	// TODO: remove potential file paths, IP addresses, etc.
+	// Truncation is byte-based, so the cut point may split a multi-byte UTF-8 rune.
+	const maxLength = 500
+	if len(msg) > maxLength {
+		msg = msg[:maxLength] + "..."
+	}
+	return msg
+}
diff --git a/operator/src/internal/telemetry/events.go b/operator/src/internal/telemetry/events.go
new file mode 100644
index 00000000..a1f65ba3
--- /dev/null
+++ b/operator/src/internal/telemetry/events.go
@@ -0,0 +1,180 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+package telemetry
+
+import (
+	"time"
+)
+
+// EventTracker provides high-level methods for tracking telemetry events.
+type EventTracker struct {
+	client      *TelemetryClient
+	guidManager *GUIDManager
+}
+
+// NewEventTracker creates a new EventTracker.
+func NewEventTracker(client *TelemetryClient, guidManager *GUIDManager) *EventTracker {
+	return &EventTracker{
+		client:      client,
+		guidManager: guidManager,
+	}
+}
+
+// TrackOperatorStartup tracks the OperatorStartup event (StartupTimestamp is serialized as RFC3339).
+func (t *EventTracker) TrackOperatorStartup(event OperatorStartupEvent) {
+	t.client.TrackEvent("OperatorStartup", map[string]interface{}{
+		"operator_version":   event.OperatorVersion,
+		"kubernetes_version": event.KubernetesVersion,
+		"cloud_provider":     event.CloudProvider,
+		"startup_timestamp":  event.StartupTimestamp.Format(time.RFC3339),
+		"restart_count":      event.RestartCount,
+		"helm_chart_version": event.HelmChartVersion,
+	})
+}
+
+// TrackClusterCreated tracks the ClusterCreated event.
+func (t *EventTracker) TrackClusterCreated(event ClusterCreatedEvent) {
+	t.client.TrackEvent("ClusterCreated", map[string]interface{}{
+		"cluster_id":                event.ClusterID,
+		"namespace_hash":            event.NamespaceHash,
+		"creation_duration_seconds": event.CreationDurationSeconds,
+		"node_count":                event.NodeCount,
+		"instances_per_node":        event.InstancesPerNode,
+		"storage_size":              event.StorageSize,
+		"cloud_provider":            event.CloudProvider,
+		"tls_enabled":               event.TLSEnabled,
+		"bootstrap_type":            event.BootstrapType,
+		"sidecar_injector_plugin":   event.SidecarInjectorPlugin,
+		"service_type":              event.ServiceType,
+	})
+}
+
+// TrackClusterUpdated tracks the ClusterUpdated event.
+func (t *EventTracker) TrackClusterUpdated(event ClusterUpdatedEvent) {
+	t.client.TrackEvent("ClusterUpdated", map[string]interface{}{
+		"cluster_id":              event.ClusterID,
+		"namespace_hash":          event.NamespaceHash,
+		"update_type":             event.UpdateType,
+		"update_duration_seconds": event.UpdateDurationSeconds,
+	})
+}
+
+// TrackClusterDeleted tracks the ClusterDeleted event.
+func (t *EventTracker) TrackClusterDeleted(event ClusterDeletedEvent) {
+	t.client.TrackEvent("ClusterDeleted", map[string]interface{}{
+		"cluster_id":                event.ClusterID,
+		"namespace_hash":            event.NamespaceHash,
+		"deletion_duration_seconds": event.DeletionDurationSeconds,
+		"cluster_age_days":          event.ClusterAgeDays,
+		"backup_count":              event.BackupCount,
+	})
+}
+
+// TrackBackupCreated tracks the BackupCreated event.
+func (t *EventTracker) TrackBackupCreated(event BackupCreatedEvent) {
+	t.client.TrackEvent("BackupCreated", map[string]interface{}{
+		"backup_id":               event.BackupID,
+		"cluster_id":              event.ClusterID,
+		"namespace_hash":          event.NamespaceHash,
+		"backup_type":             event.BackupType,
+		"backup_method":           event.BackupMethod,
+		"backup_size_bytes":       event.BackupSizeBytes,
+		"backup_duration_seconds": event.BackupDurationSeconds,
+		"retention_days":          event.RetentionDays,
+		"backup_phase":            event.BackupPhase,
+		"cloud_provider":          event.CloudProvider,
+		"is_primary_cluster":      event.IsPrimaryCluster,
+	})
+}
+
+// TrackBackupDeleted tracks the BackupDeleted event.
+func (t *EventTracker) TrackBackupDeleted(event BackupDeletedEvent) {
+	t.client.TrackEvent("BackupDeleted", map[string]interface{}{
+		"backup_id":       event.BackupID,
+		"deletion_reason": event.DeletionReason,
+		"backup_age_days": event.BackupAgeDays,
+	})
+}
+
+// TrackScheduledBackupCreated tracks the ScheduledBackupCreated event.
+func (t *EventTracker) TrackScheduledBackupCreated(event ScheduledBackupCreatedEvent) {
+	t.client.TrackEvent("ScheduledBackupCreated", map[string]interface{}{
+		"scheduled_backup_id": event.ScheduledBackupID,
+		"cluster_id":          event.ClusterID,
+		"schedule_frequency":  event.ScheduleFrequency,
+		"retention_days":      event.RetentionDays,
+	})
+}
+
+// TrackClusterRestored tracks the ClusterRestored event.
+func (t *EventTracker) TrackClusterRestored(event ClusterRestoredEvent) {
+	t.client.TrackEvent("ClusterRestored", map[string]interface{}{
+		"new_cluster_id":           event.NewClusterID,
+		"source_backup_id":         event.SourceBackupID,
+		"namespace_hash":           event.NamespaceHash,
+		"restore_duration_seconds": event.RestoreDurationSeconds,
+		"backup_age_hours":         event.BackupAgeHours,
+		"restore_phase":            event.RestorePhase,
+	})
+}
+
+// TrackFailoverOccurred tracks the FailoverOccurred event.
+func (t *EventTracker) TrackFailoverOccurred(event FailoverOccurredEvent) {
+	t.client.TrackEvent("FailoverOccurred", map[string]interface{}{
+		"cluster_id":                event.ClusterID,
+		"namespace_hash":            event.NamespaceHash,
+		"failover_type":             event.FailoverType,
+		"old_primary_index":         event.OldPrimaryIndex,
+		"new_primary_index":         event.NewPrimaryIndex,
+		"failover_duration_seconds": event.FailoverDurationSeconds,
+		"downtime_seconds":          event.DowntimeSeconds,
+		"replication_lag_bytes":     event.ReplicationLagBytes,
+		"trigger_reason":            event.TriggerReason,
+	})
+}
+
+// TrackReconciliationError tracks the ReconciliationError event. Callers are expected to pass a pre-sanitized ErrorMessage and may leave ResourceID empty to avoid PII (see documentdb_controller).
+func (t *EventTracker) TrackReconciliationError(event ReconciliationErrorEvent) {
+	t.client.TrackEvent("ReconciliationError", map[string]interface{}{
+		"resource_type":     event.ResourceType,
+		"resource_id":       event.ResourceID,
+		"namespace_hash":    event.NamespaceHash,
+		"error_type":        event.ErrorType,
+		"error_message":     event.ErrorMessage,
+		"error_code":        event.ErrorCode,
+		"retry_count":       event.RetryCount,
+		"resolution_status": event.ResolutionStatus,
+	})
+}
+
+// TrackVolumeSnapshotError tracks the VolumeSnapshotError event.
+func (t *EventTracker) TrackVolumeSnapshotError(event VolumeSnapshotErrorEvent) {
+	t.client.TrackEvent("VolumeSnapshotError", map[string]interface{}{
+		"backup_id":       event.BackupID,
+		"cluster_id":      event.ClusterID,
+		"error_type":      event.ErrorType,
+		"csi_driver_type": event.CSIDriverType,
+		"cloud_provider":  event.CloudProvider,
+	})
+}
+
+// TrackCNPGIntegrationError tracks the CNPGIntegrationError event.
+func (t *EventTracker) TrackCNPGIntegrationError(event CNPGIntegrationErrorEvent) {
+	t.client.TrackEvent("CNPGIntegrationError", map[string]interface{}{
+		"cluster_id":         event.ClusterID,
+		"cnpg_resource_type": event.CNPGResourceType,
+		"error_category":     event.ErrorCategory,
+		"operation":          event.Operation,
+	})
+}
+
+// TrackBackupExpired tracks the BackupExpired event.
+func (t *EventTracker) TrackBackupExpired(event BackupExpiredEvent) {
+	t.client.TrackEvent("BackupExpired", map[string]interface{}{
+		"backup_id":       event.BackupID,
+		"cluster_id":      event.ClusterID,
+		"retention_days":  event.RetentionDays,
+		"actual_age_days": event.ActualAgeDays,
+	})
+}
diff --git a/operator/src/internal/telemetry/guid.go b/operator/src/internal/telemetry/guid.go
new file mode 100644
index 00000000..21310a75
--- /dev/null
+++ b/operator/src/internal/telemetry/guid.go
@@ -0,0 +1,113 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+package telemetry
+
+import (
+	"context"
+
+	"github.com/google/uuid"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// GUIDManager handles generation and retrieval of telemetry GUIDs.
+type GUIDManager struct {
+	client client.Client
+}
+
+// NewGUIDManager creates a new GUIDManager.
+func NewGUIDManager(c client.Client) *GUIDManager {
+	return &GUIDManager{client: c}
+}
+
+// GetOrCreateClusterID retrieves or creates a telemetry GUID for a cluster.
+func (m *GUIDManager) GetOrCreateClusterID(ctx context.Context, obj client.Object) (string, error) {
+	return m.getOrCreateID(ctx, obj, ClusterIDAnnotation)
+}
+
+// GetOrCreateBackupID retrieves or creates a telemetry GUID for a backup.
+func (m *GUIDManager) GetOrCreateBackupID(ctx context.Context, obj client.Object) (string, error) {
+	return m.getOrCreateID(ctx, obj, BackupIDAnnotation)
+}
+
+// GetOrCreateScheduledBackupID retrieves or creates a telemetry GUID for a scheduled backup.
+func (m *GUIDManager) GetOrCreateScheduledBackupID(ctx context.Context, obj client.Object) (string, error) {
+	return m.getOrCreateID(ctx, obj, ScheduledBackupIDAnnotation)
+}
+
+// GetClusterID retrieves the telemetry GUID for a cluster without creating one.
+func (m *GUIDManager) GetClusterID(obj client.Object) string {
+	return getAnnotation(obj, ClusterIDAnnotation)
+}
+
+// GetBackupID retrieves the telemetry GUID for a backup without creating one.
+func (m *GUIDManager) GetBackupID(obj client.Object) string {
+	return getAnnotation(obj, BackupIDAnnotation)
+}
+
+// getOrCreateID retrieves the GUID stored under annotationKey, generating, annotating, and persisting a new one if absent.
+func (m *GUIDManager) getOrCreateID(ctx context.Context, obj client.Object, annotationKey string) (string, error) {
+	// Check if ID already exists
+	existingID := getAnnotation(obj, annotationKey)
+	if existingID != "" {
+		return existingID, nil
+	}
+
+	// Generate new UUID
+	newID := uuid.New().String()
+
+	// Update the object with the new annotation
+	annotations := obj.GetAnnotations()
+	if annotations == nil {
+		annotations = make(map[string]string)
+	}
+	annotations[annotationKey] = newID
+	obj.SetAnnotations(annotations)
+
+	// Persist the annotation. On failure the new ID is still returned for use this reconcile, but it was NOT saved, so a different ID may be generated next time. NOTE(review): consider retrying on conflict so IDs stay stable.
+	if m.client != nil {
+		if err := m.client.Update(ctx, obj); err != nil {
+			return newID, err
+		}
+	}
+
+	return newID, nil
+}
+
+// SetClusterID sets a telemetry GUID for a cluster (without persisting).
+// Useful when creating new resources.
+func SetClusterID(obj metav1.Object) string {
+	return setAnnotation(obj, ClusterIDAnnotation)
+}
+
+// SetBackupID sets a telemetry GUID for a backup (without persisting).
+func SetBackupID(obj metav1.Object) string {
+	return setAnnotation(obj, BackupIDAnnotation)
+}
+
+// SetScheduledBackupID sets a telemetry GUID for a scheduled backup (without persisting).
+func SetScheduledBackupID(obj metav1.Object) string {
+	return setAnnotation(obj, ScheduledBackupIDAnnotation)
+}
+
+// getAnnotation safely retrieves an annotation value.
+func getAnnotation(obj client.Object, key string) string {
+	annotations := obj.GetAnnotations()
+	if annotations == nil {
+		return ""
+	}
+	return annotations[key]
+}
+
+// setAnnotation sets a new UUID in an annotation and returns it.
+func setAnnotation(obj metav1.Object, key string) string {
+	newID := uuid.New().String()
+	annotations := obj.GetAnnotations()
+	if annotations == nil {
+		annotations = make(map[string]string)
+	}
+	annotations[key] = newID
+	obj.SetAnnotations(annotations)
+	return newID
+}
diff --git a/operator/src/internal/telemetry/manager.go b/operator/src/internal/telemetry/manager.go
new file mode 100644
index 00000000..4e2fa8a8
--- /dev/null
+++ b/operator/src/internal/telemetry/manager.go
@@ -0,0 +1,241 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+package telemetry
+
+import (
+	"context"
+	"os"
+	"time"
+
+	"github.com/go-logr/logr"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// Manager is the main entry point for telemetry operations; it bundles the client, event/metric trackers, and GUID manager.
+type Manager struct {
+	Client      *TelemetryClient
+	Events      *EventTracker
+	Metrics     *MetricsTracker
+	GUIDs       *GUIDManager
+	operatorCtx *OperatorContext
+	logger      logr.Logger
+}
+
+// ManagerConfig contains configuration for the telemetry manager.
+type ManagerConfig struct {
+	OperatorVersion  string
+	HelmChartVersion string
+	Logger           logr.Logger
+}
+
+// NewManager creates a new telemetry Manager.
+func NewManager(ctx context.Context, cfg ManagerConfig, k8sClient client.Client, clientset kubernetes.Interface) (*Manager, error) {
+	// Detect operator context (best-effort). NOTE(review): detectOperatorContext currently never returns a non-nil error, so this fallback is defensive only; it also drops cfg.HelmChartVersion.
+	operatorCtx, err := detectOperatorContext(ctx, cfg, clientset)
+	if err != nil {
+		cfg.Logger.Error(err, "Failed to detect operator context, using defaults")
+		operatorCtx = &OperatorContext{
+			OperatorVersion:        cfg.OperatorVersion,
+			KubernetesDistribution: DistributionOther,
+			CloudProvider:          CloudProviderUnknown,
+			StartupTimestamp:       time.Now(),
+		}
+	}
+
+	// Create telemetry client
+	telemetryClient := NewTelemetryClient(operatorCtx, WithLogger(cfg.Logger))
+
+	// Create GUID manager
+	guidManager := NewGUIDManager(k8sClient)
+
+	// Create event and metrics trackers
+	eventTracker := NewEventTracker(telemetryClient, guidManager)
+	metricsTracker := NewMetricsTracker(telemetryClient)
+
+	return &Manager{
+		Client:      telemetryClient,
+		Events:      eventTracker,
+		Metrics:     metricsTracker,
+		GUIDs:       guidManager,
+		operatorCtx: operatorCtx,
+		logger:      cfg.Logger,
+	}, nil
+}
+
+// Start begins telemetry collection.
+func (m *Manager) Start() {
+	m.Client.Start()
+
+	// Send operator startup event
+	m.Events.TrackOperatorStartup(OperatorStartupEvent{
+		OperatorVersion:   m.operatorCtx.OperatorVersion,
+		KubernetesVersion: m.operatorCtx.KubernetesVersion,
+		CloudProvider:     string(m.operatorCtx.CloudProvider),
+		StartupTimestamp:  m.operatorCtx.StartupTimestamp,
+		RestartCount:      getRestartCount(),
+		HelmChartVersion:  m.operatorCtx.HelmChartVersion,
+	})
+
+	m.logger.Info("Telemetry collection started",
+		"enabled", m.Client.IsEnabled(),
+		"operatorVersion", m.operatorCtx.OperatorVersion,
+		"k8sVersion", m.operatorCtx.KubernetesVersion,
+	)
+}
+
+// Stop gracefully stops telemetry collection.
+func (m *Manager) Stop() {
+	m.Client.Stop()
+	m.logger.Info("Telemetry collection stopped")
+}
+
+// IsEnabled returns whether telemetry is enabled.
func (m *Manager) IsEnabled() bool {
	return m.Client.IsEnabled()
}

// GetOperatorContext returns the detected operator context.
func (m *Manager) GetOperatorContext() *OperatorContext {
	return m.operatorCtx
}

// detectOperatorContext detects the deployment environment: Kubernetes
// version/distribution, cloud provider, region, hashed operator namespace,
// and installation method. Each probe degrades independently — a nil clientset
// or missing env var simply leaves the corresponding field at its zero value.
// The returned error is currently always nil.
func detectOperatorContext(ctx context.Context, cfg ManagerConfig, clientset kubernetes.Interface) (*OperatorContext, error) {
	opCtx := &OperatorContext{
		OperatorVersion:  cfg.OperatorVersion,
		HelmChartVersion: cfg.HelmChartVersion,
		StartupTimestamp: time.Now(),
	}

	// Get Kubernetes version from the discovery API; distribution is inferred
	// from the server's GitVersion string.
	if clientset != nil {
		discoveryClient := clientset.Discovery()
		if serverVersion, err := discoveryClient.ServerVersion(); err == nil {
			opCtx.KubernetesVersion = serverVersion.GitVersion
			opCtx.KubernetesDistribution = DetectKubernetesDistribution(serverVersion.GitVersion)
		}
	}

	// Detect cloud provider from node provider IDs or node labels.
	opCtx.CloudProvider = detectCloudProvider(ctx, clientset)

	// Get operator namespace (hashed for privacy; raw namespace is never sent).
	if ns := os.Getenv("POD_NAMESPACE"); ns != "" {
		opCtx.OperatorNamespaceHash = HashNamespace(ns)
	}

	// Detect region from node topology labels if possible.
	opCtx.Region = detectRegion(ctx, clientset)

	// Detect installation method (helm / operator-sdk / kubectl).
	opCtx.InstallationMethod = detectInstallationMethod()

	return opCtx, nil
}

// detectCloudProvider attempts to detect the cloud provider.
// detectCloudProvider attempts to detect the cloud provider by inspecting a
// single node (List with Limit: 1), first via its spec.providerID and then via
// well-known provider node labels. Any API error, or an empty node list,
// yields CloudProviderUnknown.
//
// NOTE(review): only the first listed node is sampled — assumes node metadata
// is homogeneous across the cluster; confirm this holds for mixed node pools.
func detectCloudProvider(ctx context.Context, clientset kubernetes.Interface) CloudProvider {
	if clientset == nil {
		return CloudProviderUnknown
	}

	// Try to detect from node labels or provider ID
	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 1})
	if err != nil || len(nodes.Items) == 0 {
		return CloudProviderUnknown
	}

	node := nodes.Items[0]

	// Check provider ID (e.g. "azure:///...", "aws:///...", "gce://...")
	providerID := node.Spec.ProviderID
	switch {
	case containsAny(providerID, "azure", "aks"):
		return CloudProviderAKS
	case containsAny(providerID, "aws", "eks"):
		return CloudProviderEKS
	case containsAny(providerID, "gce", "gke"):
		return CloudProviderGKE
	}

	// Fall back to provider-specific node labels.
	labels := node.Labels
	if labels != nil {
		if _, ok := labels["kubernetes.azure.com/cluster"]; ok {
			return CloudProviderAKS
		}
		if _, ok := labels["eks.amazonaws.com/nodegroup"]; ok {
			return CloudProviderEKS
		}
		if _, ok := labels["cloud.google.com/gke-nodepool"]; ok {
			return CloudProviderGKE
		}
	}

	return CloudProviderUnknown
}

// detectRegion attempts to detect the cloud region from the topology labels of
// a single sampled node. Returns "" when no clientset is available, the node
// list fails or is empty, or neither region label is present.
func detectRegion(ctx context.Context, clientset kubernetes.Interface) string {
	if clientset == nil {
		return ""
	}

	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 1})
	if err != nil || len(nodes.Items) == 0 {
		return ""
	}

	node := nodes.Items[0]
	if labels := node.Labels; labels != nil {
		// Standard Kubernetes topology label
		if region, ok := labels["topology.kubernetes.io/region"]; ok {
			return region
		}
		// Fallback to failure-domain label (deprecated but still used)
		if region, ok := labels["failure-domain.beta.kubernetes.io/region"]; ok {
			return region
		}
	}

	return ""
}

// detectInstallationMethod attempts to detect how the operator was installed.
// detectInstallationMethod attempts to detect how the operator was installed,
// using environment variables as heuristics. Falls back to "kubectl" when no
// marker is present.
func detectInstallationMethod() string {
	// Check for Helm-specific annotations/labels
	if os.Getenv("HELM_RELEASE_NAME") != "" {
		return "helm"
	}

	// Check for OLM (Operator Lifecycle Manager)
	if os.Getenv("OPERATOR_CONDITION_NAME") != "" {
		return "operator-sdk"
	}

	return "kubectl"
}

// getRestartCount returns the restart count (simplified implementation).
func getRestartCount() int {
	// In a real implementation, this would track restarts
	// For now, return 0 as this is initial startup
	return 0
}

// containsAny checks if s contains any of the substrings. Empty substrings
// never match (unlike strings.Contains, where the empty string matches all).
//
// NOTE(review): this is a hand-rolled O(n*m) substring scan; prefer
// strings.Contains once the "strings" import is added to this file.
func containsAny(s string, substrings ...string) bool {
	for _, sub := range substrings {
		if len(s) > 0 && len(sub) > 0 {
			for i := 0; i <= len(s)-len(sub); i++ {
				if s[i:i+len(sub)] == sub {
					return true
				}
			}
		}
	}
	return false
}

// ---- file boundary: operator/src/internal/telemetry/metrics.go (new file) ----

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package telemetry

// MetricsTracker provides high-level methods for tracking telemetry metrics.
// All methods delegate to the underlying TelemetryClient.TrackMetric with a
// metric name, a float64 value, and a map of dimensions.
type MetricsTracker struct {
	client *TelemetryClient
}

// NewMetricsTracker creates a new MetricsTracker.
func NewMetricsTracker(client *TelemetryClient) *MetricsTracker {
	return &MetricsTracker{
		client: client,
	}
}

// TrackOperatorHealthStatus tracks the operator health status metric.
// The value encodes healthy as 1.0 and unhealthy as 0.0, matching the
// `operator.health.status` metric in docs/designs/appinsights-metrics.md.
func (m *MetricsTracker) TrackOperatorHealthStatus(healthy bool, podName, namespaceHash string) {
	value := 0.0
	if healthy {
		value = 1.0
	}
	m.client.TrackMetric("operator.health.status", value, map[string]interface{}{
		"pod_name":       podName,
		"namespace_hash": namespaceHash,
	})
}

// TrackActiveClustersCount tracks the number of active DocumentDB clusters.
func (m *MetricsTracker) TrackActiveClustersCount(count int, namespaceHash, cloudProvider, environment string) {
	m.client.TrackMetric("documentdb.clusters.active.count", float64(count), map[string]interface{}{
		"namespace_hash": namespaceHash,
		"cloud_provider": cloudProvider,
		"environment":    environment,
	})
}

// TrackClusterConfiguration tracks cluster configuration metrics.
// The metric value is a constant 1; all information is carried in the
// dimensions.
func (m *MetricsTracker) TrackClusterConfiguration(metric ClusterConfigurationMetric) {
	m.client.TrackMetric("documentdb.cluster.configuration", 1, map[string]interface{}{
		"cluster_id":         metric.ClusterID,
		"namespace_hash":     metric.NamespaceHash,
		"node_count":         metric.NodeCount,
		"instances_per_node": metric.InstancesPerNode,
		"total_instances":    metric.TotalInstances,
		"pvc_size_category":  string(metric.PVCSizeCategory),
		"documentdb_version": metric.DocumentDBVersion,
	})
}

// TrackReplicationEnabled tracks replication configuration metrics.
// The value encodes enabled as 1.0 and disabled as 0.0.
func (m *MetricsTracker) TrackReplicationEnabled(enabled bool, metric ReplicationEnabledMetric) {
	value := 0.0
	if enabled {
		value = 1.0
	}
	m.client.TrackMetric("documentdb.cluster.replication.enabled", value, map[string]interface{}{
		"cluster_id":                      metric.ClusterID,
		"cross_cloud_networking_strategy": metric.CrossCloudNetworkingStrategy,
		"primary_cluster_id":              metric.PrimaryClusterID,
		"replica_count":                   metric.ReplicaCount,
		"high_availability":               metric.HighAvailability,
		"participating_cluster_count":     metric.ParticipatingClusterCount,
		"environments":                    metric.Environments,
	})
}

// TrackActiveBackupsCount tracks the number of active backups.
func (m *MetricsTracker) TrackActiveBackupsCount(count int, namespaceHash, clusterID, backupType string) {
	m.client.TrackMetric("documentdb.backups.active.count", float64(count), map[string]interface{}{
		"namespace_hash": namespaceHash,
		"cluster_id":     clusterID,
		"backup_type":    backupType,
	})
}

// TrackScheduledBackupsCount tracks the number of active scheduled backup
// jobs. This metric carries no dimensions.
func (m *MetricsTracker) TrackScheduledBackupsCount(count int) {
	m.client.TrackMetric("documentdb.scheduled_backups.active.count", float64(count), nil)
}

// TrackReplicationLag tracks replication lag metrics. The metric value is the
// average lag in bytes; min/max/avg are additionally attached as dimensions.
func (m *MetricsTracker) TrackReplicationLag(metric ReplicationLagMetric) {
	m.client.TrackMetric("documentdb.replication.lag.bytes", float64(metric.AvgLagBytes), map[string]interface{}{
		"cluster_id":         metric.ClusterID,
		"replica_cluster_id": metric.ReplicaClusterID,
		"namespace_hash":     metric.NamespaceHash,
		"min_lag_bytes":      metric.MinLagBytes,
		"max_lag_bytes":      metric.MaxLagBytes,
		"avg_lag_bytes":      metric.AvgLagBytes,
	})
}

// TrackReplicationStatus tracks replication health status.
// The value encodes healthy as 1.0 and unhealthy as 0.0.
func (m *MetricsTracker) TrackReplicationStatus(healthy bool, clusterID, replicaClusterID, namespaceHash string) {
	value := 0.0
	if healthy {
		value = 1.0
	}
	m.client.TrackMetric("documentdb.replication.status", value, map[string]interface{}{
		"cluster_id":         clusterID,
		"replica_cluster_id": replicaClusterID,
		"namespace_hash":     namespaceHash,
	})
}

// TrackTLSEnabledCount tracks the number of clusters with TLS enabled.
func (m *MetricsTracker) TrackTLSEnabledCount(count int, tlsMode string, serverEnabled, clientEnabled bool) {
	m.client.TrackMetric("documentdb.tls.enabled.count", float64(count), map[string]interface{}{
		"tls_mode":           tlsMode,
		"server_tls_enabled": serverEnabled,
		"client_tls_enabled": clientEnabled,
	})
}

// TrackServiceExposureCount tracks service exposure methods.
func (m *MetricsTracker) TrackServiceExposureCount(count int, serviceType, cloudProvider string) {
	m.client.TrackMetric("documentdb.service_exposure.count", float64(count), map[string]interface{}{
		"service_type":   serviceType,
		"cloud_provider": cloudProvider,
	})
}

// TrackPluginUsageCount tracks plugin usage. The metric value is a constant 1;
// the enabled/disabled flags are carried as dimensions.
func (m *MetricsTracker) TrackPluginUsageCount(sidecarInjectorEnabled, walReplicaEnabled bool) {
	m.client.TrackMetric("documentdb.plugin.usage.count", 1, map[string]interface{}{
		"sidecar_injector_plugin_enabled": sidecarInjectorEnabled,
		"wal_replica_plugin_enabled":      walReplicaEnabled,
	})
}

// TrackReconciliationDuration tracks reconciliation performance.
func (m *MetricsTracker) TrackReconciliationDuration(metric ReconciliationDurationMetric) {
	m.client.TrackMetric("documentdb.reconciliation.duration.seconds", metric.DurationSeconds, map[string]interface{}{
		"resource_type": metric.ResourceType,
		"operation":     metric.Operation,
		"status":        metric.Status,
	})
}

// TrackAPICallDuration tracks Kubernetes API call latency.
func (m *MetricsTracker) TrackAPICallDuration(durationSeconds float64, operation, resourceType, result string) {
	m.client.TrackMetric("documentdb.api.duration.seconds", durationSeconds, map[string]interface{}{
		"operation":     operation,
		"resource_type": resourceType,
		"result":        result,
	})
}

// TrackBackupRetentionDays tracks backup retention policy.
func (m *MetricsTracker) TrackBackupRetentionDays(retentionDays int, clusterID, policyLevel string) {
	m.client.TrackMetric("documentdb.backup.retention.days", float64(retentionDays), map[string]interface{}{
		"cluster_id":   clusterID,
		"policy_level": policyLevel,
	})
}

// ---- file boundary: operator/src/internal/telemetry/types.go (new file) ----

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

// Package telemetry provides Application Insights integration for the DocumentDB Kubernetes Operator.
// It implements telemetry collection as specified in docs/designs/appinsights-metrics.md.
package telemetry

import (
	"time"
)

// TelemetryAnnotations defines the annotation keys used for telemetry correlation.
// These annotations hold auto-generated GUIDs so telemetry can correlate events
// for a resource without sending its real name or namespace.
const (
	// ClusterIDAnnotation is the annotation key for storing auto-generated cluster GUID.
	ClusterIDAnnotation = "telemetry.documentdb.io/cluster-id"
	// BackupIDAnnotation is the annotation key for storing auto-generated backup GUID.
	BackupIDAnnotation = "telemetry.documentdb.io/backup-id"
	// ScheduledBackupIDAnnotation is the annotation key for storing auto-generated scheduled backup GUID.
	ScheduledBackupIDAnnotation = "telemetry.documentdb.io/scheduled-backup-id"
)

// CloudProvider represents the detected cloud environment.
type CloudProvider string

const (
	CloudProviderAKS     CloudProvider = "aks"
	CloudProviderEKS     CloudProvider = "eks"
	CloudProviderGKE     CloudProvider = "gke"
	CloudProviderUnknown CloudProvider = "unknown"
)

// KubernetesDistribution represents the detected Kubernetes distribution.
type KubernetesDistribution string

const (
	DistributionAKS         KubernetesDistribution = "aks"
	DistributionEKS         KubernetesDistribution = "eks"
	DistributionGKE         KubernetesDistribution = "gke"
	DistributionOpenShift   KubernetesDistribution = "openshift"
	DistributionRancher     KubernetesDistribution = "rancher"
	DistributionVMwareTanzu KubernetesDistribution = "vmware-tanzu"
	DistributionOther       KubernetesDistribution = "other"
)

// PVCSizeCategory categorizes PVC sizes without exposing exact values.
type PVCSizeCategory string

const (
	PVCSizeSmall  PVCSizeCategory = "small"  // <50Gi
	PVCSizeMedium PVCSizeCategory = "medium" // 50-200Gi
	PVCSizeLarge  PVCSizeCategory = "large"  // >200Gi
)

// ScheduleFrequency categorizes backup schedule frequency.
type ScheduleFrequency string

const (
	ScheduleFrequencyHourly ScheduleFrequency = "hourly"
	ScheduleFrequencyDaily  ScheduleFrequency = "daily"
	ScheduleFrequencyWeekly ScheduleFrequency = "weekly"
	ScheduleFrequencyCustom ScheduleFrequency = "custom"
)

// OperatorContext contains deployment context collected at startup.
type OperatorContext struct {
	OperatorVersion        string
	KubernetesVersion      string // server GitVersion from the discovery API
	KubernetesDistribution KubernetesDistribution
	CloudProvider          CloudProvider
	Region                 string // from node topology labels; "" if undetected
	OperatorNamespaceHash  string // SHA-256 hex of the operator namespace (never raw)
	InstallationMethod     string // "helm", "operator-sdk", or "kubectl"
	HelmChartVersion       string
	StartupTimestamp       time.Time
}

// OperatorStartupEvent represents the OperatorStartup telemetry event.
type OperatorStartupEvent struct {
	OperatorVersion   string    `json:"operator_version"`
	KubernetesVersion string    `json:"kubernetes_version"`
	CloudProvider     string    `json:"cloud_provider"`
	StartupTimestamp  time.Time `json:"startup_timestamp"`
	RestartCount      int       `json:"restart_count"`
	HelmChartVersion  string    `json:"helm_chart_version,omitempty"`
}

// ClusterCreatedEvent represents the ClusterCreated telemetry event.
type ClusterCreatedEvent struct {
	ClusterID               string  `json:"cluster_id"`
	NamespaceHash           string  `json:"namespace_hash"`
	CreationDurationSeconds float64 `json:"creation_duration_seconds"`
	NodeCount               int     `json:"node_count"`
	InstancesPerNode        int     `json:"instances_per_node"`
	StorageSize             string  `json:"storage_size"`
	CloudProvider           string  `json:"cloud_provider"`
	TLSEnabled              bool    `json:"tls_enabled"`
	BootstrapType           string  `json:"bootstrap_type"`
	SidecarInjectorPlugin   bool    `json:"sidecar_injector_plugin"`
	ServiceType             string  `json:"service_type"`
}

// ClusterUpdatedEvent represents the ClusterUpdated telemetry event.
type ClusterUpdatedEvent struct {
	ClusterID             string  `json:"cluster_id"`
	NamespaceHash         string  `json:"namespace_hash"`
	UpdateType            string  `json:"update_type"`
	UpdateDurationSeconds float64 `json:"update_duration_seconds"`
}

// ClusterDeletedEvent represents the ClusterDeleted telemetry event.
type ClusterDeletedEvent struct {
	ClusterID               string  `json:"cluster_id"`
	NamespaceHash           string  `json:"namespace_hash"`
	DeletionDurationSeconds float64 `json:"deletion_duration_seconds"`
	ClusterAgeDays          int     `json:"cluster_age_days"`
	BackupCount             int     `json:"backup_count"`
}

// BackupCreatedEvent represents the BackupCreated telemetry event.
type BackupCreatedEvent struct {
	BackupID              string  `json:"backup_id"`
	ClusterID             string  `json:"cluster_id"`
	NamespaceHash         string  `json:"namespace_hash"`
	BackupType            string  `json:"backup_type"` // on-demand or scheduled
	BackupMethod          string  `json:"backup_method"`
	BackupSizeBytes       int64   `json:"backup_size_bytes"`
	BackupDurationSeconds float64 `json:"backup_duration_seconds"`
	RetentionDays         int     `json:"retention_days"`
	BackupPhase           string  `json:"backup_phase"`
	CloudProvider         string  `json:"cloud_provider"`
	IsPrimaryCluster      bool    `json:"is_primary_cluster"`
}

// BackupDeletedEvent represents the BackupDeleted telemetry event.
type BackupDeletedEvent struct {
	BackupID       string `json:"backup_id"`
	DeletionReason string `json:"deletion_reason"` // expired, manual, cluster-deleted
	BackupAgeDays  int    `json:"backup_age_days"`
}

// ScheduledBackupCreatedEvent represents the ScheduledBackupCreated telemetry event.
type ScheduledBackupCreatedEvent struct {
	ScheduledBackupID string `json:"scheduled_backup_id"`
	ClusterID         string `json:"cluster_id"`
	ScheduleFrequency string `json:"schedule_frequency"`
	RetentionDays     int    `json:"retention_days"`
}

// ClusterRestoredEvent represents the ClusterRestored telemetry event.
type ClusterRestoredEvent struct {
	NewClusterID           string  `json:"new_cluster_id"`
	SourceBackupID         string  `json:"source_backup_id"`
	NamespaceHash          string  `json:"namespace_hash"`
	RestoreDurationSeconds float64 `json:"restore_duration_seconds"`
	BackupAgeHours         float64 `json:"backup_age_hours"`
	RestorePhase           string  `json:"restore_phase"`
}

// FailoverOccurredEvent represents the FailoverOccurred telemetry event.
type FailoverOccurredEvent struct {
	ClusterID               string  `json:"cluster_id"`
	NamespaceHash           string  `json:"namespace_hash"`
	FailoverType            string  `json:"failover_type"` // automatic, manual, switchover
	OldPrimaryIndex         int     `json:"old_primary_index"`
	NewPrimaryIndex         int     `json:"new_primary_index"`
	FailoverDurationSeconds float64 `json:"failover_duration_seconds"`
	DowntimeSeconds         float64 `json:"downtime_seconds"`
	ReplicationLagBytes     int64   `json:"replication_lag_bytes"`
	TriggerReason           string  `json:"trigger_reason"`
}

// ReconciliationErrorEvent represents the ReconciliationError telemetry event.
type ReconciliationErrorEvent struct {
	ResourceType     string `json:"resource_type"` // DocumentDB, Backup, ScheduledBackup
	ResourceID       string `json:"resource_id"`
	NamespaceHash    string `json:"namespace_hash"`
	ErrorType        string `json:"error_type"`
	ErrorMessage     string `json:"error_message"` // Sanitized, no PII
	ErrorCode        string `json:"error_code"`
	RetryCount       int    `json:"retry_count"`
	ResolutionStatus string `json:"resolution_status"` // pending, resolved, failed
}

// VolumeSnapshotErrorEvent represents the VolumeSnapshotError telemetry event.
type VolumeSnapshotErrorEvent struct {
	BackupID      string `json:"backup_id"`
	ClusterID     string `json:"cluster_id"`
	ErrorType     string `json:"error_type"`
	CSIDriverType string `json:"csi_driver_type"`
	CloudProvider string `json:"cloud_provider"`
}

// CNPGIntegrationErrorEvent represents the CNPGIntegrationError telemetry event.
type CNPGIntegrationErrorEvent struct {
	ClusterID        string `json:"cluster_id"`
	CNPGResourceType string `json:"cnpg_resource_type"`
	ErrorCategory    string `json:"error_category"`
	Operation        string `json:"operation"`
}

// BackupExpiredEvent represents the BackupExpired telemetry event.
type BackupExpiredEvent struct {
	BackupID      string `json:"backup_id"`
	ClusterID     string `json:"cluster_id"`
	RetentionDays int    `json:"retention_days"`
	ActualAgeDays int    `json:"actual_age_days"`
}

// ClusterConfigurationMetric represents cluster configuration metrics.
type ClusterConfigurationMetric struct {
	ClusterID         string          `json:"cluster_id"`
	NamespaceHash     string          `json:"namespace_hash"`
	NodeCount         int             `json:"node_count"`
	InstancesPerNode  int             `json:"instances_per_node"`
	TotalInstances    int             `json:"total_instances"`
	PVCSizeCategory   PVCSizeCategory `json:"pvc_size_category"`
	DocumentDBVersion string          `json:"documentdb_version"`
}

// ReplicationEnabledMetric represents replication configuration metrics.
type ReplicationEnabledMetric struct {
	ClusterID                    string `json:"cluster_id"`
	CrossCloudNetworkingStrategy string `json:"cross_cloud_networking_strategy"`
	PrimaryClusterID             string `json:"primary_cluster_id"`
	ReplicaCount                 int    `json:"replica_count"`
	HighAvailability             bool   `json:"high_availability"`
	ParticipatingClusterCount    int    `json:"participating_cluster_count"`
	Environments                 string `json:"environments"`
}

// ReplicationLagMetric represents replication lag metrics (aggregated min/max/avg
// rather than per-sample values).
type ReplicationLagMetric struct {
	ClusterID        string `json:"cluster_id"`
	ReplicaClusterID string `json:"replica_cluster_id"`
	NamespaceHash    string `json:"namespace_hash"`
	MinLagBytes      int64  `json:"min_lag_bytes"`
	MaxLagBytes      int64  `json:"max_lag_bytes"`
	AvgLagBytes      int64  `json:"avg_lag_bytes"`
}

// ReconciliationDurationMetric represents reconciliation performance metrics.
+type ReconciliationDurationMetric struct { + ResourceType string `json:"resource_type"` + Operation string `json:"operation"` + Status string `json:"status"` + DurationSeconds float64 `json:"duration_seconds"` +} diff --git a/operator/src/internal/telemetry/utils.go b/operator/src/internal/telemetry/utils.go new file mode 100644 index 00000000..f86ee195 --- /dev/null +++ b/operator/src/internal/telemetry/utils.go @@ -0,0 +1,139 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +package telemetry + +import ( + "crypto/sha256" + "encoding/hex" + "strings" + + "k8s.io/apimachinery/pkg/api/resource" +) + +// HashNamespace creates a SHA-256 hash of a namespace name for privacy. +func HashNamespace(namespace string) string { + hash := sha256.Sum256([]byte(namespace)) + return hex.EncodeToString(hash[:]) +} + +// CategorizePVCSize categorizes a PVC size string into small/medium/large. +func CategorizePVCSize(pvcSize string) PVCSizeCategory { + if pvcSize == "" { + return PVCSizeSmall + } + + quantity, err := resource.ParseQuantity(pvcSize) + if err != nil { + return PVCSizeSmall + } + + // Convert to Gi for comparison + sizeGi := quantity.Value() / (1024 * 1024 * 1024) + + switch { + case sizeGi < 50: + return PVCSizeSmall + case sizeGi <= 200: + return PVCSizeMedium + default: + return PVCSizeLarge + } +} + +// CategorizeScheduleFrequency categorizes a cron expression into frequency categories. 
+func CategorizeScheduleFrequency(cronExpr string) ScheduleFrequency { + if cronExpr == "" { + return ScheduleFrequencyCustom + } + + parts := strings.Fields(cronExpr) + if len(parts) < 5 { + return ScheduleFrequencyCustom + } + + // Simple heuristics for common patterns + minute, hour, dayOfMonth, _, dayOfWeek := parts[0], parts[1], parts[2], parts[3], parts[4] + + // Check for step expressions (e.g., */5) or ranges (e.g., 1-5) which indicate custom schedules + isSimpleValue := func(s string) bool { + // Returns true only for single numeric values (e.g., "0", "15", "23") + if s == "*" { + return false + } + for _, c := range s { + if c < '0' || c > '9' { + return false // Contains */,-/ or other special chars + } + } + return len(s) > 0 + } + + // Hourly: runs every hour at a fixed minute (e.g., "0 * * * *" or "30 * * * *") + // Must be a simple numeric minute, not */5 or similar + if isSimpleValue(minute) && hour == "*" && dayOfMonth == "*" && dayOfWeek == "*" { + return ScheduleFrequencyHourly + } + + // Daily: runs once per day at a fixed time (e.g., "0 2 * * *") + if isSimpleValue(minute) && isSimpleValue(hour) && dayOfMonth == "*" && dayOfWeek == "*" { + return ScheduleFrequencyDaily + } + + // Weekly: runs once per week (e.g., "0 2 * * 0") + if isSimpleValue(minute) && isSimpleValue(hour) && dayOfMonth == "*" && isSimpleValue(dayOfWeek) { + return ScheduleFrequencyWeekly + } + + return ScheduleFrequencyCustom +} + +// CategorizeCSIDriver categorizes a CSI driver name. 
+func CategorizeCSIDriver(driverName string) string { + switch { + case strings.Contains(driverName, "azure") || strings.Contains(driverName, "disk.csi.azure.com"): + return "azure-disk" + case strings.Contains(driverName, "aws") || strings.Contains(driverName, "ebs.csi.aws.com"): + return "aws-ebs" + case strings.Contains(driverName, "gce") || strings.Contains(driverName, "pd.csi.storage.gke.io"): + return "gce-pd" + default: + return "other" + } +} + +// MapCloudProviderToString converts CloudProvider to string. +func MapCloudProviderToString(env string) string { + switch strings.ToLower(env) { + case "aks": + return "aks" + case "eks": + return "eks" + case "gke": + return "gke" + default: + return "unknown" + } +} + +// DetectKubernetesDistribution detects the Kubernetes distribution from version info. +func DetectKubernetesDistribution(versionInfo string) KubernetesDistribution { + versionLower := strings.ToLower(versionInfo) + + switch { + case strings.Contains(versionLower, "eks"): + return DistributionEKS + case strings.Contains(versionLower, "aks") || strings.Contains(versionLower, "azure"): + return DistributionAKS + case strings.Contains(versionLower, "gke"): + return DistributionGKE + case strings.Contains(versionLower, "openshift"): + return DistributionOpenShift + case strings.Contains(versionLower, "rancher") || strings.Contains(versionLower, "rke"): + return DistributionRancher + case strings.Contains(versionLower, "tanzu") || strings.Contains(versionLower, "vmware"): + return DistributionVMwareTanzu + default: + return DistributionOther + } +}