2 changes: 2 additions & 0 deletions .golangci.yml
@@ -178,6 +178,8 @@ linters:
alias: ""
- pkg: sigs.k8s.io/cluster-api/internal/topology/names
alias: topologynames
- pkg: sigs.k8s.io/cluster-api/internal/util/client
alias: "clientutil"
# CAPD
- pkg: sigs.k8s.io/cluster-api/test/infrastructure/docker/api/v1alpha3
alias: infrav1alpha3
29 changes: 5 additions & 24 deletions controlplane/kubeadm/internal/controllers/inplace_trigger.go
@@ -18,11 +18,10 @@ package controllers

import (
"context"
"time"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -32,6 +31,7 @@ import (
runtimehooksv1 "sigs.k8s.io/cluster-api/api/runtime/hooks/v1alpha1"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/internal/hooks"
clientutil "sigs.k8s.io/cluster-api/internal/util/client"
"sigs.k8s.io/cluster-api/internal/util/ssa"
)

@@ -60,11 +60,8 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context

// Wait until the cache observed the Machine with UpdateInProgressAnnotation to ensure subsequent reconciles
// will observe it as well and accordingly don't trigger another in-place update concurrently.
if err := waitForCache(ctx, r.Client, machine, func(m *clusterv1.Machine) bool {
_, annotationSet := m.Annotations[clusterv1.UpdateInProgressAnnotation]
return annotationSet
}); err != nil {
return errors.Wrapf(err, "failed waiting for Machine %s to be updated in the cache after setting the %s annotation", klog.KObj(machine), clusterv1.UpdateInProgressAnnotation)
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, fmt.Sprintf("setting the %s annotation", clusterv1.UpdateInProgressAnnotation), machine); err != nil {
return err
}
}

@@ -141,13 +138,7 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context

// Wait until the cache observed the Machine with PendingHooksAnnotation to ensure subsequent reconciles
// will observe it as well and won't repeatedly call triggerInPlaceUpdate.
if err := waitForCache(ctx, r.Client, machine, func(m *clusterv1.Machine) bool {
return hooks.IsPending(runtimehooksv1.UpdateMachine, m)
}); err != nil {
return errors.Wrapf(err, "failed waiting for Machine %s to be updated in the cache after marking the UpdateMachine hook as pending", klog.KObj(machine))
}

return nil
return clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "marking the UpdateMachine hook as pending", desiredMachine)
}

func (r *KubeadmControlPlaneReconciler) removeInitConfiguration(ctx context.Context, desiredKubeadmConfig *bootstrapv1.KubeadmConfig) error {
@@ -176,13 +167,3 @@ func (r *KubeadmControlPlaneReconciler) removeInitConfiguration(ctx context.Cont
}
return nil
}

func waitForCache(ctx context.Context, c client.Client, machine *clusterv1.Machine, f func(m *clusterv1.Machine) bool) error {
return wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
m := &clusterv1.Machine{}
if err := c.Get(ctx, client.ObjectKeyFromObject(machine), m); err != nil {
return false, err
}
return f(m), nil
})
}
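
Note: the clientutil.WaitForCacheToBeUpToDate helper that replaces these per-controller poll loops lives in the new sigs.k8s.io/cluster-api/internal/util/client package, which is not part of this diff. Below is a minimal sketch of what such a helper could look like, assuming a generic signature (so call sites can pass a single Machine, a MachineSet, or a spread slice of Machines) and assuming the cache counts as up-to-date once it reports the resourceVersion set on the object by the preceding write; the real implementation may differ.

package clientutil

import (
	"context"
	"time"

	"github.com/pkg/errors"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// WaitForCacheToBeUpToDate polls the cached client until every given object is
// observed with the resourceVersion produced by the write that was just performed.
// changeDescription is only used to build a readable error message.
// NOTE: illustrative sketch, not the actual internal/util/client implementation.
func WaitForCacheToBeUpToDate[T client.Object](ctx context.Context, c client.Client, changeDescription string, objs ...T) error {
	err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
		for _, obj := range objs {
			// Read into a fresh copy of the same type so the caller's object is not overwritten.
			cached, ok := obj.DeepCopyObject().(client.Object)
			if !ok {
				return false, errors.Errorf("unexpected object type %T", obj)
			}
			if err := c.Get(ctx, client.ObjectKeyFromObject(obj), cached); err != nil {
				if apierrors.IsNotFound(err) {
					// The cache has not observed a freshly created object yet; keep polling.
					return false, nil
				}
				return false, err
			}
			// Assumption: a matching resourceVersion is the signal that the cache caught up.
			if cached.GetResourceVersion() != obj.GetResourceVersion() {
				return false, nil
			}
		}
		return true, nil
	})
	if err != nil {
		return errors.Wrapf(err, "failed waiting for the cache to be up-to-date after %s", changeDescription)
	}
	return nil
}

If the helper does key off resourceVersion like this, that would also explain the "Only wait for cache if the object was changed" check below and the fake-client interceptor added to the test at the end of this diff, which sets a resourceVersion that controller-runtime v0.22's SSA support would otherwise leave empty.
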
Original file line number Diff line number Diff line change
@@ -20,15 +20,13 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
@@ -41,6 +39,7 @@ import (

clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
"sigs.k8s.io/cluster-api/controllers/external"
clientutil "sigs.k8s.io/cluster-api/internal/util/client"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/collections"
@@ -349,24 +348,10 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c
// Keep trying to get the MachineSet. This will force the cache to update and prevent any future reconciliation of
// the MachineDeployment to reconcile with an outdated list of MachineSets which could lead to unwanted creation of
// a duplicate MachineSet.
var pollErrors []error
tmpMS := &clusterv1.MachineSet{}
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ms), tmpMS); err != nil {
// Do not return error here. Continue to poll even if we hit an error
// so that we avoid exiting because of transient errors like network flakes.
// Capture all the errors and return the aggregate error if the poll fails eventually.
pollErrors = append(pollErrors, err)
return false, nil
}
return true, nil
}); err != nil {
return errors.Wrapf(kerrors.NewAggregate(pollErrors), "failed to get the MachineSet %s after creation", klog.KObj(ms))
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet creation", ms); err != nil {
return err
}

// Report back creation timestamp, because legacy scale func uses it to sort machines.
// TODO(in-place): drop this as soon as handling of MD with paused rollouts is moved into rollout planner (see scale in machinedeployment_sync.go).
ms.CreationTimestamp = tmpMS.CreationTimestamp
continue
}

@@ -387,17 +372,24 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c

newReplicas := ptr.Deref(ms.Spec.Replicas, 0)
if newReplicas < originalReplicas {
changes = append(changes, "replicas", newReplicas)
log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas), changes...)
changes = append(changes, fmt.Sprintf("replicas %d", newReplicas))
log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas), "diff", strings.Join(changes, ","))
r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled down MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
}
if newReplicas > originalReplicas {
changes = append(changes, "replicas", newReplicas)
log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas), changes...)
changes = append(changes, fmt.Sprintf("replicas %d", newReplicas))
log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas), "diff", strings.Join(changes, ","))
r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled up MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
}
if newReplicas == originalReplicas && len(changes) > 0 {
log.Info(fmt.Sprintf("MachineSet %s updated", ms.Name), changes...)
log.Info(fmt.Sprintf("MachineSet %s updated", ms.Name), "diff", strings.Join(changes, ","))
}

// Only wait for cache if the object was changed.
if originalMS.ResourceVersion != ms.ResourceVersion {
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "MachineSet update", ms); err != nil {
return err
}
}
}

@@ -412,37 +404,37 @@ func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(c
return nil
}

func getAnnotationChanges(originalMS *clusterv1.MachineSet, ms *clusterv1.MachineSet) []any {
changes := []any{}
func getAnnotationChanges(originalMS *clusterv1.MachineSet, ms *clusterv1.MachineSet) []string {
changes := []string{}
if originalMS.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation] != ms.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation] {
if value, ok := ms.Annotations[clusterv1.MachineSetMoveMachinesToMachineSetAnnotation]; ok {
changes = append(changes, clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, value)
changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, value))
} else {
changes = append(changes, clusterv1.MachineSetMoveMachinesToMachineSetAnnotation, "(annotation removed)")
changes = append(changes, fmt.Sprintf("%s removed", clusterv1.MachineSetMoveMachinesToMachineSetAnnotation))
}
}

if originalMS.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation] != ms.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation] {
if value, ok := ms.Annotations[clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation]; ok {
changes = append(changes, clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, value)
changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, value))
} else {
changes = append(changes, clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation, "(annotation removed)")
changes = append(changes, fmt.Sprintf("%s removed", clusterv1.MachineSetReceiveMachinesFromMachineSetsAnnotation))
}
}

if originalMS.Annotations[clusterv1.AcknowledgedMoveAnnotation] != ms.Annotations[clusterv1.AcknowledgedMoveAnnotation] {
if value, ok := ms.Annotations[clusterv1.AcknowledgedMoveAnnotation]; ok {
changes = append(changes, clusterv1.AcknowledgedMoveAnnotation, value)
changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.AcknowledgedMoveAnnotation, value))
} else {
changes = append(changes, clusterv1.AcknowledgedMoveAnnotation, "(annotation removed)")
changes = append(changes, fmt.Sprintf("%s removed", clusterv1.AcknowledgedMoveAnnotation))
}
}

if originalMS.Annotations[clusterv1.DisableMachineCreateAnnotation] != ms.Annotations[clusterv1.DisableMachineCreateAnnotation] {
if value, ok := ms.Annotations[clusterv1.DisableMachineCreateAnnotation]; ok {
changes = append(changes, clusterv1.DisableMachineCreateAnnotation, value)
changes = append(changes, fmt.Sprintf("%s: %s", clusterv1.DisableMachineCreateAnnotation, value))
} else {
changes = append(changes, clusterv1.DisableMachineCreateAnnotation, "(annotation removed)")
changes = append(changes, fmt.Sprintf("%s removed", clusterv1.DisableMachineCreateAnnotation))
}
}
return changes
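
As a side note on the logging rework above: changes are now collected as plain strings and emitted under a single "diff" key instead of as individual structured key/value pairs. A small, self-contained illustration of the new shape (the MachineSet name and annotation below are made up for the example):

package main

import (
	"fmt"
	"strings"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
	log := ctrl.Log.WithName("machinedeployment")

	// Each change is now a human-readable string ...
	changes := []string{
		"example.annotation/key: new-value", // hypothetical annotation change
		"replicas 3",
	}

	// ... and all changes end up under one "diff" key instead of separate key/value pairs.
	log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", "md-1-abc123", 3, 2),
		"diff", strings.Join(changes, ","))
}
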
103 changes: 5 additions & 98 deletions internal/controllers/machineset/machineset_controller.go
@@ -34,7 +34,6 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
@@ -54,6 +53,7 @@ import (
"sigs.k8s.io/cluster-api/internal/controllers/machine"
"sigs.k8s.io/cluster-api/internal/hooks"
topologynames "sigs.k8s.io/cluster-api/internal/topology/names"
clientutil "sigs.k8s.io/cluster-api/internal/util/client"
"sigs.k8s.io/cluster-api/internal/util/inplace"
"sigs.k8s.io/cluster-api/internal/util/ssa"
"sigs.k8s.io/cluster-api/util"
@@ -72,13 +72,6 @@ import (
var (
// machineSetKind contains the schema.GroupVersionKind for the MachineSet type.
machineSetKind = clusterv1.GroupVersion.WithKind("MachineSet")

// stateConfirmationTimeout is the amount of time allowed to wait for desired state.
stateConfirmationTimeout = 10 * time.Second

// stateConfirmationInterval is the amount of time between polling for the desired state.
// The polling is against a local memory cache.
stateConfirmationInterval = 100 * time.Millisecond
)

const (
@@ -421,7 +414,7 @@ func (r *Reconciler) triggerInPlaceUpdate(ctx context.Context, s *scope) (ctrl.R

// Wait until the cache observed the Machine with PendingHooksAnnotation to ensure subsequent reconciles
// will observe it as well and won't repeatedly call the logic to trigger in-place update.
if err := r.waitForMachinesInPlaceUpdateStarted(ctx, machinesTriggeredInPlace); err != nil {
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "starting in-place updates", machinesTriggeredInPlace...); err != nil {
errs = append(errs, err)
}
if len(errs) > 0 {
@@ -900,7 +893,7 @@ func (r *Reconciler) createMachines(ctx context.Context, s *scope, machinesToAdd
}

// Wait for cache update to ensure following reconcile gets latest change.
return ctrl.Result{}, r.waitForMachinesCreation(ctx, machinesAdded)
return ctrl.Result{}, clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "Machine creation", machinesAdded...)
}

func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDelete int) (ctrl.Result, error) {
@@ -948,7 +941,7 @@ func (r *Reconciler) deleteMachines(ctx context.Context, s *scope, machinesToDel
}

// Wait for cache update to ensure following reconcile gets latest change.
if err := r.waitForMachinesDeletion(ctx, machinesDeleted); err != nil {
if err := clientutil.WaitForObjectsToBeDeletedFromTheCache(ctx, r.Client, "Machine deletion", machinesDeleted...); err != nil {
errs = append(errs, err)
}
if len(errs) > 0 {
@@ -1061,7 +1054,7 @@ func (r *Reconciler) startMoveMachines(ctx context.Context, s *scope, targetMSNa
}

// Wait for cache update to ensure following reconcile gets latest change.
if err := r.waitForMachinesStartedMove(ctx, machinesMoved); err != nil {
if err := clientutil.WaitForCacheToBeUpToDate(ctx, r.Client, "moving Machines", machinesMoved...); err != nil {
errs = append(errs, err)
}
if len(errs) > 0 {
@@ -1242,92 +1235,6 @@ func (r *Reconciler) adoptOrphan(ctx context.Context, machineSet *clusterv1.Mach
return r.Client.Patch(ctx, machine, patch)
}

func (r *Reconciler) waitForMachinesCreation(ctx context.Context, machines []*clusterv1.Machine) error {
pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
for _, machine := range machines {
key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
if err := r.Client.Get(ctx, key, &clusterv1.Machine{}); err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
}
return true, nil
})

if pollErr != nil {
return errors.Wrap(pollErr, "failed waiting for Machines to be created")
}
return nil
}

func (r *Reconciler) waitForMachinesDeletion(ctx context.Context, machines []*clusterv1.Machine) error {
pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
for _, machine := range machines {
m := &clusterv1.Machine{}
key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
if err := r.Client.Get(ctx, key, m); err != nil {
if apierrors.IsNotFound(err) {
continue
}
return false, err
}
if m.DeletionTimestamp.IsZero() {
return false, nil
}
}
return true, nil
})

if pollErr != nil {
return errors.Wrap(pollErr, "failed waiting for Machines to be deleted")
}
return nil
}

func (r *Reconciler) waitForMachinesStartedMove(ctx context.Context, machines []*clusterv1.Machine) error {
pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
for _, machine := range machines {
m := &clusterv1.Machine{}
key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
if err := r.Client.Get(ctx, key, m); err != nil {
return false, err
}
if _, annotationSet := m.Annotations[clusterv1.UpdateInProgressAnnotation]; !annotationSet {
return false, nil
}
}
return true, nil
})

if pollErr != nil {
return errors.Wrap(pollErr, "failed waiting for Machines to start move")
}
return nil
}

func (r *Reconciler) waitForMachinesInPlaceUpdateStarted(ctx context.Context, machines []*clusterv1.Machine) error {
pollErr := wait.PollUntilContextTimeout(ctx, stateConfirmationInterval, stateConfirmationTimeout, true, func(ctx context.Context) (bool, error) {
for _, machine := range machines {
m := &clusterv1.Machine{}
key := client.ObjectKey{Namespace: machine.Namespace, Name: machine.Name}
if err := r.Client.Get(ctx, key, m); err != nil {
return false, err
}
if !hooks.IsPending(runtimehooksv1.UpdateMachine, m) {
return false, nil
}
}
return true, nil
})

if pollErr != nil {
return errors.Wrap(pollErr, "failed waiting for Machines to complete move")
}
return nil
}

// MachineToMachineSets is a handler.ToRequestsFunc to be used to enqueue requests for reconciliation
// for MachineSets that might adopt an orphaned Machine.
func (r *Reconciler) MachineToMachineSets(ctx context.Context, o client.Object) []ctrl.Request {
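
The deletion counterpart used in deleteMachines above, clientutil.WaitForObjectsToBeDeletedFromTheCache, is also part of the new internal/util/client package and not shown in this diff. A minimal sketch, reusing the imports from the earlier sketch and mirroring the removed waitForMachinesDeletion logic (an object counts as deleted once the cache either no longer returns it or returns it with a deletionTimestamp set); again, the real implementation may differ:

// WaitForObjectsToBeDeletedFromTheCache polls the cached client until the deletion of
// every given object has been observed, i.e. the object is gone or marked for deletion.
// NOTE: illustrative sketch, not the actual internal/util/client implementation.
func WaitForObjectsToBeDeletedFromTheCache[T client.Object](ctx context.Context, c client.Client, changeDescription string, objs ...T) error {
	err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
		for _, obj := range objs {
			// Read into a fresh copy of the same type so the caller's object is not overwritten.
			cached, ok := obj.DeepCopyObject().(client.Object)
			if !ok {
				return false, errors.Errorf("unexpected object type %T", obj)
			}
			if err := c.Get(ctx, client.ObjectKeyFromObject(obj), cached); err != nil {
				if apierrors.IsNotFound(err) {
					// Gone from the cache; check the next object.
					continue
				}
				return false, err
			}
			// Objects with finalizers stick around, so also accept a set deletionTimestamp
			// (this mirrors the removed waitForMachinesDeletion helper).
			if cached.GetDeletionTimestamp().IsZero() {
				return false, nil
			}
		}
		return true, nil
	})
	if err != nil {
		return errors.Wrapf(err, "failed waiting for the cache to observe %s", changeDescription)
	}
	return nil
}
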
12 changes: 12 additions & 0 deletions internal/controllers/machineset/machineset_controller_test.go
@@ -2679,6 +2679,18 @@ func TestMachineSetReconciler_createMachines(t *testing.T) {
infraTmpl,
).WithInterceptorFuncs(tt.interceptorFuncs(&i)).Build()

// TODO(controller-runtime-0.23): This workaround is needed because controller-runtime v0.22 does not set resourceVersion correctly with SSA (fixed with v0.23).
fakeClient = interceptor.NewClient(fakeClient, interceptor.Funcs{
Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
clientObject, ok := obj.(client.Object)
if !ok {
return errors.Errorf("error during object creation: unexpected ApplyConfiguration")
}
clientObject.SetResourceVersion("1")
return c.Apply(ctx, obj, opts...)
},
})

r := &Reconciler{
Client: fakeClient,
recorder: record.NewFakeRecorder(32),