120 changes: 90 additions & 30 deletions pkg/controllers/updaterun/execution.go
@@ -22,12 +22,15 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -67,8 +70,12 @@ func (r *Reconciler) execute(

updateRunStatus := updateRun.GetUpdateRunStatus()
if updatingStageIndex < len(updateRunStatus.StagesStatus) {
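// Resolve the stage's maxConcurrency (an integer or a percentage of the stage's cluster count) into a concrete limit for this reconcile pass.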
maxConcurrency, err := calculateMaxConcurrencyValue(updateRunStatus, updatingStageIndex)
if err != nil {
return false, 0, err
}
updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex]
waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings)
waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency)
if errors.Is(execErr, errStagedUpdatedAborted) {
markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error())
return true, waitTime, execErr
@@ -91,6 +98,7 @@ func (r *Reconciler) executeUpdatingStage(
updateRun placementv1beta1.UpdateRunObj,
updatingStageIndex int,
toBeUpdatedBindings []placementv1beta1.BindingObj,
maxConcurrency int,
) (time.Duration, error) {
updateRunStatus := updateRun.GetUpdateRunStatus()
updateRunSpec := updateRun.GetUpdateRunSpec()
@@ -105,24 +113,33 @@ func (r *Reconciler) executeUpdatingStage(
bindingSpec := binding.GetBindingSpec()
toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding
}
finishedClusterCount := 0

// Go through each cluster in the stage and check if it's updated.
for i := range updatingStageStatus.Clusters {
finishedClusterCount := 0
clusterUpdatingCount := 0
var stuckClusterNames []string
var clusterUpdateErrors []error
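// clusterUpdatingCount caps the number of in-flight cluster updates at maxConcurrency, while clusterUpdateErrors collects per-cluster failures so a single bad cluster does not abort the whole pass.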
// Go through each cluster in the stage and check if it's updating/succeeded/failed.
for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ {
clusterStatus := &updatingStageStatus.Clusters[i]
clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster is marked as failed to update.
failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
}
if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster has been updated successfully.
finishedClusterCount++
continue
if clusterUpdateSucceededCond == nil {
// The cluster is either updating or not started yet.
clusterUpdatingCount++
} else {
if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster is marked as failed to update.
failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
}
if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
// The cluster has been updated successfully.
finishedClusterCount++
continue
}
}
// The cluster needs to be processed.
clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
// The cluster is either updating or not started yet.
binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName]
if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) {
@@ -138,11 +155,13 @@ func (r *Reconciler) executeUpdatingStage(
bindingSpec.ApplyStrategy = updateRunStatus.ApplyStrategy
if err := r.Client.Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "updateRun", updateRunRef)
return 0, controller.NewUpdateIgnoreConflictError(err)
clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err))
continue
}
klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
clusterUpdateErrors = append(clusterUpdateErrors, err)
continue
}
} else {
klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
@@ -151,29 +170,33 @@ func (r *Reconciler) executeUpdatingStage(
bindingSpec.State = placementv1beta1.BindingStateBound
if err := r.Client.Update(ctx, binding); err != nil {
klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
return 0, controller.NewUpdateIgnoreConflictError(err)
clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err))
continue
}
klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
clusterUpdateErrors = append(clusterUpdateErrors, err)
continue
}
} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration()) {
klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
return 0, err
clusterUpdateErrors = append(clusterUpdateErrors, err)
continue
}
} else {
if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
return clusterUpdatingWaitTime, updateErr
clusterUpdateErrors = append(clusterUpdateErrors, updateErr)
continue
}
}
}
markClusterUpdatingStarted(clusterStatus, updateRun.GetGeneration())
if finishedClusterCount == 0 {
markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
}
// No need to continue as we only support one cluster updating at a time for now.
return clusterUpdatingWaitTime, nil
// Continue to the next cluster, as up to maxConcurrency clusters can be processed in parallel.
continue
}

// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
@@ -190,24 +213,35 @@ func (r *Reconciler) executeUpdatingStage(
"bindingSpecInSync", inSync, "bindingState", bindingSpec.State,
"bindingRolloutStarted", rolloutStarted, "binding", klog.KObj(binding), "updateRun", updateRunRef)
markClusterUpdatingFailed(clusterStatus, updateRun.GetGeneration(), preemptedErr.Error())
return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error())
clusterUpdateErrors = append(clusterUpdateErrors, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error()))
continue
}

finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
if updateErr != nil {
clusterUpdateErrors = append(clusterUpdateErrors, updateErr)
}
if finished {
finishedClusterCount++
markUpdateRunProgressing(updateRun)
// The cluster has finished successfully; free its slot so another cluster can start in this pass.
clusterUpdatingCount--
continue
} else {
// If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
if timeElapsed > updateRunStuckThreshold {
klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName)
}
}
// No need to continue as we only support one cluster updating at a time for now.
return clusterUpdatingWaitTime, updateErr
}

// After processing up to maxConcurrency clusters, check whether the update run should be marked as stuck or progressing.
aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames, finishedClusterCount)

// Aggregate and return any errors collected during this pass.
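// Note: utilerrors' aggregate implements Is, so errors.Is(execErr, errStagedUpdatedAborted) in the caller should still match abort errors collected here.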
if len(clusterUpdateErrors) > 0 {
return 0, utilerrors.NewAggregate(clusterUpdateErrors)
}

if finishedClusterCount == len(updatingStageStatus.Clusters) {
@@ -232,6 +266,7 @@ func (r *Reconciler) executeUpdatingStage(
}
return waitTime, nil
}
// Some clusters are still updating.
return clusterUpdatingWaitTime, nil
}

@@ -431,6 +466,31 @@ func (r *Reconciler) updateApprovalRequestAccepted(ctx context.Context, appReq p
return nil
}

// calculateMaxConcurrencyValue calculates the actual max concurrency value for a stage.
// It converts the IntOrString maxConcurrency (which can be an integer or percentage) to an integer value
// based on the total number of clusters in the stage. The value is rounded down.
func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stageIndex int) (int, error) {
specifiedMaxConcurrency := status.UpdateStrategySnapshot.Stages[stageIndex].MaxConcurrency
clusterCount := len(status.StagesStatus[stageIndex].Clusters)
// Scale the maxConcurrency against the number of clusters in the stage, rounding percentage values down.
maxConcurrencyValue, err := intstr.GetScaledValueFromIntOrPercent(specifiedMaxConcurrency, clusterCount, false)
if err != nil {
return 0, err
}
return maxConcurrencyValue, nil
}
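// For example (illustrative values): with MaxConcurrency "50%" and 5 clusters in the stage,
// intstr.GetScaledValueFromIntOrPercent returns 2 (2.5 rounded down); with an integer
// MaxConcurrency of 3, it returns 3 regardless of the cluster count.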

// aggregateUpdateRunStatus aggregates the status of the update run based on the cluster update status.
// It marks the update run as stuck if any clusters are stuck, or as progressing if some clusters have finished updating.
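// For example, if two clusters have exceeded updateRunStuckThreshold, the Progressing condition is set to
// False with reason UpdateRunStuckReason and both cluster names in the message; with no stuck clusters and
// at least one finished cluster, the run is marked as progressing instead.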
func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string, finishedClusterCount int) {
if len(stuckClusterNames) > 0 {
markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", "))
} else if finishedClusterCount > 0 {
// If there is no stuck cluster but some progress has been made, mark the update run as progressing.
markUpdateRunProgressing(updateRun)
}
}

// isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status.
func isBindingSyncedWithClusterStatus(resourceSnapshotName string, updateRun placementv1beta1.UpdateRunObj, binding placementv1beta1.BindingObj, cluster *placementv1beta1.ClusterUpdatingStatus) bool {
bindingSpec := binding.GetBindingSpec()
@@ -544,14 +604,14 @@ func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun placementv1beta1.Upda
}

// markUpdateRunStuck marks the updateRun as stuck in memory.
func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterName string) {
func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterNames string) {
updateRunStatus := updateRun.GetUpdateRunStatus()
meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{
Type: string(placementv1beta1.StagedUpdateRunConditionProgressing),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.GetGeneration(),
Reason: condition.UpdateRunStuckReason,
Message: fmt.Sprintf("The updateRun is stuck waiting for cluster %s in stage %s to finish updating, please check placement status for potential errors", clusterName, stageName),
Message: fmt.Sprintf("The updateRun is stuck waiting for cluster/clusters %s in stage %s to finish updating, please check placement status for potential errors", clusterNames, stageName),
})
}
