Java源码示例:com.alibaba.otter.canal.protocol.CanalEntry

示例1
/**
 * Decides whether the given entry should be kept.
 *
 * <p>Only ROWDATA entries are checked, and only when a filter is configured; every other
 * entry is accepted as-is.
 *
 * @param entry the entry to check
 * @return {@code true} when the entry is not subject to filtering or the filter accepts it,
 *         {@code false} when the filter rejects it
 */
protected boolean doFilter(CanalEntry.Entry entry) {
    // No filter configured, or not a row-data entry: nothing to check.
    if (filter == null || entry.getEntryType() != EntryType.ROWDATA) {
        return true;
    }

    String qualifiedName = getSchemaNameAndTableName(entry);
    boolean accepted = filter.filter(qualifiedName);
    if (accepted) {
        return true;
    }

    // Rejected: record the name and the binlog file/offset taken from the entry header.
    logger.debug("filter name[{}] entry : {}:{}",
        qualifiedName,
        entry.getHeader().getLogfileName(),
        entry.getHeader().getLogfileOffset());
    return false;
}
 
示例2
/**
 * When {@link Schema.Table#columns} has values, UPDATE operations are filtered by whether the
 * changed fields are contained in {@link Schema.Table#columns}.
 * Condition filtering is applied here for DELETE and INSERT events only; filtering UPDATE
 * events is more involved and is left to the subclass.
 *
 * @param rowChange the changed data
 * @return the parsed records, or {@code null} when parsing produced nothing
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
protected final List<RowChangedData> changedDataParse(CanalEntry.RowChange rowChange) {
    List<RowChangedData> parsed = RowChangedData.build(rowChange, currentTable.getRowDataColumns());
    if (CommonsUtils.isEmpty(parsed)) {
        return null;
    }

    // UPDATE events are not condition-filtered here.
    if (currentEventType == CanalEntry.EventType.UPDATE) {
        return parsed;
    }

    ConditionContainer condition = currentTable.getColumnCondition();
    if (condition == null) {
        return parsed;
    }

    // INSERT: a row failing the condition is treated as if the event never happened.
    // DELETE: a row failing the condition was not of interest before deletion either,
    // so its removal is ignored as well.
    for (Iterator<RowChangedData> rows = parsed.iterator(); rows.hasNext();) {
        if (!condition.verify(rows.next())) {
            rows.remove();
        }
    }
    return parsed;
}
 
示例3
/**
 * Parses the event into a {@code CanalEntry.Entry} and stores it on the event, but only when
 * the event is flagged as needing DML parsing.
 *
 * <p>Any failure is wrapped in a {@code CanalParseException}, remembered in the
 * {@code exception} field and rethrown.
 *
 * @param event the message event to process
 * @throws Exception the wrapping {@code CanalParseException} when parsing fails
 */
@Override
public void onEvent(MessageEvent event) throws Exception {
    try {
        if (!event.isNeedDmlParse()) {
            return;
        }

        final CanalEntry.Entry parsed;
        if (event.getEvent().getHeader().getType() == LogEvent.ROWS_QUERY_LOG_EVENT) {
            parsed = logEventConvert.parse(event.getEvent(), false);
        } else {
            // Every other type is parsed on its own as a DML rows event.
            parsed = logEventConvert.parseRowsEvent((RowsLogEvent) event.getEvent(), event.getTable());
        }

        event.setEntry(parsed);
    } catch (Throwable e) {
        exception = new CanalParseException(e);
        throw exception;
    }
}
 
示例4
/**
 * Creates the parser and installs a transaction buffer whose flush callback consumes each
 * flushed transaction and then persists its last log position.
 */
public AbstractEventParser(){
    // Initial setup of the transaction buffer.
    transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

        public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
            boolean consumed = consumeTheEventAndProfilingIfNecessary(transaction);
            // Once the parser is no longer running the outcome is ignored.
            if (!running) {
                return;
            }

            if (!consumed) {
                throw new CanalParseException("consume failed!");
            }

            LogPosition lastPosition = buildLastTransactionPosition(transaction);
            // The position may be null, in which case there is nothing to persist.
            if (lastPosition == null) {
                return;
            }
            logPositionManager.persistLogPosition(AbstractEventParser.this.destination, lastPosition);
        }
    });
}
 
