Java源码示例:org.apache.flink.runtime.state.SharedStateRegistryFactory
示例1
/**
 * Convenience constructor that forwards all of its arguments, unchanged and in the same
 * order, to the overload that takes one additional trailing argument, supplying
 * {@code SystemClock.getInstance()} for that argument.
 *
 * <p>No validation is performed in this overload itself; the arguments are handed
 * straight to the delegated constructor.
 */
public CheckpointCoordinator(
JobID job,
CheckpointCoordinatorConfiguration chkConfig,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
StateBackend checkpointStateBackend,
Executor executor,
ScheduledExecutor timer,
SharedStateRegistryFactory sharedStateRegistryFactory,
CheckpointFailureManager failureManager) {
this(
job,
chkConfig,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
coordinatorsToCheckpoint,
checkpointIDCounter,
completedCheckpointStore,
checkpointStateBackend,
executor,
timer,
sharedStateRegistryFactory,
failureManager,
// Default time source for the delegated constructor's last parameter.
SystemClock.getInstance());
}
示例2
/**
 * Creates a checkpoint coordinator from individually supplied timing settings.
 *
 * <p>The constructor validates the timing arguments, caps
 * {@code minPauseBetweenCheckpoints} at one year, raises {@code baseInterval} to at least
 * the capped minimum pause, creates the shared state registry and a single-threaded
 * "Checkpoint Timer" executor, creates the checkpoint storage from the state backend, and
 * finally starts the checkpoint ID counter.
 *
 * @param job the job ID; checked with {@code checkNotNull} and used to create the checkpoint storage
 * @param baseInterval checkpoint interval; must be larger than zero (presumably milliseconds,
 *     like {@code minPauseBetweenCheckpoints} - confirm against callers)
 * @param checkpointTimeout checkpoint timeout; must be at least 1
 * @param minPauseBetweenCheckpoints minimum pause between checkpoints in milliseconds
 *     (it is converted to nanoseconds via a factor of 1,000,000); must be {@code >= 0}
 * @param maxConcurrentCheckpointAttempts must be {@code >= 1}
 * @param retentionPolicy passed to {@code CheckpointProperties.forCheckpoint}
 * @param tasksToTrigger checked with {@code checkNotNull} and stored
 * @param tasksToWaitFor checked with {@code checkNotNull} and stored
 * @param tasksToCommitTo checked with {@code checkNotNull} and stored
 * @param checkpointIDCounter checked with {@code checkNotNull}, stored, and started here
 * @param completedCheckpointStore checked with {@code checkNotNull} and stored
 * @param checkpointStateBackend checked with {@code checkNotNull}; used to create the checkpoint storage
 * @param executor checked with {@code checkNotNull}, stored, and passed to the registry factory
 * @param sharedStateRegistryFactory checked with {@code checkNotNull}; used to create the shared state registry
 * @throws RuntimeException if creating the checkpoint storage or starting the checkpoint
 *     ID counter fails; the original failure is attached as the cause
 */
