Java源码示例:org.apache.flink.streaming.runtime.tasks.ProcessingTimeService

示例1
TestFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
		SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval,
		OneShotLatch fetchLoopWaitLatch,
		OneShotLatch stateIterationBlockLatch) throws Exception {

	super(
		sourceContext,
		assignedPartitionsWithStartOffsets,
		watermarkStrategy,
		processingTimeProvider,
		autoWatermarkInterval,
		TestFetcher.class.getClassLoader(),
		new UnregisteredMetricsGroup(),
		false);

	this.fetchLoopWaitLatch = fetchLoopWaitLatch;
	this.stateIterationBlockLatch = stateIterationBlockLatch;
}
 
示例2
public TestingSourceOperator(
		SourceReader<T, MockSourceSplit> reader,
		WatermarkStrategy<T> watermarkStrategy,
		ProcessingTimeService timeService,
		OperatorEventGateway eventGateway,
		int subtaskIndex,
		int parallelism) {

	super(
		(context) -> reader,
		eventGateway,
		new MockSourceSplitSerializer(),
		watermarkStrategy,
		timeService);

	this.subtaskIndex = subtaskIndex;
	this.parallelism = parallelism;
	this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
 
示例3
/**
 * Create a watermark context.
 *
 * @param timeService the time service to schedule idleness detection tasks
 * @param checkpointLock the checkpoint lock
 * @param streamStatusMaintainer the stream status maintainer to toggle and retrieve current status
 * @param idleTimeout (-1 if idleness checking is disabled)
 */
public WatermarkContext(
		final ProcessingTimeService timeService,
		final Object checkpointLock,
		final StreamStatusMaintainer streamStatusMaintainer,
		final long idleTimeout) {

	this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
	this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Checkpoint Lock cannot be null.");
	this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer, "Stream Status Maintainer cannot be null.");

	if (idleTimeout != -1) {
		Preconditions.checkArgument(idleTimeout >= 1, "The idle timeout cannot be smaller than 1 ms.");
	}
	this.idleTimeout = idleTimeout;

	scheduleNextIdleDetectionTask();
}
 
示例4
TestFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval) throws Exception {

	this(
		sourceContext,
		assignedPartitionsWithStartOffsets,
		watermarksPeriodic,
		watermarksPunctuated,
		processingTimeProvider,
		autoWatermarkInterval,
		null,
		null);
}
 
示例5
TestFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval,
		OneShotLatch fetchLoopWaitLatch,
		OneShotLatch stateIterationBlockLatch) throws Exception {

	super(
		sourceContext,
		assignedPartitionsWithStartOffsets,
		watermarksPeriodic,
		watermarksPunctuated,
		processingTimeProvider,
		autoWatermarkInterval,
		TestFetcher.class.getClassLoader(),
		new UnregisteredMetricsGroup(),
		false);

	this.fetchLoopWaitLatch = fetchLoopWaitLatch;
	this.stateIterationBlockLatch = stateIterationBlockLatch;
}
 
示例6
public AsyncWaitOperator(
		@Nonnull AsyncFunction<IN, OUT> asyncFunction,
		long timeout,
		int capacity,
		@Nonnull AsyncDataStream.OutputMode outputMode,
		@Nonnull ProcessingTimeService processingTimeService,
		@Nonnull MailboxExecutor mailboxExecutor) {
	super(asyncFunction);

	setChainingStrategy(ChainingStrategy.ALWAYS);

	Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
	this.capacity = capacity;

	this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

	this.timeout = timeout;

	this.processingTimeService = Preconditions.checkNotNull(processingTimeService);

	this.mailboxExecutor = mailboxExecutor;
}
 
示例7
private static InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService(
		Triggerable<Integer, String> triggerable,
		KeyContext keyContext,
		ProcessingTimeService processingTimeService,
		KeyGroupRange keyGroupList,
		PriorityQueueSetFactory priorityQueueSetFactory) {
	InternalTimerServiceImpl<Integer, String> service = createInternalTimerService(
		keyGroupList,
		keyContext,
		processingTimeService,
		IntSerializer.INSTANCE,
		StringSerializer.INSTANCE,
		priorityQueueSetFactory);

	service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
	return service;
}
 
