Java源码示例:org.apache.flink.runtime.util.ExecutorThreadFactory
示例1
/**
 * Creates a fixed-size thread pool whose threads come from an {@link ExecutorThreadFactory}.
 *
 * @param numThreads number of threads in the pool
 * @param threadPriority priority configured on the thread factory; accepted values range from
 *     {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}, both ends included
 * @param componentName appended to {@code "Flink-"} to form the pool name
 * @return the newly created executor service
 * @throws IllegalArgumentException if {@code threadPriority} lies outside the accepted range
 */
public static ExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) {
    final boolean priorityInRange =
        threadPriority >= Thread.MIN_PRIORITY && threadPriority <= Thread.MAX_PRIORITY;

    if (!priorityInRange) {
        final String message = String.format(
            "The thread priority must be within (%s, %s) but it was %s.",
            Thread.MIN_PRIORITY,
            Thread.MAX_PRIORITY,
            threadPriority);
        throw new IllegalArgumentException(message);
    }

    final String poolName = "Flink-" + componentName;
    return Executors.newFixedThreadPool(
        numThreads,
        new ExecutorThreadFactory.Builder()
            .setThreadPriority(threadPriority)
            .setPoolName(poolName)
            .build());
}
示例2
/**
 * Builds a fixed thread pool of {@code numThreads} threads, created through an
 * {@link ExecutorThreadFactory} with the given priority and a pool name derived from the
 * component name.
 *
 * @param numThreads size of the thread pool
 * @param threadPriority thread priority; must not be lower than {@link Thread#MIN_PRIORITY}
 *     nor higher than {@link Thread#MAX_PRIORITY}
 * @param componentName name of the component; the pool is named {@code "Flink-" + componentName}
 * @return the created executor service
 * @throws IllegalArgumentException if the priority is out of range
 */
public static ExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) {
    final boolean tooLow = threadPriority < Thread.MIN_PRIORITY;
    final boolean tooHigh = threadPriority > Thread.MAX_PRIORITY;

    if (tooLow || tooHigh) {
        throw new IllegalArgumentException(String.format(
            "The thread priority must be within (%s, %s) but it was %s.",
            Thread.MIN_PRIORITY,
            Thread.MAX_PRIORITY,
            threadPriority));
    }

    return Executors.newFixedThreadPool(
        numThreads,
        new ExecutorThreadFactory.Builder()
            .setThreadPriority(threadPriority)
            .setPoolName("Flink-" + componentName)
            .build());
}
示例3
/**
 * Opens this output format: delegates to the superclass, creates and opens the statement
 * executor and, if periodic flushing is enabled, schedules a background flush task.
 *
 * <p>The flush task is only scheduled when the configured batch interval is non-zero and the
 * batch size is not 1. Each run synchronizes on this instance, does nothing once the format is
 * closed, and records a flush failure in {@code flushException} instead of throwing it.
 *
 * @param taskNumber The number of the parallel instance.
 * @param numTasks forwarded unchanged to the superclass {@code open} call
 * @throws IOException declared by the signature
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    super.open(taskNumber, numTasks);
    jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);

    final boolean periodicFlushDisabled =
        executionOptions.getBatchIntervalMs() == 0 || executionOptions.getBatchSize() == 1;
    if (periodicFlushDisabled) {
        return;
    }

    this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
    this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
        () -> {
            synchronized (JdbcBatchingOutputFormat.this) {
                if (closed) {
                    return;
                }
                try {
                    flush();
                } catch (Exception e) {
                    // remembered here so the failure is not lost on the scheduler thread
                    flushException = e;
                }
            }
        },
        executionOptions.getBatchIntervalMs(),
        executionOptions.getBatchIntervalMs(),
        TimeUnit.MILLISECONDS);
}
示例4
/**
 * Creates a scheduled thread pool with {@code numThreads} core threads whose threads come from
 * an {@link ExecutorThreadFactory}.
 *
 * @param numThreads core pool size of the scheduled executor
 * @param threadPriority priority configured on the thread factory; accepted values range from
 *     {@link Thread#MIN_PRIORITY} to {@link Thread#MAX_PRIORITY}, both ends included
 * @param componentName appended to {@code "Flink-"} to form the pool name
 * @return the newly created scheduled executor service
 * @throws IllegalArgumentException if {@code threadPriority} lies outside the accepted range
 */
