Java源码示例:org.apache.curator.test.TestingCluster
示例1
/**
 * Builds (but does not start) a test ZooKeeper cluster. Each server gets its own data directory
 * named {@code apacheIgniteTestZk-<index>} under the directory given by the {@code TMPFS_ROOT}
 * environment variable or, when that is not set, the {@code java.io.tmpdir} system property.
 *
 * @param instances Number of instances in cluster.
 * @param customProps Custom configuration properties for every server, indexed by server number;
 *      may be {@code null}, in which case no custom properties are passed.
 * @return Test cluster.
 */
public static TestingCluster createTestingCluster(int instances, @Nullable Map<String,Object>[] customProps) {
    String tmpfsRoot = System.getenv("TMPFS_ROOT");
    String tmpDir = tmpfsRoot == null ? System.getProperty("java.io.tmpdir") : tmpfsRoot;

    List<InstanceSpec> specs = new ArrayList<>();

    for (int idx = 0; idx < instances; idx++) {
        File dataDir = new File(tmpDir, "apacheIgniteTestZk-" + idx);

        // An existing directory is wiped via the helper; otherwise the directory is created here.
        if (dataDir.isDirectory())
            deleteRecursively0(dataDir);
        else if (!dataDir.mkdirs())
            throw new IgniteException("Failed to create directory for test Zookeeper server: " + dataDir.getAbsolutePath());

        Map<String,Object> props = customProps == null ? null : customProps[idx];

        specs.add(new InstanceSpec(dataDir, -1, -1, -1, true, -1, -1, 500, props));
    }

    return new TestingCluster(specs);
}
示例2
/**
 * Prepares the ZooKeeper environment before a test: removes the connection-string system
 * property, starts a testing cluster and starts a Curator client connected to it.
 *
 * @throws Exception If the parent setup, the cluster or the client fails to start.
 */
@Override public void beforeTest() throws Exception {
    super.beforeTest();

    // Drop a connection string possibly left behind in the system properties.
    System.getProperties().remove(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING);

    // Bring up the ZK cluster.
    zkCluster = new TestingCluster(ZK_CLUSTER_SIZE);
    zkCluster.start();

    // The Curator client is kept around so that ZK state can be asserted on later.
    String connectString = zkCluster.getConnectString();
    RetryNTimes retryPolicy = new RetryNTimes(10, 1000);

    zkCurator = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
    zkCurator.start();
}
示例3
/**
 * Starts a single-node testing cluster, creates the {@code /live_nodes} znode with three host
 * children through a short-lived Curator client, and then configures the SOLR URL manager
 * against the cluster's connect string.
 */
@Before
public void setUp() throws Exception {
  cluster = new TestingCluster(1);
  cluster.start();

  // Parent first, then its children, in the order they must be created.
  final String[] znodes = {
      "/live_nodes",
      "/live_nodes/host1:8983_solr",
      "/live_nodes/host2:8983_solr",
      "/live_nodes/host3:8983_solr"
  };

  try (CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder()
      .connectString(cluster.getConnectString())
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()) {
    zooKeeperClient.start();
    boolean connected = zooKeeperClient.blockUntilConnected(10, TimeUnit.SECONDS);
    assertTrue(connected);
    for (String znode : znodes) {
      zooKeeperClient.create().forPath(znode);
    }
  }

  manager = new SOLRZookeeperURLManager();
  HaServiceConfig haConfig = new DefaultHaServiceConfig("SOLR");
  haConfig.setEnabled(true);
  haConfig.setZookeeperEnsemble(cluster.getConnectString());
  manager.setConfig(haConfig);
}
示例4
/**
 * Defines a one-instance testing cluster whose server is configured with the SASL
 * authentication provider properties, stores it in {@code zkNodes} and starts it.
 *
 * @throws Exception if the cluster fails to start
 */