示例5
/**
 * Hands the entries to the event sink and, when profiling is enabled, records how long the
 * sink call took in {@code processingInterval}.
 *
 * <p>The consumed-event counter is incremented on every call and reset to zero if the
 * increment makes it negative.
 *
 * @param entrys the entries to sink
 * @return the result reported by the event sink
 * @throws CanalSinkException   declared for the sink call
 * @throws InterruptedException declared for the sink call
 */
protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
                                                                                       InterruptedException {
    final boolean profiling = getProfilingEnabled();
    final long begin = profiling ? System.currentTimeMillis() : -1;

    boolean sunk = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

    if (profiling) {
        this.processingInterval = System.currentTimeMillis() - begin;
    }

    // Counter went negative after the increment: start again from zero.
    if (consumedEventCount.incrementAndGet() < 0) {
        consumedEventCount.set(0);
    }

    return sunk;
}
 
示例6
/**
 * Parses one raw event with the binlog parser and, when profiling is enabled, records how
 * long the parse took in {@code parsingInterval}.
 *
 * <p>The parsed-event counter is incremented on every call and reset to zero if the
 * increment makes it negative.
 *
 * @param bod    the raw event to parse
 * @param isSeek passed through unchanged to the binlog parser
 * @return the entry returned by the binlog parser
 * @throws Exception whatever the binlog parser throws
 */
protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod, boolean isSeek) throws Exception {
    final boolean profiling = getProfilingEnabled();
    final long begin = profiling ? System.currentTimeMillis() : -1;

    CanalEntry.Entry parsed = binlogParser.parse(bod, isSeek);

    if (profiling) {
        this.parsingInterval = System.currentTimeMillis() - begin;
    }

    // Counter went negative after the increment: start again from zero.
    if (parsedEventCount.incrementAndGet() < 0) {
        parsedEventCount.set(0);
    }
    return parsed;
}
 
示例7
/**
 * Logs a one-line summary of a batch: batch id, size, summed event length, the current time
 * and the dump positions of the first and last entry.
 *
 * @param message the batch whose entries are summarised
 * @param batchId the batch id to log
 * @param size    the size value to log
 */
private void printSummary(Message message, long batchId, int size) {
    // Sum of the event lengths reported by every entry header.
    long totalEventLength = 0;
    for (CanalEntry.Entry each : message.getEntries()) {
        totalEventLength += each.getHeader().getEventLength();
    }

    // Positions stay null when the batch has no entries.
    final boolean hasEntries = !CollectionUtils.isEmpty(message.getEntries());
    String firstPosition = hasEntries
        ? buildPositionForDump(message.getEntries().get(0))
        : null;
    String lastPosition = hasEntries
        ? buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1))
        : null;

    logger.info(context_format, new Object[]{batchId, size, totalEventLength, format.format(new Date()),
            firstPosition, lastPosition});
}
 
示例8
/**
 * Dispatches the collected rows to the update, insert or delete handler of the last table's
 * action, then closes every row and empties the list.
 *
 * @param eventType numeric event type; UPDATE_VALUE and INSERT_VALUE are matched explicitly,
 *                  anything else goes to the delete handler
 * @param dataList  the rows to hand over; it is cleared before this method returns
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private void runLastEventTypeOfAction(int eventType, List<? extends RowChangedData> dataList) {
    if (log.isDebugEnabled()) {
        log.debug("canal instance: " + instanceName
                + " need handle data size: " + dataList.size()
                + ", eventType: " + eventType
                + " table: " + lastTable);
    }

    // The handlers receive a read-only view of the list.
    switch (eventType) {
        case CanalEntry.EventType.UPDATE_VALUE:
            lastTable.getAction().onUpdateAction(Collections.unmodifiableList((List<RowChangedData.Update>) dataList));
            break;
        case CanalEntry.EventType.INSERT_VALUE:
            lastTable.getAction().onInsertAction(Collections.unmodifiableList((List<RowChangedData.Insert>) dataList));
            break;
        default:
            lastTable.getAction().onDeleteAction(Collections.unmodifiableList((List<RowChangedData.Delete>) dataList));
            break;
    }

    // Close every row explicitly so that references kept by an action cannot prevent the
    // data from being reclaimed.
    for (RowChangedData row : dataList) {
        row.close();
    }
    // Finally drop the rows from the list.
    dataList.clear();
}
 
示例9
/**
 * Converts the columns to a document and inserts it.
 *
 * <p>For the {@code order} schema only tables whose name starts with
 * {@code order_base_info} or {@code order_detail_info} are inserted, under exactly that
 * name; other tables are logged and skipped. For every other schema a copy of the document
 * is inserted in which the {@code id} field, if present, is moved to {@code _id}.
 *
 * @param data       the column values of the row
 * @param schemaName the schema name
 * @param tableName  the table name
 */
