-
Notifications
You must be signed in to change notification settings - Fork 1
IntersectionSeq
The IntersectionSeq[K,V] class is used to create an intersection of multiple sequences. These streams are joined by their keys and only keys which are present in all the sequences are included. The values in the intersection stream is a list of all the values associate with the same key.
Because of the parallel nature of the algorithm used, the conditional tailrec pattern (see Recursion) had to be extended. So it is worth looking at the implementation. But first lets see how to use IntersectionSeq.
Synchronized test code.
val seqs = new java.util.ArrayList[Sequence[Int, Int]]
val range = Range(1, 200)
seqs.add(new FilterSeq(range, (x: Int) => x % 2 == 0))
seqs.add(new FilterSeq(range, (x: Int) => x % 3 == 0))
seqs.add(new FilterSeq(range, (x: Int) => x % 5 == 0))
val intersection = new IntersectionSeq(null, seqs)
Future(intersection, Loop((key: Int, value: java.util.List[Int]) => println(key+" "+value)))
Output.
30 [30, 30, 30]
60 [60, 60, 60]
90 [90, 90, 90]
120 [120, 120, 120]
150 [150, 150, 150]
180 [180, 180, 180]
Asynchronous test code.
val seqs = new java.util.ArrayList[Sequence[Int, Int]]
val range2 = new Range(new Mailbox, null, 1, 200)
val range3 = new Range(new Mailbox, null, 1, 200)
val range5 = new Range(new Mailbox, null, 1, 200)
seqs.add(new FilterSeq(range2, (x: Int) => x % 2 == 0))
seqs.add(new FilterSeq(range3, (x: Int) => x % 3 == 0))
seqs.add(new FilterSeq(range5, (x: Int) => x % 5 == 0))
val intersection = new IntersectionSeq(new Mailbox, seqs)
Future(intersection, Loop((key: Int, value: java.util.List[Int]) => println(key+" "+value)))
(Output is the same as the above.)
##Cursor
IntersectionSeq uses the Cursor class as a wrapper for the sequences it operates on. The Cursor class tracks the last KVPair result returned. Cursor objects are also comparable based on the key of the last returned KVPair.
Cursor objects are created with the same mailbox as the IntersectionSeq object, allowing the IntersectionSeq object to directly access the state of the Cursor objects.
Because the IntersectionSeq object depends on the state of the Cursor objects, it is not safe for actors with different mailboxes to use the same IntersectionSeq object.