public CheckpointCoordinator(
        JobID job,
        long baseInterval,
        long checkpointTimeout,
        long minPauseBetweenCheckpoints,
        int maxConcurrentCheckpointAttempts,
        CheckpointRetentionPolicy retentionPolicy,
        ExecutionVertex[] tasksToTrigger,
        ExecutionVertex[] tasksToWaitFor,
        ExecutionVertex[] tasksToCommitTo,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore completedCheckpointStore,
        StateBackend checkpointStateBackend,
        Executor executor,
        SharedStateRegistryFactory sharedStateRegistryFactory) {

    // sanity checks
    checkNotNull(checkpointStateBackend);
    checkArgument(baseInterval > 0, "Checkpoint base interval must be larger than zero");
    checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
    checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
    checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");

    // max "in between duration" can be one year - this is to prevent numeric overflows
    if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
        minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
    }

    // it does not make sense to schedule checkpoints more often than the desired
    // time between checkpoints
    if (baseInterval < minPauseBetweenCheckpoints) {
        baseInterval = minPauseBetweenCheckpoints;
    }

    this.job = checkNotNull(job);
    this.baseInterval = baseInterval;
    this.checkpointTimeout = checkpointTimeout;
    // milliseconds -> nanoseconds; safe from overflow because of the one-year cap above
    this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
    this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
    this.tasksToTrigger = checkNotNull(tasksToTrigger);
    this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
    this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
    this.pendingCheckpoints = new LinkedHashMap<>();
    this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
    this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
    this.executor = checkNotNull(executor);
    this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
    this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);

    this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
    this.masterHooks = new HashMap<>();

    this.timer = new ScheduledThreadPoolExecutor(1,
            new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

    // make sure the timer internally cleans up and does not hold onto stale scheduled tasks
    this.timer.setRemoveOnCancelPolicy(true);
    this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

    this.checkpointProperties = CheckpointProperties.forCheckpoint(retentionPolicy);

    // Storage creation and counter start-up are wrapped separately so that a failure of
    // the former is not misreported as a checkpoint ID counter failure.
    try {
        this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
    } catch (Throwable t) {
        throw new RuntimeException(
                "Failed to create checkpoint storage at checkpoint coordinator side: " + t.getMessage(), t);
    }

    try {
        // Make sure the checkpoint ID enumerator is running. Possibly
        // issues a blocking call to ZooKeeper.
        checkpointIDCounter.start();
    } catch (Throwable t) {
        throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t);
    }
}
示例3
/**
 * Creates a checkpoint coordinator whose timing settings are read from {@code chkConfig}.
 *
 * <p>The minimum pause between checkpoints is capped at one year and the checkpoint
 * interval is raised to at least that capped pause. The constructor then wires up the
 * collaborators, builds a single-threaded "Checkpoint Timer" executor, creates the
 * checkpoint storage from the state backend, and starts the checkpoint ID counter.
 *
 * @param job the job ID; checked with {@code checkNotNull} and used to create the checkpoint storage
 * @param chkConfig source of interval, timeout, minimum pause, concurrency limit,
 *     retention policy and the prefer-checkpoint-for-recovery flag
 * @param tasksToTrigger checked with {@code checkNotNull} and stored
 * @param tasksToWaitFor checked with {@code checkNotNull} and stored
 * @param tasksToCommitTo checked with {@code checkNotNull} and stored
 * @param checkpointIDCounter checked with {@code checkNotNull}, stored, and started here
 * @param completedCheckpointStore checked with {@code checkNotNull} and stored
 * @param checkpointStateBackend checked with {@code checkNotNull}; used to create the checkpoint storage
 * @param executor checked with {@code checkNotNull}, stored, and passed to the registry factory
 * @param sharedStateRegistryFactory checked with {@code checkNotNull}; used to create the shared state registry
 * @param failureManager checked with {@code checkNotNull} and stored
 * @throws FlinkRuntimeException if creating the checkpoint storage fails with an {@code IOException}
 * @throws RuntimeException if starting the checkpoint ID counter fails
 */
