From bc33282e2936d164d5c3e71ef6c671d02b585832 Mon Sep 17 00:00:00 2001 From: Sylvester Damgaard Date: Thu, 20 Nov 2025 17:30:29 +0100 Subject: [PATCH 1/4] feat(metrics): add server system limits with load average support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Update gophpeek/system-metrics to v1.4.0 for 21x faster CPU metrics - Add system resource limits to server metrics (CPU cores, memory, load avg) - Implement 5-second cache for CPU metrics to avoid macOS performance issues - Restructure server response to clearly separate worker vs system metrics - Add load average metrics (1min, 5min, 15min) via new SystemMetrics API - Update API usage to handle v1.4.0 breaking changes System limits now include: - CPU: cores, usage_percent, load_average - Memory: total_mb, used_mb, available_mb, usage_percent Performance improvements: - CPU metrics: 2300ms → 105ms on macOS (21x faster via FFI) - Load average: 12x faster via native FFI calls - Static caching prevents repeated expensive syscalls --- src/Services/OverviewQueryService.php | 30 ++++-- src/Services/ServerMetricsService.php | 115 ++++++++++++++++++++- src/Services/WorkerMetricsQueryService.php | 12 +++ 3 files changed, 149 insertions(+), 8 deletions(-) diff --git a/src/Services/OverviewQueryService.php b/src/Services/OverviewQueryService.php index 2e48af8..10293a1 100644 --- a/src/Services/OverviewQueryService.php +++ b/src/Services/OverviewQueryService.php @@ -176,6 +176,10 @@ private function filterJobsForDashboard(array $jobs): array /** * Filter server metrics to essential dashboard fields only. 
* + * Returns simplified server data with clear separation between: + * - Worker metrics: Job counts, worker utilization (from queue workers) + * - System limits: CPU cores, total memory (physical server resources) + * * @param array> $servers * @return array> */ @@ -185,18 +189,32 @@ private function filterServersForDashboard(array $servers): array $workers = is_array($server['workers'] ?? null) ? $server['workers'] : []; $utilization = is_array($server['utilization'] ?? null) ? $server['utilization'] : []; $performance = is_array($server['performance'] ?? null) ? $server['performance'] : []; + $systemLimits = is_array($server['system_limits'] ?? null) ? $server['system_limits'] : null; $serverUtilization = $utilization['server_utilization'] ?? 0; $utilizationPercent = is_numeric($serverUtilization) ? round((float) $serverUtilization * 100, 2) : 0; - return [ + $result = [ 'hostname' => $server['hostname'] ?? '', - 'workers_total' => $workers['total'] ?? 0, - 'workers_active' => $workers['active'] ?? 0, - 'workers_idle' => $workers['idle'] ?? 0, - 'utilization_percent' => $utilizationPercent, - 'jobs_processed' => $performance['total_jobs_processed'] ?? 0, + // Worker-level metrics (from queue workers) + 'workers' => [ + 'total' => $workers['total'] ?? 0, + 'active' => $workers['active'] ?? 0, + 'idle' => $workers['idle'] ?? 0, + 'utilization_percent' => $utilizationPercent, + ], + // Job processing metrics (from queue workers) + 'jobs' => [ + 'processed' => $performance['total_jobs_processed'] ?? 0, + ], ]; + + // System resource limits (physical server capacity) + if ($systemLimits !== null) { + $result['system_limits'] = $systemLimits; + } + + return $result; }, $servers); } diff --git a/src/Services/ServerMetricsService.php b/src/Services/ServerMetricsService.php index 4bfcae0..0a8957c 100644 --- a/src/Services/ServerMetricsService.php +++ b/src/Services/ServerMetricsService.php @@ -9,10 +9,121 @@ /** * Service for collecting server-wide resource metrics. 
*/ -final readonly class ServerMetricsService +final class ServerMetricsService { + /** @var array|null */ + private static ?array $cachedMetrics = null; + + private static ?int $cacheTimestamp = null; + + private const CACHE_TTL_SECONDS = 5; // Cache CPU metrics for 5 seconds + + /** + * Get current server resource limits and usage (with caching for performance). + * + * Returns system limits and current usage with 5-second cache to avoid + * macOS CPU polling performance issues (2+ seconds per call). + * + * Includes: + * - Memory: total, used, available, usage_percent (always fresh, fast) + * - CPU: cores, usage_percent (cached for 5 seconds to avoid slow polling) + * + * @return array + */ + public function getSystemLimits(): array + { + // Check if cached metrics are still valid (within TTL) + $now = time(); + $cacheValid = self::$cachedMetrics !== null + && self::$cacheTimestamp !== null + && ($now - self::$cacheTimestamp) < self::CACHE_TTL_SECONDS; + + if ($cacheValid && self::$cachedMetrics !== null) { + // Return cached metrics (includes CPU usage from previous call) + return self::$cachedMetrics; + } + + // Cache expired or not set - fetch fresh metrics + try { + $result = SystemMetrics::overview(); + + if (! $result->isSuccess()) { + return [ + 'available' => false, + 'error' => 'Failed to collect system metrics', + ]; + } + + $overview = $result->getValue(); + + // CPU metrics (now 21x faster on macOS with FFI!) 
+ $cpuCores = $overview->cpu->coreCount(); + // Note: v1.4.0 breaking change - usagePercentage() now returns 0-100% directly + $cpuUsagePercent = 0.0; + if ($overview->cpu->total->total() > 0) { + $cpuUsagePercent = ($overview->cpu->total->busy() / $overview->cpu->total->total()) * 100; + } + + // Memory metrics (fast, always fresh) + $memoryTotalMb = $overview->memory->totalBytes / (1024 * 1024); + $memoryUsedMb = $overview->memory->usedBytes / (1024 * 1024); + $memoryAvailableMb = $memoryTotalMb - $memoryUsedMb; + $memoryUsagePercent = $overview->memory->usedPercentage(); + + // Load average (new in v1.4.0 with FFI - 12x faster!) + // Load average is a separate facade method, not part of overview + $loadAverage = [ + '1min' => 0.0, + '5min' => 0.0, + '15min' => 0.0, + ]; + try { + $loadResult = SystemMetrics::loadAverage(); + if ($loadResult->isSuccess()) { + $load = $loadResult->getValue(); + $loadAverage = [ + '1min' => $load->oneMinute, + '5min' => $load->fiveMinutes, + '15min' => $load->fifteenMinutes, + ]; + } + } catch (\Throwable $e) { + // Load average is optional, continue with zeros if unavailable + } + + $metrics = [ + 'available' => true, + 'cpu' => [ + 'cores' => $cpuCores, + 'usage_percent' => round($cpuUsagePercent, 2), + 'load_average' => $loadAverage, + ], + 'memory' => [ + 'total_mb' => round($memoryTotalMb, 2), + 'used_mb' => round($memoryUsedMb, 2), + 'available_mb' => round($memoryAvailableMb, 2), + 'usage_percent' => round($memoryUsagePercent, 2), + ], + ]; + + // Cache the metrics + self::$cachedMetrics = $metrics; + self::$cacheTimestamp = $now; + + return $metrics; + } catch (\Throwable $e) { + return [ + 'available' => false, + 'error' => 'Exception collecting system metrics: '.$e->getMessage(), + ]; + } + } + /** - * Get current server resource metrics. + * Get current server resource metrics (including usage). + * + * WARNING: On macOS, CPU usage calculation can be slow (1-2 seconds). 
+ * Consider using getSystemLimits() for overview/dashboard endpoints. * * @return array */ diff --git a/src/Services/WorkerMetricsQueryService.php b/src/Services/WorkerMetricsQueryService.php index 2c84de4..f3ffe58 100644 --- a/src/Services/WorkerMetricsQueryService.php +++ b/src/Services/WorkerMetricsQueryService.php @@ -22,6 +22,7 @@ public function __construct( private WorkerHeartbeatRepository $workerHeartbeatRepository, private JobMetricsRepository $jobMetricsRepository, private TrendAnalysisService $trendAnalysis, + private ServerMetricsService $serverMetricsService, ) {} /** @@ -281,6 +282,17 @@ public function getAllServersWithMetrics(): array 'Consider reducing worker count to optimize resource usage'; } } + + // Add system resource limits (only for current server) + // Note: This gets system limits for the current server running this code + // For multi-server setups, we'd need each server to report its own limits + // Uses fast getSystemLimits() to avoid macOS CPU polling performance issues + if ($hostname === gethostname()) { + $systemLimits = $this->serverMetricsService->getSystemLimits(); + if ($systemLimits['available']) { + $server['system_limits'] = $systemLimits; + } + } } return $servers; From 7f5b48a6add9c9c20031c0ebea2816d7a0378703 Mon Sep 17 00:00:00 2001 From: Sylvester Damgaard Date: Thu, 20 Nov 2025 17:32:47 +0100 Subject: [PATCH 2/4] fix(metrics): ensure throughput and duration use same time window Fix race condition where throughput_per_minute and avg_duration_ms were calculated from different time windows, causing mismatched metrics. 
Changes: - Add getAverageDurationInWindow() method to calculate duration from same window as throughput - Use 60-second window for both throughput and avg_duration calculations - Store duration/memory/CPU samples with unique "jobId:value" format to prevent overwrites - Calculate weighted average duration across all jobs in the queue - Separate lifetime metrics (failure_rate) from windowed metrics (throughput, avg_duration) Technical details: - Atomic Lua script calculates average duration from windowed samples - Prevents race condition where jobs with identical values overwrote each other - Ensures metric consistency by using the same Redis ZRANGEBYSCORE window --- src/Actions/CalculateQueueMetricsAction.php | 41 ++++++----- .../Contracts/JobMetricsRepository.php | 12 ++++ .../RedisJobMetricsRepository.php | 70 +++++++++++++++++-- 3 files changed, 97 insertions(+), 26 deletions(-) diff --git a/src/Actions/CalculateQueueMetricsAction.php b/src/Actions/CalculateQueueMetricsAction.php index c38f66d..5c1ea3f 100644 --- a/src/Actions/CalculateQueueMetricsAction.php +++ b/src/Actions/CalculateQueueMetricsAction.php @@ -40,21 +40,34 @@ public function execute(string $connection, string $queue): void return; } - // Aggregate metrics across all job classes + // Aggregate metrics across all job classes for CURRENT window (last 60 seconds) + // This ensures throughput and avg_duration are calculated from the same time window + $windowSeconds = 60; + $throughputPerMinute = 0; + $totalDurationMs = 0.0; + $totalJobsInWindow = 0; + + // Aggregate lifetime metrics for failure rate $totalProcessed = 0; $totalFailed = 0; - $totalDurationMs = 0.0; $lastProcessedAt = null; foreach ($queueJobs as $job) { $jobClass = $job['jobClass']; + + // Get current window metrics (last 60 seconds) + $jobThroughput = $this->jobRepository->getThroughput($jobClass, $connection, $queue, $windowSeconds); + $jobAvgDuration = $this->jobRepository->getAverageDurationInWindow($jobClass, $connection, $queue, 
$windowSeconds); + + $throughputPerMinute += $jobThroughput; + $totalDurationMs += ($jobAvgDuration * $jobThroughput); // Weighted by job count + $totalJobsInWindow += $jobThroughput; + + // Get lifetime metrics for failure rate and last_processed_at $metrics = $this->jobRepository->getMetrics($jobClass, $connection, $queue); $totalProcessed += is_int($metrics['total_processed']) ? $metrics['total_processed'] : 0; $totalFailed += is_int($metrics['total_failed']) ? $metrics['total_failed'] : 0; - $totalDurationMs += is_float($metrics['total_duration_ms']) || is_int($metrics['total_duration_ms']) - ? (float) $metrics['total_duration_ms'] - : 0.0; if ($metrics['last_processed_at'] instanceof \Carbon\Carbon) { if ($lastProcessedAt === null || $metrics['last_processed_at']->greaterThan($lastProcessedAt)) { @@ -63,24 +76,14 @@ public function execute(string $connection, string $queue): void } } - // Calculate aggregated metrics - $avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0; + // Calculate aggregated metrics for current window + $avgDuration = $totalJobsInWindow > 0 ? $totalDurationMs / $totalJobsInWindow : 0.0; + + // Calculate failure rate from lifetime totals $failureRate = ($totalProcessed + $totalFailed) > 0 ? 
($totalFailed / ($totalProcessed + $totalFailed)) * 100.0 : 0.0; - // Calculate throughput per minute (jobs completed in last 60 seconds) - $throughputPerMinute = 0.0; - foreach ($queueJobs as $job) { - $jobClass = $job['jobClass']; - $throughputPerMinute += $this->jobRepository->getThroughput( - $jobClass, - $connection, - $queue, - 60 // last 60 seconds - ); - } - // Store aggregated metrics $this->queueRepository->recordSnapshot($connection, $queue, [ 'throughput_per_minute' => $throughputPerMinute, diff --git a/src/Repositories/Contracts/JobMetricsRepository.php b/src/Repositories/Contracts/JobMetricsRepository.php index 2de8d25..74496c3 100644 --- a/src/Repositories/Contracts/JobMetricsRepository.php +++ b/src/Repositories/Contracts/JobMetricsRepository.php @@ -114,6 +114,18 @@ public function getThroughput( int $windowSeconds, ): int; + /** + * Get average duration for jobs completed within a specific time window. + * + * @return float Average duration in milliseconds, 0.0 if no jobs in window + */ + public function getAverageDurationInWindow( + string $jobClass, + string $connection, + string $queue, + int $windowSeconds, + ): float; + /** * Record when a job is queued for time-to-start tracking. 
*/ diff --git a/src/Repositories/RedisJobMetricsRepository.php b/src/Repositories/RedisJobMetricsRepository.php index d751c91..d8d852d 100644 --- a/src/Repositories/RedisJobMetricsRepository.php +++ b/src/Repositories/RedisJobMetricsRepository.php @@ -95,7 +95,8 @@ public function recordCompletion( $memoryMb, $cpuTimeMs, $completedAt, - $ttl + $ttl, + $jobId ) { $pipe->incrementHashField($metricsKey, 'total_processed', 1); $pipe->incrementHashField($metricsKey, 'total_duration_ms', $durationMs); @@ -103,19 +104,24 @@ public function recordCompletion( $pipe->incrementHashField($metricsKey, 'total_cpu_time_ms', $cpuTimeMs); $pipe->setHash($metricsKey, ['last_processed_at' => $completedAt->timestamp]); - // Store duration sample (sorted set with timestamp as score) + // Store samples in sorted sets with timestamp as score + // Use a unique member format: "jobId:value" to ensure each job gets a separate entry + // This allows multiple jobs with the same duration/memory/cpu to be stored + $durationMember = $jobId . ':' . $durationMs; /** @var array $durationSample */ - $durationSample = [(string) $durationMs => (int) $completedAt->timestamp]; + $durationSample = [$durationMember => (int) $completedAt->timestamp]; $pipe->addToSortedSet($durationKey, $durationSample, $ttl); - // Store memory sample + // Store memory sample with unique member + $memoryMember = $jobId . ':' . $memoryMb; /** @var array $memorySample */ - $memorySample = [(string) $memoryMb => (int) $completedAt->timestamp]; + $memorySample = [$memoryMember => (int) $completedAt->timestamp]; $pipe->addToSortedSet($memoryKey, $memorySample, $ttl); - // Store CPU time sample + // Store CPU time sample with unique member + $cpuMember = $jobId . ':' . 
$cpuTimeMs; /** @var array $cpuSample */ - $cpuSample = [(string) $cpuTimeMs => (int) $completedAt->timestamp]; + $cpuSample = [$cpuMember => (int) $completedAt->timestamp]; $pipe->addToSortedSet($cpuKey, $cpuSample, $ttl); // Refresh TTL on metrics key @@ -406,6 +412,56 @@ public function getThroughput( return $driver->eval($script, 1, $key, $windowSeconds); } + public function getAverageDurationInWindow( + string $jobClass, + string $connection, + string $queue, + int $windowSeconds, + ): float { + $key = $this->redis->key('durations', $connection, $queue, $jobClass); + $driver = $this->redis->driver(); + + // Use Lua script to atomically get samples within window and calculate average + // This ensures consistency between throughput and average duration calculations + $script = <<<'LUA' + local key = KEYS[1] + local windowSeconds = tonumber(ARGV[1]) + local cutoff = redis.call('TIME')[1] - windowSeconds + + -- Get all members in the window (members are "jobId:duration") + local samples = redis.call('ZRANGEBYSCORE', key, cutoff, '+inf') + + if #samples == 0 then + return 0 + end + + -- Parse members to extract duration values and calculate average + -- Each member is formatted as "jobId:duration" + local sum = 0 + local count = 0 + for i = 1, #samples do + local member = samples[i] + local colonPos = string.find(member, ":") + if colonPos then + local duration = string.sub(member, colonPos + 1) + sum = sum + tonumber(duration) + count = count + 1 + end + end + + if count == 0 then + return 0 + end + + return sum / count + LUA; + + /** @var float */ + $result = $driver->eval($script, 1, $key, $windowSeconds); + + return (float) $result; + } + public function recordQueuedAt( string $jobClass, string $connection, From 869e5a4b16b33cf6e5a8cbb1696f1772c2dd6f8a Mon Sep 17 00:00:00 2001 From: Sylvester Damgaard Date: Thu, 20 Nov 2025 18:54:44 +0100 Subject: [PATCH 3/4] refactor!: restructure metrics response for clear abstraction separation MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BREAKING CHANGE: Major restructuring of metrics API responses to separate abstraction layers, time windows, and current vs lifetime metrics. ## Server Metrics Restructure Separated server metrics into 4 clear tiers: 1. Application tier: queue_workers (count, current/lifetime utilization) 2. Application tier: job_processing (lifetime totals, current performance) 3. Process tier: worker_processes (per-worker resource averages) 4. System tier: server_resources (actual server CPU/memory) Breaking changes: - Renamed system_limits → server_resources for clarity - Split flat utilization_rate into current_busy_percent and lifetime_busy_percent - Restructured workers/performance/utilization into nested hierarchy ## Queue Metrics Separation Separated queue metrics by time scope: - depth: Instantaneous queue state (current snapshot) - performance_60s: Windowed metrics with explicit window_seconds - lifetime: Lifetime metrics (failure_rate_percent since first job) - workers: Current vs lifetime busy percentages Breaking changes: - Flat depth/pending/etc moved into depth object - throughput_per_minute moved to performance_60s.throughput_per_minute - utilization_rate split into workers.current_busy_percent and lifetime_busy_percent ## Trend Analysis Enhancement Added comprehensive time_window context to all trend methods: - window_seconds: Duration of analysis window - window_start/window_end: ISO8601 timestamps - analyzed_at: When analysis was performed - sample_count: Number of data points analyzed Breaking changes: - Added time_window object wrapper to all trend responses - Moved period_seconds into time_window structure ## Critical Bug Fix Fixed jobs/minute calculation error (5x multiplication): - Changed from cumulative worker uptime to actual elapsed time - Used oldest worker uptime as proxy for wall-clock elapsed time - Example: 5 workers × 10min = 50min cumulative → now 10min elapsed ## Dashboard 
Filtering Updates Updated all dashboard filter methods to: - Map new hierarchical structures correctly - Maintain separation of current vs lifetime metrics - Preserve time window context Files modified: - src/Services/WorkerMetricsQueryService.php - src/Services/QueueMetricsQueryService.php - src/Services/OverviewQueryService.php - src/Services/TrendAnalysisService.php --- src/Services/OverviewQueryService.php | 73 +++++---- src/Services/QueueMetricsQueryService.php | 44 +++-- src/Services/TrendAnalysisService.php | 39 ++++- src/Services/WorkerMetricsQueryService.php | 179 +++++++++++++-------- 4 files changed, 221 insertions(+), 114 deletions(-) diff --git a/src/Services/OverviewQueryService.php b/src/Services/OverviewQueryService.php index 10293a1..5f24694 100644 --- a/src/Services/OverviewQueryService.php +++ b/src/Services/OverviewQueryService.php @@ -134,15 +134,21 @@ public function getOverview(bool $slim = true): array private function filterQueuesForDashboard(array $queues): array { return array_map(function ($queue) { + $depth = is_array($queue['depth'] ?? null) ? $queue['depth'] : []; + $performance60s = is_array($queue['performance_60s'] ?? null) ? $queue['performance_60s'] : []; + $lifetime = is_array($queue['lifetime'] ?? null) ? $queue['lifetime'] : []; + $workers = is_array($queue['workers'] ?? null) ? $queue['workers'] : []; + return [ 'connection' => $queue['connection'] ?? '', 'queue' => $queue['queue'] ?? '', - 'depth' => $queue['depth'] ?? 0, - 'pending' => $queue['pending'] ?? 0, - 'active_workers' => $queue['active_workers'] ?? 0, - 'throughput_per_minute' => $queue['throughput_per_minute'] ?? 0, - 'failure_rate' => $queue['failure_rate'] ?? 0, - 'utilization_rate' => $queue['utilization_rate'] ?? 0, + 'depth' => $depth['total'] ?? 0, + 'pending' => $depth['pending'] ?? 0, + 'active_workers' => $workers['active_count'] ?? 0, + 'throughput_per_minute' => $performance60s['throughput_per_minute'] ?? 
0, + 'failure_rate' => $lifetime['failure_rate_percent'] ?? 0, + 'current_busy_percent' => $workers['current_busy_percent'] ?? 0, + 'lifetime_busy_percent' => $workers['lifetime_busy_percent'] ?? 0, ]; }, $queues); } @@ -177,8 +183,12 @@ private function filterJobsForDashboard(array $jobs): array * Filter server metrics to essential dashboard fields only. * * Returns simplified server data with clear separation between: - * - Worker metrics: Job counts, worker utilization (from queue workers) - * - System limits: CPU cores, total memory (physical server resources) + * - Worker metrics: Worker count, utilization (from queue workers) + * - Job metrics: Jobs processed (from queue workers) + * - System metrics: Actual server CPU/memory from SystemMetrics (physical server resources) + * + * Note: Worker process CPU/memory metrics are NOT included in dashboard as they're not + * useful for server-level overview. Use server_resources for actual server resource usage. * * @param array> $servers * @return array> @@ -186,32 +196,35 @@ private function filterJobsForDashboard(array $jobs): array private function filterServersForDashboard(array $servers): array { return array_map(function ($server) { - $workers = is_array($server['workers'] ?? null) ? $server['workers'] : []; - $utilization = is_array($server['utilization'] ?? null) ? $server['utilization'] : []; - $performance = is_array($server['performance'] ?? null) ? $server['performance'] : []; - $systemLimits = is_array($server['system_limits'] ?? null) ? $server['system_limits'] : null; - - $serverUtilization = $utilization['server_utilization'] ?? 0; - $utilizationPercent = is_numeric($serverUtilization) ? round((float) $serverUtilization * 100, 2) : 0; + $queueWorkers = is_array($server['queue_workers'] ?? null) ? $server['queue_workers'] : []; + $workerCount = is_array($queueWorkers['count'] ?? null) ? $queueWorkers['count'] : []; + $workerUtilization = is_array($queueWorkers['utilization'] ?? null) ? 
$queueWorkers['utilization'] : []; + $jobProcessing = is_array($server['job_processing'] ?? null) ? $server['job_processing'] : []; + $jobLifetime = is_array($jobProcessing['lifetime'] ?? null) ? $jobProcessing['lifetime'] : []; + $serverResources = is_array($server['server_resources'] ?? null) ? $server['server_resources'] : null; $result = [ 'hostname' => $server['hostname'] ?? '', // Worker-level metrics (from queue workers) 'workers' => [ - 'total' => $workers['total'] ?? 0, - 'active' => $workers['active'] ?? 0, - 'idle' => $workers['idle'] ?? 0, - 'utilization_percent' => $utilizationPercent, + 'total' => $workerCount['total'] ?? 0, + 'active' => $workerCount['active'] ?? 0, + 'idle' => $workerCount['idle'] ?? 0, + 'current_busy_percent' => $workerUtilization['current_busy_percent'] ?? 0.0, + 'lifetime_busy_percent' => $workerUtilization['lifetime_busy_percent'] ?? 0.0, ], // Job processing metrics (from queue workers) 'jobs' => [ - 'processed' => $performance['total_jobs_processed'] ?? 0, + 'total_processed' => $jobLifetime['total_processed'] ?? 0, + 'total_failed' => $jobLifetime['total_failed'] ?? 0, + 'failure_rate_percent' => $jobLifetime['failure_rate_percent'] ?? 0.0, ], ]; - // System resource limits (physical server capacity) - if ($systemLimits !== null) { - $result['system_limits'] = $systemLimits; + // System resource metrics (actual server CPU/memory from SystemMetrics) + // This is the REAL server usage, not worker process usage + if ($serverResources !== null) { + $result['server_resources'] = $serverResources; } return $result; @@ -226,11 +239,17 @@ private function filterServersForDashboard(array $servers): array */ private function filterWorkersForDashboard(array $workers): array { + $count = is_array($workers['count'] ?? null) ? $workers['count'] : []; + $utilization = is_array($workers['utilization'] ?? null) ? $workers['utilization'] : []; + $performance = is_array($workers['performance'] ?? null) ? 
$workers['performance'] : []; + return [ - 'total' => $workers['total'] ?? 0, - 'active' => $workers['active'] ?? 0, - 'idle' => $workers['idle'] ?? 0, - 'total_jobs_processed' => $workers['total_jobs_processed'] ?? 0, + 'total' => $count['total'] ?? 0, + 'active' => $count['active'] ?? 0, + 'idle' => $count['idle'] ?? 0, + 'current_busy_percent' => $utilization['current_busy_percent'] ?? 0.0, + 'lifetime_busy_percent' => $utilization['lifetime_busy_percent'] ?? 0.0, + 'total_jobs_processed' => $performance['total_jobs_processed'] ?? 0, ]; } } diff --git a/src/Services/QueueMetricsQueryService.php b/src/Services/QueueMetricsQueryService.php index 806ff49..45e5d89 100644 --- a/src/Services/QueueMetricsQueryService.php +++ b/src/Services/QueueMetricsQueryService.php @@ -143,7 +143,7 @@ public function getAllQueuesWithMetrics(): array $activeWorkers = $workers->count(); - // Calculate queue utilization rate from worker busy/idle time + // Calculate worker utilization from busy/idle time $totalBusyTime = 0; $totalIdleTime = 0; foreach ($workers as $worker) { @@ -152,7 +152,11 @@ public function getAllQueuesWithMetrics(): array } $totalTime = $totalBusyTime + $totalIdleTime; - $utilizationRate = $totalTime > 0 ? ($totalBusyTime / $totalTime) * 100 : 0; + $lifetimeBusyPercent = $totalTime > 0 ? ($totalBusyTime / $totalTime) * 100 : 0; + + // Calculate current worker state (% busy right now) + $busyWorkers = $workers->filter(fn ($w) => $w->state->value === 'busy')->count(); + $currentBusyPercent = $activeWorkers > 0 ? 
($busyWorkers / $activeWorkers) * 100 : 0; // Get trend data $trends = $this->getQueueTrends($connection, $queue); @@ -161,17 +165,31 @@ public function getAllQueuesWithMetrics(): array 'connection' => $connection, 'queue' => $queue, 'driver' => $connection, - 'depth' => $depth->totalJobs(), - 'pending' => $depth->pendingJobs, - 'scheduled' => $depth->delayedJobs, - 'reserved' => $depth->reservedJobs, - 'oldest_job_age_seconds' => $depth->secondsOldestPendingJob() ?? 0, - 'oldest_job_age_status' => $depth->oldestPendingJobAge?->toIso8601String() ?? 'unknown', - 'throughput_per_minute' => $metrics->throughputPerMinute, - 'avg_duration_ms' => $metrics->avgDuration, - 'failure_rate' => $metrics->failureRate, - 'utilization_rate' => round($utilizationRate, 2), - 'active_workers' => $activeWorkers, + // Instantaneous queue state (current snapshot) + 'depth' => [ + 'total' => $depth->totalJobs(), + 'pending' => $depth->pendingJobs, + 'scheduled' => $depth->delayedJobs, + 'reserved' => $depth->reservedJobs, + 'oldest_job_age_seconds' => $depth->secondsOldestPendingJob() ?? 0, + 'oldest_job_age_status' => $depth->oldestPendingJobAge?->toIso8601String() ?? 'unknown', + ], + // Windowed performance metrics (60-second window from CalculateQueueMetricsAction) + 'performance_60s' => [ + 'throughput_per_minute' => $metrics->throughputPerMinute, + 'avg_duration_ms' => $metrics->avgDuration, + 'window_seconds' => 60, + ], + // Lifetime metrics (since first job) + 'lifetime' => [ + 'failure_rate_percent' => $metrics->failureRate, + ], + // Worker metrics for this queue + 'workers' => [ + 'active_count' => $activeWorkers, + 'current_busy_percent' => round($currentBusyPercent, 2), + 'lifetime_busy_percent' => round($lifetimeBusyPercent, 2), + ], 'baseline' => $baseline ? 
$baseline->toArray() : null, 'trends' => $trends, 'timestamp' => now()->toIso8601String(), diff --git a/src/Services/TrendAnalysisService.php b/src/Services/TrendAnalysisService.php index 63f59e7..eeccb9a 100644 --- a/src/Services/TrendAnalysisService.php +++ b/src/Services/TrendAnalysisService.php @@ -96,8 +96,16 @@ public function analyzeQueueDepthTrend( 'available' => true, 'connection' => $connection, 'queue' => $queue, - 'period_seconds' => $periodSeconds, - 'data_points' => $count, + // Time window context + 'time_window' => [ + 'window_seconds' => $periodSeconds, + 'window_start' => $startTime->toIso8601String(), + 'window_end' => $now->toIso8601String(), + 'analyzed_at' => $now->toIso8601String(), + 'sample_count' => $count, + 'sample_interval_seconds' => $intervalSeconds, + ], + // Current and historical statistics 'statistics' => [ 'current' => end($depths) ?: 0, 'average' => round($avg, 2), @@ -105,14 +113,16 @@ public function analyzeQueueDepthTrend( 'max' => $max, 'std_dev' => round($stdDev, 2), ], + // Trend direction and confidence 'trend' => [ 'slope' => round($trend['slope'], 4), 'direction' => $trend['slope'] > 0.1 ? 'increasing' : ($trend['slope'] < -0.1 ? 
'decreasing' : 'stable'), 'confidence' => round($trend['r_squared'], 3), ], + // Forecast for next interval 'forecast' => [ 'next_value' => round($forecast, 2), - 'next_timestamp' => $now->copy()->addSeconds($intervalSeconds)->timestamp, + 'next_timestamp' => $now->copy()->addSeconds($intervalSeconds)->toIso8601String(), ], ]; } @@ -185,13 +195,22 @@ public function analyzeThroughputTrend( 'available' => true, 'connection' => $connection, 'queue' => $queue, - 'period_seconds' => $periodSeconds, + // Time window context + 'time_window' => [ + 'window_seconds' => $periodSeconds, + 'window_start' => $startTime->toIso8601String(), + 'window_end' => $now->toIso8601String(), + 'analyzed_at' => $now->toIso8601String(), + 'sample_count' => $count, + ], + // Throughput statistics 'statistics' => [ 'total_jobs' => $totalJobs, 'average_per_interval' => round($avg, 2), 'jobs_per_minute' => round($jobsPerMinute, 2), 'jobs_per_hour' => round($jobsPerMinute * 60, 2), ], + // Trend direction 'trend' => [ 'slope' => round($trend['slope'], 4), 'direction' => $trend['slope'] > 0 ? 'increasing' : ($trend['slope'] < 0 ? 
'decreasing' : 'stable'), @@ -256,14 +275,22 @@ public function analyzeWorkerEfficiencyTrend(int $periodSeconds = 3600): array return [ 'available' => true, - 'period_seconds' => $periodSeconds, - 'data_points' => $count, + // Time window context + 'time_window' => [ + 'window_seconds' => $periodSeconds, + 'window_start' => $startTime->toIso8601String(), + 'window_end' => $now->toIso8601String(), + 'analyzed_at' => $now->toIso8601String(), + 'sample_count' => $count, + ], + // Worker efficiency statistics 'efficiency' => [ 'current' => round(end($efficiencies) ?: 0, 2), 'average' => round(array_sum($efficiencies) / max($count, 1), 2), 'min' => round(min($efficiencies), 2), 'max' => round(max($efficiencies), 2), ], + // Resource usage statistics 'resource_usage' => [ 'avg_memory_mb' => round(array_sum($memoryUsages) / max($count, 1), 2), 'avg_cpu_percent' => round(array_sum($cpuUsages) / max($count, 1), 2), diff --git a/src/Services/WorkerMetricsQueryService.php b/src/Services/WorkerMetricsQueryService.php index f3ffe58..9ab9749 100644 --- a/src/Services/WorkerMetricsQueryService.php +++ b/src/Services/WorkerMetricsQueryService.php @@ -144,15 +144,23 @@ public function getWorkersSummary(): array $avgJobsPerWorker = $total > 0 ? $totalJobsProcessed / $total : 0.0; $totalTimeSeconds = $totalIdleTimeSeconds + $totalBusyTimeSeconds; - $avgIdlePercentage = $totalTimeSeconds > 0 ? ($totalIdleTimeSeconds / $totalTimeSeconds) * 100 : 0.0; + $lifetimeBusyPercentage = $totalTimeSeconds > 0 ? ($totalBusyTimeSeconds / $totalTimeSeconds) * 100 : 0.0; + $currentBusyPercentage = $total > 0 ? 
($active / $total) * 100 : 0.0; return [ - 'total' => $total, - 'active' => $active, - 'idle' => $idle, - 'avg_jobs_per_worker' => round($avgJobsPerWorker, 2), - 'total_jobs_processed' => $totalJobsProcessed, - 'avg_idle_percentage' => round($avgIdlePercentage, 2), + 'count' => [ + 'total' => $total, + 'active' => $active, + 'idle' => $idle, + ], + 'utilization' => [ + 'current_busy_percent' => round($currentBusyPercentage, 2), + 'lifetime_busy_percent' => round($lifetimeBusyPercentage, 2), + ], + 'performance' => [ + 'avg_jobs_per_worker' => round($avgJobsPerWorker, 2), + 'total_jobs_processed' => $totalJobsProcessed, + ], ]; } @@ -173,82 +181,106 @@ public function getAllServersWithMetrics(): array if (! isset($servers[$hostname])) { $servers[$hostname] = [ 'hostname' => $hostname, - 'workers' => ['total' => 0, 'active' => 0, 'idle' => 0], - 'performance' => [ - 'total_jobs_processed' => 0, - 'total_jobs_failed' => 0, - 'failure_rate' => 0.0, - 'jobs_per_minute' => 0.0, - 'avg_job_duration_ms' => 0.0, + // Application tier: Queue worker counts and states + 'queue_workers' => [ + 'count' => ['total' => 0, 'active' => 0, 'idle' => 0], + 'utilization' => [ + 'current_busy_percent' => 0.0, // % of workers busy right now + 'lifetime_busy_percent' => 0.0, // % of time workers have been busy + ], ], - 'resources' => [ - 'total_memory_mb' => 0.0, - 'avg_memory_per_job_mb' => 0.0, + // Application tier: Job processing metrics + 'job_processing' => [ + 'lifetime' => [ + 'total_processed' => 0, + 'total_failed' => 0, + 'failure_rate_percent' => 0.0, + ], + 'current' => [ + 'jobs_per_minute' => 0.0, // Based on actual elapsed time + 'avg_duration_ms' => 0.0, + ], + ], + // Process tier: Worker process resource usage + 'worker_processes' => [ + 'avg_memory_per_worker_mb' => 0.0, + 'avg_cpu_per_worker_percent' => 0.0, 'peak_memory_mb' => 0.0, - 'cpu_usage' => 0.0, - 'memory_usage' => 0.0, + 'total_memory_mb' => 0.0, // Sum across all workers ], - 'utilization' => [ - 
'server_utilization' => 0.0, - 'avg_idle_percentage' => 0.0, - 'capacity_recommendation' => null, + // Capacity planning + 'capacity' => [ + 'recommendation' => null, ], 'timestamp' => now()->toIso8601String(), ]; } - $servers[$hostname]['workers']['total']++; + $servers[$hostname]['queue_workers']['count']['total']++; // Count by state if ($heartbeat->state === WorkerState::BUSY) { - $servers[$hostname]['workers']['active']++; + $servers[$hostname]['queue_workers']['count']['active']++; } else { - $servers[$hostname]['workers']['idle']++; + $servers[$hostname]['queue_workers']['count']['idle']++; } - // Aggregate performance metrics from heartbeat - $servers[$hostname]['performance']['total_jobs_processed'] += $heartbeat->jobsProcessed; + // Aggregate job processing metrics from heartbeat + $servers[$hostname]['job_processing']['lifetime']['total_processed'] += $heartbeat->jobsProcessed; - // Aggregate resource metrics from heartbeat - $servers[$hostname]['resources']['total_memory_mb'] += $heartbeat->memoryUsageMb; - $servers[$hostname]['resources']['peak_memory_mb'] = max( - $servers[$hostname]['resources']['peak_memory_mb'], + // Aggregate worker process resource metrics from heartbeat + $servers[$hostname]['worker_processes']['total_memory_mb'] += $heartbeat->memoryUsageMb; + $servers[$hostname]['worker_processes']['peak_memory_mb'] = max( + $servers[$hostname]['worker_processes']['peak_memory_mb'], $heartbeat->peakMemoryUsageMb ); - $servers[$hostname]['resources']['cpu_usage'] += $heartbeat->cpuUsagePercent; - $servers[$hostname]['resources']['memory_usage'] += $heartbeat->memoryUsageMb; + $servers[$hostname]['worker_processes']['avg_cpu_per_worker_percent'] += $heartbeat->cpuUsagePercent; + $servers[$hostname]['worker_processes']['avg_memory_per_worker_mb'] += $heartbeat->memoryUsageMb; } // Calculate averages, utilization, and performance per server foreach ($servers as $hostname => &$server) { - $totalWorkers = $server['workers']['total']; + $totalWorkers 
= $server['queue_workers']['count']['total']; // @phpstan-ignore-next-line - Defensive check even though PHPStan knows totalWorkers >= 1 if ($totalWorkers > 0) { - $server['resources']['avg_memory_per_job_mb'] = - $server['resources']['total_memory_mb'] / $totalWorkers; - $server['resources']['cpu_usage'] = - $server['resources']['cpu_usage'] / $totalWorkers; - $server['resources']['memory_usage'] = - $server['resources']['memory_usage'] / $totalWorkers; - - $activeWorkers = $server['workers']['active']; - $server['utilization']['server_utilization'] = $activeWorkers / $totalWorkers; - $server['utilization']['avg_idle_percentage'] = - ($server['workers']['idle'] / $totalWorkers) * 100; - - // Calculate jobs_per_minute from worker uptime data - // Get workers for this hostname to calculate total uptime + // Calculate worker process resource averages + $server['worker_processes']['avg_memory_per_worker_mb'] = + $server['worker_processes']['total_memory_mb'] / $totalWorkers; + $server['worker_processes']['avg_cpu_per_worker_percent'] = + $server['worker_processes']['avg_cpu_per_worker_percent'] / $totalWorkers; + + // Calculate current worker utilization (% busy right now) + $activeWorkers = $server['queue_workers']['count']['active']; + $server['queue_workers']['utilization']['current_busy_percent'] = + round(($activeWorkers / $totalWorkers) * 100, 2); + + // Calculate lifetime worker efficiency (% of time spent busy) $hostnameWorkers = collect($heartbeats)->filter( fn ($hb) => $hb->hostname === $hostname ); - $totalUptimeSeconds = $hostnameWorkers->sum( + $totalBusyTime = $hostnameWorkers->sum(fn ($hb) => $hb->busyTimeSeconds); + $totalIdleTime = $hostnameWorkers->sum(fn ($hb) => $hb->idleTimeSeconds); + $totalTime = $totalBusyTime + $totalIdleTime; + + $server['queue_workers']['utilization']['lifetime_busy_percent'] = $totalTime > 0 + ? 
round(($totalBusyTime / $totalTime) * 100, 2) + : 0.0; + + // Calculate jobs_per_minute from actual elapsed time + $oldestWorkerUptimeSeconds = $hostnameWorkers->max( fn ($hb) => $hb->busyTimeSeconds + $hb->idleTimeSeconds ); - $totalUptimeMinutes = $totalUptimeSeconds / 60; - $server['performance']['jobs_per_minute'] = $totalUptimeMinutes > 0 - ? round($server['performance']['total_jobs_processed'] / $totalUptimeMinutes, 2) + // Type safety: max() can return mixed, ensure numeric + $uptimeSeconds = is_numeric($oldestWorkerUptimeSeconds) ? (float) $oldestWorkerUptimeSeconds : 0.0; + + $elapsedMinutes = $uptimeSeconds > 0 + ? $uptimeSeconds / 60 + : 0; + + $server['job_processing']['current']['jobs_per_minute'] = $elapsedMinutes > 0 + ? round($server['job_processing']['lifetime']['total_processed'] / $elapsedMinutes, 2) : 0.0; // Get hostname-scoped job metrics for detailed job performance @@ -263,34 +295,45 @@ public function getAllServersWithMetrics(): array $totalJobsProcessed += $metrics['total_processed']; } - $server['performance']['total_jobs_failed'] = $totalJobsFailed; + $server['job_processing']['lifetime']['total_failed'] = $totalJobsFailed; $totalJobs = $totalJobsProcessed + $totalJobsFailed; - $server['performance']['failure_rate'] = $totalJobs > 0 + $server['job_processing']['lifetime']['failure_rate_percent'] = $totalJobs > 0 ? round(($totalJobsFailed / $totalJobs) * 100, 2) : 0.0; - $server['performance']['avg_job_duration_ms'] = $totalJobsProcessed > 0 + $server['job_processing']['current']['avg_duration_ms'] = $totalJobsProcessed > 0 ? 
round($totalDurationMs / $totalJobsProcessed, 2)
                 : 0.0;
 
-            // Add capacity recommendation
-            $utilization = $server['utilization']['server_utilization'];
-            if ($utilization > 0.9) {
-                $server['utilization']['capacity_recommendation'] =
+            // Add capacity recommendation based on current utilization
+            $currentUtilization = $server['queue_workers']['utilization']['current_busy_percent'] / 100;
+            if ($currentUtilization > 0.9) {
+                $server['capacity']['recommendation'] =
                     'Consider horizontal scaling: Add more workers or servers';
-            } elseif ($utilization < 0.3) {
-                $server['utilization']['capacity_recommendation'] =
+            } elseif ($currentUtilization < 0.3) {
+                $server['capacity']['recommendation'] =
                     'Consider reducing worker count to optimize resource usage';
             }
         }
 
-        // Add system resource limits (only for current server)
-        // Note: This gets system limits for the current server running this code
-        // For multi-server setups, we'd need each server to report its own limits
-        // Uses fast getSystemLimits() to avoid macOS CPU polling performance issues
-        if ($hostname === gethostname()) {
+        // System tier: Add actual server resources (only for current server)
+        // Note: This gets system resources for the current server running this code
+        // For multi-server setups, we'd need each server to report its own resources
+        // Uses fast getSystemLimits() with 5-second cache to avoid macOS CPU polling issues
+
+        // Try to match hostname (gethostname() might return different format than worker)
+        $currentHostname = gethostname();
+        $hostnameMatch = false;
+
+        if ($currentHostname !== false) {
+            $hostnameMatch = $hostname === $currentHostname
+                || str_starts_with($hostname, $currentHostname)
+                || str_starts_with($currentHostname, $hostname);
+        }
+
+        if ($hostnameMatch) {
             $systemLimits = $this->serverMetricsService->getSystemLimits();
             if ($systemLimits['available']) {
-                $server['system_limits'] = $systemLimits;
+                $server['system_limits'] = $systemLimits;
             }
         }
     }
 
From 
1d2a1bfbd340b294d636f748081d7eb55cc8c4cb Mon Sep 17 00:00:00 2001 From: sylvesterdamgaard <2431914+sylvesterdamgaard@users.noreply.github.com> Date: Thu, 20 Nov 2025 17:55:30 +0000 Subject: [PATCH 4/4] Fix styling --- src/Repositories/RedisJobMetricsRepository.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Repositories/RedisJobMetricsRepository.php b/src/Repositories/RedisJobMetricsRepository.php index d8d852d..3d0ddf0 100644 --- a/src/Repositories/RedisJobMetricsRepository.php +++ b/src/Repositories/RedisJobMetricsRepository.php @@ -107,19 +107,19 @@ public function recordCompletion( // Store samples in sorted sets with timestamp as score // Use a unique member format: "jobId:value" to ensure each job gets a separate entry // This allows multiple jobs with the same duration/memory/cpu to be stored - $durationMember = $jobId . ':' . $durationMs; + $durationMember = $jobId.':'.$durationMs; /** @var array $durationSample */ $durationSample = [$durationMember => (int) $completedAt->timestamp]; $pipe->addToSortedSet($durationKey, $durationSample, $ttl); // Store memory sample with unique member - $memoryMember = $jobId . ':' . $memoryMb; + $memoryMember = $jobId.':'.$memoryMb; /** @var array $memorySample */ $memorySample = [$memoryMember => (int) $completedAt->timestamp]; $pipe->addToSortedSet($memoryKey, $memorySample, $ttl); // Store CPU time sample with unique member - $cpuMember = $jobId . ':' . $cpuTimeMs; + $cpuMember = $jobId.':'.$cpuTimeMs; /** @var array $cpuSample */ $cpuSample = [$cpuMember => (int) $completedAt->timestamp]; $pipe->addToSortedSet($cpuKey, $cpuSample, $ttl);