Java源码示例:org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
示例1
/**
 * Cancels this source: clears the {@code running} flag, detaches the current fetcher (if one has been
 * published) from this instance, asks it to shut down and waits for it to terminate.
 *
 * <p>Any exception from shutting down or awaiting the fetcher is logged as a warning instead of being
 * propagated. If the wait is interrupted, the current thread's interrupt status is restored before
 * logging so the interruption is not silently lost.
 */
@Override
public void cancel() {
    running = false;

    // take the current fetcher and clear the field
    KinesisDataFetcher fetcher = this.fetcher;
    this.fetcher = null;

    // this method might be called before the subtask actually starts running,
    // so we must check if the fetcher is actually created
    if (fetcher != null) {
        try {
            // interrupt the fetcher of any work
            fetcher.shutdownFetcher();
            fetcher.awaitTermination();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // the interrupt status is conventionally cleared when InterruptedException is thrown;
                // restore it so code further up this thread can still observe the interruption
                Thread.currentThread().interrupt();
            }
            LOG.warn("Error while closing Kinesis data fetcher", e);
        }
    }
}
示例2
/**
 * Creates a mock {@link KinesisDataFetcher} and tells PowerMockito to hand it out whenever the first
 * constructor returned by {@code KinesisDataFetcher.class.getConstructors()} is invoked.
 *
 * <p>The first constructor argument is matched with {@code Mockito.any(type)}, every remaining argument
 * with {@code Mockito.nullable(type)}.
 *
 * <p>NOTE(review): {@code Class#getConstructors()} does not specify an element order, so if the class
 * ever has more than one public constructor, which one is intercepted is not well defined.
 *
 * @return the mock that the intercepted constructor returns
 * @throws Exception if setting up the constructor interception fails
 */
private static KinesisDataFetcher mockKinesisDataFetcher() throws Exception {
    KinesisDataFetcher fetcherMock = Mockito.mock(KinesisDataFetcher.class);

    java.lang.reflect.Constructor<KinesisDataFetcher> constructor =
            (java.lang.reflect.Constructor<KinesisDataFetcher>) KinesisDataFetcher.class.getConstructors()[0];
    Class<?>[] allParamTypes = constructor.getParameterTypes();

    // every constructor parameter except the first one
    Class<?>[] trailingParamTypes = new Class<?>[allParamTypes.length - 1];
    System.arraycopy(allParamTypes, 1, trailingParamTypes, 0, trailingParamTypes.length);

    // The matchers are created inside the withArguments(...) call, i.e. after whenNew(...) has been
    // evaluated and in parameter order, because Mockito records matchers in creation order.
    PowerMockito.whenNew(constructor)
            .withArguments(Mockito.any(allParamTypes[0]), nullableMatchersFor(trailingParamTypes))
            .thenReturn(fetcherMock);
    return fetcherMock;
}

/** Creates one {@code Mockito.nullable(type)} matcher per entry of {@code types}, in array order. */
private static Object[] nullableMatchersFor(Class<?>[] types) {
    Object[] matchers = new Object[types.length];
    for (int idx = 0; idx < types.length; idx++) {
        matchers[idx] = Mockito.nullable(types[idx]);
    }
    return matchers;
}
示例3
/**
 * Cancels this source: clears the {@code running} flag, detaches the current fetcher (if one has been
 * published) from this instance, asks it to shut down and waits for it to terminate.
 *
 * <p>Any exception from shutting down or awaiting the fetcher is logged as a warning instead of being
 * propagated. If the wait is interrupted, the current thread's interrupt status is restored before
 * logging so the interruption is not silently lost.
 */
@Override
public void cancel() {
    running = false;

    // take the current fetcher and clear the field
    KinesisDataFetcher fetcher = this.fetcher;
    this.fetcher = null;

    // this method might be called before the subtask actually starts running,
    // so we must check if the fetcher is actually created
    if (fetcher != null) {
        try {
            // interrupt the fetcher of any work
            fetcher.shutdownFetcher();
            fetcher.awaitTermination();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // the interrupt status is conventionally cleared when InterruptedException is thrown;
                // restore it so code further up this thread can still observe the interruption
                Thread.currentThread().interrupt();
            }
            LOG.warn("Error while closing Kinesis data fetcher", e);
        }
    }
}
示例4
/**
 * Creates a mock {@link KinesisDataFetcher} and tells PowerMockito to hand it out whenever the first
 * constructor returned by {@code KinesisDataFetcher.class.getConstructors()} is invoked.
 *
 * <p>The first constructor argument is matched with {@code Mockito.any(type)}, every remaining argument
 * with {@code Mockito.nullable(type)}.
 *
 * <p>NOTE(review): {@code Class#getConstructors()} does not specify an element order, so if the class
 * ever has more than one public constructor, which one is intercepted is not well defined.
 *
 * @return the mock that the intercepted constructor returns
 * @throws Exception if setting up the constructor interception fails
 */
