Skip to content

Commit 80b5719

Browse files
committed
[CI] heart beating ongoing chunk
1 parent dbe8b01 commit 80b5719

File tree

9 files changed

+200
-23
lines changed

9 files changed

+200
-23
lines changed

redis/heartbeat.lua

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,45 @@ local processed_key = KEYS[2]
33
local owners_key = KEYS[3]
44
local worker_queue_key = KEYS[4]
55

6-
local current_time = ARGV[1]
6+
local current_time = tonumber(ARGV[1])
77
local test = ARGV[2]
88

99
-- already processed, we do not need to bump the timestamp
1010
if redis.call('sismember', processed_key, test) == 1 then
1111
return false
1212
end
1313

14-
-- we're still the owner of the test, we can bump the timestamp
15-
if redis.call('hget', owners_key, test) == worker_queue_key then
16-
return redis.call('zadd', zset_key, current_time, test)
14+
-- we're still the owner of the test, check if we need to extend the deadline
15+
local owner_value = redis.call('hget', owners_key, test)
16+
if owner_value then
17+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
18+
local pipe_pos = string.find(owner_value, "|")
19+
if not pipe_pos then
20+
return false
21+
end
22+
local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1)
23+
24+
if stored_worker_key == worker_queue_key then
25+
-- Always update last heartbeat timestamp in owners hash
26+
local new_owner_value = worker_queue_key .. "|" .. current_time
27+
redis.call('hset', owners_key, test, new_owner_value)
28+
29+
local deadline = redis.call('zscore', zset_key, test)
30+
if deadline then
31+
deadline = tonumber(deadline)
32+
-- Only extend if deadline is within 20 seconds of expiring
33+
-- This prevents infinite extensions while still allowing reasonable extension
34+
if deadline - 20 < current_time then
35+
-- Extend by 1 minute (more conservative than before)
36+
local new_deadline = current_time + 60
37+
redis.call('zadd', zset_key, new_deadline, test)
38+
-- Return old deadline and new deadline
39+
return {deadline, new_deadline}
40+
end
41+
end
42+
-- No extension needed, but heartbeat was recorded
43+
return 0
44+
end
1745
end
46+
47+
return false

redis/release.lua

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,20 @@ local zset_key = KEYS[1]
22
local worker_queue_key = KEYS[2]
33
local owners_key = KEYS[3]
44

5-
-- owned_tests = {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...}
5+
-- owned_tests = {"SomeTest", "worker:1|1234567890", "SomeOtherTest", "worker:2|1234567891", ...}
66
local owned_tests = redis.call('hgetall', owners_key)
77
for index, owner_or_test in ipairs(owned_tests) do
8-
if owner_or_test == worker_queue_key then -- If we owned a test
9-
local test = owned_tests[index - 1]
10-
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
11-
return nil
8+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
9+
local pipe_pos = string.find(owner_or_test, "|")
10+
if pipe_pos then
11+
local stored_worker_key = string.sub(owner_or_test, 1, pipe_pos - 1)
12+
13+
if stored_worker_key == worker_queue_key then -- If we owned a test
14+
local test = owned_tests[index - 1]
15+
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
16+
redis.call('hdel', owners_key, test) -- Remove from owners hash to clear heartbeat
17+
return nil
18+
end
1219
end
1320
end
1421

redis/requeue.lua

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,17 @@ local global_max_requeues = tonumber(ARGV[2])
1010
local test = ARGV[3]
1111
local offset = ARGV[4]
1212

13-
if redis.call('hget', owners_key, test) == worker_queue_key then
14-
redis.call('hdel', owners_key, test)
13+
local owner_value = redis.call('hget', owners_key, test)
14+
if owner_value then
15+
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
16+
local pipe_pos = string.find(owner_value, "|")
17+
if pipe_pos then
18+
local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1)
19+
20+
if stored_worker_key == worker_queue_key then
21+
redis.call('hdel', owners_key, test)
22+
end
23+
end
1524
end
1625

1726
if redis.call('sismember', processed_key, test) == 1 then

redis/reserve.lua

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ if test then
2323
redis.call('zadd', zset_key, current_time, test)
2424
end
2525
redis.call('lpush', worker_queue_key, test)
26-
redis.call('hset', owners_key, test, worker_queue_key)
26+
-- Store owner with initial heartbeat timestamp (current_time)
27+
local owner_value = worker_queue_key .. "|" .. current_time
28+
redis.call('hset', owners_key, test, owner_value)
2729
return test
2830
else
2931
return nil

