1 - 核心概念概述

核心概念概述

核心概念

Flux 和 Mono

Flux 和 Mono 是 Reactor 中的两个基本概念。

  • Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。

  • Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。

Flux 和 Mono 之间可以进行转换:

  • 对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象
  • 把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

2 - Publisher

Reactor Publisher生产者

2.1 - Publisher介绍

Publisher介绍

由于响应流的特点,我们不能再返回一个简单的POJO对象来表示结果了。必须返回一个类似Java中的Future的概念,在有结果可用时通知消费者进行消费响应。

Reactive Stream规范中这种被定义为Publisher<T>Publisher<T>是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>的需求推送元素。一个Publisher<T>可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。下面这个Excel计算就能说明一些Publisher<T>的特点。

A1-A9就可以看做Publisher<T>及其提供的元素序列。A10-A13分别是求和函数SUM(A1:A9)、平均函数AVERAGE(A1:A9)、最大值函数MAX(A1:A9)、最小值函数MIN(A1:A9),可以看作订阅者Subscriber。假如说我们没有A10-A13,那么A1-A9就没有实际意义,它们并不产生计算。这也是响应式的一个重要特点:当没有订阅时发布者什么也不做

FluxMono都是Publisher<T>Reactor 3实现。Publisher<T>提供了subscribe方法,允许消费者在有结果可用时进行消费。如果没有消费者Publisher<T>不会做任何事情,他根据消费情况进行响应。 Publisher<T>可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型MonoFlux

参考资料

3 - Flux

Reactor Flux

3.1 - Flux介绍

Flux介绍

Flux 类似 RaxJava 的 Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。

Flux 是一个发出(emit) 0-N 个元素组成的异步序列的 Publisher<T>,可以被 onComplete 信号或者 onError 信号所终止。在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和 onError

下图表示了Flux的抽象模型:

~

参考资料

3.2 - 创建Flux

创建Flux

有多种不同的方式可以创建 Flux 序列。

静态方法

通过 Flux 类中的静态方法:

  • just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
  • fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
  • empty():创建一个不包含任何元素,只发布结束消息的序列。
  • error(Throwable error):创建一个只包含错误消息的序列。
  • never():创建一个不包含任何消息通知的序列。
  • range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
  • interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
  • intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);

静态方法适合生成简单的序列,当需要复杂的逻辑时,则应该使用 generate() 或 create() 方法。

generate() 方法

generate() 方法通过同步和逐一的方式来产生 Flux 序列。

  • 同步是指序列的产生是通过调用所提供的 SynchronousSink 对象的 next(),complete()和 error(Throwable)方法来完成的。

  • 逐一生成的含义是在具体的生成逻辑中,next() 方法只能最多被调用一次。

Flux.generate(sink -> {
    sink.next("Hello");
    sink.complete();
}).subscribe(System.out::println);

有状态系列的生成

在有些情况下,序列的生成可能是有状态的,需要用到某些状态对象。此时可以使用 generate() 方法的另外一种形式 generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator),其中 stateSupplier 用来提供初始的状态对象。在进行序列生成时,状态对象会作为 generator 使用的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改以供下一次生成时使用。

final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
    int value = random.nextInt(100);
    list.add(value);
    sink.next(value);
    if (list.size() == 10) {
        sink.complete();
    }
    return list;
}).subscribe(System.out::println);

create()方法

create()方法与 generate()方法的不同之处在于所使用的是 FluxSink 对象。

FluxSink 支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。下面的代码在一次调用中就产生了全部的 10 个元素:

Flux.create(sink -> {
    for (int i = 0; i < 10; i++) {
        sink.next(i);
    }
    sink.complete();
}).subscribe(System.out::println);

4 - Mono

Reactor Mono

4.1 - Mono介绍

Mono介绍

Mono 最多只触发一个事件,它跟 RxJava 的 Single Maybe 类似,所以可以把 Mono用于在异步任务完成时发出通知。

Mono 是一个发出(emit)0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。

参考资料

4.2 - 创建Mono

创建Mono

静态方法

Mono 的创建方式与 Flux 比较相似。Mono 类中也包含了一些与 Flux 类中相同的静态方法。这些方法包括 just(),empty(),error()和 never()等。

除了这些方法之外,Mono 还有一些独有的静态方法:

  • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。

  • delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。

  • ignoreElements(Publisher source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。

  • justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。

代码示例:

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

create()

可以通过 create()方法来使用 MonoSink 来创建 Mono:

Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

5 - Subscribe

Reactor Subscribe订阅者

5.1 - Subscribe介绍

Subscribe介绍

6 - Operator

Reactor Operator操作符

6.1 - Operator介绍

Operator介绍

和 RxJava 一样,Reactor 的强大之处在于可以在反应式流上通过声明式的方式添加多种不同的操作符。

6.2 - buffer操作符

buffer 操作符缓冲流中的元素

buffer

buffer 操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。

在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔

方法 buffer()仅使用一个条件,而 bufferTimeout()可以同时指定两个条件。

// 输出的是 5 个包含 20 个元素的数组
Flux.range(1, 100).buffer(20).subscribe(System.out::println);

bufferTimeout

指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()或 bufferTimeoutMillis()两个方法。

// 输出的是 2 个包含了 10 个元素的数组
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

需要注意的是,这里的代码首先通过 toStream()方法把 Flux 序列转换成 Java 8 中的 Stream 对象,再通过 forEach()方法来进行输出。这是因为序列的生成是异步的,而转换成 Stream 对象可以保证主线程在序列生成完成之前不会退出,从而可以正确地输出序列中的所有元素。

bufferUntil 和 bufferWhile

除了元素数量和时间间隔之外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。

这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象:

  • bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;

    // 输出的是 5 个包含 2 个元素的数组
    // 每当遇到一个偶数就会结束当前的收集
     Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
    
  • bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。

    // 第四行语句输出的是 5 个包含 1 个元素的数组
    // 数组里面包含的只有偶数。
     Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
    

6.3 - filter操作符

filter操作符对流中的元素进行过滤

filter操作符对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素。

下面代码的输出的是 1 到 10 中的所有偶数。

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

6.4 - window操作符

window 操作符把流中的元素收集到另外的 Flux 序列中

window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux<flux>

下面的示例代码,输出结果 5 个字符。这是因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,

Flux.range(1, 100).window(20).subscribe(System.out::println);

而下面的代码的输出结果则是 2 个 UnicastProcessor 字符:

Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

6.5 - reduce操作符

reduce 操作符对流中包含的所有元素进行累积操作

reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。

累积操作是通过 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

下面的示例代码中对流中的元素进行相加操作,结果为 5050;

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);

