From a4fbb861850b37818e87864ec9809bc8f58ae34e Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Thu, 26 Mar 2026 17:54:02 +1100 Subject: [PATCH 1/4] docker: use container naming consistent across projects --- docker-compose.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index ed2a45f..20aa18b 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,7 +1,7 @@ services: sync_init: user: "${UID}:${GID}" - image: git_hg_sync + image: git-hg-sync entrypoint: ["/app/create_clones.sh", "/clones"] environment: # Let the script create .ssh somewhere writable @@ -9,9 +9,9 @@ services: volumes: - .:/app - ./clones:/clones:z - sync: &sync_config + git-hg-sync: &sync_config user: "${UID}:${GID}" - image: git_hg_sync + image: git-hg-sync build: . command: ["--config", "config-docker.toml", "--log-raw-level", "debug"] volumes: From e81aae10affc675a2a4f30f132e72aef33d19942 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Fri, 27 Mar 2026 19:12:38 +1100 Subject: [PATCH 2/4] tests: improve pulse integration tests Add tests to exercise routing_key mismatches, and fix tests for successful delivery that were passing by luck. One key learning here is that each binding needs to be attached to a separate queue, otherwise any worker listening to a given queue may receive all messages matching the bindings. --- docker-compose.yaml | 3 ++ git_hg_sync/__main__.py | 2 +- rabbitmq/definitions.json | 2 +- tests/conftest.py | 36 ++++++++++------ tests/pulse_utils.py | 10 ++++- tests/test_integration.py | 89 +++++++++++++++++++++++++++++++-------- 6 files changed, 110 insertions(+), 32 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 20aa18b..20affbc 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -53,6 +53,9 @@ services: - ./tests_output:/app/tests_output:z profiles: - test + depends_on: + pulse: + condition: service_healthy networks: - pulse_network diff --git a/git_hg_sync/__main__.py b/git_hg_sync/__main__.py index d3140af..cd598d9 100644 --- a/git_hg_sync/__main__.py +++ b/git_hg_sync/__main__.py @@ -36,7 +36,7 @@ def get_connection(config: PulseConfig) -> Connection: ) -def get_queue(config: Config | PulseConfig) -> Queue: +def get_queue(config: PulseConfig) -> Queue: exchange = Exchange(config.exchange, type="topic") return Queue( name=config.queue, diff --git a/rabbitmq/definitions.json b/rabbitmq/definitions.json index f92494b..0d2c6c0 100644 --- a/rabbitmq/definitions.json +++ b/rabbitmq/definitions.json @@ -4,7 +4,7 @@ "arguments": {}, "destination": "queue/guest/test", "destination_type": "queue", - "routing_key": "#", + "routing_key": "default", "source": "exchange/guest/test", "vhost": "/" } diff --git a/tests/conftest.py b/tests/conftest.py index 700a535..cf17cfc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,38 +27,50 @@ def pulse_config() -> PulseConfig: host="pulse", port=5672, exchange="exchange/guest/test", - routing_key="#", + routing_key="default", queue="queue/guest/test", password="guest", heartbeat=30, ssl=False, ) + @pytest.fixture def test_config(pulse_config: PulseConfig) -> Config: return Config( pulse=pulse_config, clones=ClonesConfig(directory=Path("clones")), tracked_repositories=[ - TrackedRepository(name="mozilla-central", url="https://github.com/mozilla-firefox/firefox.git"), + TrackedRepository( + name="mozilla-central", + url="https://github.com/mozilla-firefox/firefox.git", + ), + ], + branch_mappings=[ + BranchMapping( + branch_pattern=".*", + source_url="https://github.com/mozilla-firefox/firefox.git", + destination_url="destination_url", + destination_branch="destination_branch", + ) ], - branch_mappings=[BranchMapping( - branch_pattern = '.*', - source_url = "https://github.com/mozilla-firefox/firefox.git", - destination_url = 'destination_url', - destination_branch = 'destination_branch', - )], ) + @pytest.fixture def get_payload() -> Callable: - - def get_payload(**kwargs: dict) -> dict: + def get_payload( + request: pytest.FixtureRequest | None = None, **kwargs: dict + ) -> dict: """Return a default payload, with override via kwargs.""" + repo_url = "repo.git" + if request: + repo_url = request.node.name + payload = { "type": "push", - "repo_url": "repo.git", - "branches": { "main": 40 * "0"}, + "repo_url": repo_url, + "branches": {"main": 40 * "0"}, "tags": {}, "time": 0, "push_id": 0, diff --git a/tests/pulse_utils.py b/tests/pulse_utils.py index 250e868..4a9dff3 100644 --- a/tests/pulse_utils.py +++ b/tests/pulse_utils.py @@ -13,11 +13,14 @@ def send_pulse_message( pulse_config: PulseConfig, payload: Any, purge: bool = False -) -> None: +) -> tuple[kombu.Connection, kombu.Queue]: """Send a pulse message The routing key will be constructed from the repository URL. The Pulse message will be constructed from the specified payload and sent to the requested exchange. + + This function takes care of declaring and binding a Queue, so it can purge it before + the start of the test. It returns the Connectiond and Queue for use by the caller. """ userid = pulse_config.userid password = pulse_config.password @@ -40,6 +43,9 @@ def send_pulse_message( with connection: ex = kombu.Exchange(exchange, type="topic") + + # Declare the queue prior to sending, so we can purge it of potential spurious + # messages from previous tests. queue = kombu.Queue( name=queue_name, exchange=exchange, @@ -70,6 +76,8 @@ def send_pulse_message( print(f"publishing message to {exchange}") producer.publish(data) + return (connection, queue) + if __name__ == "__main__": config = Config.from_file(HERE.parent / "config.toml") diff --git a/tests/test_integration.py b/tests/test_integration.py index 446ed00..f09c9af 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -22,22 +22,73 @@ HERE = Path(__file__).parent -@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq") -def test_send_and_receive(pulse_config: PulseConfig, get_payload: Callable) -> None: - payload = get_payload() +@pytest.mark.parametrize( + "queue_key", + ( + (""), # Use the default from the config. + ("one"), + ("two.three"), + ), +) +@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ") +def test_send_and_receive( + request: pytest.FixtureRequest, + pulse_config: PulseConfig, + get_payload: Callable, + queue_key: str, +) -> None: + payload = get_payload(request=request) + + if queue_key: + pulse_config.routing_key = queue_key + pulse_config.queue = f"{pulse_config.queue}-{queue_key}" def callback(body: Any, message: kombu.Message) -> None: message.ack() assert body["payload"] == payload - pulse_utils.send_pulse_message(pulse_config, payload, purge=True) - connection = get_connection(pulse_config) - queue = get_queue(pulse_config) + connection, queue = pulse_utils.send_pulse_message( + pulse_config, payload, purge=True + ) + with connection.Consumer(queue, auto_declare=False, callbacks=[callback]): connection.drain_events(timeout=5) -@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq") +@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ") +@pytest.mark.parametrize( + "send_key,queue_key", + ( + ("three", "three.four"), + ("five.six", "five"), + ), +) +def test_send_and_receive_routing_key_mismatch( + pulse_config: PulseConfig, get_payload: Callable, send_key: str, queue_key: str +) -> None: + payload = get_payload() + + def callback(_body: Any, _message: kombu.Message) -> None: + raise AssertionError("No message should be received") + + pulse_config.routing_key = queue_key + # We need to create a unique queue for this binding. + pulse_config.queue = f"{pulse_config.queue}-{send_key}-{queue_key}" + + pulse_config_sender = pulse_config.model_copy() + pulse_config_sender.routing_key = send_key + + connection, queue = _setup_connection_and_queue(pulse_config) + pulse_utils.send_pulse_message(pulse_config_sender, payload, purge=True) + + with ( + connection.Consumer(queue, auto_declare=False, callbacks=[callback]), + pytest.raises(TimeoutError), + ): + connection.drain_events(timeout=5) + + +@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ") def test_full_app( tmp_path: Path, get_payload: Callable, @@ -112,7 +163,7 @@ def test_full_app( assert hg_rev(hg_remote_repo_path, destination_branch) in tag_log -@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq") +@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ") def test_no_duplicated_ack_messages( test_config: Config, get_payload: Callable, @@ -125,10 +176,7 @@ def test_no_duplicated_ack_messages( wait = 30 - connection = get_connection(test_config.pulse) - queue = get_queue(test_config.pulse) - queue(connection).queue_declare() - queue(connection).queue_bind() + connection, queue = _setup_connection_and_queue(test_config.pulse) worker = PulseWorker(connection, queue, one_shot=True) @@ -142,7 +190,7 @@ def test_no_duplicated_ack_messages( callback.assert_called_once() -@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without rabbitMq") +@pytest.mark.skipif(NO_RABBITMQ, reason="This test doesn't work without RabbitMQ") def test_messages_in_order( test_config: Config, get_payload: Callable, @@ -151,10 +199,7 @@ def test_messages_in_order( It may also timeout, which is likely indicative of the same issue. """ - connection = get_connection(test_config.pulse) - queue = get_queue(test_config.pulse) - queue(connection).queue_declare() - queue(connection).queue_bind() + connection, queue = _setup_connection_and_queue(test_config.pulse) worker = PulseWorker(connection, queue, one_shot=False) @@ -184,3 +229,13 @@ def event_handler(event: Event) -> None: worker.run() assert events_log == [0, 0, 1, 1] + + +def _setup_connection_and_queue( + pulse_config: PulseConfig, +) -> tuple[kombu.Connection, kombu.Queue]: + connection = get_connection(pulse_config) + queue = get_queue(pulse_config) + queue(connection).queue_declare() + queue(connection).queue_bind() + return connection, queue From e3c425ca8ef68b89d61d4763ceca88e703c0af27 Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Tue, 21 Apr 2026 16:16:10 +1000 Subject: [PATCH 3/4] fixup! tests: improve pulse integration tests --- tests/data/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data/config.toml b/tests/data/config.toml index fb84554..12241e4 100644 --- a/tests/data/config.toml +++ b/tests/data/config.toml @@ -3,7 +3,7 @@ userid = "guest" host = "pulse" port = 5672 exchange = "exchange/guest/test" -routing_key = "#" +routing_key = "default" queue = "queue/guest/test" password = "guest" heartbeat = 30 From 97b0951c94089042dd3556b281381b237b4bfb4c Mon Sep 17 00:00:00 2001 From: Olivier Mehani Date: Tue, 21 Apr 2026 16:35:14 +1000 Subject: [PATCH 4/4] fixup! fixup! tests: improve pulse integration tests --- tests/test_integration.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_integration.py b/tests/test_integration.py index f09c9af..5cc9ccb 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -47,6 +47,9 @@ def callback(body: Any, message: kombu.Message) -> None: message.ack() assert body["payload"] == payload + # Create queue and bindings prior to sending. + _, _ = _setup_connection_and_queue(pulse_config) + connection, queue = pulse_utils.send_pulse_message( pulse_config, payload, purge=True )