Java源码示例:org.apache.kafka.streams.test.OutputVerifier
示例1
/**
 * Pipes {@code metricData} into {@code INPUT_TOPIC} and verifies that the record read from
 * {@code DEFAULT_OUTPUT_TOPIC} carries {@code mappedMetricData} as its value, is keyed by the
 * detector UUID of {@code mappedMetricData}, and reports the expected topic.
 */
@Test
public void testMetricDataToMappedMetricData() {
    // NOTE(review): presumably this sets up logAndFailDriver; confirm against the helper.
    initLogAndFail();
    try {
        when(mapper.getDetectorsFromCache(any(MetricDefinition.class)))
                .thenReturn(Collections.singletonList(detector));
        logAndFailDriver.pipeInput(metricDataFactory.create(INPUT_TOPIC, KAFKA_KEY, metricData));

        // The streams app remaps the key to the detector UUID. [WLW]
        val outputRecord = logAndFailDriver.readOutput(DEFAULT_OUTPUT_TOPIC, stringDeser, mmdDeser);
        log.trace("outputRecord={}", outputRecord);

        val outputKafkaKey = mappedMetricData.getDetectorUuid().toString();
        OutputVerifier.compareKeyValue(outputRecord, outputKafkaKey, mappedMetricData);
        assertEquals(DEFAULT_OUTPUT_TOPIC, outputRecord.topic());
    } finally {
        // Close the driver even when a verification above throws, so a failing
        // test does not leave the driver open.
        logAndFailDriver.close();
    }
}
示例2
/**
 * Stubs the mapper's optimal batch size to 2, pipes two records into {@code INPUT_TOPIC},
 * and verifies that a mapped record is emitted on {@code DEFAULT_OUTPUT_TOPIC} keyed by the
 * detector UUID and that the state store reports zero entries afterwards.
 */
@Test
public void shouldOutputResultWhenReceivedMoreEntriesThanOptimumBatchSize() {
    // NOTE(review): presumably this sets up logAndContinueDriver; confirm against the helper.
    initLogAndContinue();
    try {
        when(mapper.getDetectorsFromCache(any(MetricDefinition.class)))
                .thenReturn(Collections.singletonList(detector));
        when(mapper.optimalBatchSize()).thenReturn(2);

        logAndContinueDriver.pipeInput(
                metricDataFactory.create(INPUT_TOPIC, "key-1", TestObjectMother.metricData()));
        logAndContinueDriver.pipeInput(
                metricDataFactory.create(INPUT_TOPIC, "key-1", TestObjectMother.metricData()));

        kvStore = logAndContinueDriver.getKeyValueStore(STATE_STORE_NAME);
        val outputRecord = logAndContinueDriver.readOutput(DEFAULT_OUTPUT_TOPIC, stringDeser, mmdDeser);
        OutputVerifier.compareKeyValue(outputRecord, detector.getUuid().toString(), mappedMetricData);

        // Expected value first, actual second, so a failure message reads correctly.
        assertEquals(0L, kvStore.approximateNumEntries());
    } finally {
        // Close the driver even when a verification above throws.
        logAndContinueDriver.close();
    }
}
示例3
/**
 * Reads one output record per entry of {@code expectedOutputs} from the given driver and
 * compares its key, value and timestamp with the expected ones.
 *
 * @param testDriver driver to read the produced records from
 * @throws AssertionError if a record does not match; the message names the query and the
 *     test file, and the original assertion failure is attached as the cause
 */
@SuppressWarnings("unchecked")
void verifyOutput(final TopologyTestDriver testDriver) {
    for (final Record expectedOutput : expectedOutputs) {
        try {
            OutputVerifier.compareKeyValueTimestamp(
                    testDriver.readOutput(
                            expectedOutput.topic,
                            expectedOutput.keyDeserializer(),
                            Serdes.String().deserializer()),
                    expectedOutput.key(),
                    expectedOutput.value,
                    expectedOutput.timestamp);
        } catch (final AssertionError assertionError) {
            // Keep the original error as the cause so its stack trace is not lost.
            throw new AssertionError("Query name: "
                    + name
                    + " in file: " + testPath
                    + " failed due to: "
                    + assertionError.getMessage(), assertionError);
        }
    }
    // NOTE: no check is performed here for additional, unexpected output records.
}
示例4
/**
 * Pipes {@code metricData} into {@code INPUT_TOPIC} with the mapper stubbed to return the
 * external detector, and verifies that the record is emitted on the topic named
 * {@code DEFAULT_OUTPUT_TOPIC + "-" + EXTERNAL_DETECTOR_CONSUMER_ID}, keyed by the detector
 * UUID of {@code externalDetectorMappedMetricData}.
 */
