diff --git a/pkg/controllers/updaterun/execution.go b/pkg/controllers/updaterun/execution.go index c69e34f11..469bbdd96 100644 --- a/pkg/controllers/updaterun/execution.go +++ b/pkg/controllers/updaterun/execution.go @@ -22,12 +22,15 @@ import ( "fmt" "reflect" "strconv" + "strings" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" @@ -67,8 +70,12 @@ func (r *Reconciler) execute( updateRunStatus := updateRun.GetUpdateRunStatus() if updatingStageIndex < len(updateRunStatus.StagesStatus) { + maxConcurrency, err := calculateMaxConcurrencyValue(updateRunStatus, updatingStageIndex) + if err != nil { + return false, 0, err + } updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex] - waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings) + waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency) if errors.Is(execErr, errStagedUpdatedAborted) { markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error()) return true, waitTime, execErr @@ -91,6 +98,7 @@ func (r *Reconciler) executeUpdatingStage( updateRun placementv1beta1.UpdateRunObj, updatingStageIndex int, toBeUpdatedBindings []placementv1beta1.BindingObj, + maxConcurrency int, ) (time.Duration, error) { updateRunStatus := updateRun.GetUpdateRunStatus() updateRunSpec := updateRun.GetUpdateRunSpec() @@ -105,25 +113,33 @@ func (r *Reconciler) executeUpdatingStage( bindingSpec := binding.GetBindingSpec() toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding } - finishedClusterCount := 0 - // Go through each cluster in the stage and check if it's updated. - for i := range updatingStageStatus.Clusters { + finishedClusterCount := 0 + clusterUpdatingCount := 0 + var stuckClusterNames []string + var clusterUpdateErrors []error + // Go through each cluster in the stage and check if it's updating/succeeded/failed. + for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ { clusterStatus := &updatingStageStatus.Clusters[i] - clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) - if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) { - // The cluster is marked as failed to update. - failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName) - klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef) - return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()) - } - if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) { - // The cluster has been updated successfully. - finishedClusterCount++ - continue + if clusterUpdateSucceededCond == nil { + // The cluster is either updating or not started yet. + clusterUpdatingCount++ + } else { + if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) { + // The cluster is marked as failed to update. 
+ failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName) + klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef) + return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()) + } + if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) { + // The cluster has been updated successfully. + finishedClusterCount++ + continue + } } - // The cluster is either updating or not started yet. + // The cluster needs to be processed. + clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName] if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) { // The cluster has not started updating yet. @@ -138,11 +154,13 @@ func (r *Reconciler) executeUpdatingStage( bindingSpec.ApplyStrategy = updateRunStatus.ApplyStrategy if err := r.Client.Update(ctx, binding); err != nil { klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "updateRun", updateRunRef) - return 0, controller.NewUpdateIgnoreConflictError(err) + clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err)) + continue } klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil { - return 0, err + clusterUpdateErrors = append(clusterUpdateErrors, err) + continue } } else { klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) @@ -151,20 +169,24 @@ func (r *Reconciler) executeUpdatingStage( bindingSpec.State = placementv1beta1.BindingStateBound if err := r.Client.Update(ctx, binding); err != nil { klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) - return 0, controller.NewUpdateIgnoreConflictError(err) + clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err)) + continue } klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil { - return 0, err + clusterUpdateErrors = append(clusterUpdateErrors, err) + continue } } else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration()) { klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil { - return 0, err + clusterUpdateErrors = append(clusterUpdateErrors, err) + continue } } else { if _, updateErr := 
checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil { - return clusterUpdatingWaitTime, updateErr + clusterUpdateErrors = append(clusterUpdateErrors, updateErr) + continue } } } @@ -172,8 +194,8 @@ func (r *Reconciler) executeUpdatingStage( if finishedClusterCount == 0 { markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration()) } - // No need to continue as we only support one cluster updating at a time for now. - return clusterUpdatingWaitTime, nil + // Need to continue as we need to process at most maxConcurrency number of clusters in parallel. + continue } // Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound. @@ -190,24 +212,35 @@ func (r *Reconciler) executeUpdatingStage( "bindingSpecInSync", inSync, "bindingState", bindingSpec.State, "bindingRolloutStarted", rolloutStarted, "binding", klog.KObj(binding), "updateRun", updateRunRef) markClusterUpdatingFailed(clusterStatus, updateRun.GetGeneration(), preemptedErr.Error()) - return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error()) + clusterUpdateErrors = append(clusterUpdateErrors, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error())) + continue } finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun) + if updateErr != nil { + clusterUpdateErrors = append(clusterUpdateErrors, updateErr) + } if finished { finishedClusterCount++ - markUpdateRunProgressing(updateRun) + // The cluster has finished successfully, we can process another cluster in this round. + clusterUpdatingCount-- continue } else { // If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck. timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time) if timeElapsed > updateRunStuckThreshold { klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef) - markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName) + stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName) } } - // No need to continue as we only support one cluster updating at a time for now. - return clusterUpdatingWaitTime, updateErr + } + + // After processing maxConcurrency number of cluster, check if we need to mark the update run as stuck or progressing. + aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames, finishedClusterCount) + + // Aggregate and return errors. + if len(clusterUpdateErrors) > 0 { + return 0, utilerrors.NewAggregate(clusterUpdateErrors) } if finishedClusterCount == len(updatingStageStatus.Clusters) { @@ -232,6 +265,7 @@ func (r *Reconciler) executeUpdatingStage( } return waitTime, nil } + // Some clusters are still updating. return clusterUpdatingWaitTime, nil } @@ -431,6 +465,31 @@ func (r *Reconciler) updateApprovalRequestAccepted(ctx context.Context, appReq p return nil } +// calculateMaxConcurrencyValue calculates the actual max concurrency value for a stage. +// It converts the IntOrString maxConcurrency (which can be an integer or percentage) to an integer value +// based on the total number of clusters in the stage. The value is rounded down. 
+func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stageIndex int) (int, error) { + specifiedMaxConcurrency := status.UpdateStrategySnapshot.Stages[stageIndex].MaxConcurrency + clusterCount := len(status.StagesStatus[stageIndex].Clusters) + // Round down the maxConcurrency to the number of clusters in the stage. + maxConcurrencyValue, err := intstr.GetScaledValueFromIntOrPercent(specifiedMaxConcurrency, clusterCount, false) + if err != nil { + return 0, err + } + return maxConcurrencyValue, nil +} + +// aggregateUpdateRunStatus aggregates the status of the update run based on the cluster update status. +// It marks the update run as stuck if any clusters are stuck, or as progressing if some clusters have finished updating. +func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string, finishedClusterCount int) { + if len(stuckClusterNames) > 0 { + markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", ")) + } else if finishedClusterCount > 0 { + // If there is no stuck cluster but some progress has been made, mark the update run as progressing. + markUpdateRunProgressing(updateRun) + } +} + // isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status. func isBindingSyncedWithClusterStatus(resourceSnapshotName string, updateRun placementv1beta1.UpdateRunObj, binding placementv1beta1.BindingObj, cluster *placementv1beta1.ClusterUpdatingStatus) bool { bindingSpec := binding.GetBindingSpec() @@ -544,14 +603,14 @@ func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun placementv1beta1.Upda } // markUpdateRunStuck marks the updateRun as stuck in memory. -func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterName string) { +func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterNames string) { updateRunStatus := updateRun.GetUpdateRunStatus() meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{ Type: string(placementv1beta1.StagedUpdateRunConditionProgressing), Status: metav1.ConditionFalse, ObservedGeneration: updateRun.GetGeneration(), Reason: condition.UpdateRunStuckReason, - Message: fmt.Sprintf("The updateRun is stuck waiting for cluster %s in stage %s to finish updating, please check placement status for potential errors", clusterName, stageName), + Message: fmt.Sprintf("The updateRun is stuck waiting for cluster/clusters %s in stage %s to finish updating, please check placement status for potential errors", clusterNames, stageName), }) } diff --git a/pkg/controllers/updaterun/execution_test.go b/pkg/controllers/updaterun/execution_test.go index 2dbf4dcff..55646e679 100644 --- a/pkg/controllers/updaterun/execution_test.go +++ b/pkg/controllers/updaterun/execution_test.go @@ -17,12 +17,21 @@ limitations under the License. 
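// Illustrative sketch only, not part of the patch: how the maxConcurrency value resolved by
// calculateMaxConcurrencyValue above behaves, assuming only the
// k8s.io/apimachinery/pkg/util/intstr helper the patch imports. Integer values pass through
// unchanged (even when larger than the stage's cluster count), while percentages are scaled
// against the cluster count and rounded down. The package and variable names below are
// illustrative.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	clusterCount := 10
	for _, mc := range []*intstr.IntOrString{
		{Type: intstr.Int, IntVal: 3},         // integer: used as-is -> 3
		{Type: intstr.Int, IntVal: 15},        // integer above cluster count: still 15
		{Type: intstr.String, StrVal: "33%"},  // 3.3 clusters, rounded down -> 3
		{Type: intstr.String, StrVal: "100%"}, // -> 10
	} {
		// roundUp=false matches the round-down behavior chosen in calculateMaxConcurrencyValue.
		v, err := intstr.GetScaledValueFromIntOrPercent(mc, clusterCount, false)
		fmt.Printf("maxConcurrency=%s clusters=%d -> %d (err=%v)\n", mc.String(), clusterCount, v, err)
	}
}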
package updaterun import ( + "context" + "errors" + "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" @@ -398,3 +407,518 @@ func TestBuildApprovalRequestObject(t *testing.T) { }) } } + +func TestExecuteUpdatingStage_Error(t *testing.T) { + tests := []struct { + name string + updateRun *placementv1beta1.ClusterStagedUpdateRun + bindings []placementv1beta1.BindingObj + interceptorFunc *interceptor.Funcs + wantErr error + expectWaitTime time.Duration + }{ + { + name: "cluster update failed", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ClusterUpdatingConditionSucceeded), + Status: metav1.ConditionFalse, + ObservedGeneration: 1, + Reason: condition.ClusterUpdatingFailedReason, + Message: "cluster update failed", + }, + }, + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: nil, + interceptorFunc: nil, + wantErr: errors.New("the cluster `cluster-1` in the stage test-stage has failed"), + expectWaitTime: 0, + }, + { + name: "binding update failure", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 1, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + State: placementv1beta1.BindingStateScheduled, + }, + }, + }, + interceptorFunc: &interceptor.Funcs{ + Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { + return errors.New("simulated update error") + }, + }, + wantErr: errors.New("simulated update error"), + expectWaitTime: 0, + }, + { + name: "binding preemption", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ClusterUpdatingConditionStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.ClusterUpdatingStartedReason, + }, + }, + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 1, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + ResourceSnapshotName: "wrong-snapshot", + State: placementv1beta1.BindingStateBound, + }, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + }, + }, + }, + }, + }, + interceptorFunc: nil, + wantErr: errors.New("the binding of the updating cluster `cluster-1` in the stage `test-stage` is not up-to-date with the desired status"), + expectWaitTime: 0, + }, + { + name: "binding synced but state not bound - update binding state fails", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + // No conditions - cluster has not started updating yet. + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 1, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + ResourceSnapshotName: "test-placement-1-snapshot", // Already synced. + State: placementv1beta1.BindingStateScheduled, // But not Bound yet. 
+ }, + }, + }, + interceptorFunc: &interceptor.Funcs{ + Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error { + return errors.New("failed to update binding state") + }, + }, + wantErr: errors.New("failed to update binding state"), + expectWaitTime: 0, + }, + { + name: "binding synced and bound but generation updated - update rolloutStarted fails", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + // No conditions - cluster has not started updating yet. + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 2, // Generation updated by scheduler. + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + ResourceSnapshotName: "test-placement-1-snapshot", // Already synced. + State: placementv1beta1.BindingStateBound, // Already Bound. + }, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, // Old generation - needs update. + Reason: condition.RolloutStartedReason, + }, + }, + }, + }, + }, + interceptorFunc: &interceptor.Funcs{ + SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error { + // Fail the status update for rolloutStarted. + return errors.New("failed to update binding rolloutStarted status") + }, + }, + wantErr: errors.New("failed to update binding rolloutStarted status"), + expectWaitTime: 0, + }, + { + name: "binding synced, bound, rolloutStarted true, but binding has failed condition", + updateRun: &placementv1beta1.ClusterStagedUpdateRun{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-update-run", + Generation: 1, + }, + Spec: placementv1beta1.UpdateRunSpec{ + PlacementName: "test-placement", + ResourceSnapshotIndex: "1", + }, + Status: placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: []placementv1beta1.ClusterUpdatingStatus{ + { + ClusterName: "cluster-1", + // No conditions - cluster has not started updating yet. + }, + }, + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 1}, + }, + }, + }, + }, + }, + bindings: []placementv1beta1.BindingObj{ + &placementv1beta1.ClusterResourceBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "binding-1", + Generation: 1, + }, + Spec: placementv1beta1.ResourceBindingSpec{ + TargetCluster: "cluster-1", + ResourceSnapshotName: "test-placement-1-snapshot", // Already synced. 
+ State: placementv1beta1.BindingStateBound, // Already Bound. + }, + Status: placementv1beta1.ResourceBindingStatus{ + Conditions: []metav1.Condition{ + { + Type: string(placementv1beta1.ResourceBindingRolloutStarted), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + Reason: condition.RolloutStartedReason, + }, + { + Type: string(placementv1beta1.ResourceBindingApplied), + Status: metav1.ConditionFalse, + ObservedGeneration: 1, + Reason: condition.ApplyFailedReason, + }, + }, + }, + }, + }, + interceptorFunc: nil, + wantErr: errors.New("cluster updating encountered an error at stage"), + expectWaitTime: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + scheme := runtime.NewScheme() + _ = placementv1beta1.AddToScheme(scheme) + + var fakeClient client.Client + objs := make([]client.Object, len(tt.bindings)) + for i := range tt.bindings { + objs[i] = tt.bindings[i] + } + if tt.interceptorFunc != nil { + fakeClient = interceptor.NewClient( + fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build(), + *tt.interceptorFunc, + ) + } else { + fakeClient = fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() + } + + r := &Reconciler{ + Client: fakeClient, + } + + // Execute the stage. + waitTime, gotErr := r.executeUpdatingStage(ctx, tt.updateRun, 0, tt.bindings, 1) + + // Verify error expectation. + if tt.wantErr != nil && gotErr == nil { + t.Fatalf("executeUpdatingStage() expected error containing %v, got nil", tt.wantErr) + } + if tt.wantErr == nil && gotErr != nil { + t.Fatalf("executeUpdatingStage() unexpected error: %v", gotErr) + } + + // Verify error message contains expected substring. + if tt.wantErr != nil && gotErr != nil { + if !strings.Contains(gotErr.Error(), tt.wantErr.Error()) { + t.Fatalf("executeUpdatingStage() expected error containing %v, got: %v", tt.wantErr, gotErr) + } + } + + // Verify wait time. 
+ if waitTime != tt.expectWaitTime { + t.Fatalf("executeUpdatingStage() expected waitTime=%v, got: %v", tt.expectWaitTime, waitTime) + } + }) + } +} + +func TestCalculateMaxConcurrencyValue(t *testing.T) { + tests := []struct { + name string + maxConcurrency *intstr.IntOrString + clusterCount int + wantValue int + wantErr bool + }{ + { + name: "integer value - less than cluster count", + maxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + clusterCount: 10, + wantValue: 3, + wantErr: false, + }, + { + name: "integer value - equal to cluster count", + maxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}, + clusterCount: 10, + wantValue: 10, + wantErr: false, + }, + { + name: "integer value - greater than cluster count", + maxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 15}, + clusterCount: 10, + wantValue: 15, + wantErr: false, + }, + { + name: "percentage value - 50%", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, + clusterCount: 10, + wantValue: 5, + wantErr: false, + }, + { + name: "percentage value - 33% rounds down", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "33%"}, + clusterCount: 10, + wantValue: 3, + wantErr: false, + }, + { + name: "percentage value - 100%", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "100%"}, + clusterCount: 10, + wantValue: 10, + wantErr: false, + }, + { + name: "percentage value - 25% with 7 clusters", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "25%"}, + clusterCount: 7, + wantValue: 1, + wantErr: false, + }, + { + name: "zero clusters", + maxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + clusterCount: 0, + wantValue: 3, + wantErr: false, + }, + { + name: "non-zero percentage with zero clusters", + maxConcurrency: &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}, + clusterCount: 0, + wantValue: 0, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + status := &placementv1beta1.UpdateRunStatus{ + StagesStatus: []placementv1beta1.StageUpdatingStatus{ + { + StageName: "test-stage", + Clusters: make([]placementv1beta1.ClusterUpdatingStatus, tt.clusterCount), + }, + }, + UpdateStrategySnapshot: &placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "test-stage", + MaxConcurrency: tt.maxConcurrency, + }, + }, + }, + } + + gotValue, gotErr := calculateMaxConcurrencyValue(status, 0) + + if (gotErr != nil) != tt.wantErr { + t.Fatalf("calculateMaxConcurrencyValue() error = %v, wantErr %v", gotErr, tt.wantErr) + } + + if gotValue != tt.wantValue { + t.Fatalf("calculateMaxConcurrencyValue() = %v, want %v", gotValue, tt.wantValue) + } + }) + } +} diff --git a/pkg/controllers/updaterun/validation.go b/pkg/controllers/updaterun/validation.go index 27d557b77..b3c5e5e0a 100644 --- a/pkg/controllers/updaterun/validation.go +++ b/pkg/controllers/updaterun/validation.go @@ -234,23 +234,6 @@ func validateClusterUpdatingStatus( return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) } updatingStageIndex = curStage - // Collect the updating clusters. 
- var updatingClusters []string - for j := range stageStatus.Clusters { - clusterStartedCond := meta.FindStatusCondition(stageStatus.Clusters[j].Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted)) - clusterFinishedCond := meta.FindStatusCondition(stageStatus.Clusters[j].Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)) - if condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) && - !(condition.IsConditionStatusTrue(clusterFinishedCond, updateRun.GetGeneration()) || condition.IsConditionStatusFalse(clusterFinishedCond, updateRun.GetGeneration())) { - updatingClusters = append(updatingClusters, stageStatus.Clusters[j].ClusterName) - } - } - // We don't allow more than one clusters to be updating at the same time. - // TODO(wantjian): support multiple clusters updating at the same time. - if len(updatingClusters) > 1 { - unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("more than one cluster is updating in the stage `%s`, clusters: %v", stageStatus.StageName, updatingClusters)) - klog.ErrorS(unexpectedErr, "Detected more than one updating clusters in the stage", "updateRun", klog.KObj(updateRun)) - return -1, -1, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error()) - } } return updatingStageIndex, lastFinishedStageIndex, nil } diff --git a/pkg/controllers/updaterun/validation_test.go b/pkg/controllers/updaterun/validation_test.go index 0f01168e3..d6ad8215d 100644 --- a/pkg/controllers/updaterun/validation_test.go +++ b/pkg/controllers/updaterun/validation_test.go @@ -145,7 +145,7 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { wantLastFinishedStageIndex: -1, }, { - name: "determineUpdatignStage should return error if there are multiple clusters updating in an updating stage", + name: "determineUpdatignStage should not return error if there are multiple clusters updating in an updating stage", curStage: 0, updatingStageIndex: -1, lastFinishedStageIndex: -1, @@ -163,8 +163,8 @@ func TestValidateClusterUpdatingStatus(t *testing.T) { }, }, }, - wantErr: wrapErr(true, fmt.Errorf("more than one cluster is updating in the stage `test-stage`, clusters: [cluster-1 cluster-2]")), - wantUpdatingStageIndex: -1, + wantErr: nil, + wantUpdatingStageIndex: 0, wantLastFinishedStageIndex: -1, }, { diff --git a/test/e2e/cluster_staged_updaterun_test.go b/test/e2e/cluster_staged_updaterun_test.go index 9880c2eef..81dd10a00 100644 --- a/test/e2e/cluster_staged_updaterun_test.go +++ b/test/e2e/cluster_staged_updaterun_test.go @@ -27,6 +27,7 @@ import ( apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -37,7 +38,8 @@ import ( const ( // The current stage wait between clusters are 15 seconds - updateRunEventuallyDuration = time.Minute + updateRunEventuallyDuration = time.Minute + updateRunParallelEventuallyDuration = 20 * time.Second resourceSnapshotIndex1st = "0" resourceSnapshotIndex2nd = "1" @@ -1270,6 +1272,95 @@ var _ = Describe("test CRP rollout with staged update run", func() { } }) }) + + Context("Test parallel cluster updates with maxConcurrency set to 3", Ordered, func() { + var strategy *placementv1beta1.ClusterStagedUpdateStrategy + updateRunName := fmt.Sprintf(clusterStagedUpdateRunNameWithSubIndexTemplate, GinkgoParallelProcess(), 0) + + BeforeAll(func() { + // Create a test namespace and 
a configMap inside it on the hub cluster. + createWorkResources() + + // Create the CRP with external rollout strategy. + crp := &placementv1beta1.ClusterResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: crpName, + // Add a custom finalizer; this would allow us to better observe + // the behavior of the controllers. + Finalizers: []string{customDeletionBlockerFinalizer}, + }, + Spec: placementv1beta1.PlacementSpec{ + ResourceSelectors: workResourceSelector(), + Strategy: placementv1beta1.RolloutStrategy{ + Type: placementv1beta1.ExternalRolloutStrategyType, + }, + }, + } + Expect(hubClient.Create(ctx, crp)).To(Succeed(), "Failed to create CRP") + + // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency specified. + strategy = &placementv1beta1.ClusterStagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: strategyName, + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "parallel", + // Pick all clusters in a single stage. + LabelSelector: &metav1.LabelSelector{}, + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + }, + }, + }, + } + Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create ClusterStagedUpdateStrategy") + }) + + AfterAll(func() { + // Remove the custom deletion blocker finalizer from the CRP. + ensureCRPAndRelatedResourcesDeleted(crpName, allMemberClusters) + + // Delete the clusterStagedUpdateRun. + ensureClusterStagedUpdateRunDeletion(updateRunName) + + // Delete the clusterStagedUpdateStrategy. + ensureClusterUpdateRunStrategyDeletion(strategyName) + }) + + It("Should not rollout any resources to member clusters as there's no update run yet", checkIfRemovedWorkResourcesFromAllMemberClustersConsistently) + + It("Should have the latest resource snapshot", func() { + validateLatestClusterResourceSnapshot(crpName, resourceSnapshotIndex1st) + }) + + It("Should successfully schedule the crp", func() { + validateLatestClusterSchedulingPolicySnapshot(crpName, policySnapshotIndex1st, 3) + }) + + It("Should update crp status as pending rollout", func() { + crpStatusUpdatedActual := crpStatusWithExternalStrategyActual(nil, "", false, allMemberClusterNames, []string{"", "", ""}, []bool{false, false, false}, nil, nil) + Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP %s status as expected", crpName) + }) + + It("Should create a cluster staged update run successfully", func() { + createClusterStagedUpdateRunSucceed(updateRunName, crpName, resourceSnapshotIndex1st, strategyName) + }) + + It("Should complete the cluster staged update run with all 3 clusters updated in parallel", func() { + // With maxConcurrency=3, all 3 clusters should be updated in parallel. + // Each cluster waits 15 seconds, so total time should be under 20s. 
+ csurSucceededActual := clusterStagedUpdateRunStatusSucceededActual(updateRunName, policySnapshotIndex1st, len(allMemberClusters), defaultApplyStrategy, &strategy.Spec, [][]string{{allMemberClusterNames[0], allMemberClusterNames[1], allMemberClusterNames[2]}}, nil, nil, nil) + Eventually(csurSucceededActual, updateRunParallelEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to validate updateRun %s succeeded", updateRunName) + checkIfPlacedWorkResourcesOnMemberClustersInUpdateRun(allMemberClusters) + }) + + It("Should update crp status as completed", func() { + crpStatusUpdatedActual := crpStatusWithExternalStrategyActual(workResourceIdentifiers(), resourceSnapshotIndex1st, true, allMemberClusterNames, + []string{resourceSnapshotIndex1st, resourceSnapshotIndex1st, resourceSnapshotIndex1st}, []bool{true, true, true}, nil, nil) + Eventually(crpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update CRP %s status as expected", crpName) + }) + }) }) // Note that this container cannot run in parallel with other containers. diff --git a/test/e2e/staged_updaterun_test.go b/test/e2e/staged_updaterun_test.go index b5a5428b1..357ec5759 100644 --- a/test/e2e/staged_updaterun_test.go +++ b/test/e2e/staged_updaterun_test.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -1101,6 +1102,94 @@ var _ = Describe("test RP rollout with staged update run", Label("resourceplacem } }) }) + + Context("Test parallel cluster updates with maxConcurrency set to 3", Ordered, func() { + var strategy *placementv1beta1.StagedUpdateStrategy + updateRunName := fmt.Sprintf(stagedUpdateRunNameWithSubIndexTemplate, GinkgoParallelProcess(), 0) + + BeforeAll(func() { + // Create the RP with external rollout strategy. + rp := &placementv1beta1.ResourcePlacement{ + ObjectMeta: metav1.ObjectMeta{ + Name: rpName, + Namespace: testNamespace, + // Add a custom finalizer; this would allow us to better observe + // the behavior of the controllers. + Finalizers: []string{customDeletionBlockerFinalizer}, + }, + Spec: placementv1beta1.PlacementSpec{ + ResourceSelectors: configMapSelector(), + Strategy: placementv1beta1.RolloutStrategy{ + Type: placementv1beta1.ExternalRolloutStrategyType, + }, + }, + } + Expect(hubClient.Create(ctx, rp)).To(Succeed(), "Failed to create RP") + + // Create a strategy with a single stage selecting all 3 clusters with maxConcurrency specified. + strategy = &placementv1beta1.StagedUpdateStrategy{ + ObjectMeta: metav1.ObjectMeta{ + Name: strategyName, + Namespace: testNamespace, + }, + Spec: placementv1beta1.UpdateStrategySpec{ + Stages: []placementv1beta1.StageConfig{ + { + Name: "parallel", + // Pick all clusters in a single stage. + LabelSelector: &metav1.LabelSelector{}, + MaxConcurrency: &intstr.IntOrString{Type: intstr.Int, IntVal: 3}, + }, + }, + }, + } + Expect(hubClient.Create(ctx, strategy)).To(Succeed(), "Failed to create StagedUpdateStrategy") + }) + + AfterAll(func() { + // Remove the custom deletion blocker finalizer from the RP. + ensureRPAndRelatedResourcesDeleted(types.NamespacedName{Name: rpName, Namespace: testNamespace}, allMemberClusters) + + // Delete the stagedUpdateRun. + ensureStagedUpdateRunDeletion(updateRunName, testNamespace) + + // Delete the stagedUpdateStrategy. 
+ ensureStagedUpdateRunStrategyDeletion(strategyName, testNamespace) + }) + + It("Should not rollout any resources to member clusters as there's no update run yet", checkIfRemovedConfigMapFromAllMemberClustersConsistently) + + It("Should have the latest resource snapshot", func() { + validateLatestResourceSnapshot(rpName, testNamespace, resourceSnapshotIndex1st) + }) + + It("Should successfully schedule the rp", func() { + validateLatestSchedulingPolicySnapshot(rpName, testNamespace, policySnapshotIndex1st, 3) + }) + + It("Should update rp status as pending rollout", func() { + rpStatusUpdatedActual := rpStatusWithExternalStrategyActual(nil, "", false, allMemberClusterNames, []string{"", "", ""}, []bool{false, false, false}, nil, nil) + Eventually(rpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update RP %s/%s status as expected", testNamespace, rpName) + }) + + It("Should create a staged update run successfully", func() { + createStagedUpdateRunSucceed(updateRunName, testNamespace, rpName, resourceSnapshotIndex1st, strategyName) + }) + + It("Should complete the staged update run with all 3 clusters updated in parallel", func() { + // With maxConcurrency=3, all 3 clusters should be updated in parallel. + // Each cluster waits 15 seconds, so total time should be under 20s. + surSucceededActual := stagedUpdateRunStatusSucceededActual(updateRunName, testNamespace, policySnapshotIndex1st, len(allMemberClusters), defaultApplyStrategy, &strategy.Spec, [][]string{{allMemberClusterNames[0], allMemberClusterNames[1], allMemberClusterNames[2]}}, nil, nil, nil) + Eventually(surSucceededActual, updateRunParallelEventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to validate updateRun %s/%s succeeded", testNamespace, updateRunName) + checkIfPlacedWorkResourcesOnMemberClustersInUpdateRun(allMemberClusters) + }) + + It("Should update rp status as completed", func() { + rpStatusUpdatedActual := rpStatusWithExternalStrategyActual(appConfigMapIdentifiers(), resourceSnapshotIndex1st, true, allMemberClusterNames, + []string{resourceSnapshotIndex1st, resourceSnapshotIndex1st, resourceSnapshotIndex1st}, []bool{true, true, true}, nil, nil) + Eventually(rpStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update RP %s/%s status as expected", testNamespace, rpName) + }) + }) }) func createStagedUpdateStrategySucceed(strategyName, namespace string) *placementv1beta1.StagedUpdateStrategy {
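// Illustrative sketch only, not part of the patch: how the per-cluster errors collected by
// executeUpdatingStage surface to the caller, assuming only the
// k8s.io/apimachinery/pkg/util/errors helper the patch imports. NewAggregate skips nil
// entries and returns nil for an empty list, so the stage loop can keep scanning clusters
// and report every failure at once instead of aborting on the first one. The error texts
// below are made up for the example.
package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	var clusterUpdateErrors []error

	// Two clusters fail their binding updates; a nil entry stands in for a cluster that
	// succeeded and contributed no error.
	clusterUpdateErrors = append(clusterUpdateErrors,
		errors.New("cluster-1: failed to update binding to be bound"),
		nil,
		errors.New("cluster-3: failed to update binding rolloutStarted status"),
	)

	if aggErr := utilerrors.NewAggregate(clusterUpdateErrors); aggErr != nil {
		// A single error value carrying both failure messages.
		fmt.Println(aggErr)
	}

	// With no failures the aggregate is nil, so the caller falls through to the
	// finishedClusterCount checks.
	fmt.Println(utilerrors.NewAggregate(nil) == nil) // true
}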