private static void configureAndStartZKCluster() throws Exception {
  // Server properties applied to every ZK instance of the cluster
  Map<String, Object> serverProps = new HashMap<>();
  serverProps.put("authProvider.1",
      "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
  serverProps.put("requireClientAuthScheme", "sasl");
  serverProps.put("admin.enableServer", false);

  // A single instance; the sixth constructor argument takes the values 1..N
  List<InstanceSpec> instanceSpecs = new ArrayList<>();
  for (int serverId = 1; serverId <= 1; serverId++) {
    instanceSpecs.add(
        new InstanceSpec(null, -1, -1, -1, false, serverId, -1, -1, serverProps));
  }

  zkNodes = new TestingCluster(instanceSpecs);
  zkNodes.start();
}
示例5
/**
 * Creates three peers, allocates their addresses and curators, then builds and starts the
 * testing cluster. If the cluster fails to start, {@code stop()} is invoked and the failure is
 * rethrown (unchecked exceptions as-is, checked ones wrapped in a {@link RuntimeException}).
 */
private void start0() {
    this.zkPeers = createPeers(3);
    this.zkAddresses = allocateAddresses(this.zkPeers);
    this.peerCurators = createCurators(this.zkAddresses);

    System.setProperty("zookeeper.jmx.log4j.disable", "true");

    this.cluster = new TestingCluster(this.zkPeers);
    this.zkServers = this.cluster.getServers();

    try {
        this.cluster.start();
    } catch (Exception startFailure) {
        // Clean up whatever was created before reporting the failure.
        stop();
        Throwables.throwIfUnchecked(startFailure);
        throw new RuntimeException(startFailure);
    }
}
示例6
/**
 * Creates and starts a Curator client for the given cluster plus a workflow manager named
 * {@code "i-" + id} that runs tasks through a test executor.
 *
 * @param id numeric suffix of the workflow manager's instance name
 * @param cluster cluster whose connect string the Curator client uses
 * @param executedTasks set that receives the id of every task the executor runs
 * @param executedTasksLatch latch counted down once per executed task
 */
private Client(int id, TestingCluster cluster, Set<TaskId> executedTasks, CountDownLatch executedTasksLatch)
{
    curator = CuratorFrameworkFactory.builder()
        .connectString(cluster.getConnectString())
        .retryPolicy(new ExponentialBackoffRetry(10, 3))
        .build();
    curator.start();

    TestTaskExecutor recordingExecutor = new TestTaskExecutor(6)
    {
        @Override
        protected void doRun(ExecutableTask task) throws InterruptedException
        {
            // Record the task, pause briefly, then signal completion.
            executedTasks.add(task.getTaskId());
            timing.forWaiting().sleepABit();
            executedTasksLatch.countDown();
        }
    };

    String instanceName = "i-" + id;
    workflowManager = WorkflowManagerBuilder.builder()
        .addingTaskExecutor(recordingExecutor, 10, taskType)
        .withCurator(curator, "test", "1")
        .withInstanceName(instanceName)
        .build();
    workflowManager.start();
}
示例7
/**
 * Creates an ephemeral node on a three-instance cluster, then kills and restarts every instance
 * in turn, and finally asserts that the ephemeral node still exists.
 *
 * @throws Exception on any cluster or client failure
 */
@Test
public void testSessionSurvives() throws Exception
{
    Timing timing = new Timing();

    CuratorFramework client = null;
    TestingCluster cluster = new TestingCluster(3);
    try
    {
        // Started inside the try so that the finally block also closes the cluster
        // when start() itself fails.
        cluster.start();

        client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(100, 3));
        client.start();

        client.create().withMode(CreateMode.EPHEMERAL).forPath("/temp", "value".getBytes());
        Assert.assertNotNull(client.checkExists().forPath("/temp"));

        // Rolling restart: one instance at a time, pausing after each kill and each restart.
        for ( InstanceSpec spec : cluster.getInstances() )
        {
            cluster.killServer(spec);
            timing.forWaiting().sleepABit();
            cluster.restartServer(spec);
            timing.sleepABit();
        }

        timing.sleepABit();
        Assert.assertNotNull(client.checkExists().forPath("/temp"));
    }
    finally
    {
        CloseableUtils.closeQuietly(client);
        CloseableUtils.closeQuietly(cluster);
    }
}
示例8
/**
 * Starts a three-instance testing cluster and a Curator client connected to it, using the
 * session and connection timeouts supplied by a fresh {@code Timing}.
 */
@BeforeMethod
public void setup() throws Exception
{
    this.timing = new Timing();

    this.cluster = new TestingCluster(3);
    this.cluster.start();

    String connectString = this.cluster.getConnectString();
    this.client = CuratorFrameworkFactory.newClient(connectString, this.timing.session(), this.timing.connection(), new RetryOneTime(1));
    this.client.start();
}
示例9
/**
 * Creates the object mapper and starts a three-instance testing cluster.
 * No service is registered in this variant.
 */