@Test
public void testMetricDataToExternalDetectorMappedMetricData() {
    // NOTE(review): presumably this sets up logAndFailDriver; confirm against the helper.
    initLogAndFail();
    try {
        when(mapper.getDetectorsFromCache(any(MetricDefinition.class)))
                .thenReturn(Collections.singletonList(externalDetector));
        logAndFailDriver.pipeInput(metricDataFactory.create(INPUT_TOPIC, KAFKA_KEY, metricData));

        String externalDetectorKafkaTopic = DEFAULT_OUTPUT_TOPIC + "-" + EXTERNAL_DETECTOR_CONSUMER_ID;
        val outputRecord = logAndFailDriver.readOutput(externalDetectorKafkaTopic, stringDeser, mmdDeser);
        val outputKafkaKey = externalDetectorMappedMetricData.getDetectorUuid().toString();
        log.info("outputRecord={}", outputRecord);

        OutputVerifier.compareKeyValue(outputRecord, outputKafkaKey, externalDetectorMappedMetricData);
        assertEquals(externalDetectorKafkaTopic, outputRecord.topic());
    } finally {
        // Close the driver even when a verification above throws, so a failing
        // test does not leave the driver open.
        logAndFailDriver.close();
    }
}
示例5
/**
 * Pipes {@code mappedMetricData} into {@code INBOUND_TOPIC} and verifies that the record
 * read from {@code OUTBOUND_TOPIC} is keyed by the detector UUID and carries {@code alert}
 * as its value.
 */
@Test
public void testTransform() {
    logAndFailDriver.pipeInput(mappedMetricDataFactory.create(INBOUND_TOPIC, KAFKA_KEY, mappedMetricData));
    val outputRecord = logAndFailDriver.readOutput(OUTBOUND_TOPIC, stringDeserializer, alertDeserializer);
    val expectedKey = mappedMetricData.getDetectorUuid().toString();

    // Pass the record itself: the logger formats it only when trace is enabled, and a
    // missing record is then reported by the verifier rather than by an NPE in the log call.
    log.trace("Output Record={}", outputRecord);
    OutputVerifier.compareKeyValue(outputRecord, expectedKey, alert);
}
示例6
/**
 * Registers one document with the test data provider, pipes a single {@code MegabusRef}
 * for it into the ref topic, and verifies that exactly one record is written to the
 * resolved topic, keyed {@code "<table>/<id>"} with the document contents as its value.
 */
@Test
public void testTopologyResolvingOneRef() {
    final String testTableName = "tableA";
    final String testId = "id1";

    // Plain HashMap instead of double-brace initialization, which would create an
    // anonymous subclass holding a reference to this test instance.
    final Map<String, Object> testContents = new HashMap<>();
    testContents.put("~table", testTableName);
    testContents.put("~id", testId);
    testContents.put("~version", 0);
    testContents.put("~signature", "abc123");
    testContents.put("~deleted", false);
    testContents.put("~firstUpdateAt", ISO8601Utils.format(new Date()));
    testContents.put("~lastUpdateAt", ISO8601Utils.format(new Date()));
    testContents.put("~lastMutateAt", ISO8601Utils.format(new Date()));

    MegabusRef megabusRef = new MegabusRef(testTableName, testId, TimeUUIDs.newUUID(), Instant.now(), MegabusRef.RefType.NORMAL);

    ((TestDataProvider) dataProvider).addTable(testTableName, new InMemoryTable(testTableName, new TableOptionsBuilder().setPlacement("app_global").build(), new HashMap<>()));
    ((TestDataProvider) dataProvider).add(testContents);

    List<MegabusRef> megabusRefs = Collections.singletonList(megabusRef);

    // push the megabusref to the input topic
    testDriver.pipeInput(recordFactory.create(refTopic.getName(), "eventId1", megabusRefs));

    // read the result
    final ProducerRecord<String, Map<String, Object>> output = testDriver.readOutput(resolvedTopic.getName(), stringDeserializer, jsonPOJOSerdeForMaps.deserializer());

    // ensure ref was resolved successfully and had the correct value and was written to the correct topic
    OutputVerifier.compareKeyValue(output, String.format("%s/%s", testTableName, testId), testContents);

    // ensure no more records left unread
    assertNull(testDriver.readOutput(resolvedTopic.getName(), stringDeserializer, jsonPOJOSerdeForMaps.deserializer()));
}
示例7
/**
 * Registers one document with the test data provider, pipes a single {@code MegabusRef}
 * for it into the retry topic, and verifies that exactly one record is written to the
 * resolved topic, keyed {@code "<table>/<id>"} with the document contents as its value.
 */
