ProducerProperties
spring cloud stream的核心概念-ProducerProperties
ProducerProperties 类
ProducerProperties 类定义通用的 producer 属性:
public class ProducerProperties {
// 此生产者绑定的绑定名称
private String bindingName;
// 标志着这个生产者是否需要自动启动。默认值: true
private boolean autoStartup = true;
// 同 ComsumerProperties
private HeaderMode headerMode;
// 同 ComsumerProperties
private boolean useNativeEncoding = false;
......
}
TBD:以下参数的意思待查明:
private String[] requiredGroups = new String[] {};
private boolean errorChannelEnabled = false;
分区相关的参数
partiton key
获取 partiton key 的方式有两种:通过 partitionKeyExpression 或者 通过 partitionKeyExtractorName 指定的 bean。两种方式只能选择其中一个:
@JsonSerialize(using = ExpressionSerializer.class)
private Expression partitionKeyExpression;
// 实现 PartitionKeyExtractorStrategy 的 bean 的名字
// 用于提取 key,而这个 key 将用于计算 partition id (参见 partitionSelector)
private String partitionKeyExtractorName;
根据 partitionKeyExpression 和 partitionKeyExtractorName 的设置可以判断是否有做分区:
public boolean isPartitioned() {
return this.partitionKeyExpression != null
|| this.partitionKeyExtractorName != null;
}
partiton id
从 partiton key 计算出 partiton id 的方式也有两种:通过 partitionSelectorExpression 或者通过 partitionSelectorName 指定的 bean。同样,两种方式只能选择其中一个:
// 实现 PartitionSelectorStrategy 的 bean 的名字
// 用于根据 partition key 来决定 partition id
private String partitionSelectorName;
@JsonSerialize(using = ExpressionSerializer.class)
private Expression partitionSelectorExpression;
partiton key 和 partiton id 共用同一个 ExpressionSerializer 的实现:
static class ExpressionSerializer extends JsonSerializer<Expression> {
@Override
public void serialize(Expression expression, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
if (expression != null) {
jsonGenerator.writeString(expression.getExpressionString());
}
}
}
partition count
partitionCount 参数指定分区数量:
private int partitionCount = 1;
PollerProperties
PollerProperties 的参数:
public static class PollerProperties {
private Duration fixedDelay = Duration.ofMillis(1000);
private long maxMessagesPerPoll = 1L;
private String cron;
private Duration initialDelay = Duration.ofMillis(0);
}