Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion apis/placement/v1beta1/stageupdate_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type UpdateRunSpec struct {

// The resource snapshot index of the selected resources to be updated across clusters.
// The index represents a group of resource snapshots that includes all the resources a ResourcePlacement selected.
// +kubebuilder:validation:Required
// +kubebuilder:validation:Optional
ResourceSnapshotIndex string `json:"resourceSnapshotIndex"`

// The name of the update strategy that specifies the stages and the sequence
Expand Down Expand Up @@ -335,6 +335,10 @@ type UpdateRunStatus struct {
// +kubebuilder:validation:Optional
PolicyObservedClusterCount int `json:"policyObservedClusterCount,omitempty"`

// ResourceSnapshotName records the name of the resource snapshot that the update run is based on.
// +kubebuilder:validation:Optional
ResourceSnapshotName string `json:"resourceSnapshotName,omitempty"`

// ApplyStrategy is the apply strategy that the stagedUpdateRun is using.
// It is the same as the apply strategy in the CRP when the staged update run starts.
// The apply strategy is not updated during the update run even if it changes in the CRP.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,6 @@ spec:
type: string
required:
- placementName
- resourceSnapshotIndex
- stagedRolloutStrategyName
type: object
x-kubernetes-validations:
Expand Down Expand Up @@ -1856,6 +1855,10 @@ spec:
All clusters involved in the update run are selected from the list of clusters scheduled by the CRP according
to the current policy.
type: string
resourceSnapshotName:
description: ResourceSnapshotName records the name of the resource
snapshot that the update run is based on.
type: string
stagedUpdateStrategySnapshot:
description: |-
UpdateStrategySnapshot is the snapshot of the UpdateStrategy used for the update run.
Expand Down Expand Up @@ -1998,7 +2001,7 @@ spec:
description: |-
MaxConcurrency specifies the maximum number of clusters that can be updated concurrently within this stage.
Value can be an absolute number (ex: 5) or a percentage of the total clusters in the stage (ex: 50%).
Absolute number is calculated from percentage by rounding up.
Fractional results are rounded down. A minimum of 1 update is enforced.
If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1).
Defaults to 1.
pattern: ^((100|[0-9]{1,2})%|[0-9]+)$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ spec:
description: |-
MaxConcurrency specifies the maximum number of clusters that can be updated concurrently within this stage.
Value can be an absolute number (ex: 5) or a percentage of the total clusters in the stage (ex: 50%).
Absolute number is calculated from percentage by rounding up.
Fractional results are rounded down. A minimum of 1 update is enforced.
If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1).
Defaults to 1.
pattern: ^((100|[0-9]{1,2})%|[0-9]+)$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ spec:
type: string
required:
- placementName
- resourceSnapshotIndex
- stagedRolloutStrategyName
type: object
x-kubernetes-validations:
Expand Down Expand Up @@ -775,6 +774,10 @@ spec:
All clusters involved in the update run are selected from the list of clusters scheduled by the CRP according
to the current policy.
type: string
resourceSnapshotName:
description: ResourceSnapshotName records the name of the resource
snapshot that the update run is based on.
type: string
stagedUpdateStrategySnapshot:
description: |-
UpdateStrategySnapshot is the snapshot of the UpdateStrategy used for the update run.
Expand Down Expand Up @@ -917,7 +920,7 @@ spec:
description: |-
MaxConcurrency specifies the maximum number of clusters that can be updated concurrently within this stage.
Value can be an absolute number (ex: 5) or a percentage of the total clusters in the stage (ex: 50%).
Absolute number is calculated from percentage by rounding up.
Fractional results are rounded down. A minimum of 1 update is enforced.
If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1).
Defaults to 1.
pattern: ^((100|[0-9]{1,2})%|[0-9]+)$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ spec:
description: |-
MaxConcurrency specifies the maximum number of clusters that can be updated concurrently within this stage.
Value can be an absolute number (ex: 5) or a percentage of the total clusters in the stage (ex: 50%).
Absolute number is calculated from percentage by rounding up.
Fractional results are rounded down. A minimum of 1 update is enforced.
If not specified, all clusters in the stage are updated sequentially (effectively maxConcurrency = 1).
Defaults to 1.
pattern: ^((100|[0-9]{1,2})%|[0-9]+)$
Expand Down
6 changes: 1 addition & 5 deletions pkg/controllers/updaterun/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"reflect"
"strconv"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -93,11 +92,8 @@ func (r *Reconciler) executeUpdatingStage(
toBeUpdatedBindings []placementv1beta1.BindingObj,
) (time.Duration, error) {
updateRunStatus := updateRun.GetUpdateRunStatus()
updateRunSpec := updateRun.GetUpdateRunSpec()
updatingStageStatus := &updateRunStatus.StagesStatus[updatingStageIndex]
// The parse error is ignored because the initialization should have caught it.
resourceIndex, _ := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
resourceSnapshotName := fmt.Sprintf(placementv1beta1.ResourceSnapshotNameFmt, updateRunSpec.PlacementName, resourceIndex)
resourceSnapshotName := updateRunStatus.ResourceSnapshotName
updateRunRef := klog.KObj(updateRun)
// Create the map of the toBeUpdatedBindings.
toBeUpdatedBindingsMap := make(map[string]placementv1beta1.BindingObj, len(toBeUpdatedBindings))
Expand Down
62 changes: 43 additions & 19 deletions pkg/controllers/updaterun/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,26 +473,14 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t
updateRunSpec := updateRun.GetUpdateRunSpec()
placementName := placementKey.Name

snapshotIndex, err := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
if err != nil || snapshotIndex < 0 {
err := controller.NewUserError(fmt.Errorf("invalid resource snapshot index `%s` provided, expected an integer >= 0", updateRunSpec.ResourceSnapshotIndex))
klog.ErrorS(err, "Failed to parse the resource snapshot index", "updateRun", updateRunRef)
// no more retries here.
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
}

resourceSnapshotList, err := controller.ListAllResourceSnapshotWithAnIndex(ctx, r.Client, updateRunSpec.ResourceSnapshotIndex, placementName, placementKey.Namespace)
resourceSnapshotObjs, err := r.getResourceSnapshotObjs(ctx, updateRunSpec, placementName, placementKey, updateRunRef)
if err != nil {
klog.ErrorS(err, "Failed to list the resourceSnapshots associated with the placement",
"placement", placementKey, "resourceSnapshotIndex", snapshotIndex, "updateRun", updateRunRef)
// err can be retried.
return controller.NewAPIServerError(true, err)
return err
}

resourceSnapshotObjs := resourceSnapshotList.GetResourceSnapshotObjs()
if len(resourceSnapshotObjs) == 0 {
err := controller.NewUserError(fmt.Errorf("no resourceSnapshots with index `%d` found for placement `%s`", snapshotIndex, placementKey))
klog.ErrorS(err, "No specified resourceSnapshots found", "updateRun", updateRunRef)
err := controller.NewUserError(fmt.Errorf("no resourceSnapshots found for placement `%s`", placementKey))
klog.ErrorS(err, "No resourceSnapshots found", "updateRun", updateRunRef)
// no more retries here.
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
}
Expand All @@ -506,15 +494,23 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t
break
}
}