示例8
private AutomaticWatermarkContext(
		final Output<StreamRecord<T>> output,
		final long watermarkInterval,
		final ProcessingTimeService timeService,
		final Object checkpointLock,
		final StreamStatusMaintainer streamStatusMaintainer,
		final long idleTimeout) {

	super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);

	this.output = Preconditions.checkNotNull(output, "The output cannot be null.");

	Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
	this.watermarkInterval = watermarkInterval;

	this.reuse = new StreamRecord<>(null);

	this.lastRecordTime = Long.MIN_VALUE;

	long now = this.timeService.getCurrentProcessingTime();
	this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
		new WatermarkEmittingTask(this.timeService, checkpointLock, output));
}
 
示例9
private AutomaticWatermarkContext(
		final Output<StreamRecord<T>> output,
		final long watermarkInterval,
		final ProcessingTimeService timeService,
		final Object checkpointLock,
		final StreamStatusMaintainer streamStatusMaintainer,
		final long idleTimeout) {

	super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);

	this.output = Preconditions.checkNotNull(output, "The output cannot be null.");

	Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
	this.watermarkInterval = watermarkInterval;

	this.reuse = new StreamRecord<>(null);

	this.lastRecordTime = Long.MIN_VALUE;

	long now = this.timeService.getCurrentProcessingTime();
	this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
		new WatermarkEmittingTask(this.timeService, checkpointLock, output));
}
 
示例10
/**
 * Create a watermark context.
 *
 * @param timeService the time service to schedule idleness detection tasks
 * @param checkpointLock the checkpoint lock
 * @param streamStatusMaintainer the stream status maintainer to toggle and retrieve current status
 * @param idleTimeout (-1 if idleness checking is disabled)
 */
public WatermarkContext(
		final ProcessingTimeService timeService,
		final Object checkpointLock,
		final StreamStatusMaintainer streamStatusMaintainer,
		final long idleTimeout) {

	this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
	this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Checkpoint Lock cannot be null.");
	this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer, "Stream Status Maintainer cannot be null.");

	if (idleTimeout != -1) {
		Preconditions.checkArgument(idleTimeout >= 1, "The idle timeout cannot be smaller than 1 ms.");
	}
	this.idleTimeout = idleTimeout;

	scheduleNextIdleDetectionTask();
}
 
示例11
InternalTimerServiceImpl(
	KeyGroupRange localKeyGroupRange,
	KeyContext keyContext,
	ProcessingTimeService processingTimeService,
	KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
	KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

	this.keyContext = checkNotNull(keyContext);
	this.processingTimeService = checkNotNull(processingTimeService);
	this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
	this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
	this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

	// find the starting index of the local key-group range
	int startIdx = Integer.MAX_VALUE;
	for (Integer keyGroupIdx : localKeyGroupRange) {
		startIdx = Math.min(keyGroupIdx, startIdx);
	}
	this.localKeyGroupRangeStartIdx = startIdx;
}
 
示例12
private static <K, N> InternalTimerServiceImpl<K, N> createInternalTimerService(
	KeyGroupRange keyGroupsList,
	KeyContext keyContext,
	ProcessingTimeService processingTimeService,
	TypeSerializer<K> keySerializer,
	TypeSerializer<N> namespaceSerializer,
	PriorityQueueSetFactory priorityQueueSetFactory) {

	TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);

	return new InternalTimerServiceImpl<>(
		keyGroupsList,
		keyContext,
		processingTimeService,
		createTimerQueue("__test_processing_timers", timerSerializer, priorityQueueSetFactory),
		createTimerQueue("__test_event_timers", timerSerializer, priorityQueueSetFactory));
}
 
