Java源码示例:org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher

示例1
/**
 * Cancels this source: clears the {@code running} flag, detaches the fetcher from this
 * instance and, if a fetcher had been exposed, shuts it down and waits for it to terminate.
 *
 * <p>Failures during shutdown are logged and not rethrown. If the failure is an
 * {@link InterruptedException}, the calling thread's interrupt status is restored so that
 * the interruption is not silently lost.
 */
@Override
public void cancel() {
	running = false;

	// work on a local reference and clear the field, so the shutdown below
	// operates on exactly the instance that was visible at this point
	KinesisDataFetcher fetcher = this.fetcher;
	this.fetcher = null;

	// this method might be called before the subtask actually starts running,
	// so we must check if the fetcher is actually created
	if (fetcher != null) {
		try {
			// interrupt the fetcher of any work
			fetcher.shutdownFetcher();
			fetcher.awaitTermination();
		} catch (Exception e) {
			if (e instanceof InterruptedException) {
				// catching the exception cleared the interrupt flag; set it again
				// so that callers further up the stack can still observe it
				Thread.currentThread().interrupt();
			}
			LOG.warn("Error while closing Kinesis data fetcher", e);
		}
	}
}
 
示例2
/**
 * Creates a Mockito mock of {@link KinesisDataFetcher} and registers a PowerMockito
 * {@code whenNew} expectation on the first constructor reported by
 * {@code KinesisDataFetcher.class.getConstructors()}, so that this mock is returned.
 *
 * <p>The first constructor argument is matched with {@code Mockito.any(type)}, every
 * remaining argument with {@code Mockito.nullable(type)}.
 *
 * @return the mocked fetcher
 * @throws Exception if registering the constructor expectation fails
 */
private static KinesisDataFetcher mockKinesisDataFetcher() throws Exception {
	final KinesisDataFetcher fetcherMock = Mockito.mock(KinesisDataFetcher.class);

	final java.lang.reflect.Constructor<KinesisDataFetcher> firstConstructor =
		(java.lang.reflect.Constructor<KinesisDataFetcher>) KinesisDataFetcher.class.getConstructors()[0];

	// split the parameter list into the leading parameter and the trailing ones
	final Class<?>[] allParamTypes = firstConstructor.getParameterTypes();
	final int trailingCount = allParamTypes.length - 1;
	final Class<?>[] trailingParamTypes = new Class<?>[trailingCount];
	System.arraycopy(allParamTypes, 1, trailingParamTypes, 0, trailingCount);

	// NOTE(review): kept as a supplier so the nullable(...) matchers are only created
	// while the withArguments(...) argument list is evaluated, i.e. after any(...);
	// presumably Mockito depends on that creation order - confirm before inlining
	final Supplier<Object[]> trailingMatchers = () -> {
		final Object[] matchers = new Object[trailingParamTypes.length];
		int position = 0;
		for (Class<?> paramType : trailingParamTypes) {
			matchers[position++] = Mockito.nullable(paramType);
		}
		return matchers;
	};

	PowerMockito.whenNew(firstConstructor)
		.withArguments(Mockito.any(allParamTypes[0]), trailingMatchers.get())
		.thenReturn(fetcherMock);
	return fetcherMock;
}
 
示例3
/**
 * Cancels this source: clears the {@code running} flag, detaches the fetcher from this
 * instance and, if a fetcher had been exposed, shuts it down and waits for it to terminate.
 *
 * <p>Failures during shutdown are logged and not rethrown. If the failure is an
 * {@link InterruptedException}, the calling thread's interrupt status is restored so that
 * the interruption is not silently lost.
 */
@Override
public void cancel() {
	running = false;

	// work on a local reference and clear the field, so the shutdown below
	// operates on exactly the instance that was visible at this point
	KinesisDataFetcher fetcher = this.fetcher;
	this.fetcher = null;

	// this method might be called before the subtask actually starts running,
	// so we must check if the fetcher is actually created
	if (fetcher != null) {
		try {
			// interrupt the fetcher of any work
			fetcher.shutdownFetcher();
			fetcher.awaitTermination();
		} catch (Exception e) {
			if (e instanceof InterruptedException) {
				// catching the exception cleared the interrupt flag; set it again
				// so that callers further up the stack can still observe it
				Thread.currentThread().interrupt();
			}
			LOG.warn("Error while closing Kinesis data fetcher", e);
		}
	}
}
 
示例4
/**
 * Creates a Mockito mock of {@link KinesisDataFetcher} and registers a PowerMockito
 * {@code whenNew} expectation on the first constructor reported by
 * {@code KinesisDataFetcher.class.getConstructors()}, so that this mock is returned.
 *
 * <p>The first constructor argument is matched with {@code Mockito.any(type)}, every
 * remaining argument with {@code Mockito.nullable(type)}.
 *
 * @return the mocked fetcher
 * @throws Exception if registering the constructor expectation fails
 */
private static KinesisDataFetcher mockKinesisDataFetcher() throws Exception {
	final KinesisDataFetcher fetcherMock = Mockito.mock(KinesisDataFetcher.class);

	final java.lang.reflect.Constructor<KinesisDataFetcher> firstConstructor =
		(java.lang.reflect.Constructor<KinesisDataFetcher>) KinesisDataFetcher.class.getConstructors()[0];

	// split the parameter list into the leading parameter and the trailing ones
	final Class<?>[] allParamTypes = firstConstructor.getParameterTypes();
	final int trailingCount = allParamTypes.length - 1;
	final Class<?>[] trailingParamTypes = new Class<?>[trailingCount];
	System.arraycopy(allParamTypes, 1, trailingParamTypes, 0, trailingCount);

	// NOTE(review): kept as a supplier so the nullable(...) matchers are only created
	// while the withArguments(...) argument list is evaluated, i.e. after any(...);
	// presumably Mockito depends on that creation order - confirm before inlining
	final Supplier<Object[]> trailingMatchers = () -> {
		final Object[] matchers = new Object[trailingParamTypes.length];
		int position = 0;
		for (Class<?> paramType : trailingParamTypes) {
			matchers[position++] = Mockito.nullable(paramType);
		}
		return matchers;
	};

	PowerMockito.whenNew(firstConstructor)
		.withArguments(Mockito.any(allParamTypes[0]), trailingMatchers.get())
		.thenReturn(fetcherMock);
	return fetcherMock;
}
 
