Java源码示例:org.apache.flink.streaming.kafka.test.base.KafkaEventSchema

示例1
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
			.addSource(
				new FlinkKafkaConsumer011<>(
					parameterTool.getRequired("input-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
			.keyBy("word")
			.map(new RollingAdditionMapper());

	input.addSink(
			new FlinkKafkaProducer011<>(
					parameterTool.getRequired("output-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties()));

	env.execute("Kafka 0.11 Example");
}
 
示例2
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
			.addSource(
				new FlinkKafkaConsumer010<>(
					parameterTool.getRequired("input-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
			.keyBy("word")
			.map(new RollingAdditionMapper());

	input.addSink(
			new FlinkKafkaProducer010<>(
					parameterTool.getRequired("output-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties()));

	env.execute("Kafka 0.10 Example");
}
 
示例3
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
		.addSource(
			new FlinkKafkaConsumer<>(
				parameterTool.getRequired("input-topic"),
				new KafkaEventSchema(),
				parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
		.keyBy("word")
		.map(new RollingAdditionMapper());

	input.addSink(
		new FlinkKafkaProducer<>(
			parameterTool.getRequired("output-topic"),
			new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
			parameterTool.getProperties(),
			FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

	env.execute("Modern Kafka Example");
}
 
示例4
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
			.addSource(
				new FlinkKafkaConsumer010<>(
					parameterTool.getRequired("input-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
			.keyBy("word")
			.map(new RollingAdditionMapper());

	input.addSink(
			new FlinkKafkaProducer010<>(
					parameterTool.getRequired("output-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties()));

	env.execute("Kafka 0.10 Example");
}
 
示例5
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
		.addSource(
			new FlinkKafkaConsumer<>(
				parameterTool.getRequired("input-topic"),
				new KafkaEventSchema(),
				parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
		.keyBy("word")
		.map(new RollingAdditionMapper());

	input.addSink(
		new FlinkKafkaProducer<>(
			parameterTool.getRequired("output-topic"),
			new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
			parameterTool.getProperties(),
			FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

	env.execute("Modern Kafka Example");
}
 
示例6
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
			.addSource(
				new FlinkKafkaConsumer011<>(
					parameterTool.getRequired("input-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
			.keyBy("word")
			.map(new RollingAdditionMapper());

	input.addSink(
			new FlinkKafkaProducer011<>(
					parameterTool.getRequired("output-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties()));

	env.execute("Kafka 0.11 Example");
}
 
示例7
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
			.addSource(
				new FlinkKafkaConsumer010<>(
					parameterTool.getRequired("input-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
			.keyBy("word")
			.map(new RollingAdditionMapper());

	input.addSink(
			new FlinkKafkaProducer010<>(
					parameterTool.getRequired("output-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties()));

	env.execute("Kafka 0.10 Example");
}
 
示例8
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
		.addSource(
			new FlinkKafkaConsumer<>(
				parameterTool.getRequired("input-topic"),
				new KafkaEventSchema(),
				parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
		.keyBy("word")
		.map(new RollingAdditionMapper());

	input.addSink(
		new FlinkKafkaProducer<>(
			parameterTool.getRequired("output-topic"),
			new KeyedSerializationSchemaWrapper<>(new KafkaEventSchema()),
			parameterTool.getProperties(),
			FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

	env.execute("Modern Kafka Example");
}
 
示例9
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	DataStream<KafkaEvent> input = env
			.addSource(
				new FlinkKafkaConsumer011<>(
					parameterTool.getRequired("input-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties())
				.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
			.keyBy("word")
			.map(new RollingAdditionMapper());

	input.addSink(
			new FlinkKafkaProducer011<>(
					parameterTool.getRequired("output-topic"),
					new KafkaEventSchema(),
					parameterTool.getProperties()));

	env.execute("Kafka 0.11 Example");
}
 
示例10
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	String inputStream = parameterTool.getRequired("input-stream");
	String outputStream = parameterTool.getRequired("output-stream");

	FlinkKinesisConsumer<KafkaEvent> consumer = new FlinkKinesisConsumer<>(
		inputStream,
		new KafkaEventSchema(),
		parameterTool.getProperties());
	consumer.setPeriodicWatermarkAssigner(new CustomWatermarkExtractor());

	Properties producerProperties = new Properties(parameterTool.getProperties());
	// producer needs region even when URL is specified
	producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1");
	// test driver does not deaggregate
	producerProperties.putIfAbsent("AggregationEnabled", String.valueOf(false));

	// KPL does not recognize endpoint URL..
	String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);
	if (kinesisUrl != null) {
		URL url = new URL(kinesisUrl);
		producerProperties.put("KinesisEndpoint", url.getHost());
		producerProperties.put("KinesisPort", Integer.toString(url.getPort()));
		producerProperties.put("VerifyCertificate", "false");
	}

	FlinkKinesisProducer<KafkaEvent> producer = new FlinkKinesisProducer<>(
		new KafkaEventSchema(),
		producerProperties);
	producer.setDefaultStream(outputStream);
	producer.setDefaultPartition("fakePartition");

	DataStream<KafkaEvent> input = env
		.addSource(consumer)
		.keyBy("word")
		.map(new RollingAdditionMapper());

	input.addSink(producer);
	env.execute();
}
 
示例11
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	String inputStream = parameterTool.getRequired("input-stream");
	String outputStream = parameterTool.getRequired("output-stream");

	FlinkKinesisConsumer<KafkaEvent> consumer = new FlinkKinesisConsumer<>(
		inputStream,
		new KafkaEventSchema(),
		parameterTool.getProperties());
	consumer.setPeriodicWatermarkAssigner(new CustomWatermarkExtractor());

	Properties producerProperties = new Properties(parameterTool.getProperties());
	// producer needs region even when URL is specified
	producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1");
	// test driver does not deaggregate
	producerProperties.putIfAbsent("AggregationEnabled", String.valueOf(false));

	// KPL does not recognize endpoint URL..
	String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);
	if (kinesisUrl != null) {
		URL url = new URL(kinesisUrl);
		producerProperties.put("KinesisEndpoint", url.getHost());
		producerProperties.put("KinesisPort", Integer.toString(url.getPort()));
		producerProperties.put("VerifyCertificate", "false");
	}

	FlinkKinesisProducer<KafkaEvent> producer = new FlinkKinesisProducer<>(
		new KafkaEventSchema(),
		producerProperties);
	producer.setDefaultStream(outputStream);
	producer.setDefaultPartition("fakePartition");

	DataStream<KafkaEvent> input = env
		.addSource(consumer)
		.keyBy("word")
		.map(new RollingAdditionMapper());

	input.addSink(producer);
	env.execute();
}
 
示例12
public static void main(String[] args) throws Exception {
	// parse input arguments
	final ParameterTool parameterTool = ParameterTool.fromArgs(args);
	StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

	String inputStream = parameterTool.getRequired("input-stream");
	String outputStream = parameterTool.getRequired("output-stream");

	FlinkKinesisConsumer<KafkaEvent> consumer = new FlinkKinesisConsumer<>(
		inputStream,
		new KafkaEventSchema(),
		parameterTool.getProperties());
	consumer.setPeriodicWatermarkAssigner(new CustomWatermarkExtractor());

	Properties producerProperties = new Properties(parameterTool.getProperties());
	// producer needs region even when URL is specified
	producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1");
	// test driver does not deaggregate
	producerProperties.putIfAbsent("AggregationEnabled", String.valueOf(false));

	// KPL does not recognize endpoint URL..
	String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);
	if (kinesisUrl != null) {
		URL url = new URL(kinesisUrl);
		producerProperties.put("KinesisEndpoint", url.getHost());
		producerProperties.put("KinesisPort", Integer.toString(url.getPort()));
		producerProperties.put("VerifyCertificate", "false");
	}

	FlinkKinesisProducer<KafkaEvent> producer = new FlinkKinesisProducer<>(
		new KafkaEventSchema(),
		producerProperties);
	producer.setDefaultStream(outputStream);
	producer.setDefaultPartition("fakePartition");

	DataStream<KafkaEvent> input = env
		.addSource(consumer)
		.keyBy("word")
		.map(new RollingAdditionMapper());

	input.addSink(producer);
	env.execute();
}