1 - Operator介绍

Operator介绍

和 RxJava 一样,Reactor 的强大之处在于可以在反应式流上通过声明式的方式添加多种不同的操作符。

2 - buffer操作符

buffer 操作符缓冲流中的元素

buffer

buffer 操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。

在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔

方法 buffer()仅使用一个条件,而 bufferTimeout()可以同时指定两个条件。

// 输出的是 5 个包含 20 个元素的数组
Flux.range(1, 100).buffer(20).subscribe(System.out::println);

bufferTimeout

指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()或 bufferTimeoutMillis()两个方法。

// 输出的是 2 个包含了 10 个元素的数组
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

需要注意的是,这里的代码首先通过 toStream()方法把 Flux 序列转换成 Java 8 中的 Stream 对象,再通过 forEach()方法来进行输出。这是因为序列的生成是异步的,而转换成 Stream 对象可以保证主线程在序列生成完成之前不会退出,从而可以正确地输出序列中的所有元素。

bufferUntil 和 bufferWhile

除了元素数量和时间间隔之外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。

这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象:

  • bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;

    // 输出的是 5 个包含 2 个元素的数组
    // 每当遇到一个偶数就会结束当前的收集
     Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
    
  • bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。

    // 第四行语句输出的是 5 个包含 1 个元素的数组
    // 数组里面包含的只有偶数。
     Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
    

3 - filter操作符

filter操作符对流中的元素进行过滤

filter操作符对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素。

下面代码的输出的是 1 到 10 中的所有偶数。

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

4 - window操作符

window 操作符把流中的元素收集到另外的 Flux 序列中

window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux<flux>

下面的示例代码,输出结果 5 个字符。这是因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,

Flux.range(1, 100).window(20).subscribe(System.out::println);

而下面的代码的输出结果则是 2 个 UnicastProcessor 字符:

Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

5 - reduce操作符

reduce 操作符对流中包含的所有元素进行累积操作

reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。

累积操作是通过 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

下面的示例代码中对流中的元素进行相加操作,结果为 5050;

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);

下面的示例代码中也是同样的相加操作,不过通过 Supplier 给出了初始值 100,所以结果为 5050 + 100 = 5150;

Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

6 - zipWith操作符

zipWith 操作符以一对一的方式合并两个流中的元素

zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。

合并方式:不做处理

在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;

Flux.just("a", "b")
   .zipWith(Flux.just("c", "d"))
   .subscribe(System.out::println);

两个流中包含的元素分别是 a,b 和 c,d。这里 zipWith 操作符没有使用合并函数,因此结果流中的元素类型为 Tuple2。

合并方式:使用 BiFunction

zipWith 操作符也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。

Flux.just("a", "b")
   .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
   .subscribe(System.out::println);

两个流中包含的元素分别是 a,b 和 c,d。zipWith 操作通过合并函数把元素类型变为 String。

7 - take操作符

take 操作符从流中提取元素

take 系列操作符用来从当前流中提取元素。

提取的方式有

  • take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的数量或时间间隔来提取。

    // 输出的是数字 1 到 10
    Flux.range(1, 1000).take(10).subscribe(System.out::println);
    
  • takeLast(long n):提取流中的最后 N 个元素。

    // 输出的是数字 991 到 1000
    Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
    
  • takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。

    // 输出的是数字 1 到 10
    // 使得 Predicate 返回 true 的元素也是包含在内的。
    Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
    
  • takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。

    // 输出的是数字 1 到 9
    Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
    
  • takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素。

8 - merge操作符

merge 操作符将多个流合并成一个Flux序列

merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。

不同之处在于 :

  • merge 按照所有流中元素的实际产生顺序来合并
  • mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。

进行合并的流都是每隔 100 毫秒产生一个元素,不过第二个流中的每个元素的产生都比第一个流要延迟 50 毫秒。

Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
        .toStream()
        .forEach(System.out::println);

在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起。

Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
        .toStream()
        .forEach(System.out::println);

而使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素。

9 - map操作符

map操作符转换流中的每个元素

TODO

10 - flatMap操作符

flatMap 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的:

  • flatMap 按照所有流中元素的实际产生顺序来合并
  • flatMapSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
Flux.just(5, 10)
        .flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
        .toStream()
        .forEach(System.out::println);

上面的代码中,流中的元素被转换成每隔 100 毫秒产生的数量不同的流,再进行合并。由于第一个流中包含的元素数量较少,所以在结果流中一开始是两个流的元素交织在一起,然后就只有第二个流中的元素。

TODO:没理解,后面代码验证

11 - concatMap操作符

concatMap 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。

与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。

Flux.just(5, 10)
        .concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
        .toStream()
        .forEach(System.out::println);

上面的代码中,只不过把 flatMap 换成了 concatMap,结果流中依次包含了第一个流和第二个流中的全部元素。

TODO:没理解,后面代码验证

12 - combineLatest操作符

combineLatest 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

combineLatest 操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。

只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。

Flux.combineLatest(
        Arrays::toString,
        Flux.intervalMillis(100).take(5),
        Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);

上面的代码中,流中最新产生的元素会被收集到一个数组中,通过 Arrays.toString 方法来把数组转换成 String。

TODO:没理解,后面代码验证

13 - retry操作符

retry 操作符进行重试

当出现错误时,可以通过 retry 操作符来进行重试。

重试的动作是通过重新订阅序列来实现的。在使用 retry 操作符时可以指定重试的次数。

下面的代码指定了重试次数为 1,所输出的结果是 1,2,1,2 和错误信息。

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);

14 - log操作符

log 操作符记录事件

log 操作符将流相关的事件记录在日志中。

下面的代码添加了 log 操作符并指定了日志分类的名称:

Flux.range(1, 2).log("Range").subscribe(System.out::println);

在实际的运行时,所产生的输出如下所示:

13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
13:07:56.753 [main] INFO Range - | request(unbounded)
13:07:56.754 [main] INFO Range - | onNext(1)
1
13:07:56.754 [main] INFO Range - | onNext(2)
2
13:07:56.754 [main] INFO Range - | onComplete()