binder
- 1: binder
- 2: binding
- 3: ConsumerProperties
- 4: ProducerProperties
1 - binder
Binder 定义
A strategy interface used to bind an app interface to a logical name. The name is intended to identify a logical consumer or producer of messages. This may be a queue, a channel adapter, another message channel, a Spring bean, etc.
一个策略接口,用于将应用接口与逻辑名称绑定。该名称旨在识别消息的逻辑消费者或生产者。这可能是队列、通道适配器、另一个消息通道、Spring Bean,等等。
接口定义:
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
Binding<T> bindConsumer(String name, String group, T inboundBindTarget,
C consumerProperties);
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
name:消息目标的逻辑身份
group:该消费者所属的消费者组(consumer group) - 订阅在同一组的消费者之间共享(如果为 null
或空的String,必须被视为一个匿名组,不与任何其他消费者共享订阅)。
inboundBindTarget:绑定为消费者的应用程序接口
outboundBindTarget:绑定为生产者的应用程序接口
consumerProperties:消费者属性
producerProperties: 生产者属性
**Binding**<**T**>
: 返回设置好的binding
2 - binding
binding 定义
Represents a binding between an input or output and an adapter endpoint that connects via a Binder. The binding could be for a consumer or a producer. A consumer binding represents a connection from an adapter to an input. A producer binding represents a connection from an output to an adapter.
代表一个输入或输出和一个适配器端点之间的绑定,这个绑定是通过Binder连接的。该绑定可以是用于消费者或生产者。消费者绑定表示从适配器到输入的连接。生产者绑定代表从输出到适配器的连接。
接口定义:
// `Binding<T>`: binding的类型
public interface Binding<T> extends Pausable {
default Map<String, Object> getExtendedInfo() {
return Collections.emptyMap();
}
// 组件启动之后实例就已经 start,因此 stop() / start() 方法通常用于 re-bind / re-start。
default void start() {
}
default void stop() {
}
// 当且仅当组件实现了 Pausable 时,pause() / resume() 方法可以用于实现 pause/resume 操作。
default void pause() {
this.stop();
}
default void resume() {
this.start();
}
// isRunning() 方法在当前实例所表示的目标组件在运行时返回true。
default boolean isRunning() {
return false;
}
// getName() 方法返回的是 `destination name`,也就是当前binding的目的地的名字。
default String getName() {
return null;
}
// getBindingName() 返回的是 `binding name`,也就是当前绑定目标的名字,如 channel name
default String getBindingName() {
return null;
}
// 解除这个实例所代表的目标组件的绑定,并停止任何活动组件。实现必须是idempotent的。
// 在这个方法被调用后,目标组件不会收到任何消息;这个实例应该被丢弃,而应该创建一个新的Binding。
void unbind();
// isInput() 方法表明当前 binding 的类型,true 是 input binding,false 是 output binding。
// 由于 @input 和 @output 只能用一个,因此不会同时即是 input 又是 output。
// (TBD:有点怪,如果真要同时支持 input 和 output 该怎么办?创建两个实例?)
default boolean isInput() {
throw new UnsupportedOperationException(
"Binding implementation `" + this.getClass().getName()
+ "` must implement this operation before it is called");
}
}
默认实现
@JsonPropertyOrder({ "bindingName", "name", "group", "pausable", "state" })
@JsonIgnoreProperties("running")
public class DefaultBinding<T> implements Binding<T> {
}
getName() 和 getBindingName() 的差异就很清楚了,getName() 返回的就是构建binding时给出的 name,而 getBindingName() 则要看 target 是不是 IntegrationObjectSupport ,如果是,则取 target 的 getComponentName。
@Override
public String getName() {
return this.name;
}
@Override
public String getBindingName() {
String resolvedName = (this.target instanceof IntegrationObjectSupport)
? ((IntegrationObjectSupport) this.target).getComponentName() : getName();
return resolvedName == null ? getName() : resolvedName;
}
3 - ConsumerProperties
ConsumerProperties 类
ConsumerProperties 类定义通用的 consumer 属性,这些属性对应到 spring.cloud.stream.bindings.[destinationName].consumer.*
:
public class ConsumerProperties {
// 此消费者绑定的绑定名称
private String bindingName;
// 标志着这个消费者是否需要自动启动。默认值: true
private boolean autoStartup = true;
// 消费者的并发设置,默认为1
private int concurrency = 1;
// 消费者是否从分区的生产者那里接收数据。默认:'false'。
private boolean partitioned;
......
}
TBD:以下参数的意思待查明:
private String[] requiredGroups = new String[] {};
private HeaderMode headerMode;
private boolean useNativeEncoding = false;
private boolean errorChannelEnabled = false;
实例相关的参数
instance count
// 当设置为大于等于零的值时,允许自定义此消费者的实例计数(如果与 spring.cloud.stream.instanceCount 不同)。
// 当设置为负值时,它将默认为 spring.cloud.stream.instanceCount。
// 更多信息请参见该属性。默认值:-1
// 注意:该设置将覆盖 覆盖 "spring.cloud.stream.instance-count "中的设置。
private int instanceCount = -1;
instance index
// 当设置为大于等于零的值时,允许自定义此消费者的实例索引(如果与 spring.cloud.stream.instanceIndex 不同)。
// 当设置为负值时,它将默认为 spring.cloud.stream.instanceIndex。
// 更多信息请参见该属性。默认值:-1 注意:
// 该设置将覆盖 'spring.cloud.stream.instance-index'中的设置。
private int instanceIndex = -1;
// 当设置时,它将允许定制的消费者为列表中的每个项目产生一个消费者。
// 所有负数的索引将被丢弃。
// 默认值:null
// 注意:该设置将禁用 instance-index
private List<Integer> instanceIndexList;
重试相关的参数
retryableExceptions
// 一个键值为Throwable类名的映射,值为一个布尔值。
// 指定那些将被或不被重试的异常(和子类)。
private Map<Class<? extends Throwable>, Boolean> retryableExceptions = new LinkedHashMap<>();
defaultRetryable
// 监听器抛出的异常如果没有列在 "retryableExceptions" 中,是否可以重试。
private boolean defaultRetryable = true;
maxAttempts
// 在处理失败的情况下,尝试处理消息的次数(包括第一次)。
// 这是一个RetryTemplate配置,由框架提供。默认值:3。
// 设置为1可以禁用重试。
// 在你想完全控制RetryTemplate的情况下,你也可以提供自定义RetryTemplate。只需在你的应用程序配置中把它配置为@Bean。
private int maxAttempts = 3;
backOffInitialInterval
// 重试时的回退初始时间间隔。
// 这是一个RetryTemplate配置,由框架提供。默认值:1000毫秒。
// 如果你想完全控制RetryTemplate,你也可以提供自定义RetryTemplate。只需在你的应用程序配置中把它配置为@Bean。
private int backOffInitialInterval = 1000;
backOffMaxInterval
// 最大的回避间隔。
// 这是一个RetryTemplate配置,由框架提供。默认值:10000毫秒。
// 如果你想完全控制RetryTemplate,你也可以提供自定义RetryTemplate。只需在你的应用程序配置中把它配置为@Bean。
private int backOffMaxInterval = 10000;
backOffMultiplier
// Backoff multiplier.
// 这是一个RetryTemplate配置,由框架提供。默认值:2.0。
// 如果你想完全控制RetryTemplate,你也可以提供自定义RetryTemplate。只需在你的应用程序配置中把它配置为@Bean。
private double backOffMultiplier = 2.0;
retryTemplateName
// 允许你进一步限定对于特定的消费者绑定应该用哪一个RetryTemplate。
private String retryTemplateName;
批量模式
// 当设置为 "true" 时,如果绑定器支持,发出的消息将有一个有效载荷的列表;
// 当与函数结合使用时,函数可以接收一个带有有效载荷的对象(或 Message)列表,如果有必要的话,有效载荷将被转换。
private boolean batchMode;
多路(multiplex)
// 当设置为 "true" 时,底层绑定器将在同一输入绑定上自然地复用目的地。
// 例如,在逗号分隔的多个目的地的情况下,如果设置为 "true",核心框架将跳过单独绑定它们,而是将这一责任委托给绑定器。
// 默认情况下,这个属性被设置为 "false",在逗号分隔的多目的地列表中,绑定器将单独绑定每个目的地。
// 需要原生支持多个输入绑定的单个绑定器实现(多路)可以启用这个属性。
private boolean multiplex;
header 模式
// 当设置为 "none" 时,禁止对输入的头进行解析。
// 仅对不支持消息头并需要嵌入头的信息中间件有效。
// 当从非 Spring Cloud Stream 应用程序消耗数据,而不支持原生报头时,该选项很有用。
// 当设置为 headers 时,使用中间件的原生头信息机制。
// 当设置为'embeddedHeaders'时,将头信息嵌入到消息的有效载荷中。
// 默认值:取决于绑定器的实现。
// 目前与 spring cloud stream 一起分发的 Rabbit 和 Kafka 绑定器支持原生报头。
private HeaderMode headerMode;
useNativeDecoding
// 当设置为 "true" 时,入站消息会被客户端库直接反序列化.
// 客户端库必须进行相应的配置(例如,设置一个合适的 Kafka 生产者值序列化器)。
// 注意:这是绑定器的特定设置,如果绑定器不支持本地序列化/反序列化,则没有影响。目前只有Kafka绑定器支持它。默认:'false'。
private boolean useNativeDecoding;
4 - ProducerProperties
ProducerProperties 类
ProducerProperties 类定义通用的 producer 属性:
public class ProducerProperties {
// 此生产者绑定的绑定名称
private String bindingName;
// 标志着这个生产者是否需要自动启动。默认值: true
private boolean autoStartup = true;
// 同 ComsumerProperties
private HeaderMode headerMode;
// 同 ComsumerProperties
private boolean useNativeEncoding = false;
......
}
TBD:以下参数的意思待查明:
private String[] requiredGroups = new String[] {};
private boolean errorChannelEnabled = false;
分区相关的参数
partiton key
获取 partiton key 的方式有两种:通过 partitionKeyExpression 或者 通过 partitionKeyExtractorName 指定的 bean。两种方式只能选择其中一个:
@JsonSerialize(using = ExpressionSerializer.class)
private Expression partitionKeyExpression;
// 实现 PartitionKeyExtractorStrategy 的 bean 的名字
// 用于提取 key,而这个 key 将用于计算 partition id (参见 partitionSelector)
private String partitionKeyExtractorName;
根据 partitionKeyExpression 和 partitionKeyExtractorName 的设置可以判断是否有做分区:
public boolean isPartitioned() {
return this.partitionKeyExpression != null
|| this.partitionKeyExtractorName != null;
}
partiton id
从 partiton key 计算出 partiton id 的方式也有两种:通过 partitionSelectorExpression 或者通过 partitionSelectorName 指定的 bean。同样,两种方式只能选择其中一个:
// 实现 PartitionSelectorStrategy 的 bean 的名字
// 用于根据 partition key 来决定 partition id
private String partitionSelectorName;
@JsonSerialize(using = ExpressionSerializer.class)
private Expression partitionSelectorExpression;
partiton key 和 partiton id 共用同一个 ExpressionSerializer 的实现:
static class ExpressionSerializer extends JsonSerializer<Expression> {
@Override
public void serialize(Expression expression, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
if (expression != null) {
jsonGenerator.writeString(expression.getExpressionString());
}
}
}
partition count
partitionCount 参数指定分区数量:
private int partitionCount = 1;
PollerProperties
PollerProperties 的参数:
public static class PollerProperties {
private Duration fixedDelay = Duration.ofMillis(1000);
private long maxMessagesPerPoll = 1L;
private String cron;
private Duration initialDelay = Duration.ofMillis(0);
}