Java源码示例:org.apache.kafka.streams.test.OutputVerifier

示例1
/**
 * Pipes one metric-data record into {@code INPUT_TOPIC} and verifies that the record read from
 * {@code DEFAULT_OUTPUT_TOPIC} is keyed by the detector UUID of {@code mappedMetricData} and
 * carries {@code mappedMetricData} as its value.
 */
@Test
public void testMetricDataToMappedMetricData() {
    initLogAndFail();
    try {
        // The mapper is stubbed to resolve every metric definition to the single test detector.
        when(mapper.getDetectorsFromCache(any(MetricDefinition.class)))
                .thenReturn(Collections.singletonList(detector));

        logAndFailDriver.pipeInput(metricDataFactory.create(INPUT_TOPIC, KAFKA_KEY, metricData));

        // The streams app remaps the key to the detector UUID. [WLW]
        val outputRecord = logAndFailDriver.readOutput(DEFAULT_OUTPUT_TOPIC, stringDeser, mmdDeser);
        log.trace("outputRecord={}", outputRecord);
        val outputKafkaKey = mappedMetricData.getDetectorUuid().toString();
        OutputVerifier.compareKeyValue(outputRecord, outputKafkaKey, mappedMetricData);
        assertEquals(DEFAULT_OUTPUT_TOPIC, outputRecord.topic());
    } finally {
        // Close in finally so a failed assertion does not leave the driver open.
        logAndFailDriver.close();
    }
}
 
示例2
/**
 * With the mapper's optimal batch size stubbed to 2, pipes two records under the same key and
 * verifies that an output record keyed by the detector UUID is produced and that the state
 * store reports zero entries afterwards.
 */
@Test
public void shouldOutputResultWhenReceivedMoreEntriesThanOptimumBatchSize() {
    initLogAndContinue();
    try {
        when(mapper.getDetectorsFromCache(any(MetricDefinition.class)))
                .thenReturn(Collections.singletonList(detector));
        when(mapper.optimalBatchSize()).thenReturn(2);

        logAndContinueDriver.pipeInput(metricDataFactory.create(INPUT_TOPIC, "key-1", TestObjectMother.metricData()));
        logAndContinueDriver.pipeInput(metricDataFactory.create(INPUT_TOPIC, "key-1", TestObjectMother.metricData()));
        kvStore = logAndContinueDriver.getKeyValueStore(STATE_STORE_NAME);
        val outputRecord = logAndContinueDriver.readOutput(DEFAULT_OUTPUT_TOPIC, stringDeser, mmdDeser);

        OutputVerifier.compareKeyValue(outputRecord, detector.getUuid().toString(), mappedMetricData);
        // Expected value goes first so a failure message reports expected/actual correctly.
        assertEquals(0, kvStore.approximateNumEntries());
    } finally {
        // Close in finally so a failed assertion does not leave the driver open.
        logAndContinueDriver.close();
    }
}
 
示例3
/**
 * Reads one output record from the driver for every entry in {@code expectedOutputs} and
 * compares its key, value and timestamp against the expectation.
 *
 * <p>Values are always deserialized as strings; the key deserializer comes from the expected
 * record itself.
 *
 * @param testDriver driver to read the produced records from
 * @throws AssertionError if a record does not match; the message names the query and test
 *     file, and the original assertion failure is attached as the cause
 */
@SuppressWarnings("unchecked")
void verifyOutput(final TopologyTestDriver testDriver) {
  for (final Record expectedOutput : expectedOutputs) {
    try {
      OutputVerifier.compareKeyValueTimestamp(testDriver.readOutput(expectedOutput.topic,
          expectedOutput.keyDeserializer(),
          Serdes.String().deserializer()),
          expectedOutput.key(),
          expectedOutput.value,
          expectedOutput.timestamp);
    } catch (AssertionError assertionError) {
      // Keep the original failure as the cause so its stack trace is not lost.
      throw new AssertionError("Query name: "
          + name
          + " in file: " + testPath
          + " failed due to: "
          + assertionError.getMessage(),
          assertionError);
    }
  }
  // check for no more records
  // NOTE(review): no such check is performed here; confirm whether one is still intended.
}
 
示例4
/**
 * Pipes one metric-data record into {@code INPUT_TOPIC} with the mapper stubbed to return the
 * external detector, and verifies that the record lands on the topic named
 * {@code DEFAULT_OUTPUT_TOPIC + "-" + EXTERNAL_DETECTOR_CONSUMER_ID}, keyed by the detector
 * UUID of {@code externalDetectorMappedMetricData}.
 */
