Java源码示例:com.github.shyiko.mysql.binlog.event.WriteRowsEventData
示例1
/**
 * Builds the DTO describing a write-rows event.
 *
 * <p>The table metadata is looked up in {@code context} by the event's table id and
 * supplies the database and table names; the namespace comes from the binary log
 * configuration. Each row of the event is passed through {@code convert} together with
 * the indexes of the included columns and the table metadata.
 *
 * @param event event whose data is read as {@link WriteRowsEventData}
 * @return a {@link WriteRowsDTO} populated from the event
 */
@Override
protected EventBaseDTO formatData(Event event) {
    WriteRowsEventData rowsData = event.getData();
    // Table information
    ColumnsTableMapEventData tableMeta = context.getTableMapData(rowsData.getTableId());

    WriteRowsDTO dto = new WriteRowsDTO();
    dto.setEventType(DatabaseEvent.WRITE_ROWS);
    dto.setDatabase(tableMeta.getDatabase());
    dto.setTable(tableMeta.getTable());
    dto.setNamespace(context.getBinaryLogConfig().getNamespace());

    // Column mapping: indexes of the columns included in the event
    int[] columnIndexes = rowsData.getIncludedColumns().stream().toArray();
    dto.setRowMaps(
        rowsData.getRows().stream()
            .map(row -> convert(row, columnIndexes, tableMeta))
            .collect(Collectors.toList()));
    return dto;
}
示例2
/**
 * Maps an {@code EXT_WRITE_ROWS} event through {@code BinaryLogConnectorEventMapper} and
 * checks that the result is a {@code WriteEvent} carrying the expected position, rows,
 * server id, table id and timestamp.
 */
@Test
public void testWriteEvent() {
    eventHeader.setEventType(EventType.EXT_WRITE_ROWS);

    WriteRowsEventData writeRows = new WriteRowsEventData();
    writeRows.setTableId(TABLE_ID);
    writeRows.setRows(ImmutableList.of(ROW));
    Event rawEvent = new Event(eventHeader, writeRows);

    Optional<BinlogEvent> mapped =
        BinaryLogConnectorEventMapper.INSTANCE.map(rawEvent, BINLOG_FILE_POS);

    assertTrue(mapped.isPresent());
    BinlogEvent result = mapped.get();
    assertTrue(result instanceof WriteEvent);

    WriteEvent writeEvent = (WriteEvent) result;
    assertEquals(BINLOG_FILE_POS, writeEvent.getBinlogFilePos());
    assertEquals(ImmutableList.of(ROW), writeEvent.getRows());
    assertEquals(SERVER_ID, writeEvent.getServerId());
    assertEquals(TABLE_ID, writeEvent.getTableId());
    assertEquals(TIMESTAMP, writeEvent.getTimestamp());
}
示例3
/**
 * Converts a write-rows event into a {@link PublishedEvent}.
 *
 * <p>If {@code columnOrders} is still empty, {@code getColumnOrders()} is invoked first.
 * The event-data column is used directly when it is a {@code String}; otherwise it is
 * treated as a {@code byte[]} and decoded with {@code JsonBinary.parseAsString}.
 *
 * @param eventData write-rows event data to read the column values from
 * @param binlogFilename binlog file name stored in the resulting {@link BinlogFileOffset}
 * @param position binlog position stored in the resulting {@link BinlogFileOffset}
 * @return the published event assembled from the column values
 * @throws IOException declared by the signature; presumably raised while decoding the
 *         binary JSON payload (confirm against {@code JsonBinary})
 * @throws RuntimeException wrapping the {@link SQLException} if loading the column
 *         orders fails
 */
