这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

生产和消费消息

Spring Cloud Stream 生产和消费消息

内容摘录自官方文档 Producing and Consuming Messages 一节

你可以通过简单地编写函数并将其作为 “@Bean " 公开,来编写 Spring Cloud Stream 应用程序。你也可以使用基于 Spring Integration 注解的配置或基于 Spring Cloud Stream 注解的配置,不过从 spring-cloud-stream 3.x 开始,我们建议使用函数实现。

1 - Spring Cloud Function

Spring Cloud Stream 的 Spring Cloud Function 支持

内容摘录自官方文档 Spring Cloud Function support 一节

1.1 - Spring Cloud Function支持概述

Spring Cloud Stream 的 Spring Cloud Function 支持概述

内容摘录自官方文档 Overview 一节

自 Spring Cloud Stream v2.1以来,定义流处理程序和源的另一个选择是使用 Spring Cloud Function 的内置支持,在那里它们可以被表达为 java.util.function.[Supplier/Function/Consumer] 类型的 bean。

要指定哪个函数式 bean 绑定到绑定所暴露的外部目标,你必须提供 spring.cloud.function.define 属性。

如果你只有 java.util.function.[Supplier/Function/Consumer] 类型的单个Bean,你可以跳过 spring.cloud.function.define 属性,因为这种函数 Bean 会被自动发现。然而,使用这种属性被认为是最好的做法,以避免任何混淆。有些时候,这种自动发现可能会妨碍工作,因为 java.util.function.[Supplier/Function/Consumer] 类型的单体 Bean 可能有处理消息以外的目的,但由于是单体,它被自动发现并自动绑定。对于这些罕见的情况,你可以通过提供 spring.cloud.stream.function.autodetect 属性来禁用自动发现,其值设置为 false。

下面是应用程序将消息处理程序暴露为 java.util.function.Function 的例子,通过充当数据的消费者和生产者,有效地支持直通语义。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在前面的例子中,我们定义了一个名为 toUpperCase 的 java.util.function.Function 类型的 bean,作为消息处理程序,其 “input” 和 “output” 必须绑定到所提供的目标绑定器所暴露的外部目的地。默认情况下,‘inpu’ 和 ‘output’ binding name 将是 toUpperCase-in-0toUpperCase-0。请参阅函数绑定名称部分,了解用于建立绑定名称的命名规则的细节。

下面是支持其他语义的简单功能应用的例子。

下面是一个以 java.util.function.Supplier 形式暴露的 source 语义的例子:

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

下面是一个以 java.util.function.Consumer 形式暴露的 sink 语义的例子

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}
root
  topic1
    subtopic
  topic2
    subtopic

1.2 - Suppliers (Sources)

Spring Cloud Function 之 Suppliers (Sources)

内容摘录自官方文档 Suppliers (Sources) 一节

当涉及到它们的调用如何被触发时,FunctionConsumer 是非常直接的。它们是根据发送到它们所绑定的目的地的数据(事件)来触发的。换句话说,它们是典型的事件驱动型组件。

然而,当谈到触发时,Supplier 属于自己的类别。因为根据定义,它是数据的源头(the origin),它不订阅任何绑定的目的地,因此,必须由其他机制来触发。还有一个 Supplier 实现的问题,它可以是命令式的(imperative),也可以是反应性的(reactive),它直接关系到这些 Supplier 的触发。

请看下面的例子:

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

前面的 Supplier Bean 在调用其 get() 方法时产生一个字符串。然而,谁会调用这个方法,多久调用一次?框架提供了一个默认的轮询机制(回答了 “谁?“的问题),它将触发 Supplier v的调用,默认情况下,它每隔一秒就会调用一次(回答了 “多久一次?“的问题)。换句话说,上述配置每秒钟产生一条消息,每条消息都被发送到一个由 binder 暴露的 output 目的地。要了解如何定制轮询机制,请看轮询配置属性部分。

考虑一个不同的例子:

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

上面的 Supplier Bean采用了反应式编程风格。通常情况下,与命令式 Supplier 不同,它应该只被触发一次,因为调用它的 get() 方法会产生(供应)连续的消息流,而不是单个消息。

该框架认识到了编程风格的不同,并保证这样的 Supplier 只被触发一次。

然而,想象一下这样的用例:你想轮询一些数据源并返回代表结果集的有限数据流。反应式编程风格是这种 Supplier 的完美机制。然而,考虑到产生的数据流的有限性,这样的 Supplier 仍然需要被定期调用。

考虑一下下面的例子,它通过产生一个有限的数据流来模拟这种用例:

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

Bean本身被注解了 PollableBean 注解(@Bean的子集),从而向框架发出信号,尽管这样一个 Supplier 的实现是反应式的,但它仍然需要被轮询。

Supplier & 线程

正如你现在所了解的,与 Function 和 Consumer 不同,Function 和 Consumer 是由事件触发的(它们有输入数据),Supplier 没有任何输入,因此由不同的机制– poller 触发,它可能有一个不可预知的线程机制。虽然大多数时候线程机制的细节与函数的下游执行无关,但在某些情况下可能会出现问题,特别是对于那些可能对线程亲和力有一定期望的集成框架。例如,Spring Cloud Sleuth 就依赖于存储在 thread local 中的追踪数据。对于这些情况,我们有另一种通过 StreamBridge 的机制,用户可以对线程机制有更多的控制。你可以在发送任意数据到输出端(例如Foreign事件驱动源)一节中获得更多细节。

