Java源码示例:com.alibaba.otter.canal.protocol.CanalEntry
示例1
/**
 * Decides whether an entry should be kept.
 *
 * <p>The configured filter is consulted only for ROWDATA entries; when no filter is set, or the
 * entry is of any other type, the entry is accepted without further checks.
 *
 * @param entry the entry to check
 * @return {@code true} when no filtering applies, otherwise the filter's verdict for the name
 *         produced by {@code getSchemaNameAndTableName(entry)}
 */
protected boolean doFilter(CanalEntry.Entry entry) {
    if (filter == null || entry.getEntryType() != EntryType.ROWDATA) {
        return true;
    }

    String fullName = getSchemaNameAndTableName(entry);
    boolean accepted = filter.filter(fullName);
    if (!accepted) {
        // Record where the rejected entry sits in the log.
        logger.debug("filter name[{}] entry : {}:{}",
            fullName,
            entry.getHeader().getLogfileName(),
            entry.getHeader().getLogfileOffset());
    }
    return accepted;
}
示例2
/**
 * Builds the changed-row list for the current table and applies the table's column condition to
 * non-UPDATE events.
 *
 * <p>When {@link Schema.Table#columns} has a value, UPDATE operations are filtered by whether the
 * changed fields are contained in {@link Schema.Table#columns}. DELETE and INSERT events are
 * condition-filtered here; condition filtering for UPDATE is more involved and is left to
 * subclasses.
 *
 * @param rowChange the changed data
 * @return the parsed rows, or {@code null} when {@code CommonsUtils.isEmpty} reports the built
 *         list as empty
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
protected final List<RowChangedData> changedDataParse(CanalEntry.RowChange rowChange) {
    List<RowChangedData> rows = RowChangedData.build(rowChange, currentTable.getRowDataColumns());
    if (CommonsUtils.isEmpty(rows)) {
        return null;
    }
    if (currentEventType == CanalEntry.EventType.UPDATE) {
        return rows;
    }
    ConditionContainer condition = currentTable.getColumnCondition();
    if (condition == null) {
        return rows;
    }

    // INSERT: a row that fails the condition can be treated as if the insert never happened.
    // DELETE: a row that fails the condition was of no interest before it was deleted,
    // so its deletion is of no interest either.
    for (Iterator<RowChangedData> cursor = rows.iterator(); cursor.hasNext();) {
        if (!condition.verify(cursor.next())) {
            cursor.remove();
        }
    }
    return rows;
}
示例3
/**
 * Parses the log event carried by {@code event} into an entry when DML parsing is requested, and
 * stores the result back on the event.
 *
 * @param event the message event to process
 * @throws Exception a {@code CanalParseException} wrapping any failure; the same exception is
 *         also stored in the {@code exception} field before being thrown
 */
@Override
public void onEvent(MessageEvent event) throws Exception {
    try {
        if (!event.isNeedDmlParse()) {
            return;
        }

        CanalEntry.Entry parsed;
        if (event.getEvent().getHeader().getType() == LogEvent.ROWS_QUERY_LOG_EVENT) {
            parsed = logEventConvert.parse(event.getEvent(), false);
        } else {
            // Every other event type is parsed on its own as a DML rows event.
            parsed = logEventConvert.parseRowsEvent((RowsLogEvent) event.getEvent(), event.getTable());
        }
        event.setEntry(parsed);
    } catch (Throwable e) {
        exception = new CanalParseException(e);
        throw exception;
    }
}
示例4
/**
 * Creates the parser and initializes its transaction buffer with a flush callback that consumes
 * each flushed transaction and then persists the position of that transaction.
 */
public AbstractEventParser(){
    // Initialize the transaction buffer.
    transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

        public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
            boolean consumed = consumeTheEventAndProfilingIfNecessary(transaction);
            if (!running) {
                // No longer running: skip both the failure check and the position update.
                return;
            }
            if (!consumed) {
                throw new CanalParseException("consume failed!");
            }

            LogPosition lastPosition = buildLastTransactionPosition(transaction);
            if (lastPosition == null) {
                // The position may be absent; there is nothing to persist then.
                return;
            }
            logPositionManager.persistLogPosition(AbstractEventParser.this.destination, lastPosition);
        }
    });
}
示例5
/**
 * Passes a batch of entries to the event sink and, when profiling is enabled, records how long
 * the sink call took in {@code processingInterval}.
 *
 * @param entrys the entries to sink
 * @return the result reported by the sink
 * @throws CanalSinkException declared for the sink call
 * @throws InterruptedException declared for the sink call
 */
protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
                                                                                        InterruptedException {
    final boolean profiling = getProfilingEnabled();
    final long begin = profiling ? System.currentTimeMillis() : -1;

    boolean sunk = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

    if (profiling) {
        this.processingInterval = System.currentTimeMillis() - begin;
    }
    // If the counter has gone negative (e.g. after wrapping around), restart it from zero.
    if (consumedEventCount.incrementAndGet() < 0) {
        consumedEventCount.set(0);
    }
    return sunk;
}
示例6
/**
 * Parses one raw event with the binlog parser and, when profiling is enabled, records how long
 * the parse took in {@code parsingInterval}.
 *
 * @param bod the raw event to parse
 * @param isSeek flag forwarded unchanged to the parser
 * @return whatever the parser returns for this event
 * @throws Exception if the parser fails
 */
protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod, boolean isSeek) throws Exception {
    final boolean profiling = getProfilingEnabled();
    final long begin = profiling ? System.currentTimeMillis() : -1;

    CanalEntry.Entry parsed = binlogParser.parse(bod, isSeek);

    if (profiling) {
        this.parsingInterval = System.currentTimeMillis() - begin;
    }
    // If the counter has gone negative (e.g. after wrapping around), restart it from zero.
    if (parsedEventCount.incrementAndGet() < 0) {
        parsedEventCount.set(0);
    }
    return parsed;
}
示例7
/**
 * Logs a one-line summary of a message batch: batch id, size, summed event length, the current
 * time, and the dump positions of the first and last entries.
 *
 * @param message the batch to summarize
 * @param batchId the batch id to log
 * @param size the size value to log
 */
private void printSummary(Message message, long batchId, int size) {
    long totalEventLength = 0;
    for (CanalEntry.Entry each : message.getEntries()) {
        totalEventLength += each.getHeader().getEventLength();
    }

    // Positions stay null when the batch has no entries.
    String firstPosition = null;
    String lastPosition = null;
    boolean hasEntries = !CollectionUtils.isEmpty(message.getEntries());
    if (hasEntries) {
        int lastIndex = message.getEntries().size() - 1;
        firstPosition = buildPositionForDump(message.getEntries().get(0));
        lastPosition = buildPositionForDump(message.getEntries().get(lastIndex));
    }

    Object[] arguments = {batchId, size, totalEventLength, format.format(new Date()), firstPosition,
        lastPosition};
    logger.info(context_format, arguments);
}
示例8
/**
 * Dispatches a batch of changed rows to the action of {@code lastTable} that matches the event
 * type, then closes every row and empties the list.
 *
 * <p>UPDATE and INSERT go to their own callbacks; any other event type is handled as a DELETE.
 *
 * @param eventType numeric event type, compared against the {@code CanalEntry.EventType} values
 * @param dataList the rows to hand over; cleared before this method returns
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private void runLastEventTypeOfAction(int eventType, List<? extends RowChangedData> dataList) {
    if (log.isDebugEnabled()) {
        log.debug("canal instance: " + instanceName + " need handle data size: " + dataList.size() + ", eventType: " + eventType
            + " table: " + lastTable);
    }

    switch (eventType) {
        case CanalEntry.EventType.UPDATE_VALUE:
            lastTable.getAction().onUpdateAction(Collections.unmodifiableList((List<RowChangedData.Update>) dataList));
            break;
        case CanalEntry.EventType.INSERT_VALUE:
            lastTable.getAction().onInsertAction(Collections.unmodifiableList((List<RowChangedData.Insert>) dataList));
            break;
        default:
            lastTable.getAction().onDeleteAction(Collections.unmodifiableList((List<RowChangedData.Delete>) dataList));
            break;
    }

    // Close the rows explicitly to invalidate them, so a reference kept by the action
    // does not prevent them from being reclaimed.
    for (RowChangedData row : dataList) {
        row.close();
    }
    // Finally empty the list itself.
    dataList.clear();
}
示例9
/**
 * Converts a row's columns into a document and inserts it.
 *
 * <p>For the {@code order} schema, only tables whose names start with {@code order_base_info} or
 * {@code order_detail_info} are stored, under exactly those two names; anything else is logged
 * and dropped. For every other schema, the {@code id} field of a cloned document is moved to
 * {@code _id} before inserting.
 *
 * @param data the columns of the inserted row
 * @param schemaName the source schema name
 * @param tableName the source table name
 */
