/**
 * Builds a {@link LoggingReader} from the file appenders declared in the given configuration.
 *
 * <p>Only appenders of type {@link FileAppenderFactory} registered on a
 * {@link DefaultLoggingFactory} are inspected. For each of them, the current log file (when
 * set) is collected as a canonical file, and the directory containing the archived-log
 * filename pattern (when set) is collected as a canonical directory. A warning is logged when
 * neither kind of location was found.
 *
 * @param configuration the configuration whose logging factory is inspected
 * @return a reader constructed from the collected main files and archive directories
 * @throws IOException if a path cannot be canonicalized
 */
private LoggingReader initLogging(Configuration configuration) throws IOException
{
    Set<File> mainFiles = Sets.newHashSet();
    Set<File> archiveDirectories = Sets.newHashSet();

    LoggingFactory loggingFactory = configuration.getLoggingFactory();
    if ( loggingFactory instanceof DefaultLoggingFactory )
    {
        for ( AppenderFactory appenderFactory : ((DefaultLoggingFactory)loggingFactory).getAppenders() )
        {
            if ( !(appenderFactory instanceof FileAppenderFactory) )
            {
                continue;
            }
            FileAppenderFactory fileAppenderFactory = (FileAppenderFactory)appenderFactory;

            String currentLogFilename = fileAppenderFactory.getCurrentLogFilename();
            if ( currentLogFilename != null )
            {
                mainFiles.add(new File(currentLogFilename).getCanonicalFile());
            }

            String archivedPattern = fileAppenderFactory.getArchivedLogFilenamePattern();
            if ( archivedPattern != null )
            {
                // A pattern without a directory component (e.g. "app-%d.log.gz") has no parent
                // as a relative path; resolving it to an absolute path first yields the working
                // directory instead of null.
                File archiveDirectory = new File(archivedPattern).getAbsoluteFile().getParentFile();
                if ( archiveDirectory != null )
                {
                    archiveDirectories.add(archiveDirectory.getCanonicalFile());
                }
            }
        }
    }

    if ( mainFiles.isEmpty() && archiveDirectories.isEmpty() )
    {
        log.warn("No log files found in config");
    }

    return new LoggingReader(mainFiles, archiveDirectories);
}
/**
 * Entry point of the queue stress test.
 *
 * <p>Loads the configuration file, connects to ZooKeeper, builds an authenticated
 * {@link QueueService} client and then starts the requested number of writer threads (each
 * calling {@code stressTest.write()}), reader threads (each calling {@code stressTest.read()})
 * and one scheduled task that calls {@code stressTest.report()} once per second after an
 * initial one second delay. Nothing started here is stopped or closed by this method.
 *
 * @param args {@code args[0]} is the path of the config.yaml file, {@code args[1]} the number
 *             of writer threads and {@code args[2]} the number of reader threads
 * @throws IllegalArgumentException if fewer than three arguments are supplied
 * @throws Exception if the configuration cannot be loaded or the client cannot be built
 */