示例5
/**
 * Cancels this source: clears the {@code running} flag, detaches the fetcher from this
 * instance and, if a fetcher had been exposed, shuts it down and waits for it to terminate.
 *
 * <p>Failures during shutdown are logged and not rethrown. If the failure is an
 * {@link InterruptedException}, the calling thread's interrupt status is restored so that
 * the interruption is not silently lost.
 */
@Override
public void cancel() {
	running = false;

	// work on a local reference and clear the field, so the shutdown below
	// operates on exactly the instance that was visible at this point
	KinesisDataFetcher fetcher = this.fetcher;
	this.fetcher = null;

	// this method might be called before the subtask actually starts running,
	// so we must check if the fetcher is actually created
	if (fetcher != null) {
		try {
			// interrupt the fetcher of any work
			fetcher.shutdownFetcher();
			fetcher.awaitTermination();
		} catch (Exception e) {
			if (e instanceof InterruptedException) {
				// catching the exception cleared the interrupt flag; set it again
				// so that callers further up the stack can still observe it
				Thread.currentThread().interrupt();
			}
			LOG.warn("Error while closing Kinesis data fetcher", e);
		}
	}
}
 
示例6
/**
 * Creates a Mockito mock of {@link KinesisDataFetcher} and registers a PowerMockito
 * {@code whenNew} expectation on the first constructor reported by
 * {@code KinesisDataFetcher.class.getConstructors()}, so that this mock is returned.
 *
 * <p>The first constructor argument is matched with {@code Mockito.any(type)}, every
 * remaining argument with {@code Mockito.nullable(type)}.
 *
 * @return the mocked fetcher
 * @throws Exception if registering the constructor expectation fails
 */
private static KinesisDataFetcher mockKinesisDataFetcher() throws Exception {
	final KinesisDataFetcher fetcherMock = Mockito.mock(KinesisDataFetcher.class);

	final java.lang.reflect.Constructor<KinesisDataFetcher> firstConstructor =
		(java.lang.reflect.Constructor<KinesisDataFetcher>) KinesisDataFetcher.class.getConstructors()[0];

	// split the parameter list into the leading parameter and the trailing ones
	final Class<?>[] allParamTypes = firstConstructor.getParameterTypes();
	final int trailingCount = allParamTypes.length - 1;
	final Class<?>[] trailingParamTypes = new Class<?>[trailingCount];
	System.arraycopy(allParamTypes, 1, trailingParamTypes, 0, trailingCount);

	// NOTE(review): kept as a supplier so the nullable(...) matchers are only created
	// while the withArguments(...) argument list is evaluated, i.e. after any(...);
	// presumably Mockito depends on that creation order - confirm before inlining
	final Supplier<Object[]> trailingMatchers = () -> {
		final Object[] matchers = new Object[trailingParamTypes.length];
		int position = 0;
		for (Class<?> paramType : trailingParamTypes) {
			matchers[position++] = Mockito.nullable(paramType);
		}
		return matchers;
	};

	PowerMockito.whenNew(firstConstructor)
		.withArguments(Mockito.any(allParamTypes[0]), trailingMatchers.get())
		.thenReturn(fetcherMock);
	return fetcherMock;
}
 
示例7
/**
 * Builds the fetcher for this consumer as a {@link DynamoDBStreamsDataFetcher}.
 *
 * <p>All arguments are forwarded unchanged to the fetcher constructor, followed by the
 * value of {@code getShardAssigner()}.
 *
 * @param streams the stream names handed to the fetcher
 * @param sourceContext the source context handed to the fetcher
 * @param runtimeContext the runtime context handed to the fetcher
 * @param configProps the configuration properties handed to the fetcher
 * @param deserializationSchema the deserialization schema handed to the fetcher
 * @return the newly created fetcher
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceFunction.SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializationSchema) {
	final DynamoDBStreamsDataFetcher<T> dynamoDbFetcher = new DynamoDBStreamsDataFetcher<T>(
		streams, sourceContext, runtimeContext, configProps, deserializationSchema, getShardAssigner());
	return dynamoDbFetcher;
}
 
示例8
/**
 * Builds the {@link KinesisDataFetcher} used by this consumer.
 *
 * <p>This method is exposed for tests that need to mock the KinesisDataFetcher in the consumer.
 * The arguments are forwarded unchanged, followed by this consumer's {@code shardAssigner},
 * {@code periodicWatermarkAssigner} and {@code watermarkTracker}.
 *
 * @param streams the stream names handed to the fetcher
 * @param sourceContext the source context handed to the fetcher
 * @param runtimeContext the runtime context handed to the fetcher
 * @param configProps the configuration properties handed to the fetcher
 * @param deserializationSchema the deserialization schema handed to the fetcher
 * @return the newly created fetcher
 */
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceFunction.SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializationSchema) {
	final KinesisDataFetcher<T> newFetcher = new KinesisDataFetcher<>(
		streams,
		sourceContext,
		runtimeContext,
		configProps,
		deserializationSchema,
		shardAssigner,
		periodicWatermarkAssigner,
		watermarkTracker);
	return newFetcher;
}
 
