From e56ba8a99c8f8dcd6aa99d752b017ec44360fc17 Mon Sep 17 00:00:00 2001 From: Lihao He Date: Tue, 2 Dec 2025 17:45:34 -0800 Subject: [PATCH] [CI] heart beating ongoing chunk --- redis/heartbeat.lua | 29 ++- redis/reserve_lost.lua | 52 ++++- ruby/lib/ci/queue/configuration.rb | 7 +- ruby/lib/ci/queue/redis/worker.rb | 86 ++++++- ruby/test/ci/queue/.DS_Store | Bin 0 -> 8196 bytes .../ci/queue/redis/dynamic_timeout_test.rb | 6 +- ruby/test/ci/queue/redis_test.rb | 212 +++++++++++++----- 7 files changed, 317 insertions(+), 75 deletions(-) create mode 100644 ruby/test/ci/queue/.DS_Store diff --git a/redis/heartbeat.lua b/redis/heartbeat.lua index 6c9c1e9d..0b03f448 100644 --- a/redis/heartbeat.lua +++ b/redis/heartbeat.lua @@ -2,16 +2,39 @@ local zset_key = KEYS[1] local processed_key = KEYS[2] local owners_key = KEYS[3] local worker_queue_key = KEYS[4] +local heartbeats_key = KEYS[5] +local test_group_timeout_key = KEYS[6] -local current_time = ARGV[1] +local current_time = tonumber(ARGV[1]) local test = ARGV[2] +local default_timeout = tonumber(ARGV[3]) or 0 -- already processed, we do not need to bump the timestamp if redis.call('sismember', processed_key, test) == 1 then - return false + return nil end -- we're still the owner of the test, we can bump the timestamp if redis.call('hget', owners_key, test) == worker_queue_key then - return redis.call('zadd', zset_key, current_time, test) + -- Record last heartbeat time in a separate hash for "recent activity" tracking + redis.call('hset', heartbeats_key, test, current_time) + + -- Get the dynamic timeout for this test (if any) or use default + local dynamic_timeout = redis.call('hget', test_group_timeout_key, test) + local timeout_to_use + if dynamic_timeout and dynamic_timeout ~= "" then + timeout_to_use = tonumber(dynamic_timeout) + else + timeout_to_use = default_timeout + end + + local new_deadline = current_time + timeout_to_use + + -- Extend the deadline by setting score to current_time + timeout + redis.call('zadd', zset_key, new_deadline, test) + + -- Return the new deadline and timeout used for logging + return { new_deadline, timeout_to_use } end + +return nil diff --git a/redis/reserve_lost.lua b/redis/reserve_lost.lua index 1f178e93..be384bd7 100644 --- a/redis/reserve_lost.lua +++ b/redis/reserve_lost.lua @@ -3,11 +3,13 @@ local processed_key = KEYS[2] local worker_queue_key = KEYS[3] local owners_key = KEYS[4] local test_group_timeout_key = KEYS[5] +local heartbeats_key = KEYS[6] local current_time = tonumber(ARGV[1]) local timeout = tonumber(ARGV[2]) local use_dynamic_deadline = ARGV[3] == "true" local default_timeout = tonumber(ARGV[4]) or 0 +local heartbeat_grace_period = tonumber(ARGV[5]) or 30 local lost_tests if use_dynamic_deadline then @@ -18,20 +20,50 @@ end for _, test in ipairs(lost_tests) do if redis.call('sismember', processed_key, test) == 0 then - if use_dynamic_deadline then - local dynamic_timeout = redis.call('hget', test_group_timeout_key, test) - if not dynamic_timeout or dynamic_timeout == "" then - dynamic_timeout = default_timeout + -- Check if the owner is still actively heartbeating + local last_heartbeat = redis.call('hget', heartbeats_key, test) + if last_heartbeat and last_heartbeat ~= "" then + local heartbeat_age = current_time - tonumber(last_heartbeat) + -- If heartbeated recently (within grace period), skip this test + -- The owner is still actively working on it + if heartbeat_age < heartbeat_grace_period then + -- Skip this test, try the next one + -- Don't claim it since the worker is still 
alive else - dynamic_timeout = tonumber(dynamic_timeout) + -- Heartbeat is stale, safe to claim + if use_dynamic_deadline then + local dynamic_timeout = redis.call('hget', test_group_timeout_key, test) + if not dynamic_timeout or dynamic_timeout == "" then + dynamic_timeout = default_timeout + else + dynamic_timeout = tonumber(dynamic_timeout) + end + redis.call('zadd', zset_key, current_time + dynamic_timeout, test) + else + redis.call('zadd', zset_key, current_time + timeout, test) + end + redis.call('lpush', worker_queue_key, test) + redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership + redis.call('hdel', heartbeats_key, test) -- Clear stale heartbeat + return test end - redis.call('zadd', zset_key, current_time + dynamic_timeout, test) else - redis.call('zadd', zset_key, current_time + timeout, test) + -- No heartbeat record, proceed with claiming (legacy behavior or never heartbeated) + if use_dynamic_deadline then + local dynamic_timeout = redis.call('hget', test_group_timeout_key, test) + if not dynamic_timeout or dynamic_timeout == "" then + dynamic_timeout = default_timeout + else + dynamic_timeout = tonumber(dynamic_timeout) + end + redis.call('zadd', zset_key, current_time + dynamic_timeout, test) + else + redis.call('zadd', zset_key, current_time + timeout, test) + end + redis.call('lpush', worker_queue_key, test) + redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership + return test end - redis.call('lpush', worker_queue_key, test) - redis.call('hset', owners_key, test, worker_queue_key) -- Take ownership - return test end end diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 30c93975..f79d8144 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -13,6 +13,7 @@ class Configuration attr_accessor :branch attr_accessor :timing_redis_url attr_accessor :write_duration_averages + attr_accessor :heartbeat_grace_period, :heartbeat_interval attr_reader :circuit_breakers attr_writer :seed, :build_id attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout @@ -61,7 +62,9 @@ def initialize( strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil, suite_max_duration: 120_000, suite_buffer_percent: 10, branch: nil, - timing_redis_url: nil + timing_redis_url: nil, + heartbeat_grace_period: 30, + heartbeat_interval: 10 ) @build_id = build_id @circuit_breakers = [CircuitBreaker::Disabled] @@ -96,6 +99,8 @@ def initialize( @branch = branch @timing_redis_url = timing_redis_url @write_duration_averages = false + @heartbeat_grace_period = heartbeat_grace_period + @heartbeat_interval = heartbeat_interval end def queue_init_timeout diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 284d4017..6dbfa9d8 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -87,7 +87,9 @@ def poll executable = resolve_executable(id) if executable - yield executable + with_heartbeat(id) do + yield executable + end else warn("Warning: Could not resolve executable for ID #{id.inspect}. Acknowledging to remove from queue.") acknowledge(id) @@ -209,10 +211,87 @@ def release! nil end + # Send a heartbeat for the currently reserved test to indicate the worker is still active. + # This extends the deadline and prevents other workers from stealing the test. + # Returns true if heartbeat was successful, false if test was already processed or + # we're no longer the owner. 
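+      # Accepts either a runnable object (anything responding to #id) or a raw test id;
+      # when called with no argument it heartbeats the currently reserved test.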
+ def heartbeat(test_or_id = nil) + test_key = if test_or_id + test_or_id.respond_to?(:id) ? test_or_id.id : test_or_id + else + @reserved_test + end + return false unless test_key + + current_time = CI::Queue.time_now.to_f + result = eval_script( + :heartbeat, + keys: [ + key('running'), + key('processed'), + key('owners'), + key('worker', worker_id, 'queue'), + key('heartbeats'), + key('test-group-timeout') + ], + argv: [current_time, test_key, config.timeout] + ) + + if result.is_a?(Array) && result.length == 2 + new_deadline, timeout_used = result + current_time_readable = Time.at(current_time).strftime('%Y-%m-%d %H:%M:%S') + deadline_readable = Time.at(new_deadline).strftime('%Y-%m-%d %H:%M:%S') + warn "[heartbeat] test=#{test_key} current_time=#{current_time_readable} extended_deadline=#{deadline_readable} timeout=#{timeout_used}s" + true + else + false + end + rescue *CONNECTION_ERRORS + false + end + private attr_reader :index + # Runs a block while sending periodic heartbeats in a background thread. + # This prevents other workers from stealing the test while it's being executed. + def with_heartbeat(test_id) + return yield unless config.heartbeat_interval&.positive? + + # Pre-initialize Redis connection and script in current thread context + # This ensures background threads use the same initialized connection + ensure_connection_and_script(:heartbeat) + + stop_heartbeat = false + heartbeat_thread = Thread.new do + until stop_heartbeat + sleep(config.heartbeat_interval) + break if stop_heartbeat + + begin + heartbeat(test_id) + rescue StandardError => e + warn("[heartbeat] Failed to send heartbeat for #{test_id}: #{e.message}") + end + end + end + + yield + ensure + stop_heartbeat = true + heartbeat_thread&.kill + heartbeat_thread&.join(1) # Wait up to 1 second for thread to finish + end + + def ensure_connection_and_script(script) + # Pre-initialize Redis connection and script in current thread context + # This ensures background threads use the same initialized connection + load_script(script) + # Ping Redis to ensure connection is established + redis.ping + end + def worker_id config.worker_id end @@ -295,9 +374,10 @@ def try_to_reserve_lost_test key('completed'), key('worker', worker_id, 'queue'), key('owners'), - key('test-group-timeout') + key('test-group-timeout'), + key('heartbeats') ], - argv: [current_time, timeout, 'true', config.timeout] + argv: [current_time, timeout, 'true', config.timeout, config.heartbeat_grace_period] ) if lost_test diff --git a/ruby/test/ci/queue/.DS_Store b/ruby/test/ci/queue/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..82d28b9de6f54a506381aea883e8493aa906ec1e GIT binary patch literal 8196 zcmeHMTWl0n7(U;$z!`?NQ?Bix%gQ1KT3`#vRbYGJekpVdy#ec920AjGsXMc~g$B|G zBPzxhOuR%Bm3WB{K6p!r4<^P74`ReLBqYWcBbfN24?ggu|IC>s-2#aZ#t@v7%=zd4 z&-u^H|DErj*?*QXhPFbkg|P<47^lONszTiznm6tpw>2rSRFeexGv;JU&Tz)GGq=qR z4Iu&{0wDq+0wDq+0`~?2^v?Dgzsq}HjD~%PK#0Kq5&`e|5T(PDi9n7C=|4KC^Gg7d zUJ2kgI;T3o_lX8F5y&wieOFpjR1XMT5tJAZ?ld0trjtwra!g3!&LG?wf-57aP~cyk z{G$GJhJ=t|A0iMUa61BC`b=SYrn4L~D(d%(nPJm#+<5#dq^jyEQ>XE1T;WHOgLWzD zrrmgQJp03Q$dV2emfnFnPTS?=To>J=^ifSjR`}2B{qUp)nW?{&3 zQfhrRZDtE*+SOZ4)0<4TnckGnVb`!N=a8OvNcOt=i0cjaJ9ge}_q|i9T5$9(ot%xL z-a$(^{ppeJU}7v07g5L`)Xa^<9%*h_voX=JbMJxi+B&s<<}5|&C!{RbIA-YigC!&9 z4)&#;tZwBD>v&JmaEy$pcU$Qpy&h*|d3;_&W34t{d$2q-*@dey-do5RPZkXKm2&=w7S$GWKH%Bg^E&mytSr%*WHrsZ9ST$|v0RDEsv~!w zplueetCUB%nv)Nj>5Og)(>i4XS9i-jMS+ztZc?`JdNj+uS$k-R;&g;LssI-mwrM-KdWrWtgXz59St6bZk8Ro>&iySUyXUkuXSjA=020;qbKd7I*P6@G 
zO0^A*^SY9nMtfseMT1yPn}PhJ-C)16KiEz77i3Jw4AfvQ79fg+h#`*k*no}L zj{P`*B)ZXu46@L13@6~CfD%S<250dMp2Inu$7^^UZ{SV5kB{&%KEao`jPLM0uHZ-f zg6p_}o023|N!3!VG*4P8HA~B+<ErE(@6u8ddeC4IuffifxhxF?;x zVqvkT8+q^*>mPDShQsMs$|2CA^GRaRG1R9lVS8a1meN8(f-1(A|{;#g$5e7L$3)w5{W0EmOME zA=D-!P$iTGum0b9_wWA`UBdGafe?ZF3;|SkBs(test) { + test_order = poll(@queue, lambda { |test| if test == shuffled_test_list.last && !failed_once failed_once = true false @@ -158,6 +159,7 @@ def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker poll(@queue, false) do break if acquired + acquired = true monitor.synchronize do condition.signal @@ -188,6 +190,7 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker @queue.poll do |test| break if acquired + acquired = true monitor.synchronize do condition.signal @@ -200,31 +203,29 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker end def test_workers_register - assert_equal 1, @redis.scard(('build:42:workers')) + assert_equal 1, @redis.scard('build:42:workers') worker(2) - assert_equal 2, @redis.scard(('build:42:workers')) + assert_equal 2, @redis.scard('build:42:workers') end def test_timeout_warning - begin - threads = 2.times.map do |i| - Thread.new do - queue = worker(i, tests: [TEST_LIST.first], build_id: '24') - queue.poll do |test| - sleep 1 # timeout - queue.acknowledge(test) - end + threads = 2.times.map do |i| + Thread.new do + queue = worker(i, tests: [TEST_LIST.first], build_id: '24') + queue.poll do |test| + sleep 1 # timeout + queue.acknowledge(test) end end + end - threads.each { |t| t.join(3) } - threads.each { |t| refute_predicate t, :alive? } + threads.each { |t| t.join(3) } + threads.each { |t| refute_predicate t, :alive? } - queue = worker(12, build_id: '24') - assert_equal [[:RESERVED_LOST_TEST, {test: 'ATest#test_foo', timeout: 0.2}]], queue.build.pop_warnings - ensure - threads.each(&:kill) - end + queue = worker(12, build_id: '24') + assert_equal [[:RESERVED_LOST_TEST, { test: 'ATest#test_foo', timeout: 0.2 }]], queue.build.pop_warnings + ensure + threads.each(&:kill) end def test_continuously_timing_out_tests @@ -262,9 +263,9 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker tests = (1..10).map { |i| MockTest.new("ChunkSuite#test_#{i}") } worker1 = worker(1, tests: tests, build_id: '100', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) worker2 = worker(2, tests: tests, build_id: '100', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0, populate: false) + suite_max_duration: 120_000, timing_fallback_duration: 100.0, populate: false) acquired = false worker2_tried = false @@ -300,7 +301,7 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker worker1.poll do |test| reserved_test = test refute_nil reserved_test - assert reserved_test.respond_to?(:chunk?) && reserved_test.chunk?, "Expected a chunk to be reserved" + assert reserved_test.respond_to?(:chunk?) 
&& reserved_test.chunk?, 'Expected a chunk to be reserved' # Signal worker2 to try stealing, then wait for it to finish trying acquired = true @@ -314,7 +315,7 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker break end - refute worker2_got_test, "Worker 2 should not steal chunk before dynamic timeout expires" + refute worker2_got_test, 'Worker 2 should not steal chunk before dynamic timeout expires' end def test_chunk_with_dynamic_timeout_picked_up_after_timeout @@ -323,9 +324,9 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout tests = (1..5).map { |i| MockTest.new("TimeoutSuite#test_#{i}") } worker1 = worker(1, tests: tests, build_id: '101', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) worker2 = worker(2, tests: tests, build_id: '101', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) acquired = false done = false @@ -355,7 +356,7 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout worker1.poll do |test| reserved_test = test refute_nil reserved_test - assert reserved_test.respond_to?(:chunk?) && reserved_test.chunk?, "Expected a chunk to be reserved" + assert reserved_test.respond_to?(:chunk?) && reserved_test.chunk?, 'Expected a chunk to be reserved' # Signal worker2 to start waiting, then wait for it to steal the chunk acquired = true @@ -366,7 +367,7 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout break end - refute_nil stolen_test, "Worker 2 should pick up chunk after dynamic timeout expires" + refute_nil stolen_test, 'Worker 2 should pick up chunk after dynamic timeout expires' assert_equal reserved_test.id, stolen_test.id # Verify the RESERVED_LOST_TEST warning was recorded @@ -381,9 +382,9 @@ def test_individual_test_uses_default_timeout_after_requeue # Create individual tests from different suites (won't be chunked together) tests = [ - MockTest.new("SuiteA#test_1"), - MockTest.new("SuiteB#test_1"), - MockTest.new("SuiteC#test_1") + MockTest.new('SuiteA#test_1'), + MockTest.new('SuiteB#test_1'), + MockTest.new('SuiteC#test_1') ] worker1 = worker(1, tests: tests, build_id: '102', timeout: 0.2) @@ -417,7 +418,7 @@ def test_individual_test_uses_default_timeout_after_requeue worker1.poll do |test| reserved_test = test refute_nil reserved_test - refute (reserved_test.respond_to?(:chunk?) && reserved_test.chunk?), "Expected an individual test, not a chunk" + refute (reserved_test.respond_to?(:chunk?) 
&& reserved_test.chunk?), 'Expected an individual test, not a chunk' # Signal worker2 to start waiting, then wait for it to steal the test acquired = true @@ -428,7 +429,7 @@ def test_individual_test_uses_default_timeout_after_requeue break end - refute_nil stolen_test, "Worker 2 should steal individual test after default timeout" + refute_nil stolen_test, 'Worker 2 should steal individual test after default timeout' assert_equal reserved_test.id, stolen_test.id end @@ -436,7 +437,7 @@ def test_suite_bin_packing_uses_moving_average_for_duration @redis.flushdb updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["TestSuite#test_1", 5000.0], ["TestSuite#test_2", 3000.0]]) + updater.update_batch([['TestSuite#test_1', 5000.0], ['TestSuite#test_2', 3000.0]]) tests = [ MockTest.new('TestSuite#test_1'), @@ -444,7 +445,7 @@ def test_suite_bin_packing_uses_moving_average_for_duration ] worker = worker(1, tests: tests, build_id: '200', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) chunks = [] worker.poll do |chunk| @@ -454,7 +455,7 @@ def test_suite_bin_packing_uses_moving_average_for_duration assert_equal 1, chunks.size chunk = chunks.first - assert chunk.chunk?, "Expected a chunk" + assert chunk.chunk?, 'Expected a chunk' assert_equal 'TestSuite:chunk_0', chunk.id assert_equal 8000.0, chunk.estimated_duration end @@ -466,7 +467,7 @@ def test_moving_average_takes_precedence_over_timing_file timing_data = { 'TestSuite#test_1' => 10_000.0 } updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["TestSuite#test_1", 2000.0]]) + updater.update_batch([['TestSuite#test_1', 2000.0]]) tests = [MockTest.new('TestSuite#test_1')] @@ -475,8 +476,8 @@ def test_moving_average_takes_precedence_over_timing_file file.close worker = worker(1, tests: tests, build_id: '201', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0, - timing_file: file.path) + suite_max_duration: 120_000, timing_fallback_duration: 100.0, + timing_file: file.path) chunks = [] worker.poll do |chunk| @@ -501,8 +502,8 @@ def test_falls_back_to_timing_file_when_no_moving_average file.close worker = worker(1, tests: tests, build_id: '202', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0, - timing_file: file.path) + suite_max_duration: 120_000, timing_fallback_duration: 100.0, + timing_file: file.path) chunks = [] worker.poll do |chunk| @@ -521,7 +522,7 @@ def test_falls_back_to_default_when_no_moving_average_or_timing_data tests = [MockTest.new('UnknownTest#test_1')] worker = worker(1, tests: tests, build_id: '203', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 500.0) + suite_max_duration: 120_000, timing_fallback_duration: 500.0) chunks = [] worker.poll do |chunk| @@ -538,7 +539,7 @@ def test_mixed_duration_sources_in_suite_splitting require 'tempfile' updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["MixedTest#test_1", 60_000.0], ["MixedTest#test_2", 50_000.0]]) + updater.update_batch([['MixedTest#test_1', 60_000.0], ['MixedTest#test_2', 50_000.0]]) timing_data = { 'MixedTest#test_3' => 40_000.0, @@ -557,8 +558,8 @@ def test_mixed_duration_sources_in_suite_splitting file.close worker = worker(1, tests: tests, build_id: '204', strategy: :suite_bin_packing, - suite_max_duration: 
120_000, suite_buffer_percent: 10, - timing_fallback_duration: 100.0, timing_file: file.path) + suite_max_duration: 120_000, suite_buffer_percent: 10, + timing_fallback_duration: 100.0, timing_file: file.path) chunks = [] worker.poll do |chunk| @@ -571,7 +572,7 @@ def test_mixed_duration_sources_in_suite_splitting effective_max = 120_000 * (1 - 10 / 100.0) chunks.each do |chunk| assert chunk.estimated_duration <= effective_max, - "Chunk duration #{chunk.estimated_duration} exceeds effective max #{effective_max}" + "Chunk duration #{chunk.estimated_duration} exceeds effective max #{effective_max}" end end end @@ -580,7 +581,7 @@ def test_moving_average_ordering_by_duration @redis.flushdb updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["FastTest#test_1", 1000.0], ["SlowTest#test_1", 10_000.0], ["MediumTest#test_1", 5000.0]]) + updater.update_batch([['FastTest#test_1', 1000.0], ['SlowTest#test_1', 10_000.0], ['MediumTest#test_1', 5000.0]]) tests = [ MockTest.new('FastTest#test_1'), @@ -589,7 +590,7 @@ def test_moving_average_ordering_by_duration ] worker = worker(1, tests: tests, build_id: '205', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0) chunks = [] worker.poll do |chunk| @@ -612,7 +613,7 @@ def test_moving_average_with_partial_coverage # Only one test has moving average data updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["PartialTest#test_1", 3000.0]]) + updater.update_batch([['PartialTest#test_1', 3000.0]]) tests = [ MockTest.new('PartialTest#test_1'), @@ -621,7 +622,7 @@ def test_moving_average_with_partial_coverage ] worker = worker(1, tests: tests, build_id: '206', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 500.0) + suite_max_duration: 120_000, timing_fallback_duration: 500.0) chunks = [] worker.poll do |chunk| @@ -638,12 +639,12 @@ def test_moving_average_updates_persist_across_workers # Manually update moving average as if a previous worker completed the test updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) - updater.update_batch([["PersistTest#test_1", 5500.0]]) + updater.update_batch([['PersistTest#test_1', 5500.0]]) # New worker should see the persisted moving average tests = [MockTest.new('PersistTest#test_1')] worker1 = worker(1, tests: tests, build_id: '207', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 1000.0) + suite_max_duration: 120_000, timing_fallback_duration: 1000.0) chunks = [] worker1.poll do |chunk| @@ -678,6 +679,107 @@ def tests end end + def test_heartbeat_returns_true_for_owned_test + reserved_test = nil + @queue.poll do |test| + reserved_test = test + break + end + refute_nil reserved_test + + # Heartbeat should succeed for currently reserved test + assert @queue.heartbeat(reserved_test), 'Heartbeat should succeed for owned test' + end + + def test_heartbeat_returns_false_for_unowned_test + second_queue = worker(2) + reserved_test = nil + + @queue.poll do |test| + reserved_test = test + break + end + refute_nil reserved_test + + # Second queue should not be able to heartbeat a test it doesn't own + refute second_queue.heartbeat(reserved_test), 'Heartbeat should fail for unowned test' + end + + def test_heartbeat_returns_false_for_processed_test + reserved_test = nil + @queue.poll do |test| + reserved_test = test + @queue.acknowledge(test) + break 
+ end + refute_nil reserved_test + + # Heartbeat should fail for already processed test + # Need a new worker instance since the original one no longer has the test reserved + new_queue = worker(1) + refute new_queue.heartbeat(reserved_test), 'Heartbeat should fail for processed test' + end + + def test_heartbeat_extends_deadline_and_prevents_stealing + # Use a short timeout and short grace period for testing + first_queue = worker(1, tests: [TEST_LIST.first], build_id: '100', timeout: 0.5, heartbeat_grace_period: 0.3) + second_queue = worker(2, tests: [TEST_LIST.first], build_id: '100', timeout: 0.5, heartbeat_grace_period: 0.3) + + reserved_test = nil + stolen = false + + # First worker reserves the test + first_queue.poll do |test| + reserved_test = test + # Wait for deadline to pass + sleep 0.6 + + # Before the deadline passes, send a heartbeat to extend it + assert first_queue.heartbeat(test), 'Heartbeat should succeed' + + # Now try to have second worker steal - should fail because of recent heartbeat + second_queue.poll do |stolen_test| + stolen = true + second_queue.acknowledge(stolen_test) + end + + first_queue.acknowledge(test) + end + + refute_nil reserved_test + refute stolen, 'Second worker should not steal test with recent heartbeat' + end + + def test_stale_heartbeat_allows_stealing + # Use a short timeout and very short grace period for testing + first_queue = worker(1, tests: [TEST_LIST.first], build_id: '101', timeout: 0.3, heartbeat_grace_period: 0.2) + second_queue = worker(2, tests: [TEST_LIST.first], build_id: '101', timeout: 0.3, heartbeat_grace_period: 0.2) + + reserved_test = nil + stolen_test = nil + + # First worker reserves the test + first_queue.poll do |test| + reserved_test = test + + # Send a heartbeat + assert first_queue.heartbeat(test), 'Heartbeat should succeed' + + # Wait for both the heartbeat grace period AND the extended deadline to pass + sleep 0.6 + + # Now second worker should be able to steal since heartbeat is stale + second_queue.poll do |test2| + stolen_test = test2 + second_queue.acknowledge(test2) + end + end + + refute_nil reserved_test + refute_nil stolen_test, 'Second worker should steal test with stale heartbeat' + assert_equal reserved_test.id, stolen_test.id + end + def shuffled_test_list TEST_LIST.sort.shuffle(random: Random.new(0)).freeze end @@ -700,13 +802,11 @@ def worker(id, **args) worker_id: id.to_s, timeout: 0.2, timing_redis_url: @redis_url, - **args, + **args ) ) - if skip_populate - return queue - else - populate(queue, tests: tests) - end + return queue if skip_populate + + populate(queue, tests: tests) end end
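
Below is a minimal, self-contained Ruby sketch of the timing rules this patch introduces, for reviewers who prefer not to trace the Lua. It is illustrative only: `DEFAULT_TIMEOUT`, `HEARTBEAT_GRACE_PERIOD`, `extend_deadline`, and `claimable?` are placeholder names, and the real logic lives in redis/heartbeat.lua and redis/reserve_lost.lua operating on Redis keys.

    # Hypothetical stand-ins for config.timeout and config.heartbeat_grace_period
    # (values borrowed from test_heartbeat_extends_deadline_and_prevents_stealing).
    DEFAULT_TIMEOUT = 0.5
    HEARTBEAT_GRACE_PERIOD = 0.3

    # heartbeat.lua: a successful heartbeat moves the test's deadline to now + timeout.
    def extend_deadline(now:, timeout: DEFAULT_TIMEOUT)
      now + timeout
    end

    # reserve_lost.lua: an expired test is only claimable when its owner's last
    # heartbeat is missing or older than the grace period.
    def claimable?(now:, deadline:, last_heartbeat:, grace_period: HEARTBEAT_GRACE_PERIOD)
      return false if now < deadline           # deadline not reached, nothing to steal
      return true if last_heartbeat.nil?       # never heartbeated (legacy behavior)
      (now - last_heartbeat) >= grace_period   # only a stale heartbeat allows stealing
    end

    deadline = extend_deadline(now: 10.0)                           # => 10.5
    claimable?(now: 11.0, deadline: deadline, last_heartbeat: 10.9) # => false (owner alive)
    claimable?(now: 11.0, deadline: deadline, last_heartbeat: 10.0) # => true  (stale)
    claimable?(now: 11.0, deadline: deadline, last_heartbeat: nil)  # => true  (no heartbeat)

With the defaults in configuration.rb (heartbeat_interval: 10, heartbeat_grace_period: 30), a worker that is still executing a test will always have heartbeated within the grace window, so only stalled or dead workers lose their reservations.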