Java源码示例:org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig
示例1
/**
* This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
* the snapshot method does indeed finishes without waiting for pending records;
* we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(false);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg"));
// make sure that all callbacks have not been completed
verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
// should return even if there are pending records
testHarness.snapshot(123L, 123L);
testHarness.close();
}
示例2
/**
* This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
* the snapshot method does indeed finishes without waiting for pending records;
* we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(false);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg"));
// make sure that all callbacks have not been completed
verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
// should return even if there are pending records
testHarness.snapshot(123L, 123L);
testHarness.close();
}
示例3
/**
* This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
* the snapshot method does indeed finishes without waiting for pending records;
* we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(false);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg"));
// make sure that all callbacks have not been completed
verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
// should return even if there are pending records
testHarness.snapshot(123L, 123L);
testHarness.close();
}
示例4
/**
* Tests that partitions list is determinate and correctly provided to custom partitioner.
*/
@SuppressWarnings("unchecked")
@Test
public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);
RuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
// out-of-order list of 4 partitions
List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner);
producer.setRuntimeContext(mockRuntimeContext);
final KafkaProducer mockProducer = producer.getMockKafkaProducer();
when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
when(mockProducer.metrics()).thenReturn(null);
producer.open(new Configuration());
verify(mockPartitioner, times(1)).open(0, 1);
producer.invoke("foobar", SinkContextUtil.forTimestamp(0));
verify(mockPartitioner, times(1)).partition(
"foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
}
示例5
/**
* Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
*/
@Test
public void testAsyncErrorRethrownOnInvoke() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
// let the message request return an async exception
producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
try {
testHarness.processElement(new StreamRecord<>("msg-2"));
} catch (Exception e) {
// the next invoke should rethrow the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
// test succeeded
return;
}
Assert.fail();
}
示例6
/**
* Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
*/
@Test
public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
// let the message request return an async exception
producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
try {
testHarness.snapshot(123L, 123L);
} catch (Exception e) {
// the next invoke should rethrow the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
// test succeeded
return;
}
Assert.fail();
}
示例7
/**
* Tests that partitions list is determinate and correctly provided to custom partitioner.
*/
@SuppressWarnings("unchecked")
@Test
public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);
RuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
// out-of-order list of 4 partitions
List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner);
producer.setRuntimeContext(mockRuntimeContext);
final KafkaProducer mockProducer = producer.getMockKafkaProducer();
when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
when(mockProducer.metrics()).thenReturn(null);
producer.open(new Configuration());
verify(mockPartitioner, times(1)).open(0, 1);
producer.invoke("foobar", SinkContextUtil.forTimestamp(0));
verify(mockPartitioner, times(1)).partition(
"foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
}
示例8
/**
* Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
*/
@Test
public void testAsyncErrorRethrownOnInvoke() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
// let the message request return an async exception
producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
try {
testHarness.processElement(new StreamRecord<>("msg-2"));
} catch (Exception e) {
// the next invoke should rethrow the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
// test succeeded
return;
}
Assert.fail();
}
示例9
/**
* Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
*/
@Test
public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
// let the message request return an async exception
producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
try {
testHarness.snapshot(123L, 123L);
} catch (Exception e) {
// the next invoke should rethrow the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
// test succeeded
return;
}
Assert.fail();
}
示例10
/**
* Tests that partitions list is determinate and correctly provided to custom partitioner.
*/
@SuppressWarnings("unchecked")
@Test
public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);
RuntimeContext mockRuntimeContext = mock(StreamingRuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
// out-of-order list of 4 partitions
List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner);
producer.setRuntimeContext(mockRuntimeContext);
final KafkaProducer mockProducer = producer.getMockKafkaProducer();
when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
when(mockProducer.metrics()).thenReturn(null);
producer.open(new Configuration());
verify(mockPartitioner, times(1)).open(0, 1);
producer.invoke("foobar", SinkContextUtil.forTimestamp(0));
verify(mockPartitioner, times(1)).partition(
"foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
}
示例11
/**
* Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown.
*/
@Test
public void testAsyncErrorRethrownOnInvoke() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
// let the message request return an async exception
producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
try {
testHarness.processElement(new StreamRecord<>("msg-2"));
} catch (Exception e) {
// the next invoke should rethrow the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
// test succeeded
return;
}
Assert.fail();
}
示例12
/**
* Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown.
*/
@Test
public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
// let the message request return an async exception
producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
try {
testHarness.snapshot(123L, 123L);
} catch (Exception e) {
// the next invoke should rethrow the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
// test succeeded
return;
}
Assert.fail();
}
示例13
/**
* Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
* it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
*
* <p>Note that this test does not test the snapshot method is blocked correctly when there are pending records.
* The test for that is covered in testAtLeastOnceProducer.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
testHarness.processElement(new StreamRecord<>("msg-2"));
testHarness.processElement(new StreamRecord<>("msg-3"));
verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
// only let the first callback succeed for now
producer.getPendingCallbacks().get(0).onCompletion(null, null);
CheckedThread snapshotThread = new CheckedThread() {
@Override
public void go() throws Exception {
// this should block at first, since there are still two pending records that needs to be flushed
testHarness.snapshot(123L, 123L);
}
};
snapshotThread.start();
// let the 2nd message fail with an async exception
producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
producer.getPendingCallbacks().get(2).onCompletion(null, null);
try {
snapshotThread.sync();
} catch (Exception e) {
// the snapshot should have failed with the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure for 2nd message"));
// test succeeded
return;
}
Assert.fail();
}
示例14
/**
* Test ensuring that the producer is not dropping buffered records;
* we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
testHarness.processElement(new StreamRecord<>("msg-2"));
testHarness.processElement(new StreamRecord<>("msg-3"));
verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
Assert.assertEquals(3, producer.getPendingSize());
// start a thread to perform checkpointing
CheckedThread snapshotThread = new CheckedThread() {
@Override
public void go() throws Exception {
// this should block until all records are flushed;
// if the snapshot implementation returns before pending records are flushed,
testHarness.snapshot(123L, 123L);
}
};
snapshotThread.start();
// before proceeding, make sure that flushing has started and that the snapshot is still blocked;
// this would block forever if the snapshot didn't perform a flush
producer.waitUntilFlushStarted();
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
// now, complete the callbacks
producer.getPendingCallbacks().get(0).onCompletion(null, null);
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
Assert.assertEquals(2, producer.getPendingSize());
producer.getPendingCallbacks().get(1).onCompletion(null, null);
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
Assert.assertEquals(1, producer.getPendingSize());
producer.getPendingCallbacks().get(2).onCompletion(null, null);
Assert.assertEquals(0, producer.getPendingSize());
// this would fail with an exception if flushing wasn't completed before the snapshot method returned
snapshotThread.sync();
testHarness.close();
}
示例15
/**
* Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
* it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
*
* <p>Note that this test does not test the snapshot method is blocked correctly when there are pending records.
* The test for that is covered in testAtLeastOnceProducer.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
testHarness.processElement(new StreamRecord<>("msg-2"));
testHarness.processElement(new StreamRecord<>("msg-3"));
verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
// only let the first callback succeed for now
producer.getPendingCallbacks().get(0).onCompletion(null, null);
CheckedThread snapshotThread = new CheckedThread() {
@Override
public void go() throws Exception {
// this should block at first, since there are still two pending records that needs to be flushed
testHarness.snapshot(123L, 123L);
}
};
snapshotThread.start();
// let the 2nd message fail with an async exception
producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
producer.getPendingCallbacks().get(2).onCompletion(null, null);
try {
snapshotThread.sync();
} catch (Exception e) {
// the snapshot should have failed with the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure for 2nd message"));
// test succeeded
return;
}
Assert.fail();
}
示例16
/**
* Test ensuring that the producer is not dropping buffered records;
* we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
testHarness.processElement(new StreamRecord<>("msg-2"));
testHarness.processElement(new StreamRecord<>("msg-3"));
verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
Assert.assertEquals(3, producer.getPendingSize());
// start a thread to perform checkpointing
CheckedThread snapshotThread = new CheckedThread() {
@Override
public void go() throws Exception {
// this should block until all records are flushed;
// if the snapshot implementation returns before pending records are flushed,
testHarness.snapshot(123L, 123L);
}
};
snapshotThread.start();
// before proceeding, make sure that flushing has started and that the snapshot is still blocked;
// this would block forever if the snapshot didn't perform a flush
producer.waitUntilFlushStarted();
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
// now, complete the callbacks
producer.getPendingCallbacks().get(0).onCompletion(null, null);
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
Assert.assertEquals(2, producer.getPendingSize());
producer.getPendingCallbacks().get(1).onCompletion(null, null);
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
Assert.assertEquals(1, producer.getPendingSize());
producer.getPendingCallbacks().get(2).onCompletion(null, null);
Assert.assertEquals(0, producer.getPendingSize());
// this would fail with an exception if flushing wasn't completed before the snapshot method returned
snapshotThread.sync();
testHarness.close();
}
示例17
/**
* Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
* it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
*
* <p>Note that this test does not test the snapshot method is blocked correctly when there are pending records.
* The test for that is covered in testAtLeastOnceProducer.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
testHarness.processElement(new StreamRecord<>("msg-2"));
testHarness.processElement(new StreamRecord<>("msg-3"));
verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
// only let the first callback succeed for now
producer.getPendingCallbacks().get(0).onCompletion(null, null);
CheckedThread snapshotThread = new CheckedThread() {
@Override
public void go() throws Exception {
// this should block at first, since there are still two pending records that needs to be flushed
testHarness.snapshot(123L, 123L);
}
};
snapshotThread.start();
// let the 2nd message fail with an async exception
producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
producer.getPendingCallbacks().get(2).onCompletion(null, null);
try {
snapshotThread.sync();
} catch (Exception e) {
// the snapshot should have failed with the async exception
Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure for 2nd message"));
// test succeeded
return;
}
Assert.fail();
}
示例18
/**
* Test ensuring that the producer is not dropping buffered records;
* we set a timeout because the test will not finish if the logic is broken.
*/
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testAtLeastOnceProducer() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
final OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
testHarness.open();
testHarness.processElement(new StreamRecord<>("msg-1"));
testHarness.processElement(new StreamRecord<>("msg-2"));
testHarness.processElement(new StreamRecord<>("msg-3"));
verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
Assert.assertEquals(3, producer.getPendingSize());
// start a thread to perform checkpointing
CheckedThread snapshotThread = new CheckedThread() {
@Override
public void go() throws Exception {
// this should block until all records are flushed;
// if the snapshot implementation returns before pending records are flushed,
testHarness.snapshot(123L, 123L);
}
};
snapshotThread.start();
// before proceeding, make sure that flushing has started and that the snapshot is still blocked;
// this would block forever if the snapshot didn't perform a flush
producer.waitUntilFlushStarted();
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
// now, complete the callbacks
producer.getPendingCallbacks().get(0).onCompletion(null, null);
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
Assert.assertEquals(2, producer.getPendingSize());
producer.getPendingCallbacks().get(1).onCompletion(null, null);
Assert.assertTrue("Snapshot returned before all records were flushed", snapshotThread.isAlive());
Assert.assertEquals(1, producer.getPendingSize());
producer.getPendingCallbacks().get(2).onCompletion(null, null);
Assert.assertEquals(0, producer.getPendingSize());
// this would fail with an exception if flushing wasn't completed before the snapshot method returned
snapshotThread.sync();
testHarness.close();
}