Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions src/Actions/CalculateQueueMetricsAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,34 @@ public function execute(string $connection, string $queue): void
return;
}

// Aggregate metrics across all job classes
// Aggregate metrics across all job classes for CURRENT window (last 60 seconds)
// This ensures throughput and avg_duration are calculated from the same time window
$windowSeconds = 60;
$throughputPerMinute = 0;
$totalDurationMs = 0.0;
$totalJobsInWindow = 0;

// Aggregate lifetime metrics for failure rate
$totalProcessed = 0;
$totalFailed = 0;
$totalDurationMs = 0.0;
$lastProcessedAt = null;

foreach ($queueJobs as $job) {
$jobClass = $job['jobClass'];

// Get current window metrics (last 60 seconds)
$jobThroughput = $this->jobRepository->getThroughput($jobClass, $connection, $queue, $windowSeconds);
$jobAvgDuration = $this->jobRepository->getAverageDurationInWindow($jobClass, $connection, $queue, $windowSeconds);

$throughputPerMinute += $jobThroughput;
$totalDurationMs += ($jobAvgDuration * $jobThroughput); // Weighted by job count
$totalJobsInWindow += $jobThroughput;

// Get lifetime metrics for failure rate and last_processed_at
$metrics = $this->jobRepository->getMetrics($jobClass, $connection, $queue);

$totalProcessed += is_int($metrics['total_processed']) ? $metrics['total_processed'] : 0;
$totalFailed += is_int($metrics['total_failed']) ? $metrics['total_failed'] : 0;
$totalDurationMs += is_float($metrics['total_duration_ms']) || is_int($metrics['total_duration_ms'])
? (float) $metrics['total_duration_ms']
: 0.0;

if ($metrics['last_processed_at'] instanceof \Carbon\Carbon) {
if ($lastProcessedAt === null || $metrics['last_processed_at']->greaterThan($lastProcessedAt)) {
Expand All @@ -63,24 +76,14 @@ public function execute(string $connection, string $queue): void
}
}

// Calculate aggregated metrics
$avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0;
// Calculate aggregated metrics for current window
$avgDuration = $totalJobsInWindow > 0 ? $totalDurationMs / $totalJobsInWindow : 0.0;

// Calculate failure rate from lifetime totals
$failureRate = ($totalProcessed + $totalFailed) > 0
? ($totalFailed / ($totalProcessed + $totalFailed)) * 100.0
: 0.0;

// Calculate throughput per minute (jobs completed in last 60 seconds)
$throughputPerMinute = 0.0;
foreach ($queueJobs as $job) {
$jobClass = $job['jobClass'];
$throughputPerMinute += $this->jobRepository->getThroughput(
$jobClass,
$connection,
$queue,
60 // last 60 seconds
);
}

// Store aggregated metrics
$this->queueRepository->recordSnapshot($connection, $queue, [
'throughput_per_minute' => $throughputPerMinute,
Expand Down
12 changes: 12 additions & 0 deletions src/Repositories/Contracts/JobMetricsRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ public function getThroughput(
int $windowSeconds,
): int;

/**
* Get average duration for jobs completed within a specific time window.
*
* @return float Average duration in milliseconds, 0.0 if no jobs in window
*/
public function getAverageDurationInWindow(
string $jobClass,
string $connection,
string $queue,
int $windowSeconds,
): float;

/**
* Record when a job is queued for time-to-start tracking.
*/
Expand Down
70 changes: 63 additions & 7 deletions src/Repositories/RedisJobMetricsRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,27 +95,33 @@ public function recordCompletion(
$memoryMb,
$cpuTimeMs,
$completedAt,
$ttl
$ttl,
$jobId
) {
$pipe->incrementHashField($metricsKey, 'total_processed', 1);
$pipe->incrementHashField($metricsKey, 'total_duration_ms', $durationMs);
$pipe->incrementHashField($metricsKey, 'total_memory_mb', $memoryMb);
$pipe->incrementHashField($metricsKey, 'total_cpu_time_ms', $cpuTimeMs);
$pipe->setHash($metricsKey, ['last_processed_at' => $completedAt->timestamp]);

// Store duration sample (sorted set with timestamp as score)
// Store samples in sorted sets with timestamp as score
// Use a unique member format: "jobId:value" to ensure each job gets a separate entry
// This allows multiple jobs with the same duration/memory/cpu to be stored
$durationMember = $jobId.':'.$durationMs;
/** @var array<string, int> $durationSample */
$durationSample = [(string) $durationMs => (int) $completedAt->timestamp];
$durationSample = [$durationMember => (int) $completedAt->timestamp];
$pipe->addToSortedSet($durationKey, $durationSample, $ttl);

// Store memory sample
// Store memory sample with unique member
$memoryMember = $jobId.':'.$memoryMb;
/** @var array<string, int> $memorySample */
$memorySample = [(string) $memoryMb => (int) $completedAt->timestamp];
$memorySample = [$memoryMember => (int) $completedAt->timestamp];
$pipe->addToSortedSet($memoryKey, $memorySample, $ttl);

// Store CPU time sample
// Store CPU time sample with unique member
$cpuMember = $jobId.':'.$cpuTimeMs;
/** @var array<string, int> $cpuSample */
$cpuSample = [(string) $cpuTimeMs => (int) $completedAt->timestamp];
$cpuSample = [$cpuMember => (int) $completedAt->timestamp];
$pipe->addToSortedSet($cpuKey, $cpuSample, $ttl);

// Refresh TTL on metrics key
Expand Down Expand Up @@ -406,6 +412,56 @@ public function getThroughput(
return $driver->eval($script, 1, $key, $windowSeconds);
}

public function getAverageDurationInWindow(
string $jobClass,
string $connection,
string $queue,
int $windowSeconds,
): float {
$key = $this->redis->key('durations', $connection, $queue, $jobClass);
$driver = $this->redis->driver();

// Use Lua script to atomically get samples within window and calculate average
// This ensures consistency between throughput and average duration calculations
$script = <<<'LUA'
local key = KEYS[1]
local windowSeconds = tonumber(ARGV[1])
local cutoff = redis.call('TIME')[1] - windowSeconds

-- Get all members in the window (members are "jobId:duration")
local samples = redis.call('ZRANGEBYSCORE', key, cutoff, '+inf')

if #samples == 0 then
return 0
end

-- Parse members to extract duration values and calculate average
-- Each member is formatted as "jobId:duration"
local sum = 0
local count = 0
for i = 1, #samples do
local member = samples[i]
local colonPos = string.find(member, ":")
if colonPos then
local duration = string.sub(member, colonPos + 1)
sum = sum + tonumber(duration)
count = count + 1
end
end

if count == 0 then
return 0
end

return sum / count
LUA;

/** @var float */
$result = $driver->eval($script, 1, $key, $windowSeconds);

return (float) $result;
}

public function recordQueuedAt(
string $jobClass,
string $connection,
Expand Down
83 changes: 60 additions & 23 deletions src/Services/OverviewQueryService.php
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,21 @@ public function getOverview(bool $slim = true): array
private function filterQueuesForDashboard(array $queues): array
{
return array_map(function ($queue) {
$depth = is_array($queue['depth'] ?? null) ? $queue['depth'] : [];
$performance60s = is_array($queue['performance_60s'] ?? null) ? $queue['performance_60s'] : [];
$lifetime = is_array($queue['lifetime'] ?? null) ? $queue['lifetime'] : [];
$workers = is_array($queue['workers'] ?? null) ? $queue['workers'] : [];

return [
'connection' => $queue['connection'] ?? '',
'queue' => $queue['queue'] ?? '',
'depth' => $queue['depth'] ?? 0,
'pending' => $queue['pending'] ?? 0,
'active_workers' => $queue['active_workers'] ?? 0,
'throughput_per_minute' => $queue['throughput_per_minute'] ?? 0,
'failure_rate' => $queue['failure_rate'] ?? 0,
'utilization_rate' => $queue['utilization_rate'] ?? 0,
'depth' => $depth['total'] ?? 0,
'pending' => $depth['pending'] ?? 0,
'active_workers' => $workers['active_count'] ?? 0,
'throughput_per_minute' => $performance60s['throughput_per_minute'] ?? 0,
'failure_rate' => $lifetime['failure_rate_percent'] ?? 0,
'current_busy_percent' => $workers['current_busy_percent'] ?? 0,
'lifetime_busy_percent' => $workers['lifetime_busy_percent'] ?? 0,
];
}, $queues);
}
Expand Down Expand Up @@ -176,27 +182,52 @@ private function filterJobsForDashboard(array $jobs): array
/**
* Filter server metrics to essential dashboard fields only.
*
* Returns simplified server data with clear separation between:
* - Worker metrics: Worker count, utilization (from queue workers)
* - Job metrics: Jobs processed (from queue workers)
* - System metrics: Actual server CPU/memory from SystemMetrics (physical server resources)
*
* Note: Worker process CPU/memory metrics are NOT included in dashboard as they're not
* useful for server-level overview. Use server_resources for actual server resource usage.
*
* @param array<string, array<string, mixed>> $servers
* @return array<string, array<string, mixed>>
*/
private function filterServersForDashboard(array $servers): array
{
return array_map(function ($server) {
$workers = is_array($server['workers'] ?? null) ? $server['workers'] : [];
$utilization = is_array($server['utilization'] ?? null) ? $server['utilization'] : [];
$performance = is_array($server['performance'] ?? null) ? $server['performance'] : [];

$serverUtilization = $utilization['server_utilization'] ?? 0;
$utilizationPercent = is_numeric($serverUtilization) ? round((float) $serverUtilization * 100, 2) : 0;

return [
$queueWorkers = is_array($server['queue_workers'] ?? null) ? $server['queue_workers'] : [];
$workerCount = is_array($queueWorkers['count'] ?? null) ? $queueWorkers['count'] : [];
$workerUtilization = is_array($queueWorkers['utilization'] ?? null) ? $queueWorkers['utilization'] : [];
$jobProcessing = is_array($server['job_processing'] ?? null) ? $server['job_processing'] : [];
$jobLifetime = is_array($jobProcessing['lifetime'] ?? null) ? $jobProcessing['lifetime'] : [];
$serverResources = is_array($server['server_resources'] ?? null) ? $server['server_resources'] : null;

$result = [
'hostname' => $server['hostname'] ?? '',
'workers_total' => $workers['total'] ?? 0,
'workers_active' => $workers['active'] ?? 0,
'workers_idle' => $workers['idle'] ?? 0,
'utilization_percent' => $utilizationPercent,
'jobs_processed' => $performance['total_jobs_processed'] ?? 0,
// Worker-level metrics (from queue workers)
'workers' => [
'total' => $workerCount['total'] ?? 0,
'active' => $workerCount['active'] ?? 0,
'idle' => $workerCount['idle'] ?? 0,
'current_busy_percent' => $workerUtilization['current_busy_percent'] ?? 0.0,
'lifetime_busy_percent' => $workerUtilization['lifetime_busy_percent'] ?? 0.0,
],
// Job processing metrics (from queue workers)
'jobs' => [
'total_processed' => $jobLifetime['total_processed'] ?? 0,
'total_failed' => $jobLifetime['total_failed'] ?? 0,
'failure_rate_percent' => $jobLifetime['failure_rate_percent'] ?? 0.0,
],
];

// System resource metrics (actual server CPU/memory from SystemMetrics)
// This is the REAL server usage, not worker process usage
if ($serverResources !== null) {
$result['server_resources'] = $serverResources;
}

return $result;
}, $servers);
}

Expand All @@ -208,11 +239,17 @@ private function filterServersForDashboard(array $servers): array
*/
private function filterWorkersForDashboard(array $workers): array
{
$count = is_array($workers['count'] ?? null) ? $workers['count'] : [];
$utilization = is_array($workers['utilization'] ?? null) ? $workers['utilization'] : [];
$performance = is_array($workers['performance'] ?? null) ? $workers['performance'] : [];

return [
'total' => $workers['total'] ?? 0,
'active' => $workers['active'] ?? 0,
'idle' => $workers['idle'] ?? 0,
'total_jobs_processed' => $workers['total_jobs_processed'] ?? 0,
'total' => $count['total'] ?? 0,
'active' => $count['active'] ?? 0,
'idle' => $count['idle'] ?? 0,
'current_busy_percent' => $utilization['current_busy_percent'] ?? 0.0,
'lifetime_busy_percent' => $utilization['lifetime_busy_percent'] ?? 0.0,
'total_jobs_processed' => $performance['total_jobs_processed'] ?? 0,
];
}
}
44 changes: 31 additions & 13 deletions src/Services/QueueMetricsQueryService.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public function getAllQueuesWithMetrics(): array

$activeWorkers = $workers->count();

// Calculate queue utilization rate from worker busy/idle time
// Calculate worker utilization from busy/idle time
$totalBusyTime = 0;
$totalIdleTime = 0;
foreach ($workers as $worker) {
Expand All @@ -152,7 +152,11 @@ public function getAllQueuesWithMetrics(): array
}

$totalTime = $totalBusyTime + $totalIdleTime;
$utilizationRate = $totalTime > 0 ? ($totalBusyTime / $totalTime) * 100 : 0;
$lifetimeBusyPercent = $totalTime > 0 ? ($totalBusyTime / $totalTime) * 100 : 0;

// Calculate current worker state (% busy right now)
$busyWorkers = $workers->filter(fn ($w) => $w->state->value === 'busy')->count();
$currentBusyPercent = $activeWorkers > 0 ? ($busyWorkers / $activeWorkers) * 100 : 0;

// Get trend data
$trends = $this->getQueueTrends($connection, $queue);
Expand All @@ -161,17 +165,31 @@ public function getAllQueuesWithMetrics(): array
'connection' => $connection,
'queue' => $queue,
'driver' => $connection,
'depth' => $depth->totalJobs(),
'pending' => $depth->pendingJobs,
'scheduled' => $depth->delayedJobs,
'reserved' => $depth->reservedJobs,
'oldest_job_age_seconds' => $depth->secondsOldestPendingJob() ?? 0,
'oldest_job_age_status' => $depth->oldestPendingJobAge?->toIso8601String() ?? 'unknown',
'throughput_per_minute' => $metrics->throughputPerMinute,
'avg_duration_ms' => $metrics->avgDuration,
'failure_rate' => $metrics->failureRate,
'utilization_rate' => round($utilizationRate, 2),
'active_workers' => $activeWorkers,
// Instantaneous queue state (current snapshot)
'depth' => [
'total' => $depth->totalJobs(),
'pending' => $depth->pendingJobs,
'scheduled' => $depth->delayedJobs,
'reserved' => $depth->reservedJobs,
'oldest_job_age_seconds' => $depth->secondsOldestPendingJob() ?? 0,
'oldest_job_age_status' => $depth->oldestPendingJobAge?->toIso8601String() ?? 'unknown',
],
// Windowed performance metrics (60-second window from CalculateQueueMetricsAction)
'performance_60s' => [
'throughput_per_minute' => $metrics->throughputPerMinute,
'avg_duration_ms' => $metrics->avgDuration,
'window_seconds' => 60,
],
// Lifetime metrics (since first job)
'lifetime' => [
'failure_rate_percent' => $metrics->failureRate,
],
// Worker metrics for this queue
'workers' => [
'active_count' => $activeWorkers,
'current_busy_percent' => round($currentBusyPercent, 2),
'lifetime_busy_percent' => round($lifetimeBusyPercent, 2),
],
'baseline' => $baseline ? $baseline->toArray() : null,
'trends' => $trends,
'timestamp' => now()->toIso8601String(),
Expand Down
Loading