public void insert(List<CanalEntry.Column> data, String schemaName, String tableName) {
    DBObject document = DBConvertUtil.columnToJson(data);
    logger.debug("insert :{}", document.toString());

    if (!schemaName.equals("order")) {
        // Work on a clone so the complete document handed on below keeps its "id" field.
        DBObject keyed = (DBObject) ObjectUtils.clone(document);
        if (keyed.containsField("id")) {
            keyed.put("_id", keyed.get("id"));
            keyed.removeField("id");
        }
        insertData(schemaName, tableName, keyed, document);
        return;
    }

    // The order schema is handled separately: the raw data is saved under the base table name.
    String targetTable;
    if (tableName.startsWith("order_base_info")) {
        targetTable = "order_base_info";
    } else if (tableName.startsWith("order_detail_info")) {
        targetTable = "order_detail_info";
    } else {
        logger.info("unknown data :{}.{}:{}", schemaName, tableName, document);
        return;
    }
    insertData(schemaName, targetTable, document, document);
}
示例10
/**
 * Converts a row's columns into a document and applies it as an update.
 *
 * <p>For the {@code order} schema, only tables whose names start with {@code order_base_info} or
 * {@code order_detail_info} are updated, under exactly those two names and matched by
 * {@code orderId}; anything else is logged and skipped. For every other schema the document is
 * matched by {@code _id} taken from its {@code id} field, and documents without an {@code id}
 * are logged and skipped.
 *
 * @param data the columns of the updated row
 * @param schemaName the source schema name
 * @param tableName the source table name
 */
public void update(List<CanalEntry.Column> data, String schemaName, String tableName) {
    DBObject obj = DBConvertUtil.columnToJson(data);
    logger.debug("update:{}", obj.toString());
    // The order schema is handled separately.
    if (schemaName.equals("order")) {
        if (tableName.startsWith("order_base_info")) {
            tableName = "order_base_info";
        } else if (tableName.startsWith("order_detail_info")) {
            tableName = "order_detail_info";
        } else {
            logger.info("unknown data:{}.{}:{}", schemaName, tableName, obj);
            // Stop here, as insert() does for unknown order tables: rows of such tables are
            // never stored, so there is nothing meaningful to update.
            return;
        }
        updateData(schemaName, tableName, new BasicDBObject("orderId", obj.get("orderId")), obj);
    } else {
        if (obj.containsField("id")) {
            updateData(schemaName, tableName, new BasicDBObject("_id", obj.get("id")), obj);
        } else {
            logger.info("unknown data structure");
        }
    }
}
示例11
/**
 * Inserts {@code naive} into the collection named {@code tableName} and then passes
 * {@code complete} to {@code SpringUtil.doEvent} under the path
 * {@code /schema/table/<INSERT number>}.
 *
 * <p>Mongo client and socket exceptions are rethrown; any other failure is handed to
 * {@code logError} together with the number of steps reached.
 *
 * @param schemaName the schema name, used for the event path and error logging
 * @param tableName the target collection name
 * @param naive the document to insert
 * @param complete the document passed to the event; a clone of it is kept for error logging
 */
