1 - grpcrunner.go
GRPCRunnerResults 结构体定义
GRPCRunnerResults是 GRPCRunner 的结果聚合。也是每个线程/goroutine使用的内部类型。
// GRPCRunnerResults is the aggregated result of a GRPCRunner.
// It is also the internal type used per thread/goroutine: RunGRPCTest creates
// one instance per runner and Run updates that instance on every call.
type GRPCRunnerResults struct {
periodic.RunnerResults
clientH grpc_health_v1.HealthClient // used for health
reqH grpc_health_v1.HealthCheckRequest // used for health
clientP PingServerClient // used for ping
reqP PingMessage // used for ping
// RetCodes counts call outcomes keyed by status string (or Error on failure).
// Run increments it for health calls and for ping calls alike.
RetCodes HealthResultMap
Destination string
Streams int
// Ping selects the ping service in Run; when false the health check is used.
Ping bool
}
GRPCRunnerOptions 结构体定义
GRPCRunnerOptions 包括基本的 RunnerOptions 和 Grpc 特定的选项。
// GRPCRunnerOptions includes the base RunnerOptions plus grpc specific
// options.
type GRPCRunnerOptions struct {
periodic.RunnerOptions
// Destination is the target address; Dial parses it and an https prefix selects TLS.
Destination string
Service string // Service to be checked when using grpc health check
Profiler string // file to save profiles to. defaults to no profiling
Payload string // Payload to be sent for grpc ping service
Streams int // number of streams. total go routines and data streams will be streams*numthreads.
Delay time.Duration // Delay to be sent when using grpc ping service
CACert string // Path to CA certificate for grpc TLS
CertOverride string // Override the cert virtual host of authority for testing
Insecure bool // Allow unknown CA / self signed
AllowInitialErrors bool // whether initial errors don't cause an abort
UsePing bool // use our own Ping proto for grpc load instead of standard health check one.
UnixDomainSocket string // unix domain socket path to use for physical connection instead of Destination
}
Dial()方法实现
Dial 用于建立 grpc 连接:当 serverAddr 带有 https 前缀或提供了 CA 证书(cert)时,使用 TLS 传输安全;否则使用不安全(无传输加密)的连接。如果 override 被设置为一个非空字符串,它将覆盖请求中 authority 的虚拟主机名。
// Dial opens a grpc client connection to o.Destination.
//
// Transport security is selected as follows:
//   - o.CACert is set: TLS using that CA file, with o.CertOverride as the
//     server name override (a non empty override replaces the virtual host
//     name of authority in requests);
//   - o.Destination has the https prefix: TLS, with certificate verification
//     skipped when o.Insecure is true;
//   - otherwise: no transport security.
//
// When o.UnixDomainSocket is set, the physical connection is made to that
// socket path instead of the address derived from o.Destination.
//
// It returns the connection and any error from loading the credentials or
// from grpc.Dial; errors are logged before being returned.
func Dial(o *GRPCRunnerOptions) (conn *grpc.ClientConn, err error) {
	var opts []grpc.DialOption
	switch {
	case o.CACert != "":
		var creds credentials.TransportCredentials
		creds, err = credentials.NewClientTLSFromFile(o.CACert, o.CertOverride)
		if err != nil {
			log.Errf("Invalid TLS credentials: %v\n", err)
			return nil, err
		}
		log.Infof("Using CA certificate %v to construct TLS credentials", o.CACert)
		opts = append(opts, grpc.WithTransportCredentials(creds))
	case strings.HasPrefix(o.Destination, fnet.PrefixHTTPS):
		creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: o.Insecure}) // nolint: gosec // explicit flag
		opts = append(opts, grpc.WithTransportCredentials(creds))
	default:
		opts = append(opts, grpc.WithInsecure())
	}
	serverAddr := grpcDestination(o.Destination)
	// Optional unix domain socket support: the logical address stays
	// serverAddr, only the physical connection goes through the socket.
	if o.UnixDomainSocket != "" {
		log.Warnf("Using domain socket %v instead of %v for grpc connection", o.UnixDomainSocket, serverAddr)
		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
			// Dial through ctx so the deadline/cancellation supplied by grpc
			// is honored instead of being silently ignored.
			var d net.Dialer
			return d.DialContext(ctx, fnet.UnixDomainSocket, o.UnixDomainSocket)
		}))
	}
	// Establish the grpc connection.
	conn, err = grpc.Dial(serverAddr, opts...)
	if err != nil {
		log.Errf("failed to connect to %s with certificate %s and override %s: %v", serverAddr, o.CACert, o.CertOverride, err)
	}
	return conn, err
}
Run()方法实现
Run 方法以目标 QPS 进行 GRPC 健康检查或 Ping 调用。它需要被设置为 RunnerOptions 中的 Function(即每次迭代要执行的函数)。
// Run performs a single GRPC call (ping when grpcstate.Ping is set, health
// check otherwise) and records its outcome in grpcstate.RetCodes.
// To be set as the Function in RunnerOptions.
func (grpcstate *GRPCRunnerResults) Run(t int) {
	log.Debugf("Calling in %d", t)
	var (
		callErr error
		reply   interface{}
	)
	// Ping calls have no status of their own: a successful one is counted
	// under SERVING, which is therefore the starting value.
	code := grpc_health_v1.HealthCheckResponse_SERVING
	if !grpcstate.Ping {
		// Default: standard health check.
		hr, e := grpcstate.clientH.Check(context.Background(), &grpcstate.reqH)
		callErr = e
		if hr != nil {
			code = hr.Status
			reply = hr
		}
	} else {
		// Ping requested: use our own ping service instead of the health check.
		reply, callErr = grpcstate.clientP.Ping(context.Background(), &grpcstate.reqP)
	}
	log.Debugf("For %d (ping=%v) got %v %v", t, grpcstate.Ping, callErr, reply)
	// Record the outcome of this call.
	if callErr == nil {
		grpcstate.RetCodes[code.String()]++
		return
	}
	log.Warnf("Error making grpc call: %v", callErr)
	grpcstate.RetCodes[Error]++
}
RunGRPCTest() 方法实现
处理参数
// RunGRPCTest runs a grpc test (ping or health check, per o.UsePing) and
// returns the aggregated stats.
//
// It normalizes o.Streams and o.NumThreads, sets o.RunType, and multiplies
// o.NumThreads by o.Streams (o is modified in place). One connection is
// dialed for every group of o.Streams runners. Unless o.Exactly > 0, each
// runner makes one initial call; an error from that call aborts the test
// unless o.AllowInitialErrors is set. When o.Profiler is non empty, cpu and
// heap profiles are written to o.Profiler + ".cpu" / ".mem". The per runner
// RetCodes are summed into the returned result and printed to the runner's
// output.
// nolint: funlen, gocognit
func RunGRPCTest(o *GRPCRunnerOptions) (*GRPCRunnerResults, error) {
if o.Streams < 1 {
o.Streams = 1
}
if o.NumThreads < 1 {
// sort of todo, this redoing some of periodic normalize (but we can't use normalize which does too much)
o.NumThreads = periodic.DefaultRunnerOptions.NumThreads
}
// Only ping and health are supported for now, unfortunately.
if o.UsePing {
o.RunType = "GRPC Ping"
if o.Delay > 0 {
o.RunType += fmt.Sprintf(" Delay=%v", o.Delay)
}
} else {
o.RunType = "GRPC Health"
}
pll := len(o.Payload)
if pll > 0 {
o.RunType += fmt.Sprintf(" PayloadLength=%d", pll)
}
开始执行测试
log.Infof("Starting %s test for %s with %d*%d threads at %.1f qps", o.RunType, o.Destination, o.Streams, o.NumThreads, o.QPS)
// From here on NumThreads is the total number of runners (threads * streams).
o.NumThreads *= o.Streams
r := periodic.NewPeriodicRunner(&o.RunnerOptions)
defer r.Options().Abort()
numThreads := r.Options().NumThreads // may change
total := GRPCRunnerResults{
RetCodes: make(HealthResultMap),
Destination: o.Destination,
Streams: o.Streams,
Ping: o.UsePing,
}
grpcstate := make([]GRPCRunnerResults, numThreads)
out := r.Options().Out // Important as the default value is set from nil to stdout inside NewPeriodicRunner
var conn *grpc.ClientConn
var err error
// Same timestamp is put in the initial ping message of every runner.
ts := time.Now().UnixNano()
for i := 0; i < numThreads; i++ {
r.Options().Runners[i] = &grpcstate[i]
// A new connection is dialed for the first runner of each group of
// o.Streams runners; the rest of the group reuses it.
if (i % o.Streams) == 0 {
conn, err = Dial(o)
if err != nil {
log.Errf("Error in grpc dial for %s %v", o.Destination, err)
return nil, err
}
} else {
log.Debugf("Reusing previous client connection for %d", i)
}
grpcstate[i].Ping = o.UsePing
// NOTE(review): this err shadows the outer err used for Dial above; it
// only carries the result of the initial call below.
var err error
if o.UsePing { // nolint: nestif
grpcstate[i].clientP = NewPingServerClient(conn)
if grpcstate[i].clientP == nil {
return nil, fmt.Errorf("unable to create ping client %d for %s", i, o.Destination)
}
grpcstate[i].reqP = PingMessage{Payload: o.Payload, DelayNanos: o.Delay.Nanoseconds(), Seq: int64(i), Ts: ts}
// The initial call is skipped when an exact number of calls is requested.
if o.Exactly <= 0 {
_, err = grpcstate[i].clientP.Ping(context.Background(), &grpcstate[i].reqP)
}
} else {
grpcstate[i].clientH = grpc_health_v1.NewHealthClient(conn)
if grpcstate[i].clientH == nil {
return nil, fmt.Errorf("unable to create health client %d for %s", i, o.Destination)
}
grpcstate[i].reqH = grpc_health_v1.HealthCheckRequest{Service: o.Service}
// The initial call is skipped when an exact number of calls is requested.
if o.Exactly <= 0 {
_, err = grpcstate[i].clientH.Check(context.Background(), &grpcstate[i].reqH)
}
}
if !o.AllowInitialErrors && err != nil {
log.Errf("Error in first grpc call (ping = %v) for %s: %v", o.UsePing, o.Destination, err)
return nil, err
}
// Setup the stats for each 'thread'
grpcstate[i].RetCodes = make(HealthResultMap)
}
profile和测试结果
if o.Profiler != "" {
// NOTE(review): fc is not closed anywhere in this function.
fc, err := os.Create(o.Profiler + ".cpu")
if err != nil {
log.Critf("Unable to create .cpu profile: %v", err)
return nil, err
}
// A failure to start profiling is logged but does not stop the run.
if err = pprof.StartCPUProfile(fc); err != nil {
log.Critf("Unable to start cpu profile: %v", err)
}
}
total.RunnerResults = r.Run()
if o.Profiler != "" {
pprof.StopCPUProfile()
fm, err := os.Create(o.Profiler + ".mem")
if err != nil {
log.Critf("Unable to create .mem profile: %v", err)
return nil, err
}
runtime.GC() // get up-to-date statistics
if err = pprof.WriteHeapProfile(fm); err != nil {
log.Critf("Unable to write heap profile: %v", err)
}
fm.Close()
fmt.Printf("Wrote profile data to %s.{cpu|mem}\n", o.Profiler)
}
// Numthreads may have reduced
numThreads = r.Options().NumThreads
// keys remembers each status key the first time it is seen, so the
// summary below is printed in that order.
keys := []string{}
for i := 0; i < numThreads; i++ {
// Q: is there some copying each time stats[i] is used?
for k := range grpcstate[i].RetCodes {
if _, exists := total.RetCodes[k]; !exists {
keys = append(keys, k)
}
total.RetCodes[k] += grpcstate[i].RetCodes[k]
}
// TODO: if grpc client needs 'cleanup'/Close like http one, do it on original NumThreads
}
// Cleanup state:
r.Options().ReleaseRunners()
which := "Health"
if o.UsePing {
which = "Ping"
}
// Write errors on out are deliberately ignored.
_, _ = fmt.Fprintf(out, "Jitter: %t\n", total.Jitter)
for _, k := range keys {
_, _ = fmt.Fprintf(out, "%s %s : %d\n", which, k, total.RetCodes[k])
}
return &total, nil
}
grpcDestination() 方法实现
grpcDestination 方法解析 dest:根据 dest 是主机名、IP 地址、`hostname:port` 还是 `ip:port`,返回 `dest:port` 形式的地址。如果 dest 是一个无效的主机名或无效的 IP 地址,则返回原始 dest。如果存在 http/https 前缀,将从 destination 中删除;在 destination 中没有指定 `:port` 时,端口号对 http 设置为 StandardHTTPPort,对 https 设置为 StandardHTTPSPort,既没有 http/https 前缀也没有 `:port` 时设置为 DefaultGRPCPort。
// grpcDestination turns dest into the address handed to grpc.Dial.
//
// A leading http or https scheme prefix is removed (together with one
// trailing "/") and selects the standard http or https port; without a
// scheme the default fortio grpc port is used. If the remaining value already
// splits as host:port it is returned unchanged. Otherwise the selected port is
// appended via fnet.NormalizePort, with IPv6 literals wrapped in brackets.
func grpcDestination(dest string) (parsedDest string) {
	// No scheme: keep dest as is and fall back to the fortio grpc port.
	parsedDest = dest
	port := fnet.DefaultGRPCPort
	// strip any unintentional http/https scheme prefixes from dest
	// and set the port number.
	if strings.HasPrefix(dest, fnet.PrefixHTTP) {
		parsedDest = strings.TrimSuffix(strings.TrimPrefix(dest, fnet.PrefixHTTP), "/")
		port = fnet.StandardHTTPPort
		log.Infof("stripping http scheme. grpc destination: %v: grpc port: %s",
			parsedDest, port)
	} else if strings.HasPrefix(dest, fnet.PrefixHTTPS) {
		parsedDest = strings.TrimSuffix(strings.TrimPrefix(dest, fnet.PrefixHTTPS), "/")
		port = fnet.StandardHTTPSPort
		log.Infof("stripping https scheme. grpc destination: %v. grpc port: %s",
			parsedDest, port)
	}
	// Already host:port (or ip:port): nothing to add.
	if _, _, err := net.SplitHostPort(parsedDest); err == nil {
		return parsedDest
	}
	// Every remaining path appends the normalized port exactly once.
	suffix := fnet.NormalizePort(port)
	if ip := net.ParseIP(parsedDest); ip != nil {
		if ip.To4() != nil {
			return ip.String() + suffix
		}
		if ip.To16() != nil {
			// IPv6 literals need brackets before the port.
			return "[" + ip.String() + "]" + suffix
		}
	}
	// parsedDest is in the form of a domain name,
	// append ":port" and return.
	return parsedDest + suffix
}
三个默认的端口分别是:
// DefaultGRPCPort is the Fortio gRPC server default port number.
DefaultGRPCPort = "8079"
// StandardHTTPPort is the Standard http port number.
StandardHTTPPort = "80"
// StandardHTTPSPort is the Standard https port number.
StandardHTTPSPort = "443"
2 - pingsrv.go
常量和结构体定义
// DefaultHealthServiceName is the default health service name used by fortio.
const DefaultHealthServiceName = "ping"

// Error indicates that something went wrong with healthcheck in grpc.
const Error = "ERROR"

// pingSrv is the receiver type for the server side Ping method; it has no fields.
type pingSrv struct{}
Ping() 方法实现
Ping() 方法的服务器端实现,没啥特殊。
// Ping is the server side of the ping service: it echoes the request back
// with Ts replaced by the server's current time in nanoseconds, after
// sleeping for in.DelayNanos when that value is positive.
func (s *pingSrv) Ping(c context.Context, in *PingMessage) (*PingMessage, error) {
	log.LogVf("Ping called %+v (ctx %+v)", *in, c)
	reply := *in // copy the input including the payload etc
	// The timestamp is taken before the optional delay.
	reply.Ts = time.Now().UnixNano()
	if delay := time.Duration(in.DelayNanos); delay > 0 {
		log.LogVf("GRPC ping: sleeping for %v", delay)
		time.Sleep(delay)
	}
	return &reply, nil
}
PingServer()方法实现
PingServer 方法启动 grpc ping(和 health)echo 服务器。
返回被绑定的端口(当传递 “0” 作为端口以获得一个动态服务器时非常有用)。
传递 healthServiceName 以用于 grpc 服务名称的健康检查(或传递 DefaultHealthServiceName),以标记为SERVING。
传递 maxConcurrentStreams > 0 来设置该选项。
// PingServer starts a grpc server exposing the ping (echo) service, the
// standard health service and reflection, and serves it in a background
// goroutine.
// It returns the address being bound (useful when passing "0" as the port to
// get a dynamic server), or nil when listening failed.
// healthServiceName (or DefaultHealthServiceName) is the service name marked
// as SERVING for the grpc health check. TLS is enabled only when both cert
// and key are non empty. Pass maxConcurrentStreams > 0 to set that option.
func PingServer(port, cert, key, healthServiceName string, maxConcurrentStreams uint32) net.Addr {
	listener, boundAddr := fnet.Listen("grpc '"+healthServiceName+"'", port)
	if boundAddr == nil {
		return nil
	}
	var srvOpts []grpc.ServerOption
	if maxConcurrentStreams > 0 {
		log.Infof("Setting grpc.MaxConcurrentStreams server to %d", maxConcurrentStreams)
		srvOpts = append(srvOpts, grpc.MaxConcurrentStreams(maxConcurrentStreams))
	}
	if cert != "" && key != "" {
		tlsCreds, err := credentials.NewServerTLSFromFile(cert, key)
		if err != nil {
			log.Fatalf("Invalid TLS credentials: %v\n", err)
		}
		log.Infof("Using server certificate %v to construct TLS credentials", cert)
		log.Infof("Using server key %v to construct TLS credentials", key)
		srvOpts = append(srvOpts, grpc.Creds(tlsCreds))
	}
	// Create the grpc server.
	srv := grpc.NewServer(srvOpts...)
	reflection.Register(srv)
	// Register the health service, marking healthServiceName as SERVING.
	hs := health.NewServer()
	hs.SetServingStatus(healthServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
	grpc_health_v1.RegisterHealthServer(srv, hs)
	// Register the ping service.
	RegisterPingServerServer(srv, &pingSrv{})
	// Serve in the background; the bound address is returned right away.
	go func() {
		err := srv.Serve(listener)
		if err == nil {
			return
		}
		log.Fatalf("failed to start grpc server: %v", err)
	}()
	return boundAddr
}
PingServerTCP()方法实现
PingServerTCP 是假设 tcp 而不是可能的 unix domain socket 端口的 PingServer() ,返回数字端口。
// PingServerTCP is PingServer() assuming tcp instead of possible unix domain
// socket port, and returns the numeric port.
// It returns -1 when the server could not listen, or when the bound address
// is not a TCP address (in which case no numeric port exists).
func PingServerTCP(port, cert, key, healthServiceName string, maxConcurrentStreams uint32) int {
	addr := PingServer(port, cert, key, healthServiceName, maxConcurrentStreams)
	if addr == nil {
		return -1
	}
	// Checked assertion: a non TCP listener address must not panic here.
	tcpAddr, ok := addr.(*net.TCPAddr)
	if !ok {
		log.Errf("grpc ping server address %v is not a tcp address (%T)", addr, addr)
		return -1
	}
	return tcpAddr.Port
}
PingClientCall() 方法实现
PingClientCall 调用 ping 服务(假定目的地运行着 PingServer)。返回以秒为单位的平均往返时间(RTT)。
// PingClientCall calls the ping service at serverAddr (presumably running
// PingServer on the destination): one warm up call, then n iterations of two
// pings each.
// For every iteration it records three round trip samples (the two client
// side ones and the difference between the two server timestamps) plus a
// clock skew estimate, and prints both histograms to stdout.
// It returns the average of the recorded round trip samples in seconds, or
// -1 and the error as soon as dialing or any ping fails.
func PingClientCall(serverAddr, cacert string, n int, payload string, delay time.Duration, insecure bool) (float64, error) {
	o := GRPCRunnerOptions{Destination: serverAddr, CACert: cacert, Insecure: insecure}
	// Establish the connection.
	conn, err := Dial(&o) // somehow this never seem to error out, error comes later
	if err != nil {
		return -1, err // error already logged
	}
	// Release the connection on every return path; a close error is not
	// actionable here and does not affect the measurements.
	defer func() { _ = conn.Close() }()
	msg := &PingMessage{Payload: payload, DelayNanos: delay.Nanoseconds()}
	// Create the ping client.
	cli := NewPingServerClient(conn)
	// Warm up:
	_, err = cli.Ping(context.Background(), msg)
	if err != nil {
		log.Errf("grpc error from Ping0 %v", err)
		return -1, err
	}
	skewHistogram := stats.NewHistogram(-10, 2)
	rttHistogram := stats.NewHistogram(0, 10)
	for i := 1; i <= n; i++ {
		// First ping.
		msg.Seq = int64(i)
		t1a := time.Now().UnixNano()
		msg.Ts = t1a
		res1, err := cli.Ping(context.Background(), msg)
		t2a := time.Now().UnixNano()
		if err != nil {
			log.Errf("grpc error from Ping1 iter %d: %v", i, err)
			return -1, err
		}
		// Second ping.
		t1b := res1.Ts
		res2, err := cli.Ping(context.Background(), msg)
		t3a := time.Now().UnixNano()
		// The error must be checked before res2 is used: res2 is not valid
		// when the call failed.
		if err != nil {
			log.Errf("grpc error from Ping2 iter %d: %v", i, err)
			return -1, err
		}
		t2b := res2.Ts
		// Compute the results; histograms are in usec, timestamps in ns.
		rt1 := t2a - t1a
		rttHistogram.Record(float64(rt1) / 1000.)
		rt2 := t3a - t2a
		rttHistogram.Record(float64(rt2) / 1000.)
		rtR := t2b - t1b
		rttHistogram.Record(float64(rtR) / 1000.)
		midR := t1b + (rtR / 2)
		avgRtt := (rt1 + rt2 + rtR) / 3
		x := (midR - t2a)
		log.Infof("Ping RTT %d (avg of %d, %d, %d ns) clock skew %d",
			avgRtt, rt1, rtR, rt2, x)
		skewHistogram.Record(float64(x) / 1000.)
		// The last reply becomes the next request.
		msg = res2
	}
	skewHistogram.Print(os.Stdout, "Clock skew histogram usec", []float64{50})
	rttHistogram.Print(os.Stdout, "RTT histogram usec", []float64{50})
	// usec -> seconds
	return rttHistogram.Avg() / 1e6, nil
}
GrpcHealthCheck 实现
GrpcHealthCheck对标准的grpc健康检查服务进行grpc客户端调用。
// GrpcHealthCheck makes n sequential grpc client calls to the standard grpc
// health check service at serverAddr for service svcname.
// It prints the round trip histogram (usec) and the count per status to
// stdout, and returns the map of status string to count.
// It returns nil and the error as soon as dialing or any check fails.
func GrpcHealthCheck(serverAddr, cacert string, svcname string, n int, insecure bool) (*HealthResultMap, error) {
	log.Debugf("GrpcHealthCheck for %s svc '%s', %d iterations", serverAddr, svcname, n)
	o := GRPCRunnerOptions{Destination: serverAddr, CACert: cacert, Insecure: insecure}
	// Establish the connection.
	conn, err := Dial(&o)
	if err != nil {
		return nil, err
	}
	// Release the connection on every return path; a close error is not
	// actionable here and does not affect the results.
	defer func() { _ = conn.Close() }()
	msg := &grpc_health_v1.HealthCheckRequest{Service: svcname}
	cli := grpc_health_v1.NewHealthClient(conn)
	rttHistogram := stats.NewHistogram(0, 10)
	statuses := make(HealthResultMap)
	for i := 1; i <= n; i++ {
		// Run one timed check.
		start := time.Now()
		res, err := cli.Check(context.Background(), msg)
		dur := time.Since(start)
		log.LogVf("Reply from health check %d: %+v", i, res)
		if err != nil {
			log.Errf("grpc error from Check %v", err)
			return nil, err
		}
		statuses[res.Status.String()]++
		// seconds -> usec
		rttHistogram.Record(dur.Seconds() * 1000000.)
	}
	rttHistogram.Print(os.Stdout, "RTT histogram usec", []float64{50})
	// Map iteration: the order of the printed statuses is not fixed.
	for k, v := range statuses {
		fmt.Printf("Health %s : %d\n", k, v)
	}
	return &statuses, nil
}
GrpcHealthCheck()是单线程循环调用 check,因此如果要多线程并发,就需要多线程下调用 GrpcHealthCheck()。
3 - ping测试
整体看一下 gRPC ping 的实现。
proto 定义
`fgrpc/ping.proto` 文件定义了 PingServer 服务和 Ping 方法:
syntax = "proto3";
package fgrpc;
// PingMessage is both the request and the reply of the Ping rpc.
message PingMessage {
int64 seq = 1; // sequence number
int64 ts = 2; // src send ts / dest receive ts (this is a timestamp)
string payload = 3; // extra packet data
int64 delayNanos = 4; // delay the response by x nanoseconds
}
// PingServer exposes a single unary Ping rpc taking and returning a PingMessage.
service PingServer {
rpc Ping (PingMessage) returns (PingMessage) {}
}
生成的对应的 go 代码在 `fgrpc/ping.pb.go` 中。
这是一个非常简单的方法,参数也足够简单。
客户端发起 ping 测试请求
// Timestamp placed in the Ts field of the request below.
ts := time.Now().UnixNano()
// Create the PingServerClient
grpcstate[i].clientP = NewPingServerClient(conn)
if grpcstate[i].clientP == nil {
return nil, fmt.Errorf("unable to create ping client %d for %s", i, o.Destination)
}
// Assemble the request message
grpcstate[i].reqP = PingMessage{Payload: o.Payload, DelayNanos: o.Delay.Nanoseconds(), Seq: int64(i), Ts: ts}
if o.Exactly <= 0 {
// Call the ping method
_, err = grpcstate[i].clientP.Ping(context.Background(), &grpcstate[i].reqP)
}
很正统的 gRPC 调用方式。
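疑虑:为什么要为每个线程都建立一个 gRPC 连接?其实可以多线程共用一个 gRPC 连接的。(注:从 RunGRPCTest 的代码看,每 o.Streams 个线程共用一个连接,只有 `i % o.Streams == 0` 时才会重新 Dial;Streams 小于 1 时被设置为 1,此时才是每个线程一个连接。)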
服务器端接收并响应 ping 测试请求
`fgrpc/pingsrv.go` 中实现了一个简单的 ping 方法:
// Ping is the server side of the ping service: it echoes the request back
// with Ts replaced by the server's current time in nanoseconds, after
// sleeping for in.DelayNanos when that value is positive.
func (s *pingSrv) Ping(c context.Context, in *PingMessage) (*PingMessage, error) {
	log.LogVf("Ping called %+v (ctx %+v)", *in, c)
	reply := *in // copy the input including the payload etc
	// The timestamp is taken before the optional delay.
	reply.Ts = time.Now().UnixNano()
	if delay := time.Duration(in.DelayNanos); delay > 0 {
		log.LogVf("GRPC ping: sleeping for %v", delay)
		time.Sleep(delay)
	}
	return &reply, nil
}
4 - health测试
执行 health 测试
// Create the standard grpc health client on the connection.
grpcstate[i].clientH = grpc_health_v1.NewHealthClient(conn)
if grpcstate[i].clientH == nil {
return nil, fmt.Errorf("unable to create health client %d for %s", i, o.Destination)
}
// Assemble the request: the service whose health is checked.
grpcstate[i].reqH = grpc_health_v1.HealthCheckRequest{Service: o.Service}
if o.Exactly <= 0 {
// Call the health Check method
_, err = grpcstate[i].clientH.Check(context.Background(), &grpcstate[i].reqH)
}