public static void main(String... args) throws Exception {
    // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException part-way through.
    if (args.length < 3) {
        throw new IllegalArgumentException(
                "Usage: <config.yaml> <numWriterThreads> <numReaderThreads>");
    }

    final String DROPWIZARD_PROPERTY_PREFIX = "dw";

    // Load the config.yaml file specified as the first argument.
    ConfigurationFactory<EmoConfiguration> configFactory = new ConfigurationFactory<EmoConfiguration>(
            EmoConfiguration.class, Validation.buildDefaultValidatorFactory().getValidator(),
            Jackson.newObjectMapper(), DROPWIZARD_PROPERTY_PREFIX);
    EmoConfiguration configuration = configFactory.build(new File(args[0]));
    int numWriterThreads = Integer.parseInt(args[1]);
    int numReaderThreads = Integer.parseInt(args[2]);
    String adminApiKey = configuration.getAuthorizationConfiguration().getAdminApiKey();

    MetricRegistry metricRegistry = new MetricRegistry();
    new LoggingFactory().configure(metricRegistry, "stress");

    CuratorFramework curator = configuration.getZooKeeperConfiguration().newCurator();
    curator.start();

    // Build a service pool proxy for the queue service, discovered through ZooKeeper.
    QueueClientFactory queueFactory = QueueClientFactory.forClusterAndHttpConfiguration(
            configuration.getCluster(), configuration.getHttpClientConfiguration(), metricRegistry);
    AuthQueueService authQueueService = ServicePoolBuilder.create(AuthQueueService.class)
            .withServiceFactory(queueFactory)
            .withHostDiscovery(new ZooKeeperHostDiscovery(curator, queueFactory.getServiceName(), metricRegistry))
            .withMetricRegistry(metricRegistry)
            .withCachingPolicy(ServiceCachingPolicyBuilder.getMultiThreadedClientPolicy())
            .buildProxy(new ExponentialBackoffRetry(5, 50, 1000, TimeUnit.MILLISECONDS));
    // Bind the admin API key so the stress test can use the plain QueueService interface.
    QueueService queueService = QueueServiceAuthenticator.proxied(authQueueService)
            .usingCredentials(adminApiKey);

    final QueueStressTest stressTest = new QueueStressTest(queueService);

    ThreadFactory writerFactory = new ThreadFactoryBuilder().setNameFormat("Writer-%d").build();
    for (int i = 0; i < numWriterThreads; i++) {
        writerFactory.newThread(new Runnable() {
            @Override
            public void run() {
                stressTest.write();
            }
        }).start();
    }

    ThreadFactory readerFactory = new ThreadFactoryBuilder().setNameFormat("Reader-%d").build();
    for (int i = 0; i < numReaderThreads; i++) {
        readerFactory.newThread(new Runnable() {
            @Override
            public void run() {
                stressTest.read();
            }
        }).start();
    }

    ThreadFactory reportFactory = new ThreadFactoryBuilder().setNameFormat("Report-%d").build();
    Executors.newScheduledThreadPool(1, reportFactory).scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            stressTest.report();
        }
    }, 1, 1, TimeUnit.SECONDS);

    // Run forever: the threads started above are left running when this method returns.
}
/**
 * Entry point of the system-of-record / databus stress test.
 *
 * <p>Loads the configuration file, connects to ZooKeeper, builds authenticated
 * {@link DataStore} and {@link Databus} clients, makes sure the test table exists and the
 * test subscription is registered, then starts the requested number of writer threads (each
 * calling {@code stressTest.writeDeltas()}), reader threads (each calling
 * {@code stressTest.readDatabus()}) and one scheduled task that calls
 * {@code stressTest.report()} once per second after an initial one second delay.
 *
 * <p>The method blocks until every writer and reader thread has finished. Only then are the
 * reporter, both service pool proxies and the ZooKeeper connection shut down, so the clients
 * are never closed while worker threads are still using them.
 *
 * @param args {@code args[0]} is the path of the config.yaml file, {@code args[1]} the number
 *             of writer threads and {@code args[2]} the number of reader threads
 * @throws IllegalArgumentException if fewer than three arguments are supplied
 * @throws Exception if setup fails or the calling thread is interrupted while waiting
 */
