Skip to content

Commit c5053a4

Browse files
authored
REP-6735 Allow rechecks to be enqueued during recheck task gen (#157)
Historically, the verifier blocked the enqueuing of rechecks while generating new recheck tasks. This slowed down the change stream because it couldn’t enqueue rechecks during that time. There seems to be no reason why the change stream needs to be blocked while creating new tasks, though. This changeset removes the restriction. It also tweaks the generation-numbering so that the generation persisted with the recheck is the generation where the document will be rechecked rather than the generation where the recheck was enqueued. The net effect here is that, with the change stream not blocked for several seconds between generations, higher throughput is possible.
1 parent f69ce63 commit c5053a4

File tree

6 files changed

+59
-84
lines changed

6 files changed

+59
-84
lines changed

internal/verifier/change_stream_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() {
359359
assert.Eventually(
360360
suite.T(),
361361
func() bool {
362-
rechecks := suite.fetchVerifierRechecks(ctx, verifier2)
362+
rechecks := suite.fetchPendingVerifierRechecks(ctx, verifier2)
363363

364364
return lo.SomeBy(
365365
rechecks,
@@ -386,7 +386,7 @@ func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() {
386386
sess := lo.Must(verifier2.verificationDatabase().Client().StartSession())
387387
sctx := mongo.NewSessionContext(ctx, sess)
388388

389-
rechecks := suite.fetchVerifierRechecks(sctx, verifier2)
389+
rechecks := suite.fetchPendingVerifierRechecks(sctx, verifier2)
390390
if !assert.EqualValues(suite.T(), lastDocID, len(rechecks), "all source docs should be rechecked") {
391391
for _, recheck := range rechecks {
392392
suite.T().Logf("found recheck: %v", recheck)
@@ -426,7 +426,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
426426
verifier2 := suite.BuildVerifier()
427427

428428
suite.Require().Empty(
429-
suite.fetchVerifierRechecks(ctx, verifier2),
429+
suite.fetchPendingVerifierRechecks(ctx, verifier2),
430430
"no rechecks should be enqueued before starting change stream",
431431
)
432432

@@ -446,7 +446,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
446446
require.Eventually(
447447
suite.T(),
448448
func() bool {
449-
recheckDocs = suite.fetchVerifierRechecks(ctx, verifier2)
449+
recheckDocs = suite.fetchPendingVerifierRechecks(ctx, verifier2)
450450

451451
return len(recheckDocs) > 0
452452
},
@@ -490,10 +490,10 @@ func (suite *IntegrationTestSuite) getClusterTime(ctx context.Context, client *m
490490
return newTime
491491
}
492492

493-
func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M {
493+
func (suite *IntegrationTestSuite) fetchPendingVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M {
494494
recheckDocs := []bson.M{}
495495

496-
recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
496+
recheckColl := verifier.getRecheckQueueCollection(1 + verifier.generation)
497497
cursor, err := recheckColl.Aggregate(
498498
ctx,
499499
mongo.Pipeline{
@@ -732,7 +732,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
732732
require.Eventually(
733733
suite.T(),
734734
func() bool {
735-
rechecks = suite.fetchVerifierRechecks(ctx, verifier)
735+
rechecks = suite.fetchPendingVerifierRechecks(ctx, verifier)
736736
return len(rechecks) == 3
737737
},
738738
time.Minute,
@@ -1069,7 +1069,7 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
10691069
require.Eventually(
10701070
suite.T(),
10711071
func() bool {
1072-
recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
1072+
recheckColl := verifier.getRecheckQueueCollection(1 + verifier.generation)
10731073
cursor, err := recheckColl.Find(ctx, bson.D{})
10741074
if errors.Is(err, mongo.ErrNoDocuments) {
10751075
return false

internal/verifier/check.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -390,30 +390,35 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
390390
verifier.mux.Lock()
391391
verifier.lastGeneration = true
392392
}
393+
394+
// Increment the in-memory generation so that the change streams will
395+
// mark rechecks for the next generation. For example, if we just
396+
// finished generation 2, the change streams need to mark generation 3
397+
// on enqueued rechecks. Meanwhile, generaiton 3’s recheck tasks will
398+
// derive from rechecks enqueued during generation 2.
393399
verifier.generation++
394400
verifier.phase = Recheck
401+
verifier.mux.Unlock()
395402

396403
// Generation of recheck tasks can partial-fail. The following will
397404
// cause a full redo in that case, which is inefficient but simple.
398405
// Such failures seem unlikely anyhow.
399406
err = retry.New().WithCallback(
400407
func(ctx context.Context, _ *retry.FuncInfo) error {
401-
return verifier.GenerateRecheckTasksWhileLocked(ctx)
408+
return verifier.GenerateRecheckTasks(ctx)
402409
},
403410
"generating recheck tasks",
404411
).Run(ctx, verifier.logger)
405412
if err != nil {
406-
verifier.mux.Unlock()
407413
return err
408414
}
409415

410-
err = verifier.DropOldRecheckQueueWhileLocked(ctx)
416+
err = verifier.DropCurrentGenRecheckQueue(ctx)
411417
if err != nil {
412418
verifier.logger.Warn().
413419
Err(err).
414420
Msg("Failed to clear out old recheck docs. (This is probably unimportant.)")
415421
}
416-
verifier.mux.Unlock()
417422
}
418423
}
419424

internal/verifier/metadata.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,7 @@ package verifier
33
// Metadata version history:
44
// 1: Defined metadata version.
55
// 2: Split failed-task discrepancies into separate collection.
6+
// 3: Enqueued rechecks now reference the generation in which they’ll be
7+
// rechecked rather than the generation during which they were enqueued.
68

7-
const verifierMetadataVersion = 2
9+
const verifierMetadataVersion = 3

internal/verifier/migration_verifier_test.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -714,11 +714,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck()
714714

715715
verifier.generation++
716716

717-
func() {
718-
verifier.mux.Lock()
719-
defer func() { verifier.mux.Unlock() }()
720-
suite.Require().NoError(verifier.GenerateRecheckTasksWhileLocked(ctx))
721-
}()
717+
suite.Require().NoError(verifier.GenerateRecheckTasks(ctx))
722718

723719
stats, err := verifier.GetPersistedNamespaceStatistics(ctx)
724720
suite.Require().NoError(err)
@@ -958,6 +954,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
958954
[]int32{100},
959955
)
960956
suite.Require().NoError(err)
957+
961958
event := ParsedEvent{
962959
DocID: mbson.ToRawValue(int32(55)),
963960
OpType: "delete",
@@ -976,6 +973,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
976973

977974
err = verifier.HandleChangeStreamEvents(ctx, batch, src)
978975
suite.Require().NoError(err)
976+
979977
event.OpType = "insert"
980978
err = verifier.HandleChangeStreamEvents(ctx, batch, src)
981979
suite.Require().NoError(err)
@@ -995,13 +993,9 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
995993
)
996994

997995
verifier.generation++
998-
func() {
999-
verifier.mux.Lock()
1000-
defer verifier.mux.Unlock()
1001996

1002-
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
1003-
suite.Require().NoError(err)
1004-
}()
997+
err = verifier.GenerateRecheckTasks(ctx)
998+
suite.Require().NoError(err)
1005999

10061000
var doc bson.M
10071001
cur, err := verifier.verificationTaskCollection().Find(ctx, bson.M{"generation": 1})
@@ -2323,10 +2317,12 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
23232317
}
23242318

23252319
func (suite *IntegrationTestSuite) awaitEnqueueOfRechecks(verifier *Verifier, minDocs int) {
2320+
suite.T().Helper()
2321+
23262322
var lastNonzeroRechecksCount int
23272323

23282324
suite.Eventually(func() bool {
2329-
cursor, err := verifier.getRecheckQueueCollection(verifier.generation).
2325+
cursor, err := verifier.getRecheckQueueCollection(1+verifier.generation).
23302326
Find(suite.Context(), bson.D{})
23312327
var rechecks []bson.D
23322328
suite.Require().NoError(err)

internal/verifier/recheck.go

Lines changed: 23 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ func (verifier *Verifier) insertRecheckDocs(
6161

6262
generation, _ := verifier.getGenerationWhileLocked()
6363

64+
// We enqueue for the generation after the current one.
65+
generation++
66+
6467
eg, groupCtx := contextplus.ErrGroup(ctx)
6568

6669
// MongoDB’s Go driver starts failing requests if we try to exceed
@@ -233,66 +236,37 @@ func buildRequestBSON(collName string, rechecks []bson.Raw) bson.Raw {
233236
return requestBSON
234237
}
235238

236-
// DropOldRecheckQueueWhileLocked deletes the previous generation’s recheck
237-
// documents from the verifier’s metadata.
238-
//
239-
// The verifier **MUST** be locked when this function is called (or panic).
240-
func (verifier *Verifier) DropOldRecheckQueueWhileLocked(ctx context.Context) error {
241-
prevGeneration := verifier.getPreviousGenerationWhileLocked()
239+
// DropCurrentGenRecheckQueue deletes the current generation’s recheck
240+
// documents from the verifier’s metadata. This should only be called
241+
// after new recheck tasks have been created from those rechecks.
242+
func (verifier *Verifier) DropCurrentGenRecheckQueue(ctx context.Context) error {
243+
generation := verifier.generation
242244

243245
verifier.logger.Debug().
244-
Int("previousGeneration", prevGeneration).
245-
Msg("Deleting previous generation's enqueued rechecks.")
246+
Int("generation", generation).
247+
Msg("Deleting current generation's enqueued rechecks.")
246248

247-
genCollection := verifier.getRecheckQueueCollection(prevGeneration)
249+
genCollection := verifier.getRecheckQueueCollection(generation)
248250

249251
return retry.New().WithCallback(
250252
func(ctx context.Context, i *retry.FuncInfo) error {
251253
return genCollection.Drop(ctx)
252254
},
253255
"deleting generation %d's enqueued rechecks",
254-
prevGeneration,
256+
generation,
255257
).Run(ctx, verifier.logger)
256258
}
257259

258-
func (verifier *Verifier) getPreviousGenerationWhileLocked() int {
259-
generation, _ := verifier.getGenerationWhileLocked()
260-
if generation < 1 {
261-
panic("This function is forbidden before generation 1!")
262-
}
263-
264-
return generation - 1
265-
}
266-
267-
// GenerateRecheckTasksWhileLocked fetches the previous generation’s recheck
268-
// documents from the verifier’s metadata and creates current-generation
269-
// document-verification tasks from them.
260+
// GenerateRecheckTasks fetches the rechecks enqueued for the current generation
261+
// from the verifier’s metadata and creates document-verification tasks from
262+
// them.
270263
//
271264
// Note that this function DOES NOT retry on failure, so callers should wrap
272265
// calls to this function in a retryer.
273-
//
274-
// The verifier **MUST** be locked when this function is called (or panic).
275-
func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) error {
276-
prevGeneration := verifier.getPreviousGenerationWhileLocked()
266+
func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
267+
generation, _ := verifier.getGeneration()
277268

278-
verifier.logger.Debug().
279-
Int("priorGeneration", prevGeneration).
280-
Msgf("Counting prior generation’s enqueued rechecks.")
281-
282-
recheckColl := verifier.getRecheckQueueCollection(prevGeneration)
283-
284-
rechecksCount, err := recheckColl.CountDocuments(ctx, bson.D{})
285-
if err != nil {
286-
return errors.Wrapf(err,
287-
"failed to count generation %d’s rechecks",
288-
prevGeneration,
289-
)
290-
}
291-
292-
verifier.logger.Debug().
293-
Int("priorGeneration", prevGeneration).
294-
Int("rechecksCount", int(rechecksCount)).
295-
Msgf("Creating recheck tasks from prior generation’s enqueued rechecks.")
269+
recheckColl := verifier.getRecheckQueueCollection(generation)
296270

297271
startTime := time.Now()
298272

@@ -428,7 +402,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
428402

429403
if err == nil && totalDocs > 0 {
430404
verifier.logger.Info().
431-
Int("generation", 1+prevGeneration).
405+
Int("generation", generation).
432406
Int64("totalDocs", int64(totalDocs)).
433407
Str("totalData", reportutils.FmtBytes(totalRecheckData)).
434408
Stringer("timeElapsed", time.Since(startTime)).
@@ -439,6 +413,10 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
439413
}
440414

441415
func (v *Verifier) getRecheckQueueCollection(generation int) *mongo.Collection {
416+
if generation == 0 {
417+
panic("Recheck for generation 0 is nonsense!")
418+
}
419+
442420
return v.verificationDatabase().
443421
Collection(fmt.Sprintf("%s_gen%d", recheckQueueCollectionNameBase, generation))
444422
}

internal/verifier/recheck_test.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
8585
}
8686

8787
func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifier *Verifier) []recheck.Doc {
88-
metaColl := verifier.getRecheckQueueCollection(verifier.generation)
88+
metaColl := verifier.getRecheckQueueCollection(1 + verifier.generation)
8989

9090
cursor, err := metaColl.Aggregate(
9191
ctx,
@@ -182,7 +182,7 @@ func (suite *IntegrationTestSuite) TestRecheckResumability_Mismatch() {
182182
verificationStatus, err := verifier.GetVerificationStatus(ctx)
183183
suite.Require().NoError(err)
184184

185-
recheckDocs := suite.fetchVerifierRechecks(ctx, verifier)
185+
recheckDocs := suite.fetchPendingVerifierRechecks(ctx, verifier)
186186

187187
if verificationStatus.FailedTasks != 0 && len(recheckDocs) == 2 {
188188
break
@@ -212,7 +212,7 @@ func (suite *IntegrationTestSuite) TestRecheckResumability_Mismatch() {
212212
"restarted verifier should immediately see mismatches",
213213
)
214214

215-
recheckDocs := suite.fetchVerifierRechecks(ctx, verifier2)
215+
recheckDocs := suite.fetchPendingVerifierRechecks(ctx, verifier2)
216216
suite.Require().Len(recheckDocs, 2, "expect # of rechecks: %+v", recheckDocs)
217217
}
218218

@@ -268,13 +268,10 @@ func (suite *IntegrationTestSuite) TestManyManyRechecks() {
268268
)
269269
suite.Require().NoError(err)
270270

271-
verifier.mux.Lock()
272-
defer verifier.mux.Unlock()
273-
274271
verifier.generation++
275272

276273
suite.T().Logf("Generating recheck tasks …")
277-
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
274+
err = verifier.GenerateRecheckTasks(ctx)
278275
suite.Require().NoError(err)
279276
}
280277

@@ -307,8 +304,7 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() {
307304
suite.ElementsMatch([]any{d1, d2, d3}, results)
308305

309306
verifier.generation++
310-
verifier.mux.Lock()
311-
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
307+
err = verifier.GenerateRecheckTasks(ctx)
312308
suite.Require().NoError(err)
313309
taskColl := suite.metaMongoClient.Database(verifier.metaDBName).Collection(verificationTasksCollection)
314310
cursor, err := taskColl.Find(ctx, bson.D{}, options.Find().SetProjection(bson.D{{"_id", 0}}))
@@ -367,8 +363,7 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() {
367363
suite.ElementsMatch([]any{d1, d2, d3}, results)
368364

369365
verifier.generation++
370-
verifier.mux.Lock()
371-
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
366+
err = verifier.GenerateRecheckTasks(ctx)
372367
suite.Require().NoError(err)
373368
taskColl := suite.metaMongoClient.Database(verifier.metaDBName).Collection(verificationTasksCollection)
374369
cursor, err := taskColl.Find(ctx, bson.D{}, options.Find().SetProjection(bson.D{{"_id", 0}}))
@@ -417,8 +412,7 @@ func (suite *IntegrationTestSuite) TestMultipleNamespaces() {
417412
suite.Require().NoError(err)
418413

419414
verifier.generation++
420-
verifier.mux.Lock()
421-
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
415+
err = verifier.GenerateRecheckTasks(ctx)
422416
suite.Require().NoError(err)
423417
taskColl := suite.metaMongoClient.Database(verifier.metaDBName).Collection(verificationTasksCollection)
424418
cursor, err := taskColl.Find(ctx, bson.D{}, options.Find().SetProjection(bson.D{{"_id", 0}}))
@@ -477,7 +471,7 @@ func (suite *IntegrationTestSuite) TestGenerationalClear() {
477471

478472
verifier.generation++
479473

480-
err = verifier.DropOldRecheckQueueWhileLocked(ctx)
474+
err = verifier.DropCurrentGenRecheckQueue(ctx)
481475
suite.Require().NoError(err)
482476

483477
// This never happens in real life but is needed for this test.

0 commit comments

Comments
 (0)