Java源码示例:org.apache.kafka.streams.processor.WallclockTimestampExtractor

示例1
/**
 * Assembles the configuration for the "ks-interactive-stock-analysis" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-interactive-stock-analysis-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-interactive-stock-analysis-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-interactive-stock-analysis-group");

    // Broker address, replication factor and offset-reset policy.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // Topic-prefixed retention overrides: 1024 * 1024 bytes and 3600000 ms.
    config.put(StreamsConfig.topicPrefix("retention.bytes"), 1024 * 1024);
    config.put(StreamsConfig.topicPrefix("retention.ms"), 3600000);

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    // Default timestamp extractor and deserialization-exception handler.
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializerErrorHandler.class);

    // Consumer-prefixed interceptor list with a single interceptor class.
    config.put(
            StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
            Collections.singletonList(bbejeck.chapter_7.interceptors.StockTransactionConsumerInterceptor.class));

    return config;
}
 
示例2
/**
 * Assembles the configuration for the "KTable-aggregations" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, consumer-group and client identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-aggregations");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "KTable-aggregations-id");
    config.put(ConsumerConfig.CLIENT_ID_CONFIG, "KTable-aggregations-client");

    // Broker address, thread count and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    // Consumer offset/metadata settings and commit intervals (passed as strings).
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "30000");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");
    config.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例3
/**
 * Assembles the configuration for the "KStreamVSKTable" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, consumer-group and client identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "KStreamVSKTable_app");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "KStreamVSKTable_group");
    config.put(ConsumerConfig.CLIENT_ID_CONFIG, "KStreamVSKTable_client");

    // Broker address, thread count and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    // Consumer offset/metadata settings and commit intervals (passed as strings).
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "30000");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "15000");
    config.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");
    // Inactive override kept for reference:
    // config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,"0");

    // String keys; values use the class of the serde returned by StreamsSerdes.StockTickerSerde().
    final String keySerdeName = Serdes.String().getClass().getName();
    final String valueSerdeName = StreamsSerdes.StockTickerSerde().getClass().getName();
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例4
/**
 * Assembles the configuration for the "regex" example.
 *
 * <p>NOTE(review): this block uses {@code ZOOKEEPER_CONNECT_CONFIG} and the
 * non-{@code DEFAULT_} {@code TIMESTAMP_EXTRACTOR_CLASS_CONFIG} key; these appear
 * to belong to an older Kafka Streams API - confirm against the Kafka version in use.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringDeserializerName = "org.apache.kafka.common.serialization.StringDeserializer";

    // Application, consumer-group and client identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "regex_id");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "regex_group_id");
    config.put(ConsumerConfig.CLIENT_ID_CONFIG, "regex_id");

    // Broker and ZooKeeper addresses, thread count and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    // Consumer offset, auto-commit and metadata settings (passed as strings).
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
    config.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "15000");

    // The same deserializer class name is used for keys and values.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializerName);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializerName);

    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例5
/**
 * Creates the {@link StreamsConfig} bean registered under
 * {@code KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME}.
 *
 * @return a {@link StreamsConfig} built from the entries set below
 */
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kafkaStreamsConfigs() {
    final String stringSerdeName = Serdes.String().getClass().getName();
    final String extractorName = WallclockTimestampExtractor.class.getName();

    final Map<String, Object> settings = new HashMap<>();
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "simpleKafkaStream");
    // The broker list comes from the enclosing object's bootstrapServers value.
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);
    settings.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, extractorName);

    return new StreamsConfig(settings);
}
 
示例6
/**
 * Creates the Kafka Streams properties for a named process.
 *
 * @param nameProcess      used as the client id and appended to {@code "application-process"}
 *                         to form the application id
 * @param bootstrapServers value stored under the bootstrap-servers key
 * @return a new {@link Properties} object containing the entries set below
 */
public static Properties createKStreamProperties(String nameProcess, String bootstrapServers) {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();
    final String applicationId = "application-process" + nameProcess;

    // Identifiers and broker address derived from the arguments.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    config.put(StreamsConfig.CLIENT_ID_CONFIG, nameProcess);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    // Commit interval of 100 ms and a cache buffering size of 0.
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    // Default timestamp extractor and deserialization-exception handler.
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    config.put(
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class.getName());

    return config;
}
 
