Java源码示例:org.apache.flink.runtime.state.VoidNamespace
示例1
/**
 * Walks every key that still holds a value under the {@code "nfaOperatorStateName"} descriptor,
 * copies its contents into {@code computationStates} and {@code partialMatches}, and then clears
 * the old entry.
 *
 * @throws Exception if reading, converting or clearing the state fails
 */
private void migrateOldState() throws Exception {
    // Per-key migration step, applied by the backend to each key in turn.
    final KeyedStateFunction<Object, ValueState<MigratedNFA<IN>>> migration =
        new KeyedStateFunction<Object, ValueState<MigratedNFA<IN>>>() {
            @Override
            public void process(Object key, ValueState<MigratedNFA<IN>> state) throws Exception {
                final MigratedNFA<IN> legacyNfa = state.value();
                computationStates.update(new NFAState(legacyNfa.getComputationStates()));

                final org.apache.flink.cep.nfa.SharedBuffer<IN> legacyBuffer = legacyNfa.getSharedBuffer();
                partialMatches.init(legacyBuffer.getEventsBuffer(), legacyBuffer.getPages());

                // drop the old value once its contents have been copied over
                state.clear();
            }
        };

    getKeyedStateBackend().applyToAllKeys(
        VoidNamespace.INSTANCE,
        VoidNamespaceSerializer.INSTANCE,
        new ValueStateDescriptor<>("nfaOperatorStateName", new NFA.NFASerializer<>(inputSerializer)),
        migration);
}
示例2
/**
 * Reads all records of {@code split} through a freshly built {@link KeyedStateInputFormat} and
 * returns them sorted in ascending order.
 *
 * <p>The format and the split are closed in {@code finally} blocks, so they are released even
 * when reading a record throws.
 *
 * @param split the key-group range split to read
 * @param userFunction the reader function wrapped by the {@link KeyedStateReaderOperator}
 * @return the records of the split, sorted ascending
 * @throws IOException if opening, reading or closing the format fails
 */
@Nonnull
private List<Integer> readInputSplit(KeyGroupRangeInputSplit split, KeyedStateReaderFunction<Integer, Integer> userFunction) throws IOException {
    KeyedStateInputFormat<Integer, VoidNamespace, Integer> format = new KeyedStateInputFormat<>(
        new OperatorState(OperatorIDGenerator.fromUid("uid"), 1, 4),
        new MemoryStateBackend(),
        new Configuration(),
        new KeyedStateReaderOperator<>(userFunction, Types.INT));

    List<Integer> data = new ArrayList<>();
    format.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));

    format.openInputFormat();
    try {
        format.open(split);
        try {
            while (!format.reachedEnd()) {
                data.add(format.nextRecord(0));
            }
        } finally {
            // only reached after a successful open(split)
            format.close();
        }
    } finally {
        // only reached after a successful openInputFormat()
        format.closeInputFormat();
    }

    data.sort(Comparator.naturalOrder());
    return data;
}
示例3
/**
 * Applies an {@link AsyncOperationFailureNotifier} to every key of the given async-operation
 * state and, if the notifier reports that something was enqueued, lets {@code reductions}
 * process the envelopes.
 *
 * @param asyncOperationStateDescriptor descriptor of the state to visit
 * @param reductions passed to the notifier and used to process the envelopes afterwards
 * @param keyedStateBackend backend whose keys are visited
 * @throws Exception if visiting the keys fails
 */