示例13
static PartitionCommitTrigger create(
		boolean isRestored,
		OperatorStateStore stateStore,
		Configuration conf,
		ClassLoader cl,
		List<String> partitionKeys,
		ProcessingTimeService procTimeService) throws Exception {
	String trigger = conf.get(SINK_PARTITION_COMMIT_TRIGGER);
	switch (trigger) {
		case PARTITION_TIME:
			return new PartitionTimeCommitTigger(
					isRestored, stateStore, conf, cl, partitionKeys);
		case PROCESS_TIME:
			return new ProcTimeCommitTigger(
					isRestored, stateStore, conf, procTimeService);
		default:
			throw new UnsupportedOperationException(
					"Unsupported partition commit trigger: " + trigger);
	}
}
 
示例14
InternalTimerServiceImpl(
	KeyGroupRange localKeyGroupRange,
	KeyContext keyContext,
	ProcessingTimeService processingTimeService,
	KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
	KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

	this.keyContext = checkNotNull(keyContext);
	this.processingTimeService = checkNotNull(processingTimeService);
	this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
	this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
	this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

	// find the starting index of the local key-group range
	int startIdx = Integer.MAX_VALUE;
	for (Integer keyGroupIdx : localKeyGroupRange) {
		startIdx = Math.min(keyGroupIdx, startIdx);
	}
	this.localKeyGroupRangeStartIdx = startIdx;
}
 
示例15
public PulsarRowFetcher(
        SourceFunction.SourceContext<Row> sourceContext,
        Map<String, MessageId> seedTopicsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<Row>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<Row>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval,
        ClassLoader userCodeClassLoader,
        StreamingRuntimeContext runtimeContext,
        ClientConfigurationData clientConf,
        Map<String, Object> readerConf,
        int pollTimeoutMs,
        DeserializationSchema<Row> deserializer,
        PulsarMetadataReader metadataReader) throws Exception {

    super(sourceContext, seedTopicsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, runtimeContext, clientConf, readerConf, pollTimeoutMs, deserializer, metadataReader);
}
 
示例16
TestFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
		SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval) throws Exception {
	super(
			sourceContext,
			assignedPartitionsWithStartOffsets,
			watermarkStrategy,
			processingTimeProvider,
			autoWatermarkInterval,
			TestFetcher.class.getClassLoader(),
			new UnregisteredMetricsGroup(),
			false);
}
 
示例17
private static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
		SourceReader<T, MockSourceSplit> reader,
		WatermarkStrategy<T> watermarkStrategy,
		ProcessingTimeService timeService) throws Exception {

	final OperatorStateStore operatorStateStore =
			new MemoryStateBackend().createOperatorStateBackend(
					new MockEnvironmentBuilder().build(),
					"test-operator",
					Collections.emptyList(),
					new CloseableRegistry());

	final StateInitializationContext stateContext = new StateInitializationContextImpl(
		false, operatorStateStore, null, null, null);

	final SourceOperator<T, MockSourceSplit> sourceOperator =
			new TestingSourceOperator<>(reader, watermarkStrategy, timeService);
	sourceOperator.initializeState(stateContext);
	sourceOperator.open();

	return sourceOperator;
}
 
示例18
public TestingFetcher(
        SourceFunction.SourceContext<T> sourceContext,
        Map<String, MessageId> seedTopicsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        ProcessingTimeService processingTimeProvider,
        long autoWatermarkInterval) throws Exception {
    super(
            sourceContext,
            seedTopicsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            processingTimeProvider,
            autoWatermarkInterval,
            TestingFetcher.class.getClassLoader(),
            null,
            null,
            null,
            0,
            null,
            null);
}
 
示例19
@Override
protected void init() throws Exception {
	Preconditions.checkState(
		operatorChain.getNumberOfOperators() == 1,
		"BoundedStreamTask's should only run a single operator");

	// re-initialize the operator with the correct collector.
	StreamOperatorFactory<OUT> operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader());
	Tuple2<OP, Optional<ProcessingTimeService>> headOperatorAndTimeService = StreamOperatorFactoryUtil.createOperator(
			operatorFactory,
			this,
			configuration,
			new CollectorWrapper<>(collector),
			operatorChain.getOperatorEventDispatcher());
	headOperator = headOperatorAndTimeService.f0;
	headOperator.initializeState(createStreamTaskStateInitializer());
	headOperator.open();
}
 