示例9
/**
 * Returns the pre-built {@code mockFetcher} instead of constructing a new fetcher.
 * None of the arguments are used.
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializer) {
	return mockFetcher;
}
 
示例10
/**
 * Opens and runs a consumer without calling {@code initializeState()} first, with the
 * fetcher constructor and {@code KinesisConfigUtil} mocked.
 *
 * <p>The test contains no explicit assertions; it passes when {@code open()} and
 * {@code run()} complete without throwing.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
	// installs the constructor interception; the returned mock is not inspected afterwards
	final KinesisDataFetcher fetcherMock = mockKinesisDataFetcher();

	// assume the given config is correct
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	final Properties emptyConfig = new Properties();
	final TestableFlinkKinesisConsumer consumerUnderTest =
		new TestableFlinkKinesisConsumer("fakeStream", emptyConfig, 10, 2);

	consumerUnderTest.open(new Configuration());
	consumerUnderTest.run(Mockito.mock(SourceFunction.SourceContext.class));
}
 
示例11
/**
 * Builds the fetcher for this consumer as a {@link DynamoDBStreamsDataFetcher}.
 *
 * <p>All arguments are forwarded unchanged to the fetcher constructor, followed by the
 * value of {@code getShardAssigner()}.
 *
 * @param streams the stream names handed to the fetcher
 * @param sourceContext the source context handed to the fetcher
 * @param runtimeContext the runtime context handed to the fetcher
 * @param configProps the configuration properties handed to the fetcher
 * @param deserializationSchema the deserialization schema handed to the fetcher
 * @return the newly created fetcher
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceFunction.SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializationSchema) {
	final DynamoDBStreamsDataFetcher<T> dynamoDbFetcher = new DynamoDBStreamsDataFetcher<T>(
		streams, sourceContext, runtimeContext, configProps, deserializationSchema, getShardAssigner());
	return dynamoDbFetcher;
}
 
示例12
/**
 * Builds the {@link KinesisDataFetcher} used by this consumer.
 *
 * <p>This method is exposed for tests that need to mock the KinesisDataFetcher in the consumer.
 * The arguments are forwarded unchanged, followed by this consumer's {@code shardAssigner},
 * {@code periodicWatermarkAssigner} and {@code watermarkTracker}.
 *
 * @param streams the stream names handed to the fetcher
 * @param sourceContext the source context handed to the fetcher
 * @param runtimeContext the runtime context handed to the fetcher
 * @param configProps the configuration properties handed to the fetcher
 * @param deserializationSchema the deserialization schema handed to the fetcher
 * @return the newly created fetcher
 */
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceFunction.SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializationSchema) {
	final KinesisDataFetcher<T> newFetcher = new KinesisDataFetcher<>(
		streams,
		sourceContext,
		runtimeContext,
		configProps,
		deserializationSchema,
		shardAssigner,
		periodicWatermarkAssigner,
		watermarkTracker);
	return newFetcher;
}
 
示例13
/**
 * Returns the pre-built {@code mockFetcher} instead of constructing a new fetcher.
 * None of the arguments are used.
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializer) {
	return mockFetcher;
}
 
示例14
/**
 * Opens and runs a consumer without calling {@code initializeState()} first, with the
 * fetcher constructor and {@code KinesisConfigUtil} mocked.
 *
 * <p>The test contains no explicit assertions; it passes when {@code open()} and
 * {@code run()} complete without throwing.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
	// installs the constructor interception; the returned mock is not inspected afterwards
	final KinesisDataFetcher fetcherMock = mockKinesisDataFetcher();

	// assume the given config is correct
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	final Properties emptyConfig = new Properties();
	final TestableFlinkKinesisConsumer consumerUnderTest =
		new TestableFlinkKinesisConsumer("fakeStream", emptyConfig, 10, 2);

	consumerUnderTest.open(new Configuration());
	consumerUnderTest.run(Mockito.mock(SourceFunction.SourceContext.class));
}
 
示例15
/**
 * Builds the fetcher for this consumer as a {@link DynamoDBStreamsDataFetcher}.
 *
 * <p>All arguments are forwarded unchanged to the fetcher constructor, followed by the
 * value of {@code getShardAssigner()}.
 *
 * @param streams the stream names handed to the fetcher
 * @param sourceContext the source context handed to the fetcher
 * @param runtimeContext the runtime context handed to the fetcher
 * @param configProps the configuration properties handed to the fetcher
 * @param deserializationSchema the deserialization schema handed to the fetcher
 * @return the newly created fetcher
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceFunction.SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializationSchema) {
	final DynamoDBStreamsDataFetcher<T> dynamoDbFetcher = new DynamoDBStreamsDataFetcher<T>(
		streams, sourceContext, runtimeContext, configProps, deserializationSchema, getShardAssigner());
	return dynamoDbFetcher;
}
 
示例16
/**
 * Builds the {@link KinesisDataFetcher} used by this consumer.
 *
 * <p>This method is exposed for tests that need to mock the KinesisDataFetcher in the consumer.
 * The arguments are forwarded unchanged, followed by this consumer's {@code shardAssigner},
 * {@code periodicWatermarkAssigner} and {@code watermarkTracker}.
 *
 * @param streams the stream names handed to the fetcher
 * @param sourceContext the source context handed to the fetcher
 * @param runtimeContext the runtime context handed to the fetcher
 * @param configProps the configuration properties handed to the fetcher
 * @param deserializationSchema the deserialization schema handed to the fetcher
 * @return the newly created fetcher
 */
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceFunction.SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializationSchema) {
	final KinesisDataFetcher<T> newFetcher = new KinesisDataFetcher<>(
		streams,
		sourceContext,
		runtimeContext,
		configProps,
		deserializationSchema,
		shardAssigner,
		periodicWatermarkAssigner,
		watermarkTracker);
	return newFetcher;
}
 
示例17
/**
 * Returns the pre-built {@code mockFetcher} instead of constructing a new fetcher.
 * None of the arguments are used.
 */
@Override
protected KinesisDataFetcher<T> createFetcher(
		List<String> streams,
		SourceContext<T> sourceContext,
		RuntimeContext runtimeContext,
		Properties configProps,
		KinesisDeserializationSchema<T> deserializer) {
	return mockFetcher;
}
 