static void fireExpiredAsyncOperations(
    MapStateDescriptor<Long, Message> asyncOperationStateDescriptor,
    Reductions reductions,
    KeyedStateBackend<String> keyedStateBackend)
    throws Exception {

  final AsyncOperationFailureNotifier notifier = new AsyncOperationFailureNotifier(reductions);

  keyedStateBackend.applyToAllKeys(
      VoidNamespace.get(),
      VoidNamespaceSerializer.INSTANCE,
      asyncOperationStateDescriptor,
      notifier);

  if (!notifier.enqueued()) {
    // nothing was enqueued while visiting the keys
    return;
  }
  reductions.processEnvelopes();
}
示例4
/**
 * Applies an {@link AsyncOperationFailureNotifier} (built from {@code reductions} and
 * {@code asyncOperationState}) to every key of the given async-operation state and, if the
 * notifier reports that something was enqueued, lets {@code reductions} process the envelopes.
 *
 * @param asyncOperationStateDescriptor descriptor of the state to visit
 * @param reductions passed to the notifier and used to process the envelopes afterwards
 * @param asyncOperationState state handle handed to the notifier
 * @param keyedStateBackend backend whose keys are visited
 * @throws Exception if visiting the keys fails
 */
static void fireExpiredAsyncOperations(
    MapStateDescriptor<Long, Message> asyncOperationStateDescriptor,
    Reductions reductions,
    MapState<Long, Message> asyncOperationState,
    KeyedStateBackend<String> keyedStateBackend)
    throws Exception {

  final AsyncOperationFailureNotifier notifier =
      new AsyncOperationFailureNotifier(reductions, asyncOperationState);

  keyedStateBackend.applyToAllKeys(
      VoidNamespace.get(),
      VoidNamespaceSerializer.INSTANCE,
      asyncOperationStateDescriptor,
      notifier);

  if (!notifier.enqueued()) {
    // nothing was enqueued while visiting the keys
    return;
  }
  reductions.processEnvelopes();
}
示例5
/**
 * Opens the operator: obtains the {@code "user-timers"} timer service, creates the timestamped
 * collector, resolves one broadcast state per registered descriptor and builds the three
 * contexts (read-write, read-only, on-timer).
 *
 * @throws Exception if the superclass or any state lookup fails
 */
@Override
public void open() throws Exception {
    super.open();

    final InternalTimerService<VoidNamespace> userTimers =
        getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
    final TimerService timers = new SimpleTimerService(userTimers);

    collector = new TimestampedCollector<>(output);

    // one broadcast state handle per declared descriptor
    this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
    for (MapStateDescriptor<?, ?> broadcastDescriptor : broadcastStateDescriptors) {
        broadcastStates.put(
            broadcastDescriptor,
            getOperatorStateBackend().getBroadcastState(broadcastDescriptor));
    }

    rwContext = new ReadWriteContextImpl(
        getExecutionConfig(), getKeyedStateBackend(), userFunction, broadcastStates, timers);
    rContext = new ReadOnlyContextImpl(
        getExecutionConfig(), userFunction, broadcastStates, timers);
    onTimerContext = new OnTimerContextImpl(
        getExecutionConfig(), userFunction, broadcastStates, timers);
}
示例6
/**
 * Builds the partitioner under test: a
 * {@link CopyOnWriteStateTableSnapshot.StateTableKeyGroupPartitioner} constructed from the
 * given arguments, passed through unchanged.
 */
@Override
protected KeyGroupPartitioner<StateTableEntry<Integer, VoidNamespace, Integer>> createPartitioner(
    StateTableEntry<Integer, VoidNamespace, Integer>[] data,
    int numElements,
    KeyGroupRange keyGroupRange,
    int totalKeyGroups,
    KeyGroupPartitioner.ElementWriterFunction<
        StateTableEntry<Integer, VoidNamespace, Integer>> elementWriterFunction) {

    final KeyGroupPartitioner<StateTableEntry<Integer, VoidNamespace, Integer>> partitioner =
        new CopyOnWriteStateTableSnapshot.StateTableKeyGroupPartitioner<>(
            data, numElements, keyGroupRange, totalKeyGroups, elementWriterFunction);
    return partitioner;
}
示例7
/**
 * Event-time timer callback. Clears {@code registeredTimer}, emits results for the current
 * watermark, re-registers a timer when {@code emitResultAndCleanUpState} returns a value below
 * {@code Long.MAX_VALUE}, and, when state cleaning is enabled, either registers the processing
 * cleanup timer or cleans up the last one.
 */