public void insert(List<CanalEntry.Column> data, String schemaName, String tableName) {
    DBObject converted = DBConvertUtil.columnToJson(data);
    logger.debug("insert :{}", converted.toString());

    if (!schemaName.equals("order")) {
        // Generic path: store a copy keyed by _id instead of id.
        DBObject keyed = (DBObject) ObjectUtils.clone(converted);
        if (keyed.containsField("id")) {
            keyed.put("_id", keyed.get("id"));
            keyed.removeField("id");
        }
        insertData(schemaName, tableName, keyed, converted);
        return;
    }

    // The order schema is handled separately: the raw document is stored under the table
    // name reduced to its known prefix.
    String targetTable;
    if (tableName.startsWith("order_base_info")) {
        targetTable = "order_base_info";
    } else if (tableName.startsWith("order_detail_info")) {
        targetTable = "order_detail_info";
    } else {
        logger.info("unknown data :{}.{}:{}", schemaName, tableName, converted);
        return;
    }
    insertData(schemaName, targetTable, converted, converted);
}
 
示例10
/**
 * Converts the columns to a document and applies it as an update.
 *
 * <p>For the {@code order} schema only tables whose name starts with
 * {@code order_base_info} or {@code order_detail_info} are updated, under exactly that name
 * and matched by {@code orderId}; other tables are logged and skipped. For every other
 * schema the update is matched by {@code _id}, taken from the document's {@code id} field;
 * documents without an {@code id} field are logged and skipped.
 *
 * @param data       the column values of the row
 * @param schemaName the schema name
 * @param tableName  the table name
 */
public void update(List<CanalEntry.Column> data, String schemaName, String tableName) {
    DBObject obj = DBConvertUtil.columnToJson(data);
    logger.debug("update:{}", obj.toString());
    // The order schema is handled separately.
    if (schemaName.equals("order")) {
        if (tableName.startsWith("order_base_info")) {
            tableName = "order_base_info";
        } else if (tableName.startsWith("order_detail_info")) {
            tableName = "order_detail_info";
        } else {
            logger.info("unknown data:{}.{}:{}", schemaName, tableName, obj);
            // Skip unknown tables, as insert(...) does; previously execution fell through
            // and still issued an update against the unrecognised table name.
            return;
        }
        updateData(schemaName, tableName, new BasicDBObject("orderId", obj.get("orderId")), obj);
    } else {
        if (obj.containsField("id")) {
            updateData(schemaName, tableName, new BasicDBObject("_id", obj.get("id")), obj);
        } else {
            logger.info("unknown data structure");
        }
    }
}
 
示例11
/**
 * Inserts {@code naive} into the collection named after the table and then publishes
 * {@code complete} as an insert event.
 *
 * <p>Mongo client and socket exceptions are rethrown; any other exception is passed to
 * {@code logError} together with the step that had been reached.
 *
 * @param schemaName the schema name, used in the event path and in error logging
 * @param tableName  the collection name, also used in the event path
 * @param naive      the document written to the collection
 * @param complete   the document published with the event
 */
