diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index ac4c523..42e6150 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -2,7 +2,7 @@ ## Project Overview -PolyBus is a **polyglot messaging framework** that enables seamless communication between applications written in different programming languages. It provides unified interfaces across TypeScript, Python, and .NET (with PHP planned) for building interoperable distributed systems and microservices. +PolyBus is a **polyglot messaging framework** that enables seamless communication between applications written in different programming languages. It provides unified interfaces across TypeScript, Python, .NET, and Kotlin (with PHP planned) for building interoperable distributed systems and microservices. ## Core Architecture Principles @@ -111,6 +111,28 @@ class IPolyBus(ABC): - Use `./lint.sh` for code quality checks - Follow `.editorconfig` and `.DotSettings` conventions +### Kotlin (`src/kotlin/`) + +**Technology Stack:** +- Kotlin/JVM 2.1.x +- JDK 17 toolchain +- Gradle Kotlin DSL (`build.gradle.kts`) +- JUnit 5 + Kotlin test + JaCoCo coverage + +**Coding Conventions:** +- Use coroutines (`suspend`) for async workflows +- Keep naming consistent with cross-language parity (`IPolyBus`, `PolyBusBuilder`, `createTransaction`) +- Prefer immutable values and data classes for message models + +**Testing and Coverage:** +- Run tests with `gradle test` +- Generate coverage with `gradle coverage` +- JaCoCo XML report path: `build/reports/jacoco/test/jacocoTestReport.xml` + +**Publishing Notes:** +- Use Gradle publish workflow (`gradle publish`) with repository credentials from CI secrets +- Keep package coordinates stable unless intentionally changing group/artifact naming + ## Cross-Language Consistency ### Naming Conventions @@ -351,6 +373,16 @@ dotnet test # Run tests ./lint.sh # Run linting ``` +### Kotlin +```bash +cd src/kotlin +gradle clean # Clean build outputs +gradle build # Compile and run checks +gradle test # Run tests +gradle coverage # Generate JaCoCo XML/HTML coverage +gradle publish # Publish package (CI credentials required) +``` + ## Resources - **Main README**: `/README.md` diff --git a/.github/workflows/pr-tests.yml b/.github/workflows/pr-tests.yml index ff4d20e..a124a99 100644 --- a/.github/workflows/pr-tests.yml +++ b/.github/workflows/pr-tests.yml @@ -42,6 +42,39 @@ jobs: flags: dotnet name: dotnet-coverage + kotlin-tests: + name: Kotlin Tests + runs-on: ubuntu-latest + defaults: + run: + working-directory: src/kotlin + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '17' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + + - name: Build + run: ./gradlew build + + - name: Run tests with coverage + run: ./gradlew coverage + + - name: Upload Kotlin coverage + uses: codecov/codecov-action@v4 + with: + files: ./src/kotlin/build/reports/jacoco/test/jacocoTestReport.xml + flags: kotlin + name: kotlin-coverage + python-tests: name: Python Tests runs-on: ubuntu-latest @@ -108,7 +141,7 @@ jobs: coverage-summary: name: Coverage Summary runs-on: ubuntu-latest - needs: [dotnet-tests, python-tests, typescript-tests] + needs: [dotnet-tests, python-tests, typescript-tests, kotlin-tests] if: always() steps: @@ -120,3 +153,4 @@ jobs: echo "- ✅ .NET Tests: ${{ needs.dotnet-tests.result }}" >> $GITHUB_STEP_SUMMARY echo "- ✅ Python Tests: ${{ needs.python-tests.result }}" >> $GITHUB_STEP_SUMMARY echo "- ✅ TypeScript Tests: ${{ needs.typescript-tests.result }}" >> $GITHUB_STEP_SUMMARY + echo "- ✅ Kotlin Tests: ${{ needs.kotlin-tests.result }}" >> $GITHUB_STEP_SUMMARY diff --git a/.github/workflows/release-and-publish.yml b/.github/workflows/release-and-publish.yml index d61baac..f9cde58 100644 --- a/.github/workflows/release-and-publish.yml +++ b/.github/workflows/release-and-publish.yml @@ -7,6 +7,10 @@ on: paths: - 'src/**' - '.github/workflows/release-and-publish.yml' + pull_request: + paths: + - 'src/**' + - '.github/workflows/release-and-publish.yml' permissions: contents: write @@ -14,9 +18,18 @@ permissions: id-token: write jobs: + pr-status: + name: PR Status + if: github.event_name == 'pull_request' + runs-on: ubuntu-latest + steps: + - name: Skip release workflow on pull requests + run: echo "Release and publish only runs on pushes to main." + # Determine version using GitVersion determine-version: name: Determine Version + if: github.event_name == 'push' runs-on: ubuntu-latest outputs: version: ${{ steps.gitversion.outputs.semVer }} @@ -67,7 +80,7 @@ jobs: name: Build and Test All Packages runs-on: ubuntu-latest needs: determine-version - if: needs.determine-version.outputs.should_release == 'true' + if: github.event_name == 'push' && needs.determine-version.outputs.should_release == 'true' steps: - name: Checkout code uses: actions/checkout@v4 @@ -117,6 +130,22 @@ jobs: dotnet test --configuration Release dotnet pack --configuration Release --output ./nupkg + # Kotlin build and test + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '17' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + + - name: Build Kotlin package + working-directory: src/kotlin + run: | + ./gradlew clean build + ./gradlew coverage + # Upload build artifacts - name: Upload TypeScript artifact uses: actions/upload-artifact@v4 @@ -136,12 +165,18 @@ jobs: name: dotnet-package path: src/dotnet/PolyBus/nupkg/ + - name: Upload Kotlin artifact + uses: actions/upload-artifact@v4 + with: + name: kotlin-package + path: src/kotlin/build/libs/ + # Publish to npm publish-npm: name: Publish to npm runs-on: ubuntu-latest needs: [determine-version, build-and-test] - if: needs.determine-version.outputs.should_release == 'true' + if: github.event_name == 'push' && needs.determine-version.outputs.should_release == 'true' permissions: id-token: write contents: read @@ -179,7 +214,7 @@ jobs: name: Publish to PyPI runs-on: ubuntu-latest needs: [determine-version, build-and-test] - if: needs.determine-version.outputs.should_release == 'true' + if: github.event_name == 'push' && needs.determine-version.outputs.should_release == 'true' permissions: id-token: write steps: @@ -215,7 +250,7 @@ jobs: name: Publish to NuGet runs-on: ubuntu-latest needs: [determine-version, build-and-test] - if: needs.determine-version.outputs.should_release == 'true' + if: github.event_name == 'push' && needs.determine-version.outputs.should_release == 'true' steps: - name: Checkout code uses: actions/checkout@v4 @@ -243,12 +278,51 @@ jobs: run: | dotnet nuget push "./nupkg/*.nupkg" --api-key ${{ secrets.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate + # Publish Gradle package + publish-gradle: + name: Publish Gradle Package + runs-on: ubuntu-latest + needs: [determine-version, build-and-test] + if: github.event_name == 'push' && needs.determine-version.outputs.should_release == 'true' + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '17' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + + - name: Update package version + working-directory: src/kotlin + run: | + sed -i "s/^version = .*/version = \"${{ needs.determine-version.outputs.version }}\"/" build.gradle.kts + + - name: Build and publish + working-directory: src/kotlin + env: + MAVEN_REPOSITORY_URL: ${{ secrets.MAVEN_REPOSITORY_URL }} + MAVEN_USERNAME: ${{ secrets.MAVEN_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.MAVEN_PASSWORD }} + run: | + chmod +x ./gradlew + ./gradlew clean build + if ./gradlew tasks --all | grep -qE '(^|[[:space:]])publish([[:space:]]|$)'; then + ./gradlew publish -PmavenRepoUrl="$MAVEN_REPOSITORY_URL" -PmavenUsername="$MAVEN_USERNAME" -PmavenPassword="$MAVEN_PASSWORD" + else + echo "Skipping Gradle publish: no 'publish' task is configured in src/kotlin." + fi + # Summary job publish-summary: name: Publish Summary runs-on: ubuntu-latest - needs: [determine-version, publish-pypi, publish-nuget] - if: always() && needs.determine-version.outputs.should_release == 'true' + needs: [determine-version, publish-npm, publish-pypi, publish-nuget, publish-gradle] + if: github.event_name == 'push' && always() && needs.determine-version.outputs.should_release == 'true' steps: - name: Summary run: | @@ -259,6 +333,7 @@ jobs: echo "- 📦 **npm**: ${{ needs.publish-npm.result }}" >> $GITHUB_STEP_SUMMARY echo "- 🐍 **PyPI**: ${{ needs.publish-pypi.result }}" >> $GITHUB_STEP_SUMMARY echo "- ⚡ **NuGet**: ${{ needs.publish-nuget.result }}" >> $GITHUB_STEP_SUMMARY + echo "- 🟠 **Gradle/Maven**: ${{ needs.publish-gradle.result }}" >> $GITHUB_STEP_SUMMARY echo "" >> $GITHUB_STEP_SUMMARY echo "### Installation Commands" >> $GITHUB_STEP_SUMMARY echo "" >> $GITHUB_STEP_SUMMARY @@ -271,4 +346,7 @@ jobs: echo "" >> $GITHUB_STEP_SUMMARY echo "# .NET" >> $GITHUB_STEP_SUMMARY echo "dotnet add package PolyBus --version ${{ needs.determine-version.outputs.version }}" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "# Kotlin (Gradle/Maven)" >> $GITHUB_STEP_SUMMARY + echo "implementation(\"com.cyascott:poly-bus:${{ needs.determine-version.outputs.version }}\")" >> $GITHUB_STEP_SUMMARY echo '```' >> $GITHUB_STEP_SUMMARY diff --git a/.gitignore b/.gitignore index dd3d86c..676d90e 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,9 @@ npm-debug.log* yarn-debug.log* yarn-error.log* +# Gradle +**/.gradle/* + # Environment .env .env.local diff --git a/README.md b/README.md index e8c72fb..db38b14 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ PolyBus provides a unified interface for sending and receiving messages between ## 🌟 Key Features -- **🔄 Multi-Language Support**: Native implementations for TypeScript, Python, and .NET +- **🔄 Multi-Language Support**: Native implementations for TypeScript, Python, .NET, and Kotlin - **🚀 Flexible Transport**: Pluggable transport layer supporting various messaging systems - **⚡ Async/Await**: Modern asynchronous APIs in all language implementations - **🔌 Middleware Pipeline**: Extensible handler chains for incoming and outgoing messages @@ -26,6 +26,7 @@ PolyBus provides a unified interface for sending and receiving messages between | **TypeScript/JavaScript** | Node.js 14+ | ✅ Stable | CommonJS, ESM, UMD | | **Python** | 3.8-3.12 | ✅ Stable | PyPI package | | **.NET** | .NET Standard 2.1 | ✅ Stable | NuGet package | +| **Kotlin** | JDK 17+ | ✅ Stable | Gradle/JVM package | | **PHP** | 8.0+ | 🚧 Planned | - | ## 🚀 Quick Start @@ -56,6 +57,38 @@ await transaction.commit(); await bus.stop(); ``` +### Kotlin + +```kotlin +import kotlinx.coroutines.runBlocking +import polybus.PolyBusBuilder +import polybus.transport.transactions.messages.MessageInfo +import polybus.transport.transactions.messages.MessageType +import polybus.transport.transactions.messages.handlers.serializers.JsonHandlers + +@MessageInfo(MessageType.EVENT, "users", "user-created", 1, 0, 0) +data class UserCreated(val userId: Int) + +fun main() = runBlocking { + val builder = PolyBusBuilder() + builder.name = "my-service" + + val jsonHandlers = JsonHandlers() + builder.incomingPipeline.add(jsonHandlers::deserializer) + builder.outgoingPipeline.add(jsonHandlers::serializer) + builder.messages.add(UserCreated::class.java) + + val bus = builder.build() + bus.start() + + val transaction = bus.createOutgoingTransaction() + transaction.add(UserCreated(userId = 123)) + transaction.commit() + + bus.stop() +} +``` + ### Python ```python @@ -217,6 +250,18 @@ dotnet test See [.NET README](src/dotnet/README.md) for detailed development instructions. +### Kotlin Development + +```bash +cd src/kotlin +gradle clean +gradle build +gradle test +gradle coverage +``` + +See [Kotlin README](src/kotlin/README.md) for detailed development instructions. + ## 🧪 Testing All implementations include comprehensive test suites: @@ -248,6 +293,15 @@ pip install poly-bus dotnet add package PolyBus ``` +### Kotlin + +```bash +cd src/kotlin +gradle build +``` + +For Kotlin package usage and publishing details, see [Kotlin README](src/kotlin/README.md). + ## 🤝 Contributing We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details on: @@ -277,6 +331,6 @@ This project is licensed under the terms specified in the [LICENSE](LICENSE) fil ## ⭐ Project Status -PolyBus is actively maintained and production-ready for TypeScript, Python, and .NET implementations. PHP support is planned for future releases. +PolyBus is actively maintained and production-ready for TypeScript, Python, .NET, and Kotlin implementations. PHP support is planned for future releases. If you find PolyBus useful, please consider giving it a star ⭐ on GitHub! \ No newline at end of file diff --git a/src/kotlin/README.md b/src/kotlin/README.md new file mode 100644 index 0000000..4b379b7 --- /dev/null +++ b/src/kotlin/README.md @@ -0,0 +1,223 @@ +# PolyBus Kotlin + +A Kotlin implementation of the PolyBus messaging library, providing a unified interface for message transport across different messaging systems. + +## Prerequisites + +- [JDK 17](https://adoptium.net/) or later +- [Gradle](https://gradle.org/install/) (if you are not using a Gradle wrapper) +- Any IDE that supports Kotlin/JVM development (IntelliJ IDEA, VS Code, Android Studio) + +## Quick Start + +### Building the Project + +```bash +# Navigate to the kotlin directory +cd src/kotlin + +# Build the project +gradle build +``` + +### Running Tests + +```bash +# Run all tests +gradle test + +# Force all tests to run even if Gradle considers them up-to-date +gradle test --rerun-tasks + +# Run checks (includes tests) +gradle check + +# Run tests with detailed output +gradle test --info +``` + +Note: `gradle test` may finish very quickly when the `test` task is up-to-date. +That means Gradle reused previous test results instead of re-running tests. +Use `gradle test --rerun-tasks` (or `gradle clean test`) when you want a full re-run. + +### Generating Coverage (Codecov XML) + +Codecov accepts JaCoCo XML reports. This project generates that format via Gradle. + +```bash +# Run tests and produce coverage reports (XML + HTML) +gradle coverage + +# Equivalent explicit command +gradle test jacocoTestReport +``` + +Coverage report output files: +- XML (for Codecov): `build/reports/jacoco/test/jacocoTestReport.xml` +- HTML (local viewing): `build/reports/jacoco/test/html/index.html` + +If you only need to regenerate coverage from a fresh run: + +```bash +gradle clean coverage +``` + +### Running Specific Tests + +```bash +# Run one test class +gradle test --tests "polybus.transport.inmemory.InMemoryTransportTests" + +# Run tests matching a name pattern +gradle test --tests "*JsonHandlersTests*" +``` + +## Development Workflow + +### Code Quality and Validation + +This module uses Kotlin and Gradle defaults with JUnit 5 test execution: + +```bash +# Clean build artifacts +gradle clean + +# Compile and run all checks +gradle build + +# Run only verification lifecycle tasks +gradle check + +# Run tests only +gradle test + +# Force test re-run (ignores up-to-date optimization) +gradle test --rerun-tasks + +# Run tests and generate Codecov-compatible XML coverage +gradle coverage +``` + +### IDE Integration + +#### Visual Studio Code +1. Install the Kotlin extension and Java Extension Pack +2. Open the `src/kotlin` folder in VS Code +3. Use the Java/Kotlin test explorer to run and debug tests +4. Use Problems panel output for compile and test failures + +#### IntelliJ IDEA / Android Studio +1. Open the `src/kotlin` folder as a Gradle project +2. Let Gradle import and resolve dependencies +3. Run tests from the gutter or the Gradle tool window + +## Configuration + +### Build Configuration + +The project is configured with Gradle Kotlin DSL (`build.gradle.kts`): + +- **Kotlin Plugin**: `org.jetbrains.kotlin.jvm` 2.1.10 +- **Java Toolchain**: JDK 17 +- **Test Platform**: JUnit 5 (`useJUnitPlatform()`) +- **Project Name**: `poly-bus` + +### Source Layout + +- Main sources: `src/main/kotlin/polybus/` +- Tests: `src/test/kotlin/polybus/` + +Core areas in this module include: +- Bus APIs and builders (`IPolyBus`, `PolyBus`, `PolyBusBuilder`) +- Transport abstractions and in-memory transport +- Transaction and message models +- Handler pipeline and JSON serializers + +## Dependencies + +### Main +- `org.jetbrains.kotlinx:kotlinx-coroutines-core` (1.10.2) +- `com.fasterxml.jackson.core:jackson-databind` (2.18.2) +- `com.fasterxml.jackson.module:jackson-module-kotlin` (2.18.2) + +### Test +- `org.jetbrains.kotlin:kotlin-test` (via Kotlin plugin) +- `org.junit.jupiter:junit-jupiter` (5.11.4) + +## Common Commands + +```bash +# Clean all generated files +gradle clean + +# Compile main source +gradle compileKotlin + +# Compile tests +gradle compileTestKotlin + +# Run all tests +gradle test + +# Run tests and generate JaCoCo XML/HTML coverage +gradle coverage + +# Generate JaCoCo report explicitly +gradle jacocoTestReport + +# Build artifact and run checks +gradle build + +# Show available tasks +gradle tasks + +# Inspect dependency tree +gradle dependencies +``` + +## Troubleshooting + +### Build Issues + +1. **Wrong Java version**: verify JDK 17+ + ```bash + java -version + ``` + +2. **Gradle cache/dependency problems**: refresh dependencies + ```bash + gradle --refresh-dependencies + ``` + +3. **Stale build state**: clean and rebuild + ```bash + gradle clean build + ``` + +### Test Issues + +1. **Tests not discovered**: ensure test classes are in `src/test/kotlin` and named with `*Test`/`*Tests` +2. **JUnit 5 mismatch**: ensure `useJUnitPlatform()` remains enabled in `build.gradle.kts` + +### Coverage Issues + +1. **No XML report found**: run `gradle clean coverage`, then check `build/reports/jacoco/test/jacocoTestReport.xml` +2. **Stale coverage report**: use `gradle clean coverage` to force a fresh test + report generation + +## Contributing + +1. Follow the established Kotlin coding style and existing module patterns +2. Run `gradle test` before committing +3. Run `gradle build` to validate compilation and checks +4. Add tests for new functionality +5. Update documentation when behavior or APIs change + +## Additional Resources + +- [Kotlin Documentation](https://kotlinlang.org/docs/home.html) +- [Gradle User Manual](https://docs.gradle.org/current/userguide/userguide.html) +- [JUnit 5 Documentation](https://junit.org/junit5/docs/current/user-guide/) + +## License + +See the main project LICENSE file for licensing information. diff --git a/src/kotlin/build.gradle.kts b/src/kotlin/build.gradle.kts new file mode 100644 index 0000000..55dde03 --- /dev/null +++ b/src/kotlin/build.gradle.kts @@ -0,0 +1,76 @@ +plugins { + kotlin("jvm") version "2.1.10" + jacoco + `maven-publish` +} + +group = "com.cyascott" +version = "1.0.0" + +repositories { + mavenCentral() +} + +dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2") + implementation("com.fasterxml.jackson.core:jackson-databind:2.18.2") + implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.18.2") + + testImplementation(kotlin("test")) + testImplementation("org.junit.jupiter:junit-jupiter:5.11.4") +} + +tasks.test { + useJUnitPlatform() + finalizedBy(tasks.jacocoTestReport) +} + +tasks.jacocoTestReport { + dependsOn(tasks.test) + + reports { + xml.required.set(true) + html.required.set(true) + csv.required.set(false) + } +} + +tasks.register("coverage") { + group = "verification" + description = "Runs tests and generates JaCoCo XML/HTML coverage reports." + dependsOn(tasks.jacocoTestReport) +} + +kotlin { + jvmToolchain(17) +} + +publishing { + publications { + create("mavenJava") { + from(components["java"]) + groupId = project.group.toString() + artifactId = "poly-bus" + version = project.version.toString() + } + } + + repositories { + maven { + val repoUrl = (findProperty("mavenRepoUrl") as String?) + ?: System.getenv("MAVEN_REPOSITORY_URL") + ?: "" + + if (repoUrl.isNotBlank()) { + url = uri(repoUrl) + } + + credentials { + username = (findProperty("mavenUsername") as String?) + ?: System.getenv("MAVEN_USERNAME") + password = (findProperty("mavenPassword") as String?) + ?: System.getenv("MAVEN_PASSWORD") + } + } + } +} diff --git a/src/kotlin/gradle/wrapper/gradle-wrapper.jar b/src/kotlin/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..b1b8ef5 Binary files /dev/null and b/src/kotlin/gradle/wrapper/gradle-wrapper.jar differ diff --git a/src/kotlin/gradle/wrapper/gradle-wrapper.properties b/src/kotlin/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..df6a6ad --- /dev/null +++ b/src/kotlin/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,9 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-9.5.1-bin.zip +networkTimeout=10000 +retries=0 +retryBackOffMs=500 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/src/kotlin/gradlew b/src/kotlin/gradlew new file mode 100755 index 0000000..b9bb139 --- /dev/null +++ b/src/kotlin/gradlew @@ -0,0 +1,248 @@ +#!/bin/sh + +# +# Copyright © 2015 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/3d91ce3b8caaf77ad09f381f43615b715b53f72c/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC2039,SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/src/kotlin/gradlew.bat b/src/kotlin/gradlew.bat new file mode 100644 index 0000000..24c62d5 --- /dev/null +++ b/src/kotlin/gradlew.bat @@ -0,0 +1,82 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables, and ensure extensions are enabled +setlocal EnableExtensions + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +"%COMSPEC%" /c exit 1 + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 + +"%COMSPEC%" /c exit 1 + +:execute +@rem Setup the command line + + + +@rem Execute Gradle +@rem endlocal doesn't take effect until after the line is parsed and variables are expanded +@rem which allows us to clear the local environment before executing the java command +endlocal & "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* & call :exitWithErrorLevel + +:exitWithErrorLevel +@rem Use "%COMSPEC%" /c exit to allow operators to work properly in scripts +"%COMSPEC%" /c exit %ERRORLEVEL% diff --git a/src/kotlin/settings.gradle.kts b/src/kotlin/settings.gradle.kts new file mode 100644 index 0000000..7c0e8e4 --- /dev/null +++ b/src/kotlin/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name = "poly-bus" diff --git a/src/kotlin/src/main/kotlin/polybus/Headers.kt b/src/kotlin/src/main/kotlin/polybus/Headers.kt new file mode 100644 index 0000000..9764bc4 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/Headers.kt @@ -0,0 +1,8 @@ +package polybus + +object Headers { + const val CORRELATION_ID = "correlation-id" + const val CONTENT_TYPE = "content-type" + const val MESSAGE_TYPE = "x-type" + const val REQUEST_ID = "request-id" +} diff --git a/src/kotlin/src/main/kotlin/polybus/IPolyBus.kt b/src/kotlin/src/main/kotlin/polybus/IPolyBus.kt new file mode 100644 index 0000000..47702e8 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/IPolyBus.kt @@ -0,0 +1,26 @@ +package polybus + +import polybus.transport.ITransport +import polybus.transport.transactions.IncomingTransaction +import polybus.transport.transactions.OutgoingTransaction +import polybus.transport.transactions.Transaction +import polybus.transport.transactions.messages.IncomingMessage +import polybus.transport.transactions.messages.Messages +import polybus.transport.transactions.messages.handlers.IncomingHandler +import polybus.transport.transactions.messages.handlers.OutgoingHandler + +interface IPolyBus { + val properties: MutableMap + var transport: ITransport + val incomingPipeline: MutableList + val outgoingPipeline: MutableList + val messages: Messages + + suspend fun createIncomingTransaction(message: IncomingMessage): IncomingTransaction + suspend fun createOutgoingTransaction(): OutgoingTransaction + suspend fun send(transaction: Transaction) + suspend fun start() + suspend fun stop() + + val name: String +} diff --git a/src/kotlin/src/main/kotlin/polybus/PolyBus.kt b/src/kotlin/src/main/kotlin/polybus/PolyBus.kt new file mode 100644 index 0000000..a0cb7a4 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/PolyBus.kt @@ -0,0 +1,67 @@ +package polybus + +import polybus.transport.ITransport +import polybus.transport.transactions.IncomingTransaction +import polybus.transport.transactions.OutgoingTransaction +import polybus.transport.transactions.Transaction +import polybus.transport.transactions.messages.IncomingMessage +import polybus.transport.transactions.messages.Messages +import polybus.transport.transactions.messages.handlers.IncomingHandler +import polybus.transport.transactions.messages.handlers.OutgoingHandler + +class PolyBus(private val _builder: PolyBusBuilder) : IPolyBus { + override val properties: MutableMap + get() = _builder.properties + + override lateinit var transport: ITransport + + override val incomingPipeline: MutableList = _builder.incomingPipeline + override val outgoingPipeline: MutableList = _builder.outgoingPipeline + override val messages: Messages = _builder.messages + + override suspend fun createIncomingTransaction(message: IncomingMessage): IncomingTransaction = + _builder.incomingTransactionFactory(_builder, this, message) + + override suspend fun createOutgoingTransaction(): OutgoingTransaction = + _builder.outgoingTransactionFactory(_builder, this) + + override suspend fun send(transaction: Transaction) { + var step: suspend () -> Unit = { transport.handle(transaction) } + + when (transaction) { + is IncomingTransaction -> { + for (index in transaction.bus.incomingPipeline.indices.reversed()) { + val handler = transaction.bus.incomingPipeline[index] + val next = step + step = { handler(transaction, next) } + } + } + + is OutgoingTransaction -> { + for (index in transaction.bus.outgoingPipeline.indices.reversed()) { + val handler = transaction.bus.outgoingPipeline[index] + val next = step + step = { handler(transaction, next) } + } + } + } + + try { + step() + } catch (error: Throwable) { + transaction.abort() + throw error + } + } + + override suspend fun start() { + transport.start() + } + + override suspend fun stop() { + transport.stop() + } + + override val name: String + get() = _builder.name +} diff --git a/src/kotlin/src/main/kotlin/polybus/PolyBusBuilder.kt b/src/kotlin/src/main/kotlin/polybus/PolyBusBuilder.kt new file mode 100644 index 0000000..f7e2ed3 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/PolyBusBuilder.kt @@ -0,0 +1,38 @@ +package polybus + +import polybus.transport.TransportFactory +import polybus.transport.inmemory.InMemoryMessageBroker +import polybus.transport.transactions.IncomingTransaction +import polybus.transport.transactions.IncomingTransactionFactory +import polybus.transport.transactions.OutgoingTransaction +import polybus.transport.transactions.OutgoingTransactionFactory +import polybus.transport.transactions.messages.Messages +import polybus.transport.transactions.messages.handlers.IncomingHandler +import polybus.transport.transactions.messages.handlers.OutgoingHandler + +open class PolyBusBuilder { + var incomingTransactionFactory: IncomingTransactionFactory = { _, bus, message -> + IncomingTransaction(bus, message) + } + + var outgoingTransactionFactory: OutgoingTransactionFactory = { _, bus -> + OutgoingTransaction(bus) + } + + var transportFactory: TransportFactory = { builder, bus -> + val transport = InMemoryMessageBroker() + transport.addEndpoint(builder, bus) + } + + val properties: MutableMap = mutableMapOf() + val incomingPipeline: MutableList = mutableListOf() + val outgoingPipeline: MutableList = mutableListOf() + val messages: Messages = Messages() + var name: String = "polybus" + + open suspend fun build(): IPolyBus { + val bus = PolyBus(this) + bus.transport = transportFactory(this, bus) + return bus + } +} diff --git a/src/kotlin/src/main/kotlin/polybus/PolyBusError.kt b/src/kotlin/src/main/kotlin/polybus/PolyBusError.kt new file mode 100644 index 0000000..0301f99 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/PolyBusError.kt @@ -0,0 +1,3 @@ +package polybus + +open class PolyBusError(val errorCode: Int, message: String) : RuntimeException(message) diff --git a/src/kotlin/src/main/kotlin/polybus/transport/ITransport.kt b/src/kotlin/src/main/kotlin/polybus/transport/ITransport.kt new file mode 100644 index 0000000..11d98a6 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/ITransport.kt @@ -0,0 +1,15 @@ +package polybus.transport + +import polybus.transport.transactions.Transaction +import polybus.transport.transactions.messages.MessageInfoValue + +interface ITransport { + val deadLetterEndpoint: String + suspend fun handle(transaction: Transaction) + val supportsDelayedCommands: Boolean + val supportsCommandMessages: Boolean + suspend fun subscribe(messageInfo: MessageInfoValue) + val supportsSubscriptions: Boolean + suspend fun start() + suspend fun stop() +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/PolyBusNotStartedError.kt b/src/kotlin/src/main/kotlin/polybus/transport/PolyBusNotStartedError.kt new file mode 100644 index 0000000..7f3b33d --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/PolyBusNotStartedError.kt @@ -0,0 +1,8 @@ +package polybus.transport + +import polybus.PolyBusError + +class PolyBusNotStartedError : PolyBusError( + 1, + "PolyBus has not been started. Please call IPolyBus.start() before using the bus." +) diff --git a/src/kotlin/src/main/kotlin/polybus/transport/TransportFactory.kt b/src/kotlin/src/main/kotlin/polybus/transport/TransportFactory.kt new file mode 100644 index 0000000..9d21b19 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/TransportFactory.kt @@ -0,0 +1,6 @@ +package polybus.transport + +import polybus.IPolyBus +import polybus.PolyBusBuilder + +typealias TransportFactory = suspend (builder: PolyBusBuilder, bus: IPolyBus) -> ITransport diff --git a/src/kotlin/src/main/kotlin/polybus/transport/inmemory/InMemoryEndpoint.kt b/src/kotlin/src/main/kotlin/polybus/transport/inmemory/InMemoryEndpoint.kt new file mode 100644 index 0000000..f52d468 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/inmemory/InMemoryEndpoint.kt @@ -0,0 +1,68 @@ +package polybus.transport.inmemory + +import polybus.IPolyBus +import polybus.transport.ITransport +import polybus.transport.PolyBusNotStartedError +import polybus.transport.transactions.Transaction +import polybus.transport.transactions.messages.IncomingMessage +import polybus.transport.transactions.messages.MessageInfoValue +import java.util.concurrent.ConcurrentHashMap + +open class InMemoryEndpoint( + private val _broker: InMemoryMessageBroker, + private val _bus: IPolyBus +) : ITransport { + val bus: IPolyBus + get() = _bus + + var deadLetterHandler: ((IncomingMessage) -> Unit)? = null + + var active: Boolean = false + private set + + override val deadLetterEndpoint: String + get() = "${_bus.name}.dead.letters" + + open suspend fun handleMessage(message: IncomingMessage, isDeadLetter: Boolean) { + if (!active) { + return + } + if (isDeadLetter) { + deadLetterHandler?.invoke(message) + return + } + val transaction = _bus.createIncomingTransaction(message) + _bus.send(transaction) + } + + override suspend fun handle(transaction: Transaction) { + if (!active) { + throw PolyBusNotStartedError() + } + _broker.send(transaction) + } + + override val supportsDelayedCommands: Boolean = true + override val supportsCommandMessages: Boolean = true + override val supportsSubscriptions: Boolean = true + + private val _subscriptions = ConcurrentHashMap() + + override suspend fun subscribe(messageInfo: MessageInfoValue) { + if (!active) { + throw PolyBusNotStartedError() + } + _subscriptions[messageInfo.toHeaderString(false)] = true + } + + fun isSubscribed(messageInfo: MessageInfoValue): Boolean = + _subscriptions.containsKey(messageInfo.toHeaderString(false)) + + override suspend fun start() { + active = true + } + + override suspend fun stop() { + active = false + } +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/inmemory/InMemoryMessageBroker.kt b/src/kotlin/src/main/kotlin/polybus/transport/inmemory/InMemoryMessageBroker.kt new file mode 100644 index 0000000..f337d4b --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/inmemory/InMemoryMessageBroker.kt @@ -0,0 +1,111 @@ +package polybus.transport.inmemory + +import polybus.IPolyBus +import polybus.PolyBusBuilder +import polybus.transport.ITransport +import polybus.transport.transactions.Transaction +import polybus.transport.transactions.messages.IncomingMessage +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import java.time.Duration +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import java.util.logging.Level +import java.util.logging.Logger + +open class InMemoryMessageBroker { + val endpoints: ConcurrentHashMap = ConcurrentHashMap() + var log: Logger = Logger.getLogger(InMemoryMessageBroker::class.java.name) + + private val _scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + private val _activeSends = AtomicInteger(0) + private val _jobs: ConcurrentHashMap.KeySetView = ConcurrentHashMap.newKeySet() + + open suspend fun addEndpoint(builder: PolyBusBuilder, bus: IPolyBus): ITransport { + val endpoint = InMemoryEndpoint(this, bus) + endpoints[bus.name] = endpoint + return endpoint + } + + open fun send(transaction: Transaction) { + if (transaction.outgoingMessages.isEmpty()) { + return + } + + val job = _scope.launch { + _activeSends.incrementAndGet() + try { + val now = Instant.now() + transaction.outgoingMessages.forEach { message -> + endpoints.values.forEach { endpoint -> + val isDeadLetter = endpoint.deadLetterEndpoint == message.endpoint + val shouldSend = isDeadLetter || + endpoint.bus.name == message.endpoint || + (message.endpoint == null && + (message.messageInfo.endpoint == endpoint.bus.name || + endpoint.isSubscribed(message.messageInfo))) + + if (!shouldSend) { + return@forEach + } + + val incomingMessage = IncomingMessage(endpoint.bus, message.body, message.messageInfo) + incomingMessage.headers = message.headers.toMutableMap() + + val deliverAt = message.deliverAt + if (deliverAt != null) { + val wait = Duration.between(now, deliverAt) + if (!wait.isNegative && !wait.isZero) { + delayedSend(endpoint, incomingMessage, wait, isDeadLetter) + return@forEach + } + } + + endpoint.handleMessage(incomingMessage, isDeadLetter) + } + } + } catch (error: Throwable) { + log.log(Level.SEVERE, error.message, error) + } finally { + _activeSends.decrementAndGet() + } + } + + _jobs.add(job) + job.invokeOnCompletion { _jobs.remove(job) } + } + + private fun delayedSend( + endpoint: InMemoryEndpoint, + message: IncomingMessage, + delayBy: Duration, + isDeadLetter: Boolean + ) { + val job = _scope.launch { + try { + delay(delayBy.toMillis()) + endpoint.handleMessage(message, isDeadLetter) + } catch (_: CancellationException) { + } catch (error: Throwable) { + log.log(Level.SEVERE, error.message, error) + } + } + + _jobs.add(job) + job.invokeOnCompletion { _jobs.remove(job) } + } + + open suspend fun stop() { + endpoints.values.forEach { it.stop() } + _jobs.forEach { it.cancel() } + _jobs.toList().forEach { it.join() } + while (_activeSends.get() > 0) { + delay(10) + } + } +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/IncomingTransaction.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/IncomingTransaction.kt new file mode 100644 index 0000000..9e59b45 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/IncomingTransaction.kt @@ -0,0 +1,8 @@ +package polybus.transport.transactions + +import polybus.IPolyBus +import polybus.transport.transactions.messages.IncomingMessage + +open class IncomingTransaction(bus: IPolyBus, incomingMessage: IncomingMessage) : Transaction(bus) { + open var incomingMessage: IncomingMessage = incomingMessage +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/IncomingTransactionFactory.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/IncomingTransactionFactory.kt new file mode 100644 index 0000000..76e3fab --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/IncomingTransactionFactory.kt @@ -0,0 +1,11 @@ +package polybus.transport.transactions + +import polybus.IPolyBus +import polybus.PolyBusBuilder +import polybus.transport.transactions.messages.IncomingMessage + +typealias IncomingTransactionFactory = suspend ( + builder: PolyBusBuilder, + bus: IPolyBus, + message: IncomingMessage +) -> IncomingTransaction diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/OutgoingTransaction.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/OutgoingTransaction.kt new file mode 100644 index 0000000..1119a6d --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/OutgoingTransaction.kt @@ -0,0 +1,5 @@ +package polybus.transport.transactions + +import polybus.IPolyBus + +open class OutgoingTransaction(bus: IPolyBus) : Transaction(bus) diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/OutgoingTransactionFactory.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/OutgoingTransactionFactory.kt new file mode 100644 index 0000000..28f0301 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/OutgoingTransactionFactory.kt @@ -0,0 +1,6 @@ +package polybus.transport.transactions + +import polybus.IPolyBus +import polybus.PolyBusBuilder + +typealias OutgoingTransactionFactory = suspend (builder: PolyBusBuilder, bus: IPolyBus) -> OutgoingTransaction diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/Transaction.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/Transaction.kt new file mode 100644 index 0000000..d0ce100 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/Transaction.kt @@ -0,0 +1,25 @@ +package polybus.transport.transactions + +import polybus.IPolyBus +import polybus.transport.transactions.messages.OutgoingMessage + +open class Transaction(private val _bus: IPolyBus) { + val bus: IPolyBus + get() = _bus + + open val state: MutableMap = mutableMapOf() + open val outgoingMessages: MutableList = mutableListOf() + + open fun add(message: Any, endpoint: String? = null): OutgoingMessage { + val outgoingMessage = OutgoingMessage(_bus, message, endpoint) + outgoingMessages.add(outgoingMessage) + return outgoingMessage + } + + open suspend fun abort() { + } + + open suspend fun commit() { + bus.send(this) + } +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/IncomingMessage.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/IncomingMessage.kt new file mode 100644 index 0000000..a8101b0 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/IncomingMessage.kt @@ -0,0 +1,14 @@ +package polybus.transport.transactions.messages + +import polybus.IPolyBus + +open class IncomingMessage( + bus: IPolyBus, + body: String, + messageInfo: MessageInfoValue +) : Message(bus) { + open var messageInfo: MessageInfoValue = messageInfo + open var messageType: Class<*> = bus.messages.getTypeByMessageInfo(messageInfo) + open var body: String = body + open var message: Any = body +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/Message.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/Message.kt new file mode 100644 index 0000000..a70f99f --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/Message.kt @@ -0,0 +1,11 @@ +package polybus.transport.transactions.messages + +import polybus.IPolyBus + +open class Message(private val _bus: IPolyBus) { + open val state: MutableMap = mutableMapOf() + open var headers: MutableMap = mutableMapOf() + + open val bus: IPolyBus + get() = _bus +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/MessageInfo.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/MessageInfo.kt new file mode 100644 index 0000000..5dd52b1 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/MessageInfo.kt @@ -0,0 +1,79 @@ +package polybus.transport.transactions.messages + +@Target(AnnotationTarget.CLASS) +@Retention(AnnotationRetention.RUNTIME) +annotation class MessageInfo( + val type: MessageType, + val endpoint: String, + val name: String, + val major: Int, + val minor: Int, + val patch: Int +) + +class MessageInfoValue( + val type: MessageType, + val endpoint: String, + val name: String, + val major: Int, + val minor: Int, + val patch: Int +) { + fun equalsInfo(other: MessageInfoValue?): Boolean { + return other != null + && type == other.type + && endpoint == other.endpoint + && name == other.name + && major == other.major + } + + override fun equals(other: Any?): Boolean = other is MessageInfoValue && equalsInfo(other) + + override fun hashCode(): Int { + var result = type.hashCode() + result = 31 * result + endpoint.hashCode() + result = 31 * result + name.hashCode() + result = 31 * result + major + result = 31 * result + minor + result = 31 * result + patch + return result + } + + fun toHeaderString(includeVersion: Boolean): String { + val base = "endpoint=$endpoint, type=$type, name=$name" + return if (includeVersion) "$base, version=$major.$minor.$patch" else base + } + + override fun toString(): String = toHeaderString(true) + + companion object { + private val _headerPattern = Regex( + "^endpoint\\s*=\\s*(?[^,\\s]+),\\s*type\\s*=\\s*(?[^,\\s]+),\\s*name\\s*=\\s*(?[^,\\s]+),\\s*version\\s*=\\s*(?\\d+)\\.(?\\d+)\\.(?\\d+)", + setOf(RegexOption.IGNORE_CASE) + ) + + fun getAttributeFromHeader(header: String): MessageInfoValue? { + val match = _headerPattern.find(header) ?: return null + val endpoint = match.groups["endpoint"]?.value ?: return null + val name = match.groups["name"]?.value ?: return null + val typeText = match.groups["type"]?.value ?: return null + val type = MessageType.entries.firstOrNull { it.name.equals(typeText, ignoreCase = true) } + ?: throw IllegalArgumentException("Requested value '$typeText' was not found.") + val major = match.groups["major"]?.value?.toIntOrNull() ?: return null + val minor = match.groups["minor"]?.value?.toIntOrNull() ?: return null + val patch = match.groups["patch"]?.value?.toIntOrNull() ?: return null + return MessageInfoValue(type, endpoint, name, major, minor, patch) + } + + fun fromAnnotation(annotation: MessageInfo): MessageInfoValue { + return MessageInfoValue( + type = annotation.type, + endpoint = annotation.endpoint, + name = annotation.name, + major = annotation.major, + minor = annotation.minor, + patch = annotation.patch + ) + } + } +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/MessageType.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/MessageType.kt new file mode 100644 index 0000000..51d912d --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/MessageType.kt @@ -0,0 +1,6 @@ +package polybus.transport.transactions.messages + +enum class MessageType { + COMMAND, + EVENT +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/Messages.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/Messages.kt new file mode 100644 index 0000000..d41f608 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/Messages.kt @@ -0,0 +1,36 @@ +package polybus.transport.transactions.messages + +class Messages { + protected val types: MutableMap, Pair> = mutableMapOf() + + open fun getMessageInfo(type: Class<*>): MessageInfoValue { + return types[type]?.first ?: throw PolyBusMessageNotFoundError() + } + + open fun getHeaderByMessageInfo(messageInfo: MessageInfoValue): String { + val contains = types.values.any { it.first.equalsInfo(messageInfo) } + if (!contains) { + throw PolyBusMessageNotFoundError() + } + return messageInfo.toHeaderString(true) + } + + open fun add(messageType: Class<*>): MessageInfoValue { + val annotation = messageType.getAnnotation(MessageInfo::class.java) + ?: throw PolyBusMessageNotFoundError() + val attribute = MessageInfoValue.fromAnnotation(annotation) + val header = attribute.toHeaderString(true) + + if (types.containsKey(messageType)) { + throw PolyBusMessageNotFoundError() + } + + types[messageType] = attribute to header + return attribute + } + + open fun getTypeByMessageInfo(messageInfo: MessageInfoValue): Class<*> { + return types.entries.firstOrNull { it.value.first.equalsInfo(messageInfo) }?.key + ?: throw PolyBusMessageNotFoundError() + } +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/OutgoingMessage.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/OutgoingMessage.kt new file mode 100644 index 0000000..c7e7e03 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/OutgoingMessage.kt @@ -0,0 +1,18 @@ +package polybus.transport.transactions.messages + +import polybus.IPolyBus +import java.time.Instant + +open class OutgoingMessage( + bus: IPolyBus, + message: Any, + endpoint: String? = null, + messageInfo: MessageInfoValue? = null +) : Message(bus) { + open var deliverAt: Instant? = null + open var messageInfo: MessageInfoValue = messageInfo ?: bus.messages.getMessageInfo(message.javaClass) + open var messageType: Class<*> = message.javaClass + open var endpoint: String? = endpoint + open var body: String = "" + open var message: Any = message +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/PolyBusMessageNotFoundError.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/PolyBusMessageNotFoundError.kt new file mode 100644 index 0000000..a291f6f --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/PolyBusMessageNotFoundError.kt @@ -0,0 +1,8 @@ +package polybus.transport.transactions.messages + +import polybus.PolyBusError + +class PolyBusMessageNotFoundError : PolyBusError( + 2, + "The requested type, annotation, or header was not found." +) diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/IncomingHandler.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/IncomingHandler.kt new file mode 100644 index 0000000..a4c0787 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/IncomingHandler.kt @@ -0,0 +1,5 @@ +package polybus.transport.transactions.messages.handlers + +import polybus.transport.transactions.IncomingTransaction + +typealias IncomingHandler = suspend (transaction: IncomingTransaction, next: suspend () -> Unit) -> Unit diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/OutgoingHandler.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/OutgoingHandler.kt new file mode 100644 index 0000000..ff067fc --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/OutgoingHandler.kt @@ -0,0 +1,5 @@ +package polybus.transport.transactions.messages.handlers + +import polybus.transport.transactions.OutgoingTransaction + +typealias OutgoingHandler = suspend (transaction: OutgoingTransaction, next: suspend () -> Unit) -> Unit diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandler.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandler.kt new file mode 100644 index 0000000..e75d9ef --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandler.kt @@ -0,0 +1,73 @@ +package polybus.transport.transactions.messages.handlers.error + +import polybus.transport.transactions.IncomingTransaction +import polybus.transport.transactions.messages.OutgoingMessage +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.logging.Level +import java.util.logging.Logger + +open class ErrorHandler { + open var log: Logger = Logger.getLogger(ErrorHandler::class.java.name) + open var delayIncrement: Int = 30 + open var delayedRetryCount: Int = 3 + open var immediateRetryCount: Int = 3 + open var errorMessageHeader: String = "x-error-message" + open var errorStackTraceHeader: String = "x-error-stack-trace" + open var retryCountHeader: String = "x-retry-count" + + open suspend fun retrier(transaction: IncomingTransaction, next: suspend () -> Unit) { + var delayedAttempt = transaction.incomingMessage.headers[retryCountHeader]?.toIntOrNull() ?: 0 + val delayedRetries = maxOf(1, delayedRetryCount) + val immediateRetries = maxOf(1, immediateRetryCount) + + for (immediateAttempt in 0 until immediateRetries) { + try { + next() + break + } catch (error: Throwable) { + log.log( + Level.SEVERE, + "Error processing message ${transaction.incomingMessage.messageInfo} (immediate attempts: $immediateAttempt, delayed attempts: $delayedAttempt): ${error.message}", + error + ) + + transaction.outgoingMessages.clear() + + if (immediateAttempt < immediateRetries - 1) { + continue + } + + if (transaction.incomingMessage.bus.transport.supportsDelayedCommands && delayedAttempt < delayedRetries) { + delayedAttempt += 1 + val delayedMessage = OutgoingMessage( + transaction.bus, + transaction.incomingMessage.message, + transaction.bus.name, + transaction.incomingMessage.messageInfo + ) + delayedMessage.deliverAt = getNextRetryTime(delayedAttempt) + delayedMessage.headers = transaction.incomingMessage.headers.toMutableMap() + delayedMessage.headers[retryCountHeader] = delayedAttempt.toString() + transaction.outgoingMessages.add(delayedMessage) + continue + } + + val deadLetterMessage = OutgoingMessage( + transaction.bus, + transaction.incomingMessage.message, + transaction.bus.transport.deadLetterEndpoint, + transaction.incomingMessage.messageInfo + ) + deadLetterMessage.headers = transaction.incomingMessage.headers.toMutableMap() + deadLetterMessage.headers[errorMessageHeader] = error.message.orEmpty() + deadLetterMessage.headers[errorStackTraceHeader] = + if (error.stackTrace.isEmpty()) "" else error.stackTraceToString() + transaction.outgoingMessages.add(deadLetterMessage) + } + } + } + + open fun getNextRetryTime(attempt: Int): Instant = + Instant.now().plus((attempt * delayIncrement).toLong(), ChronoUnit.SECONDS) +} diff --git a/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlers.kt b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlers.kt new file mode 100644 index 0000000..f3bd202 --- /dev/null +++ b/src/kotlin/src/main/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlers.kt @@ -0,0 +1,27 @@ +package polybus.transport.transactions.messages.handlers.serializers + +import polybus.Headers +import polybus.transport.transactions.IncomingTransaction +import polybus.transport.transactions.OutgoingTransaction +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.registerKotlinModule + +open class JsonHandlers { + open var objectMapper: ObjectMapper = ObjectMapper().registerKotlinModule() + open var contentType: String = "application/json" + open var header: String = Headers.CONTENT_TYPE + + open suspend fun deserializer(transaction: IncomingTransaction, next: suspend () -> Unit) { + val incomingMessage = transaction.incomingMessage + incomingMessage.message = objectMapper.readValue(incomingMessage.body, incomingMessage.messageType) + next() + } + + open suspend fun serializer(transaction: OutgoingTransaction, next: suspend () -> Unit) { + transaction.outgoingMessages.forEach { message -> + message.body = objectMapper.writeValueAsString(message.message) + message.headers[header] = contentType + } + next() + } +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/inmemory/AlphaCommand.kt b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/AlphaCommand.kt new file mode 100644 index 0000000..bad0732 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/AlphaCommand.kt @@ -0,0 +1,9 @@ +package polybus.transport.inmemory + +import polybus.transport.transactions.messages.MessageInfo +import polybus.transport.transactions.messages.MessageType + +@MessageInfo(MessageType.COMMAND, "alpha", "alpha-command", 1, 0, 0) +class AlphaCommand { + lateinit var name: String +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/inmemory/AlphaEvent.kt b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/AlphaEvent.kt new file mode 100644 index 0000000..7c9dc97 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/AlphaEvent.kt @@ -0,0 +1,9 @@ +package polybus.transport.inmemory + +import polybus.transport.transactions.messages.MessageInfo +import polybus.transport.transactions.messages.MessageType + +@MessageInfo(MessageType.EVENT, "alpha", "alpha-event", 1, 0, 0) +class AlphaEvent { + lateinit var name: String +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/inmemory/InMemoryTransportTests.kt b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/InMemoryTransportTests.kt new file mode 100644 index 0000000..cfb4503 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/InMemoryTransportTests.kt @@ -0,0 +1,205 @@ +package polybus.transport.inmemory + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.fail +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import polybus.transport.PolyBusNotStartedError +import java.time.Instant + +class InMemoryTransportTests { + private lateinit var testEnvironment: TestEnvironment + + @BeforeEach + fun setUp() = runBlocking { + testEnvironment = TestEnvironment() + testEnvironment.setup() + } + + @AfterEach + fun tearDown() = runBlocking { + testEnvironment.stop() + } + + @Test + fun sendBeforeStarting() = runBlocking { + // Arrange + val transaction = testEnvironment.beta.bus.createOutgoingTransaction() + val called = CompletableDeferred() + testEnvironment.alpha.onMessageReceived = { called.complete(true) } + + // Act + transaction.add(AlphaCommand().apply { name = "Test" }) + + // Assert - should throw an error because the transport is not started + try { + transaction.commit() + fail("Expected PolyBusNotStartedError to be thrown") + } catch (e: PolyBusNotStartedError) { + // Expected - exception was thrown as expected + } + assertFalse(called.isCompleted) + } + + @Test + fun sendAfterStarted() = runBlocking { + // Arrange + val transaction = testEnvironment.beta.bus.createOutgoingTransaction() + val called = CompletableDeferred() + testEnvironment.alpha.onMessageReceived = { called.complete(true) } + + // Act - send a command from the beta endpoint to alpha endpoint + testEnvironment.start() + transaction.add(AlphaCommand().apply { name = "Test" }) + transaction.commit() + + // Assert + assertTrue(called.await()) + } + + @Test + fun sendWithExplicitEndpoint() = runBlocking { + // Arrange + val transaction = testEnvironment.alpha.bus.createOutgoingTransaction() + val result = CompletableDeferred() + testEnvironment.alpha.onMessageReceived = { + // This should NOT be called + result.complete(it.bus.name) + } + testEnvironment.alpha.transport.deadLetterHandler = { + // This should be called + result.complete(testEnvironment.alpha.transport.deadLetterEndpoint) + } + val endpoint = testEnvironment.alpha.transport.deadLetterEndpoint + + // Act - send the alpha command to dead letter endpoint + testEnvironment.start() + transaction.add(AlphaCommand().apply { name = "Test" }, endpoint = endpoint) + transaction.commit() + + // Assert + assertEquals(endpoint, result.await()) + } + + @Test + fun sendWithHeaders() = runBlocking { + // Arrange + val key = "X-Custom-Header" + val value = "HeaderValue" + val transaction = testEnvironment.alpha.bus.createOutgoingTransaction() + val result = CompletableDeferred() + testEnvironment.alpha.onMessageReceived = { + result.complete(it.incomingMessage.headers[key].orEmpty()) + } + + // Act - send a command with a custom header + testEnvironment.start() + val message = transaction.add(AlphaCommand().apply { name = "Test" }) + message.headers[key] = value + transaction.commit() + + // Assert + assertEquals(value, result.await()) + } + + @Test + fun sendWithDelay() = runBlocking { + // Arrange + val delayMs = 5000L // 5 seconds + val transaction = testEnvironment.alpha.bus.createOutgoingTransaction() + val result = CompletableDeferred() + var startedAt = 0L + testEnvironment.alpha.onMessageReceived = { + result.complete(System.currentTimeMillis() - startedAt) + } + + // Act - send to the dead letters queue instead of normal processing queue + testEnvironment.start() + val message = transaction.add(AlphaCommand().apply { name = "Test" }) + message.deliverAt = Instant.now().plusMillis(delayMs) + startedAt = System.currentTimeMillis() + transaction.commit() + val elapsedMs = result.await() + + // Assert - allow 1 second of leeway + assertTrue(elapsedMs in (delayMs - 1000)..(delayMs + 1000)) + } + + @Test + fun sendWithExpiredDelay() = runBlocking { + // Arrange + val transaction = testEnvironment.alpha.bus.createOutgoingTransaction() + val called = CompletableDeferred() + testEnvironment.alpha.onMessageReceived = { + called.complete(true) + } + + // Act - schedule command to be delivered in the past + testEnvironment.start() + val message = transaction.add(AlphaCommand().apply { name = "Test" }) + message.deliverAt = Instant.now().minusMillis(1000) // 1 second in the past + transaction.commit() + + // Assert + assertTrue(called.await()) + } + + @Test + fun startWhenAlreadyStarted() = runBlocking { + // Act + testEnvironment.start() + + // Assert - starting again should not throw an error + testEnvironment.start() + } + + @Test + fun subscribeBeforeStarted() = runBlocking { + // Arrange + val transaction = testEnvironment.alpha.bus.createOutgoingTransaction() + val called = CompletableDeferred() + testEnvironment.beta.onMessageReceived = { called.complete(true) } + + // Act + Assert - subscribing and sending before starting should throw + try { + testEnvironment.beta.transport.subscribe( + testEnvironment.beta.bus.messages.getMessageInfo(AlphaEvent::class.java) + ) + fail("Expected PolyBusNotStartedError to be thrown") + } catch (e: PolyBusNotStartedError) { + // Expected - exception was thrown as expected + } + transaction.add(AlphaEvent().apply { name = "Test" }) + try { + transaction.commit() + fail("Expected PolyBusNotStartedError to be thrown") + } catch (e: PolyBusNotStartedError) { + // Expected - exception was thrown as expected + } + + // Assert + assertFalse(called.isCompleted) + } + + @Test + fun subscribe() = runBlocking { + // Arrange + val transaction = testEnvironment.alpha.bus.createOutgoingTransaction() + val called = CompletableDeferred() + testEnvironment.beta.onMessageReceived = { called.complete(true) } + + // Act - subscribing before starting should throw an error + testEnvironment.start() + testEnvironment.beta.transport.subscribe( + testEnvironment.beta.bus.messages.getMessageInfo(AlphaEvent::class.java) + ) + transaction.add(AlphaEvent().apply { name = "Test" }) + transaction.commit() + assertTrue(called.await()) + } +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/inmemory/TestEndpoint.kt b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/TestEndpoint.kt new file mode 100644 index 0000000..88342ac --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/TestEndpoint.kt @@ -0,0 +1,18 @@ +package polybus.transport.inmemory + +import polybus.IPolyBus +import polybus.PolyBusBuilder +import polybus.transport.transactions.IncomingTransaction + +class TestEndpoint { + var onMessageReceived: suspend (IncomingTransaction) -> Unit = {} + lateinit var bus: IPolyBus + val builder: PolyBusBuilder = PolyBusBuilder() + val transport: InMemoryEndpoint + get() = bus.transport as InMemoryEndpoint + + suspend fun handler(transaction: IncomingTransaction, next: suspend () -> Unit) { + onMessageReceived(transaction) + next() + } +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/inmemory/TestEnvironment.kt b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/TestEnvironment.kt new file mode 100644 index 0000000..f89bea4 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/inmemory/TestEnvironment.kt @@ -0,0 +1,48 @@ +package polybus.transport.inmemory + +import polybus.transport.transactions.messages.handlers.serializers.JsonHandlers +import java.util.logging.Logger + +class TestEnvironment { + val inMemoryMessageBroker: InMemoryMessageBroker = InMemoryMessageBroker() + val alpha: TestEndpoint = TestEndpoint() + val beta: TestEndpoint = TestEndpoint() + + suspend fun setup() { + setupEndpoint(alpha, "alpha") + setupEndpoint(beta, "beta") + } + + private suspend fun setupEndpoint(testEndpoint: TestEndpoint, name: String) { + val jsonHandlers = JsonHandlers() + + // add handlers for incoming messages + testEndpoint.builder.incomingPipeline.add(jsonHandlers::deserializer) + testEndpoint.builder.incomingPipeline.add(testEndpoint::handler) + + // add messages + testEndpoint.builder.messages.add(AlphaCommand::class.java) + testEndpoint.builder.messages.add(AlphaEvent::class.java) + testEndpoint.builder.name = name + + // add handlers for outgoing messages + testEndpoint.builder.outgoingPipeline.add(jsonHandlers::serializer) + + // configure InMemory transport + testEndpoint.builder.transportFactory = inMemoryMessageBroker::addEndpoint + inMemoryMessageBroker.log = Logger.getLogger(InMemoryMessageBroker::class.java.name) + + // create the bus instance + testEndpoint.bus = testEndpoint.builder.build() + } + + suspend fun start() { + alpha.bus.start() + beta.bus.start() + } + + suspend fun stop() { + alpha.bus.stop() + beta.bus.stop() + } +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/Command.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/Command.kt new file mode 100644 index 0000000..7298674 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/Command.kt @@ -0,0 +1,4 @@ +package polybus.transport.transactions.messages + +@MessageInfo(MessageType.COMMAND, "polybus", "polybus-command", 1, 0, 0) +class Command diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/Event.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/Event.kt new file mode 100644 index 0000000..135e15d --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/Event.kt @@ -0,0 +1,4 @@ +package polybus.transport.transactions.messages + +@MessageInfo(MessageType.EVENT, "polybus", "polybus-event", 2, 1, 3) +class Event diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessageInfoTests.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessageInfoTests.kt new file mode 100644 index 0000000..370c677 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessageInfoTests.kt @@ -0,0 +1,235 @@ +package polybus.transport.transactions.messages + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +class MessageInfoTests { + @Test + fun getAttributeFromHeaderWithValidHeaderReturnsCorrectAttribute() { + // Arrange + val header = "endpoint=user-service, type=COMMAND, name=create-user, version=1.2.3" + + // Act + val result = MessageInfoValue.getAttributeFromHeader(header) + + // Assert + assertNotNull(result) + assertEquals("user-service", result!!.endpoint) + assertEquals(MessageType.COMMAND, result.type) + assertEquals("create-user", result.name) + assertEquals(1, result.major) + assertEquals(2, result.minor) + assertEquals(3, result.patch) + } + + @Test + fun getAttributeFromHeaderWithEventTypeReturnsCorrectAttribute() { + // Arrange + val header = "endpoint=notification-service, type=EVENT, name=user-created, version=2.0.1" + + // Act + val result = MessageInfoValue.getAttributeFromHeader(header) + + // Assert + assertNotNull(result) + assertEquals("notification-service", result!!.endpoint) + assertEquals(MessageType.EVENT, result.type) + assertEquals("user-created", result.name) + assertEquals(2, result.major) + assertEquals(0, result.minor) + assertEquals(1, result.patch) + } + + @Test + fun getAttributeFromHeaderWithExtraSpacesReturnsCorrectAttribute() { + // Arrange - the current regex doesn't handle spaces within values well, so testing valid spacing + val header = "endpoint=payment-service, type=COMMAND, name=process-payment, version=3.14.159" + + // Act + val result = MessageInfoValue.getAttributeFromHeader(header) + + // Assert + assertNotNull(result) + assertEquals("payment-service", result!!.endpoint) + assertEquals(MessageType.COMMAND, result.type) + assertEquals("process-payment", result.name) + assertEquals(3, result.major) + assertEquals(14, result.minor) + assertEquals(159, result.patch) + } + + @Test + fun getAttributeFromHeaderWithCaseInsensitiveTypeReturnsCorrectAttribute() { + // Arrange + val header = "endpoint=order-service, type=command, name=place-order, version=1.0.0" + + // Act + val result = MessageInfoValue.getAttributeFromHeader(header) + + // Assert + assertNotNull(result) + assertEquals(MessageType.COMMAND, result!!.type) + } + + @ParameterizedTest + @ValueSource( + strings = [ + "", + "invalid header", + "endpoint=test", + "endpoint=test, type=COMMAND", + "endpoint=test, type=COMMAND, name=test", + "endpoint=test, type=COMMAND, name=test, version=invalid", + "type=COMMAND, name=test, version=1.0.0" + ] + ) + fun getAttributeFromHeaderWithInvalidHeaderReturnsNull(header: String) { + // Act + val result = MessageInfoValue.getAttributeFromHeader(header) + + // Assert + assertNull(result) + } + + @Test + fun getAttributeFromHeaderWithInvalidEnumTypeThrowsIllegalArgumentException() { + // Arrange + val header = "endpoint=test, type=INVALID_TYPE, name=test, version=1.0.0" + + // Act & Assert + assertThrows(IllegalArgumentException::class.java) { + MessageInfoValue.getAttributeFromHeader(header) + } + } + + @Test + fun getAttributeFromHeaderWithMissingVersionReturnsNull() { + // Arrange + val header = "endpoint=test-service, type=COMMAND, name=test-command, version=" + + // Act + val result = MessageInfoValue.getAttributeFromHeader(header) + + // Assert + assertNull(result) + } + + @Test + fun getAttributeFromHeaderWithIncompleteVersionReturnsNull() { + // Arrange + val header = "endpoint=test-service, type=COMMAND, name=test-command, version=1.0" + + // Act + val result = MessageInfoValue.getAttributeFromHeader(header) + + // Assert + assertNull(result) + } + + @Test + fun equalsWithIdenticalAttributesReturnsTrue() { + // Arrange + val attr1 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + val attr2 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + + // Act & Assert + assertTrue(attr1.equals(attr2)) + assertTrue(attr2.equals(attr1)) + } + + @Test + fun equalsWithSameObjectReturnsTrue() { + // Arrange + val attr = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + + // Act & Assert + assertTrue(attr.equals(attr)) + } + + @Test + fun equalsWithDifferentTypeReturnsFalse() { + // Arrange + val attr1 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + val attr2 = MessageInfoValue(MessageType.EVENT, "user-service", "create-user", 1, 2, 3) + + // Act & Assert + assertFalse(attr1.equals(attr2)) + } + + @Test + fun equalsWithDifferentMajorVersionReturnsFalse() { + // Arrange + val attr1 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + val attr2 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 2, 2, 3) + + // Act & Assert + assertFalse(attr1.equals(attr2)) + } + + @Test + fun equalsWithDifferentMinorVersionReturnsTrue() { + // Arrange + val attr1 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + val attr2 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 3, 3) + + // Act & Assert + assertTrue(attr1.equals(attr2)) + } + + @Test + fun equalsWithDifferentPatchVersionReturnsTrue() { + val attr1 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + val attr2 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 4) + + // Act & Assert + assertTrue(attr1.equals(attr2)) + } + + @Test + fun equalsWithNullObjectReturnsFalse() { + // Arrange + val attr = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + + // Act & Assert + assertFalse(attr.equals(null)) + } + + @Test + fun equalsWithDifferentObjectTypeReturnsFalse() { + // Arrange + val attr = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + val other: Any = "not a MessageAttribute" + + // Act & Assert + assertFalse(attr.equals(other)) + } + + @Test + fun getHashCodeWithIdenticalAttributesReturnsSameHashCode() { + // Arrange + val attr1 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + val attr2 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + + // Act & Assert + assertEquals(attr1.hashCode(), attr2.hashCode()) + } + + @Test + fun getHashCodeWithDifferentAttributesReturnsDifferentHashCodes() { + // Arrange + val attr1 = MessageInfoValue(MessageType.COMMAND, "user-service", "create-user", 1, 2, 3) + val attr2 = MessageInfoValue(MessageType.EVENT, "user-service", "create-user", 1, 2, 3) + + // Act & Assert + assertNotEquals(attr1.hashCode(), attr2.hashCode()) + } + +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessageWithoutAttribute.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessageWithoutAttribute.kt new file mode 100644 index 0000000..0392b75 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessageWithoutAttribute.kt @@ -0,0 +1,3 @@ +package polybus.transport.transactions.messages + +class MessageWithoutAttribute diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessagesTests.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessagesTests.kt new file mode 100644 index 0000000..67c88bd --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/MessagesTests.kt @@ -0,0 +1,150 @@ +package polybus.transport.transactions.messages + +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows + +class MessagesTests { + private lateinit var messages: Messages + + @BeforeEach + fun setUp() { + messages = Messages() + } + + @Test + fun addValidMessageTypeReturnsMessageInfo() { + // Act + val result = messages.add(Command::class.java) + + // Assert + assertNotNull(result) + assertEquals(MessageType.COMMAND, result.type) + assertEquals("polybus", result.endpoint) + assertEquals("polybus-command", result.name) + assertEquals(1, result.major) + assertEquals(0, result.minor) + assertEquals(0, result.patch) + } + + @Test + fun addMessageTypeWithoutAttributeThrowsError() { + // Act & Assert + assertThrows { messages.add(MessageWithoutAttribute::class.java) } + } + + @Test + fun getMessageInfoExistingTypeReturnsCorrectMessageInfo() { + // Arrange + messages.add(Command::class.java) + + // Act + val result = messages.getMessageInfo(Command::class.java) + + // Assert + assertNotNull(result) + assertEquals(MessageType.COMMAND, result.type) + assertEquals("polybus", result.endpoint) + assertEquals("polybus-command", result.name) + } + + @Test + fun getMessageInfoNonExistentTypeThrowsError() { + // Act & Assert + assertThrows { messages.getMessageInfo(Command::class.java) } + } + + @Test + fun getTypeByMessageInfoExistingMessageInfoReturnsCorrectType() { + // Arrange + messages.add(Event::class.java) + val messageInfo = MessageInfoValue(MessageType.EVENT, "polybus", "polybus-event", 2, 1, 3) + + // Act + val result = messages.getTypeByMessageInfo(messageInfo) + + // Assert + assertEquals(Event::class.java, result) + } + + @Test + fun getTypeByMessageInfoNonExistentMessageInfoThrowsError() { + // Arrange + val messageInfo = MessageInfoValue(MessageType.COMMAND, "unknown", "unknown-command", 1, 0, 0) + + // Act & Assert + assertThrows { messages.getTypeByMessageInfo(messageInfo) } + } + + @Test + fun getTypeByMessageInfoDifferentMinorPatchVersionsReturnsType() { + // Arrange + messages.add(Event::class.java) // Has version 2.1.3 + val messageInfoDifferentMinor = MessageInfoValue(MessageType.EVENT, "polybus", "polybus-event", 2, 5, 3) + val messageInfoDifferentPatch = MessageInfoValue(MessageType.EVENT, "polybus", "polybus-event", 2, 1, 9) + + // Act + val result1 = messages.getTypeByMessageInfo(messageInfoDifferentMinor) + val result2 = messages.getTypeByMessageInfo(messageInfoDifferentPatch) + + // Assert + assertEquals(Event::class.java, result1) + assertEquals(Event::class.java, result2) + } + + @Test + fun getTypeByMessageInfoDifferentMajorVersionThrowsError() { + // Arrange + messages.add(Event::class.java) // Has version 2.1.3 + val messageInfoDifferentMajor = MessageInfoValue(MessageType.EVENT, "polybus", "polybus-event", 3, 1, 3) + + // Act & Assert + assertThrows { messages.getTypeByMessageInfo(messageInfoDifferentMajor) } + } + + @Test + fun addSameTypeTwiceThrowsError() { + // Arrange + messages.add(Command::class.java) + + // Act & Assert + assertThrows { messages.add(Command::class.java) } + } + + @Test + fun getHeaderByMessageInfoExistingMessageInfoReturnsCorrectHeader() { + // Arrange + messages.add(Command::class.java) + val messageInfo = MessageInfoValue(MessageType.COMMAND, "polybus", "polybus-command", 1, 0, 0) + + // Act + val result = messages.getHeaderByMessageInfo(messageInfo) + + // Assert + assertNotNull(result) + assertFalse(result.isEmpty()) + assertEquals(messageInfo.toHeaderString(true), result) + } + + @Test + fun getHeaderByMessageInfoNonExistentMessageInfoThrowsError() { + // Arrange + val messageInfo = MessageInfoValue(MessageType.COMMAND, "unknown", "unknown-command", 1, 0, 0) + + // Act & Assert + assertThrows { messages.getHeaderByMessageInfo(messageInfo) } + } + + @Test + fun getHeaderByMessageInfoDifferentMajorVersionThrowsError() { + // Arrange + messages.add(Event::class.java) // Has version 2.1.3 + val messageInfoDifferentMajor = MessageInfoValue(MessageType.EVENT, "polybus", "polybus-event", 3, 1, 3) + + // Act & Assert + assertThrows { messages.getHeaderByMessageInfo(messageInfoDifferentMajor) } + } +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandlerTestMessage.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandlerTestMessage.kt new file mode 100644 index 0000000..1314104 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandlerTestMessage.kt @@ -0,0 +1,7 @@ +package polybus.transport.transactions.messages.handlers.error + +import polybus.transport.transactions.messages.MessageInfo +import polybus.transport.transactions.messages.MessageType + +@MessageInfo(MessageType.COMMAND, "polybus", "error-handler-test-message", 1, 0, 0) +class ErrorHandlerTestMessage diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandlerTests.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandlerTests.kt new file mode 100644 index 0000000..333c9eb --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/ErrorHandlerTests.kt @@ -0,0 +1,303 @@ +package polybus.transport.transactions.messages.handlers.error + +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import polybus.transport.transactions.IncomingTransaction +import polybus.transport.transactions.messages.IncomingMessage +import polybus.transport.transactions.messages.handlers.error.ErrorHandler +import java.time.Instant +import java.time.temporal.ChronoUnit + +class ErrorHandlerTests { + private lateinit var testBus: TestBus + private lateinit var transaction: IncomingTransaction + private lateinit var errorHandler: TestableErrorHandler + + @BeforeEach + fun setUp() { + testBus = TestBus("TestBus") + testBus.messages.add(ErrorHandlerTestMessage::class.java) + val incomingMessage = IncomingMessage( + bus = testBus, + body = "{}", + messageInfo = testBus.messages.getMessageInfo(ErrorHandlerTestMessage::class.java) + ) + transaction = IncomingTransaction(testBus, incomingMessage) + errorHandler = TestableErrorHandler() + } + + @Test + fun retrierSucceedsOnFirstAttemptDoesNotRetry() = runBlocking { + // Arrange + var nextCalled = false + + // Act + errorHandler.retrier(transaction) { + nextCalled = true + } + + // Assert + assertTrue(nextCalled) + assertEquals(0, transaction.outgoingMessages.size) + } + + @Test + fun retrierFailsOnceRetriesImmediately() = runBlocking { + // Arrange + var callCount = 0 + + // Act + errorHandler.retrier(transaction) { + callCount++ + if (callCount == 1) { + throw RuntimeException("Test error") + } + } + + // Assert + assertEquals(2, callCount) + assertEquals(0, transaction.outgoingMessages.size) + } + + @Test + fun retrierFailsAllImmediateRetriesSchedulesDelayedRetry() = runBlocking { + // Arrange + val expectedRetryTime = Instant.now().plusSeconds(300) + errorHandler.setNextRetryTime(expectedRetryTime) + var callCount = 0 + + // Act + errorHandler.retrier(transaction) { + callCount++ + throw RuntimeException("Test error") + } + + // Assert + assertEquals(errorHandler.immediateRetryCount, callCount) + assertEquals(1, transaction.outgoingMessages.size) + + val delayedMessage = transaction.outgoingMessages.first() + assertEquals(expectedRetryTime, delayedMessage.deliverAt) + assertEquals("1", delayedMessage.headers[errorHandler.retryCountHeader]) + assertEquals("TestBus", delayedMessage.endpoint) + } + + @Test + fun retrierWithExistingRetryCountIncrementsCorrectly() = runBlocking { + // Arrange + transaction.incomingMessage.headers[errorHandler.retryCountHeader] = "2" + val expectedRetryTime = Instant.now().plusSeconds(600) + errorHandler.setNextRetryTime(expectedRetryTime) + + // Act + errorHandler.retrier(transaction) { + throw RuntimeException("Test error") + } + + // Assert + assertEquals(1, transaction.outgoingMessages.size) + + val delayedMessage = transaction.outgoingMessages.first() + assertEquals("3", delayedMessage.headers[errorHandler.retryCountHeader]) + assertEquals(expectedRetryTime, delayedMessage.deliverAt) + } + + @Test + fun retrierExceedsMaxDelayedRetriesSendsToDeadLetter() = runBlocking { + // Arrange + transaction.incomingMessage.headers[errorHandler.retryCountHeader] = + errorHandler.delayedRetryCount.toString() + val testException = RuntimeException("Final error") + + // Act + errorHandler.retrier(transaction) { + throw testException + } + + // Assert + assertEquals(1, transaction.outgoingMessages.size) + + val deadLetterMessage = transaction.outgoingMessages.first() + assertEquals(TestTransport.DEFAULT_DEAD_LETTER_ENDPOINT, deadLetterMessage.endpoint) + assertEquals("Final error", deadLetterMessage.headers[errorHandler.errorMessageHeader]) + assertNotNull(deadLetterMessage.headers[errorHandler.errorStackTraceHeader]) + } + + @Test + fun retrierClearsOutgoingMessagesOnEachRetry() = runBlocking { + // Arrange + var callCount = 0 + + // Act + errorHandler.retrier(transaction) { + callCount++ + transaction.add(ErrorHandlerTestMessage()) + throw RuntimeException("Test error") + } + + // Assert + assertEquals(errorHandler.immediateRetryCount, callCount) + // Should only have the delayed retry message, not the messages added in next() + assertEquals(1, transaction.outgoingMessages.size) + assertTrue( + transaction.outgoingMessages.first().headers.containsKey(errorHandler.retryCountHeader) + ) + } + + @Test + fun retrierWithZeroImmediateRetriesSchedulesDelayedRetryImmediately() = runBlocking { + // Arrange + errorHandler.immediateRetryCount = 0 + val expectedRetryTime = Instant.now().plusSeconds(300) + errorHandler.setNextRetryTime(expectedRetryTime) + var callCount = 0 + + // Act + errorHandler.retrier(transaction) { + callCount++ + throw RuntimeException("Test error") + } + + // Assert + assertEquals(1, callCount) // Should enforce minimum of 1 + assertEquals(1, transaction.outgoingMessages.size) + assertEquals("1", transaction.outgoingMessages.first().headers[errorHandler.retryCountHeader]) + } + + @Test + fun retrierWithZeroDelayedRetriesStillGetsMinimumOfOne() = runBlocking { + // Arrange + errorHandler.delayedRetryCount = 0 + val expectedRetryTime = Instant.now().plusSeconds(300) + errorHandler.setNextRetryTime(expectedRetryTime) + + // Act + errorHandler.retrier(transaction) { + throw RuntimeException("Test error") + } + + // Assert + assertEquals(1, transaction.outgoingMessages.size) + assertEquals("1", transaction.outgoingMessages.first().headers[errorHandler.retryCountHeader]) + assertEquals(expectedRetryTime, transaction.outgoingMessages.first().deliverAt) + } + + @Test + fun getNextRetryTimeDefaultImplementationUsesDelayCorrectly() { + // Arrange + val handler = ErrorHandler().apply { delayIncrement = 60 } + val beforeTime = Instant.now() + + // Act + val result1 = handler.getNextRetryTime(1) + val result2 = handler.getNextRetryTime(2) + val result3 = handler.getNextRetryTime(3) + + val afterTime = Instant.now() + + // Assert + assertTrue(result1 >= beforeTime.plus(60, ChronoUnit.SECONDS)) + assertTrue(result1 <= afterTime.plus(60, ChronoUnit.SECONDS)) + + assertTrue(result2 >= beforeTime.plus(120, ChronoUnit.SECONDS)) + assertTrue(result2 <= afterTime.plus(120, ChronoUnit.SECONDS)) + + assertTrue(result3 >= beforeTime.plus(180, ChronoUnit.SECONDS)) + assertTrue(result3 <= afterTime.plus(180, ChronoUnit.SECONDS)) + } + + @Test + fun retrierSucceedsAfterSomeImmediateRetriesStopsRetrying() = runBlocking { + // Arrange + var callCount = 0 + + // Act + errorHandler.retrier(transaction) { + callCount++ + if (callCount < 3) { + throw RuntimeException("Test error") + } + } + + // Assert + assertEquals(3, callCount) + assertEquals(0, transaction.outgoingMessages.size) + } + + @Test + fun retrierInvalidRetryCountHeaderTreatsAsZero() = runBlocking { + // Arrange + transaction.incomingMessage.headers[errorHandler.retryCountHeader] = "invalid" + val expectedRetryTime = Instant.now().plusSeconds(300) + errorHandler.setNextRetryTime(expectedRetryTime) + + // Act + errorHandler.retrier(transaction) { + throw RuntimeException("Test error") + } + + // Assert + assertEquals(1, transaction.outgoingMessages.size) + val delayedMessage = transaction.outgoingMessages.first() + assertEquals("1", delayedMessage.headers[errorHandler.retryCountHeader]) + } + + @Test + fun retrierExceptionStackTraceIsStoredInHeader() = runBlocking { + // Arrange + transaction.incomingMessage.headers[errorHandler.retryCountHeader] = + errorHandler.delayedRetryCount.toString() + + val exceptionWithStackTrace = RuntimeException("Error with stack trace") + + // Act + errorHandler.retrier(transaction) { + throw exceptionWithStackTrace + } + + // Assert + assertEquals(1, transaction.outgoingMessages.size) + val deadLetterMessage = transaction.outgoingMessages.first() + val storedStackTrace = deadLetterMessage.headers[errorHandler.errorStackTraceHeader] + assertNotNull(storedStackTrace) + assertFalse(storedStackTrace.isNullOrEmpty()) + } + + @Test + fun retrierExceptionWithNullStackTraceUsesEmptyString() = runBlocking { + // Arrange + transaction.incomingMessage.headers[errorHandler.retryCountHeader] = + errorHandler.delayedRetryCount.toString() + + // Create an exception with null StackTrace using custom exception + val exceptionWithoutStackTrace = RuntimeException("Error without stack trace") + exceptionWithoutStackTrace.stackTrace = emptyArray() + + // Act + errorHandler.retrier(transaction) { + throw exceptionWithoutStackTrace + } + + // Assert + assertEquals(1, transaction.outgoingMessages.size) + val deadLetterMessage = transaction.outgoingMessages.first() + assertEquals("", deadLetterMessage.headers[errorHandler.errorStackTraceHeader]) + } + + private class TestableErrorHandler : ErrorHandler() { + private var _nextRetryTime: Instant? = null + + fun setNextRetryTime(retryTime: Instant) { + _nextRetryTime = retryTime + } + + override fun getNextRetryTime(attempt: Int): Instant { + return _nextRetryTime ?: super.getNextRetryTime(attempt) + } + } +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/TestBus.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/TestBus.kt new file mode 100644 index 0000000..aba41ed --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/TestBus.kt @@ -0,0 +1,28 @@ +package polybus.transport.transactions.messages.handlers.error + +import polybus.IPolyBus +import polybus.transport.ITransport +import polybus.transport.transactions.IncomingTransaction +import polybus.transport.transactions.OutgoingTransaction +import polybus.transport.transactions.Transaction +import polybus.transport.transactions.messages.IncomingMessage +import polybus.transport.transactions.messages.Messages +import polybus.transport.transactions.messages.handlers.IncomingHandler +import polybus.transport.transactions.messages.handlers.OutgoingHandler + +class TestBus(override val name: String) : IPolyBus { + override val properties: MutableMap = mutableMapOf() + override var transport: ITransport = TestTransport() + override val incomingPipeline: MutableList = mutableListOf() + override val outgoingPipeline: MutableList = mutableListOf() + override val messages: Messages = Messages() + + override suspend fun createIncomingTransaction(message: IncomingMessage): IncomingTransaction = + IncomingTransaction(this, message) + + override suspend fun createOutgoingTransaction(): OutgoingTransaction = OutgoingTransaction(this) + + override suspend fun send(transaction: Transaction) {} + override suspend fun start() {} + override suspend fun stop() {} +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/TestTransport.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/TestTransport.kt new file mode 100644 index 0000000..81f75c6 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/error/TestTransport.kt @@ -0,0 +1,24 @@ +package polybus.transport.transactions.messages.handlers.error + +import polybus.transport.ITransport +import polybus.transport.transactions.Transaction +import polybus.transport.transactions.messages.MessageInfoValue + +class TestTransport : ITransport { + companion object { + const val DEFAULT_DEAD_LETTER_ENDPOINT = "dead-letters" + } + + override val deadLetterEndpoint: String = DEFAULT_DEAD_LETTER_ENDPOINT + override suspend fun handle(transaction: Transaction) { + throw NotImplementedError() + } + + override val supportsCommandMessages: Boolean = true + override val supportsDelayedCommands: Boolean = true + override val supportsSubscriptions: Boolean = false + + override suspend fun subscribe(messageInfo: MessageInfoValue) {} + override suspend fun start() {} + override suspend fun stop() {} +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlerTestMessage.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlerTestMessage.kt new file mode 100644 index 0000000..98dce50 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlerTestMessage.kt @@ -0,0 +1,9 @@ +package polybus.transport.transactions.messages.handlers.serializers + +import polybus.transport.transactions.messages.MessageInfo +import polybus.transport.transactions.messages.MessageType + +@MessageInfo(MessageType.COMMAND, "polybus", "json-handler-test-message", 1, 0, 0) +class JsonHandlerTestMessage { + lateinit var text: String +} diff --git a/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlersTests.kt b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlersTests.kt new file mode 100644 index 0000000..a803bf1 --- /dev/null +++ b/src/kotlin/src/test/kotlin/polybus/transport/transactions/messages/handlers/serializers/JsonHandlersTests.kt @@ -0,0 +1,28 @@ +package polybus.transport.transactions.messages.handlers.serializers + +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import polybus.Headers +import polybus.PolyBusBuilder + +class JsonHandlersTests { + @Test + fun serializerSetsBodyAndContentType() = runBlocking { + // Arrange + val jsonHandlers = JsonHandlers() + val builder = PolyBusBuilder() + builder.messages.add(JsonHandlerTestMessage::class.java) + val bus = builder.build() + val transaction = bus.createOutgoingTransaction() + val message = JsonHandlerTestMessage().apply { text = "Hello, World!" } + val outgoing = transaction.add(message) + + // Act + jsonHandlers.serializer(transaction) {} + + // Assert + assertTrue(outgoing.body.isNotBlank()) + assertTrue(outgoing.headers[Headers.CONTENT_TYPE] == "application/json") + } +}