Java源码示例:org.apache.flink.streaming.api.operators.AbstractStreamOperator
示例1
/**
 * Convenience setup of the stream config for harness users who test exactly one operator.
 * It configures the outgoing network connection of that operator: a single outgoing edge
 * between two placeholder vertices is registered both as the ordered out-edges and as the
 * non-chained outputs.
 *
 * <p>May be called at most once; a second call fails the {@code checkState} precondition.
 *
 * <p>For more advanced scenarios, such as testing a chain of several operators with the
 * harness, configure the stream config manually instead.
 */
public void setupOutputForSingletonOperatorChain() {
    Preconditions.checkState(!setupCalled, "This harness was already setup.");
    setupCalled = true;

    // General settings of the one-operator chain.
    streamConfig.setChainStart();
    streamConfig.setBufferTimeout(0);
    streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
    streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
    streamConfig.setNumberOfOutputs(1);
    streamConfig.setTypeSerializerOut(outputSerializer);
    streamConfig.setVertexID(0);
    streamConfig.setOperatorID(new OperatorID(4711L, 123L));

    // Both placeholder vertices share one anonymous operator that overrides nothing;
    // they only serve as the end points of the single outgoing edge.
    StreamOperator<OUT> placeholderOperator = new AbstractStreamOperator<OUT>() {
        private static final long serialVersionUID = 1L;
    };
    StreamNode edgeSource = new StreamNode(null, 0, "group", null, placeholderOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
    StreamNode edgeTarget = new StreamNode(null, 1, "group", null, placeholderOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
    StreamEdge onlyEdge = new StreamEdge(edgeSource, edgeTarget, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */);

    List<StreamEdge> outputEdges = new LinkedList<StreamEdge>();
    outputEdges.add(onlyEdge);
    streamConfig.setOutEdgesInOrder(outputEdges);
    streamConfig.setNonChainedOutputs(outputEdges);
}
示例2
/**
 * Runs a source function that is also a {@link RichFunction}: the function receives a
 * {@link StreamingRuntimeContext} built from a mocked operator and a mock environment,
 * is opened with an empty {@link Configuration}, and is then passed to
 * {@code runNonRichSourceFunction}.
 *
 * @param sourceFunction the source to run; it is cast to {@link RichFunction} without a prior check
 * @param <T> type of the elements in the returned list
 * @return the result of {@code runNonRichSourceFunction} for the opened function
 * @throws Exception if opening or running the function, or closing the mock environment, fails
 */
private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
    try (MockEnvironment mockEnvironment =
            new MockEnvironmentBuilder()
                .setTaskName("MockTask")
                .setMemorySize(3 * 1024 * 1024)
                .setInputSplitProvider(new MockInputSplitProvider())
                .setBufferSize(1024)
                .build()) {

        // The runtime context only needs an operator that can hand out an execution config.
        AbstractStreamOperator<?> mockedOperator = mock(AbstractStreamOperator.class);
        when(mockedOperator.getExecutionConfig()).thenReturn(new ExecutionConfig());
        RuntimeContext context = new StreamingRuntimeContext(mockedOperator, mockEnvironment, new HashMap<>());

        RichFunction richFunction = (RichFunction) sourceFunction;
        richFunction.setRuntimeContext(context);
        richFunction.open(new Configuration());

        return runNonRichSourceFunction(sourceFunction);
    }
}
示例3
/**
 * Verifies that calling {@code initializeState} on a harness that has already been set up
 * and opened is rejected with an {@link IllegalStateException} whose message contains
 * "TestHarness has already been initialized.".
 */
@Test
public void testInitializeAfterOpenning() throws Throwable {
    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage(containsString("TestHarness has already been initialized."));

    // try-with-resources: the harness is closed even though the last call is expected to
    // throw, so the test no longer leaves an opened harness behind.
    try (AbstractStreamOperatorTestHarness<Integer> harness =
            new AbstractStreamOperatorTestHarness<>(
                new AbstractStreamOperator<Integer>() {
                },
                1,
                1,
                0)) {
        harness.setup();
        harness.open();
        harness.initializeState(new OperatorSubtaskState());
    }
}
示例4
/**
 * Utility for users of the test harness who test only a single operator: prepares the
 * stream config and the operator's outgoing network connection. One edge between two
 * placeholder vertices is created and registered as both the ordered out-edges and the
 * non-chained outputs.
 *
 * <p>Calling this method a second time fails the {@code checkState} precondition.
 *
 * <p>Tests that need chains of multiple operators must configure the stream config
 * manually.
 */
public void setupOutputForSingletonOperatorChain() {
    Preconditions.checkState(!setupCalled, "This harness was already setup.");
    setupCalled = true;

    streamConfig.setChainStart();
    streamConfig.setBufferTimeout(0);
    streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
    streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
    streamConfig.setNumberOfOutputs(1);
    streamConfig.setTypeSerializerOut(outputSerializer);
    streamConfig.setVertexID(0);
    streamConfig.setOperatorID(new OperatorID(4711L, 123L));

    // Anonymous operator without any overrides, shared by both ends of the edge.
    StreamOperator<OUT> stubOperator = new AbstractStreamOperator<OUT>() {
        private static final long serialVersionUID = 1L;
    };
    StreamNode upstream = new StreamNode(0, "group", null, stubOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
    StreamNode downstream = new StreamNode(1, "group", null, stubOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);

    List<StreamEdge> edges = new LinkedList<StreamEdge>();
    edges.add(
        new StreamEdge(
            upstream,
            downstream,
            0,
            new LinkedList<String>(),
            new BroadcastPartitioner<Object>(),
            null /* output tag */));

    streamConfig.setOutEdgesInOrder(edges);
    streamConfig.setNonChainedOutputs(edges);
}
示例5
/**
 * Prepares a rich source function and runs it. The function is given a
 * {@link StreamingRuntimeContext} backed by a mocked operator and a mock environment,
 * opened with an empty {@link Configuration}, and finally delegated to
 * {@code runNonRichSourceFunction}.
 *
 * @param sourceFunction the source to run; cast to {@link RichFunction} without a prior check
 * @param <T> type of the elements in the returned list
 * @return whatever {@code runNonRichSourceFunction} returns for the opened function
 * @throws Exception if opening or running the function, or closing the mock environment, fails
 */
private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
    try (MockEnvironment env =
            new MockEnvironmentBuilder()
                .setTaskName("MockTask")
                .setMemorySize(3 * 1024 * 1024)
                .setInputSplitProvider(new MockInputSplitProvider())
                .setBufferSize(1024)
                .build()) {

        AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class);
        when(operatorMock.getExecutionConfig()).thenReturn(new ExecutionConfig());
        RuntimeContext mockedContext = new StreamingRuntimeContext(operatorMock, env, new HashMap<>());

        // Cast once; both lifecycle calls go through the RichFunction view of the source.
        RichFunction asRichFunction = (RichFunction) sourceFunction;
        asRichFunction.setRuntimeContext(mockedContext);
        asRichFunction.open(new Configuration());

        return runNonRichSourceFunction(sourceFunction);
    }
}
示例6
/**
 * Checks that {@code initializeState} is rejected once the harness has been set up and
 * opened: an {@link IllegalStateException} containing
 * "TestHarness has already been initialized." is expected.
 */
