sentry代码

sentry 模块的主要实现在文件 pkg/sentry/sentry.go 中。

定义

定义 CA 接口

type CertificateAuthority interface {
	Start(context.Context, config.SentryConfig) error
	Stop()
	Restart(context.Context, config.SentryConfig) error
}

start 和 restart 的函数定义是一样的。

定义 sentry 结构体

type sentry struct {
	conf        config.SentryConfig    // sentry的配置,启动时由 main 函数初始化后传入
	ctx         context.Context				 // 启动时由 main 函数初始化后传入
	cancel      context.CancelFunc     
	server      server.CAServer        // CA server
	restartLock sync.Mutex             // 用于 restart 的锁
	running     chan bool
	stopping    chan bool
}

主流程

Sentry.go 被 sentry main.go 调用,主要工作流程就是三个事情:

// 1. 初始化
ca := sentry.NewSentryCA()
// 2. 启动
err = ca.Start(runCtx, config)
// 3. 在需要时重启
innerErr := ca.Restart(runCtx, config)

备注:sentry main.go 没有调用 sentry的 stop(),这个 stop() 只在 restart() 方法中被调用。

初始化 sentry

NewSentryCA() 的实现:

// NewSentryCA returns a new Sentry Certificate Authority instance.
func NewSentryCA() CertificateAuthority {
	return &sentry{
		running: make(chan bool, 1),
	}
}

什么都没干,只是初始化了 running 这个channel。

启动 sentry

// Start the server in background.
func (s *sentry) Start(ctx context.Context, conf config.SentryConfig) error {
	// If the server is already running, return an error
	select {
	case s.running <- true:
	default:
		return errors.New("CertificateAuthority server is already running")
	}

	// Create the CA server
	s.conf = conf
	certAuth, v := s.createCAServer()

	// Start the server in background
	s.ctx, s.cancel = context.WithCancel(ctx)
	go s.run(certAuth, v)

	// Wait 100ms to ensure a clean startup
	time.Sleep(100 * time.Millisecond)

	return nil
}

主要工作就是创建 CA server,然后运行服务。

创建 ca server

createCAServer() 方法加载信任锚和签发者证书,然后创建一个新的CA:

// Loads the trust anchors and issuer certs, then creates a new CA.
func (s *sentry) createCAServer() (ca.CertificateAuthority, identity.Validator) {
	// Create CA
	certAuth, authorityErr := ca.NewCertificateAuthority(s.conf)
	if authorityErr != nil {
		log.Fatalf("error getting certificate authority: %s", authorityErr)
	}
	log.Info("certificate authority loaded")

	// Load the trust bundle
	trustStoreErr := certAuth.LoadOrStoreTrustBundle()
	if trustStoreErr != nil {
		log.Fatalf("error loading trust root bundle: %s", trustStoreErr)
	}
	certExpiry := certAuth.GetCACertBundle().GetIssuerCertExpiry()
	if certExpiry == nil {
		log.Fatalf("error loading trust root bundle: missing certificate expiry")
	} else {
		// Need to be in an else block for the linter
		log.Infof("trust root bundle loaded. issuer cert expiry: %s", certExpiry.String())
	}
	monitoring.IssuerCertExpiry(certExpiry)

	// Create identity validator
	v, validatorErr := s.createValidator()
	if validatorErr != nil {
		log.Fatalf("error creating validator: %s", validatorErr)
	}
	log.Info("validator created")

	return certAuth, v
}

方法返回 ca.CertificateAuthority 和 identity.Validator 。

创建 identity.Validator

createValidator 的实现细节:

func (s *sentry) createValidator() (identity.Validator, error) {
	if config.IsKubernetesHosted() {  // 通过 KUBERNETES_SERVICE_HOST 环境变量来判断
		// we're in Kubernetes, create client and init a new serviceaccount token validator
		kubeClient, err := k8s.GetClient()
		if err != nil {
			return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
		}

		// TODO: Remove once the NoDefaultTokenAudience feature is finalized
		noDefaultTokenAudience := false

    // 创建 kubernetes 的 Validator
		return kubernetes.NewValidator(kubeClient, s.conf.GetTokenAudiences(), noDefaultTokenAudience), nil
	}
  
  // 创建 selfhosted 的 Validator
	return selfhosted.NewValidator(), nil
}