redis/reserve_lost.lua

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,34 @@ local timeout = tonumber(ARGV[2])
99
local use_dynamic_deadline = ARGV[3] == "true"
1010
local default_timeout = tonumber(ARGV[4]) or 0
1111

12-
local lost_tests
13-
if use_dynamic_deadline then
14-
lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time)
15-
else
16-
lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
12+
-- Helper: returns heartbeat age in seconds, or nil if no valid heartbeat
13+
local function get_heartbeat_age(test)
14+
local owner_value = redis.call('hget', owners_key, test)
15+
if not owner_value then return nil end
16+
local pipe_pos = string.find(owner_value, "|")
17+
if not pipe_pos then return nil end
18+
local last_heartbeat = tonumber(string.sub(owner_value, pipe_pos + 1))
19+
if not last_heartbeat then return nil end
20+
return current_time - last_heartbeat
1721
end
1822

19-
for _, test in ipairs(lost_tests) do
23+
-- Collect tests that can be stolen:
24+
-- 1. Expired deadline AND old heartbeat (> 2 minutes)
25+
-- 2. Non-expired deadline AND old heartbeat (> 2 minutes)
26+
local stealable_tests = {}
27+
28+
local all_running_tests = redis.call('zrange', zset_key, 0, -1)
29+
for _, test in ipairs(all_running_tests) do
30+
if redis.call('sismember', processed_key, test) == 0 then
31+
local heartbeat_age = get_heartbeat_age(test)
32+
-- Only steal if heartbeat is old (> 2 minutes) or missing
33+
if not heartbeat_age or heartbeat_age >= 120 then
34+
table.insert(stealable_tests, test)
35+
end
36+
end
37+
end
38+
39+
for _, test in ipairs(stealable_tests) do
2040
if redis.call('sismember', processed_key, test) == 0 then
2141
if use_dynamic_deadline then
2242
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
@@ -30,7 +50,9 @@ for _, test in ipairs(lost_tests) do
3050
redis.call('zadd', zset_key, current_time + timeout, test)
3151
end
3252
redis.call('lpush', worker_queue_key, test)
33-
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
53+
-- Store owner with initial heartbeat timestamp (current_time)
54+
local new_owner_value = worker_queue_key .. "|" .. current_time
55+
redis.call('hset', owners_key, test, new_owner_value) -- Take ownership
3456
return test
3557
end
3658
end

ruby/lib/ci/queue/redis/base.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ def load_script(script)
122122
@scripts_cache[script] ||= redis.script(:load, read_script(script))
123123
end
124124

125+
def ensure_connection_and_script(script)
126+
# Pre-initialize Redis connection and script in current thread context
127+
# This ensures background threads use the same initialized connection
128+
load_script(script)
129+
# Ping Redis to ensure connection is established
130+
redis.ping
131+
end
132+
125133
def read_script(name)
126134
::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua"))
127135
rescue SystemCallError

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,37 @@ def poll
8787
executable = resolve_executable(id)
8888

8989
if executable
90-
yield executable
90+
# Pre-initialize Redis connection and heartbeat script in main thread
91+
# to ensure background thread uses the same initialized context
92+
ensure_connection_and_script(:heartbeat)
93+
94+
# Start heartbeat thread to extend lease while executing
95+
heartbeat_thread = nil
96+
begin
97+
heartbeat_thread = Thread.new do
98+
heartbeat_interval = 10 # Send heartbeat every 10 seconds
99+
loop do
100+
break if Thread.current[:stop]
101+
102+
sleep heartbeat_interval
103+
104+
break if Thread.current[:stop]
105+
106+
heartbeat(id)
107+
end
108+
end
109+
heartbeat_thread[:stop] = false
110+
111+
yield executable
112+
ensure
113+
# Stop heartbeat thread when execution completes
114+
# This ensures it's stopped after acknowledge has completed
115+
if heartbeat_thread
116+
heartbeat_thread[:stop] = true
117+
heartbeat_thread.wakeup # Interrupt sleep if thread is sleeping
118+
heartbeat_thread.join(2) # Wait up to 2 seconds for thread to finish
119+
end
120+
end
91121
else
92122
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
93123
acknowledge(id)
@@ -209,6 +239,31 @@ def release!
209239
nil
210240
end
211241

