1 - 介绍

Spring Cloud 的介绍,以及 Spring Cloud 的资料收集

1.1 - Spring Cloud 介绍

Spring Cloud 介绍

Spring Cloud 是什么?

Spring Cloud为开发者提供了快速构建分布式系统中一些常见模式的工具(例如,配置管理、服务发现、熔断器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)。分布式系统的协调导致了锅炉板模式(boiler plate patterns),使用Spring Cloud,开发者可以快速建立实现这些模式的服务和应用。它们在任何分布式环境中都能很好地工作,包括开发人员自己的笔记本电脑、裸金属数据中心和管理平台(如Cloud Foundry)。

Spring Cloud专注于为典型的用例和扩展机制提供良好的开箱即用体验,以涵盖其他情况。

  • 分布式/版本化配置
  • 服务注册和发现
  • 路由
  • 服务间调用
  • 负载均衡
  • 熔断器
  • 全局锁
  • 领导选举和集群状态
  • 分布式消息传递

1.2 - Spring Cloud 子项目

Spring Cloud 子项目介绍

特别关注的项目

Spring Cloud Config

由 git 仓库支持的集中式外部配置管理。配置资源直接映射到 Spring Environment,但如果需要的话,也可以被非 Spring 应用使用。

Spring cloud Cluster

领导选举和常见的有状态模式,对Zookeeper、Redis、Hazelcast、Consul进行抽象和实现。

Spring Cloud Data Flow

为现代运行时上的可组合微服务应用提供云原生协调服务。易于使用的DSL、拖放式GUI和REST-APIs共同简化了基于数据管道的微服务的整体协调工作。

Spring Cloud Data Stream

一个轻量级的事件驱动的微服务框架,可以快速构建可以连接到外部系统的应用程序。使用Apache Kafka或RabbitMQ在Spring Boot应用程序之间发送和接收消息的简单声明性模型。

Spring Cloud Data Stream Application

Spring Cloud Stream应用程序是开箱即用的Spring Boot应用程序,使用Spring Cloud Stream中的绑定器抽象,提供与外部中间件系统的集成,如Apache Kafka、RabbitMQ等。

Spring Cloud Function

Spring Cloud Function促进了通过函数实现业务逻辑。它支持跨无服务器提供商的统一编程模型,以及独立运行的能力(本地或PaaS)。

比较关注的项目

Spring Cloud Bus

一个通过分布式消息传递将服务和服务实例联系起来的事件总线。适用于在集群中传播状态变化(例如,配置变更事件)。

Spring Cloud Open Service Broker

为构建一个实现 Open Service Broker API 的服务代理提供一个起点。

Spring Cloud Consul

使用Hashicorp Consul的服务发现和配置管理。

Spring Cloud Sleuth

为Spring Cloud应用程序提供分布式跟踪,与Zipkin、HTrace和基于日志(如ELK)的跟踪兼容。

Spring Cloud Connectors

使得各种平台中的PaaS应用能够轻松连接到数据库和消息代理等后端服务(该项目以前称为 “Spring Cloud”)。

暂时先不关注的项目

Spring Cloud Gateway

Spring Cloud Gateway是一个基于Project Reactor的智能和可编程的路由器。

Spring Cloud OpenFeign

Spring Cloud OpenFeign通过自动配置和与Spring环境及其他Spring编程模型成语的绑定,为Spring Boot应用提供集成。

不关注的项目

Spring Cloud Cloudfoundry

将您的应用程序与Pivotal Cloud Foundry集成。提供了一个服务发现的实现,也使得实现SSO和OAuth2保护的资源变得容易。

Spring Cloud Netflix

与各种Netflix OSS组件(Eureka、Hystrix、Zuul、Archaius等)集成。

Spring Cloud Security

在Zuul代理中提供对负载平衡的OAuth2休息客户端和认证头转发的支持。

Spring Cloud Starters

Spring Boot风格的启动项目,便于Spring Cloud的消费者进行依赖性管理。(在Angel.SR2之后停止作为项目,并与其他项目合并。)

Spring Cloud CLI

Spring Boot CLI插件,用于在Groovy中快速创建Spring Cloud组件应用程序。

Spring Cloud Task

一个短暂的微服务框架,用于快速构建执行有限数量数据处理的应用程序。简单的声明性,用于向Spring Boot应用添加功能和非功能特性。

Spring Cloud Task App Starters

Spring Cloud Task App Starters是Spring Boot应用程序,可以是任何进程,包括Spring Batch作业,不会永远运行,它们在有限的数据处理期后结束/停止。

Spring Cloud Zookeeper

使用Apache Zookeeper进行服务发现和配置管理。

Spring Cloud Pipelines

Spring Cloud Pipelines提供了一个有意见的部署管道,其步骤可以确保你的应用程序可以以零停机时间的方式进行部署,并且在出现问题时可以轻松回滚。

Spring Cloud Contract

Spring Cloud Contract是一个总括性项目,包含了帮助用户成功实施消费者驱动合同方法的解决方案。

1.3 - 资料收集

收集 Spring Cloud 的各种资料

官方资料

官方网站:

社区资料

社区网站:

2 - Spring Cloud Stream

Spring Cloud Stream 项目

2.1 - 概述

Spring Cloud Stream 项目概述

介绍

资料

Spring Cloud Stream是一个框架,用于构建高度可扩展的事件驱动的微服务的,这些服务与共享消息系统相连。