@Override
public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {
    registeredTimer.clear();

    final long nextUnprocessed = emitResultAndCleanUpState(timerService.currentWatermark());
    final boolean hasUnprocessed = nextUnprocessed < Long.MAX_VALUE;
    if (hasUnprocessed) {
        registerTimer(nextUnprocessed);
    }

    if (!stateCleaningEnabled) {
        return;
    }

    // if we have more state at any side, then update the timer, else clean it up.
    if (hasUnprocessed || !rightState.isEmpty()) {
        registerProcessingCleanupTimer();
    } else {
        cleanupLastTimer();
    }
}
示例8
/**
 * Opens the operator. After the superclass is opened this sets up, in order: the
 * {@code "user-timers"} timer service, the timestamped output collector, the map of broadcast
 * states (one per descriptor) and the read-write, read-only and on-timer contexts.
 *
 * @throws Exception if the superclass or any state lookup fails
 */
@Override
public void open() throws Exception {
    super.open();

    final TimerService userTimerService = new SimpleTimerService(
        getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this));

    collector = new TimestampedCollector<>(output);

    // resolve each declared broadcast descriptor to its state handle
    this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
    for (MapStateDescriptor<?, ?> stateDescriptor : broadcastStateDescriptors) {
        broadcastStates.put(stateDescriptor, getOperatorStateBackend().getBroadcastState(stateDescriptor));
    }

    rwContext = new ReadWriteContextImpl(
        getExecutionConfig(), getKeyedStateBackend(), userFunction, broadcastStates, userTimerService);
    rContext = new ReadOnlyContextImpl(
        getExecutionConfig(), userFunction, broadcastStates, userTimerService);
    onTimerContext = new OnTimerContextImpl(
        getExecutionConfig(), userFunction, broadcastStates, userTimerService);
}
示例9
/**
 * Interprets the string field of the incoming tuple as a {@code ":"}-separated command and
 * executes it: set or delete the partitioned state, register an event-time or processing-time
 * timer, or emit the current state value.
 *
 * @throws IllegalArgumentException if the command name is not one of the known commands
 */
@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
    final Tuple2<Integer, String> payload = element.getValue();
    final String[] command = payload.f1.split(":");
    final String action = command[0];

    switch (action) {
        case "SET_STATE":
            getPartitionedState(stateDescriptor).update(command[1]);
            break;
        case "DELETE_STATE":
            getPartitionedState(stateDescriptor).clear();
            break;
        case "SET_EVENT_TIME_TIMER":
            timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
            break;
        case "SET_PROC_TIME_TIMER":
            timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
            break;
        case "EMIT_STATE": {
            final String current = getPartitionedState(stateDescriptor).value();
            output.collect(new StreamRecord<>("ON_ELEMENT:" + payload.f0 + ":" + current));
            break;
        }
        default:
            throw new IllegalArgumentException();
    }
}
示例10
/**
 * Processing-time timer callback: loads all rows from {@code dataState} into the sort buffer,
 * sorts them with {@code comparator}, emits them in that order and clears the state.
 */
@Override
public void onProcessingTime(InternalTimer<BaseRow, VoidNamespace> timer) throws Exception {
    // fetch the buffered rows before touching the sort buffer
    final Iterable<BaseRow> bufferedRows = dataState.get();

    // refill the sort buffer from scratch
    sortBuffer.clear();
    for (BaseRow bufferedRow : bufferedRows) {
        sortBuffer.add(bufferedRow);
    }

    sortBuffer.sort(comparator);

    // emit in sorted order
    for (BaseRow sortedRow : sortBuffer) {
        collector.collect(sortedRow);
    }

    // everything has been emitted, so drop the buffered rows
    dataState.clear();
}
示例11
/**
 * Event-time timer callback: emits the rows stored under the timer's timestamp (sorted with
 * {@code comparator} when one is set), removes them from {@code dataState} and records the
 * timestamp in {@code lastTriggeringTsState}. Does nothing when no rows are stored for the
 * timestamp.
 */
