@@ -9,14 +9,73 @@ local timeout = tonumber(ARGV[2])
99local use_dynamic_deadline = ARGV [3 ] == " true"
1010local default_timeout = tonumber (ARGV [4 ]) or 0
1111
12- local lost_tests
13- if use_dynamic_deadline then
14- lost_tests = redis .call (' zrangebyscore' , zset_key , 0 , current_time )
15- else
16- lost_tests = redis .call (' zrangebyscore' , zset_key , 0 , current_time - timeout )
-- A heartbeat at least this many seconds old marks the owning worker as stale.
local HEARTBEAT_STALE_SECONDS = 120

--- Decide whether `test` may be taken over from its current owner.
--
-- Owner value format (stored in the `owners_key` hash):
--   new: "worker_queue_key|initial_reservation_time|last_heartbeat_time"
--   old: "worker_queue_key|timestamp"  (timestamp doubles as both times)
--
-- The timestamps are read from the END of the value, so a worker queue key
-- that itself contains "|" cannot shift the numeric fields out of place.
--
-- Returns true when any of the following holds:
--   1. the test has no owner entry, or the entry has no parseable heartbeat;
--   2. the last heartbeat is at least HEARTBEAT_STALE_SECONDS old;
--   3. the deadline stored in `zset_key` has passed AND the heartbeat has not
--      moved since the reservation (the two times differ by less than 1s).
-- Returns false otherwise.
--
-- Reads `owners_key`, `zset_key` and `current_time` from the enclosing script.
-- NOTE(review): assumes `current_time` is numeric (or a numeric string) in the
-- same unit as the stored timestamps -- confirm against the script prologue.
--
-- @param test member name used in both the owners hash and the running zset
-- @return boolean
local function can_steal_test(test)
  local owner_value = redis.call('hget', owners_key, test)
  -- A missing hash field reaches Lua as false, not nil; both mean "no owner".
  if not owner_value then return true end

  -- Peel the last "|"-separated field off the end; plain characters only,
  -- "|" is not magic in Lua patterns.
  local head, heartbeat_field = string.match(owner_value, "^(.*)|([^|]*)$")
  if not head then return true end -- no delimiter at all: nothing to trust

  local last_heartbeat = tonumber(heartbeat_field)
  if not last_heartbeat then return true end

  -- Rule 2: the owner stopped heartbeating long enough ago.
  if current_time - last_heartbeat >= HEARTBEAT_STALE_SECONDS then
    return true
  end

  -- The field before the heartbeat is the initial reservation time when it is
  -- numeric. Otherwise it belongs to the worker key (old two-field format, or
  -- an unreadable reservation time) and the single timestamp serves as both.
  local initial_field = string.match(head, "|([^|]*)$")
  local initial_reservation_time =
    (initial_field and tonumber(initial_field)) or last_heartbeat

  -- Rule 3 only applies once the reservation deadline has passed.
  local deadline = redis.call('zscore', zset_key, test)
  deadline = deadline and tonumber(deadline)
  if not deadline or deadline > current_time then
    return false
  end

  -- Deadline expired: steal only if no heartbeat followed the reservation.
  return math.abs(initial_reservation_time - last_heartbeat) < 1
end
65+
-- Build the list of candidates: every member of the running zset that is not
-- yet in the processed set and that can_steal_test() approves, kept in the
-- order zrange returned them.
local stealable_tests = {}

local all_running_tests = redis.call('zrange', zset_key, 0, -1)
for index = 1, #all_running_tests do
  local candidate = all_running_tests[index]
  local unprocessed = redis.call('sismember', processed_key, candidate) == 0
  -- can_steal_test is only consulted for unprocessed members.
  if unprocessed and can_steal_test(candidate) then
    stealable_tests[#stealable_tests + 1] = candidate
  end
end
1877
19- for _ , test in ipairs (lost_tests ) do
78+ for _ , test in ipairs (stealable_tests ) do
2079 if redis .call (' sismember' , processed_key , test ) == 0 then
2180 if use_dynamic_deadline then
2281 local dynamic_timeout = redis .call (' hget' , test_group_timeout_key , test )
@@ -30,7 +89,9 @@ for _, test in ipairs(lost_tests) do
3089 redis .call (' zadd' , zset_key , current_time + timeout , test )
3190 end
3291 redis .call (' lpush' , worker_queue_key , test )
33- redis .call (' hset' , owners_key , test , worker_queue_key ) -- Take ownership
92+ -- Store owner with initial reservation time and last heartbeat time
93+ local new_owner_value = worker_queue_key .. " |" .. current_time .. " |" .. current_time
94+ redis .call (' hset' , owners_key , test , new_owner_value ) -- Take ownership
3495 return test
3596 end
3697end
0 commit comments