@Test
public void testInitializeAfterOpenning() throws Throwable {
    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage(containsString("TestHarness has already been initialized."));

    // The harness is a resource; closing it via try-with-resources avoids leaking the
    // opened harness when the expected exception propagates out of the test.
    try (AbstractStreamOperatorTestHarness<Integer> testHarness =
            new AbstractStreamOperatorTestHarness<>(
                new AbstractStreamOperator<Integer>() {
                },
                1,
                1,
                0)) {
        testHarness.setup();
        testHarness.open();
        testHarness.initializeState(new OperatorSubtaskState());
    }
}
示例7
/**
 * Verifies that an exception thrown by the wrapped operator's {@code close()} is
 * propagated by {@code StreamOperatorWrapper#close}: the throwable raised by the wrapper
 * must contain one with the message "test exception at closing".
 */
@Test
public void testClosingOperatorWithException() {
    // Parameterized instead of a raw type so the wrapper's type arguments can be inferred.
    AbstractStreamOperator<Void> streamOperator = new AbstractStreamOperator<Void>() {
        @Override
        public void close() throws Exception {
            throw new Exception("test exception at closing");
        }
    };

    StreamOperatorWrapper<?, ?> operatorWrapper = new StreamOperatorWrapper<>(
        streamOperator,
        Optional.ofNullable(streamOperator.getProcessingTimeService()),
        containingTask.getMailboxExecutorFactory().createExecutor(Integer.MAX_VALUE - 1));

    Throwable thrown = null;
    try {
        operatorWrapper.close(containingTask.getActionExecutor());
    } catch (Throwable t) {
        thrown = t;
    }

    // fail() is deliberately called outside the try block: inside it, the AssertionError
    // would be caught by the catch above and the test would fail without this message.
    if (thrown == null) {
        fail("should throw an exception");
    }

    Optional<Throwable> optional = ExceptionUtils.findThrowableWithMessage(thrown, "test exception at closing");
    assertTrue(optional.isPresent());
}
示例8
/**
 * Sets up the stream config for the common case in which the harness tests one single
 * operator. The method wires the operator's outgoing network connection by registering
 * one edge between two placeholder vertices as the ordered out-edges and as the
 * non-chained outputs.
 *
 * <p>The method may only be called once; a repeated call fails the {@code checkState}
 * precondition.
 *
 * <p>More advanced test cases, for example chains of multiple operators, have to
 * configure the stream config manually.
 */
