Java源码示例:org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider

示例1
/**
 * Creates a {@code ZkClient} backed by a Curator client whose ensemble is
 * supplied by an {@link ExhibitorEnsembleProvider}.
 *
 * <p>The provider is polled once before the Curator client is built. The same
 * exponential-backoff retry policy (1000 base sleep, 3 retries) is handed to
 * both the provider and the Curator builder. The backup connection string
 * supplied to {@code Exhibitors} is the empty string.
 *
 * @param serverSet Exhibitor hostnames passed to {@code Exhibitors}
 * @param restPort  port passed to {@code Exhibitors}
 * @return a {@code ZkClient} wrapping the built (not yet started here) Curator client
 * @throws RuntimeException wrapping any exception raised while setting up
 */
public static ZkClient fromExhibitor(Collection<String> serverSet, int restPort) {
  try {
    final Exhibitors exhibitorCluster = new Exhibitors(serverSet, restPort, () -> "");
    final RetryPolicy backoff = new ExponentialBackoffRetry(1000, 3);

    final ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(
        exhibitorCluster,
        new DefaultExhibitorRestClient(),
        "/exhibitor/v1/cluster/list",
        61000,
        backoff);
    // Fetch the server list up front, before the Curator client is built.
    provider.pollForInitialEnsemble();

    return new ZkClient(
        CuratorFrameworkFactory.builder()
            .ensembleProvider(provider)
            .retryPolicy(backoff)
            .build());
  } catch (Exception failure) {
    throw new RuntimeException(failure);
  }
}
 
示例2
/**
 * Builds the {@link EnsembleProvider} matching the connection type of {@code conn}.
 *
 * <ul>
 *   <li>{@code EXHIBITOR}: an {@link ExhibitorEnsembleProvider} whose connection
 *       string has {@code conn.getPathPrepared()} appended; it is polled once
 *       before being returned. The Exhibitor port is taken from the first
 *       configured address, and the backup connection string provider always
 *       throws.</li>
 *   <li>{@code ZOOKEEPER}: a {@code ChrootedFixedEnsembleProvider} over the
 *       comma-joined addresses and {@code conn.getPathPrepared()}.</li>
 * </ul>
 *
 * @return the ensemble provider for the configured connection type
 * @throws RuntimeException if the connection type is neither of the above
 * @throws Exception propagated from the initial Exhibitor poll
 */
private EnsembleProvider createEnsembleProvider() throws Exception {
    switch (conn.getType()) {
        case EXHIBITOR: {
            final Exhibitors exhibitorHosts = new Exhibitors(
                    conn.getAddresses().stream().map(AddressPort::getAddress).collect(Collectors.toList()),
                    conn.getAddresses().get(0).getPort(),
                    () -> {
                        throw new RuntimeException("There is no backup connection string (or it is wrong)");
                    });
            final ExhibitorRestClient restClient = new DefaultExhibitorRestClient();
            final ExponentialBackoffRetry backoff =
                    new ExponentialBackoffRetry(EXHIBITOR_RETRY_TIME, EXHIBITOR_RETRY_MAX);

            final ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(
                    exhibitorHosts, restClient, "/exhibitor/v1/cluster/list", EXHIBITOR_POLLING_MS, backoff) {
                // Append the prepared path to whatever connection string the provider reports.
                @Override
                public String getConnectionString() {
                    return super.getConnectionString() + conn.getPathPrepared();
                }
            };
            provider.pollForInitialEnsemble();
            return provider;
        }
        case ZOOKEEPER: {
            return new ChrootedFixedEnsembleProvider(
                    conn.getAddresses().stream()
                            .map(AddressPort::asAddressPort)
                            .collect(Collectors.joining(",")),
                    conn.getPathPrepared());
        }
        default:
            throw new RuntimeException("Connection type " + conn.getType() + " is not supported");
    }
}
 
示例3
/**
 * Creates an {@link ExhibitorEnsembleProvider} from the given configuration and
 * polls it once before returning it.
 *
 * <p>The backup connection string supplied to {@code Exhibitors} is the empty string.
 *
 * @param config source of the hostnames, REST port, REST URI path, polling
 *               interval and retry settings
 * @return the polled ensemble provider
 * @throws Exception propagated from the initial poll
 */
private EnsembleProvider buildEnsembleProvider(ExhibitorConfig config) throws Exception {
  final Exhibitors exhibitorHosts =
      new Exhibitors(config.getHostnames(), config.getRestPort(), () -> "");
  final RetryPolicy retries = buildRetryPolicy(config.getRetryConfig());

  final ExhibitorEnsembleProvider provider = new ExhibitorEnsembleProvider(
      exhibitorHosts,
      new DefaultExhibitorRestClient(),
      config.getRestUriPath(),
      config.getPollingMs(),
      retries);
  provider.pollForInitialEnsemble();
  return provider;
}
 
示例4
/**
 * Builds a {@link CuratorFramework} whose ensemble is supplied by an
 * Exhibitor-backed ensemble provider configured from {@code configuration}.
 *
 * <p>The provider is polled once before the Curator client is built, and a
 * listener that logs connection state changes at debug level is registered on
 * the client before it is returned.
 *
 * @param configuration source of the Exhibitor HTTPS flag, URI path, poll
 *                      interval and retry settings
 * @return the built Curator client
 * @throws BootstrapException if the initial poll fails; the provider is closed
 *                            quietly before the exception is thrown
 */