public CheckpointCoordinator(
        JobID job,
        CheckpointCoordinatorConfiguration chkConfig,
        ExecutionVertex[] tasksToTrigger,
        ExecutionVertex[] tasksToWaitFor,
        ExecutionVertex[] tasksToCommitTo,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore completedCheckpointStore,
        StateBackend checkpointStateBackend,
        Executor executor,
        SharedStateRegistryFactory sharedStateRegistryFactory,
        CheckpointFailureManager failureManager) {

    // The state backend is validated first, before anything is read from the configuration.
    checkNotNull(checkpointStateBackend);

    // Cap the pause at one year (in milliseconds) to prevent numeric overflow when it is
    // converted to nanoseconds further down.
    final long oneYearMillis = 365L * 24 * 60 * 60 * 1_000;
    final long cappedMinPause = Math.min(chkConfig.getMinPauseBetweenCheckpoints(), oneYearMillis);

    // Scheduling more frequently than the minimum pause would be pointless.
    final long effectiveInterval = Math.max(chkConfig.getCheckpointInterval(), cappedMinPause);

    this.job = checkNotNull(job);
    this.baseInterval = effectiveInterval;
    this.checkpointTimeout = chkConfig.getCheckpointTimeout();
    this.minPauseBetweenCheckpointsNanos = cappedMinPause * 1_000_000;
    this.maxConcurrentCheckpointAttempts = chkConfig.getMaxConcurrentCheckpoints();
    this.tasksToTrigger = checkNotNull(tasksToTrigger);
    this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
    this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
    this.pendingCheckpoints = new LinkedHashMap<>();
    this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
    this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
    this.executor = checkNotNull(executor);
    this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
    this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
    this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
    this.failureManager = checkNotNull(failureManager);

    this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
    this.masterHooks = new HashMap<>();

    // Build and configure the timer before publishing it to the field; cancelled tasks
    // are removed eagerly and nothing keeps running once the pool is shut down.
    final ScheduledThreadPoolExecutor timerPool = new ScheduledThreadPoolExecutor(
            1,
            new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
    timerPool.setRemoveOnCancelPolicy(true);
    timerPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    timerPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    this.timer = timerPool;

    this.checkpointProperties =
            CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

    try {
        this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
    } catch (IOException ioe) {
        throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", ioe);
    }

    try {
        // Starting the ID counter may involve a blocking call to ZooKeeper.
        checkpointIDCounter.start();
    } catch (Throwable cause) {
        throw new RuntimeException("Failed to start checkpoint ID counter: " + cause.getMessage(), cause);
    }
}
示例4
/**
 * Creates a checkpoint coordinator with an externally supplied timer and clock.
 *
 * <p>The minimum pause between checkpoints is capped at one year and the checkpoint
 * interval is raised to at least that capped pause. After wiring up the collaborators the
 * constructor creates the checkpoint storage, initializes its base locations, starts the
 * checkpoint ID counter, and finally builds the {@code CheckpointRequestDecider}.
 *
 * @param job the job ID; checked with {@code checkNotNull} and used to create the checkpoint storage
 * @param chkConfig source of interval, timeout, minimum pause, concurrency limit, retention
 *     policy and the recovery / exactly-once / unaligned-checkpoint flags
 * @param tasksToTrigger checked with {@code checkNotNull} and stored
 * @param tasksToWaitFor checked with {@code checkNotNull} and stored
 * @param tasksToCommitTo checked with {@code checkNotNull} and stored
 * @param coordinatorsToCheckpoint stored wrapped in an unmodifiable collection view
 * @param checkpointIDCounter checked with {@code checkNotNull}, stored, and started here
 * @param completedCheckpointStore checked with {@code checkNotNull} and stored
 * @param checkpointStateBackend checked with {@code checkNotNull}; used to create the checkpoint storage
 * @param executor checked with {@code checkNotNull}, stored, and passed to the registry factory
 * @param timer stored as given (no null check is performed here)
 * @param sharedStateRegistryFactory checked with {@code checkNotNull}; used to create the shared state registry
 * @param failureManager checked with {@code checkNotNull} and stored
 * @param clock checked with {@code checkNotNull}, stored, and handed to the request decider
 * @throws FlinkRuntimeException if creating or initializing the checkpoint storage fails
 *     with an {@code IOException}
 * @throws RuntimeException if starting the checkpoint ID counter fails
 */
@VisibleForTesting
public CheckpointCoordinator(
        JobID job,
        CheckpointCoordinatorConfiguration chkConfig,
        ExecutionVertex[] tasksToTrigger,
        ExecutionVertex[] tasksToWaitFor,
        ExecutionVertex[] tasksToCommitTo,
        Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore completedCheckpointStore,
        StateBackend checkpointStateBackend,
        Executor executor,
        ScheduledExecutor timer,
        SharedStateRegistryFactory sharedStateRegistryFactory,
        CheckpointFailureManager failureManager,
        Clock clock) {

    // The state backend is validated first, before anything is read from the configuration.
    checkNotNull(checkpointStateBackend);

    // Cap the pause at one year to prevent numeric overflows.
    final long oneYearMillis = 365L * 24 * 60 * 60 * 1_000;
    final long cappedMinPause = Math.min(chkConfig.getMinPauseBetweenCheckpoints(), oneYearMillis);

    // Scheduling more frequently than the minimum pause would be pointless.
    final long effectiveInterval = Math.max(chkConfig.getCheckpointInterval(), cappedMinPause);

    this.job = checkNotNull(job);
    this.baseInterval = effectiveInterval;
    this.checkpointTimeout = chkConfig.getCheckpointTimeout();
    this.minPauseBetweenCheckpoints = cappedMinPause;
    this.tasksToTrigger = checkNotNull(tasksToTrigger);
    this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
    this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
    this.coordinatorsToCheckpoint = Collections.unmodifiableCollection(coordinatorsToCheckpoint);
    this.pendingCheckpoints = new LinkedHashMap<>();
    this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
    this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
    this.executor = checkNotNull(executor);
    this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
    this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
    this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
    this.failureManager = checkNotNull(failureManager);
    this.clock = checkNotNull(clock);
    this.isExactlyOnceMode = chkConfig.isExactlyOnce();
    this.unalignedCheckpointsEnabled = chkConfig.isUnalignedCheckpointsEnabled();

    this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
    this.masterHooks = new HashMap<>();
    this.timer = timer;

    this.checkpointProperties =
            CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

    try {
        this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
        this.checkpointStorage.initializeBaseLocations();
    } catch (IOException ioe) {
        throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", ioe);
    }

    try {
        // Starting the ID counter may involve a blocking call to ZooKeeper.
        checkpointIDCounter.start();
    } catch (Throwable cause) {
        throw new RuntimeException("Failed to start checkpoint ID counter: " + cause.getMessage(), cause);
    }

    // Built last: it is given the clock, the minimum pause, the pending-checkpoint map's
    // size supplier and the lock of this coordinator.
    this.requestDecider = new CheckpointRequestDecider(
            chkConfig.getMaxConcurrentCheckpoints(),
            this::rescheduleTrigger,
            this.clock,
            this.minPauseBetweenCheckpoints,
            this.pendingCheckpoints::size,
            this.lock);
}
示例5
/**
 * Sets the shared state registry factory held by this builder.
 *
 * @param sharedStateRegistryFactory value stored in the builder's
 *     {@code sharedStateRegistryFactory} field; it is not null-checked here
 * @return this builder, so that calls can be chained
 */
public CheckpointCoordinatorBuilder setSharedStateRegistryFactory(
SharedStateRegistryFactory sharedStateRegistryFactory) {
this.sharedStateRegistryFactory = sharedStateRegistryFactory;
return this;
}