该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的Spring习语和最佳实践之上,包括对持久化 pub/sub 语义、消费者组和有状态分区的支持。

项目背景

内容来自官方文档:

Spring的数据集成之旅始于Spring Integration。通过其编程模型,它为开发人员提供了一致的开发经验,以构建可以包含企业集成模式以与外部系统(例如数据库,消息代理等)连接的应用程序。

快进到云时代,微服务已在企业环境中变得突出。Spring Boot改变了开发人员构建应用程序的方式。借助Spring的编程模型和Spring Boot处理的运行时职责,无缝开发了基于生产,生产级Spring的独立微服务。

为了将其扩展到数据集成工作负载,Spring Integration和Spring Boot被放到一个新项目中。Spring Cloud Stream出生了。

架构

binder 的实现

Spring Cloud Stream 支持多种 binder 实现,下表包括 GitHub 项目的链接:

Spring Cloud Stream的核心构件是:

  • Destination Binders(目的地绑定器):负责提供与外部消息系统集成的组件。

  • Destination Bindings(目的地绑定):在外部消息系统和最终用户提供的应用程序代码(生产者/消费者)之间建立桥梁。

  • 消息。生产者和消费者用来与Destination Binders(从而通过外部消息系统与其他应用程序)进行通信的典型数据结构。

2.2 - Spring Cloud Stream 项目背景

Spring Cloud Stream 项目背景以及和 Spring Messaging 和 Spring Integration 这两个项目的关系

官方文档的说明

在 Spring Cloud Stream 的官方文档中,有这么一段说明:

A Brief History of Spring’s Data Integration Journey

Spring’s journey on Data Integration started with Spring Integration. With its programming model, it provided a consistent developer experience to build applications that can embrace Enterprise Integration Patterns to connect with external systems such as, databases, message brokers, and among others.

Spring的数据集成之旅始于Spring Integration。通过其编程模型,它为开发人员提供了一致的开发经验,以构建可以包含 企业集成模式 ,用来与外部系统(例如数据库,消息代理等)连接的应用程序。

Fast forward to the cloud-era, where microservices have become prominent in the enterprise setting. Spring Boot transformed the way how developers built Applications. With Spring’s programming model and the runtime responsibilities handled by Spring Boot, it became seamless to develop stand-alone, production-grade Spring-based microservices.

快进到云时代,微服务已在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,无缝开发了基于spring的生产级的独立微服务。

To extend this to Data Integration workloads, Spring Integration and Spring Boot were put together into a new project. Spring Cloud Stream was born.

为了将其扩展到数据集成工作负载,Spring Integration和Spring Boot被放到一个新项目中。Spring Cloud Stream 诞生了。

来自阿里云文章的说明

来自 阿里巴巴云原生公众号 的文章 (作者 洛夜) 更详细的阐述了 Spring Cloud Stream 项目的背景,尤其是 Spring Cloud Stream 项目和 Spring Messaging 和 Spring Integration 这两个项目的关系。

Spring Cloud Stream 体系及原理介绍

以下内容摘录自该文章

Spring Messaging

Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。

  • 比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header:

    messaging-model

    package org.springframework.messaging;
    public interface Message<T> {
        T getPayload();
        MessageHeaders getHeaders();
    }
    
  • 消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:

    messaging-channel

    @FunctionalInterface
    public interface MessageChannel {
        long INDEFINITE_TIMEOUT = -1;
        default boolean send(Message<?> message) {
             return send(message, INDEFINITE_TIMEOUT);
         }
         boolean send(Message<?> message, long timeout);
    }
    

消息通道里的消息如何被消费呢?

  • 由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅:

    public interface SubscribableChannel extends MessageChannel {
        boolean subscribe(MessageHandler handler);
        boolean unsubscribe(MessageHandler handler);
    }
    
  • 由MessageHandler真正地消费/处理消息:

    @FunctionalInterface
    public interface MessageHandler {
        void handleMessage(Message<?> message) throws MessagingException;
    }
    

Spring Messaging 内部在消息模型的基础上衍生出了其它的一些功能,如:

  • 消息接收参数及返回值处理:消息接收参数处理器HandlerMethodArgumentResolver配合@Header, @Payload等注解使用;消息接收后的返回值处理器HandlerMethodReturnValueHandler配合@SendTo注解使用;
  • 消息体内容转换器MessageConverter;
  • 统一抽象的消息发送模板AbstractMessageSendingTemplate;
  • 消息通道拦截器ChannelInterceptor;

Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了 MessageChannel 和 MessageHandler 的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel 和 MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

这里为大家介绍几种消息的处理方式:

  • 消息的分割:

    4.png

  • 消息的聚合:

    5.png

  • 消息的过滤:

    6.png

  • 消息的分发:

    7.png

接下来,我们以一个最简单的例子来尝试一下 Spring Integration。

这段代码解释为:

SubscribableChannel messageChannel = new DirectChannel(); // 1

