Java源码示例:com.github.shyiko.mysql.binlog.event.WriteRowsEventData

示例1
/**
 * Builds a {@link WriteRowsDTO} from a binlog event carrying {@link WriteRowsEventData}.
 *
 * <p>The DTO is tagged with {@code DatabaseEvent.WRITE_ROWS} and filled with the database and
 * table names taken from the table-map data registered for the event's table id, the namespace
 * from the binary log config, and one converted entry per row of the event.
 *
 * @param event event whose data is read as {@link WriteRowsEventData}
 * @return the populated DTO
 */
@Override
protected EventBaseDTO formatData(Event event) {
    WriteRowsEventData rowsData = event.getData();
    WriteRowsDTO dto = new WriteRowsDTO();
    dto.setEventType(DatabaseEvent.WRITE_ROWS);

    // Add table information.
    ColumnsTableMapEventData tableMeta = context.getTableMapData(rowsData.getTableId());
    dto.setDatabase(tableMeta.getDatabase());
    dto.setTable(tableMeta.getTable());
    dto.setNamespace(context.getBinaryLogConfig().getNamespace());

    // Add column mappings: the set bits of the included-columns bitmap become an index array
    // that is handed to convert(...) together with every row.
    int[] columnIndexes = rowsData.getIncludedColumns().stream().toArray();
    dto.setRowMaps(
            rowsData.getRows()
                    .stream()
                    .map(row -> convert(row, columnIndexes, tableMeta))
                    .collect(Collectors.toList()));
    return dto;
}
 
示例2
/**
 * Maps an {@code EXT_WRITE_ROWS} event through {@code BinaryLogConnectorEventMapper} and checks
 * that the result is a {@code WriteEvent} carrying the expected position, rows, server id,
 * table id and timestamp.
 */
@Test
public void testWriteEvent() {
  eventHeader.setEventType(EventType.EXT_WRITE_ROWS);

  WriteRowsEventData writeRows = new WriteRowsEventData();
  writeRows.setTableId(TABLE_ID);
  writeRows.setRows(ImmutableList.of(ROW));
  Event rawEvent = new Event(eventHeader, writeRows);

  Optional<BinlogEvent> mapped =
      BinaryLogConnectorEventMapper.INSTANCE.map(rawEvent, BINLOG_FILE_POS);

  assertTrue(mapped.isPresent());
  BinlogEvent actual = mapped.get();
  assertTrue(actual instanceof WriteEvent);

  WriteEvent writeEvent = (WriteEvent) actual;
  assertEquals(BINLOG_FILE_POS, writeEvent.getBinlogFilePos());
  assertEquals(ImmutableList.of(ROW), writeEvent.getRows());
  assertEquals(SERVER_ID, writeEvent.getServerId());
  assertEquals(TABLE_ID, writeEvent.getTableId());
  assertEquals(TIMESTAMP, writeEvent.getTimestamp());
}
 
示例3
/**
 * Builds a {@link PublishedEvent} from the column values of a write-rows event.
 *
 * <p>The event-data column is used as-is when it is a {@code String}; otherwise it is cast to
 * {@code byte[]} and decoded with {@code JsonBinary.parseAsString}.
 *
 * @param eventData      write-rows event whose column values are read through {@code getValue}
 * @param binlogFilename binlog file name stored in the resulting {@code BinlogFileOffset}
 * @param position       binlog position stored in the resulting {@code BinlogFileOffset}
 * @return the event assembled from the id, entity id, entity type, data, type and metadata columns
 * @throws IOException      declared by the signature; NOTE(review): presumably raised while
 *                          decoding the binary JSON payload - confirm
 * @throws RuntimeException wrapping the {@link SQLException} if {@code getColumnOrders()} fails
 */
