资源绑定的源码
- 1: 资源绑定的源码概述
- 2: 资源绑定的初始化源码分析
- 3: 资源绑定的Redis output实现源码分析
- 4: 资源绑定的output处理源码分析
- 5: 资源绑定的Metadata总结
1 - 资源绑定的源码概述
2 - 资源绑定的初始化源码分析
Binding Registry
Binding Registry的初始化准备
Binding Registry 的初始化在 runtime 初始化时进行:
// NewDaprRuntime (excerpt) constructs a DaprRuntime from the runtime and
// global configuration. Only the bindings-related field is shown here; the
// rest of the constructor is elided.
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration) *DaprRuntime {
......
// The bindings registry is created up front via bindings_loader.NewRegistry().
bindingsRegistry: bindings_loader.NewRegistry(),
}
// initRuntime (excerpt) hands the binding definitions carried by opts to the
// runtime's bindings registry. The surrounding initialization steps are elided.
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
// Register every input binding definition supplied through opts.
a.bindingsRegistry.RegisterInputBindings(opts.inputBindings...)
// Register every output binding definition supplied through opts.
a.bindingsRegistry.RegisterOutputBindings(opts.outputBindings...)
......
}
这些 opts 来自 runtime 启动时的配置,如 cmd/daprd/main.go 下:
// main (excerpt from cmd/daprd/main.go) builds the runtime from command-line
// flags and runs it with the list of available input and output binding
// implementations. Each entry pairs a component name with a factory function
// that returns a new instance of the implementation. Other options passed to
// rt.Run are elided.
func main() {
rt, err := runtime.FromFlags()
if err != nil {
log.Fatal(err)
}
err = rt.Run(
......
// Input binding implementations, keyed by component name.
runtime.WithInputBindings(
bindings_loader.NewInput("aws.sqs", func() bindings.InputBinding {
return sqs.NewAWSSQS(logContrib)
}),
bindings_loader.NewInput("aws.kinesis", func() bindings.InputBinding {
return kinesis.NewAWSKinesis(logContrib)
}),
bindings_loader.NewInput("azure.eventhubs", func() bindings.InputBinding {
return eventhubs.NewAzureEventHubs(logContrib)
}),
bindings_loader.NewInput("kafka", func() bindings.InputBinding {
return kafka.NewKafka(logContrib)
}),
bindings_loader.NewInput("mqtt", func() bindings.InputBinding {
return mqtt.NewMQTT(logContrib)
}),
bindings_loader.NewInput("rabbitmq", func() bindings.InputBinding {
return bindings_rabbitmq.NewRabbitMQ(logContrib)
}),
bindings_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
return servicebusqueues.NewAzureServiceBusQueues(logContrib)
}),
bindings_loader.NewInput("azure.storagequeues", func() bindings.InputBinding {
return storagequeues.NewAzureStorageQueues(logContrib)
}),
bindings_loader.NewInput("gcp.pubsub", func() bindings.InputBinding {
return pubsub.NewGCPPubSub(logContrib)
}),
bindings_loader.NewInput("kubernetes", func() bindings.InputBinding {
return kubernetes.NewKubernetes(logContrib)
}),
bindings_loader.NewInput("azure.eventgrid", func() bindings.InputBinding {
return eventgrid.NewAzureEventGrid(logContrib)
}),
bindings_loader.NewInput("twitter", func() bindings.InputBinding {
return twitter.NewTwitter(logContrib)
}),
bindings_loader.NewInput("cron", func() bindings.InputBinding {
return cron.NewCron(logContrib)
}),
),
// Output binding implementations, keyed by component name.
runtime.WithOutputBindings(
bindings_loader.NewOutput("aws.sqs", func() bindings.OutputBinding {
return sqs.NewAWSSQS(logContrib)
}),
bindings_loader.NewOutput("aws.sns", func() bindings.OutputBinding {
return sns.NewAWSSNS(logContrib)
}),
bindings_loader.NewOutput("aws.kinesis", func() bindings.OutputBinding {
return kinesis.NewAWSKinesis(logContrib)
}),
bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
return eventhubs.NewAzureEventHubs(logContrib)
}),
bindings_loader.NewOutput("aws.dynamodb", func() bindings.OutputBinding {
return dynamodb.NewDynamoDB(logContrib)
}),
bindings_loader.NewOutput("azure.cosmosdb", func() bindings.OutputBinding {
return bindings_cosmosdb.NewCosmosDB(logContrib)
}),
bindings_loader.NewOutput("gcp.bucket", func() bindings.OutputBinding {
return bucket.NewGCPStorage(logContrib)
}),
bindings_loader.NewOutput("http", func() bindings.OutputBinding {
return http.NewHTTP(logContrib)
}),
bindings_loader.NewOutput("kafka", func() bindings.OutputBinding {
return kafka.NewKafka(logContrib)
}),
bindings_loader.NewOutput("mqtt", func() bindings.OutputBinding {
return mqtt.NewMQTT(logContrib)
}),
bindings_loader.NewOutput("rabbitmq", func() bindings.OutputBinding {
return bindings_rabbitmq.NewRabbitMQ(logContrib)
}),
bindings_loader.NewOutput("redis", func() bindings.OutputBinding {
return redis.NewRedis(logContrib)
}),
bindings_loader.NewOutput("aws.s3", func() bindings.OutputBinding {
return s3.NewAWSS3(logContrib)
}),
bindings_loader.NewOutput("azure.blobstorage", func() bindings.OutputBinding {
return blobstorage.NewAzureBlobStorage(logContrib)
}),
bindings_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
return servicebusqueues.NewAzureServiceBusQueues(logContrib)
}),
bindings_loader.NewOutput("azure.storagequeues", func() bindings.OutputBinding {
return storagequeues.NewAzureStorageQueues(logContrib)
}),
bindings_loader.NewOutput("gcp.pubsub", func() bindings.OutputBinding {
return pubsub.NewGCPPubSub(logContrib)
}),
bindings_loader.NewOutput("azure.signalr", func() bindings.OutputBinding {
return signalr.NewSignalR(logContrib)
}),
bindings_loader.NewOutput("twilio.sms", func() bindings.OutputBinding {
return sms.NewSMS(logContrib)
}),
bindings_loader.NewOutput("twilio.sendgrid", func() bindings.OutputBinding {
return sendgrid.NewSendGrid(logContrib)
}),
bindings_loader.NewOutput("azure.eventgrid", func() bindings.OutputBinding {
return eventgrid.NewAzureEventGrid(logContrib)
}),
bindings_loader.NewOutput("cron", func() bindings.OutputBinding {
return cron.NewCron(logContrib)
}),
bindings_loader.NewOutput("twitter", func() bindings.OutputBinding {
return twitter.NewTwitter(logContrib)
}),
bindings_loader.NewOutput("influx", func() bindings.OutputBinding {
return influx.NewInflux(logContrib)
}),
),
......
}
在这里配置各种 input binding 和 output binding 的实现。
Binding Registry的实现方式
pkg/components/bindings/registry.go,定义了多个数据结构:
// InputBinding is an input binding component definition: a component name
// together with the factory that produces the implementation.
type InputBinding struct {
	Name          string
	FactoryMethod func() bindings.InputBinding
}

