gRPC学习笔记
- 1: gRPC介绍
- 2: nameresolver介绍
- 2.1: 设计
- 2.1.1: 名称解析的设计及文档
- 2.1.2: 文档:gRPC名称解析
- 2.1.3: 文档:gRPC服务配置
- 2.1.4: 提案:service configs via dns
- 2.2: naming
- 2.2.1: Naming Package
- 2.2.2: Naming的Go代码实现
- 2.3: resolver
- 2.3.1: Resolver Package
- 2.3.2: Resolver的Go代码实现
- 2.4: Name Resolver
1 - gRPC介绍
1.1 - gRPC介绍
1.2 - 资料收集
网站
- grpc官网
- grpc-java gRPC Java实现.
- javadoc: grpc java 的javadoc地址
- grpc google groups
- grpc-ecosystem
文档
- grpc-common 是官方提供的文档和例子, 但是内容实际是指向下面的grpc.io上的Documentation.
- Documentation@grpc.io 是grpc.io提供的文档,这个适合入门
Documentation@grpc.io中内容比较重要:
注:开源中国组织人手翻译这份文档,gRPC 官方文档中文版
Demo
- grpc-android-demo: andriod的demo
- grpc-streaming-demo: A quick demo of bi-directional streaming RPC’s using grpc, go and python
- yeyincai/grpc-demo: introduces using grpc about encryption、stream、oneof、interceptor、loadbalance demo http://blog.csdn.net/yeyincai
工具
- grpc-tools: Tools useful with gRPC libraries, provided by grpc
项目
- kafka-pixy: gRPC/REST proxy for Kafka
- grpc-experiments: Experiments and proposals for gRPC features.
- grpc-gateway: gRPC to JSON proxy generator
- LogNet/grpc-spring-boot-starter: Spring Boot starter module for gRPC framework.
- grpc-opentracing: OpenTracing is a set of consistent, expressive, vendor-neutral APIs for distributed tracing and context propagation
周边项目
-
grpc-gateway: 是一个基于go语言的项目.
grpc-gateway是protoc的插件. 它读取gRPC 服务定义, 然后生成一个反向代理服务器, 将RESTful JSON API转为gRPC.
用于帮助为API同时提供gRPC 和 RESTful接口.
这个工具似乎不错,对于某些需要提供restul接口场合可以快速的在grpc接口上转换出来.
-
grpc-docker-library: 包含官方gRPC Docker镜像的Git仓库
-
grpc-spring-boot-starter: 介绍 gRPC Spring Boot Starter - SprintBoot 的 gRPC 模块
2 - nameresolver介绍
2.1 - 设计
2.1.1 - 名称解析的设计及文档
名称解析
- 文档:gRPC服务配置
服务配置
-
文档:gRPC服务配置
-
Proposal:Service Configs in DNS: 关于此提案的讨论在 https://groups.google.com/g/grpc-io/c/DkweyrWEXxU/discussion
-
service_config.proto: 超长内容,xds的配置也在里面
2.1.2 - 文档:gRPC名称解析
概述
gRPC支持DNS作为默认的名称系统。在不同的部署中,有许多替代的名称系统被使用。我们支持一个足够通用的API,以支持一系列的名称系统和相应的名称语法。各种语言的 gRPC 客户端库将提供一个插件机制,因此可以插入不同名称系统的解析器。
详细设计
名称语法
用于 gRPC channel 构建的完全限定的自包含名称,使用 RFC 3986中定义的URI语法。
URI schema 指示要使用的解析器插件,如果没有指定schema前缀或schema未知,默认使用dns方案。
URI路径表示要解析的名称。
大多数 gRPC 实现都支持以下URI schema:
-
dns:[//authority/]host[:port]
– DNS (默认)host
是要通过DNS解析的主机名port
是要为每个地址返回的端口,如果没有指定,则使用443 (但是某些实现对于非加密channel默认使用80)authority
表示要使用的 DNS 服务器,尽管这只被一些实现所支持。在C-core中,默认的DNS解析器不支持这个功能,但基于c-ares的解析器支持以 “IP:port “的形式指定这个功能)。
-
unix:path
orunix://absolute_path
– Unix domain sockets (Unix systems only)path
表示所需socket的位置。- 在第一种形式中,路径可以是相对的,也可以是绝对的;在第二种形式中,路径必须是绝对的(即实际上会有三个斜线,两个在path之前,另一个用来开始绝对路径)。
gRPC C-core实现支持以下方案,但其他语言可能不支持:
-
ipv4:address[:port][,address[:port],...]
– IPv4 addresses-
可以指定多个逗号分隔的地址,地址形式为
address[:port]
address
是使用的 IPv4 地址port
是使用的端口。如果没有指定,使用443
-
-
ipv6:address[:port][,address[:port],...]
– IPv6 addresses- 可以指定多个逗号分隔的地址,地址形式为
address[:port]
address
是使用的 IPv6 地址. 要使用port
则地址address
必须用中括号 ([
and]
)包起来. 例如:ipv6:[2607:f8b0:400e:c00::ef]:443
oripv6:[::]:1234
port
是使用的端口。如果没有指定,使用443
- 可以指定多个逗号分隔的地址,地址形式为
今后还可以增加其他schema,如 “etcd”。
Resolver 插件
gRPC客户端类库将使用指定的schema来挑选合适的解析器插件,并将完全限定的名称字符串传递给它。
解析器应该能够联系权威机构(authority)并得到解析,然后将其返回给gRPC客户端类库。返回的内容包括:
-
解析的地址列表(包括IP地址和端口)。每个地址可以有一组与之相关的任意属性(键/值对),这些属性可用于从解析器向负载均衡策略传递信息。
-
service config
插件API允许解析器持续观察一个端点,并根据需要返回更新的解析。
2.1.3 - 文档:gRPC服务配置
https://github.com/grpc/grpc/blob/master/doc/service_config.md
目标
服务配置是一种机制,它允许服务所有者发布参数,让其服务的所有客户端自动使用。
格式
服务配置的格式由 grpc.service_config.ServiceConfig
protocol buffer
message 定义。请注意,随着新功能的引入,未来可能会添加新的字段。
架构
服务配置与服务器名相关联。名称解析器插件在被要求解析某个服务器名称时,会同时返回解析的地址和服务配置。
名称解析器以 JSON 形式将服务配置返回给 gRPC 客户端。各个解析器实现决定服务配置的存储位置和格式。如果解析器实现以 protobuf 形式获取服务配置,则必须使用正常的 protobuf 到 JSON 的转换规则将其转换为 JSON。另外,解析器实现也可以以 JSON 形式获取服务配置,在这种情况下,它可以直接返回服务配置。
有关 DNS 解析器插件如何支持服务配置的详情,请参见 gRFC A2: Service Config via DNS.
例子
下面是一个protobuf形式的服务配置示例:
{
// 使用 round_robin 负载均衡策略
load_balancing_config: { round_robin: {} }
// 这个方法配置适用于 "foo/bar" 方法和 service "baz"的所有方法
method_config: {
name: {
service: "foo"
method: "bar"
}
name: {
service: "baz"
}
// 匹配方法的默认超时
timeout: {
seconds: 1
nanos: 1
}
}
}
下面是同样的JSON形式的服务配置示例:
{
"loadBalancingConfig": [ { "round_robin": {} } ],
"methodConfig": [
{
"name": [
{ "service": "foo", "method": "bar" },
{ "service": "baz" }
],
"timeout": "1.0000000001s"
}
]
}
API
服务配置在以下API中使用:
- 在 resolver API中,用于解析器插件,以便将服务配置返回给gRPC客户端
- 在 gRPC 客户端 API 中,用户可以查询 channel 以获取与通道相关的服务配置(用于调试)。
- 在 gRPC 客户端 API 中,用户可以显式地设置服务配置。这可以用来在单元测试中设置配置。它还可以用来设置默认配置,如果解析器插件没有返回服务配置,就会使用该配置。
2.1.4 - 提案:service configs via dns
https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md
摘要
本文档提出了一种在DNS中编码gRPC服务配置数据的机制,供开源世界使用。
背景
服务配置机制最初是为Google内部使用而设计的。然而,除了一部分之外,所有原始设计在开源世界中都能正常工作。这一部分就是服务配置数据在DNS中如何编码的规范。这个提案填补了这个缺失的部分。
提案
这个提案有两个部分。第一部分是增加一些JSON包装,用于控制服务配置的更改如何进行金丝雀测试。第二部分是描述服务配置如何在DNS中进行编码。
Canarying Changes
金丝雀变更
当部署对服务配置的变更时,通过缓慢增加看到新版本的客户端数量,能够对更改进行金丝雀测试以避免大范围的中断,这是非常有用的。为此,可以按顺序列出多个服务配置的选择,以及决定特定客户机将选择哪个的标准:
// A list of one or more service config choices.
// The first matching entry wins.
[
{
// Criteria used to select this choice.
// If a field is absent or empty, it matches all clients.
// All fields must match a client for this choice to be selected.
// If any unexpected field name is present in this object, the entire
// config is considered invalid.
//
// Client language(s): a list of strings (e.g., "c++", "java", "go",
// "python", etc). Each string is case insensitive.
"clientLanguage": [string],
// Percentage: integer from 0 to 100 indicating the percentage of
// clients that should use this choice. If present, the number must
// match the regular expression `^0|[0-9]|[1-9][0-9]|100$`
// All other numbers are considered invalid.
"percentage": number,
// Client hostname(s): a list of strings. Each name is case
// sensitive and must be an exact match of the hostname according to
// the system.
"clientHostname": [string],
// The service config data object for clients that match the above
// criteria. (The format for this object is defined in
// https://github.com/grpc/grpc/blob/master/doc/service_config.md.)
// If this field is not an object, or is missing, or is otherwise
// invalid, the entire config is considered invalid.
"serviceConfig": object
}
]
如果服务配置选择不能被解析,或者在其他方面语义上无效,整个配置必须按照 Service Config Error Handling 丢弃。
在DNS TXT记录中编码
在DNS中,服务配置数据(以上一节中记录的形式)将通过RFC-1464中描述的机制,使用属性名 grpc_config 在TXT记录中进行编码。属性值将是一个包含服务配置选择的JSON列表。TXT记录将是一个与gRPC服务器名称相同的DNS名称,但前缀为 _grpc_config.
……
例如,这里是服务器 myserver 的 TXT 记录示例:
_grpc_config.myserver 3600 TXT "grpc_config=[{\"serviceConfig\":{\"loadBalancingPolicy\":\"round_robin\",\"methodConfig\":[{\"name\":[{\"service\":\"MyService\",\"method\":\"Foo\"}],\"waitForReady\":true}]}}]"
请注意,根据RFC-1035第3.3节的规定,TXT记录每个字符串限制为255字节。然而,可以有多个字符串,它们将被连接在一起,如RFC-4408第3.1.3节所述。总的DNS响应不能超过65535字节。(更多讨论请参见下面的 “未解决的问题 “部分。)
需要注意的是,由于TXT记录必须是ASCII码,这也限制了服务配置的内容也是ASCII码(如服务和方法名称、负载均衡策略名称等)。
理由
服务配置被设计为作为名称解析的一部分而返回,所以在DNS中进行编码是最合理的。当然,使用 DNS 以外的其他命名系统的网站可以用自己的机制实现自己的解析器,对服务配置数据进行编码。
当在DNS中对服务配置进行编码时,TXT记录是 “显而易见 “的选择,因为服务配置实际上是与DNS名称相关联的附加元数据。
我们在DNS条目中使用 _grpc_config
前缀,允许为主记录为CNAME记录的服务指定服务配置,因为DNS不允许为包含CNAME记录的同一名称指定任何其他记录。
实现
在C-core中,作为c-ares DNS解析器的一部分,已经完成了实施。我们目前正在努力使c-ares解析器成为C-core的默认DNS解析器。这需要诸如Windows和Node的支持,以及增加地址排序。
未解决的问题(如果适用)
DNS TXT记录确实有一些限制,这里需要考虑到。尤其是
- 如果DNS响应超过512字节 就会从UDP退回到TCP,这就增加了开销
- DNS的总响应不能超过65535字节。
- 目前还不清楚各个DNS实现是否会允许接近65535字节,尽管规范中说应该允许。
请反馈这些考虑因素是否会成为本设计的重大缺点(在这种情况下,很可能要改变设计)。
2.2 - naming
2.2.1 - Naming Package
gRPC Naming Package
资料
golang相关:
-
导入 naming 的包: 在 naming 包被删除之后,这些包都会被影响
2.2.2 - Naming的Go代码实现
备注: 由于 naming package 在grpc-go v1.30.0 版本之后被删除,所以源码实现以最后一个版本 v1.29.1 为准。
Naming的定义
代码在 naming/naming.go
中。
数据结构定义
Update struct 定义命名解析的更新。Addr和metadata在同一个update中不能同时为空。
// Update defines a name resolution update. Notice that it is not valid having both
// empty string Addr and nil Metadata in an Update.
//
// Deprecated: please use package resolver.
type Update struct {
// Op indicates the operation of the update.
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
Metadata interface{}
}
其中 operation 定义了用于命名解析变更的对应操作:
// Operation defines the corresponding operations for a name resolution change.
//
// Deprecated: please use package resolver.
type Operation uint8
const (
// Add indicates a new address is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)
接口定义
Resolver 接口创建 Watcher,用于跟踪指定目标的解析变更。
// Resolver creates a Watcher for a target to track its resolution changes.
//
// Deprecated: please use package resolver.
type Resolver interface {
// Resolve creates a Watcher for target.
Resolve(target string) (Watcher, error)
}
Watcher 接口用于监控特定目标的更新:
// Watcher watches for the updates on the specified target.
//
// Deprecated: please use package resolver.
type Watcher interface {
// Next 方法阻塞直至更新或者错误发生。
// 可能返回一个或者多个更新。
// 第一次调用因该得到结果的全集。
// 当且仅当 Watcher 无法恢复时返回错误。
Next() ([]*Update, error)
// Close closes the Watcher.
Close()
}
dns resolver的实现
gRPC 只内置了这一个实现: naming/dns_resolver.go
dns resolver的结构
dnsResolver 处理遵循dns模式的名称的命名解析。
type dnsResolver struct {
// 该解析器创建的 watcher 将使用的 DNS 服务器的轮询频率。
freq time.Duration
}
构建dns resolver
const (
// 默认频率是30分钟。
defaultFreq = time.Minute * 30
)
// NewDNSResolverWithFreq 创建 DNS Resolver,用于解析 DNS 名称,并创建watcher,使用 freq 参数设置的频率来查询DNS 服务器
func NewDNSResolverWithFreq(freq time.Duration) (Resolver, error) {
return &dnsResolver{freq: freq}, nil
}
// NewDNSResolverWithFreq 创建 DNS Resolver,用于解析 DNS 名称,并创建watcher,使用默认的频率来查询DNS 服务器
func NewDNSResolver() (Resolver, error) {
return NewDNSResolverWithFreq(defaultFreq)
}
解析dns
Resolve 方法创建watcher,watcher用来监控目标的命名解析。
func (r *dnsResolver) Resolve(target string) (Watcher, error) {
host, port, err := parseTarget(target)
if err != nil {
return nil, err
}
// 尝试一下检测host是不是一个IP地址
if net.ParseIP(host) != nil {
// 如果是IP地址,不用做dns解析,走 ipWatcher
ipWatcher := &ipWatcher{
updateChan: make(chan *Update, 1),
}
host, _ = formatIP(host)
ipWatcher.updateChan <- &Update{Op: Add, Addr: host + ":" + port}
return ipWatcher, nil
}
// 如果host不是IP地址,则走 dnsWatcher
ctx, cancel := context.WithCancel(context.Background())
return &dnsWatcher{
r: r,
host: host,
port: port,
ctx: ctx,
cancel: cancel,
t: time.NewTimer(0),
}, nil
}
细节代码:parseTarget 处理用户输入的 target 字符串,返回格式化后的 host 和 port 信息。
// parseTarget takes the user input target string, returns formatted host and port info.
// If target doesn't specify a port, set the port to be the defaultPort.
// If target is in IPv6 format and host-name is enclosed in square brackets, brackets
// are stripped when setting the host.
// examples:
// target: "www.google.com" returns host: "www.google.com", port: "443"
// target: "ipv4-host:80" returns host: "ipv4-host", port: "80"
// target: "[ipv6-host]" returns host: "ipv6-host", port: "443"
// target: ":80" returns host: "localhost", port: "80"
// target: ":" returns host: "localhost", port: "443"
func parseTarget(target string) (host, port string, err error) {
if target == "" {
return "", "", errMissingAddr
}
if ip := net.ParseIP(target); ip != nil {
// target is an IPv4 or IPv6(without brackets) address
return target, defaultPort, nil
}
if host, port, err := net.SplitHostPort(target); err == nil {
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {
// Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed.
host = "localhost"
}
if port == "" {
// If the port field is empty(target ends with colon), e.g. "[::1]:", defaultPort is used.
port = defaultPort
}
return host, port, nil
}
if host, port, err := net.SplitHostPort(target + ":" + defaultPort); err == nil {
// target doesn't have port
return host, port, nil
}
return "", "", fmt.Errorf("invalid target address %v", target)
}
细节代码:formatIP方法,如果addr不是一个有效的IP地址文本表示方式,则返回 ok=false。如果addr是IPv4地址,则返回addr和ok = true。如果addr是ipv6地址,则返回包含在"[]“中的addr和ok=true。
func formatIP(addr string) (addrIP string, ok bool) {
ip := net.ParseIP(addr)
if ip == nil {
return "", false
}
if ip.To4() != nil {
return addr, true
}
return "[" + addr + "]", true
}
ipWatcher的实现
ipWatcher 监控IP地址的命名解析更新
type ipWatcher struct {
updateChan chan *Update
}
Next方法 返回目标的地址解析更新。对于IP地址,解析结果是IP地址自身,因此不需要轮询 name server。因此,Next()在第一次调用时将返回一个Update,在关闭Watcher之前,由于不存在Update,因此后续的所有调用都会被阻塞。
func (i *ipWatcher) Next() ([]*Update, error) {
u, ok := <-i.updateChan
if !ok {
return nil, errWatcherClose
}
return []*Update{u}, nil
}
// Close closes the ipWatcher.
func (i *ipWatcher) Close() {
close(i.updateChan)
}
结合 Resolve() 的调用一起看:
if net.ParseIP(host) != nil {
ipWatcher := &ipWatcher{
// 构建update channel,缓存大小为1
updateChan: make(chan *Update, 1),
}
// 格式化IP地址
host, _ = formatIP(host)
// 往 updateChan 中发一个Update:操作为Add,Addr为IP地址+端口
ipWatcher.updateChan <- &Update{Op: Add, Addr: host + ":" + port}
return ipWatcher, nil
}
dnsWatcher的实现
// dnsWatcher watches for the name resolution update for a specific target
type dnsWatcher struct {
r *dnsResolver
host string
port string
// The latest resolved address set
curAddrs map[string]*Update
ctx context.Context
cancel context.CancelFunc
t *time.Timer
}
Next 方法返回被解析的目标地址(增量)更新。如果没有变化,则默认为sleep30分钟然后尝试再次解析。
func (w *dnsWatcher) Next() ([]*Update, error) {
for {
select {
case <-w.ctx.Done():
return nil, errWatcherClose
case <-w.t.C:
}
result := w.lookup()
// 下一次 lookup 应该在 w.r.freq 定义的间隔之后发生
w.t.Reset(w.r.freq)
if len(result) > 0 {
return result, nil
}
}
}
func (w *dnsWatcher) Close() {
w.cancel()
}
实现细节:lookup()方法先查查 SRV 记录,找不到再查 A 记录。
func (w *dnsWatcher) lookup() []*Update {
newAddrs := w.lookupSRV()
if newAddrs == nil {
// If failed to get any balancer address (either no corresponding SRV for the
// target, or caused by failure during resolution/parsing of the balancer target),
// return any A record info available.
newAddrs = w.lookupHost()
}
result := w.compileUpdate(newAddrs)
w.curAddrs = newAddrs
return result
}
SVR 记录的查询
查询名为 “grpclb” 的 SRV 记录:
func (w *dnsWatcher) lookupSRV() map[string]*Update {
newAddrs := make(map[string]*Update)
_, srvs, err := lookupSRV(w.ctx, "grpclb", "tcp", w.host)
if err != nil {
grpclog.Infof("grpc: failed dns SRV record lookup due to %v.\n", err)
return nil
}
for _, s := range srvs {
lbAddrs, err := lookupHost(w.ctx, s.Target)
if err != nil {
grpclog.Warningf("grpc: failed load balancer address dns lookup due to %v.\n", err)
continue
}
for _, a := range lbAddrs {
a, ok := formatIP(a)
if !ok {
grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
continue
}
addr := a + ":" + strconv.Itoa(int(s.Port))
newAddrs[addr] = &Update{Addr: addr,
Metadata: AddrMetadataGRPCLB{AddrType: GRPCLB, ServerName: s.Target}}
}
}
return newAddrs
}
A记录的查询
func (w *dnsWatcher) lookupHost() map[string]*Update {
newAddrs := make(map[string]*Update)
addrs, err := lookupHost(w.ctx, w.host)
if err != nil {
grpclog.Warningf("grpc: failed dns A record lookup due to %v.\n", err)
return nil
}
for _, a := range addrs {
a, ok := formatIP(a)
if !ok {
grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
continue
}
addr := a + ":" + w.port
newAddrs[addr] = &Update{Addr: addr}
}
return newAddrs
}
2.3 - resolver
2.3.1 - Resolver Package
gRPC Resolver Package
2.3.2 - Resolver的Go代码实现
备注:以 v1.32.0 的代码为准
resolver 包定义
schema builder的注册
var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
)
// Register registers the resolver builder to the resolver map. b.Scheme will be
// used as the scheme registered with this builder.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Resolvers are
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
m[b.Scheme()] = b
}
// Get returns the resolver builder registered with the given scheme.
//
// If no builder is register with the scheme, nil will be returned.
func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
return nil
}
默认schema
默认 schema 是 passthrough。
var (
// defaultScheme is the default scheme to use.
defaultScheme = "passthrough"
)
// SetDefaultScheme sets the default scheme that will be used. The default
// default scheme is "passthrough".
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. The scheme set last overrides
// previously set values.
func SetDefaultScheme(scheme string) {
defaultScheme = scheme
}
// GetDefaultScheme gets the default scheme that will be used.
func GetDefaultScheme() string {
return defaultScheme
}
Address 结构定义
Address 用于表示客户端连接到的一个服务器。
// This is the EXPERIMENTAL API and may be changed or extended in the future.
type Address struct {
// Addr is the server address on which a connection will be established.
Addr string
// ServerName is the name of this address.
// If non-empty, the ServerName is used as the transport certification authority for
// the address, instead of the hostname from the Dial target string. In most cases,
// this should not be set.
//
// If Type is GRPCLB, ServerName should be the name of the remote load
// balancer, not the name of the backend.
//
// WARNING: ServerName must only be populated with trusted values. It
// is insecure to populate it with data from untrusted inputs since untrusted
// values could be used to bypass the authority checks performed by TLS.
ServerName string
// Attributes contains arbitrary data about this address intended for
// consumption by the load balancing policy.
Attributes *attributes.Attributes
// Type is the type of this address.
//
// Deprecated: use Attributes instead.
Type AddressType
// Metadata is the information associated with Addr, which may be used
// to make load balancing decision.
//
// Deprecated: use Attributes instead.
Metadata interface{}
}
小心这个新的替代 naming 的 resolver 也还是 EXPERIMENTAL API,而且 Type 和 Metadata 这两个字段已经 Deprecated,改为使用 Attributes。
// Attributes是一个不可变的结构体,用于存储和检索通用键/值对。 key必须是可哈希的,用户应该为key定义自己的类型。
type Attributes struct {
m map[interface{}]interface{}
}
builder定义
Builder 创建 resolver, resolver将用于观察名称解析更新。
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}
Target的说明:
// Target represents a target for gRPC, as specified in:
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// It is parsed from the target string that gets passed into Dial or DialContext by the user. And
// grpc passes it to the resolver and the balancer.
//
// If the target follows the naming spec, and the parsed scheme is registered with grpc, we will
// parse the target string according to the spec. e.g. "dns://some_authority/foo.bar" will be parsed
// into &Target{Scheme: "dns", Authority: "some_authority", Endpoint: "foo.bar"}
//
// If the target does not contain a scheme, we will apply the default scheme, and set the Target to
// be the full target string. e.g. "foo.bar" will be parsed into
// &Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "foo.bar"}.
//
// If the parsed scheme is not registered (i.e. no corresponding resolver available to resolve the
// endpoint), we set the Scheme to be the default scheme, and set the Endpoint to be the full target
// string. e.g. target string "unknown_scheme://authority/endpoint" will be parsed into
// &Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "unknown_scheme://authority/endpoint"}.
type Target struct {
Scheme string
Authority string
Endpoint string
}
ClientConn 包含用于 resolver 的 callback,以将更新通知到grpc ClientConn。
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately.
UpdateState(State)
// ReportError notifies the ClientConn that the Resolver encountered an
// error. The ClientConn will notify the load balancer and begin calling
// ResolveNow on the Resolver with exponential backoff.
ReportError(error)
// NewAddress is called by resolver to notify ClientConn a new list
// of resolved addresses.
// The address list should be the complete list of resolved addresses.
//
// Deprecated: Use UpdateState instead.
NewAddress(addresses []Address)
// NewServiceConfig is called by resolver to notify ClientConn a new
// service config. The service config should be provided as a json string.
//
// Deprecated: Use UpdateState instead.
NewServiceConfig(serviceConfig string)
// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}
BuildOptions 包含builder用于创建resolver的额外信息。
type BuildOptions struct {
// DisableServiceConfig 指示resolver的实现是否要获取service config数据
DisableServiceConfig bool
// DialCreds is the transport credentials used by the ClientConn for
// communicating with the target gRPC service (set via
// WithTransportCredentials). In cases where a name resolution service
// requires the same credentials, the resolver may use this field. In most
// cases though, it is not appropriate, and this field may be ignored.
DialCreds credentials.TransportCredentials
// CredsBundle is the credentials bundle used by the ClientConn for
// communicating with the target gRPC service (set via
// WithCredentialsBundle). In cases where a name resolution service
// requires the same credentials, the resolver may use this field. In most
// cases though, it is not appropriate, and this field may be ignored.
CredsBundle credentials.Bundle
// Dialer is the custom dialer used by the ClientConn for dialling the
// target gRPC service (set via WithDialer). In cases where a name
// resolution service requires the same dialer, the resolver may use this
// field. In most cases though, it is not appropriate, and this field may
// be ignored.
Dialer func(context.Context, string) (net.Conn, error)
}
resolver定义
Resolver 监控指定目标的更新。更新包括地址更新和服务配置的更新。
type Resolver interface {
// ResolveNow 将被 gRPC 调用来尝试再次解析目标名称。
// 这仅仅是一个提示(hint),resolver 可以忽略它,如果没有必要。
// 可以并发的多次调用。
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}
// ResolveNowOptions includes additional information for ResolveNow.
type ResolveNowOptions struct{}
passthrough resolver代码实现
Package passthrough实现了一个直通式的解析器。它将不含scheme的目标名称作为解析地址发回给gRPC。
const scheme = "passthrough"
type passthroughBuilder struct{}
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &passthroughResolver{
target: target,
cc: cc,
}
r.start()
return r, nil
}
func (*passthroughBuilder) Scheme() string {
return scheme
}
start方法中就直接进行解析了:
func (r *passthroughResolver) start() {
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}
resolver 接口定义的方法都置空:
func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*passthroughResolver) Close() {}
package初始化时就自动注册
func init() {
resolver.Register(&passthroughBuilder{})
}
// 配合 passthrough.go
import _ "google.golang.org/grpc/internal/resolver/passthrough" // import for side effects after package was moved
manual resolverd代码实现
package manual 定义了一个解析器,可以用来手动发送解析地址到ClientConn。
用于测试目标,在构造时给出一个初始化状态,之后就直接用这个状态通知 ClientConn:
// InitialState adds initial state to the resolver so that UpdateState doesn't
// need to be explicitly called after Dial.
func (r *Resolver) InitialState(s resolver.State) {
r.bootstrapState = &s
}
// Build returns itself for Resolver, because it's both a builder and a resolver.
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.CC = cc
if r.bootstrapState != nil {
r.UpdateState(*r.bootstrapState)
}
return r, nil
}
dns resolver代码实现
dns resolver的builder
// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
host, port, err := parseTarget(target.Endpoint, defaultPort)
if err != nil {
return nil, err
}
// IP address.
if ipAddr, ok := formatIP(host); ok {
addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
cc.UpdateState(resolver.State{Addresses: addr})
return deadResolver{}, nil
}
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
d := &dnsResolver{
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}
if target.Authority == "" {
d.resolver = defaultResolver
} else {
d.resolver, err = customAuthorityResolver(target.Authority)
if err != nil {
return nil, err
}
}
d.wg.Add(1)
go d.watcher()
d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}
dns resolver的结构体
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
// replace the real lookup functions with mocked ones to facilitate testing.
// If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes
// will warns lookup (READ the lookup function pointers) inside watcher() goroutine
// has data race with replaceNetFunc (WRITE the lookup function pointers).
wg sync.WaitGroup
disableServiceConfig bool
}
resolver接口方法的实现
// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case d.rn <- struct{}{}:
default:
}
}
// Close closes the dnsResolver.
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
}
watcher的实现
func (d *dnsResolver) watcher() {
defer d.wg.Done()
for {
select {
case <-d.ctx.Done():
return
case <-d.rn:
}
state, err := d.lookup()
if err != nil {
d.cc.ReportError(err)
} else {
d.cc.UpdateState(*state)
}
// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
t := time.NewTimer(minDNSResRate)
select {
case <-t.C:
case <-d.ctx.Done():
t.Stop()
return
}
}
}
2.4 - Name Resolver
gRPC Name Resolver