Java源码示例:org.apache.flink.streaming.api.operators.AbstractStreamOperator

示例1
/**
 * Prepares the stream config for a harness that tests exactly one operator.
 *
 * <p>The config is marked as chain start and receives a single outgoing edge that connects
 * two dummy vertices; that edge is registered both as ordered out-edge and as non-chained
 * output.
 *
 * <p>Tests covering chains of several operators have to populate the stream config
 * manually instead of calling this method.
 */
public void setupOutputForSingletonOperatorChain() {
	Preconditions.checkState(!setupCalled, "This harness was already setup.");
	setupCalled = true;

	// Settings of the operator under test.
	streamConfig.setChainStart();
	streamConfig.setBufferTimeout(0);
	streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
	streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
	streamConfig.setNumberOfOutputs(1);
	streamConfig.setTypeSerializerOut(outputSerializer);
	streamConfig.setVertexID(0);
	streamConfig.setOperatorID(new OperatorID(4711L, 123L));

	// Both dummy vertices share one operator that adds nothing to AbstractStreamOperator.
	StreamOperator<OUT> noOpOperator = new AbstractStreamOperator<OUT>() {
		private static final long serialVersionUID = 1L;
	};

	StreamNode dummySource = new StreamNode(
		null,
		0,
		"group",
		null,
		noOpOperator,
		"source dummy",
		new LinkedList<OutputSelector<?>>(),
		SourceStreamTask.class);
	StreamNode dummyTarget = new StreamNode(
		null,
		1,
		"group",
		null,
		noOpOperator,
		"target dummy",
		new LinkedList<OutputSelector<?>>(),
		SourceStreamTask.class);

	StreamEdge onlyEdge = new StreamEdge(
		dummySource,
		dummyTarget,
		0,
		new LinkedList<String>(),
		new BroadcastPartitioner<Object>(),
		null /* output tag */);

	List<StreamEdge> outEdges = new LinkedList<>();
	outEdges.add(onlyEdge);

	// The same list serves as ordered out-edges and as non-chained outputs.
	streamConfig.setOutEdgesInOrder(outEdges);
	streamConfig.setNonChainedOutputs(outEdges);
}
 
示例2
/**
 * Opens a rich source function against a mocked runtime and then runs it.
 *
 * <p>The function receives a {@code StreamingRuntimeContext} built from a mocked operator
 * and a {@code MockEnvironment}, is opened with an empty {@code Configuration}, and is
 * finally passed to {@code runNonRichSourceFunction}. The environment is closed when the
 * method exits.
 *
 * @param sourceFunction the source to run; it is cast to {@code RichFunction} without a check
 * @return the list returned by {@code runNonRichSourceFunction(sourceFunction)}
 */
private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
	try (MockEnvironment mockEnv = new MockEnvironmentBuilder()
			.setTaskName("MockTask")
			.setMemorySize(3 * 1024 * 1024)
			.setInputSplitProvider(new MockInputSplitProvider())
			.setBufferSize(1024)
			.build()) {

		// Only getExecutionConfig() is stubbed on the mocked operator.
		AbstractStreamOperator<?> mockedOperator = mock(AbstractStreamOperator.class);
		when(mockedOperator.getExecutionConfig()).thenReturn(new ExecutionConfig());

		RuntimeContext context = new StreamingRuntimeContext(mockedOperator, mockEnv, new HashMap<>());

		RichFunction richSource = (RichFunction) sourceFunction;
		richSource.setRuntimeContext(context);
		richSource.open(new Configuration());

		return runNonRichSourceFunction(sourceFunction);
	}
}
 
示例3
/**
 * Calls {@code setup()}, {@code open()} and then {@code initializeState(...)} on a harness
 * and expects the sequence to fail with an {@link IllegalStateException} whose message
 * contains "TestHarness has already been initialized.".
 */
@Test
public void testInitializeAfterOpenning() throws Throwable {
	expectedException.expect(IllegalStateException.class);
	expectedException.expectMessage(containsString("TestHarness has already been initialized."));

	AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {
	};
	AbstractStreamOperatorTestHarness<Integer> harness =
		new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0);

	harness.setup();
	harness.open();
	harness.initializeState(new OperatorSubtaskState());
}
 
示例4
/**
 * Prepares the stream config for a harness that tests exactly one operator.
 *
 * <p>The config is marked as chain start and receives a single outgoing edge that connects
 * two dummy vertices; that edge is registered both as ordered out-edge and as non-chained
 * output.
 *
 * <p>Tests covering chains of several operators have to populate the stream config
 * manually instead of calling this method.
 */
