1 - Injector的代码实现

Dapr Injector的代码实现

Inject的流程

以e2e中的 stateapp 为例。

应用的原始Deployment

tests/apps/stateapp/service.yaml 中是 stateapp 的 Service 定义和 Deployment定义。

Service的定义没有什么特殊:

kind: Service
apiVersion: v1
metadata:
  name: stateapp
  labels:
    testapp: stateapp
spec:
  selector:
    testapp: stateapp
  ports:
  - protocol: TCP
    port: 80
    targetPort: 3000
  type: LoadBalancer

deployment的定义:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: stateapp
  labels:
    testapp: stateapp
spec:
  replicas: 1
  selector:
    matchLabels:
      testapp: stateapp
  template: # stateapp的pod定义
    metadata:
      labels:
        testapp: stateapp
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "stateapp"
        dapr.io/app-port: "3000"
    spec:   #stateapp的container定义,暂时pod中只定义了这个一个container
      containers:
      - name: stateapp
        image: docker.io/YOUR_DOCKER_ALIAS/e2e-stateapp:dev
        ports:
        - containerPort: 3000
        imagePullPolicy: Always

单独看 stateapp 的 pod 定义中的 annotations:

      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "stateapp"
        dapr.io/app-port: "3000"

源码

getPodPatchOperations:

func (i *injector) getPodPatchOperations(ar *v1beta1.AdmissionReview,
	namespace, image string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
	req := ar.Request
	var pod corev1.Pod
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		errors.Wrap(err, "could not unmarshal raw object")
		return nil, err
	}

	log.Infof(
		"AdmissionReview for Kind=%v, Namespace=%v Name=%v (%v) UID=%v "+
			"patchOperation=%v UserInfo=%v",
		req.Kind,
		req.Namespace,
		req.Name,
		pod.Name,
		req.UID,
		req.Operation,
		req.UserInfo,
	)

	if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
		return nil, nil
	}
  ...

这个info日志打印的例子如下:

{"instance":"dapr-sidecar-injector-5f6f4bb6df-n5dsk","level":"info","msg":"AdmissionReview for Kind=/v1, Kind=Pod, Namespace=dapr-tests Name= () UID=d0126a13-9efd-432e-894a-5ddbee55898c patchOperation=CREATE UserInfo={system:serviceaccount:kube-system:replicaset-controller 3e5de149-07a3-434e-a8de-209abee69760 [system:serviceaccounts system:serviceaccounts:kube-system system:authenticated] map[]}","scope":"dapr.injector","time":"2020-09-25T07:07:07.6482457Z","type":"log","ver":"edge"}

可以看到,当 namespace dapr-tests 下有 pod 的 CREATE 操作时,Injector 就开始工作了。

isResourceDaprEnabled(pod.Annotations) 检查 pod 是否启用了 dapr,判断方式是看 pod 是否有名为 dapr.io/enabled 的 annotation 且值为 true,缺省为 false:

// daprEnabledKey is the pod annotation that opts a pod into sidecar injection.
const daprEnabledKey = "dapr.io/enabled"

// isResourceDaprEnabled reports whether the annotations opt the resource into
// Dapr, i.e. whether the "dapr.io/enabled" annotation evaluates to true.
// A missing annotation counts as false (the default passed below); how
// non-boolean values are treated is up to getBoolAnnotationOrDefault.
func isResourceDaprEnabled(annotations map[string]string) bool {
	const enabledWhenAbsent = false
	return getBoolAnnotationOrDefault(annotations, daprEnabledKey, enabledWhenAbsent)
}

podContainsSidecarContainer 检查 pod 是不是已经包含 dapr的sidecar,判断的方式是看 container 的名字是不是 daprd

// sidecarContainerName is the container name given to the injected daprd sidecar.
const sidecarContainerName = "daprd"

// podContainsSidecarContainer reports whether one of the pod's regular
// containers (pod.Spec.Containers) is already named "daprd", which means the
// sidecar is already present and must not be injected a second time.
// Init containers are not inspected.
func podContainsSidecarContainer(pod *corev1.Pod) bool {
	containers := pod.Spec.Containers
	for idx := range containers {
		if containers[idx].Name == sidecarContainerName {
			return true
		}
	}
	return false
}

继续getPodPatchOperations():

	id := getAppID(pod)
	// Keep DNS resolution outside of getSidecarContainer for unit testing.
	placementAddress := fmt.Sprintf("%s:80", getKubernetesDNS(placementService, namespace))
	sentryAddress := fmt.Sprintf("%s:80", getKubernetesDNS(sentryService, namespace))
	apiSrvAddress := fmt.Sprintf("%s:80", getKubernetesDNS(apiAddress, namespace))

getAppID(pod) 通过读取 annotation 来获取应用 id。注意 `dapr.io/id` 已经废弃(代码注释为 "remove in v1.0"),由 `dapr.io/app-id` 替代;如果两个 annotation 都没有设置,则使用 pod 的名字作为应用 id:

const (
	// appIDKey is the annotation that carries the Dapr application ID.
	appIDKey = "dapr.io/app-id"
	// idKey is the legacy app-ID annotation, still honored as a fallback.
	// Deprecated, remove in v1.0
	idKey = "dapr.io/id"
)

// getAppID resolves the Dapr app ID for a pod, trying in order:
//  1. the "dapr.io/app-id" annotation, when non-empty;
//  2. the deprecated "dapr.io/id" annotation;
//  3. the pod name, passed as the default to getStringAnnotationOrDefault.
func getAppID(pod corev1.Pod) string {
	if id := getStringAnnotationOrDefault(pod.Annotations, appIDKey, ""); id != "" {
		return id
	}
	return getStringAnnotationOrDefault(pod.Annotations, idKey, pod.GetName())
}

mtlsEnabled的判断

	var trustAnchors string
	var certChain string
	var certKey string
	var identity string

	mtlsEnabled := mTLSEnabled(daprClient)
	if mtlsEnabled {
		trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
		identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
	}

mTLSEnabled 的判断方式,居然是列出所有 namespace 下的 dapr Configuration 资源,再从中找名为 defaultConfig(即 daprsystem)的那一个,读取它的 mtls.enabled;查询出错或者找不到时返回 defaultMtlsEnabled:

const (
	// NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces
	// (quoted here for reference; the function below uses meta_v1.NamespaceAll.)
	NamespaceAll string = ""
)

// mTLSEnabled reports whether mTLS is switched on for the cluster.
//
// It lists Dapr Configuration resources in every namespace and returns the
// MTLSSpec.Enabled value of the first one whose name equals defaultConfig
// ("daprsystem"). When the List call fails, or no such Configuration exists,
// defaultMtlsEnabled is returned instead.
func mTLSEnabled(daprClient scheme.Interface) bool {
	configurations := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll)
	resp, err := configurations.List(meta_v1.ListOptions{})
	if err != nil {
		return defaultMtlsEnabled
	}

	for idx := range resp.Items {
		cfg := &resp.Items[idx]
		if cfg.GetName() != defaultConfig {
			continue
		}
		return cfg.Spec.MTLSSpec.Enabled
	}
	return defaultMtlsEnabled
}

通过读取k8s的资源来判断是否要开启 mtls,tests/config/dapr_mtls_off_config.yaml 有example内容:

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: daprsystem # 名字一定要是 daprsystem
spec:
  mtls:
    enabled: "false"  # 在这里配置要不要开启 mtls
    workloadCertTTL: "1h"
    allowedClockSkew: "20m"

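但这里有个坑:MTLSSpec.Enabled 字段是 bool 类型,而上面 yaml 中写的是字符串 "false",导致 Configuration 资源无法被反序列化,报错如下(正确写法是不带引号的 `enabled: false`):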

E0925 09:37:53.480772       1 reflector.go:153] sigs.k8s.io/controller-runtime/pkg/cache/internal/informers_map.go:224: Failed to list *v1alpha1.Configuration: v1alpha1.ConfigurationList.Items: []v1alpha1.Configuration: v1alpha1.Configuration.Spec: v1alpha1.ConfigurationSpec.MTLSSpec: v1alpha1.MTLSSpec.Enabled: ReadBool: expect t or f, but found ", error found in #10 byte of ...|enabled":"false","wo|..., bigger context ...|pec":{"mtls":{"allowedClockSkew":"20m","enabled":"false","workloadCertTTL":"1h"}}},{"apiVersion":"da|...

注入后实际生效的应用 pod 定义(可以看到多出了名为 daprd 的 sidecar container):

apiVersion: v1
kind: Pod
metadata:
  annotations:
    dapr.io/app-id: stateapp
    dapr.io/app-port: "3000"
    dapr.io/enabled: "true"
    dapr.io/sidecar-cpu-limit: "4.0"
    dapr.io/sidecar-cpu-request: "0.5"
    dapr.io/sidecar-memory-limit: 512Mi
    dapr.io/sidecar-memory-request: 250Mi
  creationTimestamp: "2020-09-25T07:07:07Z"
  generateName: stateapp-567b6b9c6f-
  labels:
    pod-template-hash: 567b6b9c6f
    testapp: stateapp
  name: stateapp-567b6b9c6f-84kzb
  namespace: dapr-tests
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: stateapp-567b6b9c6f
    uid: 25a34367-79ed-4e19-868a-5b063a45b1f4
  resourceVersion: "146616"
  selfLink: /api/v1/namespaces/dapr-tests/pods/stateapp-567b6b9c6f-84kzb
  uid: 0f4060df-0312-4d73-91c1-6f085462b33d
spec:
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/os
            operator: In
            values:
            - linux
          - key: kubernetes.io/arch
            operator: In
            values:
            - amd64
  containers:
  - env:
    - name: DAPR_HTTP_PORT
      value: "3500"
    - name: DAPR_GRPC_PORT
      value: "50001"
    image: docker.io/skyao/e2e-stateapp:dev-linux-amd64
    imagePullPolicy: Always
    name: stateapp
    ports:
    - containerPort: 3000
      name: http
      protocol: TCP
    resources: {}
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: default-token-qncjc
      readOnly: true
  - args:
    - --mode
    - kubernetes
    - --dapr-http-port
    - "3500"
    - --dapr-grpc-port
    - "50001"
    - --dapr-internal-grpc-port
    - "50002"
    - --app-port
    - "3000"
    - --app-id
    - stateapp
    - --control-plane-address
    - dapr-api.dapr-system.svc.cluster.local:80
    - --app-protocol
    - http
    - --placement-host-address
    - dapr-placement.dapr-system.svc.cluster.local:80
    - --config
    - ""
    - --log-level
    - info
    - --app-max-concurrency
    - "-1"
    - --sentry-address
    - dapr-sentry.dapr-system.svc.cluster.local:80
    - --metrics-port
    - "9090"
    - --enable-mtls
    command:
    - /daprd
    env:
    - name: DAPR_HOST_IP
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    - name: NAMESPACE
      value: dapr-tests
    - name: DAPR_TRUST_ANCHORS
      value: |
        -----BEGIN CERTIFICATE-----
        MIIB3TCCAYKgAwIBAgIRAMra+wjgMY6ABDtu3/vJ0NcwCgYIKoZIzj0EAwIwMTEX
        MBUGA1UEChMOZGFwci5pby9zZW50cnkxFjAUBgNVBAMTDWNsdXN0ZXIubG9jYWww
        HhcNMjAwOTI1MDU1ODAzWhcNMjEwOTI1MDU1ODAzWjAxMRcwFQYDVQQKEw5kYXBy
        LmlvL3NlbnRyeTEWMBQGA1UEAxMNY2x1c3Rlci5sb2NhbDBZMBMGByqGSM49AgEG
        CCqGSM49AwEHA0IABE/w/8YBtRJPYNJkcDM05e9PhrbGjBU/RQd09J909OJebDe8
        rthysygWrcGYHYKziKK2Pyc1j4ua2xklLC5DFEWjezB5MA4GA1UdDwEB/wQEAwIC
        BDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0TAQH/BAUwAwEB
        /zAdBgNVHQ4EFgQUQ2v6OiayM9V4DPAU6UZHGe/nc1swGAYDVR0RBBEwD4INY2x1
        c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNJADBGAiEAtVBx9vDXiRE3fXJTU2yK11W5
        eo+Ce4+U6/vXDtzw4PUCIQDlLOB45ihOAhhLVLG9akhgwJOrgZLEW3FZjRabpSsb
        og==
        -----END CERTIFICATE-----        
    - name: DAPR_CERT_CHAIN
      value: |
        -----BEGIN CERTIFICATE-----
        MIIBxDCCAWqgAwIBAgIQQ1sfEH4aYacFZwBau+aOozAKBggqhkjOPQQDAjAxMRcw
        FQYDVQQKEw5kYXByLmlvL3NlbnRyeTEWMBQGA1UEAxMNY2x1c3Rlci5sb2NhbDAe
        Fw0yMDA5MjUwNTU4MDNaFw0yMTA5MjUwNTU4MDNaMBgxFjAUBgNVBAMTDWNsdXN0
        ZXIubG9jYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARhj7MQ1uiOkZvJ0AYV
        uiFca/Iu9D5O98E5JN1mjCohRawk+QT1PjW05YtmyVji4Tt6ckIMvOXwG3aoTsGO
        UbRio30wezAOBgNVHQ8BAf8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4E
        FgQUTPUh0WWBB5baKs3aJjMzInVLX/EwHwYDVR0jBBgwFoAUQ2v6OiayM9V4DPAU
        6UZHGe/nc1swGAYDVR0RBBEwD4INY2x1c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNI
        ADBFAiBO0oCadeYyLM+RkSAYPSTtjMyEZ0wv1/BsWuUMg+KZ6AIhALHnT0pxiqlj
        miYT4WZWvaBc17AbUh1efgV2DAaNKm54
        -----END CERTIFICATE-----
                
    - name: DAPR_CERT_KEY
      value: |
        -----BEGIN EC PRIVATE KEY-----
        MHcCAQEEIDj6niLJ5ep+fDdY71bKyWl9RZHrXyRjND6pWySL2Q4UoAoGCCqGSM49
        AwEHoUQDQgAEYY+zENbojpGbydAGFbohXGvyLvQ+TvfBOSTdZowqIUWsJPkE9T41
        tOWLZslY4uE7enJCDLzl8Bt2qE7BjlG0Yg==
        -----END EC PRIVATE KEY-----        
    - name: SENTRY_LOCAL_IDENTITY
      value: default:dapr-tests
    image: docker.io/skyao/daprd:dev-linux-amd64
    imagePullPolicy: Always
    livenessProbe:
      failureThreshold: 3
      httpGet:
        path: /v1.0/healthz
        port: 3500
        scheme: HTTP
      initialDelaySeconds: 3
      periodSeconds: 6
      successThreshold: 1
      timeoutSeconds: 3
    name: daprd
    ports:
    - containerPort: 3500
      name: dapr-http
      protocol: TCP
    - containerPort: 50001
      name: dapr-grpc
      protocol: TCP
    - containerPort: 50002
      name: dapr-internal
      protocol: TCP
    - containerPort: 9090
      name: dapr-metrics
      protocol: TCP
    readinessProbe:
      failureThreshold: 3
      httpGet:
        path: /v1.0/healthz
        port: 3500
        scheme: HTTP
      initialDelaySeconds: 3
      periodSeconds: 6
      successThreshold: 1
      timeoutSeconds: 3
    resources:
      limits:
        cpu: "4"
        memory: 512Mi
      requests:
        cpu: 500m
        memory: 250Mi
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: default-token-qncjc
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  nodeName: docker-desktop
  priority: 0
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: default
  serviceAccountName: default
  terminationGracePeriodSeconds: 30
  tolerations:
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
  - name: default-token-qncjc
    secret:
      defaultMode: 420
      secretName: default-token-qncjc
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T07:07:07Z"
    status: "True"
    type: Initialized
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T07:07:07Z"
    message: 'containers with unready status: [daprd]'
    reason: ContainersNotReady
    status: "False"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T07:07:07Z"
    message: 'containers with unready status: [daprd]'
    reason: ContainersNotReady
    status: "False"
    type: ContainersReady
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T07:07:07Z"
    status: "True"
    type: PodScheduled
  containerStatuses:
  - containerID: docker://26a1d85ac6e2accd833832681b8dc2aa809e3c0fcfa293398bd5e7c2e8bf3e2b
    image: skyao/daprd:dev-linux-amd64
    imageID: docker-pullable://skyao/daprd@sha256:387f3bf4e7397c43dca9ac2d248a9ce790b1c1888aa0d6de3b07107ce124752f
    lastState:
      terminated:
        containerID: docker://26a1d85ac6e2accd833832681b8dc2aa809e3c0fcfa293398bd5e7c2e8bf3e2b
        exitCode: 1
        finishedAt: "2020-09-25T08:03:14Z"
        reason: Error
        startedAt: "2020-09-25T08:03:04Z"
    name: daprd
    ready: false
    restartCount: 21
    started: false
    state:
      waiting:
        message: back-off 5m0s restarting failed container=daprd pod=stateapp-567b6b9c6f-84kzb_dapr-tests(0f4060df-0312-4d73-91c1-6f085462b33d)
        reason: CrashLoopBackOff
  - containerID: docker://737745ace04213c9519ad1f91e248015c89a80e2b3d61081c3c530d1c89bdbae
    image: skyao/e2e-stateapp:dev-linux-amd64
    imageID: docker-pullable://skyao/e2e-stateapp@sha256:16351b331f1338a61348c9a87fce43728369f1bf18ee69d9d45fb13db0283644
    lastState: {}
    name: stateapp
    ready: true
    restartCount: 0
    started: true
    state:
      running:
        startedAt: "2020-09-25T07:07:24Z"
  hostIP: 192.168.65.3
  phase: Running
  podIP: 10.1.0.194
  podIPs:
  - ip: 10.1.0.194
  qosClass: Burstable
  startTime: "2020-09-25T07:07:07Z"

其他

injector自身的pod定义

dapr-sidecar-injector

apiVersion: v1
kind: Pod
metadata:
  annotations:
    prometheus.io/path: /
    prometheus.io/port: "9090"
    prometheus.io/scrape: "true"
  creationTimestamp: "2020-09-25T05:57:37Z"
  generateName: dapr-sidecar-injector-5f6f4bb6df-
  labels:
    app: dapr-sidecar-injector
    app.kubernetes.io/component: sidecar-injector
    app.kubernetes.io/managed-by: helm
    app.kubernetes.io/name: dapr
    app.kubernetes.io/part-of: dapr
    app.kubernetes.io/version: dev-linux-amd64
    pod-template-hash: 5f6f4bb6df
  name: dapr-sidecar-injector-5f6f4bb6df-n5dsk
  namespace: dapr-system
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: dapr-sidecar-injector-5f6f4bb6df
    uid: ff47b1df-6da7-4a19-b99d-15622ca3a485
  resourceVersion: "133143"
  selfLink: /api/v1/namespaces/dapr-system/pods/dapr-sidecar-injector-5f6f4bb6df-n5dsk
  uid: 40df3834-4df2-495a-aa26-5b2a22de7639
spec:
  affinity:
    nodeAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - preference:
          matchExpressions:
          - key: kubernetes.io/os
            operator: In
            values:
            - linux
        weight: 1
  containers:
  - args:
    - --log-level
    - info
    - --log-as-json
    - --metrics-port
    - "9090"
    command:
    - /injector
    env:
    - name: TLS_CERT_FILE
      value: /dapr/cert/tls.crt
    - name: TLS_KEY_FILE
      value: /dapr/cert/tls.key
    - name: SIDECAR_IMAGE
      value: docker.io/skyao/daprd:dev-linux-amd64
    - name: NAMESPACE
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.namespace
    image: docker.io/skyao/dapr:dev-linux-amd64
    imagePullPolicy: Always
    livenessProbe:
      failureThreshold: 5
      httpGet:
        path: /healthz
        port: 8080
        scheme: HTTP
      initialDelaySeconds: 3
      periodSeconds: 3
      successThreshold: 1
      timeoutSeconds: 1
    name: dapr-sidecar-injector
    ports:
    - containerPort: 4000
      name: https
      protocol: TCP
    - containerPort: 9090
      name: metrics
      protocol: TCP
    readinessProbe:
      failureThreshold: 5
      httpGet:
        path: /healthz
        port: 8080
        scheme: HTTP
      initialDelaySeconds: 3
      periodSeconds: 3
      successThreshold: 1
      timeoutSeconds: 1
    resources: {}
    securityContext:
      runAsUser: 1000
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /dapr/cert
      name: cert
      readOnly: true
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: dapr-operator-token-lgpvc
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  nodeName: docker-desktop
  priority: 0
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: dapr-operator
  serviceAccountName: dapr-operator
  terminationGracePeriodSeconds: 30
  tolerations:
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
  - name: cert
    secret:
      defaultMode: 420
      secretName: dapr-sidecar-injector-cert
  - name: dapr-operator-token-lgpvc
    secret:
      defaultMode: 420
      secretName: dapr-operator-token-lgpvc
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T05:57:37Z"
    status: "True"
    type: Initialized
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T05:58:10Z"
    status: "True"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T05:58:10Z"
    status: "True"
    type: ContainersReady
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T05:57:37Z"
    status: "True"
    type: PodScheduled
  containerStatuses:
  - containerID: docker://a820646b468a07eabdd89ca133f062a93e85256afc6c19c1bdf13b56980ec5e9
    image: skyao/dapr:dev-linux-amd64
    imageID: docker-pullable://skyao/dapr@sha256:77003eee9fd02d9fc24c2e9f385a6c86223bc35915cede98a8897c0dfc51ee61
    lastState: {}
    name: dapr-sidecar-injector
    ready: true
    restartCount: 0
    started: true
    state:
      running:
        startedAt: "2020-09-25T05:58:06Z"
  hostIP: 192.168.65.3
  phase: Running
  podIP: 10.1.0.188
  podIPs:
  - ip: 10.1.0.188
  qosClass: BestEffort
  startTime: "2020-09-25T05:57:37Z"

2 - main.go的源码学习

Dapr Injector 的 main 代码

Dapr injector 中的 main.go 文件的源码分析。

init() 方法

init() 负责初始化工作:先设定并解析 flag(logger 和 metrics 相关),然后初始化 logger 和 metrics。

flag 设定和读取

func init() {
	loggerOptions := logger.DefaultOptions()
	// 这里设定了 `log-level` 和 `log-as-json`
	loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)

	metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
	
	// 这里设定了 `metrics-port` 和 `enable-metrics`
metricsExporter.Options().AttachCmdFlags(flag.StringVar, flag.BoolVar)

	flag.Parse()

参考 injector pod yaml文件中 Command 段:

    Command:
      /injector
    Args:
      --log-level
      info
      --log-as-json
      --enable-metrics
      --metrics-port
      9090

初始化 logger

	// Apply options to all loggers
	if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
		log.Fatal(err)
	} else {
		log.Infof("log level set to: %s", loggerOptions.OutputLevel)
	}

初始化 metrics

	// Initialize dapr metrics exporter
	if err := metricsExporter.Init(); err != nil {
		log.Fatal(err)
	}

	// Initialize injector service metrics
	if err := monitoring.InitMetrics(); err != nil {
		log.Fatal(err)
	}

main() 方法

获取配置

从环境变量中读取配置:

func main() {
	logger.DaprVersion = version.Version()
	log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())

	ctx := signals.Context()
	cfg, err := injector.GetConfigFromEnvironment()
	if err != nil {
		log.Fatalf("error getting config: %s", err)
	}
	......
}

获取daprClient

	kubeClient := utils.GetKubeClient()
	conf := utils.GetConfig()
	daprClient, _ := scheme.NewForConfig(conf)

启动 healthz server

	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		healthzErr := healthzServer.Run(ctx, healthzPort)
		if healthzErr != nil {
			log.Fatalf("failed to start healthz server: %s", healthzErr)
		}
	}()

service account

	uids, err := injector.AllowedControllersServiceAccountUID(ctx, kubeClient)
	if err != nil {
		log.Fatalf("failed to get authentication uids from services accounts: %s", err)
	}

创建 injector

	injector.NewInjector(uids, cfg, daprClient, kubeClient).Run(ctx)

graceful shutdown

injector 的 Run() 返回之后,简单地等待 5 秒作为 graceful shutdown:

	shutdownDuration := 5 * time.Second
	log.Infof("allowing %s for graceful shutdown to complete", shutdownDuration)
	<-time.After(shutdownDuration)

3 - config.go的源码学习

Dapr Injector 的 config 代码

Dapr injector package中的 config.go 文件的源码分析。

代码实现

Config 结构体定义

Injector 相关的配置项定义:

// Config represents configuration options for the Dapr Sidecar Injector webhook server.
//
// Fields are filled from environment variables by envconfig according to the
// `envconfig` tags; `required:"true"` marks the variables envconfig expects
// to be set. Start from NewConfigWithDefaults to get the default pull policy.
type Config struct {
	// TLSCertFile is the path of the webhook server's TLS certificate (env TLS_CERT_FILE).
	TLSCertFile            string `envconfig:"TLS_CERT_FILE" required:"true"`
	// TLSKeyFile is the path of the webhook server's TLS private key (env TLS_KEY_FILE).
	TLSKeyFile             string `envconfig:"TLS_KEY_FILE" required:"true"`
	// SidecarImage is the image used for the injected daprd sidecar (env SIDECAR_IMAGE).
	SidecarImage           string `envconfig:"SIDECAR_IMAGE" required:"true"`
	// SidecarImagePullPolicy is the pull policy for the sidecar image
	// (env SIDECAR_IMAGE_PULL_POLICY; NewConfigWithDefaults presets "Always").
	SidecarImagePullPolicy string `envconfig:"SIDECAR_IMAGE_PULL_POLICY"`
	// Namespace is the namespace the injector runs in (env NAMESPACE).
	Namespace              string `envconfig:"NAMESPACE" required:"true"`
}

NewConfigWithDefaults() 方法

只设置了一个 SidecarImagePullPolicy 的默认值:

// NewConfigWithDefaults returns a Config whose only preset field is
// SidecarImagePullPolicy ("Always"); every other field is left at its zero value.
func NewConfigWithDefaults() Config {
	var cfg Config
	cfg.SidecarImagePullPolicy = "Always"
	return cfg
}

这个方法只被下面的 GetConfigFromEnvironment() 方法调用。

GetConfigFromEnvironment() 方法

从环境中获取配置

// GetConfigFromEnvironment starts from NewConfigWithDefaults and overlays the
// values envconfig.Process reads from the environment variables named in the
// Config struct tags. The Config is returned together with any error from
// envconfig.Process, even when that error is non-nil.
func GetConfigFromEnvironment() (Config, error) {
	cfg := NewConfigWithDefaults()
	if err := envconfig.Process("", &cfg); err != nil {
		return cfg, err
	}
	return cfg, nil
}

envconfig.Process() 的代码实现会通过反射读取到 Config 结构体的信息,然后根据设定的环境变量名来读取。

这个方法的调用只有一个地方,在injector main 函数的开始位置:

func main() {
   log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())

   ctx := signals.Context()
   cfg, err := injector.GetConfigFromEnvironment()
   if err != nil {
      log.Fatalf("error getting config: %s", err)
   }
   ......  
}

通过 kubectl describe 命令(如 k describe pod dapr-sidecar-injector-6f656b7dd-sg87p -n dapr-system,这里 k 是 kubectl 的别名)查看 injector pod 的详细信息,可以看到 Environment 这一段(注意 SIDECAR_IMAGE_PULL_POLICY 被设置为 IfNotPresent,覆盖了默认值 Always):

    Environment:
      TLS_CERT_FILE:              /dapr/cert/tls.crt
      TLS_KEY_FILE:               /dapr/cert/tls.key
      SIDECAR_IMAGE:              docker.io/skyao/daprd:dev-linux-amd64
      SIDECAR_IMAGE_PULL_POLICY:  IfNotPresent
      NAMESPACE:                  dapr-system (v1:metadata.namespace)

injector yaml 备用

以下是完整的 injector pod 描述信息(kubectl describe 的输出),留着备用:

Name:         dapr-sidecar-injector-6f656b7dd-sg87p
Namespace:    dapr-system
Priority:     0
Node:         docker-desktop/192.168.65.3
Start Time:   Mon, 19 Apr 2021 15:04:07 +0800
Labels:       app=dapr-sidecar-injector
              app.kubernetes.io/component=sidecar-injector
              app.kubernetes.io/managed-by=helm
              app.kubernetes.io/name=dapr
              app.kubernetes.io/part-of=dapr
              app.kubernetes.io/version=dev-linux-amd64
              pod-template-hash=6f656b7dd
Annotations:  prometheus.io/path: /
              prometheus.io/port: 9090
              prometheus.io/scrape: true
Status:       Running
IP:           10.1.2.162
IPs:
  IP:           10.1.2.162
Controlled By:  ReplicaSet/dapr-sidecar-injector-6f656b7dd
Containers:
  dapr-sidecar-injector:
    Container ID:  docker://544dabf00bdaba9cf8f320218dd0b7e6d2ebce7fbf5184ce162d58bc693162d9
    Image:         docker.io/skyao/dapr:dev-linux-amd64
    Image ID:      docker-pullable://skyao/dapr@sha256:b4843ee78eabf014e15749bc4daa5c249ce3d33f796a89aaba9d117dd3dc76c9
    Ports:         4000/TCP, 9090/TCP
    Host Ports:    0/TCP, 0/TCP
    Command:
      /injector
    Args:
      --log-level
      info
      --log-as-json
      --enable-metrics
      --metrics-port
      9090
    State:          Running
      Started:      Mon, 19 Apr 2021 15:04:08 +0800
    Ready:          True
    Restart Count:  0
    Liveness:       http-get http://:8080/healthz delay=3s timeout=1s period=3s #success=1 #failure=5
    Readiness:      http-get http://:8080/healthz delay=3s timeout=1s period=3s #success=1 #failure=5
    Environment:
      TLS_CERT_FILE:              /dapr/cert/tls.crt
      TLS_KEY_FILE:               /dapr/cert/tls.key
      SIDECAR_IMAGE:              docker.io/skyao/daprd:dev-linux-amd64
      SIDECAR_IMAGE_PULL_POLICY:  IfNotPresent
      NAMESPACE:                  dapr-system (v1:metadata.namespace)
    Mounts:
      /dapr/cert from cert (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from dapr-operator-token-cjpnd (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             True 
  ContainersReady   True 
  PodScheduled      True 
Volumes:
  cert:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  dapr-sidecar-injector-cert
    Optional:    false
  dapr-operator-token-cjpnd:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  dapr-operator-token-cjpnd
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  17m   default-scheduler  Successfully assigned dapr-system/dapr-sidecar-injector-6f656b7dd-sg87p to docker-desktop
  Normal  Pulled     17m   kubelet            Container image "docker.io/skyao/dapr:dev-linux-amd64" already present on machine
  Normal  Created    17m   kubelet            Created container dapr-sidecar-injector
  Normal  Started    17m   kubelet            Started container dapr-sidecar-injector

4 - injector.go的源码学习

Dapr Injector 中的 injector.go 的 代码

主流程代码

接口和结构体定义和创建

Injector 是Dapr运行时 sidecar 注入组件的接口。

// Injector is the interface for the Dapr runtime sidecar injection component.
//
// Run serves the admission webhook. In this package's implementation it
// blocks until the webhook server stops and shuts the server down when ctx
// is cancelled.
type Injector interface {
   Run(ctx context.Context)
}

injector 结构体定义:

// injector is the concrete Injector: an HTTPS server that answers Kubernetes
// mutating admission requests on the "/mutate" endpoint.
type injector struct {
   // config holds the settings read from the environment (TLS files, sidecar image, namespace).
   config       Config
   // deserializer decodes incoming AdmissionReview request bodies.
   deserializer runtime.Decoder
   // server is the webhook HTTP server; it is started by Run.
   server       *http.Server
   // kubeClient is the Kubernetes API client.
   kubeClient   *kubernetes.Clientset
   // daprClient is the Dapr CRD client (e.g. used to list Configuration resources).
   daprClient   scheme.Interface
   // authUIDs are the service-account UIDs whose admission requests are accepted.
   authUIDs     []string
}

创建新的 injector 结构体(这个方法在 injector 的 main 方法中被调用):

// NewInjector returns a new Injector instance with the given config.
//
// It constructs, but does not start, an HTTP server whose address is
// ":<port>" (port is a package-level value defined elsewhere) and registers
// handleRequest as the handler of the "/mutate" admission endpoint. The
// server is started later by Run.
//
// authUIDs are the service-account UIDs allowed to call the webhook;
// daprClient and kubeClient are kept for use while handling requests.
func NewInjector(authUIDs []string, config Config, daprClient scheme.Interface, kubeClient *kubernetes.Clientset) Injector {
	codecs := serializer.NewCodecFactory(runtime.NewScheme())
	router := http.NewServeMux()

	inj := &injector{
		config:       config,
		deserializer: codecs.UniversalDeserializer(),
		server:       &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: router},
		kubeClient:   kubeClient,
		daprClient:   daprClient,
		authUIDs:     authUIDs,
	}

	// The Kubernetes API server calls this endpoint for mutating admission.
	router.HandleFunc("/mutate", inj.handleRequest)
	return inj
}

Run()方法

最核心的 Run 方法:

// Run serves the admission webhook over TLS, using the certificate and key
// files from the injector's config, and blocks until the server stops.
//
// A helper goroutine waits for whichever happens first: ctx being cancelled
// or the server returning on its own. On cancellation it calls Shutdown with
// a 5 second timeout and deliberately ignores the shutdown error. Any
// ListenAndServeTLS error other than http.ErrServerClosed is logged, not
// returned.
func (i *injector) Run(ctx context.Context) {
	serverDone := make(chan struct{})

	go func() {
		select {
		case <-serverDone:
			// The server already exited; there is nothing left to shut down.
			return
		case <-ctx.Done():
		}

		log.Info("Sidecar injector is shutting down")
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		i.server.Shutdown(shutdownCtx) // nolint: errcheck
	}()

	// Startup line; it shows up in the injector pod's logs.
	log.Infof("Sidecar injector is listening on %s, patching Dapr-enabled pods", i.server.Addr)

	// NOTE(review): this sometimes fails because of certificate problems, which
	// leaves the injector not working; such errors are only logged here.
	if err := i.server.ListenAndServeTLS(i.config.TLSCertFile, i.config.TLSKeyFile); err != http.ErrServerClosed {
		log.Errorf("Sidecar injector error: %s", err)
	}
	close(serverDone)
}

可以对比通过 k logs dapr-sidecar-injector-86b8dc4dcd-bkbgw -n dapr-system 命令查看到的 injector 日志内容:

{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"log level set to: info","scope":"dapr.injector","time":"2021-05-11T01:13:20.1904136Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"metrics server started on :9090/","scope":"dapr.metrics","time":"2021-05-11T01:13:20.1907347Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"starting Dapr Sidecar Injector -- version edge -- commit v1.0.0-rc.4-163-g9a4210a-dirty","scope":"dapr.injector","time":"2021-05-11T01:13:20.191669Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Healthz server is listening on :8080","scope":"dapr.injector","time":"2021-05-11T01:13:20.1928941Z","type":"log","ver":"unknown"}

{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Sidecar injector is listening on :4000, patching Dapr-enabled pods","scope":"dapr.injector","time":"2021-05-11T01:13:20.208587Z","type":"log","ver":"unknown"}

handleRequest方法

handleRequest方法用来处理来自 k8s api server的 mutate 调用:

mux.HandleFunc("/mutate", i.handleRequest)

func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
  ......
}

代码比较长,忽略部分细节代码。

读取请求的body,验证长度和content-type:

defer r.Body.Close()

var body []byte
if r.Body != nil {
   if data, err := ioutil.ReadAll(r.Body); err == nil {
      body = data
   }
}
if len(body) == 0 {
   log.Error("empty body")
   http.Error(w, "empty body", http.StatusBadRequest)
   return
}

contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
  log.Errorf("Content-Type=%s, expect application/json", contentType)
  http.Error(
    w,
    "invalid Content-Type, expect `application/json`",
    http.StatusUnsupportedMediaType,
  )

  return
}

反序列化body,并做一些基本的验证:

ar := v1.AdmissionReview{}
_, gvk, err := i.deserializer.Decode(body, nil, &ar)
if err != nil {
   log.Errorf("Can't decode body: %v", err)
} else {
   if !utils.StringSliceContains(ar.Request.UserInfo.UID, i.authUIDs) {
      err = errors.Wrapf(err, "unauthorized request")
      log.Error(err)
   } else if ar.Request.Kind.Kind != "Pod" {
      err = errors.Wrapf(err, "invalid kind for review: %s", ar.Kind)
      log.Error(err)
   } else {
      patchOps, err = i.getPodPatchOperations(&ar, i.config.Namespace, i.config.SidecarImage, i.config.SidecarImagePullPolicy, i.kubeClient, i.daprClient)
   }
}

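getPodPatchOperations 是核心代码,后面细看。

注意上面的错误处理有个问题:进入"unauthorized request"和"invalid kind"这两个分支时 err 为 nil,而 errors.Wrapf(nil, ...) 返回的也是 nil。所以 err 仍然是 nil,log.Error 打印的也只是 nil。同时 patchOps 在这两个分支里没有被赋值,因此未授权请求和非 Pod 请求不会带着错误返回,而是落到后面 len(patchOps) == 0 的分支,直接返回 Allowed: true(不做注入)。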

统一处理前面可能产生的错误,以及 getPodPatchOperations() 的处理结果:

diagAppID := getAppIDFromRequest(ar.Request)

if err != nil {
   admissionResponse = toAdmissionResponse(err)
   log.Errorf("Sidecar injector failed to inject for app '%s'. Error: %s", diagAppID, err)
   monitoring.RecordFailedSidecarInjectionCount(diagAppID, "patch")
} else if len(patchOps) == 0 {
   // len(patchOps) == 0 表示什么都没改,返回  Allowed: true
   admissionResponse = &v1.AdmissionResponse{
      Allowed: true,
   }
} else {
   var patchBytes []byte
   // 将 patchOps 序列化为json
   patchBytes, err = json.Marshal(patchOps)
   if err != nil {
      admissionResponse = toAdmissionResponse(err)
   } else {
      // 返回AdmissionResponse
      admissionResponse = &v1.AdmissionResponse{
         Allowed: true,
         Patch:   patchBytes,
         PatchType: func() *v1.PatchType {
            pt := v1.PatchTypeJSONPatch
            return &pt
         }(),
      }
   }
}

组装 AdmissionReview:

admissionReview := v1.AdmissionReview{}
if admissionResponse != nil {
   admissionReview.Response = admissionResponse
   if ar.Request != nil {
      admissionReview.Response.UID = ar.Request.UID
      admissionReview.SetGroupVersionKind(*gvk)
   }
}

将应答序列化并返回。注意 json.Marshal 失败时,下面的代码调用 http.Error 之后并没有 return,仍会继续执行 w.Write(respBytes):

log.Infof("ready to write response ...")
respBytes, err := json.Marshal(admissionReview)
if err != nil {
   http.Error(
      w,
      err.Error(),
      http.StatusInternalServerError,
   )

   log.Errorf("Sidecar injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err)
   monitoring.RecordFailedSidecarInjectionCount(diagAppID, "response")
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil {
   log.Error(err)
} else {
   log.Infof("Sidecar injector succeeded injection for app '%s'", diagAppID)
   monitoring.RecordSuccessfulSidecarInjectionCount(diagAppID)
}

帮助类代码

toAdmissionResponse方法

toAdmissionResponse 方法用于从一个 error 创建 k8s 的 AdmissionResponse :

// toAdmissionResponse wraps err in an AdmissionResponse whose Result carries
// the error text. Allowed keeps its zero value (false), so the request is denied.
func toAdmissionResponse(err error) *v1.AdmissionResponse {
	status := metav1.Status{Message: err.Error()}
	return &v1.AdmissionResponse{Result: &status}
}

获取AppID

getAppIDFromRequest() 方法从 AdmissionRequest 中获取AppID:

// getAppIDFromRequest decodes the pod carried by req and returns its Dapr app ID.
// It returns "" when req is nil or the raw object cannot be decoded as a pod
// (the decode failure is logged as a warning).
func getAppIDFromRequest(req *v1.AdmissionRequest) string {
	if req == nil {
		return ""
	}

	var pod corev1.Pod
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		log.Warnf("could not unmarshal raw object: %v", err)
		return ""
	}

	return getAppID(pod)
}

getAppID()方法的实现如下,首先读取 “dapr.io/app-id” 的 Annotation,如果没有,则取 pod 的 name 作为默认AppID:

// appIDKey is the pod annotation holding the Dapr app ID.
const appIDKey = "dapr.io/app-id"

// getAppID returns the pod's "dapr.io/app-id" annotation, or the pod name
// when the annotation is absent.
func getAppID(pod corev1.Pod) string {
	fallback := pod.GetName()
	return getStringAnnotationOrDefault(pod.Annotations, appIDKey, fallback)
}

分支代码

ServiceAccount 相关代码

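AllowedControllersServiceAccountUID() 方法返回一个 UID 数组,即 webhook handler 所允许的 controller service account 的 UID 列表。其中只有列表中的第一个(replicaset-controller)是必需的:获取失败会直接返回错误;其余 service account 获取失败时只记录警告并跳过: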

// allowedControllersServiceAccounts lists the names of the service accounts (looked up in
// metav1.NamespaceSystem) whose UIDs AllowedControllersServiceAccountUID resolves.
// Order matters: the first entry ("replicaset-controller") is treated as mandatory by
// AllowedControllersServiceAccountUID, while failures for the others are only logged.
var allowedControllersServiceAccounts = []string{
	"replicaset-controller",
	"deployment-controller",
	"cronjob-controller",
	"job-controller",
	"statefulset-controller",
}

// AllowedControllersServiceAccountUID returns the UIDs of the controller service
// accounts that the webhook handler accepts requests from.
// Only the first entry of allowedControllersServiceAccounts is mandatory: if it
// cannot be fetched the error is returned. Any other account that cannot be
// fetched is logged and skipped.
func AllowedControllersServiceAccountUID(ctx context.Context, kubeClient *kubernetes.Clientset) ([]string, error) {
	uids := make([]string, 0, len(allowedControllersServiceAccounts))
	for idx, name := range allowedControllersServiceAccounts {
		uid, err := getServiceAccount(ctx, kubeClient, name)
		if err == nil {
			uids = append(uids, uid)
			continue
		}
		if idx == 0 {
			// "replicaset-controller" is required.
			return nil, err
		}
		log.Warnf("Unable to get SA %s UID (%s)", name, err)
	}

	return uids, nil
}

// getServiceAccount fetches the named service account from metav1.NamespaceSystem
// and returns its UID as a string. The lookup is bounded by
// getKubernetesServiceAccountTimeoutSeconds.
func getServiceAccount(ctx context.Context, kubeClient *kubernetes.Clientset, allowedControllersServiceAccount string) (string, error) {
	timeout := getKubernetesServiceAccountTimeoutSeconds * time.Second
	lookupCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	accounts := kubeClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem)
	account, err := accounts.Get(lookupCtx, allowedControllersServiceAccount, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	return string(account.UID), nil
}

5 - patch_operation.go的源码学习

Dapr Injector 中 patch_operation.go 的代码

代码非常简单,只定义了一个结构体 PatchOperation,用来表示要应用于 Kubernetes 资源的一个单独的变更。它的三个字段通过 json tag 序列化为 op、path、value,正好对应 JSON Patch(RFC 6902)中的一条操作。

// PatchOperation represents a discrete change to be applied to a Kubernetes resource.
// Each value serializes as one JSON Patch operation object.
type PatchOperation struct {
	// Op is the operation name, e.g. "add".
	Op    string      `json:"op"`
	// Path is the JSON Pointer to the target location, e.g. "/spec/containers/-".
	Path  string      `json:"path"`
	// Value is the operation payload; it is omitted from the JSON when nil.
	Value interface{} `json:"value,omitempty"`
}

6 - pod_patch.go的源码学习

Dapr Injector 中 pod_patch.go 的代码

主流程

getPodPatchOperations() 是最重要的方法,injector 对 pod 的修改就在这里进行:


// getPodPatchOperations decodes the pod in the admission request and returns the
// JSON Patch operations that add the daprd sidecar and the Dapr port env vars for
// the existing containers. It returns nil, nil when no injection is needed.
// (Body elided in this excerpt; its steps are walked through below.)
func (i *injector) getPodPatchOperations(ar *v1.AdmissionReview,
	namespace, image, imagePullPolicy string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
    ......
  	return patchOps, nil
}

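解析 request,得到 pod 对象。确实存在重复:handler 在调用 getPodPatchOperations() 之后,还会通过 getAppIDFromRequest() 对同一个 req.Object.Raw 再做一次 json.Unmarshal,所以同一个 pod 会被解析两次: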

req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
   // errors.Wrap returns a new error rather than modifying err, so its result
   // must be returned; otherwise the context message is silently dropped.
   return nil, errors.Wrap(err, "could not unmarshal raw object")
}

判断是否需要 injector 做处理:

// Skip the pod (no patch, no error) unless it opts in via the
// "dapr.io/enabled" annotation and does not already contain a "daprd" container.
if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
   return nil, nil
}

// daprEnabledKey is the annotation a pod sets to "true" to opt in to sidecar injection.
const daprEnabledKey = "dapr.io/enabled"

// isResourceDaprEnabled reports whether the "dapr.io/enabled" annotation is true.
// A missing annotation counts as disabled.
func isResourceDaprEnabled(annotations map[string]string) bool {
	const enabledByDefault = false
	return getBoolAnnotationOrDefault(annotations, daprEnabledKey, enabledByDefault)
}

// sidecarContainerName is the name given to the injected Dapr sidecar container.
const sidecarContainerName = "daprd"

// podContainsSidecarContainer reports whether any container in pod.Spec.Containers
// is named "daprd", i.e. whether the sidecar has already been injected.
func podContainsSidecarContainer(pod *corev1.Pod) bool {
	containers := pod.Spec.Containers
	for idx := range containers {
		if containers[idx].Name == sidecarContainerName {
			return true
		}
	}
	return false
}

创建 daprd sidecar container:

// Build the daprd sidecar container from the pod annotations plus the addresses,
// mTLS material and identity computed earlier in getPodPatchOperations
// (those values are set outside this excerpt).
sidecarContainer, err := getSidecarContainer(pod.Annotations, id, image, imagePullPolicy, req.Namespace, apiSrvAddress, placementAddress, tokenMount, trustAnchors, certChain, certKey, sentryAddress, mtlsEnabled, identity)

getSidecarContainer()的细节后面看,先走完主流程。

patchOps := []PatchOperation{}
envPatchOps := []PatchOperation{}
var path string
var value interface{}
if len(pod.Spec.Containers) == 0 {
   // There is no containers array to append to, so add the whole array with
   // daprd as its only element.
   // NOTE(review): a valid pod always has at least one container, but a mutating
   // webhook sees the object before API-server validation, so this looks like a
   // defensive guard. Confirm whether it is reachable in practice.
   path = containersPath
   value = []corev1.Container{*sidecarContainer}
} else {
   // Give every existing container the Dapr port env vars, then append daprd.
   envPatchOps = addDaprEnvVarsToContainers(pod.Spec.Containers)
   // Patch paths are JSON Pointers (RFC 6901). In a JSON Patch "add" operation
   // (RFC 6902), a trailing "-" means "append to the end of the array".
   // Built from containersPath so it stays consistent with the branch above
   // and with addDaprEnvVarsToContainers.
   path = containersPath + "/-"
   value = sidecarContainer
}

// The sidecar is appended at the end, so the per-container env patches
// (indexed 0..n-1) still address the original containers.
patchOps = append(
   patchOps,
   PatchOperation{
      Op:    "add",
      Path:  path,
      Value: value,
   },
)
patchOps = append(patchOps, envPatchOps...)

addDaprEnvVarsToContainers:为 pod 中已有的每个容器生成 patch,添加 Dapr sidecar 的 HTTP 和 gRPC 端口环境变量:

// addDaprEnvVarsToContainers returns patch operations that add the Dapr sidecar
// HTTP and gRPC port environment variables to every container in containers
// (injected or user-defined). Each container's patch targets
// "<containersPath>/<index>/env".
func addDaprEnvVarsToContainers(containers []corev1.Container) []PatchOperation {
	daprPortEnv := []corev1.EnvVar{
		{Name: userContainerDaprHTTPPortName, Value: strconv.Itoa(sidecarHTTPPort)},
		{Name: userContainerDaprGRPCPortName, Value: strconv.Itoa(sidecarAPIGRPCPort)},
	}

	ops := make([]PatchOperation, 0, len(containers))
	for idx := range containers {
		envPath := fmt.Sprintf("%s/%d/env", containersPath, idx)
		ops = append(ops, getEnvPatchOperations(containers[idx].Env, daprPortEnv, envPath)...)
	}
	return ops
}

分支流程:mTLS的处理

// When mTLS is enabled in the Dapr system configuration, load the trust anchors,
// cert chain and key (looked up via the injector's namespace argument) and set the
// sidecar identity to "<request namespace>:<pod service account name>".
mtlsEnabled := mTLSEnabled(daprClient)
if mtlsEnabled {
   trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
   identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
}
// mTLSEnabled reports whether mutual TLS is enabled in the Dapr system configuration.
// It lists Configuration resources across all namespaces and returns the MTLSSpec
// setting of the first item named defaultConfig. If listing fails, or no such item
// exists, it logs and returns defaultMtlsEnabled.
//
// NOTE(review): the lookup spans every namespace and matches on name only, so a
// same-named Configuration in another namespace could be picked instead of the
// control-plane one. Confirm this is intended.
func mTLSEnabled(daprClient scheme.Interface) bool {
	configs, err := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll).List(meta_v1.ListOptions{})
	if err != nil {
		log.Errorf("Failed to load dapr configuration from k8s, use default value %t for mTLSEnabled: %s", defaultMtlsEnabled, err)
		return defaultMtlsEnabled
	}

	for idx := range configs.Items {
		cfg := &configs.Items[idx]
		if cfg.GetName() != defaultConfig {
			continue
		}
		return cfg.Spec.MTLSSpec.Enabled
	}

	log.Infof("Dapr system configuration (%s) is not found, use default value %t for mTLSEnabled", defaultConfig, defaultMtlsEnabled)
	return defaultMtlsEnabled
}

分支处理:serviceaccount