private static KinesisDataFetcher mockKinesisDataFetcher() throws Exception {
    KinesisDataFetcher fetcherMock = Mockito.mock(KinesisDataFetcher.class);

    java.lang.reflect.Constructor<KinesisDataFetcher> constructor =
            (java.lang.reflect.Constructor<KinesisDataFetcher>) KinesisDataFetcher.class.getConstructors()[0];
    Class<?>[] allParamTypes = constructor.getParameterTypes();

    // every constructor parameter except the first one
    Class<?>[] trailingParamTypes = new Class<?>[allParamTypes.length - 1];
    System.arraycopy(allParamTypes, 1, trailingParamTypes, 0, trailingParamTypes.length);

    // The matchers are created inside the withArguments(...) call, i.e. after whenNew(...) has been
    // evaluated and in parameter order, because Mockito records matchers in creation order.
    PowerMockito.whenNew(constructor)
            .withArguments(Mockito.any(allParamTypes[0]), nullableMatchersFor(trailingParamTypes))
            .thenReturn(fetcherMock);
    return fetcherMock;
}

/** Creates one {@code Mockito.nullable(type)} matcher per entry of {@code types}, in array order. */
private static Object[] nullableMatchersFor(Class<?>[] types) {
    Object[] matchers = new Object[types.length];
    for (int idx = 0; idx < types.length; idx++) {
        matchers[idx] = Mockito.nullable(types[idx]);
    }
    return matchers;
}
示例5
/**
 * Cancels this source: clears the {@code running} flag, detaches the current fetcher (if one has been
 * published) from this instance, asks it to shut down and waits for it to terminate.
 *
 * <p>Any exception from shutting down or awaiting the fetcher is logged as a warning instead of being
 * propagated. If the wait is interrupted, the current thread's interrupt status is restored before
 * logging so the interruption is not silently lost.
 */
@Override
public void cancel() {
    running = false;

    // take the current fetcher and clear the field
    KinesisDataFetcher fetcher = this.fetcher;
    this.fetcher = null;

    // this method might be called before the subtask actually starts running,
    // so we must check if the fetcher is actually created
    if (fetcher != null) {
        try {
            // interrupt the fetcher of any work
            fetcher.shutdownFetcher();
            fetcher.awaitTermination();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // the interrupt status is conventionally cleared when InterruptedException is thrown;
                // restore it so code further up this thread can still observe the interruption
                Thread.currentThread().interrupt();
            }
            LOG.warn("Error while closing Kinesis data fetcher", e);
        }
    }
}
示例6
/**
 * Creates a mock {@link KinesisDataFetcher} and tells PowerMockito to hand it out whenever the first
 * constructor returned by {@code KinesisDataFetcher.class.getConstructors()} is invoked.
 *
 * <p>The first constructor argument is matched with {@code Mockito.any(type)}, every remaining argument
 * with {@code Mockito.nullable(type)}.
 *
 * <p>NOTE(review): {@code Class#getConstructors()} does not specify an element order, so if the class
 * ever has more than one public constructor, which one is intercepted is not well defined.
 *
 * @return the mock that the intercepted constructor returns
 * @throws Exception if setting up the constructor interception fails
 */
private static KinesisDataFetcher mockKinesisDataFetcher() throws Exception {
    KinesisDataFetcher fetcherMock = Mockito.mock(KinesisDataFetcher.class);

    java.lang.reflect.Constructor<KinesisDataFetcher> constructor =
            (java.lang.reflect.Constructor<KinesisDataFetcher>) KinesisDataFetcher.class.getConstructors()[0];
    Class<?>[] allParamTypes = constructor.getParameterTypes();

    // every constructor parameter except the first one
    Class<?>[] trailingParamTypes = new Class<?>[allParamTypes.length - 1];
    System.arraycopy(allParamTypes, 1, trailingParamTypes, 0, trailingParamTypes.length);

    // The matchers are created inside the withArguments(...) call, i.e. after whenNew(...) has been
    // evaluated and in parameter order, because Mockito records matchers in creation order.
    PowerMockito.whenNew(constructor)
            .withArguments(Mockito.any(allParamTypes[0]), nullableMatchersFor(trailingParamTypes))
            .thenReturn(fetcherMock);
    return fetcherMock;
}