运行 sentry

run 方法运行 CA server,阻塞直到服务器关闭:

// Runs the CA server.
// This method blocks until the server is shut down.
func (s *sentry) run(certAuth ca.CertificateAuthority, v identity.Validator) {
	s.server = server.NewCAServer(certAuth, v)

	// In background, watch for the root certificate's expiration
	go watchCertExpiry(s.ctx, certAuth)

	// Watch for context cancelation to stop the server
	go func() {
		<-s.ctx.Done()
		s.server.Shutdown()
		close(s.running)
		s.running = make(chan bool, 1)
		if s.stopping != nil {
			close(s.stopping)
		}
	}()

	// Start the server; this is a blocking call
	log.Infof("sentry certificate authority is running, protecting y'all")
	serverRunErr := s.server.Run(s.conf.Port, certAuth.GetCACertBundle())
	if serverRunErr != nil {
		log.Fatalf("error starting gRPC server: %s", serverRunErr)
	}
}

启动 ca 的 grpc server 以便接收外部请求。

监控证书过期

Run() 方法中启动了一个 goroutine,用于监控证书是否过期。如果快要过期了,则会显示错误信息。

// Watches certificates' expiry and shows an error message when they're nearing expiration time.
// This is a blocking method that should be run in its own goroutine.
func watchCertExpiry(ctx context.Context, certAuth ca.CertificateAuthority) {
	log.Debug("starting root certificate expiration watcher")
  // time 是每小时触发一次
	certExpiryCheckTicker := time.NewTicker(time.Hour)
	for {
		select {
		case <-certExpiryCheckTicker.C:
			caCrt := certAuth.GetCACertBundle().GetRootCertPem()
			block, _ := pem.Decode(caCrt)
			cert, certParseErr := x509.ParseCertificate(block.Bytes)
			if certParseErr != nil {
				log.Warn("could not determine Dapr root certificate expiration time")
				break
			}
			if cert.NotAfter.Before(time.Now().UTC()) {
        // 已经过期则报警
				log.Warn("Dapr root certificate expiration warning: certificate has expired.")
				break
			}
			if (cert.NotAfter.Add(-30 * 24 * time.Hour)).Before(time.Now().UTC()) {
        // 有效期不足30天也报警
				expiryDurationHours := int(cert.NotAfter.Sub(time.Now().UTC()).Hours())
				log.Warnf("Dapr root certificate expiration warning: certificate expires in %d days and %d hours", expiryDurationHours/24, expiryDurationHours%24)
			} else {
				validity := cert.NotAfter.Sub(time.Now().UTC())
				log.Debugf("Dapr root certificate is still valid for %s", validity.String())
			}
		case <-ctx.Done():
			log.Debug("terminating root certificate expiration watcher")
			certExpiryCheckTicker.Stop()
			return
		}
	}
}

停止 sentry

// Stop the server.
func (s *sentry) Stop() {
	log.Info("sentry certificate authority is shutting down")
	if s.cancel != nil {
		s.stopping = make(chan bool)
		s.cancel()
		<-s.stopping
		s.stopping = nil
	}
}

重启 sentry

Restart() 方法重启 sentry:

func (s *sentry) Restart(ctx context.Context, conf config.SentryConfig) error {
	s.restartLock.Lock()
	defer s.restartLock.Unlock()
	log.Info("sentry certificate authority is restarting")
	s.Stop()
	// Wait 200ms to ensure a clean shutdown
	time.Sleep(200 * time.Millisecond)
	return s.Start(ctx, conf)
}

步骤:

  • 先加锁
  • 停止 sentry
  • sleep 200 毫秒
  • 再启动 sentry