1 - 资源绑定的源码概述

Dapr的资源绑定的源码概述

2 - 资源绑定的初始化源码分析

Dapr资源绑定的初始化源码分析

Binding Registry

Binding Registry的初始化准备

Binding Registry 的初始化在 runtime 初始化时进行:

func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration) *DaprRuntime {
  ......
  bindingsRegistry:       bindings_loader.NewRegistry(),
}

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {	
  ......
  a.bindingsRegistry.RegisterInputBindings(opts.inputBindings...)
	a.bindingsRegistry.RegisterOutputBindings(opts.outputBindings...)
  ......
}

这些 opts 来自 runtime 启动时的配置,如 cmd/daprd/main.go 下:

func main() {
	rt, err := runtime.FromFlags()
	if err != nil {
		log.Fatal(err)
	}

	err = rt.Run(
    ......
    runtime.WithInputBindings(
			bindings_loader.NewInput("aws.sqs", func() bindings.InputBinding {
				return sqs.NewAWSSQS(logContrib)
			}),
			bindings_loader.NewInput("aws.kinesis", func() bindings.InputBinding {
				return kinesis.NewAWSKinesis(logContrib)
			}),
			bindings_loader.NewInput("azure.eventhubs", func() bindings.InputBinding {
				return eventhubs.NewAzureEventHubs(logContrib)
			}),
			bindings_loader.NewInput("kafka", func() bindings.InputBinding {
				return kafka.NewKafka(logContrib)
			}),
			bindings_loader.NewInput("mqtt", func() bindings.InputBinding {
				return mqtt.NewMQTT(logContrib)
			}),
			bindings_loader.NewInput("rabbitmq", func() bindings.InputBinding {
				return bindings_rabbitmq.NewRabbitMQ(logContrib)
			}),
			bindings_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
				return servicebusqueues.NewAzureServiceBusQueues(logContrib)
			}),
			bindings_loader.NewInput("azure.storagequeues", func() bindings.InputBinding {
				return storagequeues.NewAzureStorageQueues(logContrib)
			}),
			bindings_loader.NewInput("gcp.pubsub", func() bindings.InputBinding {
				return pubsub.NewGCPPubSub(logContrib)
			}),
			bindings_loader.NewInput("kubernetes", func() bindings.InputBinding {
				return kubernetes.NewKubernetes(logContrib)
			}),
			bindings_loader.NewInput("azure.eventgrid", func() bindings.InputBinding {
				return eventgrid.NewAzureEventGrid(logContrib)
			}),
			bindings_loader.NewInput("twitter", func() bindings.InputBinding {
				return twitter.NewTwitter(logContrib)
			}),
			bindings_loader.NewInput("cron", func() bindings.InputBinding {
				return cron.NewCron(logContrib)
			}),
		),
    runtime.WithOutputBindings(
			bindings_loader.NewOutput("aws.sqs", func() bindings.OutputBinding {
				return sqs.NewAWSSQS(logContrib)
			}),
			bindings_loader.NewOutput("aws.sns", func() bindings.OutputBinding {
				return sns.NewAWSSNS(logContrib)
			}),
			bindings_loader.NewOutput("aws.kinesis", func() bindings.OutputBinding {
				return kinesis.NewAWSKinesis(logContrib)
			}),
			bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
				return eventhubs.NewAzureEventHubs(logContrib)
			}),
			bindings_loader.NewOutput("aws.dynamodb", func() bindings.OutputBinding {
				return dynamodb.NewDynamoDB(logContrib)
			}),
			bindings_loader.NewOutput("azure.cosmosdb", func() bindings.OutputBinding {
				return bindings_cosmosdb.NewCosmosDB(logContrib)
			}),
			bindings_loader.NewOutput("gcp.bucket", func() bindings.OutputBinding {
				return bucket.NewGCPStorage(logContrib)
			}),
			bindings_loader.NewOutput("http", func() bindings.OutputBinding {
				return http.NewHTTP(logContrib)
			}),
			bindings_loader.NewOutput("kafka", func() bindings.OutputBinding {
				return kafka.NewKafka(logContrib)
			}),
			bindings_loader.NewOutput("mqtt", func() bindings.OutputBinding {
				return mqtt.NewMQTT(logContrib)
			}),
			bindings_loader.NewOutput("rabbitmq", func() bindings.OutputBinding {
				return bindings_rabbitmq.NewRabbitMQ(logContrib)
			}),
			bindings_loader.NewOutput("redis", func() bindings.OutputBinding {
				return redis.NewRedis(logContrib)
			}),
			bindings_loader.NewOutput("aws.s3", func() bindings.OutputBinding {
				return s3.NewAWSS3(logContrib)
			}),
			bindings_loader.NewOutput("azure.blobstorage", func() bindings.OutputBinding {
				return blobstorage.NewAzureBlobStorage(logContrib)
			}),
			bindings_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
				return servicebusqueues.NewAzureServiceBusQueues(logContrib)
			}),
			bindings_loader.NewOutput("azure.storagequeues", func() bindings.OutputBinding {
				return storagequeues.NewAzureStorageQueues(logContrib)
			}),
			bindings_loader.NewOutput("gcp.pubsub", func() bindings.OutputBinding {
				return pubsub.NewGCPPubSub(logContrib)
			}),
			bindings_loader.NewOutput("azure.signalr", func() bindings.OutputBinding {
				return signalr.NewSignalR(logContrib)
			}),
			bindings_loader.NewOutput("twilio.sms", func() bindings.OutputBinding {
				return sms.NewSMS(logContrib)
			}),
			bindings_loader.NewOutput("twilio.sendgrid", func() bindings.OutputBinding {
				return sendgrid.NewSendGrid(logContrib)
			}),
			bindings_loader.NewOutput("azure.eventgrid", func() bindings.OutputBinding {
				return eventgrid.NewAzureEventGrid(logContrib)
			}),
			bindings_loader.NewOutput("cron", func() bindings.OutputBinding {
				return cron.NewCron(logContrib)
			}),
			bindings_loader.NewOutput("twitter", func() bindings.OutputBinding {
				return twitter.NewTwitter(logContrib)
			}),
			bindings_loader.NewOutput("influx", func() bindings.OutputBinding {
				return influx.NewInflux(logContrib)
			}),
		),
    ......
}

