diff --git a/redis/heartbeat.lua b/redis/heartbeat.lua index 6c9c1e9..0680f89 100644 --- a/redis/heartbeat.lua +++ b/redis/heartbeat.lua @@ -2,16 +2,73 @@ local zset_key = KEYS[1] local processed_key = KEYS[2] local owners_key = KEYS[3] local worker_queue_key = KEYS[4] +local test_group_timeout_key = KEYS[5] -local current_time = ARGV[1] +local current_time = tonumber(ARGV[1]) local test = ARGV[2] +local default_timeout = tonumber(ARGV[3]) or 0 -- already processed, we do not need to bump the timestamp if redis.call('sismember', processed_key, test) == 1 then return false end --- we're still the owner of the test, we can bump the timestamp -if redis.call('hget', owners_key, test) == worker_queue_key then - return redis.call('zadd', zset_key, current_time, test) +-- we're still the owner of the test, check if we need to extend the deadline +local owner_value = redis.call('hget', owners_key, test) +if owner_value then + -- Parse owner value: format is "worker_queue_key|initial_reservation_time|last_heartbeat_time" + local first_pipe = string.find(owner_value, "|") + if not first_pipe then + return false + end + local stored_worker_key = string.sub(owner_value, 1, first_pipe - 1) + + if stored_worker_key == worker_queue_key then + -- Parse initial reservation time and last heartbeat time + local rest = string.sub(owner_value, first_pipe + 1) + local second_pipe = string.find(rest, "|") + local initial_reservation_time + if second_pipe then + initial_reservation_time = tonumber(string.sub(rest, 1, second_pipe - 1)) + else + -- Backward compatibility: old format only has one timestamp + initial_reservation_time = tonumber(rest) + end + + -- Update last heartbeat timestamp in owners hash (keep initial reservation time) + local new_owner_value = worker_queue_key .. "|" .. (initial_reservation_time or current_time) .. "|" .. 
current_time + redis.call('hset', owners_key, test, new_owner_value) + + local deadline = redis.call('zscore', zset_key, test) + if deadline then + deadline = tonumber(deadline) + + -- Get the estimated timeout for this test + local estimated_timeout = redis.call('hget', test_group_timeout_key, test) + if not estimated_timeout or estimated_timeout == "" then + estimated_timeout = default_timeout + else + estimated_timeout = tonumber(estimated_timeout) + end + + -- Cap deadline at 3x the estimated timeout from initial reservation + local max_deadline = (initial_reservation_time or current_time) + (estimated_timeout * 3) + + -- Only extend if deadline is within 20 seconds of expiring + if deadline - 20 < current_time then + -- Extend by 1 minute, but don't exceed max deadline + local new_deadline = math.min(current_time + 60, max_deadline) + + -- Only update if we're actually extending + if new_deadline > deadline then + redis.call('zadd', zset_key, new_deadline, test) + return {deadline, new_deadline} + end + end + end + -- No extension needed, but heartbeat was recorded + return 0 + end end + +return false diff --git a/redis/release.lua b/redis/release.lua index 4bcc592..96c745b 100644 --- a/redis/release.lua +++ b/redis/release.lua @@ -2,13 +2,20 @@ local zset_key = KEYS[1] local worker_queue_key = KEYS[2] local owners_key = KEYS[3] --- owned_tests = {"SomeTest", "worker:1", "SomeOtherTest", "worker:2", ...} +-- owned_tests = {"SomeTest", "worker:1|1234567890", "SomeOtherTest", "worker:2|1234567891", ...} local owned_tests = redis.call('hgetall', owners_key) for index, owner_or_test in ipairs(owned_tests) do - if owner_or_test == worker_queue_key then -- If we owned a test - local test = owned_tests[index - 1] - redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately - return nil + -- Parse owner value: format is "worker_queue_key|heartbeat_timestamp" + local pipe_pos = string.find(owner_or_test, "|") + if pipe_pos then + local stored_worker_key 
= string.sub(owner_or_test, 1, pipe_pos - 1) + + if stored_worker_key == worker_queue_key then -- If we owned a test + local test = owned_tests[index - 1] + redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately + redis.call('hdel', owners_key, test) -- Remove from owners hash to clear heartbeat + return nil + end + elseif owner_or_test == worker_queue_key then -- Backward compatibility: old owner format stored the bare worker queue key + local test = owned_tests[index - 1] + redis.call('zadd', zset_key, "0", test) -- We expire the lease immediately + redis.call('hdel', owners_key, test) + return nil end end diff --git a/redis/requeue.lua b/redis/requeue.lua index a1fcc25..079f143 100644 --- a/redis/requeue.lua +++ b/redis/requeue.lua @@ -10,8 +10,20 @@ local global_max_requeues = tonumber(ARGV[2]) local test = ARGV[3] local offset = ARGV[4] -if redis.call('hget', owners_key, test) == worker_queue_key then - redis.call('hdel', owners_key, test) +local owner_value = redis.call('hget', owners_key, test) +if owner_value then + -- Parse owner value: format is "worker_queue_key|heartbeat_timestamp" + local pipe_pos = string.find(owner_value, "|") + if pipe_pos then + local stored_worker_key = string.sub(owner_value, 1, pipe_pos - 1) + + if stored_worker_key == worker_queue_key then + redis.call('hdel', owners_key, test) + end + elseif owner_value == worker_queue_key then + -- Backward compatibility: old owner format stored the bare worker queue key + redis.call('hdel', owners_key, test) + end end if redis.call('sismember', processed_key, test) == 1 then diff --git a/redis/reserve.lua b/redis/reserve.lua index bd9c4af..da1ccee 100644 --- a/redis/reserve.lua +++ b/redis/reserve.lua @@ -23,7 +23,10 @@ if test then redis.call('zadd', zset_key, current_time, test) end redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) + -- Store owner with initial reservation time and last heartbeat time + -- Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time" + local owner_value = worker_queue_key .. "|" .. current_time .. "|" .. 
current_time + redis.call('hset', owners_key, test, owner_value) return test else return nil diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua index 1f178e9..4ecaaa6 100644 --- a/redis/reserve_lost.lua +++ b/redis/reserve_lost.lua @@ -9,14 +9,49 @@ local timeout = tonumber(ARGV[2]) local use_dynamic_deadline = ARGV[3] == "true" local default_timeout = tonumber(ARGV[4]) or 0 -local lost_tests -if use_dynamic_deadline then - lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time) -else - lost_tests = redis.call('zrangebyscore', zset_key, 0, current_time - timeout) +-- Helper: checks if a test can be stolen +-- Returns true if heartbeat is old (> 2 minutes) or missing +-- Owner value format: "worker_queue_key|initial_reservation_time|last_heartbeat_time" +local function can_steal_test(test) + local owner_value = redis.call('hget', owners_key, test) + if not owner_value then return true end -- No owner, can steal + + local first_pipe = string.find(owner_value, "|") + if not first_pipe then return true end + + local rest = string.sub(owner_value, first_pipe + 1) + local second_pipe = string.find(rest, "|") + + local last_heartbeat + if second_pipe then + -- New format: worker_key|initial_time|last_heartbeat + last_heartbeat = tonumber(string.sub(rest, second_pipe + 1)) + else + -- Old format: worker_key|timestamp (treat as last heartbeat) + last_heartbeat = tonumber(rest) + end + + if not last_heartbeat then return true end + + local heartbeat_age = current_time - last_heartbeat + + -- Only steal if heartbeat is old (> 2 minutes) + return heartbeat_age >= 120 +end + +-- Collect tests that can be stolen +local stealable_tests = {} + +local all_running_tests = redis.call('zrange', zset_key, 0, -1) +for _, test in ipairs(all_running_tests) do + if redis.call('sismember', processed_key, test) == 0 then + if can_steal_test(test) then + table.insert(stealable_tests, test) + end + end end -for _, test in ipairs(lost_tests) do +for _, test in 
ipairs(stealable_tests) do if redis.call('sismember', processed_key, test) == 0 then if use_dynamic_deadline then local dynamic_timeout = redis.call('hget', test_group_timeout_key, test) @@ -30,7 +65,9 @@ for _, test in ipairs(lost_tests) do redis.call('zadd', zset_key, current_time + timeout, test) end redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership + -- Store owner with initial reservation time and last heartbeat time + local new_owner_value = worker_queue_key .. "|" .. current_time .. "|" .. current_time + redis.call('hset', owners_key, test, new_owner_value) -- Take ownership return test end end diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 5a638e9..ce77dbf 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -122,6 +122,14 @@ def load_script(script) @scripts_cache[script] ||= redis.script(:load, read_script(script)) end + def ensure_connection_and_script(script) + # Pre-initialize Redis connection and script in current thread context + # This ensures background threads use the same initialized connection + load_script(script) + # Ping Redis to ensure connection is established + redis.ping + end + def read_script(name) ::File.read(::File.join(CI::Queue::DEV_SCRIPTS_ROOT, "#{name}.lua")) rescue SystemCallError diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 284d401..da3fbec 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -87,7 +87,37 @@ def poll executable = resolve_executable(id) if executable - yield executable + # Pre-initialize Redis connection and heartbeat script in main thread + # to ensure background thread uses the same initialized context + ensure_connection_and_script(:heartbeat) + + # Start heartbeat thread to extend lease while executing + heartbeat_thread = nil + begin + heartbeat_thread = Thread.new do + heartbeat_interval = 
10 # Send heartbeat every 10 seconds + loop do + break if Thread.current[:stop] + + sleep heartbeat_interval + + break if Thread.current[:stop] + + heartbeat(id) + end + end + heartbeat_thread[:stop] = false + + yield executable + ensure + # Stop heartbeat thread when execution completes + # This ensures it's stopped after acknowledge has completed + if heartbeat_thread + heartbeat_thread[:stop] = true + heartbeat_thread.wakeup rescue nil # Interrupt sleep; ThreadError if the thread already exited is safe to ignore + heartbeat_thread.join(2) # Wait up to 2 seconds for thread to finish + end + end else warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.") acknowledge(id) @@ -209,6 +239,32 @@ def release! nil end + def heartbeat(test_id) + current_time = CI::Queue.time_now.to_f + result = eval_script( + :heartbeat, + keys: [ + key('running'), + key('processed'), + key('owners'), + key('worker', worker_id, 'queue'), + key('test-group-timeout') + ], + argv: [current_time, test_id, config.timeout] + ) + if result.is_a?(Array) && result.size == 2 + old_deadline = result[0] + new_deadline = result[1] + old_deadline_readable = Time.at(old_deadline).strftime('%Y-%m-%d %H:%M:%S') + new_deadline_readable = Time.at(new_deadline).strftime('%Y-%m-%d %H:%M:%S') + warn("[heartbeat] Extended deadline for #{test_id.inspect} from #{old_deadline_readable} (#{old_deadline}) to #{new_deadline_readable} (#{new_deadline})") + end + result + rescue *CONNECTION_ERRORS => e + warn("Failed to send heartbeat for #{test_id.inspect}: #{e.class} - #{e.message}") + false + end + private attr_reader :index diff --git a/ruby/test/ci/queue/.DS_Store b/ruby/test/ci/queue/.DS_Store new file mode 100644 index 0000000..ed4be95 Binary files /dev/null and b/ruby/test/ci/queue/.DS_Store differ diff --git a/ruby/test/ci/queue/redis/dynamic_timeout_test.rb b/ruby/test/ci/queue/redis/dynamic_timeout_test.rb index bb485f9..645262a 100644 --- a/ruby/test/ci/queue/redis/dynamic_timeout_test.rb +++ 
b/ruby/test/ci/queue/redis/dynamic_timeout_test.rb @@ -268,16 +268,62 @@ def test_single_test_marked_lost_after_default_timeout reserved_id = worker1.send(:try_to_reserve_test) assert_equal 'TestA#test_1', reserved_id - # Wait longer than timeout (0.5s) + # Wait longer than timeout (0.5s) but less than 2 minutes sleep 0.6 - # Try to reserve with worker2 - should get the lost test + # Try to reserve with worker2 - should NOT get the test because heartbeat is recent (< 2 minutes) worker2_config = config.dup worker2_config.instance_variable_set(:@worker_id, '2') worker2 = CI::Queue::Redis.new(@redis_url, worker2_config) lost_test = worker2.send(:try_to_reserve_lost_test) - assert_equal 'TestA#test_1', lost_test, 'Single test should be marked as lost after default timeout' + assert_nil lost_test, 'Test should not be marked as lost if heartbeat is recent (< 2 minutes)' + end + + def test_single_test_marked_lost_after_heartbeat_expires + # Create worker with short timeout + config = CI::Queue::Configuration.new( + build_id: 'heartbeat-timeout-test', + worker_id: '1', + timeout: 0.5 # 0.5 seconds + ) + + worker1 = CI::Queue::Redis.new(@redis_url, config) + + # Populate with single test (no chunk) + tests = create_mock_tests(['TestA#test_1']) + + worker1.stub(:reorder_tests, tests) do + worker1.populate(tests) + end + + # Reserve the test with worker1 + reserved_id = worker1.send(:try_to_reserve_test) + assert_equal 'TestA#test_1', reserved_id + + # Manually set the heartbeat timestamp to be older than 2 minutes + # by manipulating Redis directly + # Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time" + current_time = CI::Queue.time_now.to_f + old_heartbeat_time = current_time - 130 # 130 seconds ago (more than 2 minutes) + initial_reservation_time = current_time - 140 # Initial reservation was before the heartbeat + worker_queue_key = "build:heartbeat-timeout-test:worker:1:queue" + owner_value = 
"#{worker_queue_key}|#{initial_reservation_time}|#{old_heartbeat_time}" + @redis.hset('build:heartbeat-timeout-test:owners', 'TestA#test_1', owner_value) + + # Also set the deadline to be in the past so it's considered "lost" + @redis.zadd('build:heartbeat-timeout-test:running', current_time - 10, 'TestA#test_1') + + # Wait a bit to ensure time has passed + sleep 0.1 + + # Try to reserve with worker2 - should get the lost test now + worker2_config = config.dup + worker2_config.instance_variable_set(:@worker_id, '2') + worker2 = CI::Queue::Redis.new(@redis_url, worker2_config) + + lost_test = worker2.send(:try_to_reserve_lost_test) + assert_equal 'TestA#test_1', lost_test, 'Test should be marked as lost after heartbeat expires (> 2 minutes)' end def test_batching_with_many_chunks diff --git a/ruby/test/ci/queue/redis/worker_chunk_test.rb b/ruby/test/ci/queue/redis/worker_chunk_test.rb index e0b2592..d02de07 100644 --- a/ruby/test/ci/queue/redis/worker_chunk_test.rb +++ b/ruby/test/ci/queue/redis/worker_chunk_test.rb @@ -161,9 +161,11 @@ def test_resolved_chunk_detects_flaky_tests def test_acknowledge_chunk # Set up a chunk as if it were reserved (in running zset) + # Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time" chunk_id = 'TestA:chunk_0' - @redis.zadd('build:42:running', Time.now.to_i, chunk_id) - @redis.hset('build:42:owners', chunk_id, 'build:42:worker:1:queue') + current_time = Time.now.to_f + @redis.zadd('build:42:running', current_time.to_i, chunk_id) + @redis.hset('build:42:owners', chunk_id, "build:42:worker:1:queue|#{current_time}|#{current_time}") @worker.instance_variable_set(:@reserved_test, chunk_id) # Acknowledge the chunk diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index 8129549..cbac126 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true + require 'test_helper' class CI::Queue::RedisTest < Minitest::Test @@ 
-28,7 +29,7 @@ def test_requeue # redefine the shared one previous_offset = CI::Queue::Redis.requeue_offset CI::Queue::Redis.requeue_offset = 2 failed_once = false - test_order = poll(@queue, ->(test) { + test_order = poll(@queue, lambda { |test| if test == shuffled_test_list.last && !failed_once failed_once = true false @@ -158,6 +159,7 @@ def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker poll(@queue, false) do break if acquired + acquired = true monitor.synchronize do condition.signal @@ -188,6 +190,7 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker @queue.poll do |test| break if acquired + acquired = true monitor.synchronize do condition.signal @@ -200,31 +203,29 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker end def test_workers_register - assert_equal 1, @redis.scard(('build:42:workers')) + assert_equal 1, @redis.scard('build:42:workers') worker(2) - assert_equal 2, @redis.scard(('build:42:workers')) + assert_equal 2, @redis.scard('build:42:workers') end def test_timeout_warning - begin - threads = 2.times.map do |i| - Thread.new do - queue = worker(i, tests: [TEST_LIST.first], build_id: '24') - queue.poll do |test| - sleep 1 # timeout - queue.acknowledge(test) - end + threads = 2.times.map do |i| + Thread.new do + queue = worker(i, tests: [TEST_LIST.first], build_id: '24') + queue.poll do |test| + sleep 1 # timeout + queue.acknowledge(test) end end + end - threads.each { |t| t.join(3) } - threads.each { |t| refute_predicate t, :alive? } + threads.each { |t| t.join(3) } + threads.each { |t| refute_predicate t, :alive? 
} - queue = worker(12, build_id: '24') - assert_equal [[:RESERVED_LOST_TEST, {test: 'ATest#test_foo', timeout: 0.2}]], queue.build.pop_warnings - ensure - threads.each(&:kill) - end + queue = worker(12, build_id: '24') + assert_equal [[:RESERVED_LOST_TEST, { test: 'ATest#test_foo', timeout: 0.2 }]], queue.build.pop_warnings + ensure + threads.each(&:kill) end def test_continuously_timing_out_tests @@ -262,9 +263,9 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker tests = (1..10).map { |i| MockTest.new("ChunkSuite#test_#{i}") } worker1 = worker(1, tests: tests, build_id: '100', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) worker2 = worker(2, tests: tests, build_id: '100', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0, populate: false) + suite_max_duration: 120_000, timing_fallback_duration: 100.0, populate: false) acquired = false worker2_tried = false @@ -300,7 +301,7 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker worker1.poll do |test| reserved_test = test refute_nil reserved_test - assert reserved_test.respond_to?(:chunk?) && reserved_test.chunk?, "Expected a chunk to be reserved" + assert reserved_test.respond_to?(:chunk?) 
&& reserved_test.chunk?, 'Expected a chunk to be reserved' # Signal worker2 to try stealing, then wait for it to finish trying acquired = true @@ -314,7 +315,7 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker break end - refute worker2_got_test, "Worker 2 should not steal chunk before dynamic timeout expires" + refute worker2_got_test, 'Worker 2 should not steal chunk before dynamic timeout expires' end def test_chunk_with_dynamic_timeout_picked_up_after_timeout @@ -323,9 +324,9 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout tests = (1..5).map { |i| MockTest.new("TimeoutSuite#test_#{i}") } worker1 = worker(1, tests: tests, build_id: '101', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) worker2 = worker(2, tests: tests, build_id: '101', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) acquired = false done = false @@ -334,13 +335,11 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout monitor = Monitor.new condition = monitor.new_cond - # Worker 2 thread: waits for worker1 to acquire, then waits for timeout and steals + # Worker 2 thread: waits for worker1 to acquire, then steals after heartbeat is set to old Thread.start do monitor.synchronize do condition.wait_until { acquired } - # Wait longer than dynamic timeout (1.2s > 1.0s) - sleep 1.2 - # Now poll - should successfully steal the timed-out chunk + # Now poll - should successfully steal the chunk (heartbeat was set to old) worker2.poll do |test| stolen_test = test worker2.acknowledge(test) @@ -355,9 +354,18 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout worker1.poll do |test| reserved_test = test refute_nil reserved_test - assert reserved_test.respond_to?(:chunk?) 
&& reserved_test.chunk?, "Expected a chunk to be reserved" - - # Signal worker2 to start waiting, then wait for it to steal the chunk + assert reserved_test.respond_to?(:chunk?) && reserved_test.chunk?, 'Expected a chunk to be reserved' + + # Set heartbeat to be old (> 2 minutes) so worker2 can steal it + # Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time" + current_time = CI::Queue.time_now.to_f + old_heartbeat_time = current_time - 130 # 130 seconds ago (more than 2 minutes) + initial_reservation_time = current_time - 140 + worker_queue_key = 'build:101:worker:1:queue' + owner_value = "#{worker_queue_key}|#{initial_reservation_time}|#{old_heartbeat_time}" + @redis.hset('build:101:owners', reserved_test.id, owner_value) + + # Signal worker2 to start stealing acquired = true monitor.synchronize do condition.signal @@ -366,7 +374,7 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout break end - refute_nil stolen_test, "Worker 2 should pick up chunk after dynamic timeout expires" + refute_nil stolen_test, 'Worker 2 should pick up chunk after heartbeat expires' assert_equal reserved_test.id, stolen_test.id # Verify the RESERVED_LOST_TEST warning was recorded @@ -376,14 +384,14 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout end def test_individual_test_uses_default_timeout_after_requeue - # Test that individual tests (not in chunks) use the default timeout + # Test that individual tests (not in chunks) can be stolen after heartbeat expires @redis.flushdb # Create individual tests from different suites (won't be chunked together) tests = [ - MockTest.new("SuiteA#test_1"), - MockTest.new("SuiteB#test_1"), - MockTest.new("SuiteC#test_1") + MockTest.new('SuiteA#test_1'), + MockTest.new('SuiteB#test_1'), + MockTest.new('SuiteC#test_1') ] worker1 = worker(1, tests: tests, build_id: '102', timeout: 0.2) @@ -396,13 +404,11 @@ def test_individual_test_uses_default_timeout_after_requeue monitor = Monitor.new condition = 
monitor.new_cond - # Worker 2 thread: waits for worker1 to acquire, then waits for default timeout and steals + # Worker 2 thread: waits for worker1 to acquire, then steals after heartbeat is set to old Thread.start do monitor.synchronize do condition.wait_until { acquired } - # Wait for default timeout (0.3s > 0.2s default) - sleep 0.3 - # Now poll - should successfully steal the timed-out test + # Now poll - should successfully steal the test (heartbeat was set to old) worker2.poll do |test| stolen_test = test worker2.acknowledge(test) @@ -417,9 +423,18 @@ def test_individual_test_uses_default_timeout_after_requeue worker1.poll do |test| reserved_test = test refute_nil reserved_test - refute (reserved_test.respond_to?(:chunk?) && reserved_test.chunk?), "Expected an individual test, not a chunk" - - # Signal worker2 to start waiting, then wait for it to steal the test + refute (reserved_test.respond_to?(:chunk?) && reserved_test.chunk?), 'Expected an individual test, not a chunk' + + # Set heartbeat to be old (> 2 minutes) so worker2 can steal it + # Format: "worker_queue_key|initial_reservation_time|last_heartbeat_time" + current_time = CI::Queue.time_now.to_f + old_heartbeat_time = current_time - 130 # 130 seconds ago (more than 2 minutes) + initial_reservation_time = current_time - 140 + worker_queue_key = 'build:102:worker:1:queue' + owner_value = "#{worker_queue_key}|#{initial_reservation_time}|#{old_heartbeat_time}" + @redis.hset('build:102:owners', reserved_test.id, owner_value) + + # Signal worker2 to start stealing acquired = true monitor.synchronize do condition.signal @@ -428,7 +443,7 @@ def test_individual_test_uses_default_timeout_after_requeue break end - refute_nil stolen_test, "Worker 2 should steal individual test after default timeout" + refute_nil stolen_test, 'Worker 2 should steal individual test after heartbeat expires' assert_equal reserved_test.id, stolen_test.id end @@ -436,7 +451,7 @@ def 
test_suite_bin_packing_uses_moving_average_for_duration @redis.flushdb updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["TestSuite#test_1", 5000.0], ["TestSuite#test_2", 3000.0]]) + updater.update_batch([['TestSuite#test_1', 5000.0], ['TestSuite#test_2', 3000.0]]) tests = [ MockTest.new('TestSuite#test_1'), @@ -444,7 +459,7 @@ def test_suite_bin_packing_uses_moving_average_for_duration ] worker = worker(1, tests: tests, build_id: '200', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) chunks = [] worker.poll do |chunk| @@ -454,7 +469,7 @@ def test_suite_bin_packing_uses_moving_average_for_duration assert_equal 1, chunks.size chunk = chunks.first - assert chunk.chunk?, "Expected a chunk" + assert chunk.chunk?, 'Expected a chunk' assert_equal 'TestSuite:chunk_0', chunk.id assert_equal 8000.0, chunk.estimated_duration end @@ -466,7 +481,7 @@ def test_moving_average_takes_precedence_over_timing_file timing_data = { 'TestSuite#test_1' => 10_000.0 } updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["TestSuite#test_1", 2000.0]]) + updater.update_batch([['TestSuite#test_1', 2000.0]]) tests = [MockTest.new('TestSuite#test_1')] @@ -475,8 +490,8 @@ def test_moving_average_takes_precedence_over_timing_file file.close worker = worker(1, tests: tests, build_id: '201', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0, - timing_file: file.path) + suite_max_duration: 120_000, timing_fallback_duration: 100.0, + timing_file: file.path) chunks = [] worker.poll do |chunk| @@ -501,8 +516,8 @@ def test_falls_back_to_timing_file_when_no_moving_average file.close worker = worker(1, tests: tests, build_id: '202', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0, - timing_file: file.path) + 
suite_max_duration: 120_000, timing_fallback_duration: 100.0, + timing_file: file.path) chunks = [] worker.poll do |chunk| @@ -521,7 +536,7 @@ def test_falls_back_to_default_when_no_moving_average_or_timing_data tests = [MockTest.new('UnknownTest#test_1')] worker = worker(1, tests: tests, build_id: '203', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 500.0) + suite_max_duration: 120_000, timing_fallback_duration: 500.0) chunks = [] worker.poll do |chunk| @@ -538,7 +553,7 @@ def test_mixed_duration_sources_in_suite_splitting require 'tempfile' updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["MixedTest#test_1", 60_000.0], ["MixedTest#test_2", 50_000.0]]) + updater.update_batch([['MixedTest#test_1', 60_000.0], ['MixedTest#test_2', 50_000.0]]) timing_data = { 'MixedTest#test_3' => 40_000.0, @@ -557,8 +572,8 @@ def test_mixed_duration_sources_in_suite_splitting file.close worker = worker(1, tests: tests, build_id: '204', strategy: :suite_bin_packing, - suite_max_duration: 120_000, suite_buffer_percent: 10, - timing_fallback_duration: 100.0, timing_file: file.path) + suite_max_duration: 120_000, suite_buffer_percent: 10, + timing_fallback_duration: 100.0, timing_file: file.path) chunks = [] worker.poll do |chunk| @@ -571,7 +586,7 @@ def test_mixed_duration_sources_in_suite_splitting effective_max = 120_000 * (1 - 10 / 100.0) chunks.each do |chunk| assert chunk.estimated_duration <= effective_max, - "Chunk duration #{chunk.estimated_duration} exceeds effective max #{effective_max}" + "Chunk duration #{chunk.estimated_duration} exceeds effective max #{effective_max}" end end end @@ -580,7 +595,7 @@ def test_moving_average_ordering_by_duration @redis.flushdb updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["FastTest#test_1", 1000.0], ["SlowTest#test_1", 10_000.0], ["MediumTest#test_1", 5000.0]]) + updater.update_batch([['FastTest#test_1', 
1000.0], ['SlowTest#test_1', 10_000.0], ['MediumTest#test_1', 5000.0]]) tests = [ MockTest.new('FastTest#test_1'), @@ -589,7 +604,7 @@ def test_moving_average_ordering_by_duration ] worker = worker(1, tests: tests, build_id: '205', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) chunks = [] worker.poll do |chunk| @@ -612,7 +627,7 @@ def test_moving_average_with_partial_coverage # Only one test has moving average data updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["PartialTest#test_1", 3000.0]]) + updater.update_batch([['PartialTest#test_1', 3000.0]]) tests = [ MockTest.new('PartialTest#test_1'), @@ -621,7 +636,7 @@ def test_moving_average_with_partial_coverage ] worker = worker(1, tests: tests, build_id: '206', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 500.0) + suite_max_duration: 120_000, timing_fallback_duration: 500.0) chunks = [] worker.poll do |chunk| @@ -638,12 +653,12 @@ def test_moving_average_updates_persist_across_workers # Manually update moving average as if a previous worker completed the test updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["PersistTest#test_1", 5500.0]]) + updater.update_batch([['PersistTest#test_1', 5500.0]]) # New worker should see the persisted moving average tests = [MockTest.new('PersistTest#test_1')] worker1 = worker(1, tests: tests, build_id: '207', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 1000.0) + suite_max_duration: 120_000, timing_fallback_duration: 1000.0) chunks = [] worker1.poll do |chunk| @@ -700,13 +715,11 @@ def worker(id, **args) worker_id: id.to_s, timeout: 0.2, timing_redis_url: @redis_url, - **args, + **args ) ) - if skip_populate - return queue - else - populate(queue, tests: tests) - end + return queue if 
skip_populate + + populate(queue, tests: tests) end end