Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions redis/heartbeat.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,39 @@ local zset_key = KEYS[1]
local processed_key = KEYS[2]
local owners_key = KEYS[3]
local worker_queue_key = KEYS[4]
local heartbeats_key = KEYS[5]
local test_group_timeout_key = KEYS[6]

local current_time = ARGV[1]
local current_time = tonumber(ARGV[1])
local test = ARGV[2]
local default_timeout = tonumber(ARGV[3]) or 0

-- already processed, we do not need to bump the timestamp
if redis.call('sismember', processed_key, test) == 1 then
return false
return nil
end

-- we're still the owner of the test, we can bump the timestamp
if redis.call('hget', owners_key, test) == worker_queue_key then
return redis.call('zadd', zset_key, current_time, test)
-- Record last heartbeat time in a separate hash for "recent activity" tracking
redis.call('hset', heartbeats_key, test, current_time)

-- Get the dynamic timeout for this test (if any) or use default
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
local timeout_to_use
if dynamic_timeout and dynamic_timeout ~= "" then
timeout_to_use = tonumber(dynamic_timeout)
else
timeout_to_use = default_timeout
end

local new_deadline = current_time + timeout_to_use

-- Extend the deadline by setting score to current_time + timeout
redis.call('zadd', zset_key, new_deadline, test)

-- Return the new deadline and timeout used for logging
return { new_deadline, timeout_to_use }
end

return nil
52 changes: 42 additions & 10 deletions redis/reserve_lost.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ local processed_key = KEYS[2]
local worker_queue_key = KEYS[3]
local owners_key = KEYS[4]
local test_group_timeout_key = KEYS[5]
local heartbeats_key = KEYS[6]

local current_time = tonumber(ARGV[1])
local timeout = tonumber(ARGV[2])
local use_dynamic_deadline = ARGV[3] == "true"
local default_timeout = tonumber(ARGV[4]) or 0
local heartbeat_grace_period = tonumber(ARGV[5]) or 30

local lost_tests
if use_dynamic_deadline then
Expand All @@ -18,20 +20,50 @@ end

for _, test in ipairs(lost_tests) do
if redis.call('sismember', processed_key, test) == 0 then
if use_dynamic_deadline then
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
if not dynamic_timeout or dynamic_timeout == "" then
dynamic_timeout = default_timeout
-- Check if the owner is still actively heartbeating
local last_heartbeat = redis.call('hget', heartbeats_key, test)
if last_heartbeat and last_heartbeat ~= "" then
local heartbeat_age = current_time - tonumber(last_heartbeat)
-- If heartbeated recently (within grace period), skip this test
-- The owner is still actively working on it
if heartbeat_age < heartbeat_grace_period then
-- Skip this test, try the next one
-- Don't claim it since the worker is still alive
else
dynamic_timeout = tonumber(dynamic_timeout)
-- Heartbeat is stale, safe to claim
if use_dynamic_deadline then
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
if not dynamic_timeout or dynamic_timeout == "" then
dynamic_timeout = default_timeout
else
dynamic_timeout = tonumber(dynamic_timeout)
end
redis.call('zadd', zset_key, current_time + dynamic_timeout, test)
else
redis.call('zadd', zset_key, current_time + timeout, test)
end
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
redis.call('hdel', heartbeats_key, test) -- Clear stale heartbeat
return test
end
redis.call('zadd', zset_key, current_time + dynamic_timeout, test)
else
redis.call('zadd', zset_key, current_time + timeout, test)
-- No heartbeat record, proceed with claiming (legacy behavior or never heartbeated)
if use_dynamic_deadline then
local dynamic_timeout = redis.call('hget', test_group_timeout_key, test)
if not dynamic_timeout or dynamic_timeout == "" then
dynamic_timeout = default_timeout
else
dynamic_timeout = tonumber(dynamic_timeout)
end
redis.call('zadd', zset_key, current_time + dynamic_timeout, test)
else
redis.call('zadd', zset_key, current_time + timeout, test)
end
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
return test
end
redis.call('lpush', worker_queue_key, test)
redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership
return test
end
end

