Skip to content

Commit fec1080

Browse files
feat: add aggregated job metrics calculation
Add ability to calculate aggregated metrics for a job class across all queues: - CalculateAggregatedJobMetricsAction aggregates metrics across connection/queue combinations - AggregatedJobMetricsData DTO with weighted averages for duration and memory - Per-queue breakdown included in aggregated data - Helper methods: hasFailures() and isHealthy() (failure rate < 5%) - Weighted averages account for varying execution counts per queue
1 parent 669e0ab commit fec1080

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PHPeek\LaravelQueueMetrics\Actions;
6+
7+
use Carbon\Carbon;
8+
use PHPeek\LaravelQueueMetrics\DataTransferObjects\AggregatedJobMetricsData;
9+
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\JobMetricsRepository;
10+
11+
/**
12+
* Calculate aggregated metrics for a job class across all queues.
13+
*/
14+
final readonly class CalculateAggregatedJobMetricsAction
15+
{
16+
public function __construct(
17+
private JobMetricsRepository $repository,
18+
private CalculateJobMetricsAction $calculateJobMetrics,
19+
) {}
20+
21+
public function execute(string $jobClass): AggregatedJobMetricsData
22+
{
23+
// Get all jobs from the discovery set
24+
$allJobs = $this->repository->listJobs();
25+
26+
// Filter to find all connection/queue combinations for this job class
27+
$jobQueues = array_filter($allJobs, fn (array $job): bool => $job['jobClass'] === $jobClass);
28+
29+
// If no queues found, return empty aggregated metrics
30+
if (empty($jobQueues)) {
31+
return new AggregatedJobMetricsData(
32+
jobClass: $jobClass,
33+
totalExecutions: 0,
34+
totalFailures: 0,
35+
avgDurationMs: 0.0,
36+
avgMemoryMb: 0.0,
37+
failureRate: 0.0,
38+
throughputPerMinute: 0.0,
39+
byQueue: [],
40+
calculatedAt: Carbon::now(),
41+
);
42+
}
43+
44+
// Calculate metrics for each connection/queue combination
45+
$byQueue = [];
46+
$totalExecutions = 0;
47+
$totalFailures = 0;
48+
$weightedDuration = 0.0;
49+
$weightedMemory = 0.0;
50+
$totalThroughput = 0.0;
51+
52+
foreach ($jobQueues as $job) {
53+
$metrics = $this->calculateJobMetrics->execute(
54+
$job['jobClass'],
55+
$job['connection'],
56+
$job['queue']
57+
);
58+
59+
$executions = $metrics->execution->totalProcessed;
60+
$failures = $metrics->execution->totalFailed;
61+
62+
// Accumulate totals
63+
$totalExecutions += $executions;
64+
$totalFailures += $failures;
65+
66+
// Weighted averages (weight by number of executions)
67+
if ($executions > 0) {
68+
$weightedDuration += $metrics->duration->avg * $executions;
69+
$weightedMemory += $metrics->memory->avg * $executions;
70+
}
71+
72+
$totalThroughput += $metrics->throughput->perMinute;
73+
74+
// Calculate failure rate for this queue
75+
$totalJobs = $executions + $failures;
76+
$failureRate = $totalJobs > 0 ? ($failures / $totalJobs) * 100 : 0.0;
77+
78+
// Add to by_queue breakdown
79+
$byQueue[] = [
80+
'connection' => $job['connection'],
81+
'queue' => $job['queue'],
82+
'executions' => $executions,
83+
'failures' => $failures,
84+
'avg_duration_ms' => round($metrics->duration->avg, 2),
85+
'avg_memory_mb' => round($metrics->memory->avg, 2),
86+
'failure_rate' => round($failureRate, 2),
87+
'throughput_per_minute' => round($metrics->throughput->perMinute, 2),
88+
];
89+
}
90+
91+
// Calculate overall weighted averages
92+
$avgDurationMs = $totalExecutions > 0 ? $weightedDuration / $totalExecutions : 0.0;
93+
$avgMemoryMb = $totalExecutions > 0 ? $weightedMemory / $totalExecutions : 0.0;
94+
95+
// Calculate overall failure rate
96+
$totalJobs = $totalExecutions + $totalFailures;
97+
$failureRate = $totalJobs > 0 ? ($totalFailures / $totalJobs) * 100 : 0.0;
98+
99+
return new AggregatedJobMetricsData(
100+
jobClass: $jobClass,
101+
totalExecutions: $totalExecutions,
102+
totalFailures: $totalFailures,
103+
avgDurationMs: $avgDurationMs,
104+
avgMemoryMb: $avgMemoryMb,
105+
failureRate: $failureRate,
106+
throughputPerMinute: $totalThroughput,
107+
byQueue: $byQueue,
108+
calculatedAt: Carbon::now(),
109+
);
110+
}
111+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace PHPeek\LaravelQueueMetrics\DataTransferObjects;
6+
7+
use Carbon\Carbon;
8+
9+
/**
10+
* Aggregated metrics for a job class across all queues.
11+
*/
12+
final readonly class AggregatedJobMetricsData
13+
{
14+
/**
15+
* @param array<array{connection: string, queue: string, executions: int, avg_duration_ms: float, avg_memory_mb: float, failures: int, failure_rate: float, throughput_per_minute: float}> $byQueue
16+
*/
17+
public function __construct(
18+
public string $jobClass,
19+
public int $totalExecutions,
20+
public int $totalFailures,
21+
public float $avgDurationMs,
22+
public float $avgMemoryMb,
23+
public float $failureRate,
24+
public float $throughputPerMinute,
25+
public array $byQueue,
26+
public Carbon $calculatedAt,
27+
) {}
28+
29+
/**
30+
* @return array<string, mixed>
31+
*/
32+
public function toArray(): array
33+
{
34+
return [
35+
'job_class' => $this->jobClass,
36+
'total_executions' => $this->totalExecutions,
37+
'total_failures' => $this->totalFailures,
38+
'avg_duration_ms' => round($this->avgDurationMs, 2),
39+
'avg_memory_mb' => round($this->avgMemoryMb, 2),
40+
'failure_rate' => round($this->failureRate, 2),
41+
'throughput_per_minute' => round($this->throughputPerMinute, 2),
42+
'by_queue' => $this->byQueue,
43+
'calculated_at' => $this->calculatedAt->toIso8601String(),
44+
];
45+
}
46+
47+
public function hasFailures(): bool
48+
{
49+
return $this->totalFailures > 0;
50+
}
51+
52+
public function isHealthy(): bool
53+
{
54+
return $this->failureRate < 5.0;
55+
}
56+
}

0 commit comments

Comments
 (0)