Java源码示例:org.apache.twill.zookeeper.ZKClientService
示例1
/**
* Sets up common resources required by all clients.
*/
public void init() throws IOException {
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
new TransactionModules().getDistributedModules(),
new TransactionClientModule()
);
zkClient = injector.getInstance(ZKClientService.class);
zkClient.startAndWait();
txClient = injector.getInstance(TransactionServiceClient.class);
conn = ConnectionFactory.createConnection(conf);
createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
}
示例2
/**
* Sets up common resources required by all clients.
*/
public void init() throws IOException {
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
new TransactionModules().getDistributedModules(),
new TransactionClientModule()
);
zkClient = injector.getInstance(ZKClientService.class);
zkClient.startAndWait();
txClient = injector.getInstance(TransactionServiceClient.class);
conn = ConnectionFactory.createConnection(conf);
createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
}
示例3
/**
* Sets up common resources required by all clients.
*/
public void init() throws IOException {
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
new TransactionModules().getDistributedModules(),
new TransactionClientModule()
);
zkClient = injector.getInstance(ZKClientService.class);
zkClient.startAndWait();
txClient = injector.getInstance(TransactionServiceClient.class);
conn = ConnectionFactory.createConnection(conf);
createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
}
示例4
/**
* Sets up common resources required by all clients.
*/
public void init() throws IOException {
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
new TransactionModules().getDistributedModules(),
new TransactionClientModule()
);
zkClient = injector.getInstance(ZKClientService.class);
zkClient.startAndWait();
txClient = injector.getInstance(TransactionServiceClient.class);
createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
conn = HConnectionManager.createConnection(conf);
}
示例5
/**
* Invoked by jsvc to start the program.
*/
public void start() throws Exception {
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
new TransactionModules().getDistributedModules(),
new TransactionClientModule()
);
ZKClientService zkClientService = injector.getInstance(ZKClientService.class);
zkClientService.startAndWait();
// start a tx server
txService = injector.getInstance(TransactionService.class);
try {
LOG.info("Starting {}", getClass().getSimpleName());
txService.startAndWait();
} catch (Exception e) {
System.err.println("Failed to start service: " + e.getMessage());
}
}
示例6
@Provides
@Singleton
private ZKClientService provideZKClientService(Configuration conf) {
String zkStr = conf.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
if (zkStr == null) {
// Default to HBase one.
zkStr = conf.get(TxConstants.HBase.ZOOKEEPER_QUORUM);
}
int timeOut = conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT, TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT);
ZKClientService zkClientService = new TephraZKClientService(zkStr, timeOut, null,
ArrayListMultimap.<String, byte[]>create());
return ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(zkClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
)
)
);
}
示例7
private void expireZkSession(ZKClientService zkClientService) throws Exception {
ZooKeeper zooKeeper = zkClientService.getZooKeeperSupplier().get();
final SettableFuture<?> connectFuture = SettableFuture.create();
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectFuture.set(null);
}
}
};
// Create another Zookeeper session with the same sessionId so that the original one expires.
ZooKeeper dupZookeeper =
new ZooKeeper(zkClientService.getConnectString(), zooKeeper.getSessionTimeout(), watcher,
zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
connectFuture.get(30, TimeUnit.SECONDS);
Assert.assertEquals("Failed to re-create current session", dupZookeeper.getState(), ZooKeeper.States.CONNECTED);
dupZookeeper.close();
}
示例8
@Test(timeout = 20000)
public void testReentrant() {
// Test the lock is reentrant from the same thread
ZKClientService zkClient = createZKClient();
try {
ReentrantDistributedLock lock = new ReentrantDistributedLock(zkClient, "reentrant");
lock.lock();
try {
try {
lock.lock();
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
} finally {
zkClient.stopAndWait();
}
}
示例9
@Override
public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo, int port) {
config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, port);
int retryTimeOut = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC,
TxConstants.Service.DEFAULT_DATA_TX_CLIENT_DISCOVERY_TIMEOUT_SEC);
ZKClientService zkClient = ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(
ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
.setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
.build(),
RetryStrategies.exponentialDelay(500, retryTimeOut, TimeUnit.MILLISECONDS)
)
)
);
DiscoveryService discovery = new ZKDiscoveryService(zkClient);
TransactionManager txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config,
new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector());
TransactionService txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager));
TephraTransactionService service = new TephraTransactionService(zkClient, txService);
service.start();
return service;
}
示例10
@Override
protected void configure() {
/**
* ZKClientService is provided by the provider method
* {@link #provideZKClientService(org.apache.hadoop.conf.Configuration)}.
*/
bind(ZKClient.class).to(ZKClientService.class);
}
示例11
@Provides
@Singleton
private DiscoveryService providesDiscoveryService(final ZKClientService zkClient,
final ZKDiscoveryService delegate) {
return new DiscoveryService() {
@Override
public Cancellable register(Discoverable discoverable) {
if (!zkClient.isRunning()) {
zkClient.startAndWait();
}
return delegate.register(discoverable);
}
};
}
示例12
@Provides
@Singleton
private DiscoveryServiceClient providesDiscoveryServiceClient(final ZKClientService zkClient,
final ZKDiscoveryService delegate) {
return new DiscoveryServiceClient() {
@Override
public ServiceDiscovered discover(String s) {
if (!zkClient.isRunning()) {
zkClient.startAndWait();
}
return delegate.discover(s);
}
};
}
示例13
@BeforeClass
public static void beforeClass() {
zkServer = InMemoryZKServer.builder().setTickTime(100000).build();
zkServer.startAndWait();
zkClient = ZKClientServices.delegate(
ZKClients.retryOnFailure(
ZKClients.reWatchOnExpire(
ZKClientService.Builder.of(zkServer.getConnectionStr()).build()),
RetryStrategies.fixDelay(1, TimeUnit.SECONDS)));
zkClient.startAndWait();
}
示例14
@Test (timeout = 20000)
public void testMultiThreads() throws InterruptedException {
// Test the lock mechanism between multiple threads
ZKClientService zkClient = createZKClient();
try {
// Create two threads and compete for the lock
final ReentrantDistributedLock lock = new ReentrantDistributedLock(zkClient, "multiThreads");
final CountDownLatch acquired = new CountDownLatch(1);
Thread t = new Thread() {
@Override
public void run() {
lock.lock();
try {
acquired.countDown();
} finally {
lock.unlock();
}
}
};
lock.lock();
try {
t.start();
// Wait for the thread to get the lock, should fail.
Assert.assertFalse(acquired.await(1, TimeUnit.SECONDS));
} finally {
lock.unlock();
}
Assert.assertTrue(acquired.await(5, TimeUnit.SECONDS));
t.join();
} finally {
zkClient.stopAndWait();
}
}
示例15
@Test (timeout = 20000)
public void testLockInterrupt() throws InterruptedException {
// Test lock interruption on multiple threads.
ZKClientService zkClient = createZKClient();
try {
final ReentrantDistributedLock lock = new ReentrantDistributedLock(zkClient, "/interrupt");
// Create a new thread to acquire the same lock interruptibly.
lock.lock();
try {
final CountDownLatch lockAcquired = new CountDownLatch(1);
final CountDownLatch lockInterrupted = new CountDownLatch(1);
Thread t = new Thread() {
@Override
public void run() {
try {
lock.lockInterruptibly();
try {
lockAcquired.countDown();
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
lockInterrupted.countDown();
}
}
};
t.start();
t.interrupt();
Assert.assertFalse(lockAcquired.await(2, TimeUnit.SECONDS));
Assert.assertTrue(lockInterrupted.await(2, TimeUnit.SECONDS));
} finally {
lock.unlock();
}
} finally {
zkClient.stopAndWait();
}
}
示例16
/**
* Creates a {@link ZKClientService}.
*/
protected final ZKClientService createZKClient() {
TwillRuntimeSpecification twillRuntimeSpec = getTwillRuntimeSpecification();
return ZKClientServices.delegate(
ZKClients.namespace(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(
ZKClientService.Builder.of(twillRuntimeSpec.getZkConnectStr()).build(),
RetryStrategies.fixDelay(1, TimeUnit.SECONDS)
)
), "/" + twillRuntimeSpec.getTwillAppName()
)
);
}
示例17
private void doMain() throws Exception {
RunId runId = twillRuntimeSpec.getTwillAppRunId();
ZKClientService zkClientService = createZKClient();
Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
setRMSchedulerAddress(conf, twillRuntimeSpec.getRmSchedulerAddr());
final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create();
ApplicationMasterService service =
new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, conf,
createAppLocation(conf, twillRuntimeSpec.getFsUser(),
twillRuntimeSpec.getTwillAppDir()));
TrackerService trackerService = new TrackerService(service);
List<Service> prerequisites = Lists.newArrayList(
new YarnAMClientService(amClient, trackerService),
zkClientService,
new AppMasterTwillZKPathService(zkClientService, runId)
);
if (twillRuntimeSpec.isLogCollectionEnabled()) {
prerequisites.add(new ApplicationKafkaService(zkClientService, twillRuntimeSpec.getKafkaZKConnect()));
} else {
LOG.info("Log collection through kafka disabled");
}
new ApplicationMasterMain(twillRuntimeSpec)
.doMain(
service,
prerequisites.toArray(new Service[prerequisites.size()])
);
}
示例18
private ZKClientService getZKClientService(String zkConnect) {
return ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnect)
.setSessionTimeout(ZK_TIMEOUT)
.build(), RetryStrategies.exponentialDelay(100, 2000, TimeUnit.MILLISECONDS))));
}
示例19
@Override
public void start() {
Preconditions.checkNotNull(zkConnectStr);
eventConverter = new LogEventConverter(hostname, runnableName);
scheduler = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory(PUBLISH_THREAD_NAME));
zkClientService = ZKClientServices.delegate(
ZKClients.reWatchOnExpire(
ZKClients.retryOnFailure(ZKClientService.Builder.of(zkConnectStr).build(),
RetryStrategies.fixDelay(1, TimeUnit.SECONDS))));
kafkaClient = new ZKKafkaClientService(zkClientService);
Futures.addCallback(Services.chainStart(zkClientService, kafkaClient),
new FutureCallback<List<ListenableFuture<Service.State>>>() {
@Override
public void onSuccess(List<ListenableFuture<Service.State>> result) {
for (ListenableFuture<Service.State> future : result) {
Preconditions.checkState(Futures.getUnchecked(future) == Service.State.RUNNING,
"Service is not running.");
}
addInfo("Kafka client started: " + zkConnectStr);
scheduler.scheduleWithFixedDelay(flushTask, 0, flushPeriod, TimeUnit.MILLISECONDS);
}
@Override
public void onFailure(Throwable t) {
// Fail to talk to kafka. Other than logging, what can be done?
addError("Failed to start kafka appender.", t);
}
}, Threads.SAME_THREAD_EXECUTOR);
super.start();
}
示例20
@Test
public void testController() throws ExecutionException, InterruptedException, TimeoutException {
InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
zkServer.startAndWait();
LOG.info("ZKServer: " + zkServer.getConnectionStr());
try {
RunId runId = RunIds.generate();
ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
zkClientService.startAndWait();
Service service = createService(zkClientService, runId);
service.startAndWait();
TwillController controller = getController(zkClientService, "testController", runId);
controller.sendCommand(Command.Builder.of("test").build()).get(2, TimeUnit.SECONDS);
controller.terminate().get(2, TimeUnit.SECONDS);
final CountDownLatch terminateLatch = new CountDownLatch(1);
service.addListener(new ServiceListenerAdapter() {
@Override
public void terminated(Service.State from) {
terminateLatch.countDown();
}
}, Threads.SAME_THREAD_EXECUTOR);
Assert.assertTrue(service.state() == Service.State.TERMINATED || terminateLatch.await(2, TimeUnit.SECONDS));
zkClientService.stopAndWait();
} finally {
zkServer.stopAndWait();
}
}
示例21
@Test
public void testControllerBefore() throws InterruptedException, ExecutionException, TimeoutException {
InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
zkServer.startAndWait();
LOG.info("ZKServer: " + zkServer.getConnectionStr());
try {
RunId runId = RunIds.generate();
ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
zkClientService.startAndWait();
final CountDownLatch runLatch = new CountDownLatch(1);
TwillController controller = getController(zkClientService, "testControllerBefore", runId);
controller.onRunning(new Runnable() {
@Override
public void run() {
runLatch.countDown();
}
}, Threads.SAME_THREAD_EXECUTOR);
Service service = createService(zkClientService, runId);
service.start();
Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
try {
controller.awaitTerminated(2, TimeUnit.SECONDS);
Assert.fail("Service should not be terminated");
} catch (TimeoutException e) {
// Expected
}
service.stop();
controller.awaitTerminated(120, TimeUnit.SECONDS);
} finally {
zkServer.stopAndWait();
}
}
示例22
@Test
public void testControllerListener() throws InterruptedException {
InMemoryZKServer zkServer = InMemoryZKServer.builder().build();
zkServer.startAndWait();
LOG.info("ZKServer: " + zkServer.getConnectionStr());
try {
RunId runId = RunIds.generate();
ZKClientService zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
zkClientService.startAndWait();
Service service = createService(zkClientService, runId);
service.startAndWait();
final CountDownLatch runLatch = new CountDownLatch(1);
TwillController controller = getController(zkClientService, "testControllerListener", runId);
controller.onRunning(new Runnable() {
@Override
public void run() {
runLatch.countDown();
}
}, Threads.SAME_THREAD_EXECUTOR);
Assert.assertTrue(runLatch.await(2, TimeUnit.SECONDS));
service.stopAndWait();
zkClientService.stopAndWait();
} finally {
zkServer.stopAndWait();
}
}
示例23
@BeforeClass
public static void init() throws Exception {
zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
zkServer.startAndWait();
// Extract the kafka.tgz and start the kafka server
kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr()));
kafkaServer.startAndWait();
zkClientService = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
kafkaClient = new ZKKafkaClientService(zkClientService);
Services.chainStart(zkClientService, kafkaClient).get();
}
示例24
@BeforeClass
public static void setup() throws Exception {
testUtil = new HBaseTestingUtility();
Configuration conf = testUtil.getConfiguration();
conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
// Tune down the connection thread pool size
conf.setInt("hbase.hconnection.threads.core", 5);
conf.setInt("hbase.hconnection.threads.max", 10);
// Tunn down handler threads in regionserver
conf.setInt("hbase.regionserver.handler.count", 10);
// Set to random port
conf.setInt("hbase.master.port", 0);
conf.setInt("hbase.master.info.port", 0);
conf.setInt("hbase.regionserver.port", 0);
conf.setInt("hbase.regionserver.info.port", 0);
testUtil.startMiniCluster();
String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode
String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
Modules.override(new TransactionModules().getDistributedModules())
.with(new AbstractModule() {
@Override
protected void configure() {
bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
}
}),
new TransactionClientModule()
);
zkClientService = injector.getInstance(ZKClientService.class);
zkClientService.startAndWait();
// start a tx server
txService = injector.getInstance(TransactionService.class);
try {
LOG.info("Starting transaction service");
txService.startAndWait();
} catch (Exception e) {
LOG.error("Failed to start service: ", e);
throw e;
}
Tests.waitForTxReady(injector.getInstance(TransactionSystemClient.class));
}
示例25
@BeforeClass
public static void setup() throws Exception {
testUtil = new HBaseTestingUtility();
Configuration conf = testUtil.getConfiguration();
conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
// Tune down the connection thread pool size
conf.setInt("hbase.hconnection.threads.core", 5);
conf.setInt("hbase.hconnection.threads.max", 10);
// Tunn down handler threads in regionserver
conf.setInt("hbase.regionserver.handler.count", 10);
// Set to random port
conf.setInt("hbase.master.port", 0);
conf.setInt("hbase.master.info.port", 0);
conf.setInt("hbase.regionserver.port", 0);
conf.setInt("hbase.regionserver.info.port", 0);
testUtil.startMiniCluster();
String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode
String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
Modules.override(new TransactionModules().getDistributedModules())
.with(new AbstractModule() {
@Override
protected void configure() {
bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
}
}),
new TransactionClientModule()
);
zkClientService = injector.getInstance(ZKClientService.class);
zkClientService.startAndWait();
// start a tx server
txService = injector.getInstance(TransactionService.class);
try {
LOG.info("Starting transaction service");
txService.startAndWait();
} catch (Exception e) {
LOG.error("Failed to start service: ", e);
throw e;
}
Tests.waitForTxReady(injector.getInstance(TransactionSystemClient.class));
}
示例26
@BeforeClass
public static void setup() throws Exception {
testUtil = new HBaseTestingUtility();
Configuration conf = testUtil.getConfiguration();
conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
// Tune down the connection thread pool size
conf.setInt("hbase.hconnection.threads.core", 5);
conf.setInt("hbase.hconnection.threads.max", 10);
// Tunn down handler threads in regionserver
conf.setInt("hbase.regionserver.handler.count", 10);
// Set to random port
conf.setInt("hbase.master.port", 0);
conf.setInt("hbase.master.info.port", 0);
conf.setInt("hbase.regionserver.port", 0);
conf.setInt("hbase.regionserver.info.port", 0);
testUtil.startMiniCluster();
String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode
String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
Modules.override(new TransactionModules().getDistributedModules())
.with(new AbstractModule() {
@Override
protected void configure() {
bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
}
}),
new TransactionClientModule()
);
zkClientService = injector.getInstance(ZKClientService.class);
zkClientService.startAndWait();
// start a tx server
txService = injector.getInstance(TransactionService.class);
try {
LOG.info("Starting transaction service");
txService.startAndWait();
} catch (Exception e) {
LOG.error("Failed to start service: ", e);
throw e;
}
Tests.waitForTxReady(injector.getInstance(TransactionSystemClient.class));
}
示例27
@BeforeClass
public static void setup() throws Exception {
testUtil = new HBaseTestingUtility();
Configuration conf = testUtil.getConfiguration();
conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
// Tune down the connection thread pool size
conf.setInt("hbase.hconnection.threads.core", 5);
conf.setInt("hbase.hconnection.threads.max", 10);
// Tunn down handler threads in regionserver
conf.setInt("hbase.regionserver.handler.count", 10);
// Set to random port
conf.setInt("hbase.master.port", 0);
conf.setInt("hbase.master.info.port", 0);
conf.setInt("hbase.regionserver.port", 0);
conf.setInt("hbase.regionserver.info.port", 0);
testUtil.startMiniCluster();
String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode
String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
Modules.override(new TransactionModules().getDistributedModules())
.with(new AbstractModule() {
@Override
protected void configure() {
bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
}
}),
new TransactionClientModule()
);
zkClientService = injector.getInstance(ZKClientService.class);
zkClientService.startAndWait();
// start a tx server
txService = injector.getInstance(TransactionService.class);
try {
LOG.info("Starting transaction service");
txService.startAndWait();
} catch (Exception e) {
LOG.error("Failed to start service: ", e);
throw e;
}
Tests.waitForTxReady(injector.getInstance(TransactionSystemClient.class));
}
示例28
@Provides
@Singleton
private ZKDiscoveryService providesZKDiscoveryService(ZKClientService zkClient) {
return new ZKDiscoveryService(zkClient);
}
示例29
@VisibleForTesting
public static void doMain(boolean verbose, Configuration conf) throws Exception {
LOG.info("Starting tx server client test.");
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
new TransactionModules().getDistributedModules(),
new TransactionClientModule()
);
ZKClientService zkClient = injector.getInstance(ZKClientService.class);
zkClient.startAndWait();
try {
TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class);
LOG.info("Starting tx...");
Transaction tx = client.startShort();
if (verbose) {
LOG.info("Started tx details: " + tx.toString());
} else {
LOG.info("Started tx: " + tx.getTransactionId() +
", readPointer: " + tx.getReadPointer() +
", invalids: " + tx.getInvalids().length +
", inProgress: " + tx.getInProgress().length);
}
try {
LOG.info("Checking if canCommit tx...");
client.canCommitOrThrow(tx, Collections.<byte[]>emptyList());
LOG.info("canCommit: success");
LOG.info("Committing tx...");
client.commitOrThrow(tx);
LOG.info("Committed tx: success");
} catch (TransactionConflictException e) {
LOG.info("Aborting tx...");
client.abort(tx);
LOG.info("Aborted tx...");
}
} finally {
zkClient.stopAndWait();
}
}
示例30
@BeforeClass
public static void start() throws Exception {
zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
zkServer.startAndWait();
conf = new Configuration();
conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
Injector injector = Guice.createInjector(
new ConfigModule(conf),
new ZKModule(),
new DiscoveryModules().getDistributedModules(),
Modules.override(new TransactionModules().getDistributedModules())
.with(new AbstractModule() {
@Override
protected void configure() {
bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
}
}),
new TransactionClientModule()
);
zkClientService = injector.getInstance(ZKClientService.class);
zkClientService.startAndWait();
// start a tx server
txService = injector.getInstance(TransactionService.class);
txClient = injector.getInstance(TransactionSystemClient.class);
try {
LOG.info("Starting transaction service");
txService.startAndWait();
} catch (Exception e) {
LOG.error("Failed to start service: ", e);
throw e;
}
Tests.waitForTxReady(txClient);
}