Skip to content

Commit 9ea38fd

Browse files
author
Roman Janusz
committed
fixed timeout handling in ObservableBlockingIterator
1 parent 81cb991 commit 9ea38fd

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

commons-core/jvm/src/main/scala/com/avsystem/commons/concurrent/ObservableBlockingIterator.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package com.avsystem.commons
22
package concurrent
33

4-
import java.util.concurrent.ArrayBlockingQueue
54
import com.avsystem.commons.collection.CloseableIterator
65
import monix.execution.{Ack, Scheduler}
76
import monix.reactive.Observable
87
import monix.reactive.observers.Subscriber
98

9+
import java.util.concurrent.ArrayBlockingQueue
1010
import scala.annotation.nowarn
11-
import scala.concurrent.blocking
12-
import scala.concurrent.duration.TimeUnit
11+
import scala.concurrent.duration.{FiniteDuration, TimeUnit}
12+
import scala.concurrent.{TimeoutException, blocking}
1313

1414
/**
1515
* An `Iterator` backed by a `BlockingQueue` backed by an `Observable`.
@@ -33,18 +33,19 @@ class ObservableBlockingIterator[T](
3333
private val cancelable = observable.subscribe(this)
3434

3535
def onNext(elem: T): Future[Ack] = {
36+
val safeElem = if (elem.asInstanceOf[AnyRef] eq null) Null else elem
3637
// checking size is safe because only `onNext/onError/onComplete` add to the queue
3738
// and they are guaranteed to be invoked sequentially
3839
if (queue.remainingCapacity > 1) {
3940
// there's more than one spot in the queue, add this element and acknowledge immediately
40-
queue.add(elem)
41+
queue.add(safeElem)
4142
Ack.Continue
4243
} else {
4344
// not sure if there's more than one spot in the queue - add the element but return a Promise-backed Future of acknowledgement
4445
// NOTE: the Observable protocol guarantees that `onNext/onError/onComplete` is never called when the queue is full
4546
val promise = Promise[Ack]()
4647
ackPromise = promise
47-
queue.add(elem)
48+
queue.add(safeElem)
4849
// must use promise from local val because `fetchNext()` may have already erased `ackPromise`
4950
promise.future
5051
}
@@ -58,7 +59,10 @@ class ObservableBlockingIterator[T](
5859

5960
private def fetchNext(): Any = last match {
6061
case Empty =>
61-
last = blocking(queue.poll(timeout, unit))
62+
blocking(queue.poll(timeout, unit)) match {
63+
case null => throw new TimeoutException(s"timed out after ${FiniteDuration(timeout, unit)}")
64+
case elem => last = elem
65+
}
6266
val promise = ackPromise
6367
// after the queue got full, wait until at least half of its capacity is free before letting
6468
// the Observable produce more elements
@@ -81,6 +85,9 @@ class ObservableBlockingIterator[T](
8185
def next(): T = fetchNext() match {
8286
case Complete => throw new NoSuchElementException
8387
case Failed(cause) => throw cause
88+
case Null =>
89+
last = Empty
90+
null.asInstanceOf[T]
8491
case value: T@unchecked =>
8592
last = Empty
8693
value
@@ -91,6 +98,7 @@ class ObservableBlockingIterator[T](
9198
}
9299
object ObservableBlockingIterator {
93100
private object Empty
101+
private object Null
94102
private object Complete
95103
private case class Failed(ex: Throwable)
96104
}

0 commit comments

Comments
 (0)