public static ScheduledExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) {
    final boolean priorityInRange =
        Thread.MIN_PRIORITY <= threadPriority && threadPriority <= Thread.MAX_PRIORITY;

    if (!priorityInRange) {
        final String message = String.format(
            "The thread priority must be within (%s, %s) but it was %s.",
            Thread.MIN_PRIORITY,
            Thread.MAX_PRIORITY,
            threadPriority);
        throw new IllegalArgumentException(message);
    }

    final String poolName = "Flink-" + componentName;
    return Executors.newScheduledThreadPool(
        numThreads,
        new ExecutorThreadFactory.Builder()
            .setThreadPriority(threadPriority)
            .setPoolName(poolName)
            .build());
}
示例5
private RestClusterClient(
Configuration configuration,
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
ClientHighAvailabilityServices clientHAServices) throws Exception {
this.configuration = checkNotNull(configuration);
this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
if (restClient != null) {
this.restClient = restClient;
} else {
this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
}
this.waitStrategy = checkNotNull(waitStrategy);
this.clusterId = checkNotNull(clusterId);
this.clientHAServices = checkNotNull(clientHAServices);
this.webMonitorRetrievalService = clientHAServices.getClusterRestEndpointLeaderRetriever();
this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
startLeaderRetrievers();
}
示例6
/**
 * Creates a REST client whose Netty bootstrap runs on a single-threaded NIO event loop group.
 *
 * <p>Every new channel gets a pipeline made of an optional SSL handler (added first, only when
 * the configuration provides an SSL handler factory), the HTTP client codec, an object
 * aggregator limited to the configured maximum content length, a chunked write handler, an idle
 * state handler that uses the configured idleness timeout (in milliseconds) for all three of
 * its timeouts, and finally the client handler.
 *
 * @param configuration client configuration; must not be null
 * @param executor executor stored on this client; must not be null
 */
public RestClient(RestClientConfiguration configuration, Executor executor) {
    Preconditions.checkNotNull(configuration);
    this.executor = Preconditions.checkNotNull(executor);
    this.terminationFuture = new CompletableFuture<>();

    final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
    ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            try {
                // SSL should be the first handler in the pipeline
                if (sslHandlerFactory != null) {
                    socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                }

                socketChannel.pipeline()
                    .addLast(new HttpClientCodec())
                    .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                    .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                    .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                    .addLast(new ClientHandler());
            } catch (Throwable t) {
                // report through the class logger instead of printing to stderr, then propagate
                LOG.error("Failed to initialize the REST client channel pipeline.", t);
                ExceptionUtils.rethrow(t);
            }
        }
    };

    NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

    bootstrap = new Bootstrap();
    bootstrap
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
        .group(group)
        .channel(NioSocketChannel.class)
        .handler(initializer);

    LOG.info("Rest client endpoint started.");
}
示例7
/**
 * Creates the services used by the cluster entrypoint and assigns them to this instance.
 *
 * <p>While holding {@code lock}, this sets up (in this order) the common RPC service, the I/O
 * executor, the high-availability services, the blob server (which is started right away), the
 * heartbeat services, the metric registry together with its query service, the archived
 * execution graph store and the transient blob cache.
 *
 * <p>The passed {@code configuration} is modified: the job manager address and port are
 * overwritten with the values reported by the created RPC service.
 *
 * @param configuration configuration the services are created from; mutated as described above
 * @throws Exception propagated from the invoked creation and start calls
 */
