Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configs/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ clusters:
status: active
version: 0.0.1
description: Just a localhost
health_check: true
tags:
- type:localhost
- data:local
134 changes: 134 additions & 0 deletions internal/pkg/heimdall/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package heimdall

import (
"context"
"encoding/json"
"net/http"
"sync"
"time"

"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/object/status"
"github.com/patterninc/heimdall/pkg/plugin"
)

const (
healthCheckTimeout = 30 * time.Second
healthCheckConcurrency = 10
healthStatusOK = `ok`
healthStatusError = `error`
healthStatusUnchecked = `unchecked`
)

type clusterProbe struct {
cluster *cluster.Cluster
handler plugin.Handler
pluginName string
}

type healthCheckResult struct {
ClusterID string `json:"cluster_id"`
ClusterName string `json:"cluster_name"`
Plugin string `json:"plugin"`
Status string `json:"status"`
LatencyMs int64 `json:"latency_ms"`
Error string `json:"error,omitempty"`
}

type healthChecksResponse struct {
Healthy bool `json:"healthy"`
Checks []healthCheckResult `json:"checks"`
}

func (h *Heimdall) healthHandler(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), healthCheckTimeout)
defer cancel()

probes := h.resolveClusterProbes()
results := h.runHealthChecks(ctx, probes)

healthy := true
for _, res := range results {
if res.Status == healthStatusError {
healthy = false
break
}
}

resp := healthChecksResponse{Healthy: healthy, Checks: results}
data, _ := json.Marshal(resp)

Comment thread
alephys26 marked this conversation as resolved.
w.Header().Set(contentTypeKey, contentTypeJSON)
if healthy {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
w.Write(data)
}

func (h *Heimdall) resolveClusterProbes() []*clusterProbe {
var probes []*clusterProbe
for _, cl := range h.Clusters {
if cl.Status != status.Active || !cl.HealthCheck {
continue
}
for _, cmd := range h.Commands {
if cmd.Status != status.Active {
continue
}
if cl.Tags.Contains(cmd.ClusterTags) {
probes = append(probes, &clusterProbe{
cluster: cl,
handler: h.commandHandlers[cmd.ID],
pluginName: cmd.Plugin,
})
break
}
}
}
return probes
}

func (h *Heimdall) runHealthChecks(ctx context.Context, probes []*clusterProbe) []healthCheckResult {
results := make([]healthCheckResult, len(probes))
sem := make(chan struct{}, healthCheckConcurrency)
var wg sync.WaitGroup
for i, probe := range probes {
wg.Add(1)
go func(i int, probe *clusterProbe) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
results[i] = h.checkCluster(ctx, probe)
}(i, probe)
}
wg.Wait()
return results
}

func (h *Heimdall) checkCluster(ctx context.Context, probe *clusterProbe) healthCheckResult {
start := time.Now()
res := healthCheckResult{
ClusterID: probe.cluster.ID,
ClusterName: probe.cluster.Name,
Plugin: probe.pluginName,
}

hc, ok := probe.handler.(plugin.HealthChecker)
if !ok {
res.Status = healthStatusUnchecked
res.LatencyMs = time.Since(start).Milliseconds()
return res
}

err := hc.HealthCheck(ctx, probe.cluster)
res.LatencyMs = time.Since(start).Milliseconds()
if err != nil {
res.Status = healthStatusError
res.Error = err.Error()
} else {
res.Status = healthStatusOK
}
return res
}
1 change: 1 addition & 0 deletions internal/pkg/heimdall/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (h *Heimdall) Start() error {
apiRouter.Methods(methodPUT).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.submitCluster))
apiRouter.Methods(methodGET).PathPrefix(`/cluster/{id}`).HandlerFunc(payloadHandler(h.getCluster))
apiRouter.Methods(methodGET).PathPrefix(`/clusters`).HandlerFunc(payloadHandler(h.getClusters))
apiRouter.Methods(methodGET).PathPrefix(`/health`).HandlerFunc(h.healthHandler)

// metrics endpoint - proxy to metrics service
router.Path(`/metrics`).HandlerFunc(metricsRouteHandler)
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type Janitor struct {
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"`
CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
CleanInterval int `yaml:"clean_interval,omitempty" json:"clean_interval,omitempty"`
db *database.Database
commandHandlers map[string]plugin.Handler
clusters cluster.Clusters
commandHandlers map[string]plugin.Handler
clusters cluster.Clusters
}