/** Creates one {@code Mockito.nullable(type)} matcher per entry of {@code types}, in array order. */
private static Object[] nullableMatchersFor(Class<?>[] types) {
    Object[] matchers = new Object[types.length];
    for (int idx = 0; idx < types.length; idx++) {
        matchers[idx] = Mockito.nullable(types[idx]);
    }
    return matchers;
}
示例7
/**
 * Creates the fetcher this consumer runs: a {@link DynamoDBStreamsDataFetcher} built from the given
 * arguments plus the shard assigner returned by {@code getShardAssigner()}.
 *
 * @param streams the names of the streams to consume
 * @param sourceContext the source context the fetcher is created with
 * @param runtimeContext the runtime context of this subtask
 * @param configProps the consumer configuration
 * @param deserializationSchema the schema used to deserialize records
 * @return a new DynamoDB Streams fetcher
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceFunction.SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializationSchema) {
    KinesisDataFetcher<T> dynamoStreamsFetcher = new DynamoDBStreamsDataFetcher<T>(
            streams, sourceContext, runtimeContext, configProps, deserializationSchema, getShardAssigner());
    return dynamoStreamsFetcher;
}
示例8
/**
 * Creates the fetcher this consumer runs. The method is exposed (rather than inlined) so that tests can
 * substitute a mocked {@link KinesisDataFetcher} for the consumer.
 *
 * @param streams the names of the streams to consume
 * @param sourceContext the source context the fetcher is created with
 * @param runtimeContext the runtime context of this subtask
 * @param configProps the consumer configuration
 * @param deserializationSchema the schema used to deserialize records
 * @return a new fetcher wired with this consumer's shard assigner, periodic watermark assigner and
 *     watermark tracker
 */
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceFunction.SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializationSchema) {
    KinesisDataFetcher<T> created = new KinesisDataFetcher<>(
            streams,
            sourceContext,
            runtimeContext,
            configProps,
            deserializationSchema,
            shardAssigner,
            periodicWatermarkAssigner,
            watermarkTracker);
    return created;
}
示例9
/**
 * Ignores every argument and returns the fetcher held in the {@code mockFetcher} field, so the consumer
 * runs against that fetcher instead of creating a real one.
 *
 * @return the value of the {@code mockFetcher} field
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializer) {
    // all parameters are intentionally unused
    return this.mockFetcher;
}
示例10
/**
 * Opens and runs a consumer that was never restored ({@code initializeState} is not called), after
 * setting up constructor interception for the fetcher and statically mocking {@code KinesisConfigUtil}.
 *
 * <p>NOTE(review): nothing is asserted or verified here, and {@code mockedFetcher} is not used after it
 * is created. As written, the test only checks that {@code open} and {@code run} complete without
 * throwing; the behaviour named by the test is not checked. TODO: confirm what should be verified on the
 * mock.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
// intercept the fetcher constructor so that, presumably, the fetcher created by the consumer is this mock
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
// NOTE(review): the doNothing().when(Class) form is normally followed by the static call to be stubbed;
// no such call follows here, so confirm intent
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// the meaning of the trailing arguments (10, 2) is defined by TestableFlinkKinesisConsumer (not shown here)
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
}
示例11
/**
 * Creates the fetcher this consumer runs: a {@link DynamoDBStreamsDataFetcher} built from the given
 * arguments plus the shard assigner returned by {@code getShardAssigner()}.
 *
 * @param streams the names of the streams to consume
 * @param sourceContext the source context the fetcher is created with
 * @param runtimeContext the runtime context of this subtask
 * @param configProps the consumer configuration
 * @param deserializationSchema the schema used to deserialize records
 * @return a new DynamoDB Streams fetcher
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceFunction.SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializationSchema) {
    KinesisDataFetcher<T> dynamoStreamsFetcher = new DynamoDBStreamsDataFetcher<T>(
            streams, sourceContext, runtimeContext, configProps, deserializationSchema, getShardAssigner());
    return dynamoStreamsFetcher;
}
示例12
/**
 * Creates the fetcher this consumer runs. The method is exposed (rather than inlined) so that tests can
 * substitute a mocked {@link KinesisDataFetcher} for the consumer.
 *
 * @param streams the names of the streams to consume
 * @param sourceContext the source context the fetcher is created with
 * @param runtimeContext the runtime context of this subtask
 * @param configProps the consumer configuration
 * @param deserializationSchema the schema used to deserialize records
 * @return a new fetcher wired with this consumer's shard assigner, periodic watermark assigner and
 *     watermark tracker
 */
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceFunction.SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializationSchema) {
    KinesisDataFetcher<T> created = new KinesisDataFetcher<>(
            streams,
            sourceContext,
            runtimeContext,
            configProps,
            deserializationSchema,
            shardAssigner,
            periodicWatermarkAssigner,
            watermarkTracker);
    return created;
}
示例13
/**
 * Ignores every argument and returns the fetcher held in the {@code mockFetcher} field, so the consumer
 * runs against that fetcher instead of creating a real one.
 *
 * @return the value of the {@code mockFetcher} field
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializer) {
    // all parameters are intentionally unused
    return this.mockFetcher;
}
示例14
/**
 * Opens and runs a consumer that was never restored ({@code initializeState} is not called), after
 * setting up constructor interception for the fetcher and statically mocking {@code KinesisConfigUtil}.
 *
 * <p>NOTE(review): nothing is asserted or verified here, and {@code mockedFetcher} is not used after it
 * is created. As written, the test only checks that {@code open} and {@code run} complete without
 * throwing; the behaviour named by the test is not checked. TODO: confirm what should be verified on the
 * mock.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
// intercept the fetcher constructor so that, presumably, the fetcher created by the consumer is this mock
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
// NOTE(review): the doNothing().when(Class) form is normally followed by the static call to be stubbed;
// no such call follows here, so confirm intent
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// the meaning of the trailing arguments (10, 2) is defined by TestableFlinkKinesisConsumer (not shown here)
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
}
示例15
/**
 * Creates the fetcher this consumer runs: a {@link DynamoDBStreamsDataFetcher} built from the given
 * arguments plus the shard assigner returned by {@code getShardAssigner()}.
 *
 * @param streams the names of the streams to consume
 * @param sourceContext the source context the fetcher is created with
 * @param runtimeContext the runtime context of this subtask
 * @param configProps the consumer configuration
 * @param deserializationSchema the schema used to deserialize records
 * @return a new DynamoDB Streams fetcher
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceFunction.SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializationSchema) {
    KinesisDataFetcher<T> dynamoStreamsFetcher = new DynamoDBStreamsDataFetcher<T>(
            streams, sourceContext, runtimeContext, configProps, deserializationSchema, getShardAssigner());
    return dynamoStreamsFetcher;
}
示例16
/**
 * Creates the fetcher this consumer runs. The method is exposed (rather than inlined) so that tests can
 * substitute a mocked {@link KinesisDataFetcher} for the consumer.
 *
 * @param streams the names of the streams to consume
 * @param sourceContext the source context the fetcher is created with
 * @param runtimeContext the runtime context of this subtask
 * @param configProps the consumer configuration
 * @param deserializationSchema the schema used to deserialize records
 * @return a new fetcher wired with this consumer's shard assigner, periodic watermark assigner and
 *     watermark tracker
 */
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceFunction.SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializationSchema) {
    KinesisDataFetcher<T> created = new KinesisDataFetcher<>(
            streams,
            sourceContext,
            runtimeContext,
            configProps,
            deserializationSchema,
            shardAssigner,
            periodicWatermarkAssigner,
            watermarkTracker);
    return created;
}
示例17
/**
 * Ignores every argument and returns the fetcher held in the {@code mockFetcher} field, so the consumer
 * runs against that fetcher instead of creating a real one.
 *
 * @return the value of the {@code mockFetcher} field
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
        List<String> streams,
        SourceContext<T> sourceContext,
        RuntimeContext runtimeContext,
        Properties configProps,
        KinesisDeserializationSchema<T> deserializer) {
    // all parameters are intentionally unused
    return this.mockFetcher;
}
示例18
/**
 * Opens and runs a consumer that was never restored ({@code initializeState} is not called), after
 * setting up constructor interception for the fetcher and statically mocking {@code KinesisConfigUtil}.
 *
 * <p>NOTE(review): nothing is asserted or verified here, and {@code mockedFetcher} is not used after it
 * is created. As written, the test only checks that {@code open} and {@code run} complete without
 * throwing; the behaviour named by the test is not checked. TODO: confirm what should be verified on the
 * mock.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
// intercept the fetcher constructor so that, presumably, the fetcher created by the consumer is this mock
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
// NOTE(review): the doNothing().when(Class) form is normally followed by the static call to be stubbed;
// no such call follows here, so confirm intent
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// the meaning of the trailing arguments (10, 2) is defined by TestableFlinkKinesisConsumer (not shown here)
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
}
示例19
/**
 * Runs the source. Creates the fetcher, seeds it with a starting position for every shard discovered up
 * front, runs the fetcher loop, and closes the source context once the fetcher has terminated.
 *
 * <p>The starting position of each initially discovered shard is:
 * <ul>
 *   <li>when restoring and the shard is in {@code sequenceNumsToRestore}: its restored sequence
 *       number;</li>
 *   <li>when restoring and the shard is not in the restored state:
 *       {@code SENTINEL_EARLIEST_SEQUENCE_NUM};</li>
 *   <li>when not restoring ({@code sequenceNumsToRestore == null}): the sentinel for the configured
 *       {@code STREAM_INITIAL_POSITION}, or {@code DEFAULT_STREAM_INITIAL_POSITION} if it is unset.</li>
 * </ul>
 *
 * <p>If {@code running} has been cleared by the time seeding finishes, the method returns without
 * starting the fetcher and without closing the source context.
 *
 * @param sourceContext the context the fetcher is created with; closed after the fetcher terminates
 * @throws Exception if creating, seeding, running or awaiting the fetcher fails
 */
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
    // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
    // can potentially have new shards to subscribe to later on
    KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

    // initial discovery
    List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();

    // The configured start position is the same for every shard, so resolve it once rather than per shard.
    // It is only resolved when it will actually be used (starting fresh with at least one shard), so an
    // unusable configured value still only fails when it is needed.
    SentinelSequenceNumber startingSeqNum = null;
    if (sequenceNumsToRestore == null && !allShards.isEmpty()) {
        startingSeqNum = InitialPosition.valueOf(configProps.getProperty(
                ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();
    }

    for (StreamShardHandle shard : allShards) {
        StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
                new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shard));

        if (sequenceNumsToRestore != null) {
            if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
                // if the shard was already seen and is contained in the state,
                // just use the sequence number stored in the state
                fetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, sequenceNumsToRestore.get(kinesisStreamShard)));

                if (LOG.isInfoEnabled()) {
                    LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
                                    " starting state set to the restored sequence number {}",
                            getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(kinesisStreamShard));
                }
            } else {
                // the shard wasn't discovered in the previous run, therefore should be consumed from the beginning
                fetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

                if (LOG.isInfoEnabled()) {
                    LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
                                    " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
                            getRuntimeContext().getIndexOfThisSubtask(), shard.toString());
                }
            }
        } else {
            // we're starting fresh; use the configured start position (resolved above) as initial state
            fetcher.registerNewSubscribedShardState(
                    new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get()));

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
                        getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), startingSeqNum.get());
            }
        }
    }

    // check that we are running before starting the fetcher
    // NOTE(review): if cancel() runs after this check but before this.fetcher is published below, cancel()
    // finds no fetcher to shut down; whether runFetcher() still terminates then is not visible here, so
    // confirm this window is acceptable
    if (!running) {
        return;
    }

    // expose the fetcher from this point, so that state
    // snapshots can be taken from the fetcher's state holders
    this.fetcher = fetcher;

    // start the fetcher loop. The fetcher will stop running only when cancel() or
    // close() is called, or an error is thrown by threads created by the fetcher
    fetcher.runFetcher();

    // check that the fetcher has terminated before fully closing
    fetcher.awaitTermination();
    sourceContext.close();
}
示例20
/**
 * Creates a test consumer for {@code TEST_STREAM_NAME} configured with {@code dummyConfig}.
 *
 * @param mockFetcher the fetcher stored in the {@code mockFetcher} field
 * @param schema the deserialization schema passed to the parent consumer
 */