public void insertData(String schemaName, String tableName, DBObject naive, DBObject complete) {
    // Copy taken up front so that error logging works on its own instance.
    DBObject snapshot = (DBObject) ObjectUtils.clone(complete);
    // Progress marker reported to logError: 1 = path built, 2 = inserted, 3 = event done.
    int step = 0;
    try {
        String eventPath = "/" + schemaName + "/" + tableName + "/" + CanalEntry.EventType.INSERT.getNumber();
        step = 1;
        naiveMongoTemplate.getCollection(tableName).insert(naive);
        step = 2;
        SpringUtil.doEvent(eventPath, complete);
        step = 3;
    } catch (MongoClientException | MongoSocketException clientException) {
        // Client connection problems are rethrown so that synchronisation blocks instead
        // of continuing while MongoDB is down.
        throw clientException;
    } catch (Exception e) {
        logError(schemaName, tableName, 1, step, snapshot, e);
    }
}
 
示例12
/**
 * Passes the entries to the event sink; with profiling enabled the duration of the sink call
 * is stored in {@code processingInterval}.
 *
 * <p>Each call increments the consumed-event counter, which is reset to zero if the
 * increment makes it negative.
 *
 * @param entrys the entries to sink
 * @return the result reported by the event sink
 * @throws CanalSinkException   declared for the sink call
 * @throws InterruptedException declared for the sink call
 */
protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException,
                                                                                       InterruptedException {
    final boolean timed = getProfilingEnabled();
    final long startedAt = timed ? System.currentTimeMillis() : -1;

    boolean outcome = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination);

    if (timed) {
        this.processingInterval = System.currentTimeMillis() - startedAt;
    }

    // A negative value after the increment restarts the counter at zero.
    if (consumedEventCount.incrementAndGet() < 0) {
        consumedEventCount.set(0);
    }

    return outcome;
}
 
示例13
/**
 * Removes the document whose {@code _id} equals the given object's {@code id} (when that
 * field exists) and then publishes a delete event.
 *
 * <p>Mongo client and socket exceptions are rethrown; any other exception is passed to
 * {@code logError} together with the step that had been reached.
 *
 * @param schemaName the schema name, used in the event path and in error logging
 * @param tableName  the collection name, also used in the event path
 * @param obj        the deleted row as a document
 */
public void deleteData(String schemaName, String tableName, DBObject obj) {
    // Progress marker reported to logError: 1 = removing, 2 = publishing the event.
    int step = 0;
    String eventPath = "/" + schemaName + "/" + tableName + "/" + CanalEntry.EventType.DELETE.getNumber();
    // Separate copies for the event payload and for error logging.
    DBObject eventPayload = (DBObject) ObjectUtils.clone(obj);
    DBObject snapshot = (DBObject) ObjectUtils.clone(obj);
    try {
        step = 1;
        if (obj.containsField("id")) {
            naiveMongoTemplate.getCollection(tableName).remove(new BasicDBObject("_id", obj.get("id")));
        }
        step = 2;
        SpringUtil.doEvent(eventPath, eventPayload);
    } catch (MongoClientException | MongoSocketException clientException) {
        // Client connection problems are rethrown so that synchronisation blocks instead
        // of continuing while MongoDB is down.
        throw clientException;
    } catch (Exception e) {
        logError(schemaName, tableName, 3, step, snapshot, e);
    }
}
 
示例14
/**
 * Runs the binlog parser on one raw event; with profiling enabled the duration of the parse
 * is stored in {@code parsingInterval}.
 *
 * <p>Each call increments the parsed-event counter, which is reset to zero if the increment
 * makes it negative.
 *
 * @param bod    the raw event to parse
 * @param isSeek passed through unchanged to the binlog parser
 * @return the entry returned by the binlog parser
 * @throws Exception whatever the binlog parser throws
 */
protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod, boolean isSeek) throws Exception {
    final boolean timed = getProfilingEnabled();
    final long startedAt = timed ? System.currentTimeMillis() : -1;

    CanalEntry.Entry result = binlogParser.parse(bod, isSeek);

    if (timed) {
        this.parsingInterval = System.currentTimeMillis() - startedAt;
    }

    // A negative value after the increment restarts the counter at zero.
    if (parsedEventCount.incrementAndGet() < 0) {
        parsedEventCount.set(0);
    }
    return result;
}
 
示例15
/**
 * Returns the row count carried in the entry header.
 *
 * <p>The header properties are scanned for the key {@code rowsCount}; the first match is
 * parsed as an integer. Without such a property the count defaults to 1.
 *
 * @return the parsed {@code rowsCount} property, or 1 when it is absent
 */