func (j *Janitor) Start(d *database.Database, commandHandlers map[string]plugin.Handler, clusters cluster.Clusters) error {
Expand Down
25 changes: 25 additions & 0 deletions internal/pkg/object/command/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,31 @@ func collectResults(rows driver.Rows) (*result.Result, error) {
return out, nil
}

// HealthCheck implements the plugin.HealthChecker interface
func (cmd *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error {
clusterCtx := &clusterContext{}
if c.Context != nil {
if err := c.Context.Unmarshal(clusterCtx); err != nil {
return err
}
}

conn, err := clickhouse.Open(&clickhouse.Options{
Addr: clusterCtx.Endpoints,
Auth: clickhouse.Auth{
Database: clusterCtx.Database,
Username: cmd.Username,
Password: cmd.Password,
},
})
if err != nil {
return err
}
defer conn.Close()

return conn.Ping(ctx)
}

func (cmd *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
// No cleanup needed. CLickhouse queries should always be synchronous.
return nil
Expand Down
49 changes: 24 additions & 25 deletions internal/pkg/object/command/clickhouse/column_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,32 +148,31 @@ func handleDecimal(nullable bool) (any, func() any) {
}
}
func handleTuple(nullable bool) (any, func() any) {
if nullable {
var p *any
return &p, func() any {
if p == nil || *p == nil {
return nil
}
return *p
}
}
var v any
return &v, func() any { return v }
if nullable {
var p *any
return &p, func() any {
if p == nil || *p == nil {
return nil
}
return *p
}
}
var v any
return &v, func() any { return v }
}


func handleArray(nullable bool) (any, func() any) {
if nullable {
var p *any
return &p, func() any {
if p == nil || *p == nil {
return nil
}
return *p
}
}
var v any
return &v, func() any { return v }
if nullable {
var p *any
return &p, func() any {
if p == nil || *p == nil {
return nil
}
return *p
}
}
var v any
return &v, func() any { return v }
}

func handleDefault(nullable bool) (any, func() any) {
Expand Down Expand Up @@ -207,8 +206,8 @@ func unwrapCHType(t string) (base string, nullable bool) {
return "Array", nullable
}
if strings.HasPrefix(s, "Tuple(") {
return "Tuple", nullable
}
return "Tuple", nullable
}

// Decimal(N,S) normalize to "Decimal"
if isDecimal(s) {
Expand Down
39 changes: 39 additions & 0 deletions internal/pkg/object/command/dynamo/dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,45 @@ func (d *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job.

}

// HealthCheck implements the plugin.HealthChecker interface
func (d *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error {
clusterCtx := &clusterContext{}
if c.Context != nil {
if err := c.Context.Unmarshal(clusterCtx); err != nil {
return err
}
}

awsConfig, err := config.LoadDefaultConfig(ctx)
if err != nil {
return err
}

assumeRoleOptions := func(_ *dynamodb.Options) {}
if clusterCtx.RoleARN != nil {
stsSvc := sts.NewFromConfig(awsConfig)
out, err := stsSvc.AssumeRole(ctx, &sts.AssumeRoleInput{
RoleArn: clusterCtx.RoleARN,
RoleSessionName: assumeRoleSession,
})
if err != nil {
return err
}
assumeRoleOptions = func(o *dynamodb.Options) {
o.Credentials = credentials.NewStaticCredentialsProvider(
*out.Credentials.AccessKeyId,
*out.Credentials.SecretAccessKey,
*out.Credentials.SessionToken,
)
Comment on lines +171 to +175
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium SAST Finding

Hardcoded AWS Credentials in Go Application

More Details

Embedding static AWS credentials directly into a Go application using the credentials.NewStaticCredentialsProvider function poses a significant security risk. This practice exposes the AWS access keys and secret keys in plaintext within the application code, making them vulnerable to theft or misuse. If an attacker gains access to the application code or the compiled binary, they can extract the hardcoded credentials and potentially gain unauthorized access to AWS resources, leading to data breaches, financial losses, or other malicious activities.

Hardcoded credentials should never be used in production environments. Instead, applications should retrieve credentials securely from trusted sources, such as environment variables, secure key management services, or temporary credentials obtained through AWS Identity and Access Management (IAM) roles. Failing to properly manage and protect AWS credentials can lead to severe consequences, including data exfiltration, resource hijacking, and compliance violations.

Attribute Value
Impact Medium
Likelihood Medium

Remediation

Hardcoding AWS credentials into an application poses a significant security risk. If the application's code is compromised or accidentally exposed, the hardcoded credentials can be easily extracted and misused by attackers to gain unauthorized access to AWS resources, potentially leading to data breaches, financial losses, and other severe consequences.

To fix this issue securely, applications should retrieve AWS credentials from secure sources at runtime, such as environment variables, AWS credential files, or AWS credential providers. This approach ensures that credentials are not embedded in the application's code and can be easily rotated or revoked if needed.

Code examples:

// VULNERABLE CODE - Hardcoded AWS credentials
import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
)

creds := credentials.NewStaticCredentialsProvider(
"AKIAIOSFODNN7EXAMPLE",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"",
)

// SECURE CODE - Using AWS credential provider
import (
"github.com/aws/aws-sdk-go-v2/config"
)

cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
// Handle error
}

// AWS credentials are retrieved securely from the environment or other sources

Additional recommendations:

  • Follow the AWS best practices for managing AWS access keys and secret access keys.
  • Implement least privilege access principles by granting only the necessary permissions to AWS resources.
  • Regularly rotate AWS credentials and revoke unused or compromised credentials.
  • Consider using temporary security credentials (AWS STS) for enhanced security and auditing capabilities.
  • Adhere to relevant security standards and guidelines, such as the AWS Security Best Practices and the OWASP Application Security Verification Standard (ASVS).

Rule ID: WS-GO-00051

}
}

svc := dynamodb.NewFromConfig(awsConfig, assumeRoleOptions)
maxResults := int32(1)
_, err = svc.ListTables(ctx, &dynamodb.ListTablesInput{Limit: &maxResults})
return err
}

func (d *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
// TODO: Implement cleanup if needed
return nil
Expand Down
29 changes: 29 additions & 0 deletions internal/pkg/object/command/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,3 +852,32 @@ func isThrottlingError(err error) bool {
}
return false
}

// HealthCheck implements the plugin.HealthChecker interface
func (e *commandContext) HealthCheck(ctx context.Context, c *cluster.Cluster) error {
clusterCtx := &clusterContext{}
if c.Context != nil {
if err := c.Context.Unmarshal(clusterCtx); err != nil {
return err
}
}

cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return err
}

ecsClient := ecs.NewFromConfig(cfg)
out, err := ecsClient.DescribeClusters(ctx, &ecs.DescribeClustersInput{
Clusters: []string{clusterCtx.ClusterName},
})
if err != nil {
return err
}

if len(out.Clusters) == 0 {
return fmt.Errorf("ECS cluster %q not found", clusterCtx.ClusterName)
}

return nil
}
16 changes: 16 additions & 0 deletions internal/pkg/object/command/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package glue
import (
"context"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/glue"
"github.com/patterninc/heimdall/internal/pkg/aws"
heimdallContext "github.com/patterninc/heimdall/pkg/context"
"github.com/patterninc/heimdall/pkg/object/cluster"
Expand Down Expand Up @@ -56,6 +59,19 @@ func (g *commandContext) Execute(ctx context.Context, _ *plugin.Runtime, j *job.

}

// HealthCheck implements the plugin.HealthChecker interface
func (g *commandContext) HealthCheck(ctx context.Context, _ *cluster.Cluster) error {
cfg, err := awsconfig.LoadDefaultConfig(ctx)
if err != nil {
return err
}

glueClient := glue.NewFromConfig(cfg)
maxResults := awssdk.Int32(1)
_, err = glueClient.GetDatabases(ctx, &glue.GetDatabasesInput{MaxResults: maxResults})
return err
}

// Cleanup implements the plugin.Handler interface
func (g *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
// TODO: Implement cleanup if needed
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/object/command/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func (p *commandContext) Execute(ctx context.Context, _ *plugin.Runtime, j *job.

}

// HealthCheck implements the plugin.HealthChecker interface
func (p *commandContext) HealthCheck(_ context.Context, _ *cluster.Cluster) error {
return nil
}

// Cleanup implements the plugin.Handler interface
func (p *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
// TODO: Implement cleanup if needed
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/object/command/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func splitAndTrimQueries(query string) []string {
return queries
}

func (p *postgresCommandContext)Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error{
func (p *postgresCommandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {
// implement me
return nil
}
}
Loading
Loading