1 - 发布的主流程

发布的主流程分析

1.1 - 流程概述

Dapr发布的流程和API概述

API 和端口

Dapr runtime 对外提供两个 API,分别是 Dapr HTTP API 和 Dapr gRPC API。两个 Dapr API 对外暴露的端口,默认是:

  • 3500: HTTP 端口,可以通过命令行参数 dapr-http-port 设置
  • 50001: gRPC 端口,可以通过命令行参数 dapr-grpc-port 设置

gRPC API

gRPC API 定义在 dapr/proto/runtime/v1/dapr.proto 文件中的 Dapr service 中:

service Dapr {
  // Publishes events to the specific topic.
  rpc PublishEvent(PublishEventRequest) returns (google.protobuf.Empty) {}
  ......
}

// PublishEventRequest is the message to publish event data to pubsub topic
message PublishEventRequest {
  // The name of the pubsub component
  string pubsub_name = 1;

  // The pubsub topic
  string topic = 2;

  // The data which will be published to topic.
  bytes data = 3;

  // The content type for the data (optional).
  string data_content_type = 4;

  // The metadata passing to pub components
  //
  // metadata property:
  // - key : the key of the message.
  map<string, string> metadata = 5;
}

主要的参数是:

  • pubsub_name:dapr pubsub component的名字
  • topic:发布消息的目标topic
  • data:消息的数据

可选参数有:

  • data_content_type:消息数据的内容类型
  • metadata:可选的元数据信息,用于扩展

HTTP API

HTTP API 没有明确的单独定义,不过可以从代码中获知。在 pkg/http/api.go 中,构建用于 publish 的 endpoint 的代码如下:

func (a *api) constructPubSubEndpoints() []Endpoint {
	return []Endpoint{
		{
      // 发送 POST 或者 PUT 请求
			Methods: []string{fasthttp.MethodPost, fasthttp.MethodPut},
      // 到这个 URL
			Route:   "publish/{pubsubname}/{topic:*}",
			Version: apiVersionV1,
			Handler: a.onPublish,
		},
	}
}

因此,用于 publish 的 daprd URL 类似于 http://localhost:3500/v1.0/publish/pubsubname1/topic1

处理请求的 handler 方法 a.onPublish() 中读取参数的代码如下(忽略其他细节):

const (
  pubsubnameparam          = "pubsubname"


// 从 url 中读取 pubsubname
pubsubName := reqCtx.UserValue(pubsubnameparam).(string)
// 从 url 中读取 topic
topic := reqCtx.UserValue(topicParam).(string)
// 从 HTTP body 
body := reqCtx.PostBody()
// 从 HTTP 的 Content-Type header 中读取 data_content_type
contentType := string(reqCtx.Request.Header.Peek("Content-Type"))
  
// 从 HTTP URL query 中读取 metadata
metadata := getMetadataFromRequest(reqCtx)

Metadata 的读取要稍微复杂一些,需要读取所有的 url query 参数,然后根据 key 的前缀判断是不是 metadata:

const (
	metadataPrefix        = "metadata."
)

func getMetadataFromRequest(reqCtx *fasthttp.RequestCtx) map[string]string {
	metadata := map[string]string{}
  // 游历所有的 url query 参数
	reqCtx.QueryArgs().VisitAll(func(key []byte, value []byte) {
		queryKey := string(key)
    // 如果 query 参数的 key 以 "metadata." 开头,就视为一个 metadata 的key
		if strings.HasPrefix(queryKey, metadataPrefix) {
      // key 的 前缀 "metadata." 要去掉
			k := strings.TrimPrefix(queryKey, metadataPrefix)
			metadata[k] = string(value)
		}
	})

	return metadata
}

总结:用于 publish 的完整的 daprd URL 类似于 http://localhost:3500/v1.0/publish/pubsubname1/topic1?metadata.k1=v1&metadata.k2=v2&metadata.k3=v3。消息内容通过 HTTP body 传递,另外可以通过 Content-Type header 传递消息内容类型参数。

发布流程

gRPC 协议

默认情况下使用 gRPC 协议进行消息发布,daprd 在默认的 50001 端口,通过注册的 dapr service 的 PublishEvent() 方法接收来自客户端通过 dapr SDK 发出的 gRPC 请求,之后根据具体的组件实现,对底层实际使用的消息中间件发布事件。流程大体如下:

title Pub-Sub via gRPC Protocol
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =User Code
    ----
    producer
]
participant SDK_client [
    =Dapr SDK
    ----
    producer
]
end box
participant daprd_client [
    =daprd
    ----
    producer
]
participant message_broker as "Message Broker"