@Before
public void startTestCluster() throws Exception {
    this.objectMapper = new ObjectMapper();

    this.testingCluster = new TestingCluster(3);
    this.testingCluster.start();

    // Service registration is disabled here:
    //registerService("localhost-1", 9000, 1);
    //registerService("localhost-2", 9000, 1);
    //registerService("localhost-3", 9000, 2);
}
示例10
/**
 * Creates the object mapper, starts a three-instance testing cluster and registers the
 * services {@code localhost-1} and {@code localhost-3}.
 */
@Before
public void startTestCluster() throws Exception {
    this.objectMapper = new ObjectMapper();

    this.testingCluster = new TestingCluster(3);
    this.testingCluster.start();

    registerService("localhost-1", 9000, 1);
    // localhost-2 is left unregistered:
    //registerService("localhost-2", 9000, 1);
    registerService("localhost-3", 9000, 2);
}
示例11
/**
 * Creates the object mapper, starts a three-instance testing cluster and registers four
 * services: {@code localhost-1..3} with last argument 1 and {@code localhost-4} with 2.
 */
@Before
public void startTestCluster() throws Exception {
    objectMapper = new ObjectMapper();

    testingCluster = new TestingCluster(3);
    testingCluster.start();

    // The first three hosts share the same registration arguments.
    for (String host : new String[] {"localhost-1", "localhost-2", "localhost-3"}) {
        registerService(host, 9000, 1);
    }
    registerService("localhost-4", 9000, 2);
}
示例12
/**
 * Creates the object mapper, starts a three-instance testing cluster and registers three
 * services, each with its own pair of trailing arguments.
 */
@Before
public void startTestCluster() throws Exception {
    this.objectMapper = new ObjectMapper();

    this.testingCluster = new TestingCluster(3);
    this.testingCluster.start();

    registerService("localhost-1", 9000, 1, 2);
    registerService("localhost-2", 9000, 1, 3);
    registerService("localhost-3", 9000, 2, 3);
}
示例13
/**
 * Creates the object mapper, starts a three-instance testing cluster, starts a Curator client
 * in the {@code "test"} namespace and registers three services.
 */
@Before
public void startTestCluster() throws Exception {
    objectMapper = new ObjectMapper();

    testingCluster = new TestingCluster(3);
    testingCluster.start();

    curatorFramework = CuratorFrameworkFactory.builder()
            .namespace("test")
            .connectString(testingCluster.getConnectString())
            .retryPolicy(new ExponentialBackoffRetry(1000, 100))
            .build();
    curatorFramework.start();

    // The first two hosts share the same registration arguments.
    for (String host : new String[] {"localhost-1", "localhost-2"}) {
        registerService(host, 9000, 1);
    }
    registerService("localhost-3", 9000, 2);
}
示例14
/**
 * Creates the object mapper, starts a three-instance testing cluster and registers three
 * services: {@code localhost-1} and {@code localhost-2} with last argument 1,
 * {@code localhost-3} with 2.
 */
@Before
public void startTestCluster() throws Exception {
    objectMapper = new ObjectMapper();

    testingCluster = new TestingCluster(3);
    testingCluster.start();

    // The first two hosts share the same registration arguments.
    for (String host : new String[] {"localhost-1", "localhost-2"}) {
        registerService(host, 9000, 1);
    }
    registerService("localhost-3", 9000, 2);
}
示例15
/**
 * Creates the object mapper, starts a three-instance testing cluster, registers four services
 * (three with {@code file}, one with {@code anotherFile}) and starts an unsharded service
 * finder for {@code "test-service"} in the {@code "test"} namespace.
 */
