From a06e2f94a5510df98648471a0c9b74a2e1d0ed8d Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Wed, 5 Nov 2025 13:03:06 +0100 Subject: [PATCH 1/2] Introduce wait for cache utils --- .golangci.yml | 2 + .../internal/controllers/inplace_trigger.go | 29 +- .../machinedeployment_controller.go | 58 ++- .../machineset/machineset_controller.go | 103 +---- .../machineset/machineset_controller_test.go | 12 + .../topology/cluster/reconcile_state.go | 71 +--- internal/util/client/client.go | 266 +++++++++++++ internal/util/client/client_test.go | 368 ++++++++++++++++++ internal/util/client/metrics.go | 35 ++ 9 files changed, 728 insertions(+), 216 deletions(-) create mode 100644 internal/util/client/client.go create mode 100644 internal/util/client/client_test.go create mode 100644 internal/util/client/metrics.go diff --git a/.golangci.yml b/.golangci.yml index 4030de971c57..56752caf0dc2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -178,6 +178,8 @@ linters: alias: "" - pkg: sigs.k8s.io/cluster-api/internal/topology/names alias: topologynames + - pkg: sigs.k8s.io/cluster-api/internal/util/client + alias: "clientutil" # CAPD - pkg: sigs.k8s.io/cluster-api/test/infrastructure/docker/api/v1alpha3 alias: infrav1alpha3 diff --git a/controlplane/kubeadm/internal/controllers/inplace_trigger.go b/controlplane/kubeadm/internal/controllers/inplace_trigger.go index 7e49bceb5863..e16ea5230d90 100644 --- a/controlplane/kubeadm/internal/controllers/inplace_trigger.go +++ b/controlplane/kubeadm/internal/controllers/inplace_trigger.go @@ -18,11 +18,10 @@ package controllers import ( "context" - "time" + "fmt" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -32,6 +31,7 @@ import ( runtimehooksv1 "sigs.k8s.io/cluster-api/api/runtime/hooks/v1alpha1" "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal" "sigs.k8s.io/cluster-api/internal/hooks" + clientutil "sigs.k8s.io/cluster-api/internal/util/client" "sigs.k8s.io/cluster-api/internal/util/ssa" ) @@ -60,11 +60,8 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context // Wait until the cache observed the Machine with UpdateInProgressAnnotation to ensure subsequent reconciles // will observe it as well and accordingly don't trigger another in-place update concurrently. - if err := waitForCache(ctx, r.Client, machine, func(m *clusterv1.Machine) bool { - _, annotationSet := m.Annotations[clusterv1.UpdateInProgressAnnotation] - return annotationSet - }); err != nil { - return errors.Wrapf(err, "failed waiting for Machine %s to be updated in the cache after setting the %s annotation", klog.KObj(machine), clusterv1.UpdateInProgressAnnotation) + if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, fmt.Sprintf("setting the %s annotation", clusterv1.UpdateInProgressAnnotation), machine); err != nil { + return err } } @@ -141,13 +138,7 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context // Wait until the cache observed the Machine with PendingHooksAnnotation to ensure subsequent reconciles // will observe it as well and won't repeatedly call triggerInPlaceUpdate. 
- if err := waitForCache(ctx, r.Client, machine, func(m *clusterv1.Machine) bool { - return hooks.IsPending(runtimehooksv1.UpdateMachine, m) - }); err != nil { - return errors.Wrapf(err, "failed waiting for Machine %s to be updated in the cache after marking the UpdateMachine hook as pending", klog.KObj(machine)) - } - - return nil + return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "marking the UpdateMachine hook as pending", desiredMachine) } func (r *KubeadmControlPlaneReconciler) removeInitConfiguration(ctx context.Context, desiredKubeadmConfig *bootstrapv1.KubeadmConfig) error { @@ -176,13 +167,3 @@ func (r *KubeadmControlPlaneReconciler) removeInitConfiguration(ctx context.Cont } return nil } - -func waitForCache(ctx context.Context, c client.Client, machine *clusterv1.Machine, f func(m *clusterv1.Machine) bool) error { - return wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - m := &clusterv1.Machine{} - if err := c.Get(ctx, client.ObjectKeyFromObject(machine), m); err != nil { - return false, err - } - return f(m), nil - }) -} diff --git a/internal/controllers/machinedeployment/machinedeployment_controller.go b/internal/controllers/machinedeployment/machinedeployment_controller.go index 8c29442b70ca..af2e9ebe67d0 100644 --- a/internal/controllers/machinedeployment/machinedeployment_controller.go +++ b/internal/controllers/machinedeployment/machinedeployment_controller.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "strings" - "time" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -28,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" kerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -41,6 +39,7 @@ import ( clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2" "sigs.k8s.io/cluster-api/controllers/external" + clientutil "sigs.k8s.io/cluster-api/internal/util/client" "sigs.k8s.io/cluster-api/internal/util/ssa" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/collections" @@ -349,24 +348,10 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c // Keep trying to get the MachineSet. This will force the cache to update and prevent any future reconciliation of // the MachineDeployment to reconcile with an outdated list of MachineSets which could lead to unwanted creation of // a duplicate MachineSet. - var pollErrors []error - tmpMS := &clusterv1.MachineSet{} - if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) { - if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ms), tmpMS); err != nil { - // Do not return error here. Continue to poll even if we hit an error - // so that we avoid existing because of transient errors like network flakes. - // Capture all the errors and return the aggregate error if the poll fails eventually. - pollErrors = append(pollErrors, err) - return false, nil - } - return true, nil - }); err != nil { - return errors.Wrapf(kerrors.NewAggregate(pollErrors), "failed to get the MachineSet %s after creation", klog.KObj(ms)) + if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet creation", ms); err != nil { + return err } - // Report back creation timestamp, because legacy scale func uses it to sort machines. 
- // TODO(in-place): drop this as soon as handling of MD with paused rollouts is moved into rollout planner (see scale in machinedeployment_sync.go). - ms.CreationTimestamp = tmpMS.CreationTimestamp continue } @@ -387,17 +372,24 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c newReplicas := ptr.Deref(ms.Spec.Replicas, 0) if newReplicas < originalReplicas { - changes = append(changes, "replicas", newReplicas) - log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas), changes...) + changes = append(changes, fmt.Sprintf("replicas %d", newReplicas)) + log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas), "diff", strings.Join(changes, ",")) r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled down MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas) } if newReplicas > originalReplicas { - changes = append(changes, "replicas", newReplicas) - log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas), changes...) + changes = append(changes, fmt.Sprintf("replicas %d", newReplicas)) + log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas), "diff", strings.Join(changes, ",")) r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled up MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas) } if newReplicas == originalReplicas && len(changes) > 0 { - log.Info(fmt.Sprintf("MachineSet %s updated", ms.Name), changes...) + log.Info(fmt.Sprintf("MachineSet %s updated", ms.Name), "diff", strings.Join(changes, ",")) + } + + // Only wait for cache if the object was changed. 
+ if originalMS.ResourceVersion != ms.ResourceVersion { + if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet update", ms); err != nil { + return err + } } } @@ -412,37 +404,37 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c return nil } -func getAnnotationChanges(originalMS *clusterv1.MachineSet, ms *clusterv1.MachineSet) []any { - changes := []any{} +func getAnnotationChanges(originalMS *clusterv1.MachineSet, ms *clusterv1.MachineSet) []string { + changes := []string{} if originalMS.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation] != ms.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation] { if value, ok := ms.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation]; ok { - changes = append(changes, clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, value) + changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, value)) } else { - changes = append(changes, clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, "(annotation removed)") + changes = append(changes, fmt.Sprintf("%s removed", clusterv1.MachineSetMoveMachinesToMachineSetAnnotation)) } } if originalMS.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation] != ms.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation] { if value, ok := ms.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation]; ok { - changes = append(changes, clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, value) + changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, value)) } else { - changes = append(changes, clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, "(annotation removed)") + changes = append(changes, fmt.Sprintf("%s removed", clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation)) } } if originalMS.Annotations[clusterv1.AcknowledgedMoveAnnotation] != ms.Annotations[clusterv1.AcknowledgedMoveAnnotation] { if value, ok := ms.Annotations[clusterv1.AcknowledgedMoveAnnotation]; ok { - changes = append(changes, clusterv1.AcknowledgedMoveAnnotation, value) + changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.AcknowledgedMoveAnnotation, value)) } else { - changes = append(changes, clusterv1.AcknowledgedMoveAnnotation, "(annotation removed)") + changes = append(changes, fmt.Sprintf("%s removed", clusterv1.AcknowledgedMoveAnnotation)) } } if originalMS.Annotations[clusterv1.DisableMachineCreateAnnotation] != ms.Annotations[clusterv1.DisableMachineCreateAnnotation] { if value, ok := ms.Annotations[clusterv1.DisableMachineCreateAnnotation]; ok { - changes = append(changes, clusterv1.DisableMachineCreateAnnotation, value) + changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.DisableMachineCreateAnnotation, value)) } else { - changes = append(changes, clusterv1.DisableMachineCreateAnnotation, "(annotation removed)") + changes = append(changes, fmt.Sprintf("%s removed", clusterv1.DisableMachineCreateAnnotation)) } } return changes diff --git a/internal/controllers/machineset/machineset_controller.go b/internal/controllers/machineset/machineset_controller.go index 23508f2f0132..70e645446a78 100644 --- a/internal/controllers/machineset/machineset_controller.go +++ b/internal/controllers/machineset/machineset_controller.go @@ -34,7 +34,6 @@ import ( kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" 
"k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -54,6 +53,7 @@ import ( "sigs.k8s.io/cluster-api/internal/controllers/machine" "sigs.k8s.io/cluster-api/internal/hooks" topologynames "sigs.k8s.io/cluster-api/internal/topology/names" + clientutil "sigs.k8s.io/cluster-api/internal/util/client" "sigs.k8s.io/cluster-api/internal/util/inplace" "sigs.k8s.io/cluster-api/internal/util/ssa" "sigs.k8s.io/cluster-api/util" @@ -72,13 +72,6 @@ import ( var ( // machineSetKind contains the schema.GroupVersionKind for the MachineSet type. machineSetKind = clusterv1.GroupVersion.WithKind("MachineSet") - - // stateConfirmationTimeout is the amount of time allowed to wait for desired state. - stateConfirmationTimeout = 10 * time.Second - - // stateConfirmationInterval is the amount of time between polling for the desired state. - // The polling is against a local memory cache. - stateConfirmationInterval = 100 * time.Millisecond ) const ( @@ -421,7 +414,7 @@ func (r *Reconciler) triggerInPlaceUpdate(ctx context.Context, s *scope) (ctrl.R // Wait until the cache observed the Machine with PendingHooksAnnotation to ensure subsequent reconciles // will observe it as well and won't repeatedly call the logic to trigger in-place update. - if err := r.waitForMachinesInPlaceUpdateStarted(ctx, machinesTriggeredInPlace); err != nil { + if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "starting in-place updates", machinesTriggeredInPlace...); err != nil { errs = append(errs, err) } if len(errs) > 0 { @@ -900,7 +893,7 @@ func (r *Reconciler) createMachines(ctx context.Context, s *scope, machinesToAdd } // Wait for cache update to ensure following reconcile gets latest change. - return ctrl.Result{}, r.waitForMachinesCreation(ctx, machinesAdded) + return ctrl.Result{}, clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Machine creation", machinesAdded...) } func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDelete int) (ctrl.Result, error) { @@ -948,7 +941,7 @@ func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDel } // Wait for cache update to ensure following reconcile gets latest change. - if err := r.waitForMachinesDeletion(ctx, machinesDeleted); err != nil { + if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion", machinesDeleted...); err != nil { errs = append(errs, err) } if len(errs) > 0 { @@ -1061,7 +1054,7 @@ func (r *Reconciler) startMoveMachines(ctx context.Context, s *scope, targetMSNa } // Wait for cache update to ensure following reconcile gets latest change. 
- if err := r.waitForMachinesStartedMove(ctx, machinesMoved); err != nil { + if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "moving Machines", machinesMoved...); err != nil { errs = append(errs, err) } if len(errs) > 0 { @@ -1242,92 +1235,6 @@ func (r *Reconciler) adoptOrphan(ctx context.Context, machineSet *clusterv1.Mach return r.Client.Patch(ctx, machine, patch) } -func (r *Reconciler) waitForMachinesCreation(ctx context.Context, machines []*clusterv1.Machine) error { - pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) { - for _, machine := range machines { - key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name} - if err := r.Client.Get(ctx, key, &clusterv1.Machine{}); err != nil { - if apierrors.IsNotFound(err) { - return false, nil - } - return false, err - } - } - return true, nil - }) - - if pollErr != nil { - return errors.Wrap(pollErr, "failed waiting for Machines to be created") - } - return nil -} - -func (r *Reconciler) waitForMachinesDeletion(ctx context.Context, machines []*clusterv1.Machine) error { - pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) { - for _, machine := range machines { - m := &clusterv1.Machine{} - key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name} - if err := r.Client.Get(ctx, key, m); err != nil { - if apierrors.IsNotFound(err) { - continue - } - return false, err - } - if m.DeletionTimestamp.IsZero() { - return false, nil - } - } - return true, nil - }) - - if pollErr != nil { - return errors.Wrap(pollErr, "failed waiting for Machines to be deleted") - } - return nil -} - -func (r *Reconciler) waitForMachinesStartedMove(ctx context.Context, machines []*clusterv1.Machine) error { - pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) { - for _, machine := range machines { - m := &clusterv1.Machine{} - key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name} - if err := r.Client.Get(ctx, key, m); err != nil { - return false, err - } - if _, annotationSet := m.Annotations[clusterv1.UpdateInProgressAnnotation]; !annotationSet { - return false, nil - } - } - return true, nil - }) - - if pollErr != nil { - return errors.Wrap(pollErr, "failed waiting for Machines to start move") - } - return nil -} - -func (r *Reconciler) waitForMachinesInPlaceUpdateStarted(ctx context.Context, machines []*clusterv1.Machine) error { - pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) { - for _, machine := range machines { - m := &clusterv1.Machine{} - key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name} - if err := r.Client.Get(ctx, key, m); err != nil { - return false, err - } - if !hooks.IsPending(runtimehooksv1.UpdateMachine, m) { - return false, nil - } - } - return true, nil - }) - - if pollErr != nil { - return errors.Wrap(pollErr, "failed waiting for Machines to complete move") - } - return nil -} - // MachineToMachineSets is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation // for MachineSets that might adopt an orphaned Machine. 
func (r *Reconciler) MachineToMachineSets(ctx context.Context, o client.Object) []ctrl.Request { diff --git a/internal/controllers/machineset/machineset_controller_test.go b/internal/controllers/machineset/machineset_controller_test.go index 4774d45dfb1d..3531ceab026d 100644 --- a/internal/controllers/machineset/machineset_controller_test.go +++ b/internal/controllers/machineset/machineset_controller_test.go @@ -2679,6 +2679,18 @@ func TestMachineSetReconciler_createMachines(t *testing.T) { infraTmpl, ).WithInterceptorFuncs(tt.interceptorFuncs(&i)).Build() + // TODO(controller-runtime-0.23): This workaround is needed because controller-runtime v0.22 does not set resourceVersion correctly with SSA (fixed with v0.23). + fakeClient = interceptor.NewClient(fakeClient, interceptor.Funcs{ + Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error { + clientObject, ok := obj.(client.Object) + if !ok { + return errors.Errorf("error during object creation: unexpected ApplyConfiguration") + } + clientObject.SetResourceVersion("1") + return c.Apply(ctx, obj, opts...) + }, + }) + r := &Reconciler{ Client: fakeClient, recorder: record.NewFakeRecorder(32), diff --git a/internal/controllers/topology/cluster/reconcile_state.go b/internal/controllers/topology/cluster/reconcile_state.go index b57bfa9c2384..4ea82cb9b042 100644 --- a/internal/controllers/topology/cluster/reconcile_state.go +++ b/internal/controllers/topology/cluster/reconcile_state.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "strings" - "time" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" @@ -30,7 +29,6 @@ import ( kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage/names" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -48,6 +46,7 @@ import ( "sigs.k8s.io/cluster-api/internal/topology/clustershim" topologynames "sigs.k8s.io/cluster-api/internal/topology/names" "sigs.k8s.io/cluster-api/internal/topology/ownerrefs" + clientutil "sigs.k8s.io/cluster-api/internal/util/client" "sigs.k8s.io/cluster-api/util" ) @@ -505,18 +504,8 @@ func (r *Reconciler) reconcileCluster(ctx context.Context, s *scope.Scope) error // Note: It is good enough to check that the resource version changed. Other controllers might have updated the // Cluster as well, but the combination of the patch call above without a conflict and a changed resource // version here guarantees that we see the changes of our own update. - err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - key := client.ObjectKey{Namespace: s.Current.Cluster.GetNamespace(), Name: s.Current.Cluster.GetName()} - cachedCluster := &clusterv1.Cluster{} - if err := r.Client.Get(ctx, key, cachedCluster); err != nil { - return false, err - } - return s.Current.Cluster.GetResourceVersion() != cachedCluster.GetResourceVersion(), nil - }) - if err != nil { - return errors.Wrapf(err, "failed waiting for Cluster %s to be updated in the cache after patch", klog.KObj(s.Current.Cluster)) - } - return nil + // Note: Using DeepCopy to not modify s.Current.Cluster as it's not trivial to figure out what impact that would have. + return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Cluster update", s.Current.Cluster.DeepCopy()) } // reconcileMachineDeployments reconciles the desired state of the MachineDeployment objects. 
@@ -678,18 +667,8 @@ func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope // Wait until MachineDeployment is visible in the cache. // Note: We have to do this because otherwise using a cached client in current state could // miss a newly created MachineDeployment (because the cache might be stale). - err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - key := client.ObjectKey{Namespace: md.Object.Namespace, Name: md.Object.Name} - if err := r.Client.Get(ctx, key, &clusterv1.MachineDeployment{}); err != nil { - if apierrors.IsNotFound(err) { - return false, nil - } - return false, err - } - return true, nil - }) - if err != nil { - return errors.Wrapf(err, "failed waiting for MachineDeployment %s to be visible in the cache after create", md.Object.Kind) + if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineDeployment creation", md.Object); err != nil { + return err } // If the MachineDeployment has defined a MachineHealthCheck reconcile it. @@ -812,16 +791,8 @@ func (r *Reconciler) updateMachineDeployment(ctx context.Context, s *scope.Scope // Note: It is good enough to check that the resource version changed. Other controllers might have updated the // MachineDeployment as well, but the combination of the patch call above without a conflict and a changed resource // version here guarantees that we see the changes of our own update. - err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - key := client.ObjectKey{Namespace: currentMD.Object.GetNamespace(), Name: currentMD.Object.GetName()} - cachedMD := &clusterv1.MachineDeployment{} - if err := r.Client.Get(ctx, key, cachedMD); err != nil { - return false, err - } - return currentMD.Object.GetResourceVersion() != cachedMD.GetResourceVersion(), nil - }) - if err != nil { - return errors.Wrapf(err, "failed waiting for MachineDeployment %s to be updated in the cache after patch", klog.KObj(currentMD.Object)) + if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineDeployment update", currentMD.Object); err != nil { + return err } // We want to call both cleanup functions even if one of them fails to clean up as much as possible. @@ -1019,21 +990,7 @@ func (r *Reconciler) createMachinePool(ctx context.Context, s *scope.Scope, mp * // Wait until MachinePool is visible in the cache. // Note: We have to do this because otherwise using a cached client in current state could // miss a newly created MachinePool (because the cache might be stale). - err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - key := client.ObjectKey{Namespace: mp.Object.Namespace, Name: mp.Object.Name} - if err := r.Client.Get(ctx, key, &clusterv1.MachinePool{}); err != nil { - if apierrors.IsNotFound(err) { - return false, nil - } - return false, err - } - return true, nil - }) - if err != nil { - return errors.Wrapf(err, "failed waiting for MachinePool %s to be visible in the cache after create", mp.Object.Kind) - } - - return nil + return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachinePool creation", mp.Object) } // updateMachinePool updates a MachinePool. Also updates the corresponding objects if necessary. @@ -1094,16 +1051,8 @@ func (r *Reconciler) updateMachinePool(ctx context.Context, s *scope.Scope, mpTo // Note: It is good enough to check that the resource version changed. 
Other controllers might have updated the // MachinePool as well, but the combination of the patch call above without a conflict and a changed resource // version here guarantees that we see the changes of our own update. - err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { - key := client.ObjectKey{Namespace: currentMP.Object.GetNamespace(), Name: currentMP.Object.GetName()} - cachedMP := &clusterv1.MachinePool{} - if err := r.Client.Get(ctx, key, cachedMP); err != nil { - return false, err - } - return currentMP.Object.GetResourceVersion() != cachedMP.GetResourceVersion(), nil - }) - if err != nil { - return errors.Wrapf(err, "failed waiting for MachinePool %s to be updated in the cache after patch", klog.KObj(currentMP.Object)) + if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachinePool update", currentMP.Object); err != nil { + return err } // We want to call both cleanup functions even if one of them fails to clean up as much as possible. diff --git a/internal/util/client/client.go b/internal/util/client/client.go new file mode 100644 index 000000000000..856949d9fbfe --- /dev/null +++ b/internal/util/client/client.go @@ -0,0 +1,266 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package client provides utils for usage with the controller-runtime client. +package client + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +var ( + // waitForCacheTimeout is the timeout used when waiting for the cache to become up-to-date. + waitForCacheTimeout = 10 * time.Second + + // waitForCacheInterval is the timeout used when waiting for the cache to become up-to-date. + // This interval seems pretty low, but based on tests it's realistic that the cache is up-to-date + // that quickly. + waitForCacheInterval = 100 * time.Microsecond +) + +// WaitForCacheToBeUpToDate waits until the cache is up-to-date in the sense of that the cache contains +// all passed in objects with at least the passed in resourceVersion. +// This is done by retrieving objects from the cache via the client and then comparing resourceVersions. +// Note: This func will update the passed in objects while polling. +// Note: The generic parameter enforces that all objects have the same type. +func WaitForCacheToBeUpToDate[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error { + return waitFor(ctx, c, action, checkIfObjectUpToDate, objs...) +} + +// WaitForObjectsToBeDeletedFromTheCache waits until the cache is up-to-date in the sense of that the +// passed in objects have been either removed from the cache or they have a deletionTimestamp set. 
+// Note: This func will update the passed in objects while polling. +// Note: The generic parameter enforces that all objects have the same type. +func WaitForObjectsToBeDeletedFromTheCache[T client.Object](ctx context.Context, c client.Client, action string, objs ...T) error { + return waitFor(ctx, c, action, checkIfObjectDeleted, objs...) +} + +// checkIfObjectUpToDate checks if an object is up-to-date and returns an error if it is not. +func checkIfObjectUpToDate(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) { + if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil { + if apierrors.IsNotFound(err) { + // Object is not yet in the cache (retryable). + return true, err + } + // Unexpected error occurred (not retryable). + return false, err + } + + if desiredObj.MinimumResourceVersion == "" { + // Done, if MinimumResourceVersion is empty, as it is enough if the object exists in the cache. + // Note: This can happen when the ServerSidePatchHelper is used to create an object as the ServerSidePatchHelper + // does not update the object after Apply and accordingly resourceVersion remains empty. + return false, nil + } + + cmp, err := compareResourceVersion(desiredObj.Object.GetResourceVersion(), desiredObj.MinimumResourceVersion) + if err != nil { + // Unexpected error occurred: invalid resourceVersion (not retryable). + return false, errors.Wrapf(err, "%s: cannot compare with invalid resourceVersion: current: %s, expected to be >= %s", + klog.KObj(desiredObj.Object), desiredObj.Object.GetResourceVersion(), desiredObj.MinimumResourceVersion) + } + if cmp < 0 { + // resourceVersion < MinimumResourceVersion (retryable). + return true, errors.Errorf("%s: resourceVersion not yet up-to-date: current: %s, expected to be >= %s", + klog.KObj(desiredObj.Object), desiredObj.Object.GetResourceVersion(), desiredObj.MinimumResourceVersion) + } + + // Done, resourceVersion is new enough. + return false, nil +} + +func checkIfObjectDeleted(ctx context.Context, c client.Client, desiredObj desiredObject) (isErrorRetryable bool, err error) { + if err := c.Get(ctx, desiredObj.Key, desiredObj.Object); err != nil { + if apierrors.IsNotFound(err) { + // Done, object has been removed from the cache. + return false, nil + } + // Unexpected error occurred (not retryable). + return false, err + } + + if !desiredObj.Object.GetDeletionTimestamp().IsZero() { + // Done, object has deletionTimestamp set. + return false, nil + } + + // Object does not have deletionTimestamp set yet (retryable). + return true, fmt.Errorf("%s still exists", klog.KObj(desiredObj.Object)) +} + +type desiredObject struct { + Object client.Object + Key client.ObjectKey + MinimumResourceVersion string +} + +type checkFunc func(ctx context.Context, c client.Client, desiredObj desiredObject) (retryableErr bool, err error) + +func waitFor[T client.Object](ctx context.Context, c client.Client, action string, checkFunc checkFunc, objs ...T) error { + // Done, if there are no objects. + if len(objs) == 0 { + return nil + } + + // All objects have the same type, so we can just take the GVK of the first object. 
+ objGVK, err := apiutil.GVKForObject(objs[0], c.Scheme()) + if err != nil { + return err + } + + log := ctrl.LoggerFrom(ctx) + + desiredObjects := make([]desiredObject, len(objs)) + for i, obj := range objs { + desiredObjects[i] = desiredObject{ + Object: obj, + Key: client.ObjectKeyFromObject(obj), + MinimumResourceVersion: obj.GetResourceVersion(), + } + } + + now := time.Now() + + var pollErrs []error + err = wait.PollUntilContextTimeout(ctx, waitForCacheInterval, waitForCacheTimeout, true, func(ctx context.Context) (bool, error) { + pollErrs = nil + + for _, desiredObj := range desiredObjects { + if isErrorRetryable, err := checkFunc(ctx, c, desiredObj); err != nil { + pollErrs = append(pollErrs, err) + if !isErrorRetryable { + // Stop polling, non-retryable error occurred. + return true, nil + } + } + } + + if len(pollErrs) > 0 { + // Continue polling, only retryable errors occurred. + return false, nil + } + + // Stop polling, all objects are up-to-date. + return true, nil + }) + + waitDuration := time.Since(now) + + if err != nil || len(pollErrs) > 0 { + waitDurationMetric.WithLabelValues(objGVK.Kind, "error").Observe(waitDuration.Seconds()) + + var errSuffix string + if err != nil { + if wait.Interrupted(err) { + errSuffix = fmt.Sprintf(": timed out after %s", waitForCacheTimeout) + } else { + errSuffix = fmt.Sprintf(": %s", err.Error()) + } + } + err := errors.Errorf("failed to wait for up-to-date %s objects in the cache after %s%s: %s", objGVK.Kind, action, errSuffix, kerrors.NewAggregate(pollErrs)) + log.Error(err, "Failed to wait for cache to be up-to-date", "kind", objGVK.Kind, "waitDuration", waitDuration) + return err + } + + waitDurationMetric.WithLabelValues(objGVK.Kind, "success").Observe(waitDuration.Seconds()) + + // Log on a high log-level if it took a long time for the cache to be up-to-date. + if waitDuration >= 1*time.Second { + log.Info("Successfully waited for cache to be up-to-date (>=1s)", "kind", objGVK.Kind, "waitDuration", waitDuration) + } else { + log.V(10).Info("Successfully waited for cache to be up-to-date", "kind", objGVK.Kind, "waitDuration", waitDuration) + } + + return nil +} + +type invalidResourceVersion struct { + rv string +} + +func (i invalidResourceVersion) Error() string { + return fmt.Sprintf("resource version is not well formed: %s", i.rv) +} + +// compareResourceVersion runs a comparison between two ResourceVersions. This +// only has semantic meaning when the comparison is done on two objects of the +// same resource. The return values are: +// +// -1: If RV a < RV b +// 0: If RV a == RV b +// +1: If RV a > RV b +// +// The function will return an error if the resource version is not a properly +// formatted positive integer, but has no restriction on length. A properly +// formatted integer will not contain leading zeros or non integer characters. +// Zero is also considered an invalid value as it is used as a special value in +// list/watch events and will never be a live resource version. +// TODO(controller-runtime-0.23): This code has been copied from +// https://github.com/kubernetes/kubernetes/blob/v1.35.0-alpha.2/staging/src/k8s.io/apimachinery/pkg/util/resourceversion/resourceversion.go +// and will be removed once we bump to CR v0.23 / k8s.io/apimachinery v1.35.0. 
+func compareResourceVersion(a, b string) (int, error) { + if !isWellFormed(a) { + return 0, invalidResourceVersion{rv: a} + } + if !isWellFormed(b) { + return 0, invalidResourceVersion{rv: b} + } + // both are well-formed integer strings with no leading zeros + aLen := len(a) + bLen := len(b) + switch { + case aLen < bLen: + // shorter is less + return -1, nil + case aLen > bLen: + // longer is greater + return 1, nil + default: + // equal-length compares lexically + return strings.Compare(a, b), nil + } +} + +func isWellFormed(s string) bool { + if len(s) == 0 { //nolint:gocritic // not going to modify code copied from upstream + return false + } + if s[0] == '0' { + return false + } + for i := range s { + if !isDigit(s[i]) { + return false + } + } + return true +} + +func isDigit(b byte) bool { + return b >= '0' && b <= '9' +} diff --git a/internal/util/client/client_test.go b/internal/util/client/client_test.go new file mode 100644 index 000000000000..9d9563aae995 --- /dev/null +++ b/internal/util/client/client_test.go @@ -0,0 +1,368 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package client provides utils for usage with the controller-runtime client. +package client + +import ( + "context" + "testing" + "time" + + . 
"github.com/onsi/gomega" + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2" +) + +func Test_WaitForCacheToBeUpToDate(t *testing.T) { + // Modify timeout to speed up test + waitForCacheTimeout = 1 * time.Second + + tests := []struct { + name string + objs []client.Object + clientResponses map[client.ObjectKey][]client.Object + wantErr string + }{ + { + name: "no-op if no objects are passed in", + }, + { + name: "error if passed in objects have invalid resourceVersion", + objs: []client.Object{ + machine("machine-1", "invalidResourceVersion", nil), + machine("machine-2", "invalidResourceVersion", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{ + {Namespace: metav1.NamespaceDefault, Name: "machine-1"}: { + machine("machine-1", "1", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-2"}: { + machine("machine-2", "2", nil), + }, + }, + wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: " + + "default/machine-1: cannot compare with invalid resourceVersion: current: 1, expected to be >= invalidResourceVersion: resource version is not well formed: invalidResourceVersion", + }, + { + name: "error if objects from cache have invalid resourceVersion", + objs: []client.Object{ + machine("machine-1", "1", nil), + machine("machine-2", "2", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{ + {Namespace: metav1.NamespaceDefault, Name: "machine-1"}: { + machine("machine-1", "invalidResourceVersion", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-2"}: { + machine("machine-2", "invalidResourceVersion", nil), + }, + }, + wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: " + + "default/machine-1: cannot compare with invalid resourceVersion: current: invalidResourceVersion, expected to be >= 1: resource version is not well formed: invalidResourceVersion", + }, + { + name: "error if objects never show up in the cache", + objs: []client.Object{ + machine("machine-1", "1", nil), + machine("machine-2", "2", nil), + machine("machine-3", "3", nil), + machine("machine-4", "4", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{}, + wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: timed out after 1s: [" + + "machines.cluster.x-k8s.io \"machine-1\" not found, " + + "machines.cluster.x-k8s.io \"machine-2\" not found, " + + "machines.cluster.x-k8s.io \"machine-3\" not found, " + + "machines.cluster.x-k8s.io \"machine-4\" not found]", + }, + { + name: "success if objects are instantly up-to-date", + objs: []client.Object{ + machine("machine-1", "", nil), + machine("machine-2", "2", nil), + machine("machine-3", "3", nil), + machine("machine-4", "4", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{ + {Namespace: metav1.NamespaceDefault, Name: "machine-1"}: { + // For this object it's enough if it shows up, exact resourceVersion doesn't matter. 
+ machine("machine-1", "5", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-2"}: { + machine("machine-2", "2", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-3"}: { + machine("machine-3", "3", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-4"}: { + // This object has an even newer resourceVersion. + machine("machine-4", "6", nil), + }, + }, + }, + { + name: "success if objects are up-to-date after a few tries", + objs: []client.Object{ + machine("machine-1", "", nil), + machine("machine-2", "10", nil), + machine("machine-3", "11", nil), + machine("machine-4", "12", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{ + {Namespace: metav1.NamespaceDefault, Name: "machine-1"}: { + // For this object it's enough if it shows up, exact resourceVersion doesn't matter. + machine("machine-1", "4", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-2"}: { + machine("machine-2", "1", nil), + machine("machine-2", "5", nil), + machine("machine-2", "10", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-3"}: { + machine("machine-3", "2", nil), + machine("machine-3", "3", nil), + machine("machine-3", "7", nil), + machine("machine-3", "11", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-4"}: { + machine("machine-4", "3", nil), + machine("machine-4", "6", nil), + machine("machine-4", "8", nil), + machine("machine-4", "13", nil), + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + scheme := runtime.NewScheme() + _ = clusterv1.AddToScheme(scheme) + + callCounter := map[client.ObjectKey]int{} + fakeClient := interceptor.NewClient(fake.NewClientBuilder().WithScheme(scheme).Build(), interceptor.Funcs{ + Get: func(ctx context.Context, _ client.WithWatch, key client.ObjectKey, obj client.Object, _ ...client.GetOption) error { + if len(tt.clientResponses) == 0 || len(tt.clientResponses[key]) == 0 { + return apierrors.NewNotFound(schema.GroupResource{ + Group: clusterv1.GroupVersion.Group, + Resource: "machines", + }, key.Name) + } + + currentCall := callCounter[key] + currentCall = min(currentCall, len(tt.clientResponses[key])-1) + + // Write back the modified object so callers can access the patched object. + if err := scheme.Convert(tt.clientResponses[key][currentCall], obj, ctx); err != nil { + return errors.Wrapf(err, "unexpected error: failed to get") + } + + callCounter[key]++ + + return nil + }, + }) + + err := WaitForCacheToBeUpToDate(t.Context(), fakeClient, "Machine creation", tt.objs...) 
+ if tt.wantErr != "" { + g.Expect(err).To(HaveOccurred()) + g.Expect(err.Error()).To(Equal(tt.wantErr)) + } else { + g.Expect(err).ToNot(HaveOccurred()) + } + }) + } +} + +func Test_WaitForObjectsToBeDeletedFromTheCache(t *testing.T) { + // Modify timeout to speed up test + waitForCacheTimeout = 1 * time.Second + + tests := []struct { + name string + objs []client.Object + clientResponses map[client.ObjectKey][]client.Object + wantErr string + }{ + { + name: "no-op if no objects are passed in", + }, + { + name: "success if objects are going away instantly (not found)", + objs: []client.Object{ + machine("machine-1", "", nil), + machine("machine-2", "2", nil), + machine("machine-3", "3", nil), + machine("machine-4", "4", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{}, + }, + { + name: "success if objects are going away instantly (deletionTimestamp)", + objs: []client.Object{ + machine("machine-1", "1", nil), + machine("machine-2", "2", nil), + machine("machine-3", "3", nil), + machine("machine-4", "4", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{ + {Namespace: metav1.NamespaceDefault, Name: "machine-1"}: { + machine("machine-1", "1", ptr.To(metav1.Now())), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-2"}: { + machine("machine-2", "2", ptr.To(metav1.Now())), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-3"}: { + machine("machine-3", "3", ptr.To(metav1.Now())), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-4"}: { + machine("machine-4", "4", ptr.To(metav1.Now())), + }, + }, + }, + { + name: "success if objects are going away after a few tries (deletionTimestamp)", + objs: []client.Object{ + machine("machine-1", "1", nil), + machine("machine-2", "2", nil), + machine("machine-3", "3", nil), + machine("machine-4", "4", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{ + {Namespace: metav1.NamespaceDefault, Name: "machine-1"}: { + machine("machine-1", "1", ptr.To(metav1.Now())), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-2"}: { + machine("machine-2", "2", nil), + machine("machine-2", "2", nil), + machine("machine-2", "5", ptr.To(metav1.Now())), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-3"}: { + machine("machine-3", "3", nil), + machine("machine-3", "3", nil), + machine("machine-3", "3", nil), + machine("machine-3", "6", ptr.To(metav1.Now())), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-4"}: { + machine("machine-4", "4", nil), + machine("machine-4", "4", nil), + machine("machine-4", "4", nil), + machine("machine-4", "7", ptr.To(metav1.Now())), + }, + }, + }, + { + name: "error if objects are not going away after a few tries", + objs: []client.Object{ + machine("machine-1", "1", nil), + machine("machine-2", "2", nil), + machine("machine-3", "3", nil), + machine("machine-4", "4", nil), + }, + clientResponses: map[client.ObjectKey][]client.Object{ + {Namespace: metav1.NamespaceDefault, Name: "machine-1"}: { + machine("machine-1", "1", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-2"}: { + machine("machine-2", "2", nil), + machine("machine-2", "2", nil), + machine("machine-2", "5", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-3"}: { + machine("machine-3", "3", nil), + machine("machine-3", "3", nil), + machine("machine-3", "3", nil), + machine("machine-3", "6", nil), + }, + {Namespace: metav1.NamespaceDefault, Name: "machine-4"}: { + machine("machine-4", "4", nil), + machine("machine-4", "4", nil), + 
machine("machine-4", "4", nil), + machine("machine-4", "7", nil), + }, + }, + wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine deletion: timed out after 1s: [" + + "default/machine-1 still exists, " + + "default/machine-2 still exists, " + + "default/machine-3 still exists, " + + "default/machine-4 still exists]", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + scheme := runtime.NewScheme() + _ = clusterv1.AddToScheme(scheme) + + callCounter := map[client.ObjectKey]int{} + fakeClient := interceptor.NewClient(fake.NewClientBuilder().WithScheme(scheme).Build(), interceptor.Funcs{ + Get: func(ctx context.Context, _ client.WithWatch, key client.ObjectKey, obj client.Object, _ ...client.GetOption) error { + if len(tt.clientResponses) == 0 || len(tt.clientResponses[key]) == 0 { + return apierrors.NewNotFound(schema.GroupResource{ + Group: clusterv1.GroupVersion.Group, + Resource: "machines", + }, key.Name) + } + + currentCall := callCounter[key] + currentCall = min(currentCall, len(tt.clientResponses[key])-1) + + // Write back the modified object so callers can access the patched object. + if err := scheme.Convert(tt.clientResponses[key][currentCall], obj, ctx); err != nil { + return errors.Wrapf(err, "unexpected error: failed to get") + } + + callCounter[key]++ + + return nil + }, + }) + + err := WaitForObjectsToBeDeletedFromTheCache(t.Context(), fakeClient, "Machine deletion", tt.objs...) + if tt.wantErr != "" { + g.Expect(err).To(HaveOccurred()) + g.Expect(err.Error()).To(Equal(tt.wantErr)) + } else { + g.Expect(err).ToNot(HaveOccurred()) + } + }) + } +} + +func machine(name, resourceVersion string, deletionTimestamp *metav1.Time) client.Object { + return &clusterv1.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: name, + ResourceVersion: resourceVersion, + DeletionTimestamp: deletionTimestamp, + }, + } +} diff --git a/internal/util/client/metrics.go b/internal/util/client/metrics.go new file mode 100644 index 000000000000..eaab7d4b6c4f --- /dev/null +++ b/internal/util/client/metrics.go @@ -0,0 +1,35 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "github.com/prometheus/client_golang/prometheus" + ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +func init() { + // Register the metrics at the controller-runtime metrics registry. 
+ ctrlmetrics.Registry.MustRegister(waitDurationMetric) +} + +var ( + waitDurationMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "capi_client_cache_wait_duration_seconds", + Help: "Duration that we waited for the cache to be up-to-date in seconds, broken down by kind and status", + Buckets: []float64{0.00001, 0.000025, 0.00005, 0.0001, 0.00025, 0.0005, 0.001, 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0}, + }, []string{"kind", "status"}) +) From f89757304634102fd6c39171b315655b778da2b4 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Thu, 6 Nov 2025 15:41:22 +0100 Subject: [PATCH 2/2] Fix review findings --- internal/util/client/client.go | 27 +++++++++++++++++---------- internal/util/client/client_test.go | 27 +++++++++++++++++++++++---- internal/util/client/metrics.go | 2 +- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/internal/util/client/client.go b/internal/util/client/client.go index 856949d9fbfe..1f7b0298b4dd 100644 --- a/internal/util/client/client.go +++ b/internal/util/client/client.go @@ -25,6 +25,7 @@ import ( "github.com/pkg/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -34,13 +35,14 @@ import ( ) var ( - // waitForCacheTimeout is the timeout used when waiting for the cache to become up-to-date. - waitForCacheTimeout = 10 * time.Second - - // waitForCacheInterval is the timeout used when waiting for the cache to become up-to-date. - // This interval seems pretty low, but based on tests it's realistic that the cache is up-to-date - // that quickly. - waitForCacheInterval = 100 * time.Microsecond + // waitBackoff is the timeout used when waiting for the cache to become up-to-date. + // This adds up to ~ 10 seconds max wait duration. + waitBackoff = wait.Backoff{ + Duration: 25 * time.Microsecond, + Cap: 2 * time.Second, + Factor: 1.2, + Steps: 63, + } ) // WaitForCacheToBeUpToDate waits until the cache is up-to-date in the sense of that the cache contains @@ -127,10 +129,15 @@ func waitFor[T client.Object](ctx context.Context, c client.Client, action strin return nil } + var o any = objs[0] + if _, ok := o.(*unstructured.Unstructured); ok { + return errors.Errorf("failed to wait for up-to-date objects in the cache after %s: Unstructured is not supported", action) + } + // All objects have the same type, so we can just take the GVK of the first object. 
objGVK, err := apiutil.GVKForObject(objs[0], c.Scheme()) if err != nil { - return err + return errors.Wrapf(err, "failed to wait for up-to-date objects in the cache after %s", action) } log := ctrl.LoggerFrom(ctx) @@ -147,7 +154,7 @@ func waitFor[T client.Object](ctx context.Context, c client.Client, action strin now := time.Now() var pollErrs []error - err = wait.PollUntilContextTimeout(ctx, waitForCacheInterval, waitForCacheTimeout, true, func(ctx context.Context) (bool, error) { + err = wait.ExponentialBackoffWithContext(ctx, waitBackoff, func(ctx context.Context) (bool, error) { pollErrs = nil for _, desiredObj := range desiredObjects { @@ -177,7 +184,7 @@ func waitFor[T client.Object](ctx context.Context, c client.Client, action strin var errSuffix string if err != nil { if wait.Interrupted(err) { - errSuffix = fmt.Sprintf(": timed out after %s", waitForCacheTimeout) + errSuffix = ": timed out" } else { errSuffix = fmt.Sprintf(": %s", err.Error()) } diff --git a/internal/util/client/client_test.go b/internal/util/client/client_test.go index 9d9563aae995..da702bc55638 100644 --- a/internal/util/client/client_test.go +++ b/internal/util/client/client_test.go @@ -26,8 +26,10 @@ import ( "github.com/pkg/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -38,7 +40,12 @@ import ( func Test_WaitForCacheToBeUpToDate(t *testing.T) { // Modify timeout to speed up test - waitForCacheTimeout = 1 * time.Second + waitBackoff = wait.Backoff{ + Duration: 25 * time.Microsecond, + Cap: 2 * time.Second, + Factor: 1.2, + Steps: 5, + } tests := []struct { name string @@ -92,7 +99,7 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) { machine("machine-4", "4", nil), }, clientResponses: map[client.ObjectKey][]client.Object{}, - wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: timed out after 1s: [" + + wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine creation: timed out: [" + "machines.cluster.x-k8s.io \"machine-1\" not found, " + "machines.cluster.x-k8s.io \"machine-2\" not found, " + "machines.cluster.x-k8s.io \"machine-3\" not found, " + @@ -201,7 +208,12 @@ func Test_WaitForCacheToBeUpToDate(t *testing.T) { func Test_WaitForObjectsToBeDeletedFromTheCache(t *testing.T) { // Modify timeout to speed up test - waitForCacheTimeout = 1 * time.Second + waitBackoff = wait.Backoff{ + Duration: 25 * time.Microsecond, + Cap: 2 * time.Second, + Factor: 1.2, + Steps: 5, + } tests := []struct { name string @@ -212,6 +224,13 @@ func Test_WaitForObjectsToBeDeletedFromTheCache(t *testing.T) { { name: "no-op if no objects are passed in", }, + { + name: "error if Unstructured is used", + objs: []client.Object{ + &unstructured.Unstructured{}, + }, + wantErr: "failed to wait for up-to-date objects in the cache after Machine deletion: Unstructured is not supported", + }, { name: "success if objects are going away instantly (not found)", objs: []client.Object{ @@ -306,7 +325,7 @@ func Test_WaitForObjectsToBeDeletedFromTheCache(t *testing.T) { machine("machine-4", "7", nil), }, }, - wantErr: "failed to wait for up-to-date Machine objects in the cache after Machine deletion: timed out after 1s: [" + + wantErr: "failed to 
wait for up-to-date Machine objects in the cache after Machine deletion: timed out: [" + "default/machine-1 still exists, " + "default/machine-2 still exists, " + "default/machine-3 still exists, " + diff --git a/internal/util/client/metrics.go b/internal/util/client/metrics.go index eaab7d4b6c4f..cb8c7c3e2753 100644 --- a/internal/util/client/metrics.go +++ b/internal/util/client/metrics.go @@ -30,6 +30,6 @@ var ( waitDurationMetric = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "capi_client_cache_wait_duration_seconds", Help: "Duration that we waited for the cache to be up-to-date in seconds, broken down by kind and status", - Buckets: []float64{0.00001, 0.000025, 0.00005, 0.0001, 0.00025, 0.0005, 0.001, 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0}, + Buckets: []float64{0.00001, 0.000025, 0.00005, 0.0001, 0.00025, 0.0005, 0.001, 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 2.5, 5, 10}, }, []string{"kind", "status"}) )
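
For illustration only (not part of the patch): a minimal sketch of the call pattern the new helper is built for. The reconciler-style function and object names below are hypothetical, and since the helper lives in an internal package it is only importable from within the cluster-api module. The pattern is: mutate the object with the regular client, then wait until the informer-backed cache has observed at least the object's new resourceVersion, so the next reconcile does not act on stale state.

// Hypothetical example, not from the patch: update an object, then wait for the cache.
package example

import (
	"context"

	clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
	"sigs.k8s.io/controller-runtime/pkg/client"

	clientutil "sigs.k8s.io/cluster-api/internal/util/client"
)

// scaleMachineSet is an illustrative helper showing the intended usage of WaitForCacheToBeUpToDate.
func scaleMachineSet(ctx context.Context, c client.Client, ms *clusterv1.MachineSet, replicas int32) error {
	ms.Spec.Replicas = &replicas
	if err := c.Update(ctx, ms); err != nil {
		return err
	}

	// The Update above bumped ms.ResourceVersion; block until the cache has observed
	// at least that resourceVersion so the following reconcile sees our own change.
	return clientutil.WaitForCacheToBeUpToDate(ctx, c, "MachineSet update", ms)
}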