// No masterResourceSnapshot found.
if masterResourceSnapshot == nil {
err := controller.NewUnexpectedBehaviorError(fmt.Errorf("no master resourceSnapshot found for placement `%s` with index `%d`", placementKey, snapshotIndex))
err := controller.NewUnexpectedBehaviorError(fmt.Errorf("no master resourceSnapshot found for placement %s", placementKey))
klog.ErrorS(err, "Failed to find master resourceSnapshot", "updateRun", updateRunRef)
// no more retries here.
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
}
klog.V(2).InfoS("Found master resourceSnapshot", "placement", placementKey, "index", snapshotIndex, "updateRun", updateRunRef)

klog.InfoS("Found master resourceSnapshot", "placement", placementKey, "index", updateRun.GetUpdateRunSpec().ResourceSnapshotIndex, "updateRun", updateRunRef)

// Update the resource snapshot name in the UpdateRun status.
updateRun.GetUpdateRunStatus().ResourceSnapshotName = masterResourceSnapshot.GetName()
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the UpdateRun status with resource snapshot name", "updateRun", klog.KObj(updateRun), "resourceSnapshot", klog.KObj(masterResourceSnapshot))
// updateErr can be retried.
return controller.NewUpdateIgnoreConflictError(updateErr)
}

resourceSnapshotRef := klog.KObj(masterResourceSnapshot)
// Fetch all the matching overrides.
Expand Down Expand Up @@ -543,6 +539,34 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t
return nil
}

// getResourceSnapshotObjs retrieves the resource snapshot objects either by index or latest snapshots.
func (r *Reconciler) getResourceSnapshotObjs(ctx context.Context, updateRunSpec *placementv1beta1.UpdateRunSpec, placementName string, placementKey types.NamespacedName, updateRunRef klog.ObjectRef) ([]placementv1beta1.ResourceSnapshotObj, error) {
if updateRunSpec.ResourceSnapshotIndex != "" {
snapshotIndex, err := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
if err != nil || snapshotIndex < 0 {
err := controller.NewUserError(fmt.Errorf("invalid resource snapshot index `%s` provided, expected an integer >= 0", updateRunSpec.ResourceSnapshotIndex))
klog.ErrorS(err, "Failed to parse the resource snapshot index", "updateRun", updateRunRef)
return nil, fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
}

resourceSnapshotList, err := controller.ListAllResourceSnapshotWithAnIndex(ctx, r.Client, updateRunSpec.ResourceSnapshotIndex, placementName, placementKey.Namespace)
if err != nil {
klog.ErrorS(err, "Failed to list the resourceSnapshots associated with the placement",
"placement", placementKey, "resourceSnapshotIndex", snapshotIndex, "updateRun", updateRunRef)
return nil, controller.NewAPIServerError(true, err)
}
return resourceSnapshotList.GetResourceSnapshotObjs(), nil
}

latestResourceSnapshots, err := controller.ListLatestResourceSnapshots(ctx, r.Client, placementKey)
if err != nil {
klog.ErrorS(err, "Failed to list the latest resourceSnapshots associated with the placement",
"placement", placementKey, "updateRun", updateRunRef)
return nil, controller.NewAPIServerError(true, err)
}
return latestResourceSnapshots.GetResourceSnapshotObjs(), nil
}

