Java源码示例:org.apache.flink.cep.nfa.NFAStateSerializer
示例1
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// initializeState through the provided context
computationStates = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>(
NFA_STATE_NAME,
new NFAStateSerializer()));
partialMatches = new SharedBuffer<>(context.getKeyedStateStore(), inputSerializer);
elementQueueState = context.getKeyedStateStore().getMapState(
new MapStateDescriptor<>(
EVENT_QUEUE_STATE_NAME,
LongSerializer.INSTANCE,
new ListSerializer<>(inputSerializer)));
migrateOldState();
}
示例2
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// initializeState through the provided context
computationStates = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>(
NFA_STATE_NAME,
new NFAStateSerializer()));
partialMatches = new SharedBuffer<>(context.getKeyedStateStore(), inputSerializer);
elementQueueState = context.getKeyedStateStore().getMapState(
new MapStateDescriptor<>(
EVENT_QUEUE_STATE_NAME,
LongSerializer.INSTANCE,
new ListSerializer<>(inputSerializer)));
migrateOldState();
}
示例3
@Override
public TypeSerializer<NFAState> createPriorSerializer() {
return new NFAStateSerializer();
}
示例4
@Override
public TypeSerializer<NFAState> createUpgradedSerializer() {
return new NFAStateSerializer();
}