1 - sentry的main函数入口

sentry 模块的入口在文件 cmd/sentry/main.go 中。

准备工作

读取命令行参数

const (
	defaultCredentialsPath = "/var/run/dapr/credentials"
	// defaultDaprSystemConfigName is the default resource object name for Dapr System Config.
	defaultDaprSystemConfigName = "daprsystem"

	healthzPort = 8080
)

func main() {
	configName := flag.String("config", defaultDaprSystemConfigName, "Path to config file, or name of a configuration object")
	credsPath := flag.String("issuer-credentials", defaultCredentialsPath, "Path to the credentials directory holding the issuer data")
	flag.StringVar(&credentials.RootCertFilename, "issuer-ca-filename", credentials.RootCertFilename, "Certificate Authority certificate filename")
	flag.StringVar(&credentials.IssuerCertFilename, "issuer-certificate-filename", credentials.IssuerCertFilename, "Issuer certificate filename")
	flag.StringVar(&credentials.IssuerKeyFilename, "issuer-key-filename", credentials.IssuerKeyFilename, "Issuer private key filename")
	trustDomain := flag.String("trust-domain", "localhost", "The CA trust domain")
	tokenAudience := flag.String("token-audience", "", "Expected audience for tokens; multiple values can be separated by a comma")
......
}

logger  和 metrics 的参数需要展开:

	loggerOptions := logger.DefaultOptions()
	loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)

	metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
	metricsExporter.Options().AttachCmdFlags(flag.StringVar, flag.BoolVar)

获取 k8s 的 配置文件路径:

	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
    // 读取 home 路径
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
    // 通过 `--kubeconfig` 传递完整的 kubeconfig 文件路径
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}

最后解析一把:

flag.Parse()

设置环境变量

将 kubeconfig 的值设置到 KUBE_CONFIG 环境变量:

var (
	KubeConfigVar = "KUBE_CONFIG"
)

if err := utils.SetEnvVariables(map[string]string{
		utils.KubeConfigVar: *kubeconfig,
	}); err != nil {
		log.Fatalf("error set env failed:  %s", err.Error())
	}

初始化

这行日志标记着初始化正式开始:

	log.Infof("starting sentry certificate authority -- version %s -- commit %s", buildinfo.Version(), buildinfo.Commit())
	log.Infof("log level set to: %s", loggerOptions.OutputLevel)

初始化metrics

// Initialize dapr metrics exporter
	if err := metricsExporter.Init(); err != nil {
		log.Fatal(err)
	}

初始化监控

	if err := monitoring.InitMetrics(); err != nil {
		log.Fatal(err)
	}

读取配置

  // 拼凑文件路径
  issuerCertPath := filepath.Join(*credsPath, credentials.IssuerCertFilename) //issuer.crt
	issuerKeyPath := filepath.Join(*credsPath, credentials.IssuerKeyFilename)   // issuer.key
	rootCertPath := filepath.Join(*credsPath, credentials.RootCertFilename)     // ca.crt

  // 读取 sentry 配置:
  config, err := config.FromConfigName(*configName)
	if err != nil {
		log.Warn(err)
	}

  // 保存证书相关的各个路径和参数
	config.IssuerCertPath = issuerCertPath
	config.IssuerKeyPath = issuerKeyPath
	config.RootCertPath = rootCertPath
	config.TrustDomain = *trustDomain
	if *tokenAudience != "" {
		config.TokenAudience = tokenAudience
	}

启动服务

启动sentry server

	ca := sentry.NewSentryCA()

	// Start the CA server in the background. A failure on this initial start
	// is fatal; the message says "start" (not "restart") so that it can be
	// told apart from failures of the credential-reload path.
	err = ca.Start(runCtx, config)
	if err != nil {
		log.Fatalf("failed to start sentry server: %s", err)
	}

启动 health server

	log.Infof("starting watch on filesystem directory: %s", watchDir)

// Start the health server in background
	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		if innerErr := healthzServer.Run(runCtx, healthzPort); innerErr != nil {
			log.Fatalf("failed to start healthz server: %s", innerErr)
		}
	}()

监控目录变化

  issuerEvent := make(chan struct{})
  watchDir := filepath.Dir(config.IssuerCertPath)

  // Watch for changes in the watchDir
	// This also blocks until runCtx is canceled
	fswatcher.Watch(runCtx, watchDir, issuerEvent)

这个函数会一直阻塞直到 runCtx 被取消(这意味着要退出 sentry 进程)。

如果有文件更新,则 issuerEvent 会收到 event,issuerEvent 相关的处理代码:

	// Background loop that reloads the CA server when the issuer credentials
	// change on disk.
	go func() {
		// Restart the server when the issuer credentials change
		// restart stays nil (a nil channel never fires in a select) until the
		// first change signal arms it.
		var restart <-chan time.Time
		for {
			select {
			case <-issuerEvent:
				monitoring.IssuerCertChanged()
				log.Debug("received issuer credentials changed signal")
				// Batch all signals within 2s of each other
				if restart == nil {
					// The change is not handled right away: a timer is armed
					// that fires once after 2 seconds, and every further signal
					// received before it fires is covered by that single restart.
					restart = time.After(2 * time.Second)
				}
			case <-restart:
				// The 2-second batching window has elapsed; handle all the
				// accumulated change signals with one restart.
				log.Warn("issuer credentials changed; reloading")
				innerErr := ca.Restart(runCtx, config)
				if innerErr != nil {
					log.Fatalf("failed to restart sentry server: %s", innerErr)
				}
				// Disarm the timer so that the next change signal opens a new
				// batching window.
				restart = nil
			}
		}
	}()

退出

	shutdownDuration := 5 * time.Second
	log.Infof("allowing %s for graceful shutdown to complete", shutdownDuration)
	<-time.After(shutdownDuration)

总结

去除非核心代码,sentry main 函数的主要功能是启动 sentry 的 ca server, 并监控目录,如果有变化则重启 ca server。

2 - sentry的Proto定义

proto服务定义

sentry 模块的 proto 服务定义在文件 dapr/proto/sentry/v1/sentry.proto 中。

service CA {
  // A request for a time-bound certificate to be signed.
  //
  // The requesting side must provide an id for both loosely based
  // And strong based identities.
  rpc SignCertificate (SignCertificateRequest) returns (SignCertificateResponse) {}
}

SignCertificate() 方法用于请求签署一个有时间限制的证书。请求方必须提供一个 id,该 id 同时适用于松散型(loosely based)身份和强类型(strong based)身份。

SignCertificateRequest 的定义:

// SignCertificateRequest is the input of CA.SignCertificate.
message SignCertificateRequest {
  // Identifier of the requester; the server passes it to the identity validator.
  string id = 1;
  // Token of the requester; validated together with id and namespace.
  string token = 2;
  // Trust domain placed in the identity bundle of the signed certificate.
  string trust_domain = 3;
  // Namespace of the requester; used for validation and in the identity bundle.
  string namespace = 4;
  // A PEM-encoded x509 CSR.
  bytes certificate_signing_request = 5;
}

SignCertificateResponse 的定义:

message SignCertificateResponse {
  // A PEM-encoded x509 Certificate.
  bytes workload_certificate = 1;

  // A list of PEM-encoded x509 Certificates that establish the trust chain
  // between the workload certificate and the well-known trust root cert.
  repeated bytes trust_chain_certificates = 2;

  google.protobuf.Timestamp valid_until = 3;
}

trust_chain_certificates 是一个 PEM 编码的 x509 证书的列表,这些证书在 workload_certificate 和众所周知的信任根证书(trust root cert)之间建立信任链。

3 - sentry代码

sentry 模块的主要实现在文件 pkg/sentry/sentry.go 中。

定义

定义 CA 接口

// CertificateAuthority is the lifecycle interface of the sentry CA: it can be
// started, stopped and restarted.
type CertificateAuthority interface {
	// Start starts the CA with the given configuration.
	Start(context.Context, config.SentryConfig) error
	// Stop shuts the CA down.
	Stop()
	// Restart stops the CA and starts it again with the given configuration.
	Restart(context.Context, config.SentryConfig) error
}

start 和 restart 的函数定义是一样的。

定义 sentry 结构体

// sentry is the CertificateAuthority implementation that owns the CA server.
type sentry struct {
	conf        config.SentryConfig    // sentry configuration; stored from the argument of Start
	ctx         context.Context				 // cancelable context derived in Start from the caller's context
	cancel      context.CancelFunc     // cancels ctx; invoked by Stop
	server      server.CAServer        // CA server
	restartLock sync.Mutex             // serializes Restart calls
	running     chan bool              // one-slot channel; holds a value while the server is running
	stopping    chan bool              // created by Stop and closed once the shutdown has completed
}

主流程

Sentry.go 被 sentry main.go 调用,主要工作流程就是三个事情:

// 1. 初始化
ca := sentry.NewSentryCA()
// 2. 启动
err = ca.Start(runCtx, config)
// 3. 在需要时重启
innerErr := ca.Restart(runCtx, config)