public void insertData(String schemaName, String tableName, DBObject naive, DBObject complete) {
    // Counts the stages passed so far, so logError can report where the failure happened.
    int step = 0;
    DBObject snapshot = (DBObject) ObjectUtils.clone(complete);
    // Save the raw data.
    try {
        String eventPath = "/" + schemaName + "/" + tableName + "/" + CanalEntry.EventType.INSERT.getNumber();
        step++;
        naiveMongoTemplate.getCollection(tableName).insert(naive);
        step++;
        SpringUtil.doEvent(eventPath, complete);
        step++;
    } catch (MongoClientException | MongoSocketException connectionFailure) {
        // Rethrow client connection failures so that synchronization blocks,
        // guarding against a MongoDB outage.
        throw connectionFailure;
    } catch (Exception e) {
        // NOTE(review): the literal 1 presumably identifies the insert operation - confirm in logError.
        logError(schemaName, tableName, 1, step, snapshot, e);
    }
}
示例12
/**
 * Sinks a batch of entries and, if profiling is switched on, stores the duration of the sink
 * call in {@code processingInterval}.
 *
 * @param entrys the entries to sink
 * @return the outcome reported by the sink
 * @throws CanalSinkException declared for the sink call
 * @throws InterruptedException declared for the sink call
 */
protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
                                                                                        InterruptedException {
    final boolean timed = getProfilingEnabled();
    final long startedAt = timed ? System.currentTimeMillis() : -1;

    boolean outcome = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

    if (timed) {
        this.processingInterval = System.currentTimeMillis() - startedAt;
    }
    // A negative count (e.g. after wrapping around) is reset to zero.
    if (consumedEventCount.incrementAndGet() < 0) {
        consumedEventCount.set(0);
    }
    return outcome;
}
示例13
/**
 * Removes the document whose {@code _id} equals the {@code id} field of {@code obj} from the
 * collection named {@code tableName} (when {@code obj} has an {@code id}), then passes a clone of
 * {@code obj} to {@code SpringUtil.doEvent} under the path {@code /schema/table/<DELETE number>}.
 *
 * <p>Mongo client and socket exceptions are rethrown; any other failure is handed to
 * {@code logError} together with the number of steps reached.
 *
 * @param schemaName the schema name, used for the event path and error logging
 * @param tableName the target collection name
 * @param obj the deleted row as a document
 */
public void deleteData(String schemaName, String tableName, DBObject obj) {
    // Counts the stages entered so far, so logError can report where the failure happened.
    int step = 0;
    String eventPath = "/" + schemaName + "/" + tableName + "/" + CanalEntry.EventType.DELETE.getNumber();
    DBObject eventPayload = (DBObject) ObjectUtils.clone(obj);
    DBObject snapshot = (DBObject) ObjectUtils.clone(obj);
    // Save the raw data.
    try {
        step++;
        boolean hasId = obj.containsField("id");
        if (hasId) {
            naiveMongoTemplate.getCollection(tableName).remove(new BasicDBObject("_id", obj.get("id")));
        }
        step++;
        SpringUtil.doEvent(eventPath, eventPayload);
    } catch (MongoClientException | MongoSocketException connectionFailure) {
        // Rethrow client connection failures so that synchronization blocks,
        // guarding against a MongoDB outage.
        throw connectionFailure;
    } catch (Exception e) {
        // NOTE(review): the literal 3 presumably identifies the delete operation - confirm in logError.
        logError(schemaName, tableName, 3, step, snapshot, e);
    }
}
示例14
/**
 * Runs the binlog parser on one raw event and, if profiling is switched on, stores the duration
 * of the parse in {@code parsingInterval}.
 *
 * @param bod the raw event to parse
 * @param isSeek flag forwarded unchanged to the parser
 * @return whatever the parser returns for this event
 * @throws Exception if the parser fails
 */
protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod, boolean isSeek) throws Exception {
    final boolean timed = getProfilingEnabled();
    final long startedAt = timed ? System.currentTimeMillis() : -1;

    CanalEntry.Entry result = binlogParser.parse(bod, isSeek);

    if (timed) {
        this.parsingInterval = System.currentTimeMillis() - startedAt;
    }
    // A negative count (e.g. after wrapping around) is reset to zero.
    if (parsedEventCount.incrementAndGet() < 0) {
        parsedEventCount.set(0);
    }
    return result;
}
示例15
/**
 * Returns the row count stored in the entry header's {@code rowsCount} property.
 *
 * @return the parsed value of the first {@code rowsCount} property, or {@code 1} when the header
 *         has no such property
 */