@Override
public void onEventTime(InternalTimer<BaseRow, VoidNamespace> timer) throws Exception {
    final long firedAt = timer.getTimestamp();

    // rows buffered for exactly this timestamp
    final List<BaseRow> rows = dataState.get(firedAt);
    if (rows == null) {
        return;
    }

    // secondary sort is optional
    if (comparator != null) {
        rows.sort(comparator);
    }

    for (BaseRow row : rows) {
        collector.collect(row);
    }

    // the emitted rows are no longer needed in state
    dataState.remove(firedAt);
    lastTriggeringTsState.update(firedAt);
}
示例12
/**
 * Event-time timer callback. Clears {@code registeredTimer}, emits results for the current
 * watermark, re-registers a timer when {@code emitResultAndCleanUpState} returns a value below
 * {@code Long.MAX_VALUE}, and, when state cleaning is enabled, either registers the processing
 * cleanup timer or cleans up the last one.
 */
@Override
public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {
    registeredTimer.clear();

    final long nextUnprocessed = emitResultAndCleanUpState(timerService.currentWatermark());
    final boolean hasUnprocessed = nextUnprocessed < Long.MAX_VALUE;
    if (hasUnprocessed) {
        registerTimer(nextUnprocessed);
    }

    if (!stateCleaningEnabled) {
        return;
    }

    // if we have more state at any side, then update the timer, else clean it up.
    if (hasUnprocessed || rightState.iterator().hasNext()) {
        registerProcessingCleanupTimer();
    } else {
        cleanupLastTimer();
    }
}
示例13
/**
 * Creates a watermark-hold state view backed by a Flink map state.
 *
 * @param flinkStateBackend backend from which the partitioned map state is obtained
 * @param watermarkHoldStateDescriptor descriptor of the map state holding the watermark holds
 * @param stateId id of the state; appended to the namespace key
 * @param namespace state namespace; its string key is the prefix of the namespace string
 * @param timestampCombiner combiner stored for later use
 * @throws RuntimeException if the map state cannot be obtained; the original failure is
 *     attached as the cause
 */
public FlinkWatermarkHoldState(
    KeyedStateBackend<ByteBuffer> flinkStateBackend,
    MapStateDescriptor<String, Instant> watermarkHoldStateDescriptor,
    String stateId,
    StateNamespace namespace,
    TimestampCombiner timestampCombiner) {
  this.timestampCombiner = timestampCombiner;
  // Combines StateNamespace and stateId to generate a unique namespace for
  // watermarkHoldsState. We do not want to use Flink's namespacing to be
  // able to recover watermark holds efficiently during recovery.
  this.namespaceString = namespace.stringKey() + stateId;
  try {
    this.watermarkHoldsState =
        flinkStateBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            watermarkHoldStateDescriptor);
  } catch (Exception e) {
    // keep the underlying failure as the cause so it is not lost
    throw new RuntimeException("Could not access state for watermark partition view", e);
  }
}
示例14
/**
 * Reads the keyed state of a single operator from a {@code Savepoint}.
 *
 * @param uid The uid of the operator whose state is read.
 * @param function The {@link KeyedStateReaderFunction} invoked for each key in state.
 * @param keyTypeInfo Type information for the state's key.
 * @param outTypeInfo Type information for the values produced by the reader function.
 * @param <K> The key type of the state.
 * @param <OUT> The type produced by the reader function.
 * @return A {@code DataSet} containing the objects read from keyed state.
 * @throws IOException If the savepoint does not contain operator state with the given uid.
 */
public <K, OUT> DataSet<OUT> readKeyedState(
    String uid,
    KeyedStateReaderFunction<K, OUT> function,
    TypeInformation<K> keyTypeInfo,
    TypeInformation<OUT> outTypeInfo) throws IOException {

    // look up the operator first so a missing uid is reported before anything is built
    final OperatorState stateOfOperator = metadata.getOperatorState(uid);

    final KeyedStateInputFormat<K, VoidNamespace, OUT> format = new KeyedStateInputFormat<>(
        stateOfOperator,
        stateBackend,
        env.getConfiguration(),
        new KeyedStateReaderOperator<>(function, keyTypeInfo));

    return env.createInput(format, outTypeInfo);
}
示例15
/**
 * Executes the command encoded in the tuple's string field, in the form {@code NAME[:ARG]}.
 * Supported names: {@code SET_STATE}, {@code DELETE_STATE}, {@code SET_EVENT_TIME_TIMER},
 * {@code SET_PROC_TIME_TIMER} and {@code EMIT_STATE}.
 *
 * @throws IllegalArgumentException if the command name is not supported
 */
