pod_patch.go的源码学习
Dapr Injector 中的 pod_patch.go 的 代码
主流程
getPodPatchOperations() 是最重要的方法,injector 对 pod 的修改就在这里进行:
func (i *injector) getPodPatchOperations(ar *v1.AdmissionReview,
namespace, image, imagePullPolicy string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
......
return patchOps, nil
}
解析request,得到 pod 对象 (这里和前面重复了?):
req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
errors.Wrap(err, "could not unmarshal raw object")
return nil, err
}
判断是否需要 injector 做处理:
if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
return nil, nil
}
// 判断是否启动了dapr,依据是是否设置 annotation "dapr.io/enabled" 为 true,默认为false
const daprEnabledKey = "dapr.io/enabled"
func isResourceDaprEnabled(annotations map[string]string) bool {
return getBoolAnnotationOrDefault(annotations, daprEnabledKey, false)
}
// 判断是否包含了 dapr 的 sidecar container
const sidecarContainerName = "daprd"
func podContainsSidecarContainer(pod *corev1.Pod) bool {
for _, c := range pod.Spec.Containers {
// 检测方式是循环pod中的所有container,检查是否有container的名字为 "daprd"
if c.Name == sidecarContainerName {
return true
}
}
return false
}
创建 daprd sidecar container:
sidecarContainer, err := getSidecarContainer(pod.Annotations, id, image, imagePullPolicy, req.Namespace, apiSrvAddress, placementAddress, tokenMount, trustAnchors, certChain, certKey, sentryAddress, mtlsEnabled, identity)
getSidecarContainer()的细节后面看,先走完主流程。
patchOps := []PatchOperation{}
envPatchOps := []PatchOperation{}
var path string
var value interface{}
if len(pod.Spec.Containers) == 0 {
// 如果pod的container数量为0(什么情况下会有这种没有container的pod?)
path = containersPath
value = []corev1.Container{*sidecarContainer}
} else {
// 将 daprd 的sidecar 加入
envPatchOps = addDaprEnvVarsToContainers(pod.Spec.Containers)
// TODO:path 的设值有什么规范或者要求?
path = "/spec/containers/-"
value = sidecarContainer
}
patchOps = append(
patchOps,
PatchOperation{
Op: "add",
Path: path,
Value: value,
},
)
patchOps = append(patchOps, envPatchOps...)
addDaprEnvVarsToContainers
// This function add Dapr environment variables to all the containers in any Dapr enabled pod.
// The containers can be injected or user defined.
func addDaprEnvVarsToContainers(containers []corev1.Container) []PatchOperation {
portEnv := []corev1.EnvVar{
{
Name: userContainerDaprHTTPPortName,
Value: strconv.Itoa(sidecarHTTPPort),
},
{
Name: userContainerDaprGRPCPortName,
Value: strconv.Itoa(sidecarAPIGRPCPort),
},
}
envPatchOps := make([]PatchOperation, 0, len(containers))
for i, container := range containers {
path := fmt.Sprintf("%s/%d/env", containersPath, i)
patchOps := getEnvPatchOperations(container.Env, portEnv, path)
envPatchOps = append(envPatchOps, patchOps...)
}
return envPatchOps
}
分支流程:mTLS的处理
mtlsEnabled := mTLSEnabled(daprClient)
if mtlsEnabled {
trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
}
func mTLSEnabled(daprClient scheme.Interface) bool {
resp, err := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll).List(meta_v1.ListOptions{})
if err != nil {
log.Errorf("Failed to load dapr configuration from k8s, use default value %t for mTLSEnabled: %s", defaultMtlsEnabled, err)
return defaultMtlsEnabled
}
for _, c := range resp.Items {
if c.GetName() == defaultConfig {
return c.Spec.MTLSSpec.Enabled
}
}
log.Infof("Dapr system configuration (%s) is not found, use default value %t for mTLSEnabled", defaultConfig, defaultMtlsEnabled)
return defaultMtlsEnabled
}