protected void initializeServices(Configuration configuration) throws Exception {
LOG.info("Initializing cluster services.");
// all service fields below are assigned while holding the lock
synchronized (lock) {
final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
final String portRange = getRPCPortRange(configuration);
commonRpcService = createRpcService(configuration, bindAddress, portRange);
// update the configuration used to create the high availability services
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
// fixed-size pool sized by Hardware.getNumberCPUCores(); it is also handed to the HA services
ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("cluster-io"));
haServices = createHaServices(configuration, ioExecutor);
// the blob server uses the blob store obtained from the HA services
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
heartbeatServices = createHeartbeatServices(configuration);
metricRegistry = createMetricRegistry(configuration);
// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
// Start actor system for metric query service on any available port
metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, bindAddress, LOG);
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);
// the store is given the scheduled executor of the common RPC service
archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
// the cache is pointed at the RPC service's address combined with the blob server's port
transientBlobCache = new TransientBlobCache(
configuration,
new InetSocketAddress(
commonRpcService.getAddress(),
blobServer.getPort()));
}
}
示例8
@VisibleForTesting
RestClusterClient(
Configuration configuration,
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
@Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception {
super(configuration);
this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
if (restClient != null) {
this.restClient = restClient;
} else {
this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
}
this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
this.clusterId = Preconditions.checkNotNull(clusterId);
if (webMonitorRetrievalService == null) {
this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
} else {
this.webMonitorRetrievalService = webMonitorRetrievalService;
}
this.dispatcherRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
startLeaderRetrievers();
}
示例9
/**
 * Prepares the REST server endpoint configuration, the gateway retriever, a single-threaded
 * executor and a test job graph (with its job ID) before each test.
 */
@Before
public void setUp() throws Exception {
    final String executorThreadPoolName = RestClusterClientTest.class.getSimpleName();

    restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(restConfig);
    mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
    executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(executorThreadPoolName));

    jobGraph = new JobGraph("testjob");
    jobId = jobGraph.getJobID();
}
示例10
/**
 * Establishes the connection, creates and opens the JDBC writer, and optionally schedules a
 * periodic flush.
 *
 * <p>An upsert writer is created when key fields are present; otherwise an append-only writer
 * is created from the dialect's insert statement. When {@code flushIntervalMills} is non-zero,
 * a single-threaded scheduler flushes at that fixed delay; each run synchronizes on this
 * instance, does nothing once closed, and stores a flush failure in {@code flushException}.
 *
 * @param taskNumber The number of the parallel instance.
 * @param numTasks not read by this method
 * @throws IOException declared by the signature
 * @throws IllegalArgumentException if opening fails with an {@link SQLException} or the JDBC
 *     driver class cannot be found
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        establishConnection();

        final boolean hasKeyFields = keyFields != null && keyFields.length > 0;
        if (hasKeyFields) {
            jdbcWriter = UpsertWriter.create(
                dialect, tableName, fieldNames, fieldTypes, keyFields,
                getRuntimeContext().getExecutionConfig().isObjectReuseEnabled());
        } else {
            jdbcWriter = new AppendOnlyWriter(
                dialect.getInsertIntoStatement(tableName, fieldNames),
                fieldTypes);
        }

        jdbcWriter.open(connection);
    } catch (ClassNotFoundException cnfe) {
        throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
    } catch (SQLException sqe) {
        throw new IllegalArgumentException("open() failed.", sqe);
    }

    if (flushIntervalMills == 0) {
        // periodic flushing is switched off
        return;
    }

    this.scheduler = Executors.newScheduledThreadPool(
        1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
    this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
        () -> {
            synchronized (JDBCUpsertOutputFormat.this) {
                if (!closed) {
                    try {
                        flush();
                    } catch (Exception e) {
                        flushException = e;
                    }
                }
            }
        },
        flushIntervalMills,
        flushIntervalMills,
        TimeUnit.MILLISECONDS);
}
示例11
/**
 * Creates a REST client whose Netty bootstrap runs on a single-threaded NIO event loop group.
 *
 * <p>Every new channel gets a pipeline made of an optional SSL handler (added first, only when
 * the configuration provides an SSL handler factory; it is created with the channel's
 * allocator), the HTTP client codec, an object aggregator limited to the configured maximum
 * content length, a chunked write handler, an idle state handler that uses the configured
 * idleness timeout (in milliseconds) for all three of its timeouts, and finally the client
 * handler.
 *
 * @param configuration client configuration; must not be null
 * @param executor executor stored on this client; must not be null
 */
public RestClient(RestClientConfiguration configuration, Executor executor) {
    Preconditions.checkNotNull(configuration);
    this.executor = Preconditions.checkNotNull(executor);
    this.terminationFuture = new CompletableFuture<>();

    final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
    ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            try {
                // SSL should be the first handler in the pipeline
                if (sslHandlerFactory != null) {
                    socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler(socketChannel.alloc()));
                }

                socketChannel.pipeline()
                    .addLast(new HttpClientCodec())
                    .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                    .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                    .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                    .addLast(new ClientHandler());
            } catch (Throwable t) {
                // report through the class logger instead of printing to stderr, then propagate
                LOG.error("Failed to initialize the REST client channel pipeline.", t);
                ExceptionUtils.rethrow(t);
            }
        }
    };

    NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

    bootstrap = new Bootstrap();
    bootstrap
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
        .group(group)
        .channel(NioSocketChannel.class)
        .handler(initializer);

    LOG.info("Rest client endpoint started.");
}
示例12
/**
 * Creates the services used by the cluster entrypoint and assigns them to this instance.
 *
 * <p>While holding {@code lock}, this sets up (in this order) the common RPC service, the I/O
 * executor, the high-availability services, the blob server (which is started right away), the
 * heartbeat services, the metric registry together with its query service, and the archived
 * execution graph store.
 *
 * <p>The passed {@code configuration} is modified: the job manager address and port are
 * overwritten with the values reported by the created RPC service.
 *
 * @param configuration configuration the services are created from; mutated as described above
 * @throws Exception propagated from the invoked creation and start calls
 */
protected void initializeServices(Configuration configuration) throws Exception {
LOG.info("Initializing cluster services.");
// all service fields below are assigned while holding the lock
synchronized (lock) {
final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
final String portRange = getRPCPortRange(configuration);
commonRpcService = createRpcService(configuration, bindAddress, portRange);
// update the configuration used to create the high availability services
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
// fixed-size pool sized by Hardware.getNumberCPUCores(); it is also handed to the HA services
ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("cluster-io"));
haServices = createHaServices(configuration, ioExecutor);
// the blob server uses the blob store obtained from the HA services
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
heartbeatServices = createHeartbeatServices(configuration);
metricRegistry = createMetricRegistry(configuration);
// a separate RPC service, started for the bind address, backs the metric query service
final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, bindAddress);
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
// the store is given the scheduled executor of the common RPC service
archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
}
}
示例13
@VisibleForTesting
RestClusterClient(
Configuration configuration,
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
@Nullable LeaderRetrievalService webMonitorRetrievalService) throws Exception {
super(configuration);
this.restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(configuration);
if (restClient != null) {
this.restClient = restClient;
} else {
this.restClient = new RestClient(restClusterClientConfiguration.getRestClientConfiguration(), executorService);
}
this.waitStrategy = Preconditions.checkNotNull(waitStrategy);
this.clusterId = Preconditions.checkNotNull(clusterId);
if (webMonitorRetrievalService == null) {
this.webMonitorRetrievalService = highAvailabilityServices.getWebMonitorLeaderRetriever();
} else {
this.webMonitorRetrievalService = webMonitorRetrievalService;
}
this.retryExecutorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
startLeaderRetrievers();
}
示例14
/**
 * Runs before each test: builds the REST server endpoint configuration, a gateway retriever
 * that returns an already-completed future, a single-threaded executor named after the test
 * class, and the job graph plus its ID.
 */