@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
    final Tuple2<Integer, String> tuple = element.getValue();
    final String[] command = tuple.f1.split(":");
    final String name = command[0];

    switch (name) {
        case "SET_STATE":
            getPartitionedState(stateDescriptor).update(command[1]);
            break;
        case "DELETE_STATE":
            getPartitionedState(stateDescriptor).clear();
            break;
        case "SET_EVENT_TIME_TIMER":
            timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
            break;
        case "SET_PROC_TIME_TIMER":
            timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, Long.parseLong(command[1]));
            break;
        case "EMIT_STATE": {
            final String value = getPartitionedState(stateDescriptor).value();
            output.collect(new StreamRecord<>("ON_ELEMENT:" + tuple.f0 + ":" + value));
            break;
        }
        default:
            throw new IllegalArgumentException();
    }
}
示例16
/**
 * Registers key {@code 1} under the first two descriptors, removes it through a
 * {@link MultiStateKeyIterator} and asserts that no descriptor reports any key afterwards.
 */
@Test
public void testIteratorRemovesFromAllDescriptors() throws Exception {
    final AbstractKeyedStateBackend<Integer> backend = createKeyedStateBackend();

    setKey(backend, descriptors.get(0), 1);
    setKey(backend, descriptors.get(1), 1);

    final MultiStateKeyIterator<Integer> keys = new MultiStateKeyIterator<>(descriptors, backend);

    int firstKey = keys.next();
    Assert.assertEquals("Unexpected keys pulled from state backend", 1, firstKey);

    keys.remove();
    Assert.assertFalse("Failed to drop key from all descriptors in state backend", keys.hasNext());

    // the key must be gone for every descriptor, not only the one it was read from
    for (StateDescriptor<?, ?> stateDescriptor : descriptors) {
        final long remaining = backend.getKeys(stateDescriptor.getName(), VoidNamespace.INSTANCE).count();
        Assert.assertEquals("Failed to drop key for state descriptor", 0, remaining);
    }
}
示例17
/**
 * Processing-time timer callback. Drains the queued timestamps in order; for each one it
 * advances the NFA time, feeds the sorted queued elements into the NFA and removes the
 * timestamp's entry from the element queue. Finally the NFA state is written back.
 *
 * <p>Checked exceptions thrown while processing a single event are rethrown wrapped in a
 * {@link RuntimeException}.
 */