示例7
/**
 * Assembles the configuration for the "ks-connect-stock-analysis" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-connect-stock-analysis-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-connect-stock-analysis-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-connect-stock-analysis-group");

    // Broker address, replication factor and offset-reset policy.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    // Default timestamp extractor and deserialization-exception handler.
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializerErrorHandler.class);

    // Consumer-prefixed interceptor list with a single interceptor class.
    config.put(
            StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
            Collections.singletonList(bbejeck.chapter_7.interceptors.StockTransactionConsumerInterceptor.class));

    return config;
}
 
示例8
/**
 * Assembles the configuration for the "FirstZmart" Kafka Streams example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-Kafka-Streams-App");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "FirstZmart-Kafka-Streams-Client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-purchases");

    // Broker address and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例9
/**
 * Assembles the configuration for the "testing-streams-api" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing-streams-api");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Kafka-Streams-Job");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-purchases");

    // Broker address, replication factor and offset-reset policy.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例10
/**
 * Assembles the configuration for the "ks-papi-stock-analysis" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-stock-analysis-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-papi-stock-analysis-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-papi-stock-analysis-group");

    // Broker address and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例11
/**
 * Assembles the configuration for the "zmart-processor" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "zmart-processor-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "zmart-processor-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-processor-group");

    // Broker address, replication factor and offset-reset policy.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例12
/**
 * Assembles the configuration for the "beer-app" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "beer-app-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "beer-app-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "beer-app-group");

    // Broker address, replication factor and offset-reset policy.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例13
/**
 * Assembles the configuration for the "ks-papi-stock-analysis" example,
 * including an explicit offset-reset policy.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-stock-analysis-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-papi-stock-analysis-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-papi-stock-analysis-group");

    // Broker address, replication factor and offset-reset policy.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例14
/**
 * Assembles the configuration for the "cogrouping" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cogrouping-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "cogrouping-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "cogrouping-group");

    // Broker address, offset-reset policy and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例15
/**
 * Assembles the configuration for the "stock-analysis" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stock-analysis-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "stock-analysis-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-analysis-group");

    // Broker address, offset-reset policy and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例16
/**
 * Assembles the configuration for the "cogrouping-restoring" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cogrouping-restoring-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "cogrouping-restoring-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "cogrouping-restoring-group");

    // Broker address, offset-reset policy and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例17
/**
 * Assembles the configuration for the "ks-stats-stock-analysis" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();
    final String stringSerdeName = Serdes.String().getClass().getName();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-stats-stock-analysis-appid");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-stats-stock-analysis-client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-stats-stock-analysis-group");

    // Broker address, replication factor and metrics recording level.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    config.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

    // String serdes are the default for both keys and values.
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

    // Consumer-prefixed interceptor list with a single interceptor class.
    config.put(
            StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
            Collections.singletonList(bbejeck.chapter_7.interceptors.StockTransactionConsumerInterceptor.class));

    return config;
}
 
示例18
/**
 * Assembles the configuration for the "AddingState" example.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "AddingStateAppId");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "AddingStateConsumer");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "AddingStateGroupId");

    // Broker address and replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例19
/**
 * Prepares the test fixture: builds the streams configuration and the ZMart
 * topology, then assigns a new {@code ProcessorTopologyTestDriver} to
 * {@code topologyTestDriver}.
 */
@BeforeEach
public void setUp() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-Kafka-Streams-App");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "FirstZmart-Kafka-Streams-Client");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-purchases");

    // Broker address, replication factor and default timestamp extractor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

    // Arguments are evaluated left to right, so the config is created before the topology is built.
    topologyTestDriver = new ProcessorTopologyTestDriver(new StreamsConfig(config), ZMartTopology.build());
}
 
示例20
/**
 * Starts a Kafka Streams application that reads the "stock-tick" topic as a
 * table, keeps entries whose key contains "HDFCBANK", and prints each
 * resulting record to standard output.
 *
 * @param args command-line arguments (not read by this method)
 */
public static void main(final String[] args) {
    final String stringSerdeName = Serdes.String().getClass().getName();

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamingTable");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerdeName);
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            WallclockTimestampExtractor.class.getName());

    // Uncomment to enable a record cache of size 10 MB.
    // config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    // Uncomment to set the commit interval to 1 second.
    // config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

    final StreamsBuilder topologyBuilder = new StreamsBuilder();
    final KTable<String, String> stockTicks = topologyBuilder.table("stock-tick");

    /*
    // Uncomment this block and comment out the next statement to suppress updates.
    KTable<String, String> hdfcTicks = stockTicks.filter((key, value) -> key.contains("HDFCBANK"))
            .suppress(Suppressed.untilTimeLimit(
                    Duration.ofMinutes(5),
                    Suppressed.BufferConfig.maxBytes(1000000L).emitEarlyWhenFull())
            );
    */

    final KTable<String, String> hdfcTicks =
            stockTicks.filter((key, value) -> key.contains("HDFCBANK"));

    // Convert the filtered table to a stream and print every record that flows through it.
    final KStream<String, String> hdfcStream = hdfcTicks.toStream();
    hdfcStream.peek((key, value) -> System.out.println("Key = " + key + " Value = " + value));

    final KafkaStreams streams = new KafkaStreams(topologyBuilder.build(), config);
    streams.start();
    // Close the streams instance when the JVM shuts down.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
 
