Skip to content

Commit 004f443

Browse files
fix(redis): Redis set operations require spread operator for variadic args
Fixed critical bug in PipelineWrapper where addToSet() and removeFromSet() were passing arrays directly to PhpRedis sadd()/srem() commands instead of using spread operator. This caused job discovery to fail completely. Changes: - PipelineWrapper::addToSet(): Changed `sadd($key, $members)` to `sadd($key, ...$members)` - PipelineWrapper::removeFromSet(): Changed `srem($key, $members)` to `srem($key, ...$members)` - CalculateQueueMetricsTest: Added recordStart() calls before recordCompletion() - CalculateQueueMetricsTest: Added Redis flush in beforeEach() for clean state - CalculateQueueMetricsTest: Changed float assertions from toBe() to toEqualWithDelta() Root Cause: PhpRedis expects variadic arguments for set operations: - sadd(key, member1, member2, ...) ✅ - sadd(key, [member1, member2]) ❌ Impact: This bug prevented all job discovery tracking, causing listJobs() to return empty arrays even though job metrics were being recorded correctly. This made queue-level metric aggregation fail with 0 values.
1 parent 4ac41c1 commit 004f443

File tree

2 files changed

+35
-13
lines changed

2 files changed

+35
-13
lines changed

src/Support/PipelineWrapper.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public function removeFromSortedSet(string $key, string $member): void
101101
public function addToSet(string $key, array $members): void
102102
{
103103
if (! empty($members)) {
104-
$this->pipe->sadd($key, $members);
104+
$this->pipe->sadd($key, ...$members);
105105
}
106106
}
107107

@@ -116,7 +116,7 @@ public function getSetMembers(string $key): mixed
116116
public function removeFromSet(string $key, array $members): void
117117
{
118118
if (! empty($members)) {
119-
$this->pipe->srem($key, $members);
119+
$this->pipe->srem($key, ...$members);
120120
}
121121
}
122122

tests/Feature/CalculateQueueMetricsTest.php

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
config()->set('queue-metrics.enabled', true);
1717
config()->set('queue-metrics.storage.driver', 'redis');
1818
config()->set('queue-metrics.storage.connection', 'default');
19+
20+
// Flush Redis before each test to ensure clean state
21+
\Illuminate\Support\Facades\Redis::connection('default')->flushdb();
1922
});
2023