@Override
public PublishedEvent parseEventData(WriteRowsEventData eventData, String binlogFilename, long position) throws IOException {
    if (columnOrders.isEmpty()) {
        try {
            getColumnOrders();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    // The payload column arrives either as text or as binary JSON.
    Object rawPayload = getValue(eventData, EVENT_DATA_FIELDNAME);
    String payload = rawPayload instanceof String
        ? (String) rawPayload
        : JsonBinary.parseAsString((byte[]) rawPayload);

    String eventId = (String) getValue(eventData, EVENT_ID_FIELDNAME);
    String entityId = (String) getValue(eventData, ENTITY_ID_FIELDNAME);
    String entityType = (String) getValue(eventData, ENTITY_TYPE_FIELDNAME);
    String eventType = (String) getValue(eventData, EVENT_TYPE_FIELDNAME);
    BinlogFileOffset binlogOffset = new BinlogFileOffset(binlogFilename, position);
    Optional<String> metadata =
        Optional.ofNullable((String) getValue(eventData, EVENT_METADATA_FIELDNAME));

    return new PublishedEvent(
        eventId, entityId, entityType, payload, eventType, binlogOffset, metadata);
}
示例4
/**
 * Creates one record per row of a write-rows event.
 *
 * <p>Every record gets the header fields from {@code createHeader}, a type field of
 * {@code "INSERT"}, the insert operation code as a header attribute, and a data field
 * built from the row's column values.
 *
 * @param table table the rows belong to
 * @param eventHeader header of the binlog event
 * @param eventData write-rows event data supplying the rows and included columns
 * @param offset source offset used for the record id and the header fields
 * @return the records, in row order
 */
private List<Record> toRecords(Table table,
                               EventHeader eventHeader,
                               WriteRowsEventData eventData,
                               SourceOffset offset) {
    List<Serializable[]> insertedRows = eventData.getRows();
    List<Record> records = new ArrayList<>(insertedRows.size());
    for (Serializable[] insertedRow : insertedRows) {
        Record current = recordFactory.create(offset.format());
        Map<String, Field> rootFields = createHeader(table, eventHeader, offset);
        rootFields.put(TYPE_FIELD, create("INSERT"));
        current.getHeader().setAttribute(
            OperationType.SDC_OPERATION_TYPE,
            String.valueOf(OperationType.INSERT_CODE));

        // Pair the included columns with the row values and expose them as the data field.
        Map<String, Field> rowData =
            toMap(zipColumnsValues(eventData.getIncludedColumns(), table, insertedRow));
        rootFields.put(DATA_FIELD, create(rowData));

        current.set(create(rootFields));
        records.add(current);
    }
    return records;
}
示例5
/**
 * Turns each row of a write-rows event into a record.
 *
 * <p>Each record carries the header fields produced by {@code createHeader}, a type
 * field set to {@code "INSERT"}, a header attribute with the insert operation code, and
 * a data field derived from the row's column values.
 *
 * @param table table the rows belong to
 * @param eventHeader header of the binlog event
 * @param eventData write-rows event data supplying the rows and included columns
 * @param offset source offset used for the record id and the header fields
 * @return the records, in row order
 */
private List<Record> toRecords(Table table,
                               EventHeader eventHeader,
                               WriteRowsEventData eventData,
                               SourceOffset offset) {
    List<Serializable[]> writtenRows = eventData.getRows();
    List<Record> out = new ArrayList<>(writtenRows.size());
    for (Serializable[] values : writtenRows) {
        Record rec = recordFactory.create(offset.format());
        Map<String, Field> top = createHeader(table, eventHeader, offset);
        top.put(TYPE_FIELD, create("INSERT"));
        rec.getHeader().setAttribute(
            OperationType.SDC_OPERATION_TYPE,
            String.valueOf(OperationType.INSERT_CODE));

        // Column values of this row, keyed for the data field.
        Map<String, Field> payload =
            toMap(zipColumnsValues(eventData.getIncludedColumns(), table, values));
        top.put(DATA_FIELD, create(payload));

        rec.set(create(top));
        out.add(rec);
    }
    return out;
}
示例6
/**
 * Registers every row of a write-rows event via {@code addRow} as a
 * {@code EntryType.CREATE} entry for the event's table id.
 *
 * @param event event whose data is read as {@link WriteRowsEventData}
 */
private void processWriteEvent(Event event) {
    WriteRowsEventData writeData = event.getData();
    Long tableId = writeData.getTableId();
    for (Serializable[] insertedRow : writeData.getRows()) {
        // The last argument is always null for created rows.
        addRow(EntryType.CREATE, tableId, insertedRow, null);
    }
}
示例7
@Override
protected Set<ClientInfo> filter(Event event) {
WriteRowsEventData d = event.getData();
long tableId = d.getTableId();
TableMapEventData tableMapEventData = context.getTableMapData(tableId);
String tableKey = tableMapEventData.getDatabase().concat("/").concat(tableMapEventData.getTable());
return clientInfoMap.get(tableKey);
}
示例8
/**
 * Extracts the indexed rows from row-event data according to the event type.
 *
 * <p>{@code UPDATE} is delegated to {@code UpdateRowsEvent.getIndexedRows}; {@code WRITE}
 * and {@code DELETE} pass their rows and included columns to the two-argument overload.
 *
 * @param eventType kind of row event; selects how {@code data} is cast
 * @param data event data, cast to the type matching {@code eventType}
 * @param primaryKeys not read by this method
 * @return the indexed rows for the event
 * @throws IllegalArgumentException for any other event type
 */
public static List<IndexedFullRow> getIndexedRows(SimpleEventType eventType, EventData data,
                                                  Set<Integer> primaryKeys) {
    switch (eventType) {
        case WRITE: {
            WriteRowsEventData inserted = (WriteRowsEventData) data;
            return getIndexedRows(inserted.getRows(), inserted.getIncludedColumns());
        }
        case DELETE: {
            DeleteRowsEventData removed = (DeleteRowsEventData) data;
            return getIndexedRows(removed.getRows(), removed.getIncludedColumns());
        }
        case UPDATE:
            return UpdateRowsEvent.getIndexedRows((UpdateRowsEventData) data);
        default:
            throw new IllegalArgumentException("Unsupported event type");
    }
}
示例9
/**
 * Converts an enriched binlog event into records by dispatching on its event type.
 *
 * <p>Write, update and delete row events (pre-GA, regular and extended variants) are
 * each forwarded to the {@code toRecords} overload for their event-data type.
 *
 * @param event enriched event providing the table, the underlying event and the offset
 * @return the records produced by the matching overload
 * @throws IllegalArgumentException if the event type is not a row event listed above
 */
public List<Record> toRecords(EnrichedEvent event) {
    EventType eventType = event.getEvent().getHeader().getEventType();
    switch (eventType) {
        case PRE_GA_WRITE_ROWS:
        case WRITE_ROWS:
        case EXT_WRITE_ROWS: {
            WriteRowsEventData inserts = event.getEvent().getData();
            return toRecords(
                event.getTable(), event.getEvent().getHeader(), inserts, event.getOffset());
        }
        case PRE_GA_UPDATE_ROWS:
        case UPDATE_ROWS:
        case EXT_UPDATE_ROWS: {
            UpdateRowsEventData updates = event.getEvent().getData();
            return toRecords(
                event.getTable(), event.getEvent().getHeader(), updates, event.getOffset());
        }
        case PRE_GA_DELETE_ROWS:
        case DELETE_ROWS:
        case EXT_DELETE_ROWS: {
            DeleteRowsEventData deletes = event.getEvent().getData();
            return toRecords(
                event.getTable(), event.getEvent().getHeader(), deletes, event.getOffset());
        }
        default:
            throw new IllegalArgumentException(String.format("EventType '%s' not supported", eventType));
    }
}
示例10
/**
 * Dispatches an enriched binlog event to the record conversion for its row-event kind.
 *
 * <p>The pre-GA, regular and extended variants of write, update and delete row events
 * are handled; the event data is read with the matching type and handed to the
 * corresponding {@code toRecords} overload.
 *
 * @param event enriched event providing the table, the underlying event and the offset
 * @return the records produced by the matching overload
 * @throws IllegalArgumentException if the event type is none of the handled row events
 */
public List<Record> toRecords(EnrichedEvent event) {
    EventType eventType = event.getEvent().getHeader().getEventType();
    switch (eventType) {
        case PRE_GA_WRITE_ROWS:
        case WRITE_ROWS:
        case EXT_WRITE_ROWS: {
            WriteRowsEventData writeData = event.getEvent().getData();
            return toRecords(
                event.getTable(), event.getEvent().getHeader(), writeData, event.getOffset());
        }
        case PRE_GA_UPDATE_ROWS:
        case UPDATE_ROWS:
        case EXT_UPDATE_ROWS: {
            UpdateRowsEventData updateData = event.getEvent().getData();
            return toRecords(
                event.getTable(), event.getEvent().getHeader(), updateData, event.getOffset());
        }
        case PRE_GA_DELETE_ROWS:
        case DELETE_ROWS:
        case EXT_DELETE_ROWS: {
            DeleteRowsEventData deleteData = event.getEvent().getData();
            return toRecords(
                event.getTable(), event.getEvent().getHeader(), deleteData, event.getOffset());
        }
        default:
            throw new IllegalArgumentException(String.format("EventType '%s' not supported", eventType));
    }
}
示例11
/**
 * Connects a {@link BinaryLogClient} to a local MySQL server and logs which kind of row
 * event (update, write or delete) each received event carries.
 *
 * <p>NOTE(review): host, port and credentials are hard-coded in this test.
 */
@Test
public void testBinlog() throws IOException {
    String hostname = "127.0.0.1";
    String username = "yuwen";
    String password = "lyp82nlf";
    int port = 3306;

    // BinaryLogClient is simply a client that connects to the database:
    // it presents itself as a slave and connects to the master.
    BinaryLogClient client = new BinaryLogClient(hostname, port, username, password);

    // Set the binlog file to listen to; if not set, the latest binlog is used.
    //client.setBinlogFilename();
    // Set the binlog position to listen from; if not set, the latest position is used.
    //client.setBinlogPosition();

    // Register an event listener for changes happening in MySQL while listening;
    // an Event represents something that has already happened.
    client.registerEventListener(event -> {
        // Data describing the change made to a MySQL table
        EventData payload = event.getData();
        final String label;
        if (payload instanceof UpdateRowsEventData) {
            label = "update event";
        } else if (payload instanceof WriteRowsEventData) {
            label = "write event";
        } else if (payload instanceof DeleteRowsEventData) {
            label = "delete event";
        } else {
            // Other event kinds are not logged.
            return;
        }
        log.info(label);
        log.debug("{}", payload);
    });

    // Connect to the master and start listening
    client.connect();

    // After startup, connect to MySQL manually and execute:
    // insert into `ad_unit_keyword` (`unit_id`, `keyword`) values (10, '标志');
    // The console then shows log output such as:
    // 15:39:17.410 [main] INFO top.ezttf.ad.service.BinlogServiceTest - write event
    // 15:39:17.459 [main] DEBUG top.ezttf.ad.service.BinlogServiceTest - WriteRowsEventData{tableId=122, includedColumns={0, 1, 2}, rows=[
    // [13, 10, 标志]
    // ]}
    // WriteRowsEventData{tableId=118, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    // [11, 666, plan, 1, 2019-01-01, 2019-01-01, Tue Jan 01 08:00:00 CST 2019, Tue Jan 01 08:00:00 CST 2019]
    //]}
}
示例12
/**
 * Reads the value of a named column from the first row of a write-rows event.
 *
 * <p>The column's position comes from {@code columnOrders}; one is subtracted from the
 * stored value before it is used as an array index. Only row 0 is consulted.
 *
 * @param eventData write-rows event data to read from
 * @param columnName name of the column to look up in {@code columnOrders}
 * @return the value at that column's position in the first row
 * @throws RuntimeException if {@code columnOrders} has no entry for {@code columnName}
 */
private Serializable getValue(WriteRowsEventData eventData, String columnName) {
    if (!columnOrders.containsKey(columnName)) {
        throw new RuntimeException("Column with name [" + columnName + "] not found");
    }
    Serializable[] firstRow = eventData.getRows().get(0);
    int arrayIndex = columnOrders.get(columnName) - 1;
    return firstRow[arrayIndex];
}
示例13
/**
 * Handles a binlog event according to its type.
 *
 * <p>Records the client's current binlog file name, then: forwards table-map events to
 * {@code handleTableMappingEvent}; forwards write/update/delete row events (pre-GA,
 * regular and extended) to {@code handleRowEvent} with their table id; for query events
 * finishes the transaction on a commit or evicts the schema repository on a schema
 * change; finishes the transaction on XID; and on GTID stores the client's GTID set and
 * the event's GTID and resets the per-transaction sequence number. Other event types are
 * ignored.
 *
 * @param event the received binlog event
 */
@Override
public void onEvent(Event event) {
    LOG.trace("Received event {}", event);
    EventType type = event.getHeader().getEventType();
    currentBinLogFileName = client.getBinlogFilename();

    switch (type) {
        case TABLE_MAP: {
            handleTableMappingEvent(event.<TableMapEventData>getData());
            break;
        }
        case PRE_GA_WRITE_ROWS:
        case WRITE_ROWS:
        case EXT_WRITE_ROWS: {
            WriteRowsEventData inserts = event.getData();
            handleRowEvent(event, inserts.getTableId());
            break;
        }
        case PRE_GA_UPDATE_ROWS:
        case UPDATE_ROWS:
        case EXT_UPDATE_ROWS: {
            UpdateRowsEventData updates = event.getData();
            handleRowEvent(event, updates.getTableId());
            break;
        }
        case PRE_GA_DELETE_ROWS:
        case DELETE_ROWS:
        case EXT_DELETE_ROWS: {
            DeleteRowsEventData deletes = event.getData();
            handleRowEvent(event, deletes.getTableId());
            break;
        }
        case QUERY: {
            String sql = event.<QueryEventData>getData().getSql();
            if (isCommit(sql)) {
                finishTx();
            } else if (isSchemaChangeQuery(sql)) {
                schemaRepository.evictAll();
            }
            break;
        }
        case XID: {
            finishTx();
            break;
        }
        case GTID: {
            GtidEventData gtidData = event.getData();
            currentGtidSet = client.getGtidSet();
            currentTxGtid = gtidData.getGtid();
            currentTxEventSeqNo = 0;
            LOG.trace("Started new tx, gtid: {}", currentTxGtid);
            break;
        }
        default:
            // ignore
            break;
    }
}
示例14
/**
 * Dispatches a received binlog event by event type.
 *
 * <p>After tracing the event and capturing the client's current binlog file name:
 * table-map events go to {@code handleTableMappingEvent}; row events (write, update,
 * delete in their pre-GA, regular and extended forms) go to {@code handleRowEvent} with
 * the table id; a query event finishes the transaction if it is a commit, or evicts the
 * schema repository if it is a schema change; XID finishes the transaction; GTID records
 * the client's GTID set and the event's GTID and resets the per-transaction sequence
 * number. All other types are ignored.
 *
 * @param event the received binlog event
 */
@Override
public void onEvent(Event event) {
    LOG.trace("Received event {}", event);
    EventType kind = event.getHeader().getEventType();
    currentBinLogFileName = client.getBinlogFilename();

    switch (kind) {
        case TABLE_MAP: {
            handleTableMappingEvent(event.<TableMapEventData>getData());
            break;
        }
        case PRE_GA_WRITE_ROWS:
        case WRITE_ROWS:
        case EXT_WRITE_ROWS: {
            WriteRowsEventData writeData = event.getData();
            handleRowEvent(event, writeData.getTableId());
            break;
        }
        case PRE_GA_UPDATE_ROWS:
        case UPDATE_ROWS:
        case EXT_UPDATE_ROWS: {
            UpdateRowsEventData updateData = event.getData();
            handleRowEvent(event, updateData.getTableId());
            break;
        }
        case PRE_GA_DELETE_ROWS:
        case DELETE_ROWS:
        case EXT_DELETE_ROWS: {
            DeleteRowsEventData deleteData = event.getData();
            handleRowEvent(event, deleteData.getTableId());
            break;
        }
        case QUERY: {
            String statement = event.<QueryEventData>getData().getSql();
            if (isCommit(statement)) {
                finishTx();
            } else if (isSchemaChangeQuery(statement)) {
                schemaRepository.evictAll();
            }
            break;
        }
        case XID: {
            finishTx();
            break;
        }
        case GTID: {
            GtidEventData gtidInfo = event.getData();
            currentGtidSet = client.getGtidSet();
            currentTxGtid = gtidInfo.getGtid();
            currentTxEventSeqNo = 0;
            LOG.trace("Started new tx, gtid: {}", currentTxGtid);
            break;
        }
        default:
            // ignore
            break;
    }
}
示例15
/**
 * Creates the event info for an insert, passing {@code INSERT_EVENT} together with the
 * included columns and rows of {@code data} to the superclass and keeping a reference to
 * {@code data} itself.
 *
 * @param tableInfo table information forwarded to the superclass
 * @param timestamp event timestamp forwarded to the superclass
 * @param binlogFilename binlog file name forwarded to the superclass
 * @param binlogPosition binlog position forwarded to the superclass
 * @param data write-rows event data backing this event info
 */
public InsertRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, WriteRowsEventData data) {
    super(
        tableInfo,
        INSERT_EVENT,
        timestamp,
        binlogFilename,
        binlogPosition,
        data.getIncludedColumns(),
        data.getRows()
    );
    this.data = data;
}
示例16
/**
 * Parses the given write-rows event data into a value of type {@code M}.
 *
 * @param eventData the write-rows event data to parse
 * @param binlogFilename presumably the name of the binlog file the event was read from;
 *        confirm against implementations
 * @param position presumably the event's position within that binlog file; confirm
 *        against implementations
 * @return the parsed value
 * @throws IOException declared for implementations; the failure conditions are not
 *         visible from this declaration
 */
M parseEventData(WriteRowsEventData eventData, String binlogFilename, long position) throws IOException;