1 - 源码和项目概况

spring cloud stream项目和源码情况

代码地址:

https://github.com/spring-cloud/spring-cloud-stream

版本:v.3.2.4

1.1 - 项目版本和JDK版本

spring cloud stream 项目版本和JDK版本的关系说明
spring cloud stream version jdk version 说明
v3.2.4 1.8 最后一个支持jdk8的版本
v4.0.0-M1 17 第一个要求jdk17的版本

相关说明资料:

1.2 - 子项目情况

spring cloud stream 子项目情况

bom

bom目录下有两个子项目,定义 starter 和 dependencies。

spring-cloud-starter-parent

依赖的 spring boot 版本是 2.6.8:

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.6.8</version>
  <relativePath/>
</parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-starter-parent</artifactId>
<version>3.2.4</version>
<name>spring-cloud-stream-starter-parent</name>

spring-cloud-stream-dependencies

定义 spring-cloud-stream 下各个子项目的版本:

<parent>
  <artifactId>spring-cloud-dependencies-parent</artifactId>
  <groupId>org.springframework.cloud</groupId>
  <version>3.1.3</version>
  <relativePath/>
</parent>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>3.2.4</version>
<packaging>pom</packaging>
<name>spring-cloud-stream-dependencies</name>
<description>Spring Cloud Stream Dependencies</description>

Core

spring-cloud-stream

使用Spring integration 的消息微服务

spring-cloud-stream-binder-test

对 binder 实现的测试支持。

spring-cloud-stream-integaration-tests

Spring Cloud Stream 的集成测试。

spring-cloud-stream-test-support

一组类,以方便对Spring Cloud Stream模块的测试。

spring-cloud-stream-test-support-internal

一系列的类和实用程序代码,可以帮助测试 spring-cloud-stream 本身,以及模块。

Binder

spring cloud stream 项目只提供两个 binder,分别支持 kafka 和 RabbitMQ

kafka binder

RabbitMQ binder

2 - 概念

spring cloud stream的核心概念

2.1 - binder

spring cloud stream的核心概念-binder

2.1.1 - binder

spring cloud stream的核心概念-binder定义

Binder 定义

A strategy interface used to bind an app interface to a logical name. The name is intended to identify a logical consumer or producer of messages. This may be a queue, a channel adapter, another message channel, a Spring bean, etc.

一个策略接口,用于将应用接口与逻辑名称绑定。该名称旨在识别消息的逻辑消费者或生产者。这可能是队列、通道适配器、另一个消息通道、Spring Bean,等等。

接口定义:

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
  	Binding<T> bindConsumer(String name, String group, T inboundBindTarget,
			C consumerProperties);
  	Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

name:消息目标的逻辑身份

group:该消费者所属的消费者组(consumer group) - 订阅在同一组的消费者之间共享(如果为 null或空的String,必须被视为一个匿名组,不与任何其他消费者共享订阅)。

inboundBindTarget:绑定为消费者的应用程序接口

outboundBindTarget:绑定为生产者的应用程序接口

consumerProperties:消费者属性

producerProperties: 生产者属性

**Binding**<**T**>: 返回设置好的binding

2.1.2 - binding

spring cloud stream的核心概念-binding定义

binding 定义

Represents a binding between an input or output and an adapter endpoint that connects via a Binder. The binding could be for a consumer or a producer. A consumer binding represents a connection from an adapter to an input. A producer binding represents a connection from an output to an adapter.

代表一个输入或输出和一个适配器端点之间的绑定,这个绑定是通过Binder连接的。该绑定可以是用于消费者或生产者。消费者绑定表示从适配器到输入的连接。生产者绑定代表从输出到适配器的连接。

接口定义:

// `Binding<T>`: binding的类型
public interface Binding<T> extends Pausable {

	default Map<String, Object> getExtendedInfo() {
		return Collections.emptyMap();
	}

  // 组件启动之后实例就已经 start,因此 stop() / start() 方法通常用于 re-bind / re-start。
	default void start() {
	}

	default void stop() {
	}

  // 当且仅当组件实现了 Pausable 时,pause() / resume() 方法可以用于实现 pause/resume 操作。
	default void pause() {
		this.stop();
	}

	default void resume() {
		this.start();
	}

  // isRunning() 方法在当前实例所表示的目标组件在运行时返回true。
	default boolean isRunning() {
		return false;
	}

  // getName() 方法返回的是 `destination name`,也就是当前binding的目的地的名字。
	default String getName() {
		return null;
	}

  // getBindingName() 返回的是 `binding name`,也就是当前绑定目标的名字,如 channel name 
	default String getBindingName() {
		return null;
	}

  // 解除这个实例所代表的目标组件的绑定,并停止任何活动组件。实现必须是idempotent的。
  // 在这个方法被调用后,目标组件不会收到任何消息;这个实例应该被丢弃,而应该创建一个新的Binding。 
	void unbind();