示例20
private static <K, N> InternalTimerServiceImpl<K, N> createInternalTimerService(
	KeyGroupRange keyGroupsList,
	KeyContext keyContext,
	ProcessingTimeService processingTimeService,
	TypeSerializer<K> keySerializer,
	TypeSerializer<N> namespaceSerializer,
	PriorityQueueSetFactory priorityQueueSetFactory) {

	TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);

	return new InternalTimerServiceImpl<>(
		keyGroupsList,
		keyContext,
		processingTimeService,
		createTimerQueue("__test_processing_timers", timerSerializer, priorityQueueSetFactory),
		createTimerQueue("__test_event_timers", timerSerializer, priorityQueueSetFactory));
}
 
示例21
TestFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
		SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
		SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval,
		OneShotLatch fetchLoopWaitLatch,
		OneShotLatch stateIterationBlockLatch) throws Exception {

	super(
		sourceContext,
		assignedPartitionsWithStartOffsets,
		watermarksPeriodic,
		watermarksPunctuated,
		processingTimeProvider,
		autoWatermarkInterval,
		TestFetcher.class.getClassLoader(),
		new UnregisteredMetricsGroup(),
		false);

	this.fetchLoopWaitLatch = fetchLoopWaitLatch;
	this.stateIterationBlockLatch = stateIterationBlockLatch;
}
 
示例22
protected TestingFetcher(
		SourceFunction.SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> seedPartitionsWithInitialOffsets,
		SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval,
		ClassLoader userCodeClassLoader,
		MetricGroup consumerMetricGroup,
		boolean useMetrics) throws Exception {
	super(
			sourceContext,
			seedPartitionsWithInitialOffsets,
			watermarkStrategy,
			processingTimeProvider,
			autoWatermarkInterval,
			userCodeClassLoader,
			consumerMetricGroup,
			useMetrics);
}
 
示例23
@SuppressWarnings("FutureReturnValueIgnored")
private void finishBundleCallback() {
  minEventTimeTimerTimestampInLastBundle = minEventTimeTimerTimestampInCurrentBundle;
  minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
  try {
    if (!closed
        && minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE
        && minEventTimeTimerTimestampInLastBundle <= getEffectiveInputWatermark()) {
      ProcessingTimeService processingTimeService = getProcessingTimeService();
      // We are scheduling a timer for advancing the watermark, to not delay finishing the bundle
      // and temporarily release the checkpoint lock. Otherwise, we could potentially loop when a
      // timer keeps scheduling a timer for the same timestamp.
      processingTimeService.registerTimer(
          processingTimeService.getCurrentProcessingTime(),
          ts -> processWatermark1(new Watermark(getEffectiveInputWatermark())));
    } else {
      processWatermark1(new Watermark(getEffectiveInputWatermark()));
    }
  } catch (Exception e) {
    throw new RuntimeException(
        "Failed to progress watermark to " + getEffectiveInputWatermark(), e);
  }
}
 
示例24
/**
 * This is a utility method to conjure up a "SplitT" generics variable binding so that we can
 * construct the SourceOperator without resorting to "all raw types".
 * That way, this methods puts all "type non-safety" in one place and allows to maintain as much
 * generics safety in the main code as possible.
 */
@SuppressWarnings("unchecked")
private static <T, SplitT extends SourceSplit> SourceOperator<T, SplitT> instantiateSourceOperator(
		Function<SourceReaderContext, SourceReader<T, ?>> readerFactory,
		OperatorEventGateway eventGateway,
		SimpleVersionedSerializer<?> splitSerializer,
		WatermarkStrategy<T> watermarkStrategy,
		ProcessingTimeService timeService) {

	// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
	final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
			(Function<SourceReaderContext, SourceReader<T, SplitT>>) (Function<?, ?>) readerFactory;

	final SimpleVersionedSerializer<SplitT> typedSplitSerializer = (SimpleVersionedSerializer<SplitT>) splitSerializer;

	return new SourceOperator<>(
			typedReaderFactory,
			eventGateway,
			typedSplitSerializer,
			watermarkStrategy,
			timeService);
}
 
