
Commit 7fb8d25

craig[bot] committed (with commits from wenyihu6, msbutler, rafiss, and jeffswenson)
155033: asim: use kvserver.LoadBasedRebalancingObjective r=wenyihu6 a=wenyihu6

Previously, LBRebalancingObjective was part of the simulation settings, using QPS as the load rebalancing objective. However, the cluster setting kvserver.LoadBasedRebalancingObjective defaults to CPU rebalancing instead. This commit updates asim to rely on the cluster setting, adopting the default cluster setting configuration.

Epic: CRDB-55052
Release note: none
Resolves: #154512

156594: backup: test adding/dropping database in cluster backup and restore r=jeffswenson a=msbutler

Epic: None
Release note: none

156876: rttanalysis: deflake BenchmarkJobs r=rafiss a=rafiss

The benchmark started failing because the code doesn't handle a nil table descriptor. The fix is to create a dummy descriptor in the job payload details.

Fixes: #151302
Release note: None

156935: logical: enable crud writer by default r=jeffswenson a=jeffswenson

This change enables the crud writer by default for IMMEDIATE and VALIDATED mode LDR jobs.

Release note: LDR now supports partial indexes and is tolerant of mismatched column IDs in the source and destination tables.
Epic: CRDB-51533

156942: server: make mma Allocator calls more visible r=wenyihu6 a=tbg

These calls were somewhat hidden before, which has repeatedly tripped me up. The problem was that NewAllocatorState returns a private struct directly, so a number of the calls later in the method do not show up when you look at the call hierarchy for the Allocator interface. This is now fixed.

Epic: CRDB-55052

156943: workflow: backport stale action should mark PRs stale after 14 days r=RaduBerinde a=rail

Previously, the backport stale action marked PRs as stale after 21 days of inactivity. This change updates the configuration to mark PRs as stale after 14 days of inactivity, ensuring that stale backport PRs are identified and addressed more promptly.

Epic: none
Release note: none

Co-authored-by: wenyihu6 <wenyi@cockroachlabs.com>
Co-authored-by: Michael Butler <butler@cockroachlabs.com>
Co-authored-by: Rafi Shamim <rafi@cockroachlabs.com>
Co-authored-by: Jeff Swenson <jeffswenson@betterthannull.com>
Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
Co-authored-by: Rail Aliiev <rail@iqchoice.com>
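The diff for 156942 is not included in this view. As a rough sketch of the interface-visibility pattern it describes (all names below are hypothetical stand-ins, not the real mma package):

package mma

// Change is a hypothetical stand-in for a rebalancing operation.
type Change struct{}

// Allocator is the public interface whose call hierarchy one inspects.
type Allocator interface {
	ComputeChanges() []Change
}

type allocatorState struct{}

func (a *allocatorState) ComputeChanges() []Change { return nil }

// Before: returning the private struct means later calls on the result are
// attributed to *allocatorState, so they do not appear when browsing the
// Allocator interface's call hierarchy in an IDE.
func newAllocatorState() *allocatorState { return &allocatorState{} }

// After: returning the interface type makes subsequent calls show up under
// Allocator, which is what the commit message means by "more visible".
func NewAllocatorState() Allocator { return &allocatorState{} }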
7 parents: 5b5000c + 10ce3e9 + 3d28310 + 60804c6 + 0d92aa9 + d2cfdda + 67f8eaa

24 files changed: +218 −122 lines

.github/workflows/backport-stale.yml

Lines changed: 5 additions & 4 deletions

@@ -2,6 +2,7 @@ name: Mark stale backport requests
 
 on:
   schedule:
+    # Run at 10am UTC daily, except weekends
     - cron: "0 10 * * 1-4"
   workflow_dispatch:
 

@@ -12,17 +13,17 @@ jobs:
       issues: write
       pull-requests: write
     steps:
-      - uses: actions/stale@v3
+      - uses: actions/stale@v10
        with:
          operations-per-run: 1000
          repo-token: ${{ secrets.GITHUB_TOKEN }}
-          stale-issue-message: 'Blah'
-          stale-pr-message: 'Reminder: it has been 3 weeks please merge or close your backport!'
+          stale-issue-message: 'Ignored'
+          stale-pr-message: 'Reminder: it has been 2 weeks please merge or close your backport!'
          stale-issue-label: 'no-backport-issue-activity'
          stale-pr-label: 'no-backport-pr-activity'
          close-issue-label: 'X-stale'
          close-pr-label: 'X-stale'
-          days-before-pr-stale: 21
+          days-before-pr-stale: 14
          # Disable this for issues, by setting a very high bar
          days-before-issue-stale: 99999
          days-before-close: 99999
pkg/backup/backup_test.go

Lines changed: 91 additions & 0 deletions

@@ -11198,3 +11198,94 @@ func TestBackupRestoreFunctionDependenciesRevisionHistory(t *testing.T) {
 
 	checkFunctions("test6")
 }
+
+func TestBackupRestoreDatabaseRevisionHistory(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	dataDir, dirCleanupFunc := testutils.TempDir(t)
+	defer dirCleanupFunc()
+
+	_, sqlDB, cleanupFn := backupRestoreTestSetupEmpty(t, singleNode, dataDir, InitManualReplication, base.TestClusterArgs{})
+	defer cleanupFn()
+
+	// Helper function to check if a database exists.
+	checkDatabase := func(dbName string, shouldExist bool) {
+		var count int
+		sqlDB.QueryRow(t, `SELECT count(*) FROM system.namespace WHERE name = $1 AND "parentID" = 0`, dbName).Scan(&count)
+		if shouldExist {
+			if count != 1 {
+				t.Fatalf("expected database %s to exist, but it doesn't", dbName)
+			}
+		} else {
+			if count != 0 {
+				t.Fatalf("expected database %s to not exist, but it does", dbName)
+			}
+		}
+	}
+
+	sqlDB.Exec(t, `CREATE DATABASE test_db`)
+
+	// T1: Full cluster backup with revision history.
+	backupPath := "nodelocal://1/database_revision_backup"
+	sqlDB.Exec(t, `BACKUP INTO $1 WITH revision_history`, backupPath)
+
+	// T2: Capture timestamp after backup (database exists).
+	var t2 string
+	sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&t2)
+
+	sqlDB.Exec(t, `DROP DATABASE test_db`)
+
+	checkDatabase("test_db", false)
+
+	// T3: Capture timestamp after dropping database.
+	var t3 string
+	sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&t3)
+
+	sqlDB.Exec(t, `CREATE DATABASE ephemeral`)
+
+	// T4: Capture timestamp while the ephemeral database exists.
+	var t4 string
+	sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&t4)
+
+	sqlDB.Exec(t, `DROP DATABASE ephemeral`)
+
+	// T5: Capture timestamp after dropping the ephemeral database.
+	var t5 string
+	sqlDB.QueryRow(t, `SELECT cluster_logical_timestamp()`).Scan(&t5)
+
+	sqlDB.Exec(t, `BACKUP INTO LATEST IN $1 WITH revision_history`, backupPath)
+
+	// Choose one of the 4 restores to run, to reduce test runtime:
+	i := rand.Intn(4)
+
+	switch i {
+	case 0:
+		// Restore AOST t2 -> expect database to exist.
+		restoreQuery := fmt.Sprintf(
+			"RESTORE FROM LATEST IN $1 AS OF SYSTEM TIME %s", t2)
+		sqlDB.Exec(t, restoreQuery, backupPath)
+		checkDatabase("test_db", true)
+
+		sqlDB.Exec(t, `DROP DATABASE test_db`)
+	case 1:
+		// Restore AOST t3 -> expect database to not exist.
+		restoreQuery := fmt.Sprintf(
+			"RESTORE FROM LATEST IN $1 AS OF SYSTEM TIME %s", t3)
+		sqlDB.Exec(t, restoreQuery, backupPath)
+		checkDatabase("test_db", false)
+	case 2:
+		// Restore AOST t4 -> ephemeral db exists.
+		restoreQuery := fmt.Sprintf(
+			"RESTORE FROM LATEST IN $1 AS OF SYSTEM TIME %s", t4)
+		sqlDB.Exec(t, restoreQuery, backupPath)
+		checkDatabase("ephemeral", true)
+
+		sqlDB.Exec(t, `DROP DATABASE ephemeral`)
+	case 3:
+		// Restore AOST t5 -> expect ephemeral database to not exist.
+		restoreQuery := fmt.Sprintf(
+			"RESTORE FROM LATEST IN $1 AS OF SYSTEM TIME %s", t5)
+		sqlDB.Exec(t, restoreQuery, backupPath)
+		checkDatabase("ephemeral", false)
+	}
+}
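The test leans on two properties: cluster_logical_timestamp() returns the commit timestamp of the current transaction, so each captured string is a valid AS OF SYSTEM TIME argument; and because the backups are taken WITH revision_history, a cluster restore at any covered timestamp reconstructs the namespace as of that moment. That is why each branch expects a database to exist only at the timestamps where it was live.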

pkg/bench/rttanalysis/BUILD.bazel

Lines changed: 1 addition & 0 deletions

@@ -65,6 +65,7 @@ go_test(
         "//pkg/security/username",
         "//pkg/server",
         "//pkg/server/serverpb",
+        "//pkg/sql/catalog/descpb",
         "//pkg/testutils/pgurlutils",
         "//pkg/testutils/serverutils",
         "//pkg/testutils/skip",

pkg/bench/rttanalysis/jobs_test.go

Lines changed: 15 additions & 1 deletion

@@ -14,13 +14,27 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 )
 
 func BenchmarkJobs(b *testing.B) { reg.Run(b) }
 func init() {
+	// Create a minimal table descriptor for the import job.
+	tableDesc := &descpb.TableDescriptor{
+		ID:            100,
+		ParentID:      1,
+		Name:          "benchmark_table",
+		FormatVersion: descpb.InterleavedFormatVersion,
+		Version:       1,
+	}
+
 	payloadBytes, err := protoutil.Marshal(&jobspb.Payload{
-		Details: jobspb.WrapPayloadDetails(jobspb.ImportDetails{}),
+		Details: jobspb.WrapPayloadDetails(jobspb.ImportDetails{
+			Table: jobspb.ImportDetails_Table{
+				Desc: tableDesc,
+			},
+		}),
 		UsernameProto: username.RootUserName().EncodeProto(),
 	})
 	if err != nil {
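As a minimal, hypothetical illustration of the failure mode being deflaked — the types below are stand-ins, not the real jobspb API — any consumer that dereferences the payload's table descriptor panics when it was never populated:

package main

import "fmt"

// Stand-ins for the job payload types (hypothetical, for illustration only).
type TableDescriptor struct{ Name string }
type ImportTable struct{ Desc *TableDescriptor }

// describe panics on a zero-value ImportTable because Desc is nil; seeding a
// minimal descriptor, as the diff above does, avoids the nil dereference.
func describe(t ImportTable) string {
	return fmt.Sprintf("importing table %q", t.Desc.Name)
}

func main() {
	fmt.Println(describe(ImportTable{Desc: &TableDescriptor{Name: "benchmark_table"}}))
}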

pkg/crosscluster/logical/BUILD.bazel

Lines changed: 1 addition & 0 deletions

@@ -150,6 +150,7 @@ go_test(
         "//pkg/ccl/changefeedccl/changefeedbase",
         "//pkg/ccl/storageccl",
         "//pkg/clusterversion",
+        "//pkg/crosscluster",
         "//pkg/crosscluster/replicationtestutils",
         "//pkg/crosscluster/streamclient",
         "//pkg/crosscluster/streamclient/randclient",

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 7 additions & 3 deletions

@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/crosscluster"
 	"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
 	"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
 	_ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient"

@@ -895,9 +896,6 @@ func TestRandomTables(t *testing.T) {
 	tc, s, runnerA, runnerB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
 	defer tc.Stopper().Stop(ctx)
 
-	// TODO(#148303): Remove this once the crud writer supports tables with array primary keys.
-	runnerA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'legacy-kv'")
-
 	sqlA := s.SQLConn(t, serverutils.DBName("a"))
 
 	var tableName, streamStartStmt string

@@ -2897,8 +2895,14 @@ func TestGetWriterType(t *testing.T) {
 
 	t.Run("validated-mode", func(t *testing.T) {
 		st := cluster.MakeTestingClusterSettings()
+
 		wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Validated, st)
 		require.NoError(t, err)
+		require.Equal(t, sqlclustersettings.LDRWriterTypeCRUD, wt)
+
+		crosscluster.LogicalReplicationUDFWriterEnabled.Override(ctx, &st.SV, true)
+		wt, err = getWriterType(ctx, jobspb.LogicalReplicationDetails_Validated, st)
+		require.NoError(t, err)
 		require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
 	})
 
pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 7 additions & 1 deletion

@@ -741,11 +741,17 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
 func getWriterType(
 	ctx context.Context, mode jobspb.LogicalReplicationDetails_ApplyMode, settings *cluster.Settings,
 ) (sqlclustersettings.LDRWriterType, error) {
+	// TODO(jeffswenson): delete the kv and legacy sql ldr writers
 	switch mode {
 	case jobspb.LogicalReplicationDetails_Immediate:
 		return sqlclustersettings.LDRWriterType(sqlclustersettings.LDRImmediateModeWriter.Get(&settings.SV)), nil
 	case jobspb.LogicalReplicationDetails_Validated:
-		return sqlclustersettings.LDRWriterTypeSQL, nil
+		if crosscluster.LogicalReplicationUDFWriterEnabled.Get(&settings.SV) {
+			// If the UDF writer is enabled, fall back to the legacy SQL writer for
+			// validated mode.
+			return sqlclustersettings.LDRWriterTypeSQL, nil
+		}
+		return sqlclustersettings.LDRWriterTypeCRUD, nil
 	default:
 		return "", errors.Newf("unknown logical replication writer type: %s", mode)
 	}
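Net effect of this switch: IMMEDIATE mode still honors the logical_replication.consumer.immediate_mode_writer setting, while VALIDATED mode now returns the crud writer by default and only uses the legacy SQL (UDF) writer when LogicalReplicationUDFWriterEnabled is set — the "crud writer by default" behavior from the 156935 release note above.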

pkg/kv/kvserver/asim/config/settings.go

Lines changed: 0 additions & 5 deletions

@@ -29,8 +29,6 @@ const (
 	defaultSplitStatRetention = 10 * time.Minute
 	defaultSeed = 42
 	defaultLBRebalancingInterval = time.Minute
-	defaultLBRebalanceQPSThreshold = 0.1
-	defaultLBRebalancingObjective = 0 // QPS
 )
 
 const DefaultNodeCPURateCapacityNanos = 8 * 1e9 // 8 vcpus

@@ -101,8 +99,6 @@ type SimulationSettings struct {
 	// SplitStatRetention is the duration which recorded load will be retained
 	// and factored into load based splitting decisions.
 	SplitStatRetention time.Duration
-	// LBRebalancingObjective is the load objective to balance.
-	LBRebalancingObjective int64
 	// LBRebalancingInterval controls how often the store rebalancer will
 	// consider opportunities for rebalancing.
 	LBRebalancingInterval time.Duration

@@ -141,7 +137,6 @@ func DefaultSimulationSettings() *SimulationSettings {
 		StateExchangeDelay: defaultStateExchangeDelay,
 		SplitQPSThreshold: defaultSplitQPSThreshold,
 		SplitStatRetention: defaultSplitStatRetention,
-		LBRebalancingObjective: defaultLBRebalancingObjective,
 		LBRebalancingInterval: defaultLBRebalancingInterval,
 		ReplicateQueueEnabled: true,
 		LeaseQueueEnabled: true,

pkg/kv/kvserver/asim/state/impl.go

Lines changed: 2 additions & 0 deletions

@@ -1402,6 +1402,8 @@ func (s *state) SetClusterSetting(Key string, Value interface{}) {
 	switch Key {
 	case "LBRebalancingMode":
 		kvserverbase.LoadBasedRebalancingMode.Override(context.Background(), &s.settings.ST.SV, kvserverbase.LBRebalancingMode(Value.(int64)))
+	case "LBRebalancingObjective":
+		kvserver.LoadBasedRebalancingObjective.Override(context.Background(), &s.settings.ST.SV, kvserver.LBRebalancingObjective(Value.(int64)))
 	default:
 		panic("other cluster settings not supported")
 	}

pkg/kv/kvserver/asim/storerebalancer/store_rebalancer.go

Lines changed: 3 additions & 3 deletions

@@ -126,11 +126,11 @@ type simRebalanceObjectiveProvider struct {
 
 // Objective returns the current rebalance objective.
 func (s simRebalanceObjectiveProvider) Objective() kvserver.LBRebalancingObjective {
-	return kvserver.LBRebalancingObjective(s.settings.LBRebalancingObjective)
+	return kvserver.LoadBasedRebalancingObjective.Get(&s.settings.ST.SV)
 }
 
 func (src *storeRebalancerControl) scorerOptions() *allocatorimpl.LoadScorerOptions {
-	dim := kvserver.LBRebalancingObjective(src.settings.LBRebalancingObjective).ToDimension()
+	dim := kvserver.LoadBasedRebalancingObjective.Get(&src.settings.ST.SV).ToDimension()
 	return &allocatorimpl.LoadScorerOptions{
 		BaseScorerOptions: allocatorimpl.BaseScorerOptions{
 			IOOverload: src.allocator.IOOverloadOptions(),

@@ -191,7 +191,7 @@ func (src *storeRebalancerControl) phasePrologue(
 		ctx, src.scorerOptions(),
 		hottestRanges(
 			s, src.storeID,
-			kvserver.LBRebalancingObjective(src.settings.LBRebalancingObjective).ToDimension(),
+			kvserver.LoadBasedRebalancingObjective.Get(&src.settings.ST.SV).ToDimension(),
 		),
 		kvserverbase.LoadBasedRebalancingMode.Get(&src.settings.ST.SV),
 	)
