Commit 0395f91

Introduce wait for cache utils

1 parent e1f8f82 commit 0395f91

7 files changed: +277 −216 lines changed


.golangci.yml

Lines changed: 2 additions & 0 deletions
@@ -178,6 +178,8 @@ linters:
       alias: ""
     - pkg: sigs.k8s.io/cluster-api/internal/topology/names
       alias: topologynames
+    - pkg: sigs.k8s.io/cluster-api/internal/util/client
+      alias: "clientutil"
     # CAPD
     - pkg: sigs.k8s.io/cluster-api/test/infrastructure/docker/api/v1alpha3
       alias: infrav1alpha3
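
Note: the two added lines extend the importas rules so that any import of the new package has to use the clientutil alias. Illustrative only, mirroring the import blocks further down in this diff:

import (
    clientutil "sigs.k8s.io/cluster-api/internal/util/client"
)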

controlplane/kubeadm/internal/controllers/inplace_trigger.go

Lines changed: 5 additions & 24 deletions
@@ -18,11 +18,10 @@ package controllers

 import (
     "context"
-    "time"
+    "fmt"

     "github.com/pkg/errors"
     corev1 "k8s.io/api/core/v1"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/klog/v2"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
@@ -32,6 +31,7 @@ import (
     runtimehooksv1 "sigs.k8s.io/cluster-api/api/runtime/hooks/v1alpha1"
     "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
     "sigs.k8s.io/cluster-api/internal/hooks"
+    clientutil "sigs.k8s.io/cluster-api/internal/util/client"
     "sigs.k8s.io/cluster-api/internal/util/ssa"
 )

@@ -60,11 +60,8 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context

         // Wait until the cache observed the Machine with UpdateInProgressAnnotation to ensure subsequent reconciles
         // will observe it as well and accordingly don't trigger another in-place update concurrently.
-        if err := waitForCache(ctx, r.Client, machine, func(m *clusterv1.Machine) bool {
-            _, annotationSet := m.Annotations[clusterv1.UpdateInProgressAnnotation]
-            return annotationSet
-        }); err != nil {
-            return errors.Wrapf(err, "failed waiting for Machine %s to be updated in the cache after setting the %s annotation", klog.KObj(machine), clusterv1.UpdateInProgressAnnotation)
+        if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, fmt.Sprintf("setting the %s annotation", clusterv1.UpdateInProgressAnnotation), machine); err != nil {
+            return err
         }
     }

@@ -141,13 +138,7 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context

     // Wait until the cache observed the Machine with PendingHooksAnnotation to ensure subsequent reconciles
     // will observe it as well and won't repeatedly call triggerInPlaceUpdate.
-    if err := waitForCache(ctx, r.Client, machine, func(m *clusterv1.Machine) bool {
-        return hooks.IsPending(runtimehooksv1.UpdateMachine, m)
-    }); err != nil {
-        return errors.Wrapf(err, "failed waiting for Machine %s to be updated in the cache after marking the UpdateMachine hook as pending", klog.KObj(machine))
-    }
-
-    return nil
+    return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "marking the UpdateMachine hook as pending", desiredMachine)
 }

 func (r *KubeadmControlPlaneReconciler) removeInitConfiguration(ctx context.Context, desiredKubeadmConfig *bootstrapv1.KubeadmConfig) error {
@@ -176,13 +167,3 @@ func (r *KubeadmControlPlaneReconciler) removeInitConfiguration(ctx context.Cont
     }
     return nil
 }
-
-func waitForCache(ctx context.Context, c client.Client, machine *clusterv1.Machine, f func(m *clusterv1.Machine) bool) error {
-    return wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
-        m := &clusterv1.Machine{}
-        if err := c.Get(ctx, client.ObjectKeyFromObject(machine), m); err != nil {
-            return false, err
-        }
-        return f(m), nil
-    })
-}
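
Note: the new internal/util/client package is one of the 7 changed files but is not shown in this excerpt. Purely as orientation, here is a minimal sketch of what a WaitForCacheToBeUpToDate helper could look like, inferred from the call sites in this diff: generic over client.Object so that both a single Machine and a []*clusterv1.Machine slice can be passed, an operation string used only for the error message, and a resourceVersion comparison against the just-written object. The actual signature and semantics in the commit may differ.

// Sketch only, not the implementation added by this commit.
package client // imported elsewhere as clientutil

import (
    "context"
    "time"

    "github.com/pkg/errors"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/klog/v2"
    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// WaitForCacheToBeUpToDate polls the cached client until it has observed each object
// at the resourceVersion produced by the write that was just performed.
// operation only feeds the error message, e.g. "MachineSet creation".
func WaitForCacheToBeUpToDate[T ctrlclient.Object](ctx context.Context, c ctrlclient.Client, operation string, objs ...T) error {
    for _, obj := range objs {
        if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
            cached := obj.DeepCopyObject().(ctrlclient.Object)
            if err := c.Get(ctx, ctrlclient.ObjectKeyFromObject(obj), cached); err != nil {
                // Tolerate transient errors (e.g. not found right after creation) and keep polling.
                return false, nil
            }
            // Simplification: consider the cache caught up once it reports the same
            // resourceVersion as the object returned by the write.
            return cached.GetResourceVersion() == obj.GetResourceVersion(), nil
        }); err != nil {
            return errors.Wrapf(err, "failed waiting for %s to be updated in the cache after %s", klog.KObj(obj), operation)
        }
    }
    return nil
}

The generic type parameter is simply the easiest way to make both the single-object and the variadic slice call sites compile; the real package may solve this differently.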

internal/controllers/machinedeployment/machinedeployment_controller.go

Lines changed: 25 additions & 33 deletions
@@ -20,15 +20,13 @@ import (
     "context"
     "fmt"
     "strings"
-    "time"

     "github.com/pkg/errors"
     corev1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
     kerrors "k8s.io/apimachinery/pkg/util/errors"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
     "k8s.io/utils/ptr"
@@ -41,6 +39,7 @@ import (

     clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
     "sigs.k8s.io/cluster-api/controllers/external"
+    clientutil "sigs.k8s.io/cluster-api/internal/util/client"
     "sigs.k8s.io/cluster-api/internal/util/ssa"
     "sigs.k8s.io/cluster-api/util"
     "sigs.k8s.io/cluster-api/util/collections"
@@ -349,24 +348,10 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c
             // Keep trying to get the MachineSet. This will force the cache to update and prevent any future reconciliation of
             // the MachineDeployment to reconcile with an outdated list of MachineSets which could lead to unwanted creation of
             // a duplicate MachineSet.
-            var pollErrors []error
-            tmpMS := &clusterv1.MachineSet{}
-            if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
-                if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ms), tmpMS); err != nil {
-                    // Do not return error here. Continue to poll even if we hit an error
-                    // so that we avoid existing because of transient errors like network flakes.
-                    // Capture all the errors and return the aggregate error if the poll fails eventually.
-                    pollErrors = append(pollErrors, err)
-                    return false, nil
-                }
-                return true, nil
-            }); err != nil {
-                return errors.Wrapf(kerrors.NewAggregate(pollErrors), "failed to get the MachineSet %s after creation", klog.KObj(ms))
+            if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet creation", ms); err != nil {
+                return err
             }

-            // Report back creation timestamp, because legacy scale func uses it to sort machines.
-            // TODO(in-place): drop this as soon as handling of MD with paused rollouts is moved into rollout planner (see scale in machinedeployment_sync.go).
-            ms.CreationTimestamp = tmpMS.CreationTimestamp
             continue
         }

@@ -387,17 +372,24 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c

         newReplicas := ptr.Deref(ms.Spec.Replicas, 0)
         if newReplicas < originalReplicas {
-            changes = append(changes, "replicas", newReplicas)
-            log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas), changes...)
+            changes = append(changes, fmt.Sprintf("replicas: %d", newReplicas))
+            log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas), "diff", strings.Join(changes, ","))
             r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled down MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
         }
         if newReplicas > originalReplicas {
-            changes = append(changes, "replicas", newReplicas)
-            log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas), changes...)
+            changes = append(changes, fmt.Sprintf("replicas: %d", newReplicas))
+            log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas), "diff", strings.Join(changes, ","))
             r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled up MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
         }
         if newReplicas == originalReplicas && len(changes) > 0 {
-            log.Info(fmt.Sprintf("MachineSet %s updated", ms.Name), changes...)
+            log.Info(fmt.Sprintf("MachineSet %s updated", ms.Name), "diff", strings.Join(changes, ","))
+        }
+
+        // Only wait for cache if the object was changed.
+        if originalMS.ResourceVersion != ms.ResourceVersion {
+            if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet update", ms); err != nil {
+                return err
+            }
         }
     }

@@ -412,37 +404,37 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c
     return nil
 }

-func getAnnotationChanges(originalMS *clusterv1.MachineSet, ms *clusterv1.MachineSet) []any {
-    changes := []any{}
+func getAnnotationChanges(originalMS *clusterv1.MachineSet, ms *clusterv1.MachineSet) []string {
+    changes := []string{}
     if originalMS.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation] != ms.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation] {
         if value, ok := ms.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation]; ok {
-            changes = append(changes, clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, value)
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, value))
         } else {
-            changes = append(changes, clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, "(annotation removed)")
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, "(annotation removed)"))
         }
     }

     if originalMS.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation] != ms.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation] {
         if value, ok := ms.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation]; ok {
-            changes = append(changes, clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, value)
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, value))
         } else {
-            changes = append(changes, clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, "(annotation removed)")
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, "(annotation removed)"))
         }
     }

     if originalMS.Annotations[clusterv1.AcknowledgedMoveAnnotation] != ms.Annotations[clusterv1.AcknowledgedMoveAnnotation] {
         if value, ok := ms.Annotations[clusterv1.AcknowledgedMoveAnnotation]; ok {
-            changes = append(changes, clusterv1.AcknowledgedMoveAnnotation, value)
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.AcknowledgedMoveAnnotation, value))
         } else {
-            changes = append(changes, clusterv1.AcknowledgedMoveAnnotation, "(annotation removed)")
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.AcknowledgedMoveAnnotation, "(annotation removed)"))
         }
     }

     if originalMS.Annotations[clusterv1.DisableMachineCreateAnnotation] != ms.Annotations[clusterv1.DisableMachineCreateAnnotation] {
         if value, ok := ms.Annotations[clusterv1.DisableMachineCreateAnnotation]; ok {
-            changes = append(changes, clusterv1.DisableMachineCreateAnnotation, value)
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.DisableMachineCreateAnnotation, value))
         } else {
-            changes = append(changes, clusterv1.DisableMachineCreateAnnotation, "(annotation removed)")
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.DisableMachineCreateAnnotation, "(annotation removed)"))
         }
     }
     return changes
internal/controllers/machineset/machineset_controller.go

Lines changed: 5 additions & 98 deletions
@@ -34,7 +34,6 @@ import (
     kerrors "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/apimachinery/pkg/util/sets"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
     "k8s.io/utils/ptr"
@@ -55,6 +54,7 @@ import (
     "sigs.k8s.io/cluster-api/internal/controllers/machinedeployment/mdutil"
     "sigs.k8s.io/cluster-api/internal/hooks"
     topologynames "sigs.k8s.io/cluster-api/internal/topology/names"
+    clientutil "sigs.k8s.io/cluster-api/internal/util/client"
     "sigs.k8s.io/cluster-api/internal/util/ssa"
     "sigs.k8s.io/cluster-api/util"
     "sigs.k8s.io/cluster-api/util/annotations"
@@ -72,13 +72,6 @@ import (
 var (
     // machineSetKind contains the schema.GroupVersionKind for the MachineSet type.
     machineSetKind = clusterv1.GroupVersion.WithKind("MachineSet")
-
-    // stateConfirmationTimeout is the amount of time allowed to wait for desired state.
-    stateConfirmationTimeout = 10 * time.Second
-
-    // stateConfirmationInterval is the amount of time between polling for the desired state.
-    // The polling is against a local memory cache.
-    stateConfirmationInterval = 100 * time.Millisecond
 )

 const (
@@ -421,7 +414,7 @@ func (r *Reconciler) triggerInPlaceUpdate(ctx context.Context, s *scope) (ctrl.R

     // Wait until the cache observed the Machine with PendingHooksAnnotation to ensure subsequent reconciles
     // will observe it as well and won't repeatedly call the logic to trigger in-place update.
-    if err := r.waitForMachinesInPlaceUpdateStarted(ctx, machinesTriggeredInPlace); err != nil {
+    if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "starting in-place update", machinesTriggeredInPlace...); err != nil {
         errs = append(errs, err)
     }
     if len(errs) > 0 {
@@ -976,7 +969,7 @@ func (r *Reconciler) createMachines(ctx context.Context, s *scope, machinesToAdd
     }

     // Wait for cache update to ensure following reconcile gets latest change.
-    return ctrl.Result{}, r.waitForMachinesCreation(ctx, machinesAdded)
+    return ctrl.Result{}, clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Machine creation", machinesAdded...)
 }

 func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDelete int) (ctrl.Result, error) {
@@ -1024,7 +1017,7 @@ func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDel
     }

     // Wait for cache update to ensure following reconcile gets latest change.
-    if err := r.waitForMachinesDeletion(ctx, machinesDeleted); err != nil {
+    if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion", machinesDeleted...); err != nil {
         errs = append(errs, err)
     }
     if len(errs) > 0 {
@@ -1137,7 +1130,7 @@ func (r *Reconciler) startMoveMachines(ctx context.Context, s *scope, targetMSNa
     }

     // Wait for cache update to ensure following reconcile gets latest change.
-    if err := r.waitForMachinesStartedMove(ctx, machinesMoved); err != nil {
+    if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "moving the Machine", machinesMoved...); err != nil {
         errs = append(errs, err)
     }
     if len(errs) > 0 {
@@ -1318,92 +1311,6 @@ func (r *Reconciler) adoptOrphan(ctx context.Context, machineSet *clusterv1.Mach
     return r.Client.Patch(ctx, machine, patch)
 }

-func (r *Reconciler) waitForMachinesCreation(ctx context.Context, machines []*clusterv1.Machine) error {
-    pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
-        for _, machine := range machines {
-            key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
-            if err := r.Client.Get(ctx, key, &clusterv1.Machine{}); err != nil {
-                if apierrors.IsNotFound(err) {
-                    return false, nil
-                }
-                return false, err
-            }
-        }
-        return true, nil
-    })
-
-    if pollErr != nil {
-        return errors.Wrap(pollErr, "failed waiting for Machines to be created")
-    }
-    return nil
-}
-
-func (r *Reconciler) waitForMachinesDeletion(ctx context.Context, machines []*clusterv1.Machine) error {
-    pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
-        for _, machine := range machines {
-            m := &clusterv1.Machine{}
-            key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
-            if err := r.Client.Get(ctx, key, m); err != nil {
-                if apierrors.IsNotFound(err) {
-                    continue
-                }
-                return false, err
-            }
-            if m.DeletionTimestamp.IsZero() {
-                return false, nil
-            }
-        }
-        return true, nil
-    })
-
-    if pollErr != nil {
-        return errors.Wrap(pollErr, "failed waiting for Machines to be deleted")
-    }
-    return nil
-}
-
-func (r *Reconciler) waitForMachinesStartedMove(ctx context.Context, machines []*clusterv1.Machine) error {
-    pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
-        for _, machine := range machines {
-            m := &clusterv1.Machine{}
-            key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
-            if err := r.Client.Get(ctx, key, m); err != nil {
-                return false, err
-            }
-            if _, annotationSet := m.Annotations[clusterv1.UpdateInProgressAnnotation]; !annotationSet {
-                return false, nil
-            }
-        }
-        return true, nil
-    })
-
-    if pollErr != nil {
-        return errors.Wrap(pollErr, "failed waiting for Machines to start move")
-    }
-    return nil
-}
-
-func (r *Reconciler) waitForMachinesInPlaceUpdateStarted(ctx context.Context, machines []*clusterv1.Machine) error {
-    pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
-        for _, machine := range machines {
-            m := &clusterv1.Machine{}
-            key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
-            if err := r.Client.Get(ctx, key, m); err != nil {
-                return false, err
-            }
-            if !hooks.IsPending(runtimehooksv1.UpdateMachine, m) {
-                return false, nil
-            }
-        }
-        return true, nil
-    })
-
-    if pollErr != nil {
-        return errors.Wrap(pollErr, "failed waiting for Machines to complete move")
-    }
-    return nil
-}
-
 // MachineToMachineSets is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation
 // for MachineSets that might adopt an orphaned Machine.
 func (r *Reconciler) MachineToMachineSets(ctx context.Context, o client.Object) []ctrl.Request {
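
Note: the deletion-side helper is also not shown in this excerpt. A minimal sketch of what WaitForObjectsToBeDeletedFromTheCache could look like, inferred from the call site above and from the semantics of the removed waitForMachinesDeletion (an object counts as "deleted" once it is gone from the cache or at least carries a deletionTimestamp); the actual implementation in internal/util/client may differ.

// Sketch only, not the implementation added by this commit.
package client // imported elsewhere as clientutil

import (
    "context"
    "time"

    "github.com/pkg/errors"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/klog/v2"
    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// WaitForObjectsToBeDeletedFromTheCache polls the cached client until each object is
// either not found or has a deletionTimestamp set (it may linger while finalizers run).
func WaitForObjectsToBeDeletedFromTheCache[T ctrlclient.Object](ctx context.Context, c ctrlclient.Client, operation string, objs ...T) error {
    for _, obj := range objs {
        if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
            cached := obj.DeepCopyObject().(ctrlclient.Object)
            if err := c.Get(ctx, ctrlclient.ObjectKeyFromObject(obj), cached); err != nil {
                if apierrors.IsNotFound(err) {
                    return true, nil
                }
                // Tolerate transient errors and keep polling.
                return false, nil
            }
            return !cached.GetDeletionTimestamp().IsZero(), nil
        }); err != nil {
            return errors.Wrapf(err, "failed waiting for %s to be removed from the cache after %s", klog.KObj(obj), operation)
        }
    }
    return nil
}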

internal/controllers/machineset/machineset_controller_test.go

Lines changed: 12 additions & 0 deletions
@@ -2681,6 +2681,18 @@ func TestMachineSetReconciler_createMachines(t *testing.T) {
                 infraTmpl,
             ).WithInterceptorFuncs(tt.interceptorFuncs(&i)).Build()

+            // TODO(controller-runtime-0.23): This workaround is needed because controller-runtime v0.22 does not set resourceVersion correctly with SSA (fixed with v0.23).
+            fakeClient = interceptor.NewClient(fakeClient, interceptor.Funcs{
+                Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
+                    clientObject, ok := obj.(client.Object)
+                    if !ok {
+                        return errors.Errorf("error during object creation: unexpected ApplyConfiguration")
+                    }
+                    clientObject.SetResourceVersion("1")
+                    return c.Apply(ctx, obj, opts...)
+                },
+            })
+
             r := &Reconciler{
                 Client:   fakeClient,
                 recorder: record.NewFakeRecorder(32),