@Before
public void setUp() throws Exception {
    restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(restConfig);
    mockGatewayRetriever = () -> {
        return CompletableFuture.completedFuture(mockRestfulGateway);
    };

    final ExecutorThreadFactory testThreadFactory =
        new ExecutorThreadFactory(RestClusterClientTest.class.getSimpleName());
    executor = Executors.newSingleThreadExecutor(testThreadFactory);

    jobGraph = new JobGraph("testjob");
    jobId = jobGraph.getJobID();
}
示例15
public CliTableauResultView(
final Terminal terminal,
final Executor sqlExecutor,
final String sessionId,
final ResultDescriptor resultDescriptor) {
this.terminal = terminal;
this.sqlExecutor = sqlExecutor;
this.sessionId = sessionId;
this.resultDescriptor = resultDescriptor;
this.displayResultExecutorService = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("CliTableauResultView"));
}
示例16
/**
 * Creates a REST client whose Netty bootstrap runs on a single-threaded NIO event loop group.
 *
 * <p>Every new channel gets a pipeline made of an optional SSL handler (added first, only when
 * the configuration provides an SSL handler factory; it is created with the channel's
 * allocator), the HTTP client codec, an object aggregator limited to the configured maximum
 * content length, a chunked write handler, an idle state handler that uses the configured
 * idleness timeout (in milliseconds) for all three of its timeouts, and finally the client
 * handler.
 *
 * @param configuration client configuration; must not be null
 * @param executor executor stored on this client; must not be null
 */
public RestClient(RestClientConfiguration configuration, Executor executor) {
    Preconditions.checkNotNull(configuration);
    this.executor = Preconditions.checkNotNull(executor);
    this.terminationFuture = new CompletableFuture<>();

    final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
    ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            try {
                // SSL should be the first handler in the pipeline
                if (sslHandlerFactory != null) {
                    socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler(socketChannel.alloc()));
                }

                socketChannel.pipeline()
                    .addLast(new HttpClientCodec())
                    .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                    .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                    .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                    .addLast(new ClientHandler());
            } catch (Throwable t) {
                // report through the class logger instead of printing to stderr, then propagate
                LOG.error("Failed to initialize the REST client channel pipeline.", t);
                ExceptionUtils.rethrow(t);
            }
        }
    };

    NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

    bootstrap = new Bootstrap();
    bootstrap
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
        .group(group)
        .channel(NioSocketChannel.class)
        .handler(initializer);

    LOG.debug("Rest client endpoint started.");
}
示例17
/**
 * Creates the services used by the cluster entrypoint and assigns them to this instance.
 *
 * <p>While holding {@code lock}, this sets up (in this order) the common RPC service, the I/O
 * executor, the high-availability services, the blob server (which is started right away), the
 * heartbeat services, the metric registry together with its query service, the process metric
 * group and the archived execution graph store.
 *
 * <p>The passed {@code configuration} is modified: the job manager address and port are
 * overwritten with the values reported by the created RPC service.
 *
 * @param configuration configuration the services are created from; mutated as described above
 * @param pluginManager plugin manager handed to {@code createMetricRegistry}
 * @throws Exception propagated from the invoked creation and start calls
 */
protected void initializeServices(Configuration configuration, PluginManager pluginManager) throws Exception {
LOG.info("Initializing cluster services.");
// all service fields below are assigned while holding the lock
synchronized (lock) {
// address, port range, bind host and optional bind port are all read from the configuration
commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(
configuration,
configuration.getString(JobManagerOptions.ADDRESS),
getRPCPortRange(configuration),
configuration.getString(JobManagerOptions.BIND_HOST),
configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
// update the configuration used to create the high availability services
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
// fixed-size pool whose size comes from ClusterEntrypointUtils.getPoolSize; also handed to the HA services
ioExecutor = Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("cluster-io"));
haServices = createHaServices(configuration, ioExecutor);
// the blob server uses the blob store obtained from the HA services
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
heartbeatServices = createHeartbeatServices(configuration);
metricRegistry = createMetricRegistry(configuration, pluginManager);
// a separate RPC service, started for the common RPC service's address, backs the metric query service
final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, commonRpcService.getAddress());
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
final String hostname = RpcUtils.getHostname(commonRpcService);
processMetricGroup = MetricUtils.instantiateProcessMetricGroup(
metricRegistry,
hostname,
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
// the store is given the scheduled executor of the common RPC service
archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
}
}
示例18
/**
 * Test fixture setup: creates the REST server endpoint configuration, the gateway retriever,
 * the single-threaded test executor and the job graph with its job ID.
 */