@Test
public void testTopologyMergesRetries() {
    final String testTableName = "tableA";
    final String testId = "id1";

    // Plain HashMap instead of double-brace initialization, which would create an
    // anonymous subclass holding a reference to this test instance.
    final Map<String, Object> testContents = new HashMap<>();
    testContents.put("~table", testTableName);
    testContents.put("~id", testId);
    testContents.put("~version", 0);
    testContents.put("~signature", "abc123");
    testContents.put("~deleted", false);
    testContents.put("~firstUpdateAt", ISO8601Utils.format(new Date()));
    testContents.put("~lastUpdateAt", ISO8601Utils.format(new Date()));
    testContents.put("~lastMutateAt", ISO8601Utils.format(new Date()));

    MegabusRef megabusRef = new MegabusRef(testTableName, testId, TimeUUIDs.newUUID(), Instant.now(), MegabusRef.RefType.NORMAL);

    ((TestDataProvider) dataProvider).addTable(testTableName, new InMemoryTable(testTableName, new TableOptionsBuilder().setPlacement("app_global").build(), new HashMap<>()));
    ((TestDataProvider) dataProvider).add(testContents);

    List<MegabusRef> megabusRefs = Collections.singletonList(megabusRef);

    // push the megabusref to the retry topic
    testDriver.pipeInput(recordFactory.create(retryTopic.getName(), "eventId1", megabusRefs));

    // read the result
    final ProducerRecord<String, Map<String, Object>> output = testDriver.readOutput(resolvedTopic.getName(), stringDeserializer, jsonPOJOSerdeForMaps.deserializer());

    // ensure ref was resolved successfully and had the correct value and was written to the correct topic
    OutputVerifier.compareKeyValue(output, String.format("%s/%s", testTableName, testId), testContents);

    // ensure no more records left unread
    assertNull(testDriver.readOutput(resolvedTopic.getName(), stringDeserializer, jsonPOJOSerdeForMaps.deserializer()));
}
示例8
/**
 * Builds the span aggregation topology, pipes two spans of the same trace followed by a
 * later span of another trace, and verifies that the first trace is emitted on the trace
 * topic and that a {@code svc_a -> svc_b} dependency link is emitted on the dependency topic.
 */
@Test void should_aggregateSpans_and_mapDependencies() {
  // Given: configuration
  Duration traceTimeout = Duration.ofSeconds(1);
  SpansSerde spansSerde = new SpansSerde();
  DependencyLinkSerde dependencyLinkSerde = new DependencyLinkSerde();
  try {
    // When: topology built
    Topology topology = new SpanAggregationTopology(
        spansTopic,
        traceTopic,
        dependencyTopic,
        traceTimeout,
        true).get();
    TopologyDescription description = topology.describe();
    // Then: single threaded topology
    assertThat(description.subtopologies()).hasSize(1);
    // Given: test driver; try-with-resources closes it even when an assertion below fails
    try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
      // When: two related spans coming on the same Session window
      ConsumerRecordFactory<String, List<Span>> factory =
          new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer());
      Span a = Span.newBuilder().traceId("a").id("a").name("op_a").kind(Span.Kind.CLIENT)
          .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
          .build();
      Span b = Span.newBuilder().traceId("a").id("b").name("op_b").kind(Span.Kind.SERVER)
          .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
          .build();
      testDriver.pipeInput(
          factory.create(spansTopic, a.traceId(), Collections.singletonList(a), 0L));
      testDriver.pipeInput(
          factory.create(spansTopic, b.traceId(), Collections.singletonList(b), 0L));
      // When: and new record arrive, moving the event clock further than inactivity gap
      Span c = Span.newBuilder().traceId("c").id("c").build();
      testDriver.pipeInput(factory.create(spansTopic, c.traceId(), Collections.singletonList(c),
          traceTimeout.toMillis() + 1));
      // Then: a trace is aggregated
      ProducerRecord<String, List<Span>> trace =
          testDriver.readOutput(traceTopic, new StringDeserializer(), spansSerde.deserializer());
      assertThat(trace).isNotNull();
      OutputVerifier.compareKeyValue(trace, a.traceId(), Arrays.asList(a, b));
      // Then: a dependency link is created
      ProducerRecord<String, DependencyLink> linkRecord =
          testDriver.readOutput(dependencyTopic, new StringDeserializer(),
              dependencyLinkSerde.deserializer());
      assertThat(linkRecord).isNotNull();
      DependencyLink link = DependencyLink.newBuilder()
          .parent("svc_a").child("svc_b").callCount(1).errorCount(0)
          .build();
      OutputVerifier.compareKeyValue(linkRecord, "svc_a:svc_b", link);
    }
  } finally {
    // Finally close the serdes, whether or not the verifications above passed
    spansSerde.close();
    dependencyLinkSerde.close();
  }
}