messageChannel.subscribe(msg-> { // 2
 System.out.println("receive: " +msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3
  • 构造一个可订阅的消息通道messageChannel。
  • 使用MessageHandler去消费这个消息通道里的消息。
  • 发送一条消息到这个消息通道,消息最终被消息通道里的MessageHandler所消费。
  • 最后控制台打印出:receive: msg from alibaba。

DirectChannel内部有个UnicastingDispatcher类型的消息分发器,会分发到对应的消息通道MessageChannel中,从名字也可以看出来,UnicastingDispatcher是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了LoadBalancingStrategy负载均衡策略,默认只有轮询的实现,可以进行扩展。

我们对上段代码做一点修改,使用多个 MessageHandler 去处理消息:

SubscribableChannel messageChannel = new DirectChannel();

messageChannel.subscribe(msg -> {
     System.out.println("receive1: " + msg.getPayload());
});

messageChannel.subscribe(msg -> {
     System.out.println("receive2: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

由于DirectChannel内部的消息分发器是UnicastingDispatcher单播的方式,并且采用轮询的负载均衡策略,所以这里两次的消费分别对应这两个MessageHandler。控制台打印出:

receive1: msg from alibaba
receive2: msg from alibaba

既然存在单播的消息分发器UnicastingDispatcher,必然也会存在广播的消息分发器,那就是BroadcastingDispatcher,它被 PublishSubscribeChannel 这个消息通道所使用。广播消息分发器会把消息分发给所有的 MessageHandler:

SubscribableChannel messageChannel = new PublishSubscribeChannel();

messageChannel.subscribe(msg -> {
     System.out.println("receive1: " + msg.getPayload());
});

messageChannel.subscribe(msg -> {
     System.out.println("receive2: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());

Spring Cloud Stream

SCS 与各模块之间的关系是:

  • SCS 在 Spring Integration 的基础上进行了封装,提出了Binder, Binding, @EnableBinding, @StreamListener等概念。
  • SCS 与 Spring Boot Actuator 整合,提供了/bindings, /channels endpoint。
  • SCS 与 Spring Boot Externalized Configuration 整合,提供了BindingProperties, BinderProperties等外部化配置类。
  • SCS 增强了消息发送失败的和消费失败情况下的处理逻辑等功能。
  • SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。

Binder是提供与外部消息中间件集成的组件,为构造Binding提供了 2 个方法,分别是bindConsumer和bindProducer,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了 RocketMQ Binder。

8.png

从图中可以看出,Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。我们来看一个最简单的使用 RocketMQ Binder 的例子,然后分析一下它的底层处理原理:

  • 启动类及消息的发送:

    @SpringBootApplication
    @EnableBinding({ Source.class, Sink.class }) // 1
    public class SendAndReceiveApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SendAndReceiveApplication.class, args);
        }
    
           @Bean // 2
        public CustomRunner customRunner() {
            return new CustomRunner();
        }
    
        public static class CustomRunner implements CommandLineRunner {
    
            @Autowired
            private Source source;
    
            @Override
            public void run(String... args) throws Exception {
                int count = 5;
                for (int index = 1; index <= count; index++) {
                    source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3
                }
            }
        }
    }
    
  • 消息的接收:

    @Service
    public class StreamListenerReceiveService {
    
        @StreamListener(Sink.INPUT) // 4
        public void receiveByStreamListener1(String receiveMsg) {
            System.out.println("receiveByStreamListener: " + receiveMsg);
        }
    }
    

这段代码很简单,没有涉及到 RocketMQ 相关的代码,消息的发送和接收都是基于 SCS 体系完成的。如果想切换成 RabbitMQ 或 Kafka,只需修改配置文件即可,代码无需修改。

我们来分析下这段代码的原理:

  1. @EnableBinding对应的两个接口属性Source和Sink是 SCS 内部提供的。SCS 内部会基于Source和Sink构造BindableProxyFactory,且对应的 output 和 input 方法返回的 MessageChannel 是DirectChannel。output 和 input 方法修饰的注解对应的 value 是配置文件中 binding 的 name。
public interface Source {
    String OUTPUT = "output";
    @Output(Source.OUTPUT)
    MessageChannel output();
}
public interface Sink {
    String INPUT = "input";
    @Input(Sink.INPUT)
    SubscribableChannel input();
}

配置文件里 bindings 的 name 为 output 和 input,对应Source和Sink接口的方法上的注解里的 value:

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1
  1. 构造CommandLineRunner,程序启动的时候会执行CustomRunner的run方法。
  2. 调用Source接口里的 output 方法获取DirectChannel,并发送消息到这个消息通道中。这里跟之前 Spring Integration 章节里的代码一致。
  • Source 里的 output 发送消息到DirectChannel消息通道之后会被AbstractMessageChannelBinder#SendingHandler这个MessageHandler处理,然后它会委托给AbstractMessageChannelBinder#createProducerMessageHandler创建的 MessageHandler 处理(该方法由不同的消息中间件实现)。
  • 不同的消息中间件对应的AbstractMessageChannelBinder#createProducerMessageHandler方法返回的 MessageHandler 内部会把 Spring Message 转换成对应中间件的 Message 模型并发送到对应中间件的 broker。
  1. 使用@StreamListener进行消息的订阅。请注意,注解里的Sink.input对应的值是 “input”,会根据配置文件里 binding 对应的 name 为 input 的值进行配置:
  • 不同的消息中间件对应的AbstractMessageChannelBinder#createConsumerEndpoint方法会使用 Consumer 订阅消息,订阅到消息后内部会把中间件对应的 Message 模型转换成 Spring Message。
  • 消息转换之后会把 Spring Message 发送至 name 为 input 的消息通道中。
  • @StreamListener对应的StreamListenerMessageHandler订阅了 name 为 input 的消息通道,进行了消息的消费。

这个过程文字描述有点啰嗦,用一张图总结一下(黄色部分涉及到各消息中间件的 Binder 实现以及 MQ 基本的订阅发布功能):

9.png

SCS 章节的最后,我们来看一段 SCS 关于消息的处理方式的一段代码:

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message msg) {
     System.out.println("receive by headers['index']=='1': " + msg);
}

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {
     System.out.println("receive Person: " + person);
}

@StreamListener(value = Sink.INPUT)
public void receiveAllMsg(String msg) {
     System.out.println("receive allMsg by StreamListener. content: " + msg);
}

@StreamListener(value = Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message msg) {
     System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}

有没有发现这段代码跟 Spring MVC Controller 中接收请求的代码很像? 实际上他们的架构都是类似的,Spring MVC 对于 Controller 中参数和返回值的处理类分别是org.springframework.web.method.support.HandlerMethodArgumentResolver、org.springframework.web.method.support.HandlerMethodReturnValueHandler。

Spring Messaging 中对于参数和返回值的处理类之前也提到过,分别是org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver、org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler。

它们的类名一模一样,甚至内部的方法名也一样。

总结

SCS 是 Spring Integration 的加强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。

值得特别关注的是:

  • “它屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。”

  • “通过其编程模型,它为开发人员提供了一致的开发经验”

  • “代码很简单,没有涉及到 RocketMQ 相关的代码,消息的发送和接收都是基于 SCS 体系完成的。如果想切换成 RabbitMQ 或 Kafka,只需修改配置文件即可,代码无需修改。”

这个理念和 Dapr 的 pub/sub 构建块的设计初衷很相似。

类似的总结在这个文章中也有提到:

官方文档中文版!Spring Cloud Stream 快速入门

Spring Cloud Stream 解决什么问题:

  • 无感知的使用消息中间件

    Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。

  • 中间件和服务的高度解耦

    Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

2.3 - 主要概念

Spring Cloud Stream 的主要概念和应用模型

内容摘录自官方文档 Main Concepts 一节

Spring Cloud Stream提供了许多抽象和原语,简化了消息驱动的微服务应用的编写。本节对以下内容进行了概述。

  • Spring Cloud Stream的应用模型

  • 绑定器抽象

  • 持久的发布-订阅支持

  • 消费者组支持

  • 分区支持

  • 可插拔的Binder SPI

2.3.1 - 应用模型

Spring Cloud Stream 的应用模型

官方文档的描述

内容摘录自官方文档 Application Model 一节

Spring Cloud Stream 应用程序由一个中间件中立的核心组成。应用程序通过在外部 broker 暴露的目的地和你代码中的输入/输出参数之间建立 binding (绑定) 来与外部世界进行通信。建立 binding 所需的特定 broker 细节由特定于中间件的 Binder 实现来处理。

SCSt-with-binder

2.3.2 - binder抽象

Spring Cloud Stream 应用模型中的binder抽象

官方文档的描述

内容摘录自官方文档 The Binder Abstraction 一节

Spring Cloud Stream 为 Kafka 和 Rabbit MQ 提供了 Binder 的实现。该框架还包括一个测试 binder ,用于将你的应用程序作为 spring-cloud-stream 应用程序进行集成测试。

Binder 抽象也是该框架的扩展点之一,这意味着你可以在 Spring Cloud Stream 之上实现自己的 binder。

Spring Cloud Stream 使用 Spring Boot 进行配置,Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接到中间件。例如,部署者可以在运行时动态地选择外部目的地(如Kafka topic 或RabbitMQ exchange)与消息处理程序的输入和输出(如函数的输入参数及其返回参数)之间的映射。这种配置可以通过外部配置属性和 Spring Boot 支持的任何形式提供(包括应用程序参数、环境变量和application.yml 或 application.properties 文件)。在介绍Spring Cloud Stream部分的 sink 示例中,将 spring.cloud.stream.bindings.input.destination 应用属性设置为 raw-sensor-data 会导致它从 raw-sensor-data Kafka topic 或从绑定到 raw-sensor-data RabbitMQ exchange 的队列中读取。

Spring Cloud Stream 会自动检测并使用在 classpath上 找到的 binder。你可以在相同的代码中使用不同类型的中间件。要做到这一点,在构建时包括一个不同的 binder。对于更复杂的用例,你也可以将多个 binder 与你的应用程序打包,让它在运行时选择 binder(甚至是是否为不同的 binding 使用不同的 binder)。

2.3.3 - 持久化pub-sub支持

Spring Cloud Stream 应用模型中的持久化发布订阅支持

官方文档的描述

内容摘录自官方文档 Persistent Publish-Subscribe Support 一节

应用程序之间的通信遵循 发布-订阅 (publish-subscribe model)模型,数据通过共享主题进行广播。这可以从下图中看出,该图显示了一组相互交流的 Spring Cloud Stream 应用程序的典型部署。

SCSt-with-binder

由传感器报告给HTTP端点的数据被发送到一个名为 raw-sensor-data 的共同目的地。从目的地开始,它被一个计算时间窗口平均数的微服务应用程序和另一个将原始数据摄入HDFS(Hadoop分布式文件系统)的微服务应用程序独立处理。为了处理数据,这两个应用程序都在运行时声明该主题为其输入。

发布-订阅通信模型降低了生产者和消费者的复杂性,并让新的应用程序被添加到拓扑结构中,而不会破坏现有的流程。例如,在平均计算应用程序的下游,你可以添加一个计算最高温度值的应用程序,用于显示和监控。然后,你可以添加另一个应用程序,解释相同的平均数流以进行故障检测。通过共享主题而不是点对点队列进行所有通信,可以减少微服务之间的耦合。

虽然发布-订阅消息的概念并不新鲜,但Spring Cloud Stream采取了额外的措施,使之成为其应用模型的意见选择。通过使用原生中间件支持,Spring Cloud Stream还简化了发布-订阅模型在不同平台上的使用。

2.3.4 - consumer group

Spring Cloud Stream 应用模型中的consumer group

官方文档的描述

内容摘录自官方文档 Consumer Groups 一节

虽然发布-订阅模型使得通过共享主题连接应用程序变得容易,但通过创建特定应用程序的多个实例来扩大规模的能力同样重要。当这样做时,应用程序的不同实例被置于竞争的消费者关系中,其中只有一个实例被期望处理一个给定的消息。

Spring Cloud Stream 通过 consumer group (消费者组)的概念来模拟这种行为。(Spring Cloud Stream consumer group 与Kafka consumer group 相似,并受其启发。) 每个消费者 binding 可以使用 spring.cloud.stream.bindings.<bindingName>.group 属性来指定一个组名。对于下图所示的消费者,这个属性将被设置为 spring.cloud.stream.bindings.<bindingName>.group=hdfsWritespring.cloud.stream.bindings.<bindingName>.group=average

SCSt-with-binder

所有订阅给定目的地的组都会收到一份已发布数据的副本,但每个组中只有一个成员收到来自该目的地的给定消息。默认情况下,当没有指定组时,Spring Cloud Stream 会将应用程序分配给一个匿名的、独立的单成员 consumer group,该组与所有其他 consumer group 都是发布-订阅关系。

消费者类型

支持两种类型的消费者:

  • 消息驱动(有时称为异步)

  • 轮询(有时称为同步)。

在2.0版本之前,只支持异步的消费者。只要有消息,并且有线程可以处理,消息就会被传递。

当你想控制消息的处理速度时,你可能想使用一个同步消费者。

持久性

与 Spring Cloud Stream 的 opinionated (翻译为 有主见的?) 应用模型一致,消费者组的订阅是持久的。也就是说,binder 的实现可以确保组的订阅是持久的,而且一旦为一个组创建了至少一个订阅,该组就会收到消息,即使这些消息是在该组的所有应用都停止时发送的。

匿名订阅在本质上是不可持久的。对于一些 binder 的实现(如RabbitMQ),可以有非持久性的组订阅。

一般来说,当把应用程序绑定到一个特定的目的地时,最好总是指定 consumer group。当扩展 Spring Cloud Stream 应用程序时,你必须为其每个输入 binding 指定 consumer group。这样做可以防止应用程序的实例收到重复的消息(除非需要这种行为,这是不正常的)。

2.3.5 - 分区

Spring Cloud Stream 应用模型中的分区

官方文档的描述

内容摘录自官方文档 Partitioning Support 一节

Spring Cloud Stream 提供了对特定应用程序的多个实例之间的数据分区的支持。在分区方案中,物理通信介质(如 broker topic)被视为被结构化为多个分区。一个或多个生产者应用实例向多个消费者应用实例发送数据,并确保由共同特征识别的数据由同一个消费者实例处理。

Spring Cloud Stream 为以统一方式实现分区处理用例提供了一个通用抽象。因此,无论 broker 本身是自然分区(例如Kafka)还是不分区(例如RabbitMQ),都可以使用分区。!

SCSt-partitioning

分区是有状态处理中的一个关键概念,在有状态处理中,确保所有相关数据被一起处理是非常关键的(出于性能或一致性的原因)。例如,在时间窗口平均计算的例子中,重要的是来自任何给定传感器的所有测量都由同一个应用实例处理。

​ 要搭建分区处理方案,必须配置数据生产端和数据消费端。

2.4 - 编程模型

Spring Cloud Stream 的主要概念和应用模型

内容摘录自官方文档 Programming Model 一节

为了理解这个编程模型,你应该熟悉以下核心概念:

  • 目的地绑定器(Destination Binders):负责提供与外部消息系统的集成的组件。
  • 绑定(Bindings):外部消息系统和应用程序之间的桥梁,提供消息生产者和消费者(由目的地绑定器创建)。
  • 消息(Message):生产者和消费者使用的典型数据结构,用于与目的地绑定器通信(从而通过外部消息系统与其他应用程序通信)。

2.4.1 - Destination Binders

Spring Cloud Stream 编程模型之 Destination Binders

官方文档的描述

内容摘录自官方文档 Destination Binders 一节

目的地绑定器(Destination Binders)是 Spring Cloud Stream 的扩展组件,负责提供必要的配置和实现,以促进与外部消息系统的集成。这种集成负责生产者和消费者之间的连接、委托和消息的路由、数据类型转换、用户代码的调用等。

Binder 处理了很多本来要落在开发者身上的责任。然而,为了达到这个目的,Binder 仍然需要一些帮助,其形式是来自用户的极简而必要的指令集,这些指令通常以某种类型的 binding 配置的形式出现。

虽然讨论所有可用的 binder 和 binding 配置选项超出了本节的范围(本手册的其他部分将广泛涉及这些选项),但 binding 作为一个概念,确实需要特别注意。下一节将详细讨论它。

2.4.2 - Binding

Spring Cloud Stream 编程模型之 Binding

官方文档的描述

内容摘录自官方文档 Bindings 一节

如前所述,binding 在外部消息系统(如队列、主题等)和应用程序提供的生产者和消费者之间提供了一座桥梁。

下面的例子显示了一个完全配置和运行的 Spring Cloud Stream 应用程序,该应用程序接收作为字符串类型的消息的有效载荷(见内容类型协商部分),将其记录到控制台,并在将其转换为大写字母后向下发送。

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

	@Bean
	public Function<String, String> uppercase() {
	    return value -> {
	        System.out.println("Received: " + value);
	        return value.toUpperCase();
	    };
	}
}

上面的例子看起来和任何 spring-boot 应用程序没有什么不同。它定义了一个 Function 类型的 bean,这就是它。那么,它是如何成为 spring-cloud-stream 应用程序的呢?它之所以成为 spring-cloud-stream 应用程序,仅仅是因为在 classpath 上存在 spring-cloud-stream 和 binder 的依赖关系以及自动配置类,有效地将启动应用程序的上下文设置为 spring-cloud-stream 应用程序。在这种情况下,Supplier、Function 或 Consumer 类型的 bean 被视为事实上的消息处理程序,触发绑定到所提供的绑定器所暴露的目的地,并遵循一定的命名惯例和规则以避免额外的配置。

绑定和绑定名称

绑定 (binding) 是一个抽象概念,代表了绑定器(binder)和用户代码所暴露的源和目标之间的桥梁,这个抽象概念有一个名字,虽然我们尽力限制运行 spring-cloud-stream 应用程序所需的配置,但在需要对每个绑定 (binding) 进行额外配置的情况下,了解这些名字是必要的。

在本手册中,你会看到一些配置属性的例子,如 spring.cloud.stream.bindings.input.destination=myQueue。这个属性名称中的 input 段就是我们所说的绑定名称(binding name),它可以通过几种机制衍生出来。下面的小节将描述 spring-cloud-stream 用于控制绑定名称(binding name)的命名惯例和配置元素。

函数式绑定名称

与 spring-cloud-stream 以前的版本中使用的基于注解的支持(legacy)所要求的显式命名不同,函数式编程模型在涉及到绑定名称时默认为简单的约定,从而大大简化了应用配置。让我们来看看第一个例子。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

在前面的例子中,我们有一个应用程序,它有一个单一的函数作为消息处理程序。作为 Function,它有一个输入和输出。用来命名输入和输出绑定的命名规则如下:

  • input - <functionName> + -in- + <index>
  • output - <functionName> + -out- + <index>

inout 对应的是 binding 的类型(如输入或输出)。index是输入或输出绑定的索引。对于典型的单一输入/输出函数,它总是0,所以它只与具有多个输入和输出参数的函数有关。

因此,如果你想把这个函数的输入映射到一个叫做 “my-topic” 的远程目标(例如,主题、队列等),你可以通过以下属性来实现:

--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic

请注意 uppercase-in-0 是如何作为属性名称中的一个段的。同样,uppercase-out-0也是如此。

描述性的绑定名称

有些时候,为了提高可读性,你可能想给你的 binding 一个更具描述性的名字(比如 “账户”,“订单” 等)。另一种方式是你可以将隐式绑定名称映射到显式绑定名称。你可以用 spring.cloud.stream.function.bindings.<binding-name> 属性来做。该属性还为依赖基于自定义接口的绑定的现有应用程序提供了一个迁移路径,这些绑定需要显式名称。

例如:

--spring.cloud.stream.function.bindings.uppercase-in-0=input

在前面的例子中,你把 uppercase-in-0 binding name 映射并有效地重命名为 input。现在,所有的配置属性都可以参考 input的binding name(例如, --spring.cloud.bindings.input.destination=my-topic)。

虽然描述性的 binding name 可能会增强配置的可读性,但它们也会通过将隐式绑定名称映射到显式绑定名称而产生另一种程度的误导。而且,由于所有后续的配置属性都将使用显式绑定名称,你必须始终参考这个 “bindings” 属性,以确定它实际上对应的是哪个功能。我们认为,对于大多数情况(函数组合除外),这可能是一种矫枉过正的做法,所以,我们建议完全避免使用它,尤其是不使用它可以在 binding 目的地和 binding name 之间提供一条清晰的路径,比如 spring.cloud.stream.bindings.uppercase-in-0.destination=sample-topic,在这里你可以清楚地将 uppercase 函数的输入与 sample-topic 的目的地相关联。

关于属性和其他配置选项的更多信息,请参见配置选项部分。

显式绑定的创建

在上一节中,我们解释了如何通过你的应用程序提供的 Function, Supplier 或 Consumer 驱动来隐式地创建 binding。然而,有时你可能需要显式地创建绑定,而绑定并不与任何函数挂钩。这通常是为了支持与其他框架(如Spring integration 框架)的集成,在那里你可能需要直接访问底层的 MessageChannel。

Spring Cloud Stream 允许你通过 spring.cloud.stream.input-bindingsspring.cloud.output-bindings 属性明确定义输入和输出绑定。注意到属性名称中的复数,允许你通过简单地使用;作为分隔符来定义多个绑定。请看下面的测试案例作为一个例子:

@Test
public void testExplicitBindings() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
		TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false",
					"--spring.cloud.stream.input-bindings=fooin;barin",
					"--spring.cloud.stream.output-bindings=fooout;barout")) {

	assertThat(context.getBean("fooin-in-0", MessageChannel.class)).isNotNull();
	assertThat(context.getBean("barin-in-0", MessageChannel.class)).isNotNull();
	assertThat(context.getBean("fooout-out-0", MessageChannel.class)).isNotNull();
	assertThat(context.getBean("barout-out-0", MessageChannel.class)).isNotNull();
	}
}

@EnableAutoConfiguration
@Configuration
public static class EmptyConfiguration {
}

正如你所看到的,我们声明了两个 input 绑定和两个 output 绑定,而我们的配置中没有定义任何函数,但我们还是能够成功地创建这些绑定并访问它们相应的通道。

其余适用于隐式绑定的绑定规则也适用于此(例如,你可以看到 fooin 变成了 fooin-in-0 绑定/通道等)。

2.4.3 - 生产和消费消息

Spring Cloud Stream 生产和消费消息

内容摘录自官方文档 Producing and Consuming Messages 一节

你可以通过简单地编写函数并将其作为 “@Bean " 公开,来编写 Spring Cloud Stream 应用程序。你也可以使用基于 Spring Integration 注解的配置或基于 Spring Cloud Stream 注解的配置,不过从 spring-cloud-stream 3.x 开始,我们建议使用函数实现。

2.4.3.1 - Spring Cloud Function

Spring Cloud Stream 的 Spring Cloud Function 支持

内容摘录自官方文档 Spring Cloud Function support 一节

2.4.3.1.1 - Spring Cloud Function支持概述

Spring Cloud Stream 的 Spring Cloud Function 支持概述

内容摘录自官方文档 Overview 一节

自 Spring Cloud Stream v2.1以来,定义流处理程序和源的另一个选择是使用 Spring Cloud Function 的内置支持,在那里它们可以被表达为 java.util.function.[Supplier/Function/Consumer] 类型的 bean。

要指定哪个函数式 bean 绑定到绑定所暴露的外部目标,你必须提供 spring.cloud.function.define 属性。

如果你只有 java.util.function.[Supplier/Function/Consumer] 类型的单个Bean,你可以跳过 spring.cloud.function.define 属性,因为这种函数 Bean 会被自动发现。然而,使用这种属性被认为是最好的做法,以避免任何混淆。有些时候,这种自动发现可能会妨碍工作,因为 java.util.function.[Supplier/Function/Consumer] 类型的单体 Bean 可能有处理消息以外的目的,但由于是单体,它被自动发现并自动绑定。对于这些罕见的情况,你可以通过提供 spring.cloud.stream.function.autodetect 属性来禁用自动发现,其值设置为 false。

下面是应用程序将消息处理程序暴露为 java.util.function.Function 的例子,通过充当数据的消费者和生产者,有效地支持直通语义。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在前面的例子中,我们定义了一个名为 toUpperCase 的 java.util.function.Function 类型的 bean,作为消息处理程序,其 “input” 和 “output” 必须绑定到所提供的目标绑定器所暴露的外部目的地。默认情况下,‘inpu’ 和 ‘output’ binding name 将是 toUpperCase-in-0toUpperCase-0。请参阅函数绑定名称部分,了解用于建立绑定名称的命名规则的细节。

下面是支持其他语义的简单功能应用的例子。

下面是一个以 java.util.function.Supplier 形式暴露的 source 语义的例子:

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

下面是一个以 java.util.function.Consumer 形式暴露的 sink 语义的例子

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}
root
  topic1
    subtopic
  topic2
    subtopic