@Before
public void setUp() throws Exception {
    final String testClassName = RestClusterClientTest.class.getSimpleName();

    restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(restConfig);
    mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
    executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(testClassName));

    jobGraph = new JobGraph("testjob");
    jobId = jobGraph.getJobID();
}
示例19
/**
 * Starts this component by creating its single-threaded executor service.
 *
 * @throws Exception declared by the signature
 */
@Override
public void start() throws Exception {
    final String threadPoolName = "collect-sink-operator-coordinator-executor-thread-pool";
    this.executorService = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(threadPoolName));
}
示例20
/**
 * Creates a new MetricRegistry and starts the configured reporters.
 *
 * <p>For every reporter configuration the reporter class is instantiated reflectively and
 * opened with its configuration. Reporters implementing {@code Scheduled} are additionally
 * registered with a single-threaded scheduled executor; the reporting interval defaults to
 * 10 seconds and can be overridden with a value such as {@code "500 MILLISECONDS"}. A reporter
 * that cannot be set up is logged and skipped; it does not fail the registry.
 *
 * @param config registry configuration providing scope formats, delimiter, message size limit
 *     and the reporter configurations
 */
public MetricRegistryImpl(MetricRegistryConfiguration config) {
    this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
    this.scopeFormats = config.getScopeFormats();
    this.globalDelimiter = config.getDelimiter();
    this.delimiters = new ArrayList<>(10);
    this.terminationFuture = new CompletableFuture<>();
    this.isShutdown = false;

    // instantiate any custom configured reporters
    this.reporters = new ArrayList<>(4);

    List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();

    this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

    this.queryService = null;
    this.metricQueryServicePath = null;

    if (reporterConfigurations.isEmpty()) {
        // no reporters defined
        // by default, don't report anything
        LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
    } else {
        // we have some reporters so
        for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
            String namedReporter = reporterConfiguration.f0;
            Configuration reporterConfig = reporterConfiguration.f1;

            final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
            if (className == null) {
                LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
                continue;
            }

            try {
                String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
                TimeUnit timeunit = TimeUnit.SECONDS;
                long period = 10;

                if (configuredPeriod != null) {
                    try {
                        String[] interval = configuredPeriod.split(" ");
                        // Parse both parts before assigning either one. Otherwise a valid number
                        // followed by an invalid unit would leave a half-applied interval (for
                        // example "500 MILLIS" would become 500 SECONDS) despite the message below.
                        final long parsedPeriod = Long.parseLong(interval[0]);
                        final TimeUnit parsedTimeunit = TimeUnit.valueOf(interval[1]);
                        period = parsedPeriod;
                        timeunit = parsedTimeunit;
                    }
                    catch (Exception e) {
                        LOG.error("Cannot parse report interval from config: " + configuredPeriod +
                            " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                            "Using default reporting interval.");
                    }
                }

                Class<?> reporterClass = Class.forName(className);
                MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();

                MetricConfig metricConfig = new MetricConfig();
                reporterConfig.addAllToProperties(metricConfig);
                LOG.info("Configuring {} with {}.", namedReporter, metricConfig);
                reporterInstance.open(metricConfig);

                if (reporterInstance instanceof Scheduled) {
                    LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);

                    executor.scheduleWithFixedDelay(
                        new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
                } else {
                    LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
                }
                reporters.add(reporterInstance);

                // a per-reporter delimiter must be exactly one character, else the global one is used
                String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
                if (delimiterForReporter.length() != 1) {
                    LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
                    delimiterForReporter = String.valueOf(globalDelimiter);
                }
                this.delimiters.add(delimiterForReporter.charAt(0));
            }
            catch (Throwable t) {
                LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
            }
        }
    }
}
示例21
/**
 * Builds the services shared by job managers from the given configuration.
 *
 * <p>This creates the library cache manager on top of the blob server, a scheduled executor
 * with one thread per CPU core (as reported by {@code Hardware.getNumberCPUCores()}), the stack
 * trace sample coordinator and the back pressure stats tracker. A recurring task that cleans
 * up the tracker's operator stats cache is scheduled on the same executor.
 *
 * @param config configuration to read from; must not be null
 * @param blobServer blob server backing the library cache; must not be null
 * @return the assembled shared services
 * @throws IllegalConfigurationException if reading the timeout fails with a
 *     {@link NumberFormatException}
 * @throws Exception declared by the signature
 */
