Skip to content

Commit f69ce63

Browse files
authored
REP-6785 Persist resume token after rechecks (#161)
For some time the verifier has had a race condition where it would, immediately after passing a batch of events to the recheck-enqueue thread, persist the change stream’s resume token. Thus, if the recheck-enqueue thread failed, the verifier could have restarted and skipped documents. PR #156 aggravated this by storing multiple batches of change events in the channel between the reader and recheck-enqueue threads. Now, if there’s a failure after persisting a resume token, there are very good odds that documents will be skipped. This changeset fixes that by moving the resume token’s persistence to the recheck-enqueue thread. Now each resume token is sent along with its batch to the recheck-enqueue thread, and only after that batch is persisted is its resume token persisted.
1 parent 1bae7c2 commit f69ce63

File tree

3 files changed

+177
-23
lines changed

3 files changed

+177
-23
lines changed

internal/util/clusterinfo.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package util
22

33
import (
4+
"cmp"
45
"context"
56

67
"github.com/10gen/migration-verifier/internal/logger"
@@ -23,6 +24,10 @@ const (
2324
TopologyReplset ClusterTopology = "replset"
2425
)
2526

27+
func CmpMinorVersions(a, b [2]int) int {
28+
return cmp.Or(cmp.Compare(a[0], b[0]), cmp.Compare(a[1], b[1]))
29+
}
30+
2631
func GetClusterInfo(ctx context.Context, logger *logger.Logger, client *mongo.Client) (ClusterInfo, error) {
2732
va, err := getVersionArray(ctx, client)
2833
if err != nil {

internal/verifier/change_stream.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ func (uee UnknownEventError) Error() string {
6060

6161
type changeEventBatch struct {
6262
events []ParsedEvent
63+
resumeToken bson.Raw
6364
clusterTime bson.Timestamp
6465
}
6566

@@ -127,6 +128,21 @@ func (verifier *Verifier) initializeChangeStreamReaders() {
127128
func (verifier *Verifier) RunChangeEventHandler(ctx context.Context, reader *ChangeStreamReader) error {
128129
var err error
129130

131+
var lastPersistedTime time.Time
132+
persistResumeTokenIfNeeded := func(ctx context.Context, token bson.Raw) {
133+
if time.Since(lastPersistedTime) >= minChangeStreamPersistInterval {
134+
persistErr := reader.persistChangeStreamResumeToken(ctx, token)
135+
if persistErr != nil {
136+
verifier.logger.Warn().
137+
Stringer("changeReader", reader).
138+
Err(persistErr).
139+
Msg("Failed to persist resume token. Because of this, if the verifier restarts, it will have to re-process already-handled change events. This error may be transient, but if it recurs, investigate.")
140+
} else {
141+
lastPersistedTime = time.Now()
142+
}
143+
}
144+
}
145+
130146
HandlerLoop:
131147
for err == nil {
132148
select {
@@ -156,6 +172,10 @@ HandlerLoop:
156172
verifier.HandleChangeStreamEvents(ctx, batch, reader.readerType),
157173
"failed to handle change stream events",
158174
)
175+
176+
if err == nil && batch.resumeToken != nil {
177+
persistResumeTokenIfNeeded(ctx, batch.resumeToken)
178+
}
159179
}
160180
}
161181

@@ -470,6 +490,8 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
470490
case csr.changeEventBatchChan <- changeEventBatch{
471491
events: changeEvents,
472492

493+
resumeToken: cs.ResumeToken(),
494+
473495
// NB: We know by now that OperationTime is non-nil.
474496
clusterTime: *sess.OperationTime(),
475497
}:
@@ -493,21 +515,6 @@ func (csr *ChangeStreamReader) iterateChangeStream(
493515
cs *mongo.ChangeStream,
494516
sess *mongo.Session,
495517
) error {
496-
var lastPersistedTime time.Time
497-
498-
persistResumeTokenIfNeeded := func() error {
499-
if time.Since(lastPersistedTime) <= minChangeStreamPersistInterval {
500-
return nil
501-
}
502-
503-
err := csr.persistChangeStreamResumeToken(ctx, cs)
504-
if err == nil {
505-
lastPersistedTime = time.Now()
506-
}
507-
508-
return err
509-
}
510-
511518
for {
512519
var err error
513520
var gotwritesOffTimestamp bool
@@ -571,10 +578,6 @@ func (csr *ChangeStreamReader) iterateChangeStream(
571578
default:
572579
err = csr.readAndHandleOneChangeEventBatch(ctx, ri, cs, sess)
573580

574-
if err == nil {
575-
err = persistResumeTokenIfNeeded()
576-
}
577-
578581
if err != nil {
579582
return err
580583
}
@@ -659,7 +662,7 @@ func (csr *ChangeStreamReader) createChangeStream(
659662
return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to open change stream")
660663
}
661664

662-
err = csr.persistChangeStreamResumeToken(ctx, changeStream)
665+
err = csr.persistChangeStreamResumeToken(ctx, changeStream.ResumeToken())
663666
if err != nil {
664667
return nil, nil, bson.Timestamp{}, err
665668
}
@@ -852,9 +855,7 @@ func (csr *ChangeStreamReader) resumeTokenDocID() string {
852855
}
853856
}
854857

855-
func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, cs *mongo.ChangeStream) error {
856-
token := cs.ResumeToken()
857-
858+
func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Context, token bson.Raw) error {
858859
coll := csr.getChangeStreamMetadataCollection()
859860
_, err := coll.ReplaceOne(
860861
ctx,

internal/verifier/change_stream_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package verifier
22

33
import (
4+
"bytes"
45
"context"
6+
"fmt"
57
"io"
68
"strings"
79
"time"
@@ -18,10 +20,12 @@ import (
1820
"github.com/pkg/errors"
1921
"github.com/rs/zerolog"
2022
"github.com/samber/lo"
23+
"github.com/stretchr/testify/assert"
2124
"github.com/stretchr/testify/require"
2225
"go.mongodb.org/mongo-driver/v2/bson"
2326
"go.mongodb.org/mongo-driver/v2/mongo"
2427
"go.mongodb.org/mongo-driver/v2/mongo/options"
28+
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
2529
)
2630

2731
func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() {
@@ -246,6 +250,150 @@ func (suite *IntegrationTestSuite) startSrcChangeStreamReaderAndHandler(ctx cont
246250
}()
247251
}
248252

253+
func (suite *IntegrationTestSuite) TestChangeStream_Resume_NoSkip() {
254+
ctx := suite.T().Context()
255+
256+
verifier1 := suite.BuildVerifier()
257+
258+
// Use of linearizable read concern below seems to freeze pre-4.4 servers.
259+
srcVersion := verifier1.srcClusterInfo.VersionArray
260+
if util.CmpMinorVersions([2]int(srcVersion), [2]int{4, 4}) == -1 {
261+
suite.T().Skipf("Source version (%v) is too old for this test.", srcVersion)
262+
}
263+
264+
srcDB := verifier1.srcClient.Database(suite.DBNameForTest())
265+
srcColl := srcDB.Collection("coll")
266+
267+
require.NoError(
268+
suite.T(),
269+
srcDB.CreateCollection(ctx, srcColl.Name()),
270+
)
271+
272+
v1Ctx, v1Cancel := contextplus.WithCancelCause(ctx)
273+
defer v1Cancel(ctx.Err())
274+
suite.startSrcChangeStreamReaderAndHandler(v1Ctx, verifier1)
275+
276+
changeStreamMetaColl := verifier1.srcChangeStreamReader.getChangeStreamMetadataCollection()
277+
278+
var originalResumeToken bson.Raw
279+
280+
assert.Eventually(
281+
suite.T(),
282+
func() bool {
283+
var err error
284+
originalResumeToken, err = changeStreamMetaColl.FindOne(ctx, bson.D{}).Raw()
285+
return err == nil
286+
},
287+
time.Minute,
288+
50*time.Millisecond,
289+
"should see a change stream resume token persisted",
290+
)
291+
292+
insertCtx, cancelInserts := contextplus.WithCancelCause(ctx)
293+
defer cancelInserts(ctx.Err())
294+
insertsDone := make(chan struct{})
295+
go func() {
296+
defer func() {
297+
close(insertsDone)
298+
}()
299+
300+
sess, err := verifier1.srcClient.StartSession(
301+
options.Session().SetCausalConsistency(true),
302+
)
303+
require.NoError(suite.T(), err)
304+
305+
sessCtx := mongo.NewSessionContext(insertCtx, sess)
306+
307+
docID := int32(1)
308+
for {
309+
_, err := srcColl.InsertOne(
310+
sessCtx,
311+
bson.D{{"_id", docID}},
312+
)
313+
314+
if err != nil {
315+
require.ErrorIs(suite.T(), err, context.Canceled)
316+
return
317+
}
318+
319+
docID++
320+
}
321+
}()
322+
323+
assert.Eventually(
324+
suite.T(),
325+
func() bool {
326+
rt, err := changeStreamMetaColl.FindOne(ctx, bson.D{}).Raw()
327+
require.NoError(suite.T(), err)
328+
329+
suite.T().Logf("found rt: %v\n", rt)
330+
331+
return !bytes.Equal(rt, originalResumeToken)
332+
},
333+
time.Minute,
334+
50*time.Millisecond,
335+
"should see a new change stream resume token persisted",
336+
)
337+
338+
v1Cancel(fmt.Errorf("killing verifier1"))
339+
340+
verifier2 := suite.BuildVerifier()
341+
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2)
342+
343+
cancelInserts(fmt.Errorf("verifier2 started"))
344+
<-insertsDone
345+
346+
lastIDRes := srcColl.Database().Collection(
347+
srcColl.Name(),
348+
options.Collection().SetReadConcern(readconcern.Linearizable()),
349+
).FindOne(
350+
ctx,
351+
bson.D{},
352+
options.FindOne().
353+
SetSort(bson.D{{"_id", -1}}),
354+
)
355+
require.NoError(suite.T(), lastIDRes.Err())
356+
357+
lastDocID := lo.Must(lo.Must(lastIDRes.Raw()).LookupErr("_id")).Int32()
358+
359+
assert.Eventually(
360+
suite.T(),
361+
func() bool {
362+
rechecks := suite.fetchVerifierRechecks(ctx, verifier2)
363+
364+
return lo.SomeBy(
365+
rechecks,
366+
func(cur bson.M) bool {
367+
id := cur["_id"].(bson.D)
368+
369+
for _, el := range id {
370+
if el.Key != "docID" {
371+
continue
372+
}
373+
374+
return el.Value.(int32) == lastDocID
375+
}
376+
377+
panic(fmt.Sprintf("no docID in _id: %+v", id))
378+
},
379+
)
380+
},
381+
time.Minute,
382+
100*time.Millisecond,
383+
"last-inserted doc shows as recheck",
384+
)
385+
386+
sess := lo.Must(verifier2.verificationDatabase().Client().StartSession())
387+
sctx := mongo.NewSessionContext(ctx, sess)
388+
389+
rechecks := suite.fetchVerifierRechecks(sctx, verifier2)
390+
if !assert.EqualValues(suite.T(), lastDocID, len(rechecks), "all source docs should be rechecked") {
391+
for _, recheck := range rechecks {
392+
suite.T().Logf("found recheck: %v", recheck)
393+
}
394+
}
395+
}
396+
249397
// TestChangeStreamResumability creates a verifier, starts its change stream,
250398
// terminates that verifier, updates the source cluster, starts a new
251399
// verifier with change stream, and confirms that things look as they should.

0 commit comments

Comments
 (0)