备注:sentry main.go 没有调用 sentry的 stop(),这个 stop() 只在 restart() 方法中被调用。

初始化 sentry

NewSentryCA() 的实现:

// NewSentryCA builds a Sentry Certificate Authority instance that has not been
// started yet.
func NewSentryCA() CertificateAuthority {
	authority := new(sentry)
	// One-slot channel used by Start to detect an already-running server.
	authority.running = make(chan bool, 1)
	return authority
}

什么都没干,只是初始化了 running 这个channel。

启动 sentry

// Start launches the CA server in a background goroutine.
// It returns an error when the server is already running; otherwise it stores
// conf, creates the CA server, derives a cancelable context from ctx and waits
// 100ms before returning nil.
func (s *sentry) Start(ctx context.Context, conf config.SentryConfig) error {
	// s.running has a single slot: the send only succeeds while the slot is
	// free, so a full slot means the server is already running.
	select {
	case s.running <- true:
	default:
		return errors.New("CertificateAuthority server is already running")
	}

	// Create the CA server
	s.conf = conf
	certAuth, v := s.createCAServer()

	// Start the server in background
	s.ctx, s.cancel = context.WithCancel(ctx)
	go s.run(certAuth, v)

	// Wait 100ms to ensure a clean startup
	time.Sleep(100 * time.Millisecond)

	return nil
}

主要工作就是创建 CA server,然后运行服务。

创建 ca server

createCAServer() 方法加载信任锚和签发者证书,然后创建一个新的CA:

// createCAServer builds the certificate authority from the stored
// configuration, loads (or stores) its trust bundle, reports the issuer
// certificate expiry to monitoring and creates the identity validator.
// Every failure is fatal and terminates the process through log.Fatalf.
func (s *sentry) createCAServer() (ca.CertificateAuthority, identity.Validator) {
	// Certificate authority.
	authority, authorityErr := ca.NewCertificateAuthority(s.conf)
	if authorityErr != nil {
		log.Fatalf("error getting certificate authority: %s", authorityErr)
	}
	log.Info("certificate authority loaded")

	// Trust bundle.
	if bundleErr := authority.LoadOrStoreTrustBundle(); bundleErr != nil {
		log.Fatalf("error loading trust root bundle: %s", bundleErr)
	}
	issuerExpiry := authority.GetCACertBundle().GetIssuerCertExpiry()
	if issuerExpiry != nil {
		log.Infof("trust root bundle loaded. issuer cert expiry: %s", issuerExpiry.String())
	} else {
		log.Fatalf("error loading trust root bundle: missing certificate expiry")
	}
	monitoring.IssuerCertExpiry(issuerExpiry)

	// Identity validator.
	validator, validatorErr := s.createValidator()
	if validatorErr != nil {
		log.Fatalf("error creating validator: %s", validatorErr)
	}
	log.Info("validator created")

	return authority, validator
}

方法返回 ca.CertificateAuthority 和 identity.Validator 。

创建 identity.Validator

createValidator 的实现细节:

// createValidator returns the identity validator matching the hosting
// platform: a Kubernetes validator when config.IsKubernetesHosted reports true,
// a self-hosted validator otherwise.
// NOTE(review): the Kubernetes check presumably relies on the
// KUBERNETES_SERVICE_HOST environment variable; the helper is not shown here.
func (s *sentry) createValidator() (identity.Validator, error) {
	if !config.IsKubernetesHosted() {
		// Self-hosted mode.
		return selfhosted.NewValidator(), nil
	}

	// Running in Kubernetes: a client is required by the service account
	// token validator.
	client, clientErr := k8s.GetClient()
	if clientErr != nil {
		return nil, fmt.Errorf("failed to create kubernetes client: %w", clientErr)
	}

	// TODO: Remove once the NoDefaultTokenAudience feature is finalized
	noDefaultTokenAudience := false

	return kubernetes.NewValidator(client, s.conf.GetTokenAudiences(), noDefaultTokenAudience), nil
}

运行 sentry

run 方法运行 CA server,阻塞直到服务器关闭:

// run creates the CA server from certAuth and v and serves requests.
// This method blocks until the server is shut down.
// A failure of the gRPC server is fatal (log.Fatalf).
func (s *sentry) run(certAuth ca.CertificateAuthority, v identity.Validator) {
	s.server = server.NewCAServer(certAuth, v)

	// In background, watch for the root certificate's expiration
	go watchCertExpiry(s.ctx, certAuth)

	// Watch for context cancelation to stop the server
	go func() {
		<-s.ctx.Done()
		s.server.Shutdown()
		// Replace the "running" channel with a fresh, empty one so that a
		// later Start call is accepted again.
		close(s.running)
		s.running = make(chan bool, 1)
		// Unblock a Stop call that is waiting for the shutdown, if any.
		if s.stopping != nil {
			close(s.stopping)
		}
	}()

	// Start the server; this is a blocking call
	log.Infof("sentry certificate authority is running, protecting y'all")
	serverRunErr := s.server.Run(s.conf.Port, certAuth.GetCACertBundle())
	if serverRunErr != nil {
		log.Fatalf("error starting gRPC server: %s", serverRunErr)
	}
}

启动 ca 的 grpc server 以便接收外部请求。

监控证书过期

run() 方法中启动了一个 goroutine,用于监控根证书是否过期。如果证书已经过期或者有效期不足 30 天,则会打印告警日志。

// watchCertExpiry checks the root certificate once per hour and logs a warning
// when it has expired or expires in less than 30 days.
// This is a blocking method that should be run in its own goroutine; it returns
// when ctx is canceled.
func watchCertExpiry(ctx context.Context, certAuth ca.CertificateAuthority) {
	log.Debug("starting root certificate expiration watcher")
	// The check is triggered once per hour.
	certExpiryCheckTicker := time.NewTicker(time.Hour)
	defer certExpiryCheckTicker.Stop()
	for {
		select {
		case <-certExpiryCheckTicker.C:
			caCrt := certAuth.GetCACertBundle().GetRootCertPem()
			block, _ := pem.Decode(caCrt)
			if block == nil {
				// pem.Decode yields a nil block when no PEM data is found;
				// dereferencing it below would crash the whole process.
				log.Warn("could not determine Dapr root certificate expiration time")
				break
			}
			cert, certParseErr := x509.ParseCertificate(block.Bytes)
			if certParseErr != nil {
				log.Warn("could not determine Dapr root certificate expiration time")
				break
			}
			if cert.NotAfter.Before(time.Now().UTC()) {
				// Already expired.
				log.Warn("Dapr root certificate expiration warning: certificate has expired.")
				break
			}
			if (cert.NotAfter.Add(-30 * 24 * time.Hour)).Before(time.Now().UTC()) {
				// Less than 30 days of validity left.
				expiryDurationHours := int(cert.NotAfter.Sub(time.Now().UTC()).Hours())
				log.Warnf("Dapr root certificate expiration warning: certificate expires in %d days and %d hours", expiryDurationHours/24, expiryDurationHours%24)
			} else {
				validity := cert.NotAfter.Sub(time.Now().UTC())
				log.Debugf("Dapr root certificate is still valid for %s", validity.String())
			}
		case <-ctx.Done():
			log.Debug("terminating root certificate expiration watcher")
			return
		}
	}
}

停止 sentry

// Stop cancels the server context and blocks until the shutdown performed by
// the goroutine started in run has completed.
// It only logs when the server was never started (s.cancel is nil).
func (s *sentry) Stop() {
	log.Info("sentry certificate authority is shutting down")
	if s.cancel != nil {
		// The channel is closed by the shutdown goroutine once the server has
		// been shut down.
		s.stopping = make(chan bool)
		s.cancel()
		<-s.stopping
		s.stopping = nil
	}
}

重启 sentry

Restart() 方法重启 sentry:

// Restart stops the server, waits 200ms and starts it again with conf.
// Concurrent calls are serialized by restartLock. The error of Start is
// returned as is.
func (s *sentry) Restart(ctx context.Context, conf config.SentryConfig) error {
	s.restartLock.Lock()
	defer s.restartLock.Unlock()
	log.Info("sentry certificate authority is restarting")
	s.Stop()
	// Wait 200ms to ensure a clean shutdown
	time.Sleep(200 * time.Millisecond)
	return s.Start(ctx, conf)
}

步骤:

  • 先加锁
  • 停止 sentry
  • sleep 200 毫秒
  • 再启动 sentry

4 - CA server代码

ca server 的实现在文件 pkg/sentry/server/server.go 中。

定义

CAServer 接口

// CAServer is an interface for the Certificate Authority server.
type CAServer interface {
	// Run serves on the given port using trustBundle for client certificate
	// validation.
	Run(port int, trustBundle ca.TrustRootBundler) error
	// Shutdown stops the server.
	Shutdown()
}

server 结构体

// server is the CAServer implementation backed by a gRPC server.
type server struct {
	certificate *tls.Certificate // cached TLS server certificate; created or refreshed on demand during TLS handshakes
	certAuth    ca.CertificateAuthority
	srv         *grpc.Server    // gRPC server exposing the CA service; nil until Run is called
	validator   identity.Validator
}