DummyFlinkKinesisConsumer(KinesisDataFetcher<T> mockFetcher, KinesisDeserializationSchema<T> schema) {
    // fixed test stream name and shared dummy configuration
    super(TEST_STREAM_NAME, schema, dummyConfig);

    this.mockFetcher = mockFetcher;
}
示例21
/**
 * Checks the snapshot of a consumer that has been restored and opened but never run. Such a consumer
 * has never created a fetcher, yet snapshotting it must still clear the list state and refill it with only
 * the restored entries meant for this subtask.
 *
 * <p>With subtask index 0 of 2 parallel subtasks, the expected entries are those of shard orders 0 and 2.
 */
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
    Properties config = TestUtils.getStandardProperties();

    // four shards of "fakeStream" (shard orders 0..3), each restored at sequence number "1"
    List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
    for (int shardOrder = 0; shardOrder < 4; shardOrder++) {
        StreamShardHandle shardHandle = new StreamShardHandle("fakeStream",
                new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder)));
        globalUnionState.add(Tuple2.of(
                KinesisDataFetcher.convertToStreamShardMetadata(shardHandle),
                new SequenceNumber("1")));
    }

    TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
    for (Tuple2<StreamShardMetadata, SequenceNumber> entry : globalUnionState) {
        listState.add(entry);
    }

    FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);

    // this consumer is subtask 0 of 2
    RuntimeContext context = mock(RuntimeContext.class);
    when(context.getIndexOfThisSubtask()).thenReturn(0);
    when(context.getNumberOfParallelSubtasks()).thenReturn(2);
    consumer.setRuntimeContext(context);

    // hand the union list state above to initializeState() as restored state
    OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
    when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);

    StateInitializationContext initializationContext = mock(StateInitializationContext.class);
    when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
    when(initializationContext.isRestored()).thenReturn(true);

    consumer.initializeState(initializationContext);

    // only opened, not run
    consumer.open(new Configuration());

    // arbitrary checkpoint id and timestamp
    consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

    assertTrue(listState.isClearCalled());

    // the checkpointed list state should contain only the shards that it should subscribe to
    assertEquals(globalUnionState.size() / 2, listState.getList().size());
    assertTrue(listState.getList().contains(globalUnionState.get(0)));
    assertTrue(listState.getList().contains(globalUnionState.get(2)));
}
示例22
/**
 * Verifies that a consumer restored from a checkpoint seeds its fetcher with the restored sequence number
 * of every initially discovered shard, when every discovered shard is present in the restored state.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
// shard -> restored sequence number; the meaning of "all" is defined by getFakeRestoredStore (not shown here)
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
// the union list state handed to initializeState() holds the same entries as fakeRestoredState
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
// report a restore (presumably so initializeState() reads the list state above)
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
// the mocked fetcher "discovers" exactly the restored shards
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
// every restored shard must have been registered with its restored sequence number
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
示例23
/**
 * Verifies seeding when restoring from a checkpoint while the fetcher also discovers one shard that is not
 * in the restored state. Restored shards must be seeded with their restored sequence numbers, and the new
 * shard (shard order 2 of "fakeStream2") with {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
// shard -> restored sequence number; the meaning of "all" is defined by getFakeRestoredStore (not shown here)
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
// the union list state handed to initializeState() holds the same entries as fakeRestoredState
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
// report a restore (presumably so initializeState() reads the list state above)
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
// the mocked fetcher "discovers" all restored shards plus one shard that is not in the restored state
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
shards.add(new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))));
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
// turn fakeRestoredState into the full expectation: the newly discovered shard starts from the earliest position
fakeRestoredState.put(new StreamShardHandle("fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
示例24
/**
 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
 * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
 * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
 * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
 * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
 *
 * <p>Therefore, we will rely on synchronizing the snapshot's state with the Kinesis shard before attempting to find back
 * the sequence number to restore.
 */