  // isInput() 方法表明当前 binding 的类型,true 是 input binding,false 是 output binding。
  // 由于 @input 和 @output 只能用一个,因此不会同时即是 input 又是 output。
  // (TBD:有点怪,如果真要同时支持 input 和 output 该怎么办?创建两个实例?)
	default boolean isInput() {
		throw new UnsupportedOperationException(
				"Binding implementation `" + this.getClass().getName()
						+ "` must implement this operation before it is called");
	}

}

默认实现

@JsonPropertyOrder({ "bindingName", "name", "group", "pausable", "state" })
@JsonIgnoreProperties("running")
public class DefaultBinding<T> implements Binding<T> {
  
}

getName() 和 getBindingName() 的差异就很清楚了,getName() 返回的就是构建binding时给出的 name,而 getBindingName() 则要看 target 是不是 IntegrationObjectSupport ,如果是,则取 target 的 getComponentName。

	@Override
	public String getName() {
		return this.name;
	}

	@Override
	public String getBindingName() {
		String resolvedName = (this.target instanceof IntegrationObjectSupport)
				? ((IntegrationObjectSupport) this.target).getComponentName() : getName();
		return resolvedName == null ? getName() : resolvedName;
	}

2.1.3 - ConsumerProperties

spring cloud stream的核心概念-ConsumerProperties

ConsumerProperties 类

ConsumerProperties 类定义通用的 consumer 属性,这些属性对应到 spring.cloud.stream.bindings.[destinationName].consumer.*:

public class ConsumerProperties {
	// 此消费者绑定的绑定名称
	private String bindingName;

	// 标志着这个消费者是否需要自动启动。默认值: true
	private boolean autoStartup = true;
  
  // 消费者的并发设置,默认为1
  private int concurrency = 1;
  
  // 消费者是否从分区的生产者那里接收数据。默认:'false'。
  private boolean partitioned;
	......
}

TBD:以下参数的意思待查明:

  private String[] requiredGroups = new String[] {};

	private HeaderMode headerMode;

	private boolean useNativeEncoding = false;

	private boolean errorChannelEnabled = false;

实例相关的参数

instance count

// 当设置为大于等于零的值时,允许自定义此消费者的实例计数(如果与 spring.cloud.stream.instanceCount 不同)。
// 当设置为负值时,它将默认为 spring.cloud.stream.instanceCount。
// 更多信息请参见该属性。默认值:-1 
// 注意:该设置将覆盖 覆盖 "spring.cloud.stream.instance-count "中的设置。
private int instanceCount = -1;

instance index

// 当设置为大于等于零的值时,允许自定义此消费者的实例索引(如果与 spring.cloud.stream.instanceIndex 不同)。
// 当设置为负值时,它将默认为 spring.cloud.stream.instanceIndex。
// 更多信息请参见该属性。默认值:-1 注意:
// 该设置将覆盖 'spring.cloud.stream.instance-index'中的设置。
private int instanceIndex = -1;

// 当设置时,它将允许定制的消费者为列表中的每个项目产生一个消费者。
// 所有负数的索引将被丢弃。
// 默认值:null 
// 注意:该设置将禁用 instance-index
private List<Integer> instanceIndexList;

重试相关的参数

retryableExceptions

// 一个键值为Throwable类名的映射,值为一个布尔值。
// 指定那些将被或不被重试的异常(和子类)。
private Map<Class<? extends Throwable>, Boolean> retryableExceptions = new LinkedHashMap<>();

defaultRetryable

// 监听器抛出的异常如果没有列在 "retryableExceptions" 中,是否可以重试。
private boolean defaultRetryable = true;

maxAttempts

// 在处理失败的情况下,尝试处理消息的次数(包括第一次)。
// 这是一个RetryTemplate配置,由框架提供。默认值:3。
// 设置为1可以禁用重试。
// 在你想完全控制RetryTemplate的情况下,你也可以提供自定义RetryTemplate。只需在你的应用程序配置中把它配置为@Bean。
private int maxAttempts = 3;

backOffInitialInterval

// 重试时的回退初始时间间隔。
// 这是一个RetryTemplate配置,由框架提供。默认值:1000毫秒。
// 如果你想完全控制RetryTemplate,你也可以提供自定义RetryTemplate。只需在你的应用程序配置中把它配置为@Bean。
private int backOffInitialInterval = 1000;

backOffMaxInterval

// 最大的回避间隔。
// 这是一个RetryTemplate配置,由框架提供。默认值:10000毫秒。
// 如果你想完全控制RetryTemplate,你也可以提供自定义RetryTemplate。只需在你的应用程序配置中把它配置为@Bean。
private int backOffMaxInterval = 10000;

backOffMultiplier

// Backoff multiplier.
// 这是一个RetryTemplate配置,由框架提供。默认值:2.0。
// 如果你想完全控制RetryTemplate,你也可以提供自定义RetryTemplate。只需在你的应用程序配置中把它配置为@Bean。
private double backOffMultiplier = 2.0;

