这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Healthz的源码学习

Dapr Healthz的源码学习

1 - health.go的源码学习

health checking的客户端实现

Dapr health package中的 health.go 文件的源码分析,health checking的客户端实现

代码实现

Option 方法定义

// Option is an a function that applies a health check option
type Option func(o *healthCheckOptions)

healthCheckOptions 结构体定义

healthCheckOptions 结构体

type healthCheckOptions struct {
	initialDelay      time.Duration
	requestTimeout    time.Duration
	failureThreshold  int
	interval          time.Duration
	successStatusCode int
}

With系列方法

WithXxx 方法用来设置上述5个健康检查的选项,每个方法都返回一个 Option 函数:

// WithInitialDelay sets the initial delay for the health check
func WithInitialDelay(delay time.Duration) Option {
	return func(o *healthCheckOptions) {
		o.initialDelay = delay
	}
}

// WithFailureThreshold sets the failure threshold for the health check
func WithFailureThreshold(threshold int) Option {
	return func(o *healthCheckOptions) {
		o.failureThreshold = threshold
	}
}

// WithRequestTimeout sets the request timeout for the health check
func WithRequestTimeout(timeout time.Duration) Option {
	return func(o *healthCheckOptions) {
		o.requestTimeout = timeout
	}
}

// WithSuccessStatusCode sets the status code for the health check
func WithSuccessStatusCode(code int) Option {
	return func(o *healthCheckOptions) {
		o.successStatusCode = code
	}
}

// WithInterval sets the interval for the health check
func WithInterval(interval time.Duration) Option {
	return func(o *healthCheckOptions) {
		o.interval = interval
	}
}

StartEndpointHealthCheck 方法

StartEndpointHealthCheck 方法用给定的选项在指定的地址上启动健康检查。它返回一个通道,如果端点是健康的则发出true,如果满足失败条件则发出false。

// StartEndpointHealthCheck starts a health check on the specified address with the given options.
// It returns a channel that will emit true if the endpoint is healthy and false if the failure conditions
// Have been met.
func StartEndpointHealthCheck(endpointAddress string, opts ...Option) chan bool {
	options := &healthCheckOptions{}
	applyDefaults(options)

   // 执行每个 Option 函数来设置健康检查的选项
	for _, o := range opts {
		o(options)
	}
	signalChan := make(chan bool, 1)

	go func(ch chan<- bool, endpointAddress string, options *healthCheckOptions) {
      // 设置健康检查的间隔时间 interval,默认5秒一次
		ticker := time.NewTicker(options.interval)
		failureCount := 0
      // 先 sleep initialDelay 时间再开始健康检查
		time.Sleep(options.initialDelay)

      // 创建 http client,设置请求超时时间为 requestTimeout
		client := &fasthttp.Client{
			MaxConnsPerHost:           5, // Limit Keep-Alive connections
			ReadTimeout:               options.requestTimeout,
			MaxIdemponentCallAttempts: 1,
		}

		req := fasthttp.AcquireRequest()
		req.SetRequestURI(endpointAddress)
		req.Header.SetMethod(fasthttp.MethodGet)
		defer fasthttp.ReleaseRequest(req)

		for range ticker.C {
			resp := fasthttp.AcquireResponse()
			err := client.DoTimeout(req, resp, options.requestTimeout)
         // 通过检查应答的状态码来判断健康检查是否成功: successStatusCode
			if err != nil || resp.StatusCode() != options.successStatusCode {
            // 健康检查失败,错误计数器加一
				failureCount++
            // 如果连续错误次数达到阈值 failureThreshold,则视为健康检查失败,发送false到channel
				if failureCount == options.failureThreshold {
					ch <- false
				}
			} else {
            // 健康检查成功,发送 true 到 channel
				ch <- true
            // 同时重制 failureCount
				failureCount = 0
			}
			fasthttp.ReleaseResponse(resp)
		}
	}(signalChan, endpointAddress, options)
	return signalChan
}

applyDefaults() 方法设置默认属性:

const (
	initialDelay      = time.Second * 1
	failureThreshold  = 2
	requestTimeout    = time.Second * 2
	interval          = time.Second * 5
	successStatusCode = 200
)

func applyDefaults(o *healthCheckOptions) {
   o.failureThreshold = failureThreshold
   o.initialDelay = initialDelay
   o.requestTimeout = requestTimeout
   o.successStatusCode = successStatusCode
   o.interval = interval
}

健康检查方式总结

对某一个给定地址 endpointAddress 进行健康检查的步骤和方式为:

  1. 先 sleep initialDelay 时间再开始健康检查:可能对方还在初始化过程中
  2. 每隔间隔时间 interval 时间发起一次健康检查
  3. 每次健康检查是向目标地址 endpointAddress 发起一个 HTTP GET 请求,超时时间为 requestTimeout
  4. 检查应答判断是否健康
    • 返回应答并且应答的状态码是 successStatusCode 则视为本地健康检查成功
    • 超时或者应答的状态码不是 successStatusCode 则视为本地健康检查失败
  5. 如果失败则开始累加计数器,然后间隔 interval 时间之后再次进行健康检查
  6. 如果多次失败,累计达到阈值 failureThreshold,报告为健康检查失败
  7. 只要单次成功,则清理之前的错误累计次数,报告为健康检查成功。