@Override
public int getRowCount() {
    for (CanalEntry.Pair property : entry.getHeader().getPropsList()) {
        if (!property.getKey().equals("rowsCount")) {
            continue;
        }
        return Integer.parseInt(property.getValue());
    }
    return 1;
}
示例16
/**
 * Parses one raw event with the binlog parser and, when profiling is enabled, records how long
 * the parse took in {@code parsingInterval}.
 *
 * @param bod the raw event to parse
 * @return whatever the parser returns for this event
 * @throws Exception if the parser fails
 */
protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod) throws Exception {
    final boolean profiling = getProfilingEnabled();
    final long begin = profiling ? System.currentTimeMillis() : -1;

    CanalEntry.Entry parsed = binlogParser.parse(bod);

    if (profiling) {
        this.parsingInterval = System.currentTimeMillis() - begin;
    }
    // If the counter has gone negative (e.g. after wrapping around), restart it from zero.
    if (parsedEventCount.incrementAndGet() < 0) {
        parsedEventCount.set(0);
    }
    return parsed;
}
示例17
/**
 * Example of reading the GTID information of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the first {@code curtGtid} property, or an empty string when there is none
 */
public static String getCurrentGtid(CanalEntry.Header header) {
    List<CanalEntry.Pair> properties = header.getPropsList();
    if (properties == null || properties.isEmpty()) {
        return "";
    }
    for (CanalEntry.Pair property : properties) {
        if ("curtGtid".equals(property.getKey())) {
            return property.getValue();
        }
    }
    return "";
}
示例18
/**
 * Example of reading the GTID sequence number information of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the first {@code curtGtidSn} property, or an empty string when there is
 *         none
 */
public static String getCurrentGtidSn(CanalEntry.Header header) {
    List<CanalEntry.Pair> properties = header.getPropsList();
    if (properties == null || properties.isEmpty()) {
        return "";
    }
    for (CanalEntry.Pair property : properties) {
        if ("curtGtidSn".equals(property.getKey())) {
            return property.getValue();
        }
    }
    return "";
}
示例19
/**
 * Example of reading the GTID last-committed information of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the first {@code curtGtidLct} property, or an empty string when there is
 *         none
 */
public static String getCurrentGtidLct(CanalEntry.Header header) {
    List<CanalEntry.Pair> properties = header.getPropsList();
    if (properties == null || properties.isEmpty()) {
        return "";
    }
    for (CanalEntry.Pair property : properties) {
        if ("curtGtidLct".equals(property.getKey())) {
            return property.getValue();
        }
    }
    return "";
}
示例20
/**
 * Evaluates the configured expression against an entry.
 *
 * @param entry the entry exposed to the expression under {@code ROOT_KEY}
 * @return {@code true} when no expression is configured, otherwise the expression's result
 * @throws CanalFilterException declared by the filter contract
 */
public boolean filter(CanalEntry.Entry entry) throws CanalFilterException {
    if (!StringUtils.isEmpty(expression)) {
        Map<String, Object> variables = new HashMap<String, Object>();
        variables.put(ROOT_KEY, entry);
        Boolean verdict = (Boolean) AviatorEvaluator.execute(expression, variables);
        return verdict;
    }
    // An empty expression accepts every entry.
    return true;
}
示例21
/**
 * Example of reading the GTID last-committed information of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the first property keyed {@code curtGtidLct}, or an empty string when no
 *         such property exists
 */
public static String getCurrentGtidLct(CanalEntry.Header header) {
    List<CanalEntry.Pair> pairs = header.getPropsList();
    boolean nothingToSearch = pairs == null || pairs.isEmpty();
    if (nothingToSearch) {
        return "";
    }
    for (CanalEntry.Pair candidate : pairs) {
        if ("curtGtidLct".equals(candidate.getKey())) {
            return candidate.getValue();
        }
    }
    return "";
}
示例22
/**
 * Starts the component and allocates the entry buffer.
 *
 * @throws CanalStoreException declared by the start contract
 * @throws IllegalArgumentException if {@code bufferSize} does not have exactly one bit set
 */