@Override
public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
    // Step 1: pending timestamps for the key and the NFA state they are fed into.
    final PriorityQueue<Long> pendingTimestamps = getSortedTimestamps();
    final NFAState nfaState = getNFAState();

    // Step 2: process the pending elements in timestamp order (and comparator order, if any).
    while (!pendingTimestamps.isEmpty()) {
        final long timestamp = pendingTimestamps.poll();
        advanceTime(nfaState, timestamp);

        try (Stream<IN> queued = sort(elementQueueState.get(timestamp))) {
            queued.forEachOrdered(event -> {
                try {
                    processEvent(nfaState, event, timestamp);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }

        elementQueueState.remove(timestamp);
    }

    // Step 3: store the updated NFA state for the key.
    updateNFA(nfaState);
}
示例18
/**
 * Ensures the registered timer is not later than {@code timestamp}: registers a timer when none
 * is recorded, replaces the recorded one when it is later than {@code timestamp}, and otherwise
 * leaves it untouched.
 *
 * @param timestamp candidate timer timestamp
 * @throws IOException if reading {@code registeredTimer} fails
 */
private void registerSmallestTimer(long timestamp) throws IOException {
    final Long existing = registeredTimer.value();

    if (existing != null && existing <= timestamp) {
        // the recorded timer already fires no later than the candidate
        return;
    }
    if (existing != null) {
        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, existing);
    }
    registerTimer(timestamp);
}
示例19
/**
 * Event-time timer callback: stamps the collector with the timer's timestamp, exposes the timer
 * and the {@code EVENT_TIME} domain through {@code onTimerContext} while the user function's
 * {@code onTimer} runs, and resets both context fields afterwards.
 */
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
    final long firedAt = timer.getTimestamp();
    collector.setAbsoluteTimestamp(firedAt);

    // make the firing timer visible to the user function
    onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
    onTimerContext.timer = timer;

    userFunction.onTimer(firedAt, onTimerContext, collector);

    // reset the context once the callback has returned
    onTimerContext.timeDomain = null;
    onTimerContext.timer = null;
}
示例20
/**
 * Streams the elements of {@code state} for every key the backend reports under
 * {@code stateName}. Each key is made the backend's current key before its elements are read.
 *
 * @return a stream over the elements of all keys
 * @throws RuntimeException (raised while the stream is consumed) if switching the key or
 *     reading the state fails
 */
@Override
public Stream<T> getElements() {
    return backend.getKeys(stateName, VoidNamespace.INSTANCE).flatMap(currentKey -> {
        try {
            // state.get() reads the entry of whichever key is current on the backend
            backend.setCurrentKey(currentKey);
            return StreamSupport.stream(state.get().spliterator(), false);
        } catch (Exception e) {
            throw new RuntimeException("Error reading keyed state.", e);
        }
    });
}
示例21
/**
 * Processing-time timer callback: erases the collector's timestamp, exposes the timer and the
 * {@code PROCESSING_TIME} domain through {@code onTimerContext} while the user function's
 * {@code onTimer} runs, and resets both context fields afterwards.
 */
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
    collector.eraseTimestamp();

    // make the firing timer visible to the user function
    onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
    onTimerContext.timer = timer;

    final long firedAt = timer.getTimestamp();
    userFunction.onTimer(firedAt, onTimerContext, collector);

    // reset the context once the callback has returned
    onTimerContext.timeDomain = null;
    onTimerContext.timer = null;
}
示例22
/**
 * Creates a mocked {@link AbstractStreamOperator} whose keyed state store is backed by a mocked
 * {@link KeyedStateBackend}. Each {@code getPartitionedState} call on the mock for a
 * {@link ListStateDescriptor} is answered with list state from a newly created
 * {@link MemoryStateBackend} keyed backend whose current key is set to {@code 0}.
 *
 * @return the configured operator mock
 * @throws Exception if setting up the mocks fails
 */
@SuppressWarnings("unchecked")
private static AbstractStreamOperator<?> createListPlainMockOp() throws Exception {
    final ExecutionConfig config = new ExecutionConfig();
    final KeyedStateBackend keyedStateBackend = mock(KeyedStateBackend.class);
    final DefaultKeyedStateStore keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, config);

    // Answers a state lookup with list state from a real in-memory keyed backend.
    final Answer<ListState<String>> realListState = new Answer<ListState<String>>() {
        @Override
        public ListState<String> answer(InvocationOnMock invocationOnMock) throws Throwable {
            // the descriptor is the third argument of getPartitionedState
            ListStateDescriptor<String> descriptor =
                (ListStateDescriptor<String>) invocationOnMock.getArguments()[2];

            AbstractKeyedStateBackend<Integer> backend = new MemoryStateBackend().createKeyedStateBackend(
                new DummyEnvironment("test_task", 1, 0),
                new JobID(),
                "test_op",
                IntSerializer.INSTANCE,
                1,
                new KeyGroupRange(0, 0),
                new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
                TtlTimeProvider.DEFAULT,
                new UnregisteredMetricsGroup(),
                Collections.emptyList(),
                new CloseableRegistry());
            backend.setCurrentKey(0);

            return backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        }
    };
    doAnswer(realListState)
        .when(keyedStateBackend)
        .getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(ListStateDescriptor.class));

    final AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class);
    when(operatorMock.getExecutionConfig()).thenReturn(config);
    when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
    when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
    return operatorMock;
}
示例23
/**
 * Handles a fired event-time timer. The collector is given the timer's timestamp, the on-timer
 * context is populated with the timer and {@code TimeDomain.EVENT_TIME}, the user function's
 * {@code onTimer} is invoked, and the context fields are set back to {@code null}.
 */
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
    final long timestamp = timer.getTimestamp();
    collector.setAbsoluteTimestamp(timestamp);

    onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
    onTimerContext.timer = timer;

    userFunction.onTimer(timestamp, onTimerContext, collector);

    // the context must not keep referring to this timer after the callback
    onTimerContext.timeDomain = null;
    onTimerContext.timer = null;
}
示例24
/**
 * Creates a state table entry with a random non-negative key and a random state value, linked
 * to {@code next}. The entry's hash is the key's {@code hashCode()}; the two trailing
 * constructor arguments are both {@code 0}.
 *
 * @param random source of the key and the state value (drawn in that order)
 * @param next entry passed as the new entry's successor; may be {@code null}
 * @return the newly created entry
 */
