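1 - Source code study of health.go
Source code analysis of the health.go file in the Dapr health package: the client-side implementation of health checking.
Code implementation
Option function type definition
// Option is a function that applies a health check option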
type Option func(o *healthCheckOptions)
healthCheckOptions struct definition
The healthCheckOptions struct holds the health check options:
type healthCheckOptions struct {
initialDelay time.Duration
requestTimeout time.Duration
failureThreshold int
interval time.Duration
successStatusCode int
}
The With family of functions
The WithXxx functions set the five health check options above; each one returns an Option function:
// WithInitialDelay sets the initial delay for the health check
func WithInitialDelay(delay time.Duration) Option {
return func(o *healthCheckOptions) {
o.initialDelay = delay
}
}
// WithFailureThreshold sets the failure threshold for the health check
func WithFailureThreshold(threshold int) Option {
return func(o *healthCheckOptions) {
o.failureThreshold = threshold
}
}
// WithRequestTimeout sets the request timeout for the health check
func WithRequestTimeout(timeout time.Duration) Option {
return func(o *healthCheckOptions) {
o.requestTimeout = timeout
}
}
// WithSuccessStatusCode sets the status code for the health check
func WithSuccessStatusCode(code int) Option {
return func(o *healthCheckOptions) {
o.successStatusCode = code
}
}
// WithInterval sets the interval for the health check
func WithInterval(interval time.Duration) Option {
return func(o *healthCheckOptions) {
o.interval = interval
}
}
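The following is a minimal, self-contained sketch of how these options compose. It copies the struct and two of the With functions from above; `buildOptions` is a hypothetical helper (not part of the Dapr package) that applies the default values first and then each caller-supplied Option in order, which is what StartEndpointHealthCheck does internally. The default values are taken from the constants listed with applyDefaults() in this section.

```go
package main

import (
	"fmt"
	"time"
)

// healthCheckOptions mirrors the struct shown above.
type healthCheckOptions struct {
	initialDelay      time.Duration
	requestTimeout    time.Duration
	failureThreshold  int
	interval          time.Duration
	successStatusCode int
}

// Option is a function that applies a health check option.
type Option func(o *healthCheckOptions)

// WithInterval sets the interval for the health check.
func WithInterval(interval time.Duration) Option {
	return func(o *healthCheckOptions) { o.interval = interval }
}

// WithFailureThreshold sets the failure threshold for the health check.
func WithFailureThreshold(threshold int) Option {
	return func(o *healthCheckOptions) { o.failureThreshold = threshold }
}

// buildOptions is a hypothetical helper: it starts from the defaults and then
// lets every Option overwrite the field it is responsible for.
func buildOptions(opts ...Option) healthCheckOptions {
	o := healthCheckOptions{
		initialDelay:      1 * time.Second,
		requestTimeout:    2 * time.Second,
		failureThreshold:  2,
		interval:          5 * time.Second,
		successStatusCode: 200,
	}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := buildOptions(WithInterval(10*time.Second), WithFailureThreshold(3))
	// Overridden fields take the new values; untouched fields keep their defaults.
	fmt.Println(o.interval, o.failureThreshold, o.requestTimeout) // prints "10s 3 2s"
}
```

Because each Option only touches one field, callers can pass any subset in any order, and new options can be added later without changing the signature of the function that accepts them.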
The StartEndpointHealthCheck function
StartEndpointHealthCheck starts a health check against the specified address with the given options. It returns a channel that emits true when the endpoint is healthy and false once the failure condition has been met.
// StartEndpointHealthCheck starts a health check on the specified address with the given options.
// It returns a channel that will emit true if the endpoint is healthy and false if the failure conditions
// have been met.
func StartEndpointHealthCheck(endpointAddress string, opts ...Option) chan bool {
options := &healthCheckOptions{}
applyDefaults(options)
// Run each Option function to set the health check options
for _, o := range opts {
o(options)
}
signalChan := make(chan bool, 1)
go func(ch chan<- bool, endpointAddress string, options *healthCheckOptions) {
// Set the health check interval; by default one check every 5 seconds
ticker := time.NewTicker(options.interval)
failureCount := 0
// Sleep for initialDelay before starting the health checks
time.Sleep(options.initialDelay)
// Create the HTTP client with its read timeout set to requestTimeout
client := &fasthttp.Client{
MaxConnsPerHost: 5, // Limit Keep-Alive connections
ReadTimeout: options.requestTimeout,
MaxIdemponentCallAttempts: 1,
}
req := fasthttp.AcquireRequest()
req.SetRequestURI(endpointAddress)
req.Header.SetMethod(fasthttp.MethodGet)
defer fasthttp.ReleaseRequest(req)
for range ticker.C {
resp := fasthttp.AcquireResponse()
err := client.DoTimeout(req, resp, options.requestTimeout)
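// Decide whether the check succeeded by comparing the response status code with successStatusCode
if err != nil || resp.StatusCode() != options.successStatusCode {
// Health check failed: increment the failure counter
failureCount++
// When the number of consecutive failures reaches failureThreshold, treat the endpoint as unhealthy and send false to the channel
if failureCount == options.failureThreshold {
ch <- false
}
} else {
// Health check succeeded: send true to the channel
ch <- true
// and reset failureCount
failureCount = 0
}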
}
fasthttp.ReleaseResponse(resp)
}
}(signalChan, endpointAddress, options)
return signalChan
}
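A caller typically reads from the returned channel. The sketch below shows one way to wait for the first signal with a deadline; `waitForHealthy` is a hypothetical helper, and the channel in `main` is a hand-made stand-in for the one StartEndpointHealthCheck returns, so the example runs without the Dapr package.

```go
package main

import (
	"fmt"
	"time"
)

// waitForHealthy blocks until the first signal arrives on ch or the timeout
// elapses. It returns the signal value, or an error if nothing arrived in time.
func waitForHealthy(ch <-chan bool, timeout time.Duration) (bool, error) {
	select {
	case healthy := <-ch:
		return healthy, nil
	case <-time.After(timeout):
		return false, fmt.Errorf("no health signal within %s", timeout)
	}
}

func main() {
	// Stand-in for the channel returned by StartEndpointHealthCheck:
	// buffered with capacity 1, like signalChan in the code above.
	ch := make(chan bool, 1)
	ch <- true

	healthy, err := waitForHealthy(ch, time.Second)
	fmt.Println(healthy, err) // prints "true <nil>"
}
```

Note that in the code above the channel has a buffer of one and the goroutine sends true after every successful check, so a caller that stops reading will eventually block the checking goroutine on its next send.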
The applyDefaults() function sets the default values:
const (
initialDelay = time.Second * 1
failureThreshold = 2
requestTimeout = time.Second * 2
interval = time.Second * 5
successStatusCode = 200
)
func applyDefaults(o *healthCheckOptions) {
o.failureThreshold = failureThreshold
o.initialDelay = initialDelay
o.requestTimeout = requestTimeout
o.successStatusCode = successStatusCode
o.interval = interval
}
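Summary of the health check approach
The steps for health checking a given address endpointAddress are:
- Sleep for initialDelay before starting the health checks, since the target may still be initializing
- Run one health check every interval
- Each health check sends an HTTP GET request to endpointAddress with a timeout of requestTimeout
- Inspect the response to decide whether the target is healthy
  - If a response arrives and its status code equals successStatusCode, this check counts as a success
  - If the request fails (for example, on timeout) or the status code differs from successStatusCode, this check counts as a failure
- On failure, increment the failure counter, then check again after interval
- When the number of consecutive failures reaches failureThreshold, report the endpoint as unhealthy by sending false on the channel
- A single success resets the failure counter and reports the endpoint as healthy by sending true on the channel.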
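The counting rules in the steps above can be isolated from the HTTP and timing details. The sketch below is a simplified model, not the Dapr code itself: `failureTracker`, `observe` and `simulate` are hypothetical names, and each element of `results` stands for the outcome of one check. It makes two details of the loop visible: false is sent only on the check where the counter equals the threshold (not on later failures), and true is sent on every success.

```go
package main

import "fmt"

// failureTracker is a hypothetical stand-in for the failureCount bookkeeping
// inside the health check loop.
type failureTracker struct {
	failureCount int
	threshold    int
}

// observe records the outcome of one check. emit reports whether a value
// would be sent on the channel, and healthy is the value that would be sent.
func (t *failureTracker) observe(ok bool) (healthy, emit bool) {
	if ok {
		// A success always emits true and resets the counter.
		t.failureCount = 0
		return true, true
	}
	t.failureCount++
	// false is emitted only when the counter hits the threshold exactly.
	return false, t.failureCount == t.threshold
}

// simulate feeds a sequence of check outcomes through the tracker and
// returns the values that would have been sent on the channel.
func simulate(threshold int, results []bool) []bool {
	t := &failureTracker{threshold: threshold}
	var signals []bool
	for _, ok := range results {
		if healthy, emit := t.observe(ok); emit {
			signals = append(signals, healthy)
		}
	}
	return signals
}

func main() {
	// success, three failures in a row, then a success; threshold is 2.
	fmt.Println(simulate(2, []bool{true, false, false, false, true})) // prints "[true false true]"
}
```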
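2 - Source code study of server.go
Source code analysis of the server.go file in the Dapr health package: the implementation of the healthz server.
Code implementation
Health server
The interface definition of the healthz server: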
// Server is the interface for the healthz server
type Server interface {
Run(context.Context, int) error
Ready()
NotReady()
}
The server struct; the ready field stores the state:
type server struct {
ready bool
log logger.Logger
}
The function that creates a healthz server:
// NewServer returns a new healthz server
func NewServer(log logger.Logger) Server {
return &server{
log: log,
}
}
The two methods that set the ready state:
// Ready sets a ready state for the endpoint handlers
func (s *server) Ready() {
s.ready = true
}
// NotReady sets a not ready state for the endpoint handlers
func (s *server) NotReady() {
s.ready = false
}
Running the healthz server
The Run method starts an HTTP server with a healthz endpoint; the port is given by the port parameter:
// Run starts a net/http server with a healthz endpoint
func (s *server) Run(ctx context.Context, port int) error {
router := http.NewServeMux()
router.Handle("/healthz", s.healthz())
srv := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: router,
}
...
}
The remainder of the Run method starts the server and shuts it down when ctx is cancelled:
doneCh := make(chan struct{})
go func() {
select {
case <-ctx.Done():
s.log.Info("Healthz server is shutting down")
shutdownCtx, cancel := context.WithTimeout(
context.Background(),
time.Second*5,
)
defer cancel()
srv.Shutdown(shutdownCtx) // nolint: errcheck
case <-doneCh:
}
}()
s.log.Infof("Healthz server is listening on %s", srv.Addr)
err := srv.ListenAndServe()
if err != http.ErrServerClosed {
s.log.Errorf("Healthz server error: %s", err)
}
close(doneCh)
return err
}
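The shutdown path can be tried out with a reduced version of Run. In the sketch below, `run` is a simplified stand-in (no logger, no healthz route) and `runAndCancel` is a hypothetical helper that cancels the context after a short delay; the address `127.0.0.1:0` is an assumption chosen so that the operating system picks a free port. It shows that, as written above, the method returns http.ErrServerClosed after a graceful shutdown rather than nil.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// run is a simplified stand-in for the Run method: it serves until ctx is
// cancelled, then shuts the server down with a 5 second grace period.
func run(ctx context.Context, addr string) error {
	srv := &http.Server{Addr: addr, Handler: http.NewServeMux()}

	doneCh := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			srv.Shutdown(shutdownCtx) //nolint:errcheck
		case <-doneCh:
			// ListenAndServe returned on its own; nothing to shut down.
		}
	}()

	err := srv.ListenAndServe()
	close(doneCh)
	return err
}

// runAndCancel is a hypothetical helper: it starts run on an ephemeral local
// port, cancels the context after delay, and returns whatever run returned.
func runAndCancel(delay time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(delay, cancel)
	return run(ctx, "127.0.0.1:0")
}

func main() {
	err := runAndCancel(100 * time.Millisecond)
	fmt.Println(err == http.ErrServerClosed, err) // prints "true http: Server closed"
}
```

A caller that treats every non-nil error from Run as fatal will therefore also treat a graceful shutdown as a failure; comparing the result against http.ErrServerClosed is one way to tell the two cases apart.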
How the healthz server handles requests
The healthz() method is the handler for the health endpoint; it returns an HTTP status code based on the current value of the healthz server's ready field:
// healthz is a health endpoint handler
func (s *server) healthz() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var status int
if s.ready {
// ready: return 200
status = http.StatusOK
} else {
// not ready: return 503
status = http.StatusServiceUnavailable
}
w.WriteHeader(status)
})
}
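The handler's behaviour can be checked in memory with net/http/httptest, without opening a port. The sketch below uses a trimmed copy of the server struct (the logger field is omitted) and a hypothetical `probe` helper that sends one GET request to the handler and returns the recorded status code.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// server is a trimmed copy of the struct above, without the logger.
type server struct {
	ready bool
}

func (s *server) Ready()    { s.ready = true }
func (s *server) NotReady() { s.ready = false }

// healthz returns 200 when the server is ready and 503 otherwise.
func (s *server) healthz() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		status := http.StatusServiceUnavailable
		if s.ready {
			status = http.StatusOK
		}
		w.WriteHeader(status)
	})
}

// probe is a hypothetical helper: it sends one in-memory GET /healthz to the
// handler and returns the status code that the handler wrote.
func probe(s *server) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
	s.healthz().ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	s := &server{}
	fmt.Println(probe(s)) // prints "503": a new server starts as not ready
	s.Ready()
	fmt.Println(probe(s)) // prints "200"
	s.NotReady()
	fmt.Println(probe(s)) // prints "503"
}
```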
Usage scenarios
The healthz server is used in injector, placement, sentry and operator; each of these processes starts the healthz server in its main function.
injector
injector starts the healthz server on port 8080:
const (
healthzPort = 8080
)
func main() {
......
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
healthzErr := healthzServer.Run(ctx, healthzPort)
if healthzErr != nil {
log.Fatalf("failed to start healthz server: %s", healthzErr)
}
}()
......
}
placement
placement starts the healthz server on port 8080 by default (the port can be changed with the healthz-port command-line flag):
const (
defaultHealthzPort = 8080
)
func main() {
flag.IntVar(&cfg.healthzPort, "healthz-port", cfg.healthzPort, "sets the HTTP port for the healthz server")
......
go startHealthzServer(cfg.healthzPort)
......
}
func startHealthzServer(healthzPort int) {
healthzServer := health.NewServer(log)
healthzServer.Ready()
if err := healthzServer.Run(context.Background(), healthzPort); err != nil {
log.Fatalf("failed to start healthz server: %s", err)
}
}
sentry
sentry starts the healthz server on port 8080:
const (
healthzPort = 8080
)
func main() {
......
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
err := healthzServer.Run(ctx, healthzPort)
if err != nil {
log.Fatalf("failed to start healthz server: %s", err)
}
}()
......
}
operator
operator starts the healthz server on port 8080:
const (
healthzPort = 8080
)
func main() {
......
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
err := healthzServer.Run(ctx, healthzPort)
if err != nil {
log.Fatalf("failed to start healthz server: %s", err)
}
}()
......
}
daprd
Note that daprd does not use the healthz server; instead, it adds the healthz functionality directly on top of the Dapr HTTP API.
The relevant code is in http/api.go:
func NewAPI(......
api.endpoints = append(api.endpoints, api.constructHealthzEndpoints()...)
return api
}
func (a *api) constructHealthzEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{fasthttp.MethodGet},
Route: "healthz",
Version: apiVersionV1,
Handler: a.onGetHealthz,
},
}
}
The onGetHealthz() method handles the request:
func (a *api) onGetHealthz(reqCtx *fasthttp.RequestCtx) {
if !a.readyStatus {
msg := NewErrorResponse("ERR_HEALTH_NOT_READY", messages.ErrHealthNotReady)
respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
log.Debug(msg)
} else {
respondEmpty(reqCtx)
}
}
func respondEmpty(ctx *fasthttp.RequestCtx) {
ctx.Response.SetBody(nil)
ctx.Response.SetStatusCode(fasthttp.StatusNoContent)
}
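Note: on success the status code returned here is 204 StatusNoContent rather than the usual 200 OK. When daprd is not ready it responds with 500 StatusInternalServerError, whereas the healthz server above responds with 503.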
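One consequence is that a client using the default successStatusCode of 200 from health.go would count every daprd response as a failure, so such a client would need to pass WithSuccessStatusCode(204). The sketch below illustrates the mismatch with the standard library only: `checkOnce` is a hypothetical single-shot check that uses net/http instead of fasthttp, `newFakeDaprd` is a test server that merely imitates the 204 response, and the path /v1.0/healthz is an assumption about what the "healthz" route with apiVersionV1 resolves to.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// checkOnce performs a single GET and reports whether the response status
// equals successStatusCode. Any request error counts as a failed check.
func checkOnce(url string, successStatusCode int, timeout time.Duration) bool {
	client := &http.Client{Timeout: timeout}
	resp, err := client.Get(url)
	if err != nil {
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == successStatusCode
}

// newFakeDaprd returns a local test server that answers the (assumed)
// healthz path with 204 No Content, like respondEmpty above.
func newFakeDaprd() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/v1.0/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	})
	return httptest.NewServer(mux)
}

func main() {
	srv := newFakeDaprd()
	defer srv.Close()
	url := srv.URL + "/v1.0/healthz"

	// Expecting the default 200 fails against a 204 response.
	fmt.Println(checkOnce(url, http.StatusOK, 2*time.Second)) // prints "false"
	// Expecting 204 succeeds.
	fmt.Println(checkOnce(url, http.StatusNoContent, 2*time.Second)) // prints "true"
}
```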