@Override
public int getRowCount() {
    for (CanalEntry.Pair prop : entry.getHeader().getPropsList()) {
        if (!prop.getKey().equals("rowsCount")) {
            continue;
        }
        return Integer.parseInt(prop.getValue());
    }

    return 1;
}
 
示例16
/**
 * Parses one raw event with the binlog parser and, when profiling is enabled, records how
 * long the parse took in {@code parsingInterval}.
 *
 * <p>The parsed-event counter is incremented on every call and reset to zero if the
 * increment makes it negative.
 *
 * @param bod the raw event to parse
 * @return the entry returned by the binlog parser
 * @throws Exception whatever the binlog parser throws
 */
protected CanalEntry.Entry parseAndProfilingIfNecessary(EVENT bod) throws Exception {
    final boolean profiling = getProfilingEnabled();
    final long begin = profiling ? System.currentTimeMillis() : -1;

    CanalEntry.Entry parsed = binlogParser.parse(bod);

    if (profiling) {
        this.parsingInterval = System.currentTimeMillis() - begin;
    }

    // Counter went negative after the increment: start again from zero.
    if (parsedEventCount.incrementAndGet() < 0) {
        parsedEventCount.set(0);
    }
    return parsed;
}
 
示例17
/**
 * Example of reading the GTID of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the {@code curtGtid} property, or an empty string when it is absent
 */
public static String getCurrentGtid(CanalEntry.Header header) {
    List<CanalEntry.Pair> props = header.getPropsList();
    if (props == null || props.isEmpty()) {
        return "";
    }

    for (CanalEntry.Pair prop : props) {
        if ("curtGtid".equals(prop.getKey())) {
            return prop.getValue();
        }
    }
    return "";
}
 
示例18
/**
 * Example of reading the GTID sequence number of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the {@code curtGtidSn} property, or an empty string when it is absent
 */
public static String getCurrentGtidSn(CanalEntry.Header header) {
    List<CanalEntry.Pair> props = header.getPropsList();
    if (props == null || props.isEmpty()) {
        return "";
    }

    for (CanalEntry.Pair prop : props) {
        if ("curtGtidSn".equals(prop.getKey())) {
            return prop.getValue();
        }
    }
    return "";
}
 
示例19
/**
 * Example of reading the GTID last-committed value of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the {@code curtGtidLct} property, or an empty string when it is absent
 */
public static String getCurrentGtidLct(CanalEntry.Header header) {
    List<CanalEntry.Pair> props = header.getPropsList();
    if (props == null || props.isEmpty()) {
        return "";
    }

    for (CanalEntry.Pair prop : props) {
        if ("curtGtidLct".equals(prop.getKey())) {
            return prop.getValue();
        }
    }
    return "";
}
 
示例20
/**
 * Evaluates the configured expression against the entry.
 *
 * <p>An empty expression accepts everything. Otherwise the entry is bound under
 * {@code ROOT_KEY} and the evaluation result is cast to {@code Boolean}.
 *
 * @param entry the entry made available to the expression
 * @return {@code true} when no expression is configured, otherwise the expression's result
 * @throws CanalFilterException declared by the filter contract
 */
public boolean filter(CanalEntry.Entry entry) throws CanalFilterException {
    if (!StringUtils.isEmpty(expression)) {
        Map<String, Object> bindings = new HashMap<String, Object>();
        bindings.put(ROOT_KEY, entry);
        Object verdict = AviatorEvaluator.execute(expression, bindings);
        return (Boolean) verdict;
    }

    return true;
}
 
示例21
/**
 * Example of reading the GTID last-committed value of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the {@code curtGtidLct} property, or an empty string when it is absent
 */
public static String getCurrentGtidLct(CanalEntry.Header header) {
    List<CanalEntry.Pair> pairs = header.getPropsList();
    if (pairs == null || pairs.isEmpty()) {
        return "";
    }

    for (CanalEntry.Pair candidate : pairs) {
        if ("curtGtidLct".equals(candidate.getKey())) {
            return candidate.getValue();
        }
    }
    return "";
}
 
