Java源码示例:org.apache.flink.streaming.api.operators.KeyContext
示例1
/**
 * Creates a harness for the given trigger.
 *
 * <p>A heap keyed state backend with {@link IntSerializer} keys and key-group range
 * {@code [0, 0]} is obtained from a {@link MemoryStateBackend}, and its current key is set
 * to {@code KEY}. The timer service is a {@code TestInternalTimerService} backed by a
 * {@link KeyContext} that always reports {@code KEY} and ignores key changes.
 *
 * @param trigger the trigger stored in this harness
 * @param windowSerializer serializer for the window type, stored in this harness
 * @throws Exception if any of the setup steps fails
 */
public TriggerTestHarness(
        Trigger<T, W> trigger,
        TypeSerializer<W> windowSerializer) throws Exception {
    this.trigger = trigger;
    this.windowSerializer = windowSerializer;

    // Only a single key is ever used here; windows across different keys are
    // covered by other tests.
    DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
    MemoryStateBackend memoryBackend = new MemoryStateBackend();

    @SuppressWarnings("unchecked")
    HeapKeyedStateBackend<Integer> keyedBackend =
            (HeapKeyedStateBackend<Integer>) memoryBackend.createKeyedStateBackend(
                    environment,
                    new JobID(),
                    "test_op",
                    IntSerializer.INSTANCE,
                    1,
                    new KeyGroupRange(0, 0),
                    new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
                    TtlTimeProvider.DEFAULT,
                    new UnregisteredMetricsGroup(),
                    Collections.emptyList(),
                    new CloseableRegistry());

    this.stateBackend = keyedBackend;
    this.stateBackend.setCurrentKey(KEY);

    // Key context pinned to KEY: requests to switch the key are deliberately dropped.
    KeyContext fixedKeyContext = new KeyContext() {
        @Override
        public void setCurrentKey(Object key) {
            // intentionally a no-op
        }

        @Override
        public Object getCurrentKey() {
            return KEY;
        }
    };
    this.internalTimerService = new TestInternalTimerService<>(fixedKeyContext);
}
示例2
/**
 * Creates a harness for the given trigger.
 *
 * <p>A heap keyed state backend with {@link IntSerializer} keys and key-group range
 * {@code [0, 0]} is obtained from a {@link MemoryStateBackend}, and its current key is set
 * to {@code KEY}. The timer service is a {@code TestInternalTimerService} backed by a
 * {@link KeyContext} that always reports {@code KEY} and ignores key changes.
 *
 * @param trigger the trigger stored in this harness
 * @param windowSerializer serializer for the window type, stored in this harness
 * @throws Exception if any of the setup steps fails
 */
public TriggerTestHarness(
        Trigger<T, W> trigger,
        TypeSerializer<W> windowSerializer) throws Exception {
    this.trigger = trigger;
    this.windowSerializer = windowSerializer;

    // Only a single key is ever used here; windows across different keys are
    // covered by other tests.
    DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
    MemoryStateBackend memoryBackend = new MemoryStateBackend();

    @SuppressWarnings("unchecked")
    HeapKeyedStateBackend<Integer> keyedBackend =
            (HeapKeyedStateBackend<Integer>) memoryBackend.createKeyedStateBackend(
                    environment,
                    new JobID(),
                    "test_op",
                    IntSerializer.INSTANCE,
                    1,
                    new KeyGroupRange(0, 0),
                    new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
                    TtlTimeProvider.DEFAULT,
                    new UnregisteredMetricsGroup(),
                    Collections.emptyList(),
                    new CloseableRegistry());

    this.stateBackend = keyedBackend;
    this.stateBackend.setCurrentKey(KEY);

    // Key context pinned to KEY: requests to switch the key are deliberately dropped.
    KeyContext fixedKeyContext = new KeyContext() {
        @Override
        public void setCurrentKey(Object key) {
            // intentionally a no-op
        }

        @Override
        public Object getCurrentKey() {
            return KEY;
        }
    };
    this.internalTimerService = new TestInternalTimerService<>(fixedKeyContext);
}
示例3
/**
 * Creates a harness for the given trigger.
 *
 * <p>A heap keyed state backend with {@link IntSerializer} keys and key-group range
 * {@code [0, 0]} is obtained from a {@link MemoryStateBackend}, and its current key is set
 * to {@code KEY}. The timer service is a {@code TestInternalTimerService} backed by a
 * {@link KeyContext} that always reports {@code KEY} and ignores key changes.
 *
 * @param trigger the trigger stored in this harness
 * @param windowSerializer serializer for the window type, stored in this harness
 * @throws Exception if any of the setup steps fails
 */
public TriggerTestHarness(
        Trigger<T, W> trigger,
        TypeSerializer<W> windowSerializer) throws Exception {
    this.trigger = trigger;
    this.windowSerializer = windowSerializer;

    // Only a single key is ever used here; windows across different keys are
    // covered by other tests.
    DummyEnvironment environment = new DummyEnvironment("test", 1, 0);
    MemoryStateBackend memoryBackend = new MemoryStateBackend();

    @SuppressWarnings("unchecked")
    HeapKeyedStateBackend<Integer> keyedBackend =
            (HeapKeyedStateBackend<Integer>) memoryBackend.createKeyedStateBackend(
                    environment,
                    new JobID(),
                    "test_op",
                    IntSerializer.INSTANCE,
                    1,
                    new KeyGroupRange(0, 0),
                    new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
                    TtlTimeProvider.DEFAULT,
                    new UnregisteredMetricsGroup(),
                    Collections.emptyList(),
                    new CloseableRegistry());

    this.stateBackend = keyedBackend;
    this.stateBackend.setCurrentKey(KEY);

    // Key context pinned to KEY: requests to switch the key are deliberately dropped.
    KeyContext fixedKeyContext = new KeyContext() {
        @Override
        public void setCurrentKey(Object key) {
            // intentionally a no-op
        }

        @Override
        public Object getCurrentKey() {
            return KEY;
        }
    };
    this.internalTimerService = new TestInternalTimerService<>(fixedKeyContext);
}
示例4
/**
 * Assigns the supplied {@link KeyContext} to this RankFunction's {@code keyContext} field,
 * replacing any previously assigned value.
 *
 * @param keyContext the key context of the current function
 */
public void setKeyContext(final KeyContext keyContext) {
    this.keyContext = keyContext;
}
示例5
/**
 * Assigns the supplied {@link KeyContext} to this RankFunction's {@code keyContext} field,
 * replacing any previously assigned value.
 *
 * @param keyContext the key context of the current function
 */
public void setKeyContext(final KeyContext keyContext) {
    this.keyContext = keyContext;
}