2.4.3.1.2 - Suppliers (Sources)

Spring Cloud Function 之 Suppliers (Sources)

内容摘录自官方文档 Suppliers (Sources) 一节

当涉及到它们的调用如何被触发时,FunctionConsumer 是非常直接的。它们是根据发送到它们所绑定的目的地的数据(事件)来触发的。换句话说,它们是典型的事件驱动型组件。

然而,当谈到触发时,Supplier 属于自己的类别。因为根据定义,它是数据的源头(the origin),它不订阅任何绑定的目的地,因此,必须由其他机制来触发。还有一个 Supplier 实现的问题,它可以是命令式的(imperative),也可以是反应性的(reactive),它直接关系到这些 Supplier 的触发。

请看下面的例子:

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

前面的 Supplier Bean 在调用其 get() 方法时产生一个字符串。然而,谁会调用这个方法,多久调用一次?框架提供了一个默认的轮询机制(回答了 “谁?“的问题),它将触发 Supplier v的调用,默认情况下,它每隔一秒就会调用一次(回答了 “多久一次?“的问题)。换句话说,上述配置每秒钟产生一条消息,每条消息都被发送到一个由 binder 暴露的 output 目的地。要了解如何定制轮询机制,请看轮询配置属性部分。

考虑一个不同的例子:

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