public void setupOutputForSingletonOperatorChain() {
	Preconditions.checkState(!setupCalled, "This harness was already setup.");
	setupCalled = true;

	// Settings of the operator under test.
	streamConfig.setChainStart();
	streamConfig.setBufferTimeout(0);
	streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
	streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
	streamConfig.setNumberOfOutputs(1);
	streamConfig.setTypeSerializerOut(outputSerializer);
	streamConfig.setVertexID(0);
	streamConfig.setOperatorID(new OperatorID(4711L, 123L));

	// Both dummy vertices share one operator that adds nothing to AbstractStreamOperator.
	StreamOperator<OUT> noOpOperator = new AbstractStreamOperator<OUT>() {
		private static final long serialVersionUID = 1L;
	};

	StreamNode dummySource = new StreamNode(
		0,
		"group",
		null,
		noOpOperator,
		"source dummy",
		new LinkedList<OutputSelector<?>>(),
		SourceStreamTask.class);
	StreamNode dummyTarget = new StreamNode(
		1,
		"group",
		null,
		noOpOperator,
		"target dummy",
		new LinkedList<OutputSelector<?>>(),
		SourceStreamTask.class);

	StreamEdge onlyEdge = new StreamEdge(
		dummySource,
		dummyTarget,
		0,
		new LinkedList<String>(),
		new BroadcastPartitioner<Object>(),
		null /* output tag */);

	List<StreamEdge> outEdges = new LinkedList<>();
	outEdges.add(onlyEdge);

	// The same list serves as ordered out-edges and as non-chained outputs.
	streamConfig.setOutEdgesInOrder(outEdges);
	streamConfig.setNonChainedOutputs(outEdges);
}
 
示例5
/**
 * Opens a rich source function against a mocked runtime and then runs it.
 *
 * <p>The function receives a {@code StreamingRuntimeContext} built from a mocked operator
 * and a {@code MockEnvironment}, is opened with an empty {@code Configuration}, and is
 * finally passed to {@code runNonRichSourceFunction}. The environment is closed when the
 * method exits.
 *
 * @param sourceFunction the source to run; it is cast to {@code RichFunction} without a check
 * @return the list returned by {@code runNonRichSourceFunction(sourceFunction)}
 */
private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
	try (MockEnvironment mockEnv = new MockEnvironmentBuilder()
			.setTaskName("MockTask")
			.setMemorySize(3 * 1024 * 1024)
			.setInputSplitProvider(new MockInputSplitProvider())
			.setBufferSize(1024)
			.build()) {

		// Only getExecutionConfig() is stubbed on the mocked operator.
		AbstractStreamOperator<?> mockedOperator = mock(AbstractStreamOperator.class);
		when(mockedOperator.getExecutionConfig()).thenReturn(new ExecutionConfig());

		RuntimeContext context = new StreamingRuntimeContext(mockedOperator, mockEnv, new HashMap<>());

		RichFunction richSource = (RichFunction) sourceFunction;
		richSource.setRuntimeContext(context);
		richSource.open(new Configuration());

		return runNonRichSourceFunction(sourceFunction);
	}
}
 
示例6
/**
 * Calls {@code setup()}, {@code open()} and then {@code initializeState(...)} on a harness
 * and expects the sequence to fail with an {@link IllegalStateException} whose message
 * contains "TestHarness has already been initialized.".
 */
@Test
public void testInitializeAfterOpenning() throws Throwable {
	expectedException.expect(IllegalStateException.class);
	expectedException.expectMessage(containsString("TestHarness has already been initialized."));

	AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {
	};
	AbstractStreamOperatorTestHarness<Integer> harness =
		new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0);

	harness.setup();
	harness.open();
	harness.initializeState(new OperatorSubtaskState());
}
 
示例7
/**
 * Closes an operator wrapper whose operator throws from {@code close()} and checks that the
 * throwable reaching the test carries the message "test exception at closing".
 */
@Test
public void testClosingOperatorWithException() {
	// Operator whose close() always fails with a recognizable message.
	AbstractStreamOperator failingOperator = new AbstractStreamOperator<Void>() {
		@Override
		public void close() throws Exception {
			throw new Exception("test exception at closing");
		}
	};

	StreamOperatorWrapper<?, ?> wrapper = new StreamOperatorWrapper<>(
		failingOperator,
		Optional.ofNullable(failingOperator.getProcessingTimeService()),
		containingTask.getMailboxExecutorFactory().createExecutor(Integer.MAX_VALUE - 1));

	try {
		wrapper.close(containingTask.getActionExecutor());
		fail("should throw an exception");
	} catch (Throwable thrown) {
		assertTrue(
			ExceptionUtils.findThrowableWithMessage(thrown, "test exception at closing").isPresent());
	}
}
 