retryTemplateName

// 允许你进一步限定对于特定的消费者绑定应该用哪一个RetryTemplate。
private String retryTemplateName;

批量模式

// 当设置为 "true" 时,如果绑定器支持,发出的消息将有一个有效载荷的列表;
// 当与函数结合使用时,函数可以接收一个带有有效载荷的对象(或 Message)列表,如果有必要的话,有效载荷将被转换。
private boolean batchMode;

多路(multiplex)

// 当设置为 "true" 时,底层绑定器将在同一输入绑定上自然地复用目的地。
// 例如,在逗号分隔的多个目的地的情况下,如果设置为 "true",核心框架将跳过单独绑定它们,而是将这一责任委托给绑定器。

// 默认情况下,这个属性被设置为 "false",在逗号分隔的多目的地列表中,绑定器将单独绑定每个目的地。
// 需要原生支持多个输入绑定的单个绑定器实现(多路)可以启用这个属性。
private boolean multiplex;

header 模式

// 当设置为 "none" 时,禁止对输入的头进行解析。
// 仅对不支持消息头并需要嵌入头的信息中间件有效。
// 当从非 Spring Cloud Stream 应用程序消耗数据,而不支持原生报头时,该选项很有用。
// 当设置为 headers 时,使用中间件的原生头信息机制。
// 当设置为'embeddedHeaders'时,将头信息嵌入到消息的有效载荷中。
// 默认值:取决于绑定器的实现。
// 目前与 spring cloud stream 一起分发的 Rabbit 和 Kafka 绑定器支持原生报头。
private HeaderMode headerMode;

useNativeDecoding

// 当设置为 "true" 时,入站消息会被客户端库直接反序列化.
// 客户端库必须进行相应的配置(例如,设置一个合适的 Kafka 生产者值序列化器)。
// 注意:这是绑定器的特定设置,如果绑定器不支持本地序列化/反序列化,则没有影响。目前只有Kafka绑定器支持它。默认:'false'。
private boolean useNativeDecoding;

2.1.4 - ProducerProperties

spring cloud stream的核心概念-ProducerProperties

ProducerProperties 类

ProducerProperties 类定义通用的 producer 属性:

public class ProducerProperties {
	// 此生产者绑定的绑定名称
	private String bindingName;

	// 标志着这个生产者是否需要自动启动。默认值: true
	private boolean autoStartup = true;
  
  // 同 ComsumerProperties
	private HeaderMode headerMode;

  // 同 ComsumerProperties
	private boolean useNativeEncoding = false;
	......
}

TBD:以下参数的意思待查明:

  private String[] requiredGroups = new String[] {};
	private boolean errorChannelEnabled = false;

分区相关的参数

partiton key

获取 partiton key 的方式有两种:通过 partitionKeyExpression 或者 通过 partitionKeyExtractorName 指定的 bean。两种方式只能选择其中一个:

@JsonSerialize(using = ExpressionSerializer.class)
private Expression partitionKeyExpression;

// 实现 PartitionKeyExtractorStrategy 的 bean 的名字
// 用于提取 key,而这个 key 将用于计算 partition id (参见 partitionSelector)
private String partitionKeyExtractorName;


根据 partitionKeyExpression 和 partitionKeyExtractorName 的设置可以判断是否有做分区:

	public boolean isPartitioned() {
		return this.partitionKeyExpression != null
				|| this.partitionKeyExtractorName != null;
	}

partiton id

从 partiton key 计算出 partiton id 的方式也有两种:通过 partitionSelectorExpression 或者通过 partitionSelectorName 指定的 bean。同样,两种方式只能选择其中一个:

// 实现 PartitionSelectorStrategy 的 bean 的名字
// 用于根据 partition key 来决定 partition id 
private String partitionSelectorName;

@JsonSerialize(using = ExpressionSerializer.class)
private Expression partitionSelectorExpression;

partiton key 和 partiton id 共用同一个 ExpressionSerializer 的实现:

static class ExpressionSerializer extends JsonSerializer<Expression> {

  @Override
  public void serialize(Expression expression, JsonGenerator jsonGenerator,
                        SerializerProvider serializerProvider) throws IOException {
    if (expression != null) {
      jsonGenerator.writeString(expression.getExpressionString());
    }
  }
}

partition count

partitionCount 参数指定分区数量:

private int partitionCount = 1;

PollerProperties

PollerProperties 的参数:

public static class PollerProperties {

  private Duration fixedDelay = Duration.ofMillis(1000);

  private long maxMessagesPerPoll = 1L;

  private String cron;

  private Duration initialDelay = Duration.ofMillis(0);
}

requiredGroups

errorChannelEnabled

3 - kafka binder

kafka binder源码

4 - RabbitMQ binder

RabbitMQ binder源码