在这里配置各种 input binding 和 output binding 的实现。

Binding Registry的实现方式

pkg/components/bindings/registry.go,定义了多个数据结构:

// InputBinding is an input binding component definition. It pairs a
// component name with the factory function that builds the implementation.
type InputBinding struct {
	Name          string
	FactoryMethod func() bindings.InputBinding
}

// OutputBinding is an output binding component definition. It pairs a
// component name with the factory function that builds the implementation.
type OutputBinding struct {
	Name          string
	FactoryMethod func() bindings.OutputBinding
}

// Registry is the interface of a component that allows callers to register
// input and output bindings and to create instances of them by name.
type Registry interface {
	RegisterInputBindings(components ...InputBinding)
	RegisterOutputBindings(components ...OutputBinding)
	CreateInputBinding(name string) (bindings.InputBinding, error)
	CreateOutputBinding(name string) (bindings.OutputBinding, error)
}

// bindingsRegistry holds two maps from a string key to the factory function
// of an input binding and of an output binding respectively.
type bindingsRegistry struct {
	inputBindings  map[string]func() bindings.InputBinding
	outputBindings map[string]func() bindings.OutputBinding
}

前面 runtime 初始化时,每个实现都通过 NewInput 方法和 NewOutput方法,将 name 和对应的InputBinding/OutputBinding关联起来:

// NewInput creates an InputBinding that associates name with the factory
// method used to build the input binding implementation.
func NewInput(name string, factoryMethod func() bindings.InputBinding) InputBinding {
	var definition InputBinding
	definition.Name = name
	definition.FactoryMethod = factoryMethod
	return definition
}

// NewOutput creates an OutputBinding that associates name with the factory
// method used to build the output binding implementation.
func NewOutput(name string, factoryMethod func() bindings.OutputBinding) OutputBinding {
	var definition OutputBinding
	definition.Name = name
	definition.FactoryMethod = factoryMethod
	return definition
}

RegisterInputBindings 和 RegisterOutputBindings 方法用来注册 input binding 和 output binding 的实现,在 runtime 初始化时被调用:

// RegisterInputBindings registers one or more new input bindings.
// Each factory method is stored under the key returned by createFullName
// for the component's name; a later entry with the same key overwrites an
// earlier one.
func (b *bindingsRegistry) RegisterInputBindings(components ...InputBinding) {
	for i := range components {
		key := createFullName(components[i].Name)
		b.inputBindings[key] = components[i].FactoryMethod
	}
}

