From e849fb7e653d703139015cbbdba8e877ec1d9f8e Mon Sep 17 00:00:00 2001 From: bilibili Date: Mon, 9 Dec 2024 17:34:58 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E8=87=AA=E5=AE=9A=E4=B9=89=E8=A7=84?= =?UTF-8?q?=E5=88=99=EF=BC=8C=E5=8F=AA=E6=9C=89=E4=B8=80=E4=B8=AAtopic,?= =?UTF-8?q?=E4=B8=8D=E5=90=8C=E7=9A=84=E8=A1=A8=E5=AF=B9=E5=BA=94=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E7=9A=84=E5=88=86=E5=8C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/producer/CanalKafkaProducer.java | 148 ++++++++++++++++-- 1 file changed, 131 insertions(+), 17 deletions(-) diff --git a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java index 9fdd342f58..317a770f2c 100644 --- a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java +++ b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java @@ -2,6 +2,7 @@ import java.io.File; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -10,6 +11,9 @@ import java.util.concurrent.Future; import com.alibaba.otter.canal.common.utils.PropertiesUtils; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.lang.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -143,26 +147,48 @@ public void send(MQDestination mqDestination, Message message, Callback callback try { List result; + logger.info("self dynamicTopic = " + mqDestination.getDynamicTopic().substring(5)); + if (!StringUtils.isEmpty(mqDestination.getDynamicTopic())) { - // 动态topic路由计算,只是基于schema/table,不涉及proto数据反序列化 - Map messageMap = MQMessageUtils.messageTopics(message, - mqDestination.getTopic(), - mqDestination.getDynamicTopic()); - - // 针对不同的topic,引入多线程提升效率 - for (Map.Entry entry : messageMap.entrySet()) { - final String topicName = entry.getKey().replace('.', '_'); - final Message messageSub = entry.getValue(); - template.submit((Callable) () -> { - try { - return send(mqDestination, topicName, messageSub, mqProperties.isFlatMessage()); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + if (mqDestination.getDynamicTopic().startsWith("self|")) { + logger.info("self dynamicTopic = " + mqDestination.getDynamicTopic().substring(5)); + System.out.println(); + // 如果dynamicTopic以self打头,走自定义规则 + Map messageMap = messageTopicsForPartition(message, mqDestination.getTopic(), mqDestination.getDynamicTopic()); + + // 针对不同的topic,引入多线程提升效率 + for (Map.Entry entry : messageMap.entrySet()) { + final Message messageSub = entry.getValue(); + template.submit((Callable) () -> { + try { + return sendForCustom(mqDestination.getTopic(), messageSub, mqProperties.isFlatMessage(), entry.getKey()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + result = template.waitForResult(); + } else { + // 动态topic路由计算,只是基于schema/table,不涉及proto数据反序列化 + Map messageMap = MQMessageUtils.messageTopics(message, mqDestination.getTopic(), mqDestination.getDynamicTopic()); + + // 针对不同的topic,引入多线程提升效率 + for (Map.Entry entry : messageMap.entrySet()) { + final String topicName = entry.getKey().replace('.', '_'); + final Message messageSub = entry.getValue(); + template.submit((Callable) () -> { + try { + return send(mqDestination, topicName, messageSub, mqProperties.isFlatMessage()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + result = template.waitForResult(); } - result = template.waitForResult(); } else { result = new ArrayList(); List futures = send(mqDestination, @@ -197,6 +223,24 @@ public void send(MQDestination mqDestination, Message message, Callback callback } } + private List sendForCustom(String topicName, Message message, boolean flat, int partition) { + List> records = new ArrayList<>(); + if (!flat) { + records.add(new ProducerRecord<>(topicName, partition, null, CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()))); + } else { + // 发送扁平数据json + // 并发构造 + EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor); + // 串行分区 + List flatMessages = MQMessageUtils.messageConverter(datas, message.getId()); + for (FlatMessage flatMessage : flatMessages) { + records.add(new ProducerRecord<>(topicName, partition, null, JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls))); + } + } + + return produce(records); + } + private List send(MQDestination mqDestination, String topicName, Message message, boolean flat) { List> records = new ArrayList<>(); // 获取当前topic的分区数 @@ -273,4 +317,74 @@ private List produce(List> records) { return futures; } + /** + * 自定义分配规则 按 schema 或者 schema+table 将 message 分配到对应topic + * + * @param message 原message + * @param defaultTopic 默认topic + * @param dynamicTopicConfigs 动态topic规则 + * @return 分隔后的message map + */ + public static Map messageTopicsForPartition(Message message, String defaultTopic, String dynamicTopicConfigs) { + dynamicTopicConfigs = dynamicTopicConfigs.substring("self|".length()); + List entries; + if (message.isRaw()) { + List rawEntries = message.getRawEntries(); + entries = new ArrayList<>(rawEntries.size()); + for (ByteString byteString : rawEntries) { + CanalEntry.Entry entry; + try { + entry = CanalEntry.Entry.parseFrom(byteString); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + entries.add(entry); + } + } else { + entries = message.getEntries(); + } + Map messages = new HashMap<>(); + Map partitions = new HashMap<>(); + String[] configs = dynamicTopicConfigs.split(","); + if (configs != null) { + for (String config : configs) { + String[] tablePartition = config.split(":"); + partitions.put(tablePartition[0], Integer.valueOf(tablePartition[1])); + } + } + + for (CanalEntry.Entry entry : entries) { + // 如果有topic路由,则忽略begin/end事件 + if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { + continue; + } + + String schemaName = entry.getHeader().getSchemaName(); + String tableName = entry.getHeader().getTableName(); + + logger.info("schemaName and tableName {} {}",schemaName,tableName); + + if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) { + logger.info("schemaName or tableName is null {} {} skip",schemaName,tableName); +// System.out.println(entry.getHeader()); +// put2MapMessageForPartition(messages, message.getId(), defaultTopic, entry); + } else { + + if (partitions.containsKey(schemaName + "." + tableName)) { + Integer partitionId = partitions.get(schemaName + "." + tableName); + Message msg = messages.get(partitionId); + if (msg == null) { + msg = new Message(message.getId(), new ArrayList<>()); + messages.put(partitionId, msg); + } + msg.getEntries().add(entry); + + } + + } + } + return messages; + } + + } From 37dab77f9a30866b850a7e541a47a8a320072769 Mon Sep 17 00:00:00 2001 From: bilibili Date: Tue, 17 Dec 2024 17:26:49 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E8=87=AA=E5=AE=9A=E4=B9=89=E8=A7=84?= =?UTF-8?q?=E5=88=99=EF=BC=8C=E5=8F=AA=E6=9C=89=E4=B8=80=E4=B8=AAtopic,?= =?UTF-8?q?=E4=B8=8D=E5=90=8C=E7=9A=84=E8=A1=A8=E5=AF=B9=E5=BA=94=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E7=9A=84=E5=88=86=E5=8C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../canal/connector/kafka/producer/CanalKafkaProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java index 317a770f2c..729172e5c9 100644 --- a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java +++ b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java @@ -318,7 +318,7 @@ private List produce(List> records) { } /** - * 自定义分配规则 按 schema 或者 schema+table 将 message 分配到对应topic + * 自定义分配规则 按 schema 或者 schema+table 将 message 分配到对应topic分区 * * @param message 原message * @param defaultTopic 默认topic