private static StateTableEntry<Integer, VoidNamespace, Integer> generateElement(
    @Nonnull Random random,
    @Nullable StateTableEntry<Integer, VoidNamespace, Integer> next) {

    // masking with Integer.MAX_VALUE clears the sign bit, so the key is never negative
    final Integer key = random.nextInt() & Integer.MAX_VALUE;
    final Integer state = random.nextInt();
    final int hash = key.hashCode();

    return new StateTableEntry<>(key, VoidNamespace.INSTANCE, state, hash, next, 0, 0);
}
示例25
/**
 * Verifies that the TTL processing time set on the test harness is used by keyed state: a value
 * written at time {@code 0} with a one-hour TTL is readable immediately and reads as
 * {@code null} once the harness time is moved just past the TTL.
 */
@Test
public void testSetTtlTimeProvider() throws Exception {
    final AbstractStreamOperator<Integer> operator = new AbstractStreamOperator<Integer>() {};

    try (AbstractStreamOperatorTestHarness<Integer> harness =
            new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0)) {

        harness.config.setStateKeySerializer(IntSerializer.INSTANCE);

        final Time ttl = Time.hours(1);

        harness.initializeState(new OperatorSubtaskState());
        harness.open();

        final ValueStateDescriptor<Integer> ttlDescriptor =
            new ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
        ttlDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(ttl).build());

        final KeyedStateBackend<Integer> backend = operator.getKeyedStateBackend();
        final ValueState<Integer> state = backend.getPartitionedState(
            VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, ttlDescriptor);

        final int expectedValue = 42;

        // write at TTL time 0: the value must be visible
        backend.setCurrentKey(1);
        harness.setStateTtlProcessingTime(0L);
        state.update(expectedValue);
        Assert.assertEquals(expectedValue, (int) state.value());

        // one millisecond past the TTL: the value must have expired
        harness.setStateTtlProcessingTime(ttl.toMilliseconds() + 1);
        Assert.assertNull(state.value());
    }
}
示例26
/**
 * Event-time timer callback: stamps the collector with the timer's timestamp, exposes the timer
 * and the {@code EVENT_TIME} domain through {@code onTimerContext} while the user function's
 * {@code onTimer} runs, and resets both context fields afterwards.
 */
@Override
public void onEventTime(InternalTimer<KS, VoidNamespace> timer) throws Exception {
    final long firedAt = timer.getTimestamp();
    collector.setAbsoluteTimestamp(firedAt);

    // make the firing timer visible to the user function
    onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
    onTimerContext.timer = timer;

    userFunction.onTimer(firedAt, onTimerContext, collector);

    // reset the context once the callback has returned
    onTimerContext.timeDomain = null;
    onTimerContext.timer = null;
}
示例27
/**
 * Processing-time timer callback: erases the collector's timestamp, exposes the timer and the
 * {@code PROCESSING_TIME} domain through {@code onTimerContext} while the user function's
 * {@code onTimer} runs, and resets both context fields afterwards.
 */