public void setupOutputForSingletonOperatorChain() {
    Preconditions.checkState(!setupCalled, "This harness was already setup.");
    setupCalled = true;

    streamConfig.setChainStart();
    streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
    streamConfig.setOutputSelectors(Collections.emptyList());
    streamConfig.setNumberOfOutputs(1);
    streamConfig.setTypeSerializerOut(outputSerializer);
    streamConfig.setVertexID(0);
    streamConfig.setOperatorID(new OperatorID(4711L, 123L));

    // One anonymous operator without overrides backs both placeholder vertices.
    StreamOperator<OUT> placeholder = new AbstractStreamOperator<OUT>() {
        private static final long serialVersionUID = 1L;
    };
    StreamNode fromVertex = new StreamNode(0, "group", null, placeholder, "source dummy", new LinkedList<>(), SourceStreamTask.class);
    StreamNode toVertex = new StreamNode(1, "group", null, placeholder, "target dummy", new LinkedList<>(), SourceStreamTask.class);
    StreamEdge singleEdge = new StreamEdge(fromVertex, toVertex, 0, new LinkedList<>(), new BroadcastPartitioner<>(), null /* output tag */);

    List<StreamEdge> orderedOutEdges = new LinkedList<>();
    orderedOutEdges.add(singleEdge);
    streamConfig.setOutEdgesInOrder(orderedOutEdges);
    streamConfig.setNonChainedOutputs(orderedOutEdges);
}
示例9
/**
 * Creates a stream config that is marked as chain start, uses a {@link StringSerializer}
 * for its output, runs a {@code TestSequentialReadingStreamOperator}, and has
 * {@code numberOfOutputs} outgoing edges between a placeholder source and target vertex.
 * The edges are registered both as the ordered out-edges and as the non-chained outputs.
 *
 * @param configuration the configuration handed to the super constructor
 * @param numberOfOutputs number of outputs to announce and number of out-edges to create
 */
public MockStreamConfig(Configuration configuration, int numberOfOutputs) {
    super(configuration);

    setChainStart();
    setOutputSelectors(Collections.emptyList());
    setNumberOfOutputs(numberOfOutputs);
    setTypeSerializerOut(new StringSerializer());
    setVertexID(0);
    setStreamOperator(new TestSequentialReadingStreamOperator("test operator"));
    setOperatorID(new OperatorID());

    // Parameterized (instead of raw) placeholder operator shared by both vertices.
    StreamOperator<?> dummyOperator = new AbstractStreamOperator<Object>() {
        private static final long serialVersionUID = 1L;
    };

    StreamNode sourceVertex = new StreamNode(0, null, null, dummyOperator, "source", new ArrayList<>(), SourceStreamTask.class);
    StreamNode targetVertex = new StreamNode(1, null, null, dummyOperator, "target", new ArrayList<>(), SourceStreamTask.class);

    List<StreamEdge> outEdgesInOrder = new ArrayList<>(numberOfOutputs);
    for (int i = 0; i < numberOfOutputs; i++) {
        // NOTE(review): every edge is created with numberOfOutputs (not the loop index)
        // as its third constructor argument -- kept as-is, confirm this is intended.
        outEdgesInOrder.add(
            new StreamEdge(sourceVertex, targetVertex, numberOfOutputs, new ArrayList<>(), new BroadcastPartitioner<>(), null));
    }

    setOutEdgesInOrder(outEdgesInOrder);
    setNonChainedOutputs(outEdgesInOrder);
}
示例10
/**
 * Executes a source function that also implements {@link RichFunction}. Before the
 * function is handed to {@code runNonRichSourceFunction}, it gets a
 * {@link StreamingRuntimeContext} (mocked operator plus mock environment) and is opened
 * with an empty {@link Configuration}.
 *
 * @param sourceFunction the source to run; cast to {@link RichFunction} without a prior check
 * @param <T> type of the elements in the returned list
 * @return the list produced by {@code runNonRichSourceFunction}
 * @throws Exception if opening or running the function, or closing the mock environment, fails
 */
