Java源码示例:com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager
示例1
@Before
public void setUp() {
CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
Canal canal = buildCanal();
return new CanalInstanceWithManager(canal, FILTER);
}
});
nettyServer = CanalServerWithNetty.instance();
nettyServer.setEmbeddedServer(embeddedServer);
nettyServer.setPort(1088);
nettyServer.start();
}
示例2
@Before
public void setUp() {
CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
Canal canal = buildCanal();
return new CanalInstanceWithManager(canal, FILTER);
}
});
nettyServer = CanalServerWithNetty.instance();
nettyServer.setEmbeddedServer(embeddedServer);
nettyServer.setPort(1088);
nettyServer.start();
}
示例3
@Before
public void setUp() {
server = CanalServerWithEmbedded.instance();
server.setCanalInstanceGenerator(new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
Canal canal = buildCanal();
return new CanalInstanceWithManager(canal, FILTER);
}
});
server.start();
server.start(DESTINATION);
}
示例4
@SuppressWarnings("unchecked")
private CanalInstanceWithManager buildWorkingInstance() {
StringBuilder gwFilters = new StringBuilder();
for (Map.Entry<String, RegisterItem> entry : endpointInstances.entrySet()) {
if (gwFilters.length() > 0) {
gwFilters.append(",");
}
gwFilters.append(entry.getValue().filter);
}
String gwFilter = gwFilters.toString();
logger.info("The gwFilter for GatewayInstance is {}.", gwFilter);
//多路复用的情况下,filter为注册GatewayInstance的所有endpointInstances filter的并集
return new CustomCanalInstanceWithManager(gwCanal, gwFilter) {
@Override
protected void initEventSink() {
//因为sddl数据源暂不支持读端数据复用,所以目前initEventSink()方法创建的肯定是EntryEventSink
logger.info("init gateway eventSink begin...");
super.initEventSink();
eventSink = new GatewayEventSink(gwCanal, eventSink, (GatewayEventStore) eventStore);
logger.info("init gateway eventSink end! \n\t load CanalEventSink:{}", eventSink.getClass().getName());
}
@Override
protected void initEventStore() {
logger.info("init gateway eventStore begin...");
eventStore = new GatewayEventStore(gwCanal);
logger.info("init gateway eventStore end! \n\t load CanalEventStore:{}", eventStore.getClass().getName());
}
@Override
protected void initMetaManager() {
logger.info("init gateway metaManager begin...");
metaManager = new GatewayMetaManager(gwCanal);
logger.info("init gateway metaManager end! \n\t load CanalMetaManager:{}", metaManager.getClass().getName());
}
};
}
示例5
@Before
public void setUp() {
server = CanalServerWithEmbedded.instance();
server.setCanalInstanceGenerator(new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
Canal canal = buildCanal();
return new CanalInstanceWithManager(canal, FILTER);
}
});
server.start();
server.start(DESTINATION);
}