1.3 - Consumer (Reactive)

Spring Cloud Function 之 Consumer (Reactive)

内容摘录自官方文档 Consumer (Reactive) 一节

Reactive Consumer 有点特别,因为它的返回类型是空的,没有给框架留下可以订阅的引用。你很可能不需要写 Consumer<Flux<?>,而是把它写成 Function<Flux<?>, Mono<Void>>,调用 then operator 作为你流中的最后一个 operator。

比如说。

public Function<Flux<?>, Mono<Void>>consumer() {
	return flux -> flux.map(..).filter(..).then();
}

但如果你确实需要写一个显式的 Consumer<Flux<?>,记得要订阅传入的 Flux

另外,请记住,当混合反应式和命令式函数时,同样的规则也适用于函数组合。Spring Cloud Function 确实支持将反应式函数与命令式函数进行组合,但是你必须注意到某些限制。例如,假设你将反应式函数与命令式消费者进行组合。这种组合的结果是一个反应式 Consumer。然而,正如本节前面所讨论的那样,没有办法订阅这样的消费者,所以这个限制只能通过使你的消费者成为反应式并手动订阅(如前所述),或者将你的函数改为命令式来解决。

轮询配置属性

以下属性由 Spring Cloud Stream 公开(尽管自3.2版本起已被废弃),并以 spring.cloud.stream.poller 为前缀。

  • fixedDelay

    默认轮询器的固定延迟,单位是毫秒。

    默认值:1000L。

  • maxMessagesPerPoll

    默认轮询器的每个轮询事件的最大信息。

    默认值:1L。

  • cron

    Cron触发器的Cron表达式值。

    默认值:无。

  • initialDelay

    定期触发器的初始延迟。

    默认值:0。

  • timeUnit

    应用于延迟值的时间单位。

    默认值。MILLISECONDS。

    例如 --spring.cloud.stream.poller.fixed-delay=2000 设置轮询器的间隔为每两秒轮询一次。

每个绑定的轮询配置

上一节展示了如何配置一个将应用于所有绑定的默认轮询器。虽然它很适合 spring-cloud-stream 设计的微服务模型,即每个微服务代表一个组件(例如,供应商),因此默认轮询器配置就足够了,但在一些边缘情况下,你可能有几个组件需要不同的轮询配置。

在这种情况下,请使用按绑定方式配置轮询器。在这种情况下,你可以使用 spring.cloud.stream.bindings.supply-out-0.producer.poller...前缀为这种绑定配置轮询器(例如,spring.cloud.bindings.supply-out-0.producer.poller.fixed-delay=2000)。

1.4 - 将任意的数据发送到输出端

Spring Cloud Function 之将任意的数据发送到输出端

内容摘录自官方文档 Sending arbitrary data to an output (e.g. Foreign event-driven sources) 一节

将任意的数据发送到输出端(如 Foreign 的事件驱动源)。

有些情况下,实际的数据源可能来自于不是绑定器的外部(Foreign)系统。例如,数据源可能是一个经典的 REST 端点。我们如何将这样的 source 与 spring-cloud-stream 使用的函数机制连接起来?

Spring Cloud Stream 提供了两种机制,让我们来详细了解一下。

在这里,对于这两个样本,我们将使用一个标准的MVC端点方法,名为 delegateToSupplier,与根 Web 上下文绑定,通过StreamBridge 机制将传入的请求委托给流。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.source=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream-out-0", body);
	}
}

在这里,我们自动装箱(Autowire)了一个 StreamBridge Bean,它允许我们将数据发送到 output binding,有效地将非流应用程序与 spring-cloud-stream 连接起来。请注意,前面的例子没有定义任何 source 函数(例如,Supplier Bean),因此框架没有触发器来提前创建源绑定,这在配置包含函数 Bean 的情况下是很典型的。这很好,因为StreamBridge会在第一次调用send(.)操作时为非现有的绑定启动创建输出绑定(以及必要时的目的地自动配置),并将其缓存起来供后续重用(更多细节见StreamBridge和动态目的地)。

然而,如果你想在初始化(启动)时预先创建一个输出绑定,你可以从spring.cloud.stream.source属性中获益,在那里你可以声明你的源的名称。所提供的名称将被用作触发器来创建一个源绑定。所以在前面的例子中,输出绑定的名称将是toStream-out-0,这与函数使用的绑定命名惯例一致(见绑定和绑定名称)。你可以使用;来表示多个源(多个输出绑定)(例如,–spring.cloud.stream.source=foo;bar)

另外,注意streamBridge.send(..)方法需要一个对象作为数据。这意味着你可以向它发送POJO或消息,它在发送输出时将经过同样的程序,就像它来自任何函数或供应商一样,提供与函数相同的一致性。这意味着输出类型的转换、分区等都会被尊重,就像它是由函数产生的输出一样。