// OutputBinding is an output binding component definition: a component name
// together with the factory that produces the implementation.
type OutputBinding struct {
	Name          string
	FactoryMethod func() bindings.OutputBinding
}

// Registry is the interface of a component that allows callers to register
// binding definitions and to get instances of registered input and output
// bindings by name.
type Registry interface {
	RegisterInputBindings(components ...InputBinding)
	RegisterOutputBindings(components ...OutputBinding)
	CreateInputBinding(name string) (bindings.InputBinding, error)
	CreateOutputBinding(name string) (bindings.OutputBinding, error)
}

// bindingsRegistry stores the registered factory functions, keyed by name,
// separately for input and output bindings.
type bindingsRegistry struct {
	inputBindings  map[string]func() bindings.InputBinding
	outputBindings map[string]func() bindings.OutputBinding
}
前面 runtime 初始化时,每个实现都通过 NewInput 方法和 NewOutput方法,将 name 和对应的InputBinding/OutputBinding关联起来:
// NewInput creates an InputBinding definition that associates name with the
// factory used to build the implementation.
func NewInput(name string, factoryMethod func() bindings.InputBinding) InputBinding {
	var def InputBinding
	def.Name = name
	def.FactoryMethod = factoryMethod
	return def
}
// NewOutput creates an OutputBinding definition that associates name with the
// factory used to build the implementation.
func NewOutput(name string, factoryMethod func() bindings.OutputBinding) OutputBinding {
	var def OutputBinding
	def.Name = name
	def.FactoryMethod = factoryMethod
	return def
}
RegisterInputBindings 和 RegisterOutputBindings 方法用来注册 input binding 和 output binding 的实现,在 runtime 初始化时被调用:
// RegisterInputBindings registers one or more new input bindings. Each
// factory is stored under the component's full name as produced by
// createFullName.
func (b *bindingsRegistry) RegisterInputBindings(components ...InputBinding) {
	for i := range components {
		key := createFullName(components[i].Name)
		b.inputBindings[key] = components[i].FactoryMethod
	}
}
// RegisterOutputBindings registers one or more new output bindings. Each
// factory is stored under the component's full name as produced by
// createFullName.
func (b *bindingsRegistry) RegisterOutputBindings(components ...OutputBinding) {
	for i := range components {
		key := createFullName(components[i].Name)
		b.outputBindings[key] = components[i].FactoryMethod
	}
}
// createFullName returns name with the common "bindings." prefix prepended,
// which is the key format used by the registry maps.
func createFullName(name string) string {
	const prefix = "bindings."
	return fmt.Sprintf("%s%s", prefix, name)
}
binding的初始化流程
pkg/runtime/runtime.go :
Binding 的初始化在 runtime 初始化时进行:
// initRuntime (excerpt) starts component processing in a separate goroutine.
// The surrounding initialization steps are elided.
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
// processComponents runs concurrently with the rest of initialization.
go a.processComponents()
......
}
// processComponents receives components from a.pendingComponents and processes
// them one at a time, logging (but not propagating) any processing error.
// Once the channel is closed and drained it sends true on
// a.pendingComponentsDone and returns.
func (a *DaprRuntime) processComponents() {
	for comp := range a.pendingComponents {
		err := a.processOneComponent(comp)
		if err == nil {
			continue
		}
		log.Errorf("process component %s error, %s", comp.Name, err)
	}
	// The range loop ends only when pendingComponents has been closed.
	a.pendingComponentsDone <- true
}
processOneComponent:
// processOneComponent (excerpt) preprocesses comp and determines its category.
// The steps that consume these results are elided in this excerpt.
func (a *DaprRuntime) processOneComponent(comp components_v1alpha1.Component) error {
// Preprocess the component; how res is used is not shown here.
res := a.preprocessOneComponent(&comp)
// Determine which component category comp belongs to.
compCategory := a.figureOutComponentCategory(comp)
......
return nil
}
doProcessOneComponent:
// doProcessOneComponent (excerpt) dispatches comp to the initializer that
// matches its category. Only the bindings case is shown; other cases are
// elided.
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
switch category {
case bindingsComponent:
// Binding components are initialized by initBinding.
return a.initBinding(comp)
......
}
return nil
}
initBinding:
// initBinding initializes component c first as an output binding and then as
// an input binding. The first failure is logged and returned; nil is returned
// when both steps succeed.
func (a *DaprRuntime) initBinding(c components_v1alpha1.Component) error {
	outErr := a.initOutputBinding(c)
	if outErr != nil {
		log.Errorf("failed to init output bindings: %s", outErr)
		return outErr
	}

	inErr := a.initInputBinding(c)
	if inErr != nil {
		log.Errorf("failed to init input bindings: %s", inErr)
		return inErr
	}

	return nil
}
在这里进行 input binding 和 output binding 的初始化。
Output Binding的初始化
pkg/runtime/runtime.go:
// initOutputBinding creates the output binding registered for c.Spec.Type,
// initializes it with the component's metadata, and stores it in
// a.outputBindings under the component name. Creation and init failures are
// logged, reported to the monitoring hooks, and returned. A nil binding from
// the registry is treated as nothing to do.
func (a *DaprRuntime) initOutputBinding(c components_v1alpha1.Component) error {
	compName, compType := c.ObjectMeta.Name, c.Spec.Type

	// Ask the registry to build the implementation for this component type.
	binding, err := a.bindingsRegistry.CreateOutputBinding(compType)
	if err != nil {
		log.Warnf("failed to create output binding %s (%s): %s", compName, compType, err)
		diag.DefaultMonitoring.ComponentInitFailed(compType, "creation")
		return err
	}
	if binding == nil {
		return nil
	}

	meta := bindings.Metadata{
		Properties: a.convertMetadataItemsToProperties(c.Spec.Metadata),
		Name:       compName,
	}
	if err = binding.Init(meta); err != nil {
		log.Errorf("failed to init output binding %s (%s): %s", compName, compType, err)
		diag.DefaultMonitoring.ComponentInitFailed(compType, "init")
		return err
	}

	log.Infof("successful init for output binding %s (%s)", compName, compType)
	a.outputBindings[compName] = binding
	diag.DefaultMonitoring.ComponentInitialized(compType)
	return nil
}
其中 CreateOutputBinding 方法的实现在 pkg/components/bindings/registry.go 中:
// CreateOutputBinding instantiates an output binding based on `name`.
// It returns an error when no factory is registered under that name.
func (b *bindingsRegistry) CreateOutputBinding(name string) (bindings.OutputBinding, error) {
	factory, found := b.outputBindings[name]
	if !found {
		return nil, errors.Errorf("couldn't find output binding %s", name)
	}
	// Call the factory method to build the concrete output binding.
	return factory(), nil
}
Input Binding的初始化
TODO
3 - 资源绑定的Redis output实现源码分析
备注:根据 https://github.com/dapr/docs/blob/master/concepts/bindings/README.md 的描述,redis 只实现了 output binding。
output binding 的实现
Redis的实现在 dapr/components-contrib 下,/bindings/redis/redis.go 中:
// Operations lists the operation kinds this binding supports; only the create
// operation is offered.
func (r *Redis) Operations() []bindings.OperationKind {
	supported := make([]bindings.OperationKind, 0, 1)
	supported = append(supported, bindings.CreateOperation)
	return supported
}
// Invoke writes req.Data to Redis with a SET command. The target key is taken
// from the "key" entry of the request metadata; a missing or empty key is an
// error. On success both return values are nil.
func (r *Redis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	key, present := req.Metadata["key"]
	if !present || key == "" {
		return nil, errors.New("redis binding: missing key on write request metadata")
	}

	// Issue the SET through the Redis client.
	if _, err := r.client.DoContext(context.Background(), "SET", key, req.Data).Result(); err != nil {
		return nil, err
	}
	return nil, nil
}
完整分析
初始化:
在 dapr runtime 初始化时,关联 redis 的 output binding实现:
bindings_loader.NewOutput("redis", func() bindings.OutputBinding {
return redis.NewRedis(logContrib)
}),
然后 Init 方法会在 output binding初始化时被 dapr runtime 调用,Redis的实现内容为:
// Init performs metadata parsing and connection creation.
//
// It parses meta, builds the Redis client options from it, creates the client
// and verifies connectivity with a PING. It returns the metadata parsing error
// unchanged, a wrapped error when the PING fails, and nil on success.
func (r *Redis) Init(meta bindings.Metadata) error {
	// Parse the component metadata.
	m, err := r.parseMetadata(meta)
	if err != nil {
		return err
	}

	// Redis connection options.
	opts := &redis.Options{
		Addr:            m.host,
		Password:        m.password,
		DB:              defaultDB,
		MaxRetries:      m.maxRetries,
		MaxRetryBackoff: m.maxRetryBackoff,
	}

	// NOTE(review): enabling TLS here also disables certificate verification
	// (InsecureSkipVerify is true whenever enableTLS is true). Behavior is kept
	// as-is for compatibility; confirm whether this is intended.
	/* #nosec */
	if m.enableTLS {
		opts.TLSConfig = &tls.Config{
			InsecureSkipVerify: m.enableTLS,
		}
	}

	// Create the client and check that the server is reachable.
	r.client = redis.NewClient(opts)
	if _, err = r.client.Ping().Result(); err != nil {
		// %w keeps the underlying error available to errors.Is / errors.As;
		// the rendered message is unchanged.
		return fmt.Errorf("redis binding: error connecting to redis at %s: %w", m.host, err)
	}

	return nil
}
4 - 资源绑定的output处理源码分析
pkg/grpc/api.go 中的 InvokeBinding 方法:
// InvokeBinding translates the gRPC request into a bindings.InvokeRequest,
// forwards it to the named output binding through a.sendToOutputBindingFn, and
// copies any returned data and metadata into the gRPC response. On failure it
// returns an empty response together with an error prefixed with
// ERR_INVOKE_OUTPUT_BINDING.
func (a *api) InvokeBinding(ctx context.Context, in *runtimev1pb.InvokeBindingRequest) (*runtimev1pb.InvokeBindingResponse, error) {
	req := new(bindings.InvokeRequest)
	req.Metadata = in.Metadata
	req.Operation = bindings.OperationKind(in.Operation)
	if in.Data != nil {
		req.Data = in.Data
	}

	out := &runtimev1pb.InvokeBindingResponse{}

	// The actual work happens in the runtime-provided callback.
	resp, err := a.sendToOutputBindingFn(in.Name, req)
	if err != nil {
		err = fmt.Errorf("ERR_INVOKE_OUTPUT_BINDING: %s", err)
		apiServerLogger.Debug(err)
		return out, err
	}

	if resp == nil {
		return out, nil
	}
	out.Data = resp.Data
	out.Metadata = resp.Metadata
	return out, nil
}
sendToOutputBindingFn 方法的初始化在这里:
// getGRPCAPI builds the gRPC API from the runtime's collaborators. Note that
// a.sendToOutputBinding is passed in as the callback used to invoke output
// bindings.
func (a *DaprRuntime) getGRPCAPI() grpc.API {
	return grpc.NewAPI(
		a.runtimeConfig.ID,
		a.appChannel,
		a.stateStores,
		a.secretStores,
		a.getPublishAdapter(),
		a.directMessaging,
		a.actor,
		a.sendToOutputBinding,
		a.globalConfig.Spec.TracingSpec,
	)
}
sendToOutputBinding 方法的实现在 pkg/runtime/runtime.go:
// sendToOutputBinding invokes the output binding registered under name with
// req.
//
// It returns an error when req.Operation is empty, when no output binding is
// registered under name, or when the binding does not list req.Operation among
// its supported operations. Otherwise it returns whatever the binding's Invoke
// returns.
func (a *DaprRuntime) sendToOutputBinding(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	if req.Operation == "" {
		return nil, errors.New("operation field is missing from request")
	}

	// Look up the already-registered binding by name.
	binding, ok := a.outputBindings[name]
	if !ok {
		return nil, errors.Errorf("couldn't find output binding %s", name)
	}

	ops := binding.Operations()
	for _, o := range ops {
		// Delegate to the concrete implementation once the requested
		// operation is found among the supported ones.
		if o == req.Operation {
			return binding.Invoke(req)
		}
	}

	// Allocate with length 0 and capacity len(ops): using len(ops) as the
	// length and then appending would leave len(ops) empty strings at the
	// front and put stray spaces into the joined message.
	supported := make([]string, 0, len(ops))
	for _, o := range ops {
		supported = append(supported, string(o))
	}
	return nil, errors.Errorf("binding %s does not support operation %s. supported operations:%s", name, req.Operation, strings.Join(supported, " "))
}
5 - 资源绑定的Metadata总结
总结一下各种binding实现中 metadata 的设计和使用:
实现 | 配置级别的metadata | 请求级别的metadata |
---|---|---|
alicloud oss | key | |
HTTP | url / method | 无 |
cron | schedule | 无 |
MQTT | url / topic | 无 |
RabbitMQ | host / queueName / durable / deleteWhenUnused / prefetchCount | ttlInSeconds |
Redis | host / password / enableTLS / maxRetries / maxRetryBackoff | key |
Influx | url / token / org / bucket | 无 |
Kafka | brokers / topics / publishTopic / consumerGroup / authRequired / saslUsername / saslPassword | key |
Kubernetes | namespace / resyncPeriodInSec | 无 |
twilio-sendgrid | apiKey / emailFrom / emailTo / subject / emailCc / emailBcc | emailFrom / emailTo / subject / emailCc / emailBcc |
twilio-sms | toNumber / fromNumber / accountSid / authToken / timeout | toNumber |
Twitter | consumerKey / consumerSecret / accessToken / accessSecret / query | query / lang / result / since_id |
gcp-bucket | bucket / type / project_id / private_key_id / private_key / client_email / client_id / auth_uri / token_uri / auth_provider_x509_cert_url / client_x509_cert_url | name |
gcp-pubsub | topic / subscription / type / project_id / private_key_id / private_key / client_email / client_id / auth_uri / token_uri / auth_provider_x509_cert_url / client_x509_cert_url | topic |
Azure-blobstorage | storageAccount / storageAccessKey / container | blobName / ContentType / ContentMD5 / ContentEncoding / ContentLanguage / ContentDisposition / CacheControl |
Azure-cosmosDB | url / masterKey / database / collection / partitionKey | 无 |
Azure-EventGrid | tenantId / subscriptionId / clientId / clientSecret / subscriberEndpoint / handshakePort / scope / eventSubscriptionName / accessKey / topicEndpoint | 无 |
Azure-EventHubs | connection / consumerGroup / storageAccountName / storageAccountKey / storageContainerName / partitionID / partitionKey | partitionKey |
Azure-ServiceBusQueues | connectionString / queueName / ttl | id / correlationID / ttlInSeconds |
Azure-SignalR | connectionString / hub | hub / group / user |
Azure-storagequeue | ttlInSeconds | |
Aws-dynamodb | region / endpoint / accessKey / secretKey / table | 无 |
Aws-kinesis | streamName / consumerName / region / endpoint / accessKey / secretKey / mode | partitionKey |
Aws-s3 | region / endpoint / accessKey / secretKey / bucket | key |
Aws-sns | topicArn / region / endpoint / accessKey / secretKey | 无 |
Aws-sqs | queueName / region / endpoint / accessKey / secretKey | 无 |