@Test
public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheStateWasStored() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
// shard -> restored sequence number; the meaning of "all" is defined by getFakeRestoredStore (not shown here)
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
// the list state is filled (converted to metadata) before any shard is closed below
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
List<StreamShardHandle> shards = new ArrayList<>();
// create a fake stream shard handle based on the first entry in the restored state
final StreamShardHandle originalStreamShardHandle = fakeRestoredState.keySet().iterator().next();
// NOTE(review): this handle is built from originalStreamShardHandle.getShard(); if getShard() returns the
// same Shard instance, setting the range below also mutates a key of fakeRestoredState, and the final
// fakeRestoredState.get(closedStreamShardHandle) lookup relies on that — confirm
final StreamShardHandle closedStreamShardHandle = new StreamShardHandle(originalStreamShardHandle.getStreamName(), originalStreamShardHandle.getShard());
// close the shard handle by setting an ending sequence number
final SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
sequenceNumberRange.setEndingSequenceNumber("1293844");
closedStreamShardHandle.getShard().setSequenceNumberRange(sequenceNumberRange);
// the mocked fetcher "discovers" only the now-closed shard
shards.add(closedStreamShardHandle);
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
// the closed shard must still be seeded with the sequence number restored for it
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(closedStreamShardHandle),
closedStreamShardHandle, fakeRestoredState.get(closedStreamShardHandle)));
}
示例25
/**
 * Runs the source. Creates the fetcher, seeds it with a starting position for every shard discovered up
 * front, runs the fetcher loop, and closes the source context once the fetcher has terminated.
 *
 * <p>The starting position of each initially discovered shard is:
 * <ul>
 *   <li>when restoring and the shard is in {@code sequenceNumsToRestore}: its restored sequence
 *       number;</li>
 *   <li>when restoring and the shard is not in the restored state:
 *       {@code SENTINEL_EARLIEST_SEQUENCE_NUM};</li>
 *   <li>when not restoring ({@code sequenceNumsToRestore == null}): the sentinel for the configured
 *       {@code STREAM_INITIAL_POSITION}, or {@code DEFAULT_STREAM_INITIAL_POSITION} if it is unset.</li>
 * </ul>
 *
 * <p>If {@code running} has been cleared by the time seeding finishes, the method returns without
 * starting the fetcher and without closing the source context.
 *
 * @param sourceContext the context the fetcher is created with; closed after the fetcher terminates
 * @throws Exception if creating, seeding, running or awaiting the fetcher fails
 */
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
    // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
    // can potentially have new shards to subscribe to later on
    KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

    // initial discovery
    List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();

    // The configured start position is the same for every shard, so resolve it once rather than per shard.
    // It is only resolved when it will actually be used (starting fresh with at least one shard), so an
    // unusable configured value still only fails when it is needed.
    SentinelSequenceNumber startingSeqNum = null;
    if (sequenceNumsToRestore == null && !allShards.isEmpty()) {
        startingSeqNum = InitialPosition.valueOf(configProps.getProperty(
                ConsumerConfigConstants.STREAM_INITIAL_POSITION,
                ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();
    }

    for (StreamShardHandle shard : allShards) {
        StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
                new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shard));

        if (sequenceNumsToRestore != null) {
            if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
                // if the shard was already seen and is contained in the state,
                // just use the sequence number stored in the state
                fetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, sequenceNumsToRestore.get(kinesisStreamShard)));

                if (LOG.isInfoEnabled()) {
                    LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
                                    " starting state set to the restored sequence number {}",
                            getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(kinesisStreamShard));
                }
            } else {
                // the shard wasn't discovered in the previous run, therefore should be consumed from the beginning
                fetcher.registerNewSubscribedShardState(
                        new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

                if (LOG.isInfoEnabled()) {
                    LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
                                    " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
                            getRuntimeContext().getIndexOfThisSubtask(), shard.toString());
                }
            }
        } else {
            // we're starting fresh; use the configured start position (resolved above) as initial state
            fetcher.registerNewSubscribedShardState(
                    new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get()));

            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
                        getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), startingSeqNum.get());
            }
        }
    }

    // check that we are running before starting the fetcher
    // NOTE(review): if cancel() runs after this check but before this.fetcher is published below, cancel()
    // finds no fetcher to shut down; whether runFetcher() still terminates then is not visible here, so
    // confirm this window is acceptable
    if (!running) {
        return;
    }

    // expose the fetcher from this point, so that state
    // snapshots can be taken from the fetcher's state holders
    this.fetcher = fetcher;

    // start the fetcher loop. The fetcher will stop running only when cancel() or
    // close() is called, or an error is thrown by threads created by the fetcher
    fetcher.runFetcher();

    // check that the fetcher has terminated before fully closing
    fetcher.awaitTermination();
    sourceContext.close();
}
示例26
/**
 * Creates a test consumer for {@code TEST_STREAM_NAME} configured with {@code dummyConfig}.
 *
 * @param mockFetcher the fetcher stored in the {@code mockFetcher} field
 * @param schema the deserialization schema passed to the parent consumer
 */