下面的示例代码中也是同样的相加操作,不过通过 Supplier 给出了初始值 100,所以结果为 5050 + 100 = 5150;

Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

6.6 - zipWith操作符

zipWith 操作符以一对一的方式合并两个流中的元素

zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。

合并方式:不做处理

在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;

Flux.just("a", "b")
   .zipWith(Flux.just("c", "d"))
   .subscribe(System.out::println);

两个流中包含的元素分别是 a,b 和 c,d。这里 zipWith 操作符没有使用合并函数,因此结果流中的元素类型为 Tuple2。

合并方式:使用 BiFunction

zipWith 操作符也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。

Flux.just("a", "b")
   .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
   .subscribe(System.out::println);

两个流中包含的元素分别是 a,b 和 c,d。zipWith 操作通过合并函数把元素类型变为 String。

6.7 - take操作符

take 操作符从流中提取元素

take 系列操作符用来从当前流中提取元素。

提取的方式有

  • take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的数量或时间间隔来提取。

    // 输出的是数字 1 到 10
    Flux.range(1, 1000).take(10).subscribe(System.out::println);
    
  • takeLast(long n):提取流中的最后 N 个元素。

    // 输出的是数字 991 到 1000
    Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
    
  • takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。

    // 输出的是数字 1 到 10
    // 使得 Predicate 返回 true 的元素也是包含在内的。
    Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
    
  • takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。

    // 输出的是数字 1 到 9
    Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
    
  • takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素。

6.8 - merge操作符

merge 操作符将多个流合并成一个Flux序列

merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。

不同之处在于 :

  • merge 按照所有流中元素的实际产生顺序来合并
  • mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。

进行合并的流都是每隔 100 毫秒产生一个元素,不过第二个流中的每个元素的产生都比第一个流要延迟 50 毫秒。

Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
        .toStream()
        .forEach(System.out::println);

在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起。

Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
        .toStream()
        .forEach(System.out::println);

而使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素。

6.9 - map操作符

map操作符转换流中的每个元素

TODO

6.10 - flatMap操作符

flatMap 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的:

  • flatMap 按照所有流中元素的实际产生顺序来合并
  • flatMapSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
Flux.just(5, 10)
        .flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
        .toStream()
        .forEach(System.out::println);

上面的代码中,流中的元素被转换成每隔 100 毫秒产生的数量不同的流,再进行合并。由于第一个流中包含的元素数量较少,所以在结果流中一开始是两个流的元素交织在一起,然后就只有第二个流中的元素。

TODO:没理解,后面代码验证

6.11 - concatMap操作符

concatMap 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。

与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。

Flux.just(5, 10)
        .concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
        .toStream()
        .forEach(System.out::println);

上面的代码中,只不过把 flatMap 换成了 concatMap,结果流中依次包含了第一个流和第二个流中的全部元素。

TODO:没理解,后面代码验证

6.12 - combineLatest操作符

combineLatest 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

combineLatest 操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。

只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。

Flux.combineLatest(
        Arrays::toString,
        Flux.intervalMillis(100).take(5),
        Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);

上面的代码中,流中最新产生的元素会被收集到一个数组中,通过 Arrays.toString 方法来把数组转换成 String。

TODO:没理解,后面代码验证

6.13 - retry操作符

retry 操作符进行重试

当出现错误时,可以通过 retry 操作符来进行重试。

重试的动作是通过重新订阅序列来实现的。在使用 retry 操作符时可以指定重试的次数。

下面的代码指定了重试次数为 1,所输出的结果是 1,2,1,2 和错误信息。

Flux.just(1, 2)
        .concatWith(Mono.error(new IllegalStateException()))
        .retry(1)
        .subscribe(System.out::println);

6.14 - log操作符

log 操作符记录事件

log 操作符将流相关的事件记录在日志中。

下面的代码添加了 log 操作符并指定了日志分类的名称:

Flux.range(1, 2).log("Range").subscribe(System.out::println);

在实际的运行时,所产生的输出如下所示:

13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
13:07:56.753 [main] INFO Range - | request(unbounded)
13:07:56.754 [main] INFO Range - | onNext(1)
1
13:07:56.754 [main] INFO Range - | onNext(2)
2
13:07:56.754 [main] INFO Range - | onComplete()

7 - Scheduler

Reactor Scheduler调度器

7.1 - Scheduler介绍

Scheduler介绍