Commit a06e2f9

Introduce wait for cache utils

1 parent 043957b

9 files changed: +728 -216 lines changed

.golangci.yml

Lines changed: 2 additions & 0 deletions
@@ -178,6 +178,8 @@ linters:
         alias: ""
       - pkg: sigs.k8s.io/cluster-api/internal/topology/names
         alias: topologynames
+      - pkg: sigs.k8s.io/cluster-api/internal/util/client
+        alias: "clientutil"
       # CAPD
       - pkg: sigs.k8s.io/cluster-api/test/infrastructure/docker/api/v1alpha3
         alias: infrav1alpha3

controlplane/kubeadm/internal/controllers/inplace_trigger.go

Lines changed: 5 additions & 24 deletions
@@ -18,11 +18,10 @@ package controllers

 import (
     "context"
-    "time"
+    "fmt"

     "github.com/pkg/errors"
     corev1 "k8s.io/api/core/v1"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/klog/v2"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
@@ -32,6 +31,7 @@ import (
     runtimehooksv1 "sigs.k8s.io/cluster-api/api/runtime/hooks/v1alpha1"
     "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
     "sigs.k8s.io/cluster-api/internal/hooks"
+    clientutil "sigs.k8s.io/cluster-api/internal/util/client"
     "sigs.k8s.io/cluster-api/internal/util/ssa"
 )

@@ -60,11 +60,8 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context

         // Wait until the cache observed the Machine with UpdateInProgressAnnotation to ensure subsequent reconciles
         // will observe it as well and accordingly don't trigger another in-place update concurrently.
-        if err := waitForCache(ctx, r.Client, machine, func(m *clusterv1.Machine) bool {
-            _, annotationSet := m.Annotations[clusterv1.UpdateInProgressAnnotation]
-            return annotationSet
-        }); err != nil {
-            return errors.Wrapf(err, "failed waiting for Machine %s to be updated in the cache after setting the %s annotation", klog.KObj(machine), clusterv1.UpdateInProgressAnnotation)
+        if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, fmt.Sprintf("setting the %s annotation", clusterv1.UpdateInProgressAnnotation), machine); err != nil {
+            return err
         }
     }

@@ -141,13 +138,7 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context

     // Wait until the cache observed the Machine with PendingHooksAnnotation to ensure subsequent reconciles
     // will observe it as well and won't repeatedly call triggerInPlaceUpdate.
-    if err := waitForCache(ctx, r.Client, machine, func(m *clusterv1.Machine) bool {
-        return hooks.IsPending(runtimehooksv1.UpdateMachine, m)
-    }); err != nil {
-        return errors.Wrapf(err, "failed waiting for Machine %s to be updated in the cache after marking the UpdateMachine hook as pending", klog.KObj(machine))
-    }
-
-    return nil
+    return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "marking the UpdateMachine hook as pending", desiredMachine)
 }

 func (r *KubeadmControlPlaneReconciler) removeInitConfiguration(ctx context.Context, desiredKubeadmConfig *bootstrapv1.KubeadmConfig) error {
@@ -176,13 +167,3 @@ func (r *KubeadmControlPlaneReconciler) removeInitConfiguration(ctx context.Cont
     }
     return nil
 }
-
-func waitForCache(ctx context.Context, c client.Client, machine *clusterv1.Machine, f func(m *clusterv1.Machine) bool) error {
-    return wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
-        m := &clusterv1.Machine{}
-        if err := c.Get(ctx, client.ObjectKeyFromObject(machine), m); err != nil {
-            return false, err
-        }
-        return f(m), nil
-    })
-}
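Note: the new sigs.k8s.io/cluster-api/internal/util/client package itself is not shown in this commit's hunks. Based purely on the call sites above and on the per-controller waitForCache helper that is removed here, a minimal sketch of what WaitForCacheToBeUpToDate could look like follows. The signature shape is taken from the call sites, but the package layout, polling interval/timeout, and the resourceVersion-based readiness check are assumptions, not the actual implementation.

package client // imported elsewhere with the enforced alias "clientutil"

import (
    "context"
    "time"

    "github.com/pkg/errors"
    "k8s.io/apimachinery/pkg/util/wait"
    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// WaitForCacheToBeUpToDate polls the cached client until every given object is
// observed with the resourceVersion produced by the write that just happened,
// so that the next reconcile does not act on stale data.
func WaitForCacheToBeUpToDate(ctx context.Context, c ctrlclient.Client, change string, objs ...ctrlclient.Object) error {
    pollErr := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
        for _, obj := range objs {
            // Read into a fresh copy of the same kind; keep polling through transient errors.
            cached := obj.DeepCopyObject().(ctrlclient.Object)
            if err := c.Get(ctx, ctrlclient.ObjectKeyFromObject(obj), cached); err != nil {
                return false, nil
            }
            // Simplification: wait until the cache reports exactly the resourceVersion we wrote.
            if cached.GetResourceVersion() != obj.GetResourceVersion() {
                return false, nil
            }
        }
        return true, nil
    })
    if pollErr != nil {
        return errors.Wrapf(pollErr, "failed waiting for the cache to be up-to-date after %s", change)
    }
    return nil
}

Under these assumptions, each caller passes the object it just wrote (which carries the new resourceVersion) plus a short description of the change that is only used for error wrapping, which matches how the call sites above read.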

internal/controllers/machinedeployment/machinedeployment_controller.go

Lines changed: 25 additions & 33 deletions
@@ -20,15 +20,13 @@ import (
     "context"
     "fmt"
     "strings"
-    "time"

     "github.com/pkg/errors"
     corev1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
     kerrors "k8s.io/apimachinery/pkg/util/errors"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
     "k8s.io/utils/ptr"
@@ -41,6 +39,7 @@ import (

     clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
     "sigs.k8s.io/cluster-api/controllers/external"
+    clientutil "sigs.k8s.io/cluster-api/internal/util/client"
     "sigs.k8s.io/cluster-api/internal/util/ssa"
     "sigs.k8s.io/cluster-api/util"
     "sigs.k8s.io/cluster-api/util/collections"
@@ -349,24 +348,10 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c
         // Keep trying to get the MachineSet. This will force the cache to update and prevent any future reconciliation of
         // the MachineDeployment to reconcile with an outdated list of MachineSets which could lead to unwanted creation of
         // a duplicate MachineSet.
-        var pollErrors []error
-        tmpMS := &clusterv1.MachineSet{}
-        if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
-            if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ms), tmpMS); err != nil {
-                // Do not return error here. Continue to poll even if we hit an error
-                // so that we avoid existing because of transient errors like network flakes.
-                // Capture all the errors and return the aggregate error if the poll fails eventually.
-                pollErrors = append(pollErrors, err)
-                return false, nil
-            }
-            return true, nil
-        }); err != nil {
-            return errors.Wrapf(kerrors.NewAggregate(pollErrors), "failed to get the MachineSet %s after creation", klog.KObj(ms))
+        if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet creation", ms); err != nil {
+            return err
         }

-        // Report back creation timestamp, because legacy scale func uses it to sort machines.
-        // TODO(in-place): drop this as soon as handling of MD with paused rollouts is moved into rollout planner (see scale in machinedeployment_sync.go).
-        ms.CreationTimestamp = tmpMS.CreationTimestamp
         continue
     }

@@ -387,17 +372,24 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c

     newReplicas := ptr.Deref(ms.Spec.Replicas, 0)
     if newReplicas < originalReplicas {
-        changes = append(changes, "replicas", newReplicas)
-        log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas), changes...)
+        changes = append(changes, fmt.Sprintf("replicas %d", newReplicas))
+        log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas), "diff", strings.Join(changes, ","))
         r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled down MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
     }
     if newReplicas > originalReplicas {
-        changes = append(changes, "replicas", newReplicas)
-        log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas), changes...)
+        changes = append(changes, fmt.Sprintf("replicas %d", newReplicas))
+        log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas), "diff", strings.Join(changes, ","))
         r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled up MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
     }
     if newReplicas == originalReplicas && len(changes) > 0 {
-        log.Info(fmt.Sprintf("MachineSet %s updated", ms.Name), changes...)
+        log.Info(fmt.Sprintf("MachineSet %s updated", ms.Name), "diff", strings.Join(changes, ","))
+    }
+
+    // Only wait for cache if the object was changed.
+    if originalMS.ResourceVersion != ms.ResourceVersion {
+        if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet update", ms); err != nil {
+            return err
+        }
     }
 }

@@ -412,37 +404,37 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c
     return nil
 }

-func getAnnotationChanges(originalMS *clusterv1.MachineSet, ms *clusterv1.MachineSet) []any {
-    changes := []any{}
+func getAnnotationChanges(originalMS *clusterv1.MachineSet, ms *clusterv1.MachineSet) []string {
+    changes := []string{}
     if originalMS.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation] != ms.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation] {
         if value, ok := ms.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation]; ok {
-            changes = append(changes, clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, value)
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, value))
         } else {
-            changes = append(changes, clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, "(annotation removed)")
+            changes = append(changes, fmt.Sprintf("%s removed", clusterv1.MachineSetMoveMachinesToMachineSetAnnotation))
         }
     }

     if originalMS.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation] != ms.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation] {
         if value, ok := ms.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation]; ok {
-            changes = append(changes, clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, value)
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, value))
         } else {
-            changes = append(changes, clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, "(annotation removed)")
+            changes = append(changes, fmt.Sprintf("%s removed", clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation))
         }
     }

     if originalMS.Annotations[clusterv1.AcknowledgedMoveAnnotation] != ms.Annotations[clusterv1.AcknowledgedMoveAnnotation] {
         if value, ok := ms.Annotations[clusterv1.AcknowledgedMoveAnnotation]; ok {
-            changes = append(changes, clusterv1.AcknowledgedMoveAnnotation, value)
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.AcknowledgedMoveAnnotation, value))
         } else {
-            changes = append(changes, clusterv1.AcknowledgedMoveAnnotation, "(annotation removed)")
+            changes = append(changes, fmt.Sprintf("%s removed", clusterv1.AcknowledgedMoveAnnotation))
         }
     }

     if originalMS.Annotations[clusterv1.DisableMachineCreateAnnotation] != ms.Annotations[clusterv1.DisableMachineCreateAnnotation] {
         if value, ok := ms.Annotations[clusterv1.DisableMachineCreateAnnotation]; ok {
-            changes = append(changes, clusterv1.DisableMachineCreateAnnotation, value)
+            changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.DisableMachineCreateAnnotation, value))
         } else {
-            changes = append(changes, clusterv1.DisableMachineCreateAnnotation, "(annotation removed)")
+            changes = append(changes, fmt.Sprintf("%s removed", clusterv1.DisableMachineCreateAnnotation))
         }
     }
     return changes
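To illustrate the logging change in this file: getAnnotationChanges now returns plain strings, and callers join them into a single "diff" value instead of passing alternating key/value arguments to log.Info. A small self-contained example with made-up values (the annotation key and numbers are hypothetical):

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Hypothetical entries in the new []string format: an annotation change plus the replica count.
    changes := []string{
        "cluster.x-k8s.io/disable-machine-create: true", // assumed annotation key, purely for illustration
        fmt.Sprintf("replicas %d", 3),
    }
    // The controller now emits the whole list under a single "diff" key.
    fmt.Printf("Scaled up MachineSet %s to %d replicas (+%d) diff=%q\n", "ms-1", 3, 1, strings.Join(changes, ","))
}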

internal/controllers/machineset/machineset_controller.go

Lines changed: 5 additions & 98 deletions
@@ -34,7 +34,6 @@ import (
     kerrors "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/apimachinery/pkg/util/intstr"
     "k8s.io/apimachinery/pkg/util/sets"
-    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/tools/record"
     "k8s.io/klog/v2"
     "k8s.io/utils/ptr"
@@ -54,6 +53,7 @@ import (
     "sigs.k8s.io/cluster-api/internal/controllers/machine"
     "sigs.k8s.io/cluster-api/internal/hooks"
     topologynames "sigs.k8s.io/cluster-api/internal/topology/names"
+    clientutil "sigs.k8s.io/cluster-api/internal/util/client"
     "sigs.k8s.io/cluster-api/internal/util/inplace"
     "sigs.k8s.io/cluster-api/internal/util/ssa"
     "sigs.k8s.io/cluster-api/util"
@@ -72,13 +72,6 @@ import (
 var (
     // machineSetKind contains the schema.GroupVersionKind for the MachineSet type.
     machineSetKind = clusterv1.GroupVersion.WithKind("MachineSet")
-
-    // stateConfirmationTimeout is the amount of time allowed to wait for desired state.
-    stateConfirmationTimeout = 10 * time.Second
-
-    // stateConfirmationInterval is the amount of time between polling for the desired state.
-    // The polling is against a local memory cache.
-    stateConfirmationInterval = 100 * time.Millisecond
 )

 const (
@@ -421,7 +414,7 @@ func (r *Reconciler) triggerInPlaceUpdate(ctx context.Context, s *scope) (ctrl.R

     // Wait until the cache observed the Machine with PendingHooksAnnotation to ensure subsequent reconciles
     // will observe it as well and won't repeatedly call the logic to trigger in-place update.
-    if err := r.waitForMachinesInPlaceUpdateStarted(ctx, machinesTriggeredInPlace); err != nil {
+    if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "starting in-place updates", machinesTriggeredInPlace...); err != nil {
         errs = append(errs, err)
     }
     if len(errs) > 0 {
@@ -900,7 +893,7 @@ func (r *Reconciler) createMachines(ctx context.Context, s *scope, machinesToAdd
     }

     // Wait for cache update to ensure following reconcile gets latest change.
-    return ctrl.Result{}, r.waitForMachinesCreation(ctx, machinesAdded)
+    return ctrl.Result{}, clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Machine creation", machinesAdded...)
 }

 func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDelete int) (ctrl.Result, error) {
@@ -948,7 +941,7 @@ func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDel
     }

     // Wait for cache update to ensure following reconcile gets latest change.
-    if err := r.waitForMachinesDeletion(ctx, machinesDeleted); err != nil {
+    if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion", machinesDeleted...); err != nil {
         errs = append(errs, err)
     }
     if len(errs) > 0 {
@@ -1061,7 +1054,7 @@ func (r *Reconciler) startMoveMachines(ctx context.Context, s *scope, targetMSNa
     }

     // Wait for cache update to ensure following reconcile gets latest change.
-    if err := r.waitForMachinesStartedMove(ctx, machinesMoved); err != nil {
+    if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "moving Machines", machinesMoved...); err != nil {
         errs = append(errs, err)
     }
     if len(errs) > 0 {
@@ -1242,92 +1235,6 @@ func (r *Reconciler) adoptOrphan(ctx context.Context, machineSet *clusterv1.Mach
     return r.Client.Patch(ctx, machine, patch)
 }

-func (r *Reconciler) waitForMachinesCreation(ctx context.Context, machines []*clusterv1.Machine) error {
-    pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
-        for _, machine := range machines {
-            key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
-            if err := r.Client.Get(ctx, key, &clusterv1.Machine{}); err != nil {
-                if apierrors.IsNotFound(err) {
-                    return false, nil
-                }
-                return false, err
-            }
-        }
-        return true, nil
-    })
-
-    if pollErr != nil {
-        return errors.Wrap(pollErr, "failed waiting for Machines to be created")
-    }
-    return nil
-}
-
-func (r *Reconciler) waitForMachinesDeletion(ctx context.Context, machines []*clusterv1.Machine) error {
-    pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
-        for _, machine := range machines {
-            m := &clusterv1.Machine{}
-            key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
-            if err := r.Client.Get(ctx, key, m); err != nil {
-                if apierrors.IsNotFound(err) {
-                    continue
-                }
-                return false, err
-            }
-            if m.DeletionTimestamp.IsZero() {
-                return false, nil
-            }
-        }
-        return true, nil
-    })
-
-    if pollErr != nil {
-        return errors.Wrap(pollErr, "failed waiting for Machines to be deleted")
-    }
-    return nil
-}
-
-func (r *Reconciler) waitForMachinesStartedMove(ctx context.Context, machines []*clusterv1.Machine) error {
-    pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
-        for _, machine := range machines {
-            m := &clusterv1.Machine{}
-            key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
-            if err := r.Client.Get(ctx, key, m); err != nil {
-                return false, err
-            }
-            if _, annotationSet := m.Annotations[clusterv1.UpdateInProgressAnnotation]; !annotationSet {
-                return false, nil
-            }
-        }
-        return true, nil
-    })
-
-    if pollErr != nil {
-        return errors.Wrap(pollErr, "failed waiting for Machines to start move")
-    }
-    return nil
-}
-
-func (r *Reconciler) waitForMachinesInPlaceUpdateStarted(ctx context.Context, machines []*clusterv1.Machine) error {
-    pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
-        for _, machine := range machines {
-            m := &clusterv1.Machine{}
-            key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
-            if err := r.Client.Get(ctx, key, m); err != nil {
-                return false, err
-            }
-            if !hooks.IsPending(runtimehooksv1.UpdateMachine, m) {
-                return false, nil
-            }
-        }
-        return true, nil
-    })
-
-    if pollErr != nil {
-        return errors.Wrap(pollErr, "failed waiting for Machines to complete move")
-    }
-    return nil
-}
-
 // MachineToMachineSets is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation
 // for MachineSets that might adopt an orphaned Machine.
 func (r *Reconciler) MachineToMachineSets(ctx context.Context, o client.Object) []ctrl.Request {
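The deletion-side helper used above is likewise not part of the hunks shown here. A minimal sketch, assuming it mirrors the semantics of the removed waitForMachinesDeletion (an object counts as deleted once Get returns NotFound or the cached copy at least carries a deletionTimestamp); the real helper in internal/util/client may differ:

package client // same assumed package as the earlier sketch

import (
    "context"
    "time"

    "github.com/pkg/errors"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/wait"
    ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// WaitForObjectsToBeDeletedFromTheCache polls the cached client until every
// given object is either gone (NotFound) or marked for deletion.
func WaitForObjectsToBeDeletedFromTheCache(ctx context.Context, c ctrlclient.Client, change string, objs ...ctrlclient.Object) error {
    pollErr := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
        for _, obj := range objs {
            cached := obj.DeepCopyObject().(ctrlclient.Object)
            err := c.Get(ctx, ctrlclient.ObjectKeyFromObject(obj), cached)
            if apierrors.IsNotFound(err) {
                continue // already gone from the cache
            }
            if err != nil {
                return false, nil // transient error, keep polling
            }
            if cached.GetDeletionTimestamp().IsZero() {
                return false, nil // deletion not yet observed by the cache
            }
        }
        return true, nil
    })
    if pollErr != nil {
        return errors.Wrapf(pollErr, "failed waiting for the cache to observe %s", change)
    }
    return nil
}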

internal/controllers/machineset/machineset_controller_test.go

Lines changed: 12 additions & 0 deletions
@@ -2679,6 +2679,18 @@ func TestMachineSetReconciler_createMachines(t *testing.T) {
                 infraTmpl,
             ).WithInterceptorFuncs(tt.interceptorFuncs(&i)).Build()

+            // TODO(controller-runtime-0.23): This workaround is needed because controller-runtime v0.22 does not set resourceVersion correctly with SSA (fixed with v0.23).
+            fakeClient = interceptor.NewClient(fakeClient, interceptor.Funcs{
+                Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
+                    clientObject, ok := obj.(client.Object)
+                    if !ok {
+                        return errors.Errorf("error during object creation: unexpected ApplyConfiguration")
+                    }
+                    clientObject.SetResourceVersion("1")
+                    return c.Apply(ctx, obj, opts...)
+                },
+            })
+
             r := &Reconciler{
                 Client:   fakeClient,
                 recorder: record.NewFakeRecorder(32),