Java源码示例:org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker
示例1
SchedulerNG createInstance(
Logger log,
JobGraph jobGraph,
BackPressureStatsTracker backPressureStatsTracker,
Executor ioExecutor,
Configuration jobMasterConfiguration,
SlotProvider slotProvider,
ScheduledExecutorService futureExecutor,
ClassLoader userCodeLoader,
CheckpointRecoveryFactory checkpointRecoveryFactory,
Time rpcTimeout,
BlobWriter blobWriter,
JobManagerJobMetricGroup jobManagerJobMetricGroup,
Time slotRequestTimeout,
ShuffleMaster<?> shuffleMaster,
PartitionTracker partitionTracker) throws Exception;
示例2
SchedulerNG createInstance(
Logger log,
JobGraph jobGraph,
BackPressureStatsTracker backPressureStatsTracker,
Executor ioExecutor,
Configuration jobMasterConfiguration,
SlotProvider slotProvider,
ScheduledExecutorService futureExecutor,
ClassLoader userCodeLoader,
CheckpointRecoveryFactory checkpointRecoveryFactory,
Time rpcTimeout,
BlobWriter blobWriter,
JobManagerJobMetricGroup jobManagerJobMetricGroup,
Time slotRequestTimeout,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker) throws Exception;
示例3
public JobManagerSharedServices(
ScheduledExecutorService scheduledExecutorService,
LibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
StackTraceSampleCoordinator stackTraceSampleCoordinator,
BackPressureStatsTracker backPressureStatsTracker,
@Nonnull BlobWriter blobWriter) {
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
this.stackTraceSampleCoordinator = checkNotNull(stackTraceSampleCoordinator);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
this.blobWriter = blobWriter;
}
示例4
public JobManagerSharedServices(
ScheduledExecutorService scheduledExecutorService,
LibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
StackTraceSampleCoordinator stackTraceSampleCoordinator,
BackPressureStatsTracker backPressureStatsTracker,
@Nonnull BlobWriter blobWriter) {
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
this.stackTraceSampleCoordinator = checkNotNull(stackTraceSampleCoordinator);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
this.blobWriter = blobWriter;
}
示例5
@Override
public SchedulerNG createInstance(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final PartitionTracker partitionTracker) throws Exception {
return new LegacyScheduler(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
slotProvider,
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
restartStrategyFactory,
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
partitionTracker);
}
示例6
public DefaultScheduler(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final PartitionTracker partitionTracker) throws Exception {
super(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
slotProvider,
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
partitionTracker);
}
示例7
@Override
public SchedulerNG createInstance(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final PartitionTracker partitionTracker) throws Exception {
return new DefaultScheduler(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
slotProvider,
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
partitionTracker);
}
示例8
public JobManagerSharedServices(
ScheduledExecutorService scheduledExecutorService,
LibraryCacheManager libraryCacheManager,
BackPressureRequestCoordinator backPressureSampleCoordinator,
BackPressureStatsTracker backPressureStatsTracker,
@Nonnull BlobWriter blobWriter) {
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.backPressureSampleCoordinator = checkNotNull(backPressureSampleCoordinator);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
this.blobWriter = blobWriter;
}
示例9
public BackPressureStatsTracker getBackPressureStatsTracker() {
return backPressureStatsTracker;
}
示例10
public TestingJobManagerSharedServicesBuilder setBackPressureStatsTracker(BackPressureStatsTracker backPressureStatsTracker) {
this.backPressureStatsTracker = backPressureStatsTracker;
return this;
}
示例11
public BackPressureStatsTracker getBackPressureStatsTracker() {
return backPressureStatsTracker;
}
示例12
public LegacyScheduler(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final RestartStrategyFactory restartStrategyFactory,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final PartitionTracker partitionTracker) throws Exception {
this.log = checkNotNull(log);
this.jobGraph = checkNotNull(jobGraph);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
this.ioExecutor = checkNotNull(ioExecutor);
this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
this.slotProvider = checkNotNull(slotProvider);
this.futureExecutor = checkNotNull(futureExecutor);
this.userCodeLoader = checkNotNull(userCodeLoader);
this.checkpointRecoveryFactory = checkNotNull(checkpointRecoveryFactory);
this.rpcTimeout = checkNotNull(rpcTimeout);
final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
jobGraph.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy();
this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
restartStrategyFactory,
jobGraph.isCheckpointingEnabled());
log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
this.blobWriter = checkNotNull(blobWriter);
this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
}
示例13
public TestingJobManagerSharedServicesBuilder setBackPressureStatsTracker(BackPressureStatsTracker backPressureStatsTracker) {
this.backPressureStatsTracker = backPressureStatsTracker;
return this;
}
示例14
public BackPressureStatsTracker getBackPressureStatsTracker() {
return backPressureStatsTracker;
}
示例15
DefaultScheduler(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final ScheduledExecutorService futureExecutor,
final ScheduledExecutor delayExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
final SchedulingStrategyFactory schedulingStrategyFactory,
final FailoverStrategy.Factory failoverStrategyFactory,
final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
final ExecutionVertexOperations executionVertexOperations,
final ExecutionVertexVersioner executionVertexVersioner,
final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) throws Exception {
super(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
new ThrowingSlotProvider(), // this is not used any more in the new scheduler
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
blobWriter,
jobManagerJobMetricGroup,
Time.seconds(0), // this is not used any more in the new scheduler
shuffleMaster,
partitionTracker,
executionVertexVersioner,
false);
this.log = log;
this.delayExecutor = checkNotNull(delayExecutor);
this.userCodeLoader = checkNotNull(userCodeLoader);
this.executionVertexOperations = checkNotNull(executionVertexOperations);
final FailoverStrategy failoverStrategy = failoverStrategyFactory.create(
getSchedulingTopology(),
getResultPartitionAvailabilityChecker());
log.info("Using failover strategy {} for {} ({}).", failoverStrategy, jobGraph.getName(), jobGraph.getJobID());
this.executionFailureHandler = new ExecutionFailureHandler(
getSchedulingTopology(),
failoverStrategy,
restartBackoffTimeStrategy);
this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
this.executionSlotAllocator = checkNotNull(executionSlotAllocatorFactory).createInstance(getPreferredLocationsRetriever());
this.verticesWaitingForRestart = new HashSet<>();
}
示例16
public SchedulerBase(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final RestartStrategyFactory restartStrategyFactory,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker,
final ExecutionVertexVersioner executionVertexVersioner,
final boolean legacyScheduling) throws Exception {
this.log = checkNotNull(log);
this.jobGraph = checkNotNull(jobGraph);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
this.ioExecutor = checkNotNull(ioExecutor);
this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
this.slotProvider = checkNotNull(slotProvider);
this.futureExecutor = checkNotNull(futureExecutor);
this.userCodeLoader = checkNotNull(userCodeLoader);
this.checkpointRecoveryFactory = checkNotNull(checkpointRecoveryFactory);
this.rpcTimeout = checkNotNull(rpcTimeout);
final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
jobGraph.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy();
this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
restartStrategyFactory,
jobGraph.isCheckpointingEnabled());
if (legacyScheduling) {
log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
}
this.blobWriter = checkNotNull(blobWriter);
this.jobManagerJobMetricGroup = checkNotNull(jobManagerJobMetricGroup);
this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
this.legacyScheduling = legacyScheduling;
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
this.schedulingTopology = executionGraph.getSchedulingTopology();
final StateLocationRetriever stateLocationRetriever =
executionVertexId -> getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();
final InputsLocationsRetriever inputsLocationsRetriever = new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);
this.preferredLocationsRetriever = new DefaultPreferredLocationsRetriever(stateLocationRetriever, inputsLocationsRetriever);
this.coordinatorMap = createCoordinatorMap();
}
示例17
@Override
public SchedulerNG createInstance(
final Logger log,
final JobGraph jobGraph,
final BackPressureStatsTracker backPressureStatsTracker,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
final SlotProvider slotProvider,
final ScheduledExecutorService futureExecutor,
final ClassLoader userCodeLoader,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final Time rpcTimeout,
final BlobWriter blobWriter,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker) throws Exception {
final SchedulingStrategyFactory schedulingStrategyFactory = createSchedulingStrategyFactory(jobGraph.getScheduleMode());
final RestartBackoffTimeStrategy restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader
.createRestartBackoffTimeStrategyFactory(
jobGraph
.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy(),
jobMasterConfiguration,
jobGraph.isCheckpointingEnabled())
.create();
log.info("Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, jobGraph.getName(), jobGraph.getJobID());
final ExecutionSlotAllocatorFactory slotAllocatorFactory =
createExecutionSlotAllocatorFactory(
jobGraph.getScheduleMode(),
slotProvider,
slotRequestTimeout,
schedulingStrategyFactory);
return new DefaultScheduler(
log,
jobGraph,
backPressureStatsTracker,
ioExecutor,
jobMasterConfiguration,
futureExecutor,
new ScheduledExecutorServiceAdapter(futureExecutor),
userCodeLoader,
checkpointRecoveryFactory,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
shuffleMaster,
partitionTracker,
schedulingStrategyFactory,
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
restartBackoffTimeStrategy,
new DefaultExecutionVertexOperations(),
new ExecutionVertexVersioner(),
slotAllocatorFactory);
}
示例18
public TestingJobManagerSharedServicesBuilder setBackPressureStatsTracker(BackPressureStatsTracker backPressureStatsTracker) {
this.backPressureStatsTracker = backPressureStatsTracker;
return this;
}
示例19
public DefaultSchedulerBuilder setBackPressureStatsTracker(final BackPressureStatsTracker backPressureStatsTracker) {
this.backPressureStatsTracker = backPressureStatsTracker;
return this;
}