@Override
public void onProcessingTime(InternalTimer<KS, VoidNamespace> timer) throws Exception {
    collector.eraseTimestamp();

    // make the firing timer visible to the user function
    onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
    onTimerContext.timer = timer;

    final long firedAt = timer.getTimestamp();
    userFunction.onTimer(firedAt, onTimerContext, collector);

    // reset the context once the callback has returned
    onTimerContext.timeDomain = null;
    onTimerContext.timer = null;
}
示例28
/**
 * Applies {@code function} to the state described by {@code stateDescriptor} for all keys of
 * the keyed state backend.
 *
 * @param stateDescriptor descriptor of the state to visit; must not be {@code null}
 * @param function function applied per key; must not be {@code null}
 * @throws Exception if the backend fails while applying the function
 */
@Override
public <VS, S extends State> void applyToKeyedState(
    final StateDescriptor<S, VS> stateDescriptor,
    final KeyedStateFunction<KS, S> function) throws Exception {

    // validate both arguments (descriptor first) before handing them to the backend
    final StateDescriptor<S, VS> checkedDescriptor = Preconditions.checkNotNull(stateDescriptor);
    final KeyedStateFunction<KS, S> checkedFunction = Preconditions.checkNotNull(function);

    keyedStateBackend.applyToAllKeys(
        VoidNamespace.INSTANCE,
        VoidNamespaceSerializer.INSTANCE,
        checkedDescriptor,
        checkedFunction);
}
示例29
/**
 * Refills {@code outputTimestampQueue} by visiting every key that has pending timers and adding
 * the output timestamp of each event-time timer for which {@code timerUsesOutputTimestamp}
 * returns true. The backend's current key is restored afterwards if it was non-null.
 *
 * @throws IllegalStateException if the queue is not empty on entry
 * @throws RuntimeException if reading the timers of a key fails
 */
private void populateOutputTimestampQueue() {
  Preconditions.checkState(
      outputTimestampQueue.isEmpty(),
      "Output timestamp queue should be empty when recomputing the minimum output timestamp across all timers.");

  final KeyedStateBackend<Object> backend = getKeyedStateBackend();
  // remembered so the key can be restored after the scan below has changed it
  final Object previousKey = backend.getCurrentKey();

  try (Stream<Object> keysWithTimers =
      backend.getKeys(PENDING_TIMERS_STATE_NAME, VoidNamespace.INSTANCE)) {
    keysWithTimers.forEach(
        key -> {
          backend.setCurrentKey(key);
          try {
            for (TimerData pending : pendingTimersById.values()) {
              if (pending.getDomain() != TimeDomain.EVENT_TIME) {
                continue;
              }
              long outputMillis = pending.getOutputTimestamp().getMillis();
              if (timerUsesOutputTimestamp(pending)) {
                outputTimestampQueue.add(outputMillis);
              }
            }
          } catch (Exception e) {
            throw new RuntimeException(
                "Exception while reading set of timers for key: " + key, e);
          }
        });
  } finally {
    if (previousKey != null) {
      backend.setCurrentKey(previousKey);
    }
  }
}
示例30
/**
 * Buffers {@code message} for delivery {@code delayMillis} after the current processing time
 * and registers a processing-time timer for that moment.
 *
 * @param message the message to delay; must not be {@code null}
 * @param delayMillis delay in milliseconds; must be {@code >= 0}
 * @throws NullPointerException if {@code message} is {@code null}
 */
void accept(Message message, long delayMillis) {
  Objects.requireNonNull(message);
  Preconditions.checkArgument(delayMillis >= 0);

  final long now = delayedMessagesTimerService.currentProcessingTime();
  final long fireAt = now + delayMillis;

  delayedMessagesTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, fireAt);
  delayedMessagesBuffer.add(message, fireAt);
}