Skip to content

Commit 199aa65

Browse files
Populate list of clusters in the controller at startup. (#364)
Assign the list of clusters in the controller with the up-to-date list of Postgres manifests on Kubernetes during the startup. Node migration routines launched asynchronously to the cluster processing rely on an up-to-date list of clusters in the controller to detect clusters affected by the migration of the node and lock them when doing migration of master pods. Without the initial list the operator was subject to race conditions like the one described at #363 Restructure the code to decouple list cluster function required by the postgresql informer from the one that emits cluster sync events. No extra work is introduced, since cluster sync already runs in a separate goroutine (clusterResync). Introduce explicit initial cluster sync at the end of acquireInitialListOfClusters instead of relying on an implicit one coming from list function of the PostgreSQL informer. Some minor refactoring. Review by @zerg-junior
1 parent acf46bf commit 199aa65

File tree

3 files changed

+77
-31
lines changed

3 files changed

+77
-31
lines changed

pkg/controller/controller.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -326,18 +326,25 @@ func (c *Controller) initSharedInformers() {
326326
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
327327
c.initController()
328328

329+
// start workers reading from the events queue to prevent the initial sync from blocking on it.
330+
for i := range c.clusterEventQueues {
331+
wg.Add(1)
332+
c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
333+
go c.processClusterEventsQueue(i, stopCh, wg)
334+
}
335+
336+
// populate clusters before starting nodeInformer that relies on it and run the initial sync
337+
if err := c.acquireInitialListOfClusters(); err != nil {
338+
panic("could not acquire initial list of clusters")
339+
}
340+
329341
wg.Add(5)
330342
go c.runPodInformer(stopCh, wg)
331343
go c.runPostgresqlInformer(stopCh, wg)
332344
go c.clusterResync(stopCh, wg)
333345
go c.apiserver.Run(stopCh, wg)
334346
go c.kubeNodesInformer(stopCh, wg)
335347

336-
for i := range c.clusterEventQueues {
337-
wg.Add(1)
338-
c.workerLogs[uint32(i)] = ringlog.New(c.opConfig.RingLogLines)
339-
go c.processClusterEventsQueue(i, stopCh, wg)
340-
}
341348

342349
c.logger.Info("started working in background")
343350
}

pkg/controller/postgresql.go

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
3232
for {
3333
select {
3434
case <-ticker.C:
35-
if _, err := c.clusterListFunc(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
35+
if err := c.clusterListAndSync(); err != nil {
3636
c.logger.Errorf("could not list clusters: %v", err)
3737
}
3838
case <-stopCh:
@@ -41,15 +41,10 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) {
4141
}
4242
}
4343

44-
// TODO: make a separate function to be called from InitSharedInformers
45-
// clusterListFunc obtains a list of all PostgreSQL clusters and runs sync when necessary
46-
// NB: as this function is called directly by the informer, it needs to avoid acquiring locks
47-
// on individual cluster structures. Therefore, it acts on the manifests obtained from Kubernetes
48-
// and not on the internal state of the clusters.
49-
func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
44+
// clusterListFunc obtains a list of all PostgreSQL clusters
45+
func (c *Controller) listClusters(options metav1.ListOptions) (*spec.PostgresqlList, error) {
5046
var (
51-
list spec.PostgresqlList
52-
event spec.EventType
47+
list spec.PostgresqlList
5348
)
5449

5550
req := c.KubeClient.CRDREST.
@@ -67,21 +62,42 @@ func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object
6762
c.logger.Warningf("could not unmarshal list of clusters: %v", err)
6863
}
6964

65+
return &list, err
66+
67+
}
68+
69+
// A separate function to be called from InitSharedInformers
70+
func (c *Controller) clusterListFunc(options metav1.ListOptions) (runtime.Object, error) {
71+
return c.listClusters(options)
72+
}
73+
74+
// clusterListAndSync lists all manifests and decides whether to run the sync or repair.
75+
func (c *Controller) clusterListAndSync() error {
76+
var (
77+
err error
78+
event spec.EventType
79+
)
80+
7081
currentTime := time.Now().Unix()
7182
timeFromPreviousSync := currentTime - atomic.LoadInt64(&c.lastClusterSyncTime)
7283
timeFromPreviousRepair := currentTime - atomic.LoadInt64(&c.lastClusterRepairTime)
84+
7385
if timeFromPreviousSync >= int64(c.opConfig.ResyncPeriod.Seconds()) {
7486
event = spec.EventSync
7587
} else if timeFromPreviousRepair >= int64(c.opConfig.RepairPeriod.Seconds()) {
7688
event = spec.EventRepair
7789
}
7890
if event != "" {
79-
c.queueEvents(&list, event)
91+
var list *spec.PostgresqlList
92+
if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
93+
return err
94+
}
95+
c.queueEvents(list, event)
8096
} else {
8197
c.logger.Infof("not enough time passed since the last sync (%s seconds) or repair (%s seconds)",
8298
timeFromPreviousSync, timeFromPreviousRepair)
8399
}
84-
return &list, err
100+
return nil
85101
}
86102