示例25
TestFetcher(
		SourceContext<T> sourceContext,
		Map<KafkaTopicPartition, Long> assignedPartitionsWithStartOffsets,
		SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
		ProcessingTimeService processingTimeProvider,
		long autoWatermarkInterval) throws Exception {

	this(
		sourceContext,
		assignedPartitionsWithStartOffsets,
		watermarkStrategy,
		processingTimeProvider,
		autoWatermarkInterval,
		null,
		null);
}
 
示例26
public StreamingFileSinkHelper(
		Buckets<IN, ?> buckets,
		boolean isRestored,
		OperatorStateStore stateStore,
		ProcessingTimeService procTimeService,
		long bucketCheckInterval) throws Exception {
	this.bucketCheckInterval = bucketCheckInterval;
	this.buckets = buckets;
	this.bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
	this.maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
	this.procTimeService = procTimeService;

	if (isRestored) {
		buckets.initializeState(bucketStates, maxPartCountersState);
	}

	long currentProcessingTime = procTimeService.getCurrentProcessingTime();
	procTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, this);
}
 
示例27
private AutomaticWatermarkContext(
		final Output<StreamRecord<T>> output,
		final long watermarkInterval,
		final ProcessingTimeService timeService,
		final Object checkpointLock,
		final StreamStatusMaintainer streamStatusMaintainer,
		final long idleTimeout) {

	super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);

	this.output = Preconditions.checkNotNull(output, "The output cannot be null.");

	Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
	this.watermarkInterval = watermarkInterval;

	this.reuse = new StreamRecord<>(null);

	this.lastRecordTime = Long.MIN_VALUE;

	long now = this.timeService.getCurrentProcessingTime();
	this.nextWatermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
		new WatermarkEmittingTask(this.timeService, checkpointLock, output));
}
 
示例28
/**
 * Create a watermark context.
 *
 * @param timeService the time service to schedule idleness detection tasks
 * @param checkpointLock the checkpoint lock
 * @param streamStatusMaintainer the stream status maintainer to toggle and retrieve current status
 * @param idleTimeout (-1 if idleness checking is disabled)
 */
public WatermarkContext(
		final ProcessingTimeService timeService,
		final Object checkpointLock,
		final StreamStatusMaintainer streamStatusMaintainer,
		final long idleTimeout) {

	this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
	this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Checkpoint Lock cannot be null.");
	this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer, "Stream Status Maintainer cannot be null.");

	if (idleTimeout != -1) {
		Preconditions.checkArgument(idleTimeout >= 1, "The idle timeout cannot be smaller than 1 ms.");
	}
	this.idleTimeout = idleTimeout;

	scheduleNextIdleDetectionTask();
}
 
示例29
InternalTimerServiceImpl(
	KeyGroupRange localKeyGroupRange,
	KeyContext keyContext,
	ProcessingTimeService processingTimeService,
	KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
	KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

	this.keyContext = checkNotNull(keyContext);
	this.processingTimeService = checkNotNull(processingTimeService);
	this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
	this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
	this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

	// find the starting index of the local key-group range
	int startIdx = Integer.MAX_VALUE;
	for (Integer keyGroupIdx : localKeyGroupRange) {
		startIdx = Math.min(keyGroupIdx, startIdx);
	}
	this.localKeyGroupRangeStartIdx = startIdx;
}
 
示例30
private static <K, N> InternalTimerServiceImpl<K, N> createInternalTimerService(
	KeyGroupRange keyGroupsList,
	KeyContext keyContext,
	ProcessingTimeService processingTimeService,
	TypeSerializer<K> keySerializer,
	TypeSerializer<N> namespaceSerializer,
	PriorityQueueSetFactory priorityQueueSetFactory) {

	TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);

	return new InternalTimerServiceImpl<>(
		keyGroupsList,
		keyContext,
		processingTimeService,
		createTimerQueue("__test_processing_timers", timerSerializer, priorityQueueSetFactory),
		createTimerQueue("__test_event_timers", timerSerializer, priorityQueueSetFactory));
}