Java源码示例:org.apache.flink.runtime.state.SharedStateRegistryFactory

示例1
/**
 * Convenience constructor: forwards every argument unchanged to the overload that takes one
 * additional trailing argument, and supplies {@code SystemClock.getInstance()} for it.
 */
public CheckpointCoordinator(
	JobID job,
	CheckpointCoordinatorConfiguration chkConfig,
	ExecutionVertex[] tasksToTrigger,
	ExecutionVertex[] tasksToWaitFor,
	ExecutionVertex[] tasksToCommitTo,
	Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
	CheckpointIDCounter checkpointIDCounter,
	CompletedCheckpointStore completedCheckpointStore,
	StateBackend checkpointStateBackend,
	Executor executor,
	ScheduledExecutor timer,
	SharedStateRegistryFactory sharedStateRegistryFactory,
	CheckpointFailureManager failureManager) {

	this(
		job, chkConfig,
		tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
		coordinatorsToCheckpoint,
		checkpointIDCounter, completedCheckpointStore, checkpointStateBackend,
		executor, timer,
		sharedStateRegistryFactory, failureManager,
		// the only argument that is not supplied by the caller
		SystemClock.getInstance());
}
 
示例2
/**
 * Creates the coordinator from individually passed timing settings.
 *
 * <p>The minimum pause is capped at one year (in milliseconds) and the base interval is raised
 * to the (capped) minimum pause if it is smaller. As part of construction the checkpoint
 * storage is created from the state backend and the checkpoint ID counter is started.
 *
 * @param job the job ID; must not be null
 * @param baseInterval checkpoint interval, must be larger than zero
 * @param checkpointTimeout checkpoint timeout, must be at least 1
 * @param minPauseBetweenCheckpoints minimum pause in milliseconds, must be non-negative
 * @param maxConcurrentCheckpointAttempts must be at least 1
 * @param retentionPolicy passed to {@code CheckpointProperties.forCheckpoint}
 * @param tasksToTrigger must not be null
 * @param tasksToWaitFor must not be null
 * @param tasksToCommitTo must not be null
 * @param checkpointIDCounter must not be null; started by this constructor
 * @param completedCheckpointStore must not be null
 * @param checkpointStateBackend must not be null; used to create the checkpoint storage
 * @param executor must not be null; also handed to the shared state registry factory
 * @param sharedStateRegistryFactory must not be null
 * @throws RuntimeException if creating the checkpoint storage or starting the checkpoint ID
 *     counter fails; the original failure is attached as the cause
 */
public CheckpointCoordinator(
		JobID job,
		long baseInterval,
		long checkpointTimeout,
		long minPauseBetweenCheckpoints,
		int maxConcurrentCheckpointAttempts,
		CheckpointRetentionPolicy retentionPolicy,
		ExecutionVertex[] tasksToTrigger,
		ExecutionVertex[] tasksToWaitFor,
		ExecutionVertex[] tasksToCommitTo,
		CheckpointIDCounter checkpointIDCounter,
		CompletedCheckpointStore completedCheckpointStore,
		StateBackend checkpointStateBackend,
		Executor executor,
		SharedStateRegistryFactory sharedStateRegistryFactory) {

	// sanity checks
	checkNotNull(checkpointStateBackend);
	checkArgument(baseInterval > 0, "Checkpoint base interval must be larger than zero");
	checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
	checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
	checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");

	// max "in between duration" can be one year - this is to prevent numeric overflows
	// (the value is multiplied by 1_000_000 below to convert it to nanoseconds)
	if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) {
		minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000;
	}

	// it does not make sense to schedule checkpoints more often than the desired
	// time between checkpoints
	if (baseInterval < minPauseBetweenCheckpoints) {
		baseInterval = minPauseBetweenCheckpoints;
	}

	this.job = checkNotNull(job);
	this.baseInterval = baseInterval;
	this.checkpointTimeout = checkpointTimeout;
	this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000;
	this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
	this.tasksToTrigger = checkNotNull(tasksToTrigger);
	this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
	this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
	this.pendingCheckpoints = new LinkedHashMap<>();
	this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
	this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
	this.executor = checkNotNull(executor);
	this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
	this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);

	this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
	this.masterHooks = new HashMap<>();

	this.timer = new ScheduledThreadPoolExecutor(1,
			new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

	// make sure the timer internally cleans up and does not hold onto stale scheduled tasks
	this.timer.setRemoveOnCancelPolicy(true);
	this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
	this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

	this.checkpointProperties = CheckpointProperties.forCheckpoint(retentionPolicy);

	// Storage creation and counter start are wrapped separately so that the error message
	// names the step that actually failed. Both still surface as RuntimeException.
	try {
		this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
	} catch (Throwable t) {
		throw new RuntimeException(
				"Failed to create checkpoint storage at checkpoint coordinator side: " + t.getMessage(), t);
	}

	try {
		// Make sure the checkpoint ID enumerator is running. Possibly
		// issues a blocking call to ZooKeeper.
		checkpointIDCounter.start();
	} catch (Throwable t) {
		throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t);
	}
}
 