user_code_client -> SDK_client : PublishEvent() 
note left: pubsub_name="name-1"\ntopic="topic-1"\ndata="[...]"\ndata_content_type=""\nmetadata="[...]"
note right: PublishEvent() @ Dapr service
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001
|||
daprd_client -[#red]> message_broker : native protocol (remote call)
|||
message_broker --[#red]> daprd_client :
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

HTTP 协议

HTTP协议类似,daprd 在默认的 3500 端口,通过前面所述的URL接收客户端通过 dapr SDK 发出的 HTTP 请求。流程大体如下:

title Pub-Sub via HTTP Protocol
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =User Code
    ----
    producer
]
participant SDK_client [
    =Dapr SDK
    ----
    producer
]
end box
participant daprd_client [
    =daprd
    ----
    producer
]
participant message_broker as "Message Broker"

user_code_client -> SDK_client : PublishEvent() 
note left: pubsub_name="name-1"\ntopic="topic-1"\ndata="[...]"\ndata_content_type=""\nmetadata="[...]"
note right: POST http://localhost:3500/v1.0/publish/pubsubname1/topic1?\nmetadata.k1=v1&metadata.k2=v2&metadata.k3=v3
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500
|||
daprd_client -[#red]> message_broker : native protocol (remote call)
|||
message_broker --[#red]> daprd_client :
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

1.2 - 发布相关的Runtime初始化

Dapr Runtime中和发布相关的初始化流程

在 dapr runtime 启动进行初始化时,需要开启 API 端口并挂载相应的 handler 来接收并处理发布订阅中的发布请求。另外需要根据配置文件启动 pubsub component 以便连接到外部 message broker。

启动 Dapr gRPC API Server(outbound)

启动 gRPC 服务器

在 dapr runtime 启动时的初始化过程中,会启动 gRPC server, 代码在 pkg/runtime/runtime.go 中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    // Create and start internal and external gRPC servers
	grpcAPI := a.getGRPCAPI()
    
	err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
    ......
}

func (a *DaprRuntime) startGRPCAPIServer(api grpc.API, port int) error {
	serverConf := a.getNewServerConfig(a.runtimeConfig.APIListenAddresses, port)
	server := grpc.NewAPIServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.globalConfig.Spec.APISpec, a.proxy)
    if err := server.StartNonBlocking(); err != nil {
		return err
	}
	......
}

// NewAPIServer returns a new user facing gRPC API server.
func NewAPIServer(api API, config ServerConfig, ......) Server {
	return &server{
		api:         api,
		config:      config,
		kind:        apiServer, // const apiServer = "apiServer"
		......
	}
}

注册 Dapr API

为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr API,需要将定义 dapr api 的 dapr service 注册到 gRPC 服务器上去。

注册的代码实现在 pkg/grpc/server.go 中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:

func (s *server) StartNonBlocking() error {
		if s.kind == internalServer {
			internalv1pb.RegisterServiceInvocationServer(server, s.api)
		} else if s.kind == apiServer {
            runtimev1pb.RegisterDaprServer(server, s.api)		// 注意:s.api (即 gRPC api 实现) 被传递进去
		}
		......
}

而 RegisterDaprServer() 方法的实现代码在 pkg/proto/runtime/v1/dapr_grpc.pb.go:

func RegisterDaprServer(s grpc.ServiceRegistrar, srv DaprServer) {
	s.RegisterService(&Dapr_ServiceDesc, srv)					// srv 即 gRPC api 实现
}

Dapr_ServiceDesc 定义

在文件 pkg/proto/runtime/v1/dapr_grpc.pb.go 中有 Dapr Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。

Dapr_ServiceDesc 中有 Dapr Service 各个方法的定义,和发布相关的是 PublishEvent 方法:

var Dapr_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "dapr.proto.runtime.v1.Dapr",
	HandlerType: (*DaprServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "PublishEvent",				  # 注册方法名
			Handler:    _Dapr_PublishEvent_Handler,	  # 关联实现的 Handler
		},
        ......
        },
	},
	Metadata: "dapr/proto/runtime/v1/dapr.proto",
}

这一段是告诉 gRPC server: 如果收到访问 dapr.proto.runtime.v1.Dapr 服务的 PublishEvent 方法的 gRPC 请求,请把请求转给 _Dapr_PublishEvent_Handler 处理。

title Dapr publish gRPC API 
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    producer
]

-[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/PublishEvent
|||
<[#blue]-- daprd_client

PublishEvent 方法相关联的 handler 方法 _Dapr_PublishEvent_Handler 的实现代码是:

func _Dapr_PublishEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(PublishEventRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(DaprServer).PublishEvent(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/dapr.proto.runtime.v1.Dapr/PublishEvent",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(DaprServer).PublishEvent(ctx, req.(*PublishEventRequest))
	}
	return interceptor(ctx, in, info, handler)
}

最后调用到了 DaprServer 接口实现的 PublishEvent 方法,也就是 gPRC API 实现。

启动 Dapr HTTP API Server(outbound)

在 dapr runtime 中启动 HTTP server

在 dapr runtime 启动时的初始化过程中,会启动 HTTP server, 代码在 pkg/runtime/runtime.go

dapr runtime  HTTP server 用的是 fasthttp

 dapr runtime 启动时的初始化过程中会启动 HTTP server 代码在 pkg/runtime/runtime.go 

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
  ......
  // Start HTTP Server
	err = a.startHTTPServer(a.runtimeConfig.HTTPPort, a.runtimeConfig.PublicPort, a.runtimeConfig.ProfilePort, a.runtimeConfig.AllowedOrigins, pipeline)
	if err != nil {
		log.Fatalf("failed to start HTTP server: %s", err)
	}
  ......
}

func (a *DaprRuntime) startHTTPServer(......) error {
	a.daprHTTPAPI = http.NewAPI(......)

	server := http.NewServer(a.daprHTTPAPI, ......)
  if err := server.StartNonBlocking(); err != nil {		// StartNonBlocking 启动 fasthttp server
		return err
	}
}

挂载 PubSub 的 HTTP 端点

在 HTTP API 的初始化过程中,会在 fast http server 上挂载 PubSub 的 HTTP 端点,代码在 pkg/http/api.go 中:

func NewAPI(
  appID string,
	appChannel channel.AppChannel,
	directMessaging messaging.DirectMessaging,
  ......
  	shutdown func()) API {
  
  	api := &api{
		appChannel:               appChannel,
		directMessaging:          directMessaging,
		......
	}
  
  	// 附加 PubSub 的 HTTP 端点
  	api.endpoints = append(api.endpoints, api.constructPubSubEndpoints()...)
}

PubSub 的 HTTP 端点的具体信息在 constructPubSubEndpoints() 方法中:

func (a *api) constructPubSubEndpoints() []Endpoint {
	return []Endpoint{
		{
			Methods: []string{fasthttp.MethodPost, fasthttp.MethodPut},
			Route:   "publish/{pubsubname}/{topic:*}",
			Version: apiVersionV1,
			Handler: a.onPublish,
		},
	}
}

注意这里的 Route 路径 “publish/{pubsubname}/{topic:*}", dapr sdk 就是就通过这样的 url 来发起 HTTP publish 请求。

title Dapr Publish HTTP API 
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    producer
]