示例18
/**
 * Opens and runs a consumer without calling {@code initializeState()} first, with the
 * fetcher constructor and {@code KinesisConfigUtil} mocked.
 *
 * <p>The test contains no explicit assertions; it passes when {@code open()} and
 * {@code run()} complete without throwing.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoint() throws Exception {
	// installs the constructor interception; the returned mock is not inspected afterwards
	final KinesisDataFetcher fetcherMock = mockKinesisDataFetcher();

	// assume the given config is correct
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	final Properties emptyConfig = new Properties();
	final TestableFlinkKinesisConsumer consumerUnderTest =
		new TestableFlinkKinesisConsumer("fakeStream", emptyConfig, 10, 2);

	consumerUnderTest.open(new Configuration());
	consumerUnderTest.run(Mockito.mock(SourceFunction.SourceContext.class));
}
 
示例19
/**
 * Creates a fetcher, seeds it with one shard state per initially discovered shard and,
 * unless the source was cancelled in the meantime, runs it until it terminates.
 *
 * <p>The starting sequence number of each discovered shard is chosen as follows:
 * <ul>
 *   <li>no restored state ({@code sequenceNumsToRestore == null}): the position configured
 *       under {@code ConsumerConfigConstants.STREAM_INITIAL_POSITION};</li>
 *   <li>restored state that does not contain the shard: {@code SENTINEL_EARLIEST_SEQUENCE_NUM};</li>
 *   <li>restored state that contains the shard: the restored sequence number.</li>
 * </ul>
 *
 * @param sourceContext the context handed to the fetcher; closed once the fetcher has terminated
 * @throws Exception if creating, seeding or running the fetcher fails
 */
@Override
public void run(SourceContext<T> sourceContext) throws Exception {

	// every subtask gets its own fetcher, even if it has no shards to read at first;
	// fetchers keep polling the shard list, so shards may be assigned to it later on
	final KinesisDataFetcher<T> newFetcher =
		createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

	// initial discovery
	for (StreamShardHandle discoveredShard : newFetcher.discoverNewShardsToSubscribe()) {
		final StreamShardMetadata.EquivalenceWrapper shardKey =
			new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(discoveredShard));

		if (sequenceNumsToRestore == null) {
			// fresh start: begin at the configured initial position
			final SentinelSequenceNumber configuredStart = InitialPosition
				.valueOf(configProps.getProperty(
					ConsumerConfigConstants.STREAM_INITIAL_POSITION,
					ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))
				.toSentinelSequenceNumber();

			newFetcher.registerNewSubscribedShardState(
				new KinesisStreamShardState(shardKey.getShardMetadata(), discoveredShard, configuredStart.get()));

			if (LOG.isInfoEnabled()) {
				LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
					getRuntimeContext().getIndexOfThisSubtask(), discoveredShard.toString(), configuredStart.get());
			}
			continue;
		}

		if (!sequenceNumsToRestore.containsKey(shardKey)) {
			// not part of the restored state, i.e. not seen in the previous run: read from the beginning
			newFetcher.registerNewSubscribedShardState(
				new KinesisStreamShardState(shardKey.getShardMetadata(), discoveredShard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

			if (LOG.isInfoEnabled()) {
				LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
						" starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
					getRuntimeContext().getIndexOfThisSubtask(), discoveredShard.toString());
			}
			continue;
		}

		// known shard: continue from the sequence number kept in the restored state
		newFetcher.registerNewSubscribedShardState(
			new KinesisStreamShardState(shardKey.getShardMetadata(), discoveredShard, sequenceNumsToRestore.get(shardKey)));

		if (LOG.isInfoEnabled()) {
			LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
					" starting state set to the restored sequence number {}",
				getRuntimeContext().getIndexOfThisSubtask(), discoveredShard.toString(), sequenceNumsToRestore.get(shardKey));
		}
	}

	// only start the fetcher if the source has not been cancelled in the meantime
	if (running) {
		// publish the fetcher from here on, so that state snapshots
		// can be taken from the fetcher's state holders
		this.fetcher = newFetcher;

		// runs the fetcher loop; it only stops when cancel() or close() is called,
		// or when a thread created by the fetcher throws an error
		newFetcher.runFetcher();

		// wait until the fetcher has terminated before fully closing
		newFetcher.awaitTermination();
		sourceContext.close();
	}
}
 
示例20
/**
 * Creates the consumer for {@code TEST_STREAM_NAME} with the given schema and
 * {@code dummyConfig}, and keeps the supplied fetcher in {@code mockFetcher}.
 *
 * @param mockFetcher the fetcher instance stored on this consumer
 * @param schema the deserialization schema passed on to the superclass constructor
 */
DummyFlinkKinesisConsumer(KinesisDataFetcher<T> mockFetcher, KinesisDeserializationSchema<T> schema) {
	super(TEST_STREAM_NAME, schema, dummyConfig);
	this.mockFetcher = mockFetcher;
}
 