主流程

server.go 被 sentry.go 调用,主要工作流程就是三个事情:

// 1. 初始化CA server
s.server = server.NewCAServer(certAuth, v)
// 2. 运行CA server
s.server.Run(s.conf.Port, certAuth.GetCACertBundle())
// 3. 在需要时关闭CA server
s.server.Shutdown()

初始化CA server

// NewCAServer builds a CA Server around the given certificate authority and
// identity validator. The gRPC server itself is only created by Run.
func NewCAServer(ca ca.CertificateAuthority, validator identity.Validator) CAServer {
	caServer := new(server)
	caServer.certAuth = ca
	caServer.validator = validator
	return caServer
}

保存传递进来的参数,这两个参数在 sentry.go 中初始化。

运行 CA server

CA server 主要提供两个功能:

  1. 基本的 grpc 服务:以便为客户端提供服务
  2. 安全:必须为提供的服务进行安全保护,因此客户端必须使用 trust root cert
// Run listens on the given TCP port and serves the Sentry Certificate
// Authority over a TLS-secured gRPC server.
// Client certificates are required and verified against the trust anchors of
// trustBundler. The call blocks while the server is serving.
func (s *server) Run(port int, trustBundler ca.TrustRootBundler) error {
	listenAddr := fmt.Sprintf(":%d", port)
	listener, listenErr := net.Listen("tcp", listenAddr)
	if listenErr != nil {
		return fmt.Errorf("could not listen on %s: %w", listenAddr, listenErr)
	}

	// Create the gRPC server and register the CA service on it.
	s.srv = grpc.NewServer(s.tlsServerOption(trustBundler))
	sentryv1pb.RegisterCAServer(s.srv, s)

	// Serve on the listener.
	serveErr := s.srv.Serve(listener)
	if serveErr == nil {
		return nil
	}
	return fmt.Errorf("grpc serve error: %w", serveErr)
}

trustBundler 是从 sentry.go 中传递过来,后面详细展开。

关闭 CA server

// Shutdown gracefully stops the gRPC server; it does nothing when Run has not
// created the server yet.
func (s *server) Shutdown() {
	if s.srv == nil {
		return
	}
	// GracefulStop is used rather than an immediate stop.
	s.srv.GracefulStop()
}

客户端安全

tlsServerOption() 方法,为客户端连接准备 tls 相关的选项:

// tlsServerOption builds the gRPC server option carrying the TLS credentials:
// client certificates are verified against the trust anchors of trustBundler
// and the server certificate is supplied lazily through GetCertificate.
func (s *server) tlsServerOption(trustBundler ca.TrustRootBundler) grpc.ServerOption {
	cp := trustBundler.GetTrustAnchors()

	//nolint:gosec
	config := &tls.Config{
		ClientCAs: cp,
		// Require cert verification
		ClientAuth: tls.RequireAndVerifyClientCert,
		GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
			if s.certificate == nil || needsRefresh(s.certificate, serverCertExpiryBuffer) {
				// No server certificate yet, or the cached one is due for a
				// refresh: issue a new one and cache it.
				cert, err := s.getServerCertificate()
				if err != nil {
					monitoring.ServerCertIssueFailed("server_cert")
					log.Error(err)
					return nil, fmt.Errorf("failed to get TLS server certificate: %w", err)
				}
				s.certificate = cert
			}
			return s.certificate, nil
		},
	}
	return grpc.Creds(credentials.NewTLS(config))
}

needsRefresh() 方法的实现:

// needsRefresh reports whether cert should be replaced: either its parsed leaf
// certificate is missing, or the leaf expires within expiryBuffer from now.
func needsRefresh(cert *tls.Certificate, expiryBuffer time.Duration) bool {
	leaf := cert.Leaf
	if leaf == nil {
		// Without a parsed leaf the expiry cannot be checked, so refresh.
		return true
	}

	// Check if the leaf certificate is about to expire. The caller-supplied
	// buffer is honored instead of a hard-coded constant.
	return leaf.NotAfter.Add(-expiryBuffer).Before(time.Now().UTC())
}
const (
	// serverCertExpiryBuffer is passed to needsRefresh for the server TLS
	// certificate: 15 minutes.
	serverCertExpiryBuffer = time.Minute * 15
)

getServerCertificate() 方法负责生成服务器端的证书:

// getServerCertificate issues the TLS certificate of the CA server itself.
// It generates a fresh CSR and private key, has the CSR signed by the
// certificate authority with a lifetime that ends at the issuer certificate's
// expiry, and returns the key pair made of the signed certificate followed by
// the issuer certificate and, when present, the root certificate.
func (s *server) getServerCertificate() (*tls.Certificate, error) {
	csrPEM, keyPEM, genErr := csr.GenerateCSR("", false)
	if genErr != nil {
		return nil, genErr
	}

	// The server certificate lives until the issuer certificate expires.
	requestedAt := time.Now().UTC()
	issuerExpiry := s.certAuth.GetCACertBundle().GetIssuerCertExpiry()
	if issuerExpiry == nil {
		return nil, errors.New("could not find expiration in issuer certificate")
	}

	signed, signErr := s.certAuth.SignCSR(csrPEM, s.certAuth.GetCACertBundle().GetTrustDomain(), nil, issuerExpiry.Sub(requestedAt), false)
	if signErr != nil {
		return nil, signErr
	}

	// Chain: signed certificate, issuer certificate, optional root certificate.
	chainPEM := append(signed.CertPEM, s.certAuth.GetCACertBundle().GetIssuerCertPem()...)
	if rootPEM := s.certAuth.GetCACertBundle().GetRootCertPem(); len(rootPEM) > 0 {
		chainPEM = append(chainPEM, rootPEM...)
	}

	keyPair, pairErr := tls.X509KeyPair(chainPEM, keyPEM)
	if pairErr != nil {
		return nil, pairErr
	}
	return &keyPair, nil
}

更多细节要看 certAuth.SignCSR() 方法的实现。

签署证书

SignCertificate() 方法处理从 dapr sidecar 发起的 CSR 请求。这个方法接收带有 identity 和初始证书的请求,并为调用者返回包括信任链在内的签名证书和过期时间。

// SignCertificate handles CSR requests originating from Dapr sidecars.
// The method receives a request with an identity and initial cert and returns
// A signed certificate including the trust chain to the caller along with an expiry date.
//
// The request is processed in this order: parse the PEM CSR, validate the CSR,
// validate the requester identity, sign the CSR, then assemble the response.
// Every failure is logged and returned; failures up to the signing step are
// also reported to monitoring.
func (s *server) SignCertificate(ctx context.Context, req *sentryv1pb.SignCertificateRequest) (*sentryv1pb.SignCertificateResponse, error) {
	monitoring.CertSignRequestReceived()

	csrPem := req.GetCertificateSigningRequest()

	// Parse the CSR carried by the request.
	csr, err := certs.ParsePemCSR(csrPem)
	if err != nil {
		err = fmt.Errorf("cannot parse certificate signing request pem: %w", err)
		log.Error(err)
		monitoring.CertSignFailed("cert_parse")
		return nil, err
	}

	// Validate the CSR.
	err = s.certAuth.ValidateCSR(csr)
	if err != nil {
		err = fmt.Errorf("error validating csr: %w", err)
		log.Error(err)
		monitoring.CertSignFailed("cert_validation")
		return nil, err
	}

	// Validate the identity of the requester.
	err = s.validator.Validate(req.GetId(), req.GetToken(), req.GetNamespace())
	if err != nil {
		err = fmt.Errorf("error validating requester identity: %w", err)
		log.Error(err)
		monitoring.CertSignFailed("req_id_validation")
		return nil, err
	}

	// Sign the certificate; the identity bundle is built from the CSR common
	// name and the namespace and trust domain of the request.
	identity := identity.NewBundle(csr.Subject.CommonName, req.GetNamespace(), req.GetTrustDomain())
	signed, err := s.certAuth.SignCSR(csrPem, csr.Subject.CommonName, identity, -1, false)
	if err != nil {
		err = fmt.Errorf("error signing csr: %w", err)
		log.Error(err)
		monitoring.CertSignFailed("cert_sign")
		return nil, err
	}

	// Collect the data to return: the signed certificate is followed by the
	// issuer certificate and, when present, the root certificate.
	certPem := signed.CertPEM
	issuerCert := s.certAuth.GetCACertBundle().GetIssuerCertPem()
	rootCert := s.certAuth.GetCACertBundle().GetRootCertPem()

	certPem = append(certPem, issuerCert...)
	if len(rootCert) > 0 {
		certPem = append(certPem, rootCert...)
	}

	if len(certPem) == 0 {
		err = errors.New("insufficient data in certificate signing request, no certs signed")
		log.Error(err)
		monitoring.CertSignFailed("insufficient_data")
		return nil, err
	}

	expiry := timestamppb.New(signed.Certificate.NotAfter)
	if err = expiry.CheckValid(); err != nil {
		return nil, fmt.Errorf("could not validate certificate validity: %w", err)
	}

	// Assemble the response.
	resp := &sentryv1pb.SignCertificateResponse{
		WorkloadCertificate:    certPem,
		TrustChainCertificates: [][]byte{issuerCert, rootCert},
		ValidUntil:             expiry,
	}

	monitoring.CertSignSucceed()

	return resp, nil
}