public static JobManagerSharedServices fromConfiguration(
        Configuration config,
        BlobServer blobServer) throws Exception {

    checkNotNull(config);
    checkNotNull(blobServer);

    final String resolveOrderName = config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
    final String[] parentFirstPatterns = CoreOptions.getParentFirstLoaderPatterns(config);

    final BlobLibraryCacheManager libraryCache = new BlobLibraryCacheManager(
        blobServer,
        FlinkUserCodeClassLoaders.ResolveOrder.fromString(resolveOrderName),
        parentFirstPatterns);

    final FiniteDuration askTimeout;
    try {
        askTimeout = AkkaUtils.getTimeout(config);
    } catch (NumberFormatException e) {
        throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
    }

    final ScheduledExecutorService sharedFutureExecutor = Executors.newScheduledThreadPool(
        Hardware.getNumberCPUCores(),
        new ExecutorThreadFactory("jobmanager-future"));

    final StackTraceSampleCoordinator sampleCoordinator =
        new StackTraceSampleCoordinator(sharedFutureExecutor, askTimeout.toMillis());

    final int cacheCleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
    final BackPressureStatsTrackerImpl statsTracker = new BackPressureStatsTrackerImpl(
        sampleCoordinator,
        cacheCleanUpInterval,
        config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
        config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
        Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));

    // periodically clean up the tracker's operator stats cache
    sharedFutureExecutor.scheduleWithFixedDelay(
        () -> statsTracker.cleanUpOperatorStatsCache(),
        cacheCleanUpInterval,
        cacheCleanUpInterval,
        TimeUnit.MILLISECONDS);

    return new JobManagerSharedServices(
        sharedFutureExecutor,
        libraryCache,
        RestartStrategyFactory.createRestartStrategyFactory(config),
        sampleCoordinator,
        statsTracker,
        blobServer);
}
示例22
/**
 * Creates a task manager runner together with the services it depends on.
 *
 * <p>In order, the constructor creates a scheduled executor, the high-availability services,
 * the RPC service, the actor system for the metric query service, the metric registry (whose
 * query service is started right away), the blob cache service and finally the task manager
 * obtained from {@code startTaskManager}. The memory logger is started last, if configured.
 *
 * @param configuration configuration used to create all services; must not be null
 * @param resourceId resource ID of this task manager; must not be null
 * @throws Exception propagated from the invoked creation and start calls
 */
public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
this.configuration = checkNotNull(configuration);
this.resourceId = checkNotNull(resourceId);
timeout = AkkaUtils.getTimeoutAsTime(configuration);
// scheduled pool sized by Hardware.getNumberCPUCores(); also handed to the HA services below
this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("taskmanager-future"));
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
rpcService = createRpcService(configuration, highAvailabilityServices);
// the metrics actor system is started for the address of the RPC service
metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(configuration, rpcService.getAddress(), LOG);
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
// TODO: Temporary hack until the MetricQueryService has been ported to RpcEndpoint
metricRegistry.startQueryService(metricQueryServiceActorSystem, resourceId);
// NOTE(review): the meaning of the null third argument is not visible here; confirm against BlobCacheService
blobCacheService = new BlobCacheService(
configuration, highAvailabilityServices.createBlobStore(), null
);
// NOTE(review): the meaning of the boolean flag and of passing `this` is not visible here; confirm against startTaskManager
taskManager = startTaskManager(
this.configuration,
this.resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
this);
this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;
MemoryLogger.startIfConfigured(LOG, configuration, metricQueryServiceActorSystem);
}
示例23
/**
 * Creates a file cache that uses a newly created scheduled thread pool.
 *
 * <p>Delegates to the four-argument constructor, passing a scheduled thread pool with a core
 * pool size of 10 (threads created by {@code ExecutorThreadFactory("flink-file-cache")}) and
 * the value 5000 as the last argument.
 * NOTE(review): 5000 is presumably a cleanup interval in milliseconds; confirm against the
 * delegated constructor.
 *
 * @param tempDirectories passed through to the delegated constructor
 * @param blobService passed through to the delegated constructor
 * @throws IOException declared by the signature
 */
public FileCache(String[] tempDirectories, PermanentBlobService blobService) throws IOException {
this (tempDirectories, blobService, Executors.newScheduledThreadPool(10,
new ExecutorThreadFactory("flink-file-cache")), 5000);
}
示例24
/**
 * Runs once before all tests: builds the REST server endpoint configuration and a
 * single-threaded executor named after the test class.
 *
 * @throws ConfigurationException declared by the signature
 */
@BeforeClass
public static void setUp() throws ConfigurationException {
    final String executorThreadPoolName = RestClusterClientSavepointTriggerTest.class.getSimpleName();

    restServerEndpointConfiguration = RestServerEndpointConfiguration.fromConfiguration(REST_CONFIG);
    executor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory(executorThreadPoolName));
}
示例25
/**
 * Opens the sink: creates the read/write helper, the HBase connection (unless one is already
 * set) and the buffered mutator, and optionally schedules a periodic flush.
 *
 * <p>When {@code bufferFlushIntervalMillis} is positive, a single-threaded scheduler flushes at
 * that fixed delay. A run does nothing once the sink is closed; the first flush failure is
 * recorded in {@code failureThrowable} instead of being thrown on the scheduler thread.
 *
 * @param parameters not read by this method
 * @throws RuntimeException if the table is not found or the setup fails with an
 *     {@link IOException}
 * @throws Exception declared by the signature
 */