@Override
public PublishedEvent parseEventData(WriteRowsEventData eventData, String binlogFilename, long position) throws IOException {
  // columnOrders is still empty: call getColumnOrders() first
  // (presumably it populates columnOrders - confirm).
  if (columnOrders.isEmpty()) {
    try {
      getColumnOrders();
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  Object rawEventData = getValue(eventData, EVENT_DATA_FIELDNAME);
  String eventDataValue = rawEventData instanceof String
      ? (String) rawEventData
      : JsonBinary.parseAsString((byte[]) rawEventData);

  String eventId = (String) getValue(eventData, EVENT_ID_FIELDNAME);
  String entityId = (String) getValue(eventData, ENTITY_ID_FIELDNAME);
  String entityType = (String) getValue(eventData, ENTITY_TYPE_FIELDNAME);
  String eventType = (String) getValue(eventData, EVENT_TYPE_FIELDNAME);
  BinlogFileOffset binlogOffset = new BinlogFileOffset(binlogFilename, position);
  // The metadata column may hold null, hence the Optional wrapper.
  Optional<String> metadata =
      Optional.ofNullable((String) getValue(eventData, EVENT_METADATA_FIELDNAME));

  return new PublishedEvent(
      eventId, entityId, entityType, eventDataValue, eventType, binlogOffset, metadata);
}
 
示例4
/**
 * Creates one record per row of a write-rows event.
 *
 * <p>Each record gets the header fields from {@code createHeader}, a type field set to
 * {@code "INSERT"}, a header attribute carrying {@code OperationType.INSERT_CODE}, and a data
 * field built from the row's values zipped with the event's included columns.
 *
 * @param table       table passed to {@code createHeader} and {@code zipColumnsValues}
 * @param eventHeader binlog event header passed to {@code createHeader}
 * @param eventData   write-rows payload supplying the rows and the included-columns bitmap
 * @param offset      source offset; its formatted form is handed to the record factory
 * @return the records, in the same order as the rows of {@code eventData}
 */
private List<Record> toRecords(Table table,
                               EventHeader eventHeader,
                               WriteRowsEventData eventData,
                               SourceOffset offset) {
  List<Serializable[]> insertedRows = eventData.getRows();
  List<Record> records = new ArrayList<>(insertedRows.size());

  for (Serializable[] values : insertedRows) {
    Record current = recordFactory.create(offset.format());

    Map<String, Field> root = createHeader(table, eventHeader, offset);
    root.put(TYPE_FIELD, create("INSERT"));
    current.getHeader().setAttribute(
        OperationType.SDC_OPERATION_TYPE, String.valueOf(OperationType.INSERT_CODE));

    // Pair each included column with its value in this row, then expose the pairs as a map.
    Map<String, Field> payload =
        toMap(zipColumnsValues(eventData.getIncludedColumns(), table, values));
    root.put(DATA_FIELD, create(payload));

    current.set(create(root));
    records.add(current);
  }
  return records;
}
 
示例5
/**
 * Turns every row of a write-rows event into a record marked as an insert.
 *
 * <p>For each row the method creates a record from the formatted offset, builds the header
 * fields via {@code createHeader}, sets the type field to {@code "INSERT"}, stores
 * {@code OperationType.INSERT_CODE} as a header attribute, and attaches the row's column values
 * as the data field.
 *
 * @param table       table passed to {@code createHeader} and {@code zipColumnsValues}
 * @param eventHeader binlog event header passed to {@code createHeader}
 * @param eventData   write-rows payload supplying the rows and the included-columns bitmap
 * @param offset      source offset; its formatted form is handed to the record factory
 * @return one record per row, in row order
 */
private List<Record> toRecords(Table table,
                               EventHeader eventHeader,
                               WriteRowsEventData eventData,
                               SourceOffset offset) {
  List<Serializable[]> writtenRows = eventData.getRows();
  List<Record> out = new ArrayList<>(writtenRows.size());

  for (Serializable[] rowValues : writtenRows) {
    Record rec = recordFactory.create(offset.format());

    Map<String, Field> rootFields = createHeader(table, eventHeader, offset);
    rootFields.put(TYPE_FIELD, create("INSERT"));
    rec.getHeader().setAttribute(
        OperationType.SDC_OPERATION_TYPE, String.valueOf(OperationType.INSERT_CODE));

    // Column/value pairs for this row, converted to the map stored under the data field.
    List<ColumnValue> pairs = zipColumnsValues(eventData.getIncludedColumns(), table, rowValues);
    rootFields.put(DATA_FIELD, create(toMap(pairs)));

    rec.set(create(rootFields));
    out.add(rec);
  }
  return out;
}
 
示例6
/**
 * Forwards every row of a write-rows event to {@code addRow} as an {@code EntryType.CREATE}
 * entry, passing {@code null} as the last argument.
 *
 * @param event event whose data is read as {@link WriteRowsEventData}
 */
private void processWriteEvent(Event event) {
    WriteRowsEventData writeData = event.getData();
    Long tableId = writeData.getTableId();

    for (Serializable[] insertedRow : writeData.getRows()) {
        addRow(EntryType.CREATE, tableId, insertedRow, null);
    }
}
 
示例7
@Override
protected Set<ClientInfo> filter(Event event) {
    WriteRowsEventData d = event.getData();
    long tableId = d.getTableId();
    TableMapEventData tableMapEventData = context.getTableMapData(tableId);
    String tableKey = tableMapEventData.getDatabase().concat("/").concat(tableMapEventData.getTable());
    return clientInfoMap.get(tableKey);
}
 
示例8
/**
 * Extracts indexed rows from row-event data according to the simplified event type.
 *
 * <p>{@code UPDATE} is delegated to {@code UpdateRowsEvent.getIndexedRows}; {@code WRITE} and
 * {@code DELETE} cast the data to the matching event-data class and pass its rows and
 * included-columns bitmap to the rows/columns overload of {@code getIndexedRows}.
 *
 * @param eventType   selects how {@code data} is cast and processed
 * @param data        event payload; must be of the class matching {@code eventType}
 * @param primaryKeys not read by this method
 * @return the indexed rows produced by the selected helper
 * @throws IllegalArgumentException for any event type other than UPDATE, WRITE or DELETE
 */
public static List<IndexedFullRow> getIndexedRows(SimpleEventType eventType, EventData data,
                                                  Set<Integer> primaryKeys) {
  switch (eventType) {
    case WRITE: {
      WriteRowsEventData inserted = (WriteRowsEventData) data;
      return getIndexedRows(inserted.getRows(), inserted.getIncludedColumns());
    }
    case DELETE: {
      DeleteRowsEventData removed = (DeleteRowsEventData) data;
      return getIndexedRows(removed.getRows(), removed.getIncludedColumns());
    }
    case UPDATE: {
      UpdateRowsEventData updated = (UpdateRowsEventData) data;
      return UpdateRowsEvent.getIndexedRows(updated);
    }
    default:
      throw new IllegalArgumentException("Unsupported event type");
  }
}
 
示例9
/**
 * Converts an enriched binlog event into records, dispatching on the event type in its header.
 *
 * <p>Write, update and delete row events (including their {@code PRE_GA_} and {@code EXT_}
 * variants) are each forwarded to the {@code toRecords} overload for the matching event-data
 * class, together with the event's table, header and offset.
 *
 * @param event enriched event supplying the raw event, its table and its offset
 * @return the records produced by the matching overload
 * @throws IllegalArgumentException if the event type is not one of the row-event types above
 */
public List<Record> toRecords(EnrichedEvent event) {
  EventType eventType = event.getEvent().getHeader().getEventType();
  final List<Record> records;
  switch (eventType) {
    case PRE_GA_WRITE_ROWS:
    case WRITE_ROWS:
    case EXT_WRITE_ROWS:
      // Inserts.
      records = toRecords(
          event.getTable(),
          event.getEvent().getHeader(),
          event.getEvent().<WriteRowsEventData>getData(),
          event.getOffset());
      break;
    case PRE_GA_UPDATE_ROWS:
    case UPDATE_ROWS:
    case EXT_UPDATE_ROWS:
      // Updates.
      records = toRecords(
          event.getTable(),
          event.getEvent().getHeader(),
          event.getEvent().<UpdateRowsEventData>getData(),
          event.getOffset());
      break;
    case PRE_GA_DELETE_ROWS:
    case DELETE_ROWS:
    case EXT_DELETE_ROWS:
      // Deletes.
      records = toRecords(
          event.getTable(),
          event.getEvent().getHeader(),
          event.getEvent().<DeleteRowsEventData>getData(),
          event.getOffset());
      break;
    default:
      throw new IllegalArgumentException(String.format("EventType '%s' not supported", eventType));
  }
  return records;
}
 
示例10
/**
 * Produces records for a row-level binlog event.
 *
 * <p>The event type read from the header decides which typed {@code toRecords} overload
 * receives the event data: {@link WriteRowsEventData}, {@link UpdateRowsEventData} or
 * {@link DeleteRowsEventData}. The {@code PRE_GA_}, plain and {@code EXT_} variants of each
 * kind are treated alike.
 *
 * @param event enriched event supplying the raw event, its table and its offset
 * @return the records built by the selected overload
 * @throws IllegalArgumentException if the event type is not a write, update or delete row event
 */
public List<Record> toRecords(EnrichedEvent event) {
  EventType eventType = event.getEvent().getHeader().getEventType();
  final List<Record> result;
  switch (eventType) {
    case PRE_GA_WRITE_ROWS:
    case WRITE_ROWS:
    case EXT_WRITE_ROWS:
      result = toRecords(
          event.getTable(),
          event.getEvent().getHeader(),
          event.getEvent().<WriteRowsEventData>getData(),
          event.getOffset());
      break;
    case PRE_GA_UPDATE_ROWS:
    case UPDATE_ROWS:
    case EXT_UPDATE_ROWS:
      result = toRecords(
          event.getTable(),
          event.getEvent().getHeader(),
          event.getEvent().<UpdateRowsEventData>getData(),
          event.getOffset());
      break;
    case PRE_GA_DELETE_ROWS:
    case DELETE_ROWS:
    case EXT_DELETE_ROWS:
      result = toRecords(
          event.getTable(),
          event.getEvent().getHeader(),
          event.getEvent().<DeleteRowsEventData>getData(),
          event.getOffset());
      break;
    default:
      throw new IllegalArgumentException(String.format("EventType '%s' not supported", eventType));
  }
  return result;
}
 
示例11
/**
 * Manual check: connects a {@code BinaryLogClient} to a MySQL server and logs every update,
 * write and delete row event it receives.
 *
 * NOTE(review): host, port and credentials are hard-coded below; this needs a reachable MySQL
 * instance, and the password should not live in source control.
 */
@Test
public void testBinlog() throws IOException {
    String hostname = "127.0.0.1";
    String username = "yuwen";
    String password = "lyp82nlf";
    int port = 3306;

    // BinaryLogClient is really just a client that connects to the database;
    // it disguises itself as a slave and connects to the master.
    BinaryLogClient client = new BinaryLogClient(hostname, port, username, password);

    // Set the binlog file to listen to; if unset, the latest binlog is used.
    //client.setBinlogFilename();
    // Set the binlog position to listen from; if unset, the latest position is used.
    //client.setBinlogPosition();

    // Register an event listener for changes happening in MySQL while listening;
    // an Event represents something that has already happened.
    client.registerEventListener(event -> {
        // Data describing the change made to a MySQL table.
        EventData eventData = event.getData();

        String label;
        if (eventData instanceof UpdateRowsEventData) {
            label = "update event";
        } else if (eventData instanceof WriteRowsEventData) {
            label = "write event";
        } else if (eventData instanceof DeleteRowsEventData) {
            label = "delete event";
        } else {
            // Every other kind of event is ignored.
            return;
        }
        log.info(label);
        log.debug("{}", eventData);
    });

    // Connect to the master and start listening.
    client.connect();


    // After startup, connect to MySQL manually and execute:
    // insert into `ad_unit_keyword` (`unit_id`, `keyword`) values (10, '标志');
    // The console then shows:
    // 15:39:17.410 [main] INFO top.ezttf.ad.service.BinlogServiceTest - write event
    // 15:39:17.459 [main] DEBUG top.ezttf.ad.service.BinlogServiceTest - WriteRowsEventData{tableId=122, includedColumns={0, 1, 2}, rows=[
    //     [13, 10, 标志]
    // ]}

    // WriteRowsEventData{tableId=118, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    //    [11, 666, plan, 1, 2019-01-01, 2019-01-01, Tue Jan 01 08:00:00 CST 2019, Tue Jan 01 08:00:00 CST 2019]
    //]}
}
 
示例12
/**
 * Reads the value of a named column from the first row of a write-rows event.
 *
 * <p>The position stored in {@code columnOrders} is treated as 1-based: it is decremented
 * before being used as an index into the row array. Only the first row is consulted.
 *
 * @param eventData  write-rows event whose first row is read
 * @param columnName key looked up in {@code columnOrders}
 * @return the value at the column's position in the first row
 * @throws RuntimeException if {@code columnOrders} has no entry for {@code columnName}
 */
private Serializable getValue(WriteRowsEventData eventData, String columnName) {
  if (!columnOrders.containsKey(columnName)) {
    throw new RuntimeException("Column with name [" + columnName + "] not found");
  }
  Serializable[] firstRow = eventData.getRows().get(0);
  return firstRow[columnOrders.get(columnName) - 1];
}
 
示例13
@Override
public void onEvent(Event event) {
  LOG.trace("Received event {}", event);
  EventType eventType = event.getHeader().getEventType();
  currentBinLogFileName = client.getBinlogFilename();
  switch (eventType) {
    case TABLE_MAP:
      handleTableMappingEvent((TableMapEventData) event.getData());
      break;
    case PRE_GA_WRITE_ROWS:
    case WRITE_ROWS:
    case EXT_WRITE_ROWS:
      handleRowEvent(event, event.<WriteRowsEventData>getData().getTableId());
      break;
    case PRE_GA_UPDATE_ROWS:
    case UPDATE_ROWS:
    case EXT_UPDATE_ROWS:
      handleRowEvent(event, event.<UpdateRowsEventData>getData().getTableId());
      break;
    case PRE_GA_DELETE_ROWS:
    case DELETE_ROWS:
    case EXT_DELETE_ROWS:
      handleRowEvent(event, event.<DeleteRowsEventData>getData().getTableId());
      break;
    case QUERY:
      QueryEventData queryEventData = event.getData();
      String query = queryEventData.getSql();
      if (isCommit(query)) {
        finishTx();
      } else if (isSchemaChangeQuery(query)) {
        schemaRepository.evictAll();
      }
      break;
    case XID:
      finishTx();
      break;
    case GTID:
      GtidEventData eventData = event.getData();
      currentGtidSet = client.getGtidSet();
      currentTxGtid = eventData.getGtid();
      currentTxEventSeqNo = 0;
      LOG.trace("Started new tx, gtid: {}", currentTxGtid);
      break;
    default:
      // ignore
      break;
  }
}
 
示例14
/**
 * Binlog event callback.
 *
 * <p>After tracing the event and remembering the client's current binlog file name, the method
 * switches on the header's event type: {@code TABLE_MAP} is passed to
 * {@code handleTableMappingEvent}; row events (write, update, delete, in their {@code PRE_GA_},
 * plain and {@code EXT_} forms) are passed to {@code handleRowEvent} with the table id from
 * their data; {@code QUERY} finishes the transaction on commit or evicts the schema repository
 * on a schema change; {@code XID} finishes the transaction; {@code GTID} captures the client's
 * GTID set and the transaction GTID and zeroes the event sequence number. Anything else is
 * ignored.
 *
 * @param event the received binlog event
 */
@Override
public void onEvent(Event event) {
  LOG.trace("Received event {}", event);
  EventType eventType = event.getHeader().getEventType();
  currentBinLogFileName = client.getBinlogFilename();
  switch (eventType) {
    case TABLE_MAP: {
      TableMapEventData mapping = event.getData();
      handleTableMappingEvent(mapping);
      break;
    }
    case PRE_GA_WRITE_ROWS:
    case WRITE_ROWS:
    case EXT_WRITE_ROWS: {
      WriteRowsEventData writeRows = event.getData();
      handleRowEvent(event, writeRows.getTableId());
      break;
    }
    case PRE_GA_UPDATE_ROWS:
    case UPDATE_ROWS:
    case EXT_UPDATE_ROWS: {
      UpdateRowsEventData updateRows = event.getData();
      handleRowEvent(event, updateRows.getTableId());
      break;
    }
    case PRE_GA_DELETE_ROWS:
    case DELETE_ROWS:
    case EXT_DELETE_ROWS: {
      DeleteRowsEventData deleteRows = event.getData();
      handleRowEvent(event, deleteRows.getTableId());
      break;
    }
    case QUERY: {
      QueryEventData queryPayload = event.getData();
      String statement = queryPayload.getSql();
      if (isCommit(statement)) {
        finishTx();
      } else if (isSchemaChangeQuery(statement)) {
        // Schema changed: clear the schema repository.
        schemaRepository.evictAll();
      }
      break;
    }
    case XID:
      finishTx();
      break;
    case GTID: {
      GtidEventData gtidPayload = event.getData();
      currentGtidSet = client.getGtidSet();
      currentTxGtid = gtidPayload.getGtid();
      currentTxEventSeqNo = 0;
      LOG.trace("Started new tx, gtid: {}", currentTxGtid);
      break;
    }
    default:
      // ignore
      break;
  }
}
 
示例15
/**
 * Creates the event info for an insert, backed by a write-rows binlog event.
 *
 * <p>The included-columns bitmap and the rows of {@code data} are passed to the superclass
 * together with {@code INSERT_EVENT}; the event data itself is kept in {@code this.data}.
 *
 * @param tableInfo      table information passed to the superclass
 * @param timestamp      event timestamp passed to the superclass
 * @param binlogFilename binlog file name passed to the superclass
 * @param binlogPosition binlog position passed to the superclass
 * @param data           write-rows event data; must not be {@code null}, it is dereferenced here
 */
public InsertRowsEventInfo(
        TableInfo tableInfo,
        Long timestamp,
        String binlogFilename,
        Long binlogPosition,
        WriteRowsEventData data) {
    super(
            tableInfo,
            INSERT_EVENT,
            timestamp,
            binlogFilename,
            binlogPosition,
            data.getIncludedColumns(),
            data.getRows());
    this.data = data;
}
 
示例16
/**
 * Parses the data of a write-rows binlog event into a value of type {@code M}.
 *
 * @param eventData      the write-rows event data to parse
 * @param binlogFilename name of the binlog file
 *                       (NOTE(review): presumably the file the event was read from - confirm)
 * @param position       binlog position
 *                       (NOTE(review): presumably the event's offset within that file - confirm)
 * @return the parsed value
 * @throws IOException if an implementation fails while parsing
 */
M parseEventData(WriteRowsEventData eventData, String binlogFilename, long position) throws IOException;