diff --git a/configs/local.yaml b/configs/local.yaml
index a61c8da9..5d0123ab 100644
--- a/configs/local.yaml
+++ b/configs/local.yaml
@@ -39,6 +39,7 @@ clusters:
     status: active
     version: 0.0.1
     description: Just a localhost
+    health_check: true
     tags:
       - type:localhost
       - data:local
\ No newline at end of file
diff --git a/internal/pkg/heimdall/health.go b/internal/pkg/heimdall/health.go
new file mode 100644
index 00000000..e23a5551
--- /dev/null
+++ b/internal/pkg/heimdall/health.go
@@ -0,0 +1,169 @@
+package heimdall
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/patterninc/heimdall/pkg/object/cluster"
+	"github.com/patterninc/heimdall/pkg/object/status"
+	"github.com/patterninc/heimdall/pkg/plugin"
+)
+
+// Health endpoint settings and the per-cluster status values it reports.
+// healthCheckTimeout bounds the whole /health request rather than one probe;
+// healthCheckConcurrency caps how many probes run at the same time.
+const (
+	healthCheckTimeout     = 30 * time.Second
+	healthCheckConcurrency = 10
+	healthStatusOK         = `ok`
+	healthStatusError      = `error`
+	healthStatusUnchecked  = `unchecked`
+)
+
+// clusterProbe pairs a cluster with the handler used to probe it.
+type clusterProbe struct {
+	cluster    *cluster.Cluster
+	handler    plugin.Handler
+	pluginName string
+}
+
+// healthCheckResult is the outcome of probing a single cluster.
+type healthCheckResult struct {
+	ClusterID   string `json:"cluster_id"`
+	ClusterName string `json:"cluster_name"`
+	Plugin      string `json:"plugin"`
+	Status      string `json:"status"`
+	LatencyMs   int64  `json:"latency_ms"`
+	Error       string `json:"error,omitempty"`
+}
+
+// healthChecksResponse is the JSON body returned by the health endpoint.
+type healthChecksResponse struct {
+	Healthy bool                `json:"healthy"`
+	Checks  []healthCheckResult `json:"checks"`
+}
+
+// healthHandler probes every health-check-enabled cluster and reports the
+// results as JSON. It answers 200 when no probe failed, 503 when at least one
+// probe returned an error, and 500 when the response cannot be encoded.
+func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) {
+	ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout)
+	defer cancel()
+
+	results := h.runHealthChecks(ctx, h.resolveClusterProbes())
+
+	// Unchecked clusters do not count against overall health; only errors do.
+	healthy := true
+	for _, res := range results {
+		if res.Status == healthStatusError {
+			healthy = false
+			break
+		}
+	}
+
+	data, err := json.Marshal(healthChecksResponse{Healthy: healthy, Checks: results})
+	if err != nil {
+		// Never send an empty body with a 200/503 status code.
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	code := http.StatusOK
+	if !healthy {
+		code = http.StatusServiceUnavailable
+	}
+	w.Header().Set(contentTypeKey, contentTypeJSON)
+	w.WriteHeader(code)
+	// A failed write means the client went away; nothing is left to report to.
+	_, _ = w.Write(data)
+}
+
+// resolveClusterProbes returns one probe per active cluster that has health
+// checks enabled, using the first active command whose cluster tags match it.
+// Clusters with no matching active command are left out.
+func (h *Heimdall) resolveClusterProbes() []*clusterProbe {
+	var probes []*clusterProbe
+	for _, cl := range h.Clusters {
+		if cl.Status != status.Active || !cl.HealthCheck {
+			continue
+		}
+		for _, cmd := range h.Commands {
+			if cmd.Status != status.Active {
+				continue
+			}
+			if cl.Tags.Contains(cmd.ClusterTags) {
+				probes = append(probes, &clusterProbe{
+					cluster:    cl,
+					handler:    h.commandHandlers[cmd.ID],
+					pluginName: cmd.Plugin,
+				})
+				break
+			}
+		}
+	}
+	return probes
+}
+
+// runHealthChecks executes the probes with at most healthCheckConcurrency of
+// them in flight and returns the results in the same order as probes.
+func (h *Heimdall) runHealthChecks(ctx context.Context, probes []*clusterProbe) []healthCheckResult {
+	results := make([]healthCheckResult, len(probes))
+	sem := make(chan struct{}, healthCheckConcurrency)
+	var wg sync.WaitGroup
+	for i, probe := range probes {
+		// Take the slot before starting the goroutine so that the number of
+		// goroutines stays bounded, and stop waiting once ctx is done.
+		select {
+		case sem <- struct{}{}:
+		case <-ctx.Done():
+			results[i] = healthCheckResult{
+				ClusterID:   probe.cluster.ID,
+				ClusterName: probe.cluster.Name,
+				Plugin:      probe.pluginName,
+				Status:      healthStatusError,
+				Error:       ctx.Err().Error(),
+			}
+			continue
+		}
+		wg.Add(1)
+		go func(i int, probe *clusterProbe) {
+			defer wg.Done()
+			defer func() { <-sem }()
+			results[i] = h.checkCluster(ctx, probe)
+		}(i, probe)
+	}
+	wg.Wait()
+	return results
+}
+
+// checkCluster runs the handler's HealthCheck against the probe's cluster and
+// records the outcome and latency. Handlers that do not implement
+// plugin.HealthChecker are reported as unchecked.
+func (h *Heimdall) checkCluster(ctx context.Context, probe *clusterProbe) healthCheckResult {
+	start := time.Now()
+	res := healthCheckResult{
+		ClusterID:   probe.cluster.ID,
+		ClusterName: probe.cluster.Name,
+		Plugin:      probe.pluginName,
+	}
+
+	hc, ok := probe.handler.(plugin.HealthChecker)
+	if !ok {
+		res.Status = healthStatusUnchecked
+		res.LatencyMs = time.Since(start).Milliseconds()
+		return res
+	}
+
+	err := hc.HealthCheck(ctx, probe.cluster)
+	res.LatencyMs = time.Since(start).Milliseconds()
+	if err != nil {
+		res.Status = healthStatusError
+		res.Error = err.Error()
+	} else {
+		res.Status = healthStatusOK
+	}
+	return res
+}
diff --git a/internal/pkg/heimdall/heimdall.go b/internal/pkg/heimdall/heimdall.go
index 10ac68a5..889c1e19 100644
--- a/internal/pkg/heimdall/heimdall.go
+++ b/internal/pkg/heimdall/heimdall.go
@@ -191,6 +191,7 @@ func (h *Heimdall) Start() error {
 	apiRouter.Methods(methodPUT).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.submitCluster))
 	apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.getCluster))
 	apiRouter.Methods(methodGET).PathPrefix(`/clusters`).HandlerFunc(payloadHandler(h.getClusters))
