Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions redis/heartbeat.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,73 @@ local zset_key = KEYS[1]
local processed_key = KEYS[2]
local owners_key = KEYS[3]
local worker_queue_key = KEYS[4]
local test_group_timeout_key = KEYS[5]

local current_time = ARGV[1]
local current_time = tonumber(ARGV[1])
local test = ARGV[2]
local default_timeout = tonumber(ARGV[3]) or 0

-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, test) == 1 then
return false
end

-- we're still the owner of the test, we can bump the timestamp
if redis.call('hget', owners_key, test) == worker_queue_key then
return redis.call('zadd', zset_key, current_time, test)
-- we're still the owner of the test, check if we need to extend the deadline
local owner_value = redis.call('hget', owners_key, test)
if owner_value then
-- Parse owner value: format is "worker_queue_key|initial_reservation_time|last_heartbeat_time"
local first_pipe = string.find(owner_value, "|")
if not first_pipe then
return false
end
local stored_worker_key = string.sub(owner_value, 1, first_pipe - 1)

if stored_worker_key == worker_queue_key then
-- Parse initial reservation time and last heartbeat time
local rest = string.sub(owner_value, first_pipe + 1)
local second_pipe = string.find(rest, "|")
local initial_reservation_time
if second_pipe then
initial_reservation_time = tonumber(string.sub(rest, 1, second_pipe - 1))
else
-- Backward compatibility: old format only has one timestamp
initial_reservation_time = tonumber(rest)
end

-- Update last heartbeat timestamp in owners hash (keep initial reservation time)
local new_owner_value = worker_queue_key .. "|" .. (initial_reservation_time or current_time) .. "|" .. current_time
redis.call('hset', owners_key, test, new_owner_value)

local deadline = redis.call('zscore', zset_key, test)
if deadline then
deadline = tonumber(deadline)

-- Get the estimated timeout for this test
local estimated_timeout = redis.call('hget', test_group_timeout_key, test)
if not estimated_timeout or estimated_timeout == "" then
estimated_timeout = default_timeout
else
estimated_timeout = tonumber(estimated_timeout)
end

-- Cap deadline at 3x the estimated timeout from initial reservation
local max_deadline = (initial_reservation_time or current_time) + (estimated_timeout * 3)

-- Only extend if deadline is within 20 seconds of expiring
if deadline - 20 < current_time then
-- Extend by 1 minute, but don't exceed max deadline
local new_deadline = math.min(current_time + 60, max_deadline)

-- Only update if we're actually extending
if new_deadline > deadline then
redis.call('zadd', zset_key, new_deadline, test)
return {deadline, new_deadline}
end
end
end
-- No extension needed, but heartbeat was recorded
return 0
end
end

return false
17 changes: 12 additions & 5 deletions redis/release.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ local zset_key = KEYS[1]
local worker_queue_key = KEYS[2]
local owners_key = KEYS[3]

-- owned_tests = {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...}
-- owned_tests = {"SomeTest", "worker:1|1234567890", "SomeOtherTest", "worker:2|1234567891", ...}
local owned_tests = redis.call('hgetall', owners_key)
for index, owner_or_test in ipairs(owned_tests) do
if owner_or_test == worker_queue_key then -- If we owned a test
local test = owned_tests[index - 1]
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
return nil
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
local pipe_pos = string.find(owner_or_test, "|")
if pipe_pos then
local stored_worker_key = string.sub(owner_or_test, 1, pipe_pos - 1)

if stored_worker_key == worker_queue_key then -- If we owned a test
local test = owned_tests[index - 1]
redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately
redis.call('hdel', owners_key, test) -- Remove from owners hash to clear heartbeat
return nil
end
end
end

Expand Down
13 changes: 11 additions & 2 deletions redis/requeue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@ local global_max_requeues = tonumber(ARGV[2])
local test = ARGV[3]
local offset = ARGV[4]

if redis.call('hget', owners_key, test) == worker_queue_key then
redis.call('hdel', owners_key, test)
local owner_value = redis.call('hget', owners_key, test)
if owner_value then
-- Parse owner value: format is "worker_queue_key|heartbeat_timestamp"
local pipe_pos = string.find(owner_value, "|")
if pipe_pos then
local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1)

if stored_worker_key == worker_queue_key then
redis.call('hdel', owners_key, test)
end
end
end