示例21
/**
 * Assembles the configuration for the "testing-processor-api" example.
 *
 * <p>NOTE(review): this block uses {@code ZOOKEEPER_CONNECT_CONFIG} and the
 * non-{@code DEFAULT_} {@code TIMESTAMP_EXTRACTOR_CLASS_CONFIG} key; these appear
 * to belong to an older Kafka Streams API - confirm against the Kafka version in use.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing-processor-api");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
    config.put("group.id", "test-consumer-group");

    // Broker and ZooKeeper addresses, plus replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例22
/**
 * Assembles the configuration for the "stateful_processor_id" example.
 *
 * <p>NOTE(review): this block uses {@code ZOOKEEPER_CONNECT_CONFIG} and the
 * non-{@code DEFAULT_} {@code TIMESTAMP_EXTRACTOR_CLASS_CONFIG} key; these appear
 * to belong to an older Kafka Streams API - confirm against the Kafka version in use.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateful_processor_id");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "Sample-Stateful-Processor");
    config.put("group.id", "test-consumer-group");

    // Broker and ZooKeeper addresses, plus replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例23
/**
 * Assembles the configuration for the "testing-streams-api" example.
 *
 * <p>NOTE(review): this block uses {@code ZOOKEEPER_CONNECT_CONFIG} and the
 * non-{@code DEFAULT_} {@code TIMESTAMP_EXTRACTOR_CLASS_CONFIG} key; these appear
 * to belong to an older Kafka Streams API - confirm against the Kafka version in use.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing-streams-api");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Kafka-Streams-Job");
    config.put("group.id", "streams-purchases");

    // Broker and ZooKeeper addresses, plus replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例24
/**
 * Assembles the configuration for the "twitter-streams" example.
 *
 * <p>NOTE(review): this block uses {@code ZOOKEEPER_CONNECT_CONFIG} and the
 * non-{@code DEFAULT_} {@code TIMESTAMP_EXTRACTOR_CLASS_CONFIG} key; these appear
 * to belong to an older Kafka Streams API - confirm against the Kafka version in use.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "twitter-streams-id");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "Twitter-Streams-Analysis");
    config.put("group.id", "twitter-streams");

    // Broker and ZooKeeper addresses, plus replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例25
/**
 * Assembles the configuration for the "stocks_streams_id" example.
 *
 * <p>NOTE(review): this block uses {@code ZOOKEEPER_CONNECT_CONFIG} and the
 * non-{@code DEFAULT_} {@code TIMESTAMP_EXTRACTOR_CLASS_CONFIG} key; these appear
 * to belong to an older Kafka Streams API - confirm against the Kafka version in use.
 *
 * @return a new {@link Properties} object containing the entries set below
 */
private static Properties getProperties() {
    final Properties config = new Properties();

    // Application, client and consumer-group identifiers.
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stocks_streams_id");
    config.put(StreamsConfig.CLIENT_ID_CONFIG, "Stocks-Streams-Processor");
    config.put("group.id", "stock-streams");

    // Broker and ZooKeeper addresses, plus replication factor.
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

    config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    return config;
}
 
示例26
/**
 * Builds a DSL topology with one global store, one regular state store and a
 * filter / through / transformValues pipeline, prints its description, and
 * checks that the description parser reports 8 entries.
 */
@Test
public void testKsDsl2() {
  final String localStore = "stateStore";
  final String globalStore = "glob-stateStore";

  // Both stores are persistent key-value stores with String keys and values.
  final StoreBuilder<KeyValueStore<String, String>> localStoreBuilder =
      Stores.keyValueStoreBuilder(
          Stores.persistentKeyValueStore(localStore), Serdes.String(), Serdes.String());
  final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder =
      Stores.keyValueStoreBuilder(
          Stores.persistentKeyValueStore(globalStore), Serdes.String(), Serdes.String());

  // Consumption settings for the topic backing the global store.
  final Consumed<Short, String> globalConsumed = Consumed.with(
      Serdes.Short(),
      Serdes.String(),
      new WallclockTimestampExtractor(),
      Topology.AutoOffsetReset.EARLIEST);

  final StreamsBuilder streamsBuilder = new StreamsBuilder();
  streamsBuilder.addGlobalStore(globalStoreBuilder, "some-global-topic", globalConsumed, FakeProcessor::new);
  streamsBuilder.addStateStore(localStoreBuilder);

  streamsBuilder.<String, String>stream("input")
      .filter((key, value) -> value.endsWith("FOO"))
      .through("some-through-topic")
      .transformValues(() -> new SimpleValueTransformer(localStore), localStore)
      .to("output");

  final String description = streamsBuilder.build().describe().toString();
  System.out.println(description);

  final KStreamsTopologyDescriptionParser parser = new KStreamsTopologyDescriptionParser(description);
  assertEquals(8, parser.size());
}
 
示例27
/**
 * Exposes a {@link WallclockTimestampExtractor} as the {@link TimestampExtractor} bean.
 *
 * @return a newly created {@link WallclockTimestampExtractor}
 */
@Bean
public TimestampExtractor timestampExtractor() {
	final TimestampExtractor extractor = new WallclockTimestampExtractor();
	return extractor;
}