Injector的源码分析
- 1: Injector的代码实现
- 2: main.go的源码学习
- 3: config.go的源码学习
- 4: injector.go的源码学习
- 5: patch_operation.go的源码学习
- 6: pod_patch.go的源码学习
1 - Injector的代码实现
Inject的流程
以e2e中的 stateapp 为例。
应用的原始Deployment
tests/apps/stateapp/service.yaml
中是 stateapp 的 Service 定义和 Deployment定义。
Service 的定义没有什么特殊之处:
kind: Service
apiVersion: v1
metadata:
name: stateapp
labels:
testapp: stateapp
spec:
selector:
testapp: stateapp
ports:
- protocol: TCP
port: 80
targetPort: 3000
type: LoadBalancer
deployment的定义:
apiVersion: apps/v1
kind: Deployment
metadata:
name: stateapp
labels:
testapp: stateapp
spec:
replicas: 1
selector:
matchLabels:
testapp: stateapp
template: # stateapp的pod定义
metadata:
labels:
testapp: stateapp
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "stateapp"
dapr.io/app-port: "3000"
spec: #stateapp的container定义,暂时pod中只定义了这个一个container
containers:
- name: stateapp
image: docker.io/YOUR_DOCKER_ALIAS/e2e-stateapp:dev
ports:
- containerPort: 3000
imagePullPolicy: Always
单独看 stateapp 的 pod 定义中的 annotations:
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "stateapp"
dapr.io/app-port: "3000"
源码
getPodPatchOperations:
func (i *injector) getPodPatchOperations(ar *v1beta1.AdmissionReview,
namespace, image string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
errors.Wrap(err, "could not unmarshal raw object")
return nil, err
}
log.Infof(
"AdmissionReview for Kind=%v, Namespace=%v Name=%v (%v) UID=%v "+
"patchOperation=%v UserInfo=%v",
req.Kind,
req.Namespace,
req.Name,
pod.Name,
req.UID,
req.Operation,
req.UserInfo,
)
if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
return nil, nil
}
...
这个info日志打印的例子如下:
{"instance":"dapr-sidecar-injector-5f6f4bb6df-n5dsk","level":"info","msg":"AdmissionReview for Kind=/v1, Kind=Pod, Namespace=dapr-tests Name= () UID=d0126a13-9efd-432e-894a-5ddbee55898c patchOperation=CREATE UserInfo={system:serviceaccount:kube-system:replicaset-controller 3e5de149-07a3-434e-a8de-209abee69760 [system:serviceaccounts system:serviceaccounts:kube-system system:authenticated] map[]}","scope":"dapr.injector","time":"2020-09-25T07:07:07.6482457Z","type":"log","ver":"edge"}
可以看到,当 namespace dapr-tests 下有 pod 执行 CREATE 操作时,Injector 就开始工作了(此时 pod 的 Name 还是空的,日志中显示为 `Name= ()`)。
isResourceDaprEnabled(pod.Annotations)
检查 pod 是否启用了 dapr,判断方式是看 pod 是否有名为 dapr.io/enabled 的 annotation 并且值为 true,缺省为 false:
// daprEnabledKey is the pod annotation that opts a pod into sidecar injection.
const daprEnabledKey = "dapr.io/enabled"

// isResourceDaprEnabled reports whether the "dapr.io/enabled" annotation is
// true (as parsed by getBoolAnnotationOrDefault). A missing annotation counts
// as false.
func isResourceDaprEnabled(annotations map[string]string) bool {
	const enabledByDefault = false
	return getBoolAnnotationOrDefault(annotations, daprEnabledKey, enabledByDefault)
}
podContainsSidecarContainer 检查 pod 是否已经包含 dapr 的 sidecar,判断方式是看是否有 container 的名字为 daprd:
// sidecarContainerName is the container name used for the injected Dapr sidecar.
const sidecarContainerName = "daprd"

// podContainsSidecarContainer reports whether pod.Spec.Containers already has
// a container named "daprd", meaning the sidecar was injected before.
func podContainsSidecarContainer(pod *corev1.Pod) bool {
	containers := pod.Spec.Containers
	// Index instead of ranging by value to avoid copying each Container struct.
	for idx := range containers {
		if containers[idx].Name == sidecarContainerName {
			return true
		}
	}
	return false
}
继续getPodPatchOperations():
id := getAppID(pod)
// Keep DNS resolution outside of getSidecarContainer for unit testing.
placementAddress := fmt.Sprintf("%s:80", getKubernetesDNS(placementService, namespace))
sentryAddress := fmt.Sprintf("%s:80", getKubernetesDNS(sentryService, namespace))
apiSrvAddress := fmt.Sprintf("%s:80", getKubernetesDNS(apiAddress, namespace))
getAppID(pod) 通过读取 annotation 来获取应用 id:优先读取 “dapr.io/app-id”;如果没有,再读取 “dapr.io/id”(已经废弃,代码注释标明将在 v1.0 中删除);两者都没有时使用 pod 的名字:
const (
	// appIDKey is the annotation that carries the Dapr application ID.
	appIDKey = "dapr.io/app-id"
	// idKey is the legacy app-ID annotation.
	// Deprecated, remove in v1.0
	idKey = "dapr.io/id"
)

// getAppID resolves the Dapr app ID for pod. A non-empty "dapr.io/app-id"
// annotation wins. Otherwise the deprecated "dapr.io/id" annotation is used,
// with the pod name as the final fallback (defaulting is delegated to
// getStringAnnotationOrDefault).
func getAppID(pod corev1.Pod) string {
	if appID := getStringAnnotationOrDefault(pod.Annotations, appIDKey, ""); appID != "" {
		return appID
	}
	return getStringAnnotationOrDefault(pod.Annotations, idKey, pod.GetName())
}
mtlsEnabled的判断
var trustAnchors string
var certChain string
var certKey string
var identity string
mtlsEnabled := mTLSEnabled(daprClient)
if mtlsEnabled {
trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
}
mTLSEnabled 的判断方式,居然是列出所有 namespace 下的 dapr configuration,然后取第一个名为 daprsystem 的配置;列出失败或者找不到时,使用默认值 defaultMtlsEnabled:
// NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces
const NamespaceAll string = ""
// mTLSEnabled reports whether mTLS is enabled for the Dapr control plane.
//
// It lists Dapr Configuration resources across all namespaces and returns
// Spec.MTLSSpec.Enabled of the first item named defaultConfig ("daprsystem").
// If the list call fails, or no such configuration exists, it falls back to
// defaultMtlsEnabled. Both fallbacks are logged so they are not silent.
func mTLSEnabled(daprClient scheme.Interface) bool {
	resp, err := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll).List(meta_v1.ListOptions{})
	if err != nil {
		// Log the failure: a Configuration that fails to decode (e.g. a quoted
		// "false" for a bool field) would otherwise go unnoticed.
		log.Errorf("Failed to load dapr configuration from k8s, use default value %t for mTLSEnabled: %s", defaultMtlsEnabled, err)
		return defaultMtlsEnabled
	}
	for _, c := range resp.Items {
		if c.GetName() == defaultConfig { // "daprsystem"
			return c.Spec.MTLSSpec.Enabled
		}
	}
	log.Infof("Dapr system configuration (%s) is not found, use default value %t for mTLSEnabled", defaultConfig, defaultMtlsEnabled)
	return defaultMtlsEnabled
}
也就是通过读取 k8s 的资源来判断是否要开启 mtls。tests/config/dapr_mtls_off_config.yaml 中有示例内容:
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprsystem # 名字一定要是 daprsystem
spec:
mtls:
enabled: "false" # 在这里配置要不要开启 mtls
workloadCertTTL: "1h"
allowedClockSkew: "20m"
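但这个示例本身就是个坑:MTLSSpec.Enabled 在 Go 中是 bool 类型,而上面的 YAML 写成了带引号的字符串 "false"。因此 List Configuration 时反序列化会失败,报出下面的错误("ReadBool: expect t or f")。正确写法是不带引号的 `enabled: false`。而上面的 mTLSEnabled() 在 List 出错时直接返回默认值,不打印任何日志,所以这个问题很容易被忽略: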
E0925 09:37:53.480772 1 reflector.go:153] sigs.k8s.io/controller-runtime/pkg/cache/internal/informers_map.go:224: Failed to list *v1alpha1.Configuration: v1alpha1.ConfigurationList.Items: []v1alpha1.Configuration: v1alpha1.Configuration.Spec: v1alpha1.ConfigurationSpec.MTLSSpec: v1alpha1.MTLSSpec.Enabled: ReadBool: expect t or f, but found ", error found in #10 byte of ...|enabled":"false","wo|..., bigger context ...|pec":{"mtls":{"allowedClockSkew":"20m","enabled":"false","workloadCertTTL":"1h"}}},{"apiVersion":"da|...
注入后生效的应用 pod 定义:daprd 容器被追加到 containers 末尾,原有的 stateapp 容器也被加上了 DAPR_HTTP_PORT 和 DAPR_GRPC_PORT 环境变量。
apiVersion: v1
kind: Pod
metadata:
annotations:
dapr.io/app-id: stateapp
dapr.io/app-port: "3000"
dapr.io/enabled: "true"
dapr.io/sidecar-cpu-limit: "4.0"
dapr.io/sidecar-cpu-request: "0.5"
dapr.io/sidecar-memory-limit: 512Mi
dapr.io/sidecar-memory-request: 250Mi
creationTimestamp: "2020-09-25T07:07:07Z"
generateName: stateapp-567b6b9c6f-
labels:
pod-template-hash: 567b6b9c6f
testapp: stateapp
name: stateapp-567b6b9c6f-84kzb
namespace: dapr-tests
ownerReferences:
- apiVersion: apps/v1
blockOwnerDeletion: true
controller: true
kind: ReplicaSet
name: stateapp-567b6b9c6f
uid: 25a34367-79ed-4e19-868a-5b063a45b1f4
resourceVersion: "146616"
selfLink: /api/v1/namespaces/dapr-tests/pods/stateapp-567b6b9c6f-84kzb
uid: 0f4060df-0312-4d73-91c1-6f085462b33d
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
containers:
- env:
- name: DAPR_HTTP_PORT
value: "3500"
- name: DAPR_GRPC_PORT
value: "50001"
image: docker.io/skyao/e2e-stateapp:dev-linux-amd64
imagePullPolicy: Always
name: stateapp
ports:
- containerPort: 3000
name: http
protocol: TCP
resources: {}
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /var/run/secrets/kubernetes.io/serviceaccount
name: default-token-qncjc
readOnly: true
- args:
- --mode
- kubernetes
- --dapr-http-port
- "3500"
- --dapr-grpc-port
- "50001"
- --dapr-internal-grpc-port
- "50002"
- --app-port
- "3000"
- --app-id
- stateapp
- --control-plane-address
- dapr-api.dapr-system.svc.cluster.local:80
- --app-protocol
- http
- --placement-host-address
- dapr-placement.dapr-system.svc.cluster.local:80
- --config
- ""
- --log-level
- info
- --app-max-concurrency
- "-1"
- --sentry-address
- dapr-sentry.dapr-system.svc.cluster.local:80
- --metrics-port
- "9090"
- --enable-mtls
command:
- /daprd
env:
- name: DAPR_HOST_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: NAMESPACE
value: dapr-tests
- name: DAPR_TRUST_ANCHORS
value: |
-----BEGIN CERTIFICATE-----
MIIB3TCCAYKgAwIBAgIRAMra+wjgMY6ABDtu3/vJ0NcwCgYIKoZIzj0EAwIwMTEX
MBUGA1UEChMOZGFwci5pby9zZW50cnkxFjAUBgNVBAMTDWNsdXN0ZXIubG9jYWww
HhcNMjAwOTI1MDU1ODAzWhcNMjEwOTI1MDU1ODAzWjAxMRcwFQYDVQQKEw5kYXBy
LmlvL3NlbnRyeTEWMBQGA1UEAxMNY2x1c3Rlci5sb2NhbDBZMBMGByqGSM49AgEG
CCqGSM49AwEHA0IABE/w/8YBtRJPYNJkcDM05e9PhrbGjBU/RQd09J909OJebDe8
rthysygWrcGYHYKziKK2Pyc1j4ua2xklLC5DFEWjezB5MA4GA1UdDwEB/wQEAwIC
BDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0TAQH/BAUwAwEB
/zAdBgNVHQ4EFgQUQ2v6OiayM9V4DPAU6UZHGe/nc1swGAYDVR0RBBEwD4INY2x1
c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNJADBGAiEAtVBx9vDXiRE3fXJTU2yK11W5
eo+Ce4+U6/vXDtzw4PUCIQDlLOB45ihOAhhLVLG9akhgwJOrgZLEW3FZjRabpSsb
og==
-----END CERTIFICATE-----
- name: DAPR_CERT_CHAIN
value: |
-----BEGIN CERTIFICATE-----
MIIBxDCCAWqgAwIBAgIQQ1sfEH4aYacFZwBau+aOozAKBggqhkjOPQQDAjAxMRcw
FQYDVQQKEw5kYXByLmlvL3NlbnRyeTEWMBQGA1UEAxMNY2x1c3Rlci5sb2NhbDAe
Fw0yMDA5MjUwNTU4MDNaFw0yMTA5MjUwNTU4MDNaMBgxFjAUBgNVBAMTDWNsdXN0
ZXIubG9jYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARhj7MQ1uiOkZvJ0AYV
uiFca/Iu9D5O98E5JN1mjCohRawk+QT1PjW05YtmyVji4Tt6ckIMvOXwG3aoTsGO
UbRio30wezAOBgNVHQ8BAf8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4E
FgQUTPUh0WWBB5baKs3aJjMzInVLX/EwHwYDVR0jBBgwFoAUQ2v6OiayM9V4DPAU
6UZHGe/nc1swGAYDVR0RBBEwD4INY2x1c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNI
ADBFAiBO0oCadeYyLM+RkSAYPSTtjMyEZ0wv1/BsWuUMg+KZ6AIhALHnT0pxiqlj
miYT4WZWvaBc17AbUh1efgV2DAaNKm54
-----END CERTIFICATE-----
- name: DAPR_CERT_KEY
value: |
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIDj6niLJ5ep+fDdY71bKyWl9RZHrXyRjND6pWySL2Q4UoAoGCCqGSM49
AwEHoUQDQgAEYY+zENbojpGbydAGFbohXGvyLvQ+TvfBOSTdZowqIUWsJPkE9T41
tOWLZslY4uE7enJCDLzl8Bt2qE7BjlG0Yg==
-----END EC PRIVATE KEY-----
- name: SENTRY_LOCAL_IDENTITY
value: default:dapr-tests
image: docker.io/skyao/daprd:dev-linux-amd64
imagePullPolicy: Always
livenessProbe:
failureThreshold: 3
httpGet:
path: /v1.0/healthz
port: 3500
scheme: HTTP
initialDelaySeconds: 3
periodSeconds: 6
successThreshold: 1
timeoutSeconds: 3
name: daprd
ports:
- containerPort: 3500
name: dapr-http
protocol: TCP
- containerPort: 50001
name: dapr-grpc
protocol: TCP
- containerPort: 50002
name: dapr-internal
protocol: TCP
- containerPort: 9090
name: dapr-metrics
protocol: TCP
readinessProbe:
failureThreshold: 3
httpGet:
path: /v1.0/healthz
port: 3500
scheme: HTTP
initialDelaySeconds: 3
periodSeconds: 6
successThreshold: 1
timeoutSeconds: 3
resources:
limits:
cpu: "4"
memory: 512Mi
requests:
cpu: 500m
memory: 250Mi
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /var/run/secrets/kubernetes.io/serviceaccount
name: default-token-qncjc
readOnly: true
dnsPolicy: ClusterFirst
enableServiceLinks: true
nodeName: docker-desktop
priority: 0
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
serviceAccount: default
serviceAccountName: default
terminationGracePeriodSeconds: 30
tolerations:
- effect: NoExecute
key: node.kubernetes.io/not-ready
operator: Exists
tolerationSeconds: 300
- effect: NoExecute
key: node.kubernetes.io/unreachable
operator: Exists
tolerationSeconds: 300
volumes:
- name: default-token-qncjc
secret:
defaultMode: 420
secretName: default-token-qncjc
status:
conditions:
- lastProbeTime: null
lastTransitionTime: "2020-09-25T07:07:07Z"
status: "True"
type: Initialized
- lastProbeTime: null
lastTransitionTime: "2020-09-25T07:07:07Z"
message: 'containers with unready status: [daprd]'
reason: ContainersNotReady
status: "False"
type: Ready
- lastProbeTime: null
lastTransitionTime: "2020-09-25T07:07:07Z"
message: 'containers with unready status: [daprd]'
reason: ContainersNotReady
status: "False"
type: ContainersReady
- lastProbeTime: null
lastTransitionTime: "2020-09-25T07:07:07Z"
status: "True"
type: PodScheduled
containerStatuses:
- containerID: docker://26a1d85ac6e2accd833832681b8dc2aa809e3c0fcfa293398bd5e7c2e8bf3e2b
image: skyao/daprd:dev-linux-amd64
imageID: docker-pullable://skyao/daprd@sha256:387f3bf4e7397c43dca9ac2d248a9ce790b1c1888aa0d6de3b07107ce124752f
lastState:
terminated:
containerID: docker://26a1d85ac6e2accd833832681b8dc2aa809e3c0fcfa293398bd5e7c2e8bf3e2b
exitCode: 1
finishedAt: "2020-09-25T08:03:14Z"
reason: Error
startedAt: "2020-09-25T08:03:04Z"
name: daprd
ready: false
restartCount: 21
started: false
state:
waiting:
message: back-off 5m0s restarting failed container=daprd pod=stateapp-567b6b9c6f-84kzb_dapr-tests(0f4060df-0312-4d73-91c1-6f085462b33d)
reason: CrashLoopBackOff
- containerID: docker://737745ace04213c9519ad1f91e248015c89a80e2b3d61081c3c530d1c89bdbae
image: skyao/e2e-stateapp:dev-linux-amd64
imageID: docker-pullable://skyao/e2e-stateapp@sha256:16351b331f1338a61348c9a87fce43728369f1bf18ee69d9d45fb13db0283644
lastState: {}
name: stateapp
ready: true
restartCount: 0
started: true
state:
running:
startedAt: "2020-09-25T07:07:24Z"
hostIP: 192.168.65.3
phase: Running
podIP: 10.1.0.194
podIPs:
- ip: 10.1.0.194
qosClass: Burstable
startTime: "2020-09-25T07:07:07Z"
其他
injector自身的pod定义
dapr-sidecar-injector
apiVersion: v1
kind: Pod
metadata:
annotations:
prometheus.io/path: /
prometheus.io/port: "9090"
prometheus.io/scrape: "true"
creationTimestamp: "2020-09-25T05:57:37Z"
generateName: dapr-sidecar-injector-5f6f4bb6df-
labels:
app: dapr-sidecar-injector
app.kubernetes.io/component: sidecar-injector
app.kubernetes.io/managed-by: helm
app.kubernetes.io/name: dapr
app.kubernetes.io/part-of: dapr
app.kubernetes.io/version: dev-linux-amd64
pod-template-hash: 5f6f4bb6df
name: dapr-sidecar-injector-5f6f4bb6df-n5dsk
namespace: dapr-system
ownerReferences:
- apiVersion: apps/v1
blockOwnerDeletion: true
controller: true
kind: ReplicaSet
name: dapr-sidecar-injector-5f6f4bb6df
uid: ff47b1df-6da7-4a19-b99d-15622ca3a485
resourceVersion: "133143"
selfLink: /api/v1/namespaces/dapr-system/pods/dapr-sidecar-injector-5f6f4bb6df-n5dsk
uid: 40df3834-4df2-495a-aa26-5b2a22de7639
spec:
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- preference:
matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
weight: 1
containers:
- args:
- --log-level
- info
- --log-as-json
- --metrics-port
- "9090"
command:
- /injector
env:
- name: TLS_CERT_FILE
value: /dapr/cert/tls.crt
- name: TLS_KEY_FILE
value: /dapr/cert/tls.key
- name: SIDECAR_IMAGE
value: docker.io/skyao/daprd:dev-linux-amd64
- name: NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
image: docker.io/skyao/dapr:dev-linux-amd64
imagePullPolicy: Always
livenessProbe:
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
scheme: HTTP
initialDelaySeconds: 3
periodSeconds: 3
successThreshold: 1
timeoutSeconds: 1
name: dapr-sidecar-injector
ports:
- containerPort: 4000
name: https
protocol: TCP
- containerPort: 9090
name: metrics
protocol: TCP
readinessProbe:
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
scheme: HTTP
initialDelaySeconds: 3
periodSeconds: 3
successThreshold: 1
timeoutSeconds: 1
resources: {}
securityContext:
runAsUser: 1000
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /dapr/cert
name: cert
readOnly: true
- mountPath: /var/run/secrets/kubernetes.io/serviceaccount
name: dapr-operator-token-lgpvc
readOnly: true
dnsPolicy: ClusterFirst
enableServiceLinks: true
nodeName: docker-desktop
priority: 0
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
serviceAccount: dapr-operator
serviceAccountName: dapr-operator
terminationGracePeriodSeconds: 30
tolerations:
- effect: NoExecute
key: node.kubernetes.io/not-ready
operator: Exists
tolerationSeconds: 300
- effect: NoExecute
key: node.kubernetes.io/unreachable
operator: Exists
tolerationSeconds: 300
volumes:
- name: cert
secret:
defaultMode: 420
secretName: dapr-sidecar-injector-cert
- name: dapr-operator-token-lgpvc
secret:
defaultMode: 420
secretName: dapr-operator-token-lgpvc
status:
conditions:
- lastProbeTime: null
lastTransitionTime: "2020-09-25T05:57:37Z"
status: "True"
type: Initialized
- lastProbeTime: null
lastTransitionTime: "2020-09-25T05:58:10Z"
status: "True"
type: Ready
- lastProbeTime: null
lastTransitionTime: "2020-09-25T05:58:10Z"
status: "True"
type: ContainersReady
- lastProbeTime: null
lastTransitionTime: "2020-09-25T05:57:37Z"
status: "True"
type: PodScheduled
containerStatuses:
- containerID: docker://a820646b468a07eabdd89ca133f062a93e85256afc6c19c1bdf13b56980ec5e9
image: skyao/dapr:dev-linux-amd64
imageID: docker-pullable://skyao/dapr@sha256:77003eee9fd02d9fc24c2e9f385a6c86223bc35915cede98a8897c0dfc51ee61
lastState: {}
name: dapr-sidecar-injector
ready: true
restartCount: 0
started: true
state:
running:
startedAt: "2020-09-25T05:58:06Z"
hostIP: 192.168.65.3
phase: Running
podIP: 10.1.0.188
podIPs:
- ip: 10.1.0.188
qosClass: BestEffort
startTime: "2020-09-25T05:57:37Z"
2 - main.go的源码学习
Dapr injector 中的 main.go 文件的源码分析。
init() 方法
init() 负责初始化工作:设定并解析命令行 flag,然后初始化 logger 和 metrics。
flag 设定和读取
func init() {
loggerOptions := logger.DefaultOptions()
// 这里设定了 `log-level` 和 `log-as-json`
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)
metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
// 这里设定了 `metrics-port` 和 `enable-metrics`
metricsExporter.Options().AttachCmdFlags(flag.StringVar, flag.BoolVar)
flag.Parse()
参考 kubectl describe 输出的 injector pod 信息中的 Command 和 Args 段:
Command:
/injector
Args:
--log-level
info
--log-as-json
--enable-metrics
--metrics-port
9090
初始化 logger
// Apply options to all loggers
if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
log.Fatal(err)
} else {
log.Infof("log level set to: %s", loggerOptions.OutputLevel)
}
初始化 metrics
// Initialize dapr metrics exporter
if err := metricsExporter.Init(); err != nil {
log.Fatal(err)
}
// Initialize injector service metrics
if err := monitoring.InitMetrics(); err != nil {
log.Fatal(err)
}
main() 方法
获取配置
从环境变量中读取配置:
func main() {
logger.DaprVersion = version.Version()
log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())
ctx := signals.Context()
cfg, err := injector.GetConfigFromEnvironment()
if err != nil {
log.Fatalf("error getting config: %s", err)
}
......
}
获取daprClient
kubeClient := utils.GetKubeClient()
conf := utils.GetConfig()
daprClient, _ := scheme.NewForConfig(conf)
启动 healthz server
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
healthzErr := healthzServer.Run(ctx, healthzPort)
if healthzErr != nil {
log.Fatalf("failed to start healthz server: %s", healthzErr)
}
}()
service account
uids, err := injector.AllowedControllersServiceAccountUID(ctx, kubeClient)
if err != nil {
log.Fatalf("failed to get authentication uids from services accounts: %s", err)
}
创建 injector
injector.NewInjector(uids, cfg, daprClient, kubeClient).Run(ctx)
graceful shutdown
简单地 sleep 5 秒作为 graceful shutdown:
shutdownDuration := 5 * time.Second
log.Infof("allowing %s for graceful shutdown to complete", shutdownDuration)
<-time.After(shutdownDuration)
3 - config.go的源码学习
Dapr injector package中的 config.go 文件的源码分析。
代码实现
Config 结构体定义
Injector 相关的配置项定义:
// Config represents configuration options for the Dapr Sidecar Injector webhook server.
// GetConfigFromEnvironment fills it from the environment variables named in the
// envconfig tags; variables tagged required:"true" must be present.
type Config struct {
	// Path to the TLS certificate used by the webhook's HTTPS server.
	TLSCertFile string `envconfig:"TLS_CERT_FILE" required:"true"`
	// Path to the private key matching TLSCertFile.
	TLSKeyFile string `envconfig:"TLS_KEY_FILE" required:"true"`
	// Image used for the injected daprd sidecar container.
	SidecarImage string `envconfig:"SIDECAR_IMAGE" required:"true"`
	// Optional; NewConfigWithDefaults presets it to "Always".
	SidecarImagePullPolicy string `envconfig:"SIDECAR_IMAGE_PULL_POLICY"`
	// Namespace the injector runs in; handleRequest passes it to
	// getPodPatchOperations to build control-plane service addresses.
	Namespace string `envconfig:"NAMESPACE" required:"true"`
}
NewConfigWithDefaults() 方法
只设置了一个 SidecarImagePullPolicy 的默认值:
// NewConfigWithDefaults returns a Config whose only preset field is
// SidecarImagePullPolicy ("Always"). GetConfigFromEnvironment uses it as the
// starting point before reading environment variables.
func NewConfigWithDefaults() Config {
	var cfg Config
	cfg.SidecarImagePullPolicy = "Always"
	return cfg
}
这个方法只被下面的 GetConfigFromEnvironment() 方法调用。
GetConfigFromEnvironment() 方法
从环境中获取配置
// GetConfigFromEnvironment starts from NewConfigWithDefaults and lets envconfig
// populate the struct from the environment variables named in its tags.
// The (possibly partially filled) Config is returned together with any error.
func GetConfigFromEnvironment() (Config, error) {
	cfg := NewConfigWithDefaults()
	if err := envconfig.Process("", &cfg); err != nil {
		return cfg, err
	}
	return cfg, nil
}
envconfig.Process() 的代码实现会通过反射读取到 Config 结构体的信息,然后根据设定的环境变量名来读取。
这个方法的调用只有一个地方,在injector main 函数的开始位置:
func main() {
log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())
ctx := signals.Context()
cfg, err := injector.GetConfigFromEnvironment()
if err != nil {
log.Fatalf("error getting config: %s", err)
}
......
}
通过 kubectl describe pod dapr-sidecar-injector-6f656b7dd-sg87p -n dapr-system 这样的命令(k 是 kubectl 的别名)查看 injector pod 的详细信息,可以看到 Environment 这一段:
Environment:
TLS_CERT_FILE: /dapr/cert/tls.crt
TLS_KEY_FILE: /dapr/cert/tls.key
SIDECAR_IMAGE: docker.io/skyao/daprd:dev-linux-amd64
SIDECAR_IMAGE_PULL_POLICY: IfNotPresent
NAMESPACE: dapr-system (v1:metadata.namespace)
injector pod 信息备用
以下是完整的 injector pod 的 kubectl describe 输出,留着备用:
Name: dapr-sidecar-injector-6f656b7dd-sg87p
Namespace: dapr-system
Priority: 0
Node: docker-desktop/192.168.65.3
Start Time: Mon, 19 Apr 2021 15:04:07 +0800
Labels: app=dapr-sidecar-injector
app.kubernetes.io/component=sidecar-injector
app.kubernetes.io/managed-by=helm
app.kubernetes.io/name=dapr
app.kubernetes.io/part-of=dapr
app.kubernetes.io/version=dev-linux-amd64
pod-template-hash=6f656b7dd
Annotations: prometheus.io/path: /
prometheus.io/port: 9090
prometheus.io/scrape: true
Status: Running
IP: 10.1.2.162
IPs:
IP: 10.1.2.162
Controlled By: ReplicaSet/dapr-sidecar-injector-6f656b7dd
Containers:
dapr-sidecar-injector:
Container ID: docker://544dabf00bdaba9cf8f320218dd0b7e6d2ebce7fbf5184ce162d58bc693162d9
Image: docker.io/skyao/dapr:dev-linux-amd64
Image ID: docker-pullable://skyao/dapr@sha256:b4843ee78eabf014e15749bc4daa5c249ce3d33f796a89aaba9d117dd3dc76c9
Ports: 4000/TCP, 9090/TCP
Host Ports: 0/TCP, 0/TCP
Command:
/injector
Args:
--log-level
info
--log-as-json
--enable-metrics
--metrics-port
9090
State: Running
Started: Mon, 19 Apr 2021 15:04:08 +0800
Ready: True
Restart Count: 0
Liveness: http-get http://:8080/healthz delay=3s timeout=1s period=3s #success=1 #failure=5
Readiness: http-get http://:8080/healthz delay=3s timeout=1s period=3s #success=1 #failure=5
Environment:
TLS_CERT_FILE: /dapr/cert/tls.crt
TLS_KEY_FILE: /dapr/cert/tls.key
SIDECAR_IMAGE: docker.io/skyao/daprd:dev-linux-amd64
SIDECAR_IMAGE_PULL_POLICY: IfNotPresent
NAMESPACE: dapr-system (v1:metadata.namespace)
Mounts:
/dapr/cert from cert (ro)
/var/run/secrets/kubernetes.io/serviceaccount from dapr-operator-token-cjpnd (ro)
Conditions:
Type Status
Initialized True
Ready True
ContainersReady True
PodScheduled True
Volumes:
cert:
Type: Secret (a volume populated by a Secret)
SecretName: dapr-sidecar-injector-cert
Optional: false
dapr-operator-token-cjpnd:
Type: Secret (a volume populated by a Secret)
SecretName: dapr-operator-token-cjpnd
Optional: false
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 17m default-scheduler Successfully assigned dapr-system/dapr-sidecar-injector-6f656b7dd-sg87p to docker-desktop
Normal Pulled 17m kubelet Container image "docker.io/skyao/dapr:dev-linux-amd64" already present on machine
Normal Created 17m kubelet Created container dapr-sidecar-injector
Normal Started 17m kubelet Started container dapr-sidecar-injector
4 - injector.go的源码学习
主流程代码
接口和结构体定义和创建
Injector 是Dapr运行时 sidecar 注入组件的接口。
// Injector is the interface for the Dapr runtime sidecar injection component
type Injector interface {
	// Run serves the admission webhook, blocking until the server stops
	// (ctx cancellation triggers a graceful shutdown).
	Run(ctx context.Context)
}
injector 结构体定义:
// injector implements Injector as an HTTPS mutating admission webhook that
// serves the /mutate endpoint.
type injector struct {
	config       Config           // settings read from the environment (TLS files, sidecar image, namespace)
	deserializer runtime.Decoder  // decodes incoming AdmissionReview request bodies
	server       *http.Server     // HTTPS server hosting /mutate; started by Run
	kubeClient   *kubernetes.Clientset
	daprClient   scheme.Interface // client for Dapr CRDs such as Configuration
	authUIDs     []string         // UIDs of the controller service accounts whose requests get patched
}
创建新的 injector 结构体(这个方法在 injector 的 main 方法中被调用):
// NewInjector returns a new Injector instance with the given config.
//
// It builds (but does not start) the HTTP server on the package-level port
// and registers handleRequest on /mutate, the endpoint the Kubernetes API
// server calls for mutating admission. Run starts serving.
func NewInjector(authUIDs []string, config Config, daprClient scheme.Interface, kubeClient *kubernetes.Clientset) Injector {
	codecs := serializer.NewCodecFactory(runtime.NewScheme())
	router := http.NewServeMux()

	inj := &injector{
		config:       config,
		deserializer: codecs.UniversalDeserializer(),
		// NOTE(review): no ReadHeaderTimeout/ReadTimeout/WriteTimeout is set here.
		server: &http.Server{
			Addr:    fmt.Sprintf(":%d", port),
			Handler: router,
		},
		kubeClient: kubeClient,
		daprClient: daprClient,
		authUIDs:   authUIDs,
	}

	router.HandleFunc("/mutate", inj.handleRequest)
	return inj
}
Run()方法
最核心的 Run() 方法:启动 HTTPS server,并在 ctx 被取消时优雅关闭:
// Run starts the HTTPS webhook server and blocks until it stops.
//
// A background goroutine watches ctx. When ctx is cancelled, it shuts the
// server down gracefully and gives in-flight requests up to 5 seconds. If the
// server stops first (for example because ListenAndServeTLS failed), doneCh is
// closed so that goroutine exits instead of leaking.
func (i *injector) Run(ctx context.Context) {
	doneCh := make(chan struct{})

	go func() {
		select {
		case <-ctx.Done():
			log.Info("Sidecar injector is shutting down")
			shutdownCtx, cancel := context.WithTimeout(
				context.Background(),
				time.Second*5,
			)
			defer cancel()
			// Report shutdown failures (e.g. the 5s deadline expiring) instead
			// of silently discarding them.
			if err := i.server.Shutdown(shutdownCtx); err != nil {
				log.Errorf("Sidecar injector failed to shut down gracefully: %s", err)
			}
		case <-doneCh:
		}
	}()

	// This startup line can be seen with `kubectl logs <injector-pod> -n dapr-system`.
	log.Infof("Sidecar injector is listening on %s, patching Dapr-enabled pods", i.server.Addr)
	// ListenAndServeTLS blocks; it returns http.ErrServerClosed after Shutdown,
	// and any other error (such as an unusable certificate/key file) is logged.
	// TODO: this sometimes fails because of certificate problems, which leaves
	// the injector unable to work; investigate.
	err := i.server.ListenAndServeTLS(i.config.TLSCertFile, i.config.TLSKeyFile)
	if err != http.ErrServerClosed {
		log.Errorf("Sidecar injector error: %s", err)
	}
	close(doneCh)
}
可以对比通过 kubectl logs dapr-sidecar-injector-86b8dc4dcd-bkbgw -n dapr-system 命令查看到的 injector 日志内容:
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"log level set to: info","scope":"dapr.injector","time":"2021-05-11T01:13:20.1904136Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"metrics server started on :9090/","scope":"dapr.metrics","time":"2021-05-11T01:13:20.1907347Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"starting Dapr Sidecar Injector -- version edge -- commit v1.0.0-rc.4-163-g9a4210a-dirty","scope":"dapr.injector","time":"2021-05-11T01:13:20.191669Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Healthz server is listening on :8080","scope":"dapr.injector","time":"2021-05-11T01:13:20.1928941Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Sidecar injector is listening on :4000, patching Dapr-enabled pods","scope":"dapr.injector","time":"2021-05-11T01:13:20.208587Z","type":"log","ver":"unknown"}
handleRequest方法
handleRequest方法用来处理来自 k8s api server的 mutate 调用:
mux.HandleFunc("/mutate", i.handleRequest)
func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
......
}
代码比较长,忽略部分细节代码。
读取请求的body,验证长度和content-type:
defer r.Body.Close()
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}
if len(body) == 0 {
log.Error("empty body")
http.Error(w, "empty body", http.StatusBadRequest)
return
}
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
log.Errorf("Content-Type=%s, expect application/json", contentType)
http.Error(
w,
"invalid Content-Type, expect `application/json`",
http.StatusUnsupportedMediaType,
)
return
}
反序列化body,并做一些基本的验证:
// Decode the AdmissionReview and decide whether this request gets patched.
// Only a non-nil err makes the webhook deny the request (toAdmissionResponse
// leaves Allowed false). Requests from service accounts outside i.authUIDs, or
// for kinds other than Pod, intentionally keep err == nil and get no patch
// operations, so the object is admitted unmodified instead of rejected.
// errors.Wrapf(nil, ...) returns nil, so these cases must log a real message
// rather than wrapping the nil err.
ar := v1.AdmissionReview{}
_, gvk, err := i.deserializer.Decode(body, nil, &ar)
switch {
case err != nil:
	log.Errorf("Can't decode body: %v", err)
case ar.Request == nil:
	// Guard the ar.Request dereferences below; there is nothing to patch.
	err = errors.New("admission review does not contain a request")
	log.Error(err)
case !utils.StringSliceContains(ar.Request.UserInfo.UID, i.authUIDs):
	log.Errorf("unauthorized request: UID %s is not an allowed controller service account", ar.Request.UserInfo.UID)
case ar.Request.Kind.Kind != "Pod":
	// Report the reviewed object's kind, not the AdmissionReview's own kind.
	log.Errorf("invalid kind for review: %s", ar.Request.Kind.Kind)
default:
	patchOps, err = i.getPodPatchOperations(&ar, i.config.Namespace, i.config.SidecarImage, i.config.SidecarImagePullPolicy, i.kubeClient, i.daprClient)
}
getPodPatchOperations 是核心代码,后面细看。
统一处理前面可能产生的错误,以及 getPodPatchOperations() 的处理结果:
diagAppID := getAppIDFromRequest(ar.Request)
if err != nil {
admissionResponse = toAdmissionResponse(err)
log.Errorf("Sidecar injector failed to inject for app '%s'. Error: %s", diagAppID, err)
monitoring.RecordFailedSidecarInjectionCount(diagAppID, "patch")
} else if len(patchOps) == 0 {
// len(patchOps) == 0 表示什么都没改,返回 Allowed: true
admissionResponse = &v1.AdmissionResponse{
Allowed: true,
}
} else {
var patchBytes []byte
// 将 patchOps 序列化为json
patchBytes, err = json.Marshal(patchOps)
if err != nil {
admissionResponse = toAdmissionResponse(err)
} else {
// 返回AdmissionResponse
admissionResponse = &v1.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
PatchType: func() *v1.PatchType {
pt := v1.PatchTypeJSONPatch
return &pt
}(),
}
}
}
组装 AdmissionReview:
// Wrap the response in an AdmissionReview. The response UID echoes the
// request UID, and the GroupVersionKind is copied from the decoded request.
admissionReview := v1.AdmissionReview{}
if admissionResponse != nil {
	admissionReview.Response = admissionResponse
	if ar.Request != nil {
		admissionReview.Response.UID = ar.Request.UID
		// The decoder may return a nil gvk when decoding failed; dereferencing
		// it unconditionally would panic.
		if gvk != nil {
			admissionReview.SetGroupVersionKind(*gvk)
		}
	}
}
将应答序列化并返回:
// Serialize the AdmissionReview and send it back to the API server.
log.Infof("ready to write response ...")
respBytes, err := json.Marshal(admissionReview)
if err != nil {
	http.Error(
		w,
		err.Error(),
		http.StatusInternalServerError,
	)
	log.Errorf("Sidecar injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err)
	monitoring.RecordFailedSidecarInjectionCount(diagAppID, "response")
	// The error reply has been written. Stop here; otherwise the code below
	// would write an empty body and record a *successful* injection.
	return
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil {
	log.Error(err)
	return
}
log.Infof("Sidecar injector succeeded injection for app '%s'", diagAppID)
monitoring.RecordSuccessfulSidecarInjectionCount(diagAppID)
帮助类代码
toAdmissionResponse方法
toAdmissionResponse 方法用于从一个 error 创建 k8s 的 AdmissionResponse :
// toAdmissionResponse is a helper function to create an AdmissionResponse
// with an embedded error: Result.Message carries err.Error() and Allowed keeps
// its zero value (false). err must be non-nil.
func toAdmissionResponse(err error) *v1.AdmissionResponse {
	status := &metav1.Status{Message: err.Error()}
	return &v1.AdmissionResponse{Result: status}
}
获取AppID
getAppIDFromRequest() 方法从 AdmissionRequest 中获取AppID:
// getAppIDFromRequest extracts the Dapr app ID from the pod carried in req,
// for use in logs and metrics. It returns "" when req is nil or its object
// cannot be unmarshalled into a Pod (the latter is logged as a warning).
func getAppIDFromRequest(req *v1.AdmissionRequest) string {
	if req == nil {
		return ""
	}

	var pod corev1.Pod
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		log.Warnf("could not unmarshal raw object: %v", err)
		return ""
	}
	return getAppID(pod)
}
getAppID() 方法的实现如下:首先读取 “dapr.io/app-id” 的 Annotation,如果没有,则取 pod 的 name 作为默认 AppID(这个版本已经不再读取废弃的 “dapr.io/id”):
// appIDKey is the pod annotation holding the Dapr application ID.
const appIDKey = "dapr.io/app-id"

// getAppID returns the pod's "dapr.io/app-id" annotation, falling back to the
// pod name (defaulting is delegated to getStringAnnotationOrDefault).
func getAppID(pod corev1.Pod) string {
	fallback := pod.GetName()
	return getStringAnnotationOrDefault(pod.Annotations, appIDKey, fallback)
}
分支代码
ServiceAccount 相关代码
AllowedControllersServiceAccountUID()方法返回UID数组,这些是 webhook handler 上容许的 service account 列表:
// allowedControllersServiceAccounts names the controller service accounts whose
// admission requests the webhook patches. The first entry is mandatory.
var allowedControllersServiceAccounts = []string{
	"replicaset-controller",
	"deployment-controller",
	"cronjob-controller",
	"job-controller",
	"statefulset-controller",
}

// AllowedControllersServiceAccountUID returns the UIDs of the allowed controller
// service accounts, i.e. the list of accounts accepted by the webhook handler.
// Failing to resolve "replicaset-controller" is fatal and returns the error.
// Failures for the other accounts are logged and skipped.
func AllowedControllersServiceAccountUID(ctx context.Context, kubeClient *kubernetes.Clientset) ([]string, error) {
	uids := []string{}
	for idx, name := range allowedControllersServiceAccounts {
		uid, err := getServiceAccount(ctx, kubeClient, name)
		if err == nil {
			uids = append(uids, uid)
			continue
		}
		if idx == 0 {
			// "replicaset-controller" is the only mandatory account.
			return nil, err
		}
		log.Warnf("Unable to get SA %s UID (%s)", name, err)
	}
	return uids, nil
}
// getServiceAccount looks up the named service account in metav1.NamespaceSystem
// (kube-system), bounded by getKubernetesServiceAccountTimeoutSeconds, and
// returns its UID as a string.
func getServiceAccount(ctx context.Context, kubeClient *kubernetes.Clientset, allowedControllersServiceAccount string) (string, error) {
	timeout := getKubernetesServiceAccountTimeoutSeconds * time.Second
	lookupCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	sa, err := kubeClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Get(lookupCtx, allowedControllersServiceAccount, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	return string(sa.UID), nil
}
5 - patch_operation.go的源码学习
代码非常简单,只定义了一个结构体 PatchOperation,用来表示要应用于Kubernetes资源的一个单独的变化。
// PatchOperation represents a discrete change to be applied to a Kubernetes resource.
// A []PatchOperation is marshalled to JSON and returned to the API server as a
// JSON Patch (PatchTypeJSONPatch).
type PatchOperation struct {
	Op    string      `json:"op"`              // JSON Patch operation, e.g. "add"
	Path  string      `json:"path"`            // JSON Pointer to the target, e.g. "/spec/containers/-"
	Value interface{} `json:"value,omitempty"` // operation value; omitted from the JSON when nil
}
6 - pod_patch.go的源码学习
主流程
getPodPatchOperations() 是最重要的方法,injector 对 pod 的修改就在这里进行:
func (i *injector) getPodPatchOperations(ar *v1.AdmissionReview,
namespace, image, imagePullPolicy string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
......
return patchOps, nil
}
解析 request,得到 pod 对象。这里确实和前面重复了:handleRequest 随后调用的 getAppIDFromRequest() 会再次反序列化同一个 pod。
// Decode the pod carried by the admission request.
req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
	// errors.Wrap returns a new error; return it instead of discarding the
	// wrapped value so callers see the added context.
	return nil, errors.Wrap(err, "could not unmarshal raw object")
}
判断是否需要 injector 做处理:
if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
return nil, nil
}
// daprEnabledKey is the annotation a pod sets to true to request the Dapr sidecar.
const daprEnabledKey = "dapr.io/enabled"

// isResourceDaprEnabled reports whether Dapr is enabled for the resource,
// based on the "dapr.io/enabled" annotation; a missing annotation means false.
func isResourceDaprEnabled(annotations map[string]string) bool {
	enabled := getBoolAnnotationOrDefault(annotations, daprEnabledKey, false)
	return enabled
}
// sidecarContainerName is the container name used for the injected Dapr sidecar.
const sidecarContainerName = "daprd"

// podContainsSidecarContainer reports whether the sidecar is already present,
// by scanning pod.Spec.Containers for a container named "daprd".
func podContainsSidecarContainer(pod *corev1.Pod) bool {
	found := false
	for _, ctr := range pod.Spec.Containers {
		if ctr.Name == sidecarContainerName {
			found = true
			break
		}
	}
	return found
}
创建 daprd sidecar container:
sidecarContainer, err := getSidecarContainer(pod.Annotations, id, image, imagePullPolicy, req.Namespace, apiSrvAddress, placementAddress, tokenMount, trustAnchors, certChain, certKey, sentryAddress, mtlsEnabled, identity)
getSidecarContainer()的细节后面看,先走完主流程。
// Assemble the JSON Patch: first the operation that adds the daprd sidecar,
// then the operations that add the Dapr port env vars to the pod's existing containers.
patchOps := []PatchOperation{}
envPatchOps := []PatchOperation{}
var path string
var value interface{}
if len(pod.Spec.Containers) == 0 {
	// The pod has no containers (open question from these notes: when can that happen?
	// NOTE(review): such a pod would normally fail validation, so this looks defensive — confirm).
	// An "add" at ".../-" requires the array to exist, so instead create the whole
	// containers array (containersPath, presumably "/spec/containers") holding only the sidecar.
	path = containersPath
	value = []corev1.Container{*sidecarContainer}
} else {
	// Add the Dapr port env vars to the containers already in the pod. The sidecar is
	// not among them; it is appended by the "add" operation built below.
	envPatchOps = addDaprEnvVarsToContainers(pod.Spec.Containers)
	// "-" is the JSON Pointer array token meaning "after the last element", so this
	// "add" appends the sidecar to the end of the containers array (RFC 6901 / RFC 6902).
	path = "/spec/containers/-"
	value = sidecarContainer
}
patchOps = append(
	patchOps,
	PatchOperation{
		Op:    "add",
		Path:  path,
		Value: value,
	},
)
patchOps = append(patchOps, envPatchOps...)
addDaprEnvVarsToContainers
// addDaprEnvVarsToContainers builds the patch operations that add the Dapr
// port environment variables (HTTP and gRPC API ports) to each of the given
// containers. The containers can be injected or user defined. The per-container
// operations are produced by getEnvPatchOperations against
// "<containersPath>/<index>/env".
func addDaprEnvVarsToContainers(containers []corev1.Container) []PatchOperation {
	httpPortVar := corev1.EnvVar{Name: userContainerDaprHTTPPortName, Value: strconv.Itoa(sidecarHTTPPort)}
	grpcPortVar := corev1.EnvVar{Name: userContainerDaprGRPCPortName, Value: strconv.Itoa(sidecarAPIGRPCPort)}
	portEnv := []corev1.EnvVar{httpPortVar, grpcPortVar}

	ops := make([]PatchOperation, 0, len(containers))
	for idx := range containers {
		envPath := fmt.Sprintf("%s/%d/env", containersPath, idx)
		ops = append(ops, getEnvPatchOperations(containers[idx].Env, portEnv, envPath)...)
	}
	return ops
}
分支流程:mTLS的处理
mtlsEnabled := mTLSEnabled(daprClient)
if mtlsEnabled {
trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
}
// mTLSEnabled reports whether mTLS is enabled for the Dapr control plane.
//
// It lists Dapr Configuration resources in all namespaces and returns
// Spec.MTLSSpec.Enabled of the first item named defaultConfig. When listing
// fails, or no item has that name, it logs the fallback and returns
// defaultMtlsEnabled.
func mTLSEnabled(daprClient scheme.Interface) bool {
	configs, listErr := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll).List(meta_v1.ListOptions{})
	if listErr != nil {
		log.Errorf("Failed to load dapr configuration from k8s, use default value %t for mTLSEnabled: %s", defaultMtlsEnabled, listErr)
		return defaultMtlsEnabled
	}

	for idx := range configs.Items {
		cfg := &configs.Items[idx]
		if cfg.GetName() != defaultConfig {
			continue
		}
		return cfg.Spec.MTLSSpec.Enabled
	}

	log.Infof("Dapr system configuration (%s) is not found, use default value %t for mTLSEnabled", defaultConfig, defaultMtlsEnabled)
	return defaultMtlsEnabled
}