Feature/constant column zy #4562

Open · wants to merge 2 commits into master
15 changes: 15 additions & 0 deletions client-adapter/tablestore/pom.xml
@@ -44,6 +44,21 @@
</exclusions>
</dependency>

<!-- MySQL dependency -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.30</version>
<scope>test</scope>
</dependency>


</dependencies>

@@ -38,6 +38,7 @@ public static Map<String, MappingConfig> load(Properties envProperties) {
}
try {
config.validate();
config.getDbMapping().init(config);
} catch (Exception e) {
throw new RuntimeException("ERROR Config: " + fileName + " " + e.getMessage(), e);
}
@@ -7,6 +7,8 @@
import com.alibaba.otter.canal.client.adapter.tablestore.enums.TablestoreFieldType;
import com.alibaba.otter.canal.client.adapter.tablestore.support.SyncUtil;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSetMetaData;
import java.util.*;
@@ -154,6 +156,7 @@ public int hashCode() {


public static class DbMapping implements AdapterMapping {
private static final Logger logger = LoggerFactory.getLogger(DbMapping.class);

private String database; // database or schema name
private String table; // table name
@@ -168,6 +171,13 @@ public static class DbMapping implements AdapterMapping {
private int readBatch = 5000;
private int commitBatch = 5000; // batch commit size, e.g. for ETL

private Map<String, String> constantTargetColumns; // constant-value mapping for target columns

private Map<String, String> constantTargetColumnsParsed; // parsed constant-value mapping for target columns

private Map<String, ConstantColumnItem> constantColumnItems = new LinkedHashMap<>(); // parsed constant column items

private Map<String, ColumnItem> columnItems = new LinkedHashMap<>(); // parsed column mapping

public String getDatabase() {
@@ -249,60 +259,98 @@ public void setColumnItems(Map<String, ColumnItem> columnItems) {
this.columnItems = columnItems;
}

public Map<String, String> getConstantTargetColumns() {
return constantTargetColumns;
}

public void setConstantTargetColumns(Map<String, String> constantTargetColumns) {
this.constantTargetColumns = constantTargetColumns;
}

public Map<String, String> getConstantTargetColumnsParsed() {
return constantTargetColumnsParsed;
}

public void setConstantTargetColumnsParsed(Map<String, String> constantTargetColumnsParsed) {
this.constantTargetColumnsParsed = constantTargetColumnsParsed;
}



public static class ConstantColumnItem {
private String targetColumn;
private String column;
private TablestoreFieldType type;

public String getColumn() {
return column;
}

public void setColumn(String column) {
this.column = column;
}

public TablestoreFieldType getType() {
return type;
}

public void setType(TablestoreFieldType type) {
this.type = type;
}

public String getTargetColumn() {
return targetColumn;
}

public void setTargetColumn(String targetColumn) {
this.targetColumn = targetColumn;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConstantColumnItem that = (ConstantColumnItem) o;
return Objects.equals(column, that.column);
}

@Override
public int hashCode() {
return Objects.hash(column);
}
}
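
For illustration, a minimal sketch of how the new constant-column mapping might be populated before init() runs. The keys and the value$TYPE strings are hypothetical; the '$' suffix convention is taken from the parsing code below, and setConstantTargetColumns is the setter added above.

// Hypothetical values, mirroring what the YAML mapping file would populate.
Map<String, String> constantTargetColumns = new LinkedHashMap<>();
constantTargetColumns.put("tenant_id", "tenantId$STRING"); // rename plus explicit Tablestore type
constantTargetColumns.put("region", "");                   // empty value: target column falls back to the key, type is inferred
dbMapping.setConstantTargetColumns(constantTargetColumns); // dbMapping: an instance of MappingConfig.DbMapping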



public void init(MappingConfig config) {
    logger.info("=========dbMapping begin init.=========");
    String splitBy = "$";

    // regular columns
    if (targetColumns != null) {
        parseTargetColumns(config, splitBy);
    }
    if (targetColumns == null) {
        this.targetColumns = new LinkedHashMap<>();
    }

    // constant columns
    if (constantTargetColumns != null) {
        parseTargetConstant(config, splitBy);
    }
    if (constantTargetColumns == null) {
        // initialize the source map as well, otherwise initParsed() would NPE on it
        this.constantTargetColumns = new LinkedHashMap<>();
        this.constantColumnItems = new LinkedHashMap<>();
    }

    initParsed(splitBy);

    logger.info("=========dbMapping init finished.=========");
}


private void initParsed(String splitBy) {
targetColumnsParsed = new HashMap<>();
targetColumns.forEach((key, value) -> {
if (StringUtils.isEmpty(value)) {
targetColumnsParsed.put(key, key);
@@ -312,6 +360,114 @@ public void init(MappingConfig config) {
targetColumnsParsed.put(key, value);
}
});

constantTargetColumnsParsed = new HashMap<>();
constantTargetColumns.forEach((key, value) -> {
if (StringUtils.isEmpty(value)) {
constantTargetColumnsParsed.put(key, key);
} else if (value.contains(splitBy) && constantColumnItems.containsKey(key)) {
constantTargetColumnsParsed.put(key, constantColumnItems.get(key).targetColumn);
} else {
constantTargetColumnsParsed.put(key, value);
}
});
}
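
A standalone sketch of the branch logic in initParsed(), using the hypothetical "tenant_id" -> "tenantId$STRING" entry from above; variable names are illustrative only.

// Mirrors the three branches: empty value -> key maps to itself; value containing
// '$' with a parsed item -> the item's targetColumn; otherwise -> the raw value.
Map<String, String> parsed = new HashMap<>();
String splitBy = "$";
String key = "tenant_id";
String value = "tenantId$STRING";
if (StringUtils.isEmpty(value)) {
    parsed.put(key, key);
} else if (value.contains(splitBy)) {
    // same string that ConstantColumnItem.targetColumn holds after parseTargetConstant()
    parsed.put(key, value.substring(0, value.indexOf(splitBy)));
} else {
    parsed.put(key, value);
}
// parsed now contains {tenant_id=tenantId}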


private void parseTargetColumns(MappingConfig config, String splitBy) {
boolean needTypeInference = false;
for (Map.Entry<String, String> columnField : targetColumns.entrySet()) {
String field = columnField.getValue();
String type = null;
if (field != null) {
// parse the type suffix
int i = field.indexOf(splitBy);
if (i > -1) {
type = field.substring(i + 1);
field = field.substring(0, i);
}
}
ColumnItem columnItem = new ColumnItem();
columnItem.setColumn(columnField.getKey());
columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);

TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
if (fieldType == null) {
needTypeInference = true;
}
columnItem.setType(fieldType);
columnItems.put(columnField.getKey(), columnItem);
}
if (needTypeInference) {
// some fields have no mapping type configured, so infer types from the source table
DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
try {
ResultSetMetaData rsd = rs.getMetaData();
int columnCount = rsd.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
String columnName = rsd.getColumnName(i);
if (columnItems.containsKey(columnName) && columnItems.get(columnName).getType() == null) {
int columnType = rsd.getColumnType(i);
columnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
}
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
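
The type-inference step above can be exercised with plain JDBC; the mysql-connector-java test dependency added in this PR's pom.xml is enough to run it. This standalone sketch (connection settings and table name are hypothetical) walks the same ResultSetMetaData that feeds SyncUtil.getDefaultTablestoreType():

import java.sql.*;

public class TypeInferenceDemo {
    public static void main(String[] args) throws SQLException {
        // Hypothetical connection; in the adapter the DataSource comes from DatasourceConfig.
        String url = "jdbc:mysql://127.0.0.1:3306/demo_db";
        try (Connection conn = DriverManager.getConnection(url, "root", "password");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT * FROM demo_table LIMIT 1")) {
            ResultSetMetaData md = rs.getMetaData();
            for (int i = 1; i <= md.getColumnCount(); i++) {
                // getColumnType returns a java.sql.Types code, the input that
                // SyncUtil.getDefaultTablestoreType() maps to a TablestoreFieldType
                System.out.println(md.getColumnName(i) + " -> java.sql.Types " + md.getColumnType(i));
            }
        }
    }
}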

private void parseTargetConstant(MappingConfig config, String splitBy) {
boolean needTypeInference = false;
for (Map.Entry<String, String> columnField : constantTargetColumns.entrySet()) {
String field = columnField.getValue();
String type = null;
if (field != null) {
// parse the type suffix
int i = field.indexOf(splitBy);
if (i > -1) {
type = field.substring(i + 1);
field = field.substring(0, i);
}
}
ConstantColumnItem columnItem = new ConstantColumnItem();
columnItem.setColumn(columnField.getKey());
columnItem.setTargetColumn(StringUtils.isBlank(field) ? columnField.getKey() : field);

TablestoreFieldType fieldType = SyncUtil.getTablestoreType(type);
if (fieldType == null) {
needTypeInference = true;
}
columnItem.setType(fieldType);
constantColumnItems.put(columnField.getKey(), columnItem);
}
if (needTypeInference) {
// some fields have no mapping type configured, so infer types from the source table
DruidDataSource sourceDS = DatasourceConfig.DATA_SOURCES.get(config.getDataSourceKey());

Util.sqlRS(sourceDS, "SELECT * FROM " + SyncUtil.getDbTableName(database, table) + " LIMIT 1 ", rs -> {
try {
ResultSetMetaData rsd = rs.getMetaData();
int columnCount = rsd.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
String columnName = rsd.getColumnName(i);
if (constantColumnItems.containsKey(columnName) && constantColumnItems.get(columnName).getType() == null) {
int columnType = rsd.getColumnType(i);
constantColumnItems.get(columnName).setType(SyncUtil.getDefaultTablestoreType(columnType));
}
}
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

}
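
Putting it together, a hedged usage sketch of the new flow. In the adapter the loader calls init() right after validate(); here both type suffixes are given explicitly so no live datasource is needed for inference. The no-arg constructors, setTargetColumns, setDbMapping, and the type tokens "INTEGER"/"STRING" are assumptions, not confirmed by this diff.

MappingConfig config = new MappingConfig();               // assumed no-arg constructor
MappingConfig.DbMapping dbMapping = new MappingConfig.DbMapping();

Map<String, String> targetColumns = new LinkedHashMap<>();
targetColumns.put("id", "order_id$INTEGER");              // hypothetical source -> target mapping
dbMapping.setTargetColumns(targetColumns);                // assumed setter, symmetric to setConstantTargetColumns

Map<String, String> constants = new LinkedHashMap<>();
constants.put("tenant_id", "tenantId$STRING");            // constant column with explicit type
dbMapping.setConstantTargetColumns(constants);

config.setDbMapping(dbMapping);                           // assumed setter; getDbMapping() is used by the loader
dbMapping.init(config);                                   // parses both maps and fills the *Parsed views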