这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Fortio源码学习

学习 Fortio 的源代码

1 - main

Fortio的main函数

1.1 - main.go

Fortio的main.go实现

main函数入口

// main is the fortio command line entry point. It registers the -P / -M
// flags, treats the first command line argument as the fortio command, parses
// the remaining arguments as flags for that command and then dispatches on
// the command. Commands that act as servers end by entering serverLoop.
// nolint: funlen // well yes it's fairly big and lotsa ifs.
func main() {
	flag.Var(&proxiesFlags, "P",
		"Tcp proxies to run, e.g -P \"localport1 dest_host1:dest_port1\" -P \"[::1]:0 www.google.com:443\" ...")
	flag.Var(&httpMultiFlags, "M", "Http multi proxy to run, e.g -M \"localport1 baseDestURL1 baseDestURL2\" -M ...")
	bincommon.SharedMain(usage)
	if len(os.Args) < 2 {
		usageErr("Error: need at least 1 command parameter")
	}
	// The first command line argument is the fortio command; it is removed
	// from os.Args so that flag.Parse only sees that command's own arguments.
	command := os.Args[1]
	os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
	flag.Parse()
	// Help flag: print the usage on stdout and exit successfully.
	if *bincommon.HelpFlag {
		usage(os.Stdout)
		os.Exit(0)
	}
	// Quiet flag (not a "quit" command): switch logging to quiet mode.
	if *bincommon.QuietFlag {
		log.SetLogLevelQuiet(log.Error)
	}
	// Optional config directory: a setup failure is logged but not fatal.
	confDir := *bincommon.ConfigDirectoryFlag
	if confDir != "" {
		if _, err := configmap.Setup(flag.CommandLine, confDir); err != nil {
			log.Critf("Unable to watch config/flag changes in %v: %v", confDir, err)
		}
	}
	fnet.ChangeMaxPayloadSize(*newMaxPayloadSizeKb * fnet.KILOBYTE)
	percList, err := stats.ParsePercentiles(*percentilesFlag)
	if err != nil {
		usageErr("Unable to extract percentiles from -p: ", err)
	}
	baseURL := strings.Trim(*baseURLFlag, " \t\n\r/") // remove trailing slash and other whitespace
	// When a sync source is given, run ui.Sync first and exit with status 1
	// if it reports failure.
	sync := strings.TrimSpace(*syncFlag)
	if sync != "" {
		if !ui.Sync(os.Stdout, sync, *dataDirFlag) {
			os.Exit(1)
		}
	}

	// Dispatch on the command. Some commands run as servers; isServer marks
	// them so that serverLoop is entered once the switch is done.
	isServer := false
	switch command {
	case "curl":
		fortioLoad(true, nil)
	case "nc":
		fortioNC()
	case "load":
		fortioLoad(*curlFlag, percList)
	case "redirect":
		isServer = true
		fhttp.RedirectToHTTPS(*redirectFlag)
	case "report":
		isServer = true
		if *redirectFlag != disabled {
			fhttp.RedirectToHTTPS(*redirectFlag)
		}
		if !ui.Report(baseURL, *echoPortFlag, *dataDirFlag) {
			os.Exit(1) // error already logged
		}
	case "tcp-echo":
		isServer = true
		fnet.TCPEchoServer("tcp-echo", *tcpPortFlag)
		startProxies()
	case "udp-echo":
		isServer = true
		fnet.UDPEchoServer("udp-echo", *udpPortFlag, *udpAsyncFlag)
		startProxies()
	case "proxies":
		if len(flag.Args()) != 0 {
			usageErr("Error: fortio proxies command only takes -P / -M flags")
		}
		isServer = true
		if startProxies() == 0 {
			usageErr("Error: fortio proxies command needs at least one -P / -M flag")
		}
	case "server":
		// Each auxiliary server is started unless its flag is set to disabled.
		isServer = true
		if *tcpPortFlag != disabled {
			fnet.TCPEchoServer("tcp-echo", *tcpPortFlag)
		}
		if *udpPortFlag != disabled {
			fnet.UDPEchoServer("udp-echo", *udpPortFlag, *udpAsyncFlag)
		}
		if *grpcPortFlag != disabled {
			fgrpc.PingServer(*grpcPortFlag, *bincommon.CertFlag, *bincommon.KeyFlag, fgrpc.DefaultHealthServiceName, uint32(*maxStreamsFlag))
		}
		if *redirectFlag != disabled {
			fhttp.RedirectToHTTPS(*redirectFlag)
		}
		if !ui.Serve(baseURL, *echoPortFlag, *echoDbgPathFlag, *uiPathFlag, *dataDirFlag, percList) {
			os.Exit(1) // error already logged
		}
		startProxies()
	case "grpcping":
		grpcClient()
	default:
		usageErr("Error: unknown command ", command)
	}
	if isServer {
		if confDir == "" {
			log.Infof("Note: not using dynamic flag watching (use -config to set watch directory)")
		}
		serverLoop(sync)
	}
}

2 - command

Fortio的命令

2.1 - load command

Fortio 的 load 命令

2.1.1 - load command

Fortio 的 load 命令

入口

Load 命令的入口在main函数中:

// Flags read by main when dispatching the "load" command.
var (
// curlFlag is passed as fortioLoad's justCurl argument by the "load" command.
curlFlag   = flag.Bool("curl", false, "Just fetch the content once")
// percentilesFlag is parsed by stats.ParsePercentiles into the percentile list.
percentilesFlag = flag.String("p", "50,75,90,99,99.9", "List of pXX to calculate")
)

func main() {
	percList, err := stats.ParsePercentiles(*percentilesFlag)
	......
	
	isServer := false
	switch command {
	......
	case "curl":
		fortioLoad(true, nil)
	case "load":
		fortioLoad(*curlFlag, percList)
	......
}

load 命令和 curl 命令的实现是相同的,只是 curl 命令只执行一次请求来获取内容。

fortioLoad() 的实现

curl 命令

// nolint: funlen, gocognit // maybe refactor/shorten later.
func fortioLoad(justCurl bool, percList []float64) {
	if len(flag.Args()) != 1 {
		usageErr("Error: fortio load/curl needs a url or destination")
	}
	httpOpts := bincommon.SharedHTTPOptions()
	if justCurl {
    // 对于 curl 命令,实现很简单
		bincommon.FetchURL(httpOpts)
		return
	}
	......
}

以下是 load 命令。

处理各种参数

先处理各种参数:

	url := httpOpts.URL
	prevGoMaxProcs := runtime.GOMAXPROCS(*goMaxProcsFlag)
	out := os.Stderr
	qps := *qpsFlag // TODO possibly use translated <=0 to "max" from results/options normalization in periodic/
	if *calcQPS {
		if *exactlyFlag == 0 || *durationFlag <= 0 {
			usageErr("Error: can't use `-calc-qps` without also specifying `-n` and `-t`")
		}
		qps = float64(*exactlyFlag) / (*durationFlag).Seconds()
		log.LogVf("Calculated QPS to do %d request in %v: %f", *exactlyFlag, *durationFlag, qps)
	}
	_, _ = fmt.Fprintf(out, "Fortio %s running at %g queries per second, %d->%d procs",
		version.Short(), qps, prevGoMaxProcs, runtime.GOMAXPROCS(0))
	if *exactlyFlag > 0 {
		_, _ = fmt.Fprintf(out, ", for %d calls: %s\n", *exactlyFlag, url)
	} else {
		if *durationFlag <= 0 {
			// Infinite mode is determined by having a negative duration value
			*durationFlag = -1
			_, _ = fmt.Fprintf(out, ", until interrupted: %s\n", url)
		} else {
			_, _ = fmt.Fprintf(out, ", for %v: %s\n", *durationFlag, url)
		}
	}
	if qps <= 0 {
		qps = -1 // 0==unitialized struct == default duration, -1 (0 for flag) is max
	}
	labels := *labelsFlag
	if labels == "" {
		hname, _ := os.Hostname()
		shortURL := url
		for _, p := range []string{"https://", "http://"} {
			if strings.HasPrefix(url, p) {
				shortURL = url[len(p):]
				break
			}
		}
		labels = shortURL + " , " + strings.SplitN(hname, ".", 2)[0]
		log.LogVf("Generated Labels: %s", labels)
	}

准备RunnerOptions

	ro := periodic.RunnerOptions{
		QPS:         qps,
		Duration:    *durationFlag,
		NumThreads:  *numThreadsFlag,
		Percentiles: percList,
		Resolution:  *resolutionFlag,
		Out:         out,
		Labels:      labels,
		Exactly:     *exactlyFlag,
		Jitter:      *jitterFlag,
		Uniform:     *uniformFlag,
		RunID:       *bincommon.RunIDFlag,
		Offset:      *offsetFlag,
	}
	err := ro.AddAccessLogger(*accessLogFileFlag, *accessLogFileFormat)
	if err != nil {
		// Error already logged.
		os.Exit(1)
	}

运行测试

gRPC测试

	var res periodic.HasRunnerResult
	if *grpcFlag {
		o := fgrpc.GRPCRunnerOptions{
			RunnerOptions:      ro,
			Destination:        url,
			CACert:             *bincommon.CACertFlag,
			Insecure:           bincommon.TLSInsecure(),
			Service:            *healthSvcFlag,
			Streams:            *streamsFlag,
			AllowInitialErrors: *allowInitialErrorsFlag,
			Payload:            httpOpts.PayloadString(),
			Delay:              *pingDelayFlag,
			UsePing:            *doPingLoadFlag,
			UnixDomainSocket:   httpOpts.UnixDomainSocket,
		}
		res, err = fgrpc.RunGRPCTest(&o)
	} 

TCP测试

    else if strings.HasPrefix(url, tcprunner.TCPURLPrefix) {
		o := tcprunner.RunnerOptions{
			RunnerOptions: ro,
		}
		o.ReqTimeout = httpOpts.HTTPReqTimeOut
		o.Destination = url
		o.Payload = httpOpts.Payload
		res, err = tcprunner.RunTCPTest(&o)
	}

UDP测试

  else if strings.HasPrefix(url, udprunner.UDPURLPrefix) {
		// UDP load test: chosen when the destination starts with
		// udprunner.UDPURLPrefix. It reuses the common runner options and
		// the payload from the shared HTTP options, with its own timeout flag.
		o := udprunner.RunnerOptions{
			RunnerOptions: ro,
		}
		o.ReqTimeout = *udpTimeoutFlag
		o.Destination = url
		o.Payload = httpOpts.Payload
		res, err = udprunner.RunUDPTest(&o)
	}

HTTP测试

  else {
		o := fhttp.HTTPRunnerOptions{
			HTTPOptions:        *httpOpts,
			RunnerOptions:      ro,
			Profiler:           *profileFlag,
			AllowInitialErrors: *allowInitialErrorsFlag,
			AbortOn:            *abortOnFlag,
		}
		res, err = fhttp.RunHTTPTest(&o)
	}

处理测试结果

	if err != nil {
		_, _ = fmt.Fprintf(out, "Aborting because of %v\n", err)
		os.Exit(1)
	}
	rr := res.Result()
	warmup := *numThreadsFlag
	if ro.Exactly > 0 {
		warmup = 0
	}
	_, _ = fmt.Fprintf(out, "All done %d calls (plus %d warmup) %.3f ms avg, %.1f qps\n",
		rr.DurationHistogram.Count,
		warmup,
		1000.*rr.DurationHistogram.Avg,
		rr.ActualQPS)
	jsonFileName := *jsonFlag
	if *autoSaveFlag || len(jsonFileName) > 0 { //nolint: nestif // but probably should breakup this function
		var j []byte
		j, err = json.MarshalIndent(res, "", "  ")
		if err != nil {
			log.Fatalf("Unable to json serialize result: %v", err)
		}
		var f *os.File
		if jsonFileName == "-" {
			f = os.Stdout
			jsonFileName = "stdout"
		} else {
			if len(jsonFileName) == 0 {
				jsonFileName = path.Join(*dataDirFlag, rr.ID()+".json")
			}
			f, err = os.Create(jsonFileName)
			if err != nil {
				log.Fatalf("Unable to create %s: %v", jsonFileName, err)
			}
		}
		n, err := f.Write(append(j, '\n'))
		if err != nil {
			log.Fatalf("Unable to write json to %s: %v", jsonFileName, err)
		}
		if f != os.Stdout {
			err := f.Close()
			if err != nil {
				log.Fatalf("Close error for %s: %v", jsonFileName, err)
			}
		}
		_, _ = fmt.Fprintf(out, "Successfully wrote %d bytes of Json data to %s\n", n, jsonFileName)
	}
}

3 - runner

Fortio的runner

3.1 - gRPC runner

Fortio的gRPC runner

3.1.1 - grpcrunner.go

Fortio的grpcrunner.go源码

GRPCRunnerResults 结构体定义

GRPCRunnerResults是 GRPCRunner 的结果聚合。也是每个线程/goroutine使用的内部类型。

// GRPCRunnerResults is the aggregated result of an GRPCRunner.
// Also is the internal type used per thread/goroutine.
type GRPCRunnerResults struct {
	periodic.RunnerResults
	clientH     grpc_health_v1.HealthClient       // health check client, used by Run when Ping is false
	reqH        grpc_health_v1.HealthCheckRequest // health check request sent on each Run call
	clientP     PingServerClient                  // ping client, used by Run when Ping is true
	reqP        PingMessage                       // ping request sent on each Run call
	RetCodes    HealthResultMap                   // call counts keyed by health status string, or Error for failed calls (ping and health)
	Destination string
	Streams     int
	Ping        bool // true: Run calls Ping; false: Run calls the health Check
}

GRPCRunnerOptions 结构体定义

GRPCRunnerOptions 包括基本的 RunnerOptions 和 Grpc 特定的选项。

// GRPCRunnerOptions includes the base RunnerOptions plus grpc specific
// options.
type GRPCRunnerOptions struct {
	periodic.RunnerOptions
	Destination        string        // Target to dial; Dial passes it through grpcDestination first
	Service            string        // Service to be checked when using grpc health check
	Profiler           string        // file to save profiles to (".cpu" and ".mem" are appended). defaults to no profiling
	Payload            string        // Payload to be sent for grpc ping service
	Streams            int           // number of streams. total go routines and data streams will be streams*numthreads.
	Delay              time.Duration // Delay to be sent when using grpc ping service
	CACert             string        // Path to CA certificate for grpc TLS
	CertOverride       string        // Override the cert virtual host of authority for testing
	Insecure           bool          // Allow unknown CA / self signed
	AllowInitialErrors bool          // whether initial errors don't cause an abort
	UsePing            bool          // use our own Ping proto for grpc load instead of standard health check one.
	UnixDomainSocket   string        // unix domain socket path to use for physical connection instead of Destination
}

Dial()方法实现

Dial 用于建立 gRPC 连接:当提供了 CA 证书(cert)或 serverAddr 带有 HTTPS 前缀时,使用 TLS 传输安全;否则使用不安全(无传输加密)的连接。如果 override 被设置为一个非空字符串,它将覆盖请求中 authority 的虚拟主机名。

// Dial dials grpc using insecure or tls transport security when serverAddr
// has prefixHTTPS or cert is provided. If override is set to a non empty string,
// it will override the virtual host name of authority in requests.
//
// Transport selection, in priority order:
//   - o.CACert set: TLS credentials are loaded from that file (with
//     o.CertOverride as the server name override);
//   - o.Destination has the https prefix: TLS, with certificate verification
//     skipped when o.Insecure is true;
//   - otherwise: no transport security.
//
// When o.UnixDomainSocket is set the physical connection is made to that
// socket instead of the parsed destination. A non-nil error is logged and
// returned together with whatever grpc.Dial returned.
func Dial(o *GRPCRunnerOptions) (conn *grpc.ClientConn, err error) {
	var opts []grpc.DialOption
	switch {
	case o.CACert != "":
		var creds credentials.TransportCredentials
		creds, err = credentials.NewClientTLSFromFile(o.CACert, o.CertOverride)
		if err != nil {
			log.Errf("Invalid TLS credentials: %v\n", err)
			return nil, err
		}
		log.Infof("Using CA certificate %v to construct TLS credentials", o.CACert)
		opts = append(opts, grpc.WithTransportCredentials(creds))
	case strings.HasPrefix(o.Destination, fnet.PrefixHTTPS):
		creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: o.Insecure}) // nolint: gosec // explicit flag
		opts = append(opts, grpc.WithTransportCredentials(creds))
	default:
		opts = append(opts, grpc.WithInsecure())
	}
	serverAddr := grpcDestination(o.Destination)

	// Unix domain socket support: override the dialer so that the connection
	// goes to the socket path whatever address grpc asks for.
	if o.UnixDomainSocket != "" {
		log.Warnf("Using domain socket %v instead of %v for grpc connection", o.UnixDomainSocket, serverAddr)
		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
			// Use DialContext so the deadline/cancellation grpc passes in ctx
			// is honored while connecting to the socket.
			var d net.Dialer
			return d.DialContext(ctx, fnet.UnixDomainSocket, o.UnixDomainSocket)
		}))
	}

	// Establish the grpc connection.
	conn, err = grpc.Dial(serverAddr, opts...)
	if err != nil {
		log.Errf("failed to connect to %s with certificate %s and override %s: %v", serverAddr, o.CACert, o.CertOverride, err)
	}
	return conn, err
}

Run()方法实现

Run 方法以目标 QPS 执行 gRPC 健康检查或 ping。该方法将被设置为 RunnerOptions 中的 Function。

// Run exercises GRPC health check or ping at the target QPS.
// To be set as the Function in RunnerOptions.
//
// Each call issues exactly one RPC (Ping when grpcstate.Ping is set, the
// standard health Check otherwise) and increments one RetCodes entry: Error
// when the call failed, else the health status string. A ping, which carries
// no health status, is counted under SERVING.
func (grpcstate *GRPCRunnerResults) Run(t int) {
	log.Debugf("Calling in %d", t)
	var (
		callErr error
		reply   interface{}
	)
	status := grpc_health_v1.HealthCheckResponse_SERVING
	if !grpcstate.Ping {
		// Default: standard health check; keep the status the server reported.
		var checkResp *grpc_health_v1.HealthCheckResponse
		checkResp, callErr = grpcstate.clientH.Check(context.Background(), &grpcstate.reqH)
		if checkResp != nil {
			status = checkResp.Status
			reply = checkResp
		}
	} else {
		// Ping requested: use the ping method instead of the health check.
		reply, callErr = grpcstate.clientP.Ping(context.Background(), &grpcstate.reqP)
	}
	log.Debugf("For %d (ping=%v) got %v %v", t, grpcstate.Ping, callErr, reply)

	// Record the outcome of this call.
	key := Error
	if callErr == nil {
		key = status.String()
	} else {
		log.Warnf("Error making grpc call: %v", callErr)
	}
	grpcstate.RetCodes[key]++
}

RunGRPCTest() 方法实现

处理参数

// RunGRPCTest runs an http test and returns the aggregated stats.
// nolint: funlen, gocognit
func RunGRPCTest(o *GRPCRunnerOptions) (*GRPCRunnerResults, error) {
	if o.Streams < 1 {
		o.Streams = 1
	}
	if o.NumThreads < 1 {
		// sort of todo, this redoing some of periodic normalize (but we can't use normalize which does too much)
		o.NumThreads = periodic.DefaultRunnerOptions.NumThreads
	}
  // 目前只支持 ping 和 health,可惜
	if o.UsePing {
		o.RunType = "GRPC Ping"
		if o.Delay > 0 {
			o.RunType += fmt.Sprintf(" Delay=%v", o.Delay)
		}
	} else {
		o.RunType = "GRPC Health"
	}
	pll := len(o.Payload)
	if pll > 0 {
		o.RunType += fmt.Sprintf(" PayloadLength=%d", pll)
	}

开始执行测试

	log.Infof("Starting %s test for %s with %d*%d threads at %.1f qps", o.RunType, o.Destination, o.Streams, o.NumThreads, o.QPS)
	o.NumThreads *= o.Streams
	r := periodic.NewPeriodicRunner(&o.RunnerOptions)
	defer r.Options().Abort()
	numThreads := r.Options().NumThreads // may change
	total := GRPCRunnerResults{
		RetCodes:    make(HealthResultMap),
		Destination: o.Destination,
		Streams:     o.Streams,
		Ping:        o.UsePing,
	}
	grpcstate := make([]GRPCRunnerResults, numThreads)
	out := r.Options().Out // Important as the default value is set from nil to stdout inside NewPeriodicRunner
	var conn *grpc.ClientConn
	var err error
	ts := time.Now().UnixNano()
	for i := 0; i < numThreads; i++ {
		r.Options().Runners[i] = &grpcstate[i]
		if (i % o.Streams) == 0 {
			conn, err = Dial(o)
			if err != nil {
				log.Errf("Error in grpc dial for %s %v", o.Destination, err)
				return nil, err
			}
		} else {
			log.Debugf("Reusing previous client connection for %d", i)
		}
		grpcstate[i].Ping = o.UsePing
		var err error
		if o.UsePing { // nolint: nestif
			grpcstate[i].clientP = NewPingServerClient(conn)
			if grpcstate[i].clientP == nil {
				return nil, fmt.Errorf("unable to create ping client %d for %s", i, o.Destination)
			}
			grpcstate[i].reqP = PingMessage{Payload: o.Payload, DelayNanos: o.Delay.Nanoseconds(), Seq: int64(i), Ts: ts}
			if o.Exactly <= 0 {
				_, err = grpcstate[i].clientP.Ping(context.Background(), &grpcstate[i].reqP)
			}
		} else {
			grpcstate[i].clientH = grpc_health_v1.NewHealthClient(conn)
			if grpcstate[i].clientH == nil {
				return nil, fmt.Errorf("unable to create health client %d for %s", i, o.Destination)
			}
			grpcstate[i].reqH = grpc_health_v1.HealthCheckRequest{Service: o.Service}
			if o.Exactly <= 0 {
				_, err = grpcstate[i].clientH.Check(context.Background(), &grpcstate[i].reqH)
			}
		}
		if !o.AllowInitialErrors && err != nil {
			log.Errf("Error in first grpc call (ping = %v) for %s: %v", o.UsePing, o.Destination, err)
			return nil, err
		}
		// Setup the stats for each 'thread'
		grpcstate[i].RetCodes = make(HealthResultMap)
	}

profile和测试结果

	if o.Profiler != "" {
		fc, err := os.Create(o.Profiler + ".cpu")
		if err != nil {
			log.Critf("Unable to create .cpu profile: %v", err)
			return nil, err
		}
		if err = pprof.StartCPUProfile(fc); err != nil {
			log.Critf("Unable to start cpu profile: %v", err)
		}
	}
	total.RunnerResults = r.Run()
	if o.Profiler != "" {
		pprof.StopCPUProfile()
		fm, err := os.Create(o.Profiler + ".mem")
		if err != nil {
			log.Critf("Unable to create .mem profile: %v", err)
			return nil, err
		}
		runtime.GC() // get up-to-date statistics
		if err = pprof.WriteHeapProfile(fm); err != nil {
			log.Critf("Unable to write heap profile: %v", err)
		}
		fm.Close()
		fmt.Printf("Wrote profile data to %s.{cpu|mem}\n", o.Profiler)
	}
	// Numthreads may have reduced
	numThreads = r.Options().NumThreads
	keys := []string{}
	for i := 0; i < numThreads; i++ {
		// Q: is there some copying each time stats[i] is used?
		for k := range grpcstate[i].RetCodes {
			if _, exists := total.RetCodes[k]; !exists {
				keys = append(keys, k)
			}
			total.RetCodes[k] += grpcstate[i].RetCodes[k]
		}
		// TODO: if grpc client needs 'cleanup'/Close like http one, do it on original NumThreads
	}
	// Cleanup state:
	r.Options().ReleaseRunners()
	which := "Health"
	if o.UsePing {
		which = "Ping"
	}
	_, _ = fmt.Fprintf(out, "Jitter: %t\n", total.Jitter)
	for _, k := range keys {
		_, _ = fmt.Fprintf(out, "%s %s : %d\n", which, k, total.RetCodes[k])
	}
	return &total, nil
}

grpcDestination() 方法实现

grpcDestination 方法解析 dest,并根据 dest 是主机名、IP 地址、hostname:port 还是 ip:port,返回 dest:port 形式的地址。如果 dest 是一个无效的主机名或无效的 IP 地址,则返回原始 dest。如果存在 http/https 前缀,将从 destination 中删除;如果 destination 中没有指定 :port,则端口号按前缀设置:http 前缀使用 StandardHTTPPort,https 前缀使用 StandardHTTPSPort,没有前缀时使用 DefaultGRPCPort。

// grpcDestination turns dest into a "host:port" style dial target.
// An http or https scheme prefix (and one trailing "/") is removed, and the
// port used when dest carries none is chosen from that prefix:
// StandardHTTPPort for http, StandardHTTPSPort for https, DefaultGRPCPort
// when there is no prefix. A destination that already splits into host and
// port is returned as is; an IPv6 literal is wrapped in brackets before the
// port is appended.
func grpcDestination(dest string) (parsedDest string) {
	// Start from the "no scheme prefix" case and adjust below.
	parsedDest = dest
	port := fnet.DefaultGRPCPort
	if strings.HasPrefix(dest, fnet.PrefixHTTP) {
		parsedDest = strings.TrimSuffix(strings.TrimPrefix(dest, fnet.PrefixHTTP), "/")
		port = fnet.StandardHTTPPort
		log.Infof("stripping http scheme. grpc destination: %v: grpc port: %s",
			parsedDest, port)
	} else if strings.HasPrefix(dest, fnet.PrefixHTTPS) {
		parsedDest = strings.TrimSuffix(strings.TrimPrefix(dest, fnet.PrefixHTTPS), "/")
		port = fnet.StandardHTTPSPort
		log.Infof("stripping https scheme. grpc destination: %v. grpc port: %s",
			parsedDest, port)
	}

	// Already host:port (or ip:port): nothing to add.
	if _, _, splitErr := net.SplitHostPort(parsedDest); splitErr == nil {
		return parsedDest
	}

	portSuffix := fnet.NormalizePort(port)
	if addr := net.ParseIP(parsedDest); addr != nil {
		if addr.To4() != nil {
			return addr.String() + portSuffix
		}
		if addr.To16() != nil {
			// IPv6 literals need brackets in front of the port.
			return "[" + addr.String() + "]" + portSuffix
		}
	}
	// Anything else is treated as a domain name: append the port.
	return parsedDest + portSuffix
}

三个默认的端口分别是:

	// DefaultGRPCPort is the Fortio gRPC server default port number.
	DefaultGRPCPort = "8079"
	// StandardHTTPPort is the Standard http port number.
	StandardHTTPPort = "80"
	// StandardHTTPSPort is the Standard https port number.
	StandardHTTPSPort = "443"

3.1.2 - pingsrv.go

Fortio的pingsrv.go源码

常量和结构体定义

const (
   // DefaultHealthServiceName is the default health service name used by fortio.
   DefaultHealthServiceName = "ping"
   // Error indicates that something went wrong with healthcheck in grpc.
   // It is the result map key under which failed calls are counted.
   Error = "ERROR"
)

// pingSrv is the stateless receiver implementing the Ping method; PingServer
// registers an instance of it on the grpc server.
type pingSrv struct{}

Ping() 方法实现

Ping() 方法的服务器端实现,没啥特殊。

// Ping is the server side of the Ping RPC. It answers with a copy of the
// request whose Ts is replaced by the server's current time in nanoseconds
// (taken before any delay), sleeping for DelayNanos first when that is
// positive. It never returns an error.
func (s *pingSrv) Ping(c context.Context, in *PingMessage) (*PingMessage, error) {
	log.LogVf("Ping called %+v (ctx %+v)", *in, c)
	// Echo everything (payload, seq, ...) back; only the timestamp changes.
	reply := *in
	reply.Ts = time.Now().UnixNano()
	if delay := time.Duration(in.DelayNanos); delay > 0 {
		log.LogVf("GRPC ping: sleeping for %v", delay)
		time.Sleep(delay)
	}
	return &reply, nil
}

PingServer()方法实现

PingServer 方法启动 grpc ping(和 health)echo 服务器。

返回被绑定的端口(当传递 “0” 作为端口以获得一个动态服务器时非常有用)。

传递 healthServiceName 以用于 grpc 服务名称的健康检查(或传递 DefaultHealthServiceName),以标记为SERVING。

传递 maxConcurrentStreams > 0 来设置该选项。

// PingServer starts a grpc ping (and health) echo server.
// returns the port being bound (useful when passing "0" as the port to
// get a dynamic server). Pass the healthServiceName to use for the
// grpc service name health check (or pass DefaultHealthServiceName)
// to be marked as SERVING. Pass maxConcurrentStreams > 0 to set that option.
// TLS is enabled only when both cert and key are non empty. Returns nil when
// fnet.Listen does not return an address.
func PingServer(port, cert, key, healthServiceName string, maxConcurrentStreams uint32) net.Addr {
	socket, addr := fnet.Listen("grpc '"+healthServiceName+"'", port)
	if addr == nil {
		return nil
	}
	var grpcOptions []grpc.ServerOption
	if maxConcurrentStreams > 0 {
		log.Infof("Setting grpc.MaxConcurrentStreams server to %d", maxConcurrentStreams)
		grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(maxConcurrentStreams))
	}
	if cert != "" && key != "" {
		creds, err := credentials.NewServerTLSFromFile(cert, key)
		if err != nil {
			log.Fatalf("Invalid TLS credentials: %v\n", err)
		}
		log.Infof("Using server certificate %v to construct TLS credentials", cert)
		log.Infof("Using server key %v to construct TLS credentials", key)
		grpcOptions = append(grpcOptions, grpc.Creds(creds))
	}

	// Create the grpc server.
	grpcServer := grpc.NewServer(grpcOptions...)
	reflection.Register(grpcServer)

	// Register the health server on the grpc server, with healthServiceName
	// marked as SERVING.
	healthServer := health.NewServer()
	healthServer.SetServingStatus(healthServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
	grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)

	// Register the ping server on the grpc server.
	RegisterPingServerServer(grpcServer, &pingSrv{})

	// Serve in the background so the bound address can be returned right away;
	// a Serve error is fatal.
	go func() {
		if err := grpcServer.Serve(socket); err != nil {
			log.Fatalf("failed to start grpc server: %v", err)
		}
	}()
	return addr
}

PingServerTCP()方法实现

PingServerTCP 是对 PingServer() 的封装:它假设监听的是 tcp 端口(而不是可能的 unix domain socket),并返回数字形式的端口号。

// PingServerTCP is PingServer() assuming tcp instead of possible unix domain socket port, returns
// the numeric port.
// Returns -1 when the server could not be started, or when the address it
// is bound to is not a TCP address (in which case there is no numeric port);
// the latter case is logged instead of panicking on the type assertion.
func PingServerTCP(port, cert, key, healthServiceName string, maxConcurrentStreams uint32) int {
	addr := PingServer(port, cert, key, healthServiceName, maxConcurrentStreams)
	if addr == nil {
		return -1
	}
	tcpAddr, ok := addr.(*net.TCPAddr)
	if !ok {
		log.Errf("grpc server address %v (%T) is not a tcp address, no numeric port", addr, addr)
		return -1
	}
	return tcpAddr.Port
}

PingClientCall() 方法实现

PingClientCall 调用 ping 服务(假定目的地运行着 PingServer)。返回以秒为单位的平均往返时间(RTT)。

// PingClientCall calls the ping service (presumably running as PingServer on
// the destination). After one warm-up ping it runs n iterations of two pings
// each, records the three round trip measurements of every iteration (two
// client-side, one derived from the server timestamps) and the estimated
// clock skew, prints both histograms on stdout and returns the average round
// trip in seconds. On any error it returns -1 and that error.
func PingClientCall(serverAddr, cacert string, n int, payload string, delay time.Duration, insecure bool) (float64, error) {
	o := GRPCRunnerOptions{Destination: serverAddr, CACert: cacert, Insecure: insecure}
	// Establish the connection.
	conn, err := Dial(&o) // somehow this never seem to error out, error comes later
	if err != nil {
		return -1, err // error already logged
	}
	// Release the connection on every return path. The Close error is
	// deliberately ignored: the measurements are already done (or failed).
	defer func() { _ = conn.Close() }()
	msg := &PingMessage{Payload: payload, DelayNanos: delay.Nanoseconds()}
	// Create the ping client.
	cli := NewPingServerClient(conn)

	// Warm up
	_, err = cli.Ping(context.Background(), msg)
	if err != nil {
		log.Errf("grpc error from Ping0 %v", err)
		return -1, err
	}

	skewHistogram := stats.NewHistogram(-10, 2)
	rttHistogram := stats.NewHistogram(0, 10)
	for i := 1; i <= n; i++ {
		// First ping of the iteration.
		msg.Seq = int64(i)
		t1a := time.Now().UnixNano()
		msg.Ts = t1a
		res1, err := cli.Ping(context.Background(), msg)
		t2a := time.Now().UnixNano()
		if err != nil {
			log.Errf("grpc error from Ping1 iter %d: %v", i, err)
			return -1, err
		}

		// Second ping of the iteration.
		t1b := res1.Ts
		res2, err := cli.Ping(context.Background(), msg)
		t3a := time.Now().UnixNano()
		// Check the error before touching res2: the reply must not be
		// dereferenced when the call failed.
		if err != nil {
			log.Errf("grpc error from Ping2 iter %d: %v", i, err)
			return -1, err
		}
		t2b := res2.Ts

		// Compute the results; values are in ns, histograms are in usec.
		rt1 := t2a - t1a
		rttHistogram.Record(float64(rt1) / 1000.)
		rt2 := t3a - t2a
		rttHistogram.Record(float64(rt2) / 1000.)
		rtR := t2b - t1b
		rttHistogram.Record(float64(rtR) / 1000.)
		midR := t1b + (rtR / 2)
		avgRtt := (rt1 + rt2 + rtR) / 3
		x := (midR - t2a)
		log.Infof("Ping RTT %d (avg of %d, %d, %d ns) clock skew %d",
			avgRtt, rt1, rtR, rt2, x)
		skewHistogram.Record(float64(x) / 1000.)
		msg = res2
	}
	skewHistogram.Print(os.Stdout, "Clock skew histogram usec", []float64{50})
	rttHistogram.Print(os.Stdout, "RTT histogram usec", []float64{50})
	return rttHistogram.Avg() / 1e6, nil
}

GrpcHealthCheck 实现

GrpcHealthCheck对标准的grpc健康检查服务进行grpc客户端调用。

// GrpcHealthCheck makes a grpc client call to the standard grpc health check
// service.
// It calls Check n times sequentially for service svcname on serverAddr,
// prints the round trip histogram (in usec) and the per status counts on
// stdout, and returns the counts keyed by status string. The first failing
// dial or Check aborts the run and returns nil with that error.
func GrpcHealthCheck(serverAddr, cacert string, svcname string, n int, insecure bool) (*HealthResultMap, error) {
	log.Debugf("GrpcHealthCheck for %s svc '%s', %d iterations", serverAddr, svcname, n)
	o := GRPCRunnerOptions{Destination: serverAddr, CACert: cacert, Insecure: insecure}

	// Establish the connection.
	conn, err := Dial(&o)
	if err != nil {
		return nil, err
	}
	// Release the connection on every return path. The Close error is
	// deliberately ignored: the checks are already done (or failed).
	defer func() { _ = conn.Close() }()

	msg := &grpc_health_v1.HealthCheckRequest{Service: svcname}
	cli := grpc_health_v1.NewHealthClient(conn)
	rttHistogram := stats.NewHistogram(0, 10)
	statuses := make(HealthResultMap)

	for i := 1; i <= n; i++ {
		// Run one check and time it.
		start := time.Now()
		res, err := cli.Check(context.Background(), msg)
		dur := time.Since(start)
		log.LogVf("Reply from health check %d: %+v", i, res)
		if err != nil {
			log.Errf("grpc error from Check %v", err)
			return nil, err
		}
		statuses[res.Status.String()]++
		rttHistogram.Record(dur.Seconds() * 1000000.)
	}
	rttHistogram.Print(os.Stdout, "RTT histogram usec", []float64{50})
	for k, v := range statuses {
		fmt.Printf("Health %s : %d\n", k, v)
	}
	return &statuses, nil
}

GrpcHealthCheck()是单线程循环调用 check,因此如果要多线程并发,就需要多线程下调用 GrpcHealthCheck()。

3.1.3 - ping测试

Fortio的gRPC ping测试

整体看一下 gRPC ping 的实现。

proto 定义

fgrpc/ping.proto 文件定义了 PingServer 服务和 Ping 方法:

syntax = "proto3";
package fgrpc;

// PingMessage is both the request and the response type of the Ping rpc.
message PingMessage {
  int64 seq      = 1; // sequence number
  int64 ts       = 2; // timestamp: src send ts / dest receive ts
  string payload = 3; // extra packet data
  int64 delayNanos = 4; // delay the response by x nanoseconds
}

// PingServer exposes a single unary rpc, Ping, which takes a PingMessage and
// returns a PingMessage.
service PingServer {
  rpc Ping (PingMessage) returns (PingMessage) {}
}

生成的对应的 go 代码在 fgrpc/ping.pb.go 中。

这是一个非常简单的方法,参数也足够简单。

客户端发起 ping 测试请求

ts := time.Now().UnixNano()
// 创建 PingServerClient
grpcstate[i].clientP = NewPingServerClient(conn)
if grpcstate[i].clientP == nil {
  return nil, fmt.Errorf("unable to create ping client %d for %s", i, o.Destination)
}
// 组装请求的message
grpcstate[i].reqP = PingMessage{Payload: o.Payload, DelayNanos: o.Delay.Nanoseconds(), Seq: int64(i), Ts: ts}
if o.Exactly <= 0 {
  // 调用 ping 方法
  _, err = grpcstate[i].clientP.Ping(context.Background(), &grpcstate[i].reqP)
}

很正统的 gRPC 调用方式。

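疑虑:为什么要为每个线程都建立一个 gRPC 连接?其实可以多线程共用一个 gRPC 连接的。补充:从 RunGRPCTest 的代码看,只有当 i % o.Streams == 0 时才会调用 Dial 建立新连接,其余线程复用前一个连接;也就是说每 Streams 个线程共用一个 gRPC 连接,只有在 Streams 为 1 时才是每个线程一个连接。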

服务器端接收并响应 ping 测试请求

fgrpc/pingsrv.go 中实现了一个简单的 ping 方法:

// Ping is the server side of the Ping RPC. It answers with a copy of the
// request whose Ts is replaced by the server's current time in nanoseconds
// (taken before any delay), sleeping for DelayNanos first when that is
// positive. It never returns an error.
func (s *pingSrv) Ping(c context.Context, in *PingMessage) (*PingMessage, error) {
	log.LogVf("Ping called %+v (ctx %+v)", *in, c)
	// Echo everything (payload, seq, ...) back; only the timestamp changes.
	reply := *in
	reply.Ts = time.Now().UnixNano()
	if delay := time.Duration(in.DelayNanos); delay > 0 {
		log.LogVf("GRPC ping: sleeping for %v", delay)
		time.Sleep(delay)
	}
	return &reply, nil
}

3.1.4 - health测试

Fortio的gRPC health测试

执行 health 测试

			grpcstate[i].clientH = grpc_health_v1.NewHealthClient(conn)
			if grpcstate[i].clientH == nil {
				return nil, fmt.Errorf("unable to create health client %d for %s", i, o.Destination)
			}
			grpcstate[i].reqH = grpc_health_v1.HealthCheckRequest{Service: o.Service}
			if o.Exactly <= 0 {
				_, err = grpcstate[i].clientH.Check(context.Background(), &grpcstate[i].reqH)
			}