Java Code Examples: org.apache.flink.streaming.kafka.test.base.KafkaEventSchema
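All of the examples below use KafkaEventSchema as both the deserialization and serialization schema of Flink's Kafka and Kinesis connectors, together with the helper classes KafkaEvent, CustomWatermarkExtractor, RollingAdditionMapper, and KafkaExampleUtil from the same test package. For context, here is a minimal sketch of what such a schema looks like, assuming KafkaEvent round-trips through toString() and a static fromString(String) factory (imports shown for a Flink 1.7+ classpath):

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Minimal sketch of a combined (de)serialization schema, consistent with how
// KafkaEventSchema is used in the examples below. Assumes KafkaEvent provides
// fromString(String) and a toString() that round-trips it.
public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(KafkaEvent event) {
        return event.toString().getBytes();
    }

    @Override
    public KafkaEvent deserialize(byte[] message) throws IOException {
        return KafkaEvent.fromString(new String(message));
    }

    @Override
    public boolean isEndOfStream(KafkaEvent nextElement) {
        // unbounded stream: never signal end-of-stream
        return false;
    }

    @Override
    public TypeInformation<KafkaEvent> getProducedType() {
        return TypeInformation.of(KafkaEvent.class);
    }
}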
Example 1
public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

    // consume KafkaEvent records from Kafka 0.11, assign event-time
    // timestamps, and keep a rolling per-word count
    DataStream<KafkaEvent> input = env
        .addSource(
            new FlinkKafkaConsumer011<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties())
            .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
        .keyBy("word")
        .map(new RollingAdditionMapper());

    // write the updated counts back to Kafka 0.11
    input.addSink(
        new FlinkKafkaProducer011<>(
            parameterTool.getRequired("output-topic"),
            new KafkaEventSchema(),
            parameterTool.getProperties()));

    env.execute("Kafka 0.11 Example");
}
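Every example on this page assigns event-time timestamps via CustomWatermarkExtractor. A minimal sketch of such a periodic assigner, assuming KafkaEvent exposes a getTimestamp() accessor (a watermark trailing the latest timestamp by 1 ms is one common choice, not the only one):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch of a periodic timestamp/watermark assigner consistent with the
// usage above; assumes KafkaEvent exposes getTimestamp().
public class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {

    private long currentTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
        this.currentTimestamp = event.getTimestamp();
        return event.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // trail the most recent event timestamp by 1 ms
        return new Watermark(
            currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }
}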
Example 2
public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

    DataStream<KafkaEvent> input = env
        .addSource(
            new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties())
            .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
        .keyBy("word")
        .map(new RollingAdditionMapper());

    input.addSink(
        new FlinkKafkaProducer010<>(
            parameterTool.getRequired("output-topic"),
            new KafkaEventSchema(),
            parameterTool.getProperties()));

    env.execute("Kafka 0.10 Example");
}
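The rolling per-word sum in these pipelines comes from RollingAdditionMapper, applied after keyBy("word"). A sketch of a keyed, stateful mapper that fits this usage, assuming KafkaEvent is a (word, frequency, timestamp) POJO with the corresponding getters and constructor:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Sketch of a stateful mapper consistent with the usage above: keeps a
// running total of frequencies per word in keyed state and emits the
// updated total with each incoming event.
public class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {

    private transient ValueState<Integer> currentTotalCount;

    @Override
    public void open(Configuration parameters) {
        currentTotalCount = getRuntimeContext().getState(
            new ValueStateDescriptor<>("currentTotalCount", Integer.class));
    }

    @Override
    public KafkaEvent map(KafkaEvent event) throws Exception {
        Integer totalCount = currentTotalCount.value();
        if (totalCount == null) {
            totalCount = 0;
        }
        totalCount += event.getFrequency();
        currentTotalCount.update(totalCount);
        return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
    }
}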
Example 3
public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

    DataStream<KafkaEvent> input = env
        .addSource(
            new FlinkKafkaConsumer<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties())
            .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
        .keyBy("word")
        .map(new RollingAdditionMapper());

    // the universal producer takes a KeyedSerializationSchema, hence the
    // wrapper; here it writes with exactly-once (transactional) semantics
    input.addSink(
        new FlinkKafkaProducer<>(
            parameterTool.getRequired("output-topic"),
            new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
            parameterTool.getProperties(),
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

    env.execute("Modern Kafka Example");
}
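One operational note on Semantic.EXACTLY_ONCE: it makes the producer write inside Kafka transactions, and Flink's default producer transaction timeout (one hour) exceeds the broker default for transaction.max.timeout.ms (15 minutes), so one of the two usually has to be adjusted. A sketch of lowering the producer side, with an illustrative value:

// Illustrative: cap the producer transaction timeout at the broker default
// of 15 minutes so EXACTLY_ONCE initialization does not fail.
Properties producerProps = parameterTool.getProperties();
producerProps.setProperty("transaction.timeout.ms", "900000");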
Example 4
public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

    String inputStream = parameterTool.getRequired("input-stream");
    String outputStream = parameterTool.getRequired("output-stream");

    FlinkKinesisConsumer<KafkaEvent> consumer = new FlinkKinesisConsumer<>(
        inputStream,
        new KafkaEventSchema(),
        parameterTool.getProperties());
    consumer.setPeriodicWatermarkAssigner(new CustomWatermarkExtractor());

    Properties producerProperties = new Properties(parameterTool.getProperties());
    // producer needs region even when URL is specified
    producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    // test driver does not deaggregate
    producerProperties.putIfAbsent("AggregationEnabled", String.valueOf(false));

    // the KPL does not accept an endpoint URL directly; split it into host and port
    String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);
    if (kinesisUrl != null) {
        URL url = new URL(kinesisUrl);
        producerProperties.put("KinesisEndpoint", url.getHost());
        producerProperties.put("KinesisPort", Integer.toString(url.getPort()));
        producerProperties.put("VerifyCertificate", "false");
    }

    FlinkKinesisProducer<KafkaEvent> producer = new FlinkKinesisProducer<>(
        new KafkaEventSchema(),
        producerProperties);
    producer.setDefaultStream(outputStream);
    producer.setDefaultPartition("fakePartition");

    DataStream<KafkaEvent> input = env
        .addSource(consumer)
        .keyBy("word")
        .map(new RollingAdditionMapper());

    input.addSink(producer);
    env.execute();
}
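The Kinesis consumer reads its AWS settings from the same ParameterTool properties. A sketch of the minimum configuration this example expects when run standalone; the region, credentials, and initial position shown are placeholders, not defaults:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

// Placeholder values for illustration only.
Properties config = new Properties();
config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
config.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "<access-key-id>");
config.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "<secret-access-key>");
config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

FlinkKinesisConsumer<KafkaEvent> consumer =
    new FlinkKinesisConsumer<>("input-stream", new KafkaEventSchema(), config);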