242+
def heartbeat(test_id)
243+
current_time = CI::Queue.time_now.to_f
244+
result = eval_script(
245+
:heartbeat,
246+
keys: [
247+
key('running'),
248+
key('processed'),
249+
key('owners'),
250+
key('worker', worker_id, 'queue')
251+
],
252+
argv: [current_time, test_id]
253+
)
254+
if result.is_a?(Array) && result.size == 2
255+
old_deadline = result[0]
256+
new_deadline = result[1]
257+
old_deadline_readable = Time.at(old_deadline).strftime('%Y-%m-%d %H:%M:%S')
258+
new_deadline_readable = Time.at(new_deadline).strftime('%Y-%m-%d %H:%M:%S')
259+
warn("[heartbeat] Extended deadline for #{test_id.inspect} from #{old_deadline_readable} (#{old_deadline}) to #{new_deadline_readable} (#{new_deadline})")
260+
end
261+
result
262+
rescue *CONNECTION_ERRORS => e
263+
warn("Failed to send heartbeat for #{test_id.inspect}: #{e.class} - #{e.message}")
264+
false
265+
end
266+
212267
private
213268

214269
attr_reader :index

ruby/test/ci/queue/.DS_Store

8 KB
Binary file not shown.

ruby/test/ci/queue/redis/dynamic_timeout_test.rb

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,16 +268,60 @@ def test_single_test_marked_lost_after_default_timeout
268268
reserved_id = worker1.send(:try_to_reserve_test)
269269
assert_equal 'TestA#test_1', reserved_id
270270

271-
# Wait longer than timeout (0.5s)
271+
# Wait longer than timeout (0.5s) but less than 2 minutes
272272
sleep 0.6
273273

274-
# Try to reserve with worker2 - should get the lost test
274+
# Try to reserve with worker2 - should NOT get the test because heartbeat is recent (< 2 minutes)
275275
worker2_config = config.dup
276276
worker2_config.instance_variable_set(:@worker_id, '2')
277277
worker2 = CI::Queue::Redis.new(@redis_url, worker2_config)
278278

279279
lost_test = worker2.send(:try_to_reserve_lost_test)
280-
assert_equal 'TestA#test_1', lost_test, 'Single test should be marked as lost after default timeout'
280+
assert_nil lost_test, 'Test should not be marked as lost if heartbeat is recent (< 2 minutes)'
281+
end
282+
283+
def test_single_test_marked_lost_after_heartbeat_expires
284+
# Create worker with short timeout
285+
config = CI::Queue::Configuration.new(
286+
build_id: 'heartbeat-timeout-test',
287+
worker_id: '1',
288+
timeout: 0.5 # 0.5 seconds
289+
)
290+
291+
worker1 = CI::Queue::Redis.new(@redis_url, config)
292+
293+
# Populate with single test (no chunk)
294+
tests = create_mock_tests(['TestA#test_1'])
295+
296+
worker1.stub(:reorder_tests, tests) do
297+
worker1.populate(tests)
298+
end
299+
300+
# Reserve the test with worker1
301+
reserved_id = worker1.send(:try_to_reserve_test)
302+
assert_equal 'TestA#test_1', reserved_id
303+
304+
# Manually set the heartbeat timestamp to be older than 2 minutes
305+
# by manipulating Redis directly
306+
current_time = CI::Queue.time_now.to_f
307+
old_heartbeat_time = current_time - 130 # 130 seconds ago (more than 2 minutes)
308+
worker_queue_key = "build:heartbeat-timeout-test:worker:1:queue"
309+
owner_value = "#{worker_queue_key}|#{old_heartbeat_time}"
310+
@redis.hset('build:heartbeat-timeout-test:owners', 'TestA#test_1', owner_value)
311+
312+
# Also set the deadline to be in the past so it's considered "lost"
313+
@redis.zadd('build:heartbeat-timeout-test:running', current_time - 10, 'TestA#test_1')
314+
315+
# Wait a bit to ensure time has passed
316+
sleep 0.1
317+
318+
# Try to reserve with worker2 - should get the lost test now
319+
worker2_config = config.dup
320+
worker2_config.instance_variable_set(:@worker_id, '2')
321+
worker2 = CI::Queue::Redis.new(@redis_url, worker2_config)
322+
323+
lost_test = worker2.send(:try_to_reserve_lost_test)
324+
assert_equal 'TestA#test_1', lost_test, 'Test should be marked as lost after heartbeat expires (> 2 minutes)'
281325
end
282326

283327
def test_batching_with_many_chunks

0 commit comments

Comments
 (0)