示例8
/**
 * Prepares the stream config for a harness that tests exactly one operator.
 *
 * <p>The config is marked as chain start and receives a single outgoing edge that connects
 * two dummy vertices; that edge is registered both as ordered out-edge and as non-chained
 * output.
 *
 * <p>Tests covering chains of several operators have to populate the stream config
 * manually instead of calling this method.
 */
public void setupOutputForSingletonOperatorChain() {
	Preconditions.checkState(!setupCalled, "This harness was already setup.");
	setupCalled = true;

	// Settings of the operator under test.
	streamConfig.setChainStart();
	streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
	streamConfig.setOutputSelectors(Collections.emptyList());
	streamConfig.setNumberOfOutputs(1);
	streamConfig.setTypeSerializerOut(outputSerializer);
	streamConfig.setVertexID(0);
	streamConfig.setOperatorID(new OperatorID(4711L, 123L));

	// Both dummy vertices share one operator that adds nothing to AbstractStreamOperator.
	StreamOperator<OUT> noOpOperator = new AbstractStreamOperator<OUT>() {
		private static final long serialVersionUID = 1L;
	};

	StreamNode dummySource = new StreamNode(
		0,
		"group",
		null,
		noOpOperator,
		"source dummy",
		new LinkedList<>(),
		SourceStreamTask.class);
	StreamNode dummyTarget = new StreamNode(
		1,
		"group",
		null,
		noOpOperator,
		"target dummy",
		new LinkedList<>(),
		SourceStreamTask.class);

	StreamEdge onlyEdge = new StreamEdge(
		dummySource,
		dummyTarget,
		0,
		new LinkedList<>(),
		new BroadcastPartitioner<>(),
		null /* output tag */);

	List<StreamEdge> outEdges = new LinkedList<>();
	outEdges.add(onlyEdge);

	// The same list serves as ordered out-edges and as non-chained outputs.
	streamConfig.setOutEdgesInOrder(outEdges);
	streamConfig.setNonChainedOutputs(outEdges);
}
 
示例9
/**
 * Builds a stream config on top of the given configuration and fills it with fixed test
 * settings: chain start, no output selectors, a {@code StringSerializer} for the output,
 * vertex id 0, a {@code TestSequentialReadingStreamOperator} as stream operator and a new
 * {@code OperatorID}.
 *
 * <p>One edge per requested output is created between a dummy source and a dummy target
 * vertex; the resulting list is registered both as ordered out-edges and as non-chained
 * outputs.
 *
 * @param configuration configuration passed to the superclass constructor
 * @param numberOfOutputs number of outputs to configure and number of edges to create
 */
public MockStreamConfig(Configuration configuration, int numberOfOutputs) {
	super(configuration);

	setChainStart();
	setOutputSelectors(Collections.emptyList());
	setNumberOfOutputs(numberOfOutputs);
	setTypeSerializerOut(new StringSerializer());
	setVertexID(0);
	setStreamOperator(new TestSequentialReadingStreamOperator("test operator"));
	setOperatorID(new OperatorID());

	// Shared by both dummy vertices; adds nothing to AbstractStreamOperator.
	StreamOperator noOpOperator = new AbstractStreamOperator() {
		private static final long serialVersionUID = 1L;
	};

	StreamNode source = new StreamNode(
		0, null, null, noOpOperator, "source", new ArrayList<>(), SourceStreamTask.class);
	StreamNode target = new StreamNode(
		1, null, null, noOpOperator, "target", new ArrayList<>(), SourceStreamTask.class);

	List<StreamEdge> edges = new ArrayList<>(numberOfOutputs);
	for (int created = 0; created < numberOfOutputs; created++) {
		// NOTE(review): every edge gets numberOfOutputs as its third argument, not the loop index — confirm intended.
		StreamEdge edge = new StreamEdge(
			source,
			target,
			numberOfOutputs,
			new ArrayList<>(),
			new BroadcastPartitioner<>(),
			null);
		edges.add(edge);
	}

	setOutEdgesInOrder(edges);
	setNonChainedOutputs(edges);
}
 