总结

实现很简单,就是涉及到证书的各种操作,需要有相关的背景知识。

5 - csr代码

处理 csr 的相关逻辑

csr 相关的逻辑实现在文件 pkg/sentry/csr/csr.go 中。

准备工作

常量定义

// PEM block types used by encode for private keys, CSRs and certificates.
const (
	blockTypeECPrivateKey = "EC PRIVATE KEY" // EC private key
	blockTypePrivateKey   = "PRIVATE KEY"    // PKCS#8 private key
	encodeMsgCSR          = "CERTIFICATE REQUEST" // certificate signing request
	encodeMsgCert         = "CERTIFICATE"         // certificate
)

全局变量定义

// The OID for the SAN extension (http://www.alvestrand.no/objectid/2.5.29.17.html)
var oidSubjectAlternativeName = asn1.ObjectIdentifier{2, 5, 29, 17}

实现

生成CSR

GenerateCSR() 方法创建 X.509 certificate sign request 和私钥:

// GenerateCSR produces a new EC private key together with an X.509 certificate
// signing request signed by that key.
// It returns the PEM-encoded CSR and the PEM-encoded private key; the key is
// encoded as PKCS#8 when pkcs8 is true and as an EC private key otherwise.
func GenerateCSR(org string, pkcs8 bool) ([]byte, []byte, error) {
	// Fresh EC private key.
	privateKey, keyErr := certs.GenerateECPrivateKey()
	if keyErr != nil {
		return nil, nil, fmt.Errorf("unable to generate private keys: %w", keyErr)
	}

	// CSR template carrying the organization.
	template, templateErr := genCSRTemplate(org)
	if templateErr != nil {
		return nil, nil, fmt.Errorf("error generating csr template: %w", templateErr)
	}

	// DER-encoded certificate request.
	requestDER, createErr := x509.CreateCertificateRequest(rand.Reader, template, privateKey)
	if createErr != nil {
		return nil, nil, fmt.Errorf("failed to create CSR: %w", createErr)
	}

	// PEM-encode the request and the key.
	return encode(true, requestDER, privateKey, pkcs8)
}

生成 csr 模版的实现,只设置了Organization :

// genCSRTemplate returns a certificate request template whose subject only
// carries the given organization. The returned error is always nil.
func genCSRTemplate(org string) (*x509.CertificateRequest, error) {
	subject := pkix.Name{Organization: []string{org}}
	template := &x509.CertificateRequest{Subject: subject}
	return template, nil
}

编码证书的实现代码:

func encode(csr bool, csrOrCert []byte, privKey *ecdsa.PrivateKey, pkcs8 bool) ([]byte, []byte, error) {
	// 判断是 "CERTIFICATE" 还是 "CERTIFICATE REQUEST"
  encodeMsg := encodeMsgCert
	if csr {
		encodeMsg = encodeMsgCSR
	}
  // 执行编码
	csrOrCertPem := pem.EncodeToMemory(&pem.Block{Type: encodeMsg, Bytes: csrOrCert})

	var encodedKey, privPem []byte
	var err error

	if pkcs8 {
    // 如果是 pkcs8,需要将私钥编码为 PKCS8 私钥 / "PRIVATE KEY"
		if encodedKey, err = x509.MarshalPKCS8PrivateKey(privKey); err != nil {
			return nil, nil, err
		}
    // 将上面的 PKCS8 私钥编码到内存
		privPem = pem.EncodeToMemory(&pem.Block{Type: blockTypePrivateKey, Bytes: encodedKey})
	} else {
    // 不是 pkcs8 的话,需要将私钥编码为 EC 私钥 / "EC PRIVATE KEY"
		encodedKey, err = x509.MarshalECPrivateKey(privKey)
		if err != nil {
			return nil, nil, err
		}
		privPem = pem.EncodeToMemory(&pem.Block{Type: blockTypeECPrivateKey, Bytes: encodedKey})
	}
	return csrOrCertPem, privPem, nil
}

生成基础证书

generateBaseCert() 方法返回一个基本的非CA证书,该证书可以通过添加 subject、key usage 和附加属性成为一个工作负载或CA证书:

// generateBaseCert builds the common skeleton of a certificate: a random
// serial number, the validity window and the public key.
// Callers turn it into a workload or CA certificate by adding the subject, key
// usages and further properties.
// NotBefore is moved back by skew to tolerate clock skew; NotAfter is ttl from
// now.
func generateBaseCert(ttl, skew time.Duration, publicKey interface{}) (*x509.Certificate, error) {
	serial, err := newSerialNumber()
	if err != nil {
		return nil, err
	}

	issuedAt := time.Now().UTC()

	base := new(x509.Certificate)
	base.SerialNumber = serial
	// Allow for clock skew with the NotBefore validity bound.
	base.NotBefore = issuedAt.Add(-skew)
	base.NotAfter = issuedAt.Add(ttl)
	base.PublicKey = publicKey
	return base, nil
}

创建一个新的序列号的代码实现细节:

// newSerialNumber returns a cryptographically random serial number in the
// range [0, 1<<128).
func newSerialNumber() (*big.Int, error) {
	// Exclusive upper bound: 1 << 128.
	upperBound := big.NewInt(1)
	upperBound.Lsh(upperBound, 128)

	serial, err := rand.Int(rand.Reader, upperBound)
	if err == nil {
		return serial, nil
	}
	return nil, fmt.Errorf("error generating serial number: %w", err)
}

生成基础证书是生成其他证书的第一步。

生成 Root Cert CSR

GenerateRootCertCSR() 方法返回 CA root cert x509 证书:

// GenerateRootCertCSR builds the x509 certificate template of a CA root
// certificate for the given organization and common name.
// The template is valid for ttl (with NotBefore moved back by skew), is marked
// as a CA, may sign certificates and uses ECDSA with SHA-256.
func GenerateRootCertCSR(org, cn string, publicKey interface{}, ttl, skew time.Duration) (*x509.Certificate, error) {
	// Start from the base certificate.
	root, err := generateBaseCert(ttl, skew, publicKey)
	if err != nil {
		return nil, err
	}

	// Identity of the root certificate.
	root.Subject = pkix.Name{CommonName: cn, Organization: []string{org}}
	root.DNSNames = []string{cn}

	// CA constraints, usages and signature algorithm.
	root.IsCA = true
	root.BasicConstraintsValid = true
	root.KeyUsage = x509.KeyUsageCertSign
	root.ExtKeyUsage = append(root.ExtKeyUsage, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth)
	root.SignatureAlgorithm = x509.ECDSAWithSHA256
	return root, nil
}

生成 CSR Certificate

GenerateCSRCertificate() 方法 返回 x509 Certificate,输入为 CSR / 签名证书 / 公钥 / 签名私钥 和持续时间:

// GenerateCSRCertificate returns an x509 Certificate from a CSR, signing cert, public key, signing private key and duration.
//
// The result is the DER encoding returned by x509.CreateCertificate. When
// identityBundle is not nil, a critical Subject Alternative Name extension is
// added that carries the SPIFFE ID and a "<subject>.<namespace>.svc.cluster.local"
// DNS name.
func GenerateCSRCertificate(csr *x509.CertificateRequest, subject string, identityBundle *identity.Bundle, signingCert *x509.Certificate, publicKey interface{}, signingKey crypto.PrivateKey,
	ttl, skew time.Duration, isCA bool,
) ([]byte, error) {
	// Start from the base certificate.
	cert, err := generateBaseCert(ttl, skew, publicKey)
	if err != nil {
		return nil, fmt.Errorf("error generating csr certificate: %w", err)
	}
	// CA certificates may sign certificates and CRLs; other certificates are
	// usable for both server and client authentication.
	if isCA {
		cert.KeyUsage = x509.KeyUsageCertSign | x509.KeyUsageCRLSign
	} else {
		cert.KeyUsage = x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment
		cert.ExtKeyUsage = append(cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth)
	}

	// Only the "cluster.local" subject gets a subject name and DNS name.
	if subject == "cluster.local" {
		cert.Subject = pkix.Name{
			CommonName: subject,
		}
		cert.DNSNames = []string{subject}
	}

	cert.Issuer = signingCert.Issuer
	cert.IsCA = isCA
	cert.IPAddresses = csr.IPAddresses
	cert.Extensions = csr.Extensions
	cert.BasicConstraintsValid = true
	cert.SignatureAlgorithm = csr.SignatureAlgorithm

	if identityBundle != nil {
		spiffeID, err := identity.CreateSPIFFEID(identityBundle.TrustDomain, identityBundle.Namespace, identityBundle.ID)
		if err != nil {
			return nil, fmt.Errorf("error generating spiffe id: %w", err)
		}

		// Raw SAN entries: the SPIFFE ID and the cluster-local DNS name.
		// NOTE(review): asn1.TagOID (6) and 2 appear to be used as the
		// context-specific GeneralName tags for a URI and a DNS name
		// (RFC 5280) — confirm.
		rv := []asn1.RawValue{
			{
				Bytes: []byte(spiffeID),
				Class: asn1.ClassContextSpecific,
				Tag:   asn1.TagOID,
			},
			{
				Bytes: []byte(fmt.Sprintf("%s.%s.svc.cluster.local", subject, identityBundle.Namespace)),
				Class: asn1.ClassContextSpecific,
				Tag:   2,
			},
		}

		b, err := asn1.Marshal(rv)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal asn1 raw value for spiffe id: %w", err)
		}

		cert.ExtraExtensions = append(cert.ExtraExtensions, pkix.Extension{
			Id:       oidSubjectAlternativeName,
			Value:    b,
			Critical: true, // According to x509 and SPIFFE specs, a SubjAltName extension must be critical if subject name and DNS are not present.
		})
	}

	return x509.CreateCertificate(rand.Reader, cert, signingCert, publicKey, signingKey)
}