示例3
/**
 * Creates the coordinator, reading its timing settings from {@code chkConfig}.
 *
 * <p>The configured minimum pause is capped at one year (in milliseconds) and the configured
 * checkpoint interval is raised to that capped pause if it is smaller. Construction also
 * creates the internal "Checkpoint Timer" executor, creates the checkpoint storage from the
 * state backend, and starts the checkpoint ID counter.
 *
 * @throws FlinkRuntimeException if creating the checkpoint storage throws an IOException
 * @throws RuntimeException if starting the checkpoint ID counter throws anything
 */
public CheckpointCoordinator(
		JobID job,
		CheckpointCoordinatorConfiguration chkConfig,
		ExecutionVertex[] tasksToTrigger,
		ExecutionVertex[] tasksToWaitFor,
		ExecutionVertex[] tasksToCommitTo,
		CheckpointIDCounter checkpointIDCounter,
		CompletedCheckpointStore completedCheckpointStore,
		StateBackend checkpointStateBackend,
		Executor executor,
		SharedStateRegistryFactory sharedStateRegistryFactory,
		CheckpointFailureManager failureManager) {

	// fail fast on a missing state backend before touching anything else
	checkNotNull(checkpointStateBackend);

	// the pause may be at most one year, which keeps the nanosecond conversion below
	// free of numeric overflows
	final long oneYearMillis = 365L * 24 * 60 * 60 * 1_000;
	final long cappedMinPause = Math.min(chkConfig.getMinPauseBetweenCheckpoints(), oneYearMillis);

	// triggering more often than the minimum pause allows would be pointless
	final long effectiveInterval = Math.max(chkConfig.getCheckpointInterval(), cappedMinPause);

	this.job = checkNotNull(job);
	this.baseInterval = effectiveInterval;
	this.checkpointTimeout = chkConfig.getCheckpointTimeout();
	this.minPauseBetweenCheckpointsNanos = cappedMinPause * 1_000_000;
	this.maxConcurrentCheckpointAttempts = chkConfig.getMaxConcurrentCheckpoints();
	this.tasksToTrigger = checkNotNull(tasksToTrigger);
	this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
	this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
	this.pendingCheckpoints = new LinkedHashMap<>();
	this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
	this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
	this.executor = checkNotNull(executor);
	this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
	this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
	this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
	this.failureManager = checkNotNull(failureManager);

	this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
	this.masterHooks = new HashMap<>();

	// single-threaded timer; configured so that cancelled tasks are removed right away and
	// nothing keeps running or gets executed once the timer has been shut down
	final ScheduledThreadPoolExecutor timerExecutor = new ScheduledThreadPoolExecutor(
			1,
			new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
	timerExecutor.setRemoveOnCancelPolicy(true);
	timerExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
	timerExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
	this.timer = timerExecutor;

	this.checkpointProperties = CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

	try {
		this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
	} catch (IOException ioe) {
		throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", ioe);
	}

	try {
		// Starting the counter guarantees the checkpoint ID enumerator is running.
		// This may issue a blocking call to ZooKeeper.
		checkpointIDCounter.start();
	} catch (Throwable failure) {
		throw new RuntimeException("Failed to start checkpoint ID counter: " + failure.getMessage(), failure);
	}
}
 
示例4
/**
 * Creates the coordinator with an explicitly supplied timer and clock.
 *
 * <p>The configured minimum pause is capped at one year (in milliseconds) and the configured
 * checkpoint interval is raised to that capped pause if it is smaller. Construction also
 * creates the checkpoint storage and initializes its base locations, starts the checkpoint ID
 * counter, and finally builds the {@code CheckpointRequestDecider}.
 *
 * @throws FlinkRuntimeException if creating or initializing the checkpoint storage throws an
 *     IOException
 * @throws RuntimeException if starting the checkpoint ID counter throws anything
 */
@VisibleForTesting
public CheckpointCoordinator(
		JobID job,
		CheckpointCoordinatorConfiguration chkConfig,
		ExecutionVertex[] tasksToTrigger,
		ExecutionVertex[] tasksToWaitFor,
		ExecutionVertex[] tasksToCommitTo,
		Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
		CheckpointIDCounter checkpointIDCounter,
		CompletedCheckpointStore completedCheckpointStore,
		StateBackend checkpointStateBackend,
		Executor executor,
		ScheduledExecutor timer,
		SharedStateRegistryFactory sharedStateRegistryFactory,
		CheckpointFailureManager failureManager,
		Clock clock) {

	// fail fast on a missing state backend before touching anything else
	checkNotNull(checkpointStateBackend);

	// the pause may be at most one year - this guards against numeric overflows
	final long oneYearMillis = 365L * 24 * 60 * 60 * 1_000;
	final long cappedMinPause = Math.min(chkConfig.getMinPauseBetweenCheckpoints(), oneYearMillis);

	// triggering more often than the minimum pause allows would be pointless
	final long effectiveInterval = Math.max(chkConfig.getCheckpointInterval(), cappedMinPause);

	this.job = checkNotNull(job);
	this.baseInterval = effectiveInterval;
	this.checkpointTimeout = chkConfig.getCheckpointTimeout();
	this.minPauseBetweenCheckpoints = cappedMinPause;
	this.tasksToTrigger = checkNotNull(tasksToTrigger);
	this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
	this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
	this.coordinatorsToCheckpoint = Collections.unmodifiableCollection(coordinatorsToCheckpoint);
	this.pendingCheckpoints = new LinkedHashMap<>();
	this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
	this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
	this.executor = checkNotNull(executor);
	this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
	this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
	this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
	this.failureManager = checkNotNull(failureManager);
	this.clock = checkNotNull(clock);
	this.isExactlyOnceMode = chkConfig.isExactlyOnce();
	this.unalignedCheckpointsEnabled = chkConfig.isUnalignedCheckpointsEnabled();

	this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
	this.masterHooks = new HashMap<>();

	// the timer is taken as given; unlike most other arguments it is not null-checked here
	this.timer = timer;

	this.checkpointProperties = CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

	try {
		this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
		this.checkpointStorage.initializeBaseLocations();
	} catch (IOException ioe) {
		throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", ioe);
	}

	try {
		// Starting the counter guarantees the checkpoint ID enumerator is running.
		// This may issue a blocking call to ZooKeeper.
		checkpointIDCounter.start();
	} catch (Throwable failure) {
		throw new RuntimeException("Failed to start checkpoint ID counter: " + failure.getMessage(), failure);
	}

	// built last, from the fields assigned above
	this.requestDecider = new CheckpointRequestDecider(
		chkConfig.getMaxConcurrentCheckpoints(),
		this::rescheduleTrigger,
		this.clock,
		this.minPauseBetweenCheckpoints,
		this.pendingCheckpoints::size,
		this.lock);
}
 
示例5
/**
 * Stores the given factory in this builder's {@code sharedStateRegistryFactory} field.
 *
 * @param sharedStateRegistryFactory the factory to store; it is stored as given, without validation
 * @return this builder, so that calls can be chained
 */
public CheckpointCoordinatorBuilder setSharedStateRegistryFactory(
		final SharedStateRegistryFactory sharedStateRegistryFactory) {

	this.sharedStateRegistryFactory = sharedStateRegistryFactory;

	return this;
}