示例10
/**
 * Opens a rich source function against a mocked runtime and then runs it.
 *
 * <p>The function receives a {@code StreamingRuntimeContext} built from a mocked operator
 * and a {@code MockEnvironment}, is opened with an empty {@code Configuration}, and is
 * finally passed to {@code runNonRichSourceFunction}. The environment is closed when the
 * method exits.
 *
 * @param sourceFunction the source to run; it is cast to {@code RichFunction} without a check
 * @return the list returned by {@code runNonRichSourceFunction(sourceFunction)}
 */
private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
	try (MockEnvironment mockEnv = new MockEnvironmentBuilder()
			.setTaskName("MockTask")
			.setManagedMemorySize(3 * 1024 * 1024)
			.setInputSplitProvider(new MockInputSplitProvider())
			.setBufferSize(1024)
			.build()) {

		// Only getExecutionConfig() is stubbed on the mocked operator.
		AbstractStreamOperator<?> mockedOperator = mock(AbstractStreamOperator.class);
		when(mockedOperator.getExecutionConfig()).thenReturn(new ExecutionConfig());

		RuntimeContext context = new StreamingRuntimeContext(mockedOperator, mockEnv, new HashMap<>());

		RichFunction richSource = (RichFunction) sourceFunction;
		richSource.setRuntimeContext(context);
		richSource.open(new Configuration());

		return runNonRichSourceFunction(sourceFunction);
	}
}
 
示例11
/**
 * Calls {@code setup()}, {@code open()} and then {@code initializeState(...)} on a harness
 * and expects the sequence to fail with an {@link IllegalStateException} whose message
 * contains "TestHarness has already been initialized.".
 */
@Test
public void testInitializeAfterOpenning() throws Throwable {
	expectedException.expect(IllegalStateException.class);
	expectedException.expectMessage(containsString("TestHarness has already been initialized."));

	AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {
	};
	AbstractStreamOperatorTestHarness<Integer> harness =
		new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0);

	harness.setup();
	harness.open();
	harness.initializeState(new OperatorSubtaskState());
}
 
示例12
/**
 * Sets up the operator under test; does nothing if setup already ran.
 *
 * <p>A freshly created stream task state initializer is installed on the mock task first.
 * If no operator instance exists yet, one is created through
 * {@code StreamOperatorFactoryUtil.createOperator}. Otherwise the existing instance is
 * configured: an {@code AbstractStreamOperator} receives the processing time service and a
 * {@code SetupableStreamOperator} is set up against the mock task and config. Finally the
 * harness is flagged as set up and the mock task is initialized.
 *
 * @param outputSerializer serializer passed to the {@code MockOutput} constructor
 */