private CuratorFramework buildCuratorWithExhibitor(Configuration configuration) {
    LOGGER.debug("configuring zookeeper connection through Exhibitor...");

    final KixeyeExhibitorRestClient restClient =
            new KixeyeExhibitorRestClient(configuration.getBoolean(EXHIBITOR_USE_HTTPS.getPropertyName()));
    final ExhibitorEnsembleProvider provider = new KixeyeExhibitorEnsembleProvider(
            exhibitors,
            restClient,
            configuration.getString(EXHIBITOR_URI_PATH.getPropertyName()),
            configuration.getInt(EXHIBITOR_POLL_INTERVAL.getPropertyName()),
            new ExponentialBackoffRetry(
                    configuration.getInt(EXHIBITOR_INITIAL_SLEEP_MILLIS.getPropertyName()),
                    configuration.getInt(EXHIBITOR_MAX_RETRIES.getPropertyName()),
                    configuration.getInt(EXHIBITOR_RETRIES_MAX_MILLIS.getPropertyName())));

    // Without this (undocumented) step, Curator will attempt (and fail) to connect to a
    // local ZooKeeper instance (the default when no connection string is provided) for
    // several seconds, until the EnsembleProvider polls Exhibitor for the server list.
    // Polling before starting Curator ensures the server list is already downloaded
    // before Curator attempts to connect to ZooKeeper.
    try {
        provider.pollForInitialEnsemble();
    } catch (Exception pollFailure) {
        try {
            Closeables.close(provider, true);
        } catch (IOException ignored) {
            // Nothing to do: the close above is requested to swallow IOExceptions.
        }
        throw new BootstrapException("Failed to initialize Exhibitor with host(s) " + exhibitors.getHostnames(), pollFailure);
    }

    final CuratorFramework client = CuratorFrameworkFactory.builder()
            .ensembleProvider(provider)
            .retryPolicy(buildZookeeperRetryPolicy(configuration))
            .build();

    client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework source, ConnectionState newState) {
            LOGGER.debug("Connection state to ZooKeeper changed: " + newState);
        }
    });

    return client;
}
 
示例5
/**
 * Builds a {@link CuratorFramework} for the given ZooKeeper connection string,
 * optionally using Exhibitor to supply the ensemble.
 *
 * <p>The connection string is split on {@code ","}; every entry must split on
 * {@code ":"} into exactly two parts (presumably {@code host:port}), and the
 * first part is collected as a hostname. The second part is not parsed or
 * validated here.
 *
 * <p>When {@code exhibitorPort} is greater than zero, an
 * {@link ExhibitorEnsembleProvider} over the collected hostnames is installed
 * on the builder, with {@code connectString} as its backup connection string.
 * Otherwise a warning is logged and only the fixed connection string is used.
 *
 * @param connectString     comma-separated list of two-part, colon-separated entries
 * @param baseSleepTimeMs   first argument for the {@link ExponentialBackoffRetry}
 * @param maxRetries        second argument for the {@link ExponentialBackoffRetry}
 * @param exhibitorPort     Exhibitor port; values {@code <= 0} disable Exhibitor
 * @param exhibitorRestPath prefix prepended to {@code "exhibitor/v1/cluster/list"}
 * @param pollingMs         polling value passed to the ensemble provider
 * @return the built Curator client, or {@code null} if any entry of
 *         {@code connectString} does not have exactly two colon-separated parts
 */
private CuratorFramework makeCurator(final String connectString, int baseSleepTimeMs, int maxRetries, int exhibitorPort, String exhibitorRestPath, int pollingMs)
{
    List<String>    hostnames = Lists.newArrayList();
    for ( String spec : connectString.split(",") )
    {
        String[]        subParts = spec.split(":");
        // No number parsing happens here, so the former NumberFormatException
        // handler could never run and has been removed.
        if ( subParts.length != 2 )
        {
            log.error("Bad connection string: " + connectString);
            return null;
        }

        hostnames.add(subParts[0]);
    }

    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);

    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory
        .builder()
        .connectString(connectString)
        .retryPolicy(retryPolicy);
    if ( exhibitorPort > 0 )
    {
        // The backup provider is only needed when Exhibitor is in use; it hands
        // back the original connection string.
        Exhibitors.BackupConnectionStringProvider   backupConnectionStringProvider = new Exhibitors.BackupConnectionStringProvider()
        {
            @Override
            public String getBackupConnectionString() throws Exception
            {
                return connectString;
            }
        };

        Exhibitors                  exhibitors = new Exhibitors(hostnames, exhibitorPort, backupConnectionStringProvider);
        ExhibitorEnsembleProvider   ensembleProvider = new ExhibitorEnsembleProvider(exhibitors, new DefaultExhibitorRestClient(), exhibitorRestPath + "exhibitor/v1/cluster/list", pollingMs, retryPolicy);
        builder = builder.ensembleProvider(ensembleProvider);
    }
    else
    {
        log.warn("Exhibitor on the shared ZooKeeper config ensemble is not being used.");
    }
    return builder.build();
}