介绍Reactor学习笔记的基本资料和访问方式
Reactor学习笔记
- 1: 介绍
- 1.1: Reactor介绍
- 1.2: Reactor的历史
- 1.3: 资料收集
- 2: 核心概念
- 2.1: 核心概念概述
- 2.2: Publisher
- 2.2.1: Publisher介绍
- 2.3: Flux
- 2.4: Mono
- 2.5: Subscribe
- 2.5.1: Subscribe介绍
- 2.6: Operator
- 2.6.1: Operator介绍
- 2.6.2: buffer操作符
- 2.6.3: filter操作符
- 2.6.4: window操作符
- 2.6.5: reduce操作符
- 2.6.6: zipWith操作符
- 2.6.7: take操作符
- 2.6.8: merge操作符
- 2.6.9: map操作符
- 2.6.10: flatMap操作符
- 2.6.11: concatMap操作符
- 2.6.12: combineLatest操作符
- 2.6.13: retry操作符
- 2.6.14: log操作符
- 2.7: Scheduler
- 2.7.1: Scheduler介绍
1 - 介绍
1.1 - Reactor介绍
静态网站生成器,专注内容,快速创作。
Reactor是什么?
Reactor 框架是 Pivotal 公司开发的,实现了 Reactive Programming 思想,符合 Reactive Streams 规范(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司发起的)的一项开源项目。
Reactor 是完全基于反应式流规范设计和实现的库,是 Spring 5 中反应式编程的基础。
Reactor 官方的描述:
Reactive Streams based projects for backpressure-ready asynchronous message passing.
Reactor 的主要模块
Reactor 框架主要有两个主要的模块:
- reactor-core:负责 Reactive Programming 相关的核心 API 的实现
- reactor-ipc:负责高性能网络通信的实现,目前是基于 Netty 实现的。
Reactor 的主要类
在 Reactor 中,经常使用的类并不是很多,主要有以下两个:
Mono
实现了org.reactivestreams.Publisher
接口,代表0到1个元素的发布者。Flux
同样实现了org.reactivestreams.Publisher
接口,代表0到N个元素的发表者。
可能会使用到的类
Scheduler
表示背后驱动反应式流的调度器,通常由各种线程池实现。
1.2 - Reactor的历史
静态网站生成器,专注内容,快速创作。
背景:响应式编程
响应式编程(Reactive Programming)是一种新的编程范式,中文称响应式(或反应式)编程,是一种高性能应用的编程方式。其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术。在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在大家比较熟知的 Hystrix 就是以 RxJava 为基础开发的。
响应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。
比如求值一个简单的表达式 c=a+b,当 a 或者 b 的值发生变化时,传统的编程范式需要对 a+b 进行重新计算来得到 c 的值。如果使用反应式编程,当 a 或者 b 的值发生变化时,c 的值会自动更新。
在传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式。每次由调用者通过 next()方法来获取序列中的下一个值。使用反应式流时采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。
响应式编程的基础
响应式编程的基础:
- 数据流的概念
- Observable 类和它的各种操作
- 通过工厂方法创建静态和动态的Observable 对象
Observable 是事件的源头,Observer 提供了一组简单的接口,并通过订阅事件源来消费 Observable 的事件。Observable 通过 onNext 向 Observer 通知事件的到达,后面可能会跟上 onError 或 onComplete 来表示事件的结束。
反应式编程其实并不神秘,通过与我们熟悉的迭代器模式对比便可了解其基本思想:
event | Iterable (pull) | Observable (push) |
---|---|---|
retrieve data | T next() |
onNext(T) |
discover error | throws Exception |
onError(Exception) |
complete | !hasNext() |
onCompleted() |
上面表格的中的 Observable 那一列便代表反应式编程的 API 使用方式。可见,它就是常见的观察者模式的一种延伸。如果将迭代器看作是拉模式,那观测者模式便是推模式。被订阅者(Publisher)主动的推送数据给订阅者(Subscriber),触发 onNext
方法。异常和完成时触发另外两个方法。
回压(Backpressure)
反应式流中第一个重要概念是回压(backpressure)。在基本的消息推送模式中,当消息发布者产生数据的速度过快时,会使得消息订阅者的处理速度无法跟上产生的速度,从而给订阅者造成很大的压力。当压力过大时,有可能造成订阅者本身的奔溃,所产生的级联效应甚至可能造成整个系统的瘫痪。负压的作用在于提供一种从订阅者到生产者的反馈渠道。订阅者可以通过 request()方法来声明其一次所能处理的消息数量,而生产者就只会产生相应数量的消息,直到下一次 request()方法调用。这实际上变成了推拉结合的模式。
背压是反应流中的一个重要概念,可以理解为,生产者可以感受到消费者反馈的消费压力,并根据压力进行动态调整生产速率。形象点可以按照下面理解:
回压是 RS 规范和 Reactor 主要关注点之一(如果还有其它关注点的话)。回压的原理是说,在一个推送场景里,生产者的生产速度比消费者的消费速度快,消费者会向生产者发出信号说“嘿,慢一点,我处理不过来了。”生产者可以借机控制数据生成的速度,而不是抛弃数据或者冒着产生级联错误的风险继续生成数据。
你也许会想,在 Mono 里为什么也需要回压:什么样的消费者会被一个单独的触发事件压垮?答案是“应该不会有这样的消费者”。不过,在 Mono 和 CompletableFuture 工作原理之间仍然有一个关键的不同点。后者只有推送:如果你持有一个 Future 的引用,那么说明一个异步任务已经在执行了。另一方面,回压的 Flux 或 Mono 会启动延迟的拉取 - 推送迭代:
- 延迟是因为在调用 subscribe() 方法之前不会发生任何事情
- 拉取是因为在订阅和发出请求时,Subscriber 会向上游发出信号,准备拉取下一个数据块
- 接下来生产者向消费者推送数据,这些数据在消费者的请求范围之内
对 Mono 来说,subscribe() 方法就相当于一个按钮,按下它就等于说“我准备好接收数据了”。Flux 也有一个类似的按钮,不过它是 request(n) 方法,这个方法是 subscribe() 的一般化用法。
Mono 作为一个 Publisher,它往往代表着一个耗费资源的任务(在 IO、延迟等方面),意识到这点是理解回压的关键:如果不对其进行订阅,你就不需要为之付出任何代价。因为 Mono 经常跟具有回压的 Flux 一起被编排到一个响应式链上,来自多个异步数据源的结果有可能被组合到一起,这种按需触发的能力是避免阻塞的关键。
我们可以使用回压来区分 Mono 的不同使用场景,相比上述的例子,Mono 有另外一个常见的使用场景:把 Flux 的数据异步地聚合到 Mono 里。reduce 和 hasElement 可以消费 Flux 里的每一个元素,再把这些数据以某种形式聚合起来(分别是 reduce 函数的调用结果和一个 boolean 值),作为一个 Mono 对外暴露数据。在这种情况下,使用 Long.MAX_VALUE 向上游发出回压信号,上游会以完全推送的方式工作。
关于回压另一个有意思的话题是它如何对存储在内存里的流的对象数量进行限制。作为一个 Publisher,数据源很有可能出现生成数据缓慢的问题,而来自下游的请求超出了可用数据项。在这种情况下,整个流很自然地进入到推送模式,消费者会在有新数据到达时收到通知。当生产高峰来临,或者在生产速度加快的情况下,整个流又回到了拉取模式。在以上两种情况下,最多有 N 项数据(request() 请求的数据量)会被保留在内存里。
你可以对内存的使用情况进行更精确的推算,把 N 项数据跟每项数据需要消耗的内存 W 结合起来:这样你就可以推算出最多需要消耗 W*N 的内存。实际上,Reactor 在大多数情况下会根据 N 来做出优化:根据情况创建内部队列,并应用预取策略,每次自动请求 75% 的数据量。
Reactor 的操作有时候会根据它们所代表的语义和调用者的期望来改变回压信号。例如对于操作 buffer(10):下游请求 N 项数据,而这个操作会向上游请求 10N 的数据量,这样就可以填满缓冲区,为订阅者提供足够的数据。这通常被称为“主动式回压”,开发人员可以充分利用这种特性,例如在微批次场景里,可以显式地告诉 Reactor 该如何从一个输入源切换到一个输出地。
Imperative vs Reactive
对于 Iterable 和 Observale 两种风格,还有另一个称呼,便是 Imperative(指令式编程)和 Reactive(反应式编程)这两种风格。其实就是拉模型和推模型的另一种表述。
对于 Imperative,老外写的文章有时会用,直译就是指令式编程,其实就是我们大家平时用 Java、Python 等语言写代码的常见风格,代码执行顺序和编写顺序基本一致(这里不考虑 JVM 指令重排)
发展历程
Reactive Extensions (Rx) 库
反应式编程最早由 .NET 平台上的 Reactive Extensions (Rx) 库来实现。
后来迁移到 Java 平台之后就产生了著名的 RxJava 库,并产生了很多其他编程语言上的对应实现。在这些实现的基础上产生了后来的反应式流(Reactive Streams)规范。
RxJava
RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。
不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前,虽然可以和反应式流的接口进行转换,但是由于底层实现的原因,使用起来并不是很直观。
在 RxJava 1 里,只有少部分操作支持回压,RxJava 1 的 Observable 并没有实现 RS 里的任何类型,不过它有一些 RS 类型的适配器。可以说,RxJava 1 实际上比 RS 规范出现得更早,而且在 RS 规范设计期间,RxJava 1 充当了函数式工作者的角色。
正如“RxJava 实例解析”里所说的,从设计概念方面来看,RxJava 有点类似 Java 8 Steams API。
Reactive Streams 规范
在 Java 平台上,Netflix(开发了 RxJava)、TypeSafe(开发了 Scala、Akka)、Pivatol(开发了 Spring、Reactor)共同制定了一个被称为 Reactive Streams 项目(规范),用于制定反应式编程相关的规范以及接口。其主要的接口有这三个:
Publisher
Subscriber
Subcription
其中,Subcriber
中包含 onNext
、onError
、onCompleted
这三个方法。
该规范定义了反应式流的相关接口,并将集成到 Java 9 中。
Reactive Streams (简称为 RS)是“一种规范,它为基于非阻塞回压的异步流处理提供了标准”。它是一组包含了 TCK 工具套件和四个简单接口(Publisher、Subscriber、Subscription 和 Processor)的规范,这些接口将被集成到 Java 9.
RS 主要跟响应式回压(稍后会详细介绍)以及多个响应式事件源之间的交互操作有关。它并不提供任何操作方法,它只关注流的生命周期。
RxJava 2
RxJava 2 在 RxJava 的基础上做了很多的更新。
RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。
RxJava 2 是在 RS 规范之后出现的,所以它直接在 Flowable 类型里实现了 Publisher。不过除了 RS 类型,RxJava 2 还保留了 RxJava 1 的“遗留”类型(Observable、Completable 和 Single)并且引入了其它一些可选类型——Maybe。这些类型提供了不同的语义,不过它们并没有实现 RS 接口,这是它们的不足之处。
跟 RxJava 1 不一样,RxJava 2 的 Observable 不支持 RxJava 2 的回压协议(只有 Flowable 具备这个特性)。之所以这样设计是为了能够为一些场景提供一组丰富且流畅的 API,比如用户界面发出的事件,在这样的场景里是不需要用到回压的,而且也不可能用到。Completable、Single 和 Maybe 不需要支持回压,不过它们也提供了一组丰富的 API,而且在被订阅之前不会做任何事情。
Reactor
Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。
Reactor 是第四代响应式框架,跟RxJava 2 有些相似。Reactor 项目由Pivotal 启动,以响应式流规范、Java8 和ReactiveX 术语表为基础。它的设计是Reactor 2(上一个主要版本)和RxJava 核心贡献者共同努力的结果。
Reactor 不同于其它框架的最关键一点就是 RS。Flux 和 Mono 这两者都是 RS 的 Publisher 实现,它们都具备了响应式回压的特点。Mono 最多只触发一个事件,它跟 RxJava 的 Single 和 Maybe 类似,所以可以把 Mono用于在异步任务完成时发出通知。
因为这两种类型之间的简单区别,我们可以很容易地区分响应式 API 的类型:从返回的类型我们就可以知道一个方法会“发射并忘记”或“请求并等待”(Mono),还是在处理一个包含多个数据项的流(Flux)。
Flux 和 Mono 的一些操作利用了这个特点在这两种类型间互相转换。例如,调用 Flux的 single() 方法将返回一个 Mono,而使用 concatWith() 方法把两个 Mono 串在一起就可以得到一个 Flux。类似地,有些操作对 Mono 来说毫无意义(例如 take(n) 会得到 n>1 的结果),而有些操作只有作用在 Mono 上才有意义(例如 or(otherMono))。
Reactor 设计的原则之一是要保持 API 的精简,而对这两种响应式类型的分离,是表现力与 API 易用性之间的折中。
“使用响应式流,基于 Rx 构建”
正如“RxJava 实例解析”里所说的,从设计概念方面来看,RxJava 有点类似 Java 8 Steams API。而 Reactor 看起来有点像 RxJava,不过这决不只是个巧合。这样的设计是为了能够给复杂的异步逻辑提供一套原生的具有 Rx 操作风格的响应式流 API。所以说 Reactor 扎根于响应式流,同时在 API 方面尽可能地与 RxJava 靠拢。
在响应式领域,Reactor 变得愈加精益,它的 Mono 和 Flux 两种类型都实现了 Publisher,并且都支持回压。虽然把 Mono 作为一个 Publisher 需要付出一些额外的开销,不过 Mono 在其它方面的优势弥补了它的缺点。在后续部分我们将看到对 Mono 来说回压意味着什么。
Spring5
Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。
Reactor 是 Spring 整个生态系统的基础,特别是 Spring 5(通过 Spring Web Reactive)和 Spring Data “kay”(跟 spring-data-commons 2.0 相对应的)。
这两个项目的响应式版本是非常有用的,我们因此可以开发出完全响应式的 Web 应用:异步地处理请求,一直到数据库,最后异步地返回结果。Spring 应用因此可以更有效地利用资源,避免为每个请求单独分配一个线程,还要等待 I/O 阻塞。
Reactor 将被用于未来 Spring 应用的内部响应式核心组件,以及这些 Spring 组件暴露出来的 API。一般情况下,它们可以处理 RS Publisher,不过大多数时候它们要面对的是 Flux/Mono,需要用到 Reactor 的丰富特性。当然,你也可以自行选择其它响应式框架,Reactor 提供了可以用来适配其它 Reactor 类型和 RxJava 类型甚至简单的 RS 类型的钩子接口。
另外,虽然 Spring API 是以 Reactor 类型为基础的,不过在 Spring Web Reactive 模块里可以为请求和响应使用各种各样的响应式类型:
- Mono:作为 @RequestBody,请求实体 T 会被异步反序列化,之后的处理可以跟 Mono 关联起来。作为返回类型,每次 Mono 发出了一个值,T 就会被异步序列化并发回客户端。你可以把请求 Mono 作为参数,并把参数化了的关联处理作为结果 Mono 返回。
- Flux:在流场景里使用(作为 @RequestBody 使用的输入流以及包含了 Flux 返回类型的 Server Sent Events)。
- Single/Observable:分别对应 Mono 和 Flux,不过会切换回 RxJava。
- Mono 作为返回类型:在 Mono 结束时请求的处理也跟着完成。
- 非响应式返回类型(void 和 T):这个时候你的 @Controller 方法是同步的,不过它应该是非阻塞的(短暂的处理)。请求处理在方法执行完毕时结束,返回的 T 被异步地序列化并发回客户端。
参考资料
1.3 - 资料收集
官方资料
-
官方提供的学习和练习资料
社区资料
学习资料
-
Intro To Reactor Core: 2020
-
Java反应式框架Reactor中的Mono和Flux: 2020, 写mono和flux最好的文章之一
-
并发编程之reactor : 2018
-
使用 Reactor 进行反应式编程: 2017,文章有一点早,但内容不错
-
Reactor模式详解+源码实现: 2017
-
Spring Reactor 入门与实践: 2017
-
Reactor 实例解析: 2016,文章很长,很细致
2 - 核心概念
2.1 - 核心概念概述
核心概念
Flux 和 Mono
Flux 和 Mono 是 Reactor 中的两个基本概念。
-
Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。
-
Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。
Flux 和 Mono 之间可以进行转换:
- 对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象
- 把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。
2.2 - Publisher
2.2.1 - Publisher介绍
由于响应流的特点,我们不能再返回一个简单的POJO对象来表示结果了。必须返回一个类似Java中的Future
的概念,在有结果可用时通知消费者进行消费响应。
Reactive Stream规范中这种被定义为Publisher<T>
,Publisher<T>
是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>
的需求推送元素。一个Publisher<T>
可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。下面这个Excel计算就能说明一些Publisher<T>
的特点。
A1-A9就可以看做Publisher<T>
及其提供的元素序列。A10-A13分别是求和函数SUM(A1:A9)
、平均函数AVERAGE(A1:A9)
、最大值函数MAX(A1:A9)
、最小值函数MIN(A1:A9)
,可以看作订阅者Subscriber
。假如说我们没有A10-A13,那么A1-A9就没有实际意义,它们并不产生计算。这也是响应式的一个重要特点:当没有订阅时发布者什么也不做。
而Flux
和Mono
都是Publisher<T>
在Reactor 3实现。Publisher<T>
提供了subscribe
方法,允许消费者在有结果可用时进行消费。如果没有消费者Publisher<T>
不会做任何事情,他根据消费情况进行响应。 Publisher<T>
可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型Mono
和Flux
。
参考资料
2.3 - Flux
2.3.1 - Flux介绍
Flux 类似 RaxJava 的 Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。
Flux
是一个发出(emit) 0-N
个元素组成的异步序列的 Publisher<T>
,可以被 onComplete
信号或者 onError
信号所终止。在响应流规范中存在三种给下游消费者调用的方法 onNext
, onComplete
, 和 onError
。
下图表示了Flux的抽象模型:
~
参考资料
2.3.2 - 创建Flux
有多种不同的方式可以创建 Flux 序列。
静态方法
通过 Flux 类中的静态方法:
- just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
- fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
- empty():创建一个不包含任何元素,只发布结束消息的序列。
- error(Throwable error):创建一个只包含错误消息的序列。
- never():创建一个不包含任何消息通知的序列。
- range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
- interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
- intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
静态方法适合生成简单的序列,当需要复杂的逻辑时,则应该使用 generate() 或 create() 方法。
generate() 方法
generate() 方法通过同步和逐一的方式来产生 Flux 序列。
-
同步是指序列的产生是通过调用所提供的 SynchronousSink 对象的 next(),complete()和 error(Throwable)方法来完成的。
-
逐一生成的含义是在具体的生成逻辑中,next() 方法只能最多被调用一次。
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
有状态系列的生成
在有些情况下,序列的生成可能是有状态的,需要用到某些状态对象。此时可以使用 generate() 方法的另外一种形式 generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator)
,其中 stateSupplier 用来提供初始的状态对象。在进行序列生成时,状态对象会作为 generator 使用的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改以供下一次生成时使用。
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);
create()方法
create()方法与 generate()方法的不同之处在于所使用的是 FluxSink 对象。
FluxSink 支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。下面的代码在一次调用中就产生了全部的 10 个元素:
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);
2.4 - Mono
2.4.1 - Mono介绍
Mono 最多只触发一个事件,它跟 RxJava 的 Single 和 Maybe 类似,所以可以把 Mono用于在异步任务完成时发出通知。
Mono
是一个发出(emit)0-1
个元素的Publisher<T>
,可以被onComplete
信号或者onError
信号所终止。
参考资料
2.4.2 - 创建Mono
静态方法
Mono 的创建方式与 Flux 比较相似。Mono 类中也包含了一些与 Flux 类中相同的静态方法。这些方法包括 just(),empty(),error()和 never()等。
除了这些方法之外,Mono 还有一些独有的静态方法:
-
fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
-
delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
-
ignoreElements(Publisher source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
-
justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
代码示例:
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
create()
可以通过 create()方法来使用 MonoSink 来创建 Mono:
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
2.5 - Subscribe
2.5.1 - Subscribe介绍
2.6 - Operator
2.6.1 - Operator介绍
和 RxJava 一样,Reactor 的强大之处在于可以在反应式流上通过声明式的方式添加多种不同的操作符。
2.6.2 - buffer操作符
buffer
buffer 操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。
在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。
方法 buffer()仅使用一个条件,而 bufferTimeout()可以同时指定两个条件。
// 输出的是 5 个包含 20 个元素的数组
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
bufferTimeout
指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()或 bufferTimeoutMillis()两个方法。
// 输出的是 2 个包含了 10 个元素的数组
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
需要注意的是,这里的代码首先通过 toStream()方法把 Flux 序列转换成 Java 8 中的 Stream 对象,再通过 forEach()方法来进行输出。这是因为序列的生成是异步的,而转换成 Stream 对象可以保证主线程在序列生成完成之前不会退出,从而可以正确地输出序列中的所有元素。
bufferUntil 和 bufferWhile
除了元素数量和时间间隔之外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。
这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象:
-
bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;
// 输出的是 5 个包含 2 个元素的数组 // 每当遇到一个偶数就会结束当前的收集 Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
-
bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。
// 第四行语句输出的是 5 个包含 1 个元素的数组 // 数组里面包含的只有偶数。 Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
2.6.3 - filter操作符
filter操作符对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素。
下面代码的输出的是 1 到 10 中的所有偶数。
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
2.6.4 - window操作符
window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux<flux>
。
下面的示例代码,输出结果 5 个字符。这是因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,
Flux.range(1, 100).window(20).subscribe(System.out::println);
而下面的代码的输出结果则是 2 个 UnicastProcessor 字符:
Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);
2.6.5 - reduce操作符
reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。
累积操作是通过 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。
下面的示例代码中对流中的元素进行相加操作,结果为 5050;
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
下面的示例代码中也是同样的相加操作,不过通过 Supplier 给出了初始值 100,所以结果为 5050 + 100 = 5150;
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
2.6.6 - zipWith操作符
zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。
合并方式:不做处理
在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"))
.subscribe(System.out::println);
两个流中包含的元素分别是 a,b 和 c,d。这里 zipWith 操作符没有使用合并函数,因此结果流中的元素类型为 Tuple2。
合并方式:使用 BiFunction
zipWith 操作符也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
.subscribe(System.out::println);
两个流中包含的元素分别是 a,b 和 c,d。zipWith 操作通过合并函数把元素类型变为 String。
2.6.7 - take操作符
take 系列操作符用来从当前流中提取元素。
提取的方式有
-
take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的数量或时间间隔来提取。
// 输出的是数字 1 到 10 Flux.range(1, 1000).take(10).subscribe(System.out::println);
-
takeLast(long n):提取流中的最后 N 个元素。
// 输出的是数字 991 到 1000 Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
-
takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。
// 输出的是数字 1 到 10 // 使得 Predicate 返回 true 的元素也是包含在内的。 Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
-
takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。
// 输出的是数字 1 到 9 Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
-
takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素。
2.6.8 - merge操作符
merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。
不同之处在于 :
- merge 按照所有流中元素的实际产生顺序来合并
- mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
进行合并的流都是每隔 100 毫秒产生一个元素,不过第二个流中的每个元素的产生都比第一个流要延迟 50 毫秒。
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
.toStream()
.forEach(System.out::println);
在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起。
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
.toStream()
.forEach(System.out::println);
而使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素。
2.6.9 - map操作符
TODO
2.6.10 - flatMap操作符
flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。
flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的:
- flatMap 按照所有流中元素的实际产生顺序来合并
- flatMapSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
Flux.just(5, 10)
.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
上面的代码中,流中的元素被转换成每隔 100 毫秒产生的数量不同的流,再进行合并。由于第一个流中包含的元素数量较少,所以在结果流中一开始是两个流的元素交织在一起,然后就只有第二个流中的元素。
TODO:没理解,后面代码验证
2.6.11 - concatMap操作符
concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。
与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。
Flux.just(5, 10)
.concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
上面的代码中,只不过把 flatMap 换成了 concatMap,结果流中依次包含了第一个流和第二个流中的全部元素。
TODO:没理解,后面代码验证
2.6.12 - combineLatest操作符
combineLatest 操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。
只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。
Flux.combineLatest(
Arrays::toString,
Flux.intervalMillis(100).take(5),
Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);
上面的代码中,流中最新产生的元素会被收集到一个数组中,通过 Arrays.toString 方法来把数组转换成 String。
TODO:没理解,后面代码验证
2.6.13 - retry操作符
当出现错误时,可以通过 retry 操作符来进行重试。
重试的动作是通过重新订阅序列来实现的。在使用 retry 操作符时可以指定重试的次数。
下面的代码指定了重试次数为 1,所输出的结果是 1,2,1,2 和错误信息。
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println);
2.6.14 - log操作符
log 操作符将流相关的事件记录在日志中。
下面的代码添加了 log 操作符并指定了日志分类的名称:
Flux.range(1, 2).log("Range").subscribe(System.out::println);
在实际的运行时,所产生的输出如下所示:
13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
13:07:56.753 [main] INFO Range - | request(unbounded)
13:07:56.754 [main] INFO Range - | onNext(1)
1
13:07:56.754 [main] INFO Range - | onNext(2)
2
13:07:56.754 [main] INFO Range - | onComplete()