Skip to content

Conversation

@alexeyzimarev
Copy link
Contributor

@alexeyzimarev alexeyzimarev commented Nov 22, 2025

User description

Fixes #471
Replaces #470


PR Type

Enhancement


Description

  • Add SessionId and ReplyToSessionId support to producer options

  • Implement ServiceBusSessionProcessor for session-based message consumption

  • Refactor message handling to support both standard and session processors

  • Update subscription options with SessionProcessorOptions configuration

  • Improve message property extraction with fallback mechanisms


Diagram Walkthrough

flowchart LR
  Producer["Producer Options<br/>SessionId Support"]
  MessageBuilder["Message Builder<br/>Apply SessionId"]
  Subscription["Subscription<br/>Dual Processor Support"]
  SessionProcessor["Session Processor<br/>Ordered Processing"]
  StandardProcessor["Standard Processor<br/>Regular Processing"]
  
  Producer -- "Configure" --> MessageBuilder
  MessageBuilder -- "Create Messages" --> Subscription
  Subscription -- "SessionProcessorOptions" --> SessionProcessor
  Subscription -- "ProcessorOptions" --> StandardProcessor
Loading

File Walkthrough

Relevant files
Enhancement
4 files
ServiceBusProduceOptions.cs
Add SessionId and ReplyToSessionId properties                       
+10/-0   
ServiceBusMessageBuilder.cs
Apply SessionId to outgoing messages conditionally             
+9/-1     
ServiceBusSubscriptionOptions.cs
Add SessionProcessorOptions and MakeSessionProcessor interface
+41/-0   
ServiceBusSubscription.cs
Implement dual processor support with session handling     
+62/-20 
Tests
1 files
AggregateCommandsTests.cs
Shorten test method names and add IgnoreParameters             
+6/-6     
Dependencies
1 files
Directory.Packages.props
Update Testcontainers version to 4.8.1                                     
+1/-1     
Additional files
4 files
AggregateCommandsTests.MapContractExplicitly.verified.txt [link]   
AggregateCommandsTests.MapContractExplicitlyWithoutRoute.verified.txt [link]   
AggregateCommandsTests.MapContractExplicitlyWithoutRouteWithGenericAttr.verified.txt [link]   
AggregateCommandsTests.MapEnrichedCommand.verified.txt [link]   

@qodo-free-for-open-source-projects
Copy link
Contributor

qodo-free-for-open-source-projects bot commented Nov 22, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified No security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🟢
🎫 #471
🟢 Add SessionId and ReplyToSessionId properties to producer options
Apply SessionId and ReplyToSessionId to outgoing messages in the message builder
Implement ServiceBusSessionProcessor support in subscription
Enable conditional activation of session processor based on configuration
Create session-aware message handler that processes session messages with proper context
Implement proper error handling for session-based message processing
Add SessionProcessorOptions to subscription options
Ensure SessionProcessorOptions takes priority over standard ProcessorOptions when
specified
Support both standard and session-based message processing modes
Codebase Duplication Compliance
🟢
No codebase code duplication found No new components were introduced in the PR code
Custom Compliance
🟢
Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

🔴
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Missing null validation: The SessionProcessorOptions property is accessed without null validation before being
passed to MakeSessionProcessor, which could cause runtime exceptions if the property is
unexpectedly null.

Referred Code
_sessionProcessor = Options.QueueOrTopic.MakeSessionProcessor(_client, Options);

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Missing input validation: The message properties extracted from msg.ApplicationProperties and msg.Subject lack
validation before being used, potentially allowing malformed or malicious data to
propagate through the system.

Referred Code
var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType)
    ? messageType.ToString()
    : msg.Subject) ?? throw new InvalidOperationException("Message type is missing in message properties");
var contentType = msg.ContentType;

// Should this be a stream name? or topic or something
var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream)
    ? stream.ToString()
    : Options.QueueOrTopic switch {
        Queue queue => queue.Name,
        Topic topic => topic.Name,
        _           => null
    }) ?? throw new InvalidOperationException("Stream name is missing in message properties");

Learn more about managing compliance generic rules or creating your own custom rules

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-free-for-open-source-projects
Copy link
Contributor

