Java源码示例:org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider
示例1
/**
 * Creates a {@link ZkClient} whose ZooKeeper ensemble is looked up through Exhibitor.
 *
 * <p>The ensemble provider is polled once before the Curator client is built, and the
 * same retry policy is shared by the provider and the Curator client.
 *
 * @param serverSet host names handed to {@code Exhibitors}
 * @param restPort  port handed to {@code Exhibitors}
 * @return a new {@code ZkClient} wrapping the built {@code CuratorFramework}
 * @throws RuntimeException wrapping any exception raised while setting things up
 */
public static ZkClient fromExhibitor(Collection<String> serverSet, int restPort) {
    try {
        // An empty string is served as the backup connection string.
        Exhibitors exhibitorHosts = new Exhibitors(serverSet, restPort, () -> "");
        RetryPolicy backoff = new ExponentialBackoffRetry(1000, 3);

        ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(
                exhibitorHosts,
                new DefaultExhibitorRestClient(),
                "/exhibitor/v1/cluster/list",
                61000,
                backoff);
        // Fetch the server list up front, before the Curator client is built.
        provider.pollForInitialEnsemble();

        CuratorFrameworkFactory.Builder curatorBuilder = CuratorFrameworkFactory.builder();
        curatorBuilder = curatorBuilder.ensembleProvider(provider);
        curatorBuilder = curatorBuilder.retryPolicy(backoff);
        return new ZkClient(curatorBuilder.build());
    } catch (Exception cause) {
        throw new RuntimeException(cause);
    }
}
示例2
/**
 * Creates the {@link EnsembleProvider} matching the type of the configured connection.
 *
 * <ul>
 *   <li>{@code EXHIBITOR}: an {@code ExhibitorEnsembleProvider} whose connection string has
 *       {@code conn.getPathPrepared()} appended; it is polled once before being returned.</li>
 *   <li>{@code ZOOKEEPER}: a {@code ChrootedFixedEnsembleProvider} built from the
 *       comma-joined addresses and {@code conn.getPathPrepared()}.</li>
 * </ul>
 *
 * @return the provider for the configured connection type
 * @throws RuntimeException if the connection type is neither of the above
 * @throws Exception        declared by the signature; {@code pollForInitialEnsemble()} is called here
 */
private EnsembleProvider createEnsembleProvider() throws Exception {
    switch (conn.getType()) {
        case EXHIBITOR: {
            final Exhibitors exhibitorHosts = new Exhibitors(
                    conn.getAddresses().stream()
                            .map(AddressPort::getAddress)
                            .collect(Collectors.toList()),
                    // The port of the first address is used for all hosts.
                    conn.getAddresses().get(0).getPort(),
                    () -> {
                        throw new RuntimeException("There is no backup connection string (or it is wrong)");
                    });
            final ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(
                    exhibitorHosts,
                    new DefaultExhibitorRestClient(),
                    "/exhibitor/v1/cluster/list",
                    EXHIBITOR_POLLING_MS,
                    new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX)) {
                @Override
                public String getConnectionString() {
                    // Append the prepared path to whatever the parent class reports.
                    final String base = super.getConnectionString();
                    return base + conn.getPathPrepared();
                }
            };
            provider.pollForInitialEnsemble();
            return provider;
        }
        case ZOOKEEPER: {
            final String connectString = conn.getAddresses().stream()
                    .map(AddressPort::asAddressPort)
                    .collect(Collectors.joining(","));
            return new ChrootedFixedEnsembleProvider(connectString, conn.getPathPrepared());
        }
        default:
            throw new RuntimeException("Connection type " + conn.getType() + " is not supported");
    }
}
示例3
/**
 * Builds an Exhibitor-backed {@link EnsembleProvider} from the given configuration and
 * polls it once before returning it.
 *
 * @param config supplies the host names, REST port, REST URI path, polling interval and
 *               retry configuration
 * @return the polled {@code ExhibitorEnsembleProvider}
 * @throws Exception declared by the signature; {@code pollForInitialEnsemble()} is called here
 */
private EnsembleProvider buildEnsembleProvider(ExhibitorConfig config) throws Exception {
    // An empty string is served as the backup connection string.
    Exhibitors exhibitorHosts =
            new Exhibitors(config.getHostnames(), config.getRestPort(), () -> "");
    RetryPolicy pollRetry = buildRetryPolicy(config.getRetryConfig());

    ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(
            exhibitorHosts,
            new DefaultExhibitorRestClient(),
            config.getRestUriPath(),
            config.getPollingMs(),
            pollRetry);
    provider.pollForInitialEnsemble();
    return provider;
}
示例4
/**
 * Builds a {@link CuratorFramework} whose ensemble is supplied by Exhibitor and registers a
 * listener that logs connection state changes at debug level.
 *
 * @param configuration source of the Exhibitor and retry settings
 * @return the built Curator client with the logging listener attached
 * @throws BootstrapException if the initial poll of the ensemble provider fails
 */
