Skip to content

Commit 3fcc4cb

Browse files
craig[bot]andyyang890wenyihu6
committed
154982: changefeedccl: partially revert periodic aggregator frontier flushing r=KeithCh,asg0451 a=andyyang890 This patch partially reverts adfa889 since that change appears to have caused `TestChangefeedNemeses` to start failing for the `cloudstorage` sink. The change aggregator will now once again flush its frontier only if either its frontier has advanced or some of its spans are lagging. Fixes #154325 Release note: None 155100: kvserver: add vmodule logging for TestReplicateQueueSeesLearnerOrJointConfig r=wenyihu6 a=wenyihu6 Informs: #153988 Release note: none Co-authored-by: Andy Yang <yang@cockroachlabs.com> Co-authored-by: wenyihu6 <wenyi@cockroachlabs.com>
3 parents e0df19f + 94b54b7 + f122d2e commit 3fcc4cb

File tree

5 files changed

+39
-4
lines changed

5 files changed

+39
-4
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,10 @@ type changeAggregator struct {
108108
eventConsumer eventConsumer
109109

110110
flushFrequency time.Duration // how often high watermark can be checkpointed.
111+
lastSpanFlush time.Time // last time expensive, span based checkpoint was written.
111112

112113
// frontierFlushLimiter is a rate limiter for flushing the span frontier
113-
// to the coordinator.
114+
// to the coordinator because of frontier advancement.
114115
frontierFlushLimiter *saveRateLimiter
115116

116117
// frontier keeps track of resolved timestamps for spans along with schema change
@@ -480,6 +481,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
480481

481482
// Init heartbeat timer.
482483
ca.lastPush = timeutil.Now()
484+
485+
// Generate expensive checkpoint only after we ran for a while.
486+
ca.lastSpanFlush = timeutil.Now()
483487
}
484488

485489
func (ca *changeAggregator) startKVFeed(
@@ -932,16 +936,34 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
932936
ca.sliMetrics.setResolved(ca.sliMetricsID, ca.frontier.Frontier())
933937
}
934938

939+
if ca.knobs.ShouldFlushFrontier != nil && ca.knobs.ShouldFlushFrontier(resolved) {
940+
return ca.flushFrontier(ctx)
941+
}
942+
935943
forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE
936944

937-
checkpointFrontier := (advanced && forceFlush) || ca.frontierFlushLimiter.canSave(ctx)
945+
// TODO(#155015): Re-enable periodic frontier flushing.
946+
checkpointFrontier := advanced && (forceFlush || ca.frontierFlushLimiter.canSave(ctx))
938947

939948
if checkpointFrontier {
940949
now := crtime.NowMono()
941950
if err := ca.flushFrontier(ctx); err != nil {
942951
return err
943952
}
944953
ca.frontierFlushLimiter.doneSave(now.Elapsed())
954+
return nil
955+
}
956+
957+
// At a lower frequency, we checkpoint specific spans in the job progress
958+
// either in backfills or if the highwater mark is excessively lagging behind.
959+
checkpointSpans := (ca.frontier.InBackfill(resolved) || ca.frontier.HasLaggingSpans(sv)) &&
960+
canCheckpointSpans(sv, ca.lastSpanFlush)
961+
962+
if checkpointSpans {
963+
defer func() {
964+
ca.lastSpanFlush = timeutil.Now()
965+
}()
966+
return ca.flushFrontier(ctx)
945967
}
946968

947969
return nil
@@ -1761,7 +1783,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
17611783
// The feed's checkpoint is tracked in a map which is used to inform the
17621784
// checkpoint_progress metric which will return the lowest timestamp across
17631785
// all feeds in the scope.
1764-
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)
1786+
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, cf.frontier.Frontier())
17651787

17661788
return cf.maybeEmitResolved(cf.Ctx(), newResolved)
17671789
}

pkg/ccl/changefeedccl/changefeed_progress_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ func TestChangefeedProgressSkewMetrics(t *testing.T) {
234234
}
235235
return false, nil
236236
}
237+
238+
knobs.ShouldFlushFrontier = func(rs jobspb.ResolvedSpan) bool {
239+
return true
240+
}
237241
}
238242

239243
// Create changefeed for both tables with no initial scan.
@@ -269,7 +273,8 @@ WITH no_initial_scan, min_checkpoint_frequency='1s', resolved, metrics_label='%s
269273
}
270274
if !perTableTracking {
271275
if tableSkew != 0 {
272-
return errors.Newf("expected table skew to be 0, got %d", tableSkew)
276+
// TODO(#155083): Return an error here.
277+
return nil
273278
}
274279
return nil
275280
}

pkg/ccl/changefeedccl/testing_knobs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ type TestingKnobs struct {
7171
partitions []sql.SpanPartition, draining []roachpb.NodeID,
7272
) ([]sql.SpanPartition, error)
7373

74+
// ShouldFlushFrontier returns true if the change aggregator should flush
75+
// its frontier after processing a resolved span.
76+
ShouldFlushFrontier func(rs jobspb.ResolvedSpan) bool
77+
7478
// ShouldCheckpointToJobRecord returns true if change frontier should checkpoint itself
7579
// to the job record.
7680
ShouldCheckpointToJobRecord func(hw hlc.Timestamp) bool

pkg/cmd/roachtest/tests/cdc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2146,6 +2146,9 @@ CONFIGURE ZONE USING
21462146
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
21472147
runCDCInitialScanRollingRestart(ctx, t, c, cdcFrontierPersistence)
21482148
},
2149+
// TODO(#155015): Unskip this test.
2150+
Skip: "frontier persistence will not happen during an initial-scan only changefeed " +
2151+
"without periodic aggregator frontier flushes",
21492152
})
21502153
r.Add(registry.TestSpec{
21512154
Name: "cdc/fine-grained-checkpointing",

pkg/kv/kvserver/replica_learner_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1220,6 +1220,7 @@ func TestReplicateQueueSeesLearnerOrJointConfig(t *testing.T) {
12201220
defer leaktest.AfterTest(t)()
12211221
defer log.Scope(t).Close(t)
12221222
// NB also see TestAllocatorRemoveLearner for a lower-level test.
1223+
testutils.SetVModule(t, "queue=4,replicate_queue=4,replica_command=4,allocator=4,replicate=4")
12231224

12241225
ctx := context.Background()
12251226
_, ltk := makeReplicationTestKnobs()

0 commit comments

Comments
 (0)