Java源码示例:com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager

示例1
@Before
public void setUp() {
    CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
    embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {

        public CanalInstance generate(String destination) {
            Canal canal = buildCanal();
            return new CanalInstanceWithManager(canal, FILTER);
        }
    });

    nettyServer = CanalServerWithNetty.instance();
    nettyServer.setEmbeddedServer(embeddedServer);
    nettyServer.setPort(1088);
    nettyServer.start();
}
 
示例2
@Before
public void setUp() {
    CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
    embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {

        public CanalInstance generate(String destination) {
            Canal canal = buildCanal();
            return new CanalInstanceWithManager(canal, FILTER);
        }
    });

    nettyServer = CanalServerWithNetty.instance();
    nettyServer.setEmbeddedServer(embeddedServer);
    nettyServer.setPort(1088);
    nettyServer.start();
}
 
示例3
@Before
public void setUp() {
    server = CanalServerWithEmbedded.instance();
    server.setCanalInstanceGenerator(new CanalInstanceGenerator() {

        public CanalInstance generate(String destination) {
            Canal canal = buildCanal();
            return new CanalInstanceWithManager(canal, FILTER);
        }
    });
    server.start();
    server.start(DESTINATION);
}
 
示例4
@SuppressWarnings("unchecked")
private CanalInstanceWithManager buildWorkingInstance() {

    StringBuilder gwFilters = new StringBuilder();
    for (Map.Entry<String, RegisterItem> entry : endpointInstances.entrySet()) {
        if (gwFilters.length() > 0) {
            gwFilters.append(",");
        }
        gwFilters.append(entry.getValue().filter);
    }
    String gwFilter = gwFilters.toString();
    logger.info("The gwFilter for GatewayInstance is {}.", gwFilter);

    //多路复用的情况下,filter为注册GatewayInstance的所有endpointInstances filter的并集
    return new CustomCanalInstanceWithManager(gwCanal, gwFilter) {

        @Override
        protected void initEventSink() {
            //因为sddl数据源暂不支持读端数据复用,所以目前initEventSink()方法创建的肯定是EntryEventSink
            logger.info("init gateway eventSink begin...");
            super.initEventSink();
            eventSink = new GatewayEventSink(gwCanal, eventSink, (GatewayEventStore) eventStore);
            logger.info("init gateway eventSink end! \n\t load CanalEventSink:{}", eventSink.getClass().getName());
        }

        @Override
        protected void initEventStore() {
            logger.info("init gateway eventStore begin...");
            eventStore = new GatewayEventStore(gwCanal);
            logger.info("init gateway eventStore end! \n\t load CanalEventStore:{}", eventStore.getClass().getName());
        }

        @Override
        protected void initMetaManager() {
            logger.info("init gateway metaManager begin...");
            metaManager = new GatewayMetaManager(gwCanal);
            logger.info("init gateway metaManager end! \n\t load CanalMetaManager:{}", metaManager.getClass().getName());
        }
    };
}
 
示例5
@Before
public void setUp() {
    server = CanalServerWithEmbedded.instance();
    server.setCanalInstanceGenerator(new CanalInstanceGenerator() {

        public CanalInstance generate(String destination) {
            Canal canal = buildCanal();
            return new CanalInstanceWithManager(canal, FILTER);
        }
    });
    server.start();
    server.start(DESTINATION);
}