@Before
public void startTestCluster() throws Exception {
    objectMapper = new ObjectMapper();

    testingCluster = new TestingCluster(3);
    testingCluster.start();

    /* three registrations use file, the fourth uses anotherFile */
    registerService("localhost-1", 9000, 1, file);
    registerService("localhost-2", 9000, 1, file);
    registerService("localhost-3", 9000, 2, file);
    registerService("localhost-4", 9000, 2, anotherFile);

    serviceFinder = ServiceFinderBuilders.unshardedFinderBuilder()
            .withConnectionString(testingCluster.getConnectString())
            .withNamespace("test")
            .withServiceName("test-service")
            .withDeserializer(new Deserializer<UnshardedClusterInfo>() {
                /** Reads a service node from JSON bytes; yields {@code null} when reading fails. */
                @Override
                public ServiceNode<UnshardedClusterInfo> deserialize(byte[] data) {
                    ServiceNode<UnshardedClusterInfo> node = null;
                    try {
                        node = objectMapper.readValue(data,
                                new TypeReference<ServiceNode<UnshardedClusterInfo>>() {
                                });
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return node;
                }
            })
            .build();
    serviceFinder.start();
}
示例16
/**
 * Blocks until a temporary Curator client manages to connect to the given Zookeeper testing
 * cluster, failing the assertion if no connection is established within 30 seconds.
 *
 * @param zkCluster Zk cluster.
 * @throws InterruptedException If the wait for the connection is interrupted.
 */
static void waitForZkClusterReady(TestingCluster zkCluster) throws InterruptedException {
    String connectString = zkCluster.getConnectString();

    try (CuratorFramework curator = CuratorFrameworkFactory.newClient(connectString, new RetryNTimes(10, 1_000))) {
        curator.start();

        boolean connected = curator.blockUntilConnected(30, SECONDS);

        assertTrue("Failed to wait for Zookeeper testing cluster ready.", connected);
    }
}
示例17
/**
 * Runs the ZooKeeper client test with no principal, password or credential alias, using a
 * registry configuration that carries only the registry type and address.
 */
@Test
public void testUnsecuredZooKeeperWithSimpleRegistryConfig() throws Exception {
  final String registryClientName = "unsecured-zk-registry-name";
  final String principal = null;
  final String pwd = null;
  final String credAlias = null;

  // Start the test ZK cluster; no principal/password is supplied in this test
  try (TestingCluster zkCluster = setupAndStartSecureTestZooKeeper(principal, pwd)) {
    // Setup client for the test cluster; this also creates the test znodes
    CuratorFramework setupClient = initializeTestClientAndZNodes(zkCluster, principal);

    // Registry configuration string: type and address only
    final String registryConfigValue =
        GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
        GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString();

    // Mocked gateway configuration exposing exactly that one registry
    GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
    EasyMock.expect(config.getRemoteRegistryConfiguration(registryClientName))
        .andReturn(registryConfigValue)
        .anyTimes();
    EasyMock.expect(config.getRemoteRegistryConfigurationNames())
        .andReturn(Collections.singletonList(registryClientName))
        .anyTimes();
    EasyMock.replay(config);

    doTestZooKeeperClient(setupClient, registryClientName, config, credAlias, pwd);
  }
}
示例18
/**
 * Runs the ZooKeeper client test against a cluster set up with a principal and password, using
 * a registry configuration that carries type, address, auth type, principal and credential alias.
 */
@Test
public void testZooKeeperWithSimpleRegistryConfig() throws Exception {
  final String authType = "digest";
  final String registryClientName = "zk-registry-name";
  final String principal = "knox";
  final String pwd = "knoxtest";
  final String credAlias = "zkCredential";

  // Configure and start a secure ZK cluster
  try (TestingCluster zkCluster = setupAndStartSecureTestZooKeeper(principal, pwd)) {
    // Setup client for the test cluster; this also creates the test znodes
    CuratorFramework setupClient = initializeTestClientAndZNodes(zkCluster, principal);

    // Registry configuration string, one "key=value" pair per setting
    final String registryConfigValue =
        GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
        GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString() + ";" +
        GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE + "=" + authType + ";" +
        GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL + "=" + principal + ";" +
        GatewayConfig.REMOTE_CONFIG_REGISTRY_CREDENTIAL_ALIAS + "=" + credAlias;

    // Mocked gateway configuration exposing exactly that one registry
    GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
    EasyMock.expect(config.getRemoteRegistryConfiguration(registryClientName))
        .andReturn(registryConfigValue)
        .anyTimes();
    EasyMock.expect(config.getRemoteRegistryConfigurationNames())
        .andReturn(Collections.singletonList(registryClientName))
        .anyTimes();
    EasyMock.replay(config);

    doTestZooKeeperClient(setupClient, registryClientName, config, credAlias, pwd);
  }
}
示例19
/**
 * Create a ZooKeeper client for the test cluster, and initialize the test znodes.
 * <p>
 * With a non-null principal, the two Knox config znodes get an ACL granting all permissions to
 * that SASL principal, and {@code /someotherconfig} gets an ACL for the SASL principal
 * {@code "notyou"}. With a {@code null} principal, every znode is created with
 * {@code ANYONE_ID_UNSAFE} and all permissions.
 * <p>
 * If anything fails after the client has been started, the client is closed before the failure
 * is propagated, so a failed setup does not leave an open connection behind.
 *
 * @param zkCluster zkCluster to initialize
 * @param principal principal for SASL digest auth; may be {@code null}
 * @return the started and connected client; it is not closed by this method on success
 * @throws Exception exception on failure
 */
private CuratorFramework initializeTestClientAndZNodes(TestingCluster zkCluster, String principal) throws Exception {
  // Create the client for the test cluster
  CuratorFramework setupClient = CuratorFrameworkFactory.builder()
      .connectString(zkCluster.getConnectString())
      .retryPolicy(new ExponentialBackoffRetry(100, 3))
      .build();
  assertNotNull(setupClient);
  setupClient.start();

  try {
    assertTrue(setupClient.blockUntilConnected(10, TimeUnit.SECONDS));

    // ACL for the znodes the test client is expected to access
    List<ACL> acls = new ArrayList<>();
    if (principal != null) {
      acls.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", principal)));
    } else {
      acls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    }
    setupClient.create().creatingParentsIfNeeded().withACL(acls).forPath("/knox/config/descriptors");
    setupClient.create().creatingParentsIfNeeded().withACL(acls).forPath("/knox/config/shared-providers");

    // ACL for a znode owned by a different principal
    List<ACL> negativeACLs = new ArrayList<>();
    if (principal != null) {
      negativeACLs.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", "notyou")));
    } else {
      negativeACLs.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
    }
    setupClient.create().creatingParentsIfNeeded().withACL(negativeACLs).forPath("/someotherconfig");

    return setupClient;
  } catch (Exception | Error e) {
    // The caller never receives the client on failure, so release it here.
    setupClient.close();
    throw e;
  }
}
示例20
/**
 * Starts a single-node testing cluster and creates the {@code /brokers} and
 * {@code /brokers/ids} znodes through a short-lived Curator client.
 */
@Before
public void setUp() throws Exception {
  cluster = new TestingCluster(1);
  cluster.start();

  // Parent first, then its child.
  final String[] znodes = {"/brokers", "/brokers/ids"};

  try (CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder()
      .connectString(cluster.getConnectString())
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()) {
    zooKeeperClient.start();
    boolean connected = zooKeeperClient.blockUntilConnected(10, TimeUnit.SECONDS);
    assertTrue(connected);
    for (String znode : znodes) {
      zooKeeperClient.create().forPath(znode);
    }
  }
}
示例21
/**
 * Starts a single-node testing cluster, creates {@code /hiveServer2} with four host znodes
 * holding HiveServer2 property strings (UTF-8), and then configures the HS2 URL manager with
 * the cluster's connect string and the {@code hiveServer2} namespace.
 */
@Before
public void setUp() throws Exception {
  cluster = new TestingCluster(1);
  cluster.start();

  // Znode payloads, one per registered host
  final String host1 = "hive.server2.authentication=NONE;hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice;" +
      "hive.server2.thrift.http.port=10001;hive.server2.thrift.bind.host=host1;hive.server2.use.SSL=true";
  final String host2 = "hive.server2.authentication=NONE;hive.server2.transport.mode=http;hive.server2.thrift.http.path=foobar;" +
      "hive.server2.thrift.http.port=10002;hive.server2.thrift.bind.host=host2;hive.server2.use.SSL=false";
  final String host3 = "hive.server2.authentication=NONE;hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice;" +
      "hive.server2.thrift.http.port=10003;hive.server2.thrift.bind.host=host3;hive.server2.use.SSL=false";
  final String host4 = "hive.server2.authentication=NONE;hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice;" +
      "hive.server2.thrift.http.port=10004;hive.server2.thrift.bind.host=host4;hive.server2.use.SSL=true";

  try (CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder()
      .connectString(cluster.getConnectString())
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()) {
    zooKeeperClient.start();
    boolean connected = zooKeeperClient.blockUntilConnected(10, TimeUnit.SECONDS);
    assertTrue(connected);

    zooKeeperClient.create().forPath("/hiveServer2");
    zooKeeperClient.create().forPath("/hiveServer2/host1", host1.getBytes(StandardCharsets.UTF_8));
    zooKeeperClient.create().forPath("/hiveServer2/host2", host2.getBytes(StandardCharsets.UTF_8));
    zooKeeperClient.create().forPath("/hiveServer2/host3", host3.getBytes(StandardCharsets.UTF_8));
    zooKeeperClient.create().forPath("/hiveServer2/host4", host4.getBytes(StandardCharsets.UTF_8));
  }

  manager = new HS2ZookeeperURLManager();
  HaServiceConfig haConfig = new DefaultHaServiceConfig("HIVE");
  haConfig.setEnabled(true);
  haConfig.setZookeeperEnsemble(cluster.getConnectString());
  haConfig.setZookeeperNamespace("hiveServer2");
  manager.setConfig(haConfig);
}
示例22
/**
 * Starts a single-node testing cluster, creates
 * {@code /apache_atlas/active_server_info} and writes {@code atlasNode1} (UTF-8) into it, then
 * configures the Atlas URL manager with the cluster's connect string and the
 * {@code apache_atlas} namespace.
 */
@Before
public void setUp() throws Exception {
  cluster = new TestingCluster(1);
  cluster.start();

  final String activeServerInfoPath = "/apache_atlas/active_server_info";

  try (CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder()
      .connectString(cluster.getConnectString())
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()) {
    zooKeeperClient.start();
    boolean connected = zooKeeperClient.blockUntilConnected(10, TimeUnit.SECONDS);
    assertTrue(connected);

    // Create parent and child first, then store the active server data in the child.
    zooKeeperClient.create().forPath("/apache_atlas");
    zooKeeperClient.create().forPath(activeServerInfoPath);
    byte[] activeServerData = atlasNode1.getBytes(StandardCharsets.UTF_8);
    zooKeeperClient.setData().forPath(activeServerInfoPath, activeServerData);
  }
  setAtlasActiveHostURLInZookeeper(atlasNode1);

  manager = new AtlasZookeeperURLManager();
  HaServiceConfig haConfig = new DefaultHaServiceConfig("ATLAS-API");
  haConfig.setEnabled(true);
  haConfig.setZookeeperEnsemble(cluster.getConnectString());
  haConfig.setZookeeperNamespace("apache_atlas");
  manager.setConfig(haConfig);
}
示例23
/**
 * Starts a one-instance testing cluster configured with the SASL authentication provider
 * properties, connects the shared Curator client to it, and creates the Knox descriptor and
 * provider znodes, asserting that each one exists afterwards.
 *
 * @throws Exception if the cluster, the client or the znode creation fails
 */
private static void configureAndStartZKCluster() throws Exception {
  // Server properties applied to every ZK instance of the cluster
  Map<String, Object> serverProps = new HashMap<>();
  serverProps.put("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
  serverProps.put("requireClientAuthScheme", "sasl");
  serverProps.put("admin.enableServer", false);

  // A single instance; the sixth constructor argument takes the values 1..N
  List<InstanceSpec> instanceSpecs = new ArrayList<>();
  for (int serverId = 1; serverId <= 1; serverId++) {
    instanceSpecs.add(new InstanceSpec(null, -1, -1, -1, false, serverId, -1, -1, serverProps));
  }
  zkCluster = new TestingCluster(instanceSpecs);
  zkCluster.start();

  // Client for the test cluster
  client = CuratorFrameworkFactory.builder()
      .connectString(zkCluster.getConnectString())
      .retryPolicy(new ExponentialBackoffRetry(100, 3))
      .build();
  assertNotNull(client);
  client.start();
  assertTrue(client.blockUntilConnected(10, TimeUnit.SECONDS));

  // Both Knox config paths are created with an ACL granting all permissions to anyone
  List<ACL> acls = new ArrayList<>();
  acls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
  for (String path : new String[] {PATH_KNOX_DESCRIPTORS, PATH_KNOX_PROVIDERS}) {
    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(path);
    assertNotNull("Failed to create node:" + path, client.checkExists().forPath(path));
  }
}
示例24
/**
 * Joins the instances of a freshly started one-instance cluster to the existing ensemble via
 * reconfig and verifies that the new ensemble config covers both the old and the new instances
 * and that the ensemble provider reports the matching connection string.
 */
@Test
public void testAdd() throws Exception
{
    try ( CuratorFramework client = newClient() )
    {
        client.start();

        // Baseline: the current config must match the running cluster.
        byte[] oldConfigData = client.getConfig().forEnsemble();
        QuorumVerifier oldConfig = toQuorumVerifier(oldConfigData);
        assertConfig(oldConfig, cluster.getInstances());

        CountDownLatch changeLatch = setChangeWaiter(client);

        try ( TestingCluster newCluster = new TestingCluster(TestingCluster.makeSpecs(1, false)) )
        {
            newCluster.start();

            client.reconfig()
                .joining(toReconfigSpec(newCluster.getInstances()))
                .fromConfig(oldConfig.getVersion())
                .forEnsemble();
            Assert.assertTrue(timing.awaitLatch(changeLatch));

            QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble());

            // Expected membership: the original instances followed by the joined ones.
            List<InstanceSpec> expectedInstances = Lists.newArrayList(cluster.getInstances());
            expectedInstances.addAll(newCluster.getInstances());
            assertConfig(newConfig, expectedInstances);

            String expectedConnectString = EnsembleTracker.configToConnectionString(newConfig);
            Assert.assertEquals(expectedConnectString, ensembleProvider.getConnectionString());
        }
    }
}
示例25
/**
 * Collects the actual state of the given cluster into a map by delegating to the
 * three-argument {@code buildActual} overload, starting at {@code BASE_PATH}, with a
 * short-lived client.
 *
 * @param cluster cluster whose connect string the client uses
 * @return the map filled by the overload
 */
private Map<String, String> buildActual(TestingCluster cluster)
{
    Map<String, String> result = new HashMap<>();
    String connectString = cluster.getConnectString();
    try ( CuratorFramework client = buildClient(connectString) )
    {
        client.start();
        buildActual(client, result, BASE_PATH);
    }
    return result;
}
示例26
/**
 * Creates and starts {@code clientQty} clients, numbered from 0, all connected to the given
 * cluster.
 * <p>
 * Each client is started inside the mapping function rather than in {@code Stream.peek}, which
 * is intended for debugging and should not carry a required side effect.
 *
 * @param cluster cluster whose connect string every client uses
 * @param clientQty number of clients to create
 * @param errorSignal reference handed to every client
 * @return the started clients, in index order
 */
private List<Client> buildClients(TestingCluster cluster, int clientQty, AtomicReference<Exception> errorSignal)
{
    return IntStream.range(0, clientQty)
        .mapToObj(index -> {
            Client client = new Client(index, cluster.getConnectString(), errorSignal);
            client.start();
            return client;
        })
        .collect(Collectors.toList());
}
示例27
/**
 * Creates the {@code BASE_PATH} znode with an empty payload using a short-lived client.
 *
 * @param cluster cluster whose connect string the client uses
 * @throws Exception if the znode cannot be created
 */
private void initializeBasePath(TestingCluster cluster) throws Exception
{
    String connectString = cluster.getConnectString();
    try ( CuratorFramework client = buildClient(connectString) )
    {
        client.start();
        byte[] emptyPayload = "".getBytes();
        client.create().forPath(BASE_PATH, emptyPayload);
    }
}
示例28
/**
 * Starts a three-instance testing cluster and resets the per-test state: an empty client list,
 * an empty concurrent set of executed tasks and a latch with a count of 6.
 */
@BeforeMethod
public void setup() throws Exception
{
    this.cluster = new TestingCluster(3);
    this.cluster.start();

    this.clients = Lists.newArrayList();
    this.executedTasks = Sets.newConcurrentHashSet();
    this.executedTasksLatch = new CountDownLatch(6);
}
示例29
/**
 * Creates the object mapper and starts a three-instance testing cluster.
 */
@Before
public void startTestCluster() throws Exception {
    this.objectMapper = new ObjectMapper();

    this.testingCluster = new TestingCluster(3);
    this.testingCluster.start();
}
示例30
/**
 * Returns the ZooKeeper testing cluster held by this object.
 *
 * @return the testing cluster; may be {@code null}
 */
@Nullable
public TestingCluster getZooKeeperCluster() {
    return this.zooKeeperCluster;
}