if redis.call('sismember', processed_key, test) == 1 then
Expand Down
5 changes: 4 additions & 1 deletion redis/reserve.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ if test then
redis.call('zadd', zset_key, current_time, test)
end
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key)
-- Store owner with initial reservation time and last heartbeat time
-- Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time"
local owner_value = worker_queue_key .. "|" .. current_time .. "|" .. current_time
redis.call('hset', owners_key, test, owner_value)
return test
else
return nil
Expand Down
51 changes: 44 additions & 7 deletions redis/reserve_lost.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,49 @@ local timeout = tonumber(ARGV[2])
local use_dynamic_deadline = ARGV[3] == "true"
local default_timeout = tonumber(ARGV[4]) or 0

local lost_tests
if use_dynamic_deadline then
lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time)
else
lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout)
-- Helper: checks if a test can be stolen
-- Returns true if heartbeat is old (> 2 minutes) or missing
-- Owner value format: "worker_queue_key|initial_reservation_time|last_heartbeat_time"
local function can_steal_test(test)
local owner_value = redis.call('hget', owners_key, test)
if not owner_value then return true end -- No owner, can steal

local first_pipe = string.find(owner_value, "|")
if not first_pipe then return true end

local rest = string.sub(owner_value, first_pipe + 1)
local second_pipe = string.find(rest, "|")

local last_heartbeat
if second_pipe then
-- New format: worker_key|initial_time|last_heartbeat
last_heartbeat = tonumber(string.sub(rest, second_pipe + 1))
else
-- Old format: worker_key|timestamp (treat as last heartbeat)
last_heartbeat = tonumber(rest)
end

if not last_heartbeat then return true end

local heartbeat_age = current_time - last_heartbeat

-- Only steal if heartbeat is old (> 2 minutes)
return heartbeat_age >= 120
end

-- Collect tests that can be stolen
local stealable_tests = {}

local all_running_tests = redis.call('zrange', zset_key, 0, -1)
for _, test in ipairs(all_running_tests) do
if redis.call('sismember', processed_key, test) == 0 then
if can_steal_test(test) then
table.insert(stealable_tests, test)
end
end
end

for _, test in ipairs(lost_tests) do
for _, test in ipairs(stealable_tests) do
if redis.call('sismember', processed_key, test) == 0 then
if use_dynamic_deadline then
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
Expand All @@ -30,7 +65,9 @@ for _, test in ipairs(lost_tests) do
redis.call('zadd', zset_key, current_time + timeout, test)
end
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
-- Store owner with initial reservation time and last heartbeat time
local new_owner_value = worker_queue_key .. "|" .. current_time .. "|" .. current_time
redis.call('hset', owners_key, test, new_owner_value) -- Take ownership
return test
end
end
Expand Down
8 changes: 8 additions & 0 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ def load_script(script)
@scripts_cache[script] ||= redis.script(:load, read_script(script))
end

def ensure_connection_and_script(script)
# Pre-initialize Redis connection and script in current thread context
# This ensures background threads use the same initialized connection
load_script(script)
# Ping Redis to ensure connection is established
redis.ping
end

def read_script(name)
::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua"))
rescue SystemCallError
Expand Down
58 changes: 57 additions & 1 deletion ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,37 @@ def poll
executable = resolve_executable(id)

if executable
yield executable
# Pre-initialize Redis connection and heartbeat script in main thread
# to ensure background thread uses the same initialized context
ensure_connection_and_script(:heartbeat)

# Start heartbeat thread to extend lease while executing
heartbeat_thread = nil
begin
heartbeat_thread = Thread.new do
heartbeat_interval = 10 # Send heartbeat every 10 seconds
loop do
break if Thread.current[:stop]

sleep heartbeat_interval

break if Thread.current[:stop]

heartbeat(id)
end
end
heartbeat_thread[:stop] = false

yield executable
ensure
# Stop heartbeat thread when execution completes
# This ensures it's stopped after acknowledge has completed
if heartbeat_thread
heartbeat_thread[:stop] = true
heartbeat_thread.wakeup # Interrupt sleep if thread is sleeping
heartbeat_thread.join(2) # Wait up to 2 seconds for thread to finish
end
end
else
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
acknowledge(id)
Expand Down Expand Up @@ -209,6 +239,32 @@ def release!
nil
end

