injector.go的源码学习
Dapr Injector 中的 injector.go 的 代码
主流程代码
接口和结构体定义和创建
Injector 是Dapr运行时 sidecar 注入组件的接口。
// Injector is the interface for the Dapr runtime sidecar injection component.
// Run starts the component with the supplied context; the implementation in
// this file serves the webhook until that context is cancelled or the server
// stops on its own.
type Injector interface {
Run(ctx context.Context)
}
injector 结构体定义:
// injector is the concrete Injector implementation built by NewInjector.
type injector struct {
// config supplies the TLS certificate/key paths, the namespace and the
// sidecar image settings read by the methods of this type.
config Config
// deserializer decodes the raw request body into an AdmissionReview.
deserializer runtime.Decoder
// server is the HTTP server whose mux exposes the /mutate endpoint.
server *http.Server
// kubeClient is handed to getPodPatchOperations when building pod patches.
kubeClient *kubernetes.Clientset
// daprClient is handed to getPodPatchOperations when building pod patches.
daprClient scheme.Interface
// authUIDs lists the user UIDs whose admission requests are accepted.
authUIDs []string
}
创建新的 injector 结构体(这个方法在 injector 的 main 方法中被调用):
// NewInjector returns a new Injector instance with the given config.
//
// It wires an HTTP mux, a universal deserializer backed by a fresh runtime
// scheme, and an HTTP server bound to the package-level port. The server is
// only configured here; it starts listening when Run is called.
func NewInjector(authUIDs []string, config Config, daprClient scheme.Interface, kubeClient *kubernetes.Clientset) Injector {
	router := http.NewServeMux()

	// Decoder used by the handler to turn request bodies into API objects.
	decoder := serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()

	// HTTP server definition; routing is delegated to the mux above.
	srv := &http.Server{
		Addr:    fmt.Sprintf(":%d", port),
		Handler: router,
	}

	inj := &injector{
		config:       config,
		deserializer: decoder,
		server:       srv,
		kubeClient:   kubeClient,
		daprClient:   daprClient,
		authUIDs:     authUIDs,
	}

	// The mutate endpoint is the one called by Kubernetes.
	router.HandleFunc("/mutate", inj.handleRequest)
	return inj
}
Run()方法
最核心的 Run 方法:
// Run serves the /mutate webhook over TLS and blocks until the server stops.
//
// When ctx is cancelled the server is shut down gracefully, bounded by a
// five second timeout. ListenAndServeTLS returns as soon as Shutdown is
// called, so Run additionally waits for the shutdown goroutine to finish;
// otherwise the caller could exit while connections are still draining.
func (i *injector) Run(ctx context.Context) {
	doneCh := make(chan struct{})
	// shutdownDone is closed once the watcher goroutine below has exited.
	shutdownDone := make(chan struct{})

	// Watcher goroutine: reacts either to ctx cancellation or to the server
	// having stopped on its own (doneCh).
	go func() {
		defer close(shutdownDone)
		select {
		case <-ctx.Done():
			log.Info("Sidecar injector is shutting down")
			shutdownCtx, cancel := context.WithTimeout(
				context.Background(),
				time.Second*5,
			)
			defer cancel()
			i.server.Shutdown(shutdownCtx) // nolint: errcheck
		case <-doneCh:
		}
	}()

	// Startup log line; it shows up in the injector's log output.
	log.Infof("Sidecar injector is listening on %s, patching Dapr-enabled pods", i.server.Addr)

	// TODO: this call sometimes fails because of a certificate problem, which
	// keeps the injector from working; investigate later.
	err := i.server.ListenAndServeTLS(i.config.TLSCertFile, i.config.TLSKeyFile)
	if err != http.ErrServerClosed {
		log.Errorf("Sidecar injector error: %s", err)
	}

	// Release the watcher if it is still waiting, then wait for it so that an
	// in-flight Shutdown has completed before Run returns.
	close(doneCh)
	<-shutdownDone
}
可以对比通过 kubectl logs dapr-sidecar-injector-86b8dc4dcd-bkbgw -n dapr-system
命令查看到的 injector 日志内容:
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"log level set to: info","scope":"dapr.injector","time":"2021-05-11T01:13:20.1904136Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"metrics server started on :9090/","scope":"dapr.metrics","time":"2021-05-11T01:13:20.1907347Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"starting Dapr Sidecar Injector -- version edge -- commit v1.0.0-rc.4-163-g9a4210a-dirty","scope":"dapr.injector","time":"2021-05-11T01:13:20.191669Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Healthz server is listening on :8080","scope":"dapr.injector","time":"2021-05-11T01:13:20.1928941Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Sidecar injector is listening on :4000, patching Dapr-enabled pods","scope":"dapr.injector","time":"2021-05-11T01:13:20.208587Z","type":"log","ver":"unknown"}
handleRequest方法
handleRequest方法用来处理来自 k8s api server的 mutate 调用:
mux.HandleFunc("/mutate", i.handleRequest)
func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
......
}
代码比较长,忽略部分细节代码。
读取请求的body,验证长度和content-type:
// Close the request body once the handler returns.
defer r.Body.Close()
// Read the whole body; a read error leaves body empty.
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}
// NOTE(review): a failed read is not reported separately; it is rejected
// below as an empty body.
// Reject requests that carry no payload.
if len(body) == 0 {
log.Error("empty body")
http.Error(w, "empty body", http.StatusBadRequest)
return
}
// Only an exact "application/json" Content-Type is accepted.
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
log.Errorf("Content-Type=%s, expect application/json", contentType)
http.Error(
w,
"invalid Content-Type, expect `application/json`",
http.StatusUnsupportedMediaType,
)
return
}
反序列化body,并做一些基本的验证:
// Decode the body into an AdmissionReview, then validate it before computing
// the pod patch. Any failure is left in err for the response-building step.
ar := v1.AdmissionReview{}
_, gvk, err := i.deserializer.Decode(body, nil, &ar)
switch {
case err != nil:
	log.Errorf("Can't decode body: %v", err)
case ar.Request == nil:
	// A review without a request cannot be authorized or patched; fail
	// explicitly instead of dereferencing a nil pointer below.
	err = errors.New("admission review contains no request")
	log.Error(err)
case !utils.StringSliceContains(ar.Request.UserInfo.UID, i.authUIDs):
	// err is nil here, so wrapping it would produce nil and the request
	// would be treated as valid; create a fresh error instead.
	err = errors.New("unauthorized request")
	log.Error(err)
case ar.Request.Kind.Kind != "Pod":
	// Same nil-wrapping pitfall as above; also report the kind that was
	// actually checked rather than the review's own kind.
	err = errors.Errorf("invalid kind for review: %s", ar.Request.Kind.Kind)
	log.Error(err)
default:
	patchOps, err = i.getPodPatchOperations(&ar, i.config.Namespace, i.config.SidecarImage, i.config.SidecarImagePullPolicy, i.kubeClient, i.daprClient)
}
getPodPatchOperations 是核心代码,后面细看。
统一处理前面可能产生的错误,以及 getPodPatchOperations() 的处理结果:
// Turn the outcome of the previous steps into an AdmissionResponse.
diagAppID := getAppIDFromRequest(ar.Request)
switch {
case err != nil:
	// An earlier step failed: answer with the error and record the failure.
	admissionResponse = toAdmissionResponse(err)
	log.Errorf("Sidecar injector failed to inject for app '%s'. Error: %s", diagAppID, err)
	monitoring.RecordFailedSidecarInjectionCount(diagAppID, "patch")
case len(patchOps) == 0:
	// No patch operations means nothing has to change: just allow the pod.
	admissionResponse = &v1.AdmissionResponse{Allowed: true}
default:
	// Serialize the patch operations to JSON and attach them to the response.
	var patchBytes []byte
	patchBytes, err = json.Marshal(patchOps)
	if err != nil {
		admissionResponse = toAdmissionResponse(err)
	} else {
		patchType := v1.PatchTypeJSONPatch
		admissionResponse = &v1.AdmissionResponse{
			Allowed:   true,
			Patch:     patchBytes,
			PatchType: &patchType,
		}
	}
}
组装 AdmissionReview:
// Wrap the response in a fresh AdmissionReview.
admissionReview := v1.AdmissionReview{}
if admissionResponse != nil {
admissionReview.Response = admissionResponse
// When a request was decoded, echo its UID and set the decoded
// group/version/kind on the outgoing review.
if ar.Request != nil {
admissionReview.Response.UID = ar.Request.UID
admissionReview.SetGroupVersionKind(*gvk)
}
}
将应答序列化并返回:
// Serialize the AdmissionReview and send it back to the caller.
log.Infof("ready to write response ...")
respBytes, err := json.Marshal(admissionReview)
if err != nil {
	http.Error(
		w,
		err.Error(),
		http.StatusInternalServerError,
	)
	log.Errorf("Sidecar injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err)
	monitoring.RecordFailedSidecarInjectionCount(diagAppID, "response")
	// Stop here: the error response has already been written, so falling
	// through would append an empty body and record a bogus success.
	return
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil {
	log.Error(err)
} else {
	log.Infof("Sidecar injector succeeded injection for app '%s'", diagAppID)
	monitoring.RecordSuccessfulSidecarInjectionCount(diagAppID)
}
帮助类代码
toAdmissionResponse方法
toAdmissionResponse 方法用于从一个 error 创建 k8s 的 AdmissionResponse :
// toAdmissionResponse builds an AdmissionResponse whose Result status carries
// the message of the given error. err must be non-nil, since its Error method
// is called unconditionally.
func toAdmissionResponse(err error) *v1.AdmissionResponse {
	status := &metav1.Status{Message: err.Error()}
	return &v1.AdmissionResponse{Result: status}
}
获取AppID
getAppIDFromRequest() 方法从 AdmissionRequest 中获取AppID:
// getAppIDFromRequest extracts the app ID from the pod carried by an
// AdmissionRequest. It returns the empty string when req is nil or when the
// raw object cannot be unmarshalled as a pod (the latter is logged as a
// warning).
func getAppIDFromRequest(req *v1.AdmissionRequest) string {
	// Without a request there is nothing to inspect.
	if req == nil {
		return ""
	}

	// Parse the raw object JSON as a pod.
	var pod corev1.Pod
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		log.Warnf("could not unmarshal raw object: %v", err)
		return ""
	}

	// Derive the app ID from the pod metadata.
	return getAppID(pod)
}
getAppID()方法的实现如下,首先读取 “dapr.io/app-id” 的 Annotation,如果没有,则取 pod 的 name 作为默认AppID:
// appIDKey is the pod annotation that carries the app ID.
const appIDKey = "dapr.io/app-id"

// getAppID looks up the appIDKey annotation on the pod through
// getStringAnnotationOrDefault, passing the pod's name as the default value.
func getAppID(pod corev1.Pod) string {
	fallback := pod.GetName()
	return getStringAnnotationOrDefault(pod.Annotations, appIDKey, fallback)
}
分支代码
ServiceAccount 相关代码
AllowedControllersServiceAccountUID()方法返回UID数组,这些是 webhook handler 上容许的 service account 列表:
// allowedControllersServiceAccounts names the controller service accounts
// whose UIDs AllowedControllersServiceAccountUID resolves. Order matters:
// that function treats only the first entry as mandatory.
var allowedControllersServiceAccounts = []string{
"replicaset-controller",
"deployment-controller",
"cronjob-controller",
"job-controller",
"statefulset-controller",
}
// AllowedControllersServiceAccountUID returns an array of UID, list of allowed service account on the webhook handler.
//
// Each name in allowedControllersServiceAccounts is resolved through
// getServiceAccount. A lookup failure for the first entry
// ("replicaset-controller") aborts with that error; failures for the other
// entries are logged as warnings and skipped.
func AllowedControllersServiceAccountUID(ctx context.Context, kubeClient *kubernetes.Clientset) ([]string, error) {
	allowedUids := []string{}
	for idx, name := range allowedControllersServiceAccounts {
		uid, err := getServiceAccount(ctx, kubeClient, name)
		if err == nil {
			allowedUids = append(allowedUids, uid)
			continue
		}
		// idx == 0 => "replicaset-controller" is the only mandatory account.
		if idx == 0 {
			return nil, err
		}
		log.Warnf("Unable to get SA %s UID (%s)", name, err)
	}
	return allowedUids, nil
}
// getServiceAccount fetches the named service account from the
// metav1.NamespaceSystem namespace and returns its UID as a string. The
// lookup runs under a timeout of getKubernetesServiceAccountTimeoutSeconds
// seconds derived from ctx; any error from the API call is returned as is.
func getServiceAccount(ctx context.Context, kubeClient *kubernetes.Clientset, allowedControllersServiceAccount string) (string, error) {
	timeout := getKubernetesServiceAccountTimeoutSeconds * time.Second
	lookupCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	accounts := kubeClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem)
	sa, err := accounts.Get(lookupCtx, allowedControllersServiceAccount, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	return string(sa.ObjectMeta.UID), nil
}