// recordInitializationSucceeded records the successful initialization condition in the UpdateRun status.
func (r *Reconciler) recordInitializationSucceeded(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error {
updateRunStatus := updateRun.GetUpdateRunStatus()
Expand Down
35 changes: 32 additions & 3 deletions pkg/controllers/updaterun/initialization_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ var _ = Describe("Updaterun initialization tests", func() {
}

want := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride)
want.ResourceSnapshotName = "" // resourceSnapshot not created in this test
// No clusters should be selected in the first stage.
want.StagesStatus[0].Clusters = []placementv1beta1.ClusterUpdatingStatus{}
// All clusters should be selected in the second stage and sorted by name.
Expand Down Expand Up @@ -697,6 +698,7 @@ var _ = Describe("Updaterun initialization tests", func() {
}

want := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride)
want.ResourceSnapshotName = "" // resourceSnapshot not created in this test
for i := range want.StagesStatus[0].Clusters {
// Remove the CROs, as they are not added in this test.
want.StagesStatus[0].Clusters[i].ClusterResourceOverrideSnapshots = nil
Expand Down Expand Up @@ -777,7 +779,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots found for placement")

By("Checking update run status metrics are emitted")
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
Expand All @@ -792,7 +794,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots found for placement")

By("Checking update run status metrics are emitted")
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
Expand All @@ -807,7 +809,7 @@ var _ = Describe("Updaterun initialization tests", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization failed")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots with index `0` found")
validateFailedInitCondition(ctx, updateRun, "no resourceSnapshots found for placement")

By("Checking update run status metrics are emitted")
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
Expand All @@ -828,6 +830,30 @@ var _ = Describe("Updaterun initialization tests", func() {
validateUpdateRunMetricsEmitted(generateInitializationFailedMetric(updateRun))
})

It("Should put related ClusterResourceOverrides in the status with no resource index defined", func() {
By("Creating a new resource snapshot")
Expect(k8sClient.Create(ctx, resourceSnapshot)).To(Succeed())

By("Creating a new cluster resource override")
Expect(k8sClient.Create(ctx, clusterResourceOverride)).To(Succeed())

By("Creating a new clusterStagedUpdateRun")
updateRun.Spec.ResourceSnapshotIndex = ""
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the clusterStagedUpdateRun stats")
initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride)
initialized.ResourceSnapshotName = testResourceSnapshotName
want := generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, want, "")

By("Validating the clusterStagedUpdateRun initialized consistently")
validateClusterStagedUpdateRunStatusConsistently(ctx, updateRun, want, "")

By("Checking update run status metrics are emitted")
validateUpdateRunMetricsEmitted(generateProgressingMetric(updateRun))
})

It("Should put related ClusterResourceOverrides in the status", func() {
By("Creating a new resource snapshot")
Expect(k8sClient.Create(ctx, resourceSnapshot)).To(Succeed())
Expand All @@ -840,6 +866,7 @@ var _ = Describe("Updaterun initialization tests", func() {

By("Validating the clusterStagedUpdateRun stats")
initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride)
initialized.ResourceSnapshotName = testResourceSnapshotName
want := generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, want, "")

Expand Down Expand Up @@ -879,6 +906,7 @@ func generateSucceededInitializationStatus(
status := &placementv1beta1.UpdateRunStatus{
PolicySnapshotIndexUsed: policySnapshot.Labels[placementv1beta1.PolicyIndexLabel],
PolicyObservedClusterCount: 10,
ResourceSnapshotName: testResourceSnapshotName,
ApplyStrategy: crp.Spec.Strategy.ApplyStrategy.DeepCopy(),
UpdateStrategySnapshot: &updateStrategy.Spec,
StagesStatus: []placementv1beta1.StageUpdatingStatus{
Expand Down Expand Up @@ -939,6 +967,7 @@ func generateSucceededInitializationStatusForSmallClusters(
status := &placementv1beta1.UpdateRunStatus{
PolicySnapshotIndexUsed: policySnapshot.Labels[placementv1beta1.PolicyIndexLabel],
PolicyObservedClusterCount: 3,
ResourceSnapshotName: testResourceSnapshotName,
ApplyStrategy: crp.Spec.Strategy.ApplyStrategy.DeepCopy(),
UpdateStrategySnapshot: &updateStrategy.Spec,
StagesStatus: []placementv1beta1.StageUpdatingStatus{
Expand Down
Loading