Configurator getContainerConfigurator() {
if (containerConfigurator == null) {
containerConfigurator = ServiceLoader.load(javax.websocket.server.ServerEndpointConfig.Configurator.class)
.findFirst().orElse(new ContainerDefaultConfigurator());
}
return containerConfigurator;
}
@Override
protected void registerWebSocketEndpoints(ServerContainer container) {
try {
final ListeningScheduledExecutorService exec = MoreExecutors.listeningDecorator(
Executors.newScheduledThreadPool(
config.getInt(KsqlRestConfig.KSQL_WEBSOCKETS_NUM_THREADS),
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("websockets-query-thread-%d")
.build()
)
);
final ObjectMapper mapper = getJsonMapper();
final StatementParser statementParser = new StatementParser(ksqlEngine);
container.addEndpoint(
ServerEndpointConfig.Builder
.create(
WSQueryEndpoint.class,
WSQueryEndpoint.class.getAnnotation(ServerEndpoint.class).value()
)
.configurator(new Configurator() {
@Override
@SuppressWarnings("unchecked")
public <T> T getEndpointInstance(Class<T> endpointClass) {
return (T) new WSQueryEndpoint(
mapper,
statementParser,
ksqlEngine,
exec
);
}
})
.build()
);
} catch (DeploymentException e) {
log.error("Unable to create websockets endpoint", e);
}
}