@Test
public void testMetricDataToExternalDetectorMappedMetricData() {
    initLogAndFail();
    try {
        when(mapper.getDetectorsFromCache(any(MetricDefinition.class)))
                .thenReturn(Collections.singletonList(externalDetector));

        logAndFailDriver.pipeInput(metricDataFactory.create(INPUT_TOPIC, KAFKA_KEY, metricData));
        String externalDetectorKafkaTopic = DEFAULT_OUTPUT_TOPIC + "-" + EXTERNAL_DETECTOR_CONSUMER_ID;
        val outputRecord = logAndFailDriver.readOutput(externalDetectorKafkaTopic, stringDeser, mmdDeser);
        val outputKafkaKey = externalDetectorMappedMetricData.getDetectorUuid().toString();
        log.info("outputRecord={}", outputRecord);
        OutputVerifier.compareKeyValue(outputRecord, outputKafkaKey, externalDetectorMappedMetricData);
        assertEquals(externalDetectorKafkaTopic, outputRecord.topic());
    } finally {
        // Close in finally so a failed assertion does not leave the driver open.
        logAndFailDriver.close();
    }
}
 
示例5
/**
 * Pipes {@code mappedMetricData} into {@code INBOUND_TOPIC} and verifies that the record read
 * from {@code OUTBOUND_TOPIC} is keyed by the detector UUID and carries {@code alert} as its
 * value.
 */
@Test
public void testTransform() {
    logAndFailDriver.pipeInput(mappedMetricDataFactory.create(INBOUND_TOPIC, KAFKA_KEY, mappedMetricData));

    val outputRecord = logAndFailDriver.readOutput(OUTBOUND_TOPIC, stringDeserializer, alertDeserializer);
    val expectedKey = mappedMetricData.getDetectorUuid().toString();
    // Pass the record itself: the logger formats it lazily and tolerates a missing record,
    // so the failure surfaces in the verification below rather than in the log statement.
    log.trace("Output Record={}", outputRecord);

    OutputVerifier.compareKeyValue(outputRecord, expectedKey, alert);
}
 
示例6
/**
 * Pipes a single {@link MegabusRef} into the ref topic and verifies that exactly one record is
 * written to the resolved topic, keyed {@code "<table>/<id>"} and carrying the document that
 * was registered with the test data provider.
 */
@Test
public void testTopologyResolvingOneRef() {
    final String testTableName = "tableA";
    final String testId = "id1";
    // Plain HashMap instead of double-brace initialization, which would create an anonymous
    // subclass holding a reference to the enclosing test instance.
    final Map<String, Object> testContents = new HashMap<>();
    testContents.put("~table", testTableName);
    testContents.put("~id", testId);
    testContents.put("~version", 0);
    testContents.put("~signature", "abc123");
    testContents.put("~deleted", false);
    testContents.put("~firstUpdateAt", ISO8601Utils.format(new Date()));
    testContents.put("~lastUpdateAt", ISO8601Utils.format(new Date()));
    testContents.put("~lastMutateAt", ISO8601Utils.format(new Date()));

    MegabusRef megabusRef = new MegabusRef(testTableName, testId, TimeUUIDs.newUUID(), Instant.now(), MegabusRef.RefType.NORMAL);

    ((TestDataProvider) dataProvider).addTable(testTableName, new InMemoryTable(testTableName, new TableOptionsBuilder().setPlacement("app_global").build(), new HashMap<>()));
    ((TestDataProvider) dataProvider).add(testContents);

    List<MegabusRef> megabusRefs = Collections.singletonList(megabusRef);

    // push the megabusref to the input topic
    testDriver.pipeInput(recordFactory.create(refTopic.getName(), "eventId1", megabusRefs));

    // read the result
    final ProducerRecord<String, Map<String, Object>> output = testDriver.readOutput(resolvedTopic.getName(), stringDeserializer, jsonPOJOSerdeForMaps.deserializer());

    // ensure ref was resolved successfully and had the correct value and was written to the correct topic
    OutputVerifier.compareKeyValue(output,  String.format("%s/%s", testTableName, testId), testContents);

    // ensure no more records left unread
    assertNull(testDriver.readOutput(resolvedTopic.getName(), stringDeserializer, jsonPOJOSerdeForMaps.deserializer()));
}
 
示例7
/**
 * Pipes a single {@link MegabusRef} into the retry topic and verifies that exactly one record
 * is written to the resolved topic, keyed {@code "<table>/<id>"} and carrying the document
 * that was registered with the test data provider.
 */
