Commit 1497315

sql: improve observability of index merge timestamp
When building an index, the merge phase copies any new rows added since the index build started. A timestamp is used to set an upper bound for this scan. We suspect that the chosen timestamp may be omitting some rows. Our theory is that this happens when new rows are inserted before the merge phase but are committed with a timestamp after the merge timestamp, a discrepancy that could be caused by clock skew between nodes. However, attempts to reproduce this issue via unit tests were unsuccessful, so this remains a hypothesis.

To address this, this change includes:
- An adjustment to the merge timestamp to account for potential clock skew.
- Additional logging of the chosen merge timestamp and of the timestamps observed on each node when draining leased descriptors. These logs will help track the timestamp across nodes during the merge phase.
- Treating index validation errors for non-unique indexes as assertion failures. We previously reported them as duplicate key errors, which was confusing because duplicates are allowed for non-unique indexes.

Epic: none
Release note: none
Closes: #142751
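To make the clock-skew theory concrete, the sketch below (plain time.Time in place of hlc.Timestamp, with an illustrative 500ms maximum offset; not code from this commit) shows how a merge scan bounded at the coordinator's current time can miss a row committed by a node whose clock runs slightly ahead, while a bound of now plus the maximum offset covers it:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative value only; the real code uses the cluster's configured
	// maximum clock offset via clock.MaxOffset().
	maxOffset := 500 * time.Millisecond

	coordinatorNow := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)

	// A node whose wall clock runs 200ms ahead (still within maxOffset)
	// commits a new row just as the merge phase starts; its commit timestamp
	// is later than the coordinator's notion of "now".
	skewedCommitTS := coordinatorNow.Add(200 * time.Millisecond)

	naiveBound := coordinatorNow                   // old behavior: clock.Now()
	adjustedBound := coordinatorNow.Add(maxOffset) // new behavior: clock.Now() + MaxOffset

	fmt.Println("row missed with naive bound:   ", skewedCommitTS.After(naiveBound))    // true
	fmt.Println("row missed with adjusted bound:", skewedCommitTS.After(adjustedBound)) // false
}

The cost of the wider bound is that rows already present in the destination index may be scanned again, which the new comment in getMergeTimestamp notes is fine because those re-merges are treated as no-ops.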
1 parent 42b4638 commit 1497315

File tree

6 files changed (+113, -9 lines)

pkg/sql/backfill.go

Lines changed: 19 additions & 4 deletions
@@ -1929,6 +1929,15 @@ func ValidateForwardIndexes(
 				idx.GetID())
 			indexName = idx.GetName()
 		}
+		if !idx.IsUnique() {
+			// For non-unique indexes, the table row count must match the index
+			// key count. Unlike unique indexes, we don't filter out any rows,
+			// so every row must have a corresponding entry in the index. A
+			// mismatch indicates an assertion failure.
+			return errors.AssertionFailedf(
+				"validation of non-unique index %s failed: expected %d rows, found %d",
+				idx.GetName(), errors.Safe(expectedCount), errors.Safe(idxLen))
+		}
 		// TODO(vivek): find the offending row and include it in the error.
 		return pgerror.WithConstraintName(pgerror.Newf(pgcode.UniqueViolation,
 			"duplicate key value violates unique constraint %q",
@@ -2989,8 +2998,15 @@ func indexTruncateInTxn(
 // part of a restore, then timestamp will be too old and the job will fail. On
 // the next resume, a mergeTimestamp newer than the GC time will be picked and
 // the job can continue.
-func getMergeTimestamp(clock *hlc.Clock) hlc.Timestamp {
-	return clock.Now()
+func getMergeTimestamp(ctx context.Context, clock *hlc.Clock) hlc.Timestamp {
+	// Use the current timestamp plus the maximum allowed offset to account for
+	// potential clock skew across nodes. The chosen timestamp must be greater
+	// than all commit timestamps used so far. This may result in seeing rows
+	// that are already present in the index being merged, but that's fine as
+	// they will be treated as no-ops.
+	ts := clock.Now().AddDuration(clock.MaxOffset())
+	log.Infof(ctx, "merging all keys in temporary index before time %v", ts)
+	return ts
 }

 func (sc *SchemaChanger) distIndexMerge(
@@ -3001,8 +3017,7 @@
 	fractionScaler *multiStageFractionScaler,
 ) error {

-	mergeTimestamp := getMergeTimestamp(sc.clock)
-	log.Infof(ctx, "merging all keys in temporary index before time %v", mergeTimestamp)
+	mergeTimestamp := getMergeTimestamp(ctx, sc.clock)

 	// Gather the initial resume spans for the merge process.
 	progress, err := extractMergeProgress(sc.job, tableDesc, addedIndexes, temporaryIndexes)

pkg/sql/catalog/lease/lease.go

Lines changed: 1 addition & 0 deletions
@@ -475,6 +475,7 @@ func (m *Manager) WaitForOneVersion(
 		return nil, err
 	}
 	if detail.count == 0 {
+		log.Infof(ctx, "all leases have expired at %v: desc=%v", now, descs)
 		break
 	}
 	if detail.count != lastCount {

pkg/sql/mvcc_backfiller.go

Lines changed: 1 addition & 1 deletion
@@ -92,7 +92,7 @@ func (im *IndexBackfillerMergePlanner) MergeIndexes(
 		)
 		return tracker.SetMergeProgress(ctx, progress)
 	}
-	mergeTimeStamp := getMergeTimestamp(im.execCfg.Clock)
+	mergeTimeStamp := getMergeTimestamp(ctx, im.execCfg.Clock)
 	protectedTimestampCleaner := im.execCfg.ProtectedTimestampManager.TryToProtectBeforeGC(ctx, job, descriptor, mergeTimeStamp)
 	defer func() {
 		cleanupError := protectedTimestampCleaner(ctx)

pkg/sql/schemachanger/scexec/BUILD.bazel

Lines changed: 3 additions & 0 deletions
@@ -109,6 +109,9 @@ go_test(
         "@com_github_stretchr_testify//require",
         "//pkg/sql/sem/idxtype",
         "//pkg/sql/catalog/lease",
+        "//pkg/sql/schemachanger/scplan",
+        "//pkg/testutils/testcluster",
+        "//pkg/util/ctxgroup",
     ],
 )

pkg/sql/schemachanger/scexec/exec_backfill_test.go

Lines changed: 87 additions & 0 deletions
@@ -7,10 +7,13 @@ package scexec_test

 import (
 	"context"
+	"fmt"
 	"math/rand"
+	"sync/atomic"
 	"testing"

 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -19,9 +22,12 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps/sctestdeps"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scop"
+	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scplan"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/iterutil"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -333,3 +339,84 @@ func TestExecBackfiller(t *testing.T) {
 	}

 }
+
+// TestMergeTimestampSkew will ensure we do not miss rows during the merge phase
+// if the clocks are skewed.
+func TestMergeTimestampSkew(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	var hookEnabled atomic.Bool
+	var ingestFn func() error
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				SQLDeclarativeSchemaChanger: &scexec.TestingKnobs{
+					BeforeStage: func(p scplan.Plan, stageIdx int) error {
+						if !hookEnabled.Load() {
+							return nil
+						}
+						return ingestFn()
+					},
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+	db1 := tc.ApplicationLayer(0).SQLConn(t, serverutils.DBName("defaultdb"))
+	r1 := sqlutils.MakeSQLRunner(db1)
+
+	// Add a second node so that we can use a separate clock for it.
+	manualClock2 := hlc.NewHybridManualClock()
+	tc.AddAndStartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			Server: &server.TestingKnobs{
+				WallClock: manualClock2,
+			},
+		},
+	})
+	db2 := tc.ApplicationLayer(1).SQLConn(t, serverutils.DBName("defaultdb"))
+	r2 := sqlutils.MakeSQLRunner(db2)
+
+	t.Run("create_non_unique_index", func(t *testing.T) {
+		r1.ExecMultiple(t,
+			`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1, constraints = '[-node2]'`,
+			"CREATE TABLE t_idx(n int)",
+			"INSERT INTO t_idx(n) SELECT * FROM generate_series(1, 100)",
+		)
+		additionalInserts := 0

+		// Each stage we will insert a new row from a different node. That node will
+		// use a skewed clock.
+		ingestFn = func() error {
+			manualClock2.Increment(10000000)
+			keyVal := 1000 + additionalInserts
+			additionalInserts++
+			r2.Exec(t, fmt.Sprintf("INSERT INTO t_idx(n) VALUES (%d)", keyVal))
+			return nil
+		}
+		hookEnabled.Store(true)
+		defer hookEnabled.Store(false)
+
+		grp := ctxgroup.WithContext(ctx)
+		grp.GoCtx(func(ctx context.Context) error {
+			r1.Exec(t, "CREATE INDEX i1 ON t_idx (n)")
+			hookEnabled.Store(false)
+			return nil
+		})
+		require.NoError(t, grp.Wait())
+
+		// Compare row counts between the new index and the primary key.
+		for _, q := range []string{
+			`SELECT count(1) FROM t_idx@i1`,
+			`SELECT count(1) FROM t_idx@t_idx_pkey`,
+		} {
+			var rowCount int
+			res := r1.QueryRow(t, q)
+			res.Scan(&rowCount)
+			expectedRowCount := 100 + additionalInserts
+			require.Equal(t, expectedRowCount, rowCount, "post create index row count mismatch in query %q", q)
+		}
+	})
+}

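One detail of the test worth calling out (my interpretation, not part of the commit): hlc.HybridManualClock.Increment appears to take nanoseconds, so each BeforeStage hook above advances node 2's wall clock by roughly 10ms, which stays well under a typical 500ms maximum clock offset even across several stages. A standalone sketch of that arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	// The per-stage increment used by the test, read as nanoseconds (assumed).
	perStageSkew := time.Duration(10000000) // 10ms

	// Assumed default maximum clock offset; not taken from the commit.
	maxOffset := 500 * time.Millisecond

	stages := 10
	total := time.Duration(stages) * perStageSkew
	fmt.Printf("injected skew after %d stages: %s (assumed max offset %s)\n",
		stages, total, maxOffset)
}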
pkg/sql/schemachanger/scexec/exec_validation.go

Lines changed: 2 additions & 4 deletions
@@ -16,9 +16,7 @@ import (
 	"github.com/cockroachdb/errors"
 )

-func executeValidateUniqueIndex(
-	ctx context.Context, deps Dependencies, op *scop.ValidateIndex,
-) error {
+func executeValidateIndex(ctx context.Context, deps Dependencies, op *scop.ValidateIndex) error {
 	descs, err := deps.Catalog().MustReadImmutableDescriptors(ctx, op.TableID)
 	if err != nil {
 		return err
@@ -118,7 +116,7 @@ func executeValidationOps(ctx context.Context, deps Dependencies, ops []scop.Op)
 func executeValidationOp(ctx context.Context, deps Dependencies, op scop.Op) (err error) {
 	switch op := op.(type) {
 	case *scop.ValidateIndex:
-		if err = executeValidateUniqueIndex(ctx, deps, op); err != nil {
+		if err = executeValidateIndex(ctx, deps, op); err != nil {
 			if !scerrors.HasSchemaChangerUserError(err) {
 				return errors.Wrapf(err, "%T: %v", op, op)
 			}
