分區(qū)(Partition)是一種針對并行、并發(fā)作業(yè)的 Stream
。
以響應式編程的方式編寫的功能組件,有一個重要的方面就是它的工作可以審慎的切塊,交由任意調(diào)度器完成。這意味著你可以很輕松的將輸入值組合沖一個工作流——在另一個線程執(zhí)行操作,然后當結(jié)果可用時將其交給子序列,完成轉(zhuǎn)化。這是 Reactor 很常見的使用模式。
DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "groupByPool");
DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "partitionPool");
Streams
.range(1, 10)
.groupBy(n -> n % 2 == 0) //1
.flatMap(stream -> stream
.dispatchOn(supplier1.get()) //2
.log("groupBy")
)
.partition(5) //3
.flatMap(stream -> stream
.dispatchOn(supplier2.get()) //4
.log("partition")
)
.dispatchOn(Environment.sharedDispatcher()) //5
.log("join")
.consume();
onNext(T)
信號分發(fā)給匹配的數(shù)據(jù)流。GroupByAction
,為兩個正在發(fā)送的 Stream
添加一個已經(jīng)生成好的調(diào)度器。通過像這樣使用分配于各自調(diào)度器的兩個分區(qū),數(shù)據(jù)流得到了有效的擴充。FlatMap
將合并兩個分區(qū)的返回值,這個過程運行在兩個線程之一,但絕不會并行處理。onNext(T)
信號以循環(huán)的方式分發(fā)給它們。Environment.sharedDispatcher()
而不是前兩個線程池分派數(shù)據(jù)。 五個線程將在 Dispatcher
線程合并。提取輸出
03:53:42.060 [groupByPool-3] INFO groupBy - onNext: 4
03:53:42.060 [partitionPool-8] INFO partition - onNext: 9
03:53:42.061 [groupByPool-3] INFO groupBy - onNext: 6
03:53:42.061 [partitionPool-8] INFO partition - onNext: 4
03:53:42.061 [shared-1] INFO join - onNext: 9
03:53:42.061 [groupByPool-3] INFO groupBy - onNext: 8
03:53:42.061 [partitionPool-4] INFO partition - onNext: 6
03:53:42.061 [shared-1] INFO join - onNext: 4
03:53:42.061 [groupByPool-3] INFO groupBy - onNext: 10
03:53:42.061 [shared-1] INFO join - onNext: 6
03:53:42.061 [groupByPool-3] INFO groupBy - complete: DispatcherAction
http://wiki.jikexueyuan.com/project/reactor-2.0/images/41.png" alt="" />