Skip to content

Commit dbac955

Browse files
committed
feat: add mcpulse command
1 parent deb1679 commit dbac955

File tree

1 file changed

+222
-0
lines changed

1 file changed

+222
-0
lines changed

cmd/mcpulse/main.go

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
"flag"
8+
"fmt"
9+
"os"
10+
"os/signal"
11+
"path/filepath"
12+
"syscall"
13+
"time"
14+
15+
"github.com/golang-migrate/migrate/v4"
16+
githubPgx "github.com/golang-migrate/migrate/v4/database/pgx/v5"
17+
_ "github.com/golang-migrate/migrate/v4/source/file"
18+
_ "github.com/jackc/pgx/v5/stdlib"
19+
"github.com/sirrobot01/mcpulse/internal/auth"
20+
"github.com/sirrobot01/mcpulse/internal/config"
21+
"github.com/sirrobot01/mcpulse/internal/database"
22+
"github.com/sirrobot01/mcpulse/internal/ingestion"
23+
"github.com/sirrobot01/mcpulse/internal/metrics"
24+
"github.com/sirrobot01/mcpulse/internal/server"
25+
grpcserver "github.com/sirrobot01/mcpulse/internal/server/grpc"
26+
"go.uber.org/zap"
27+
"google.golang.org/grpc"
28+
)
29+
30+
func main() {
31+
configPath := flag.String("config", "", "path to config file")
32+
flag.Parse()
33+
34+
cfg, err := config.Load(*configPath)
35+
if err != nil {
36+
_, _ = fmt.Fprintf(os.Stderr, "failed to load config: %v\n", err)
37+
os.Exit(1)
38+
}
39+
40+
if err := cfg.Validate(); err != nil {
41+
_, _ = fmt.Fprintf(os.Stderr, "invalid config: %v\n", err)
42+
os.Exit(1)
43+
}
44+
45+
logger, err := initLogger(cfg.Logging)
46+
if err != nil {
47+
_, _ = fmt.Fprintf(os.Stderr, "failed to initialize logger: %v\n", err)
48+
os.Exit(1)
49+
}
50+
defer func(logger *zap.Logger) {
51+
_ = logger.Sync()
52+
}(logger)
53+
54+
logger.Info("starting MCPulse server",
55+
zap.String("http_addr", fmt.Sprintf(":%d", cfg.Server.Port)),
56+
zap.String("grpc_addr", fmt.Sprintf(":%d", cfg.Server.GRPCPort)),
57+
)
58+
59+
promMetrics := metrics.NewMetrics()
60+
61+
ctx := context.Background()
62+
dbConfig := database.Config{
63+
Host: cfg.Database.Host,
64+
Port: cfg.Database.Port,
65+
Database: cfg.Database.Database,
66+
Username: cfg.Database.Username,
67+
Password: cfg.Database.Password,
68+
MaxConnections: cfg.Database.MaxConnections,
69+
MaxIdleConnections: cfg.Database.MaxIdleConnections,
70+
MaxLifetime: cfg.Database.MaxLifetime,
71+
}
72+
73+
if err := runMigrations(dbConfig, cfg.Database.MigrationsPath, logger); err != nil {
74+
logger.Fatal("failed to run database migrations", zap.Error(err))
75+
}
76+
77+
db, err := database.New(ctx, dbConfig)
78+
if err != nil {
79+
logger.Fatal("failed to connect to database", zap.Error(err))
80+
}
81+
defer db.Close()
82+
83+
logger.Info("connected to database")
84+
85+
repo := database.NewRepository(db)
86+
87+
authRepo := auth.NewRepository(db)
88+
jwtService := auth.NewJWTService(
89+
cfg.Auth.JWTSecret,
90+
cfg.Auth.AccessTokenTTL,
91+
cfg.Auth.RefreshTokenTTL,
92+
)
93+
authService := auth.NewService(authRepo, jwtService)
94+
authMiddleware := auth.NewMiddleware(jwtService, authRepo)
95+
96+
logger.Info("authentication enabled",
97+
zap.Duration("access_token_ttl", cfg.Auth.AccessTokenTTL),
98+
zap.Duration("refresh_token_ttl", cfg.Auth.RefreshTokenTTL),
99+
)
100+
101+
ingestService := ingestion.NewService(cfg.Ingestion, cfg.Privacy, repo, promMetrics, logger)
102+
103+
srv := server.NewServer(cfg, repo, ingestService, promMetrics, logger, authService, authMiddleware)
104+
gSrv := grpcserver.New(cfg, repo, ingestService, promMetrics, logger, authService)
105+
106+
serverErrors := make(chan error, 1)
107+
go func() {
108+
serverErrors <- srv.Start()
109+
}()
110+
111+
grpcErrors := make(chan error, 1)
112+
go func() {
113+
if err := gSrv.Start(); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
114+
grpcErrors <- err
115+
}
116+
close(grpcErrors)
117+
}()
118+
119+
shutdown := make(chan os.Signal, 1)
120+
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
121+
122+
select {
123+
case err := <-serverErrors:
124+
logger.Fatal("server error", zap.Error(err))
125+
case err := <-grpcErrors:
126+
if err != nil {
127+
logger.Fatal("gRPC server error", zap.Error(err))
128+
}
129+
case sig := <-shutdown:
130+
logger.Info("received shutdown signal", zap.String("signal", sig.String()))
131+
132+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
133+
defer cancel()
134+
135+
gSrv.Stop()
136+
137+
if err := srv.Stop(ctx); err != nil {
138+
logger.Error("failed to gracefully shutdown server", zap.Error(err))
139+
}
140+
logger.Info("shutdown complete")
141+
}
142+
}
143+
144+
func runMigrations(cfg database.Config, migrationsPath string, logger *zap.Logger) error {
145+
if migrationsPath == "" {
146+
migrationsPath = "migrations"
147+
}
148+
149+
absPath, err := filepath.Abs(migrationsPath)
150+
if err != nil {
151+
return fmt.Errorf("resolve migrations path: %w", err)
152+
}
153+
154+
if info, statErr := os.Stat(absPath); statErr != nil {
155+
if os.IsNotExist(statErr) {
156+
return fmt.Errorf("migrations path does not exist: %s", absPath)
157+
}
158+
return fmt.Errorf("stat migrations path: %w", statErr)
159+
} else if !info.IsDir() {
160+
return fmt.Errorf("migrations path is not a directory: %s", absPath)
161+
}
162+
163+
sourceURL := fmt.Sprintf("file://%s", absPath)
164+
165+
db, err := sql.Open("pgx/v5", database.ConnectionString(cfg))
166+
if err != nil {
167+
return fmt.Errorf("open database for migrations: %w", err)
168+
}
169+
defer func(db *sql.DB) {
170+
err := db.Close()
171+
if err != nil {
172+
logger.Warn("failed to close database after migrations", zap.Error(err))
173+
}
174+
}(db)
175+
176+
driver, err := githubPgx.WithInstance(db, &githubPgx.Config{})
177+
if err != nil {
178+
return fmt.Errorf("create migration driver: %w", err)
179+
}
180+
181+
m, err := migrate.NewWithDatabaseInstance(sourceURL, cfg.Database, driver)
182+
if err != nil {
183+
return fmt.Errorf("create migrator: %w", err)
184+
}
185+
defer func() {
186+
if sourceErr, dbErr := m.Close(); sourceErr != nil || dbErr != nil {
187+
logger.Warn("migration close error", zap.Error(errors.Join(sourceErr, dbErr)))
188+
}
189+
}()
190+
191+
if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
192+
return fmt.Errorf("run migrations: %w", err)
193+
}
194+
195+
logger.Info("database migrations applied", zap.String("path", absPath))
196+
return nil
197+
}
198+
199+
func initLogger(cfg config.LoggingConfig) (*zap.Logger, error) {
200+
var zapConfig zap.Config
201+
202+
if cfg.Format == "json" {
203+
zapConfig = zap.NewProductionConfig()
204+
} else {
205+
zapConfig = zap.NewDevelopmentConfig()
206+
}
207+
208+
switch cfg.Level {
209+
case "debug":
210+
zapConfig.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
211+
case "info":
212+
zapConfig.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
213+
case "warn":
214+
zapConfig.Level = zap.NewAtomicLevelAt(zap.WarnLevel)
215+
case "error":
216+
zapConfig.Level = zap.NewAtomicLevelAt(zap.ErrorLevel)
217+
default:
218+
zapConfig.Level = zap.NewAtomicLevelAt(zap.InfoLevel)
219+
}
220+
221+
return zapConfig.Build()
222+
}

0 commit comments

Comments
 (0)