@Override
public void open(Configuration parameters) throws Exception {
    LOG.info("start open ...");
    final org.apache.hadoop.conf.Configuration hadoopConf = prepareRuntimeConfiguration();
    try {
        this.helper = new HBaseReadWriteHelper(schema);
        this.numPendingRequests = new AtomicLong(0);

        if (connection == null) {
            this.connection = ConnectionFactory.createConnection(hadoopConf);
        }

        // create a parameter instance, set the table name and custom listener reference.
        final BufferedMutatorParams mutatorParams = new BufferedMutatorParams(TableName.valueOf(hTableName))
            .listener(this)
            .writeBufferSize(bufferFlushMaxSizeInBytes);
        this.mutator = connection.getBufferedMutator(mutatorParams);

        if (bufferFlushIntervalMillis > 0) {
            this.executor = Executors.newScheduledThreadPool(
                1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
            this.scheduledFuture = this.executor.scheduleWithFixedDelay(
                () -> {
                    if (!closed) {
                        try {
                            flush();
                        } catch (Exception e) {
                            // fail the sink and skip the rest of the items
                            // if the failure handler decides to throw an exception
                            failureThrowable.compareAndSet(null, e);
                        }
                    }
                },
                bufferFlushIntervalMillis,
                bufferFlushIntervalMillis,
                TimeUnit.MILLISECONDS);
        }
    } catch (TableNotFoundException tnfe) {
        LOG.error("The table " + hTableName + " not found ", tnfe);
        throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
    } catch (IOException ioe) {
        LOG.error("Exception while creating connection to HBase.", ioe);
        throw new RuntimeException("Cannot create connection to HBase.", ioe);
    }
    LOG.info("end open.");
}
示例26
private MiniClusterClient(TestingMiniCluster miniCluster) throws ConfigurationException {
restAddress = miniCluster.getRestAddress().join();
executorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClient-IO"));
restClient = createRestClient();
}
示例27
/**
 * Creates a new MetricRegistry and registers the given reporters.
 *
 * <p>Reporters implementing {@code Scheduled} are registered with a single-threaded scheduled
 * executor; the reporting interval defaults to 10 seconds and can be overridden with a value
 * such as {@code "500 MILLISECONDS"}. A reporter that cannot be set up is logged and skipped;
 * it does not fail the registry.
 *
 * @param config registry configuration providing scope formats, delimiter and message size limit
 * @param reporterConfigurations reporter setups to register; may be empty
 */
public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) {
    this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
    this.scopeFormats = config.getScopeFormats();
    this.globalDelimiter = config.getDelimiter();
    this.delimiters = new ArrayList<>(10);
    this.terminationFuture = new CompletableFuture<>();
    this.isShutdown = false;

    // register any custom configured reporters
    this.reporters = new ArrayList<>(4);

    this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

    this.queryService = null;
    this.metricQueryServiceRpcService = null;

    if (reporterConfigurations.isEmpty()) {
        // no reporters defined
        // by default, don't report anything
        LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
    } else {
        for (ReporterSetup reporterSetup : reporterConfigurations) {
            final String namedReporter = reporterSetup.getName();

            try {
                Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
                TimeUnit timeunit = TimeUnit.SECONDS;
                long period = 10;

                if (configuredPeriod.isPresent()) {
                    try {
                        String[] interval = configuredPeriod.get().split(" ");
                        // Parse both parts before assigning either one. Otherwise a valid number
                        // followed by an invalid unit would leave a half-applied interval (for
                        // example "500 MILLIS" would become 500 SECONDS) despite the message below.
                        final long parsedPeriod = Long.parseLong(interval[0]);
                        final TimeUnit parsedTimeunit = TimeUnit.valueOf(interval[1]);
                        period = parsedPeriod;
                        timeunit = parsedTimeunit;
                    }
                    catch (Exception e) {
                        // log the configured value itself, not the Optional wrapper ("Optional[...]")
                        LOG.error("Cannot parse report interval from config: " + configuredPeriod.get() +
                            " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                            "Using default reporting interval.");
                    }
                }

                final MetricReporter reporterInstance = reporterSetup.getReporter();
                final String className = reporterInstance.getClass().getName();

                if (reporterInstance instanceof Scheduled) {
                    LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);

                    executor.scheduleWithFixedDelay(
                        new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
                } else {
                    LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
                }
                reporters.add(reporterInstance);

                // a per-reporter delimiter must be exactly one character, else the global one is used
                String delimiterForReporter = reporterSetup.getDelimiter().orElse(String.valueOf(globalDelimiter));
                if (delimiterForReporter.length() != 1) {
                    LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
                    delimiterForReporter = String.valueOf(globalDelimiter);
                }
                this.delimiters.add(delimiterForReporter.charAt(0));
            }
            catch (Throwable t) {
                LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
            }
        }
    }
}
示例28
/**
 * Assembles the job manager's shared services from the configuration and the blob server.
 *
 * <p>Creates the library cache manager, a scheduled executor with one thread per CPU core (as
 * reported by {@code Hardware.getNumberCPUCores()}), the stack trace sample coordinator and the
 * back pressure stats tracker, and schedules a recurring cleanup of the tracker's operator
 * stats cache on that executor.
 *
 * @param config configuration to read from; must not be null
 * @param blobServer blob server backing the library cache; must not be null
 * @return the assembled shared services
 * @throws IllegalConfigurationException if reading the timeout fails with a
 *     {@link NumberFormatException}
 * @throws Exception declared by the signature
 */
public static JobManagerSharedServices fromConfiguration(
        Configuration config,
        BlobServer blobServer) throws Exception {

    checkNotNull(config);
    checkNotNull(blobServer);

    final String resolveOrderName = config.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
    final String[] parentFirstPatterns = CoreOptions.getParentFirstLoaderPatterns(config);

    final BlobLibraryCacheManager libraryCache = new BlobLibraryCacheManager(
        blobServer,
        FlinkUserCodeClassLoaders.ResolveOrder.fromString(resolveOrderName),
        parentFirstPatterns);

    final FiniteDuration askTimeout;
    try {
        askTimeout = AkkaUtils.getTimeout(config);
    } catch (NumberFormatException e) {
        throw new IllegalConfigurationException(AkkaUtils.formatDurationParsingErrorMessage());
    }

    final ScheduledExecutorService sharedFutureExecutor = Executors.newScheduledThreadPool(
        Hardware.getNumberCPUCores(),
        new ExecutorThreadFactory("jobmanager-future"));

    final StackTraceSampleCoordinator sampleCoordinator =
        new StackTraceSampleCoordinator(sharedFutureExecutor, askTimeout.toMillis());

    final int cacheCleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
    final BackPressureStatsTrackerImpl statsTracker = new BackPressureStatsTrackerImpl(
        sampleCoordinator,
        cacheCleanUpInterval,
        config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
        config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
        Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));

    // periodically clean up the tracker's operator stats cache
    sharedFutureExecutor.scheduleWithFixedDelay(
        () -> statsTracker.cleanUpOperatorStatsCache(),
        cacheCleanUpInterval,
        cacheCleanUpInterval,
        TimeUnit.MILLISECONDS);

    return new JobManagerSharedServices(
        sharedFutureExecutor,
        libraryCache,
        RestartStrategyFactory.createRestartStrategyFactory(config),
        sampleCoordinator,
        statsTracker,
        blobServer);
}
示例29
/**
 * Creates a task manager runner together with the services it depends on.
 *
 * <p>In order, the constructor creates a scheduled executor, the high-availability services,
 * the RPC service, the metric registry (whose query service is started on a separate metrics
 * RPC service), the blob cache service and finally the task manager obtained from
 * {@code startTaskManager}. The memory logger is started last, if configured.
 *
 * @param configuration configuration used to create all services; must not be null
 * @param resourceId resource ID of this task manager; must not be null
 * @throws Exception propagated from the invoked creation and start calls
 */