private CuratorFramework buildCuratorWithExhibitor(Configuration configuration) {
    LOGGER.debug("configuring zookeeper connection through Exhibitor...");

    ExhibitorEnsembleProvider exhibitorProvider = new KixeyeExhibitorEnsembleProvider(
            exhibitors,
            new KixeyeExhibitorRestClient(configuration.getBoolean(EXHIBITOR_USE_HTTPS.getPropertyName())),
            configuration.getString(EXHIBITOR_URI_PATH.getPropertyName()),
            configuration.getInt(EXHIBITOR_POLL_INTERVAL.getPropertyName()),
            new ExponentialBackoffRetry(
                    configuration.getInt(EXHIBITOR_INITIAL_SLEEP_MILLIS.getPropertyName()),
                    configuration.getInt(EXHIBITOR_MAX_RETRIES.getPropertyName()),
                    configuration.getInt(EXHIBITOR_RETRIES_MAX_MILLIS.getPropertyName())));

    // Without this (undocumented) step, Curator will attempt (and fail) to connect to a local
    // ZooKeeper instance (its default when no connection string is provided) for several
    // seconds, until the EnsembleProvider polls Exhibitor for the server list. Polling before
    // starting Curator ensures the server list is already downloaded when Curator connects.
    try {
        exhibitorProvider.pollForInitialEnsemble();
    } catch (Exception pollFailure) {
        try {
            Closeables.close(exhibitorProvider, true);
        } catch (IOException ignored) {
            // Deliberately ignored: the poll failure reported below is the error that matters.
        }
        throw new BootstrapException("Failed to initialize Exhibitor with host(s) " + exhibitors.getHostnames(), pollFailure);
    }

    CuratorFramework curator = CuratorFrameworkFactory.builder()
            .ensembleProvider(exhibitorProvider)
            .retryPolicy(buildZookeeperRetryPolicy(configuration))
            .build();

    // Trace every connection state transition.
    curator.getConnectionStateListenable().addListener(new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            LOGGER.debug("Connection state to ZooKeeper changed: " + newState);
        }
    });
    return curator;
}
示例5
/**
 * Builds a Curator client for the given ZooKeeper connection string, optionally backed by an
 * Exhibitor ensemble provider. This method does not call {@code start()} on the client.
 *
 * <p>Every comma-separated entry of {@code connectString} must have exactly two
 * colon-separated parts (for example {@code host:port}); the first part of each entry is used
 * as an Exhibitor host name.
 *
 * @param connectString     ZooKeeper connection string; also served as the backup connection
 *                          string when Exhibitor is used
 * @param baseSleepTimeMs   first argument of the {@code ExponentialBackoffRetry}
 * @param maxRetries        second argument of the {@code ExponentialBackoffRetry}
 * @param exhibitorPort     Exhibitor port; Exhibitor is used only when this is greater than zero
 * @param exhibitorRestPath prefix that {@code "exhibitor/v1/cluster/list"} is appended to as-is,
 *                          so no separator is inserted between the two
 * @param pollingMs         polling interval passed to the Exhibitor ensemble provider
 * @return the built client, or {@code null} if {@code connectString} is malformed
 */
private CuratorFramework makeCurator(final String connectString, int baseSleepTimeMs, int maxRetries, int exhibitorPort, String exhibitorRestPath, int pollingMs)
{
    List<String> hostnames = Lists.newArrayList();
    for ( String spec : connectString.split(",") )
    {
        String[] subParts = spec.split(":");
        // Nothing here parses a number, so the former NumberFormatException handler was
        // unreachable and has been removed; the length check is the only validation.
        if ( subParts.length != 2 )
        {
            log.error("Bad connection string: " + connectString);
            return null;
        }
        hostnames.add(subParts[0]);
    }

    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory
        .builder()
        .connectString(connectString)
        .retryPolicy(retryPolicy);

    if ( exhibitorPort > 0 )
    {
        // The backup provider is only needed when Exhibitor is in use, so it is built here.
        Exhibitors.BackupConnectionStringProvider backupConnectionStringProvider = new Exhibitors.BackupConnectionStringProvider()
        {
            @Override
            public String getBackupConnectionString() throws Exception
            {
                return connectString;
            }
        };
        Exhibitors exhibitors = new Exhibitors(hostnames, exhibitorPort, backupConnectionStringProvider);
        ExhibitorEnsembleProvider ensembleProvider = new ExhibitorEnsembleProvider(exhibitors, new DefaultExhibitorRestClient(), exhibitorRestPath + "exhibitor/v1/cluster/list", pollingMs, retryPolicy);
        builder = builder.ensembleProvider(ensembleProvider);
    }
    else
    {
        log.warn("Exhibitor on the shared ZooKeeper config ensemble is not being used.");
    }

    return builder.build();
}