health.go的源码学习
health checking的客户端实现
Dapr health package中的 health.go 文件的源码分析,health checking的客户端实现
代码实现
Option 方法定义
// Option is an a function that applies a health check option
type Option func(o *healthCheckOptions)
healthCheckOptions 结构体定义
healthCheckOptions 结构体
type healthCheckOptions struct {
initialDelay time.Duration
requestTimeout time.Duration
failureThreshold int
interval time.Duration
successStatusCode int
}
With系列方法
WithXxx 方法用来设置上述5个健康检查的选项,每个方法都返回一个 Option 函数:
// WithInitialDelay sets the initial delay for the health check
func WithInitialDelay(delay time.Duration) Option {
return func(o *healthCheckOptions) {
o.initialDelay = delay
}
}
// WithFailureThreshold sets the failure threshold for the health check
func WithFailureThreshold(threshold int) Option {
return func(o *healthCheckOptions) {
o.failureThreshold = threshold
}
}
// WithRequestTimeout sets the request timeout for the health check
func WithRequestTimeout(timeout time.Duration) Option {
return func(o *healthCheckOptions) {
o.requestTimeout = timeout
}
}
// WithSuccessStatusCode sets the status code for the health check
func WithSuccessStatusCode(code int) Option {
return func(o *healthCheckOptions) {
o.successStatusCode = code
}
}
// WithInterval sets the interval for the health check
func WithInterval(interval time.Duration) Option {
return func(o *healthCheckOptions) {
o.interval = interval
}
}
StartEndpointHealthCheck 方法
StartEndpointHealthCheck 方法用给定的选项在指定的地址上启动健康检查。它返回一个通道,如果端点是健康的则发出true,如果满足失败条件则发出false。
// StartEndpointHealthCheck starts a health check on the specified address with the given options.
// It returns a channel that will emit true if the endpoint is healthy and false if the failure conditions
// Have been met.
func StartEndpointHealthCheck(endpointAddress string, opts ...Option) chan bool {
options := &healthCheckOptions{}
applyDefaults(options)
// 执行每个 Option 函数来设置健康检查的选项
for _, o := range opts {
o(options)
}
signalChan := make(chan bool, 1)
go func(ch chan<- bool, endpointAddress string, options *healthCheckOptions) {
// 设置健康检查的间隔时间 interval,默认5秒一次
ticker := time.NewTicker(options.interval)
failureCount := 0
// 先 sleep initialDelay 时间再开始健康检查
time.Sleep(options.initialDelay)
// 创建 http client,设置请求超时时间为 requestTimeout
client := &fasthttp.Client{
MaxConnsPerHost: 5, // Limit Keep-Alive connections
ReadTimeout: options.requestTimeout,
MaxIdemponentCallAttempts: 1,
}
req := fasthttp.AcquireRequest()
req.SetRequestURI(endpointAddress)
req.Header.SetMethod(fasthttp.MethodGet)
defer fasthttp.ReleaseRequest(req)
for range ticker.C {
resp := fasthttp.AcquireResponse()
err := client.DoTimeout(req, resp, options.requestTimeout)
// 通过检查应答的状态码来判断健康检查是否成功: successStatusCode
if err != nil || resp.StatusCode() != options.successStatusCode {
// 健康检查失败,错误计数器加一
failureCount++
// 如果连续错误次数达到阈值 failureThreshold,则视为健康检查失败,发送false到channel
if failureCount == options.failureThreshold {
ch <- false
}
} else {
// 健康检查成功,发送 true 到 channel
ch <- true
// 同时重制 failureCount
failureCount = 0
}
fasthttp.ReleaseResponse(resp)
}
}(signalChan, endpointAddress, options)
return signalChan
}
applyDefaults() 方法设置默认属性:
const (
initialDelay = time.Second * 1
failureThreshold = 2
requestTimeout = time.Second * 2
interval = time.Second * 5
successStatusCode = 200
)
func applyDefaults(o *healthCheckOptions) {
o.failureThreshold = failureThreshold
o.initialDelay = initialDelay
o.requestTimeout = requestTimeout
o.successStatusCode = successStatusCode
o.interval = interval
}
健康检查方式总结
对某一个给定地址 endpointAddress 进行健康检查的步骤和方式为:
- 先 sleep initialDelay 时间再开始健康检查:可能对方还在初始化过程中
- 每隔间隔时间 interval 时间发起一次健康检查
- 每次健康检查是向目标地址 endpointAddress 发起一个 HTTP GET 请求,超时时间为 requestTimeout
- 检查应答判断是否健康
- 返回应答并且应答的状态码是 successStatusCode 则视为本地健康检查成功
- 超时或者应答的状态码不是 successStatusCode 则视为本地健康检查失败
- 如果失败则开始累加计数器,然后间隔 interval 时间之后再次进行健康检查
- 如果多次失败,累计达到阈值 failureThreshold,报告为健康检查失败
- 只要单次成功,则清理之前的错误累计次数,报告为健康检查成功。