示例21
/**
 * Verifies that a snapshot taken after {@code open()} but without {@code run()} writes the
 * restored state back: the list state is cleared and afterwards holds half of the four
 * restored entries, including the entries for shard order 0 and 2.
 *
 * <p>The consumer is set up as subtask 0 of 2 parallel subtasks.
 */
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
	final Properties consumerConfig = TestUtils.getStandardProperties();

	// union state: shards 0..3 of "fakeStream", each stored with sequence number "1"
	final List<Tuple2<StreamShardMetadata, SequenceNumber>> unionState = new ArrayList<>(4);
	for (int shardOrder = 0; shardOrder < 4; shardOrder++) {
		final StreamShardHandle shardHandle = new StreamShardHandle(
			"fakeStream",
			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder)));
		unionState.add(Tuple2.of(
			KinesisDataFetcher.convertToStreamShardMetadata(shardHandle),
			new SequenceNumber("1")));
	}

	final TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Tuple2<StreamShardMetadata, SequenceNumber> entry : unionState) {
		restoredListState.add(entry);
	}

	final FlinkKinesisConsumer<String> consumerUnderTest =
		new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), consumerConfig);
	final RuntimeContext runtimeContext = mock(RuntimeContext.class);
	when(runtimeContext.getIndexOfThisSubtask()).thenReturn(0);
	when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(2);
	consumerUnderTest.setRuntimeContext(runtimeContext);

	final OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	final StateInitializationContext initContext = mock(StateInitializationContext.class);
	when(initContext.getOperatorStateStore()).thenReturn(stateStore);
	when(initContext.isRestored()).thenReturn(true);

	consumerUnderTest.initializeState(initContext);

	// only opened, not run
	consumerUnderTest.open(new Configuration());

	// arbitrary checkpoint id and timestamp
	consumerUnderTest.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

	assertTrue(restoredListState.isClearCalled());

	// the checkpointed list state should contain only the shards that it should subscribe to
	assertEquals(unionState.size() / 2, restoredListState.getList().size());
	assertTrue(restoredListState.getList().contains(unionState.get(0)));
	assertTrue(restoredListState.getList().contains(unionState.get(2)));
}
 
示例22
/**
 * Verifies that, when restoring from a checkpoint, the fetcher is seeded with every restored
 * shard at its restored sequence number.
 *
 * <p>The mocked fetcher reports exactly the restored shards as discovered; afterwards one
 * {@code registerNewSubscribedShardState} call is verified per restored entry.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {

	// --- restored state ---------------------------------------------------

	final HashMap<StreamShardHandle, SequenceNumber> restoredState = getFakeRestoredStore("all");

	// --- operator state backend and initialization context for initializeState() ---

	final TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> entry : restoredState.entrySet()) {
		restoredListState.add(
			Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(entry.getKey()), entry.getValue()));
	}

	final OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	final StateInitializationContext initContext = mock(StateInitializationContext.class);
	when(initContext.getOperatorStateStore()).thenReturn(stateStore);
	when(initContext.isRestored()).thenReturn(true);

	// --- fetcher mock: discovers exactly the restored shards ---------------

	final KinesisDataFetcher fetcherMock = mockKinesisDataFetcher();
	final List<StreamShardHandle> discoveredShards = new ArrayList<>(restoredState.keySet());
	when(fetcherMock.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

	// assume the given config is correct
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// --- exercise the initial state seeding --------------------------------

	final TestableFlinkKinesisConsumer consumerUnderTest =
		new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
	consumerUnderTest.initializeState(initContext);
	consumerUnderTest.open(new Configuration());
	consumerUnderTest.run(Mockito.mock(SourceFunction.SourceContext.class));

	// every restored shard must have been registered with its restored sequence number
	for (Map.Entry<StreamShardHandle, SequenceNumber> expected : restoredState.entrySet()) {
		Mockito.verify(fetcherMock).registerNewSubscribedShardState(
			new KinesisStreamShardState(
				KinesisDataFetcher.convertToStreamShardMetadata(expected.getKey()),
				expected.getKey(),
				expected.getValue()));
	}
}
 
示例23
/**
 * Verifies the seeding when discovery returns one shard that is not part of the restored
 * state: restored shards are registered with their restored sequence numbers, and the
 * additional shard (stream "fakeStream2", shard order 2) is registered with
 * {@code SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {

	// --- restored state ---------------------------------------------------

	final HashMap<StreamShardHandle, SequenceNumber> expectedStates = getFakeRestoredStore("all");

	// --- operator state backend and initialization context for initializeState() ---

	final TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> entry : expectedStates.entrySet()) {
		restoredListState.add(
			Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(entry.getKey()), entry.getValue()));
	}

	final OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	final StateInitializationContext initContext = mock(StateInitializationContext.class);
	when(initContext.getOperatorStateStore()).thenReturn(stateStore);
	when(initContext.isRestored()).thenReturn(true);

	// --- fetcher mock: discovers the restored shards plus one additional shard ---

	final KinesisDataFetcher fetcherMock = mockKinesisDataFetcher();
	final List<StreamShardHandle> discoveredShards = new ArrayList<>(expectedStates.keySet());
	final StreamShardHandle additionalDiscoveredShard = new StreamShardHandle(
		"fakeStream2",
		new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)));
	discoveredShards.add(additionalDiscoveredShard);
	when(fetcherMock.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

	// assume the given config is correct
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// --- exercise the initial state seeding --------------------------------

	final TestableFlinkKinesisConsumer consumerUnderTest =
		new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
	consumerUnderTest.initializeState(initContext);
	consumerUnderTest.open(new Configuration());
	consumerUnderTest.run(Mockito.mock(SourceFunction.SourceContext.class));

	// the additional shard is expected at the earliest-sequence sentinel; a separately
	// constructed handle is used as the expectation key
	final StreamShardHandle additionalExpectedShard = new StreamShardHandle(
		"fakeStream2",
		new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)));
	expectedStates.put(additionalExpectedShard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

	for (Map.Entry<StreamShardHandle, SequenceNumber> expected : expectedStates.entrySet()) {
		Mockito.verify(fetcherMock).registerNewSubscribedShardState(
			new KinesisStreamShardState(
				KinesisDataFetcher.convertToStreamShardMetadata(expected.getKey()),
				expected.getKey(),
				expected.getValue()));
	}
}
 
示例24
/**
 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
 * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
 *
 * <p>This covers the corner case where the stored shard metadata describes an open shard (no ending sequence
 * number), but by the time the job is restored the shard has been closed (ending sequence number set) due to
 * re-sharding, so {@link StreamShardMetadata#equals(Object)} can no longer be relied upon to find the sequence
 * number in the collection of restored shard metadata.
 *
 * <p>Therefore, the snapshot's state is expected to be synchronized with the Kinesis shard before the sequence
 * number to restore is looked up.
 */