-[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/publish/{pubsubname}/{topic:*}
|||
<[#blue]-- daprd_client

pubsub 组件初始化

为了提供对 pubsub 的功能支持,需要为 dapr runtime 配置 pubsub component。

pubSubRegistry 和 pubSubs 列表

DaprRuntime 的结构体中保存有 pubSubRegistry 和 pubSubs 列表:

type DaprRuntime struct {
	......
	pubSubRegistry         pubsub_loader.Registry
	pubSubs                map[string]pubsub.PubSub
	......
}

runtime 构建时会初始化这两个结构体:

func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration, accessControlList *config.AccessControlList, resiliencyProvider resiliency.Provider) *DaprRuntime {
	ctx, cancel := context.WithCancel(context.Background())
	return &DaprRuntime{
		......
		pubSubs:                map[string]pubsub.PubSub{},
		pubSubRegistry:         pubsub_loader.NewRegistry(),
        ......

PubSubRegistry 保存pubsub组件列表

pubSubRegistry 用于保存 dapr runtime 中支持的所有 pubsub component :

pubSubRegistry struct {
    messageBuses map[string]func() pubsub.PubSub
}

在 runtime binary (cmd/daprd/main.go)的代码中,会列举出所有的 pubsub component ,这也是 darp 和 conponents-contrib 两个仓库的直接联系:

err = rt.Run(
		......
		runtime.WithPubSubs(
			pubsub_loader.New("azure.eventhubs", func() pubs.PubSub {
				return pubsub_eventhubs.NewAzureEventHubs(logContrib)
			}),
			pubsub_loader.New("azure.servicebus", func() pubs.PubSub {
				return servicebus.NewAzureServiceBus(logContrib)
			}),
			pubsub_loader.New("gcp.pubsub", func() pubs.PubSub {
				return pubsub_gcp.NewGCPPubSub(logContrib)
			}),
			pubsub_loader.New("hazelcast", func() pubs.PubSub {
				return pubsub_hazelcast.NewHazelcastPubSub(logContrib)
			}),
			pubsub_loader.New("jetstream", func() pubs.PubSub {
				return pubsub_jetstream.NewJetStream(logContrib)
			}),
			pubsub_loader.New("kafka", func() pubs.PubSub {
				return pubsub_kafka.NewKafka(logContrib)
			}),
			pubsub_loader.New("mqtt", func() pubs.PubSub {
				return pubsub_mqtt.NewMQTTPubSub(logContrib)
			}),
			pubsub_loader.New("natsstreaming", func() pubs.PubSub {
				return natsstreaming.NewNATSStreamingPubSub(logContrib)
			}),
			pubsub_loader.New("pulsar", func() pubs.PubSub {
				return pubsub_pulsar.NewPulsar(logContrib)
			}),
			pubsub_loader.New("rabbitmq", func() pubs.PubSub {
				return rabbitmq.NewRabbitMQ(logContrib)
			}),
			pubsub_loader.New("redis", func() pubs.PubSub {
				return pubsub_redis.NewRedisStreams(logContrib)
			}),
			pubsub_loader.New("snssqs", func() pubs.PubSub {
				return pubsub_snssqs.NewSnsSqs(logContrib)
			}),
			pubsub_loader.New("in-memory", func() pubs.PubSub {
				return pubsub_inmemory.New(logContrib)
			}),
		),
    ......
)

runtime 在初始化时会将这些 pubsub component 信息保存在 pubSubRegistry 中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    ......
	a.pubSubRegistry.Register(opts.pubsubs...)
}

需要注意的是,pubSubRegistry 中保存的组件列表是所有的被 dapr runtime 支持的组件列表,但是,不是每个组件在 runtime 启动时都会被装载。组件的安装时按需的,由组件配置文件(yaml)来决定装载和初始化那些组件的示例。

runtime 装载 pubsub 组件

组件在 dapr runtime 初始化时统一装载:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    ......
	a.pubSubRegistry.Register(opts.pubsubs...)
	a.secretStoresRegistry.Register(opts.secretStores...)
	a.stateStoreRegistry.Register(opts.states...)
    ......
    err = a.loadComponents(opts)
    a.flushOutstandingComponents()
    ......
}

有两种实现,KubernetesMode 和 StandaloneMode:

func (a *DaprRuntime) loadComponents(opts *runtimeOpts) error {
    	var loader components.ComponentLoader

	switch a.runtimeConfig.Mode {
	case modes.KubernetesMode:
		loader = components.NewKubernetesComponents(a.runtimeConfig.Kubernetes, a.namespace, a.operatorClient, a.podName)
	case modes.StandaloneMode:
		loader = components.NewStandaloneComponents(a.runtimeConfig.Standalone)
	default:
		return errors.Errorf("components loader for mode %s not found", a.runtimeConfig.Mode)
	}
    comps, err := loader.LoadComponents()
    ......
}

KubernetesMode 下读取的是 k8s 下的 component CRD:

func (k *KubernetesComponents) LoadComponents() ([]components_v1alpha1.Component, error) {
	resp, err := k.client.ListComponents(context.Background(), &operatorv1pb.ListComponentsRequest{
		Namespace: k.namespace,
		PodName:   k.podName,
	}, ......
}

StandaloneMode 下读取的是由 ComponentsPath 配置(--componentspath)指定的目录下的 component CRD 文件:

func (s *StandaloneComponents) LoadComponents() ([]components_v1alpha1.Component, error) {
	files, err := os.ReadDir(s.config.ComponentsPath)
	......
}

总结

在完成 HTTP server 和 gRPC server 的初始化之后,dapr runtime 就做好了接收 publish 请求的准备。

1.3 - 客户端sdk发出publish请求

Dapr客户端sdk封装dapr api,发出发布订阅的publish请求

Java SDK 实现

在业务代码中使用 pubsub 功能的示例可参考文件 dapr java-sdk 中的代码 /src/main/java/io/dapr/examples/pubsub/http/Publisher.java,代码示意如下:

DaprClient client = (new DaprClientBuilder()).build();
String message = String.format("This is message #%d", i);
client.publishEvent(
    "messagebus",
    "testingtopic",
    message,
    singletonMap(Metadata."ttlInSeconds", "1000")).block();

java SDK 中除了 service invoke 默认使用 HTTP ,其他方法都是默认使用 gRPC,在 DaprClientProxy 类中初始化了两个 daprclient:

  1. client 字段: 类型为 DaprClientGrpc,连接到 127.0.0.1:5001
  2. methodInvocationOverrideClient 字段:类型为 DaprClientHttp,连接到 127.0.0.1:3500

pubsub 方法默认走 gRPC ,使用的是 DaprClientGrpc 类型 (文件为 src/main/java/io/dapr/client/DaprClientGrpc.java):

  @Override
  public Mono<Void> publishEvent(PublishEventRequest request) {
    try {
      String pubsubName = request.getPubsubName();
      String topic = request.getTopic();
      Object data = request.getData();
      DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
      ......
      return Mono.subscriberContext().flatMap(
              context ->
                  this.<Empty>createMono(
                      it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
                  )
      ).then();
  }

在这里根据请求条件设置 PublishEvent 请求的各种参数,debug 时可以看到如下图的数据:

java-client-grpc

发出去给 dapr runtime 的 gRPC 请求如下图所示:

java-client-grpc-send

这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr, 方法是 PublishEvent,和前一章中 dapr runtime 初始化中设定的 gRPC API 对应。

title PublishEvent via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    producer
]
participant SDK_client [
    =SDK
    ----
    producer
]
end box
participant daprd_client [
    =daprd
    ----
    producer
]

user_code_client -> SDK_client : PublishEvent() 
note left: pubsub_name="name-1"\ntopic="topic-1"\ndata="[...]"\ndata_content_type=""\nmetadata="[...]"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n"dapr.proto.runtime.v1.Dapr/PublishEvent"
|||
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

Go sdk实现

在 go 业务代码中使用 service invoke 功能的示例可参考 https://github.com/dapr/go-sdk/blob/main/examples/pubsub/pub/pub.go,代码示意如下:

client, err := dapr.NewClient()
err := client.PublishEvent(ctx, pubsubName, topicName, data)

Go SDK 中定义了 Client 接口,文件为 client/client.go

// Client is the interface for Dapr client implementation.
type Client interface {
	// PublishEvent publishes data onto topic in specific pubsub component.
	PublishEvent(ctx context.Context, pubsubName, topicName string, data interface{}, opts ...PublishEventOption) error
    ......
}

方法的实现在 client/pubsub.go 中,都只是实现了对 PublishEventRequest 对象的组装:

func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
    request := &pb.PublishEventRequest{
		PubsubName: pubsubName,
		Topic:      topicName,
	}
	_, err := c.protoClient.PublishEvent(c.withAuthToken(ctx), request)
	......
}

PublishEvent() 是 protoc 生成的 grpc 代码,在 dapr/proto/runtime/v1/dapr_grpc.pb.go 中,实现如下:

func (c *daprClient) PublishEvent(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
	out := new(emptypb.Empty)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/PublishEvent", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr, 方法是 InvokeService,和 dapr runtime 中 gRPC API 对应。

其他SDK

TODO

1.4 - Dapr Runtime 处理来自客户端的 publish 请求

Dapr Runtime 接收来自客户端的 publish 请求的代码分析

在 dapr runtime 中,提供 HTTP 和 gRPC 两种协议,前面 runtime 初始化时介绍了 HTTP 和 gRPC 两种协议是如何在 runtime 初始化时准备好接收来自客户端的 publish 请求的。现在我们介绍在接收到来自客户端的 publish 请求后,dapr runtime 是如何处理请求的。

gRPC API

在 gRPC API 的实现中,PublishEvent() 方法负责处理接收到的 publish 请求,其主要流程大体是如下4个步骤:

type api struct {
    pubsubAdapter              runtimePubsub.Adapter
}

func (a *api) PublishEvent(ctx context.Context, in *runtimev1pb.PublishEventRequest) (*emptypb.Empty, error) {
  // 1. 根据名称找到可以处理请求的 pubsub 组件
  thepubsub := a.pubsubAdapter.GetPubSub(pubsubName)
  // 2. 处理参数的细节:如是否要封装为 cloudevent
  // 细节忽略,后续展开
  // 3. 构建 PublishRequest 请求对象
  req := pubsub.PublishRequest{
		PubsubName: pubsubName,
		Topic:      topic,
		Data:       data,
		Metadata:   in.Metadata,
	}
  // 4. 未退 pubsub 组件来负责具体的请求发送
  err := a.pubsubAdapter.Publish(&req)
}

查找处理请求的 pubsub 组件

  // 检查是否有初始化 pubsubAdapter,没有的话报错退出
  if a.pubsubAdapter == nil {
		err := status.Error(codes.FailedPrecondition, messages.ErrPubsubNotConfigured)
		apiServerLogger.Debug(err)
		return &emptypb.Empty{}, err
	}

	pubsubName := in.PubsubName
  // 检查请求,pubsubName 参数不能为空
	if pubsubName == "" {
		err := status.Error(codes.InvalidArgument, messages.ErrPubsubEmpty)
		apiServerLogger.Debug(err)
		return &emptypb.Empty{}, err
	}

  // 根据 pubsubName 参数在 pubsubAdapter 中找到对应的组件
	thepubsub := a.pubsubAdapter.GetPubSub(pubsubName)
	if thepubsub == nil {
    // 如果找不到,则报错退出
		err := status.Errorf(codes.InvalidArgument, messages.ErrPubsubNotFound, pubsubName)
		apiServerLogger.Debug(err)
		return &emptypb.Empty{}, err
	}

GetPubSub() 方法的实现很简单,就是根据 pubsubName 在现有已经初始化的 pubsub 组件中进行简单的map查找:

// GetPubSub is an adapter method to find a pubsub by name.
func (a *DaprRuntime) GetPubSub(pubsubName string) pubsub.PubSub {
	ps, ok := a.pubSubs[pubsubName]
	if !ok {
		return nil
	}
	return ps.component
}

委托 pubsub 组件发送请求

func (a *DaprRuntime) Publish(req *pubsub.PublishRequest) error {
  // 这里又根据名称做了一次查找
  // TBD:可以考虑做代码优化了,从前面把找到的组件传递过来就好了
	ps, ok := a.pubSubs[req.PubsubName]
	if !ok {
		return runtimePubsub.NotFoundError{PubsubName: req.PubsubName}
	}

  // 检查 pubsub 操作是否被容许
	if allowed := a.isPubSubOperationAllowed(req.PubsubName, req.Topic, ps.scopedPublishings); !allowed {
		return runtimePubsub.NotAllowedError{Topic: req.Topic, ID: a.runtimeConfig.ID}
	}

  // 执行策略
	policy := a.resiliency.ComponentOutboundPolicy(a.ctx, req.PubsubName)
	return policy(func(ctx context.Context) (err error) {
    // 最终调用到底层实际组件的 Publish 方法来发送请求
		return ps.component.Publish(req)
	})
}

HTTP API

HTTP API 的处理方式和 gRPC API 是一致的,只是 HTTP API 这边由于 HTTP 协议的原因,在请求参数的获取上无法像 gRPC API 那样有一个的 runtimev1pb.PublishEventRequest 对象可以完整的封装所有请求参数,HTTP API 会多出一个请求参数的获取过程。

从 HTTP 请求中获取所有参数

HTTP API 实现中的 onPublish() 方法的前面一段代码就是在处理如何从 HTTP 请求中获取 publish 所需的所有参数:

func (a *api) onPublish(reqCtx *fasthttp.RequestCtx) {
  // 1. pubsubName
  pubsubName := reqCtx.UserValue(pubsubnameparam).(string)
  // 2. topic
  topic := reqCtx.UserValue(topicParam).(string)
  // 3. data
  body := reqCtx.PostBody()
  // 4. data content type
	contentType := string(reqCtx.Request.Header.Peek("Content-Type"))
  // 5. metadata
	metadata := getMetadataFromRequest(reqCtx)
  
  // 后续处理和 gRPC 协议一致
  ......
}

1.5 - 组件实现

组件实现publish的实际功能

组件接口中的 Publish() 方法定义

在 dapr runtime API 实现(包括 HTTP API 和 gRPC API)和底层 pubsub 组件之间,还有一个简单的内部接口,定义了 pubsub 组件的功能:

// PubSub is the interface for message buses.
type PubSub interface {
	Init(metadata Metadata) error
	Features() []Feature
	Publish(req *PublishRequest) error
	Subscribe(ctx context.Context, req SubscribeRequest, handler Handler) error
	Close() error
}

其中的 Publish() 用来发送消息。请求参数 PublishRequest 的字段和 Dapr API 定义中保持一致:

// PublishRequest is the request to publish a message.
type PublishRequest struct {
	Data        []byte            `json:"data"`
	PubsubName  string            `json:"pubsubname"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

redis 组件实现

以 redis stream 为例,看看 publish 方法的实现:

func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
	_, err := r.client.XAdd(r.ctx, &redis.XAddArgs{
		Stream:       req.Topic,
		MaxLenApprox: r.metadata.maxLenApprox,
		Values:       map[string]interface{}{"data": req.Data},
	}).Result()
	if err != nil {
		return fmt.Errorf("redis streams: error from publish: %s", err)
	}

	return nil
}

redis stream 的实现很简单,req.Topic 参数指定要写入的 redis stream,内容为一个map,其中 key “data” 的值为 req.Data。

2 - 订阅主流程

订阅的主流程分析

2.1 - 流程概述

Dapr订阅的流程和API概述

API 和端口

订阅流程实际包含三个子流程:

  1. 获取应用订阅消息

    daprd 需要获知应用的订阅信息。

    实现中,dapr 会要求应用收集订阅信息并通过指定方式暴露(SDK 可以提供帮助),以便 daprd 可以通过给应用发送请求来获取这些订阅信息。

  2. 执行消息订阅

    Daprd 在拿到应用的订阅信息之后,就可以使用底层组件的订阅机制进行消息订阅。

  3. 转发消息给应用

    daprd 收到来自底层组件的订阅的消息之后,需要将消息转发给应用。

以上子流程1和3都需要 daprd 主动访问应用,因此 dapr 需要获知应用在哪个端口监听并处理订阅请求,这个信息通过命令行参数 app-port 设置。Dapr 的示例中一般喜欢用 3000 端口。

gRPC API

gRPC API 定义在 dapr/proto/runtime/v1/appcallback.proto 文件中的 AppCallback service 中:

service AppCallback {
  // 子流程1:获取应用订阅消息
  rpc ListTopicSubscriptions(google.protobuf.Empty) returns (ListTopicSubscriptionsResponse) {}

  // 子流程3:转发消息给应用
  rpc OnTopicEvent(TopicEventRequest) returns (TopicEventResponse) {}
  ......
}

ListTopicSubscriptionsResponse 的定义:

message ListTopicSubscriptionsResponse {
  repeated common.v1.TopicSubscription subscriptions = 1;
}

message TopicSubscription {
  // pubsub的组件名
  string pubsub_name = 1;

  // 要订阅的topic
  string topic = 2;

  // 可选参数,后面展开
  map<string,string> metadata = 3;
  TopicRoutes routes = 5;
  string dead_letter_topic = 6;
}

即应用可以有多个消息订阅,每个订阅都必须提供 pubsub_name 和 topic 参数。

TopicEventRequest 的定义:

message TopicEventRequest {
  // 这几个参数先忽略
  string id = 1;
  string source = 2;
  string type = 3;
  string spec_version = 4;
  string path = 9;

  // 事件的基本信息
  string data_content_type = 5;
  bytes data = 7;
  string topic = 6;
  string pubsub_name = 8;
}

HTTP API

发布流程

HTTP 协议

title Subscribe via http
hide footbox
skinparam style strictuml
box "App-1"
participant user_code [
    =App-1
    ----
    producer
]
participant SDK [
    =SDK
    ----
    producer
]
end box
participant daprd [
    =daprd
    ----
    producer
]
participant message_broker as "Message Broker"

SDK -> user_code: collection subscribe
user_code --> SDK

daprd -[#blue]> SDK : http
note left: appChannel.InvokeMethod("dapr/subscribe")
SDK --[#blue]> daprd : 

daprd -[#red]> message_broker : subscribe topics
message_broker --[#red]> daprd

|||
|||
|||
|||

message_broker -[#red]> daprd: event
daprd -[#blue]> SDK : http
note left: appChannel.InvokeMethod("/{route}")
SDK -> user_code : 
user_code --> SDK
SDK --[#blue]> daprd
|||

gRPC 方式

title Subscribe via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code [
    =App-1
    ----
    producer
]
participant SDK [
    =SDK
    ----
    producer
]
end box
participant daprd [
    =daprd
    ----
    producer
]
participant message_broker as "Message Broker"

SDK -> user_code: collection subscribe
user_code --> SDK

daprd -[#blue]> SDK : gRPC
note left: appChannel.ListTopicSubscriptions()
SDK --[#blue]> daprd : 

daprd -[#red]> message_broker : subscribe topics
message_broker --[#red]> daprd

|||
|||
|||
|||

message_broker -[#red]> daprd: event
daprd -[#blue]> SDK : gRPC
note left: appChannel.OnTopicEvent()
SDK -> user_code : 
user_code --> SDK
SDK --[#blue]> daprd
|||

2.2 - 订阅相关的Runtime初始化

Dapr Runtime中和订阅相关的初始化流程

在 dapr runtime 启动进行初始化时,需要

  • 访问应用以获取应用的订阅信息:比如应用订阅了哪些topic
  • 根据配置文件启动 subscribe component 以便连接到外部 message broker 进行订阅
  • 将订阅更新的 event 转发给应用

Dapr runtime初始化component列表

dapr runtime 初始化时会创建和 app 的连接,称为 app channel,然后开始发布订阅的初始化:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	......
    // 有一个单独的 go routine 负责处理 component 的初始化
    go a.processComponents()
    err = a.loadComponents(opts)
    
	// 等待应用ready: 前提是设置了 app port
	a.blockUntilAppIsReady()

	// 创建 app channel
	err = a.createAppChannel()
    // app channel 支持 http 和 grpc
	a.daprHTTPAPI.SetAppChannel(a.appChannel)
	grpcAPI.SetAppChannel(a.appChannel)
    ......
    
    // 开始发布订阅的初始化
    a.startSubscribing()
}

这里有一段复杂的并行初始化components并处理相互依赖的逻辑,忽略这些细节,只看执行 component 初始化的代码:

func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
	switch category {
	case pubsubComponent:
		return a.initPubSub(comp)
	......
	}
	return nil
}

func (a *DaprRuntime) initPubSub(c components_v1alpha1.Component) error {
	pubSub, err := a.pubSubRegistry.Create(c.Spec.Type, c.Spec.Version)

    // 初始化 pubSub component
	err = pubSub.Init(pubsub.Metadata{
		Properties: properties,
	})

	pubsubName := c.ObjectMeta.Name
	a.pubSubs[pubsubName] = pubSub
	return nil
}

这个执行完成之后,a.pubSubs 中便保存有当前配置并初始化好的 pubsub 组件列表。

pubsub组件启动

订阅的初始化在 dapr runtime 启动过程的最后阶段

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    ......
    // 开始发布订阅的初始化
    a.startSubscribing()
}

startSubscribing() 方法逐个处理 pubSub 组件:

func (a *DaprRuntime) startSubscribing() {
	for name, pubsub := range a.pubSubs {
		if err := a.beginPubSub(name, pubsub); err != nil {
			log.Errorf("error occurred while beginning pubsub %s: %s", name, err)
		}
	}
}

beginPubSub 方法做了两个事情: 1. 获取应用的订阅信息 2. 让组件开始订阅

func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {
	var publishFunc func(ctx context.Context, msg *pubsubSubscribedMessage) error
    ......
	topicRoutes, err := a.getTopicRoutes()
    ......
}

获取应用订阅信息(AppCallback)

在 getTopicRoutes() 方法中,可以通过 HTTP 或者 gRPC 的方式来获取应用订阅信息:

func (a *DaprRuntime) getTopicRoutes() (map[string]TopicRoute, error) {
    ......
    if a.runtimeConfig.ApplicationProtocol == HTTPProtocol {
        // 走 http channel
		subscriptions, err = runtime_pubsub.GetSubscriptionsHTTP(a.appChannel, log)
	} else if a.runtimeConfig.ApplicationProtocol == GRPCProtocol {
        // 走 grpc channel
		client := runtimev1pb.NewAppCallbackClient(a.grpc.AppClient)
		subscriptions, err = runtime_pubsub.GetSubscriptionsGRPC(client, log)
	}
    ......
}

对于 HTTP 方式,调用的是 AppChannel 上定义的 InvokeMethod 方法,这个方法原来设计是用来实现 service invoke 的,dapr runtime 用来通过它将 service invoke 的 http inbound 请求转发给作为服务器端的应用。而在这里,被用来调用 dapr/subscribe 路径:

func GetSubscriptionsHTTP(channel channel.AppChannel, log logger.Logger) ([]Subscription, error) {
    req := invokev1.NewInvokeMethodRequest("dapr/subscribe")
    channel.InvokeMethod(ctx, req)
    ......
}

感想:理论上说这也不是为一种方便的方式,只是总感觉有点怪怪,pubsub 模块的初始化用到了 service invoke 模块的功能。直接发个http请求代码也不复杂。另外 http AppChannel / app callback 的方法和 grpc AppChannel / app callback 不对称,这在设计上缺乏美感。

对于 gRPC 方式,就比较老实的调用了 gRPC AppCallbackClient 的方法 ListTopicSubscriptions():

resp, err = channel.ListTopicSubscriptions(context.Background(), &emptypb.Empty{})

pubsub 组件开始订阅

在获取到应用的订阅信息之后,dapr runtime 就知道这个应用需要订阅哪些topic了。因此就可以继续开始订阅操作:

func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {
	var publishFunc func(ctx context.Context, msg *pubsubSubscribedMessage) error
    ......
    // 获取订阅信息
	topicRoutes, err := a.getTopicRoutes()
    ......
    // 开始订阅
    for topic, route := range v.routes {
        // 在当前 pubsub 组件上为每个 topic 进行订阅
        err := ps.Subscribe(pubsub.SubscribeRequest{
			Topic:    topic,
			Metadata: route.metadata,
        }, func(ctx context.Context, msg *pubsub.NewMessage) error {......}
    }
}

这里的 Subscribe() 方法的定义在 PubSub 接口上,每个 dapr pubsub 组件都会实现这个接口:

type PubSub interface {
	Publish(req *PublishRequest) error
	Subscribe(req SubscribeRequest, handler Handler) error
}

handler 方法的具体实现后面再展开。

2.3 - 客户端sdk为dapr提供订阅信息

Dapr客户端sdk封装dapr api,接受dapr发出的ListTopicSubscriptions请求

工作原理

对于订阅信息而言,有四个关键的信息。在 dapr proto 中的定义如下:

message TopicSubscription {
  // Required. The name of the pubsub containing the topic below to subscribe to.
  string pubsub_name = 1;

  // Required. The name of topic which will be subscribed
  string topic = 2;

  // The optional properties used for this topic's subscription e.g. session id
  map<string,string> metadata = 3;

  // The optional routing rules to match against. In the gRPC interface, OnTopicEvent
  // is still invoked but the matching path is sent in the TopicEventRequest.
  TopicRoutes routes = 5;
}

pubsub_name 指定要使用的 pubsub component,topic 是要订阅的主题, metadata 携带扩展信息,而 routes 路由则是标记 dapr 应该如何将订阅到的事件发送给应用。

TODO:对于 HTTP 协议和 gRPC 协议处理会有不同。

java sdk中的封装如下:

public class DaprTopicSubscription {
  private final String pubsubName;
  private final String topic;
  private final String route;
  private final Map<String, String> metadata;
}

dapr sdk 需要帮助应用方便的提供上述订阅信息。

Java SDK 实现

在业务代码中使用 subscribe 功能的示例可参考文件 dapr java-sdk 中的代码 /src/main/java/io/dapr/examples/pubsub/http/subscribe.java,代码示意如下:

// 启动应用,监听端口,一般喜欢使用 3000
public static void main(String[] args) throws Exception {
    ......
	DaprApplication.start(port); 
}

@RestController
public class SubscriberController {
  @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
  @PostMapping(path = "/testingtopic")
  public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
      ......
  }
}

sdk收集订阅信息

上面代码中的 @Topic 注解是 dapr java sdk 提供的,用来标记需要进行 subscribe 的 topic,代码在src/main/java/io/dapr/Topic.java

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Topic {
    String name();
    String pubsubName();
    String metadata() default "{}";
}

topic 的收集是典型的 springboot 风格,代码在 sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java:

@Component
public class DaprBeanPostProcessor implements BeanPostProcessor {
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    subscribeToTopics(bean.getClass(), embeddedValueResolver);
    return bean;
  }
}

subscribeToTopics() 方法通过扫描 @topic 注解和 @PostMapping 注解来获取订阅相关的信息:

private static void subscribeToTopics(Class clazz, EmbeddedValueResolver embeddedValueResolver) {

    for (Method method : clazz.getDeclaredMethods()) {
      // 获取 @topic 注解
      Topic topic = method.getAnnotation(Topic.class);
      if (topic == null) {
        continue;
      }

      String route = topic.name();
      // 获取 @PostMapping 注解
      PostMapping mapping = method.getAnnotation(PostMapping.class);

      // 根据 PostMapping 注解获取 route 信息
      if (mapping != null && mapping.path() != null && mapping.path().length >= 1) {
        route = mapping.path()[0];
      } else if (mapping != null && mapping.value() != null && mapping.value().length >= 1) {
        route = mapping.value()[0];
      }

      String topicName = embeddedValueResolver.resolveStringValue(topic.name());
      String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
      if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
        try {
          TypeReference<HashMap<String, String>> typeRef
                  = new TypeReference<HashMap<String, String>>() {};
          Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
          // 保存 subscribe 信息
          DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route, metadata);
        } catch (JsonProcessingException e) {
          throw new IllegalArgumentException("Error while parsing metadata: " + e.toString());
        }
      }
    }
  }

DaprRuntime 是一个单例对象,这里保存有订阅的 topic 列表:

class DaprRuntime {
    private final Set<String> subscribedTopics = new HashSet<>();
    private final List<DaprTopicSubscription> subscriptions = new ArrayList<>();
    
    public synchronized void addSubscribedTopic(String pubsubName,
                                                String topicName,
                                                String route,
                                                Map<String,String> metadata) {
        if (!this.subscribedTopics.contains(topicName)) {
            this.subscribedTopics.add(topicName);
            this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata));
        }
    }
}

sdk暴露订阅信息

为了让 dapr 在 springboot 体系中方便使用,dapr java sdk 提供了 DaprController ,以提供诸如健康检查等通用功能,还有和dapr相关的各种端点,其中就有为 dapr runtime 提供订阅信息的接口:

@RestController
public class DaprController {
  ......
  @GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE)
  public byte[] daprSubscribe() throws IOException {
    return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
  }
}

通过这个URL,就可以将之前收集到的 topic 信息都暴露出去,可以在浏览器中直接访问 http://127.0.0.1:3000/dapr/subscribe,应答内容为:

[{"pubsubName":"messagebus","topic":"testingtopic","route":"/testingtopic","metadata":{}}]

Go sdk实现

在 go 业务代码中使用 subscribe 功能的示例可参考 https://github.com/dapr/go-sdk/blob/main/examples/pubsub/sub/sub.go,代码示意如下:

func main() {
    s := daprd.NewService(":8080")
    err := s.AddTopicEventHandler(defaultSubscription, eventHandler)
    err = s.Start()
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	......
	return false, nil
}

sdk收集订阅信息

Go sdk 中定义了 Service 接口

// Service represents Dapr callback service.
type Service interface {
	// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
	// Note, retries are only considered when there is an error. Lack of error is considered as a success
	AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
	......
}

Subscription 的定义如下:

// Subscription represents single topic subscription.
type Subscription struct {
	PubsubName string `json:"pubsubname"`
	Topic string `json:"topic"`
	Metadata map[string]string `json:"metadata,omitempty"`
	Route string `json:"route"`
	......
}

这样订阅相关的主要4个参数就通过这个方式指明了。

sdk暴露订阅信息

go sdk 中有 http 和 grpc 两套机制可以实现对外暴露访问端点。

http 的实现在 http/topic.go 中:

func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
	if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil {
		return err
	}

    // 注册 http handle,关联 Route 和 fn
	s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
            ......
            retry, err := fn(r.Context(), &te)
            ......
        }
    }

grpc类似。

其他SDK

TODO