multiprocessing.Queue() causes memory buildup when using a ProcessSubscriptionProducer with low latency #40

@henriksod

Hi, I noticed memory building up when using ProcessSubscriptionProducer with a low-latency source, i.e. one where self._put_nowait(data) is called in rapid succession. The cause is that self._producerQueue.get(timeout=self._joinTimeout) in __queueReader cannot keep up with the puts, so items accumulate in the queue.

I found this Stack Overflow thread:
https://stackoverflow.com/questions/47085458/why-is-multiprocessing-queue-get-so-slow

The answers there suggest using multiprocessing.Manager().Queue() instead. I tried this and it worked noticeably better: in my setup, get calls are much faster with multiprocessing.Manager().Queue() than with multiprocessing.Queue().
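
To make the swap concrete, here is a minimal sketch of how the queue type could be chosen. The helper name make_queue and the use_manager flag are hypothetical and not part of the library; I have not checked how ProcessSubscriptionProducer creates its queue internally, so treat this as an illustration only.

```python
import multiprocessing as mp


def make_queue(use_manager=True):
    """Hypothetical helper: return (queue, manager).

    The manager is returned alongside the queue because the proxy queue
    only works while the manager's server process is alive.
    """
    if use_manager:
        manager = mp.Manager()          # starts a separate server process
        return manager.Queue(), manager
    return mp.Queue(), None             # plain pipe-backed queue


if __name__ == "__main__":
    q, manager = make_queue(use_manager=True)
    q.put({"value": 1})
    print(q.get(timeout=1.0))
    if manager is not None:
        manager.shutdown()              # stop the server process explicitly
```

Both queue types expose put, get, put_nowait and get_nowait, so the reader and writer code should not need to change. Whether the manager-backed queue is faster will depend on the workload, so it is worth measuring before switching.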

In my case I also had to clear the queue after each get, because memory still built up, only much more slowly than before. Would you consider supporting behaviour similar to MostRecentSubscription for the inter-process communication inside ProcessSubscriptionProducer?
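
Roughly, the "clear after each get" workaround I used looks like the sketch below. The function name get_most_recent is my own and hypothetical; it only assumes the queue offers get and get_nowait and raises queue.Empty, which holds for queue.Queue, multiprocessing.Queue and the Manager queue proxy.

```python
import queue


def get_most_recent(q, timeout=None):
    """Hypothetical helper: wait for one item, then drain the queue.

    Returns the newest item that was available and discards older ones.
    Raises queue.Empty if nothing arrives within `timeout`.
    """
    latest = q.get(timeout=timeout)
    while True:
        try:
            latest = q.get_nowait()     # overwrite with anything newer
        except queue.Empty:
            return latest


if __name__ == "__main__":
    q = queue.Queue()
    for i in range(5):
        q.put(i)
    print(get_most_recent(q, timeout=1.0))
```

This deliberately drops intermediate items, so it only fits consumers that care about the latest value. With multiprocessing.Queue, get_nowait can report the queue as empty while items are still in transit, so a few entries may survive until the next call.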