public void setup(TypeSerializer<OUT> outputSerializer) {
	if (setupCalled) {
		return;
	}

	streamTaskStateInitializer =
		createStreamTaskStateManager(environment, stateBackend, ttlTimeProvider);
	mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);

	if (operator != null) {
		// An operator instance was supplied: wire it up directly.
		if (operator instanceof AbstractStreamOperator) {
			((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
		}
		if (operator instanceof SetupableStreamOperator) {
			((SetupableStreamOperator) operator).setup(mockTask, config, new MockOutput(outputSerializer));
		}
	} else {
		// No instance yet: let the factory create it.
		this.operator = StreamOperatorFactoryUtil.createOperator(
			factory,
			mockTask,
			config,
			new MockOutput(outputSerializer),
			null).f0;
	}

	setupCalled = true;
	this.mockTask.init();
}
 
示例13
/**
 * Returns the number of key/value state entries held by the operator's keyed state backend.
 *
 * <p>Only an operator that is an {@code AbstractStreamOperator} and whose keyed state
 * backend is a {@code HeapKeyedStateBackend} is supported.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the operator is not an
 *         {@code AbstractStreamOperator} or its keyed state backend is not a
 *         {@code HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
	// Guard the cast so that an unsupported operator is reported the same way as an
	// unsupported backend instead of surfacing as a ClassCastException.
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException(
			"Operator is not an AbstractStreamOperator.");
	}

	KeyedStateBackend<Object> keyedStateBackend =
		((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
	if (keyedStateBackend instanceof HeapKeyedStateBackend) {
		return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries();
	}
	throw new UnsupportedOperationException(
		"Keyed state backend is not a HeapKeyedStateBackend.");
}
 
示例14
/**
 * Returns the number of key/value state entries held by the operator's keyed state backend.
 *
 * <p>Only an operator that is an {@code AbstractStreamOperator} and whose keyed state
 * backend is a {@code HeapKeyedStateBackend} is supported.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the operator is not an
 *         {@code AbstractStreamOperator} or its keyed state backend is not a
 *         {@code HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
	// Guard the cast so that an unsupported operator is reported the same way as an
	// unsupported backend instead of surfacing as a ClassCastException.
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException(
			"Operator is not an AbstractStreamOperator.");
	}

	KeyedStateBackend<Object> keyedStateBackend =
		((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
	if (keyedStateBackend instanceof HeapKeyedStateBackend) {
		return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries();
	}
	throw new UnsupportedOperationException(
		"Keyed state backend is not a HeapKeyedStateBackend.");
}
 
示例15
/**
 * Returns the number of key/value state entries that the operator's keyed state backend
 * holds for the given namespace.
 *
 * <p>Only an operator that is an {@code AbstractStreamOperator} and whose keyed state
 * backend is a {@code HeapKeyedStateBackend} is supported.
 *
 * @param namespace namespace passed to {@code HeapKeyedStateBackend#numKeyValueStateEntries}
 * @return the value reported by the backend for {@code namespace}
 * @throws UnsupportedOperationException if the operator is not an
 *         {@code AbstractStreamOperator} or its keyed state backend is not a
 *         {@code HeapKeyedStateBackend}
 */
public <N> int numKeyedStateEntries(N namespace) {
	// Guard the cast so that an unsupported operator is reported the same way as an
	// unsupported backend instead of surfacing as a ClassCastException.
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException(
			"Operator is not an AbstractStreamOperator.");
	}

	KeyedStateBackend<Object> keyedStateBackend =
		((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
	if (keyedStateBackend instanceof HeapKeyedStateBackend) {
		return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries(namespace);
	}
	throw new UnsupportedOperationException(
		"Keyed state backend is not a HeapKeyedStateBackend.");
}
 
示例16
/**
 * Returns the operator's processing-time timer count.
 *
 * @return the value of {@code AbstractStreamOperator#numProcessingTimeTimers()}
 * @throws UnsupportedOperationException if the operator is not an {@code AbstractStreamOperator}
 */
@VisibleForTesting
public int numProcessingTimeTimers() {
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException();
	}
	return ((AbstractStreamOperator) operator).numProcessingTimeTimers();
}
 
示例17
/**
 * Returns the operator's event-time timer count.
 *
 * @return the value of {@code AbstractStreamOperator#numEventTimeTimers()}
 * @throws UnsupportedOperationException if the operator is not an {@code AbstractStreamOperator}
 */
@VisibleForTesting
public int numEventTimeTimers() {
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException();
	}
	return ((AbstractStreamOperator) operator).numEventTimeTimers();
}
 
示例18
/**
 * Returns the number of key/value state entries held by the operator's keyed state backend.
 *
 * <p>Only an operator that is an {@code AbstractStreamOperator} and whose keyed state
 * backend is a {@code HeapKeyedStateBackend} is supported.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the operator is not an
 *         {@code AbstractStreamOperator} or its keyed state backend is not a
 *         {@code HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
	// Guard the cast so that an unsupported operator is reported the same way as an
	// unsupported backend instead of surfacing as a ClassCastException.
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException(
			"Operator is not an AbstractStreamOperator.");
	}

	KeyedStateBackend<Object> keyedStateBackend =
		((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
	if (keyedStateBackend instanceof HeapKeyedStateBackend) {
		return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries();
	}
	throw new UnsupportedOperationException(
		"Keyed state backend is not a HeapKeyedStateBackend.");
}
 
示例19
/**
 * Returns the number of key/value state entries held by the operator's keyed state backend.
 *
 * <p>Only an operator that is an {@code AbstractStreamOperator} and whose keyed state
 * backend is a {@code HeapKeyedStateBackend} is supported.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the operator is not an
 *         {@code AbstractStreamOperator} or its keyed state backend is not a
 *         {@code HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
	// Guard the cast so that an unsupported operator is reported the same way as an
	// unsupported backend instead of surfacing as a ClassCastException.
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException(
			"Operator is not an AbstractStreamOperator.");
	}

	KeyedStateBackend<Object> keyedStateBackend =
		((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
	if (keyedStateBackend instanceof HeapKeyedStateBackend) {
		return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries();
	}
	throw new UnsupportedOperationException(
		"Keyed state backend is not a HeapKeyedStateBackend.");
}
 
示例20
/**
 * Returns the number of key/value state entries that the operator's keyed state backend
 * holds for the given namespace.
 *
 * <p>Only an operator that is an {@code AbstractStreamOperator} and whose keyed state
 * backend is a {@code HeapKeyedStateBackend} is supported.
 *
 * @param namespace namespace passed to {@code HeapKeyedStateBackend#numKeyValueStateEntries}
 * @return the value reported by the backend for {@code namespace}
 * @throws UnsupportedOperationException if the operator is not an
 *         {@code AbstractStreamOperator} or its keyed state backend is not a
 *         {@code HeapKeyedStateBackend}
 */
