Java源码示例:org.apache.kafka.streams.processor.WallclockTimestampExtractor
示例1
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-interactive-stock-analysis-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-interactive-stock-analysis-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-interactive-stock-analysis-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.topicPrefix("retention.bytes"), 1024 * 1024);
props.put(StreamsConfig.topicPrefix("retention.ms"), 3600000);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializerErrorHandler.class);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), Collections.singletonList(bbejeck.chapter_7.interceptors.StockTransactionConsumerInterceptor.class));
return props;
}
示例2
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "KTable-aggregations");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KTable-aggregations-id");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "KTable-aggregations-client");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "30000");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例3
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "KStreamVSKTable_app");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KStreamVSKTable_group");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "KStreamVSKTable_client");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "30000");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "15000");
//props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,"0");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StreamsSerdes.StockTickerSerde().getClass().getName());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例4
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "regex_id");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "regex_group_id");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "regex_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "15000");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例5
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kafkaStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simpleKafkaStream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new StreamsConfig(props);
}
示例6
public static Properties createKStreamProperties(String nameProcess, String bootstrapServers) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-process" + nameProcess);
props.put(StreamsConfig.CLIENT_ID_CONFIG, nameProcess);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
return props;
}
示例7
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-connect-stock-analysis-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-connect-stock-analysis-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-connect-stock-analysis-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializerErrorHandler.class);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), Collections.singletonList(bbejeck.chapter_7.interceptors.StockTransactionConsumerInterceptor.class));
return props;
}
示例8
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "FirstZmart-Kafka-Streams-Client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-purchases");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-Kafka-Streams-App");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例9
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Kafka-Streams-Job");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-purchases");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing-streams-api");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例10
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-papi-stock-analysis-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-papi-stock-analysis-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-stock-analysis-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例11
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "zmart-processor-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-processor-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "zmart-processor-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例12
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "beer-app-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "beer-app-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "beer-app-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例13
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-papi-stock-analysis-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-papi-stock-analysis-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-stock-analysis-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例14
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "cogrouping-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "cogrouping-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cogrouping-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例15
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "stock-analysis-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "stock-analysis-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stock-analysis-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例16
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "cogrouping-restoring-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "cogrouping-restoring-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cogrouping-restoring-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例17
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "ks-stats-stock-analysis-client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ks-stats-stock-analysis-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ks-stats-stock-analysis-appid");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), Collections.singletonList(bbejeck.chapter_7.interceptors.StockTransactionConsumerInterceptor.class));
return props;
}
示例18
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "AddingStateConsumer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "AddingStateGroupId");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "AddingStateAppId");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例19
@BeforeEach
public void setUp() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "FirstZmart-Kafka-Streams-Client");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zmart-purchases");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "FirstZmart-Kafka-Streams-App");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
StreamsConfig streamsConfig = new StreamsConfig(props);
Topology topology = ZMartTopology.build();
topologyTestDriver = new ProcessorTopologyTestDriver(streamsConfig, topology);
}
示例20
public static void main(final String[] args) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamingTable");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getName());
//Uncomment to Enable record cache of size 10 MB.
//props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
//Uncomment to Set commit interval to 1 second.
//props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
StreamsBuilder streamBuilder = new StreamsBuilder();
KTable<String, String> KT0 = streamBuilder.table("stock-tick");
/*
//Uncomment this block and comment next line to suppress
KTable<String, String> KT1 = KT0.filter((key, value) -> key.contains("HDFCBANK"))
.suppress(Suppressed.untilTimeLimit(
Duration.ofMinutes(5),
Suppressed.BufferConfig.maxBytes(1000000L).emitEarlyWhenFull())
);
*/
KTable<String, String> KT1 = KT0.filter((key, value) -> key.contains("HDFCBANK"));
KStream<String, String> KS2 = KT1.toStream();
KS2.peek((k, v) -> System.out.println("Key = " + k + " Value = " + v));
KafkaStreams streams = new KafkaStreams(streamBuilder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
示例21
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
props.put("group.id", "test-consumer-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing-processor-api");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例22
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Sample-Stateful-Processor");
props.put("group.id", "test-consumer-group");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateful_processor_id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例23
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Example-Kafka-Streams-Job");
props.put("group.id", "streams-purchases");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing-streams-api");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例24
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Twitter-Streams-Analysis");
props.put("group.id", "twitter-streams");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "twitter-streams-id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例25
private static Properties getProperties() {
Properties props = new Properties();
props.put(StreamsConfig.CLIENT_ID_CONFIG, "Stocks-Streams-Processor");
props.put("group.id", "stock-streams");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stocks_streams_id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
return props;
}
示例26
@Test
public void testKsDsl2() {
final String storeName = "stateStore";
final String globalStoreName = "glob-stateStore";
final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
Serdes.String(),
Serdes.String());
final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(globalStoreName),
Serdes.String(),
Serdes.String());
builder.addGlobalStore(globalStoreBuilder, "some-global-topic", Consumed.with(Serdes.Short(), Serdes.String(), new WallclockTimestampExtractor(), Topology.AutoOffsetReset.EARLIEST), FakeProcessor::new);
builder.addStateStore(storeBuilder);
builder.<String, String>stream("input")
.filter((k, v) -> v.endsWith("FOO"))
.through("some-through-topic")
.transformValues(() -> new SimpleValueTransformer(storeName), storeName)
.to("output");
final Topology topology = builder.build();
final String text = topology.describe().toString();
System.out.println(text);
final KStreamsTopologyDescriptionParser parsed = new KStreamsTopologyDescriptionParser(text);
assertEquals(8, parsed.size());
}
示例27
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}