From f843e6d6baddce2c130481193a16ac7e461591e6 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 30 Jan 2024 16:53:05 +0800 Subject: [PATCH 1/4] add databend schema related --- pom.xml | 14 +++- .../databend/DatabendDynamicTableFactory.java | 12 +-- .../databend/catalog/databend/DataModel.java | 7 ++ .../catalog/databend/DatabendSystem.java | 42 +++++++++++ .../catalog/databend/DatabendType.java | 20 +++++ .../catalog/databend/FieldSchema.java | 40 ++++++++++ .../catalog/databend/TableSchema.java | 75 +++++++++++++++++++ .../exception/CreateTableException.java | 30 ++++++++ .../options/DatabendConnectionOptions.java | 12 ++- .../internal/options/DatabendDmlOptions.java | 18 ++++- .../internal/options/DatabendReadOptions.java | 62 ++------------- .../flink/TestDatabendConnectionProvider.java | 2 +- .../flink/TestDatabendOutputFormat.java | 2 +- .../apache/flink/TestDeleteOutputFormat.java | 2 +- 14 files changed, 270 insertions(+), 68 deletions(-) create mode 100644 src/main/java/org/apache/flink/connector/databend/catalog/databend/DataModel.java create mode 100644 src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java create mode 100644 src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendType.java create mode 100644 src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java create mode 100644 src/main/java/org/apache/flink/connector/databend/catalog/databend/TableSchema.java create mode 100644 src/main/java/org/apache/flink/connector/databend/exception/CreateTableException.java diff --git a/pom.xml b/pom.xml index 49ff3d3..ac2144f 100644 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ UTF-8 1.8 1.8 - + 2.4.2 8.14 4.13.2 2.12 @@ -177,6 +177,18 @@ ${flink.version} ${flink.scope} + + com.ververica + flink-sql-connector-oracle-cdc + ${flink.sql.cdc.version} + provided + + + flink-shaded-guava + org.apache.flink + + + diff --git a/src/main/java/org/apache/flink/connector/databend/DatabendDynamicTableFactory.java b/src/main/java/org/apache/flink/connector/databend/DatabendDynamicTableFactory.java index 6c74ad5..4fc86b3 100644 --- a/src/main/java/org/apache/flink/connector/databend/DatabendDynamicTableFactory.java +++ b/src/main/java/org/apache/flink/connector/databend/DatabendDynamicTableFactory.java @@ -46,9 +46,9 @@ public DynamicTableSink createDynamicTableSink(Context context) { Properties databendProperties = getDatabendProperties(context.getCatalogTable().getOptions()); return new DatabendDynamicTableSink( - getDmlOptions(config), + getDmlOptions(config, databendProperties), databendProperties, - getDmlOptions(config).getPrimaryKeys(), + getDmlOptions(config, databendProperties).getPrimaryKeys(), catalogTable.getPartitionKeys().toArray(new String[0]), context.getPhysicalRowDataType()); } @@ -63,7 +63,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { Properties databendProperties = getDatabendProperties(context.getCatalogTable().getOptions()); return new DatabendDynamicTableSource( - getReadOptions(config), databendProperties, context.getPhysicalRowDataType()); + getReadOptions(config, databendProperties), databendProperties, context.getPhysicalRowDataType()); } @Override @@ -102,7 +102,7 @@ private void validateConfigOptions(ReadableConfig config) { } } - public DatabendDmlOptions getDmlOptions(ReadableConfig config) { + public DatabendDmlOptions getDmlOptions(ReadableConfig config, Properties databendProperties) { return new DatabendDmlOptions.Builder() .withUrl(config.get(URL)) 
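                // databendProperties is extracted from the catalog options prefixed with
                // "properties." (see getDatabendProperties above) and is attached at the
                // end of this chain via withConnectionProperties(...)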
.withUsername(config.get(USERNAME)) @@ -116,16 +116,18 @@ public DatabendDmlOptions getDmlOptions(ReadableConfig config) { .withPrimaryKey(config.get(SINK_PRIMARY_KEYS).toArray(new String[0])) .withIgnoreDelete(config.get(SINK_IGNORE_DELETE)) .withParallelism(config.get(SINK_PARALLELISM)) + .withConnectionProperties(databendProperties) .build(); } - private DatabendReadOptions getReadOptions(ReadableConfig config) { + private DatabendReadOptions getReadOptions(ReadableConfig config, Properties databendProperties) { return new DatabendReadOptions.Builder() .withUrl(config.get(URL)) .withUsername(config.get(USERNAME)) .withPassword(config.get(PASSWORD)) .withDatabaseName(config.get(DATABASE_NAME)) .withTableName(config.get(TABLE_NAME)) + .withConnectionProperties(databendProperties) .build(); } } diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DataModel.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DataModel.java new file mode 100644 index 0000000..f273d1d --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DataModel.java @@ -0,0 +1,7 @@ +package org.apache.flink.connector.databend.catalog.databend; + +public enum DataModel { + DUPLICATE, + UNIQUE, + AGGREGATE +} diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java new file mode 100644 index 0000000..f738257 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java @@ -0,0 +1,42 @@ +package org.apache.flink.connector.databend.catalog.databend; + +import org.apache.flink.annotation.Public; +import org.apache.flink.connector.databend.internal.connection.DatabendConnectionProvider; +import org.apache.flink.connector.databend.internal.options.DatabendConnectionOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Doris System Operate. 
+ */ +@Public +public class DatabendSystem implements Serializable { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(DatabendSystem.class); + private final DatabendConnectionProvider jdbcConnectionProvider; + private static final List builtinDatabases = + Collections.singletonList("information_schema"); + + public DatabendSystem(DatabendConnectionOptions options) { + this.jdbcConnectionProvider = new DatabendConnectionProvider(options, options.getConnectionProperties()); + } + + + private List identifier(List name) { + List result = name.stream().map(m -> identifier(m)).collect(Collectors.toList()); + return result; + } + + private String identifier(String name) { + return "`" + name + "`"; + } + + private String quoteProperties(String name) { + return "'" + name + "'"; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendType.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendType.java new file mode 100644 index 0000000..c4a613c --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendType.java @@ -0,0 +1,20 @@ +package org.apache.flink.connector.databend.catalog.databend; + +public class DatabendType { + public static final String BOOLEAN = "BOOLEAN"; + public static final String TINYINT = "TINYINT"; + public static final String SMALLINT = "SMALLINT"; + public static final String INT = "INT"; + public static final String BIGINT = "BIGINT"; + public static final String FLOAT = "FLOAT"; + public static final String DOUBLE = "DOUBLE"; + public static final String DECIMAL = "DECIMAL"; + public static final String DATE = "DATE"; + public static final String DATETIME = "DATETIME"; + public static final String VARCHAR = "VARCHAR"; + public static final String STRING = "STRING"; + public static final String BITMAP = "BITMAP"; + public static final String ARRAY = "ARRAY"; + public static final String JSON = "JSON"; + public static final String MAP = "MAP"; +} diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java new file mode 100644 index 0000000..538bded --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java @@ -0,0 +1,40 @@ +package org.apache.flink.connector.databend.catalog.databend; + +public class FieldSchema { + private String name; + private String typeString; + private String comment; + + public FieldSchema() { + } + + public FieldSchema(String name, String typeString, String comment) { + this.name = name; + this.typeString = typeString; + this.comment = comment; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getTypeString() { + return typeString; + } + + public void setTypeString(String typeString) { + this.typeString = typeString; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } +} diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/TableSchema.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/TableSchema.java new file mode 100644 index 0000000..090e032 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/TableSchema.java @@ -0,0 +1,75 @@ +package 
org.apache.flink.connector.databend.catalog.databend; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TableSchema { + private String database; + private String table; + private String tableComment; + private Map fields; + private List keys = new ArrayList<>(); + private DataModel model = DataModel.DUPLICATE; + private Map properties = new HashMap<>(); + + + public String getDatabase() { + return database; + } + + public String getTable() { + return table; + } + + public String getTableComment() { + return tableComment; + } + + public Map getFields() { + return fields; + } + + public List getKeys() { + return keys; + } + + public DataModel getModel() { + return model; + } + + public Map getProperties() { + return properties; + } + + public void setDatabase(String database) { + this.database = database; + } + + public void setTable(String table) { + this.table = table; + } + + public void setTableComment(String tableComment) { + this.tableComment = tableComment; + } + + public void setFields(Map fields) { + this.fields = fields; + } + + public void setKeys(List keys) { + this.keys = keys; + } + + public void setModel(DataModel model) { + this.model = model; + } + + + public void setProperties(Map properties) { + this.properties = properties; + } + +} diff --git a/src/main/java/org/apache/flink/connector/databend/exception/CreateTableException.java b/src/main/java/org/apache/flink/connector/databend/exception/CreateTableException.java new file mode 100644 index 0000000..38c2ae7 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/exception/CreateTableException.java @@ -0,0 +1,30 @@ +package org.apache.flink.connector.databend.exception; + +/** + * Create Table exception. + */ +public class CreateTableException extends RuntimeException { + public CreateTableException() { + super(); + } + + public CreateTableException(String message) { + super(message); + } + + public CreateTableException(String message, Throwable cause) { + super(message, cause); + } + + public CreateTableException(Throwable cause) { + super(cause); + } + + protected CreateTableException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendConnectionOptions.java b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendConnectionOptions.java index fd211c5..f6626ec 100644 --- a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendConnectionOptions.java +++ b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendConnectionOptions.java @@ -1,8 +1,9 @@ package org.apache.flink.connector.databend.internal.options; +import javax.annotation.Nullable; import java.io.Serializable; import java.util.Optional; -import javax.annotation.Nullable; +import java.util.Properties; /** * Databend connection options. 
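 *
 * <p>A minimal construction sketch (placeholder values; the trailing {@code Properties}
 * argument is the connection-property bag added in this patch):
 * <pre>{@code
 * Properties props = new Properties();
 * DatabendConnectionOptions options = new DatabendConnectionOptions(
 *         "databend://localhost:8000", "databend", "databend", "default", "test", props);
 * }</pre>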
@@ -21,13 +22,16 @@ public class DatabendConnectionOptions implements Serializable { private final String tableName; + private final Properties connectionProperties; + public DatabendConnectionOptions( - String url, @Nullable String username, @Nullable String password, String databaseName, String tableName) { + String url, @Nullable String username, @Nullable String password, String databaseName, String tableName, Properties connectionProperties) { this.url = url; this.username = username; this.password = password; this.databaseName = databaseName; this.tableName = tableName; + this.connectionProperties = connectionProperties; } public String getUrl() { @@ -49,4 +53,8 @@ public String getDatabaseName() { public String getTableName() { return this.tableName; } + + public Properties getConnectionProperties() { + return this.connectionProperties; + } } diff --git a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendDmlOptions.java b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendDmlOptions.java index 527859a..210bd5e 100644 --- a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendDmlOptions.java +++ b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendDmlOptions.java @@ -4,6 +4,7 @@ import javax.annotation.Nullable; import java.time.Duration; +import java.util.Properties; /** * Databend data modify language options. @@ -24,6 +25,7 @@ public class DatabendDmlOptions extends DatabendConnectionOptions { private final boolean ignoreDelete; private final Integer parallelism; + private Properties connectionProperties; public DatabendDmlOptions( String url, @@ -31,6 +33,7 @@ public DatabendDmlOptions( @Nullable String password, String databaseName, String tableName, + Properties connectionProperties, int batchSize, Duration flushInterval, int maxRetires, @@ -38,7 +41,7 @@ public DatabendDmlOptions( String[] primaryKeys, boolean ignoreDelete, Integer parallelism) { - super(url, username, password, databaseName, tableName); + super(url, username, password, databaseName, tableName, connectionProperties); this.batchSize = batchSize; this.flushInterval = flushInterval; this.maxRetries = maxRetires; @@ -64,7 +67,7 @@ public SinkUpdateStrategy getUpdateStrategy() { return updateStrategy; } - public String[] getPrimaryKeys(){ + public String[] getPrimaryKeys() { return primaryKeys; } @@ -76,6 +79,10 @@ public Integer getParallelism() { return parallelism; } + public Properties getConnectionProperties() { + return this.connectionProperties; + } + /** * Builder for {@link DatabendDmlOptions}. 
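 *
 * <p>Sketch of intended use (placeholder values, limited to builder methods visible in
 * this patch via {@code DatabendDynamicTableFactory#getDmlOptions}):
 * <pre>{@code
 * DatabendDmlOptions dml = new DatabendDmlOptions.Builder()
 *         .withUrl("databend://localhost:8000")
 *         .withUsername("databend")
 *         .withPassword("databend")
 *         .withConnectionProperties(new Properties())
 *         .withParallelism(1)
 *         .build();
 * }</pre>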
*/ @@ -92,6 +99,7 @@ public static class Builder { private boolean ignoreDelete; private String[] primaryKey; private Integer parallelism; + private Properties connectionProperties; public Builder() { } @@ -156,6 +164,11 @@ public DatabendDmlOptions.Builder withParallelism(Integer parallelism) { return this; } + public DatabendDmlOptions.Builder withConnectionProperties(Properties connectionProperties) { + this.connectionProperties = connectionProperties; + return this; + } + public DatabendDmlOptions build() { return new DatabendDmlOptions( url, @@ -163,6 +176,7 @@ public DatabendDmlOptions build() { password, databaseName, tableName, + connectionProperties, batchSize, flushInterval, maxRetries, diff --git a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendReadOptions.java b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendReadOptions.java index 4f0eeb4..948f7e1 100644 --- a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendReadOptions.java +++ b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendReadOptions.java @@ -1,46 +1,19 @@ package org.apache.flink.connector.databend.internal.options; import javax.annotation.Nullable; +import java.util.Properties; public class DatabendReadOptions extends DatabendConnectionOptions { private static final long serialVersionUID = 1L; - private final String partitionColumn; - private final Integer partitionNum; - private final Long partitionLowerBound; - private final Long partitionUpperBound; - private DatabendReadOptions( String url, @Nullable String username, @Nullable String password, String databaseName, String tableName, - String partitionColumn, - Integer partitionNum, - Long partitionLowerBound, - Long partitionUpperBound) { - super(url, username, password, databaseName, tableName); - this.partitionColumn = partitionColumn; - this.partitionNum = partitionNum; - this.partitionLowerBound = partitionLowerBound; - this.partitionUpperBound = partitionUpperBound; - } - - public String getPartitionColumn() { - return partitionColumn; - } - - public Integer getPartitionNum() { - return partitionNum; - } - - public Long getPartitionLowerBound() { - return partitionLowerBound; - } - - public Long getPartitionUpperBound() { - return partitionUpperBound; + Properties connectionProperties) { + super(url, username, password, databaseName, tableName, connectionProperties); } /** @@ -52,10 +25,7 @@ public static class Builder { private String password; private String databaseName; private String tableName; - private String partitionColumn; - private Integer partitionNum; - private Long partitionLowerBound; - private Long partitionUpperBound; + private Properties connectionProperties; public DatabendReadOptions.Builder withUrl(String url) { this.url = url; @@ -82,23 +52,8 @@ public DatabendReadOptions.Builder withTableName(String tableName) { return this; } - public DatabendReadOptions.Builder withPartitionColumn(String partitionColumn) { - this.partitionColumn = partitionColumn; - return this; - } - - public DatabendReadOptions.Builder withPartitionNum(Integer partitionNum) { - this.partitionNum = partitionNum; - return this; - } - - public Builder withPartitionLowerBound(Long partitionLowerBound) { - this.partitionLowerBound = partitionLowerBound; - return this; - } - - public Builder withPartitionUpperBound(Long partitionUpperBound) { - this.partitionUpperBound = partitionUpperBound; + public DatabendReadOptions.Builder withConnectionProperties(Properties 
connectionProperties) { + this.connectionProperties = connectionProperties; return this; } @@ -109,10 +64,7 @@ public DatabendReadOptions build() { password, databaseName, tableName, - partitionColumn, - partitionNum, - partitionLowerBound, - partitionUpperBound); + connectionProperties); } } } diff --git a/src/test/java/org/apache/flink/TestDatabendConnectionProvider.java b/src/test/java/org/apache/flink/TestDatabendConnectionProvider.java index 0dc4cab..f9fef2f 100644 --- a/src/test/java/org/apache/flink/TestDatabendConnectionProvider.java +++ b/src/test/java/org/apache/flink/TestDatabendConnectionProvider.java @@ -24,7 +24,7 @@ public void TestCreateConnection() throws SQLException { m.put("properties.table-name", "test"); Properties properties = DatabendUtil.getDatabendProperties(m); DatabendConnectionOptions databendConnectionOptions = - new DatabendConnectionOptions("databend://localhost:8000", "databend", "databend", "default", "test"); + new DatabendConnectionOptions("databend://localhost:8000", "databend", "databend", "default", "test", properties); DatabendConnectionProvider databendConnectionProvider = new DatabendConnectionProvider(databendConnectionOptions, properties); diff --git a/src/test/java/org/apache/flink/TestDatabendOutputFormat.java b/src/test/java/org/apache/flink/TestDatabendOutputFormat.java index 74d9868..03046d4 100644 --- a/src/test/java/org/apache/flink/TestDatabendOutputFormat.java +++ b/src/test/java/org/apache/flink/TestDatabendOutputFormat.java @@ -69,7 +69,7 @@ public void TestAbstractDatabendOutput() throws SQLException, IOException { m.put("properties.table-name", "test"); Properties properties = DatabendUtil.getDatabendProperties(m); DatabendConnectionOptions databendConnectionOptions = new DatabendConnectionOptions( - "databend://0.0.0.0:8000", "databend", "databend", "test_output_format", "test"); + "databend://0.0.0.0:8000", "databend", "databend", "test_output_format", "test", properties); DatabendDmlOptions databendDmlOptions = new DatabendDmlOptions( "databend://0.0.0.0:8000", diff --git a/src/test/java/org/apache/flink/TestDeleteOutputFormat.java b/src/test/java/org/apache/flink/TestDeleteOutputFormat.java index 78e2e7b..a797ceb 100644 --- a/src/test/java/org/apache/flink/TestDeleteOutputFormat.java +++ b/src/test/java/org/apache/flink/TestDeleteOutputFormat.java @@ -65,7 +65,7 @@ public void TestAbstractDatabendOutput() throws SQLException, IOException { m.put("properties.database-name", "test_delete_output"); m.put("properties.table-name", "test"); Properties properties = DatabendUtil.getDatabendProperties(m); - DatabendConnectionOptions databendConnectionOptions = new DatabendConnectionOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test"); + DatabendConnectionOptions databendConnectionOptions = new DatabendConnectionOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test", properties); DatabendDmlOptions databendDmlOptions = new DatabendDmlOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test", 1, Duration.ofSeconds(100), 3, DatabendConfigOptions.SinkUpdateStrategy.UPDATE, new String[]{}, false, 1); String[] fields = {"x", "y", "z"}; From db35750f019b84949cbdcfb2d99f53835d3c358c Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 30 Jan 2024 16:59:52 +0800 Subject: [PATCH 2/4] fix tests --- .../java/org/apache/flink/TestDatabendDynamicTableSink.java | 2 ++ src/test/java/org/apache/flink/TestDatabendOutputFormat.java | 1 + 
src/test/java/org/apache/flink/TestDeleteOutputFormat.java | 2 +- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/test/java/org/apache/flink/TestDatabendDynamicTableSink.java b/src/test/java/org/apache/flink/TestDatabendDynamicTableSink.java index b22f8a5..d16374f 100644 --- a/src/test/java/org/apache/flink/TestDatabendDynamicTableSink.java +++ b/src/test/java/org/apache/flink/TestDatabendDynamicTableSink.java @@ -38,6 +38,7 @@ public void testConstructor() { "databend", "test_output_format", "test", + connectionProperties, 1, Duration.ofSeconds(100), 3, @@ -77,6 +78,7 @@ public void testGetChangelogMode() { "databend", "test_output_format", "test", + connectionProperties, 1, Duration.ofSeconds(100), 3, diff --git a/src/test/java/org/apache/flink/TestDatabendOutputFormat.java b/src/test/java/org/apache/flink/TestDatabendOutputFormat.java index 03046d4..2c8b8ed 100644 --- a/src/test/java/org/apache/flink/TestDatabendOutputFormat.java +++ b/src/test/java/org/apache/flink/TestDatabendOutputFormat.java @@ -77,6 +77,7 @@ public void TestAbstractDatabendOutput() throws SQLException, IOException { "databend", "test_output_format", "test", + properties, 1, Duration.ofSeconds(100), 3, diff --git a/src/test/java/org/apache/flink/TestDeleteOutputFormat.java b/src/test/java/org/apache/flink/TestDeleteOutputFormat.java index a797ceb..75d6bbb 100644 --- a/src/test/java/org/apache/flink/TestDeleteOutputFormat.java +++ b/src/test/java/org/apache/flink/TestDeleteOutputFormat.java @@ -67,7 +67,7 @@ public void TestAbstractDatabendOutput() throws SQLException, IOException { Properties properties = DatabendUtil.getDatabendProperties(m); DatabendConnectionOptions databendConnectionOptions = new DatabendConnectionOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test", properties); - DatabendDmlOptions databendDmlOptions = new DatabendDmlOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test", 1, Duration.ofSeconds(100), 3, DatabendConfigOptions.SinkUpdateStrategy.UPDATE, new String[]{}, false, 1); + DatabendDmlOptions databendDmlOptions = new DatabendDmlOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test", properties, 1, Duration.ofSeconds(100), 3, DatabendConfigOptions.SinkUpdateStrategy.UPDATE, new String[]{}, false, 1); String[] fields = {"x", "y", "z"}; String[] primaryKeys = {"x"}; String[] partitionKeys = {"x"}; From a3cd658802b8d82dbeddea3c1d5460b2802c9454 Mon Sep 17 00:00:00 2001 From: hantmac Date: Tue, 30 Jan 2024 17:11:41 +0800 Subject: [PATCH 3/4] fix typo --- .../connector/databend/catalog/databend/DatabendSystem.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java index f738257..955bb40 100644 --- a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java +++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java @@ -12,7 +12,7 @@ import java.util.stream.Collectors; /** - * Doris System Operate. + * Databend System Operate. 
*/ @Public public class DatabendSystem implements Serializable { From 9711a25bb68887cf9954c5ff9a88c6802c2ebf45 Mon Sep 17 00:00:00 2001 From: hantmac Date: Fri, 22 Aug 2025 20:05:20 +0800 Subject: [PATCH 4/4] support mysql full db sync --- pom.xml | 12 + .../catalog/databend/DatabendSystem.java | 145 ++++- .../catalog/databend/FieldSchema.java | 11 +- .../exception/DatabendRuntimeException.java | 27 + .../exception/DatabendSystemException.java | 27 + .../databend/tools/cdc/CdcTools.java | 81 +++ .../databend/tools/cdc/DatabaseSync.java | 508 ++++++++++++++++++ .../databend/tools/cdc/SourceSchema.java | 139 +++++ .../tools/cdc/mysql/MySQLDatabaseSync.java | 4 + .../writer/serializer/DatabendRecord.java | 71 +++ 10 files changed, 1022 insertions(+), 3 deletions(-) create mode 100644 src/main/java/org/apache/flink/connector/databend/exception/DatabendRuntimeException.java create mode 100644 src/main/java/org/apache/flink/connector/databend/exception/DatabendSystemException.java create mode 100644 src/main/java/org/apache/flink/connector/databend/tools/cdc/CdcTools.java create mode 100644 src/main/java/org/apache/flink/connector/databend/tools/cdc/DatabaseSync.java create mode 100644 src/main/java/org/apache/flink/connector/databend/tools/cdc/SourceSchema.java create mode 100644 src/main/java/org/apache/flink/connector/databend/tools/cdc/mysql/MySQLDatabaseSync.java create mode 100644 src/main/java/org/apache/flink/connector/databend/writer/serializer/DatabendRecord.java diff --git a/pom.xml b/pom.xml index ac2144f..7f8c018 100644 --- a/pom.xml +++ b/pom.xml @@ -177,6 +177,18 @@ ${flink.version} ${flink.scope} + + com.ververica + flink-sql-connector-mysql-cdc + ${flink.sql.cdc.version} + provided + + + flink-shaded-guava + org.apache.flink + + + com.ververica flink-sql-connector-oracle-cdc diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java index 955bb40..859596f 100644 --- a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java +++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java @@ -1,16 +1,28 @@ package org.apache.flink.connector.databend.catalog.databend; +import org.apache.commons.compress.utils.Lists; import org.apache.flink.annotation.Public; +import org.apache.flink.connector.databend.exception.DatabendRuntimeException; +import org.apache.flink.connector.databend.exception.DatabendSystemException; import org.apache.flink.connector.databend.internal.connection.DatabendConnectionProvider; import org.apache.flink.connector.databend.internal.options.DatabendConnectionOptions; +import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Databend System Operate. 
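 *
 * <p>Rough usage sketch for the schema helpers added below (database and table names are
 * placeholders):
 * <pre>{@code
 * DatabendSystem system = new DatabendSystem(connectionOptions);
 * if (!system.databaseExists("cdc_db")) {
 *     system.createDatabase("cdc_db");
 * }
 * if (!system.tableExists("cdc_db", "orders")) {
 *     system.createTable(tableSchema); // executes the generated CREATE TABLE IF NOT EXISTS DDL
 * }
 * }</pre>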
*/
@@ -26,16 +38,145 @@ public DatabendSystem(DatabendConnectionOptions options) {
         this.jdbcConnectionProvider = new DatabendConnectionProvider(options, options.getConnectionProperties());
     }
 
+    public List<String> listDatabases() {
+        return extractColumnValuesBySQL(
+                "select schema_name from information_schema.schemata;",
+                1,
+                dbName -> !builtinDatabases.contains(dbName));
+    }
 
-    private List<String> identifier(List<String> name) {
+    public boolean databaseExists(String database) {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
+        return listDatabases().contains(database);
+    }
+
+    private static List<String> identifier(List<String> name) {
         List<String> result = name.stream().map(m -> identifier(m)).collect(Collectors.toList());
         return result;
     }
 
-    private String identifier(String name) {
+    public boolean createDatabase(String database) {
+        execute(String.format("CREATE DATABASE IF NOT EXISTS %s", database));
+        return true;
+    }
+
+    public void execute(String sql) {
+        try (Statement statement =
+                jdbcConnectionProvider.getOrCreateConnection().createStatement()) {
+            statement.execute(sql);
+        } catch (Exception e) {
+            throw new DatabendSystemException(
+                    String.format("SQL query could not be executed: %s", sql), e);
+        }
+    }
+
+    public boolean tableExists(String database, String table) {
+        return databaseExists(database) && listTables(database).contains(table);
+    }
+
+    public List<String> listTables(String databaseName) {
+        if (!databaseExists(databaseName)) {
+            throw new DatabendRuntimeException("database " + databaseName + " does not exist");
+        }
+        return extractColumnValuesBySQL(
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    public void createTable(TableSchema schema) {
+        String ddl = buildCreateTableDDL(schema);
+        LOG.info("Create table with ddl: {}", ddl);
+        execute(ddl);
+    }
+
+    public static String buildCreateTableDDL(TableSchema schema) {
+        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
+        sb.append(identifier(schema.getDatabase()))
+                .append(".")
+                .append(identifier(schema.getTable()))
+                .append("(");
+
+        Map<String, FieldSchema> fields = schema.getFields();
+
+        // append columns
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema field = entry.getValue();
+            buildColumn(sb, field, false);
+        }
+        // drop the trailing comma left by the last column
+        sb = sb.deleteCharAt(sb.length() - 1);
+        sb.append(" ) ");
+
+        // append table comment
+        if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
+            sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
+        }
+
+        return sb.toString();
+    }
+
+    private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
+        String fieldType = field.getTypeString();
+        if (isKey && DatabendType.STRING.equals(fieldType)) {
+            fieldType = String.format("%s(%s)", DatabendType.VARCHAR, 65533);
+        }
+        sql.append(identifier(field.getName())).append(" ").append(fieldType);
+
+        if (field.getDefaultValue() != null) {
+            sql.append(" DEFAULT " + quoteDefaultValue(field.getDefaultValue()));
+        }
+        sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
+    }
+
+    public static String quoteComment(String comment) {
+        if (comment == null) {
+            return "";
+        } else {
+            return comment.replaceAll("'", "\\\\'");
+        }
+    }
+
+    public static String quoteDefaultValue(String defaultValue) {
+        // DEFAULT current_timestamp does not need quoting
+        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+            return defaultValue;
+        }
+        return "'" + defaultValue + "'";
+    }
+
+    public List<String> extractColumnValuesBySQL(
+            String sql, int
columnIndex, Predicate filterFunc, Object... params) { + + List columnValues = Lists.newArrayList(); + try (PreparedStatement ps = + jdbcConnectionProvider.getOrCreateConnection().prepareStatement(sql)) { + if (Objects.nonNull(params) && params.length > 0) { + for (int i = 0; i < params.length; i++) { + ps.setObject(i + 1, params[i]); + } + } + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + String columnValue = rs.getString(columnIndex); + if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) { + columnValues.add(columnValue); + } + } + return columnValues; + } catch (Exception e) { + throw new DatabendSystemException( + String.format("The following SQL query could not be executed: %s", sql), e); + } + } + + private static String identifier(String name) { return "`" + name + "`"; } + private String quoteProperties(String name) { return "'" + name + "'"; } diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java index 538bded..f601f8f 100644 --- a/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java +++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java @@ -5,12 +5,15 @@ public class FieldSchema { private String typeString; private String comment; + private String defaultValue; + public FieldSchema() { } - public FieldSchema(String name, String typeString, String comment) { + public FieldSchema(String name, String typeString, String comment, String defaultValue) { this.name = name; this.typeString = typeString; + this.defaultValue = defaultValue; this.comment = comment; } @@ -37,4 +40,10 @@ public String getComment() { public void setComment(String comment) { this.comment = comment; } + + public String getDefaultValue() { + return defaultValue; + } } + + diff --git a/src/main/java/org/apache/flink/connector/databend/exception/DatabendRuntimeException.java b/src/main/java/org/apache/flink/connector/databend/exception/DatabendRuntimeException.java new file mode 100644 index 0000000..092fe98 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/exception/DatabendRuntimeException.java @@ -0,0 +1,27 @@ +package org.apache.flink.connector.databend.exception; + +public class DatabendRuntimeException extends RuntimeException { + public DatabendRuntimeException() { + super(); + } + + public DatabendRuntimeException(String message) { + super(message); + } + + public DatabendRuntimeException(String message, Throwable cause) { + super(message, cause); + } + + public DatabendRuntimeException(Throwable cause) { + super(cause); + } + + protected DatabendRuntimeException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/org/apache/flink/connector/databend/exception/DatabendSystemException.java b/src/main/java/org/apache/flink/connector/databend/exception/DatabendSystemException.java new file mode 100644 index 0000000..17f2c35 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/exception/DatabendSystemException.java @@ -0,0 +1,27 @@ +package org.apache.flink.connector.databend.exception; + +public class DatabendSystemException extends RuntimeException { + public DatabendSystemException() { + super(); + } + + public DatabendSystemException(String message) { + super(message); + } + + public DatabendSystemException(String 
message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public DatabendSystemException(Throwable cause) {
+        super(cause);
+    }
+
+    protected DatabendSystemException(
+            String message,
+            Throwable cause,
+            boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/tools/cdc/CdcTools.java b/src/main/java/org/apache/flink/connector/databend/tools/cdc/CdcTools.java
new file mode 100644
index 0000000..cde9ac1
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/tools/cdc/CdcTools.java
@@ -0,0 +1,81 @@
+package org.apache.flink.connector.databend.tools.cdc;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.databend.tools.cdc.mysql.MySQLDatabaseSync;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CDC database-sync command line tools.
+ */
+public class CdcTools {
+    private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
+    private static final List<String> EMPTY_KEYS = Arrays.asList("password");
+
+    public static void main(String[] args) throws Exception {
+        String operation = args[0].toLowerCase();
+        String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
+        switch (operation) {
+            case MYSQL_SYNC_DATABASE:
+                createMySQLSyncDatabase(opArgs);
+                break;
+            default:
+                System.out.println("Unknown operation " + operation);
+                System.exit(1);
+        }
+    }
+
+    private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+        String jobName = params.get("job-name");
+        String database = params.get("database");
+        String tablePrefix = params.get("table-prefix");
+        String tableSuffix = params.get("table-suffix");
+        String includingTables = params.get("including-tables");
+        String excludingTables = params.get("excluding-tables");
+
+        Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
+        Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
+        Map<String, String> tableMap = getConfigMap(params, "table-conf");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
+        Configuration sinkConfig = Configuration.fromMap(sinkMap);
+
+        // MySQLDatabaseSync is still a stub in this patch; it is expected to extend DatabaseSync
+        DatabaseSync databaseSync = new MySQLDatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(database)
+                .setConfig(mysqlConfig)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setSinkConfig(sinkConfig)
+                .setTableConfig(tableMap);
+        databaseSync.create();
+        databaseSync.build();
+
+        if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
+            jobName = String.format("MySQL-Databend Sync Database: %s", mysqlMap.get("database-name"));
+        }
+        env.execute(jobName);
+    }
+
+    private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
+        if (!params.has(key)) {
+            return null;
+        }
+
+        Map<String, String> map = new HashMap<>();
+        for (String param : params.getMultiParameter(key)) {
+            String[] kv = param.split("=");
+            if (kv.length == 2) {
+                map.put(kv[0], kv[1]);
+                continue;
+            } else if (kv.length == 1 && EMPTY_KEYS.contains(kv[0])) {
+                map.put(kv[0], "");
+                continue;
+            }
+
+            System.err.println("Invalid " + key + " " + param + ".\n");
+            return null;
+        }
+        return map;
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/tools/cdc/DatabaseSync.java b/src/main/java/org/apache/flink/connector/databend/tools/cdc/DatabaseSync.java
new file
mode 100644 index 0000000..934a5e1 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/tools/cdc/DatabaseSync.java @@ -0,0 +1,508 @@ +package org.apache.flink.connector.databend.tools.cdc; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.databend.catalog.databend.DatabendSystem; +import org.apache.flink.connector.databend.catalog.databend.TableSchema; +import org.apache.flink.connector.databend.config.DatabendConfigOptions; +import org.apache.flink.connector.databend.internal.options.DatabendConnectionOptions; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.*; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public abstract class DatabaseSync { + private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class); + private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change"; + private static final String TABLE_NAME_OPTIONS = "table-name"; + + protected Configuration config; + + protected String database; + + protected TableNameConverter converter; + protected Pattern includingPattern; + protected Pattern excludingPattern; + protected Map multiToOneRulesPattern; + protected Map tableConfig = new HashMap<>(); + protected Configuration sinkConfig; + protected boolean ignoreDefaultValue; + + public StreamExecutionEnvironment env; + private boolean createTableOnly = false; + private boolean newSchemaChange; + protected String includingTables; + protected String excludingTables; + protected String multiToOneOrigin; + protected String multiToOneTarget; + protected String tablePrefix; + protected String tableSuffix; + protected boolean singleSink; + private final Map tableMapping = new HashMap<>(); + + public abstract void registerDriver() throws SQLException; + + public abstract Connection getConnection() throws SQLException; + + public abstract List getSchemaList() throws Exception; + + public abstract DataStreamSource buildCdcSource(StreamExecutionEnvironment env); + + /** + * Get the prefix of a specific tableList, for example, mysql is database, oracle is schema. + */ + public abstract String getTableListPrefix(); + + protected DatabaseSync() throws SQLException { + registerDriver(); + } + + public void create() { + this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables); + this.excludingPattern = excludingTables == null ? 
null : Pattern.compile(excludingTables); + this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin, multiToOneTarget); + this.converter = new TableNameConverter(tablePrefix, tableSuffix, multiToOneRulesPattern); + // default enable light schema change + if (!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) { + this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true"); + } + } + + public void build() throws Exception { + DatabendConnectionOptions options = getDatabendConnectionOptions(); + DatabendSystem databendSystem = new DatabendSystem(options); + + List schemaList = getSchemaList(); + Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized."); + if (!databendSystem.databaseExists(database)) { + LOG.info("database {} not exist, created", database); + databendSystem.createDatabase(database); + } + + List syncTables = new ArrayList<>(); + List dorisTables = new ArrayList<>(); + + Map tableBucketsMap = null; + if (tableConfig.containsKey("table-buckets")) { + tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets")); + } + Set bucketsTable = new HashSet<>(); + for (SourceSchema schema : schemaList) { + syncTables.add(schema.getTableName()); + String dorisTable = converter.convert(schema.getTableName()); + + // Calculate the mapping relationship between upstream and downstream tables + tableMapping.put( + schema.getTableIdentifier(), String.format("%s.%s", database, dorisTable)); + if (!databendSystem.tableExists(database, dorisTable)) { + TableSchema databendSchema = schema.convertTableSchema(tableConfig); + // set doris target database + databendSchema.setDatabase(database); + databendSchema.setTable(dorisTable); + if (tableBucketsMap != null) { + setTableSchemaBuckets(tableBucketsMap, databendSchema, dorisTable, bucketsTable); + } + databendSystem.createTable(databendSchema); + } + if (!dorisTables.contains(dorisTable)) { + dorisTables.add(dorisTable); + } + } + if (createTableOnly) { + System.out.println("Create table finished."); + System.exit(0); + } + config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables)); + DataStreamSource streamSource = buildCdcSource(env); + if (singleSink) { + streamSource.sinkTo(buildDorisSink()); + } else { + SingleOutputStreamOperator parsedStream = + streamSource.process(new ParsingProcessFunction(converter)); + for (String table : dorisTables) { + OutputTag recordOutputTag = + ParsingProcessFunction.createRecordOutputTag(table); + DataStream sideOutput = parsedStream.getSideOutput(recordOutputTag); + int sinkParallel = + sinkConfig.getInteger( + DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism()); + sideOutput + .sinkTo(buildDorisSink(table)) + .setParallelism(sinkParallel) + .name(table) + .uid(table); + } + } + } + + private DatabendConnectionOptions getDatabendConnectionOptions() { + String user = sinkConfig.getString(DatabendConfigOptions.USERNAME); + String passwd = sinkConfig.getString(DatabendConfigOptions.PASSWORD, "databend"); + String jdbcUrl = sinkConfig.getString(DatabendConfigOptions.URL); + String database = sinkConfig.getString(DatabendConfigOptions.DATABASE_NAME); + Preconditions.checkNotNull(jdbcUrl, "jdbcurl is empty in sink-conf"); + DatabendConnectionOptions databendConnectionOptions = new DatabendConnectionOptions(jdbcUrl, user, passwd, database, "", new Properties()); + return databendConnectionOptions; + } + + /** + * create doris sink for multi table. + */ + public DorisSink buildDorisSink() { + return buildDorisSink(null); + } + + /** + * create doris sink. 
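+     * Configures the sink from the sink-conf options (label prefix, 2PC, batch and buffer
+     * settings) and attaches the JSON Debezium serializer together with the
+     * upstream-to-downstream table mapping; note that the builder below still uses the
+     * Doris sink and option classes.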
+ */ + public DorisSink buildDorisSink(String table) { + String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES); + String benodes = sinkConfig.getString(DorisConfigOptions.BENODES); + String user = sinkConfig.getString(DorisConfigOptions.USERNAME); + String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, ""); + + DorisSink.Builder builder = DorisSink.builder(); + DorisOptions.Builder dorisBuilder = DorisOptions.builder(); + dorisBuilder.setFenodes(fenodes).setBenodes(benodes).setUsername(user).setPassword(passwd); + sinkConfig + .getOptional(DorisConfigOptions.AUTO_REDIRECT) + .ifPresent(dorisBuilder::setAutoRedirect); + + // single sink not need table identifier + if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)) { + dorisBuilder.setTableIdentifier(database + "." + table); + } + + Properties pro = new Properties(); + // default json data format + pro.setProperty("format", "json"); + pro.setProperty("read_json_by_line", "true"); + // customer stream load properties + Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap()); + pro.putAll(streamLoadProp); + DorisExecutionOptions.Builder executionBuilder = + DorisExecutionOptions.builder().setStreamLoadProp(pro); + + sinkConfig + .getOptional(DorisConfigOptions.SINK_LABEL_PREFIX) + .ifPresent(executionBuilder::setLabelPrefix); + sinkConfig + .getOptional(DorisConfigOptions.SINK_ENABLE_DELETE) + .ifPresent(executionBuilder::setDeletable); + sinkConfig + .getOptional(DorisConfigOptions.SINK_BUFFER_COUNT) + .ifPresent(executionBuilder::setBufferCount); + sinkConfig + .getOptional(DorisConfigOptions.SINK_BUFFER_SIZE) + .ifPresent(executionBuilder::setBufferSize); + sinkConfig + .getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL) + .ifPresent(executionBuilder::setCheckInterval); + sinkConfig + .getOptional(DorisConfigOptions.SINK_MAX_RETRIES) + .ifPresent(executionBuilder::setMaxRetries); + sinkConfig + .getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE) + .ifPresent(executionBuilder::setIgnoreUpdateBefore); + + if (!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) { + executionBuilder.disable2PC(); + } else if (sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()) { + // force open 2pc + executionBuilder.enable2PC(); + } + + sinkConfig + .getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE) + .ifPresent(executionBuilder::setBatchMode); + sinkConfig + .getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE) + .ifPresent(executionBuilder::setFlushQueueSize); + sinkConfig + .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS) + .ifPresent(executionBuilder::setBufferFlushMaxRows); + sinkConfig + .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES) + .ifPresent(executionBuilder::setBufferFlushMaxBytes); + sinkConfig + .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL) + .ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis())); + + sinkConfig + .getOptional(DorisConfigOptions.SINK_USE_CACHE) + .ifPresent(executionBuilder::setUseCache); + + DorisExecutionOptions executionOptions = executionBuilder.build(); + builder.setDorisReadOptions(DorisReadOptions.builder().build()) + .setDorisExecutionOptions(executionOptions) + .setSerializer( + JsonDebeziumSchemaSerializer.builder() + .setDorisOptions(dorisBuilder.build()) + .setNewSchemaChange(newSchemaChange) + .setExecutionOptions(executionOptions) + .setTableMapping(tableMapping) + .setTableProperties(tableConfig) + .setTargetDatabase(database) + 
.setTargetTablePrefix(tablePrefix) + .setTargetTableSuffix(tableSuffix) + .build()) + .setDorisOptions(dorisBuilder.build()); + return builder.build(); + } + + /** + * Filter table that need to be synchronized. + */ + protected boolean isSyncNeeded(String tableName) { + boolean sync = true; + if (includingPattern != null) { + sync = includingPattern.matcher(tableName).matches(); + } + if (excludingPattern != null) { + sync = sync && !excludingPattern.matcher(tableName).matches(); + } + LOG.debug("table {} is synchronized? {}", tableName, sync); + return sync; + } + + protected String getSyncTableList(List syncTables) { + if (!singleSink) { + return syncTables.stream() + .map(v -> getTableListPrefix() + "\\." + v) + .collect(Collectors.joining("|")); + } else { + // includingTablePattern and ^excludingPattern + if (includingTables == null) { + includingTables = ".*"; + } + String includingPattern = + String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables); + if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) { + return includingPattern; + } else { + String excludingPattern = + String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables); + return String.format("(%s)(%s)", includingPattern, excludingPattern); + } + } + } + + /** + * Filter table that many tables merge to one. + */ + protected HashMap multiToOneRulesParser( + String multiToOneOrigin, String multiToOneTarget) { + if (StringUtils.isNullOrWhitespaceOnly(multiToOneOrigin) + || StringUtils.isNullOrWhitespaceOnly(multiToOneTarget)) { + return null; + } + HashMap multiToOneRulesPattern = new HashMap<>(); + String[] origins = multiToOneOrigin.split("\\|"); + String[] targets = multiToOneTarget.split("\\|"); + if (origins.length != targets.length) { + System.out.println( + "param error : multi to one params length are not equal,please check your params."); + System.exit(1); + } + try { + for (int i = 0; i < origins.length; i++) { + multiToOneRulesPattern.put(Pattern.compile(origins[i]), targets[i]); + } + } catch (Exception e) { + System.out.println("param error : Your regular expression is incorrect,please check."); + System.exit(1); + } + return multiToOneRulesPattern; + } + + /** + * Get table buckets Map. + * + * @param tableBuckets the string of tableBuckets, eg:student:10,student_info:20,student.*:30 + * @return The table name and buckets map. The key is table name, the value is buckets. + */ + public Map getTableBuckets(String tableBuckets) { + Map tableBucketsMap = new LinkedHashMap<>(); + String[] tableBucketsArray = tableBuckets.split(","); + for (String tableBucket : tableBucketsArray) { + String[] tableBucketArray = tableBucket.split(":"); + tableBucketsMap.put( + tableBucketArray[0].trim(), Integer.parseInt(tableBucketArray[1].trim())); + } + return tableBucketsMap; + } + + /** + * Set table schema buckets. + * + * @param tableBucketsMap The table name and buckets map. The key is table name, the value is + * buckets. + * @param dorisSchema @{TableSchema} + * @param dorisTable the table name need to set buckets + * @param tableHasSet The buckets table is set + */ + public void setTableSchemaBuckets( + Map tableBucketsMap, + TableSchema dorisSchema, + String dorisTable, + Set tableHasSet) { + + if (tableBucketsMap != null) { + // Firstly, if the table name is in the table-buckets map, set the buckets of the table. 
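+            // exact table names take precedence over the regex entries matched in the loop below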
+ if (tableBucketsMap.containsKey(dorisTable)) { + dorisSchema.setTableBuckets(tableBucketsMap.get(dorisTable)); + tableHasSet.add(dorisTable); + return; + } + // Secondly, iterate over the map to find a corresponding regular expression match, + for (Map.Entry entry : tableBucketsMap.entrySet()) { + if (tableHasSet.contains(entry.getKey())) { + continue; + } + + Pattern pattern = Pattern.compile(entry.getKey()); + if (pattern.matcher(dorisTable).matches()) { + dorisSchema.setTableBuckets(entry.getValue()); + tableHasSet.add(dorisTable); + return; + } + } + } + } + + public DatabaseSync setEnv(StreamExecutionEnvironment env) { + this.env = env; + return this; + } + + public DatabaseSync setConfig(Configuration config) { + this.config = config; + return this; + } + + public DatabaseSync setDatabase(String database) { + this.database = database; + return this; + } + + public DatabaseSync setIncludingTables(String includingTables) { + this.includingTables = includingTables; + return this; + } + + public DatabaseSync setExcludingTables(String excludingTables) { + this.excludingTables = excludingTables; + return this; + } + + public DatabaseSync setMultiToOneOrigin(String multiToOneOrigin) { + this.multiToOneOrigin = multiToOneOrigin; + return this; + } + + public DatabaseSync setMultiToOneTarget(String multiToOneTarget) { + this.multiToOneTarget = multiToOneTarget; + return this; + } + + public DatabaseSync setTableConfig(Map tableConfig) { + if (!CollectionUtil.isNullOrEmpty(tableConfig)) { + this.tableConfig = tableConfig; + } + return this; + } + + public DatabaseSync setSinkConfig(Configuration sinkConfig) { + this.sinkConfig = sinkConfig; + return this; + } + + public DatabaseSync setIgnoreDefaultValue(boolean ignoreDefaultValue) { + this.ignoreDefaultValue = ignoreDefaultValue; + return this; + } + + public DatabaseSync setCreateTableOnly(boolean createTableOnly) { + this.createTableOnly = createTableOnly; + return this; + } + + public DatabaseSync setNewSchemaChange(boolean newSchemaChange) { + this.newSchemaChange = newSchemaChange; + return this; + } + + public DatabaseSync setSingleSink(boolean singleSink) { + this.singleSink = singleSink; + return this; + } + + public DatabaseSync setTablePrefix(String tablePrefix) { + this.tablePrefix = tablePrefix; + return this; + } + + public DatabaseSync setTableSuffix(String tableSuffix) { + this.tableSuffix = tableSuffix; + return this; + } + + public static class TableNameConverter implements Serializable { + private static final long serialVersionUID = 1L; + private final String prefix; + private final String suffix; + private Map multiToOneRulesPattern; + + TableNameConverter() { + this("", ""); + } + + TableNameConverter(String prefix, String suffix) { + this.prefix = prefix == null ? "" : prefix; + this.suffix = suffix == null ? "" : suffix; + } + + TableNameConverter( + String prefix, String suffix, Map multiToOneRulesPattern) { + this.prefix = prefix == null ? "" : prefix; + this.suffix = suffix == null ? 
"" : suffix; + this.multiToOneRulesPattern = multiToOneRulesPattern; + } + + public String convert(String tableName) { + if (multiToOneRulesPattern == null) { + return prefix + tableName + suffix; + } + + String target = null; + + for (Map.Entry patternStringEntry : + multiToOneRulesPattern.entrySet()) { + if (patternStringEntry.getKey().matcher(tableName).matches()) { + target = patternStringEntry.getValue(); + } + } + /** + * If multiToOneRulesPattern is not null and target is not assigned, then the + * synchronization task contains both multi to one and one to one , prefixes and + * suffixes are added to common one-to-one mapping tables + */ + if (target == null) { + return prefix + tableName + suffix; + } + return target; + } + } +} diff --git a/src/main/java/org/apache/flink/connector/databend/tools/cdc/SourceSchema.java b/src/main/java/org/apache/flink/connector/databend/tools/cdc/SourceSchema.java new file mode 100644 index 0000000..33509a5 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/tools/cdc/SourceSchema.java @@ -0,0 +1,139 @@ +package org.apache.flink.connector.databend.tools.cdc; + +import org.apache.flink.connector.databend.catalog.databend.DataModel; +import org.apache.flink.connector.databend.catalog.databend.FieldSchema; +import org.apache.flink.connector.databend.catalog.databend.TableSchema; +import org.apache.flink.util.StringUtils; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.util.*; + +public abstract class SourceSchema { + private final String databaseName; + private final String schemaName; + private final String tableName; + private final String tableComment; + private final LinkedHashMap fields; + public final List primaryKeys; + public DataModel model = DataModel.UNIQUE; + + public SourceSchema( + DatabaseMetaData metaData, + String databaseName, + String schemaName, + String tableName, + String tableComment) + throws Exception { + this.databaseName = databaseName; + this.schemaName = schemaName; + this.tableName = tableName; + this.tableComment = tableComment; + + fields = new LinkedHashMap<>(); + try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) { + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + String comment = rs.getString("REMARKS"); + String fieldType = rs.getString("TYPE_NAME"); + String defaultValue = rs.getString("COLUMN_DEF"); + Integer precision = rs.getInt("COLUMN_SIZE"); + if (rs.wasNull()) { + precision = null; + } + + Integer scale = rs.getInt("DECIMAL_DIGITS"); + if (rs.wasNull()) { + scale = null; + } + String dorisTypeStr = convertToDorisType(fieldType, precision, scale); + fields.put( + fieldName, new FieldSchema(fieldName, dorisTypeStr, defaultValue, comment)); + } + } + + primaryKeys = new ArrayList<>(); + try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) { + while (rs.next()) { + String fieldName = rs.getString("COLUMN_NAME"); + primaryKeys.add(fieldName); + } + } + } + + public abstract String convertToDorisType(String fieldType, Integer precision, Integer scale); + + public String getTableIdentifier() { + return getString(databaseName, schemaName, tableName); + } + + public static String getString(String databaseName, String schemaName, String tableName) { + StringJoiner identifier = new StringJoiner("."); + if (!StringUtils.isNullOrWhitespaceOnly(databaseName)) { + identifier.add(databaseName); + } + if (!StringUtils.isNullOrWhitespaceOnly(schemaName)) { + identifier.add(schemaName); + } + + 
if (!StringUtils.isNullOrWhitespaceOnly(tableName)) { + identifier.add(tableName); + } + + return identifier.toString(); + } + + public TableSchema convertTableSchema(Map tableProps) { + TableSchema tableSchema = new TableSchema(); + tableSchema.setModel(this.model); + tableSchema.setFields(this.fields); + tableSchema.setKeys(buildKeys()); + tableSchema.setTableComment(this.tableComment); + tableSchema.setProperties(tableProps); + return tableSchema; + } + + private List buildKeys() { + return buildDistributeKeys(); + } + + private List buildDistributeKeys() { + if (!this.primaryKeys.isEmpty()) { + return primaryKeys; + } + if (!this.fields.isEmpty()) { + Map.Entry firstField = this.fields.entrySet().iterator().next(); + return Collections.singletonList(firstField.getKey()); + } + return new ArrayList<>(); + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + public Map getFields() { + return fields; + } + + public List getPrimaryKeys() { + return primaryKeys; + } + + public String getTableComment() { + return tableComment; + } + + public DataModel getModel() { + return model; + } + + public void setModel(DataModel model) { + this.model = model; + } +} + diff --git a/src/main/java/org/apache/flink/connector/databend/tools/cdc/mysql/MySQLDatabaseSync.java b/src/main/java/org/apache/flink/connector/databend/tools/cdc/mysql/MySQLDatabaseSync.java new file mode 100644 index 0000000..828ee47 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/tools/cdc/mysql/MySQLDatabaseSync.java @@ -0,0 +1,4 @@ +package org.apache.flink.connector.databend.tools.cdc.mysql; + +public class MySQLDatabaseSync { +} diff --git a/src/main/java/org/apache/flink/connector/databend/writer/serializer/DatabendRecord.java b/src/main/java/org/apache/flink/connector/databend/writer/serializer/DatabendRecord.java new file mode 100644 index 0000000..f98a664 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/databend/writer/serializer/DatabendRecord.java @@ -0,0 +1,71 @@ +package org.apache.flink.connector.databend.writer.serializer; + +import java.io.Serializable; + +public class DatabendRecord implements Serializable { + public static DatabendRecord empty = new DatabendRecord(); + + private String database; + private String table; + private byte[] row; + + public DatabendRecord() { + } + + public DatabendRecord(String database, String table, byte[] row) { + this.database = database; + this.table = table; + this.row = row; + } + + public String getTableIdentifier() { + if (database == null || table == null) { + return null; + } + return database + "." 
+ table; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public byte[] getRow() { + return row; + } + + public void setRow(byte[] row) { + this.row = row; + } + + public static DatabendRecord of(String database, String table, byte[] row) { + return new DatabendRecord(database, table, row); + } + + public static DatabendRecord of(String tableIdentifier, byte[] row) { + if (tableIdentifier != null) { + String[] dbTbl = tableIdentifier.split("\\."); + if (dbTbl.length == 2) { + String database = dbTbl[0]; + String table = dbTbl[1]; + return new DatabendRecord(database, table, row); + } + } + return null; + } + + public static DatabendRecord of(byte[] row) { + return new DatabendRecord(null, null, row); + } +} \ No newline at end of file
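
The mysql-sync-database entry point above is intended to be driven from the command line. Below is a hypothetical launch, usable once MySQLDatabaseSync actually implements DatabaseSync: the switch names (--job-name, --database, --including-tables, --mysql-conf, --sink-conf) come from CdcTools in this patch, while every key=value pair is a placeholder, since the exact keys read from mysql-conf and sink-conf are not pinned down here.

    // Hypothetical launcher; all key=value pairs are placeholders.
    public class CdcToolsExample {
        public static void main(String[] args) throws Exception {
            CdcTools.main(new String[] {
                "mysql-sync-database",
                "--job-name", "mysql2databend",
                "--database", "cdc_db",
                "--including-tables", "orders.*",
                "--mysql-conf", "hostname=127.0.0.1",
                "--mysql-conf", "username=root",
                "--mysql-conf", "password=123456",
                "--mysql-conf", "database-name=app_db",
                "--sink-conf", "url=databend://localhost:8000",
                "--sink-conf", "username=databend",
                "--sink-conf", "password=databend",
                "--sink-conf", "database-name=cdc_db"
            });
        }
    }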