Expand Down
7 changes: 6 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Configuration
attr_accessor :branch
attr_accessor :timing_redis_url
attr_accessor :write_duration_averages
attr_accessor :heartbeat_grace_period, :heartbeat_interval
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
Expand Down Expand Up @@ -61,7 +62,9 @@ def initialize(
strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil,
suite_max_duration: 120_000, suite_buffer_percent: 10,
branch: nil,
timing_redis_url: nil
timing_redis_url: nil,
heartbeat_grace_period: 30,
heartbeat_interval: 10
)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
Expand Down Expand Up @@ -96,6 +99,8 @@ def initialize(
@branch = branch
@timing_redis_url = timing_redis_url
@write_duration_averages = false
@heartbeat_grace_period = heartbeat_grace_period
@heartbeat_interval = heartbeat_interval
end

def queue_init_timeout
Expand Down
86 changes: 83 additions & 3 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def poll
executable = resolve_executable(id)

if executable
yield executable
with_heartbeat(id) do
yield executable
end
else
warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.")
acknowledge(id)
Expand Down Expand Up @@ -209,10 +211,87 @@ def release!
nil
end

# Sends one heartbeat for a test by running the :heartbeat Redis script.
#
# test_or_id - an object responding to #id, a raw test id, or nil. When it is
#              nil/false the value of @reserved_test is used instead.
#
# Returns true when the script replies with a two-element array
# (new deadline, timeout used); in that case a "[heartbeat]" line is emitted
# through `warn`. Returns false when there is no test to heartbeat, when the
# script replies with anything else, or when one of CONNECTION_ERRORS is raised.
def heartbeat(test_or_id = nil)
  test_key =
    if !test_or_id
      @reserved_test
    elsif test_or_id.respond_to?(:id)
      test_or_id.id
    else
      test_or_id
    end
  return false unless test_key

  current_time = CI::Queue.time_now.to_f

  # Key order must match the KEYS[n] positions the script reads.
  script_keys = [
    key('running'),
    key('processed'),
    key('owners'),
    key('worker', worker_id, 'queue'),
    key('heartbeats'),
    key('test-group-timeout'),
  ]
  script_args = [current_time, test_key, config.timeout]

  result = eval_script(:heartbeat, keys: script_keys, argv: script_args)

  # Any reply other than a [deadline, timeout] pair counts as "not extended".
  return false unless result.is_a?(Array) && result.length == 2

  new_deadline, timeout_used = result
  time_format = '%Y-%m-%d %H:%M:%S'
  current_time_readable = Time.at(current_time).strftime(time_format)
  deadline_readable = Time.at(new_deadline).strftime(time_format)
  warn "[heartbeat] test=#{test_key} current_time=#{current_time_readable} extended_deadline=#{deadline_readable} timeout=#{timeout_used}s"
  true
rescue *CONNECTION_ERRORS
  false
end

private

attr_reader :index

# Runs the given block while a background thread calls #heartbeat for
# +test_id+ every config.heartbeat_interval seconds.
#
# test_id - identifier forwarded to #heartbeat on each tick.
#
# When config.heartbeat_interval is nil or not positive the block is simply
# yielded with no background thread. Returns whatever the block returns;
# exceptions raised by the block propagate after the thread has been stopped.
#
# Shutdown is cooperative: the thread sleeps on a condition variable, so it
# can be woken immediately when the block finishes instead of being killed.
# Killing it unconditionally could interrupt a heartbeat in the middle of a
# Redis round trip on the connection shared with the calling thread.
# Thread#kill is kept only as a last resort if the thread has not finished
# within one second.
def with_heartbeat(test_id)
  return yield unless config.heartbeat_interval&.positive?

  # Load the script and touch the connection from the calling thread before
  # the background thread starts using them.
  ensure_connection_and_script(:heartbeat)

  lock = Mutex.new
  wakeup = ConditionVariable.new
  stop_heartbeat = false

  heartbeat_thread = Thread.new do
    loop do
      # Sleep for one interval, or less if the main thread signals a stop.
      stopping = lock.synchronize do
        wakeup.wait(lock, config.heartbeat_interval) unless stop_heartbeat
        stop_heartbeat
      end
      break if stopping

      begin
        heartbeat(test_id)
      rescue StandardError => e
        warn("[heartbeat] Failed to send heartbeat for #{test_id}: #{e.message}")
      end
    end
  end

  yield
ensure
  # heartbeat_thread is nil when heartbeats are disabled or setup failed.
  if heartbeat_thread
    lock.synchronize do
      stop_heartbeat = true
      wakeup.signal
    end
    # Give an in-flight heartbeat up to 1 second to complete; only then kill.
    heartbeat_thread.kill unless heartbeat_thread.join(1)
  end
end

# Warm-up helper: loads +script+ via load_script and then pings Redis, both
# from the calling thread. The intent is that a thread started afterwards
# finds the script loaded and the connection already established.
#
# script - script name passed straight to load_script (e.g. :heartbeat).
#
# Returns the result of redis.ping.
def ensure_connection_and_script(script)
  load_script(script)
  redis.ping
end

# Returns the worker id held by the configuration object (config.worker_id).
def worker_id
config.worker_id
end
Expand Down Expand Up @@ -295,9 +374,10 @@ def try_to_reserve_lost_test
key('completed'),
key('worker', worker_id, 'queue'),
key('owners'),
key('test-group-timeout')
key('test-group-timeout'),
key('heartbeats')
],
argv: [current_time, timeout, 'true', config.timeout]
argv: [current_time, timeout, 'true', config.timeout, config.heartbeat_grace_period]
)

if lost_test
Expand Down
Binary file added ruby/test/ci/queue/.DS_Store
Binary file not shown.
6 changes: 4 additions & 2 deletions ruby/test/ci/queue/redis/dynamic_timeout_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -194,17 +194,19 @@ def test_reserve_lost_test_passes_dynamic_deadline_flag
'build:42:completed',
'build:42:worker:1:queue',
'build:42:owners',
'build:42:test-group-timeout' # 5th key for dynamic deadline
'build:42:test-group-timeout', # 5th key for dynamic deadline
'build:42:heartbeats' # 6th key for heartbeat tracking
]

@worker.stub(:eval_script, proc { |script, keys:, argv:|
assert_equal :reserve_lost, script
assert_equal expected_keys, keys
assert_equal 4, argv.length
assert_equal 5, argv.length
assert_instance_of Float, argv[0]
assert_equal @config.timeout, argv[1]
assert_equal 'true', argv[2]
assert_equal @config.timeout, argv[3]
assert_equal @config.heartbeat_grace_period, argv[4]
nil # Return nil (no lost test)
}) do
@worker.send(:try_to_reserve_lost_test)
Expand Down
Loading