@Test
public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheStateWasStored() throws Exception {
	// --- restored state ---------------------------------------------------

	final HashMap<StreamShardHandle, SequenceNumber> restoredState = getFakeRestoredStore("all");

	// --- operator state backend and initialization context for initializeState() ---

	final TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> entry : restoredState.entrySet()) {
		restoredListState.add(
			Tuple2.of(KinesisDataFetcher.convertToStreamShardMetadata(entry.getKey()), entry.getValue()));
	}

	final OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	final StateInitializationContext initContext = mock(StateInitializationContext.class);
	when(initContext.getOperatorStateStore()).thenReturn(stateStore);
	when(initContext.isRestored()).thenReturn(true);

	// --- fetcher mock: discovers a single, closed shard --------------------

	final KinesisDataFetcher fetcherMock = mockKinesisDataFetcher();

	// build a shard handle from the first entry of the restored state
	// NOTE(review): the new handle is given the stored handle's Shard object itself, so the
	// mutation below presumably also affects the stored handle - confirm this is intended
	final StreamShardHandle storedHandle = restoredState.keySet().iterator().next();
	final StreamShardHandle closedHandle =
		new StreamShardHandle(storedHandle.getStreamName(), storedHandle.getShard());

	// close the shard by giving it an ending sequence number
	final SequenceNumberRange closedRange = new SequenceNumberRange();
	closedRange.setEndingSequenceNumber("1293844");
	closedHandle.getShard().setSequenceNumberRange(closedRange);

	final List<StreamShardHandle> discoveredShards = new ArrayList<>();
	discoveredShards.add(closedHandle);
	when(fetcherMock.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

	// assume the given config is correct
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// --- exercise the initial state seeding --------------------------------

	final TestableFlinkKinesisConsumer consumerUnderTest =
		new TestableFlinkKinesisConsumer("fakeStream", new Properties(), 10, 2);
	consumerUnderTest.initializeState(initContext);
	consumerUnderTest.open(new Configuration());
	consumerUnderTest.run(Mockito.mock(SourceFunction.SourceContext.class));

	// the closed shard must be registered with the sequence number found in the restored state
	Mockito.verify(fetcherMock).registerNewSubscribedShardState(
		new KinesisStreamShardState(
			KinesisDataFetcher.convertToStreamShardMetadata(closedHandle),
			closedHandle,
			restoredState.get(closedHandle)));
}
 
示例25
/**
 * Creates a fetcher, seeds it with one shard state per initially discovered shard and,
 * if the source is still running, runs the fetcher until it terminates.
 *
 * <p>The starting sequence number of each discovered shard depends on the restored state:
 * the restored sequence number if the shard is contained in {@code sequenceNumsToRestore},
 * {@code SENTINEL_EARLIEST_SEQUENCE_NUM} if state was restored but does not contain the shard,
 * and the configured initial position if {@code sequenceNumsToRestore} is {@code null}.
 *
 * @param sourceContext the context handed to the fetcher; closed after the fetcher has terminated
 * @throws Exception if creating, seeding or running the fetcher fails
 */
@Override
public void run(SourceContext<T> sourceContext) throws Exception {

	// all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
	// shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
	// can potentially have new shards to subscribe to later on
	KinesisDataFetcher<T> fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);

	// initial discovery
	List<StreamShardHandle> allShards = fetcher.discoverNewShardsToSubscribe();

	for (StreamShardHandle shard : allShards) {
		// wrap the shard metadata so it can be used as the lookup key into the restored state
		StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
			new StreamShardMetadata.EquivalenceWrapper(KinesisDataFetcher.convertToStreamShardMetadata(shard));

		// a non-null map means state was restored
		if (sequenceNumsToRestore != null) {

			if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
				// if the shard was already seen and is contained in the state,
				// just use the sequence number stored in the state
				fetcher.registerNewSubscribedShardState(
					new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, sequenceNumsToRestore.get(kinesisStreamShard)));

				if (LOG.isInfoEnabled()) {
					LOG.info("Subtask {} is seeding the fetcher with restored shard {}," +
							" starting state set to the restored sequence number {}",
						getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), sequenceNumsToRestore.get(kinesisStreamShard));
				}
			} else {
				// the shard wasn't discovered in the previous run, therefore should be consumed from the beginning
				fetcher.registerNewSubscribedShardState(
					new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));

				if (LOG.isInfoEnabled()) {
					LOG.info("Subtask {} is seeding the fetcher with new discovered shard {}," +
							" starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM",
						getRuntimeContext().getIndexOfThisSubtask(), shard.toString());
				}
			}
		} else {
			// we're starting fresh; use the configured start position as initial state
			SentinelSequenceNumber startingSeqNum =
				InitialPosition.valueOf(configProps.getProperty(
					ConsumerConfigConstants.STREAM_INITIAL_POSITION,
					ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)).toSentinelSequenceNumber();

			fetcher.registerNewSubscribedShardState(
				new KinesisStreamShardState(kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get()));

			if (LOG.isInfoEnabled()) {
				LOG.info("Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}",
					getRuntimeContext().getIndexOfThisSubtask(), shard.toString(), startingSeqNum.get());
			}
		}
	}

	// check that we are running before starting the fetcher
	// (in that case the fetcher field is not set and sourceContext is not closed here)
	if (!running) {
		return;
	}

	// expose the fetcher from this point, so that state
	// snapshots can be taken from the fetcher's state holders
	this.fetcher = fetcher;

	// start the fetcher loop. The fetcher will stop running only when cancel() or
	// close() is called, or an error is thrown by threads created by the fetcher
	fetcher.runFetcher();

	// check that the fetcher has terminated before fully closing
	fetcher.awaitTermination();
	sourceContext.close();
}
 
示例26
/**
 * Creates the consumer for {@code TEST_STREAM_NAME} with the given schema and
 * {@code dummyConfig}, and keeps the supplied fetcher in {@code mockFetcher}.
 *
 * @param mockFetcher the fetcher instance stored on this consumer
 * @param schema the deserialization schema passed on to the superclass constructor
 */
