diff --git a/.github/workflows/Lint-and-test.yml b/.github/workflows/Lint-and-test.yml deleted file mode 100644 index d329f10..0000000 --- a/.github/workflows/Lint-and-test.yml +++ /dev/null @@ -1,48 +0,0 @@ -# This workflow will install Python dependencies, run tests and lint with a single version of Python -# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python - -name: Python application - -on: - workflow_call: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -jobs: - call-workflow: - uses: ISISComputingGroup/reusable-workflows/.github/workflows/linters.yml@main - with: - compare-branch: origin/main - python-ver: '3.13' - runs-on: 'ubuntu-latest' - tests: - strategy: - matrix: - version: ['3.12', '3.13', '3.14'] - os: ["ubuntu-latest", "windows-latest"] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v6 - - name: Install uv and set the python version - uses: astral-sh/setup-uv@v7 - with: - python-version: ${{ matrix.version }} - - name: Install dependencies - run: uv sync --all-extras --dev - - name: Test with pytest - run: uv run pytest tests - results: - if: ${{ always() }} - runs-on: ubuntu-latest - name: Final Results - needs: [tests, call-workflow] - steps: - - run: exit 1 - # see https://stackoverflow.com/a/67532120/4907315 - if: >- - ${{ - contains(needs.*.result, 'failure') - || contains(needs.*.result, 'cancelled') - }} diff --git a/.github/workflows/build-nightly.yml b/.github/workflows/build-nightly.yml new file mode 100644 index 0000000..08af7e1 --- /dev/null +++ b/.github/workflows/build-nightly.yml @@ -0,0 +1,12 @@ +name: Nightly build +on: + schedule: + - cron: "16 3 * * *" + workflow_dispatch: + +permissions: + contents: write + +jobs: + nightly-build: + uses: ./.github/workflows/build.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..4bb9e0b --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,70 @@ +name: Build +on: [push, workflow_call] + +permissions: + contents: write + +jobs: + tests: + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ "ubuntu-latest", "windows-latest" ] + fail-fast: false + steps: + - uses: actions/checkout@v5 + with: + submodules: 'recursive' + - name: install stable rust + run: rustup install stable + - uses: Swatinem/rust-cache@v2 + with: + key: ${{ matrix.os }} + - name: install curl-dev + if: ${{ matrix.os == 'ubuntu-latest' }} + run: sudo apt-get install -y libcurl4-openssl-dev + - name: Unit tests + run: cargo test + - name: Unit tests (release mode) + run: cargo test --release + - name: Build (debug) + run: cargo build + - name: Build (release) + run: cargo build --release + - name: Format check + run: cargo fmt --check + - name: Clippy + run: cargo clippy -- --deny warnings + - name: Upload artifacts (debug) + uses: actions/upload-artifact@v4 + with: + name: ${{ format('debug-{0}', matrix.os) }} + path: | + target/debug/saluki + target/debug/saluki.exe + target/debug/saluki.pdb + if-no-files-found: error + retention-days: 1 + - name: Upload artifacts (release) + uses: actions/upload-artifact@v4 + with: + name: ${{ format('release-{0}', matrix.os) }} + path: | + target/release/saluki + target/release/saluki.exe + target/release/saluki.pdb + if-no-files-found: error + retention-days: 1 + results: + if: ${{ always() }} + runs-on: ubuntu-latest + name: Final Results + needs: [tests] + steps: + - run: exit 1 + # see https://stackoverflow.com/a/67532120/4907315 + if: >- + ${{ + contains(needs.*.result, 'failure') + || contains(needs.*.result, 'cancelled') + }} diff --git a/.github/workflows/container_upload.yml b/.github/workflows/container_upload.yml new file mode 100644 index 0000000..214287e --- /dev/null +++ b/.github/workflows/container_upload.yml @@ -0,0 +1,59 @@ +#name: Create and publish a Docker image + +# Configures this workflow to run every time a change is pushed to the branch called `release`. +on: + push: + branches: ['main'] + +# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds. +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +# There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu. +jobs: + build-and-push-image: + runs-on: ubuntu-latest + # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job. + permissions: + contents: read + packages: write + attestations: write + id-token: write + # + steps: + - name: Checkout repository + uses: actions/checkout@v4 + # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. + - name: Log in to the Container registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels. + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages. + # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see [Usage](https://github.com/docker/build-push-action#usage) in the README of the `docker/build-push-action` repository. + # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step. + - name: Build and push Docker image + id: push + uses: docker/build-push-action@v6 + with: + context: . + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + file: "Containerfile" + + # This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see [AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds). + - name: Generate artifact attestation + uses: actions/attest-build-provenance@v2 + with: + subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}} + subject-digest: ${{ steps.push.outputs.digest }} + push-to-registry: true diff --git a/.github/workflows/lint-and-test-nightly.yml b/.github/workflows/lint-and-test-nightly.yml deleted file mode 100644 index c52f1da..0000000 --- a/.github/workflows/lint-and-test-nightly.yml +++ /dev/null @@ -1,10 +0,0 @@ -name: lint-and-test-nightly -on: - workflow_dispatch: - schedule: - - cron: "0 0 * * *" - - -jobs: - lint-and-test-nightly: - uses: ./.github/workflows/Lint-and-test.yml diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml deleted file mode 100644 index b904c8a..0000000 --- a/.github/workflows/python-publish.yml +++ /dev/null @@ -1,92 +0,0 @@ -name: Publish Python distribution to PyPI -on: push -jobs: - lint-and-test: - if: github.ref_type == 'tag' - name: Run linter and tests - uses: ./.github/workflows/Lint-and-test.yml - build: - needs: lint-and-test - if: github.ref_type == 'tag' - name: build distribution - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v6 - - name: Set up Python - uses: actions/setup-python@v6 - with: - python-version: "3.11" - - name: Install pypa/build - run: >- - python3 -m - pip install - build - --user - - name: Build a binary wheel and a source tarball - run: python3 -m build - - name: Store the distribution packages - uses: actions/upload-artifact@v7 - with: - name: python-package-distributions - path: dist/ - publish-to-pypi: - name: >- - Publish Python distribution to PyPI - if: github.ref_type == 'tag' - needs: [lint-and-test, build] - runs-on: ubuntu-latest - environment: - name: release - url: https://pypi.org/p/saluki - permissions: - id-token: write # IMPORTANT: mandatory for trusted publishing - steps: - - name: Download all the dists - uses: actions/download-artifact@v8 - with: - name: python-package-distributions - path: dist/ - - name: Publish distribution to PyPI - uses: pypa/gh-action-pypi-publish@release/v1 - github-release: - name: >- - Sign the Python distribution with Sigstore - and upload them to GitHub Release - needs: [lint-and-test, build, publish-to-pypi] - runs-on: ubuntu-latest - - permissions: - contents: write # IMPORTANT: mandatory for making GitHub Releases - id-token: write # IMPORTANT: mandatory for sigstore - - steps: - - name: Download all the dists - uses: actions/download-artifact@v8 - with: - name: python-package-distributions - path: dist/ - - name: Sign the dists with Sigstore - uses: sigstore/gh-action-sigstore-python@v3.2.0 - with: - inputs: >- - ./dist/*.tar.gz - ./dist/*.whl - - name: Create GitHub Release - env: - GITHUB_TOKEN: ${{ github.token }} - run: >- - gh release create - '${{ github.ref_name }}' - --repo '${{ github.repository }}' - --notes "" - - name: Upload artifact signatures to GitHub Release - env: - GITHUB_TOKEN: ${{ github.token }} - # Upload to GitHub Release using the `gh` CLI. - # `dist/` contains the built packages, and the - # sigstore-produced signatures and certificates. - run: >- - gh release upload - '${{ github.ref_name }}' dist/** - --repo '${{ github.repository }}' diff --git a/.gitignore b/.gitignore index e8e547a..a10c481 100644 --- a/.gitignore +++ b/.gitignore @@ -173,3 +173,8 @@ cython_debug/ .idea/ + + +# Added by cargo + +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..9b01ece --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,1106 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse 0.2.7", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse 1.0.0", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "bitflags" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" + +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + +[[package]] +name = "cc" +version = "1.2.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423" +dependencies = [ + "find-msvc-tools", + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures", + "rand_core", +] + +[[package]] +name = "clap" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap-verbosity-flag" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d92b1fab272fe943881b77cc6e920d6543e5b1bfadbd5ed81c7c5a755742394" +dependencies = [ + "clap", + "log", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream 1.0.0", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + +[[package]] +name = "cmake" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +dependencies = [ + "cc", +] + +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + +[[package]] +name = "env_filter" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" +dependencies = [ + "anstream 0.6.21", + "anstyle", + "env_filter", + "jiff", + "log", +] + +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "flatbuffers" +version = "25.12.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35f6839d7b3b98adde531effaf34f0c2badc6f4735d26fe74709d8e513a96ef3" +dependencies = [ + "bitflags", + "rustc_version", +] + +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "rand_core", + "wasip2", + "wasip3", +] + +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + +[[package]] +name = "indexmap" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +dependencies = [ + "equivalent", + "hashbrown 0.16.1", + "serde", + "serde_core", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + +[[package]] +name = "isis_streaming_data_types" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470be50383d88e3010588fa9d6e98466a8f0c718b1b50c91ad51116c7b44a6b5" +dependencies = [ + "flatbuffers", +] + +[[package]] +name = "itoa" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" + +[[package]] +name = "jiff" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a3546dc96b6d42c5f24902af9e2538e82e39ad350b0c766eb3fbf2d8f3d8359" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", +] + +[[package]] +name = "jiff-static" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "js-sys" +version = "0.3.91" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49715b7073f385ba4bc528e5747d02e66cb39c6146efb66b781f131f0fb399c" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + +[[package]] +name = "libc" +version = "0.2.183" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" + +[[package]] +name = "libm" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" + +[[package]] +name = "libz-sys" +version = "1.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52f4c29e2a68ac30c9087e1b772dc9f44a2b66ed44edf2266cf2be9b03dafc1" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", + "libm", +] + +[[package]] +name = "num_enum" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0bca838442ec211fa11de3a8b0e0e8f3a4522575b5c4c06ed722e005036f26" +dependencies = [ + "num_enum_derive", + "rustversion", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "680998035259dcfcafe653688bf2aa6d3e2dc05e98be6ab46afb089dc84f1df8" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + +[[package]] +name = "portable-atomic-util" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "091397be61a01d4be58e7841595bd4bfedb15f1cd54977d79b8271e94ed799a3" +dependencies = [ + "portable-atomic", +] + +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + +[[package]] +name = "proc-macro-crate" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" +dependencies = [ + "toml_edit", +] + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + +[[package]] +name = "rand" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" +dependencies = [ + "chacha20", + "getrandom", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba" + +[[package]] +name = "rand_distr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d431c2703ccf129de4d45253c03f49ebb22b97d6ad79ee3ecfc7e3f4862c1d8" +dependencies = [ + "num-traits", + "rand", +] + +[[package]] +name = "rdkafka" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7956f9ac12b5712e50372d9749a3102f4810a8d42481c5eae3748d36d585bcf" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.10.0+2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e234cf318915c1059d4921ef7f75616b5219b10b46e9f3a511a15eb4b56a3f77" +dependencies = [ + "cmake", + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + +[[package]] +name = "rstest" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5a3193c063baaa2a95a33f03035c8a72b83d97a54916055ba22d35ed3839d49" +dependencies = [ + "futures-timer", + "futures-util", + "rstest_macros", +] + +[[package]] +name = "rstest_macros" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c845311f0ff7951c5506121a9ad75aec44d083c31583b2ea5a30bcb0b0abba0" +dependencies = [ + "cfg-if", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn", + "unicode-ident", +] + +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "saluki" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "clap-verbosity-flag", + "env_logger", + "flatbuffers", + "isis_streaming_data_types", + "log", + "rand", + "rand_distr", + "rdkafka", + "rstest", + "serde_json", + "uuid", +] + +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +dependencies = [ + "pin-project-lite", +] + +[[package]] +name = "toml_datetime" +version = "1.0.1+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b320e741db58cac564e26c607d3cc1fdc4a88fd36c879568c07856ed83ff3e9" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_edit" +version = "0.25.5+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ca1a40644a28bce036923f6a431df0b34236949d111cc07cb6dca830c9ef2e1" +dependencies = [ + "indexmap", + "toml_datetime", + "toml_parser", + "winnow", +] + +[[package]] +name = "toml_parser" +version = "1.0.10+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7df25b4befd31c4816df190124375d5a20c6b6921e2cad937316de3fccd63420" +dependencies = [ + "winnow", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "uuid" +version = "1.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" +dependencies = [ + "getrandom", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "wasip2" +version = "1.0.2+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6532f9a5c1ece3798cb1c2cfdba640b9b3ba884f5db45973a6f442510a87d38e" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18a2d50fcf105fb33bb15f00e7a77b772945a2ee45dcf454961fd843e74c18e6" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03ce4caeaac547cdf713d280eda22a730824dd11e6b8c3ca9e42247b25c631e3" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75a326b8c223ee17883a4251907455a2431acc2791c98c26279376490c378c16" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "winnow" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a90e88e4667264a994d34e6d1ab2d26d398dcdca8b7f52bec8668957517fc7d8" +dependencies = [ + "memchr", +] + +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f9c3d86 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "saluki" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0.102" +clap = { version = "4.6.0", features = ["derive"] } +log = "0.4.29" +rdkafka = { version = "0.39.0", features = ["cmake-build"] } +uuid = { version = "1.22.0", features = ["v4"] } +env_logger = "0.11.9" +clap-verbosity-flag = "3.0.4" +isis_streaming_data_types = "0.0.6" +rand = { version = "0.10.0", features = ["thread_rng"]} +rand_distr = "0.6.0" +flatbuffers = "25.12.19" +serde_json = "1.0.149" + +[dev-dependencies] +rstest = "0.26.1" diff --git a/Containerfile b/Containerfile new file mode 100644 index 0000000..4741f04 --- /dev/null +++ b/Containerfile @@ -0,0 +1,9 @@ +FROM rust:1-trixie AS builder +WORKDIR /usr/src/saluki +RUN apt-get update && apt-get install -y build-essential cmake libcurl4-openssl-dev && rm -rf /var/lib/apt/lists/* +COPY . . +RUN cargo install --path . + +FROM debian:trixie-slim +COPY --from=builder /usr/local/cargo/bin/saluki /usr/local/bin/saluki +ENTRYPOINT ["saluki"] diff --git a/README.md b/README.md index 04e27db..27f3f77 100644 --- a/README.md +++ b/README.md @@ -13,19 +13,20 @@ alternatively you can `pip install saluki` and run it from a `venv`. See `saluki --help` for all options. -## `listen` - Listen to a topic for updates -`saluki listen mybroker:9092/mytopic` - This will listen for updates for `mytopic` on `mybroker`. - -### Filter to specific schemas +## `consume`- Consume from a topic +> [!NOTE] +> An alias for `saluki listen` exists for `consume` - the two commands were merged when `saluki` was rewritten in rust. -`saluki listen mybroker:9092/mytopic -f f144 -f f142` - This will listen for updates but ignore messages with schema IDs of `f142` or `f144` +This continuously listens to a topic, deserialises messages and prints them. -## `consume`- Consume from a topic +It can also be given a `--messages` flag to limit the number of messages to print: `saluki consume mybroker:9092/mytopic -p 1 -o 123456 -m 10` - This will print 9 messages before (and inclusively the offset specified) offset `123456` of `mytopic` on `mybroker`, in partition 1. -Use the `-g` flag to go the other way, ie. in the above example to consume the 9 messages _after_ offset 123456 +Use the `--last` flag to consume the last `x` messages on the topic, ie. `saluki consume mybroker:9092/mytopic --last 5` will print the last 5 messages. -You can also filter out messages to specific schema(s) with the `-f` flag, like the example above for `listen`. +### Filter to specific schemas + +`saluki consume mybroker:9092/mytopic -f f144` - This will listen for updates but ignore messages with schema IDs of `f144` ## `sniff` - List all topics and their high, low watermarks and number of messages `saluki sniff mybroker:9092` @@ -51,6 +52,9 @@ INFO:saluki: 0 - low:7515, high:7551, num_messages:36 ## `play` - Replay data from one topic to another +> [!IMPORTANT] +> This functionality was not ported over when `saluki` was rewritten in rust. [this issue](https://github.com/ISISComputingGroup/saluki/issues/50) exists to do so. + ### Between offsets `saluki play mybroker:9092/source_topic mybroker:9092/dest_topic -o 123 125` - This will forward messages at offset 123, 124 and 125 in the `source_topic` to the `dest_topic` diff --git a/pyproject.toml b/pyproject.toml deleted file mode 100644 index f882e18..0000000 --- a/pyproject.toml +++ /dev/null @@ -1,56 +0,0 @@ -[build-system] -requires = ["setuptools>=64", "setuptools_scm>=8"] -build-backend = "setuptools.build_meta" - -[project] -name = "saluki" -dynamic = ["version"] -dependencies = [ - "ess-streaming-data-types", - "confluent-kafka>=2.12.1", # for produce_batch in play() - "python-dateutil", - "tzdata" -] -readme = {file = "README.md", content-type = "text/markdown"} -license-files = ["LICENSE"] -requires-python = ">=3.12" - -[project.scripts] -saluki = "saluki.main:main" - -[project.urls] -"Homepage" = "https://github.com/ISISComputingGroup/saluki" -"Bug Reports" = "https://github.com/ISISComputingGroup/saluki/issues" -"Source" = "https://github.com/ISISComputingGroup/saluki" - -[project.optional-dependencies] -dev = [ - "pyright", - "ruff", - "pytest", - "pytest-cov" -] - -[tool.coverage.run] -branch = true -source = ["src"] - -[tool.coverage.report] -fail_under = 100 -exclude_lines = [ - "pragma: no cover", - "if TYPE_CHECKING:", - "if typing.TYPE_CHECKING:", - "@abstractmethod", -] -omit = ["main.py"] # No logic here besides argparse - - -[tool.coverage.html] -directory = "coverage_html_report" - -[tool.pytest.ini_options] -testpaths = "tests" -addopts = "--cov --cov-report=html -vv" - -[tool.setuptools_scm] diff --git a/src/cli_utils.rs b/src/cli_utils.rs new file mode 100644 index 0000000..c1baee8 --- /dev/null +++ b/src/cli_utils.rs @@ -0,0 +1,125 @@ +use anyhow::{Context, Result, bail}; + +pub(crate) fn parse_broker_spec(s: &str) -> Result { + let b = parse_broker_spec_optional_topic(s)?; + + if b.topic.is_none() { + bail!("Topic cannot be empty"); + } + + Ok(BrokerAndTopic { + host: b.host.to_string(), + port: b.port, + topic: b.topic.unwrap(), + }) +} + +pub(crate) fn parse_broker_spec_optional_topic(s: &str) -> Result { + let (host_port, topic) = match s.split_once('/') { + Some((l, r)) => (l.to_string(), Some(r.to_string())), + None => (s.to_string(), None), + }; + + let (host, port_str) = host_port + .rsplit_once(':') + .context("Missing ':' separating host and port")?; + + let port = port_str + .parse::() + .with_context(|| "Invalid port number")?; + + Ok(BrokerAndOptionalTopic { + host: host.to_string(), + port, + topic, + }) +} + +#[derive(Clone, Debug)] +pub struct BrokerAndTopic { + pub host: String, + pub port: u16, + pub topic: String, +} + +impl BrokerAndTopic { + pub(crate) fn broker(&self) -> String { + format!("{}:{}", self.host, self.port) + } +} + +#[derive(Clone, Debug)] +pub struct BrokerAndOptionalTopic { + pub host: String, + pub port: u16, + pub topic: Option, +} + +impl BrokerAndOptionalTopic { + pub(crate) fn broker(&self) -> String { + format!("{}:{}", self.host, self.port) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rstest::*; + + #[rstest] + #[case("localhost", 9092, "mytopic")] + #[case("localhost.co.uk", 9093, "mytopic1")] + #[case("192.168.0.1", 9094, "mytopic2")] + fn test_parse_correct_topic(#[case] host: String, #[case] port: u16, #[case] topic: String) { + let res = parse_broker_spec(&format!("{host}:{port}/{topic}")).unwrap(); + assert_eq!(res.host, host); + assert_eq!(res.port, port); + assert_eq!(res.topic, topic); + } + + #[test] + fn test_parse_empty_topic() { + let res = parse_broker_spec_optional_topic("localhost:9092").unwrap(); + assert_eq!(res.host, "localhost"); + assert_eq!(res.port, 9092); + assert_eq!(res.topic, None); + } + + #[test] + fn test_parse_missing_port_bails() { + let res = parse_broker_spec_optional_topic("localhost").unwrap_err(); + assert_eq!(res.to_string(), "Missing ':' separating host and port"); + } + + #[test] + fn test_parse_empty_string() { + let res = parse_broker_spec_optional_topic("").unwrap_err(); + assert_eq!(res.to_string(), "Missing ':' separating host and port"); + } + + #[test] + fn parse_broker_spec_missing_topic() { + let res = parse_broker_spec("localhost:9092").unwrap_err(); + assert_eq!(res.to_string(), "Topic cannot be empty"); + } + + #[test] + fn broker_and_topic_to_string() { + let b = BrokerAndTopic { + host: "localhost".to_string(), + port: 9092, + topic: "mytopic".to_string(), + }; + assert_eq!(b.broker(), "localhost:9092"); + } + + #[test] + fn broker_and_optional_topic_to_string() { + let b = BrokerAndOptionalTopic { + host: "localhost".to_string(), + port: 9092, + topic: None, + }; + assert_eq!(b.broker(), "localhost:9092"); + } +} diff --git a/src/consume.rs b/src/consume.rs new file mode 100644 index 0000000..e10c389 --- /dev/null +++ b/src/consume.rs @@ -0,0 +1,123 @@ +use crate::cli_utils::BrokerAndTopic; +use isis_streaming_data_types::{deserialize_message, get_schema_id}; +use log::{debug, error, info}; +use rdkafka::consumer::{BaseConsumer, Consumer}; +use rdkafka::message::ToBytes; +use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; +use std::time::Duration; +use uuid::Uuid; + +pub fn consume( + topic: &BrokerAndTopic, + partition: Option, + filter: &Option, + num_messages: Option, + offset: Option, + last: Option, + timestamp: Option, +) { + debug!( + "Listening to topic: {} partition {:?} on broker {}:{}, filtering {}", + topic.topic, + partition, + topic.host, + topic.port, + filter.as_deref().unwrap_or("none") + ); + let consumer: BaseConsumer = ClientConfig::new() + .set("group.id", Uuid::new_v4().to_string()) + .set("bootstrap.servers", topic.broker()) + .create() + .expect("Consumer creation failed"); + + let start: Option; + + let (low_watermark, high_watermark) = consumer + .fetch_watermarks(&topic.topic, partition.unwrap_or(0), Duration::from_secs(1)) + .unwrap_or_else(|_| panic!("Failed to get watermarks for topic {}", topic.topic)); + let num_messages_on_topic = high_watermark - low_watermark; + + if timestamp.is_some() { + let mut tpl = TopicPartitionList::new(); + tpl.add_partition(&topic.topic, partition.unwrap_or(0)); + start = Some( + consumer + .offsets_for_times(tpl, Duration::from_secs(1)) + .expect("Failed to get offset for time: {timestamp}") + .elements() + .first() + .unwrap() + .offset(), + ); + } else if offset.is_some() { + assert!( + offset.unwrap() >= low_watermark && offset.unwrap() <= high_watermark, + "offset ({offset:?}) must be between high ({high_watermark}) and low({low_watermark}) watermarks" + ); + start = Some(Offset::Offset(offset.unwrap())); + } else if let Some(last_num_messages) = last { + assert!( + last_num_messages <= num_messages_on_topic, + "Cannot consume {last_num_messages:?} messages from a topic which only has {num_messages_on_topic} messages" + ); + start = Some(Offset::Offset(high_watermark - last_num_messages)); + } else { + start = None; + } + + if let Some(_start) = start { + info!("Starting at {_start:?}"); + let mut tpl = TopicPartitionList::new(); + tpl.add_partition_offset(&topic.topic, partition.unwrap_or(0), _start) + .unwrap_or_else(|_| panic!("Failed to set partition offset to {:?}", _start)); + consumer.assign(&tpl).expect("Failed to assign to topic"); + } + + consumer + .subscribe(&[&*topic.topic]) + .unwrap_or_else(|_| panic!("Failed to subscribe to topic {}", topic.topic)); + + let mut counter = 0; + for message in consumer.iter() { + match message { + Ok(message) => { + if partition.is_some() && message.partition() != partition.unwrap() { + continue; + } + + match message.payload() { + Some(p) => { + if let Some(f) = filter { + if let Some(schema_id) = get_schema_id(p) + && schema_id != f.to_bytes() + { + continue; + } + debug!("Message has no schema id, ignoring filter") + } + + match deserialize_message(p) { + Ok(d) => println!("{d:?}"), + Err(e) => error!("Failed to deserialize message: {e:?}"), + } + } + None => { + error!("No payload in message"); + continue; + } + } + } + Err(e) => { + error!("Consumer error: {e:?}"); + break; + } + } + counter += 1; + if (num_messages.is_some() && counter == num_messages.unwrap()) + || (last.is_some() && counter == last.unwrap()) + { + info!("Reached {} messages, exiting", num_messages.unwrap()); + break; + } + } +} diff --git a/src/howl.rs b/src/howl.rs new file mode 100644 index 0000000..e870824 --- /dev/null +++ b/src/howl.rs @@ -0,0 +1,345 @@ +use std::thread; +use std::time::{Duration, SystemTime}; + +use flatbuffers::FlatBufferBuilder; +use isis_streaming_data_types::flatbuffers_generated::events_ev44::{ + Event44Message, Event44MessageArgs, finish_event_44_message_buffer, +}; +use isis_streaming_data_types::flatbuffers_generated::run_start_pl72::{ + RunStart, RunStartArgs, SpectraDetectorMapping, SpectraDetectorMappingArgs, + finish_run_start_buffer, +}; +use isis_streaming_data_types::flatbuffers_generated::run_stop_6s4t::{ + RunStop, RunStopArgs, finish_run_stop_buffer, +}; +use log::{debug, error, info, warn}; +use rand::RngExt; +use rand::prelude::ThreadRng; +use rand_distr::{Distribution, Normal}; +use rdkafka::ClientConfig; +use rdkafka::producer::{BaseRecord, DefaultProducerContext, ThreadedProducer}; +use serde_json::json; +use uuid::Uuid; + +fn generate_run_start<'a>( + fbb: &'a mut FlatBufferBuilder<'_>, + det_max: i32, + topic_prefix: &str, + job_id: &str, +) -> &'a [u8] { + fbb.reset(); + let args = SpectraDetectorMappingArgs { + spectrum: Some(fbb.create_vector(&(0..=det_max).collect::>())), + detector_id: Some(fbb.create_vector(&(0..=det_max).collect::>())), + n_spectra: det_max, + }; + let events_topic = format!("{topic_prefix}_rawEvents"); + + let nexus_structure = json!( { + "children": [ + { + "type": "group", + "name": "raw_data_1", + "children": [ + { + "type": "group", + "name": "events", + "children": [ + { + "type": "stream", + "stream": { + "topic": events_topic, + "source": "saluki_howl", + "writer_module": "ev44", + }, + }, + ], + "attributes": [{"name": "NX_class", "values": "NXentry"}], + }, + ], + "attributes": [{"name": "NX_class", "values": "NXentry"}], + } + ] + }); + + let det_spec_map_buf = SpectraDetectorMapping::create(fbb, &args); + let file_name = Uuid::new_v4().to_string(); + let run_name = format!("saluki-howl-{}", Uuid::new_v4()); + + let start_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis(); + + let run_start_args = RunStartArgs { + start_time: start_time as u64, + stop_time: 0, // TODO check this - it's optional so not necessarily 0 + run_name: Some(fbb.create_string(&run_name)), + instrument_name: Some(fbb.create_string("saluki-howl")), + nexus_structure: Some(fbb.create_string(&nexus_structure.to_string())), + job_id: Some(fbb.create_string(job_id)), + broker: None, + service_id: None, + filename: Some(fbb.create_string(&file_name)), + n_periods: 1, + detector_spectrum_map: Some(det_spec_map_buf), + metadata: None, + control_topic: None, + }; + let run_start_buf = RunStart::create(fbb, &run_start_args); + + finish_run_start_buffer(fbb, run_start_buf); + fbb.finished_data() +} + +fn generate_run_stop<'a>(fbb: &'a mut FlatBufferBuilder<'_>, job_id: &str) -> &'a [u8] { + fbb.reset(); + let stop_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis(); + + let run_stop_args = RunStopArgs { + stop_time: stop_time as u64, + run_name: None, + job_id: Some(fbb.create_string(job_id)), + service_id: None, + command_id: None, + }; + let run_stop_buf = RunStop::create(fbb, &run_stop_args); + finish_run_stop_buffer(fbb, run_stop_buf); + fbb.finished_data() +} + +#[allow(clippy::too_many_arguments)] +fn produce_messages( + producer: &ThreadedProducer, + fbb: &mut FlatBufferBuilder, + rng: &mut ThreadRng, + frame: u32, + topic_prefix: &str, + events_per_message: i32, + messages_per_frame: u32, + frames_per_run: u32, + tof_peak: f32, + tof_sigma: f32, + det_min: i32, + det_max: i32, + mut current_job_id: String, +) -> String { + // get currnet time + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as f32; + + for _ in 0..messages_per_frame { + match producer.send( + BaseRecord::to(format!("{topic_prefix}_rawEvents").as_str()) + .key("") + .payload(generate_fake_events( + fbb, + rng, + frame, + events_per_message, + tof_peak, + tof_sigma, + det_min, + det_max, + now, + )), + ) { + Ok(_) => {} + Err(err) => { + error!("Failed to send messages: {}", err.0); + } + } + } + + if frames_per_run > 0 && frame.is_multiple_of(frames_per_run) { + info!("Starting new run after {frames_per_run} simulated frames"); + match producer.send( + BaseRecord::to(format!("{topic_prefix}_runInfo").as_str()) + .key("") + .payload(generate_run_start( + fbb, + det_max, + topic_prefix, + ¤t_job_id, + )), + ) { + Ok(_) => {} + Err(err) => { + error!("Failed to send run start: {}", err.0); + } + } + current_job_id = Uuid::new_v4().to_string(); + match producer.send( + BaseRecord::to(format!("{topic_prefix}_runInfo").as_str()) + .key("") + .payload(generate_run_stop(fbb, ¤t_job_id)), + ) { + Ok(_) => {} + Err(err) => { + error!("Failed to send run stop: {}", err.0); + } + } + } + + current_job_id +} + +#[allow(clippy::too_many_arguments)] +fn generate_fake_events<'a>( + fbb: &'a mut FlatBufferBuilder<'_>, + rng: &mut ThreadRng, + msg_id: u32, + events_per_message: i32, + tof_peak: f32, + tof_sigma: f32, + det_min: i32, + det_max: i32, + timestamp: f32, +) -> &'a [u8] { + fbb.reset(); + + let det_ids: Vec = (0..events_per_message) + .map(|_| rng.random_range(det_min..=det_max)) + .collect(); + + let normal = Normal::new(tof_peak, tof_sigma).unwrap(); + let tofs: Vec = (0..events_per_message) + .map(|_| normal.sample(rng) as i32) + .collect(); + + let args = Event44MessageArgs { + source_name: Some(fbb.create_string("saluki")), + message_id: msg_id as i64, + reference_time: Some(fbb.create_vector(&[(timestamp * 1_000_000_000.0) as i64])), + reference_time_index: Some(fbb.create_vector(&[0])), + time_of_flight: Some(fbb.create_vector(&tofs)), + pixel_id: Some(fbb.create_vector(&det_ids)), + }; + let ev44 = Event44Message::create(fbb, &args); + finish_event_44_message_buffer(fbb, ev44); + fbb.finished_data() +} + +#[allow(clippy::too_many_arguments)] +pub fn howl( + broker: &str, + topic_prefix: &str, + events_per_message: i32, + messages_per_frame: u32, + frames_per_second: u32, + frames_per_run: u32, + tof_peak: f32, + tof_sigma: f32, + det_min: i32, + det_max: i32, +) { + // create producer + let mut fbb = FlatBufferBuilder::new(); + let mut rng = rand::rng(); + + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as f32; + let ev44_size = generate_fake_events( + &mut fbb, + &mut rng, + 0, + events_per_message, + tof_peak, + tof_sigma, + det_min, + det_max, + now, + ) + .len() as u32; + + debug!("ev44 size is {ev44_size} bytes"); + + // calculate rate + let rate_bytes_per_sec = ev44_size * messages_per_frame * frames_per_second; + debug!("bytes per second: {rate_bytes_per_sec}"); + + let rate_mbit_per_sec: f32 = (rate_bytes_per_sec as f32 / 1024f32.powf(2.0)) * 8.0; + let rate_mebibits_per_sec = rate_mbit_per_sec / 8.0; + debug!("rate mbit per sec: {rate_mbit_per_sec}"); + println!( + "Attempting to simulate data rate: {rate_mbit_per_sec:.3} Mbit/s ({rate_mebibits_per_sec:.3} MiB/s)" + ); + println!("Each ev44 is {ev44_size} bytes"); + + let producer: ThreadedProducer = ClientConfig::new() + .set("bootstrap.servers", broker) + .create() + .expect("Producer creation error"); + + let mut current_job_id = Uuid::new_v4().to_string(); + + let runinfo_topic = format!("{topic_prefix}_runInfo"); + + producer + .send( + BaseRecord::to(runinfo_topic.as_str()) + .key("") + .payload(generate_run_start( + &mut fbb, + det_max, + topic_prefix, + ¤t_job_id, + )), + ) + .expect("Failed to enqueue run start message"); + + let target_frame_time = Duration::from_secs_f64(1.0 / frames_per_second as f64); + debug!("Target frame time: {target_frame_time:?}"); + + let mut frames: u32 = 0; + + let mut target_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + debug!("Target time: {target_time:?}"); + loop { + target_time += target_frame_time; + debug!("New target: {target_time:?}"); + frames += 1; + + current_job_id = produce_messages( + &producer, + &mut fbb, + &mut rng, + frames, + topic_prefix, + events_per_message, + messages_per_frame, + frames_per_run, + tof_peak, + tof_sigma, + det_min, + det_max, + current_job_id, + ); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + + debug!("Current time: {now:?}"); + debug!("Target time: {target_time:?}"); + + if target_time > now { + let sleep_time = target_time - now; + thread::sleep(sleep_time); + } else { + let behind = now - target_time; + warn!( + "saluki howl running {} ms behind schedule", + behind.as_millis() + ) + } + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..500424e --- /dev/null +++ b/src/main.rs @@ -0,0 +1,129 @@ +mod cli_utils; +mod consume; +mod howl; +mod sniff; + +use crate::cli_utils::BrokerAndOptionalTopic; +use crate::howl::howl; +use crate::sniff::sniff; +use clap::{Parser, Subcommand}; +use cli_utils::{BrokerAndTopic, parse_broker_spec, parse_broker_spec_optional_topic}; + +#[derive(Parser, Debug)] +struct Cli { + #[command(subcommand)] + command: Commands, + #[command(flatten)] + verbosity: clap_verbosity_flag::Verbosity, +} + +#[derive(Subcommand, Debug)] +enum Commands { + #[command(alias = "listen")] // for muscle memory's sake... + /// Consume from a topic and deserialise messages. Optionally specify a number of messages to consume. + Consume { + /// topic name, including broker and port. format: broker:port/topic + #[arg(value_parser = parse_broker_spec)] + topic: BrokerAndTopic, + /// Partition - default is all partitions or if using offset/timestamp default is 0. + #[arg(short, long)] + partition: Option, + #[arg(short, long)] + filter: Option, + /// Timestamp to consume from + #[arg(short, long, conflicts_with = "offset")] + timestamp: Option, + /// Offset to consume from + #[arg(short, long, conflicts_with = "timestamp")] + offset: Option, + /// Stop printing after x messages + #[arg(short, long)] + messages: Option, + /// Print last x messages on topic + #[arg(short, long, conflicts_with_all = ["offset","timestamp","messages","filter"])] + last: Option, + }, + /// Print broker metadata. + Sniff { + /// The broker to look at metadata. Optionally suffixed with a topic name to filter to that topic. + #[arg(value_parser = parse_broker_spec_optional_topic)] + broker: BrokerAndOptionalTopic, + }, + Howl { + /// Kafka Broker URL, including port + broker: String, + /// topic prefix to use eg. INSTNAME + topic_prefix: String, + /// Events per ev44 to simulate + #[arg(short, long, default_value = "100")] + events_per_message: i32, + /// Number of ev44 per frame to simulate + #[arg(short, long, default_value = "20")] + messages_per_frame: u32, + /// Frames per second to simulate + #[arg(short, long, default_value = "1")] + frames_per_second: u32, + /// Frames to take before beginning new run (0 to run forever) + #[arg(long, default_value = "0")] + frames_per_run: u32, + /// Time-of-flight peak (ns) + #[arg(long, default_value = "10000000.0")] + tof_peak: f32, + /// Time-of-flight sigma (ns) + #[arg(long, default_value = "2000000.0")] + tof_sigma: f32, + /// Minimum detector ID + #[arg(long, default_value = "0")] + det_min: i32, + /// Maximum detector ID + #[arg(long, default_value = "1000")] + det_max: i32, + }, // TODO Play {}, +} + +fn main() { + let cli = Cli::parse(); + env_logger::Builder::new() + .filter_level(cli.verbosity.into()) + .format_timestamp_micros() + .init(); + + match cli.command { + Commands::Consume { + topic, + partition, + filter, + messages, + offset, + last, + timestamp, + } => consume::consume( + &topic, partition, &filter, messages, offset, last, timestamp, + ), + Commands::Sniff { broker } => sniff(&broker), + Commands::Howl { + broker, + topic_prefix, + events_per_message, + messages_per_frame, + frames_per_second, + frames_per_run, + tof_peak, + tof_sigma, + det_min, + det_max, + } => howl( + &broker, + &topic_prefix, + events_per_message, + messages_per_frame, + frames_per_second, + frames_per_run, + tof_peak, + tof_sigma, + det_min, + det_max, + ), + // Commands::Play {} => {} + } +} diff --git a/src/saluki/__init__.py b/src/saluki/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/src/saluki/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/saluki/consume.py b/src/saluki/consume.py deleted file mode 100644 index 18f9aa1..0000000 --- a/src/saluki/consume.py +++ /dev/null @@ -1,74 +0,0 @@ -import logging -import uuid - -from confluent_kafka import Consumer, TopicPartition - -from saluki.utils import deserialise_and_print_messages - -logger = logging.getLogger("saluki") - - -def consume( - broker: str, - topic: str, - partition: int = 0, - num_messages: int = 1, - offset: int | None = None, - go_forwards: bool = False, - schemas_to_filter_to: list[str] | None = None, - timestamp: int | None = None, -) -> None: - """ - consume from a topic and deserialise each message - - :param broker: the broker address, including the port - :param topic: the topic to use - :param partition: the partition to listen to (default is all partitions in a given topic) - :param num_messages: number of messages to consume - :param offset: offset to consume from/to - :param go_forwards: whether to consume forwards or backwards - :param schemas_to_filter_to: schemas in messages to filter to - :param timestamp: optionally a timestamp as a starting point - :return: None - """ - c = Consumer( - { - "bootstrap.servers": broker, - "group.id": f"saluki-consume-{uuid.uuid4()}", - "session.timeout.ms": 6000, - "auto.offset.reset": "latest", - "enable.auto.offset.store": False, - "enable.auto.commit": False, - "metadata.max.age.ms": 6000, - } - ) - - if timestamp is not None: - offset = c.offsets_for_times([TopicPartition(topic, partition, timestamp)])[0].offset - logger.debug(f"offset for timestamp {timestamp} is {offset}") - - if go_forwards: - if offset is None: - raise ValueError("Can't go forwards without an offset") - start = offset - else: - if offset is not None: - start = offset - num_messages + 1 - else: - start = ( - c.get_watermark_offsets(TopicPartition(topic, partition), cached=False)[1] - - num_messages - ) - - logger.info(f"Starting at offset {start}") - c.assign([TopicPartition(topic, partition, start)]) - - try: - logger.info(f"Consuming {num_messages} messages") - msgs = c.consume(num_messages) - deserialise_and_print_messages(msgs, partition, schemas_to_filter_to) - except Exception: - logger.exception("Got exception while consuming:") - finally: - logger.debug(f"Closing consumer {c}") - c.close() diff --git a/src/saluki/listen.py b/src/saluki/listen.py deleted file mode 100644 index 82a205d..0000000 --- a/src/saluki/listen.py +++ /dev/null @@ -1,47 +0,0 @@ -import logging -import uuid - -from confluent_kafka import Consumer, TopicPartition - -from saluki.utils import deserialise_and_print_messages - -logger = logging.getLogger("saluki") - - -def listen( - broker: str, - topic: str, - partition: int | None = None, - schemas_to_filter_to: list[str] | None = None, -) -> None: - """ - Listen to a topic and deserialise each message - :param broker: the broker address, including the port - :param topic: the topic to use - :param partition: the partition to listen to (default is all partitions in a given topic) - :param schemas_to_filter_to: schemas to filter when listening to messages - :return: None - """ - c = Consumer( - { - "bootstrap.servers": broker, - "group.id": f"saluki-listen-{uuid.uuid4()}", - "auto.offset.reset": "latest", - "enable.auto.commit": False, - } - ) - c.subscribe([topic]) - if partition is not None: - c.assign([TopicPartition(topic, partition)]) - try: - logger.info(f"listening to {broker}/{topic}") - while True: - msg = c.poll(1.0) - deserialise_and_print_messages( - [msg], partition, schemas_to_filter_to=schemas_to_filter_to - ) - except KeyboardInterrupt: - logger.debug("finished listening") - finally: - logger.debug(f"closing consumer {c}") - c.close() diff --git a/src/saluki/main.py b/src/saluki/main.py deleted file mode 100644 index 1bcf97b..0000000 --- a/src/saluki/main.py +++ /dev/null @@ -1,175 +0,0 @@ -import argparse -import logging -import sys - -from saluki.consume import consume -from saluki.listen import listen -from saluki.play import play -from saluki.sniff import sniff -from saluki.utils import dateutil_parsable_or_unix_timestamp, parse_kafka_uri - -logger = logging.getLogger("saluki") -logging.basicConfig(level=logging.INFO) - -_LISTEN = "listen" -_CONSUME = "consume" -_PLAY = "play" -_SNIFF = "sniff" - - -def main() -> None: - parser = argparse.ArgumentParser( - prog="saluki", - description="serialise/de-serialise flatbuffers and consume/produce from/to kafka", - ) - common_options = argparse.ArgumentParser(add_help=False) - common_options.add_argument("-v", "--verbose", help="show DEBUG logs", action="store_true") - common_options.add_argument( - "-l", - "--log-file", - help="filename to output all data to", - required=False, - default=None, - type=argparse.FileType("a"), - ) - - topic_parser = argparse.ArgumentParser(add_help=False) - topic_parser.add_argument("topic", type=str, help="Kafka topic. format is broker<:port>/topic") - - topic_parser.add_argument( - "-X", - "--kafka-config", - help="kafka options to pass through to librdkafka", - required=False, - default=None, - ) - topic_parser.add_argument("-p", "--partition", required=False, type=int, default=0) - topic_parser.add_argument("-f", "--filter", required=False, action="append") - - sub_parsers = parser.add_subparsers(help="sub-command help", required=True, dest="command") - - sniff_parser = sub_parsers.add_parser( - _SNIFF, help="sniff - broker metadata", parents=[common_options] - ) - sniff_parser.add_argument( - "broker", type=str, help="broker, optionally suffixed with a topic name to filter to" - ) - - consumer_parser = argparse.ArgumentParser(add_help=False) - consumer_parser.add_argument( - "-e", - "--entire", - help="show all elements of an array in a message (truncated by default)", - default=False, - required=False, - ) - - consumer_mode_parser = sub_parsers.add_parser( - _CONSUME, help="consumer mode", parents=[topic_parser, consumer_parser, common_options] - ) - consumer_mode_parser.add_argument( - "-m", - "--messages", - help="How many messages to go back", - type=int, - required=False, - default=1, - ) - - consumer_mode_parser.add_argument("-g", "--go-forwards", required=False, action="store_true") - cg = consumer_mode_parser.add_mutually_exclusive_group(required=False) - cg.add_argument( - "-o", - "--offset", - help="offset to consume from", - type=int, - ) - cg.add_argument( - "-t", - "--timestamp", - help="timestamp to consume from", - type=dateutil_parsable_or_unix_timestamp, - ) - - listen_parser = sub_parsers.add_parser( # noqa: F841 - _LISTEN, - help="listen mode - listen until KeyboardInterrupt", - parents=[topic_parser, consumer_parser, common_options], - ) - - play_parser = sub_parsers.add_parser( - _PLAY, - help="replay mode - replay data into another topic", - parents=[common_options], - ) - play_parser.add_argument("topics", type=str, nargs=2, help="SRC topic DEST topic") - g = play_parser.add_mutually_exclusive_group(required=True) - g.add_argument( - "-o", - "--offsets", - help="offsets to replay between (inclusive)", - type=int, - nargs=2, - ) - g.add_argument( - "-t", - "--timestamps", - help="timestamps to replay between in ISO8601 or RFC3339 format ie." - ' "2025-11-17 07:00:00 or as a unix timestamp" ', - type=dateutil_parsable_or_unix_timestamp, - nargs=2, - ) - - if len(sys.argv) == 1: - parser.print_help() - sys.exit(1) - args = parser.parse_args() - - if args.verbose: - logger.setLevel(logging.DEBUG) - - if args.log_file: - logger.addHandler(logging.FileHandler(args.log_file.name)) - - if "kafka_config" in args and args.kafka_config is not None: - raise NotImplementedError("-X is not implemented yet.") - - if args.command == _LISTEN: - broker, topic = parse_kafka_uri(args.topic) - listen(broker, topic, args.partition, args.filter) - elif args.command == _CONSUME: - broker, topic = parse_kafka_uri(args.topic) - consume( - broker, - topic, - args.partition, - args.messages, - args.offset, - args.go_forwards, - args.filter, - args.timestamp, - ) - elif args.command == _PLAY: - src_broker, src_topic = parse_kafka_uri(args.topics[0]) - dest_broker, dest_topic = parse_kafka_uri(args.topics[1]) - - play( - src_broker, - src_topic, - dest_broker, - dest_topic, - args.offsets, - args.timestamps, - ) - elif args.command == _SNIFF: - try: - broker, topic = parse_kafka_uri(args.broker) - logger.debug(f"Sniffing single topic {topic} on broker {broker}") - sniff(broker, topic) - except RuntimeError: - logger.debug(f"Sniffing whole broker {args.broker}") - sniff(args.broker) - - -if __name__ == "__main__": - main() diff --git a/src/saluki/play.py b/src/saluki/play.py deleted file mode 100644 index 1f058e8..0000000 --- a/src/saluki/play.py +++ /dev/null @@ -1,84 +0,0 @@ -import logging -import uuid - -from confluent_kafka import Consumer, Producer, TopicPartition - -logger = logging.getLogger("saluki") - - -def play( - src_broker: str, - src_topic: str, - dest_broker: str, - dest_topic: str, - offsets: list[int] | None, - timestamps: list[int] | None, -) -> None: - """ - Replay data from src_topic to dest_topic between the offsets OR timestamps specified. - This currently assumes contiguous data in a topic (ie. no log compaction) and uses partition 0. - It also does not copy message timestamps. - - :param src_broker: The source broker, including port. - :param src_topic: The topic to replay data from. - :param dest_broker: The destination broker, including port. - :param dest_topic: The topic to replay data to. - :param offsets: The start and finish offsets to replay data from. - :param timestamps: The start and finish timestamps to replay data from. - """ - - consumer = Consumer( - { - "bootstrap.servers": src_broker, - "group.id": f"saluki-play-{uuid.uuid4()}", - } - ) - producer = Producer( - { - "bootstrap.servers": dest_broker, - } - ) - src_partition = 0 - - if timestamps is not None: - logger.debug(f"getting offsets for times: {timestamps[0]} and {timestamps[1]}") - start_offset = consumer.offsets_for_times( - [ - TopicPartition(src_topic, src_partition, timestamps[0]), - ] - )[0] - # See https://github.com/confluentinc/confluent-kafka-python/issues/1178 - # as to why offsets_for_times is called twice. - stop_offset = consumer.offsets_for_times( - [TopicPartition(src_topic, src_partition, timestamps[1])] - )[0] - elif offsets is not None: - start_offset = TopicPartition(src_topic, src_partition, offsets[0]) - stop_offset = TopicPartition(src_topic, src_partition, offsets[1]) - else: - raise ValueError("offsets and timestamps cannot both be None") - - logger.debug(f"start_offset: {start_offset.offset}, stop_offset: {stop_offset.offset}") - - logger.debug(f"assigning to offset {start_offset.offset}") - consumer.assign([start_offset]) - - num_messages = stop_offset.offset - start_offset.offset + 1 - - try: - msgs = consumer.consume(num_messages) - logger.debug(f"finished consuming {num_messages} messages") - consumer.close() - producer.produce_batch( - dest_topic, [{"key": message.key(), "value": message.value()} for message in msgs] - ) - logger.debug(f"flushing producer. len(p): {len(producer)}") - producer.flush(timeout=10) - - logger.debug(f"length after flushing: {len(producer)}") - - except Exception: - logger.exception("Got exception while replaying:") - finally: - logger.debug(f"Closing consumer {consumer}") - consumer.close() diff --git a/src/saluki/sniff.py b/src/saluki/sniff.py deleted file mode 100644 index 795e3b0..0000000 --- a/src/saluki/sniff.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging -import uuid - -from confluent_kafka import Consumer, TopicPartition -from confluent_kafka.admin import AdminClient - -logger = logging.getLogger("saluki") - - -def sniff(broker: str, topic: str | None = None) -> None: - """ - Prints the broker and topic metadata for a given broker. - If a topic is given, only this topic's partitions and watermarks will be printed. - :param broker: The broker address including port number. - :param topic: Optional topic to filter information to. - """ - a = AdminClient({"bootstrap.servers": broker}) - c = Consumer({"bootstrap.servers": broker, "group.id": f"saluki-sniff-{uuid.uuid4()}"}) - t = a.list_topics(timeout=5) - if topic is not None and topic not in t.topics.keys(): - logger.warning(f"Topic {topic} not found on broker {broker}") - return - - if topic is None: - logger.info(f"Cluster ID: {t.cluster_id}") - logger.info("Brokers:") - for value in t.brokers.values(): - logger.info(f"\t{value}") - - logger.info("Topics:") - - for k, v in t.topics.items(): - if topic is not None and k != topic: - continue - partitions = v.partitions.keys() - logger.info(f"\t{k}:") - for p in partitions: - tp = TopicPartition(k, p) - low, high = c.get_watermark_offsets(tp) - logger.info(f"\t\t{tp.partition} - low:{low}, high:{high}, num_messages:{high - low}") diff --git a/src/saluki/utils.py b/src/saluki/utils.py deleted file mode 100644 index d033a57..0000000 --- a/src/saluki/utils.py +++ /dev/null @@ -1,112 +0,0 @@ -import datetime -import logging -from argparse import ArgumentTypeError -from typing import List, Tuple -from zoneinfo import ZoneInfo - -from confluent_kafka import Message -from dateutil.parser import ParserError, parse -from streaming_data_types import DESERIALISERS -from streaming_data_types.exceptions import ShortBufferException -from streaming_data_types.utils import get_schema - -logger = logging.getLogger("saluki") - - -def _try_to_deserialise_message(payload: bytes) -> Tuple[str | None, str | None]: - logger.debug(f"got some data: {payload}") - try: - schema = get_schema(payload) - except ShortBufferException: - schema = None - - logger.debug(f"schema: {schema}") - - def fallback_deserialiser(payload: bytes) -> str: - return payload.decode() - - deserialiser = DESERIALISERS.get(schema if schema is not None else "", fallback_deserialiser) - logger.debug(f"Deserialiser: {deserialiser}") - - ret = deserialiser(payload) - - return schema, ret - - -def deserialise_and_print_messages( - msgs: List[Message], partition: int | None, schemas_to_filter_to: list[str] | None = None -) -> None: - for msg in msgs: - try: - if msg is None: - continue - if msg.error(): - logger.error("Consumer error: {}".format(msg.error())) - continue - if partition is not None and msg.partition() != partition: - continue - schema, deserialised = _try_to_deserialise_message(msg.value()) - if schemas_to_filter_to is not None and schema not in schemas_to_filter_to: - continue - time = _parse_timestamp(msg) - logger.info(f"(o:{msg.offset()},t:{time},s:{schema}) {deserialised}") - except Exception as e: - logger.exception(f"Got error while deserialising: {e}") - - -def _parse_timestamp(msg: Message) -> str: - """ - Parse a message timestamp. - - See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Message.timestamp - :param msg: the message to parse. - :return: either the string-formatted timestamp or "Unknown" if not able to parse. - """ - timestamp_type, timestamp_ms_from_epoch = msg.timestamp() - if timestamp_type == 1: # TIMESTAMP_CREATE_TIME - return ( - datetime.datetime.fromtimestamp(timestamp_ms_from_epoch / 1000) - .astimezone(ZoneInfo("UTC")) - .strftime("%Y-%m-%d %H:%M:%S.%f") - ) - else: - # TIMESTAMP_NOT_AVAILABLE or TIMESTAMP_LOG_APPEND_TIME - return "Unknown" - - -def parse_kafka_uri(uri: str) -> Tuple[str, str]: - """Parse Kafka connection URI. - - A broker hostname/ip must be present. - If username is provided, a SASL mechanism must also be provided. - Any other validation must be performed in the calling code. - """ - broker, topic = uri.split("/") if "/" in uri else (uri, None) - if topic is None: - raise RuntimeError( - f"Unable to parse URI {uri}, topic not defined. URI should be of form" - f" broker[:port]/topic" - ) - return ( - broker, - topic, - ) - - -def dateutil_parsable_or_unix_timestamp(inp: str) -> int: - """ - Parse a dateutil string, if this fails then try to parse a unix timestamp. - This returns a unix timestamp as an int - """ - try: - try: - return int(round(parse(inp).timestamp() * 1000)) - except (ParserError, OverflowError): - logger.debug( - f"Failed to parse {inp} as a dateutil parsable. Falling back to unix timestamp" - ) - return int(inp) - except ValueError: - raise ArgumentTypeError( - f"timestamp {inp} is not parsable by dateutil.parse() and is not a unix timestamp" - ) diff --git a/src/sniff.rs b/src/sniff.rs new file mode 100644 index 0000000..f9e1ff6 --- /dev/null +++ b/src/sniff.rs @@ -0,0 +1,46 @@ +use crate::cli_utils::BrokerAndOptionalTopic; +use rdkafka::ClientConfig; +use rdkafka::consumer::{BaseConsumer, Consumer}; +use std::time::Duration; + +pub fn sniff(broker: &BrokerAndOptionalTopic) { + let consumer: BaseConsumer = ClientConfig::new() + .set("bootstrap.servers", broker.broker()) + .create() + .expect("Consumer creation failed"); + + let metadata = consumer + .fetch_metadata(broker.topic.as_deref(), Duration::from_secs(1)) + .expect("Failed to fetch metadata"); + + if broker.topic.is_none() { + println!("Brokers ({}):", metadata.brokers().len()); + for broker in metadata.brokers() { + println!("\t{}: {}:{} ", broker.id(), broker.host(), broker.port()); + } + println!("\nTopics:"); + } + + for topic in metadata.topics() { + println!("\tTopic: {} Error: {:?}", topic.name(), topic.error()); + for partition in topic.partitions() { + println!( + "\t\tPartition: {} Leader: {} Replicas: {:?}(in sync: {:?}) Err: {:?}", + partition.id(), + partition.leader(), + partition.replicas(), + partition.isr(), + partition.error() + ); + let (low, high) = consumer + .fetch_watermarks(topic.name(), partition.id(), Duration::from_secs(1)) + .unwrap_or((-1, -1)); + println!( + "\t\t\tLow watermark: {} High watermark: {} (messages: {})", + low, + high, + high - low + ); + } + } +} diff --git a/tests/test_consume.py b/tests/test_consume.py deleted file mode 100644 index d72c061..0000000 --- a/tests/test_consume.py +++ /dev/null @@ -1,101 +0,0 @@ -from unittest import mock -from unittest.mock import patch - -import pytest -from confluent_kafka import TopicPartition - -from saluki.consume import consume - - -@patch("saluki.consume.Consumer") -def test_go_forwards_with_no_offset_raises(_): - with pytest.raises(ValueError): - consume("broker", "topic", go_forwards=True, offset=None) - - -@patch("saluki.consume.Consumer") -def test_go_forwards_with_offset_assigns_at_offset(mock_consumer): - expected_topic = "topic" - expected_offset = 1234 - expected_partition = 1 - consume( - "broker", - expected_topic, - go_forwards=True, - offset=expected_offset, - partition=expected_partition, - ) - mock_assign = mock_consumer.return_value.assign - - mock_assign.assert_called_with( - [TopicPartition(expected_topic, expected_partition, expected_offset)] - ) - - -@patch("saluki.consume.Consumer") -def test_consume_with_offset_and_num_of_messages_goes_back_offset_minus_messages( - mock_consumer, -): - expected_offset = 1234 - expected_topic = "sometopic" - num_messages = 3 - expected_start_offset = expected_offset - num_messages + 1 - - consume("broker", expected_topic, offset=expected_offset, num_messages=num_messages) - - mock_assign = mock_consumer.return_value.assign - mock_assign.assert_called_once() - - mock_assign_call = mock_assign.call_args.args[0][0] - assert mock_assign_call.topic == expected_topic - assert mock_assign_call.offset == expected_start_offset - - -@patch("saluki.consume.Consumer") -def test_consume_with_no_offset_and_num_of_messages_goes_back_high_watermark_minus_messages( - mock_consumer, -): - expected_topic = "sometopic" - num_messages = 3 - high_watermark_offset = 2345 - expected_start_offset = high_watermark_offset - num_messages - - mock_consumer.return_value.get_watermark_offsets.return_value = ( - None, - high_watermark_offset, - ) - - consume("broker", topic=expected_topic, num_messages=num_messages) - mock_assign = mock_consumer.return_value.assign - mock_assign.assert_called_once() - - mock_assign_call = mock_assign.call_args.args[0][0] - - assert mock_assign_call.topic == expected_topic - assert mock_assign_call.offset == expected_start_offset - - -def test_consume_but_exception_thrown_consumer_is_closed(): - with ( - mock.patch("saluki.consume.Consumer") as c, - ): - c.return_value.consume.side_effect = Exception - consume("somebroker", "sometopic", num_messages=1) - c.return_value.close.assert_called_once() - - -@patch("saluki.consume.Consumer") -def test_consume_with_timestamp(mock_consumer): - expected_topic = "sometopic" - partition = 0 - timestamp = 1234 - offset = 2345 - - mock_consumer.offsets_for_times.return_value = [ - TopicPartition(expected_topic, partition, offset) - ] - consume("somebroker", topic=expected_topic, timestamp=timestamp, partition=partition) - - mock_consumer.return_value.assign.assert_called_with( - [TopicPartition(expected_topic, partition, offset)] - ) diff --git a/tests/test_listen.py b/tests/test_listen.py deleted file mode 100644 index 4ff4c2e..0000000 --- a/tests/test_listen.py +++ /dev/null @@ -1,31 +0,0 @@ -from unittest import mock - -from confluent_kafka import TopicPartition - -from saluki.listen import listen - - -def test_listen_with_partition_assigns_to_partition(): - expected_partition = 123 - topic = "sometopic" - with ( - mock.patch( - "saluki.listen.deserialise_and_print_messages", - side_effect=KeyboardInterrupt, - ), - mock.patch("saluki.listen.Consumer") as c, - ): - listen("somebroker", "sometopic", partition=expected_partition) - c.return_value.assign.assert_called_with([TopicPartition(topic, expected_partition)]) - - -def test_keyboard_interrupt_causes_consumer_to_close(): - with ( - mock.patch( - "saluki.listen.deserialise_and_print_messages", - side_effect=KeyboardInterrupt, - ), - mock.patch("saluki.listen.Consumer") as c, - ): - listen("somebroker", "sometopic") - c.return_value.close.assert_called_once() diff --git a/tests/test_play.py b/tests/test_play.py deleted file mode 100644 index 94b49af..0000000 --- a/tests/test_play.py +++ /dev/null @@ -1,103 +0,0 @@ -from unittest.mock import Mock, patch - -import pytest -from confluent_kafka import Message, TopicPartition - -from saluki.play import play - - -def test_play_with_offsets(): - src_broker = "broker1" - src_topic = "topic1" - dest_broker = "broker2" - dest_topic = "topic2" - offsets = [1, 2] - - message_1 = Mock(spec=Message) - message_1_key = "msg1key" - message_1.key.return_value = message_1_key - message_1_val = "msg1" - message_1.value.return_value = message_1_val - - message_2 = Mock(spec=Message) - message_2_key = "msg2key" - message_2.key.return_value = message_2_key - message_2_val = "msg2" - message_2.value.return_value = message_2_val - - with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p: - consumer_obj = c() - consumer_obj.consume.return_value = [message_1, message_2] - - play(src_broker, src_topic, dest_broker, dest_topic, offsets, None) - - assert consumer_obj.assign.call_args.args[0][0].topic == src_topic - assert consumer_obj.assign.call_args.args[0][0].offset == offsets[0] - - consumer_obj.consume.assert_called_with(2) # stop - start + 1 - - p_obj = p() - produce_batch_call = p_obj.produce_batch.call_args.args - assert dest_topic == produce_batch_call[0] - assert {"key": message_1_key, "value": message_1_val} in produce_batch_call[1] - assert {"key": message_2_key, "value": message_2_val} in produce_batch_call[1] - - -def test_play_with_timestamps(): - src_broker = "broker1" - src_topic = "topic1" - dest_broker = "broker2" - dest_topic = "topic2" - timestamps = [1762444369, 1762444375] - - message_1 = Mock(spec=Message) - message_1_key = "msg1key" - message_1.key.return_value = message_1_key - message_1_val = "msg1" - message_1.value.return_value = message_1_val - - message_2 = Mock(spec=Message) - message_2_key = "msg2key" - message_2.key.return_value = message_2_key - message_2_val = "msg2" - message_2.value.return_value = message_2_val - - with patch("saluki.play.Consumer") as c, patch("saluki.play.Producer") as p: - consumer_obj = c() - consumer_obj.offsets_for_times.side_effect = [ - [TopicPartition(src_topic, partition=0, offset=2)], - [TopicPartition(src_topic, partition=0, offset=3)], - ] - consumer_obj.consume.return_value = [message_1, message_2] - - play(src_broker, src_topic, dest_broker, dest_topic, None, timestamps) - - assert consumer_obj.assign.call_args.args[0][0].topic == src_topic - assert consumer_obj.assign.call_args.args[0][0].offset == 2 - - consumer_obj.consume.assert_called_with(2) # stop - start + 1 - - p_obj = p() - produce_batch_call = p_obj.produce_batch.call_args.args - assert dest_topic == produce_batch_call[0] - assert {"key": message_1_key, "value": message_1_val} in produce_batch_call[1] - assert {"key": message_2_key, "value": message_2_val} in produce_batch_call[1] - - -def test_play_with_exception_when_consuming_consumer_still_closed(): - with ( - patch("saluki.play.Consumer") as mock_consumer, - patch("saluki.play.Producer"), - patch("saluki.play.logger") as mock_logger, - ): - mock_consumer().consume.side_effect = Exception("blah") - play("", "", "", "", [1, 2], None) - - mock_logger.exception.assert_called_once() - - mock_consumer().close.assert_called_once() - - -def test_play_raises_when_offsets_and_timestamps_are_none(): - with pytest.raises(ValueError): - play("", "", "", "", None, None) diff --git a/tests/test_sniff.py b/tests/test_sniff.py deleted file mode 100644 index 8c37d87..0000000 --- a/tests/test_sniff.py +++ /dev/null @@ -1,79 +0,0 @@ -from unittest.mock import patch - -import pytest -from confluent_kafka.admin import BrokerMetadata, ClusterMetadata, TopicMetadata - -from saluki.sniff import sniff - - -@pytest.fixture() -def fake_cluster_md(): - """ - Returns a fake cluster metadata object with two topics; - one with 1 partition and the other with 2. - """ - fake_cluster_md = ClusterMetadata() - broker1 = BrokerMetadata() - broker1.id = "id1" # type: ignore - broker1.host = "mybroker" # type: ignore - broker1.port = 9093 - fake_cluster_md.brokers = {0: broker1} - - topic1 = TopicMetadata() - topic1.partitions = {0: {}} - - topic2 = TopicMetadata() - topic2.partitions = {0: {}, 1: {}} - - fake_cluster_md.topics = {"topic1": topic1, "topic2": topic2} - return fake_cluster_md - - -def test_sniff_with_two_partitions_in_a_topic(fake_cluster_md): - with ( - patch("saluki.sniff.AdminClient") as a, - patch("saluki.sniff.Consumer") as c, - patch("saluki.sniff.logger") as logger, - ): - a().list_topics.return_value = fake_cluster_md - c().get_watermark_offsets.return_value = 1, 2 - sniff("whatever") - - brokers_call = logger.info.call_args_list[2] - - assert "mybroker:9093/id1" in brokers_call.args[0] - - topic1_call = logger.info.call_args_list[5] - assert "0 - low:1, high:2, num_messages:1" in topic1_call.args[0] - - topic2_call1 = logger.info.call_args_list[7] - assert "0 - low:1, high:2, num_messages:1" in topic2_call1.args[0] - - topic2_call2 = logger.info.call_args_list[8] - assert "1 - low:1, high:2, num_messages:1" in topic2_call2.args[0] - - -def test_sniff_with_single_topic(fake_cluster_md): - with ( - patch("saluki.sniff.AdminClient") as a, - patch("saluki.sniff.Consumer") as c, - patch("saluki.sniff.logger") as logger, - ): - a().list_topics.return_value = fake_cluster_md - c().get_watermark_offsets.return_value = 1, 2 - sniff("mybroker:9093", "topic1") - - assert "\ttopic1" in logger.info.call_args_list[0].args[0] - assert "\t\t0 - low:1, high:2, num_messages:1" in logger.info.call_args_list[1].args[0] - - -def test_sniff_with_single_nonexistent_topic(): - with ( - patch("saluki.sniff.AdminClient") as a, - patch("saluki.sniff.Consumer"), - patch("saluki.sniff.logger") as logger, - ): - # Deliberately blank cluster metadata ie. no topics - a().list_topics.return_value = ClusterMetadata() - sniff("somebroker:9092", "sometopic") - logger.warning.assert_called_with("Topic sometopic not found on broker somebroker:9092") diff --git a/tests/test_utils.py b/tests/test_utils.py deleted file mode 100644 index 263c280..0000000 --- a/tests/test_utils.py +++ /dev/null @@ -1,202 +0,0 @@ -from argparse import ArgumentTypeError -from unittest.mock import Mock, patch - -import pytest -from confluent_kafka import Message -from streaming_data_types import serialise_f144 -from streaming_data_types.forwarder_config_update_fc00 import ( - ConfigurationUpdate, - StreamInfo, - serialise_fc00, -) - -from saluki.utils import ( - _parse_timestamp, - _try_to_deserialise_message, - dateutil_parsable_or_unix_timestamp, - deserialise_and_print_messages, - parse_kafka_uri, -) - - -@pytest.fixture -def mock_message(): - return Mock(spec=Message) - - -def test_deserialising_message_with_no_message_continues(): - with patch("saluki.utils._try_to_deserialise_message") as mock_deserialise_message: - deserialise_and_print_messages([None], None) - mock_deserialise_message.assert_not_called() - - -def test_deserialising_message_with_error_continues(mock_message): - mock_message.error.return_value = "Some error" - with patch("saluki.utils._try_to_deserialise_message") as mock_deserialise_message: - deserialise_and_print_messages([mock_message], None) - mock_deserialise_message.assert_not_called() - - -def test_deserialising_message_with_wrong_partition_continues(mock_message): - noninteresting_partition = 123 - mock_message.error.return_value = False - mock_message.partition.return_value = noninteresting_partition - with patch("saluki.utils._try_to_deserialise_message") as mock_deserialise_message: - deserialise_and_print_messages([mock_message], 234) - mock_deserialise_message.assert_not_called() - - -def test_deserialising_message_with_correct_partition_calls_deserialise(mock_message): - partition = 123 - mock_message.error.return_value = False - mock_message.partition.return_value = partition - with patch("saluki.utils._try_to_deserialise_message") as mock_deserialise_message: - deserialise_and_print_messages([mock_message], partition) - mock_deserialise_message.assert_called_once() - - -def test_deserialising_empty_message(mock_message): - assert (None, "") == _try_to_deserialise_message(b"") - - -def test_deserialising_message_with_invalid_schema_falls_back_to_raw_bytes_decode(): - assert _try_to_deserialise_message(b"blah") == (None, "blah") - - -def test_deserialising_message_which_raises_does_not_stop_loop(mock_message): - with patch("saluki.utils.logger") as logger: - ok_message = Mock(spec=Message) - ok_message.value.return_value = b"" - ok_message.error.return_value = False - ok_message.timestamp.return_value = 2, 1 - - mock_message.value.side_effect = Exception - mock_message.error.return_value = False - mock_message.timestamp.return_value = 2, 1 - - deserialise_and_print_messages([mock_message, ok_message], None) - assert logger.info.call_count == 1 - - -def test_deserialising_with_schema_list_ignores_messages_with_schema_not_in_list(mock_message): - with patch("saluki.utils.logger") as logger: - ok_message = Mock(spec=Message) - ok_message.value.return_value = serialise_fc00(config_change=1, streams=[]) # type: ignore - ok_message.error.return_value = False - ok_message.timestamp.return_value = 2, 1 - - mock_message.value.return_value = serialise_f144(source_name="test", value=123) - mock_message.error.return_value = False - mock_message.timestamp.return_value = 2, 1 - - deserialise_and_print_messages( - [mock_message, ok_message], None, schemas_to_filter_to=["fc00"] - ) - assert logger.info.call_count == 1 - - -def test_message_that_has_valid_schema_but_empty_payload(): - with pytest.raises(Exception): - # Empty fc00 message - valid schema but not valid payload - _try_to_deserialise_message(b" fc00") - - -def test_schema_that_isnt_in_deserialiser_list(mock_message): - assert _try_to_deserialise_message(b" blah123") == ("blah", " \t blah123") - - -def test_message_that_has_valid_schema_but_invalid_payload(mock_message): - with pytest.raises(Exception): - _try_to_deserialise_message(b" fc0012345") - - -def test_message_that_has_valid_schema_and_valid_payload(mock_message): - assert _try_to_deserialise_message( - b"\x10\x00\x00\x00\x66\x63\x30\x30\x08\x00\x0c\x00\x06\x00\x08\x00\x08\x00\x00\x00\x00\x00\x01\x00\x04\x00\x00\x00\x03\x00\x00\x00\x0c\x00\x00\x00\x2c\x00\x00\x00\x4c\x00\x00\x00\xea\xff\xff\xff\x00\x00\x00\x00\x7c\x00\x00\x00\x6c\x00\x00\x00\x50\x00\x00\x00\x01\x00\x0e\x00\x16\x00\x08\x00\x0c\x00\x10\x00\x14\x00\x04\x00\x0e\x00\x00\x00\x00\x00\x00\x00\x9c\x00\x00\x00\x8c\x00\x00\x00\x70\x00\x00\x00\x01\x00\x0e\x00\x18\x00\x08\x00\x0c\x00\x10\x00\x16\x00\x04\x00\x0e\x00\x00\x00\x00\x00\x00\x00\xbc\x00\x00\x00\xac\x00\x00\x00\x90\x00\x00\x00\x00\x00\x01\x00\x11\x00\x00\x00\x4e\x44\x57\x32\x36\x37\x32\x5f\x73\x61\x6d\x70\x6c\x65\x45\x6e\x76\x00\x00\x00\x04\x00\x00\x00\x66\x31\x34\x34\x00\x00\x00\x00\x1b\x00\x00\x00\x54\x45\x3a\x4e\x44\x57\x32\x36\x37\x32\x3a\x43\x53\x3a\x53\x42\x3a\x4d\x42\x42\x49\x5f\x42\x4c\x4f\x43\x4b\x00\x11\x00\x00\x00\x4e\x44\x57\x32\x36\x37\x32\x5f\x73\x61\x6d\x70\x6c\x65\x45\x6e\x76\x00\x00\x00\x04\x00\x00\x00\x66\x31\x34\x34\x00\x00\x00\x00\x19\x00\x00\x00\x54\x45\x3a\x4e\x44\x57\x32\x36\x37\x32\x3a\x43\x53\x3a\x53\x42\x3a\x42\x49\x5f\x42\x4c\x4f\x43\x4b\x00\x00\x00\x11\x00\x00\x00\x4e\x44\x57\x32\x36\x37\x32\x5f\x73\x61\x6d\x70\x6c\x65\x45\x6e\x76\x00\x00\x00\x04\x00\x00\x00\x66\x31\x34\x34\x00\x00\x00\x00\x1c\x00\x00\x00\x54\x45\x3a\x4e\x44\x57\x32\x36\x37\x32\x3a\x43\x53\x3a\x53\x42\x3a\x46\x4c\x4f\x41\x54\x5f\x42\x4c\x4f\x43\x4b\x00\x00\x00\x00" - ) == ( - "fc00", - ConfigurationUpdate( - config_change=1, - streams=[ - StreamInfo( - channel="TE:NDW2672:CS:SB:MBBI_BLOCK", - schema="f144", - topic="NDW2672_sampleEnv", - protocol=1, - periodic=0, - ), - StreamInfo( - channel="TE:NDW2672:CS:SB:BI_BLOCK", - schema="f144", - topic="NDW2672_sampleEnv", - protocol=1, - periodic=0, - ), - StreamInfo( - channel="TE:NDW2672:CS:SB:FLOAT_BLOCK", - schema="f144", - topic="NDW2672_sampleEnv", - protocol=1, - periodic=0, - ), - ], - ), - ) - - -def test_parse_timestamp_with_valid_timestamp(mock_message): - mock_message.timestamp.return_value = (1, 1753434939336) - assert _parse_timestamp(mock_message) == "2025-07-25 09:15:39.336000" - - -def test_parse_timestamp_with_timestamp_not_available(mock_message): - mock_message.timestamp.return_value = (2, "blah") - assert _parse_timestamp(mock_message) == "Unknown" - - -def test_uri_with_broker_name_and_topic_successfully_split(): - test_broker = "localhost" - test_topic = "some_topic" - test_uri = f"{test_broker}/{test_topic}" - broker, topic = parse_kafka_uri(test_uri) - assert broker == test_broker - assert topic == test_topic - - -def test_uri_with_port_after_broker_is_included_in_broker_output(): - test_broker = "localhost:9092" - test_topic = "some_topic" - test_uri = f"{test_broker}/{test_topic}" - broker, topic = parse_kafka_uri(test_uri) - assert broker == test_broker - assert topic == test_topic - - -def test_uri_with_no_topic(): - test_broker = "some_broker" - with pytest.raises(RuntimeError): - parse_kafka_uri(test_broker) - - -@pytest.mark.parametrize( - "timestamp", ["2025-11-19T15:27:11", "2025-11-19T15:27:11Z", "2025-11-19T15:27:11+00:00"] -) -def test_parses_datetime_properly_with_string(timestamp): - assert dateutil_parsable_or_unix_timestamp(timestamp) == 1763566031000 - - -@pytest.mark.parametrize( - "timestamp", - [ - "1763566031000", - "1763566031", - "1763566031000000", - ], -) -def test_parses_datetime_properly_and_leaves_unix_timestamp_alone(timestamp): - assert dateutil_parsable_or_unix_timestamp(timestamp) == int(timestamp) - - -def test_invalid_timestamp_raises(): - with pytest.raises(ArgumentTypeError): - dateutil_parsable_or_unix_timestamp("invalid")