DummyFlinkKinesisConsumer(KinesisDataFetcher<T> mockFetcher, KinesisDeserializationSchema<T> schema) {
    // fixed test stream name and shared dummy configuration
    super(TEST_STREAM_NAME, schema, dummyConfig);

    this.mockFetcher = mockFetcher;
}
示例27
/**
 * Checks the snapshot of a consumer that has been restored and opened but never run. Such a consumer
 * has never created a fetcher, yet snapshotting it must still clear the list state and refill it with only
 * the restored entries meant for this subtask.
 *
 * <p>With subtask index 0 of 2 parallel subtasks, the expected entries are those of shard orders 0 and 2.
 */
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
    Properties config = TestUtils.getStandardProperties();

    // four shards of "fakeStream" (shard orders 0..3), each restored at sequence number "1"
    List<Tuple2<StreamShardMetadata, SequenceNumber>> globalUnionState = new ArrayList<>(4);
    for (int shardOrder = 0; shardOrder < 4; shardOrder++) {
        StreamShardHandle shardHandle = new StreamShardHandle("fakeStream",
                new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder)));
        globalUnionState.add(Tuple2.of(
                KinesisDataFetcher.convertToStreamShardMetadata(shardHandle),
                new SequenceNumber("1")));
    }

    TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
    for (Tuple2<StreamShardMetadata, SequenceNumber> entry : globalUnionState) {
        listState.add(entry);
    }

    FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), config);

    // this consumer is subtask 0 of 2
    RuntimeContext context = mock(RuntimeContext.class);
    when(context.getIndexOfThisSubtask()).thenReturn(0);
    when(context.getNumberOfParallelSubtasks()).thenReturn(2);
    consumer.setRuntimeContext(context);

    // hand the union list state above to initializeState() as restored state
    OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
    when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);

    StateInitializationContext initializationContext = mock(StateInitializationContext.class);
    when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
    when(initializationContext.isRestored()).thenReturn(true);

    consumer.initializeState(initializationContext);

    // only opened, not run
    consumer.open(new Configuration());

    // arbitrary checkpoint id and timestamp
    consumer.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

    assertTrue(listState.isClearCalled());

    // the checkpointed list state should contain only the shards that it should subscribe to
    assertEquals(globalUnionState.size() / 2, listState.getList().size());
    assertTrue(listState.getList().contains(globalUnionState.get(0)));
    assertTrue(listState.getList().contains(globalUnionState.get(2)));
}
示例28
/**
 * Verifies that a consumer restored from a checkpoint seeds its fetcher with the restored sequence number
 * of every initially discovered shard, when every discovered shard is present in the restored state.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
// ----------------------------------------------------------------------
// setup initial state
// ----------------------------------------------------------------------
// shard -> restored sequence number; the meaning of "all" is defined by getFakeRestoredStore (not shown here)
HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");
// ----------------------------------------------------------------------
// mock operator state backend and initial state for initializeState()
// ----------------------------------------------------------------------
// the union list state handed to initializeState() holds the same entries as fakeRestoredState
TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
}
OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
StateInitializationContext initializationContext = mock(StateInitializationContext.class);
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
// report a restore (presumably so initializeState() reads the list state above)
when(initializationContext.isRestored()).thenReturn(true);
// ----------------------------------------------------------------------
// mock fetcher
// ----------------------------------------------------------------------
KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
// the mocked fetcher "discovers" exactly the restored shards
List<StreamShardHandle> shards = new ArrayList<>();
shards.addAll(fakeRestoredState.keySet());
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
// assume the given config is correct
PowerMockito.mockStatic(KinesisConfigUtil.class);
PowerMockito.doNothing().when(KinesisConfigUtil.class);
// ----------------------------------------------------------------------
// start to test fetcher's initial state seeding
// ----------------------------------------------------------------------
TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
"fakeStream", new Properties(), 10, 2);
consumer.initializeState(initializationContext);
consumer.open(new Configuration());
consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
// every restored shard must have been registered with its restored sequence number
for (Map.Entry<StreamShardHandle, SequenceNumber> restoredShard : fakeRestoredState.entrySet()) {
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(restoredShard.getKey()),
restoredShard.getKey(), restoredShard.getValue()));
}
}
示例29
/**
 * Verifies that, when restoring from a checkpoint, shards found in the restored state are registered with
 * their checkpointed sequence numbers, while a shard that is discovered but absent from the restored state
 * is registered starting from {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {
	// ----------------------------------------------------------------------
	// setup initial state
	// ----------------------------------------------------------------------
	HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");

	// ----------------------------------------------------------------------
	// mock operator state backend and initial state for initializeState()
	// ----------------------------------------------------------------------
	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
		listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
	}
	OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
	when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
	StateInitializationContext initializationContext = mock(StateInitializationContext.class);
	when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
	when(initializationContext.isRestored()).thenReturn(true);

	// ----------------------------------------------------------------------
	// mock fetcher
	// ----------------------------------------------------------------------
	// A shard that the fetcher discovers but that is NOT part of the restored state. It is built once
	// and reused for both the discovered shard list and the expectations, so the two cannot drift apart.
	final StreamShardHandle newDiscoveredShard = new StreamShardHandle("fakeStream2",
		new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)));

	KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
	List<StreamShardHandle> shards = new ArrayList<>(fakeRestoredState.keySet());
	shards.add(newDiscoveredShard);
	when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);

	// assume the given config is correct
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// ----------------------------------------------------------------------
	// start to test fetcher's initial state seeding
	// ----------------------------------------------------------------------
	TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
		"fakeStream", new Properties(), 10, 2);
	consumer.initializeState(initializationContext);
	consumer.open(new Configuration());
	consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

	// Restored shards keep their checkpointed sequence numbers; the newly discovered shard is expected
	// to start from the earliest sequence number. A separate map is used so the restored-state fixture
	// itself is left untouched.
	Map<StreamShardHandle, SequenceNumber> expectedSeededState = new HashMap<>(fakeRestoredState);
	expectedSeededState.put(newDiscoveredShard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());
	for (Map.Entry<StreamShardHandle, SequenceNumber> expectedShard : expectedSeededState.entrySet()) {
		Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
			new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(expectedShard.getKey()),
				expectedShard.getKey(), expectedShard.getValue()));
	}
}
示例30
/**
 * FLINK-8484: ensure that a change in the {@link StreamShardMetadata} other than {@link StreamShardMetadata#getShardId()}
 * or {@link StreamShardMetadata#getStreamName()} does not prevent the shard from being restored.
 * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
 * job is restored the shard has been closed (ending sequence number set) due to re-sharding, so we can no longer rely
 * on {@link StreamShardMetadata#equals(Object)} to find the sequence number in the collection of restored shard metadata.
 *
 * <p>Therefore, we rely on synchronizing the snapshot's state with the Kinesis shard before attempting to find
 * the sequence number to restore.
 */
