diff --git a/pom.xml b/pom.xml
index 49ff3d3..7f8c018 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
UTF-8
1.8
1.8
-
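+        <!-- Version of the Ververica Flink CDC connectors pulled in below -->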
+        <flink.sql.cdc.version>2.4.2</flink.sql.cdc.version>
8.14
4.13.2
2.12
@@ -177,6 +177,30 @@
            <version>${flink.version}</version>
            <scope>${flink.scope}</scope>
        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
+            <version>${flink.sql.cdc.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>flink-shaded-guava</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
+            <version>${flink.sql.cdc.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>flink-shaded-guava</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
diff --git a/src/main/java/org/apache/flink/connector/databend/DatabendDynamicTableFactory.java b/src/main/java/org/apache/flink/connector/databend/DatabendDynamicTableFactory.java
index 6c74ad5..4fc86b3 100644
--- a/src/main/java/org/apache/flink/connector/databend/DatabendDynamicTableFactory.java
+++ b/src/main/java/org/apache/flink/connector/databend/DatabendDynamicTableFactory.java
@@ -46,9 +46,9 @@ public DynamicTableSink createDynamicTableSink(Context context) {
Properties databendProperties =
getDatabendProperties(context.getCatalogTable().getOptions());
return new DatabendDynamicTableSink(
- getDmlOptions(config),
+ getDmlOptions(config, databendProperties),
databendProperties,
- getDmlOptions(config).getPrimaryKeys(),
+ getDmlOptions(config, databendProperties).getPrimaryKeys(),
catalogTable.getPartitionKeys().toArray(new String[0]),
context.getPhysicalRowDataType());
}
@@ -63,7 +63,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
Properties databendProperties =
getDatabendProperties(context.getCatalogTable().getOptions());
return new DatabendDynamicTableSource(
- getReadOptions(config), databendProperties, context.getPhysicalRowDataType());
+ getReadOptions(config, databendProperties), databendProperties, context.getPhysicalRowDataType());
}
@Override
@@ -102,7 +102,7 @@ private void validateConfigOptions(ReadableConfig config) {
}
}
- public DatabendDmlOptions getDmlOptions(ReadableConfig config) {
+ public DatabendDmlOptions getDmlOptions(ReadableConfig config, Properties databendProperties) {
return new DatabendDmlOptions.Builder()
.withUrl(config.get(URL))
.withUsername(config.get(USERNAME))
@@ -116,16 +116,18 @@ public DatabendDmlOptions getDmlOptions(ReadableConfig config) {
.withPrimaryKey(config.get(SINK_PRIMARY_KEYS).toArray(new String[0]))
.withIgnoreDelete(config.get(SINK_IGNORE_DELETE))
.withParallelism(config.get(SINK_PARALLELISM))
+ .withConnectionProperties(databendProperties)
.build();
}
- private DatabendReadOptions getReadOptions(ReadableConfig config) {
+ private DatabendReadOptions getReadOptions(ReadableConfig config, Properties databendProperties) {
return new DatabendReadOptions.Builder()
.withUrl(config.get(URL))
.withUsername(config.get(USERNAME))
.withPassword(config.get(PASSWORD))
.withDatabaseName(config.get(DATABASE_NAME))
.withTableName(config.get(TABLE_NAME))
+ .withConnectionProperties(databendProperties)
.build();
}
}
diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DataModel.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DataModel.java
new file mode 100644
index 0000000..f273d1d
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DataModel.java
@@ -0,0 +1,7 @@
+package org.apache.flink.connector.databend.catalog.databend;
+
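+/** Table data model used when converting CDC source schemas (mirrors the Doris-style DUPLICATE/UNIQUE/AGGREGATE models). */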
+public enum DataModel {
+ DUPLICATE,
+ UNIQUE,
+ AGGREGATE
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java
new file mode 100644
index 0000000..859596f
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendSystem.java
@@ -0,0 +1,183 @@
+package org.apache.flink.connector.databend.catalog.databend;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.connector.databend.exception.DatabendRuntimeException;
+import org.apache.flink.connector.databend.exception.DatabendSystemException;
+import org.apache.flink.connector.databend.internal.connection.DatabendConnectionProvider;
+import org.apache.flink.connector.databend.internal.options.DatabendConnectionOptions;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Operations against the Databend system: create and inspect databases and tables.
+ */
+@Public
+public class DatabendSystem implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(DatabendSystem.class);
+ private final DatabendConnectionProvider jdbcConnectionProvider;
+    private static final List<String> builtinDatabases =
+            Collections.singletonList("information_schema");
+
+ public DatabendSystem(DatabendConnectionOptions options) {
+ this.jdbcConnectionProvider = new DatabendConnectionProvider(options, options.getConnectionProperties());
+ }
+
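+    /** List all databases visible to this connection, excluding built-in schemas. */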
+    public List<String> listDatabases() {
+        return extractColumnValuesBySQL(
+                "select schema_name from information_schema.schemata",
+                1,
+                dbName -> !builtinDatabases.contains(dbName));
+    }
+
+ public boolean databaseExists(String database) {
+ checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
+ return listDatabases().contains(database);
+ }
+
+    private static List<String> identifier(List<String> names) {
+        return names.stream().map(DatabendSystem::identifier).collect(Collectors.toList());
+    }
+
+ public boolean createDatabase(String database) {
+ execute(String.format("CREATE DATABASE IF NOT EXISTS %s", database));
+ return true;
+ }
+
+ public void execute(String sql) {
+ try (Statement statement =
+ jdbcConnectionProvider.getOrCreateConnection().createStatement()) {
+ statement.execute(sql);
+ } catch (Exception e) {
+ throw new DatabendSystemException(
+ String.format("SQL query could not be executed: %s", sql), e);
+ }
+ }
+
+ public boolean tableExists(String database, String table) {
+ return databaseExists(database) && listTables(database).contains(table);
+ }
+
+    public List<String> listTables(String databaseName) {
+        if (!databaseExists(databaseName)) {
+            throw new DatabendRuntimeException("database " + databaseName + " does not exist");
+        }
+        return extractColumnValuesBySQL(
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
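+    /** Create the table if it does not already exist, using DDL generated from the {@link TableSchema}. */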
+ public void createTable(TableSchema schema) {
+ String ddl = buildCreateTableDDL(schema);
+        LOG.info("Create table with DDL: {}", ddl);
+ execute(ddl);
+ }
+
+ public static String buildCreateTableDDL(TableSchema schema) {
+ StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
+ sb.append(identifier(schema.getDatabase()))
+ .append(".")
+ .append(identifier(schema.getTable()))
+ .append("(");
+
+        Map<String, FieldSchema> fields = schema.getFields();
+
+        // append columns
+        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
+            FieldSchema field = entry.getValue();
+            buildColumn(sb, field, false);
+        }
+        // drop the trailing comma left by the last column
+        sb.deleteCharAt(sb.length() - 1);
+        sb.append(" ) ");
+
+ // append table comment
+ if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
+ sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
+ }
+
+ return sb.toString();
+ }
+
+ private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
+ String fieldType = field.getTypeString();
+ if (isKey && DatabendType.STRING.equals(fieldType)) {
+ fieldType = String.format("%s(%s)", DatabendType.VARCHAR, 65533);
+ }
+ sql.append(identifier(field.getName())).append(" ").append(fieldType);
+
+        if (field.getDefaultValue() != null) {
+            sql.append(" DEFAULT ").append(quoteDefaultValue(field.getDefaultValue()));
+        }
+ sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
+ }
+
+ public static String quoteComment(String comment) {
+ if (comment == null) {
+ return "";
+ } else {
+ return comment.replaceAll("'", "\\\\'");
+ }
+ }
+
+ public static String quoteDefaultValue(String defaultValue) {
+        // DEFAULT current_timestamp does not need quoting
+ if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+ return defaultValue;
+ }
+ return "'" + defaultValue + "'";
+ }
+
+
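+    /** Run a query and collect the values of one result column, optionally filtered by a predicate. */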
+    public List<String> extractColumnValuesBySQL(
+            String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {
+
+        List<String> columnValues = new ArrayList<>();
+ try (PreparedStatement ps =
+ jdbcConnectionProvider.getOrCreateConnection().prepareStatement(sql)) {
+ if (Objects.nonNull(params) && params.length > 0) {
+ for (int i = 0; i < params.length; i++) {
+ ps.setObject(i + 1, params[i]);
+ }
+ }
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ String columnValue = rs.getString(columnIndex);
+ if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+ columnValues.add(columnValue);
+ }
+ }
+ return columnValues;
+ } catch (Exception e) {
+ throw new DatabendSystemException(
+ String.format("The following SQL query could not be executed: %s", sql), e);
+ }
+ }
+
+ private static String identifier(String name) {
+ return "`" + name + "`";
+ }
+
+
+ private String quoteProperties(String name) {
+ return "'" + name + "'";
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendType.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendType.java
new file mode 100644
index 0000000..c4a613c
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/DatabendType.java
@@ -0,0 +1,20 @@
+package org.apache.flink.connector.databend.catalog.databend;
+
+public class DatabendType {
+ public static final String BOOLEAN = "BOOLEAN";
+ public static final String TINYINT = "TINYINT";
+ public static final String SMALLINT = "SMALLINT";
+ public static final String INT = "INT";
+ public static final String BIGINT = "BIGINT";
+ public static final String FLOAT = "FLOAT";
+ public static final String DOUBLE = "DOUBLE";
+ public static final String DECIMAL = "DECIMAL";
+ public static final String DATE = "DATE";
+ public static final String DATETIME = "DATETIME";
+ public static final String VARCHAR = "VARCHAR";
+ public static final String STRING = "STRING";
+ public static final String BITMAP = "BITMAP";
+ public static final String ARRAY = "ARRAY";
+ public static final String JSON = "JSON";
+ public static final String MAP = "MAP";
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java
new file mode 100644
index 0000000..f601f8f
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/FieldSchema.java
@@ -0,0 +1,49 @@
+package org.apache.flink.connector.databend.catalog.databend;
+
+public class FieldSchema {
+ private String name;
+ private String typeString;
+ private String comment;
+
+ private String defaultValue;
+
+ public FieldSchema() {
+ }
+
+ public FieldSchema(String name, String typeString, String comment, String defaultValue) {
+ this.name = name;
+ this.typeString = typeString;
+ this.defaultValue = defaultValue;
+ this.comment = comment;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getTypeString() {
+ return typeString;
+ }
+
+ public void setTypeString(String typeString) {
+ this.typeString = typeString;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public void setComment(String comment) {
+ this.comment = comment;
+ }
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/catalog/databend/TableSchema.java b/src/main/java/org/apache/flink/connector/databend/catalog/databend/TableSchema.java
new file mode 100644
index 0000000..090e032
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/catalog/databend/TableSchema.java
@@ -0,0 +1,75 @@
+package org.apache.flink.connector.databend.catalog.databend;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TableSchema {
+ private String database;
+ private String table;
+ private String tableComment;
+    private Map<String, FieldSchema> fields;
+    private List<String> keys = new ArrayList<>();
+    private DataModel model = DataModel.DUPLICATE;
+    private Map<String, String> properties = new HashMap<>();
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getTableComment() {
+ return tableComment;
+ }
+
+    public Map<String, FieldSchema> getFields() {
+ return fields;
+ }
+
+    public List<String> getKeys() {
+ return keys;
+ }
+
+ public DataModel getModel() {
+ return model;
+ }
+
+    public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public void setTableComment(String tableComment) {
+ this.tableComment = tableComment;
+ }
+
+    public void setFields(Map<String, FieldSchema> fields) {
+ this.fields = fields;
+ }
+
+    public void setKeys(List<String> keys) {
+ this.keys = keys;
+ }
+
+ public void setModel(DataModel model) {
+ this.model = model;
+ }
+
+    public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
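+    // Optional bucket count consumed by DatabaseSync#setTableSchemaBuckets for the "table-buckets"
+    // option. This field is an assumption added so that code compiles; adjust if Databend tables
+    // do not use buckets.
+    private Integer tableBuckets;
+
+    public Integer getTableBuckets() {
+        return tableBuckets;
+    }
+
+    public void setTableBuckets(Integer tableBuckets) {
+        this.tableBuckets = tableBuckets;
+    }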
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/exception/CreateTableException.java b/src/main/java/org/apache/flink/connector/databend/exception/CreateTableException.java
new file mode 100644
index 0000000..38c2ae7
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/exception/CreateTableException.java
@@ -0,0 +1,30 @@
+package org.apache.flink.connector.databend.exception;
+
+/**
+ * Create Table exception.
+ */
+public class CreateTableException extends RuntimeException {
+ public CreateTableException() {
+ super();
+ }
+
+ public CreateTableException(String message) {
+ super(message);
+ }
+
+ public CreateTableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CreateTableException(Throwable cause) {
+ super(cause);
+ }
+
+ protected CreateTableException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/exception/DatabendRuntimeException.java b/src/main/java/org/apache/flink/connector/databend/exception/DatabendRuntimeException.java
new file mode 100644
index 0000000..092fe98
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/exception/DatabendRuntimeException.java
@@ -0,0 +1,27 @@
+package org.apache.flink.connector.databend.exception;
+
+public class DatabendRuntimeException extends RuntimeException {
+ public DatabendRuntimeException() {
+ super();
+ }
+
+ public DatabendRuntimeException(String message) {
+ super(message);
+ }
+
+ public DatabendRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DatabendRuntimeException(Throwable cause) {
+ super(cause);
+ }
+
+ protected DatabendRuntimeException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/exception/DatabendSystemException.java b/src/main/java/org/apache/flink/connector/databend/exception/DatabendSystemException.java
new file mode 100644
index 0000000..17f2c35
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/exception/DatabendSystemException.java
@@ -0,0 +1,27 @@
+package org.apache.flink.connector.databend.exception;
+
+public class DatabendSystemException extends RuntimeException {
+ public DatabendSystemException() {
+ super();
+ }
+
+ public DatabendSystemException(String message) {
+ super(message);
+ }
+
+ public DatabendSystemException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DatabendSystemException(Throwable cause) {
+ super(cause);
+ }
+
+ protected DatabendSystemException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendConnectionOptions.java b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendConnectionOptions.java
index fd211c5..f6626ec 100644
--- a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendConnectionOptions.java
+++ b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendConnectionOptions.java
@@ -1,8 +1,9 @@
package org.apache.flink.connector.databend.internal.options;
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Optional;
-import javax.annotation.Nullable;
+import java.util.Properties;
/**
* Databend connection options.
@@ -21,13 +22,16 @@ public class DatabendConnectionOptions implements Serializable {
private final String tableName;
+ private final Properties connectionProperties;
+
public DatabendConnectionOptions(
- String url, @Nullable String username, @Nullable String password, String databaseName, String tableName) {
+ String url, @Nullable String username, @Nullable String password, String databaseName, String tableName, Properties connectionProperties) {
this.url = url;
this.username = username;
this.password = password;
this.databaseName = databaseName;
this.tableName = tableName;
+ this.connectionProperties = connectionProperties;
}
public String getUrl() {
@@ -49,4 +53,8 @@ public String getDatabaseName() {
public String getTableName() {
return this.tableName;
}
+
+ public Properties getConnectionProperties() {
+ return this.connectionProperties;
+ }
}
diff --git a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendDmlOptions.java b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendDmlOptions.java
index 527859a..210bd5e 100644
--- a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendDmlOptions.java
+++ b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendDmlOptions.java
@@ -4,6 +4,7 @@
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.Properties;
/**
* Databend data modify language options.
@@ -24,6 +25,7 @@ public class DatabendDmlOptions extends DatabendConnectionOptions {
private final boolean ignoreDelete;
private final Integer parallelism;
+    private final Properties connectionProperties;
public DatabendDmlOptions(
String url,
@@ -31,6 +33,7 @@ public DatabendDmlOptions(
@Nullable String password,
String databaseName,
String tableName,
+ Properties connectionProperties,
int batchSize,
Duration flushInterval,
int maxRetires,
@@ -38,7 +41,7 @@ public DatabendDmlOptions(
String[] primaryKeys,
boolean ignoreDelete,
Integer parallelism) {
-        super(url, username, password, databaseName, tableName);
+        super(url, username, password, databaseName, tableName, connectionProperties);
+        this.connectionProperties = connectionProperties;
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.maxRetries = maxRetires;
@@ -64,7 +67,7 @@ public SinkUpdateStrategy getUpdateStrategy() {
return updateStrategy;
}
- public String[] getPrimaryKeys(){
+ public String[] getPrimaryKeys() {
return primaryKeys;
}
@@ -76,6 +79,10 @@ public Integer getParallelism() {
return parallelism;
}
+ public Properties getConnectionProperties() {
+ return this.connectionProperties;
+ }
+
/**
* Builder for {@link DatabendDmlOptions}.
*/
@@ -92,6 +99,7 @@ public static class Builder {
private boolean ignoreDelete;
private String[] primaryKey;
private Integer parallelism;
+ private Properties connectionProperties;
public Builder() {
}
@@ -156,6 +164,11 @@ public DatabendDmlOptions.Builder withParallelism(Integer parallelism) {
return this;
}
+ public DatabendDmlOptions.Builder withConnectionProperties(Properties connectionProperties) {
+ this.connectionProperties = connectionProperties;
+ return this;
+ }
+
public DatabendDmlOptions build() {
return new DatabendDmlOptions(
url,
@@ -163,6 +176,7 @@ public DatabendDmlOptions build() {
password,
databaseName,
tableName,
+ connectionProperties,
batchSize,
flushInterval,
maxRetries,
diff --git a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendReadOptions.java b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendReadOptions.java
index 4f0eeb4..948f7e1 100644
--- a/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendReadOptions.java
+++ b/src/main/java/org/apache/flink/connector/databend/internal/options/DatabendReadOptions.java
@@ -1,46 +1,19 @@
package org.apache.flink.connector.databend.internal.options;
import javax.annotation.Nullable;
+import java.util.Properties;
public class DatabendReadOptions extends DatabendConnectionOptions {
private static final long serialVersionUID = 1L;
- private final String partitionColumn;
- private final Integer partitionNum;
- private final Long partitionLowerBound;
- private final Long partitionUpperBound;
-
private DatabendReadOptions(
String url,
@Nullable String username,
@Nullable String password,
String databaseName,
String tableName,
- String partitionColumn,
- Integer partitionNum,
- Long partitionLowerBound,
- Long partitionUpperBound) {
- super(url, username, password, databaseName, tableName);
- this.partitionColumn = partitionColumn;
- this.partitionNum = partitionNum;
- this.partitionLowerBound = partitionLowerBound;
- this.partitionUpperBound = partitionUpperBound;
- }
-
- public String getPartitionColumn() {
- return partitionColumn;
- }
-
- public Integer getPartitionNum() {
- return partitionNum;
- }
-
- public Long getPartitionLowerBound() {
- return partitionLowerBound;
- }
-
- public Long getPartitionUpperBound() {
- return partitionUpperBound;
+ Properties connectionProperties) {
+ super(url, username, password, databaseName, tableName, connectionProperties);
}
/**
@@ -52,10 +25,7 @@ public static class Builder {
private String password;
private String databaseName;
private String tableName;
- private String partitionColumn;
- private Integer partitionNum;
- private Long partitionLowerBound;
- private Long partitionUpperBound;
+ private Properties connectionProperties;
public DatabendReadOptions.Builder withUrl(String url) {
this.url = url;
@@ -82,23 +52,8 @@ public DatabendReadOptions.Builder withTableName(String tableName) {
return this;
}
- public DatabendReadOptions.Builder withPartitionColumn(String partitionColumn) {
- this.partitionColumn = partitionColumn;
- return this;
- }
-
- public DatabendReadOptions.Builder withPartitionNum(Integer partitionNum) {
- this.partitionNum = partitionNum;
- return this;
- }
-
- public Builder withPartitionLowerBound(Long partitionLowerBound) {
- this.partitionLowerBound = partitionLowerBound;
- return this;
- }
-
- public Builder withPartitionUpperBound(Long partitionUpperBound) {
- this.partitionUpperBound = partitionUpperBound;
+ public DatabendReadOptions.Builder withConnectionProperties(Properties connectionProperties) {
+ this.connectionProperties = connectionProperties;
return this;
}
@@ -109,10 +64,7 @@ public DatabendReadOptions build() {
password,
databaseName,
tableName,
- partitionColumn,
- partitionNum,
- partitionLowerBound,
- partitionUpperBound);
+ connectionProperties);
}
}
}
diff --git a/src/main/java/org/apache/flink/connector/databend/tools/cdc/CdcTools.java b/src/main/java/org/apache/flink/connector/databend/tools/cdc/CdcTools.java
new file mode 100644
index 0000000..cde9ac1
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/tools/cdc/CdcTools.java
@@ -0,0 +1,81 @@
+package org.apache.flink.connector.databend.tools.cdc;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.databend.tools.cdc.mysql.MySQLDatabaseSync;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CDC sync tools: command-line entry point for database synchronization jobs.
+ */
+public class CdcTools {
+    private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
+    private static final List<String> EMPTY_KEYS = Arrays.asList("password");
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            System.out.println("Usage: <operation> [options...]");
+            System.exit(1);
+        }
+        String operation = args[0].toLowerCase();
+        String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
+        switch (operation) {
+            case MYSQL_SYNC_DATABASE:
+                createMySQLSyncDatabase(opArgs);
+                break;
+            default:
+                System.out.println("Unknown operation " + operation);
+                System.exit(1);
+        }
+    }
+
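+    /**
+     * Illustrative invocation (option values are placeholders, not real credentials):
+     *
+     * <pre>
+     * mysql-sync-database \
+     *     --database test_db \
+     *     --mysql-conf hostname=127.0.0.1 \
+     *     --mysql-conf username=root \
+     *     --mysql-conf password=123456 \
+     *     --mysql-conf database-name=test_db \
+     *     --including-tables "tbl1|tbl2" \
+     *     --sink-conf url=databend://localhost:8000 \
+     *     --sink-conf username=databend \
+     *     --sink-conf password=databend \
+     *     --sink-conf database-name=test_db
+     * </pre>
+     */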
+ private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
+ String jobName = params.get("job-name");
+ String database = params.get("database");
+ String tablePrefix = params.get("table-prefix");
+ String tableSuffix = params.get("table-suffix");
+ String includingTables = params.get("including-tables");
+ String excludingTables = params.get("excluding-tables");
+
+        Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
+        Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
+        Map<String, String> tableMap = getConfigMap(params, "table-conf");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
+        Configuration sinkConfig = Configuration.fromMap(sinkMap);
+
+        DatabaseSync databaseSync = new MySQLDatabaseSync();
+        databaseSync
+                .setEnv(env)
+                .setDatabase(database)
+                .setConfig(mysqlConfig)
+                .setTablePrefix(tablePrefix)
+                .setTableSuffix(tableSuffix)
+                .setIncludingTables(includingTables)
+                .setExcludingTables(excludingTables)
+                .setSinkConfig(sinkConfig)
+                .setTableConfig(tableMap)
+                .create();
+        databaseSync.build();
+
+        if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
+            jobName = String.format("MySQL-Databend Sync Database: %s", mysqlMap.get("database-name"));
+        }
+        env.execute(jobName);
+ }
+
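+    /** Parse repeated {@code --<key> k=v} arguments into a map; returns null if the key is absent or a pair is malformed. */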
+    private static Map<String, String> getConfigMap(MultipleParameterTool params, String key) {
+        if (!params.has(key)) {
+            return null;
+        }
+
+        Map<String, String> map = new HashMap<>();
+        for (String param : params.getMultiParameter(key)) {
+            // split on the first '=' only, so values may themselves contain '='
+            String[] kv = param.split("=", 2);
+            if (kv.length == 2) {
+                map.put(kv[0], kv[1]);
+                continue;
+            } else if (kv.length == 1 && EMPTY_KEYS.contains(kv[0])) {
+                map.put(kv[0], "");
+                continue;
+            }
+
+            System.err.println("Invalid " + key + " " + param + ".\n");
+            return null;
+        }
+        return map;
+    }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/tools/cdc/DatabaseSync.java b/src/main/java/org/apache/flink/connector/databend/tools/cdc/DatabaseSync.java
new file mode 100644
index 0000000..934a5e1
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/tools/cdc/DatabaseSync.java
@@ -0,0 +1,508 @@
+package org.apache.flink.connector.databend.tools.cdc;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.databend.catalog.databend.DatabendSystem;
+import org.apache.flink.connector.databend.catalog.databend.TableSchema;
+import org.apache.flink.connector.databend.config.DatabendConfigOptions;
+import org.apache.flink.connector.databend.internal.options.DatabendConnectionOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.*;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public abstract class DatabaseSync {
+ private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
+ private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
+ private static final String TABLE_NAME_OPTIONS = "table-name";
+
+ protected Configuration config;
+
+ protected String database;
+
+    protected TableNameConverter converter;
+    protected Pattern includingPattern;
+    protected Pattern excludingPattern;
+    protected Map<Pattern, String> multiToOneRulesPattern;
+    protected Map<String, String> tableConfig = new HashMap<>();
+ protected Configuration sinkConfig;
+ protected boolean ignoreDefaultValue;
+
+ public StreamExecutionEnvironment env;
+ private boolean createTableOnly = false;
+ private boolean newSchemaChange;
+ protected String includingTables;
+ protected String excludingTables;
+ protected String multiToOneOrigin;
+ protected String multiToOneTarget;
+ protected String tablePrefix;
+ protected String tableSuffix;
+ protected boolean singleSink;
+    private final Map<String, String> tableMapping = new HashMap<>();
+
+ public abstract void registerDriver() throws SQLException;
+
+ public abstract Connection getConnection() throws SQLException;
+
+    public abstract List<SourceSchema> getSchemaList() throws Exception;
+
+    public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);
+
+    /**
+     * Get the prefix used when building the table list: the database name for MySQL, the schema name for Oracle.
+     */
+
+ protected DatabaseSync() throws SQLException {
+ registerDriver();
+ }
+
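+    /** Compile the include/exclude and multi-to-one rules; must be called before {@link #build()}. */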
+ public void create() {
+ this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables);
+ this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
+ this.multiToOneRulesPattern = multiToOneRulesParser(multiToOneOrigin, multiToOneTarget);
+ this.converter = new TableNameConverter(tablePrefix, tableSuffix, multiToOneRulesPattern);
+ // default enable light schema change
+ if (!this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
+ this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
+ }
+ }
+
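+    /** Create any missing Databend tables for the matched source tables, then wire the CDC source to the sink(s). */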
+ public void build() throws Exception {
+ DatabendConnectionOptions options = getDatabendConnectionOptions();
+ DatabendSystem databendSystem = new DatabendSystem(options);
+
+        List<SourceSchema> schemaList = getSchemaList();
+ Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized.");
+        if (!databendSystem.databaseExists(database)) {
+            LOG.info("database {} does not exist, creating it", database);
+            databendSystem.createDatabase(database);
+        }
+
+        List<String> syncTables = new ArrayList<>();
+        List<String> databendTables = new ArrayList<>();
+
+        Map<String, Integer> tableBucketsMap = null;
+        if (tableConfig.containsKey("table-buckets")) {
+            tableBucketsMap = getTableBuckets(tableConfig.get("table-buckets"));
+        }
+        Set<String> bucketsTable = new HashSet<>();
+        for (SourceSchema schema : schemaList) {
+            syncTables.add(schema.getTableName());
+            String databendTable = converter.convert(schema.getTableName());
+
+            // Record the mapping between the upstream table and the downstream table
+            tableMapping.put(
+                    schema.getTableIdentifier(), String.format("%s.%s", database, databendTable));
+            if (!databendSystem.tableExists(database, databendTable)) {
+                TableSchema databendSchema = schema.convertTableSchema(tableConfig);
+                // set the target Databend database and table
+                databendSchema.setDatabase(database);
+                databendSchema.setTable(databendTable);
+                if (tableBucketsMap != null) {
+                    setTableSchemaBuckets(tableBucketsMap, databendSchema, databendTable, bucketsTable);
+                }
+                databendSystem.createTable(databendSchema);
+            }
+            if (!databendTables.contains(databendTable)) {
+                databendTables.add(databendTable);
+            }
+        }
+        if (createTableOnly) {
+            System.out.println("Table creation finished.");
+            System.exit(0);
+        }
+ config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
+        DataStreamSource<String> streamSource = buildCdcSource(env);
+        if (singleSink) {
+            streamSource.sinkTo(buildDorisSink());
+        } else {
+            SingleOutputStreamOperator<Void> parsedStream =
+                    streamSource.process(new ParsingProcessFunction(converter));
+            for (String table : databendTables) {
+                OutputTag<String> recordOutputTag =
+                        ParsingProcessFunction.createRecordOutputTag(table);
+                DataStream<String> sideOutput = parsedStream.getSideOutput(recordOutputTag);
+                int sinkParallel =
+                        sinkConfig.getInteger(
+                                DatabendConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism());
+                sideOutput
+                        .sinkTo(buildDorisSink(table))
+                        .setParallelism(sinkParallel)
+                        .name(table)
+                        .uid(table);
+            }
+        }
+ }
+
+    private DatabendConnectionOptions getDatabendConnectionOptions() {
+        String user = sinkConfig.getString(DatabendConfigOptions.USERNAME);
+        String passwd = sinkConfig.getString(DatabendConfigOptions.PASSWORD, "databend");
+        String jdbcUrl = sinkConfig.getString(DatabendConfigOptions.URL);
+        String database = sinkConfig.getString(DatabendConfigOptions.DATABASE_NAME);
+        Preconditions.checkNotNull(jdbcUrl, "url is required in sink-conf");
+        return new DatabendConnectionOptions(jdbcUrl, user, passwd, database, "", new Properties());
+ }
+
+    /**
+     * Create the sink for the multi-table (single sink) case.
+     */
+    public DorisSink<String> buildDorisSink() {
+        return buildDorisSink(null);
+    }
+
+    /**
+     * Create a Doris-style sink for a single table.
+     */
+    public DorisSink<String> buildDorisSink(String table) {
+ String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
+ String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
+ String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
+ String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
+
+ DorisSink.Builder builder = DorisSink.builder();
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes(fenodes).setBenodes(benodes).setUsername(user).setPassword(passwd);
+ sinkConfig
+ .getOptional(DorisConfigOptions.AUTO_REDIRECT)
+ .ifPresent(dorisBuilder::setAutoRedirect);
+
+        // a single sink does not need the table identifier
+ if (!singleSink && !StringUtils.isNullOrWhitespaceOnly(table)) {
+ dorisBuilder.setTableIdentifier(database + "." + table);
+ }
+
+ Properties pro = new Properties();
+ // default json data format
+ pro.setProperty("format", "json");
+ pro.setProperty("read_json_by_line", "true");
+        // custom stream load properties
+ Properties streamLoadProp = DorisConfigOptions.getStreamLoadProp(sinkConfig.toMap());
+ pro.putAll(streamLoadProp);
+ DorisExecutionOptions.Builder executionBuilder =
+ DorisExecutionOptions.builder().setStreamLoadProp(pro);
+
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_LABEL_PREFIX)
+ .ifPresent(executionBuilder::setLabelPrefix);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_ENABLE_DELETE)
+ .ifPresent(executionBuilder::setDeletable);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_BUFFER_COUNT)
+ .ifPresent(executionBuilder::setBufferCount);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_BUFFER_SIZE)
+ .ifPresent(executionBuilder::setBufferSize);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL)
+ .ifPresent(executionBuilder::setCheckInterval);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_MAX_RETRIES)
+ .ifPresent(executionBuilder::setMaxRetries);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE)
+ .ifPresent(executionBuilder::setIgnoreUpdateBefore);
+
+ if (!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) {
+ executionBuilder.disable2PC();
+ } else if (sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()) {
+ // force open 2pc
+ executionBuilder.enable2PC();
+ }
+
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)
+ .ifPresent(executionBuilder::setBatchMode);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE)
+ .ifPresent(executionBuilder::setFlushQueueSize);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS)
+ .ifPresent(executionBuilder::setBufferFlushMaxRows);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES)
+ .ifPresent(executionBuilder::setBufferFlushMaxBytes);
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL)
+ .ifPresent(v -> executionBuilder.setBufferFlushIntervalMs(v.toMillis()));
+
+ sinkConfig
+ .getOptional(DorisConfigOptions.SINK_USE_CACHE)
+ .ifPresent(executionBuilder::setUseCache);
+
+ DorisExecutionOptions executionOptions = executionBuilder.build();
+ builder.setDorisReadOptions(DorisReadOptions.builder().build())
+ .setDorisExecutionOptions(executionOptions)
+ .setSerializer(
+ JsonDebeziumSchemaSerializer.builder()
+ .setDorisOptions(dorisBuilder.build())
+ .setNewSchemaChange(newSchemaChange)
+ .setExecutionOptions(executionOptions)
+ .setTableMapping(tableMapping)
+ .setTableProperties(tableConfig)
+ .setTargetDatabase(database)
+ .setTargetTablePrefix(tablePrefix)
+ .setTargetTableSuffix(tableSuffix)
+ .build())
+ .setDorisOptions(dorisBuilder.build());
+ return builder.build();
+ }
+
+    /**
+     * Decide whether a table should be synchronized, based on the including/excluding patterns.
+     */
+ protected boolean isSyncNeeded(String tableName) {
+ boolean sync = true;
+ if (includingPattern != null) {
+ sync = includingPattern.matcher(tableName).matches();
+ }
+ if (excludingPattern != null) {
+ sync = sync && !excludingPattern.matcher(tableName).matches();
+ }
+ LOG.debug("table {} is synchronized? {}", tableName, sync);
+ return sync;
+ }
+
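+    /** Build the regular expression that is passed to the CDC source via the "table-name" option. */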
+    protected String getSyncTableList(List<String> syncTables) {
+ if (!singleSink) {
+ return syncTables.stream()
+ .map(v -> getTableListPrefix() + "\\." + v)
+ .collect(Collectors.joining("|"));
+ } else {
+ // includingTablePattern and ^excludingPattern
+ if (includingTables == null) {
+ includingTables = ".*";
+ }
+ String includingPattern =
+ String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables);
+ if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
+ return includingPattern;
+ } else {
+ String excludingPattern =
+ String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables);
+ return String.format("(%s)(%s)", includingPattern, excludingPattern);
+ }
+ }
+ }
+
+    /**
+     * Parse the multi-to-one rules: several source table patterns merged into one target table.
+     */
+    protected HashMap<Pattern, String> multiToOneRulesParser(
+            String multiToOneOrigin, String multiToOneTarget) {
+ if (StringUtils.isNullOrWhitespaceOnly(multiToOneOrigin)
+ || StringUtils.isNullOrWhitespaceOnly(multiToOneTarget)) {
+ return null;
+ }
+        HashMap<Pattern, String> multiToOneRulesPattern = new HashMap<>();
+ String[] origins = multiToOneOrigin.split("\\|");
+ String[] targets = multiToOneTarget.split("\\|");
+        if (origins.length != targets.length) {
+            System.out.println(
+                    "param error : multi-to-one origin and target lengths are not equal, please check your params.");
+            System.exit(1);
+        }
+ try {
+ for (int i = 0; i < origins.length; i++) {
+ multiToOneRulesPattern.put(Pattern.compile(origins[i]), targets[i]);
+ }
+ } catch (Exception e) {
+ System.out.println("param error : Your regular expression is incorrect,please check.");
+ System.exit(1);
+ }
+ return multiToOneRulesPattern;
+ }
+
+ /**
+ * Get table buckets Map.
+ *
+     * @param tableBuckets the table-buckets string, e.g. "student:10,student_info:20,student.*:30"
+ * @return The table name and buckets map. The key is table name, the value is buckets.
+ */
+    public Map<String, Integer> getTableBuckets(String tableBuckets) {
+        Map<String, Integer> tableBucketsMap = new LinkedHashMap<>();
+ String[] tableBucketsArray = tableBuckets.split(",");
+ for (String tableBucket : tableBucketsArray) {
+ String[] tableBucketArray = tableBucket.split(":");
+ tableBucketsMap.put(
+ tableBucketArray[0].trim(), Integer.parseInt(tableBucketArray[1].trim()));
+ }
+ return tableBucketsMap;
+ }
+
+    /**
+     * Set the bucket count on a table schema.
+     *
+     * @param tableBucketsMap map from table name (or regex) to bucket count
+     * @param databendSchema the {@link TableSchema} to update
+     * @param databendTable the table whose buckets should be set
+     * @param tableHasSet tables whose buckets have already been set
+     */
+    public void setTableSchemaBuckets(
+            Map<String, Integer> tableBucketsMap,
+            TableSchema databendSchema,
+            String databendTable,
+            Set<String> tableHasSet) {
+
+        if (tableBucketsMap != null) {
+            // First, if the table name is in the table-buckets map, set the buckets directly.
+            if (tableBucketsMap.containsKey(databendTable)) {
+                databendSchema.setTableBuckets(tableBucketsMap.get(databendTable));
+                tableHasSet.add(databendTable);
+                return;
+            }
+            // Second, iterate over the map looking for a regular expression match.
+            for (Map.Entry<String, Integer> entry : tableBucketsMap.entrySet()) {
+                if (tableHasSet.contains(entry.getKey())) {
+                    continue;
+                }
+
+                Pattern pattern = Pattern.compile(entry.getKey());
+                if (pattern.matcher(databendTable).matches()) {
+                    databendSchema.setTableBuckets(entry.getValue());
+                    tableHasSet.add(databendTable);
+                    return;
+                }
+            }
+        }
+ }
+
+ public DatabaseSync setEnv(StreamExecutionEnvironment env) {
+ this.env = env;
+ return this;
+ }
+
+ public DatabaseSync setConfig(Configuration config) {
+ this.config = config;
+ return this;
+ }
+
+ public DatabaseSync setDatabase(String database) {
+ this.database = database;
+ return this;
+ }
+
+ public DatabaseSync setIncludingTables(String includingTables) {
+ this.includingTables = includingTables;
+ return this;
+ }
+
+ public DatabaseSync setExcludingTables(String excludingTables) {
+ this.excludingTables = excludingTables;
+ return this;
+ }
+
+ public DatabaseSync setMultiToOneOrigin(String multiToOneOrigin) {
+ this.multiToOneOrigin = multiToOneOrigin;
+ return this;
+ }
+
+ public DatabaseSync setMultiToOneTarget(String multiToOneTarget) {
+ this.multiToOneTarget = multiToOneTarget;
+ return this;
+ }
+
+    public DatabaseSync setTableConfig(Map<String, String> tableConfig) {
+ if (!CollectionUtil.isNullOrEmpty(tableConfig)) {
+ this.tableConfig = tableConfig;
+ }
+ return this;
+ }
+
+ public DatabaseSync setSinkConfig(Configuration sinkConfig) {
+ this.sinkConfig = sinkConfig;
+ return this;
+ }
+
+ public DatabaseSync setIgnoreDefaultValue(boolean ignoreDefaultValue) {
+ this.ignoreDefaultValue = ignoreDefaultValue;
+ return this;
+ }
+
+ public DatabaseSync setCreateTableOnly(boolean createTableOnly) {
+ this.createTableOnly = createTableOnly;
+ return this;
+ }
+
+ public DatabaseSync setNewSchemaChange(boolean newSchemaChange) {
+ this.newSchemaChange = newSchemaChange;
+ return this;
+ }
+
+ public DatabaseSync setSingleSink(boolean singleSink) {
+ this.singleSink = singleSink;
+ return this;
+ }
+
+ public DatabaseSync setTablePrefix(String tablePrefix) {
+ this.tablePrefix = tablePrefix;
+ return this;
+ }
+
+ public DatabaseSync setTableSuffix(String tableSuffix) {
+ this.tableSuffix = tableSuffix;
+ return this;
+ }
+
+ public static class TableNameConverter implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String prefix;
+ private final String suffix;
+        private Map<Pattern, String> multiToOneRulesPattern;
+
+ TableNameConverter() {
+ this("", "");
+ }
+
+ TableNameConverter(String prefix, String suffix) {
+ this.prefix = prefix == null ? "" : prefix;
+ this.suffix = suffix == null ? "" : suffix;
+ }
+
+        TableNameConverter(
+                String prefix, String suffix, Map<Pattern, String> multiToOneRulesPattern) {
+ this.prefix = prefix == null ? "" : prefix;
+ this.suffix = suffix == null ? "" : suffix;
+ this.multiToOneRulesPattern = multiToOneRulesPattern;
+ }
+
+ public String convert(String tableName) {
+ if (multiToOneRulesPattern == null) {
+ return prefix + tableName + suffix;
+ }
+
+ String target = null;
+
+            for (Map.Entry<Pattern, String> patternStringEntry :
+                    multiToOneRulesPattern.entrySet()) {
+ if (patternStringEntry.getKey().matcher(tableName).matches()) {
+ target = patternStringEntry.getValue();
+ }
+ }
+            // If multiToOneRulesPattern is set but no rule matched, the job mixes multi-to-one and
+            // one-to-one tables; the plain one-to-one tables still get the prefix and suffix.
+ if (target == null) {
+ return prefix + tableName + suffix;
+ }
+ return target;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/tools/cdc/SourceSchema.java b/src/main/java/org/apache/flink/connector/databend/tools/cdc/SourceSchema.java
new file mode 100644
index 0000000..33509a5
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/tools/cdc/SourceSchema.java
@@ -0,0 +1,139 @@
+package org.apache.flink.connector.databend.tools.cdc;
+
+import org.apache.flink.connector.databend.catalog.databend.DataModel;
+import org.apache.flink.connector.databend.catalog.databend.FieldSchema;
+import org.apache.flink.connector.databend.catalog.databend.TableSchema;
+import org.apache.flink.util.StringUtils;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.*;
+
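+/** Schema of a source table read through JDBC {@link DatabaseMetaData}: columns, comments, and primary keys. */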
+public abstract class SourceSchema {
+ private final String databaseName;
+ private final String schemaName;
+ private final String tableName;
+ private final String tableComment;
+    private final LinkedHashMap<String, FieldSchema> fields;
+    public final List<String> primaryKeys;
+ public DataModel model = DataModel.UNIQUE;
+
+ public SourceSchema(
+ DatabaseMetaData metaData,
+ String databaseName,
+ String schemaName,
+ String tableName,
+ String tableComment)
+ throws Exception {
+ this.databaseName = databaseName;
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ this.tableComment = tableComment;
+
+ fields = new LinkedHashMap<>();
+ try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
+ while (rs.next()) {
+ String fieldName = rs.getString("COLUMN_NAME");
+ String comment = rs.getString("REMARKS");
+ String fieldType = rs.getString("TYPE_NAME");
+ String defaultValue = rs.getString("COLUMN_DEF");
+ Integer precision = rs.getInt("COLUMN_SIZE");
+ if (rs.wasNull()) {
+ precision = null;
+ }
+
+ Integer scale = rs.getInt("DECIMAL_DIGITS");
+ if (rs.wasNull()) {
+ scale = null;
+ }
+                String databendTypeStr = convertToDatabendType(fieldType, precision, scale);
+                // FieldSchema takes (name, type, comment, defaultValue)
+                fields.put(
+                        fieldName, new FieldSchema(fieldName, databendTypeStr, comment, defaultValue));
+ }
+ }
+
+ primaryKeys = new ArrayList<>();
+ try (ResultSet rs = metaData.getPrimaryKeys(databaseName, schemaName, tableName)) {
+ while (rs.next()) {
+ String fieldName = rs.getString("COLUMN_NAME");
+ primaryKeys.add(fieldName);
+ }
+ }
+ }
+
+    public abstract String convertToDatabendType(String fieldType, Integer precision, Integer scale);
+
+ public String getTableIdentifier() {
+ return getString(databaseName, schemaName, tableName);
+ }
+
+ public static String getString(String databaseName, String schemaName, String tableName) {
+ StringJoiner identifier = new StringJoiner(".");
+ if (!StringUtils.isNullOrWhitespaceOnly(databaseName)) {
+ identifier.add(databaseName);
+ }
+ if (!StringUtils.isNullOrWhitespaceOnly(schemaName)) {
+ identifier.add(schemaName);
+ }
+
+ if (!StringUtils.isNullOrWhitespaceOnly(tableName)) {
+ identifier.add(tableName);
+ }
+
+ return identifier.toString();
+ }
+
+    public TableSchema convertTableSchema(Map<String, String> tableProps) {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setModel(this.model);
+ tableSchema.setFields(this.fields);
+ tableSchema.setKeys(buildKeys());
+ tableSchema.setTableComment(this.tableComment);
+ tableSchema.setProperties(tableProps);
+ return tableSchema;
+ }
+
+    private List<String> buildKeys() {
+        return buildDistributeKeys();
+    }
+
+    private List<String> buildDistributeKeys() {
+ if (!this.primaryKeys.isEmpty()) {
+ return primaryKeys;
+ }
+ if (!this.fields.isEmpty()) {
+            Map.Entry<String, FieldSchema> firstField = this.fields.entrySet().iterator().next();
+ return Collections.singletonList(firstField.getKey());
+ }
+ return new ArrayList<>();
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+    public Map<String, FieldSchema> getFields() {
+ return fields;
+ }
+
+    public List<String> getPrimaryKeys() {
+ return primaryKeys;
+ }
+
+ public String getTableComment() {
+ return tableComment;
+ }
+
+ public DataModel getModel() {
+ return model;
+ }
+
+ public void setModel(DataModel model) {
+ this.model = model;
+ }
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/tools/cdc/mysql/MySQLDatabaseSync.java b/src/main/java/org/apache/flink/connector/databend/tools/cdc/mysql/MySQLDatabaseSync.java
new file mode 100644
index 0000000..828ee47
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/tools/cdc/mysql/MySQLDatabaseSync.java
@@ -0,0 +1,4 @@
+package org.apache.flink.connector.databend.tools.cdc.mysql;
+
+public class MySQLDatabaseSync {
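+    // Placeholder: the MySQL implementation is expected to extend DatabaseSync and provide
+    // registerDriver/getConnection/getSchemaList/buildCdcSource; left as a stub in this change.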
+}
diff --git a/src/main/java/org/apache/flink/connector/databend/writer/serializer/DatabendRecord.java b/src/main/java/org/apache/flink/connector/databend/writer/serializer/DatabendRecord.java
new file mode 100644
index 0000000..f98a664
--- /dev/null
+++ b/src/main/java/org/apache/flink/connector/databend/writer/serializer/DatabendRecord.java
@@ -0,0 +1,71 @@
+package org.apache.flink.connector.databend.writer.serializer;
+
+import java.io.Serializable;
+
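+/** A sink record: the raw row bytes together with the target database and table. */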
+public class DatabendRecord implements Serializable {
+    public static final DatabendRecord empty = new DatabendRecord();
+
+ private String database;
+ private String table;
+ private byte[] row;
+
+ public DatabendRecord() {
+ }
+
+ public DatabendRecord(String database, String table, byte[] row) {
+ this.database = database;
+ this.table = table;
+ this.row = row;
+ }
+
+ public String getTableIdentifier() {
+ if (database == null || table == null) {
+ return null;
+ }
+ return database + "." + table;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public byte[] getRow() {
+ return row;
+ }
+
+ public void setRow(byte[] row) {
+ this.row = row;
+ }
+
+ public static DatabendRecord of(String database, String table, byte[] row) {
+ return new DatabendRecord(database, table, row);
+ }
+
+ public static DatabendRecord of(String tableIdentifier, byte[] row) {
+ if (tableIdentifier != null) {
+ String[] dbTbl = tableIdentifier.split("\\.");
+ if (dbTbl.length == 2) {
+ String database = dbTbl[0];
+ String table = dbTbl[1];
+ return new DatabendRecord(database, table, row);
+ }
+ }
+ return null;
+ }
+
+ public static DatabendRecord of(byte[] row) {
+ return new DatabendRecord(null, null, row);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/flink/TestDatabendConnectionProvider.java b/src/test/java/org/apache/flink/TestDatabendConnectionProvider.java
index 0dc4cab..f9fef2f 100644
--- a/src/test/java/org/apache/flink/TestDatabendConnectionProvider.java
+++ b/src/test/java/org/apache/flink/TestDatabendConnectionProvider.java
@@ -24,7 +24,7 @@ public void TestCreateConnection() throws SQLException {
m.put("properties.table-name", "test");
Properties properties = DatabendUtil.getDatabendProperties(m);
DatabendConnectionOptions databendConnectionOptions =
- new DatabendConnectionOptions("databend://localhost:8000", "databend", "databend", "default", "test");
+ new DatabendConnectionOptions("databend://localhost:8000", "databend", "databend", "default", "test", properties);
DatabendConnectionProvider databendConnectionProvider =
new DatabendConnectionProvider(databendConnectionOptions, properties);
diff --git a/src/test/java/org/apache/flink/TestDatabendDynamicTableSink.java b/src/test/java/org/apache/flink/TestDatabendDynamicTableSink.java
index b22f8a5..d16374f 100644
--- a/src/test/java/org/apache/flink/TestDatabendDynamicTableSink.java
+++ b/src/test/java/org/apache/flink/TestDatabendDynamicTableSink.java
@@ -38,6 +38,7 @@ public void testConstructor() {
"databend",
"test_output_format",
"test",
+ connectionProperties,
1,
Duration.ofSeconds(100),
3,
@@ -77,6 +78,7 @@ public void testGetChangelogMode() {
"databend",
"test_output_format",
"test",
+ connectionProperties,
1,
Duration.ofSeconds(100),
3,
diff --git a/src/test/java/org/apache/flink/TestDatabendOutputFormat.java b/src/test/java/org/apache/flink/TestDatabendOutputFormat.java
index 74d9868..2c8b8ed 100644
--- a/src/test/java/org/apache/flink/TestDatabendOutputFormat.java
+++ b/src/test/java/org/apache/flink/TestDatabendOutputFormat.java
@@ -69,7 +69,7 @@ public void TestAbstractDatabendOutput() throws SQLException, IOException {
m.put("properties.table-name", "test");
Properties properties = DatabendUtil.getDatabendProperties(m);
DatabendConnectionOptions databendConnectionOptions = new DatabendConnectionOptions(
- "databend://0.0.0.0:8000", "databend", "databend", "test_output_format", "test");
+ "databend://0.0.0.0:8000", "databend", "databend", "test_output_format", "test", properties);
DatabendDmlOptions databendDmlOptions = new DatabendDmlOptions(
"databend://0.0.0.0:8000",
@@ -77,6 +77,7 @@ public void TestAbstractDatabendOutput() throws SQLException, IOException {
"databend",
"test_output_format",
"test",
+ properties,
1,
Duration.ofSeconds(100),
3,
diff --git a/src/test/java/org/apache/flink/TestDeleteOutputFormat.java b/src/test/java/org/apache/flink/TestDeleteOutputFormat.java
index 78e2e7b..75d6bbb 100644
--- a/src/test/java/org/apache/flink/TestDeleteOutputFormat.java
+++ b/src/test/java/org/apache/flink/TestDeleteOutputFormat.java
@@ -65,9 +65,9 @@ public void TestAbstractDatabendOutput() throws SQLException, IOException {
m.put("properties.database-name", "test_delete_output");
m.put("properties.table-name", "test");
Properties properties = DatabendUtil.getDatabendProperties(m);
- DatabendConnectionOptions databendConnectionOptions = new DatabendConnectionOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test");
+ DatabendConnectionOptions databendConnectionOptions = new DatabendConnectionOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test", properties);
- DatabendDmlOptions databendDmlOptions = new DatabendDmlOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test", 1, Duration.ofSeconds(100), 3, DatabendConfigOptions.SinkUpdateStrategy.UPDATE, new String[]{}, false, 1);
+ DatabendDmlOptions databendDmlOptions = new DatabendDmlOptions("databend://0.0.0.0:8000", "databend", "databend", "test_delete_output", "test", properties, 1, Duration.ofSeconds(100), 3, DatabendConfigOptions.SinkUpdateStrategy.UPDATE, new String[]{}, false, 1);
String[] fields = {"x", "y", "z"};
String[] primaryKeys = {"x"};
String[] partitionKeys = {"x"};