Java source code examples: org.apache.spark.streaming.kafka010.LocationStrategies
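All of the examples below are real-world usages of org.apache.spark.streaming.kafka010.LocationStrategies collected from open-source projects, and every one of them chooses LocationStrategies.PreferConsistent(). For orientation, the class offers three strategies for placing Kafka partitions on executors; a minimal sketch of all three follows (the class, topic, and host names are illustrative placeholders):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;

public class LocationStrategyChoices {

    public static void main(String[] args) {
        // Default choice: spread partitions evenly across the available executors.
        LocationStrategy consistent = LocationStrategies.PreferConsistent();

        // Only useful when the Spark executors run on the same hosts as the Kafka brokers.
        LocationStrategy brokers = LocationStrategies.PreferBrokers();

        // Pin particular partitions to particular hosts; unlisted partitions fall back to consistent placement.
        Map<TopicPartition, String> hostMap = new HashMap<>();
        hostMap.put(new TopicPartition("example-topic", 0), "executor-host-1.example.com");
        LocationStrategy fixed = LocationStrategies.PreferFixed(hostMap);
    }
}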
Example 1
public static void main(String[] args) throws InterruptedException {
System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
final SparkConf conf = new SparkConf()
.setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
.setAppName(APPLICATION_NAME)
.set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaRDD<ConsumerRecord<String, String>> rdd =
KafkaUtils.createRDD(sparkContext, KAFKA_CONSUMER_PROPERTIES,
offsetRange, LocationStrategies.PreferConsistent());
MongoSpark.save(rdd.map(r -> Document.parse(r.value())));
sparkContext.stop();
sparkContext.close();
}
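The offsetRange array passed to KafkaUtils.createRDD above (and the offsetRanges variable in Example 2, as well as in Examples 13 and 14) is assumed to be built elsewhere. A minimal sketch of how such ranges are typically constructed with OffsetRange.create; the class and method names, topic, partitions, and offsets below are placeholder values:

import org.apache.spark.streaming.kafka010.OffsetRange;

public class OffsetRangeExample {

    // One OffsetRange per topic-partition: reads offsets [fromOffset, untilOffset) as a single batch.
    static OffsetRange[] buildOffsetRanges() {
        return new OffsetRange[] {
            OffsetRange.create("meetup-rsvps", 0, 0L, 1000L),
            OffsetRange.create("meetup-rsvps", 1, 0L, 1000L)
        };
    }
}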
Example 2
public static void main(String[] args) throws InterruptedException {
System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
final SparkConf conf = new SparkConf()
.setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
.setAppName(APPLICATION_NAME)
.set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaRDD<ConsumerRecord<String, String>> rdd =
KafkaUtils.createRDD(sparkContext, KAFKA_CONSUMER_PROPERTIES,
offsetRanges, LocationStrategies.PreferConsistent());
MongoSpark.save(
rdd.map(
f -> Document.parse(f.value())
)
);
sparkContext.stop();
sparkContext.close();
}
Example 3
public static void main(String[] args) throws InterruptedException {
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("data-in");
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaSpark");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, Integer> countOfMessageKeys = stream
.map((ConsumerRecord<String, String> record) -> record.key())
.mapToPair((String s) -> new Tuple2<>(s, 1))
.reduceByKey((Integer i1, Integer i2)-> i1 + i2);
countOfMessageKeys.print();
// Start the computation
streamingContext.start();
streamingContext.awaitTermination();
}
Example 4
@Override
public JavaDStream<?> getDStream() throws Exception {
if (dStream == null) {
JavaStreamingContext jssc = Contexts.getJavaStreamingContext();
Map<TopicPartition, Long> lastOffsets = null;
if (doesRecordProgress(config) && !usingKafkaManagedOffsets(config)) {
lastOffsets = getLastOffsets();
}
if (lastOffsets != null) {
dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams, lastOffsets));
} else {
dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
}
if (ConfigUtils.getOrElse(config, WINDOW_ENABLED_CONFIG, false)) {
int windowDuration = config.getInt(WINDOW_MILLISECONDS_CONFIG);
if (config.hasPath(WINDOW_SLIDE_MILLISECONDS_CONFIG)) {
int slideDuration = config.getInt(WINDOW_SLIDE_MILLISECONDS_CONFIG);
dStream = dStream.window(new Duration(windowDuration), new Duration(slideDuration));
} else {
dStream = dStream.window(new Duration(windowDuration));
}
}
}
return dStream;
}
Example 5
@Override
public JavaStreamingContext createDStream(JavaStreamingContext result, Map<String, Object> props) {
props.put("bootstrap.servers", metaDataBrokerList);
if (!autoOffsetValue.isEmpty()) {
autoOffsetValue = getConfigurableAutoOffsetResetIfNonEmpty(autoOffsetValue);
props.put(AUTO_OFFSET_RESET, autoOffsetValue);
}
props.putAll(extraKafkaConfigs);
List<String> topics = ImmutableList.of(topic);
JavaInputDStream<ConsumerRecord<byte[], byte[]>> stream;
if (offsetHelper.isSDCCheckPointing()) {
Map<TopicPartition, Long> fromOffsets = KafkaOffsetManagerImpl.get().getOffsetForDStream(topic, numberOfPartitions);
stream =
KafkaUtils.createDirectStream(
result,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<byte[], byte[]>Assign(new ArrayList<TopicPartition>(fromOffsets.keySet()), props, fromOffsets)
);
} else {
stream = KafkaUtils.createDirectStream(
result,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<byte[], byte[]>Subscribe(topics, props)
);
}
Driver$.MODULE$.foreach(stream.dstream(), KafkaOffsetManagerImpl.get());
return result;
}
Example 6
protected final JavaInputDStream<ConsumerRecord<K,M>> buildInputDStream(
JavaStreamingContext streamingContext) {
Preconditions.checkArgument(
KafkaUtils.topicExists(inputTopicLockMaster, inputTopic),
"Topic %s does not exist; did you create it?", inputTopic);
if (updateTopic != null && updateTopicLockMaster != null) {
Preconditions.checkArgument(
KafkaUtils.topicExists(updateTopicLockMaster, updateTopic),
"Topic %s does not exist; did you create it?", updateTopic);
}
String groupID = getGroupID();
Map<String,Object> kafkaParams = new HashMap<>();
kafkaParams.put("group.id", groupID);
// Don't re-consume old messages from input by default
kafkaParams.put("auto.offset.reset", "latest"); // Ignored by Kafka 0.10 Spark integration
kafkaParams.put("bootstrap.servers", inputBroker);
kafkaParams.put("key.deserializer", keyDecoderClass.getName());
kafkaParams.put("value.deserializer", messageDecoderClass.getName());
LocationStrategy locationStrategy = LocationStrategies.PreferConsistent();
ConsumerStrategy<K,M> consumerStrategy = ConsumerStrategies.Subscribe(
Collections.singleton(inputTopic), kafkaParams, Collections.emptyMap());
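// Note: the unqualified KafkaUtils used for topicExists above appears to be a project-local helper,
// not the Spark class; hence the fully-qualified name in the Spark call below.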
return org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
streamingContext,
locationStrategy,
consumerStrategy);
}
Example 7
public static void main(String []args) throws InterruptedException, IOException {
HTTPServer server = new HTTPServer(Integer.valueOf(getPropOrEnv("PROMETHEUS_PORT", "9111")));
SparkConf sparkConf = new SparkConf()
.setAppName("Trace DSL")
.setMaster(getPropOrEnv("SPARK_MASTER","local[*]"));
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(Integer.parseInt(getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "5000"))));
Set<String> topics = Collections.singleton(getPropOrEnv("KAFKA_JAEGER_TOPIC", "jaeger-spans"));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", getPropOrEnv("KAFKA_BOOTSTRAP_SERVER", "localhost:9092"));
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", ProtoSpanDeserializer.class);
// hack to start always from beginning
kafkaParams.put("group.id", "jaeger-trace-aggregation-" + System.currentTimeMillis());
if (Boolean.parseBoolean(getPropOrEnv("KAFKA_START_FROM_BEGINNING", "true"))) {
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("startingOffsets", "earliest");
}
JavaInputDStream<ConsumerRecord<String, Span>> messages =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
JavaPairDStream<String, Span> traceIdSpanTuple = messages.mapToPair(record -> {
return new Tuple2<>(record.value().traceId, record.value());
});
JavaDStream<Trace> tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> {
System.out.printf("traceID: %s\n", traceIdSpans._1);
Iterable<Span> spans = traceIdSpans._2();
Trace trace = new Trace();
trace.traceId = traceIdSpans._1();
trace.spans = StreamSupport.stream(spans.spliterator(), false)
.collect(Collectors.toList());
return trace;
});
MinimumClientVersion minimumClientVersion = MinimumClientVersion.builder()
.withJavaVersion(getPropOrEnv("TRACE_QUALITY_JAVA_VERSION", "1.0.0"))
.withGoVersion(getPropOrEnv("TRACE_QUALITY_GO_VERSION", "2.22.0"))
.withNodeVersion(getPropOrEnv("TRACE_QUALITY_NODE_VERSION", "3.17.1"))
.withPythonVersion(getPropOrEnv("TRACE_QUALITY_PYTHON_VERSION", "4.0.0"))
.build();
List<ModelRunner> modelRunner = Arrays.asList(
new TraceHeight(),
new ServiceDepth(),
new ServiceHeight(),
new NetworkLatency(),
new NumberOfErrors(),
new DirectDependencies(),
// trace quality
minimumClientVersion,
new HasClientServerSpans(),
new UniqueSpanId());
tracesStream.foreachRDD((traceRDD, time) -> {
traceRDD.foreach(trace -> {
Graph graph = GraphCreator.create(trace);
for (ModelRunner model: modelRunner) {
model.runWithMetrics(graph);
}
});
});
ssc.start();
ssc.awaitTermination();
}
Example 8
public static void main(String[] args) throws InterruptedException {
System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
final SparkConf conf = new SparkConf()
.setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
.setAppName(APPLICATION_NAME)
.set("spark.sql.caseSensitive", CASE_SENSITIVE);
JavaStreamingContext streamingContext = new JavaStreamingContext(conf,
new Duration(BATCH_DURATION_INTERVAL_MS));
JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
);
JavaDStream<String> meetupStreamValues =
meetupStream.map(v -> {
return v.value();
});
// Prepare the training data as strings of type: (y,[x1,x2,x3,...,xn])
// Where n is the number of features, y is a binary label,
// and n must be the same for train and test.
// e.g. "(response, [group_lat, group_long])";
JavaDStream<String> trainData = meetupStreamValues.map(e -> {
JSONParser jsonParser = new JSONParser();
JSONObject json = (JSONObject)jsonParser.parse(e);
String result = "("
+ (String.valueOf(json.get("response")).equals("yes") ? "1.0,[":"0.0,[")
+ ((JSONObject)json.get("group")).get("group_lat") + ","
+ ((JSONObject)json.get("group")).get("group_lon")
+ "])";
return result;
});
trainData.print();
JavaDStream<LabeledPoint> labeledPoints = trainData.map(LabeledPoint::parse);
StreamingLogisticRegressionWithSGD streamingLogisticRegressionWithSGD
= new StreamingLogisticRegressionWithSGD()
.setInitialWeights(Vectors.zeros(2));
streamingLogisticRegressionWithSGD.trainOn(labeledPoints);
JavaPairDStream<Double, Vector> values =
labeledPoints.mapToPair(f -> new Tuple2<>(f.label(), f.features()));
streamingLogisticRegressionWithSGD.predictOnValues(values).print();
// some time later, after outputs have completed
meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();
((CanCommitOffsets) meetupStream.inputDStream())
.commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
});
streamingContext.start();
streamingContext.awaitTermination();
}
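MeetupOffsetCommitCallback, used in this example and the two that follow, is not shown. A minimal sketch of such a callback, assuming it does nothing more than log the result of the asynchronous offset commit (the class body and logger choice are assumptions):

import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class MeetupOffsetCommitCallback implements OffsetCommitCallback {

    private static final Logger log = Logger.getLogger(MeetupOffsetCommitCallback.class.getName());

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        // Called by the Kafka consumer once the async commit finishes.
        if (exception != null) {
            log.log(Level.SEVERE, "Offset commit failed for " + offsets, exception);
        } else {
            log.info("Committed offsets " + offsets);
        }
    }
}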
Example 9
public static void main(String[] args) throws InterruptedException {
System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
final SparkConf conf = new SparkConf()
.setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
.setAppName(APPLICATION_NAME)
.set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI)
.set("spark.streaming.kafka.consumer.cache.enabled", "false");
final JavaStreamingContext streamingContext
= new JavaStreamingContext(conf, new Duration(BATCH_DURATION_INTERVAL_MS));
streamingContext.checkpoint(CHECKPOINT_FOLDER);
final JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
);
// transformations, streaming algorithms, etc
JavaDStream<Long> countStream
= meetupStream.countByWindow(
new Duration(WINDOW_LENGTH_MS),
new Duration(SLIDING_INTERVAL_MS));
countStream.foreachRDD((JavaRDD<Long> countRDD) -> {
MongoSpark.save(
countRDD.map(
r -> Document.parse("{\"rsvps_count\":\"" + String.valueOf(r) + "\"}")
)
);
});
// some time later, after outputs have completed
meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();
((CanCommitOffsets) meetupStream.inputDStream())
.commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
});
streamingContext.start();
streamingContext.awaitTermination();
}
Example 10
public static void main(String[] args) throws InterruptedException {
System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);
final SparkConf conf = new SparkConf()
.setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
.setAppName(APPLICATION_NAME)
.set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);
final JavaStreamingContext streamingContext
= new JavaStreamingContext(conf, new Duration(BATCH_DURATION_INTERVAL_MS));
final JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
);
// transformations, streaming algorithms, etc
JavaDStream<ConsumerRecord<String, String>> rsvpsWithGuestsStream =
meetupStream.filter(f -> !f.value().contains("\"guests\":0"));
rsvpsWithGuestsStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> r) -> {
MongoSpark.save(
r.map(
e -> Document.parse(e.value())
)
);
});
// some time later, after outputs have completed
meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();
((CanCommitOffsets) meetupStream.inputDStream())
.commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
});
streamingContext.start();
streamingContext.awaitTermination();
}
Example 11
public JavaDStream<Row> createSource(JavaStreamingContext ssc, KafkaSourceConfig config, SourceContext context)
{
String topics = config.getTopics();
String brokers = config.getBrokers(); // the Kafka broker hosts must be resolvable from the machine running this job
String groupId = config.getGroupid(); // consumer group name
String offsetMode = config.getOffsetMode();
Map<String, Object> kafkaParams = new HashMap<>(config.getOtherConfig());
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class); //StringDeserializer
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class); //StringDeserializer
kafkaParams.put("enable.auto.commit", false); //不自动提交偏移量
// "fetch.message.max.bytes" ->
// "session.timeout.ms" -> "30000", //session默认是30秒
// "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期
kafkaParams.put("group.id", groupId); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误
kafkaParams.put("auto.offset.reset", offsetMode); //latest earliest
List<String> topicSets = Arrays.asList(topics.split(","));
JavaInputDStream<ConsumerRecord<byte[], byte[]>> inputStream = KafkaUtils.createDirectStream(
ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicSets, kafkaParams));
DStream<ConsumerRecord<byte[], byte[]>> sylphKafkaOffset = new SylphKafkaOffset<ConsumerRecord<byte[], byte[]>>(inputStream.inputDStream())
{
@Override
public void commitOffsets(RDD<?> kafkaRdd)
{
OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaRdd).offsetRanges();
log().info("commitKafkaOffsets {}", (Object) offsetRanges);
DStream<?> firstDStream = DStreamUtil.getFirstDStream(inputStream.dstream());
((CanCommitOffsets) firstDStream).commitAsync(offsetRanges);
}
};
JavaDStream<ConsumerRecord<byte[], byte[]>> javaDStream = new JavaDStream<>(sylphKafkaOffset, ClassTag$.MODULE$.apply(ConsumerRecord.class));
if ("json".equalsIgnoreCase(config.getValueType())) {
JsonSchema jsonParser = new JsonSchema(context.getSchema());
return javaDStream
.map(record -> jsonParser.deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset()));
}
else {
List<String> names = context.getSchema().getFieldNames();
return javaDStream
.map(record -> {
Object[] values = new Object[names.size()];
for (int i = 0; i < names.size(); i++) {
switch (names.get(i)) {
case "_topic":
values[i] = record.topic();
continue;
case "_message":
values[i] = new String(record.value(), UTF_8);
continue;
case "_key":
values[i] = record.key() == null ? null : new String(record.key(), UTF_8);
continue;
case "_partition":
values[i] = record.partition();
continue;
case "_offset":
values[i] = record.offset();
case "_timestamp":
values[i] = record.timestamp();
case "_timestampType":
values[i] = record.timestampType().id;
default:
values[i] = null;
}
}
return new GenericRow(values); //GenericRowWithSchema
}); //.window(Duration(10 * 1000))
}
}
Example 12
public static void main(String[] args) {
// Windows-specific property if Hadoop is not installed or HADOOP_HOME is not set
System.setProperty("hadoop.home.dir", "E:\\hadoop");
//Logger rootLogger = LogManager.getRootLogger();
//rootLogger.setLevel(Level.WARN);
SparkConf conf = new SparkConf().setAppName("KafkaExample").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.minutes(2));
streamingContext.checkpoint("E:\\hadoop\\checkpoint");
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.WARN);
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.75.1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_strea");
kafkaParams.put("auto.offset.reset", "latest");
// kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("mytopic", "anothertopic");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> pairRDD = stream.mapToPair(record-> new Tuple2<>(record.key(), record.value()));
pairRDD.foreachRDD(pRDD-> { pRDD.foreach(tuple-> System.out.println(new Date()+" :: Kafka msg key ::"+tuple._1() +" the val is ::"+tuple._2()));});
JavaDStream<String> tweetRDD = pairRDD.map(x-> x._2()).map(new TweetText());
tweetRDD.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" :: "+x)));
JavaDStream<String> hashtagRDD = tweetRDD.flatMap(twt-> Arrays.stream(twt.split(" ")).filter(str-> str.contains("#")).collect(Collectors.toList()).iterator() );
hashtagRDD.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(x)));
JavaPairDStream<String, Long> cntByVal = hashtagRDD.countByValue();
cntByVal.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The count tag is ::"+x._1() +" and the val is ::"+x._2())));
/* hashtagRDD.window(Durations.seconds(60), Durations.seconds(30))
.countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.countByValueAndWindow(Durations.seconds(60), Durations.seconds(30))
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println("The window&count tag is ::"+x._1() +" and the val is ::"+x._2())));
*/
hashtagRDD.window(Durations.minutes(8)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.window(Durations.minutes(8),Durations.minutes(2)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.window(Durations.minutes(12),Durations.minutes(8)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.window(Durations.minutes(2),Durations.minutes(2)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.window(Durations.minutes(12),Durations.minutes(12)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
/*hashtagRDD.window(Durations.minutes(5),Durations.minutes(2)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));*/
/* hashtagRDD.window(Durations.minutes(10),Durations.minutes(1)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));*/
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
Example 13
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
}
Example 14
private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
Example 15
public static void main(String[] args) throws InterruptedException {
Logger.getLogger("org")
.setLevel(Level.OFF);
Logger.getLogger("akka")
.setLevel(Level.OFF);
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local[2]");
sparkConf.setAppName("WordCountingAppWithCheckpoint");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
sparkContext = streamingContext.sparkContext();
streamingContext.checkpoint("./.checkpoint");
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
JavaDStream<String> lines = results.map(tuple2 -> tuple2._2());
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+"))
.iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((word, one, state) -> {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}));
cumulativeWordCounts.foreachRDD(javaRdd -> {
List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
for (Tuple2<String, Integer> tuple : wordCountList) {
List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));
JavaRDD<Word> rdd = sparkContext.parallelize(wordList);
javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
.saveToCassandra();
}
});
streamingContext.start();
streamingContext.awaitTermination();
}
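The Word class handed to mapToRow(Word.class) above is not shown. A minimal sketch of such a bean, assuming the target Cassandra table vocabulary.words simply holds word and count columns (the field names are assumptions):

import java.io.Serializable;

public class Word implements Serializable {

    private String word;
    private Integer count;

    public Word() {
    }

    public Word(String word, Integer count) {
        this.word = word;
        this.count = count;
    }

    // Getters and setters are needed so mapToRow(Word.class) can map the bean to table columns.
    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }
}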