@Test
public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheStateWasStored() throws Exception {
	// ----------------------------------------------------------------------
	// setup initial state
	// ----------------------------------------------------------------------
	HashMap<StreamShardHandle, SequenceNumber> fakeRestoredState = getFakeRestoredStore("all");

	// ----------------------------------------------------------------------
	// mock operator state backend and initial state for initializeState()
	// ----------------------------------------------------------------------
	// NOTE: the list state is populated here, before the shard is closed further below, so it holds the
	// metadata of the still-open shard (assuming convertToStreamShardMetadata copies the values - confirm).
	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> listState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> state : fakeRestoredState.entrySet()) {
		listState.add(Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(state.getKey()), state.getValue()));
	}
	OperatorStateStore operatorStateStore = mock(OperatorStateStore.class);
	when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
	StateInitializationContext initializationContext = mock(StateInitializationContext.class);
	when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
	when(initializationContext.isRestored()).thenReturn(true);

	// ----------------------------------------------------------------------
	// mock fetcher
	// ----------------------------------------------------------------------
	KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
	List<StreamShardHandle> shards = new ArrayList<>();

	// create a fake stream shard handle based on the first entry in the restored state
	final StreamShardHandle originalStreamShardHandle = fakeRestoredState.keySet().iterator().next();
	// Look up the expected sequence number BEFORE closing the shard. The closed handle below shares its Shard
	// instance with this restored-state key, so closing it mutates a key already stored in the HashMap; a lookup
	// afterwards would only work by virtue of how StreamShardHandle happens to compute its hashCode.
	final SequenceNumber expectedSequenceNumber = fakeRestoredState.get(originalStreamShardHandle);
	final StreamShardHandle closedStreamShardHandle = new StreamShardHandle(originalStreamShardHandle.getStreamName(), originalStreamShardHandle.getShard());
	// close the shard handle by setting an ending sequence number
	final SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
	sequenceNumberRange.setEndingSequenceNumber("1293844");
	closedStreamShardHandle.getShard().setSequenceNumberRange(sequenceNumberRange);
	shards.add(closedStreamShardHandle);
	when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);

	// assume the given config is correct
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// ----------------------------------------------------------------------
	// start to test fetcher's initial state seeding
	// ----------------------------------------------------------------------
	TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
		"fakeStream", new Properties(), 10, 2);
	consumer.initializeState(initializationContext);
	consumer.open(new Configuration());
	consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

	// the closed shard must still be seeded with the sequence number stored for its open counterpart
	Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
		new KinesisStreamShardState(KinesisDataFetcher.convertToStreamShardMetadata(closedStreamShardHandle),
			closedStreamShardHandle, expectedSequenceNumber));
}