DummyFlinkKinesisConsumer(KinesisDataFetcher<T> mockFetcher, KinesisDeserializationSchema<T> schema) {
	super(TEST_STREAM_NAME, schema, dummyConfig);
	this.mockFetcher = mockFetcher;
}
 
示例27
/**
 * Verifies that a snapshot taken after {@code open()} but without {@code run()} writes the
 * restored state back: the list state is cleared and afterwards holds half of the four
 * restored entries, including the entries for shard order 0 and 2.
 *
 * <p>The consumer is set up as subtask 0 of 2 parallel subtasks.
 */
@Test
public void testUseRestoredStateForSnapshotIfFetcherNotInitialized() throws Exception {
	final Properties consumerConfig = TestUtils.getStandardProperties();

	// union state: shards 0..3 of "fakeStream", each stored with sequence number "1"
	final List<Tuple2<StreamShardMetadata, SequenceNumber>> unionState = new ArrayList<>(4);
	for (int shardOrder = 0; shardOrder < 4; shardOrder++) {
		final StreamShardHandle shardHandle = new StreamShardHandle(
			"fakeStream",
			new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(shardOrder)));
		unionState.add(Tuple2.of(
			KinesisDataFetcher.convertToStreamShardMetadata(shardHandle),
			new SequenceNumber("1")));
	}

	final TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Tuple2<StreamShardMetadata, SequenceNumber> entry : unionState) {
		restoredListState.add(entry);
	}

	final FlinkKinesisConsumer<String> consumerUnderTest =
		new FlinkKinesisConsumer<>("fakeStream", new SimpleStringSchema(), consumerConfig);
	final RuntimeContext runtimeContext = mock(RuntimeContext.class);
	when(runtimeContext.getIndexOfThisSubtask()).thenReturn(0);
	when(runtimeContext.getNumberOfParallelSubtasks()).thenReturn(2);
	consumerUnderTest.setRuntimeContext(runtimeContext);

	final OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	final StateInitializationContext initContext = mock(StateInitializationContext.class);
	when(initContext.getOperatorStateStore()).thenReturn(stateStore);
	when(initContext.isRestored()).thenReturn(true);

	consumerUnderTest.initializeState(initContext);

	// only opened, not run
	consumerUnderTest.open(new Configuration());

	// arbitrary checkpoint id and timestamp
	consumerUnderTest.snapshotState(new StateSnapshotContextSynchronousImpl(123, 123));

	assertTrue(restoredListState.isClearCalled());

	// the checkpointed list state should contain only the shards that it should subscribe to
	assertEquals(unionState.size() / 2, restoredListState.getList().size());
	assertTrue(restoredListState.getList().contains(unionState.get(0)));
	assertTrue(restoredListState.getList().contains(unionState.get(2)));
}
 
示例28
/**
 * Restores a consumer from a checkpoint whose shards are exactly the shards the (mocked) fetcher
 * discovers, and checks that every restored shard is registered with the fetcher together with
 * its restored sequence number.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {

	// ---- restored checkpoint contents: shard handle -> sequence number ----

	HashMap<StreamShardHandle, SequenceNumber> restoredSequenceNumbers = getFakeRestoredStore("all");

	// ---- state backend mocks that hand the restored entries to initializeState() ----

	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> entry : restoredSequenceNumbers.entrySet()) {
		StreamShardMetadata shardMetadata = KinesisDataFetcher.convertToStreamShardMetadata(entry.getKey());
		restoredListState.add(Tuple2.of(shardMetadata, entry.getValue()));
	}

	OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	StateInitializationContext restoreContext = mock(StateInitializationContext.class);
	when(restoreContext.getOperatorStateStore()).thenReturn(stateStore);
	when(restoreContext.isRestored()).thenReturn(true);

	// ---- mocked fetcher: discovery reports exactly the restored shards ----

	KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
	List<StreamShardHandle> discoveredShards = new ArrayList<>(restoredSequenceNumbers.keySet());
	when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

	// treat the supplied config as valid: KinesisConfigUtil's static methods are mocked out
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// ---- drive the consumer through restore -> open -> run ----

	TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
		"fakeStream", new Properties(), 10, 2);
	consumer.initializeState(restoreContext);
	consumer.open(new Configuration());
	consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

	// ---- every restored shard must have been seeded into the fetcher with its sequence number ----

	for (Map.Entry<StreamShardHandle, SequenceNumber> expected : restoredSequenceNumbers.entrySet()) {
		StreamShardHandle shardHandle = expected.getKey();
		KinesisStreamShardState expectedState = new KinesisStreamShardState(
			KinesisDataFetcher.convertToStreamShardMetadata(shardHandle),
			shardHandle,
			expected.getValue());
		Mockito.verify(mockedFetcher).registerNewSubscribedShardState(expectedState);
	}
}
 
示例29
/**
 * Restores a consumer from a checkpoint while the (mocked) fetcher additionally discovers one
 * shard that is not part of the restored state. Checks that the restored shards are registered
 * with their restored sequence numbers and that the extra shard is registered with
 * {@link SentinelSequenceNumber#SENTINEL_EARLIEST_SEQUENCE_NUM}.
 */
