Skip to content
Merged
12 changes: 6 additions & 6 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
verifier2 := suite.BuildVerifier()

suite.Require().Empty(
suite.fetchVerifierRechecks(ctx, verifier2),
suite.fetchPendingVerifierRechecks(ctx, verifier2),
"no rechecks should be enqueued before starting change stream",
)

Expand All @@ -298,7 +298,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
require.Eventually(
suite.T(),
func() bool {
recheckDocs = suite.fetchVerifierRechecks(ctx, verifier2)
recheckDocs = suite.fetchPendingVerifierRechecks(ctx, verifier2)

return len(recheckDocs) > 0
},
Expand Down Expand Up @@ -342,10 +342,10 @@ func (suite *IntegrationTestSuite) getClusterTime(ctx context.Context, client *m
return newTime
}

func (suite *IntegrationTestSuite) fetchVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M {
func (suite *IntegrationTestSuite) fetchPendingVerifierRechecks(ctx context.Context, verifier *Verifier) []bson.M {
recheckDocs := []bson.M{}

recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
recheckColl := verifier.getRecheckQueueCollection(1 + verifier.generation)
cursor, err := recheckColl.Aggregate(
ctx,
mongo.Pipeline{
Expand Down Expand Up @@ -584,7 +584,7 @@ func (suite *IntegrationTestSuite) TestWithChangeEventsBatching() {
require.Eventually(
suite.T(),
func() bool {
rechecks = suite.fetchVerifierRechecks(ctx, verifier)
rechecks = suite.fetchPendingVerifierRechecks(ctx, verifier)
return len(rechecks) == 3
},
time.Minute,
Expand Down Expand Up @@ -921,7 +921,7 @@ func (suite *IntegrationTestSuite) TestRecheckDocsWithDstChangeEvents() {
require.Eventually(
suite.T(),
func() bool {
recheckColl := verifier.getRecheckQueueCollection(verifier.generation)
recheckColl := verifier.getRecheckQueueCollection(1 + verifier.generation)
cursor, err := recheckColl.Find(ctx, bson.D{})
if errors.Is(err, mongo.ErrNoDocuments) {
return false
Expand Down
13 changes: 9 additions & 4 deletions internal/verifier/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,30 +390,35 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
verifier.mux.Lock()
verifier.lastGeneration = true
}

// Increment the in-memory generation so that the change streams will
// mark rechecks for the next generation. For example, if we just
// finished generation 2, the change streams need to mark generation 3
// on enqueued rechecks. Meanwhile, generaiton 3’s recheck tasks will
// derive from rechecks enqueued during generation 2.
verifier.generation++
verifier.phase = Recheck
verifier.mux.Unlock()

// Generation of recheck tasks can partial-fail. The following will
// cause a full redo in that case, which is inefficient but simple.
// Such failures seem unlikely anyhow.
err = retry.New().WithCallback(
func(ctx context.Context, _ *retry.FuncInfo) error {
return verifier.GenerateRecheckTasksWhileLocked(ctx)
return verifier.GenerateRecheckTasks(ctx)
},
"generating recheck tasks",
).Run(ctx, verifier.logger)
if err != nil {
verifier.mux.Unlock()
return err
}

err = verifier.DropOldRecheckQueueWhileLocked(ctx)
err = verifier.DropCurrentGenRecheckQueue(ctx)
if err != nil {
verifier.logger.Warn().
Err(err).
Msg("Failed to clear out old recheck docs. (This is probably unimportant.)")
}
verifier.mux.Unlock()
}
}

Expand Down
4 changes: 3 additions & 1 deletion internal/verifier/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ package verifier
// Metadata version history:
// 1: Defined metadata version.
// 2: Split failed-task discrepancies into separate collection.
// 3: Enqueued rechecks now reference the generation in which they’ll be
// rechecked rather than the generation during which they were enqueued.

const verifierMetadataVersion = 2
const verifierMetadataVersion = 3
20 changes: 8 additions & 12 deletions internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,11 +714,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck()

verifier.generation++

func() {
verifier.mux.Lock()
defer func() { verifier.mux.Unlock() }()
suite.Require().NoError(verifier.GenerateRecheckTasksWhileLocked(ctx))
}()
suite.Require().NoError(verifier.GenerateRecheckTasks(ctx))

stats, err := verifier.GetPersistedNamespaceStatistics(ctx)
suite.Require().NoError(err)
Expand Down Expand Up @@ -958,6 +954,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
[]int32{100},
)
suite.Require().NoError(err)

event := ParsedEvent{
DocID: mbson.ToRawValue(int32(55)),
OpType: "delete",
Expand All @@ -976,6 +973,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {

err = verifier.HandleChangeStreamEvents(ctx, batch, src)
suite.Require().NoError(err)

event.OpType = "insert"
err = verifier.HandleChangeStreamEvents(ctx, batch, src)
suite.Require().NoError(err)
Expand All @@ -995,13 +993,9 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
)

verifier.generation++
func() {
verifier.mux.Lock()
defer verifier.mux.Unlock()

err = verifier.GenerateRecheckTasksWhileLocked(ctx)
suite.Require().NoError(err)
}()
err = verifier.GenerateRecheckTasks(ctx)
suite.Require().NoError(err)

var doc bson.M
cur, err := verifier.verificationTaskCollection().Find(ctx, bson.M{"generation": 1})
Expand Down Expand Up @@ -2323,10 +2317,12 @@ func (suite *IntegrationTestSuite) TestVerifierWithFilter() {
}

func (suite *IntegrationTestSuite) awaitEnqueueOfRechecks(verifier *Verifier, minDocs int) {
suite.T().Helper()

var lastNonzeroRechecksCount int

suite.Eventually(func() bool {
cursor, err := verifier.getRecheckQueueCollection(verifier.generation).
cursor, err := verifier.getRecheckQueueCollection(1+verifier.generation).
Find(suite.Context(), bson.D{})
var rechecks []bson.D
suite.Require().NoError(err)
Expand Down
69 changes: 24 additions & 45 deletions internal/verifier/recheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (verifier *Verifier) insertRecheckDocs(

generation, _ := verifier.getGenerationWhileLocked()

// We enqueue for the generation after the current one.
generation++

eg, groupCtx := contextplus.ErrGroup(ctx)
eg.SetLimit(100)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment to explain the purpose of the limit?


// MongoDB’s Go driver starts failing requests if we try to exceed
// its connection pool’s size. To avoid that, we limit our concurrency.
Expand Down Expand Up @@ -233,66 +237,37 @@ func buildRequestBSON(collName string, rechecks []bson.Raw) bson.Raw {
return requestBSON
}

// DropOldRecheckQueueWhileLocked deletes the previous generation’s recheck
// documents from the verifier’s metadata.
//
// The verifier **MUST** be locked when this function is called (or panic).
func (verifier *Verifier) DropOldRecheckQueueWhileLocked(ctx context.Context) error {
prevGeneration := verifier.getPreviousGenerationWhileLocked()
// DropCurrentGenRecheckQueue deletes the current generation’s recheck
// documents from the verifier’s metadata. This should only be called
// after new recheck tasks have been created from those rechecks.
func (verifier *Verifier) DropCurrentGenRecheckQueue(ctx context.Context) error {
generation := verifier.generation

verifier.logger.Debug().
Int("previousGeneration", prevGeneration).
Msg("Deleting previous generation's enqueued rechecks.")
Int("generation", generation).
Msg("Deleting current generation's enqueued rechecks.")

genCollection := verifier.getRecheckQueueCollection(prevGeneration)
genCollection := verifier.getRecheckQueueCollection(generation)

return retry.New().WithCallback(
func(ctx context.Context, i *retry.FuncInfo) error {
return genCollection.Drop(ctx)
},
"deleting generation %d's enqueued rechecks",
prevGeneration,
generation,
).Run(ctx, verifier.logger)
}

func (verifier *Verifier) getPreviousGenerationWhileLocked() int {
generation, _ := verifier.getGenerationWhileLocked()
if generation < 1 {
panic("This function is forbidden before generation 1!")
}

return generation - 1
}

// GenerateRecheckTasksWhileLocked fetches the previous generation’s recheck
// documents from the verifier’s metadata and creates current-generation
// document-verification tasks from them.
// GenerateRecheckTasks fetches the rechecks enqueued for the current generation
// from the verifier’s metadata and creates document-verification tasks from
// them.
//
// Note that this function DOES NOT retry on failure, so callers should wrap
// calls to this function in a retryer.
//
// The verifier **MUST** be locked when this function is called (or panic).
func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) error {
prevGeneration := verifier.getPreviousGenerationWhileLocked()
func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
generation, _ := verifier.getGeneration()

verifier.logger.Debug().
Int("priorGeneration", prevGeneration).
Msgf("Counting prior generation’s enqueued rechecks.")

recheckColl := verifier.getRecheckQueueCollection(prevGeneration)

rechecksCount, err := recheckColl.CountDocuments(ctx, bson.D{})
if err != nil {
return errors.Wrapf(err,
"failed to count generation %d’s rechecks",
prevGeneration,
)
}

verifier.logger.Debug().
Int("priorGeneration", prevGeneration).
Int("rechecksCount", int(rechecksCount)).
Msgf("Creating recheck tasks from prior generation’s enqueued rechecks.")
recheckColl := verifier.getRecheckQueueCollection(generation)

startTime := time.Now()

Expand Down Expand Up @@ -428,7 +403,7 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e

if err == nil && totalDocs > 0 {
verifier.logger.Info().
Int("generation", 1+prevGeneration).
Int("generation", generation).
Int64("totalDocs", int64(totalDocs)).
Str("totalData", reportutils.FmtBytes(totalRecheckData)).
Stringer("timeElapsed", time.Since(startTime)).
Expand All @@ -439,6 +414,10 @@ func (verifier *Verifier) GenerateRecheckTasksWhileLocked(ctx context.Context) e
}

func (v *Verifier) getRecheckQueueCollection(generation int) *mongo.Collection {
if generation == 0 {
panic("Recheck for generation 0 is nonsense!")
}

return v.verificationDatabase().
Collection(fmt.Sprintf("%s_gen%d", recheckQueueCollectionNameBase, generation))
}
22 changes: 8 additions & 14 deletions internal/verifier/recheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
}

func (suite *IntegrationTestSuite) fetchRecheckDocs(ctx context.Context, verifier *Verifier) []recheck.Doc {
metaColl := verifier.getRecheckQueueCollection(verifier.generation)
metaColl := verifier.getRecheckQueueCollection(1 + verifier.generation)

cursor, err := metaColl.Aggregate(
ctx,
Expand Down Expand Up @@ -182,7 +182,7 @@ func (suite *IntegrationTestSuite) TestRecheckResumability_Mismatch() {
verificationStatus, err := verifier.GetVerificationStatus(ctx)
suite.Require().NoError(err)

recheckDocs := suite.fetchVerifierRechecks(ctx, verifier)
recheckDocs := suite.fetchPendingVerifierRechecks(ctx, verifier)

if verificationStatus.FailedTasks != 0 && len(recheckDocs) == 2 {
break
Expand Down Expand Up @@ -212,7 +212,7 @@ func (suite *IntegrationTestSuite) TestRecheckResumability_Mismatch() {
"restarted verifier should immediately see mismatches",
)

recheckDocs := suite.fetchVerifierRechecks(ctx, verifier2)
recheckDocs := suite.fetchPendingVerifierRechecks(ctx, verifier2)
suite.Require().Len(recheckDocs, 2, "expect # of rechecks: %+v", recheckDocs)
}

Expand Down Expand Up @@ -268,13 +268,10 @@ func (suite *IntegrationTestSuite) TestManyManyRechecks() {
)
suite.Require().NoError(err)

verifier.mux.Lock()
defer verifier.mux.Unlock()

verifier.generation++

suite.T().Logf("Generating recheck tasks …")
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
err = verifier.GenerateRecheckTasks(ctx)
suite.Require().NoError(err)
}

Expand Down Expand Up @@ -307,8 +304,7 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() {
suite.ElementsMatch([]any{d1, d2, d3}, results)

verifier.generation++
verifier.mux.Lock()
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
err = verifier.GenerateRecheckTasks(ctx)
suite.Require().NoError(err)
taskColl := suite.metaMongoClient.Database(verifier.metaDBName).Collection(verificationTasksCollection)
cursor, err := taskColl.Find(ctx, bson.D{}, options.Find().SetProjection(bson.D{{"_id", 0}}))
Expand Down Expand Up @@ -367,8 +363,7 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() {
suite.ElementsMatch([]any{d1, d2, d3}, results)

verifier.generation++
verifier.mux.Lock()
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
err = verifier.GenerateRecheckTasks(ctx)
suite.Require().NoError(err)
taskColl := suite.metaMongoClient.Database(verifier.metaDBName).Collection(verificationTasksCollection)
cursor, err := taskColl.Find(ctx, bson.D{}, options.Find().SetProjection(bson.D{{"_id", 0}}))
Expand Down Expand Up @@ -417,8 +412,7 @@ func (suite *IntegrationTestSuite) TestMultipleNamespaces() {
suite.Require().NoError(err)

verifier.generation++
verifier.mux.Lock()
err = verifier.GenerateRecheckTasksWhileLocked(ctx)
err = verifier.GenerateRecheckTasks(ctx)
suite.Require().NoError(err)
taskColl := suite.metaMongoClient.Database(verifier.metaDBName).Collection(verificationTasksCollection)
cursor, err := taskColl.Find(ctx, bson.D{}, options.Find().SetProjection(bson.D{{"_id", 0}}))
Expand Down Expand Up @@ -477,7 +471,7 @@ func (suite *IntegrationTestSuite) TestGenerationalClear() {

verifier.generation++

err = verifier.DropOldRecheckQueueWhileLocked(ctx)
err = verifier.DropCurrentGenRecheckQueue(ctx)
suite.Require().NoError(err)

// This never happens in real life but is needed for this test.
Expand Down