public <N> int numKeyedStateEntries(N namespace) {
	// Guard the cast so that an unsupported operator is reported the same way as an
	// unsupported backend instead of surfacing as a ClassCastException.
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException(
			"Operator is not an AbstractStreamOperator.");
	}

	KeyedStateBackend<Object> keyedStateBackend =
		((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
	if (keyedStateBackend instanceof HeapKeyedStateBackend) {
		return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries(namespace);
	}
	throw new UnsupportedOperationException(
		"Keyed state backend is not a HeapKeyedStateBackend.");
}
 
示例21
/**
 * Returns the operator's processing-time timer count.
 *
 * @return the value of {@code AbstractStreamOperator#numProcessingTimeTimers()}
 * @throws UnsupportedOperationException if the operator is not an {@code AbstractStreamOperator}
 */
@VisibleForTesting
public int numProcessingTimeTimers() {
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException();
	}
	return ((AbstractStreamOperator) operator).numProcessingTimeTimers();
}
 
示例22
/**
 * Returns the operator's event-time timer count.
 *
 * @return the value of {@code AbstractStreamOperator#numEventTimeTimers()}
 * @throws UnsupportedOperationException if the operator is not an {@code AbstractStreamOperator}
 */
@VisibleForTesting
public int numEventTimeTimers() {
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException();
	}
	return ((AbstractStreamOperator) operator).numEventTimeTimers();
}
 
示例23
/**
 * Creates one test input gate and one physical in-edge per input serializer.
 *
 * <p>Each gate is registered with the given mock environment. Afterwards the collected
 * edges, the number of inputs and the input serializers are written to the stream config.
 *
 * @param streamMockEnvironment environment that receives the created input gates
 */
@Override
protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
	final int numInputs = inputSerializers.size();
	inputGates = new StreamTestSingleInputGate[numInputs];

	// All edges connect the same pair of dummy vertices, which share one operator
	// that adds nothing to AbstractStreamOperator.
	StreamOperator<?> noOpOperator = new AbstractStreamOperator<Object>() {
		private static final long serialVersionUID = 1L;
	};
	StreamNode dummySource = new StreamNode(
		0, "default group", null, noOpOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
	StreamNode dummyTarget = new StreamNode(
		1, "default group", null, noOpOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

	List<StreamEdge> physicalInEdges = new LinkedList<>();
	for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) {
		TypeSerializer<?> serializer = inputSerializers.get(inputIndex);
		inputGates[inputIndex] = new StreamTestSingleInputGate<>(
			inputChannelsPerGate.get(inputIndex),
			inputIndex,
			serializer,
			bufferSize);

		// The edge's third argument is the input index shifted by one.
		physicalInEdges.add(new StreamEdge(
			dummySource,
			dummyTarget,
			inputIndex + 1,
			new LinkedList<>(),
			new BroadcastPartitioner<>(),
			null));

		streamMockEnvironment.addInputGate(inputGates[inputIndex].getInputGate());
	}

	streamConfig.setInPhysicalEdges(physicalInEdges);
	streamConfig.setNumberOfInputs(numInputs);
	streamConfig.setTypeSerializersIn(inputSerializers.toArray(new TypeSerializer[numInputs]));
}
 
示例24
/**
 * Aborts checkpoint 42 on the coordinator before {@code checkpointState} is called for it,
 * then checks that the subsequent {@code checkpointState} call neither checkpoints the
 * operator nor reports a checkpoint id, and that the coordinator's aborted-checkpoint and
 * async-runnable counters are both zero afterwards.
 */