87103
// queueEvents queues a sync or repair event for every cluster with a valid manifest
@@ -125,6 +141,30 @@ func (c *Controller) queueEvents(list *spec.PostgresqlList, event spec.EventType
125141
}
126142
}
127143

144+
func (c *Controller) acquireInitialListOfClusters() error {
145+
var (
146+
list *spec.PostgresqlList
147+
err error
148+
clusterName spec.NamespacedName
149+
)
150+
151+
if list, err = c.listClusters(metav1.ListOptions{ResourceVersion: "0"}); err != nil {
152+
return err
153+
}
154+
c.logger.Debugf("acquiring initial list of clusters")
155+
for _, pg := range list.Items {
156+
if pg.Error != nil {
157+
continue
158+
}
159+
clusterName = util.NameFromMeta(pg.ObjectMeta)
160+
c.addCluster(c.logger, clusterName, &pg)
161+
c.logger.Debugf("added new cluster: %q", clusterName)
162+
}
163+
// initiate initial sync of all clusters.
164+
c.queueEvents(list, spec.EventSync)
165+
return nil
166+
}
167+
128168
type crdDecoder struct {
129169
dec *json.Decoder
130170
close func() error

pkg/util/config/util.go

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -172,48 +172,47 @@ func processField(value string, field reflect.Value) error {
172172
type parserState int
173173

174174
const (
175-
Plain parserState = iota
176-
DoubleQuoted
177-
SingleQuoted
178-
Escape
175+
plain parserState = iota
176+
doubleQuoted
177+
singleQuoted
179178
)
180179

181180
// Split the pair candidates by commas not located inside open quotes
182181
// Escape characters are not supported for simplicity, as we don't
183182
// expect to find them inside the map values for our use cases
184183
func getMapPairsFromString(value string) (pairs []string, err error) {
185184
pairs = make([]string, 0)
186-
state := Plain
185+
state := plain
187186
var start, quote int
188187

189188
for i, ch := range strings.Split(value, "") {
190189
if (ch == `"` || ch == `'`) && i > 0 && value[i-1] == '\\' {
191190
fmt.Printf("Parser warning: ecape character '\\' have no effect on quotes inside the configuration value %s\n", value)
192191
}
193192
if ch == `"` {
194-
if state == Plain {
195-
state = DoubleQuoted
193+
if state == plain {
194+
state = doubleQuoted
196195
quote = i
197-
} else if state == DoubleQuoted {
198-
state = Plain
196+
} else if state == doubleQuoted {
197+
state = plain
199198
quote = 0
200199
}
201200
}
202201
if ch == "'" {
203-
if state == Plain {
204-
state = SingleQuoted
202+
if state == plain {
203+
state = singleQuoted
205204
quote = i
206-
} else if state == SingleQuoted {
207-
state = Plain
205+
} else if state == singleQuoted {
206+
state = plain
208207
quote = 0
209208
}
210209
}
211-
if ch == "," && state == Plain {
210+
if ch == "," && state == plain {
212211
pairs = append(pairs, strings.Trim(value[start:i], " \t"))
213212
start = i + 1
214213
}
215214
}
216-
if state != Plain {
215+
if state != plain {
217216
err = fmt.Errorf("unmatched quote starting at position %d", quote+1)
218217
pairs = nil
219218
} else {

0 commit comments

Comments
 (0)