示例22
/**
 * Starts the component, validates its configuration and allocates the entry buffer.
 *
 * <p>After delegating to the superclass, the buffer size must have exactly one bit set and
 * a flush callback must be present; then the index mask and the entry array are created.
 *
 * @throws CanalStoreException      declared by the lifecycle contract
 * @throws IllegalArgumentException when {@code bufferSize} does not have exactly one bit set
 */
public void start() throws CanalStoreException {
    super.start();

    final boolean singleBitSet = Integer.bitCount(bufferSize) == 1;
    if (!singleBitSet) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }
    Assert.notNull(flushCallback, "flush callback is null!");

    // With a single-bit size, size - 1 is a mask of all lower bits.
    indexMask = bufferSize - 1;
    entries = new CanalEntry.Entry[bufferSize];
}
 
示例23
/**
 * Hands every entry between the last flushed sequence (exclusive) and the last put sequence
 * (inclusive) to the flush callback, then advances the flush sequence.
 *
 * @throws InterruptedException propagated from the flush callback
 */
private void flush() throws InterruptedException {
    final long first = this.flushSequence.get() + 1;
    final long last = this.putSequence.get();
    if (first > last) {
        // Nothing new since the previous flush.
        return;
    }

    List<CanalEntry.Entry> batch = new ArrayList<CanalEntry.Entry>();
    long cursor = first;
    while (cursor <= last) {
        batch.add(this.entries[getIndex(cursor)]);
        cursor++;
    }

    flushCallback.flush(batch);
    // The flush position only moves once the callback has returned.
    flushSequence.set(last);
}
 
示例24
/**
 * Applies the configured expression to the entry.
 *
 * <p>With an empty expression every entry passes. Otherwise the entry is exposed to the
 * expression under {@code ROOT_KEY} and the evaluation result is cast to {@code Boolean}.
 *
 * @param entry the entry made available to the expression
 * @return {@code true} when no expression is configured, otherwise the expression's result
 * @throws CanalFilterException declared by the filter contract
 */
public boolean filter(CanalEntry.Entry entry) throws CanalFilterException {
    if (!StringUtils.isEmpty(expression)) {
        Map<String, Object> context = new HashMap<String, Object>();
        context.put(ROOT_KEY, entry);
        Object evaluated = AviatorEvaluator.execute(expression, context);
        return (Boolean) evaluated;
    }

    return true;
}
 
示例25
/**
 * Checks that an expression comparing the entry type with {@code 'ROWDATA'} accepts an entry
 * built with the ROWDATA entry type.
 */
@Test
public void test_el() {
    AviaterELFilter filter = new AviaterELFilter("str(entry.entryType) == 'ROWDATA'");

    CanalEntry.Entry rowDataEntry = CanalEntry.Entry.newBuilder()
        .setEntryType(CanalEntry.EntryType.ROWDATA)
        .build();

    Assert.assertEquals(true, filter.filter(rowDataEntry));
}
 
示例26
public Event(LogIdentity logIdentity, CanalEntry.Entry entry, boolean raw){
    this.logIdentity = logIdentity;
    this.entryType = entry.getEntryType();
    this.executeTime = entry.getHeader().getExecuteTime();
    this.journalName = entry.getHeader().getLogfileName();
    this.position = entry.getHeader().getLogfileOffset();
    this.serverId = entry.getHeader().getServerId();
    this.gtid = entry.getHeader().getGtid();
    this.eventType = entry.getHeader().getEventType();
    if (entryType == EntryType.ROWDATA) {
        List<CanalEntry.Pair> props = entry.getHeader().getPropsList();
        if (props != null) {
            for (CanalEntry.Pair p : props) {
                if ("rowsCount".equals(p.getKey())) {
                    rowsCount = Integer.parseInt(p.getValue());
                    break;
                }
            }
        }
    }

    if (raw) {
        // build raw
        this.rawEntry = entry.toByteString();
        this.rawLength = rawEntry.size();
    } else {
        this.entry = entry;
        // 按照6倍的event length预估
        this.rawLength = entry.getHeader().getEventLength() * 6;
    }
}
 
示例27
/**
 * Logs one line per column containing its name, value and MySQL type, plus an update marker
 * for columns flagged as updated.
 *
 * @param columns the columns to print
 */