@Test
public void testTopologyMergesRetries() {
    final String testTableName = "tableA";
    final String testId = "id1";
    // Plain HashMap instead of double-brace initialization, which would create an anonymous
    // subclass holding a reference to the enclosing test instance.
    final Map<String, Object> testContents = new HashMap<>();
    testContents.put("~table", testTableName);
    testContents.put("~id", testId);
    testContents.put("~version", 0);
    testContents.put("~signature", "abc123");
    testContents.put("~deleted", false);
    testContents.put("~firstUpdateAt", ISO8601Utils.format(new Date()));
    testContents.put("~lastUpdateAt", ISO8601Utils.format(new Date()));
    testContents.put("~lastMutateAt", ISO8601Utils.format(new Date()));

    MegabusRef megabusRef = new MegabusRef(testTableName, testId, TimeUUIDs.newUUID(), Instant.now(), MegabusRef.RefType.NORMAL);

    ((TestDataProvider) dataProvider).addTable(testTableName, new InMemoryTable(testTableName, new TableOptionsBuilder().setPlacement("app_global").build(), new HashMap<>()));
    ((TestDataProvider) dataProvider).add(testContents);

    List<MegabusRef> megabusRefs = Collections.singletonList(megabusRef);

    // push the megabusref to the retry topic
    testDriver.pipeInput(recordFactory.create(retryTopic.getName(), "eventId1", megabusRefs));

    // read the result
    final ProducerRecord<String, Map<String, Object>> output = testDriver.readOutput(resolvedTopic.getName(), stringDeserializer, jsonPOJOSerdeForMaps.deserializer());

    // ensure ref was resolved successfully and had the correct value and was written to the correct topic
    OutputVerifier.compareKeyValue(output,  String.format("%s/%s", testTableName, testId), testContents);

    // ensure no more records left unread
    assertNull(testDriver.readOutput(resolvedTopic.getName(), stringDeserializer, jsonPOJOSerdeForMaps.deserializer()));
}
 
示例8
/**
 * Builds the span aggregation topology, pipes two spans of the same trace followed by a later
 * span of another trace, and verifies that the first trace is emitted on the trace topic and
 * that a {@code svc_a -> svc_b} dependency link is emitted on the dependency topic.
 */
@Test void should_aggregateSpans_and_mapDependencies() {
  // Given: configuration
  Duration traceTimeout = Duration.ofSeconds(1);
  SpansSerde spansSerde = new SpansSerde();
  DependencyLinkSerde dependencyLinkSerde = new DependencyLinkSerde();
  try {
    // When: topology built
    Topology topology = new SpanAggregationTopology(
        spansTopic,
        traceTopic,
        dependencyTopic,
        traceTimeout,
        true).get();
    TopologyDescription description = topology.describe();
    // Then: single threaded topology
    assertThat(description.subtopologies()).hasSize(1);
    // Given: test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
    try {
      // When: two related spans coming on the same Session window
      ConsumerRecordFactory<String, List<Span>> factory =
          new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer());
      Span a = Span.newBuilder().traceId("a").id("a").name("op_a").kind(Span.Kind.CLIENT)
          .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
          .build();
      Span b = Span.newBuilder().traceId("a").id("b").name("op_b").kind(Span.Kind.SERVER)
          .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
          .build();
      testDriver.pipeInput(
          factory.create(spansTopic, a.traceId(), Collections.singletonList(a), 0L));
      testDriver.pipeInput(
          factory.create(spansTopic, b.traceId(), Collections.singletonList(b), 0L));
      // When: and new record arrive, moving the event clock further than inactivity gap
      Span c = Span.newBuilder().traceId("c").id("c").build();
      testDriver.pipeInput(factory.create(spansTopic, c.traceId(), Collections.singletonList(c),
          traceTimeout.toMillis() + 1));
      // Then: a trace is aggregated
      ProducerRecord<String, List<Span>> trace =
          testDriver.readOutput(traceTopic, new StringDeserializer(), spansSerde.deserializer());
      assertThat(trace).isNotNull();
      OutputVerifier.compareKeyValue(trace, a.traceId(), Arrays.asList(a, b));
      // Then: a dependency link is created
      ProducerRecord<String, DependencyLink> linkRecord =
          testDriver.readOutput(dependencyTopic, new StringDeserializer(),
              dependencyLinkSerde.deserializer());
      assertThat(linkRecord).isNotNull();
      DependencyLink link = DependencyLink.newBuilder()
          .parent("svc_a").child("svc_b").callCount(1).errorCount(0)
          .build();
      OutputVerifier.compareKeyValue(linkRecord, "svc_a:svc_b", link);
    } finally {
      // Close the driver even when an assertion above fails.
      testDriver.close();
    }
  } finally {
    // Finally close the serdes, in the same order as before, regardless of the test outcome.
    spansSerde.close();
    dependencyLinkSerde.close();
  }
}