// RegisterOutputBindings registers one or more new output bindings.
// Each factory method is stored under the key returned by createFullName
// for the component's name; a later entry with the same key overwrites an
// earlier one.
func (b *bindingsRegistry) RegisterOutputBindings(components ...OutputBinding) {
	for i := range components {
		key := createFullName(components[i].Name)
		b.outputBindings[key] = components[i].FactoryMethod
	}
}

// createFullName returns name with the uniform "bindings." prefix added,
// e.g. "redis" becomes "bindings.redis".
func createFullName(name string) string {
	const prefix = "bindings"
	return fmt.Sprintf("%s.%s", prefix, name)
}

binding的初始化流程

pkg/runtime/runtime.go :

Binding 的初始化在 runtime 初始化时进行:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	......
	go a.processComponents()
	......
}
// processComponents receives components from a.pendingComponents and hands
// each one to processOneComponent, logging (but not propagating) any error.
// Once the channel is closed and drained it sends true on
// a.pendingComponentsDone and returns.
func (a *DaprRuntime) processComponents() {
	for comp := range a.pendingComponents {
		err := a.processOneComponent(comp)
		if err != nil {
			log.Errorf("process component %s error, %s", comp.Name, err)
		}
	}

	// The range loop only ends when pendingComponents has been closed.
	a.pendingComponentsDone <- true
}

processOneComponent:

func (a *DaprRuntime) processOneComponent(comp components_v1alpha1.Component) error {
	res := a.preprocessOneComponent(&comp)
  
	compCategory := a.figureOutComponentCategory(comp)

	......
	return nil
}

doProcessOneComponent:

func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
	switch category {
	case bindingsComponent:
		return a.initBinding(comp)
		......
	}
	return nil
}

initBinding:

// initBinding initializes component c first as an output binding and then
// as an input binding. The first failure is logged and returned; the input
// binding is not attempted when the output binding fails.
func (a *DaprRuntime) initBinding(c components_v1alpha1.Component) error {
	err := a.initOutputBinding(c)
	if err != nil {
		log.Errorf("failed to init output bindings: %s", err)
		return err
	}

	err = a.initInputBinding(c)
	if err != nil {
		log.Errorf("failed to init input bindings: %s", err)
		return err
	}

	return nil
}

在这里进行 input binding 和 output binding 的初始化。

Output Binding的初始化

pkg/runtime/runtime.go:

// initOutputBinding creates the output binding registered for c.Spec.Type,
// initializes it with the component's metadata, and on success stores it in
// a.outputBindings under the component's name. Creation and init failures
// are logged, reported to diag.DefaultMonitoring, and returned.
func (a *DaprRuntime) initOutputBinding(c components_v1alpha1.Component) error {
	// Look up the factory by component type and build the binding instance.
	binding, err := a.bindingsRegistry.CreateOutputBinding(c.Spec.Type)
	if err != nil {
		log.Warnf("failed to create output binding %s (%s): %s", c.ObjectMeta.Name, c.Spec.Type, err)
		diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation")
		return err
	}

	// Nothing to initialize or store when the registry returned no binding.
	if binding == nil {
		return nil
	}

	metadata := bindings.Metadata{
		Properties: a.convertMetadataItemsToProperties(c.Spec.Metadata),
		Name:       c.ObjectMeta.Name,
	}
	if err = binding.Init(metadata); err != nil {
		log.Errorf("failed to init output binding %s (%s): %s", c.ObjectMeta.Name, c.Spec.Type, err)
		diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init")
		return err
	}

	log.Infof("successful init for output binding %s (%s)", c.ObjectMeta.Name, c.Spec.Type)
	a.outputBindings[c.ObjectMeta.Name] = binding
	diag.DefaultMonitoring.ComponentInitialized(c.Spec.Type)
	return nil
}

其中 CreateOutputBinding 方法的实现在 pkg/components/bindings/registry.go 中:

// CreateOutputBinding instantiates an output binding based on `name`.
// It returns an error when no factory method is registered under that name.
func (b *bindingsRegistry) CreateOutputBinding(name string) (bindings.OutputBinding, error) {
	factory, registered := b.outputBindings[name]
	if !registered {
		return nil, errors.Errorf("couldn't find output binding %s", name)
	}

	// Call the factory method to build the concrete output binding.
	return factory(), nil
}