protected void printColumn(List<CanalEntry.Column> columns) {
    for (CanalEntry.Column column : columns) {
        StringBuilder line = new StringBuilder();
        line.append(column.getName()).append(" : ").append(column.getValue());
        line.append("    type=").append(column.getMysqlType());
        // The update marker is only written for updated columns.
        if (column.getUpdated()) {
            line.append("    update=").append(column.getUpdated());
        }
        line.append(SEP);
        logger.info(line.toString());
    }
}
 
示例28
/**
 * Decodes a serialized packet into a {@code Message}.
 *
 * <p>Only MESSAGES packets produce a result. An ACK packet or any other packet type raises a
 * {@code CanalClientException}, as does any compression other than NONE or
 * COMPRESSIONCOMPATIBLEPROTO2. Every exception raised while decoding, including those just
 * mentioned, is wrapped in a {@code CanalClientException} with the message
 * "deserializer failed".
 *
 * @param data           the serialized packet, may be {@code null}
 * @param lazyParseEntry {@code true} to keep the entries as raw byte strings,
 *                       {@code false} to parse each of them immediately
 * @return the decoded message, or {@code null} when {@code data} is {@code null}
 */
public static Message deserializer(byte[] data, boolean lazyParseEntry) {
    if (data == null) {
        return null;
    }

    try {
        CanalPacket.Packet packet = CanalPacket.Packet.parseFrom(data);
        switch (packet.getType()) {
            case MESSAGES: {
                CanalPacket.Compression compression = packet.getCompression();
                boolean supported = compression.equals(CanalPacket.Compression.NONE)
                                    || compression.equals(CanalPacket.Compression.COMPRESSIONCOMPATIBLEPROTO2);
                if (!supported) {
                    throw new CanalClientException("compression is not supported in this connector");
                }

                CanalPacket.Messages body = CanalPacket.Messages.parseFrom(packet.getBody());
                Message decoded = new Message(body.getBatchId());
                if (!lazyParseEntry) {
                    // Eager mode: parse every entry now.
                    for (ByteString rawEntry : body.getMessagesList()) {
                        decoded.addEntry(CanalEntry.Entry.parseFrom(rawEntry));
                    }
                    decoded.setRaw(false);
                } else {
                    // Lazy mode: keep the byte strings and leave parsing to the consumer.
                    decoded.setRawEntries(body.getMessagesList());
                    decoded.setRaw(true);
                }
                return decoded;
            }
            case ACK: {
                CanalPacket.Ack ack = CanalPacket.Ack.parseFrom(packet.getBody());
                throw new CanalClientException("something goes wrong with reason: " + ack.getErrorMessage());
            }
            default: {
                throw new CanalClientException("unexpected packet type: " + packet.getType());
            }
        }
    } catch (Exception e) {
        throw new CanalClientException("deserializer failed", e);
    }
}
 
示例29
/**
 * Example of reading the GTID of the current entry.
 *
 * @param header the entry header whose properties are searched
 * @return the value of the {@code curtGtid} property, or an empty string when it is absent
 */
public static String getCurrentGtid(CanalEntry.Header header) {
    List<CanalEntry.Pair> pairs = header.getPropsList();
    if (pairs == null || pairs.isEmpty()) {
        return "";
    }

    for (CanalEntry.Pair candidate : pairs) {
        if ("curtGtid".equals(candidate.getKey())) {
            return candidate.getValue();
        }
    }
    return "";
}
 
示例30
/**
 * Parses the entry's store value into a {@code CanalEntry.RowChange}.
 *
 * <p>Parsing is best-effort: a failure is logged together with the entry and the causing
 * exception, and {@code null} is returned instead of propagating the error.
 *
 * @param entry the entry whose store value is parsed
 * @return the parsed row change, or {@code null} when parsing failed
 */
private CanalEntry.RowChange parseRowChange(CanalEntry.Entry entry) {
    try {
        return CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
        // Pass the exception to the logger so its cause and stack trace are kept; plain
        // concatenation (rather than entry.toString()) keeps the handler from throwing a
        // second time when the entry itself is null.
        logger.error("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
        return null;
    }
}