Skip to content

Commit

Permalink
fixed issue #626, support xa rollback/commit event
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Jun 6, 2018
1 parent 1f16310 commit b734a32
Show file tree
Hide file tree
Showing 5 changed files with 4,860 additions and 4,176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
Expand Down Expand Up @@ -199,6 +200,7 @@ protected void printEntry(List<Entry> entrys) {
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime) });
logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
printXAInfo(begin.getPropsList());
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
TransactionEnd end = null;
try {
Expand All @@ -209,6 +211,7 @@ protected void printEntry(List<Entry> entrys) {
// 打印事务提交信息,事务id
logger.info("----------------\n");
logger.info(" END ----> transaction id: {}", end.getTransactionId());
printXAInfo(end.getPropsList());
logger.info(transaction_format,
new Object[] { entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
Expand Down Expand Up @@ -241,6 +244,7 @@ protected void printEntry(List<Entry> entrys) {
continue;
}

printXAInfo(rowChage.getPropsList());
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
Expand All @@ -267,6 +271,27 @@ protected void printColumn(List<Column> columns) {
}
}

protected void printXAInfo(List<Pair> pairs) {
if (pairs == null) {
return;
}

String xaType = null;
String xaXid = null;
for (Pair pair : pairs) {
String key = pair.getKey();
if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
xaType = pair.getValue();
} else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
xaXid = pair.getValue();
}
}

if (xaType != null && xaXid != null) {
logger.info(" ------> " + xaType + " " + xaXid);
}
}

public void setConnector(CanalConnector connector) {
this.connector = connector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@
*/
public class LogEventConvert extends AbstractCanalLifeCycle implements BinlogParser<LogEvent> {

public static final String XA_XID = "XA_XID";
public static final String XA_TYPE = "XA_TYPE";
public static final String XA_START = "XA START";
public static final String XA_END = "XA END";
public static final String XA_COMMIT = "XA COMMIT";
public static final String XA_ROLLBACK = "XA ROLLBACK";
public static final String ISO_8859_1 = "ISO-8859-1";
public static final String UTF_8 = "UTF-8";
public static final int TINYINT_MAX_VALUE = 256;
Expand Down Expand Up @@ -174,7 +180,43 @@ private Entry parseGTIDLogEvent(GtidLogEvent logEvent) {

private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
String queryString = event.getQuery();
if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
if (StringUtils.startsWithIgnoreCase(queryString, XA_START)) {
// xa start use TransactionBegin
TransactionBegin.Builder beginBuilder = TransactionBegin.newBuilder();
beginBuilder.setThreadId(event.getSessionId());
beginBuilder.addProps(createSpecialPair(XA_TYPE, XA_START));
beginBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_START)));
TransactionBegin transactionBegin = beginBuilder.build();
Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_END)) {
// xa start use TransactionEnd
TransactionEnd.Builder endBuilder = TransactionEnd.newBuilder();
endBuilder.setTransactionId(String.valueOf(0L));
endBuilder.addProps(createSpecialPair(XA_TYPE, XA_END));
endBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_END)));
TransactionEnd transactionEnd = endBuilder.build();
Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_COMMIT)) {
// xa commit
Header header = createHeader(binlogFileName, event.getHeader(), "", "", EventType.XACOMMIT);
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
rowChangeBuider.setSql(queryString);
rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_COMMIT));
rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_COMMIT)));
rowChangeBuider.setEventType(EventType.XACOMMIT);
return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_ROLLBACK)) {
// xa rollback
Header header = createHeader(binlogFileName, event.getHeader(), "", "", EventType.XAROLLBACK);
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
rowChangeBuider.setSql(queryString);
rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_ROLLBACK));
rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_ROLLBACK)));
rowChangeBuider.setEventType(EventType.XAROLLBACK);
return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
} else if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
TransactionBegin transactionBegin = createTransactionBegin(event.getSessionId());
Header header = createHeader(binlogFileName, event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
Expand Down Expand Up @@ -237,6 +279,10 @@ private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
}
}

private String getXaXid(String queryString, String type) {
return StringUtils.substringAfter(queryString, type);
}

private boolean processFilter(String queryString, DdlResult result) {
String schemaName = result.getSchemaName();
String tableName = result.getTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.nio.charset.Charset;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -16,6 +17,7 @@
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
Expand All @@ -27,8 +29,8 @@ public class MysqlDumpTest {
@Test
public void testSimple() {
final MysqlEventParser controller = new MysqlEventParser();
final EntryPosition startPosition = new EntryPosition("mysql-bin.000010", 154L, 100L);
startPosition.setGtid("f1ceb61a-a5d5-11e7-bdee-107c3dbcf8a7:1-17");
final EntryPosition startPosition = new EntryPosition("mysql-bin.000012", 34051L, 100L);
// startPosition.setGtid("f1ceb61a-a5d5-11e7-bdee-107c3dbcf8a7:1-17");
controller.setConnectionCharset(Charset.forName("UTF-8"));
controller.setSlaveId(3344L);
controller.setDetectingEnable(false);
Expand All @@ -39,7 +41,7 @@ public void testSimple() {
controller.setTsdbSpringXml("classpath:tsdb/h2-tsdb.xml");
controller.setEventFilter(new AviaterRegexFilter("test\\..*"));
controller.setEventBlackFilter(new AviaterRegexFilter("canal_tsdb\\..*"));
controller.setIsGTIDMode(true);
controller.setIsGTIDMode(false);
controller.setEventSink(new AbstractCanalEventSinkTest<List<Entry>>() {

public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String destination)
Expand Down Expand Up @@ -73,6 +75,7 @@ public boolean sink(List<Entry> entrys, InetSocketAddress remoteAddress, String
System.out.println(" sql ----> " + rowChage.getSql());
}

printXAInfo(rowChage.getPropsList());
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
print(rowData.getBeforeColumnsList());
Expand Down Expand Up @@ -119,4 +122,25 @@ private void print(List<Column> columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}

private void printXAInfo(List<Pair> pairs) {
if (pairs == null) {
return;
}

String xaType = null;
String xaXid = null;
for (Pair pair : pairs) {
String key = pair.getKey();
if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
xaType = pair.getValue();
} else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
xaXid = pair.getValue();
}
}

if (xaType != null && xaXid != null) {
System.out.println(" ------> " + xaType + " " + xaXid);
}
}
}
Loading

0 comments on commit b734a32

Please sign in to comment.