@Test
@SuppressWarnings("unchecked")
public void testFetcherShouldBeCorrectlySeededWithNewDiscoveredKinesisStreamShard() throws Exception {

	// stream name of the shard that exists only in discovery, not in the restored state
	final String newlyDiscoveredStream = "fakeStream2";

	// ---- restored checkpoint contents: shard handle -> sequence number ----

	HashMap<StreamShardHandle, SequenceNumber> expectedSequenceNumbers = getFakeRestoredStore("all");

	// ---- state backend mocks that hand the restored entries to initializeState() ----

	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> entry : expectedSequenceNumbers.entrySet()) {
		StreamShardMetadata shardMetadata = KinesisDataFetcher.convertToStreamShardMetadata(entry.getKey());
		restoredListState.add(Tuple2.of(shardMetadata, entry.getValue()));
	}

	OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	StateInitializationContext restoreContext = mock(StateInitializationContext.class);
	when(restoreContext.getOperatorStateStore()).thenReturn(stateStore);
	when(restoreContext.isRestored()).thenReturn(true);

	// ---- mocked fetcher: discovery reports the restored shards plus one unseen shard ----

	KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();
	List<StreamShardHandle> discoveredShards = new ArrayList<>(expectedSequenceNumbers.keySet());
	StreamShardHandle discoveredNewShard = new StreamShardHandle(
		newlyDiscoveredStream,
		new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)));
	discoveredShards.add(discoveredNewShard);
	when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

	// treat the supplied config as valid: KinesisConfigUtil's static methods are mocked out
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// ---- drive the consumer through restore -> open -> run ----

	TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
		"fakeStream", new Properties(), 10, 2);
	consumer.initializeState(restoreContext);
	consumer.open(new Configuration());
	consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

	// ---- expectations: restored entries as-is, the unseen shard starting from the earliest sentinel ----

	// a separate (but equal) handle is used for the expectation, independent of the one given to discovery
	StreamShardHandle expectedNewShard = new StreamShardHandle(
		newlyDiscoveredStream,
		new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2)));
	expectedSequenceNumbers.put(expectedNewShard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get());

	for (Map.Entry<StreamShardHandle, SequenceNumber> expected : expectedSequenceNumbers.entrySet()) {
		StreamShardHandle shardHandle = expected.getKey();
		KinesisStreamShardState expectedState = new KinesisStreamShardState(
			KinesisDataFetcher.convertToStreamShardMetadata(shardHandle),
			shardHandle,
			expected.getValue());
		Mockito.verify(mockedFetcher).registerNewSubscribedShardState(expectedState);
	}
}
 
示例30
/**
 * Regression test for FLINK-8484.
 *
 * <p>A change in the {@link StreamShardMetadata} that does not affect
 * {@link StreamShardMetadata#getShardId()} or {@link StreamShardMetadata#getStreamName()} must not
 * prevent the shard from being restored. The corner case covered here: the stored shard metadata
 * describes an open shard (no ending sequence number), but by the time the job is restored the
 * shard has been closed by re-sharding (ending sequence number set). In that situation
 * {@link StreamShardMetadata#equals(Object)} can no longer be used to look up the sequence number
 * among the restored shard metadata.
 *
 * <p>The restore path is therefore expected to synchronize the snapshot's state with the Kinesis
 * shard before looking up the sequence number to restore from.
 */
@Test
public void testFindSequenceNumberToRestoreFromIfTheShardHasBeenClosedSinceTheStateWasStored() throws Exception {

	// ---- restored checkpoint contents: shard handle -> sequence number ----

	HashMap<StreamShardHandle, SequenceNumber> restoredSequenceNumbers = getFakeRestoredStore("all");

	// ---- state backend mocks; the list state is filled BEFORE the shard is closed below ----

	TestingListState<Tuple2<StreamShardMetadata, SequenceNumber>> restoredListState = new TestingListState<>();
	for (Map.Entry<StreamShardHandle, SequenceNumber> entry : restoredSequenceNumbers.entrySet()) {
		StreamShardMetadata shardMetadata = KinesisDataFetcher.convertToStreamShardMetadata(entry.getKey());
		restoredListState.add(Tuple2.of(shardMetadata, entry.getValue()));
	}

	OperatorStateStore stateStore = mock(OperatorStateStore.class);
	when(stateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(restoredListState);

	StateInitializationContext restoreContext = mock(StateInitializationContext.class);
	when(restoreContext.getOperatorStateStore()).thenReturn(stateStore);
	when(restoreContext.isRestored()).thenReturn(true);

	// ---- mocked fetcher: discovery reports one restored shard, now closed ----

	KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher();

	// take the first restored shard and build the handle that discovery will report for it
	// NOTE(review): the new handle wraps whatever getShard() returns from the restored handle; if that is
	// the same Shard instance, the range set below is visible through the restored handle too - confirm.
	final StreamShardHandle restoredHandle = restoredSequenceNumbers.keySet().iterator().next();
	final StreamShardHandle closedHandle =
		new StreamShardHandle(restoredHandle.getStreamName(), restoredHandle.getShard());

	// an ending sequence number marks the shard as closed
	final SequenceNumberRange closedRange = new SequenceNumberRange();
	closedRange.setEndingSequenceNumber("1293844");
	closedHandle.getShard().setSequenceNumberRange(closedRange);

	List<StreamShardHandle> discoveredShards = new ArrayList<>();
	discoveredShards.add(closedHandle);
	when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(discoveredShards);

	// treat the supplied config as valid: KinesisConfigUtil's static methods are mocked out
	PowerMockito.mockStatic(KinesisConfigUtil.class);
	PowerMockito.doNothing().when(KinesisConfigUtil.class);

	// ---- drive the consumer through restore -> open -> run ----

	TestableFlinkKinesisConsumer consumer = new TestableFlinkKinesisConsumer(
		"fakeStream", new Properties(), 10, 2);
	consumer.initializeState(restoreContext);
	consumer.open(new Configuration());
	consumer.run(Mockito.mock(SourceFunction.SourceContext.class));

	// ---- the closed shard must be registered with the sequence number stored for it ----

	SequenceNumber restoredSequenceNumber = restoredSequenceNumbers.get(closedHandle);
	KinesisStreamShardState expectedState = new KinesisStreamShardState(
		KinesisDataFetcher.convertToStreamShardMetadata(closedHandle),
		closedHandle,
		restoredSequenceNumber);
	Mockito.verify(mockedFetcher).registerNewSubscribedShardState(expectedState);
}