Skip to content

Commit b6da592

Browse files
committed
logical: enable crud writer by default
This change enables the crud writer by default for IMMEDIATE and VALIDATED mode LDR jobs. Release note: LDR now supports partial indexes and is tolerant of mismatched column ids in the source and destination tables. Epic: CRDB-51533
1 parent 3c7eac5 commit b6da592

File tree

3 files changed

+15
-6
lines changed

3 files changed

+15
-6
lines changed

pkg/crosscluster/logical/logical_replication_job_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/base"
2323
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
2424
"github.com/cockroachdb/cockroach/pkg/clusterversion"
25+
"github.com/cockroachdb/cockroach/pkg/crosscluster"
2526
"github.com/cockroachdb/cockroach/pkg/crosscluster/replicationtestutils"
2627
"github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient"
2728
_ "github.com/cockroachdb/cockroach/pkg/crosscluster/streamclient/randclient"
@@ -895,9 +896,6 @@ func TestRandomTables(t *testing.T) {
895896
tc, s, runnerA, runnerB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
896897
defer tc.Stopper().Stop(ctx)
897898

898-
// TODO(#148303): Remove this once the crud writer supports tables with array primary keys.
899-
runnerA.Exec(t, "SET CLUSTER SETTING logical_replication.consumer.immediate_mode_writer = 'legacy-kv'")
900-
901899
sqlA := s.SQLConn(t, serverutils.DBName("a"))
902900

903901
var tableName, streamStartStmt string
@@ -2897,8 +2895,14 @@ func TestGetWriterType(t *testing.T) {
28972895

28982896
t.Run("validated-mode", func(t *testing.T) {
28992897
st := cluster.MakeTestingClusterSettings()
2898+
29002899
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Validated, st)
29012900
require.NoError(t, err)
2901+
require.Equal(t, sqlclustersettings.LDRWriterTypeCRUD, wt)
2902+
2903+
crosscluster.LogicalReplicationUDFWriterEnabled.Override(ctx, &st.SV, true)
2904+
wt, err = getWriterType(ctx, jobspb.LogicalReplicationDetails_Validated, st)
2905+
require.NoError(t, err)
29022906
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
29032907
})
29042908

pkg/crosscluster/logical/logical_replication_writer_processor.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,11 +741,17 @@ func (lrw *logicalReplicationWriterProcessor) setupBatchHandlers(ctx context.Con
741741
func getWriterType(
742742
ctx context.Context, mode jobspb.LogicalReplicationDetails_ApplyMode, settings *cluster.Settings,
743743
) (sqlclustersettings.LDRWriterType, error) {
744+
// TODO(jeffswenson): delete the kv and legacy sql ldr writers
744745
switch mode {
745746
case jobspb.LogicalReplicationDetails_Immediate:
746747
return sqlclustersettings.LDRWriterType(sqlclustersettings.LDRImmediateModeWriter.Get(&settings.SV)), nil
747748
case jobspb.LogicalReplicationDetails_Validated:
748-
return sqlclustersettings.LDRWriterTypeSQL, nil
749+
if crosscluster.LogicalReplicationUDFWriterEnabled.Get(&settings.SV) {
750+
// If the UDF writer is enabled, fall back to the legacy SQL writer for
751+
// validated mode.
752+
return sqlclustersettings.LDRWriterTypeSQL, nil
753+
}
754+
return sqlclustersettings.LDRWriterTypeCRUD, nil
749755
default:
750756
return "", errors.Newf("unknown logical replication writer type: %s", mode)
751757
}

pkg/sql/sqlclustersettings/clustersettings.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,7 @@ var LDRImmediateModeWriter = settings.RegisterStringSetting(
144144
settings.ApplicationLevel,
145145
"logical_replication.consumer.immediate_mode_writer",
146146
"the writer to use when in immediate mode",
147-
// TODO(jeffswenson): make the crud writer the default
148-
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(LDRWriterTypeLegacyKV), string(LDRWriterTypeCRUD), string(LDRWriterTypeSQL)),
147+
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(LDRWriterTypeCRUD), string(LDRWriterTypeLegacyKV), string(LDRWriterTypeSQL)),
149148
settings.WithValidateString(func(sv *settings.Values, val string) error {
150149
if val != string(LDRWriterTypeSQL) && val != string(LDRWriterTypeLegacyKV) && val != string(LDRWriterTypeCRUD) {
151150
return errors.Newf("immediate mode writer must be either 'sql', 'legacy-kv', or 'crud', got '%s'", val)

0 commit comments

Comments
 (0)