上面的 Supplier Bean采用了反应式编程风格。通常情况下,与命令式 Supplier 不同,它应该只被触发一次,因为调用它的 get() 方法会产生(供应)连续的消息流,而不是单个消息。

该框架认识到了编程风格的不同,并保证这样的 Supplier 只被触发一次。

然而,想象一下这样的用例:你想轮询一些数据源并返回代表结果集的有限数据流。反应式编程风格是这种 Supplier 的完美机制。然而,考虑到产生的数据流的有限性,这样的 Supplier 仍然需要被定期调用。

考虑一下下面的例子,它通过产生一个有限的数据流来模拟这种用例:

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

Bean本身被注解了 PollableBean 注解(@Bean的子集),从而向框架发出信号,尽管这样一个 Supplier 的实现是反应式的,但它仍然需要被轮询。

Supplier & 线程

正如你现在所了解的,与 Function 和 Consumer 不同,Function 和 Consumer 是由事件触发的(它们有输入数据),Supplier 没有任何输入,因此由不同的机制– poller 触发,它可能有一个不可预知的线程机制。虽然大多数时候线程机制的细节与函数的下游执行无关,但在某些情况下可能会出现问题,特别是对于那些可能对线程亲和力有一定期望的集成框架。例如,Spring Cloud Sleuth 就依赖于存储在 thread local 中的追踪数据。对于这些情况,我们有另一种通过 StreamBridge 的机制,用户可以对线程机制有更多的控制。你可以在发送任意数据到输出端(例如Foreign事件驱动源)一节中获得更多细节。