+	apiRouter.Methods(methodGET).PathPrefix(`/health`).HandlerFunc(h.healthHandler)
 
 	// metrics endpoint - proxy to metrics service
 	router.Path(`/metrics`).HandlerFunc(metricsRouteHandler)
diff --git a/internal/pkg/janitor/janitor.go b/internal/pkg/janitor/janitor.go
index a2698882..543deef7 100644
--- a/internal/pkg/janitor/janitor.go
+++ b/internal/pkg/janitor/janitor.go
@@ -21,10 +21,10 @@ type Janitor struct {
 	Keepalive                int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
 	StaleJob                 int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
 	FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"`
-	CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
+	CleanInterval            int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
 	db                       *database.Database
-	commandHandlers map[string]plugin.Handler
-	clusters cluster.Clusters
+	commandHandlers          map[string]plugin.Handler
+	clusters                 cluster.Clusters
 }
 
 func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.Handler, clusters cluster.Clusters) error {
diff --git a/internal/pkg/object/command/clickhouse/clickhouse.go b/internal/pkg/object/command/clickhouse/clickhouse.go
index 1fe327e8..cf44f3bd 100644
--- a/internal/pkg/object/command/clickhouse/clickhouse.go
+++ b/internal/pkg/object/command/clickhouse/clickhouse.go
@@ -184,6 +184,31 @@ func collectResults(rows driver.Rows) (*result.Result, error) {
 	return out, nil
 }
 
+// HealthCheck implements the plugin.HealthChecker interface: it opens a connection to the cluster's endpoints with the command's credentials and pings it.
+func (cmd *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error {
+	clusterCtx := &clusterContext{}
+	if c.Context != nil {
+		if err := c.Context.Unmarshal(clusterCtx); err != nil {
+			return err
+		}
+	}
+
+	conn, err := clickhouse.Open(&clickhouse.Options{
+		Addr: clusterCtx.Endpoints,
+		Auth: clickhouse.Auth{
+			Database: clusterCtx.Database,
+			Username: cmd.Username,
+			Password: cmd.Password,
+		},
+	})
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	return conn.Ping(ctx)
+}
+
 func (cmd *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
 	// No cleanup needed. CLickhouse queries should always be synchronous.
 	return nil
diff --git a/internal/pkg/object/command/clickhouse/column_types.go b/internal/pkg/object/command/clickhouse/column_types.go
index 79b05a98..aafd5c79 100644
--- a/internal/pkg/object/command/clickhouse/column_types.go
+++ b/internal/pkg/object/command/clickhouse/column_types.go
@@ -148,32 +148,31 @@ func handleDecimal(nullable bool) (any, func() any) {
 	}
 }
 func handleTuple(nullable bool) (any, func() any) {
-    if nullable {
-        var p *any
-        return &p, func() any {
-            if p == nil || *p == nil {
-                return nil
-            }
-            return *p
-        }
-    }
-    var v any
-    return &v, func() any { return v }
+	if nullable {
+		var p *any
+		return &p, func() any {
+			if p == nil || *p == nil {
+				return nil
+			}
+			return *p
+		}
+	}
+	var v any
+	return &v, func() any { return v }
 }
 
-
 func handleArray(nullable bool) (any, func() any) {
-    if nullable {
-        var p *any
-        return &p, func() any {
-            if p == nil || *p == nil {
-                return nil
-            }
-            return *p
-        }
-    }
-    var v any
-    return &v, func() any { return v }
+	if nullable {
+		var p *any
+		return &p, func() any {
+			if p == nil || *p == nil {
+				return nil
+			}
+			return *p
+		}
+	}
+	var v any
+	return &v, func() any { return v }
 }
 
 func handleDefault(nullable bool) (any, func() any) {
@@ -207,8 +206,8 @@ func unwrapCHType(t string) (base string, nullable bool) {
 		return "Array", nullable
 	}
 	if strings.HasPrefix(s, "Tuple(") {
-        return "Tuple", nullable
-    }
+		return "Tuple", nullable
+	}
 
 	// Decimal(N,S) 
normalize to "Decimal"
 	if isDecimal(s) {
diff --git a/internal/pkg/object/command/dynamo/dynamo.go b/internal/pkg/object/command/dynamo/dynamo.go
index f8c640ca..b2619d30 100644
--- a/internal/pkg/object/command/dynamo/dynamo.go
+++ b/internal/pkg/object/command/dynamo/dynamo.go
@@ -143,6 +143,45 @@ func (d *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job.
 
 }
 
+// HealthCheck implements the plugin.HealthChecker interface: it lists at most one DynamoDB table, assuming the cluster's role first when one is configured.
+func (d *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error {
+	clusterCtx := &clusterContext{}
+	if c.Context != nil {
+		if err := c.Context.Unmarshal(clusterCtx); err != nil {
+			return err
+		}
+	}
+
+	awsConfig, err := config.LoadDefaultConfig(ctx)
+	if err != nil {
+		return err
+	}
+
+	assumeRoleOptions := func(_ *dynamodb.Options) {}
+	if clusterCtx.RoleARN != nil {
+		stsSvc := sts.NewFromConfig(awsConfig)
+		out, err := stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{
+			RoleArn:         clusterCtx.RoleARN,
+			RoleSessionName: assumeRoleSession,
+		})
+		if err != nil {
+			return err
+		}
+		assumeRoleOptions = func(o *dynamodb.Options) {
+			o.Credentials = credentials.NewStaticCredentialsProvider(
+				*out.Credentials.AccessKeyId,
+				*out.Credentials.SecretAccessKey,
+				*out.Credentials.SessionToken,
+			)
+		}
+	}
+
+	svc := dynamodb.NewFromConfig(awsConfig, assumeRoleOptions)
+	maxResults := int32(1)
+	_, err = svc.ListTables(ctx, &dynamodb.ListTablesInput{Limit: &maxResults})
+	return err
+}
+
 func (d *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
 	// TODO: Implement cleanup if needed
 	return nil
diff --git a/internal/pkg/object/command/ecs/ecs.go b/internal/pkg/object/command/ecs/ecs.go
index b2750b7c..a1ef4d50 100644
--- a/internal/pkg/object/command/ecs/ecs.go
+++ b/internal/pkg/object/command/ecs/ecs.go
@@ -852,3 +852,38 @@ func isThrottlingError(err error) bool {
 	}
 	return false
 }
+
+// HealthCheck implements the plugin.HealthChecker interface. It reports an
+// error unless the configured ECS cluster exists and is in the ACTIVE state.
+func (e *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error {
+	clusterCtx := &clusterContext{}
+	if c.Context != nil {
+		if err := c.Context.Unmarshal(clusterCtx); err != nil {
+			return err
+		}
+	}
+
+	cfg, err := config.LoadDefaultConfig(ctx)
+	if err != nil {
+		return err
+	}
+
+	ecsClient := ecs.NewFromConfig(cfg)
+	out, err := ecsClient.DescribeClusters(ctx, &ecs.DescribeClustersInput{
+		Clusters: []string{clusterCtx.ClusterName},
+	})
+	if err != nil {
+		return err
+	}
+
+	if len(out.Clusters) == 0 {
+		return fmt.Errorf("ECS cluster %q not found", clusterCtx.ClusterName)
+	}
+
+	// Deleted clusters stay discoverable for a while with an INACTIVE status.
+	if st := out.Clusters[0].Status; st != nil && *st != "ACTIVE" {
+		return fmt.Errorf("ECS cluster %q is not active: status %s", clusterCtx.ClusterName, *st)
+	}
+
+	return nil
+}
diff --git a/internal/pkg/object/command/glue/glue.go b/internal/pkg/object/command/glue/glue.go
index de150531..81a10b5d 100644
--- a/internal/pkg/object/command/glue/glue.go
+++ b/internal/pkg/object/command/glue/glue.go
@@ -3,6 +3,9 @@ package glue
 import (
 	"context"
 
+	awssdk "github.com/aws/aws-sdk-go-v2/aws"
+	awsconfig "github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/service/glue"
 	"github.com/patterninc/heimdall/internal/pkg/aws"
 	heimdallContext "github.com/patterninc/heimdall/pkg/context"
 	"github.com/patterninc/heimdall/pkg/object/cluster"
@@ -56,6 +59,19 @@ func (g *commandContext) Execute(ctx context.Context, _ *plugin.Runtime, j *job.
 
 }
 
+// HealthCheck implements the plugin.HealthChecker interface: it requests a single Glue database using the default AWS configuration; the cluster argument is unused.
+func (g *commandContext) HealthCheck(ctx context.Context, _ *cluster.Cluster) error {
+	cfg, err := awsconfig.LoadDefaultConfig(ctx)
+	if err != nil {
+		return err
+	}
+
+	glueClient := glue.NewFromConfig(cfg)
+	maxResults := awssdk.Int32(1)
+	_, err = glueClient.GetDatabases(ctx, &glue.GetDatabasesInput{MaxResults: maxResults})
+	return err
+}
+
 // Cleanup implements the plugin.Handler interface
 func (g *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
 	// TODO: Implement cleanup if needed
diff --git a/internal/pkg/object/command/ping/ping.go b/internal/pkg/object/command/ping/ping.go
index dd7efcc0..3dcd8b00 100644
--- a/internal/pkg/object/command/ping/ping.go
+++ b/internal/pkg/object/command/ping/ping.go
@@ -32,6 +32,11 @@ func (p *commandContext) Execute(ctx context.Context, _ *plugin.Runtime, j *job.
 
 }
 
+// HealthCheck implements the plugin.HealthChecker interface; it performs no probing and always reports success.
+func (p *commandContext) HealthCheck(_ context.Context, _ *cluster.Cluster) error {
+	return nil
+}
+
 // Cleanup implements the plugin.Handler interface
 func (p *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
 	// TODO: Implement cleanup if needed
diff --git a/internal/pkg/object/command/postgres/postgres.go b/internal/pkg/object/command/postgres/postgres.go
index 56dbd87c..034a13c5 100644
--- a/internal/pkg/object/command/postgres/postgres.go
+++ b/internal/pkg/object/command/postgres/postgres.go
@@ -156,7 +156,7 @@ func splitAndTrimQueries(query string) []string {
 	return queries
 }
 
-func (p *postgresCommandContext)Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error{
+func (p *postgresCommandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
 	// implement me
 	return nil
-}
\ No newline at end of file
+}
diff --git a/internal/pkg/object/command/shell/shell.go b/internal/pkg/object/command/shell/shell.go
index 
603fca8c..e6f19ff6 100644
--- a/internal/pkg/object/command/shell/shell.go
+++ b/internal/pkg/object/command/shell/shell.go
@@ -110,6 +110,11 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job.
 
 }
 
+// HealthCheck implements the plugin.HealthChecker interface; it performs no probing and always reports success.
+func (s *commandContext) HealthCheck(_ context.Context, _ *cluster.Cluster) error {
+	return nil
+}
+
 func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
 	// Implement cleanup if needed
 	return nil
diff --git a/internal/pkg/object/command/snowflake/snowflake.go b/internal/pkg/object/command/snowflake/snowflake.go
index 19d87cc0..8d59c760 100644
--- a/internal/pkg/object/command/snowflake/snowflake.go
+++ b/internal/pkg/object/command/snowflake/snowflake.go
@@ -146,6 +146,47 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job.
 
 }
 
+// HealthCheck implements the plugin.HealthChecker interface: it reads the cluster's private key file, opens a JWT-authenticated connection and pings it.
+func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error {
+	clusterCtx := &clusterContext{}
+	if c.Context != nil {
+		if err := c.Context.Unmarshal(clusterCtx); err != nil {
+			return err
+		}
+	}
+
+	privateKeyBytes, err := os.ReadFile(clusterCtx.PrivateKey)
+	if err != nil {
+		return err
+	}
+
+	privateKey, err := parsePrivateKey(privateKeyBytes)
+	if err != nil {
+		return err
+	}
+
+	dsn, err := sf.DSN(&sf.Config{
+		Account:       clusterCtx.Account,
+		User:          clusterCtx.User,
+		Database:      clusterCtx.Database,
+		Warehouse:     clusterCtx.Warehouse,
+		Role:          s.Role,
+		Authenticator: sf.AuthTypeJwt,
+		PrivateKey:    privateKey,
+	})
+	if err != nil {
+		return err
+	}
+
+	db, err := sql.Open(snowflakeDriverName, dsn)
+	if err != nil {
+		return err
+	}
+	defer db.Close()
+
+	return db.PingContext(ctx)
+}
+
 func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
 	// TODO: Implement cleanup if needed
 	return nil
diff --git a/internal/pkg/object/command/spark/spark.go 
b/internal/pkg/object/command/spark/spark.go
index 9f8faae5..3de19ef0 100644
--- a/internal/pkg/object/command/spark/spark.go
+++ b/internal/pkg/object/command/spark/spark.go
@@ -261,6 +261,47 @@ timeoutLoop:
 
 }
 
+// HealthCheck implements the plugin.HealthChecker interface: it lists at most one EMR Containers virtual cluster, assuming the cluster's role first when one is configured.
+func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error {
+	clusterCtx := &clusterContext{}
+	if c.Context != nil {
+		if err := c.Context.Unmarshal(clusterCtx); err != nil {
+			return err
+		}
+	}
+
+	awsConfig, err := config.LoadDefaultConfig(ctx)
+	if err != nil {
+		return err
+	}
+
+	assumeRoleOptions := func(_ *emrcontainers.Options) {}
+	if clusterCtx.RoleARN != nil {
+		stsSvc := sts.NewFromConfig(awsConfig)
+		out, err := stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{
+			RoleArn:         clusterCtx.RoleARN,
+			RoleSessionName: assumeRoleSession,
+		})
+		if err != nil {
+			return err
+		}
+		assumeRoleOptions = func(o *emrcontainers.Options) {
+			o.Credentials = credentials.NewStaticCredentialsProvider(
+				*out.Credentials.AccessKeyId,
+				*out.Credentials.SecretAccessKey,
+				*out.Credentials.SessionToken,
+			)
+		}
+	}
+
+	emrClient := emrcontainers.NewFromConfig(awsConfig, assumeRoleOptions)
+	maxResults := int32(1)
+	_, err = emrClient.ListVirtualClusters(ctx, &emrcontainers.ListVirtualClustersInput{
+		MaxResults: &maxResults,
+	})
+	return err
+}
+
 func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
 	// TODO: Implement cleanup if needed
 	return nil
diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go
index 658f0cbd..ec208acc 100644
--- a/internal/pkg/object/command/sparkeks/sparkeks.go
+++ b/internal/pkg/object/command/sparkeks/sparkeks.go
@@ -192,6 +192,32 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job.
return nil } +// HealthCheck implements the plugin.HealthChecker interface +func (s *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c != nil && c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + if clusterCtx.RoleARN == nil { + return nil + } + + awsCfg, err := awsconfig.LoadDefaultConfig(ctx) + if err != nil { + return err + } + + stsSvc := sts.NewFromConfig(awsCfg) + _, err = stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{ + RoleArn: clusterCtx.RoleARN, + RoleSessionName: aws.String("heimdall-health"), + }) + return err +} + func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // get app name and namespace from job id and command context diff --git a/internal/pkg/object/command/trino/trino.go b/internal/pkg/object/command/trino/trino.go index 88a11b6b..45b24971 100644 --- a/internal/pkg/object/command/trino/trino.go +++ b/internal/pkg/object/command/trino/trino.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "net/http" "time" "github.com/hladush/go-telemetry/pkg/telemetry" @@ -95,6 +96,33 @@ func (t *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job. 
} +// HealthCheck implements the plugin.HealthChecker interface +func (t *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error { + clusterCtx := &clusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterCtx); err != nil { + return err + } + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, clusterCtx.Endpoint+"/v1/info", nil) + if err != nil { + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("trino /v1/info returned status %d", resp.StatusCode) + } + + return nil +} + func (t *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { // TODO: Implement cleanup if needed return nil diff --git a/pkg/object/cluster/cluster.go b/pkg/object/cluster/cluster.go index f4d7aec1..a31fc647 100644 --- a/pkg/object/cluster/cluster.go +++ b/pkg/object/cluster/cluster.go @@ -15,6 +15,7 @@ var ( type Cluster struct { object.Object `yaml:",inline" json:",inline"` Status status.Status `yaml:"status,omitempty" json:"status,omitempty"` + HealthCheck bool `yaml:"health_check,omitempty" json:"health_check,omitempty"` RBACNames []string `yaml:"rbacs,omitempty" json:"rbacs,omitempty"` RBACs []rbac.RBAC `yaml:"-" json:"-"` } diff --git a/pkg/plugin/plugin.go b/pkg/plugin/plugin.go index 2f72af2e..ef7c6910 100644 --- a/pkg/plugin/plugin.go +++ b/pkg/plugin/plugin.go @@ -11,3 +11,7 @@ type Handler interface { Execute(context.Context, *Runtime, *job.Job, *cluster.Cluster) error Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error } + +type HealthChecker interface { + HealthCheck(ctx context.Context, c *cluster.Cluster) error +}