Input Binding的初始化

TODO

3 - 资源绑定的Redis output实现源码分析

Dapr资源绑定的Redis output实现源码分析

备注:根据 https://github.com/dapr/docs/blob/master/concepts/bindings/README.md 的描述,redis 只实现了 output binding。

output binding 的实现

Redis的实现在 dapr/components-contrib 下,/bindings/redis/redis.go 中:

// Operations returns the operation kinds this binding supports.
// Only the create operation is supported.
func (r *Redis) Operations() []bindings.OperationKind {
	supported := []bindings.OperationKind{
		bindings.CreateOperation,
	}
	return supported
}

// Invoke writes req.Data to Redis with a SET command. The Redis key is
// taken from the "key" entry of the request metadata; an error is returned
// when that entry is missing or empty. On success both the response and the
// error are nil.
func (r *Redis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	// The key is passed through the request metadata.
	key, found := req.Metadata["key"]
	if !found || key == "" {
		return nil, errors.New("redis binding: missing key on write request metadata")
	}

	// Run SET through the redis client.
	if _, err := r.client.DoContext(context.Background(), "SET", key, req.Data).Result(); err != nil {
		return nil, err
	}
	return nil, nil
}

完整分析

初始化:

在 dapr runtime 初始化时,关联 redis 的 output binding实现:

bindings_loader.NewOutput("redis", func() bindings.OutputBinding {
   return redis.NewRedis(logContrib)
}),

然后 Init 方法会在 output binding初始化时被 dapr runtime 调用,Redis的实现内容为:

// Init performs metadata parsing and connection creation.
// It builds the client options from the parsed metadata, creates the client,
// and pings the server; a failed ping is returned as an error that names the
// configured host.
func (r *Redis) Init(meta bindings.Metadata) error {
	// Parse the component metadata.
	m, err := r.parseMetadata(meta)
	if err != nil {
		return err
	}

	// Redis connection options.
	opts := &redis.Options{
		Addr:            m.host,
		Password:        m.password,
		DB:              defaultDB,
		MaxRetries:      m.maxRetries,
		MaxRetryBackoff: m.maxRetryBackoff,
	}

	// NOTE: inside this branch m.enableTLS is true, so InsecureSkipVerify is
	// always set to true whenever TLS is enabled.
	/* #nosec */
	if m.enableTLS {
		opts.TLSConfig = &tls.Config{
			InsecureSkipVerify: m.enableTLS,
		}
	}

	// Create the client and verify the connection with a ping.
	r.client = redis.NewClient(opts)
	if _, err = r.client.Ping().Result(); err != nil {
		return fmt.Errorf("redis binding: error connecting to redis at %s: %s", m.host, err)
	}

	return nil
}

4 - 资源绑定的output处理源码分析

Dapr资源绑定的output处理源码分析

pkg/grpc/api.go 中的 InvokeBinding 方法:

// InvokeBinding converts the gRPC request into a bindings.InvokeRequest,
// forwards it to the named output binding through a.sendToOutputBindingFn,
// and copies the binding's data and metadata (if any) into the response.
// On failure it returns an empty response together with an error prefixed
// with ERR_INVOKE_OUTPUT_BINDING.
func (a *api) InvokeBinding(ctx context.Context, in *runtimev1pb.InvokeBindingRequest) (*runtimev1pb.InvokeBindingResponse, error) {
	req := &bindings.InvokeRequest{
		Metadata:  in.Metadata,
		Operation: bindings.OperationKind(in.Operation),
	}
	if in.Data != nil {
		req.Data = in.Data
	}

	out := &runtimev1pb.InvokeBindingResponse{}

	// The actual work happens in the output binding behind this function.
	resp, err := a.sendToOutputBindingFn(in.Name, req)
	if err != nil {
		wrapped := fmt.Errorf("ERR_INVOKE_OUTPUT_BINDING: %s", err)
		apiServerLogger.Debug(wrapped)
		return out, wrapped
	}

	// A binding may return no response; the reply then stays empty.
	if resp == nil {
		return out, nil
	}

	out.Data = resp.Data
	out.Metadata = resp.Metadata
	return out, nil
}

sendToOutputBindingFn 方法的初始化在这里:

// getGRPCAPI builds the gRPC API from the runtime's current state.
// a.sendToOutputBinding is passed in as the function the API uses to invoke
// output bindings.
func (a *DaprRuntime) getGRPCAPI() grpc.API {
	return grpc.NewAPI(
		a.runtimeConfig.ID,
		a.appChannel,
		a.stateStores,
		a.secretStores,
		a.getPublishAdapter(),
		a.directMessaging,
		a.actor,
		a.sendToOutputBinding,
		a.globalConfig.Spec.TracingSpec,
	)
}

sendToOutputBinding 方法的实现在 pkg/runtime/runtime.go:

// sendToOutputBinding invokes the output binding registered under name with
// req. It returns an error when req.Operation is empty, when no output
// binding is registered under name, or when the binding does not list
// req.Operation among its supported operations; otherwise it returns
// whatever the binding's Invoke returns.
func (a *DaprRuntime) sendToOutputBinding(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	if req.Operation == "" {
		return nil, errors.New("operation field is missing from request")
	}

	// Look up the already-registered binding by name.
	binding, ok := a.outputBindings[name]
	if !ok {
		return nil, errors.Errorf("couldn't find output binding %s", name)
	}

	// Dispatch to the concrete implementation if it supports the operation.
	ops := binding.Operations()
	for _, o := range ops {
		if o == req.Operation {
			return binding.Invoke(req)
		}
	}

	// Allocate with length 0 and capacity len(ops): a non-zero length would
	// leave len(ops) empty strings in front of the appended names and thus
	// stray leading spaces in the joined list. The separating space now
	// lives in the format string instead.
	supported := make([]string, 0, len(ops))
	for _, o := range ops {
		supported = append(supported, string(o))
	}
	return nil, errors.Errorf("binding %s does not support operation %s. supported operations: %s", name, req.Operation, strings.Join(supported, " "))
}

5 - 资源绑定的Metadata总结

Dapr资源绑定的Metadata总结

总结一下各种binding实现中 metadata 的设计和使用:

实现 配置级别的metadata 请求级别的metadata
alicloud oss key
HTTP url / method
cron schedule
MQTT url / topic
RabbitMQ host / queueName / durable
deleteWhenUnused / prefetchCount
ttlInSeconds
Redis host / password / enableTLS /
maxRetries / maxRetryBackoff
key
Influx url / token / org / bucket
Kafka brokers / topics / publishTopic
consumerGroup / authRequired
saslUsername / saslPassword
key
Kubernetes namespace / resyncPeriodInSec
twilio-sendgrid apiKey / emailFrom / emailTo
subject / emailCc / emailBcc
emailFrom / emailTo / subject
emailCc / emailBcc
twilio-sms toNumber / fromNumber / accountSid
authToken / timeout
toNumber
twitter consumerKey / consumerSecret / accessToken
accessSecret / query
query / lang / result / since_id
gcp-bucket bucket / type / project_id / private_key_id
private_key / client_email / client_id
auth_uri / token_uri
auth_provider_x509_cert_url / client_x509_cert_url
name
gcp-pubsub topic / subscription / type /
project_id / private_key_id / private_key
client_email / client_id / auth_uri / token_uri
auth_provider_x509_cert_url / client_x509_cert_url
topic
Azure-blobstorage storageAccount / storageAccessKey / container blobName / ContentType / ContentMD5
ContentEncoding / ContentLanguage
ContentDisposition / CacheControl
Azure-cosmosDB url / masterKey / database /
collection / partitionKey
Azure-EventGrid tenantId / subscriptionId / clientId
clientSecret / subscriberEndpoint
handshakePort / scope
eventSubscriptionName / accessKey
topicEndpoint
Azure-EventHubs connection / consumerGroup / storageAccountName /
storageAccountKey / storageContainerName
partitionID / partitionKey
partitionKey
Azure-ServiceBusQueues connectionString / queueName / ttl id / correlationID / ttlInSeconds
Azure-SignalR connectionString / hub hub / group / user
Azure-storagequeue ttlInSeconds
Aws-dynamodb region / endpoint / accessKey
secretKey / table
Aws-kinesis streamName / consumerName / region
endpoint / accessKey
secretKey / mode
partitionKey
Aws-s3 region / endpoint / accessKey
secretKey / bucket
key
Aws-sns topicArn / region / endpoint
accessKey / secretKey
Aws-sqs queueName / region / endpoint
accessKey / secretKey