Added a bounded Async writer that prevents oom errors#174
Added a bounded Async writer that prevents oom errors#174Kaldie wants to merge 1 commit intomtth:masterfrom
Conversation
|
@mtth can you have a look? |
| if buffersize is None: | ||
| return AsyncWriter(consumer) | ||
| else: | ||
| return BoundedAsyncWriter(consumer, buffer_size=buffersize) |
There was a problem hiding this comment.
buffersize is an HDFS argument, better not to overload its use here. Let's introduce a separate argument instead.
| self.exception = kwargs.get("exception") | ||
|
|
||
|
|
||
| def wrapped_consumer(asyncWriter, data): |
There was a problem hiding this comment.
Let's make this private and use Python naming conventions (recommend shortening asyncWriter to writer).
|
|
||
|
|
||
| def wrapped_consumer(asyncWriter, data): | ||
| """Wrapped consumer that lets us get a child's exception.""" |
There was a problem hiding this comment.
This needs to be updated now that the function is top-level.
| except RuntimeError: | ||
| pass |
There was a problem hiding this comment.
This is a big code smell. Is there a way to avoid this? Locks are best used via with, perhaps it can help us here.
| """A Bounded asynchronous publisher-consumer. | ||
|
|
||
| :param consumer: Function which takes a single generator as argument. | ||
| :param buffer_size: Number of entities that are buffered. When this number is exeeded, |
There was a problem hiding this comment.
Please stick to 80 chars per line throughout.
| _logger.debug('Child terminated without errors.') | ||
| self._queue = None | ||
|
|
||
| def write(self, chunk): |
There was a problem hiding this comment.
Much of the logic in this class is identical to the original writer, is there a way to consolidate?
Solves the issue reported in #172
Another solution was to bound the queue, however that would be very unbalanced if the size writes were not equal. The downside of this that the write needs to have a len(). However that