public void start() throws CanalStoreException {
    super.start();

    boolean singleBitSet = Integer.bitCount(bufferSize) == 1;
    if (!singleBitSet) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }
    Assert.notNull(flushCallback, "flush callback is null!");

    // For a power-of-two size, size - 1 has all lower bits set and so works as a bit mask.
    indexMask = bufferSize - 1;
    entries = new CanalEntry.Entry[bufferSize];
}
示例23
/**
 * Hands every buffered entry between the last flushed sequence (exclusive) and the last put
 * sequence (inclusive) to the flush callback, then advances the flush sequence.
 *
 * @throws InterruptedException if the callback is interrupted
 */
private void flush() throws InterruptedException {
    long first = this.flushSequence.get() + 1;
    long last = this.putSequence.get();
    if (first > last) {
        // Nothing has been put since the previous flush.
        return;
    }

    List<CanalEntry.Entry> batch = new ArrayList<CanalEntry.Entry>();
    long sequence = first;
    while (sequence <= last) {
        batch.add(this.entries[getIndex(sequence)]);
        sequence++;
    }

    flushCallback.flush(batch);
    // Move the flush position only after the callback has returned successfully.
    flushSequence.set(last);
}
示例24
/**
 * Runs the configured expression with the entry bound to {@code ROOT_KEY}.
 *
 * @param entry the entry made available to the expression
 * @return {@code true} if the expression is empty, otherwise the value the expression yields
 * @throws CanalFilterException declared by the filter contract
 */
public boolean filter(CanalEntry.Entry entry) throws CanalFilterException {
    boolean noExpression = StringUtils.isEmpty(expression);
    if (noExpression) {
        return true;
    }

    Map<String, Object> bindings = new HashMap<String, Object>();
    bindings.put(ROOT_KEY, entry);
    Object evaluated = AviatorEvaluator.execute(expression, bindings);
    return (Boolean) evaluated;
}
示例25
/**
 * Checks that an expression comparing the entry type with {@code 'ROWDATA'} accepts an entry
 * built with the ROWDATA entry type.
 */
@Test
public void test_el() {
    AviaterELFilter rowDataFilter = new AviaterELFilter("str(entry.entryType) == 'ROWDATA'");
    CanalEntry.Entry rowDataEntry = CanalEntry.Entry.newBuilder()
        .setEntryType(CanalEntry.EntryType.ROWDATA)
        .build();

    boolean accepted = rowDataFilter.filter(rowDataEntry);

    Assert.assertEquals(true, accepted);
}
示例26
/**
 * Creates an event from an entry, copying the header fields and keeping either the serialized
 * or the parsed form of the entry.
 *
 * @param logIdentity the identity stored on the event
 * @param entry the entry whose type and header fields are copied
 * @param raw {@code true} to keep the entry as serialized bytes, {@code false} to keep the
 *        entry object itself
 */
public Event(LogIdentity logIdentity, CanalEntry.Entry entry, boolean raw){
    CanalEntry.Header header = entry.getHeader();

    this.logIdentity = logIdentity;
    this.entryType = entry.getEntryType();
    this.executeTime = header.getExecuteTime();
    this.journalName = header.getLogfileName();
    this.position = header.getLogfileOffset();
    this.serverId = header.getServerId();
    this.gtid = header.getGtid();
    this.eventType = header.getEventType();

    // Only ROWDATA entries are searched for a "rowsCount" header property.
    if (entryType == EntryType.ROWDATA) {
        List<CanalEntry.Pair> properties = header.getPropsList();
        if (properties != null) {
            for (CanalEntry.Pair property : properties) {
                if ("rowsCount".equals(property.getKey())) {
                    rowsCount = Integer.parseInt(property.getValue());
                    break;
                }
            }
        }
    }

    if (!raw) {
        this.entry = entry;
        // Estimate the length as 6 times the event length.
        this.rawLength = header.getEventLength() * 6;
    } else {
        // Keep the serialized form and use its exact size.
        this.rawEntry = entry.toByteString();
        this.rawLength = rawEntry.size();
    }
}
示例27
/**
 * Logs one line per column with its name, value and MySQL type, plus an {@code update=} marker
 * for columns flagged as updated.
 *
 * @param columns the columns to log
 */
