Performance: Limited publish throughput when there are many idle clients #2174

@AlexNik4

Describe the bug

I can publish 25k messages per second (QoS 1) to one client when few other clients are connected, but that rate drops quickly as the number of other clients grows. The other clients are subscribed to unrelated topics, and no wildcards are used anywhere.

Which component is your bug related to?

  • Client
  • Server

To Reproduce

Run the code below in release mode. The results are hardware dependent, but first try something like:

const int MsgPerSecond = 20000;
const int SubscriberClients = 100;

Observe in the console output that the publisher channel size stays at or near 0 and that the received message count increases by the configured MsgPerSecond every second. Everything works fine. Then try:

const int MsgPerSecond = 2000;
const int SubscriberClients = 10000;

You should notice that the publisher can no longer keep up. The channel size grows continuously, even though we are publishing 90% fewer messages and the other subscribers are not really 'relevant'.

Expected behavior

I was hoping to support a scenario where many clients (10k) each subscribe to a unique topic such as 'user/[username]', and to be able to publish to a small group of those clients. Since the topics don't contain any wildcards, I expected this to scale at the speed of a hashmap lookup.
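
To make the expectation concrete, here is a minimal sketch of the kind of exact-match lookup I have in mind. It is only an illustration: the names subscribersByTopic, Subscribe and Match are hypothetical, it is not how MQTTnet stores subscriptions, and it only applies to topic filters without the '+' or '#' wildcards.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exact-match index: topic -> IDs of the clients subscribed to it.
// Illustration only; this is not MQTTnet's internal data structure.
var subscribersByTopic = new Dictionary<string, HashSet<string>>(StringComparer.Ordinal);

// Register a wildcard-free subscription.
void Subscribe(string clientId, string topic)
{
    if (!subscribersByTopic.TryGetValue(topic, out var clients))
    {
        clients = new HashSet<string>(StringComparer.Ordinal);
        subscribersByTopic[topic] = clients;
    }

    clients.Add(clientId);
}

// Find the receivers of a published topic with a single dictionary lookup,
// independent of how many unrelated topics are registered.
IReadOnlyCollection<string> Match(string topic)
{
    if (subscribersByTopic.TryGetValue(topic, out var clients))
    {
        return clients;
    }

    return Array.Empty<string>();
}

// 10,000 clients, each subscribed to its own unique topic, plus the main receiver.
for (int i = 0; i < 10_000; i++)
{
    Subscribe($"{i}", $"client/{i}");
}
Subscribe("main", "client/main");

Console.WriteLine(string.Join(", ", Match("client/main"))); // prints "main"
Console.WriteLine(Match("client/unknown").Count);           // prints 0
```

With an index like this, the cost of routing a message to 'client/main' would not depend on the 10k other subscriptions, which is the behavior I expected.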

Additional context / logging

If all the other clients are subscribed to the same constant topic (different from the one that is actually published to), then there is no performance issue. Adding more publishers did not help either. Maybe this scenario is not really what MQTTnet was designed for, but it would be nice to support it along with the 'normal' cases.

Code example

using System.Threading.Channels;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;

const int MsgPerSecond = 2000;
const int SubscriberClients = 10000;
const MqttQualityOfServiceLevel ServiceQuality = MqttQualityOfServiceLevel.AtLeastOnce;

MqttServerFactory serverFactory = new();

MqttServerOptions serverOptions = new MqttServerOptionsBuilder()
    .WithDefaultEndpoint()
    .Build();

var mqttServer = serverFactory.CreateMqttServer(serverOptions);
await mqttServer.StartAsync();

MqttClientFactory factory = new();
IMqttClient publisherClient = factory.CreateMqttClient();

var publisherOptions = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost")
            .WithClientId("publisher")
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(10))
            .WithCleanSession()
            .Build();

// Connect the publisher client
await publisherClient.ConnectAsync(publisherOptions);

Console.WriteLine("Connecting the subscriber clients...");

TestClient mainReceiverClient = new TestClient(factory.CreateMqttClient(), "main", ServiceQuality);
await mainReceiverClient.StartAsync();

// Connect the other dummy clients
for (int i = 0; i < SubscriberClients - 1; i++)
{
    TestClient subscriberClient = new TestClient(factory.CreateMqttClient(), $"{i}", ServiceQuality);
    await subscriberClient.StartAsync();
}

// Create a channel for publishing messages
var channel = Channel.CreateUnbounded<(string topic, string payload)>();

// Task to continuously read from the channel and publish messages
_ = Task.Run(async () =>
{
    await foreach (var (topic, payload) in channel.Reader.ReadAllAsync())
    {
        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(payload)
            .WithQualityOfServiceLevel(ServiceQuality)
            .Build();

        await publisherClient.PublishAsync(message);
    }
});

// Once per second: print the current stats, then queue the next batch of messages
var timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
    Console.WriteLine($"Received Msg Count: {mainReceiverClient.ReceivedMsgCount}");
    Console.WriteLine($"PUBLISHER CHANNEL SIZE: {channel.Reader.Count}");
    for (int i = 0; i < MsgPerSecond; i++)
    {
        channel.Writer.TryWrite(("client/main", "Hello World"));
    }
};
timer.Start();

// Keep the process alive until Enter is pressed
Console.ReadLine();

// Test subscriber: connects, subscribes to "client/{clientId}" and counts received messages
class TestClient
{
    private readonly IMqttClient _client;
    private readonly MqttClientOptions _options;
    private readonly MqttQualityOfServiceLevel _serviceQuality;

    // Written by the client's receive handler and read by the timer thread
    private int _receivedMsgCount;

    public int ReceivedMsgCount => Volatile.Read(ref _receivedMsgCount);

    public TestClient(IMqttClient client, string clientId, MqttQualityOfServiceLevel serviceQuality)
    {
        _client = client;
        _serviceQuality = serviceQuality;
        _options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost")
            .WithClientId(clientId)
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(10))
            .WithCleanSession()
            .Build();
    }

    public async Task StartAsync()
    {
        // Register the handler before connecting so no message is missed
        _client.ApplicationMessageReceivedAsync += e =>
        {
            Interlocked.Increment(ref _receivedMsgCount);
            return Task.CompletedTask;
        };
        await _client.ConnectAsync(_options);
        await _client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
            .WithTopicFilter($"client/{_options.ClientId}", _serviceQuality)
            .Build());
    }
}