2.4.3.1.3 - Consumer (Reactive)

Spring Cloud Function 之 Consumer (Reactive)

内容摘录自官方文档 Consumer (Reactive) 一节

Reactive Consumer 有点特别,因为它的返回类型是空的,没有给框架留下可以订阅的引用。你很可能不需要写 Consumer<Flux<?>,而是把它写成 Function<Flux<?>, Mono<Void>>,调用 then operator 作为你流中的最后一个 operator。

比如说。

public Function<Flux<?>, Mono<Void>>consumer() {
	return flux -> flux.map(..).filter(..).then();
}

但如果你确实需要写一个显式的 Consumer<Flux<?>,记得要订阅传入的 Flux

另外,请记住,当混合反应式和命令式函数时,同样的规则也适用于函数组合。Spring Cloud Function 确实支持将反应式函数与命令式函数进行组合,但是你必须注意到某些限制。例如,假设你将反应式函数与命令式消费者进行组合。这种组合的结果是一个反应式 Consumer。然而,正如本节前面所讨论的那样,没有办法订阅这样的消费者,所以这个限制只能通过使你的消费者成为反应式并手动订阅(如前所述),或者将你的函数改为命令式来解决。

轮询配置属性

以下属性由 Spring Cloud Stream 公开(尽管自3.2版本起已被废弃),并以 spring.cloud.stream.poller 为前缀。

  • fixedDelay

    默认轮询器的固定延迟,单位是毫秒。

    默认值:1000L。

  • maxMessagesPerPoll

    默认轮询器的每个轮询事件的最大信息。

    默认值:1L。

  • cron

    Cron触发器的Cron表达式值。

    默认值:无。

  • initialDelay

    定期触发器的初始延迟。

    默认值:0。

  • timeUnit

    应用于延迟值的时间单位。

    默认值。MILLISECONDS。

    例如 --spring.cloud.stream.poller.fixed-delay=2000 设置轮询器的间隔为每两秒轮询一次。

