Java源码示例:com.google.code.or.binlog.impl.event.WriteRowsEvent
示例1
/**
 * Builds a write-rows or delete-rows binlog event for the given table.
 *
 * <p>The event header is stamped with the current wall-clock time, the supplied event type
 * and a server id of 1. Only the V1 and V2 variants of the write-rows and delete-rows
 * event codes are handled.
 *
 * @param tableId   table id stored on the resulting event
 * @param eventType one of the {@code MySQLConstants} write/delete rows event codes
 * @param valueList values handed to {@code getRows} to produce the event's rows
 * @return the populated event, or {@code null} when {@code eventType} is not one of the
 *         four supported codes
 */
private static BinlogEventV4 formDWDataEvent(long tableId, int eventType, List<MySqlValue> valueList) {
    BinlogEventV4HeaderImpl header = new BinlogEventV4HeaderImpl();
    header.setTimestamp(System.currentTimeMillis());
    header.setEventType(eventType);
    header.setServerId(1);

    if (eventType == MySQLConstants.WRITE_ROWS_EVENT_V2) {
        WriteRowsEventV2 insertV2 = new WriteRowsEventV2(header);
        insertV2.setTableId(tableId);
        insertV2.setRows(getRows(valueList));
        return insertV2;
    }

    if (eventType == MySQLConstants.WRITE_ROWS_EVENT) {
        WriteRowsEvent insertV1 = new WriteRowsEvent(header);
        insertV1.setTableId(tableId);
        insertV1.setRows(getRows(valueList));
        return insertV1;
    }

    if (eventType == MySQLConstants.DELETE_ROWS_EVENT_V2) {
        DeleteRowsEventV2 removeV2 = new DeleteRowsEventV2(header);
        removeV2.setTableId(tableId);
        removeV2.setRows(getRows(valueList));
        return removeV2;
    }

    if (eventType == MySQLConstants.DELETE_ROWS_EVENT) {
        DeleteRowsEvent removeV1 = new DeleteRowsEvent(header);
        removeV1.setTableId(tableId);
        removeV1.setRows(getRows(valueList));
        return removeV1;
    }

    // Any other event type is outside this factory's scope.
    return null;
}
示例2
/**
 * Dispatches a binlog event to the handler matching its runtime type.
 *
 * <p>After a query event or any write/update/delete rows event (V1 or V2) is handled, the
 * checkpoint's slave log position is moved to the event header's next position. Xid, rotate
 * and table-map events are passed to their handlers without touching the checkpoint here.
 * Events of any other type are ignored.
 *
 * <p>A {@code JumpException} thrown by a handler is swallowed on purpose. Any other exception
 * is logged together with a dump of the current table map, and then {@code stop()} is called.
 *
 * @param event the binlog event to process
 */
@Override
public void onEvents(BinlogEventV4 event) {
    if (_log.isTraceEnabled()) {
        _log.trace(event.toString());
    }
    try {
        if (event instanceof XidEvent) {
            onCommit((XidEvent)event);
        }
        else if (event instanceof QueryEvent) {
            onQuery((QueryEvent)event);
            getCheckPoint().setSlaveLogPosition(event.getHeader().getNextPosition());
        }
        else if (event instanceof RotateEvent) {
            onRotate((RotateEvent)event);
        }
        else if (event instanceof WriteRowsEvent) {
            onWriteRows((WriteRowsEvent)event);
            getCheckPoint().setSlaveLogPosition(event.getHeader().getNextPosition());
        }
        else if (event instanceof WriteRowsEventV2) {
            onWriteRows((WriteRowsEventV2)event);
            getCheckPoint().setSlaveLogPosition(event.getHeader().getNextPosition());
        }
        else if (event instanceof UpdateRowsEvent) {
            onUpdateRows((UpdateRowsEvent)event);
            getCheckPoint().setSlaveLogPosition(event.getHeader().getNextPosition());
        }
        else if (event instanceof UpdateRowsEventV2) {
            onUpdateRows((UpdateRowsEventV2)event);
            getCheckPoint().setSlaveLogPosition(event.getHeader().getNextPosition());
        }
        else if (event instanceof DeleteRowsEvent) {
            onDeleteRows((DeleteRowsEvent)event);
            getCheckPoint().setSlaveLogPosition(event.getHeader().getNextPosition());
        }
        else if (event instanceof DeleteRowsEventV2) {
            onDeleteRows((DeleteRowsEventV2)event);
            getCheckPoint().setSlaveLogPosition(event.getHeader().getNextPosition());
        }
        else if (event instanceof TableMapEvent) {
            onTableMap((TableMapEvent)event);
        }
    }
    catch (JumpException ignored) {
        // skip on purpose
    }
    catch (Exception x) {
        _log.error("replication failed: {}", event, x);
        // Dump the known table id -> name mapping to help diagnose the failure.
        StringBuilder buf = new StringBuilder();
        this.tableById.forEach((id, name) -> {
            buf.append(id);
            buf.append("=[");
            buf.append(name[0]);
            buf.append(",");
            buf.append(name[1]);
            buf.append("],");
        });
        // Drop the trailing comma only when something was appended; on an empty map
        // deleteCharAt(-1) would throw from inside this handler and skip stop().
        if (buf.length() > 0) {
            buf.deleteCharAt(buf.length()-1);
        }
        _log.error("table map: {}", buf.toString());
        stop();
    }
}
示例3
/**
 * Handles a V1 write-rows event: bumps the insert counter, then passes the event's table id
 * and rows to {@code executeInsertPstmt}.
 *
 * @param event the write-rows event to apply
 * @throws SQLException propagated from {@code executeInsertPstmt}
 */
private void onWriteRows(WriteRowsEvent event) throws SQLException {
    _insertCount++;
    executeInsertPstmt(event.getTableId(), event.getRows());
}