Naming的Go代码实现
备注: 由于 naming package 在grpc-go v1.30.0 版本之后被删除,所以源码实现以最后一个版本 v1.29.1 为准。
Naming的定义
代码在 naming/naming.go
中。
数据结构定义
Update struct 定义命名解析的更新。Addr和metadata在同一个update中不能同时为空。
// Update defines a name resolution update. Notice that it is not valid having both
// empty string Addr and nil Metadata in an Update.
//
// Deprecated: please use package resolver.
type Update struct {
// Op indicates the operation of the update.
Op Operation
// Addr is the updated address. It is empty string if there is no address update.
Addr string
// Metadata is the updated metadata. It is nil if there is no metadata update.
// Metadata is not required for a custom naming implementation.
Metadata interface{}
}
其中 operation 定义了用于命名解析变更的对应操作:
// Operation defines the corresponding operations for a name resolution change.
//
// Deprecated: please use package resolver.
type Operation uint8
const (
// Add indicates a new address is added.
Add Operation = iota
// Delete indicates an existing address is deleted.
Delete
)
接口定义
Resolver 接口创建 Watcher,用于跟踪指定目标的解析变更。
// Resolver creates a Watcher for a target to track its resolution changes.
//
// Deprecated: please use package resolver.
type Resolver interface {
// Resolve creates a Watcher for target.
Resolve(target string) (Watcher, error)
}
Watcher 接口用于监控特定目标的更新:
// Watcher watches for the updates on the specified target.
//
// Deprecated: please use package resolver.
type Watcher interface {
// Next 方法阻塞直至更新或者错误发生。
// 可能返回一个或者多个更新。
// 第一次调用因该得到结果的全集。
// 当且仅当 Watcher 无法恢复时返回错误。
Next() ([]*Update, error)
// Close closes the Watcher.
Close()
}
dns resolver的实现
gRPC 只内置了这一个实现: naming/dns_resolver.go
dns resolver的结构
dnsResolver 处理遵循dns模式的名称的命名解析。
type dnsResolver struct {
// 该解析器创建的 watcher 将使用的 DNS 服务器的轮询频率。
freq time.Duration
}
构建dns resolver
const (
// 默认频率是30分钟。
defaultFreq = time.Minute * 30
)
// NewDNSResolverWithFreq 创建 DNS Resolver,用于解析 DNS 名称,并创建watcher,使用 freq 参数设置的频率来查询DNS 服务器
func NewDNSResolverWithFreq(freq time.Duration) (Resolver, error) {
return &dnsResolver{freq: freq}, nil
}
// NewDNSResolverWithFreq 创建 DNS Resolver,用于解析 DNS 名称,并创建watcher,使用默认的频率来查询DNS 服务器
func NewDNSResolver() (Resolver, error) {
return NewDNSResolverWithFreq(defaultFreq)
}
解析dns
Resolve 方法创建watcher,watcher用来监控目标的命名解析。
func (r *dnsResolver) Resolve(target string) (Watcher, error) {
host, port, err := parseTarget(target)
if err != nil {
return nil, err
}
// 尝试一下检测host是不是一个IP地址
if net.ParseIP(host) != nil {
// 如果是IP地址,不用做dns解析,走 ipWatcher
ipWatcher := &ipWatcher{
updateChan: make(chan *Update, 1),
}
host, _ = formatIP(host)
ipWatcher.updateChan <- &Update{Op: Add, Addr: host + ":" + port}
return ipWatcher, nil
}
// 如果host不是IP地址,则走 dnsWatcher
ctx, cancel := context.WithCancel(context.Background())
return &dnsWatcher{
r: r,
host: host,
port: port,
ctx: ctx,
cancel: cancel,
t: time.NewTimer(0),
}, nil
}
细节代码:parseTarget 处理用户输入的 target 字符串,返回格式化后的 host 和 port 信息。
// parseTarget takes the user input target string, returns formatted host and port info.
// If target doesn't specify a port, set the port to be the defaultPort.
// If target is in IPv6 format and host-name is enclosed in square brackets, brackets
// are stripped when setting the host.
// examples:
// target: "www.google.com" returns host: "www.google.com", port: "443"
// target: "ipv4-host:80" returns host: "ipv4-host", port: "80"
// target: "[ipv6-host]" returns host: "ipv6-host", port: "443"
// target: ":80" returns host: "localhost", port: "80"
// target: ":" returns host: "localhost", port: "443"
func parseTarget(target string) (host, port string, err error) {
if target == "" {
return "", "", errMissingAddr
}
if ip := net.ParseIP(target); ip != nil {
// target is an IPv4 or IPv6(without brackets) address
return target, defaultPort, nil
}
if host, port, err := net.SplitHostPort(target); err == nil {
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {
// Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed.
host = "localhost"
}
if port == "" {
// If the port field is empty(target ends with colon), e.g. "[::1]:", defaultPort is used.
port = defaultPort
}
return host, port, nil
}
if host, port, err := net.SplitHostPort(target + ":" + defaultPort); err == nil {
// target doesn't have port
return host, port, nil
}
return "", "", fmt.Errorf("invalid target address %v", target)
}
细节代码:formatIP方法,如果addr不是一个有效的IP地址文本表示方式,则返回 ok=false。如果addr是IPv4地址,则返回addr和ok = true。如果addr是ipv6地址,则返回包含在"[]“中的addr和ok=true。
func formatIP(addr string) (addrIP string, ok bool) {
ip := net.ParseIP(addr)
if ip == nil {
return "", false
}
if ip.To4() != nil {
return addr, true
}
return "[" + addr + "]", true
}
ipWatcher的实现
ipWatcher 监控IP地址的命名解析更新
type ipWatcher struct {
updateChan chan *Update
}
Next方法 返回目标的地址解析更新。对于IP地址,解析结果是IP地址自身,因此不需要轮询 name server。因此,Next()在第一次调用时将返回一个Update,在关闭Watcher之前,由于不存在Update,因此后续的所有调用都会被阻塞。
func (i *ipWatcher) Next() ([]*Update, error) {
u, ok := <-i.updateChan
if !ok {
return nil, errWatcherClose
}
return []*Update{u}, nil
}
// Close closes the ipWatcher.
func (i *ipWatcher) Close() {
close(i.updateChan)
}
结合 Resolve() 的调用一起看:
if net.ParseIP(host) != nil {
ipWatcher := &ipWatcher{
// 构建update channel,缓存大小为1
updateChan: make(chan *Update, 1),
}
// 格式化IP地址
host, _ = formatIP(host)
// 往 updateChan 中发一个Update:操作为Add,Addr为IP地址+端口
ipWatcher.updateChan <- &Update{Op: Add, Addr: host + ":" + port}
return ipWatcher, nil
}
dnsWatcher的实现
// dnsWatcher watches for the name resolution update for a specific target
type dnsWatcher struct {
r *dnsResolver
host string
port string
// The latest resolved address set
curAddrs map[string]*Update
ctx context.Context
cancel context.CancelFunc
t *time.Timer
}
Next 方法返回被解析的目标地址(增量)更新。如果没有变化,则默认为sleep30分钟然后尝试再次解析。
func (w *dnsWatcher) Next() ([]*Update, error) {
for {
select {
case <-w.ctx.Done():
return nil, errWatcherClose
case <-w.t.C:
}
result := w.lookup()
// 下一次 lookup 应该在 w.r.freq 定义的间隔之后发生
w.t.Reset(w.r.freq)
if len(result) > 0 {
return result, nil
}
}
}
func (w *dnsWatcher) Close() {
w.cancel()
}
实现细节:lookup()方法先查查 SRV 记录,找不到再查 A 记录。
func (w *dnsWatcher) lookup() []*Update {
newAddrs := w.lookupSRV()
if newAddrs == nil {
// If failed to get any balancer address (either no corresponding SRV for the
// target, or caused by failure during resolution/parsing of the balancer target),
// return any A record info available.
newAddrs = w.lookupHost()
}
result := w.compileUpdate(newAddrs)
w.curAddrs = newAddrs
return result
}
SVR 记录的查询
查询名为 “grpclb” 的 SRV 记录:
func (w *dnsWatcher) lookupSRV() map[string]*Update {
newAddrs := make(map[string]*Update)
_, srvs, err := lookupSRV(w.ctx, "grpclb", "tcp", w.host)
if err != nil {
grpclog.Infof("grpc: failed dns SRV record lookup due to %v.\n", err)
return nil
}
for _, s := range srvs {
lbAddrs, err := lookupHost(w.ctx, s.Target)
if err != nil {
grpclog.Warningf("grpc: failed load balancer address dns lookup due to %v.\n", err)
continue
}
for _, a := range lbAddrs {
a, ok := formatIP(a)
if !ok {
grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
continue
}
addr := a + ":" + strconv.Itoa(int(s.Port))
newAddrs[addr] = &Update{Addr: addr,
Metadata: AddrMetadataGRPCLB{AddrType: GRPCLB, ServerName: s.Target}}
}
}
return newAddrs
}
A记录的查询
func (w *dnsWatcher) lookupHost() map[string]*Update {
newAddrs := make(map[string]*Update)
addrs, err := lookupHost(w.ctx, w.host)
if err != nil {
grpclog.Warningf("grpc: failed dns A record lookup due to %v.\n", err)
return nil
}
for _, a := range addrs {
a, ok := formatIP(a)
if !ok {
grpclog.Errorf("grpc: failed IP parsing due to %v.\n", err)
continue
}
addr := a + ":" + w.port
newAddrs[addr] = &Update{Addr: addr}
}
return newAddrs
}