Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

自定义规则,只有一个topic,一个表对应一个分区,kafka #5347 #5359

Open
wants to merge 3 commits into
base: canal-1.1.7-hotfix-1
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand All @@ -10,6 +11,9 @@
import java.util.concurrent.Future;

import com.alibaba.otter.canal.common.utils.PropertiesUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -143,26 +147,48 @@ public void send(MQDestination mqDestination, Message message, Callback callback

try {
List result;
logger.info("self dynamicTopic = " + mqDestination.getDynamicTopic().substring(5));

if (!StringUtils.isEmpty(mqDestination.getDynamicTopic())) {
// 动态topic路由计算,只是基于schema/table,不涉及proto数据反序列化
Map<String, Message> messageMap = MQMessageUtils.messageTopics(message,
mqDestination.getTopic(),
mqDestination.getDynamicTopic());

// 针对不同的topic,引入多线程提升效率
for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
final String topicName = entry.getKey().replace('.', '_');
final Message messageSub = entry.getValue();
template.submit((Callable) () -> {
try {
return send(mqDestination, topicName, messageSub, mqProperties.isFlatMessage());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
if (mqDestination.getDynamicTopic().startsWith("self|")) {
logger.info("self dynamicTopic = " + mqDestination.getDynamicTopic().substring(5));
System.out.println();
// 如果dynamicTopic以self打头,走自定义规则
Map<Integer, Message> messageMap = messageTopicsForPartition(message, mqDestination.getTopic(), mqDestination.getDynamicTopic());

// 针对不同的topic,引入多线程提升效率
for (Map.Entry<Integer, Message> entry : messageMap.entrySet()) {
final Message messageSub = entry.getValue();
template.submit((Callable) () -> {
try {
return sendForCustom(mqDestination.getTopic(), messageSub, mqProperties.isFlatMessage(), entry.getKey());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

result = template.waitForResult();
} else {
// 动态topic路由计算,只是基于schema/table,不涉及proto数据反序列化
Map<String, Message> messageMap = MQMessageUtils.messageTopics(message, mqDestination.getTopic(), mqDestination.getDynamicTopic());

// 针对不同的topic,引入多线程提升效率
for (Map.Entry<String, Message> entry : messageMap.entrySet()) {
final String topicName = entry.getKey().replace('.', '_');
final Message messageSub = entry.getValue();
template.submit((Callable) () -> {
try {
return send(mqDestination, topicName, messageSub, mqProperties.isFlatMessage());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

result = template.waitForResult();
}

result = template.waitForResult();
} else {
result = new ArrayList();
List<Future> futures = send(mqDestination,
Expand Down Expand Up @@ -197,6 +223,24 @@ public void send(MQDestination mqDestination, Message message, Callback callback
}
}

/**
 * Builds Kafka records for a single, explicitly chosen partition and hands them to
 * {@code produce}.
 *
 * <p>Every record is created with a {@code null} key and the given {@code partition}.
 * In flat mode one JSON record is emitted per converted {@code FlatMessage};
 * otherwise the whole message is serialized into exactly one record.
 *
 * @param topicName topic every record is addressed to
 * @param message   message to publish
 * @param flat      {@code true} to publish flat JSON records, {@code false} to publish
 *                  the serialized message as a single record
 * @param partition partition number set on every record
 * @return the futures returned by {@code produce} for the built records
 */
private List<Future> sendForCustom(String topicName, Message message, boolean flat, int partition) {
    final List<ProducerRecord<String, byte[]>> outgoing = new ArrayList<>();

    if (flat) {
        // Flat JSON path: row data is built through buildExecutor, then converted into flat messages.
        EntryRowData[] rowData = MQMessageUtils.buildMessageData(message, buildExecutor);
        List<FlatMessage> converted = MQMessageUtils.messageConverter(rowData, message.getId());
        for (FlatMessage item : converted) {
            byte[] json = JSON.toJSONBytes(item, JSONWriter.Feature.WriteNulls);
            outgoing.add(new ProducerRecord<>(topicName, partition, null, json));
        }
        return produce(outgoing);
    }

    // Non-flat path: the complete message becomes one serialized record.
    byte[] payload = CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry());
    outgoing.add(new ProducerRecord<>(topicName, partition, null, payload));
    return produce(outgoing);
}

private List<Future> send(MQDestination mqDestination, String topicName, Message message, boolean flat) {
List<ProducerRecord<String, byte[]>> records = new ArrayList<>();
// 获取当前topic的分区数
Expand Down Expand Up @@ -273,4 +317,74 @@ private List<Future> produce(List<ProducerRecord<String, byte[]>> records) {
return futures;
}

/**
 * Splits a message into per-partition sub-messages according to a custom
 * table-to-partition routing rule.
 *
 * <p>The rule string is a comma-separated list of {@code schema.table:partition}
 * pairs, optionally prefixed with {@code self|}, for example
 * {@code self|db1.orders:0,db1.users:1}. Whitespace around a pair, a table name or
 * a partition number is ignored, and empty pairs are skipped.
 *
 * <p>Entries are dropped (not routed anywhere) when they are transaction begin/end
 * events, when their header has an empty schema or table name, or when their
 * {@code schema.table} has no rule.
 *
 * @param message             the original message; raw entries are parsed into
 *                            {@code CanalEntry.Entry} objects first
 * @param defaultTopic        default topic; currently not used by this method
 * @param dynamicTopicConfigs the routing rule described above
 * @return a map from partition number to a message holding the entries routed to
 *         that partition; empty when nothing matched
 * @throws IllegalArgumentException if a rule pair is not of the form
 *                                  {@code schema.table:partition} or its partition is
 *                                  not a non-negative integer
 * @throws RuntimeException         if a raw entry cannot be parsed
 */
public static Map<Integer, Message> messageTopicsForPartition(Message message, String defaultTopic, String dynamicTopicConfigs) {
    Map<String, Integer> partitions = parseSelfPartitionRules(dynamicTopicConfigs);
    List<CanalEntry.Entry> entries = decodeEntriesForPartition(message);

    Map<Integer, Message> messages = new HashMap<>();
    for (CanalEntry.Entry entry : entries) {
        // Transaction boundaries carry no table, so they cannot be routed by table.
        CanalEntry.EntryType entryType = entry.getEntryType();
        if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }

        String schemaName = entry.getHeader().getSchemaName();
        String tableName = entry.getHeader().getTableName();
        if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) {
            // Debug level: this runs once per entry and would flood the log at INFO.
            logger.debug("schemaName or tableName is null {} {} skip", schemaName, tableName);
            continue;
        }

        Integer partitionId = partitions.get(schemaName + "." + tableName);
        if (partitionId == null) {
            // No rule for this table: the entry is intentionally not forwarded.
            continue;
        }

        Message msg = messages.get(partitionId);
        if (msg == null) {
            msg = new Message(message.getId(), new ArrayList<CanalEntry.Entry>());
            messages.put(partitionId, msg);
        }
        msg.getEntries().add(entry);
    }
    return messages;
}

/** Parses {@code [self|]schema.table:partition,...} into a table-name to partition lookup. */
private static Map<String, Integer> parseSelfPartitionRules(String dynamicTopicConfigs) {
    Map<String, Integer> partitions = new HashMap<>();
    if (dynamicTopicConfigs == null) {
        return partitions;
    }

    final String prefix = "self|";
    String rules = dynamicTopicConfigs.startsWith(prefix)
        ? dynamicTopicConfigs.substring(prefix.length())
        : dynamicTopicConfigs;

    for (String rawRule : rules.split(",")) {
        String rule = rawRule.trim();
        if (rule.isEmpty()) {
            continue;
        }

        String[] tablePartition = rule.split(":");
        if (tablePartition.length < 2 || tablePartition[0].trim().isEmpty()) {
            throw new IllegalArgumentException(
                "Invalid partition rule '" + rule + "', expected schema.table:partition");
        }

        int partition;
        try {
            partition = Integer.parseInt(tablePartition[1].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Invalid partition number in rule '" + rule + "'", e);
        }
        if (partition < 0) {
            throw new IllegalArgumentException(
                "Partition must be non-negative in rule '" + rule + "'");
        }
        partitions.put(tablePartition[0].trim(), partition);
    }
    return partitions;
}

/** Returns the message's entries, parsing them from raw bytes when the message is raw. */
private static List<CanalEntry.Entry> decodeEntriesForPartition(Message message) {
    if (!message.isRaw()) {
        return message.getEntries();
    }

    List<ByteString> rawEntries = message.getRawEntries();
    List<CanalEntry.Entry> entries = new ArrayList<>(rawEntries.size());
    for (ByteString byteString : rawEntries) {
        try {
            entries.add(CanalEntry.Entry.parseFrom(byteString));
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }
    return entries;
}


}