private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
    try (MockEnvironment taskEnvironment =
            new MockEnvironmentBuilder()
                .setTaskName("MockTask")
                .setManagedMemorySize(3 * 1024 * 1024)
                .setInputSplitProvider(new MockInputSplitProvider())
                .setBufferSize(1024)
                .build()) {

        // A mocked operator is enough: only its execution config is stubbed.
        AbstractStreamOperator<?> stubbedOperator = mock(AbstractStreamOperator.class);
        when(stubbedOperator.getExecutionConfig()).thenReturn(new ExecutionConfig());
        RuntimeContext streamingContext = new StreamingRuntimeContext(stubbedOperator, taskEnvironment, new HashMap<>());

        RichFunction rich = (RichFunction) sourceFunction;
        rich.setRuntimeContext(streamingContext);
        rich.open(new Configuration());

        return runNonRichSourceFunction(sourceFunction);
    }
}
示例11
/**
 * Ensures that state cannot be initialized after the harness was set up and opened: the
 * call to {@code initializeState} must throw an {@link IllegalStateException} whose
 * message contains "TestHarness has already been initialized.".
 */
@Test
public void testInitializeAfterOpenning() throws Throwable {
    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage(containsString("TestHarness has already been initialized."));

    // Closing through try-with-resources releases the opened harness even when the
    // expected exception is thrown by the last statement.
    try (AbstractStreamOperatorTestHarness<Integer> openedHarness =
            new AbstractStreamOperatorTestHarness<>(
                new AbstractStreamOperator<Integer>() {
                },
                1,
                1,
                0)) {
        openedHarness.setup();
        openedHarness.open();
        openedHarness.initializeState(new OperatorSubtaskState());
    }
}
示例12
/**
 * Prepares the operator under test. If no operator instance exists yet, one is created
 * from the factory; otherwise the existing operator receives the processing time service
 * (when it is an {@link AbstractStreamOperator}) and
 * {@link SetupableStreamOperator#setup(StreamTask, StreamConfig, Output)} is called on it
 * (when it is a {@link SetupableStreamOperator}). Afterwards the mock task is initialized.
 *
 * <p>Once {@code setupCalled} is set, further calls return immediately.
 *
 * @param outputSerializer serializer handed to the {@code MockOutput} of the operator
 */
public void setup(TypeSerializer<OUT> outputSerializer) {
    if (setupCalled) {
        return;
    }

    streamTaskStateInitializer =
        createStreamTaskStateManager(environment, stateBackend, ttlTimeProvider);
    mockTask.setStreamTaskStateInitializer(streamTaskStateInitializer);

    if (operator != null) {
        // An operator instance was supplied directly: wire it up by hand.
        if (operator instanceof AbstractStreamOperator) {
            ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
        }
        if (operator instanceof SetupableStreamOperator) {
            ((SetupableStreamOperator) operator).setup(mockTask, config, new MockOutput(outputSerializer));
        }
    } else {
        // No instance yet: let the factory utility create the operator.
        this.operator = StreamOperatorFactoryUtil.createOperator(factory, mockTask, config,
            new MockOutput(outputSerializer), null).f0;
    }

    setupCalled = true;
    this.mockTask.init();
}
示例13
/**
 * Returns the number of key/value state entries reported by the operator's keyed state
 * backend. The operator is cast to {@link AbstractStreamOperator} without a prior check.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the keyed state backend is not a
 *     {@link HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
    KeyedStateBackend<Object> backend = ((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
    if (!(backend instanceof HeapKeyedStateBackend)) {
        throw new UnsupportedOperationException();
    }
    return ((HeapKeyedStateBackend<?>) backend).numKeyValueStateEntries();
}
示例14
/**
 * Counts the key/value state entries of the operator's keyed state backend. Counting is
 * only possible for a {@link HeapKeyedStateBackend}; the operator itself is cast to
 * {@link AbstractStreamOperator} unconditionally.
 *
 * @return the entry count reported by the heap backend
 * @throws UnsupportedOperationException for any other kind of keyed state backend
 */
