sentry库的源码学习
- 1: sentry的main函数入口
- 2: sentry的Proto定义
- 3: sentry代码
- 4: CA server代码
- 5: csr代码
- 6: certs代码
- 7: certs store代码
- 8: metrics代码
- 9: sentry库的identify实现
- 9.1: identify.go
- 9.2: validator.go
- 9.3: spiff.go
- 9.4: kubernetes下的validator.go
- 9.5: selfhosted下的validator.go
1 - sentry的main函数入口
sentry 模块的入口在文件 cmd/sentry/main.go
中。
准备工作
读取命令行参数
const (
defaultCredentialsPath = "/var/run/dapr/credentials"
// defaultDaprSystemConfigName is the default resource object name for Dapr System Config.
defaultDaprSystemConfigName = "daprsystem"
healthzPort = 8080
)
func main() {
configName := flag.String("config", defaultDaprSystemConfigName, "Path to config file, or name of a configuration object")
credsPath := flag.String("issuer-credentials", defaultCredentialsPath, "Path to the credentials directory holding the issuer data")
flag.StringVar(&credentials.RootCertFilename, "issuer-ca-filename", credentials.RootCertFilename, "Certificate Authority certificate filename")
flag.StringVar(&credentials.IssuerCertFilename, "issuer-certificate-filename", credentials.IssuerCertFilename, "Issuer certificate filename")
flag.StringVar(&credentials.IssuerKeyFilename, "issuer-key-filename", credentials.IssuerKeyFilename, "Issuer private key filename")
trustDomain := flag.String("trust-domain", "localhost", "The CA trust domain")
tokenAudience := flag.String("token-audience", "", "Expected audience for tokens; multiple values can be separated by a comma")
......
}
logger 和 metrics 的参数需要展开:
loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)
metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
metricsExporter.Options().AttachCmdFlags(flag.StringVar, flag.BoolVar)
获取 k8s 的 配置文件路径:
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
// 读取 home 路径
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
// 通过 `--kubeconfig` 传递完整的 kubeconfig 文件路径
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
最后解析一把:
flag.Parse()
设置环境变量
将 kubeconfig 的值设置到 KUBE_CONFIG 环境变量:
var (
KubeConfigVar = "KUBE_CONFIG"
)
if err := utils.SetEnvVariables(map[string]string{
utils.KubeConfigVar: *kubeconfig,
}); err != nil {
log.Fatalf("error set env failed: %s", err.Error())
}
初始化
这行日志标记着初始化正式开始:
log.Infof("starting sentry certificate authority -- version %s -- commit %s", buildinfo.Version(), buildinfo.Commit())
log.Infof("log level set to: %s", loggerOptions.OutputLevel)
初始化metrics
// Initialize dapr metrics exporter
if err := metricsExporter.Init(); err != nil {
log.Fatal(err)
}
初始化监控
if err := monitoring.InitMetrics(); err != nil {
log.Fatal(err)
}
读取配置
// 拼凑文件路径
issuerCertPath := filepath.Join(*credsPath, credentials.IssuerCertFilename) //issuer.crt
issuerKeyPath := filepath.Join(*credsPath, credentials.IssuerKeyFilename) // issuer.key
rootCertPath := filepath.Join(*credsPath, credentials.RootCertFilename) // ca.crt
// 读取 sentry 配置:
config, err := config.FromConfigName(*configName)
if err != nil {
log.Warn(err)
}
// 保存证书相关的各个路径和参数
config.IssuerCertPath = issuerCertPath
config.IssuerKeyPath = issuerKeyPath
config.RootCertPath = rootCertPath
config.TrustDomain = *trustDomain
if *tokenAudience != "" {
config.TokenAudience = tokenAudience
}
启动服务
启动sentry server
ca := sentry.NewSentryCA()
// Start the server in background
err = ca.Start(runCtx, config)
if err != nil {
log.Fatalf("failed to restart sentry server: %s", err)
}
启动 health server
log.Infof("starting watch on filesystem directory: %s", watchDir)
// Start the health server in background
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
if innerErr := healthzServer.Run(runCtx, healthzPort); innerErr != nil {
log.Fatalf("failed to start healthz server: %s", innerErr)
}
}()
监控目录变化
issuerEvent := make(chan struct{})
watchDir := filepath.Dir(config.IssuerCertPath)
// Watch for changes in the watchDir
// This also blocks until runCtx is canceled
fswatcher.Watch(runCtx, watchDir, issuerEvent)
这个函数会一直阻塞直到 runCtx 被取消(这意味着要退出 sentry 进程)。
如果有文件更新,则 issuerEvent 会收到 event,issuerEvent 相关的处理代码:
go func() {
// Restart the server when the issuer credentials change
var restart <-chan time.Time
for {
select {
case <-issuerEvent:
monitoring.IssuerCertChanged()
log.Debug("received issuer credentials changed signal")
// Batch all signals within 2s of each other
if restart == nil {
// issuerEvent 不会被直接处理,而是安排在 2 秒发一个 restart event
// 2秒之内的各种 issuerEvent 都会被这个 restart event 集中处理
restart = time.After(2 * time.Second)
}
case <-restart:
// 收到 restart,意味着 issuerEvent 已经积攒了 2 秒钟,可以统一处理了
log.Warn("issuer credentials changed; reloading")
innerErr := ca.Restart(runCtx, config)
if innerErr != nil {
log.Fatalf("failed to restart sentry server: %s", innerErr)
}
// 重置 restart,恢复原样,以便处理 2 秒之后的后续 issuerEvent
restart = nil
}
}
}()
退出
shutdownDuration := 5 * time.Second
log.Infof("allowing %s for graceful shutdown to complete", shutdownDuration)
<-time.After(shutdownDuration)
总结
去除非核心代码,sentry main 函数的主要功能是启动 sentry 的 ca server, 并监控目录,如果有变化则重启 ca server。
2 - sentry的Proto定义
sentry 模块的 proto 服务定义在文件 dapr/proto/sentry/v1/sentry.proto
中。
service CA {
// A request for a time-bound certificate to be signed.
//
// The requesting side must provide an id for both loosely based
// And strong based identities.
rpc SignCertificate (SignCertificateRequest) returns (SignCertificateResponse) {}
}
SignCertificate() 方法要求签署一个有时间限制的证书。请求方必须提供一个可以同时用于松散型身份和强势型身份的ID。
SignCertificateRequest 的定义:
message SignCertificateRequest {
string id = 1;
string token = 2;
string trust_domain = 3;
string namespace = 4;
// A PEM-encoded x509 CSR.
bytes certificate_signing_request = 5;
}
SignCertificateResponse 的定义:
message SignCertificateResponse {
// A PEM-encoded x509 Certificate.
bytes workload_certificate = 1;
// A list of PEM-encoded x509 Certificates that establish the trust chain
// between the workload certificate and the well-known trust root cert.
repeated bytes trust_chain_certificates = 2;
google.protobuf.Timestamp valid_until = 3;
}
trust_chain_certificates 是一个 PEM 编码的 x509 证书的列表,这些证书在 workload_certificate 和众所周知的信任根证书(trust root cert)之间建立信任链。
3 - sentry代码
sentry 模块的主要实现在文件 pkg/sentry/sentry.go
中。
定义
定义 CA 接口
type CertificateAuthority interface {
Start(context.Context, config.SentryConfig) error
Stop()
Restart(context.Context, config.SentryConfig) error
}
start 和 restart 的函数定义是一样的。
定义 sentry 结构体
type sentry struct {
conf config.SentryConfig // sentry的配置,启动时由 main 函数初始化后传入
ctx context.Context // 启动时由 main 函数初始化后传入
cancel context.CancelFunc
server server.CAServer // CA server
restartLock sync.Mutex // 用于 restart 的锁
running chan bool
stopping chan bool
}
主流程
Sentry.go 被 sentry main.go 调用,主要工作流程就是三个事情:
// 1. 初始化
ca := sentry.NewSentryCA()
// 2. 启动
err = ca.Start(runCtx, config)
// 3. 在需要时重启
innerErr := ca.Restart(runCtx, config)
备注:sentry main.go 没有调用 sentry的 stop(),这个 stop() 只在 restart() 方法中被调用。
初始化 sentry
NewSentryCA() 的实现:
// NewSentryCA returns a new Sentry Certificate Authority instance.
func NewSentryCA() CertificateAuthority {
return &sentry{
running: make(chan bool, 1),
}
}
什么都没干,只是初始化了 running 这个channel。
启动 sentry
// Start the server in background.
func (s *sentry) Start(ctx context.Context, conf config.SentryConfig) error {
// If the server is already running, return an error
select {
case s.running <- true:
default:
return errors.New("CertificateAuthority server is already running")
}
// Create the CA server
s.conf = conf
certAuth, v := s.createCAServer()
// Start the server in background
s.ctx, s.cancel = context.WithCancel(ctx)
go s.run(certAuth, v)
// Wait 100ms to ensure a clean startup
time.Sleep(100 * time.Millisecond)
return nil
}
主要工作就是创建 CA server,然后运行服务。
创建 ca server
createCAServer() 方法加载信任锚和签发者证书,然后创建一个新的CA:
// Loads the trust anchors and issuer certs, then creates a new CA.
func (s *sentry) createCAServer() (ca.CertificateAuthority, identity.Validator) {
// Create CA
certAuth, authorityErr := ca.NewCertificateAuthority(s.conf)
if authorityErr != nil {
log.Fatalf("error getting certificate authority: %s", authorityErr)
}
log.Info("certificate authority loaded")
// Load the trust bundle
trustStoreErr := certAuth.LoadOrStoreTrustBundle()
if trustStoreErr != nil {
log.Fatalf("error loading trust root bundle: %s", trustStoreErr)
}
certExpiry := certAuth.GetCACertBundle().GetIssuerCertExpiry()
if certExpiry == nil {
log.Fatalf("error loading trust root bundle: missing certificate expiry")
} else {
// Need to be in an else block for the linter
log.Infof("trust root bundle loaded. issuer cert expiry: %s", certExpiry.String())
}
monitoring.IssuerCertExpiry(certExpiry)
// Create identity validator
v, validatorErr := s.createValidator()
if validatorErr != nil {
log.Fatalf("error creating validator: %s", validatorErr)
}
log.Info("validator created")
return certAuth, v
}
方法返回 ca.CertificateAuthority 和 identity.Validator 。
创建 identity.Validator
createValidator 的实现细节:
func (s *sentry) createValidator() (identity.Validator, error) {
if config.IsKubernetesHosted() { // 通过 KUBERNETES_SERVICE_HOST 环境变量来判断
// we're in Kubernetes, create client and init a new serviceaccount token validator
kubeClient, err := k8s.GetClient()
if err != nil {
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}
// TODO: Remove once the NoDefaultTokenAudience feature is finalized
noDefaultTokenAudience := false
// 创建 kubernetes 的 Validator
return kubernetes.NewValidator(kubeClient, s.conf.GetTokenAudiences(), noDefaultTokenAudience), nil
}
// 创建 selfhosted 的 Validator
return selfhosted.NewValidator(), nil
}
运行 sentry
run 方法运行 CA server,阻塞直到服务器关闭:
// Runs the CA server.
// This method blocks until the server is shut down.
func (s *sentry) run(certAuth ca.CertificateAuthority, v identity.Validator) {
s.server = server.NewCAServer(certAuth, v)
// In background, watch for the root certificate's expiration
go watchCertExpiry(s.ctx, certAuth)
// Watch for context cancelation to stop the server
go func() {
<-s.ctx.Done()
s.server.Shutdown()
close(s.running)
s.running = make(chan bool, 1)
if s.stopping != nil {
close(s.stopping)
}
}()
// Start the server; this is a blocking call
log.Infof("sentry certificate authority is running, protecting y'all")
serverRunErr := s.server.Run(s.conf.Port, certAuth.GetCACertBundle())
if serverRunErr != nil {
log.Fatalf("error starting gRPC server: %s", serverRunErr)
}
}
启动 ca 的 grpc server 以便接收外部请求。
监控证书过期
Run() 方法中启动了一个 goroutine,用于监控证书是否过期。如果快要过期了,则会显示错误信息。
// Watches certificates' expiry and shows an error message when they're nearing expiration time.
// This is a blocking method that should be run in its own goroutine.
func watchCertExpiry(ctx context.Context, certAuth ca.CertificateAuthority) {
log.Debug("starting root certificate expiration watcher")
// time 是每小时触发一次
certExpiryCheckTicker := time.NewTicker(time.Hour)
for {
select {
case <-certExpiryCheckTicker.C:
caCrt := certAuth.GetCACertBundle().GetRootCertPem()
block, _ := pem.Decode(caCrt)
cert, certParseErr := x509.ParseCertificate(block.Bytes)
if certParseErr != nil {
log.Warn("could not determine Dapr root certificate expiration time")
break
}
if cert.NotAfter.Before(time.Now().UTC()) {
// 已经过期则报警
log.Warn("Dapr root certificate expiration warning: certificate has expired.")
break
}
if (cert.NotAfter.Add(-30 * 24 * time.Hour)).Before(time.Now().UTC()) {
// 有效期不足30天也报警
expiryDurationHours := int(cert.NotAfter.Sub(time.Now().UTC()).Hours())
log.Warnf("Dapr root certificate expiration warning: certificate expires in %d days and %d hours", expiryDurationHours/24, expiryDurationHours%24)
} else {
validity := cert.NotAfter.Sub(time.Now().UTC())
log.Debugf("Dapr root certificate is still valid for %s", validity.String())
}
case <-ctx.Done():
log.Debug("terminating root certificate expiration watcher")
certExpiryCheckTicker.Stop()
return
}
}
}
停止 sentry
// Stop the server.
func (s *sentry) Stop() {
log.Info("sentry certificate authority is shutting down")
if s.cancel != nil {
s.stopping = make(chan bool)
s.cancel()
<-s.stopping
s.stopping = nil
}
}
重启 sentry
Restart() 方法重启 sentry:
func (s *sentry) Restart(ctx context.Context, conf config.SentryConfig) error {
s.restartLock.Lock()
defer s.restartLock.Unlock()
log.Info("sentry certificate authority is restarting")
s.Stop()
// Wait 200ms to ensure a clean shutdown
time.Sleep(200 * time.Millisecond)
return s.Start(ctx, conf)
}
步骤:
- 先加锁
- 停止 sentry
- sleep 200 毫秒
- 再启动 sentry
4 - CA server代码
ca server 的实现在文件 pkg/sentry/server/server.go
中。
定义
CAServer 接口
// CAServer is an interface for the Certificate Authority server.
type CAServer interface {
Run(port int, trustBundle ca.TrustRootBundler) error
Shutdown()
}
server 结构体
type server struct {
certificate *tls.Certificate
certAuth ca.CertificateAuthority
srv *grpc.Server // grpc server,用来对外提供 grpc 服务
validator identity.Validator
}
主流程
server.go 被 sentry.go 调用,主要工作流程就是三个事情:
// 1. 初始化CA server
s.server = server.NewCAServer(certAuth, v)
// 2. 运行CA server
s.server.Run(s.conf.Port, certAuth.GetCACertBundle())
// 3. 在需要时关闭CA server
s.server.Shutdown()
初始化CA server
// NewCAServer returns a new CA Server running a gRPC server.
func NewCAServer(ca ca.CertificateAuthority, validator identity.Validator) CAServer {
return &server{
certAuth: ca,
validator: validator,
}
}
保存传递进来的参数,这两个参数在 sentry.go 中初始化。
运行 CA server
CA server 主要提供两个功能:
- 基本的 grpc 服务:以便为客户端提供服务
- 安全:必须为提供的服务进行安全保护,因此客户端必须实用 trust root cert
// Run starts a secured gRPC server for the Sentry Certificate Authority.
// It enforces client side cert validation using the trust root cert.
func (s *server) Run(port int, trustBundler ca.TrustRootBundler) error {
addr := fmt.Sprintf(":%d", port)
lis, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("could not listen on %s: %w", addr, err)
}
tlsOpt := s.tlsServerOption(trustBundler)
// 创建 grpc server
s.srv = grpc.NewServer(tlsOpt)
// 注册 ca server 到 grpc server
sentryv1pb.RegisterCAServer(s.srv, s)
// 启动 grpc server 监听服务地址
if err := s.srv.Serve(lis); err != nil {
return fmt.Errorf("grpc serve error: %w", err)
}
return nil
}
trustBundler 是从 sentry.go 中传递过来,后面详细展开。
关闭 CA server
func (s *server) Shutdown() {
if s.srv != nil {
// 调用 grpc 的 GracefulStop,会在请求完成后再关闭
s.srv.GracefulStop()
}
}
客户端安全
tlsServerOption() 方法,为客户端连接准备 tls 相关的选项:
func (s *server) tlsServerOption(trustBundler ca.TrustRootBundler) grpc.ServerOption {
cp := trustBundler.GetTrustAnchors()
//nolint:gosec
config := &tls.Config{
ClientCAs: cp,
// 这里要求验证客户端证书
// Require cert verification
ClientAuth: tls.RequireAndVerifyClientCert,
GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
if s.certificate == nil || needsRefresh(s.certificate, serverCertExpiryBuffer) {
// 如果ca server的证书为空,或者需要刷新,则开始创建/刷新证书
cert, err := s.getServerCertificate()
if err != nil {
monitoring.ServerCertIssueFailed("server_cert")
log.Error(err)
return nil, fmt.Errorf("failed to get TLS server certificate: %w", err)
}
s.certificate = cert
}
return s.certificate, nil
},
}
return grpc.Creds(credentials.NewTLS(config))
}
needsRefresh() 方法的实现:
func needsRefresh(cert *tls.Certificate, expiryBuffer time.Duration) bool {
leaf := cert.Leaf
if leaf == nil {
return true
}
// Check if the leaf certificate is about to expire.
// 检查是不是快要过期了:15 分钟
return leaf.NotAfter.Add(-serverCertExpiryBuffer).Before(time.Now().UTC())
}
const (
serverCertExpiryBuffer = time.Minute * 15
)
getServerCertificate() 方法负责生成服务器端的证书:
func (s *server) getServerCertificate() (*tls.Certificate, error) {
csrPem, pkPem, err := csr.GenerateCSR("", false)
if err != nil {
return nil, err
}
now := time.Now().UTC()
issuerExp := s.certAuth.GetCACertBundle().GetIssuerCertExpiry()
if issuerExp == nil {
return nil, errors.New("could not find expiration in issuer certificate")
}
serverCertTTL := issuerExp.Sub(now)
resp, err := s.certAuth.SignCSR(csrPem, s.certAuth.GetCACertBundle().GetTrustDomain(), nil, serverCertTTL, false)
if err != nil {
return nil, err
}
certPem := resp.CertPEM
certPem = append(certPem, s.certAuth.GetCACertBundle().GetIssuerCertPem()...)
if rootCertPem := s.certAuth.GetCACertBundle().GetRootCertPem(); len(rootCertPem) > 0 {
certPem = append(certPem, rootCertPem...)
}
cert, err := tls.X509KeyPair(certPem, pkPem)
if err != nil {
return nil, err
}
return &cert, nil
}
更多细节要看 certAuth.SignCSR() 方法的实现。
签署证书
SignCertificate() 方法处理从 dapr sidedar 发起的 CSR 请求。这个方法接收带有 identity 和 初始证书的请求,并为调用者返回包括信任链在内的签名证书和过期时间。
// SignCertificate handles CSR requests originating from Dapr sidecars.
// The method receives a request with an identity and initial cert and returns
// A signed certificate including the trust chain to the caller along with an expiry date.
func (s *server) SignCertificate(ctx context.Context, req *sentryv1pb.SignCertificateRequest) (*sentryv1pb.SignCertificateResponse, error) {
monitoring.CertSignRequestReceived()
csrPem := req.GetCertificateSigningRequest()
// 解析请求中的 CSR
csr, err := certs.ParsePemCSR(csrPem)
if err != nil {
err = fmt.Errorf("cannot parse certificate signing request pem: %w", err)
log.Error(err)
monitoring.CertSignFailed("cert_parse")
return nil, err
}
// 验证 CSR
err = s.certAuth.ValidateCSR(csr)
if err != nil {
err = fmt.Errorf("error validating csr: %w", err)
log.Error(err)
monitoring.CertSignFailed("cert_validation")
return nil, err
}
// 验证请求身份
err = s.validator.Validate(req.GetId(), req.GetToken(), req.GetNamespace())
if err != nil {
err = fmt.Errorf("error validating requester identity: %w", err)
log.Error(err)
monitoring.CertSignFailed("req_id_validation")
return nil, err
}
// 签名证书
identity := identity.NewBundle(csr.Subject.CommonName, req.GetNamespace(), req.GetTrustDomain())
signed, err := s.certAuth.SignCSR(csrPem, csr.Subject.CommonName, identity, -1, false)
if err != nil {
err = fmt.Errorf("error signing csr: %w", err)
log.Error(err)
monitoring.CertSignFailed("cert_sign")
return nil, err
}
// 准备要返回的各种数据
certPem := signed.CertPEM
issuerCert := s.certAuth.GetCACertBundle().GetIssuerCertPem()
rootCert := s.certAuth.GetCACertBundle().GetRootCertPem()
certPem = append(certPem, issuerCert...)
if len(rootCert) > 0 {
certPem = append(certPem, rootCert...)
}
if len(certPem) == 0 {
err = errors.New("insufficient data in certificate signing request, no certs signed")
log.Error(err)
monitoring.CertSignFailed("insufficient_data")
return nil, err
}
expiry := timestamppb.New(signed.Certificate.NotAfter)
if err = expiry.CheckValid(); err != nil {
return nil, fmt.Errorf("could not validate certificate validity: %w", err)
}
// 组装 response 结构体
resp := &sentryv1pb.SignCertificateResponse{
WorkloadCertificate: certPem,
TrustChainCertificates: [][]byte{issuerCert, rootCert},
ValidUntil: expiry,
}
monitoring.CertSignSucceed()
return resp, nil
}
总结
实现很简单,就是涉及到证书的各种操作,需要有相关的背景知识。
5 - csr代码
csr 相关的逻辑实现在文件 pkg/sentry/csr/csr.go
中。
准备工作
常量定义
const (
blockTypeECPrivateKey = "EC PRIVATE KEY" // EC private key
blockTypePrivateKey = "PRIVATE KEY" // PKCS#8 private key
encodeMsgCSR = "CERTIFICATE REQUEST"
encodeMsgCert = "CERTIFICATE"
)
全局变量定义
// The OID for the SAN extension (http://www.alvestrand.no/objectid/2.5.29.17.html)
var oidSubjectAlternativeName = asn1.ObjectIdentifier{2, 5, 29, 17}
实现
生成CSR
GenerateCSR() f方法创建 X.509 certificate sign request 和私钥:
// GenerateCSR creates a X.509 certificate sign request and private key.
func GenerateCSR(org string, pkcs8 bool) ([]byte, []byte, error) {
// 生成 ec 私钥
key, err := certs.GenerateECPrivateKey()
if err != nil {
return nil, nil, fmt.Errorf("unable to generate private keys: %w", err)
}
// 生成 csr 模版
templ, err := genCSRTemplate(org)
if err != nil {
return nil, nil, fmt.Errorf("error generating csr template: %w", err)
}
// 创建证书请求
csrBytes, err := x509.CreateCertificateRequest(rand.Reader, templ, key)
if err != nil {
return nil, nil, fmt.Errorf("failed to create CSR: %w", err)
}
// 编码证书
crtPem, keyPem, err := encode(true, csrBytes, key, pkcs8)
return crtPem, keyPem, err
}
生成 csr 模版的实现,只设置了Organization :
func genCSRTemplate(org string) (*x509.CertificateRequest, error) {
return &x509.CertificateRequest{
Subject: pkix.Name{
Organization: []string{org},
},
}, nil
}
编码证书的实现代码:
func encode(csr bool, csrOrCert []byte, privKey *ecdsa.PrivateKey, pkcs8 bool) ([]byte, []byte, error) {
// 判断是 "CERTIFICATE" 还是 "CERTIFICATE REQUEST"
encodeMsg := encodeMsgCert
if csr {
encodeMsg = encodeMsgCSR
}
// 执行编码
csrOrCertPem := pem.EncodeToMemory(&pem.Block{Type: encodeMsg, Bytes: csrOrCert})
var encodedKey, privPem []byte
var err error
if pkcs8 {
// 如果是 pkcs8,需要将私钥编码为 PKCS8 私钥 / "PRIVATE KEY"
if encodedKey, err = x509.MarshalPKCS8PrivateKey(privKey); err != nil {
return nil, nil, err
}
// 将上面的 PKCS8 私钥编码到内存
privPem = pem.EncodeToMemory(&pem.Block{Type: blockTypePrivateKey, Bytes: encodedKey})
} else {
// 不是 pkcs8 的话,需要将私钥编码为 EC 私钥 / "EC PRIVATE KEY"
encodedKey, err = x509.MarshalECPrivateKey(privKey)
if err != nil {
return nil, nil, err
}
privPem = pem.EncodeToMemory(&pem.Block{Type: blockTypeECPrivateKey, Bytes: encodedKey})
}
return csrOrCertPem, privPem, nil
}
生成基础证书
generateBaseCert() 方法返回一个基本的非CA证书,该证书可以通过添加 subject、key usage 和附加属性成为一个工作负载或CA证书:
// generateBaseCert returns a base non-CA cert that can be made a workload or CA cert
// By adding subjects, key usage and additional proerties.
func generateBaseCert(ttl, skew time.Duration, publicKey interface{}) (*x509.Certificate, error) {
// 创建一个新的序列号
serNum, err := newSerialNumber()
if err != nil {
return nil, err
}
now := time.Now().UTC()
// Allow for clock skew with the NotBefore validity bound.
// 允许在 NotBefore 有效期内出现时钟偏移。
notBefore := now.Add(-1 * skew)
notAfter := now.Add(ttl)
// 创建并返回 x509 证书
return &x509.Certificate{
SerialNumber: serNum,
NotBefore: notBefore,
NotAfter: notAfter,
PublicKey: publicKey,
}, nil
}
创建一个新的序列号的代码实现细节:
func newSerialNumber() (*big.Int, error) {
// 序列号的最大值,1 << 128
serialNumLimit := new(big.Int).Lsh(big.NewInt(1), 128)
// 在这个区间内取随机数
serialNum, err := rand.Int(rand.Reader, serialNumLimit)
if err != nil {
return nil, fmt.Errorf("error generating serial number: %w", err)
}
return serialNum, nil
}
生成基础证书的第一步就是生成其他证书。
生成 Root Cert CSR
GenerateRootCertCSR() 方法返回 CA root cert x509 证书:
// GenerateRootCertCSR returns a CA root cert x509 Certificate.
func GenerateRootCertCSR(org, cn string, publicKey interface{}, ttl, skew time.Duration) (*x509.Certificate, error) {
// 先生成基本证书
cert, err := generateBaseCert(ttl, skew, publicKey)
if err != nil {
return nil, err
}
// 设置证书的参数
cert.KeyUsage = x509.KeyUsageCertSign
cert.ExtKeyUsage = append(cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth)
cert.Subject = pkix.Name{
CommonName: cn,
Organization: []string{org},
}
cert.DNSNames = []string{cn}
cert.IsCA = true
cert.BasicConstraintsValid = true
cert.SignatureAlgorithm = x509.ECDSAWithSHA256
return cert, nil
}
生成 CSR Certificate
GenerateCSRCertificate() 方法 返回 x509 Certificate,输入为 CSR / 签名证书 / 公钥 / 签名私钥 和持续时间:
// GenerateCSRCertificate returns an x509 Certificate from a CSR, signing cert, public key, signing private key and duration.
func GenerateCSRCertificate(csr *x509.CertificateRequest, subject string, identityBundle *identity.Bundle, signingCert *x509.Certificate, publicKey interface{}, signingKey crypto.PrivateKey,
ttl, skew time.Duration, isCA bool,
) ([]byte, error) {
// 先生成基本证书
cert, err := generateBaseCert(ttl, skew, publicKey)
if err != nil {
return nil, fmt.Errorf("error generating csr certificate: %w", err)
}
if isCA {
cert.KeyUsage = x509.KeyUsageCertSign | x509.KeyUsageCRLSign
} else {
cert.KeyUsage = x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment
cert.ExtKeyUsage = append(cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageClientAuth)
}
if subject == "cluster.local" {
cert.Subject = pkix.Name{
CommonName: subject,
}
cert.DNSNames = []string{subject}
}
cert.Issuer = signingCert.Issuer
cert.IsCA = isCA
cert.IPAddresses = csr.IPAddresses
cert.Extensions = csr.Extensions
cert.BasicConstraintsValid = true
cert.SignatureAlgorithm = csr.SignatureAlgorithm
if identityBundle != nil {
spiffeID, err := identity.CreateSPIFFEID(identityBundle.TrustDomain, identityBundle.Namespace, identityBundle.ID)
if err != nil {
return nil, fmt.Errorf("error generating spiffe id: %w", err)
}
rv := []asn1.RawValue{
{
Bytes: []byte(spiffeID),
Class: asn1.ClassContextSpecific,
Tag: asn1.TagOID,
},
{
Bytes: []byte(fmt.Sprintf("%s.%s.svc.cluster.local", subject, identityBundle.Namespace)),
Class: asn1.ClassContextSpecific,
Tag: 2,
},
}
b, err := asn1.Marshal(rv)
if err != nil {
return nil, fmt.Errorf("failed to marshal asn1 raw value for spiffe id: %w", err)
}
cert.ExtraExtensions = append(cert.ExtraExtensions, pkix.Extension{
Id: oidSubjectAlternativeName,
Value: b,
Critical: true, // According to x509 and SPIFFE specs, a SubjAltName extension must be critical if subject name and DNS are not present.
})
}
return x509.CreateCertificate(rand.Reader, cert, signingCert, publicKey, signingKey)
}
这里涉及很多 x509 相关的领域知识。
6 - certs代码
certs 相关的逻辑实现在文件 pkg/sentry/certs/certs.go
中。
准备工作
常量定义
const (
BlockTypeCertificate = "CERTIFICATE"
BlockTypeECPrivateKey = "EC PRIVATE KEY"
BlockTypePKCS1PrivateKey = "RSA PRIVATE KEY"
BlockTypePKCS8PrivateKey = "PRIVATE KEY"
)
备注:这里的常量定义和 csr.go 中的有部分重复。
结构体定义
Credentials 结构体包含一个证书和一个 私钥:
// Credentials holds a certificate and private key.
type Credentials struct {
PrivateKey crypto.PrivateKey
Certificate *x509.Certificate
}
实现
解码 PEM key
DecodePEMKey() 接收一个 PEM key 字节数组并返回一个代表 RSA 或 EC 私钥 的对象:
func DecodePEMKey(key []byte) (crypto.PrivateKey, error) {
// 解码 pem key
block, _ := pem.Decode(key)
if block == nil {
return nil, errors.New("key is not PEM encoded")
}
// 按照类型进行后续解析处理
switch block.Type {
case BlockTypeECPrivateKey:
// EC Private Key
return x509.ParseECPrivateKey(block.Bytes)
case BlockTypePKCS1PrivateKey:
// PKCS1 Private Key
return x509.ParsePKCS1PrivateKey(block.Bytes)
case BlockTypePKCS8PrivateKey:
// PKCS8 Private Key
return x509.ParsePKCS8PrivateKey(block.Bytes)
default:
return nil, fmt.Errorf("unsupported block type %s", block.Type)
}
}
解码 PEM 证书
DecodePEMCertificates() 方法接收一个 PEM 编码的 x509 证书字节数组,并以 x509.Certificate 对象片断的方式返回所有证书:
func DecodePEMCertificates(crtb []byte) ([]*x509.Certificate, error) {
certs := []*x509.Certificate{}
// crtb 数组可能包含多个证书
for len(crtb) > 0 {
var err error
var cert *x509.Certificate
// 解码单个 pem 证书
cert, crtb, err = decodeCertificatePEM(crtb)
if err != nil {
return nil, err
}
if cert != nil {
// it's a cert, add to pool
certs = append(certs, cert)
}
}
return certs, nil
}
decodeCertificatePEM() 方法解码单个 pem 证书:
func decodeCertificatePEM(crtb []byte) (*x509.Certificate, []byte, error) {
// 执行pem 解码
// pem.Decode() 方法将在输入中找到下一个 PEM 格式的块(证书,私钥 等)的输入。
// 它返回该块和输入的其余部分。
// 注意是返回剩余部分,当没有更多部分时,返回的长度为0
// 如果没有找到PEM数据,则返回 block 为nil,其余部分返回整个输入。
block, crtb := pem.Decode(crtb)
if block == nil {
return nil, crtb, errors.New("invalid PEM certificate")
}
if block.Type != BlockTypeCertificate {
return nil, nil, nil
}
// 解码 x509 证书
c, err := x509.ParseCertificate(block.Bytes)
return c, crtb, err
}
生成基础证书的第一步就是生成其他证书。
从文件中获取 PEM 凭证
PEMCredentialsFromFiles() 方法接收一个密钥/证书对的路径,并返回一个经过验证的Credentials包装器:
func PEMCredentialsFromFiles(certPem, keyPem []byte) (*Credentials, error) {
// 解码 PEM key
pk, err := DecodePEMKey(keyPem)
if err != nil {
return nil, err
}
// 解码 PEM 证书
// 如果有多个证书,实际后续只使用多个证书中的第一个
crts, err := DecodePEMCertificates(certPem)
if err != nil {
return nil, err
}
if len(crts) == 0 {
return nil, errors.New("no certificates found")
}
// 检查私钥和证书的 PublicKey 是否匹配
match := matchCertificateAndKey(pk, crts[0])
if !match {
return nil, errors.New("error validating credentials: public and private key pair do not match")
}
// 构建 Credentials 结构体并返回
creds := &Credentials{
PrivateKey: pk,
Certificate: crts[0],
}
return creds, nil
}
matchCertificateAndKey() 方法检查私钥和证书的 PublicKey 是否匹配 :
func matchCertificateAndKey(pk any, cert *x509.Certificate) bool {
// 根据私钥的类型进行匹配
// 实际是根据私钥类型的不同,获取到 cert 相应的 PublicKey,然后和私钥的 PublicKey 对比看是否相同
switch key := pk.(type) {
case *ecdsa.PrivateKey:
// ecdsa PrivateKey
if cert.PublicKeyAlgorithm != x509.ECDSA {
return false
}
pub, ok := cert.PublicKey.(*ecdsa.PublicKey)
return ok && pub.Equal(key.Public())
case *rsa.PrivateKey:
// rsa PrivateKey
if cert.PublicKeyAlgorithm != x509.RSA {
return false
}
pub, ok := cert.PublicKey.(*rsa.PublicKey)
return ok && pub.Equal(key.Public())
case ed25519.PrivateKey:
// ed25519 Private Key
if cert.PublicKeyAlgorithm != x509.Ed25519 {
return false
}
pub, ok := cert.PublicKey.(ed25519.PublicKey)
return ok && pub.Equal(key.Public())
default:
return false
}
}
从 PEM 创建 cert pool
CertPoolFromPEM() 方法从一个 PEM 编码的证书字符串返回一个 CertPool
func CertPoolFromPEM(certPem []byte) (*x509.CertPool, error) {
// 解码 PEM 证书
certs, err := DecodePEMCertificates(certPem)
if err != nil {
return nil, err
}
if len(certs) == 0 {
return nil, errors.New("no certificates found")
}
// 从多个证书中创建 cert pool
return certPoolFromCertificates(certs), nil
}
certPoolFromCertificates() 方法的实现很简单:
func certPoolFromCertificates(certs []*x509.Certificate) *x509.CertPool {
// 创建 cert pool
pool := x509.NewCertPool()
for _, c := range certs {
// 将每个证书添加到 pool
pool.AddCert(c)
}
return pool
}
解析 PRM CSR
ParsePemCSR() 使用给定的 PEM 编码的证书签名请求构建一个 x509 证书请求:
func ParsePemCSR(csrPem []byte) (*x509.CertificateRequest, error) {
// pem 解码
block, _ := pem.Decode(csrPem)
if block == nil {
return nil, errors.New("certificate signing request is not properly encoded")
}
// 尝试 x509 解码证书请求
csr, err := x509.ParseCertificateRequest(block.Bytes)
if err != nil {
return nil, fmt.Errorf("failed to parse X.509 certificate signing request: %w", err)
}
return csr, nil
}
生成 ECP 私钥
GenerateECPrivateKey() 方法返回一个新的 ECP 私钥:
func GenerateECPrivateKey() (*ecdsa.PrivateKey, error) {
return ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
}
这里涉及很多 x509 相关的领域知识。
7 - certs store代码
certs 相关的存储逻辑实现在文件 pkg/sentry/certs/store.go
中。
准备工作
常量定义
const (
defaultSecretNamespace = "default"
)
实现
存储凭证
StoreCredentials() 方法将 trust bundle 存储在 Kubernetes secret store 或者本地磁盘上,取决于托管的平台:
func StoreCredentials(ctx context.Context, conf config.SentryConfig, rootCertPem, issuerCertPem, issuerKeyPem []byte) error {
if config.IsKubernetesHosted() {
// 如果是 k8s 托管来
return storeKubernetes(ctx, rootCertPem, issuerCertPem, issuerKeyPem)
}
// 否则是自托管
return storeSelfhosted(rootCertPem, issuerCertPem, issuerKeyPem, conf.RootCertPath, conf.IssuerCertPath, conf.IssuerKeyPath)
}
在 Kubernetes 中的存储
storeKubernetes() 方法将凭证存储在 Kubernetes secret store 中:
// 部分常量定于在 consts.go 中
const (
TrustBundleK8sSecretName = "dapr-trust-bundle" /* #nosec */
)
func storeKubernetes(ctx context.Context, rootCertPem, issuerCertPem, issuerCertKey []byte) error {
// 准备 k8s client
kubeClient, err := kubernetes.GetClient()
if err != nil {
return err
}
// 获取 namespace
namespace := getNamespace()
// 调用 k8s API 的方法获取 secret
secret, err := kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), consts.TrustBundleK8sSecretName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return fmt.Errorf("failed to get secret %w", err)
}
// 将 rootCertPem / issuerCertPem / issuerCertKey 保存到 secret 的 Data 中
secret.Data = map[string][]byte{
credentials.RootCertFilename: rootCertPem,
credentials.IssuerCertFilename: issuerCertPem,
credentials.IssuerKeyFilename: issuerCertKey,
}
// 更新保存 secret
// We update and not create because sentry expects a secret to already exist
_, err = kubeClient.CoreV1().Secrets(namespace).Update(ctx, secret, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed saving secret to kubernetes: %w", err)
}
return nil
}
其中 getNamespace() 读取环境变量 “NAMESPACE” 来获知当前的命名空间,缺省值为 “default”:
const (
defaultSecretNamespace = "default"
)
func getNamespace() string {
namespace := os.Getenv("NAMESPACE")
if namespace == "" {
namespace = defaultSecretNamespace
}
return namespace
}
自托管时的存储
storeSelfhosted() 方法将凭证存储在本地文件中:
func StoreCredentials(...) {
......
return storeSelfhosted(rootCertPem, issuerCertPem, issuerKeyPem, conf.RootCertPath, conf.IssuerCertPath, conf.IssuerKeyPath)
}
func storeSelfhosted(rootCertPem, issuerCertPem, issuerKeyPem []byte, rootCertPath, issuerCertPath, issuerKeyPath string) error {
// 分别将三个内容保存到三个文件中
err := os.WriteFile(rootCertPath, rootCertPem, 0o644)
if err != nil {
return fmt.Errorf("failed saving file to %s: %w", rootCertPath, err)
}
err = os.WriteFile(issuerCertPath, issuerCertPem, 0o644)
if err != nil {
return fmt.Errorf("failed saving file to %s: %w", issuerCertPath, err)
}
err = os.WriteFile(issuerKeyPath, issuerKeyPem, 0o644)
if err != nil {
return fmt.Errorf("failed saving file to %s: %w", issuerKeyPath, err)
}
return nil
}
rootCertPem / issuerCertPem / issuerKeyPem 分别保存到 conf.RootCertPath / conf.IssuerCertPath / conf.IssuerKeyPath 这三个 sentry 配置指定的文件路径中。
回顾一下 main.go 中读取相关配置的代码实现:
const (
defaultCredentialsPath = "/var/run/dapr/credentials"
)
var (
// RootCertFilename is the filename that holds the root certificate.
RootCertFilename = "ca.crt"
// IssuerCertFilename is the filename that holds the issuer certificate.
IssuerCertFilename = "issuer.crt"
// IssuerKeyFilename is the filename that holds the issuer key.
IssuerKeyFilename = "issuer.key"
)
func main() {
......
credsPath := flag.String("issuer-credentials", defaultCredentialsPath, "Path to the credentials directory holding the issuer data")
flag.StringVar(&credentials.RootCertFilename, "issuer-ca-filename", credentials.RootCertFilename, "Certificate Authority certificate filename")
flag.StringVar(&credentials.IssuerCertFilename, "issuer-certificate-filename", credentials.IssuerCertFilename, "Issuer certificate filename")
flag.StringVar(&credentials.IssuerKeyFilename, "issuer-key-filename", credentials.IssuerKeyFilename, "Issuer private key filename")
issuerCertPath := filepath.Join(*credsPath, credentials.IssuerCertFilename)
issuerKeyPath := filepath.Join(*credsPath, credentials.IssuerKeyFilename)
rootCertPath := filepath.Join(*credsPath, credentials.RootCertFilename)
......
config.IssuerCertPath = issuerCertPath
config.IssuerKeyPath = issuerKeyPath
config.RootCertPath = rootCertPath
......
}
可见默认是使用 “/var/run/dapr/credentials” 目录下的这三个文件:
- “ca.crt”
- “issuer.crt”
- “issuer.key”
8 - metrics代码
metrics 相关的实现在文件 pkg/sentry/monitoring/metrics.go
中。
准备工作
变量定义
定义了一些和 metrics 相关的变量:
var (
// Metrics definitions.
csrReceivedTotal = stats.Int64(
"sentry/cert/sign/request_received_total",
"The number of CSRs received.",
stats.UnitDimensionless)
certSignSuccessTotal = stats.Int64(
"sentry/cert/sign/success_total",
"The number of certificates issuances that have succeeded.",
stats.UnitDimensionless)
certSignFailedTotal = stats.Int64(
"sentry/cert/sign/failure_total",
"The number of errors occurred when signing the CSR.",
stats.UnitDimensionless)
serverTLSCertIssueFailedTotal = stats.Int64(
"sentry/servercert/issue_failed_total",
"The number of server TLS certificate issuance failures.",
stats.UnitDimensionless)
issuerCertChangedTotal = stats.Int64(
"sentry/issuercert/changed_total",
"The number of issuer cert updates, when issuer cert or key is changed",
stats.UnitDimensionless)
issuerCertExpiryTimestamp = stats.Int64(
"sentry/issuercert/expiry_timestamp",
"The unix timestamp, in seconds, when issuer/root cert will expire.",
stats.UnitDimensionless)
// Metrics Tags.
failedReasonKey = tag.MustNewKey("reason")
noKeys = []tag.Key{}
)
目前总共有 6 个 metrics 指标:
- csrReceivedTotal:接收到的 csr 的数量
- certSignSuccessTotal:签署成功的证书数量
- certSignFailedTotal:签署失败的证书数量
- serverTLSCertIssueFailedTotal:服务器TLS证书发放失败的次数。
- issuerCertChangedTotal: 当签发人的证书或钥匙被改变时,签发人证书更新的数量
- issuerCertExpiryTimestamp:发行人/根证书有效期的unix时间戳,单位是秒。
初始化
初始化 metrics:
func InitMetrics() error {
// 将 6 个 metrics 指标都注册起来
return view.Register(
diagUtils.NewMeasureView(csrReceivedTotal, noKeys, view.Count()),
diagUtils.NewMeasureView(certSignSuccessTotal, noKeys, view.Count()),
diagUtils.NewMeasureView(certSignFailedTotal, []tag.Key{failedReasonKey}, view.Count()),
diagUtils.NewMeasureView(serverTLSCertIssueFailedTotal, []tag.Key{failedReasonKey}, view.Count()),
diagUtils.NewMeasureView(issuerCertChangedTotal, noKeys, view.Count()),
diagUtils.NewMeasureView(issuerCertExpiryTimestamp, noKeys, view.LastValue()),
)
}
收集 metrics
crs 相关
CertSignRequestReceived() 对接收到的 csr 数量进行计数:
// CertSignRequestReceived counts when CSR received.
func CertSignRequestReceived() {
stats.Record(context.Background(), csrReceivedTotal.M(1))
}
另外 CertSignSucceed() 会对处理成功的情况进行计数:
func CertSignSucceed() {
stats.Record(context.Background(), certSignSuccessTotal.M(1))
}
而 CertSignFailed() 则会对处理失败的情况进行计数:
func CertSignFailed(reason string) {
stats.RecordWithTags(
context.Background(),
diagUtils.WithTags(certSignFailedTotal.Name(), failedReasonKey, reason),
certSignFailedTotal.M(1))
}
三者的调用点为 server.go 中的 SignCertificate() 函数,这个函数负责处理 csr 请求:
func (s *server) SignCertificate(ctx context.Context, req *sentryv1pb.SignCertificateRequest) (*sentryv1pb.SignCertificateResponse, error) {
// 进来就计数:这是 接收到的 csr 数量
monitoring.CertSignRequestReceived()
......
// 每一个错误在return之前都要进行一次失败计数
if err != nil {
monitoring.CertSignFailed("cert_parse")
return nil, err
}
......
// 如果最后 csr 处理成功,则进行成功计数
monitoring.CertSignSucceed()
return resp, nil
}
证书有效期
IssuerCertExpiry() 方法记录 root cert 有效期的情况:
// IssuerCertExpiry records root cert expiry.
func IssuerCertExpiry(expiry *time.Time) {
stats.Record(context.Background(), issuerCertExpiryTimestamp.M(expiry.Unix()))
}
调用点在 sentry.go 中的 createCAServer() 函数中:
func (s *sentry) createCAServer(ctx context.Context) (ca.CertificateAuthority, identity.Validator) {
certAuth, authorityErr := ca.NewCertificateAuthority(s.conf)
trustStoreErr := certAuth.LoadOrStoreTrustBundle(ctx)
......
certExpiry := certAuth.GetCACertBundle().GetIssuerCertExpiry()
monitoring.IssuerCertExpiry(certExpiry)
......
return certAuth, v
}
在 CA server 的创建过程中,会加载 trust bundle并检查证书的有效期,在这里记录有效期的数据收集。
服务器证书签发失败
ServerCertIssueFailed() 记录服务器证书签发失败。
func ServerCertIssueFailed(reason string) {
stats.Record(context.Background(), serverTLSCertIssueFailedTotal.M(1))
}
调用点在 server.go 中:
func (s *server) Run(ctx context.Context, port int, trustBundler ca.TrustRootBundler) error {
......
tlsOpt := s.tlsServerOption(trustBundler)
s.srv = grpc.NewServer(tlsOpt)
......
}
sentry server启动过程中,在启动 grpc server 时,需要获取 tls server 的参数,期间要获取 sentry server 的服务器端证书:
func (s *server) tlsServerOption(trustBundler ca.TrustRootBundler) grpc.ServerOption {
cp := trustBundler.GetTrustAnchors()
config := &tls.Config{
ClientCAs: cp,
// Require cert verification
ClientAuth: tls.RequireAndVerifyClientCert,
GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
if s.certificate == nil || needsRefresh(s.certificate, serverCertExpiryBuffer) {
cert, err := s.getServerCertificate()
if err != nil {
monitoring.ServerCertIssueFailed("server_cert")
log.Error(err)
return nil, fmt.Errorf("failed to get TLS server certificate: %w", err)
}
s.certificate = cert
}
......
}
如果获取失败,则会记录这个失败信息。
发行者证书变更
IssuerCertChanged() 记录发行人凭证的变更:
func IssuerCertChanged() {
stats.Record(context.Background(), issuerCertChangedTotal.M(1))
}
调用点在 main.go 中的 main() 函数中,sentry 在启动后会监视发行者证书(默认为 “/var/run/dapr/credentials” 下的 “issuer.crt” 文件):
func main() {
......
func(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case <-issuerEvent:
monitoring.IssuerCertChanged()
log.Debug("received issuer credentials changed signal")
......
}
......
// Watch for changes in the watchDir
mngr.Add(func(ctx context.Context) error {
log.Infof("starting watch on filesystem directory: %s", watchDir)
return fswatcher.Watch(ctx, watchDir, issuerEvent)
})
}
9 - sentry库的identify实现
9.1 - identify.go
结构体定义
// Bundle 包含了足以以识别一个跨信任域和命名空间的工作负载的所有的元素:
type Bundle struct {
ID string
Namespace string
TrustDomain string
}
其实就三个元素: ID / Namespace 以及 TrustDomain
NewBundle() 方法
NewBundle() 方法返回一个新的 identity Bundle。
func NewBundle(id, namespace, trustDomain string) *Bundle {
// Empty namespace and trust domain result in an empty bundle
// 如果 namespace 或者 trust domain 为空,则返回空的 bundle(nil)
if namespace == "" || trustDomain == "" {
return nil
}
// 否则指示简单的赋值三个属性
return &Bundle{
ID: id,
Namespace: namespace,
TrustDomain: trustDomain,
}
}
namespace和trustDomain是可选参数。当为空时,将返回一个 nil 值。
9.2 - validator.go
接口定义
Validator 通过使用 ID 和 token 来验证证书请求的身份
type Validator interface {
Validate(id, token, namespace string) error
}
9.3 - spiff.go
CreateSPIFFEID 方法
CreateSPIFFEID() 方法从给定的 trustDomain, namespace, appID 创建符合 SPIFFE 标准的唯一ID:
func CreateSPIFFEID(trustDomain, namespace, appID string) (string, error) {
// trustDomain, namespace, appID 三者都不能为空
if trustDomain == "" {
return "", errors.New("can't create spiffe id: trust domain is empty")
}
if namespace == "" {
return "", errors.New("can't create spiffe id: namespace is empty")
}
if appID == "" {
return "", errors.New("can't create spiffe id: app id is empty")
}
// 根据 SPIFFE 规范进行验证
// Validate according to the SPIFFE spec
if strings.ContainsRune(trustDomain, ':') {
// trustDomain不能带":"
return "", errors.New("trust domain cannot contain the ':' character")
}
// trustDomain 的长度不能大于255个 byte
if len([]byte(trustDomain)) > 255 {
return "", errors.New("trust domain cannot exceed 255 bytes")
}
// 拼接出 SPIFFE ID
id := fmt.Sprintf("spiffe://%s/ns/%s/%s", trustDomain, namespace, appID)
if len([]byte(id)) > 2048 {
// 验证 SPIFFE ID 长度不大于 2048
return "", errors.New("spiffe id cannot exceed 2048 bytes")
}
return id, nil
}
9.4 - kubernetes下的validator.go
准备工作
结构体定义
validator 结构体定义:
type validator struct {
client k8s.Interface
auth kauth.AuthenticationV1Interface
audiences []string
}
创建validator的方法
NewValidator() 方法创建新的 validator 结构体:
func NewValidator(client k8s.Interface, audiences []string) identity.Validator {
return &validator{
client: client,
auth: client.AuthenticationV1(),
audiences: audiences,
}
}
实现
Validate() 实现通过使用 ID 和 token 来验证证书请求的身份:
func (v *validator) Validate(id, token, namespace string) error {
// id 和 token 不能为空
if id == "" {
return fmt.Errorf("%s: id field in request must not be empty", errPrefix)
}
if token == "" {
return fmt.Errorf("%s: token field in request must not be empty", errPrefix)
}
// TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
var canTryWithNilAudience, showDefaultTokenAudienceWarning bool
audiences := v.audiences
if len(audiences) == 0 {
// 处理用户没有显式设置 audience 的特殊情况
// 此时采用默认是 sentryConsts.ServiceAccountTokenAudience "dapr.io/sentry"
audiences = []string{sentryConsts.ServiceAccountTokenAudience}
// TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
// Because the user did not specify an explicit audience and is instead relying on the default, if the authentication fails we can retry with nil audience
// 并记录下来这是特殊情况,如果认证失败则应该尝试 audience 为 nil 的情况
canTryWithNilAudience = true
}
tokenReview := &kauthapi.TokenReview{
Spec: kauthapi.TokenReviewSpec{
Token: token,
Audiences: audiences,
},
}
tr: // TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
prts, err := v.executeTokenReview(tokenReview)
if err != nil {
// TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
if canTryWithNilAudience {
// Retry with a nil audience, which means the default audience for the K8s API server
tokenReview.Spec.Audiences = nil
showDefaultTokenAudienceWarning = true
canTryWithNilAudience = false
goto tr
}
return err
}
// TODO: Remove in Dapr 1.12 to enforce setting an explicit audience
if showDefaultTokenAudienceWarning {
log.Warn("WARNING: Sentry accepted a token with the audience for the Kubernetes API server. This is deprecated and only supported to ensure a smooth upgrade from Dapr pre-1.10.")
}
if len(prts) != 4 || prts[0] != "system" {
return fmt.Errorf("%s: provided token is not a properly structured service account token", errPrefix)
}
podSa := prts[3]
podNs := prts[2]
// 检验 namespace
if namespace != "" {
if podNs != namespace {
return fmt.Errorf("%s: namespace mismatch. received namespace: %s", errPrefix, namespace)
}
}
// 检验 id
if id != podNs+":"+podSa {
return fmt.Errorf("%s: token/id mismatch. received id: %s", errPrefix, id)
}
return nil
}
executeTokenReview() 方法执行 tokenReview,如果 token 无效或者失败则返回错误:
func (v *validator) executeTokenReview(tokenReview *kauthapi.TokenReview) ([]string, error) {
review, err := v.auth.TokenReviews().Create(context.TODO(), tokenReview, v1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("%s: token review failed: %w", errPrefix, err)
}
if review.Status.Error != "" {
return nil, fmt.Errorf("%s: invalid token: %s", errPrefix, review.Status.Error)
}
if !review.Status.Authenticated {
return nil, fmt.Errorf("%s: authentication failed", errPrefix)
}
return strings.Split(review.Status.User.Username, ":"), nil
}
9.5 - selfhosted下的validator.go
selfhosted 下实际没有做验证:
func NewValidator() identity.Validator {
return &validator{}
}
type validator struct{}
func (v *validator) Validate(id, token, namespace string) error {
// no validation for self hosted.
return nil
}
只是保留了一套代码框架,以满足 Validator 接口的要求。
这意味着在 selfhosted 下是不会进行身份验证的。