def heartbeat(test_id)
current_time = CI::Queue.time_now.to_f
result = eval_script(
:heartbeat,
keys: [
key('running'),
key('processed'),
key('owners'),
key('worker', worker_id, 'queue'),
key('test-group-timeout')
],
argv: [current_time, test_id, config.timeout]
)
if result.is_a?(Array) && result.size == 2
old_deadline = result[0]
new_deadline = result[1]
old_deadline_readable = Time.at(old_deadline).strftime('%Y-%m-%d %H:%M:%S')
new_deadline_readable = Time.at(new_deadline).strftime('%Y-%m-%d %H:%M:%S')
warn("[heartbeat] Extended deadline for #{test_id.inspect} from #{old_deadline_readable} (#{old_deadline}) to #{new_deadline_readable} (#{new_deadline})")
end
result
rescue *CONNECTION_ERRORS => e
warn("Failed to send heartbeat for #{test_id.inspect}: #{e.class} - #{e.message}")
false
end

private

attr_reader :index
Expand Down
Binary file added ruby/test/ci/queue/.DS_Store
Binary file not shown.
52 changes: 49 additions & 3 deletions ruby/test/ci/queue/redis/dynamic_timeout_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,62 @@ def test_single_test_marked_lost_after_default_timeout
reserved_id = worker1.send(:try_to_reserve_test)
assert_equal 'TestA#test_1', reserved_id

# Wait longer than timeout (0.5s)
# Wait longer than timeout (0.5s) but less than 2 minutes
sleep 0.6

# Try to reserve with worker2 - should get the lost test
# Try to reserve with worker2 - should NOT get the test because heartbeat is recent (< 2 minutes)
worker2_config = config.dup
worker2_config.instance_variable_set(:@worker_id, '2')
worker2 = CI::Queue::Redis.new(@redis_url, worker2_config)

lost_test = worker2.send(:try_to_reserve_lost_test)
assert_equal 'TestA#test_1', lost_test, 'Single test should be marked as lost after default timeout'
assert_nil lost_test, 'Test should not be marked as lost if heartbeat is recent (< 2 minutes)'
end

def test_single_test_marked_lost_after_heartbeat_expires
# Create worker with short timeout
config = CI::Queue::Configuration.new(
build_id: 'heartbeat-timeout-test',
worker_id: '1',
timeout: 0.5 # 0.5 seconds
)

worker1 = CI::Queue::Redis.new(@redis_url, config)

# Populate with single test (no chunk)
tests = create_mock_tests(['TestA#test_1'])

worker1.stub(:reorder_tests, tests) do
worker1.populate(tests)
end

# Reserve the test with worker1
reserved_id = worker1.send(:try_to_reserve_test)
assert_equal 'TestA#test_1', reserved_id

# Manually set the heartbeat timestamp to be older than 2 minutes
# by manipulating Redis directly
# Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time"
current_time = CI::Queue.time_now.to_f
old_heartbeat_time = current_time - 130 # 130 seconds ago (more than 2 minutes)
initial_reservation_time = current_time - 140 # Initial reservation was before the heartbeat
worker_queue_key = "build:heartbeat-timeout-test:worker:1:queue"
owner_value = "#{worker_queue_key}|#{initial_reservation_time}|#{old_heartbeat_time}"
@redis.hset('build:heartbeat-timeout-test:owners', 'TestA#test_1', owner_value)

# Also set the deadline to be in the past so it's considered "lost"
@redis.zadd('build:heartbeat-timeout-test:running', current_time - 10, 'TestA#test_1')

# Wait a bit to ensure time has passed
sleep 0.1

# Try to reserve with worker2 - should get the lost test now
worker2_config = config.dup
worker2_config.instance_variable_set(:@worker_id, '2')
worker2 = CI::Queue::Redis.new(@redis_url, worker2_config)

lost_test = worker2.send(:try_to_reserve_lost_test)
assert_equal 'TestA#test_1', lost_test, 'Test should be marked as lost after heartbeat expires (> 2 minutes)'
end

def test_batching_with_many_chunks
Expand Down
6 changes: 4 additions & 2 deletions ruby/test/ci/queue/redis/worker_chunk_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,11 @@ def test_resolved_chunk_detects_flaky_tests

def test_acknowledge_chunk
# Set up a chunk as if it were reserved (in running zset)
# Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time"
chunk_id = 'TestA:chunk_0'
@redis.zadd('build:42:running', Time.now.to_i, chunk_id)
@redis.hset('build:42:owners', chunk_id, 'build:42:worker:1:queue')
current_time = Time.now.to_f
@redis.zadd('build:42:running', current_time.to_i, chunk_id)
@redis.hset('build:42:owners', chunk_id, "build:42:worker:1:queue|#{current_time}|#{current_time}")
@worker.instance_variable_set(:@reserved_test, chunk_id)

# Acknowledge the chunk
Expand Down
Loading
Loading