每个绑定的轮询配置

上一节展示了如何配置一个将应用于所有绑定的默认轮询器。虽然它很适合 spring-cloud-stream 设计的微服务模型,即每个微服务代表一个组件(例如,供应商),因此默认轮询器配置就足够了,但在一些边缘情况下,你可能有几个组件需要不同的轮询配置。

在这种情况下,请使用按绑定方式配置轮询器。在这种情况下,你可以使用 spring.cloud.stream.bindings.supply-out-0.producer.poller...前缀为这种绑定配置轮询器(例如,spring.cloud.bindings.supply-out-0.producer.poller.fixed-delay=2000)。

2.4.3.1.4 - 将任意的数据发送到输出端

Spring Cloud Function 之将任意的数据发送到输出端

内容摘录自官方文档 Sending arbitrary data to an output (e.g. Foreign event-driven sources) 一节

将任意的数据发送到输出端(如 Foreign 的事件驱动源)。

有些情况下,实际的数据源可能来自于不是绑定器的外部(Foreign)系统。例如,数据源可能是一个经典的 REST 端点。我们如何将这样的 source 与 spring-cloud-stream 使用的函数机制连接起来?

Spring Cloud Stream 提供了两种机制,让我们来详细了解一下。

在这里,对于这两个样本,我们将使用一个标准的MVC端点方法,名为 delegateToSupplier,与根 Web 上下文绑定,通过StreamBridge 机制将传入的请求委托给流。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.source=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream-out-0", body);
	}
}

