Explain the following code:

```go
// rollingUpdate identifies the set of old pods to delete, or additional pods to create on nodes,
// remaining within the constraints imposed by the update strategy.
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    logger := klog.FromContext(ctx)
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
    if err != nil {
        return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
    }
    maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ds, nodeList, nodeToDaemonPods)
    if err != nil {
        return fmt.Errorf("couldn't get unavailable numbers: %v", err)
    }
    now := dsc.failedPodsBackoff.Clock.Now()

    // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any
    // are necessary, and let the core loop create new instances on those nodes.
    //
    // Assumptions:
    // * Expect manage loop to allow no more than one pod per node
    // * Expect manage loop will create new pods
    // * Expect manage loop will handle failed pods
    // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
    // Invariants:
    // * The number of new pods that are unavailable must be less than maxUnavailable
    // * A node with an available old pod is a candidate for deletion if it does not violate other invariants
    //
    if maxSurge == 0 {
        var numUnavailable int
        var allowedReplacementPods []string
        var candidatePodsToDelete []string
        for nodeName, pods := range nodeToDaemonPods {
            newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
            if !ok {
                // let the manage loop clean up this node, and treat it as an unavailable node
                logger.V(3).Info("DaemonSet has excess pods on node, skipping to allow the core loop to process", "daemonset", klog.KObj(ds), "node", klog.KRef("", nodeName))
                numUnavailable++
                continue
            }
            switch {
            case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
                // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable
                numUnavailable++
            case newPod != nil:
                // this pod is up to date, check its availability
                if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
                    // an unavailable new pod is counted against maxUnavailable
                    numUnavailable++
                }
            default:
                // this pod is old, it is an update candidate
                switch {
                case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}):
                    // the old pod isn't available, so it needs to be replaced
                    logger.V(5).Info("DaemonSet pod on node is out of date and not available, allowing replacement", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
                    // record the replacement
                    if allowedReplacementPods == nil {
                        allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods))
                    }
                    allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
                    numUnavailable++
                case numUnavailable >= maxUnavailable:
                    // no point considering any other candidates
                    continue
                default:
                    logger.V(5).Info("DaemonSet pod on node is out of date, this is a candidate to replace", "daemonset", klog.KObj(ds), "pod", klog.KObj(oldPod), "node", klog.KRef("", nodeName))
                    // record the candidate
                    if candidatePodsToDelete == nil {
                        candidatePodsToDelete = make([]string, 0, maxUnavailable)
                    }
                    candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
                }
            }
        }

        // use any of the candidates we can, including the allowedReplacementPods
        logger.V(5).Info("DaemonSet allowing replacements", "daemonset", klog.KObj(ds), "replacements", len(allowedReplacementPods), "maxUnavailable", maxUnavailable, "numUnavailable", numUnavailable, "candidates", len(candidatePodsToDelete))
        remainingUnavailable := maxUnavailable - numUnavailable
        if remainingUnavailable < 0 {
            remainingUnavailable = 0
        }
        if max := len(candidatePodsToDelete); remainingUnavailable > max {
            remainingUnavailable = max
        }
        oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...)
        return dsc.syncNodes(ctx, ds, oldPodsToDelete, nil, hash)
    }

    // When surging, we create new pods whenever an old pod is unavailable, and we can create up
    // to maxSurge extra pods
    //
    // Assumptions:
    // * Expect manage loop to allow no more than two pods per node, one old, one new
    // * Expect manage loop will create new pods if there are no pods on node
    // * Expect manage loop will handle failed pods
    // * Deleted pods do not count as unavailable so that updates make progress when nodes are down
    // Invariants:
    // * A node with an unavailable old pod is a candidate for immediate new pod creation
    // *
```
This code is part of the DaemonSetsController in Kubernetes and implements the rolling update of a DaemonSet. A DaemonSet ensures that a copy of a Pod runs on every eligible node; a rolling update is an update strategy that gradually replaces old-version Pods with new ones while keeping the service available.
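The `maxSurge` and `maxUnavailable` values that constrain this function come from the DaemonSet's `updateStrategy`. For context (this is not part of the controller code above, just a minimal sketch assuming the `k8s.io/api` and `k8s.io/apimachinery` modules), this is roughly how those fields are declared on a DaemonSet object in Go:

```go
package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// RollingUpdate strategy values that updatedDesiredNodeCounts later
	// resolves into the integer maxSurge / maxUnavailable used by rollingUpdate.
	maxUnavailable := intstr.FromInt(1) // at most one pod unavailable at a time
	maxSurge := intstr.FromInt(0)       // no extra pods: the non-surging branch

	// Spec fragment only, for illustration; a real DaemonSet also needs
	// metadata, a selector, and a pod template.
	ds := appsv1.DaemonSet{
		Spec: appsv1.DaemonSetSpec{
			UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
				Type: appsv1.RollingUpdateDaemonSetStrategyType,
				RollingUpdate: &appsv1.RollingUpdateDaemonSet{
					MaxUnavailable: &maxUnavailable,
					MaxSurge:       &maxSurge,
				},
			},
		},
	}
	fmt.Println(ds.Spec.UpdateStrategy.Type, maxUnavailable.String(), maxSurge.String())
}
```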
Here is an explanation of the main logic and components of the code:
- Function definition: `rollingUpdate` receives a context `ctx`, a DaemonSet object `ds`, a node list `nodeList`, and a hash string `hash` that identifies the new revision.
- Logging: a logger is obtained with `klog.FromContext(ctx)`.
- Node-to-pod mapping: `dsc.getNodesToDaemonPods` is called to get the list of daemon pods running on each node.
- Desired node counts: `dsc.updatedDesiredNodeCounts` computes `maxSurge` (how many extra pods may be created), `maxUnavailable` (how many pods may be unavailable at once), and the desired number of nodes that should be running a daemon pod.
- Current time: the current time is captured for the pod availability checks that follow.
- Update logic without surge: if `maxSurge` is 0, no extra pods are created and old pods are replaced in place. The loop walks every node, finds the pods that need updating, and decides, based on availability and the other conditions, whether the old pod can be deleted (the new pod is then created by the manage loop).
- Availability check: `podutil.IsPodAvailable` decides whether a pod counts as available, based on how long the pod has been ready and the `MinReadySeconds` setting (a simplified sketch of this check follows the list).
- Recording and deleting old pods: old pods that need replacing are recorded, and they are deleted only as long as the `maxUnavailable` limit is respected (the budget arithmetic is sketched below).
- Syncing nodes: finally, `dsc.syncNodes` is called with the list of old pods to delete; in this branch no new pods are created directly, the manage loop recreates them on the affected nodes.
- Update logic with surge (not fully shown in the snippet): if `maxSurge` is greater than 0, extra pods may be created alongside the old ones while the rollout progresses.
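To make the `MinReadySeconds` point concrete, here is a simplified, self-contained illustration of the kind of check `podutil.IsPodAvailable` performs; it is a sketch of the idea, not the library's implementation:

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isAvailable is a simplified illustration (not podutil's code): a pod counts
// as available only if its Ready condition is true and has been true for at
// least minReadySeconds.
func isAvailable(pod *corev1.Pod, minReadySeconds int32, now time.Time) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type != corev1.PodReady || cond.Status != corev1.ConditionTrue {
			continue
		}
		if minReadySeconds == 0 {
			return true
		}
		readyFor := now.Sub(cond.LastTransitionTime.Time)
		return readyFor >= time.Duration(minReadySeconds)*time.Second
	}
	return false
}

func main() {
	// A pod whose Ready condition flipped to true 30 seconds ago.
	pod := &corev1.Pod{
		Status: corev1.PodStatus{
			Conditions: []corev1.PodCondition{{
				Type:               corev1.PodReady,
				Status:             corev1.ConditionTrue,
				LastTransitionTime: metav1.Time{Time: time.Now().Add(-30 * time.Second)},
			}},
		},
	}
	fmt.Println(isAvailable(pod, 10, time.Now())) // true: ready for 30s >= 10s
	fmt.Println(isAvailable(pod, 60, time.Now())) // false: not ready long enough
}
```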
The comments in the code spell out the assumptions, expected behavior, and invariants of each step; they are essential for understanding the logic and design.
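The `maxUnavailable` invariant in the non-surging branch reduces to a small piece of bookkeeping at the end of that branch. The following standalone sketch restates that arithmetic with plain slices and ints; `selectOldPodsToDelete` and the pod names are hypothetical and not part of the controller:

```go
package main

import "fmt"

// selectOldPodsToDelete restates the bookkeeping at the end of the
// maxSurge == 0 branch: already-unavailable old pods (allowedReplacements)
// are always deleted, and available old pods (candidates) are only deleted
// while the maxUnavailable budget still has room.
func selectOldPodsToDelete(allowedReplacements, candidates []string, maxUnavailable, numUnavailable int) []string {
	remaining := maxUnavailable - numUnavailable
	if remaining < 0 {
		remaining = 0
	}
	if remaining > len(candidates) {
		remaining = len(candidates)
	}
	return append(allowedReplacements, candidates[:remaining]...)
}

func main() {
	// Two pods are already unavailable and the budget allows three, so only
	// one of the still-available candidates may be taken down in this sync.
	toDelete := selectOldPodsToDelete(
		[]string{"ds-old-a", "ds-old-b"}, // unavailable old pods, replaced unconditionally
		[]string{"ds-old-c", "ds-old-d"}, // available old pods, candidates for deletion
		3, // maxUnavailable
		2, // numUnavailable counted so far
	)
	fmt.Println(toDelete) // [ds-old-a ds-old-b ds-old-c]
}
```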
Note that this is only a fragment and does not show the complete implementation of the function, in particular the surge branch. The code also relies on other helper functions and type definitions that can be found in the Kubernetes source tree.