内容摘录自官方文档 Producing and Consuming Messages 一节
你可以通过简单地编写函数并将其作为 “@Bean " 公开,来编写 Spring Cloud Stream 应用程序。你也可以使用基于 Spring Integration 注解的配置或基于 Spring Cloud Stream 注解的配置,不过从 spring-cloud-stream 3.x 开始,我们建议使用函数实现。
内容摘录自官方文档 Producing and Consuming Messages 一节
你可以通过简单地编写函数并将其作为 “@Bean " 公开,来编写 Spring Cloud Stream 应用程序。你也可以使用基于 Spring Integration 注解的配置或基于 Spring Cloud Stream 注解的配置,不过从 spring-cloud-stream 3.x 开始,我们建议使用函数实现。
内容摘录自官方文档 Spring Cloud Function support 一节
内容摘录自官方文档 Overview 一节
自 Spring Cloud Stream v2.1以来,定义流处理程序和源的另一个选择是使用 Spring Cloud Function 的内置支持,在那里它们可以被表达为 java.util.function.[Supplier/Function/Consumer]
类型的 bean。
要指定哪个函数式 bean 绑定到绑定所暴露的外部目标,你必须提供 spring.cloud.function.define
属性。
如果你只有 java.util.function.[Supplier/Function/Consumer]
类型的单个Bean,你可以跳过 spring.cloud.function.define
属性,因为这种函数 Bean 会被自动发现。然而,使用这种属性被认为是最好的做法,以避免任何混淆。有些时候,这种自动发现可能会妨碍工作,因为 java.util.function.[Supplier/Function/Consumer]
类型的单体 Bean 可能有处理消息以外的目的,但由于是单体,它被自动发现并自动绑定。对于这些罕见的情况,你可以通过提供 spring.cloud.stream.function.autodetect
属性来禁用自动发现,其值设置为 false。
下面是应用程序将消息处理程序暴露为 java.util.function.Function
的例子,通过充当数据的消费者和生产者,有效地支持直通语义。
@SpringBootApplication
public class MyFunctionBootApp {
public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class);
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}
在前面的例子中,我们定义了一个名为 toUpperCase 的 java.util.function.Function
类型的 bean,作为消息处理程序,其 “input” 和 “output” 必须绑定到所提供的目标绑定器所暴露的外部目的地。默认情况下,‘inpu’ 和 ‘output’ binding name 将是 toUpperCase-in-0
和 toUpperCase-0
。请参阅函数绑定名称部分,了解用于建立绑定名称的命名规则的细节。
下面是支持其他语义的简单功能应用的例子。
下面是一个以 java.util.function.Supplier
形式暴露的 source 语义的例子:
@SpringBootApplication
public static class SourceFromSupplier {
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}
下面是一个以 java.util.function.Consumer
形式暴露的 sink 语义的例子
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
root
topic1
subtopic
topic2
subtopic
内容摘录自官方文档 Suppliers (Sources) 一节
当涉及到它们的调用如何被触发时,Function
和 Consumer
是非常直接的。它们是根据发送到它们所绑定的目的地的数据(事件)来触发的。换句话说,它们是典型的事件驱动型组件。
然而,当谈到触发时,Supplier
属于自己的类别。因为根据定义,它是数据的源头(the origin),它不订阅任何绑定的目的地,因此,必须由其他机制来触发。还有一个 Supplier
实现的问题,它可以是命令式的(imperative),也可以是反应性的(reactive),它直接关系到这些 Supplier 的触发。
请看下面的例子:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
前面的 Supplier
Bean 在调用其 get() 方法时产生一个字符串。然而,谁会调用这个方法,多久调用一次?框架提供了一个默认的轮询机制(回答了 “谁?“的问题),它将触发 Supplier v的调用,默认情况下,它每隔一秒就会调用一次(回答了 “多久一次?“的问题)。换句话说,上述配置每秒钟产生一条消息,每条消息都被发送到一个由 binder 暴露的 output
目的地。要了解如何定制轮询机制,请看轮询配置属性部分。
考虑一个不同的例子:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
上面的 Supplier
Bean采用了反应式编程风格。通常情况下,与命令式 Supplier 不同,它应该只被触发一次,因为调用它的 get() 方法会产生(供应)连续的消息流,而不是单个消息。
该框架认识到了编程风格的不同,并保证这样的 Supplier 只被触发一次。
然而,想象一下这样的用例:你想轮询一些数据源并返回代表结果集的有限数据流。反应式编程风格是这种 Supplier 的完美机制。然而,考虑到产生的数据流的有限性,这样的 Supplier 仍然需要被定期调用。
考虑一下下面的例子,它通过产生一个有限的数据流来模拟这种用例:
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
Bean本身被注解了 PollableBean
注解(@Bean的子集),从而向框架发出信号,尽管这样一个 Supplier 的实现是反应式的,但它仍然需要被轮询。
正如你现在所了解的,与 Function 和 Consumer 不同,Function 和 Consumer 是由事件触发的(它们有输入数据),Supplier 没有任何输入,因此由不同的机制– poller 触发,它可能有一个不可预知的线程机制。虽然大多数时候线程机制的细节与函数的下游执行无关,但在某些情况下可能会出现问题,特别是对于那些可能对线程亲和力有一定期望的集成框架。例如,Spring Cloud Sleuth 就依赖于存储在 thread local 中的追踪数据。对于这些情况,我们有另一种通过 StreamBridge 的机制,用户可以对线程机制有更多的控制。你可以在发送任意数据到输出端(例如Foreign事件驱动源)一节中获得更多细节。
内容摘录自官方文档 Consumer (Reactive) 一节
Reactive Consumer
有点特别,因为它的返回类型是空的,没有给框架留下可以订阅的引用。你很可能不需要写 Consumer<Flux<?>
,而是把它写成 Function<Flux<?>, Mono<Void>>
,调用 then
operator 作为你流中的最后一个 operator。
比如说。
public Function<Flux<?>, Mono<Void>>consumer() {
return flux -> flux.map(..).filter(..).then();
}
但如果你确实需要写一个显式的 Consumer<Flux<?>
,记得要订阅传入的 Flux
。
另外,请记住,当混合反应式和命令式函数时,同样的规则也适用于函数组合。Spring Cloud Function 确实支持将反应式函数与命令式函数进行组合,但是你必须注意到某些限制。例如,假设你将反应式函数与命令式消费者进行组合。这种组合的结果是一个反应式 Consumer
。然而,正如本节前面所讨论的那样,没有办法订阅这样的消费者,所以这个限制只能通过使你的消费者成为反应式并手动订阅(如前所述),或者将你的函数改为命令式来解决。
以下属性由 Spring Cloud Stream 公开(尽管自3.2版本起已被废弃),并以 spring.cloud.stream.poller
为前缀。
fixedDelay
默认轮询器的固定延迟,单位是毫秒。
默认值:1000L。
maxMessagesPerPoll
默认轮询器的每个轮询事件的最大信息。
默认值:1L。
cron
Cron触发器的Cron表达式值。
默认值:无。
initialDelay
定期触发器的初始延迟。
默认值:0。
timeUnit
应用于延迟值的时间单位。
默认值。MILLISECONDS。
例如 --spring.cloud.stream.poller.fixed-delay=2000
设置轮询器的间隔为每两秒轮询一次。
org.springframework.boot.autoconfigure.integration.IntegrationProperties.Poller
以了解更多信息。
上一节展示了如何配置一个将应用于所有绑定的默认轮询器。虽然它很适合 spring-cloud-stream 设计的微服务模型,即每个微服务代表一个组件(例如,供应商),因此默认轮询器配置就足够了,但在一些边缘情况下,你可能有几个组件需要不同的轮询配置。
在这种情况下,请使用按绑定方式配置轮询器。在这种情况下,你可以使用 spring.cloud.stream.bindings.supply-out-0.producer.poller...
前缀为这种绑定配置轮询器(例如,spring.cloud.bindings.supply-out-0.producer.poller.fixed-delay=2000
)。
内容摘录自官方文档 Sending arbitrary data to an output (e.g. Foreign event-driven sources) 一节
将任意的数据发送到输出端(如 Foreign 的事件驱动源)。
有些情况下,实际的数据源可能来自于不是绑定器的外部(Foreign)系统。例如,数据源可能是一个经典的 REST 端点。我们如何将这样的 source 与 spring-cloud-stream 使用的函数机制连接起来?
Spring Cloud Stream 提供了两种机制,让我们来详细了解一下。
在这里,对于这两个样本,我们将使用一个标准的MVC端点方法,名为 delegateToSupplier
,与根 Web 上下文绑定,通过StreamBridge 机制将传入的请求委托给流。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.source=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream-out-0", body);
}
}
在这里,我们自动装箱(Autowire)了一个 StreamBridge
Bean,它允许我们将数据发送到 output binding,有效地将非流应用程序与 spring-cloud-stream
连接起来。请注意,前面的例子没有定义任何 source 函数(例如,Supplier Bean),因此框架没有触发器来提前创建源绑定,这在配置包含函数 Bean 的情况下是很典型的。这很好,因为StreamBridge会在第一次调用send(.)操作时为非现有的绑定启动创建输出绑定(以及必要时的目的地自动配置),并将其缓存起来供后续重用(更多细节见StreamBridge和动态目的地)。
然而,如果你想在初始化(启动)时预先创建一个输出绑定,你可以从spring.cloud.stream.source属性中获益,在那里你可以声明你的源的名称。所提供的名称将被用作触发器来创建一个源绑定。所以在前面的例子中,输出绑定的名称将是toStream-out-0,这与函数使用的绑定命名惯例一致(见绑定和绑定名称)。你可以使用;来表示多个源(多个输出绑定)(例如,–spring.cloud.stream.source=foo;bar)
另外,注意streamBridge.send(..)方法需要一个对象作为数据。这意味着你可以向它发送POJO或消息,它在发送输出时将经过同样的程序,就像它来自任何函数或供应商一样,提供与函数相同的一致性。这意味着输出类型的转换、分区等都会被尊重,就像它是由函数产生的输出一样。