public static void main(String... args) throws Exception {
    // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException part-way through.
    if (args.length < 3) {
        throw new IllegalArgumentException(
                "Usage: <config.yaml> <numWriterThreads> <numReaderThreads>");
    }

    final String DROPWIZARD_PROPERTY_PREFIX = "dw";

    // Load the config.yaml file specified as the first argument.
    ConfigurationFactory<EmoConfiguration> configFactory = new ConfigurationFactory<>(
            EmoConfiguration.class, Validation.buildDefaultValidatorFactory().getValidator(),
            Jackson.newObjectMapper(), DROPWIZARD_PROPERTY_PREFIX);
    EmoConfiguration configuration = configFactory.build(new File(args[0]));
    int numWriterThreads = Integer.parseInt(args[1]);
    int numReaderThreads = Integer.parseInt(args[2]);
    String apiKey = configuration.getAuthorizationConfiguration().getAdminApiKey();

    MetricRegistry metricRegistry = new MetricRegistry();
    new LoggingFactory().configure(metricRegistry, "stress");

    CuratorFramework curator = configuration.getZooKeeperConfiguration().newCurator();
    curator.start();

    AuthDataStore authDataStore = null;
    AuthDatabus authDatabus = null;
    // Fully qualified so this method does not depend on an import the file may not have.
    java.util.concurrent.ScheduledExecutorService reporter = null;
    try {
        DataStoreClientFactory dataStoreFactory = DataStoreClientFactory.forClusterAndHttpConfiguration(
                configuration.getCluster(), configuration.getHttpClientConfiguration(), metricRegistry);
        authDataStore = ServicePoolBuilder.create(AuthDataStore.class)
                .withServiceFactory(dataStoreFactory)
                .withHostDiscovery(new ZooKeeperHostDiscovery(curator, dataStoreFactory.getServiceName(), metricRegistry))
                .withMetricRegistry(metricRegistry)
                .withCachingPolicy(ServiceCachingPolicyBuilder.getMultiThreadedClientPolicy())
                .buildProxy(new ExponentialBackoffRetry(5, 50, 1000, TimeUnit.MILLISECONDS));
        DataStore dataStore = DataStoreAuthenticator.proxied(authDataStore).usingCredentials(apiKey);

        DatabusClientFactory databusFactory = DatabusClientFactory.forClusterAndHttpConfiguration(
                configuration.getCluster(), configuration.getHttpClientConfiguration(), metricRegistry);
        authDatabus = ServicePoolBuilder.create(AuthDatabus.class)
                .withServiceFactory(databusFactory)
                .withHostDiscovery(new ZooKeeperHostDiscovery(curator, databusFactory.getServiceName(), metricRegistry))
                .withMetricRegistry(metricRegistry)
                .withCachingPolicy(ServiceCachingPolicyBuilder.getMultiThreadedClientPolicy())
                .buildProxy(new ExponentialBackoffRetry(5, 50, 1000, TimeUnit.MILLISECONDS));
        Databus databus = DatabusAuthenticator.proxied(authDatabus).usingCredentials(apiKey);

        final SorStressTest stressTest = new SorStressTest(dataStore, databus);

        // Create the test table on first use and (re)register the databus subscription.
        if (!dataStore.getTableExists(TABLE)) {
            TableOptions options = new TableOptionsBuilder().setPlacement("ugc_global:ugc").build();
            dataStore.createTable(TABLE, options, ImmutableMap.of("table", TABLE), new AuditBuilder().setLocalHost().build());
        }
        databus.subscribe(SUBSCRIPTION, Conditions.alwaysTrue(), Duration.ofDays(7), Duration.ofDays(1));

        ThreadFactory writerFactory = new ThreadFactoryBuilder().setNameFormat("SoR Writer-%d").build();
        Thread[] writers = new Thread[Math.max(numWriterThreads, 0)];
        for (int i = 0; i < writers.length; i++) {
            writers[i] = writerFactory.newThread(() -> stressTest.writeDeltas());
            writers[i].start();
        }

        ThreadFactory readerFactory = new ThreadFactoryBuilder().setNameFormat("Databus Reader-%d").build();
        Thread[] readers = new Thread[Math.max(numReaderThreads, 0)];
        for (int i = 0; i < readers.length; i++) {
            readers[i] = readerFactory.newThread(() -> stressTest.readDatabus());
            readers[i].start();
        }

        ThreadFactory reportFactory = new ThreadFactoryBuilder().setNameFormat("Report-%d").build();
        reporter = Executors.newScheduledThreadPool(1, reportFactory);
        reporter.scheduleAtFixedRate(() -> stressTest.report(), 1, 1, TimeUnit.SECONDS);

        // Keep the clients open for as long as any worker is still running.
        for (Thread writer : writers) {
            writer.join();
        }
        for (Thread reader : readers) {
            reader.join();
        }
    } finally {
        if (reporter != null) {
            reporter.shutdownNow();
        }
        // Close the pool proxies created by buildProxy, not the credential-bound wrappers
        // layered on top of them.
        if (authDatabus != null) {
            ServicePoolProxies.close(authDatabus);
        }
        if (authDataStore != null) {
            ServicePoolProxies.close(authDataStore);
        }
        Closeables.close(curator, true);
    }
}
/**
 * Supplies the logging factory exposed under the {@code "logging"} JSON property.
 *
 * <p>This implementation creates a fresh {@link DefaultLoggingFactory} on every call; the
 * {@code @Value.Default} annotation marks it as the default value for the attribute.
 *
 * @return a newly created {@link DefaultLoggingFactory}
 */
@Override
@JsonProperty("logging")
@Value.Default
public LoggingFactory getLoggingFactory() {
    final LoggingFactory defaultFactory = new DefaultLoggingFactory();
    return defaultFactory;
}