2 - server.go的源码学习

healthz server的实现

Dapr health package中的 server.go 文件的源码分析,healthz server的实现

代码实现

Health server

healthz server 的接口定义:

// Server is the interface for the healthz server
type Server interface {
	Run(context.Context, int) error
	Ready()
	NotReady()
}

server 结构体,ready 字段保存状态:

type server struct {
	ready bool
	log   logger.Logger
}

创建 healthz server的方法:

// NewServer returns a new healthz server
func NewServer(log logger.Logger) Server {
   return &server{
      log: log,
   }
}

设置 ready 状态的两个方法:

// Ready sets a ready state for the endpoint handlers
func (s *server) Ready() {
   s.ready = true
}

// NotReady sets a not ready state for the endpoint handlers
func (s *server) NotReady() {
   s.ready = false
}

运行healthz server

Run 方法启动一个带有 healthz 端点的 http 服务器,端口通过参数 port 指定:

// Run starts a net/http server with a healthz endpoint
func (s *server) Run(ctx context.Context, port int) error {
   router := http.NewServeMux()
   router.Handle("/healthz", s.healthz())

   srv := &http.Server{
      Addr:    fmt.Sprintf(":%d", port),
      Handler: router,
   }
   ...
}

启动之后:

   doneCh := make(chan struct{})

   go func() {
      select {
      case <-ctx.Done():
         s.log.Info("Healthz server is shutting down")
         shutdownCtx, cancel := context.WithTimeout(
            context.Background(),
            time.Second*5,
         )
         defer cancel()
         srv.Shutdown(shutdownCtx) // nolint: errcheck
      case <-doneCh:
      }
   }()

   s.log.Infof("Healthz server is listening on %s", srv.Addr)
   err := srv.ListenAndServe()
   if err != http.ErrServerClosed {
      s.log.Errorf("Healthz server error: %s", err)
   }
   close(doneCh)
   return err
}

healthz server 处理请求

healthz() 方法是 health endpoint 的 handler,根据当前 healthz server 的 ready 字段的状态值返回 HTTP 状态码:

// healthz is a health endpoint handler
func (s *server) healthz() http.Handler {
   return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
      var status int
      if s.ready {
      	// ready 返回 200
         status = http.StatusOK
      } else {
         // 不 ready 则返回 503
         status = http.StatusServiceUnavailable
      }
      w.WriteHeader(status)
   })
}

使用场景

healthz server 在 injector / placement / sentry / operator 中都有使用,这些进程都是在 main 方法中启动 healthz server。

injector

injector 启动在 8080 端口:

const (
	healthzPort = 8080
)

func main() {
   ......
	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		healthzErr := healthzServer.Run(ctx, healthzPort)
		if healthzErr != nil {
			log.Fatalf("failed to start healthz server: %s", healthzErr)
		}
	}()
	......
}

placement

placement 默认启动在 8080 端口(也可以通过命令行参数修改端口):

const (
	defaultHealthzPort       = 8080
)

func main() {
	flag.IntVar(&cfg.healthzPort, "healthz-port", cfg.healthzPort, "sets the HTTP port for the healthz server")
   ......
	go startHealthzServer(cfg.healthzPort)
	......
}

func startHealthzServer(healthzPort int) {
	healthzServer := health.NewServer(log)
	healthzServer.Ready()

	if err := healthzServer.Run(context.Background(), healthzPort); err != nil {
		log.Fatalf("failed to start healthz server: %s", err)
	}
}

sentry

sentry 启动在 8080 端口:

const (
	healthzPort = 8080
)

func main() {
   ......
	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		err := healthzServer.Run(ctx, healthzPort)
		if err != nil {
			log.Fatalf("failed to start healthz server: %s", err)
		}
	}()
	......
}

operator

operator 启动在 8080 端口:

const (
	healthzPort = 8080
)

func main() {
   ......
	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		err := healthzServer.Run(ctx, healthzPort)
		if err != nil {
			log.Fatalf("failed to start healthz server: %s", err)
		}
	}()
	......
}

darpd

特别指出:daprd 没有使用 healthz server,daprd 是直接在 dapr HTTP api 的基础上增加了 healthz 的功能。

具体代码在 http/api.go 中:

func NewAPI(......
   api.endpoints = append(api.endpoints, api.constructHealthzEndpoints()...)
	return api
}

func (a *api) constructHealthzEndpoints() []Endpoint {
   return []Endpoint{
      {
         Methods: []string{fasthttp.MethodGet},
         Route:   "healthz",
         Version: apiVersionV1,
         Handler: a.onGetHealthz,
      },
   }
}

onGetHealthz() 方法处理请求:

func (a *api) onGetHealthz(reqCtx *fasthttp.RequestCtx) {
   if !a.readyStatus {
      msg := NewErrorResponse("ERR_HEALTH_NOT_READY", messages.ErrHealthNotReady)
      respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
      log.Debug(msg)
   } else {
      respondEmpty(reqCtx)
   }
}

func respondEmpty(ctx *fasthttp.RequestCtx) {
	ctx.Response.SetBody(nil)
	ctx.Response.SetStatusCode(fasthttp.StatusNoContent)
}

注意:这里成功时返回的状态码是 204 StatusNoContent,而不是通常的 200 OK。