Skip to content

Commit dedebfa

Browse files
feat(metrics): add queue-level metrics aggregation
Implement queue-level metrics by aggregating job-level data to provide real-time queue performance insights for autoscaling decisions. - Add CalculateQueueMetricsAction for weighted average calculations - Add queue-metrics:calculate artisan command - Schedule automatic calculation every minute - Calculate throughput_per_minute and avg_duration_ms - Calculate failure_rate as percentage (0-100) - Support both all-queues and single-queue modes - Add comprehensive test coverage This enables the scaling package to accurately calculate worker requirements based on near real-time queue metrics.
1 parent 50f6fdb commit dedebfa

File tree

4 files changed

+403
-0
lines changed

4 files changed

+403
-0
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PHPeek\LaravelQueueMetrics\Actions;
6+
7+
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\JobMetricsRepository;
8+
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\QueueMetricsRepository;
9+
10+
/**
11+
* Calculate aggregated queue-level metrics from job-level metrics.
12+
*
13+
* This action aggregates metrics across all job classes for a given queue,
14+
* calculating weighted averages and totals for throughput, duration, and failure rates.
15+
*/
16+
final readonly class CalculateQueueMetricsAction
17+
{
18+
public function __construct(
19+
private JobMetricsRepository $jobRepository,
20+
private QueueMetricsRepository $queueRepository,
21+
) {}
22+
23+
/**
24+
* Calculate and store aggregated metrics for a specific queue.
25+
*/
26+
public function execute(string $connection, string $queue): void
27+
{
28+
// Get all jobs for this queue
29+
$allJobs = $this->jobRepository->listJobs();
30+
$queueJobs = array_filter($allJobs, fn ($job) => $job['connection'] === $connection && $job['queue'] === $queue);
31+
32+
if (empty($queueJobs)) {
33+
// No jobs found for this queue - record zero metrics
34+
$this->queueRepository->recordSnapshot($connection, $queue, [
35+
'throughput_per_minute' => 0.0,
36+
'avg_duration' => 0.0,
37+
'failure_rate' => 0.0,
38+
]);
39+
40+
return;
41+
}
42+
43+
// Aggregate metrics across all job classes
44+
$totalProcessed = 0;
45+
$totalFailed = 0;
46+
$totalDurationMs = 0.0;
47+
$lastProcessedAt = null;
48+
49+
foreach ($queueJobs as $job) {
50+
$jobClass = $job['jobClass'];
51+
$metrics = $this->jobRepository->getMetrics($jobClass, $connection, $queue);
52+
53+
$totalProcessed += $metrics['total_processed'];
54+
$totalFailed += $metrics['total_failed'];
55+
$totalDurationMs += $metrics['total_duration_ms'];
56+
57+
if ($metrics['last_processed_at'] !== null) {
58+
if ($lastProcessedAt === null || $metrics['last_processed_at']->greaterThan($lastProcessedAt)) {
59+
$lastProcessedAt = $metrics['last_processed_at'];
60+
}
61+
}
62+
}
63+
64+
// Calculate aggregated metrics
65+
$avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0;
66+
$failureRate = ($totalProcessed + $totalFailed) > 0
67+
? ($totalFailed / ($totalProcessed + $totalFailed)) * 100.0
68+
: 0.0;
69+
70+
// Calculate throughput per minute (jobs completed in last 60 seconds)
71+
$throughputPerMinute = 0.0;
72+
foreach ($queueJobs as $job) {
73+
$jobClass = $job['jobClass'];
74+
$throughputPerMinute += $this->jobRepository->getThroughput(
75+
$jobClass,
76+
$connection,
77+
$queue,
78+
60 // last 60 seconds
79+
);
80+
}
81+
82+
// Store aggregated metrics
83+
$this->queueRepository->recordSnapshot($connection, $queue, [
84+
'throughput_per_minute' => $throughputPerMinute,
85+
'avg_duration' => $avgDuration,
86+
'failure_rate' => $failureRate,
87+
'total_processed' => $totalProcessed,
88+
'total_failed' => $totalFailed,
89+
'last_processed_at' => $lastProcessedAt?->timestamp,
90+
]);
91+
}
92+
93+
/**
94+
* Calculate metrics for all discovered queues.
95+
*/
96+
public function executeForAllQueues(): int
97+
{
98+
$queues = $this->queueRepository->listQueues();
99+
$count = 0;
100+
101+
foreach ($queues as $queue) {
102+
$this->execute($queue['connection'], $queue['queue']);
103+
$count++;
104+
}
105+
106+
return $count;
107+
}
108+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PHPeek\LaravelQueueMetrics\Console;
6+
7+
use Illuminate\Console\Command;
8+
use PHPeek\LaravelQueueMetrics\Actions\CalculateQueueMetricsAction;
9+
10+
/**
11+
* Calculate and update queue-level aggregated metrics.
12+
*
13+
* This command aggregates job-level metrics into queue-level metrics including
14+
* throughput_per_minute, avg_duration, and failure_rate. These metrics are
15+
* essential for auto-scaling calculations and queue health monitoring.
16+
*/
17+
final class CalculateQueueMetricsCommand extends Command
18+
{
19+
protected $signature = 'queue-metrics:calculate
20+
{--connection= : Calculate metrics only for this connection}
21+
{--queue= : Calculate metrics only for this queue (requires --connection)}';
22+
23+
protected $description = 'Calculate aggregated queue-level metrics from job metrics';
24+
25+
public function __construct(
26+
private readonly CalculateQueueMetricsAction $action,
27+
) {
28+
parent::__construct();
29+
}
30+
31+
public function handle(): int
32+
{
33+
$connection = $this->option('connection');
34+
$queue = $this->option('queue');
35+
36+
// Validate options
37+
if ($queue !== null && $connection === null) {
38+
$this->error('The --queue option requires --connection to be specified');
39+
40+
return self::FAILURE;
41+
}
42+
43+
try {
44+
if ($connection !== null && $queue !== null) {
45+
// Calculate for specific queue
46+
$this->info("Calculating metrics for {$connection}:{$queue}...");
47+
$this->action->execute($connection, $queue);
48+
$this->info('✓ Metrics calculated successfully');
49+
50+
return self::SUCCESS;
51+
}
52+
53+
// Calculate for all queues
54+
$this->info('Calculating metrics for all queues...');
55+
$count = $this->action->executeForAllQueues();
56+
$this->info("✓ Metrics calculated for {$count} queue(s)");
57+
58+
return self::SUCCESS;
59+
} catch (\Exception $e) {
60+
$this->error("Failed to calculate queue metrics: {$e->getMessage()}");
61+
62+
return self::FAILURE;
63+
}
64+
}
65+
}

src/LaravelQueueMetricsServiceProvider.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use PHPeek\LaravelQueueMetrics\Commands\CleanupStaleWorkersCommand;
2525
use PHPeek\LaravelQueueMetrics\Config\QueueMetricsConfig;
2626
use PHPeek\LaravelQueueMetrics\Config\StorageConfig;
27+
use PHPeek\LaravelQueueMetrics\Console\CalculateQueueMetricsCommand;
2728
use PHPeek\LaravelQueueMetrics\Console\DetectStaleWorkersCommand;
2829
use PHPeek\LaravelQueueMetrics\Console\RecordTrendDataCommand;
2930
use PHPeek\LaravelQueueMetrics\Contracts\QueueInspector;
@@ -69,6 +70,7 @@ public function configurePackage(Package $package): void
6970
->hasRoute('api')
7071
->hasMigration('2024_01_01_000001_create_queue_metrics_storage_tables')
7172
->hasCommand(CalculateBaselinesCommand::class)
73+
->hasCommand(CalculateQueueMetricsCommand::class)
7274
->hasCommand(CleanupStaleWorkersCommand::class)
7375
->hasCommand(DetectStaleWorkersCommand::class)
7476
->hasCommand(RecordTrendDataCommand::class);
@@ -199,6 +201,11 @@ protected function registerScheduledTasks(): void
199201
// Schedule adaptive baseline calculation
200202
$this->scheduleAdaptiveBaselineCalculation($scheduler);
201203

204+
// Schedule queue metrics calculation (aggregate job metrics into queue metrics)
205+
$scheduler->command('queue-metrics:calculate')
206+
->everyMinute()
207+
->withoutOverlapping();
208+
202209
// Schedule trend data recording (every minute for real-time trends)
203210
$scheduler->command('queue-metrics:record-trends')
204211
->everyMinute()

0 commit comments

Comments
 (0)