2124
it('calculates queue metrics from job metrics', function () {
@@ -28,34 +31,39 @@
2831
$jobRepo = app(JobMetricsRepository::class);
2932
$queueRepo = app(QueueMetricsRepository::class);
3033

31-
// Mark jobs as discovered
32-
$jobRepo->markJobDiscovered($jobClass1, $connection, $queue);
33-
$jobRepo->markJobDiscovered($jobClass2, $connection, $queue);
34-
3534
// Mark queue as discovered
3635
$queueRepo->markQueueDiscovered($connection, $queue);
3736

3837
// Record job completions for job 1: 10 jobs, 100ms avg duration
38+
// Note: recordStart() now handles job discovery atomically
3939
for ($i = 0; $i < 10; $i++) {
40+
$jobId = "job-1-{$i}";
41+
$jobRepo->recordStart($jobId, $jobClass1, $connection, $queue, \Carbon\Carbon::now());
4042
$jobRepo->recordCompletion(
43+
jobId: $jobId,
4144
jobClass: $jobClass1,
4245
connection: $connection,
4346
queue: $queue,
4447
durationMs: 100.0,
4548
memoryMb: 10.0,
4649
cpuTimeMs: 50.0,
50+
completedAt: \Carbon\Carbon::now(),
4751
);
4852
}
4953

5054
// Record job completions for job 2: 5 jobs, 200ms avg duration
5155
for ($i = 0; $i < 5; $i++) {
56+
$jobId = "job-2-{$i}";
57+
$jobRepo->recordStart($jobId, $jobClass2, $connection, $queue, \Carbon\Carbon::now());
5258
$jobRepo->recordCompletion(
59+
jobId: $jobId,
5360
jobClass: $jobClass2,
5461
connection: $connection,
5562
queue: $queue,
5663
durationMs: 200.0,
5764
memoryMb: 15.0,
5865
cpuTimeMs: 75.0,
66+
completedAt: \Carbon\Carbon::now(),
5967
);
6068
}
6169

@@ -70,7 +78,7 @@
7078

7179
// Weighted average duration: (10 * 100 + 5 * 200) / 15 = 133.33ms
7280
$expectedAvgDuration = (10 * 100.0 + 5 * 200.0) / 15;
73-
expect($metrics['avg_duration'])->toBe($expectedAvgDuration);
81+
expect($metrics['avg_duration'])->toEqualWithDelta($expectedAvgDuration, 0.01);
7482

7583
// Throughput should be sum of both job classes
7684
expect($metrics['throughput_per_minute'])->toBeGreaterThan(0.0);
@@ -107,29 +115,34 @@
107115
$jobRepo = app(JobMetricsRepository::class);
108116
$queueRepo = app(QueueMetricsRepository::class);
109117

110-
$jobRepo->markJobDiscovered($jobClass, $connection, $queue);
111118
$queueRepo->markQueueDiscovered($connection, $queue);
112119

113120
// Record 7 successful completions
121+
// Note: recordStart() now handles job discovery atomically
114122
for ($i = 0; $i < 7; $i++) {
123+
$jobId = "job-success-{$i}";
124+
$jobRepo->recordStart($jobId, $jobClass, $connection, $queue, \Carbon\Carbon::now());
115125
$jobRepo->recordCompletion(
126+
jobId: $jobId,
116127
jobClass: $jobClass,
117128
connection: $connection,
118129
queue: $queue,
119130
durationMs: 100.0,
120131
memoryMb: 10.0,
121132
cpuTimeMs: 50.0,
133+
completedAt: \Carbon\Carbon::now(),
122134
);
123135
}
124136

125137
// Record 3 failures
126138
for ($i = 0; $i < 3; $i++) {
127139
$jobRepo->recordFailure(
140+
jobId: "job-failure-{$i}",
128141
jobClass: $jobClass,
129142
connection: $connection,
130143
queue: $queue,
131144
exception: 'Test exception',
132-
failedAt: time(),
145+
failedAt: \Carbon\Carbon::now(),
133146
);
134147
}
135148

@@ -156,16 +169,20 @@
156169
foreach ($queues as $q) {
157170
$queueRepo->markQueueDiscovered($q['connection'], $q['queue']);
158171
$jobClass = 'App\\Jobs\\TestJob'.ucfirst($q['queue']);
159-
$jobRepo->markJobDiscovered($jobClass, $q['connection'], $q['queue']);
160172

161173
// Record some completions
174+
// Note: recordStart() now handles job discovery atomically
175+
$jobId = "job-{$q['queue']}-1";
176+
$jobRepo->recordStart($jobId, $jobClass, $q['connection'], $q['queue'], \Carbon\Carbon::now());
162177
$jobRepo->recordCompletion(
178+
jobId: $jobId,
163179
jobClass: $jobClass,
164180
connection: $q['connection'],
165181
queue: $q['queue'],
166182
durationMs: 150.0,
167183
memoryMb: 12.0,
168184
cpuTimeMs: 60.0,
185+
completedAt: \Carbon\Carbon::now(),
169186
);
170187
}
171188

@@ -178,7 +195,7 @@
178195
foreach ($queues as $q) {
179196
$metrics = $queueRepo->getLatestMetrics($q['connection'], $q['queue']);
180197
expect($metrics)->not()->toBeEmpty();
181-
expect($metrics['avg_duration'])->toBe(150.0);
198+
expect($metrics['avg_duration'])->toEqualWithDelta(150.0, 0.01);
182199
}
183200
})->group('redis');
184201

@@ -191,15 +208,20 @@
191208
$queueRepo = app(QueueMetricsRepository::class);
192209

193210
$queueRepo->markQueueDiscovered($connection, $queue);
194-
$jobRepo->markJobDiscovered($jobClass, $connection, $queue);
195211

212+
// Record job completion
213+
// Note: recordStart() now handles job discovery atomically
214+
$jobId = 'job-specific-1';
215+
$jobRepo->recordStart($jobId, $jobClass, $connection, $queue, \Carbon\Carbon::now());
196216
$jobRepo->recordCompletion(
217+
jobId: $jobId,
197218
jobClass: $jobClass,
198219
connection: $connection,
199220
queue: $queue,
200221
durationMs: 250.0,
201222
memoryMb: 20.0,
202223
cpuTimeMs: 100.0,
224+
completedAt: \Carbon\Carbon::now(),
203225
);
204226

205227
// Execute command for specific queue
@@ -211,7 +233,7 @@
211233
expect($exitCode)->toBe(0);
212234

213235
$metrics = $queueRepo->getLatestMetrics($connection, $queue);
214-
expect($metrics['avg_duration'])->toBe(250.0);
236+
expect($metrics['avg_duration'])->toEqualWithDelta(250.0, 0.01);
215237
})->group('redis');
216238

217239
it('command fails when queue specified without connection', function () {

0 commit comments

Comments
 (0)