qodo-free-for-open-source-projects bot commented Nov 22, 2025

PR Code Suggestions ✨

Latest suggestions up to 35baa5a

CategorySuggestion                                                                                                                                    Impact
Incremental [*]
Add null check for message type
Suggestion Impact:The suggestion was directly implemented in the commit. The code now includes a null check (`&& messageType is not null`) before calling ToString() on the messageType variable, exactly as suggested.

code diff:

+        var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) && messageType is not null

Add a null check for messageType before calling ToString() to prevent a
potential NullReferenceException.

src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs [77-79]

-var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType)
+var eventType = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.MessageType, out var messageType) && messageType is not null
     ? messageType.ToString()
     : msg.Subject) ?? throw new InvalidOperationException("Message type is missing in message properties");

[Suggestion processed]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a potential NullReferenceException if the messageType property exists but its value is null. Adding a null check prevents this runtime error, making the message processing more robust.

Medium
Add null check for stream name
Suggestion Impact:The suggestion was directly implemented. The commit added the null check `&& stream is not null` on line 13, exactly as suggested. Additionally, the same pattern was applied to the messageType variable on line 6, showing the suggestion influenced a broader fix for the same issue.

code diff:

-        var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream)
+        var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) && stream is not null
             ? stream.ToString()

Add a null check for stream before calling ToString() to prevent a potential
NullReferenceException.

src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs [83-89]

-var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream)
+var streamName = (msg.ApplicationProperties.TryGetValue(Options.AttributeNames.StreamName, out var stream) && stream is not null
     ? stream.ToString()
     : Options.QueueOrTopic switch {
         Queue queue => queue.Name,
         Topic topic => topic.Name,
         _           => null
     }) ?? throw new InvalidOperationException("Stream name is missing in message properties");

[Suggestion processed]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a potential NullReferenceException if the stream property exists in the message but its value is null. Adding a null check prevents this runtime error, improving the robustness of message processing.

Medium
  • Update

Previous suggestions

Suggestions up to commit f832640
CategorySuggestion                                                                                                                                    Impact
High-level
Refactor subscription to use composition

Refactor the ServiceBusSubscription class to use composition instead of
conditional logic for handling session and non-session processors. This can be
achieved by extracting processor management into separate strategy classes.

Examples:

src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs [40-56]
    protected override ValueTask Subscribe(CancellationToken cancellationToken) {
        if (Options.SessionProcessorOptions is not null) {
            _sessionProcessor = Options.QueueOrTopic.MakeSessionProcessor(_client, Options);

            _sessionProcessor.ProcessMessageAsync += HandleSessionMessage;
            _sessionProcessor.ProcessErrorAsync   += _defaultErrorHandler;

            return new(_sessionProcessor.StartProcessingAsync(cancellationToken));
        }


 ... (clipped 7 lines)
src/Azure/src/Eventuous.Azure.ServiceBus/Subscriptions/ServiceBusSubscription.cs [169-176]
    protected override async ValueTask Unsubscribe(CancellationToken cancellationToken) {
        if (_sessionProcessor is not null) {
            await _sessionProcessor.StopProcessingAsync(cancellationToken).NoContext();
        }
        else if (_processor is not null) {
            await _processor.StopProcessingAsync(cancellationToken).NoContext();
        }
    }

Solution Walkthrough:

Before:

class ServiceBusSubscription : EventSubscription<...> {
    ServiceBusProcessor? _processor;
    ServiceBusSessionProcessor? _sessionProcessor;

    protected override ValueTask Subscribe(...) {
        if (Options.SessionProcessorOptions is not null) {
            _sessionProcessor = ...CreateSessionProcessor(...);
            _sessionProcessor.ProcessMessageAsync += HandleSessionMessage;
            // ... more setup
            return new(_sessionProcessor.StartProcessingAsync(...));
        }

        _processor = ...CreateProcessor(...);
        _processor.ProcessMessageAsync += HandleMessage;
        // ... more setup
        return new(_processor.StartProcessingAsync(...));
    }

    protected override async ValueTask Unsubscribe(...) {
        if (_sessionProcessor is not null) { ... }
        else if (_processor is not null) { ... }
    }
}

After:

interface IServiceBusProcessorStrategy {
    ValueTask Start(CancellationToken cancellationToken);
    ValueTask Stop(CancellationToken cancellationToken);
}

class SessionProcessorStrategy : IServiceBusProcessorStrategy { /* ... */ }
class StandardProcessorStrategy : IServiceBusProcessorStrategy { /* ... */ }

class ServiceBusSubscription : EventSubscription<...> {
    readonly IServiceBusProcessorStrategy _processorStrategy;

    public ServiceBusSubscription(ServiceBusClient client, ServiceBusSubscriptionOptions options, ...) {
        // ...
        _processorStrategy = options.SessionProcessorOptions is not null
            ? new SessionProcessorStrategy(client, options, HandleSessionMessage, ...)
            : new StandardProcessorStrategy(client, options, HandleMessage, ...);
    }

    protected override ValueTask Subscribe(CancellationToken ct)
        => _processorStrategy.Start(ct);

    protected override ValueTask Unsubscribe(CancellationToken ct)
        => _processorStrategy.Stop(ct);
}
Suggestion importance[1-10]: 7

__

Why: This is a valid architectural suggestion that improves the design by applying the strategy pattern, which would make the ServiceBusSubscription class cleaner and more maintainable.

Medium

@github-actions
Copy link

github-actions bot commented Nov 22, 2025

Test Results

 51 files  + 34   51 suites  +34   35m 44s ⏱️ + 24m 49s
279 tests + 10  279 ✅ + 10  0 💤 ±0  0 ❌ ±0 
840 runs  +560  840 ✅ +560  0 💤 ±0  0 ❌ ±0 

Results for commit c08bd14. ± Comparison against base commit 05689c4.

This pull request removes 5 and adds 15 tests. Note that renamed tests count towards both.
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/22/2025 2:41:35 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/22/2025 2:41:35 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(393180c3-a2a7-4de1-bf68-874ac08b21e2)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2025-11-22T14:41:35.8604159+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-22T14:41:35.8604159+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2025-11-22T14:41:35.8604159+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2025-11-22T14:41:35.8604159+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-22T14:41:35.8604159+00:00 })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2025-11-22T14:41:35.8604159+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-22T14:41:35.8604159+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2025-11-22T14:41:35.8604159+00:00 }, CommitPosition { Position: 0, Sequence: 8, Timestamp: 2025-11-22T14:41:35.8604159+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-22T14:41:35.8604159+00:00 })
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/22/2025 3:54:03 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/22/2025 3:54:03 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/22/2025 3:54:07 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/22/2025 3:54:07 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/22/2025 3:54:37 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(11/22/2025 3:54:37 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(855a817a-e5ca-4670-83ed-4c53695d729d)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(b316a449-52bf-4cc6-9430-3c7cfcfd1bc8)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(fc9392cf-30b2-4bc6-92e3-f7ecdd75f32a)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2025-11-22T15:54:03.6580816+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-22T15:54:03.6580816+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2025-11-22T15:54:03.6580816+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2025-11-22T15:54:03.6580816+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2025-11-22T15:54:03.6580816+00:00 })
…

♻️ This comment has been updated with latest results.

@alexeyzimarev
Copy link
Contributor Author

@alexey-troshkin-xpress check if it's ok

…BusSubscription.cs

Co-authored-by: qodo-merge-for-open-source[bot] <189517486+qodo-merge-for-open-source[bot]@users.noreply.github.com>
…BusSubscription.cs

Co-authored-by: qodo-merge-for-open-source[bot] <189517486+qodo-merge-for-open-source[bot]@users.noreply.github.com>
@alexey-troshkin-xpress
Copy link
Contributor

@alexeyzimarev, сhecked the changes and that it works – everything's fine

@alexeyzimarev alexeyzimarev merged commit 22a65b9 into dev Nov 24, 2025
5 checks passed
@alexeyzimarev alexeyzimarev deleted the Xpress-PH-feature/service-bus-sessions branch November 24, 2025 11:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Enable SessionId support for ordered message processing in Service Bus

4 participants