@Test
public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception {
	TestTaskStateManager stateManager = new TestTaskStateManager();
	MockEnvironment mockEnvironment = MockEnvironment.builder().setTaskStateManager(stateManager).build();
	SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
		.setEnvironment(mockEnvironment)
		.setUnalignedCheckpointEnabled(true)
		.build();

	CheckpointOperator checkpointOperator = new CheckpointOperator(new OperatorSnapshotFutures());

	final OperatorChain<String, AbstractStreamOperator<String>> operatorChain = operatorChain(checkpointOperator);

	long checkpointId = 42L;
	// Abort the checkpoint before checkpointState() runs; the coordinator should record it.
	subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
	assertEquals(1, subtaskCheckpointCoordinator.getAbortedCheckpointSize());

	// Now trigger the already-aborted checkpoint.
	subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
	subtaskCheckpointCoordinator.checkpointState(
		new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
		CheckpointOptions.forCheckpointWithDefaultLocation(),
		new CheckpointMetrics(),
		operatorChain,
		() -> true);
	// The operator was not checkpointed and the state manager still reports -1.
	assertFalse(checkpointOperator.isCheckpointed());
	assertEquals(-1, stateManager.getReportedCheckpointId());
	// The aborted entry is gone and no async checkpoint runnable is registered.
	assertEquals(0, subtaskCheckpointCoordinator.getAbortedCheckpointSize());
	assertEquals(0, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
}
 
示例25
/**
 * Starts checkpoint 42 with a blocking raw-keyed-state future, waits until that future is
 * running, then aborts the checkpoint and checks that the future was cancelled and that the
 * coordinator no longer tracks an async checkpoint runnable.
 */
@Test
public void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception {
	MockEnvironment mockEnvironment = MockEnvironment.builder().build();
	// NOTE(review): the single-thread executor created here is not shut down anywhere in this method — confirm whether the coordinator owns it.
	SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
		.setEnvironment(mockEnvironment)
		.setExecutor(Executors.newSingleThreadExecutor())
		.setUnalignedCheckpointEnabled(true)
		.build();

	// Only the second future blocks; all other snapshot futures are already done and empty.
	final BlockingRunnableFuture rawKeyedStateHandleFuture = new BlockingRunnableFuture();
	OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
		DoneFuture.of(SnapshotResult.empty()),
		rawKeyedStateHandleFuture,
		DoneFuture.of(SnapshotResult.empty()),
		DoneFuture.of(SnapshotResult.empty()),
		DoneFuture.of(SnapshotResult.empty()),
		DoneFuture.of(SnapshotResult.empty()));

	final OperatorChain<String, AbstractStreamOperator<String>> operatorChain = operatorChain(new CheckpointOperator(operatorSnapshotResult));

	long checkpointId = 42L;
	subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
	subtaskCheckpointCoordinator.checkpointState(
		new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
		CheckpointOptions.forCheckpointWithDefaultLocation(),
		new CheckpointMetrics(),
		operatorChain,
		() -> true);
	// Wait until the blocking future is being run, then verify one runnable is tracked and nothing is cancelled yet.
	rawKeyedStateHandleFuture.awaitRun();
	assertEquals(1, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
	assertFalse(rawKeyedStateHandleFuture.isCancelled());

	// Aborting the checkpoint must cancel the blocking future and drop the tracked runnable.
	subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
	assertTrue(rawKeyedStateHandleFuture.isCancelled());
	assertEquals(0, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
}
 
示例26
/**
 * Prepares the stream config for a harness that tests exactly one operator, created from
 * the given factory.
 *
 * <p>The config is marked as chain start and receives a single outgoing edge that connects
 * two dummy vertices; that edge is registered both as ordered out-edge and as non-chained
 * output. The operator factory and operator id are stored in the config last.
 *
 * <p>Tests covering chains of several operators have to populate the stream config
 * manually instead of calling this method.
 *
 * @param factory operator factory stored in the stream config
 * @param operatorID operator id stored in the stream config
 * @return this builder
 */
public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(
		StreamOperatorFactory<?> factory,
		OperatorID operatorID) {
	checkState(!setupCalled, "This harness was already setup.");
	setupCalled = true;

	// Settings of the operator under test.
	streamConfig.setChainStart();
	streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
	streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
	streamConfig.setNumberOfOutputs(1);
	streamConfig.setTypeSerializerOut(outputSerializer);
	streamConfig.setVertexID(0);

	// Both dummy vertices share one operator that adds nothing to AbstractStreamOperator.
	StreamOperator<OUT> noOpOperator = new AbstractStreamOperator<OUT>() {
		private static final long serialVersionUID = 1L;
	};

	StreamNode dummySource = new StreamNode(
		0,
		"group",
		null,
		noOpOperator,
		"source dummy",
		new LinkedList<OutputSelector<?>>(),
		SourceStreamTask.class);
	StreamNode dummyTarget = new StreamNode(
		1,
		"group",
		null,
		noOpOperator,
		"target dummy",
		new LinkedList<OutputSelector<?>>(),
		SourceStreamTask.class);

	StreamEdge onlyEdge = new StreamEdge(
		dummySource,
		dummyTarget,
		0,
		new LinkedList<String>(),
		new BroadcastPartitioner<Object>(),
		null /* output tag */);

	List<StreamEdge> outEdges = new LinkedList<>();
	outEdges.add(onlyEdge);

	// The same list serves as ordered out-edges and as non-chained outputs.
	streamConfig.setOutEdgesInOrder(outEdges);
	streamConfig.setNonChainedOutputs(outEdges);

	streamConfig.setStreamOperatorFactory(factory);
	streamConfig.setOperatorID(operatorID);

	return this;
}
 
示例27
/**
 * Creates the mock task and stores the given operator chain in {@code overrideOperatorChain}.
 *
 * @param env environment forwarded to the superclass constructor
 * @param operatorChain operator chain kept in {@code overrideOperatorChain}
 * @param uncaughtExceptionHandler handler forwarded to the superclass constructor
 * @throws Exception declared by this constructor; presumably propagated from the
 *         superclass constructor — confirm
 */
MockStreamTask(
		Environment env,
		OperatorChain<String, AbstractStreamOperator<String>> operatorChain,
		Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception {
	// NOTE(review): the second superclass argument is passed as null; its meaning is not visible here — confirm.
	super(env, null, uncaughtExceptionHandler);
	this.overrideOperatorChain = operatorChain;
}
 
示例28
/**
 * Writes a value into a TTL-enabled value state at harness TTL time 0, checks it is
 * readable, then moves the harness TTL time one millisecond past the time-to-live and
 * checks that the state reads as {@code null}.
 */
@Test
public void testSetTtlTimeProvider() throws Exception {
	final Time ttl = Time.hours(1);
	final int storedValue = 42;

	AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {};
	try (AbstractStreamOperatorTestHarness<Integer> harness =
			new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0)) {

		harness.config.setStateKeySerializer(IntSerializer.INSTANCE);
		harness.initializeState(new OperatorSubtaskState());
		harness.open();

		ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
		descriptor.enableTimeToLive(StateTtlConfig.newBuilder(ttl).build());

		KeyedStateBackend<Integer> backend = operator.getKeyedStateBackend();
		ValueState<Integer> ttlState = backend.getPartitionedState(
			VoidNamespace.INSTANCE,
			VoidNamespaceSerializer.INSTANCE,
			descriptor);

		backend.setCurrentKey(1);

		// Store the value at TTL time 0 and read it back.
		harness.setStateTtlProcessingTime(0L);
		ttlState.update(storedValue);
		Assert.assertEquals(storedValue, (int) ttlState.value());

		// One millisecond past the time-to-live the value must no longer be returned.
		harness.setStateTtlProcessingTime(ttl.toMilliseconds() + 1);
		Assert.assertNull(ttlState.value());
	}
}
 
示例29
/**
 * Returns the number of key/value state entries held by the operator's keyed state backend.
 *
 * <p>Only an operator that is an {@code AbstractStreamOperator} and whose keyed state
 * backend is a {@code HeapKeyedStateBackend} is supported.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the operator is not an
 *         {@code AbstractStreamOperator} or its keyed state backend is not a
 *         {@code HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
	// Guard the cast so that an unsupported operator is reported the same way as an
	// unsupported backend instead of surfacing as a ClassCastException.
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException(
			"Operator is not an AbstractStreamOperator.");
	}

	KeyedStateBackend<Object> keyedStateBackend =
		((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
	if (keyedStateBackend instanceof HeapKeyedStateBackend) {
		return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries();
	}
	throw new UnsupportedOperationException(
		"Keyed state backend is not a HeapKeyedStateBackend.");
}
 
示例30
/**
 * Returns the number of key/value state entries held by the operator's keyed state backend.
 *
 * <p>Only an operator that is an {@code AbstractStreamOperator} and whose keyed state
 * backend is a {@code HeapKeyedStateBackend} is supported.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the operator is not an
 *         {@code AbstractStreamOperator} or its keyed state backend is not a
 *         {@code HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
	// Guard the cast so that an unsupported operator is reported the same way as an
	// unsupported backend instead of surfacing as a ClassCastException.
	if (!(operator instanceof AbstractStreamOperator)) {
		throw new UnsupportedOperationException(
			"Operator is not an AbstractStreamOperator.");
	}

	KeyedStateBackend<Object> keyedStateBackend =
		((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
	if (keyedStateBackend instanceof HeapKeyedStateBackend) {
		return ((HeapKeyedStateBackend) keyedStateBackend).numKeyValueStateEntries();
	}
	throw new UnsupportedOperationException(
		"Keyed state backend is not a HeapKeyedStateBackend.");
}