这里涉及很多 x509 相关的领域知识。

6 - certs代码

处理 certs 的相关逻辑

certs 相关的逻辑实现在文件 pkg/sentry/certs/certs.go 中。

准备工作

常量定义

// PEM block types recognized when decoding certificates and private keys.
const (
	BlockTypeCertificate     = "CERTIFICATE"     // x509 certificate
	BlockTypeECPrivateKey    = "EC PRIVATE KEY"  // EC private key
	BlockTypePKCS1PrivateKey = "RSA PRIVATE KEY" // PKCS#1 private key
	BlockTypePKCS8PrivateKey = "PRIVATE KEY"     // PKCS#8 private key
)

备注:这里的常量定义和 csr.go 中的有部分重复。

结构体定义

Credentials 结构体包含一个证书和一个 私钥:

// Credentials holds a certificate and private key.
type Credentials struct {
	PrivateKey  crypto.PrivateKey
	Certificate *x509.Certificate
}

实现

解码 PEM key

DecodePEMKey() 接收一个 PEM key 字节数组并返回一个代表 RSA 或 EC 私钥 的对象:

// DecodePEMKey decodes the first PEM block of key and parses it as a private
// key according to the block type: EC, PKCS#1 or PKCS#8.
// It returns an error when key holds no PEM block, when the block type is not
// one of the supported ones, or when parsing fails.
func DecodePEMKey(key []byte) (crypto.PrivateKey, error) {
	pemBlock, _ := pem.Decode(key)
	if pemBlock == nil {
		return nil, errors.New("key is not PEM encoded")
	}

	// Dispatch on the PEM block type.
	switch der := pemBlock.Bytes; pemBlock.Type {
	case BlockTypeECPrivateKey:
		return x509.ParseECPrivateKey(der)
	case BlockTypePKCS1PrivateKey:
		return x509.ParsePKCS1PrivateKey(der)
	case BlockTypePKCS8PrivateKey:
		return x509.ParsePKCS8PrivateKey(der)
	default:
		return nil, fmt.Errorf("unsupported block type %s", pemBlock.Type)
	}
}

解码 PEM 证书

DecodePEMCertificates() 方法接收一个 PEM 编码的 x509 证书字节数组,并以 x509.Certificate 对象切片的方式返回其中的所有证书:

// DecodePEMCertificates parses every certificate found in the PEM-encoded
// input and returns them in input order.
// The input is consumed block by block through decodeCertificatePEM; the first
// error aborts the decoding. The returned slice is empty (not nil) when no
// certificate was found.
func DecodePEMCertificates(crtb []byte) ([]*x509.Certificate, error) {
	parsed := []*x509.Certificate{}
	// The input may hold several certificates.
	remaining := crtb
	for len(remaining) > 0 {
		cert, rest, err := decodeCertificatePEM(remaining)
		if err != nil {
			return nil, err
		}
		remaining = rest
		if cert == nil {
			// Not a certificate: nothing to collect.
			continue
		}
		parsed = append(parsed, cert)
	}
	return parsed, nil
}

decodeCertificatePEM() 方法解码单个 pem 证书:

// decodeCertificatePEM decodes the next PEM block of crtb.
//
// It returns the parsed certificate, the remainder of the input after the
// decoded block, and an error:
//   - when no PEM block is found, the error is "invalid PEM certificate" and the
//     remainder is the whole input;
//   - when the block is not a certificate, the certificate and the error are nil
//     and the remainder is returned so that the caller can keep decoding the
//     blocks that follow (returning a nil remainder here would silently drop
//     every certificate located after a non-certificate block);
//   - otherwise the block is parsed as an x509 certificate.
func decodeCertificatePEM(crtb []byte) (*x509.Certificate, []byte, error) {
	// pem.Decode finds the next PEM formatted block (certificate, private key,
	// ...) and returns it with the rest of the input. The rest is empty once
	// everything has been consumed; when no PEM data is found the block is nil
	// and the rest is the whole input.
	block, rest := pem.Decode(crtb)
	if block == nil {
		return nil, rest, errors.New("invalid PEM certificate")
	}
	if block.Type != BlockTypeCertificate {
		return nil, rest, nil
	}
	// Parse the x509 certificate.
	c, err := x509.ParseCertificate(block.Bytes)
	return c, rest, err
}

生成基础证书的第一步就是生成其他证书。

从文件中获取 PEM 凭证

PEMCredentialsFromFiles() 方法接收 PEM 编码的证书和私钥内容(字节数组;虽然方法名带有 Files,但参数并不是文件路径),并返回一个经过验证的 Credentials 包装器:

// PEMCredentialsFromFiles builds Credentials from a PEM-encoded certificate
// and a PEM-encoded private key.
// When certPem holds several certificates only the first one is used. An error
// is returned when decoding fails, when no certificate is found, or when the
// private key does not match the public key of the certificate.
func PEMCredentialsFromFiles(certPem, keyPem []byte) (*Credentials, error) {
	// Private key.
	privateKey, err := DecodePEMKey(keyPem)
	if err != nil {
		return nil, err
	}

	// Certificates.
	chain, err := DecodePEMCertificates(certPem)
	switch {
	case err != nil:
		return nil, err
	case len(chain) == 0:
		return nil, errors.New("no certificates found")
	}

	// The key must belong to the first certificate.
	leaf := chain[0]
	if !matchCertificateAndKey(privateKey, leaf) {
		return nil, errors.New("error validating credentials: public and private key pair do not match")
	}

	return &Credentials{PrivateKey: privateKey, Certificate: leaf}, nil
}

matchCertificateAndKey() 方法检查私钥和证书的 PublicKey 是否匹配 :

// matchCertificateAndKey reports whether the private key pk belongs to the
// public key stored in cert.
// ECDSA, RSA and Ed25519 keys are supported: the certificate must declare the
// matching public key algorithm, hold a public key of the matching type, and
// that key must equal the public part of pk. Any other key type yields false.
func matchCertificateAndKey(pk any, cert *x509.Certificate) bool {
	switch private := pk.(type) {
	case *ecdsa.PrivateKey:
		certKey, sameKind := cert.PublicKey.(*ecdsa.PublicKey)
		return cert.PublicKeyAlgorithm == x509.ECDSA && sameKind && certKey.Equal(private.Public())
	case *rsa.PrivateKey:
		certKey, sameKind := cert.PublicKey.(*rsa.PublicKey)
		return cert.PublicKeyAlgorithm == x509.RSA && sameKind && certKey.Equal(private.Public())
	case ed25519.PrivateKey:
		certKey, sameKind := cert.PublicKey.(ed25519.PublicKey)
		return cert.PublicKeyAlgorithm == x509.Ed25519 && sameKind && certKey.Equal(private.Public())
	}
	return false
}

从 PEM 创建 cert pool

CertPoolFromPEM() 方法从一个 PEM 编码的证书字符串返回一个 CertPool

// CertPoolFromPEM builds a certificate pool out of the certificates found in
// the PEM-encoded input.
// It returns the decoding error, if any, and an error when the input holds no
// certificate.
func CertPoolFromPEM(certPem []byte) (*x509.CertPool, error) {
	decoded, err := DecodePEMCertificates(certPem)
	switch {
	case err != nil:
		return nil, err
	case len(decoded) == 0:
		return nil, errors.New("no certificates found")
	default:
		return certPoolFromCertificates(decoded), nil
	}
}

certPoolFromCertificates() 方法的实现很简单:

// certPoolFromCertificates returns a new certificate pool holding every
// certificate of certs; the pool is empty when certs is empty.
func certPoolFromCertificates(certs []*x509.Certificate) *x509.CertPool {
	trusted := x509.NewCertPool()
	for i := range certs {
		trusted.AddCert(certs[i])
	}
	return trusted
}