protected void printColumn(List<CanalEntry.Column> columns) {
    for (CanalEntry.Column column : columns) {
        StringBuilder line = new StringBuilder();
        line.append(column.getName()).append(" : ").append(column.getValue());
        line.append(" type=").append(column.getMysqlType());
        if (column.getUpdated()) {
            line.append(" update=").append(column.getUpdated());
        }
        line.append(SEP);
        logger.info(line.toString());
    }
}
示例28
/**
 * Decodes a serialized packet into a {@code Message}.
 *
 * <p>Only MESSAGES packets produce a result. An ACK packet or any other packet type is reported
 * as an error, as is a compression setting other than {@code NONE} or
 * {@code COMPRESSIONCOMPATIBLEPROTO2}.
 *
 * @param data the serialized packet; may be {@code null}
 * @param lazyParseEntry {@code true} to keep the entries as raw byte strings, {@code false} to
 *        parse each one into an entry
 * @return the decoded message, or {@code null} when {@code data} is {@code null}
 * @throws CanalClientException with the message {@code "deserializer failed"} wrapping any
 *         exception raised while decoding, including the ones thrown inside this method
 */
public static Message deserializer(byte[] data, boolean lazyParseEntry) {
    try {
        if (data == null) {
            return null;
        }

        CanalPacket.Packet packet = CanalPacket.Packet.parseFrom(data);
        switch (packet.getType()) {
            case MESSAGES: {
                boolean supportedCompression = packet.getCompression().equals(CanalPacket.Compression.NONE)
                    || packet.getCompression().equals(CanalPacket.Compression.COMPRESSIONCOMPATIBLEPROTO2);
                if (!supportedCompression) {
                    throw new CanalClientException("compression is not supported in this connector");
                }

                CanalPacket.Messages batch = CanalPacket.Messages.parseFrom(packet.getBody());
                Message decoded = new Message(batch.getBatchId());
                if (!lazyParseEntry) {
                    for (ByteString rawEntry : batch.getMessagesList()) {
                        decoded.addEntry(CanalEntry.Entry.parseFrom(rawEntry));
                    }
                    decoded.setRaw(false);
                } else {
                    // Keep the byte strings as they are; nothing is parsed here.
                    decoded.setRawEntries(batch.getMessagesList());
                    decoded.setRaw(true);
                }
                return decoded;
            }
            case ACK: {
                CanalPacket.Ack ack = CanalPacket.Ack.parseFrom(packet.getBody());
                throw new CanalClientException("something goes wrong with reason: " + ack.getErrorMessage());
            }
            default: {
                throw new CanalClientException("unexpected packet type: " + packet.getType());
            }
        }
    } catch (Exception e) {
        throw new CanalClientException("deserializer failed", e);
    }
}
示例29
/**
 * Example of reading the GTID information of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the first property keyed {@code curtGtid}, or an empty string when no
 *         such property exists
 */
public static String getCurrentGtid(CanalEntry.Header header) {
    List<CanalEntry.Pair> pairs = header.getPropsList();
    boolean nothingToSearch = pairs == null || pairs.isEmpty();
    if (nothingToSearch) {
        return "";
    }
    for (CanalEntry.Pair candidate : pairs) {
        if ("curtGtid".equals(candidate.getKey())) {
            return candidate.getValue();
        }
    }
    return "";
}
示例30
/**
 * Parses the stored value of an entry into a {@code RowChange}.
 *
 * <p>Parsing is best-effort: a failure is logged together with its cause and reported to the
 * caller as {@code null} rather than as an exception.
 *
 * @param entry the entry whose store value is parsed
 * @return the parsed row change, or {@code null} if parsing failed
 */
private CanalEntry.RowChange parseRowChange(CanalEntry.Entry entry) {
    CanalEntry.RowChange rowChange = null;
    try {
        rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
        // Pass the exception to the logger so the cause and stack trace are not lost.
        logger.error("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
    }
    return rowChange;
}