public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
this.configuration = checkNotNull(configuration);
this.resourceId = checkNotNull(resourceId);
timeout = AkkaUtils.getTimeoutAsTime(configuration);
// scheduled pool sized by Hardware.getNumberCPUCores(); also handed to the HA services below
this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("taskmanager-future"));
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
rpcService = createRpcService(configuration, highAvailabilityServices);
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// the registry receives both its own configuration and the reporter setups read from the configuration
metricRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(configuration),
ReporterSetup.fromConfiguration(configuration));
// the metrics RPC service is started for the address of the main RPC service
final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, rpcService.getAddress());
metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);
// NOTE(review): the meaning of the null third argument is not visible here; confirm against BlobCacheService
blobCacheService = new BlobCacheService(
configuration, highAvailabilityServices.createBlobStore(), null
);
// NOTE(review): the meaning of the boolean flag and of passing `this` is not visible here; confirm against startTaskManager
taskManager = startTaskManager(
this.configuration,
this.resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
this);
this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;
// the memory logger is handed the termination future created just above
MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}
示例30
/**
 * Creates a file cache that uses a newly created scheduled thread pool.
 *
 * <p>Delegates to the four-argument constructor, passing a scheduled thread pool with a core
 * pool size of 10 (threads created by {@code ExecutorThreadFactory("flink-file-cache")}) and
 * the value 5000 as the last argument.
 * NOTE(review): 5000 is presumably a cleanup interval in milliseconds; confirm against the
 * delegated constructor.
 *
 * @param tempDirectories passed through to the delegated constructor
 * @param blobService passed through to the delegated constructor
 * @throws IOException declared by the signature
 */
public FileCache(String[] tempDirectories, PermanentBlobService blobService) throws IOException {
this (tempDirectories, blobService, Executors.newScheduledThreadPool(10,
new ExecutorThreadFactory("flink-file-cache")), 5000);
}