public int numKeyedStateEntries() {
    AbstractStreamOperator<?> streamOperator = (AbstractStreamOperator<?>) operator;
    KeyedStateBackend<Object> stateBackend = streamOperator.getKeyedStateBackend();
    if (stateBackend instanceof HeapKeyedStateBackend) {
        HeapKeyedStateBackend<?> heapBackend = (HeapKeyedStateBackend<?>) stateBackend;
        return heapBackend.numKeyValueStateEntries();
    }
    throw new UnsupportedOperationException();
}
示例15
/**
 * Returns the number of key/value state entries that the operator's keyed state backend
 * reports for the given namespace. The operator is cast to {@link AbstractStreamOperator}
 * without a prior check.
 *
 * @param namespace the namespace passed on to the backend
 * @param <N> type of the namespace
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries(namespace)}
 * @throws UnsupportedOperationException if the keyed state backend is not a
 *     {@link HeapKeyedStateBackend}
 */
public <N> int numKeyedStateEntries(N namespace) {
    KeyedStateBackend<Object> backend = ((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
    if (!(backend instanceof HeapKeyedStateBackend)) {
        throw new UnsupportedOperationException();
    }
    return ((HeapKeyedStateBackend<?>) backend).numKeyValueStateEntries(namespace);
}
示例16
/**
 * Returns the number of processing-time timers reported by the operator.
 *
 * @return the value of {@code AbstractStreamOperator#numProcessingTimeTimers()}
 * @throws UnsupportedOperationException if the operator is not an
 *     {@link AbstractStreamOperator}
 */
@VisibleForTesting
public int numProcessingTimeTimers() {
    if (!(operator instanceof AbstractStreamOperator)) {
        throw new UnsupportedOperationException();
    }
    return ((AbstractStreamOperator<?>) operator).numProcessingTimeTimers();
}
示例17
/**
 * Returns the number of event-time timers reported by the operator.
 *
 * @return the value of {@code AbstractStreamOperator#numEventTimeTimers()}
 * @throws UnsupportedOperationException if the operator is not an
 *     {@link AbstractStreamOperator}
 */
@VisibleForTesting
public int numEventTimeTimers() {
    if (!(operator instanceof AbstractStreamOperator)) {
        throw new UnsupportedOperationException();
    }
    return ((AbstractStreamOperator<?>) operator).numEventTimeTimers();
}
示例18
/**
 * Reports how many key/value state entries the operator's keyed state backend holds.
 * The operator is cast to {@link AbstractStreamOperator} without a prior check.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the keyed state backend is not a
 *     {@link HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
    KeyedStateBackend<Object> keyedBackend = ((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
    boolean isHeapBackend = keyedBackend instanceof HeapKeyedStateBackend;
    if (!isHeapBackend) {
        throw new UnsupportedOperationException();
    }
    return ((HeapKeyedStateBackend<?>) keyedBackend).numKeyValueStateEntries();
}
示例19
/**
 * Returns the key/value state entry count of the operator's keyed state backend. Only a
 * {@link HeapKeyedStateBackend} is supported; the operator is cast to
 * {@link AbstractStreamOperator} unconditionally.
 *
 * @return the entry count reported by the heap backend
 * @throws UnsupportedOperationException for any other kind of keyed state backend
 */
public int numKeyedStateEntries() {
    AbstractStreamOperator<?> typedOperator = (AbstractStreamOperator<?>) operator;
    KeyedStateBackend<Object> backend = typedOperator.getKeyedStateBackend();
    if (backend instanceof HeapKeyedStateBackend) {
        return ((HeapKeyedStateBackend<?>) backend).numKeyValueStateEntries();
    }
    throw new UnsupportedOperationException();
}
示例20
/**
 * Counts the key/value state entries of the operator's keyed state backend for one
 * namespace. Only a {@link HeapKeyedStateBackend} is supported; the operator is cast to
 * {@link AbstractStreamOperator} unconditionally.
 *
 * @param namespace the namespace passed on to the backend
 * @param <N> type of the namespace
 * @return the entry count the heap backend reports for {@code namespace}
 * @throws UnsupportedOperationException for any other kind of keyed state backend
 */
public <N> int numKeyedStateEntries(N namespace) {
    AbstractStreamOperator<?> streamOperator = (AbstractStreamOperator<?>) operator;
    KeyedStateBackend<Object> stateBackend = streamOperator.getKeyedStateBackend();
    if (stateBackend instanceof HeapKeyedStateBackend) {
        HeapKeyedStateBackend<?> heapBackend = (HeapKeyedStateBackend<?>) stateBackend;
        return heapBackend.numKeyValueStateEntries(namespace);
    }
    throw new UnsupportedOperationException();
}
示例21
/**
 * Delegates to {@code AbstractStreamOperator#numProcessingTimeTimers()} of the operator.
 *
 * @return the number of processing-time timers reported by the operator
 * @throws UnsupportedOperationException if the operator is not an
 *     {@link AbstractStreamOperator}
 */
@VisibleForTesting
public int numProcessingTimeTimers() {
    boolean supported = operator instanceof AbstractStreamOperator;
    if (!supported) {
        throw new UnsupportedOperationException();
    }
    AbstractStreamOperator<?> streamOperator = (AbstractStreamOperator<?>) operator;
    return streamOperator.numProcessingTimeTimers();
}
示例22
/**
 * Delegates to {@code AbstractStreamOperator#numEventTimeTimers()} of the operator.
 *
 * @return the number of event-time timers reported by the operator
 * @throws UnsupportedOperationException if the operator is not an
 *     {@link AbstractStreamOperator}
 */
@VisibleForTesting
public int numEventTimeTimers() {
    boolean supported = operator instanceof AbstractStreamOperator;
    if (!supported) {
        throw new UnsupportedOperationException();
    }
    AbstractStreamOperator<?> streamOperator = (AbstractStreamOperator<?>) operator;
    return streamOperator.numEventTimeTimers();
}
示例23
/**
 * Creates one {@link StreamTestSingleInputGate} per input serializer, registers each gate
 * with the given mock environment, and records a matching physical in-edge per input in
 * the stream config, together with the number of inputs and the input serializers.
 *
 * @param streamMockEnvironment the environment that receives the created input gates
 */
@Override
protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
    final int inputCount = inputSerializers.size();
    inputGates = new StreamTestSingleInputGate[inputCount];

    // All in-edges connect the same two placeholder vertices, which share one anonymous
    // operator that overrides nothing.
    StreamOperator<?> placeholderOperator = new AbstractStreamOperator<Object>() {
        private static final long serialVersionUID = 1L;
    };
    StreamNode edgeSource = new StreamNode(0, "default group", null, placeholderOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
    StreamNode edgeTarget = new StreamNode(1, "default group", null, placeholderOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

    List<StreamEdge> physicalInEdges = new LinkedList<>();
    for (int gateIndex = 0; gateIndex < inputCount; gateIndex++) {
        TypeSerializer<?> serializer = inputSerializers.get(gateIndex);
        inputGates[gateIndex] = new StreamTestSingleInputGate<>(
            inputChannelsPerGate.get(gateIndex),
            gateIndex,
            serializer,
            bufferSize);

        // The edge's third constructor argument is the gate index shifted by one.
        physicalInEdges.add(
            new StreamEdge(
                edgeSource,
                edgeTarget,
                gateIndex + 1,
                new LinkedList<>(),
                new BroadcastPartitioner<>(),
                null));

        streamMockEnvironment.addInputGate(inputGates[gateIndex].getInputGate());
    }

    streamConfig.setInPhysicalEdges(physicalInEdges);
    streamConfig.setNumberOfInputs(inputGates.length);
    streamConfig.setTypeSerializersIn(inputSerializers.toArray(new TypeSerializer[inputSerializers.size()]));
}
示例24
/**
 * Aborts a checkpoint before it is executed and then triggers it: the operator must not
 * be checkpointed, nothing must be reported to the task state manager, and the
 * coordinator must end up with no aborted-checkpoint entry and no async runnable.
 */
@Test
public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception {
    TestTaskStateManager taskStateManager = new TestTaskStateManager();
    MockEnvironment environment = MockEnvironment.builder().setTaskStateManager(taskStateManager).build();
    SubtaskCheckpointCoordinatorImpl coordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
        .setEnvironment(environment)
        .setUnalignedCheckpointEnabled(true)
        .build();

    CheckpointOperator operatorUnderTest = new CheckpointOperator(new OperatorSnapshotFutures());
    final OperatorChain<String, AbstractStreamOperator<String>> chain = operatorChain(operatorUnderTest);
    long abortedCheckpointId = 42L;

    // The abort notification arrives before the checkpoint is executed.
    coordinator.notifyCheckpointAborted(abortedCheckpointId, chain, () -> true);
    assertEquals(1, coordinator.getAbortedCheckpointSize());

    // Now execute the checkpoint that was already aborted.
    coordinator.getChannelStateWriter().start(abortedCheckpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
    CheckpointMetaData metaData = new CheckpointMetaData(abortedCheckpointId, System.currentTimeMillis());
    coordinator.checkpointState(
        metaData,
        CheckpointOptions.forCheckpointWithDefaultLocation(),
        new CheckpointMetrics(),
        chain,
        () -> true);

    assertFalse(operatorUnderTest.isCheckpointed());
    assertEquals(-1, taskStateManager.getReportedCheckpointId());
    assertEquals(0, coordinator.getAbortedCheckpointSize());
    assertEquals(0, coordinator.getAsyncCheckpointRunnableSize());
}
示例25
/**
 * Starts a checkpoint whose raw keyed state future blocks while it runs, then aborts the
 * checkpoint: the future must be cancelled and the coordinator must no longer track an
 * async checkpoint runnable.
 *
 * <p>The single-thread executor created for the coordinator is shut down when the test
 * finishes, regardless of the outcome, so its worker thread is not left behind.
 */
@Test
public void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception {
    MockEnvironment mockEnvironment = MockEnvironment.builder().build();
    // Fully qualified so that the block needs no additional import.
    java.util.concurrent.ExecutorService asyncExecutor = Executors.newSingleThreadExecutor();
    try {
        SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()
            .setEnvironment(mockEnvironment)
            .setExecutor(asyncExecutor)
            .setUnalignedCheckpointEnabled(true)
            .build();

        final BlockingRunnableFuture rawKeyedStateHandleFuture = new BlockingRunnableFuture();
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
            DoneFuture.of(SnapshotResult.empty()),
            rawKeyedStateHandleFuture,
            DoneFuture.of(SnapshotResult.empty()),
            DoneFuture.of(SnapshotResult.empty()),
            DoneFuture.of(SnapshotResult.empty()),
            DoneFuture.of(SnapshotResult.empty()));

        final OperatorChain<String, AbstractStreamOperator<String>> operatorChain = operatorChain(new CheckpointOperator(operatorSnapshotResult));

        long checkpointId = 42L;
        subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
        subtaskCheckpointCoordinator.checkpointState(
            new CheckpointMetaData(checkpointId, System.currentTimeMillis()),
            CheckpointOptions.forCheckpointWithDefaultLocation(),
            new CheckpointMetrics(),
            operatorChain,
            () -> true);

        // Wait until the blocking future has started running before checking the state.
        rawKeyedStateHandleFuture.awaitRun();
        assertEquals(1, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
        assertFalse(rawKeyedStateHandleFuture.isCancelled());

        subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
        assertTrue(rawKeyedStateHandleFuture.isCancelled());
        assertEquals(0, subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
    } finally {
        // Also interrupts a still-blocked worker if an assertion above failed.
        asyncExecutor.shutdownNow();
    }
}
示例26
/**
 * Utility for users of the test harness who test only a single operator: sets up the
 * stream config and the operator's outgoing network connection. One edge between two
 * placeholder vertices is registered as the ordered out-edges and as the non-chained
 * outputs, and the given factory and operator ID are stored in the stream config.
 *
 * <p>A second call fails the {@code checkState} precondition.
 *
 * <p>For more advanced test cases, such as chains of multiple operators, configure the
 * stream config manually.
 *
 * @param factory the operator factory to store in the stream config
 * @param operatorID the operator ID to store in the stream config
 * @return this builder
 */
public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(
        StreamOperatorFactory<?> factory,
        OperatorID operatorID) {
    checkState(!setupCalled, "This harness was already setup.");
    setupCalled = true;

    streamConfig.setChainStart();
    streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
    streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
    streamConfig.setNumberOfOutputs(1);
    streamConfig.setTypeSerializerOut(outputSerializer);
    streamConfig.setVertexID(0);

    // Anonymous operator without overrides, used only to build the placeholder vertices.
    StreamOperator<OUT> placeholderOperator = new AbstractStreamOperator<OUT>() {
        private static final long serialVersionUID = 1L;
    };
    StreamNode edgeSource = new StreamNode(0, "group", null, placeholderOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
    StreamNode edgeTarget = new StreamNode(1, "group", null, placeholderOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
    StreamEdge singleEdge = new StreamEdge(edgeSource, edgeTarget, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */);

    List<StreamEdge> outputEdges = new LinkedList<StreamEdge>();
    outputEdges.add(singleEdge);
    streamConfig.setOutEdgesInOrder(outputEdges);
    streamConfig.setNonChainedOutputs(outputEdges);

    streamConfig.setStreamOperatorFactory(factory);
    streamConfig.setOperatorID(operatorID);
    return this;
}
示例27
/**
 * Creates the mock task and remembers the operator chain that overrides the task's own.
 *
 * @param env the environment passed to the super constructor
 * @param operatorChain the chain stored in {@code overrideOperatorChain}
 * @param uncaughtExceptionHandler the handler passed to the super constructor
 * @throws Exception if the super constructor fails
 */
MockStreamTask(
        final Environment env,
        final OperatorChain<String, AbstractStreamOperator<String>> operatorChain,
        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception {
    // The second super-constructor argument is intentionally left null.
    super(env, null, uncaughtExceptionHandler);
    overrideOperatorChain = operatorChain;
}
示例28
/**
 * Checks that the processing time set on the harness via
 * {@code setStateTtlProcessingTime} drives state TTL: a value written at time 0 is
 * readable immediately and reads as {@code null} once the time is moved just past the
 * configured time-to-live of one hour.
 */
@Test
public void testSetTtlTimeProvider() throws Exception {
    AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {};
    try (AbstractStreamOperatorTestHarness<Integer> harness = new AbstractStreamOperatorTestHarness<>(
            operator,
            1,
            1,
            0)) {

        harness.config.setStateKeySerializer(IntSerializer.INSTANCE);

        Time ttl = Time.hours(1);

        harness.initializeState(new OperatorSubtaskState());
        harness.open();

        // Value state with TTL enabled, obtained from the operator's keyed state backend.
        ValueStateDescriptor<Integer> ttlDescriptor = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
        ttlDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(ttl).build());
        KeyedStateBackend<Integer> backend = operator.getKeyedStateBackend();
        ValueState<Integer> ttlState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, ttlDescriptor);

        int writtenValue = 42;
        backend.setCurrentKey(1);

        // Write at time 0: the value is visible.
        harness.setStateTtlProcessingTime(0L);
        ttlState.update(writtenValue);
        Assert.assertEquals(writtenValue, (int) ttlState.value());

        // Move the time one millisecond past the TTL: the value reads as null.
        harness.setStateTtlProcessingTime(ttl.toMilliseconds() + 1);
        Assert.assertNull(ttlState.value());
    }
}
示例29
/**
 * Returns the number of key/value state entries reported by the operator's keyed state
 * backend. The operator is cast to {@link AbstractStreamOperator} without a prior check.
 *
 * @return the value of {@code HeapKeyedStateBackend#numKeyValueStateEntries()}
 * @throws UnsupportedOperationException if the keyed state backend is not a
 *     {@link HeapKeyedStateBackend}
 */
public int numKeyedStateEntries() {
    KeyedStateBackend<Object> backend = ((AbstractStreamOperator<?>) operator).getKeyedStateBackend();
    if (!(backend instanceof HeapKeyedStateBackend)) {
        throw new UnsupportedOperationException();
    }
    HeapKeyedStateBackend<?> heapBackend = (HeapKeyedStateBackend<?>) backend;
    return heapBackend.numKeyValueStateEntries();
}
示例30
/**
 * Counts the key/value state entries of the operator's keyed state backend. Only a
 * {@link HeapKeyedStateBackend} is supported; the operator is cast to
 * {@link AbstractStreamOperator} unconditionally.
 *
 * @return the entry count reported by the heap backend
 * @throws UnsupportedOperationException for any other kind of keyed state backend
 */
public int numKeyedStateEntries() {
    AbstractStreamOperator<?> castOperator = (AbstractStreamOperator<?>) operator;
    KeyedStateBackend<Object> keyedBackend = castOperator.getKeyedStateBackend();
    if (keyedBackend instanceof HeapKeyedStateBackend) {
        return ((HeapKeyedStateBackend<?>) keyedBackend).numKeyValueStateEntries();
    }
    throw new UnsupportedOperationException();
}