解析 PEM CSR

ParsePemCSR() 使用给定的 PEM 编码的证书签名请求构建一个 x509 证书请求:

// ParsePemCSR decodes the first PEM block of csrPem and parses it as an x509
// certificate signing request.
// It returns an error when csrPem holds no PEM block or when the block cannot
// be parsed as a certificate request.
func ParsePemCSR(csrPem []byte) (*x509.CertificateRequest, error) {
	pemBlock, _ := pem.Decode(csrPem)
	if pemBlock == nil {
		return nil, errors.New("certificate signing request is not properly encoded")
	}

	request, parseErr := x509.ParseCertificateRequest(pemBlock.Bytes)
	if parseErr == nil {
		return request, nil
	}
	return nil, fmt.Errorf("failed to parse X.509 certificate signing request: %w", parseErr)
}

生成 EC 私钥

GenerateECPrivateKey() 方法返回一个新的 EC(椭圆曲线,P-256)私钥:

// GenerateECPrivateKey returns a new ECDSA private key on the P-256 curve.
func GenerateECPrivateKey() (*ecdsa.PrivateKey, error) {
	curve := elliptic.P256()
	return ecdsa.GenerateKey(curve, rand.Reader)
}

这里涉及很多 x509 相关的领域知识。

7 - certs store代码

实现 certs 的存储 store

certs 相关的存储逻辑实现在文件 pkg/sentry/certs/store.go 中。

准备工作

常量定义

const (
	defaultSecretNamespace = "default"
)

实现

存储凭证

StoreCredentials() 方法将 trust bundle 存储在 Kubernetes secret store 或者本地磁盘上,取决于托管的平台:

// StoreCredentials persists the trust bundle (root certificate, issuer
// certificate and issuer key) on the hosting platform: in Kubernetes through
// storeKubernetes, otherwise in the files configured in conf through
// storeSelfhosted.
func StoreCredentials(ctx context.Context, conf config.SentryConfig, rootCertPem, issuerCertPem, issuerKeyPem []byte) error {
	if !config.IsKubernetesHosted() {
		// Self-hosted.
		return storeSelfhosted(rootCertPem, issuerCertPem, issuerKeyPem, conf.RootCertPath, conf.IssuerCertPath, conf.IssuerKeyPath)
	}
	// Hosted in Kubernetes.
	return storeKubernetes(ctx, rootCertPem, issuerCertPem, issuerKeyPem)
}

在 Kubernetes 中的存储

storeKubernetes() 方法将凭证存储在 Kubernetes secret store 中:

// 部分常量定义在 consts.go 中
const (
	TrustBundleK8sSecretName = "dapr-trust-bundle" /* #nosec */
)

// storeKubernetes writes the root certificate, issuer certificate and issuer
// key into the trust bundle secret of the current namespace.
//
// The secret must already exist: it is fetched and then updated, never
// created. Any failure to obtain the client, fetch the secret, or update it is
// returned to the caller.
func storeKubernetes(ctx context.Context, rootCertPem, issuerCertPem, issuerCertKey []byte) error {
	kubeClient, err := kubernetes.GetClient()
	if err != nil {
		return err
	}

	namespace := getNamespace()
	secrets := kubeClient.CoreV1().Secrets(namespace)

	// Use the caller's context so cancellation also covers the lookup, and stop
	// on every lookup error (not only NotFound): continuing would update a
	// secret that was never actually read.
	secret, err := secrets.Get(ctx, consts.TrustBundleK8sSecretName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get secret %w", err)
	}

	// Replace the secret's data with the three credential entries, keyed by
	// their configured file names.
	secret.Data = map[string][]byte{
		credentials.RootCertFilename:   rootCertPem,
		credentials.IssuerCertFilename: issuerCertPem,
		credentials.IssuerKeyFilename:  issuerCertKey,
	}

	// We update and not create because sentry expects a secret to already exist
	if _, err = secrets.Update(ctx, secret, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("failed saving secret to kubernetes: %w", err)
	}
	return nil
}

其中 getNamespace() 读取环境变量 “NAMESPACE” 来获知当前的命名空间,缺省值为 “default”:

const (
	// defaultSecretNamespace is the namespace used when NAMESPACE is not set.
	defaultSecretNamespace = "default"
)

// getNamespace returns the value of the NAMESPACE environment variable, or
// defaultSecretNamespace when that variable is unset or empty.
func getNamespace() string {
	if fromEnv := os.Getenv("NAMESPACE"); fromEnv != "" {
		return fromEnv
	}
	return defaultSecretNamespace
}

自托管时的存储

storeSelfhosted() 方法将凭证存储在本地文件中:

// Excerpt of StoreCredentials: the self-hosted branch hands the three PEM
// payloads to storeSelfhosted together with the three paths from the config.
func StoreCredentials(...) {
  ......
	return storeSelfhosted(rootCertPem, issuerCertPem, issuerKeyPem, conf.RootCertPath, conf.IssuerCertPath, conf.IssuerKeyPath)
  }

// storeSelfhosted writes the root certificate, issuer certificate and issuer
// private key to the given file paths, in that order.
//
// The two certificates are public material and are created with mode 0o644.
// The issuer key is secret and is created with mode 0o600 so that other local
// users cannot read it. os.WriteFile applies the mode only when it creates the
// file; an already existing file keeps its current permissions.
//
// It stops at the first failed write and returns an error naming that path.
// Files written before the failure are left in place.
func storeSelfhosted(rootCertPem, issuerCertPem, issuerKeyPem []byte, rootCertPath, issuerCertPath, issuerKeyPath string) error {
	files := []struct {
		path string
		data []byte
		perm os.FileMode
	}{
		{rootCertPath, rootCertPem, 0o644},
		{issuerCertPath, issuerCertPem, 0o644},
		{issuerKeyPath, issuerKeyPem, 0o600},
	}

	for _, f := range files {
		if err := os.WriteFile(f.path, f.data, f.perm); err != nil {
			return fmt.Errorf("failed saving file to %s: %w", f.path, err)
		}
	}
	return nil
}

rootCertPem / issuerCertPem / issuerKeyPem 分别保存到 conf.RootCertPath / conf.IssuerCertPath / conf.IssuerKeyPath 这三个 sentry 配置指定的文件路径中。

回顾一下 main.go  中读取相关配置的代码实现:

const (
	// defaultCredentialsPath is the default value of the "issuer-credentials"
	// flag: the directory that holds the issuer credential files.
	defaultCredentialsPath = "/var/run/dapr/credentials"
)

var (
	// RootCertFilename is the filename that holds the root certificate.
	// It is also the default of the "issuer-ca-filename" flag, which overrides it.
	RootCertFilename = "ca.crt"
	// IssuerCertFilename is the filename that holds the issuer certificate.
	// It is also the default of the "issuer-certificate-filename" flag, which overrides it.
	IssuerCertFilename = "issuer.crt"
	// IssuerKeyFilename is the filename that holds the issuer key.
	// It is also the default of the "issuer-key-filename" flag, which overrides it.
	IssuerKeyFilename = "issuer.key"
)

// Excerpt of main: shows how the three credential file paths are derived.
func main() {
  ......
	// Directory holding the issuer credentials; defaults to defaultCredentialsPath.
credsPath := flag.String("issuer-credentials", defaultCredentialsPath, "Path to the credentials directory holding the issuer data")	
	// The three file names default to the package-level values and can be
	// overridden on the command line.
  flag.StringVar(&credentials.RootCertFilename, "issuer-ca-filename", credentials.RootCertFilename, "Certificate Authority certificate filename")
	flag.StringVar(&credentials.IssuerCertFilename, "issuer-certificate-filename", credentials.IssuerCertFilename, "Issuer certificate filename")
	flag.StringVar(&credentials.IssuerKeyFilename, "issuer-key-filename", credentials.IssuerKeyFilename, "Issuer private key filename")

	// Each full path is the credentials directory joined with a file name.
	issuerCertPath := filepath.Join(*credsPath, credentials.IssuerCertFilename)
	issuerKeyPath := filepath.Join(*credsPath, credentials.IssuerKeyFilename)
	rootCertPath := filepath.Join(*credsPath, credentials.RootCertFilename)

	......
	// The resulting paths are stored on the sentry config.
	config.IssuerCertPath = issuerCertPath
	config.IssuerKeyPath = issuerKeyPath
	config.RootCertPath = rootCertPath
  ......
}

可见默认是使用 “/var/run/dapr/credentials” 目录下的这三个文件:

  • “ca.crt”
  • “issuer.crt”
  • “issuer.key”

8 - metrics代码

sentry 中的 metrics 实现

metrics 相关的实现在文件 pkg/sentry/monitoring/metrics.go 中。

准备工作

变量定义

定义了一些和 metrics 相关的变量:

var (
	// Metrics definitions.

	// csrReceivedTotal counts the CSRs received.
	csrReceivedTotal = stats.Int64(
		"sentry/cert/sign/request_received_total",
		"The number of CSRs received.",
		stats.UnitDimensionless)
	// certSignSuccessTotal counts the certificate issuances that succeeded.
	certSignSuccessTotal = stats.Int64(
		"sentry/cert/sign/success_total",
		"The number of certificates issuances that have succeeded.",
		stats.UnitDimensionless)
	// certSignFailedTotal counts the errors that occurred while signing a CSR.
	certSignFailedTotal = stats.Int64(
		"sentry/cert/sign/failure_total",
		"The number of errors occurred when signing the CSR.",
		stats.UnitDimensionless)
	// serverTLSCertIssueFailedTotal counts server TLS certificate issuance failures.
	serverTLSCertIssueFailedTotal = stats.Int64(
		"sentry/servercert/issue_failed_total",
		"The number of server TLS certificate issuance failures.",
		stats.UnitDimensionless)
	// issuerCertChangedTotal counts issuer certificate updates (issuer cert or key changed).
	issuerCertChangedTotal = stats.Int64(
		"sentry/issuercert/changed_total",
		"The number of issuer cert updates, when issuer cert or key is changed",
		stats.UnitDimensionless)
	// issuerCertExpiryTimestamp holds the unix timestamp, in seconds, at which
	// the issuer/root certificate expires.
	issuerCertExpiryTimestamp = stats.Int64(
		"sentry/issuercert/expiry_timestamp",
		"The unix timestamp, in seconds, when issuer/root cert will expire.",
		stats.UnitDimensionless)

	// Metrics Tags.

	// failedReasonKey is the "reason" tag key attached to the failure metrics.
	failedReasonKey = tag.MustNewKey("reason")
	// noKeys is the empty tag key list used for views without tags.
	noKeys          = []tag.Key{}
)

目前总共有 6 个 metrics 指标:

  • csrReceivedTotal:接收到的 csr 的数量
  • certSignSuccessTotal:签署成功的证书数量
  • certSignFailedTotal:签署失败的证书数量
  • serverTLSCertIssueFailedTotal:服务器TLS证书发放失败的次数。
  • issuerCertChangedTotal:当签发者的证书或密钥发生变更时,签发者证书更新的次数
  • issuerCertExpiryTimestamp:签发者/根证书过期时间的 unix 时间戳,单位是秒。

初始化

初始化 metrics:

// InitMetrics registers one view for each of the six sentry measures.
//
// The five counters use a count aggregation; the two failure counters are
// additionally tagged by failedReasonKey. The expiry timestamp keeps only its
// last recorded value. The error from view.Register is returned as is.
func InitMetrics() error {
	views := []*view.View{
		diagUtils.NewMeasureView(csrReceivedTotal, noKeys, view.Count()),
		diagUtils.NewMeasureView(certSignSuccessTotal, noKeys, view.Count()),
		diagUtils.NewMeasureView(certSignFailedTotal, []tag.Key{failedReasonKey}, view.Count()),
		diagUtils.NewMeasureView(serverTLSCertIssueFailedTotal, []tag.Key{failedReasonKey}, view.Count()),
		diagUtils.NewMeasureView(issuerCertChangedTotal, noKeys, view.Count()),
		diagUtils.NewMeasureView(issuerCertExpiryTimestamp, noKeys, view.LastValue()),
	}
	return view.Register(views...)
}

收集 metrics

csr 相关

CertSignRequestReceived() 对接收到的 csr 数量进行计数:

// CertSignRequestReceived adds one to the count of received CSRs.
func CertSignRequestReceived() {
	ctx := context.Background()
	stats.Record(ctx, csrReceivedTotal.M(1))
}

另外 CertSignSucceed() 会对处理成功的情况进行计数:

// CertSignSucceed adds one to the count of successfully signed certificates.
func CertSignSucceed() {
	ctx := context.Background()
	stats.Record(ctx, certSignSuccessTotal.M(1))
}

而 CertSignFailed() 则会对处理失败的情况进行计数:

func CertSignFailed(reason string) {
	stats.RecordWithTags(
		context.Background(),
		diagUtils.WithTags(certSignFailedTotal.Name(), failedReasonKey, reason),
		certSignFailedTotal.M(1))
}

三者的调用点为 server.go 中的 SignCertificate() 函数,这个函数负责处理 csr 请求:

// Excerpt of SignCertificate: shows where the three CSR metrics are recorded.
func (s *server) SignCertificate(ctx context.Context, req *sentryv1pb.SignCertificateRequest) (*sentryv1pb.SignCertificateResponse, error) {
	// Count on entry: this is the number of CSRs received.
	monitoring.CertSignRequestReceived()
  ......
  
	// Every error path records one failure before returning.
	if err != nil {
		monitoring.CertSignFailed("cert_parse")
		return nil, err
	}
  ......
	// When the CSR has been handled successfully, record one success.
  monitoring.CertSignSucceed()

	return resp, nil
}

证书有效期

IssuerCertExpiry() 方法记录 root cert 有效期的情况:

// IssuerCertExpiry records the certificate expiry time as a unix timestamp, in
// seconds, on the issuerCertExpiryTimestamp measure.
//
// A nil expiry is ignored: there is no timestamp to record, and dereferencing
// it would panic.
func IssuerCertExpiry(expiry *time.Time) {
	if expiry == nil {
		return
	}
	stats.Record(context.Background(), issuerCertExpiryTimestamp.M(expiry.Unix()))
}

调用点在 sentry.go 中的 createCAServer() 函数中:

// Excerpt of createCAServer: after the trust bundle is loaded or stored, the
// issuer certificate expiry is read from the CA bundle and recorded as a metric.
func (s *sentry) createCAServer(ctx context.Context) (ca.CertificateAuthority, identity.Validator) {
	certAuth, authorityErr := ca.NewCertificateAuthority(s.conf)
	trustStoreErr := certAuth.LoadOrStoreTrustBundle(ctx)
	......
	// Report when the issuer certificate expires.
	certExpiry := certAuth.GetCACertBundle().GetIssuerCertExpiry()
	monitoring.IssuerCertExpiry(certExpiry)
	......
	return certAuth, v
}

在 CA server 的创建过程中,会加载 trust bundle 并获取签发者证书的过期时间,然后在这里将该过期时间记录到 metrics 中。

服务器证书签发失败

ServerCertIssueFailed() 记录服务器证书签发失败。

// ServerCertIssueFailed adds one to the count of server TLS certificate
// issuance failures, tagged with the given failure reason.
//
// The view for this measure is registered with failedReasonKey, so the reason
// has to be attached when recording; otherwise the tag is always empty.
func ServerCertIssueFailed(reason string) {
	stats.RecordWithTags(
		context.Background(),
		diagUtils.WithTags(serverTLSCertIssueFailedTotal.Name(), failedReasonKey, reason),
		serverTLSCertIssueFailedTotal.M(1))
}

调用点在 server.go 中:


// Excerpt of Run: builds the TLS server option from the trust bundle and
// creates the gRPC server with it.
func (s *server) Run(ctx context.Context, port int, trustBundler ca.TrustRootBundler) error {
  ......
  // The TLS option built here is where the server certificate is obtained.
  tlsOpt := s.tlsServerOption(trustBundler)
  s.srv = grpc.NewServer(tlsOpt)
  ......
}

sentry server启动过程中,在启动 grpc server 时,需要获取 tls server 的参数,期间要获取 sentry server 的服务器端证书:

// Excerpt of tlsServerOption: builds the TLS configuration for the gRPC server.
// Client certificates are required and verified against the trust anchors.
func (s *server) tlsServerOption(trustBundler ca.TrustRootBundler) grpc.ServerOption {
	cp := trustBundler.GetTrustAnchors()

	config := &tls.Config{
		ClientCAs: cp,
		// Require cert verification
		ClientAuth: tls.RequireAndVerifyClientCert,
		// Fetch a server certificate when none is cached yet or the cached one
		// needs a refresh; a failure is counted as a metric, logged, and returned.
		GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
			if s.certificate == nil || needsRefresh(s.certificate, serverCertExpiryBuffer) {
				cert, err := s.getServerCertificate()
				if err != nil {
					monitoring.ServerCertIssueFailed("server_cert")
					log.Error(err)
					return nil, fmt.Errorf("failed to get TLS server certificate: %w", err)
				}
				s.certificate = cert
			}
	......
}

如果获取失败,则会记录这个失败信息。

发行者证书变更

IssuerCertChanged() 记录发行人凭证的变更:

// IssuerCertChanged adds one to the count of issuer certificate changes.
func IssuerCertChanged() {
	ctx := context.Background()
	stats.Record(ctx, issuerCertChangedTotal.M(1))
}

调用点在 main.go 中的 main() 函数中,sentry 在启动后会监视发行者证书(默认为 “/var/run/dapr/credentials” 下的 “issuer.crt” 文件):

// Excerpt of main: shows where the issuer-certificate-changed metric is
// recorded and where the filesystem watch that feeds issuerEvent is started.
func main() {
  ......
			func(ctx context.Context) error {
				select {
				case <-ctx.Done():
					return nil

				// A signal on issuerEvent means the issuer credentials changed.
				case <-issuerEvent:
					monitoring.IssuerCertChanged()
					log.Debug("received issuer credentials changed signal")
				......
	}
  ......
	// Watch for changes in the watchDir; the watcher is given issuerEvent.
	mngr.Add(func(ctx context.Context) error {
		log.Infof("starting watch on filesystem directory: %s", watchDir)
		return fswatcher.Watch(ctx, watchDir, issuerEvent)
	})
}

9 - sentry库的identity实现

identity

9.1 - identity.go

identity结构体定义

结构体定义

// Bundle contains all the elements needed to identify a workload across trust
// domains and namespaces.
type Bundle struct {
	// ID is the workload identifier.
	ID          string
	// Namespace is the namespace of the workload.
	Namespace   string
	// TrustDomain is the trust domain of the workload.
	TrustDomain string
}

其实就三个元素: ID / Namespace 以及 TrustDomain

NewBundle() 方法

NewBundle() 方法返回一个新的 identity Bundle。

// NewBundle returns a new identity Bundle built from id, namespace and
// trustDomain. It returns nil when namespace or trustDomain is empty.
func NewBundle(id, namespace, trustDomain string) *Bundle {
	// Empty namespace and trust domain result in an empty bundle
	hasScope := namespace != "" && trustDomain != ""
	if !hasScope {
		return nil
	}

	bundle := new(Bundle)
	bundle.ID = id
	bundle.Namespace = namespace
	bundle.TrustDomain = trustDomain
	return bundle
}

namespace 和 trustDomain 允许传入空值,但只要其中任意一个为空,NewBundle() 就会返回 nil,而不是一个 Bundle。

9.2 - validator.go

validator 接口定义

接口定义

Validator 通过使用 ID 和 token 来验证证书请求的身份

// Validator is used to validate the identity of a certificate requester by
// using an ID and token.
type Validator interface {
	// Validate returns nil when the id and token (and namespace) are accepted,
	// and an error otherwise.
	Validate(id, token, namespace string) error
}

9.3 - spiffe.go

创建 SPIFFE ID

CreateSPIFFEID 方法

CreateSPIFFEID() 方法从给定的 trustDomain, namespace, appID 创建符合 SPIFFE 标准的唯一ID:

// CreateSPIFFEID builds the identifier "spiffe://<trustDomain>/ns/<namespace>/<appID>".
//
// All three inputs must be non-empty. The trust domain must not contain ':'
// and must not exceed 255 bytes, and the resulting ID must not exceed 2048
// bytes. The first violated rule, checked in that order, is reported as the
// error.
func CreateSPIFFEID(trustDomain, namespace, appID string) (string, error) {
	switch {
	case trustDomain == "":
		return "", errors.New("can't create spiffe id: trust domain is empty")
	case namespace == "":
		return "", errors.New("can't create spiffe id: namespace is empty")
	case appID == "":
		return "", errors.New("can't create spiffe id: app id is empty")
	// Validate according to the SPIFFE spec
	case strings.Contains(trustDomain, ":"):
		return "", errors.New("trust domain cannot contain the ':' character")
	case len(trustDomain) > 255:
		return "", errors.New("trust domain cannot exceed 255 bytes")
	}

	// len on a string already counts bytes, so no []byte conversion is needed.
	spiffeID := fmt.Sprintf("spiffe://%s/ns/%s/%s", trustDomain, namespace, appID)
	if len(spiffeID) > 2048 {
		return "", errors.New("spiffe id cannot exceed 2048 bytes")
	}
	return spiffeID, nil
}

9.4 - kubernetes下的validator.go

kubernetes下的validator实现

准备工作

结构体定义

validator 结构体定义:

// validator is the Kubernetes implementation of identity.Validator.
type validator struct {
	// client is the Kubernetes client passed to NewValidator.
	client    k8s.Interface
	// auth is the authentication API obtained from client; it is used to
	// create TokenReviews.
	auth      kauth.AuthenticationV1Interface
	// audiences are the token audiences requested in the TokenReview; when
	// empty, Validate falls back to a default audience.
	audiences []string
}

创建validator的方法

NewValidator() 方法创建新的 validator 结构体:

// NewValidator returns the Kubernetes identity validator, backed by the given
// client and expecting the given token audiences.
func NewValidator(client k8s.Interface, audiences []string) identity.Validator {
	v := new(validator)
	v.client = client
	v.auth = client.AuthenticationV1()
	v.audiences = audiences
	return v
}

实现

Validate() 实现通过使用 ID 和 token 来验证证书请求的身份:

// Validate checks a certificate request's identity using its id and token.
//
// The token is submitted in a TokenReview. The reviewed username is split on
// ':' and must have exactly four parts starting with "system"; the third part
// is taken as the pod namespace and the fourth as the service account. The
// request is accepted when namespace (if non-empty) equals the pod namespace
// and id equals "<pod namespace>:<service account>".
//
// It returns an error when id or token is empty, when the token review fails,
// or when any of the checks above does not hold.
func (v *validator) Validate(id, token, namespace string) error {
	// id and token must not be empty.
	if id == "" {
		return fmt.Errorf("%s: id field in request must not be empty", errPrefix)
	}
	if token == "" {
		return fmt.Errorf("%s: token field in request must not be empty", errPrefix)
	}

	// TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
	var canTryWithNilAudience, showDefaultTokenAudienceWarning bool

	audiences := v.audiences
  
	if len(audiences) == 0 {
		// Special case: the user did not set an audience explicitly.
		// Fall back to the default sentryConsts.ServiceAccountTokenAudience ("dapr.io/sentry").
		audiences = []string{sentryConsts.ServiceAccountTokenAudience}

		// TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
		// Because the user did not specify an explicit audience and is instead relying on the default, if the authentication fails we can retry with nil audience
		// Remember this special case so that a failed review is retried once with a nil audience.
		canTryWithNilAudience = true
	}
	tokenReview := &kauthapi.TokenReview{
		Spec: kauthapi.TokenReviewSpec{
			Token:     token,
			Audiences: audiences,
		},
	}

tr: // TODO: Remove in Dapr 1.12 to enforce setting an explicit audience

	prts, err := v.executeTokenReview(tokenReview)
	if err != nil {
		// TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
		if canTryWithNilAudience {
			// Retry with a nil audience, which means the default audience for the K8s API server
			tokenReview.Spec.Audiences = nil
			showDefaultTokenAudienceWarning = true
			// Clearing the flag guarantees at most one retry.
			canTryWithNilAudience = false
			goto tr
		}

		return err
	}

	// TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
	if showDefaultTokenAudienceWarning {
		log.Warn("WARNING: Sentry accepted a token with the audience for the Kubernetes API server. This is deprecated and only supported to ensure a smooth upgrade from Dapr pre-1.10.")
	}

	// The username must split into exactly four parts, the first being "system".
	if len(prts) != 4 || prts[0] != "system" {
		return fmt.Errorf("%s: provided token is not a properly structured service account token", errPrefix)
	}

	podSa := prts[3]
	podNs := prts[2]

	// Check the namespace, but only when the request supplied one.
	if namespace != "" {
		if podNs != namespace {
			return fmt.Errorf("%s: namespace mismatch. received namespace: %s", errPrefix, namespace)
		}
	}

	// Check the id: it must be "<pod namespace>:<service account>".
	if id != podNs+":"+podSa {
		return fmt.Errorf("%s: token/id mismatch. received id: %s", errPrefix, id)
	}
	return nil
}

executeTokenReview() 方法执行 tokenReview,如果 token 无效或者失败则返回错误:

// executeTokenReview submits tokenReview and returns the reviewed username
// split on ':'.
//
// It returns an error when the review request fails, when the review status
// carries an error, or when the token is not authenticated.
func (v *validator) executeTokenReview(tokenReview *kauthapi.TokenReview) ([]string, error) {
	review, err := v.auth.TokenReviews().Create(context.TODO(), tokenReview, v1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("%s: token review failed: %w", errPrefix, err)
	}

	switch {
	case review.Status.Error != "":
		return nil, fmt.Errorf("%s: invalid token: %s", errPrefix, review.Status.Error)
	case !review.Status.Authenticated:
		return nil, fmt.Errorf("%s: authentication failed", errPrefix)
	}

	username := review.Status.User.Username
	return strings.Split(username, ":"), nil
}

9.5 - selfhosted下的validator.go

selfhosted下的validator实现

selfhosted 下实际没有做验证:

// NewValidator returns the self-hosted identity validator.
func NewValidator() identity.Validator {
	return new(validator)
}

// validator is the self-hosted identity validator. It holds no state.
type validator struct{}

// Validate accepts every request: it always returns nil, whatever the id,
// token and namespace are.
func (*validator) Validate(id, token, namespace string) error {
	return nil // no validation for self hosted.
}

只是保留了一套代码框架,以满足 Validator 接口的要求。

这意味着在 selfhosted 下是不会进行身份验证的。