在这里,我们自动装箱(Autowire)了一个 StreamBridge Bean,它允许我们将数据发送到 output binding,有效地将非流应用程序与 spring-cloud-stream 连接起来。请注意,前面的例子没有定义任何 source 函数(例如,Supplier Bean),因此框架没有触发器来提前创建源绑定,这在配置包含函数 Bean 的情况下是很典型的。这很好,因为StreamBridge会在第一次调用send(.)操作时为非现有的绑定启动创建输出绑定(以及必要时的目的地自动配置),并将其缓存起来供后续重用(更多细节见StreamBridge和动态目的地)。

然而,如果你想在初始化(启动)时预先创建一个输出绑定,你可以从spring.cloud.stream.source属性中获益,在那里你可以声明你的源的名称。所提供的名称将被用作触发器来创建一个源绑定。所以在前面的例子中,输出绑定的名称将是toStream-out-0,这与函数使用的绑定命名惯例一致(见绑定和绑定名称)。你可以使用;来表示多个源(多个输出绑定)(例如,–spring.cloud.stream.source=foo;bar)

另外,注意streamBridge.send(..)方法需要一个对象作为数据。这意味着你可以向它发送POJO或消息,它在发送输出时将经过同样的程序,就像它来自任何函数或供应商一样,提供与函数相同的一致性。这意味着输出类型的转换、分区等都会被尊重,就像它是由函数产生的输出一样。