Java source code examples: org.apache.spark.streaming.kafka010.LocationStrategies
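
All of the examples below pass LocationStrategies.PreferConsistent() to KafkaUtils.createDirectStream or KafkaUtils.createRDD. The class also provides PreferBrokers() and PreferFixed(); the minimal sketch below (not taken from any of the examples, with a placeholder topic name and executor host) shows how each strategy object is constructed:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;

public class LocationStrategySketch {
    public static void main(String[] args) {
        // PreferConsistent: spread partitions evenly across the available executors (the usual default).
        LocationStrategy consistent = LocationStrategies.PreferConsistent();

        // PreferBrokers: only appropriate when the executors run on the same hosts as the Kafka brokers.
        LocationStrategy brokers = LocationStrategies.PreferBrokers();

        // PreferFixed: pin specific partitions to specific hosts; unmapped partitions
        // fall back to consistent placement. Topic name and host below are placeholders.
        Map<TopicPartition, String> hostMap = new HashMap<>();
        hostMap.put(new TopicPartition("example-topic", 0), "executor-host-1");
        LocationStrategy fixed = LocationStrategies.PreferFixed(hostMap);
    }
}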

Example 1
public static void main(String[] args) throws InterruptedException {

        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);

        JavaSparkContext sparkContext = new JavaSparkContext(conf);

        JavaRDD<ConsumerRecord<String, String>> rdd =
            KafkaUtils.createRDD(sparkContext, KAFKA_CONSUMER_PROPERTIES,
                offsetRange, LocationStrategies.PreferConsistent());
       
        MongoSpark.save(rdd.map(r -> Document.parse(r.value())));

        sparkContext.stop();
        sparkContext.close();
    }
 
Example 2
public static void main(String[] args) throws InterruptedException {

        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);

        JavaSparkContext sparkContext = new JavaSparkContext(conf);    
        
        JavaRDD<ConsumerRecord<String, String>> rdd = 
            KafkaUtils.createRDD(sparkContext, KAFKA_CONSUMER_PROPERTIES, 
                offsetRanges, LocationStrategies.PreferConsistent());                

        MongoSpark.save(
            rdd.map(
                f -> Document.parse(f.value())
            )
        );                                        

        sparkContext.stop();
        sparkContext.close();
    }
 
Example 3
public static void main(String[] args) throws InterruptedException {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("data-in");

    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaSpark");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    JavaPairDStream<String, Integer>  countOfMessageKeys = stream
            .map((ConsumerRecord<String, String> record) -> record.key())
            .mapToPair((String s) -> new Tuple2<>(s, 1))
            .reduceByKey((Integer i1, Integer i2)-> i1 + i2);

    countOfMessageKeys.print();

    // Start the computation
    streamingContext.start();
    streamingContext.awaitTermination();
}
 
Example 4
@Override
public JavaDStream<?> getDStream() throws Exception {
  if (dStream == null) {
    JavaStreamingContext jssc = Contexts.getJavaStreamingContext();
    Map<TopicPartition, Long> lastOffsets = null;
    if (doesRecordProgress(config) && !usingKafkaManagedOffsets(config)) {
      lastOffsets = getLastOffsets();
    }

    if (lastOffsets != null) {
      dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
          ConsumerStrategies.Subscribe(topics, kafkaParams, lastOffsets));
    } else {
      dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
          ConsumerStrategies.Subscribe(topics, kafkaParams));
    }

    if (ConfigUtils.getOrElse(config, WINDOW_ENABLED_CONFIG, false)) {
      int windowDuration = config.getInt(WINDOW_MILLISECONDS_CONFIG);
      if (config.hasPath(WINDOW_SLIDE_MILLISECONDS_CONFIG)) {
        int slideDuration = config.getInt(WINDOW_SLIDE_MILLISECONDS_CONFIG);
        dStream = dStream.window(new Duration(windowDuration), new Duration(slideDuration));
      } else {
        dStream = dStream.window(new Duration(windowDuration));
      }
    }
  }

  return dStream;
}
 
Example 5
@Override
public JavaStreamingContext createDStream(JavaStreamingContext result, Map<String, Object> props) {
  props.put("bootstrap.servers", metaDataBrokerList);
  if (!autoOffsetValue.isEmpty()) {
    autoOffsetValue = getConfigurableAutoOffsetResetIfNonEmpty(autoOffsetValue);
    props.put(AUTO_OFFSET_RESET, autoOffsetValue);
  }
  props.putAll(extraKafkaConfigs);

  List<String> topics = ImmutableList.of(topic);
  JavaInputDStream<ConsumerRecord<byte[], byte[]>> stream;

  if (offsetHelper.isSDCCheckPointing()) {
    Map<TopicPartition, Long> fromOffsets = KafkaOffsetManagerImpl.get().getOffsetForDStream(topic, numberOfPartitions);
    stream =
        KafkaUtils.createDirectStream(
            result,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<byte[], byte[]>Assign(new ArrayList<TopicPartition>(fromOffsets.keySet()), props, fromOffsets)
        );
  } else {
    stream  = KafkaUtils.createDirectStream(
        result,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<byte[], byte[]>Subscribe(topics, props)
    );

  }
  Driver$.MODULE$.foreach(stream.dstream(), KafkaOffsetManagerImpl.get());
  return result;
}
 
Example 6
protected final JavaInputDStream<ConsumerRecord<K,M>> buildInputDStream(
    JavaStreamingContext streamingContext) {

  Preconditions.checkArgument(
      KafkaUtils.topicExists(inputTopicLockMaster, inputTopic),
      "Topic %s does not exist; did you create it?", inputTopic);
  if (updateTopic != null && updateTopicLockMaster != null) {
    Preconditions.checkArgument(
        KafkaUtils.topicExists(updateTopicLockMaster, updateTopic),
        "Topic %s does not exist; did you create it?", updateTopic);
  }

  String groupID = getGroupID();

  Map<String,Object> kafkaParams = new HashMap<>();
  kafkaParams.put("group.id", groupID);
  // Don't re-consume old messages from input by default
  kafkaParams.put("auto.offset.reset", "latest"); // Ignored by Kafka 0.10 Spark integration
  kafkaParams.put("bootstrap.servers", inputBroker);
  kafkaParams.put("key.deserializer", keyDecoderClass.getName());
  kafkaParams.put("value.deserializer", messageDecoderClass.getName());

  LocationStrategy locationStrategy = LocationStrategies.PreferConsistent();
  ConsumerStrategy<K,M> consumerStrategy = ConsumerStrategies.Subscribe(
      Collections.singleton(inputTopic), kafkaParams, Collections.emptyMap());
  return org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(
      streamingContext,
      locationStrategy,
      consumerStrategy);
}
 
Example 7
public static void main(String []args) throws InterruptedException, IOException {
  HTTPServer server = new HTTPServer(Integer.valueOf(getPropOrEnv("PROMETHEUS_PORT", "9111")));

  SparkConf sparkConf = new SparkConf()
      .setAppName("Trace DSL")
      .setMaster(getPropOrEnv("SPARK_MASTER","local[*]"));

  JavaSparkContext sc = new JavaSparkContext(sparkConf);
  JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(Integer.parseInt(getPropOrEnv("SPARK_STREAMING_BATCH_DURATION", "5000"))));

  Set<String> topics = Collections.singleton(getPropOrEnv("KAFKA_JAEGER_TOPIC", "jaeger-spans"));
  Map<String, Object> kafkaParams = new HashMap<>();
  kafkaParams.put("bootstrap.servers", getPropOrEnv("KAFKA_BOOTSTRAP_SERVER", "localhost:9092"));
  kafkaParams.put("key.deserializer", StringDeserializer.class);
  kafkaParams.put("value.deserializer", ProtoSpanDeserializer.class);
  // hack to start always from beginning
  kafkaParams.put("group.id", "jaeger-trace-aggregation-" + System.currentTimeMillis());

  if (Boolean.parseBoolean(getPropOrEnv("KAFKA_START_FROM_BEGINNING", "true"))) {
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("startingOffsets", "earliest");
  }

  JavaInputDStream<ConsumerRecord<String, Span>> messages =
      KafkaUtils.createDirectStream(
          ssc,
          LocationStrategies.PreferConsistent(),
          ConsumerStrategies.Subscribe(topics, kafkaParams));

  JavaPairDStream<String, Span> traceIdSpanTuple = messages.mapToPair(record -> {
    return new Tuple2<>(record.value().traceId, record.value());
  });

  JavaDStream<Trace> tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> {
    System.out.printf("traceID: %s\n", traceIdSpans._1);
    Iterable<Span> spans = traceIdSpans._2();
    Trace trace = new Trace();
    trace.traceId = traceIdSpans._1();
    trace.spans = StreamSupport.stream(spans.spliterator(), false)
        .collect(Collectors.toList());
    return trace;
  });

  MinimumClientVersion minimumClientVersion = MinimumClientVersion.builder()
      .withJavaVersion(getPropOrEnv("TRACE_QUALITY_JAVA_VERSION", "1.0.0"))
      .withGoVersion(getPropOrEnv("TRACE_QUALITY_GO_VERSION", "2.22.0"))
      .withNodeVersion(getPropOrEnv("TRACE_QUALITY_NODE_VERSION", "3.17.1"))
      .withPythonVersion(getPropOrEnv("TRACE_QUALITY_PYTHON_VERSION", "4.0.0"))
      .build();

  List<ModelRunner> modelRunner = Arrays.asList(
      new TraceHeight(),
      new ServiceDepth(),
      new ServiceHeight(),
      new NetworkLatency(),
      new NumberOfErrors(),
      new DirectDependencies(),
      // trace quality
      minimumClientVersion,
      new HasClientServerSpans(),
      new UniqueSpanId());

  tracesStream.foreachRDD((traceRDD, time) -> {
    traceRDD.foreach(trace -> {
      Graph graph = GraphCreator.create(trace);

      for (ModelRunner model: modelRunner) {
        model.runWithMetrics(graph);
      }
    });
  });

  ssc.start();
  ssc.awaitTermination();
}
 
Example 8
public static void main(String[] args) throws InterruptedException {

                System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

                final SparkConf conf = new SparkConf()
                    .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                    .setAppName(APPLICATION_NAME)
                    .set("spark.sql.caseSensitive", CASE_SENSITIVE);                               

                JavaStreamingContext streamingContext = new JavaStreamingContext(conf,
                    new Duration(BATCH_DURATION_INTERVAL_MS));
                
                JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
                    KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
                    );

                JavaDStream<String> meetupStreamValues =
                    meetupStream.map(v -> {
                        return v.value();
                    });

                // Prepare the training data as strings of type: (y,[x1,x2,x3,...,xn])
                // Where n is the number of features, y is a binary label, 
                // and n must be the same for train and test.
                // e.g. "(response, [group_lat, group_long])";
                JavaDStream<String> trainData = meetupStreamValues.map(e -> {
                        
                        JSONParser jsonParser = new JSONParser();
                        JSONObject json = (JSONObject)jsonParser.parse(e);

                        String result = "(" 
                            + (String.valueOf(json.get("response")).equals("yes") ? "1.0,[":"0.0,[") 
                            + ((JSONObject)json.get("group")).get("group_lat") + "," 
                            + ((JSONObject)json.get("group")).get("group_lon")
                            + "])";
                        
                        return result;
                });

                trainData.print();

                JavaDStream<LabeledPoint> labeledPoints = trainData.map(LabeledPoint::parse);
        
                StreamingLogisticRegressionWithSGD streamingLogisticRegressionWithSGD
                    = new StreamingLogisticRegressionWithSGD()
                        .setInitialWeights(Vectors.zeros(2));

                streamingLogisticRegressionWithSGD.trainOn(labeledPoints);

                JavaPairDStream<Double, Vector> values =
                    labeledPoints.mapToPair(f -> new Tuple2<>(f.label(), f.features()));

                streamingLogisticRegressionWithSGD.predictOnValues(values).print();

                // some time later, after outputs have completed
                meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {
                    OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();

                    ((CanCommitOffsets) meetupStream.inputDStream())
                        .commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
                });

                streamingContext.start();
                streamingContext.awaitTermination();
        }
 
Example 9
public static void main(String[] args) throws InterruptedException {

        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI)
                .set("spark.streaming.kafka.consumer.cache.enabled", "false");

        final JavaStreamingContext streamingContext
                = new JavaStreamingContext(conf, new Duration(BATCH_DURATION_INTERVAL_MS));

        streamingContext.checkpoint(CHECKPOINT_FOLDER);

        final JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
                );
                
        // transformations, streaming algorithms, etc
        JavaDStream<Long> countStream  
            = meetupStream.countByWindow(
                 new Duration(WINDOW_LENGTH_MS), 
                 new Duration(SLIDING_INTERVAL_MS));        

        countStream.foreachRDD((JavaRDD<Long> countRDD) -> {                
            MongoSpark.save(        
                    countRDD.map(
                        r -> Document.parse("{\"rsvps_count\":\"" + String.valueOf(r) + "\"}")
                    )
            );            
        });
        
        // some time later, after outputs have completed
        meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {        
            OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();            

            ((CanCommitOffsets) meetupStream.inputDStream())
                .commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
        });
        
        streamingContext.start();
        streamingContext.awaitTermination();    
    }
 
Example 10
public static void main(String[] args) throws InterruptedException {

        System.setProperty("hadoop.home.dir", HADOOP_HOME_DIR_VALUE);

        final SparkConf conf = new SparkConf()
                .setMaster(RUN_LOCAL_WITH_AVAILABLE_CORES)
                .setAppName(APPLICATION_NAME)
                .set("spark.mongodb.output.uri", MONGODB_OUTPUT_URI);

        final JavaStreamingContext streamingContext
                = new JavaStreamingContext(conf, new Duration(BATCH_DURATION_INTERVAL_MS));

        final JavaInputDStream<ConsumerRecord<String, String>> meetupStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(TOPICS, KAFKA_CONSUMER_PROPERTIES)
                );
                
        // transformations, streaming algorithms, etc
        JavaDStream<ConsumerRecord<String, String>> rsvpsWithGuestsStream =
                meetupStream.filter(f -> !f.value().contains("\"guests\":0"));

        rsvpsWithGuestsStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> r) -> {        
            MongoSpark.save(
                    r.map(
                        e -> Document.parse(e.value())
                    )
            );            
        });
        
        // some time later, after outputs have completed
        meetupStream.foreachRDD((JavaRDD<ConsumerRecord<String, String>> meetupRDD) -> {        
            OffsetRange[] offsetRanges = ((HasOffsetRanges) meetupRDD.rdd()).offsetRanges();            

            ((CanCommitOffsets) meetupStream.inputDStream())
                .commitAsync(offsetRanges, new MeetupOffsetCommitCallback());
        });

        streamingContext.start();
        streamingContext.awaitTermination();    
    }
 
Example 11
public JavaDStream<Row> createSource(JavaStreamingContext ssc, KafkaSourceConfig config, SourceContext context)
{
    String topics = config.getTopics();
    String brokers = config.getBrokers(); // the broker hosts must be resolvable from the machine running this program
    String groupId = config.getGroupid(); // consumer group id
    String offsetMode = config.getOffsetMode();

    Map<String, Object> kafkaParams = new HashMap<>(config.getOtherConfig());
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("key.deserializer", ByteArrayDeserializer.class); //StringDeserializer
    kafkaParams.put("value.deserializer", ByteArrayDeserializer.class); //StringDeserializer
    kafkaParams.put("enable.auto.commit", false); //不自动提交偏移量
    //      "fetch.message.max.bytes" ->
    //      "session.timeout.ms" -> "30000", //session默认是30秒
    //      "heartbeat.interval.ms" -> "5000", //10秒提交一次 心跳周期
    kafkaParams.put("group.id", groupId); //注意不同的流 group.id必须要不同 否则会出现offect commit提交失败的错误
    kafkaParams.put("auto.offset.reset", offsetMode); //latest   earliest

    List<String> topicSets = Arrays.asList(topics.split(","));
    JavaInputDStream<ConsumerRecord<byte[], byte[]>> inputStream = KafkaUtils.createDirectStream(
            ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicSets, kafkaParams));

    DStream<ConsumerRecord<byte[], byte[]>> sylphKafkaOffset = new SylphKafkaOffset<ConsumerRecord<byte[], byte[]>>(inputStream.inputDStream())
    {
        @Override
        public void commitOffsets(RDD<?> kafkaRdd)
        {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaRdd).offsetRanges();
            log().info("commitKafkaOffsets {}", (Object) offsetRanges);
            DStream<?> firstDStream = DStreamUtil.getFirstDStream(inputStream.dstream());
            ((CanCommitOffsets) firstDStream).commitAsync(offsetRanges);
        }
    };

    JavaDStream<ConsumerRecord<byte[], byte[]>> javaDStream = new JavaDStream<>(sylphKafkaOffset, ClassTag$.MODULE$.apply(ConsumerRecord.class));
    if ("json".equalsIgnoreCase(config.getValueType())) {
        JsonSchema jsonParser = new JsonSchema(context.getSchema());
        return javaDStream
                .map(record -> jsonParser.deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset()));
    }
    else {
        List<String> names = context.getSchema().getFieldNames();
        return javaDStream
                .map(record -> {
                    Object[] values = new Object[names.size()];
                    for (int i = 0; i < names.size(); i++) {
                        switch (names.get(i)) {
                            case "_topic":
                                values[i] = record.topic();
                                continue;
                            case "_message":
                                values[i] = new String(record.value(), UTF_8);
                                continue;
                            case "_key":
                                values[i] = record.key() == null ? null : new String(record.key(), UTF_8);
                                continue;
                            case "_partition":
                                values[i] = record.partition();
                                continue;
                            case "_offset":
                                values[i] = record.offset();
                            case "_timestamp":
                                values[i] = record.timestamp();
                            case "_timestampType":
                                values[i] = record.timestampType().id;
                            default:
                                values[i] = null;
                        }
                    }
                    return new GenericRow(values);  //GenericRowWithSchema
                });  //.window(Duration(10 * 1000))
    }
}
 
Example 12
public static void main(String[] args) {
      // Windows-specific property if Hadoop is not installed or HADOOP_HOME is not set
      System.setProperty("hadoop.home.dir", "E:\\hadoop");

      SparkConf conf = new SparkConf().setAppName("KafkaExample").setMaster("local[*]");
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.minutes(2));
      streamingContext.checkpoint("E:\\hadoop\\checkpoint");

      Logger rootLogger = LogManager.getRootLogger();
      rootLogger.setLevel(Level.WARN);
      Map<String, Object> kafkaParams = new HashMap<>();
      kafkaParams.put("bootstrap.servers", "10.0.75.1:9092");
      kafkaParams.put("key.deserializer", StringDeserializer.class);
      kafkaParams.put("value.deserializer", StringDeserializer.class);
      kafkaParams.put("group.id", "use_a_separate_group_id_for_each_strea");
      kafkaParams.put("auto.offset.reset", "latest");
     // kafkaParams.put("enable.auto.commit", false);

      Collection<String> topics = Arrays.asList("mytopic", "anothertopic");

      final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
              streamingContext,
              LocationStrategies.PreferConsistent(),
              ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

      JavaPairDStream<String, String> pairRDD = stream.mapToPair(record-> new Tuple2<>(record.key(), record.value()));
     
      pairRDD.foreachRDD(pRDD-> { pRDD.foreach(tuple-> System.out.println(new Date()+" :: Kafka msg key ::"+tuple._1() +" the val is ::"+tuple._2()));});
     
      JavaDStream<String> tweetRDD = pairRDD.map(x-> x._2()).map(new TweetText());
      
      tweetRDD.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" :: "+x)));
      
     JavaDStream<String> hashtagRDD = tweetRDD.flatMap(twt-> Arrays.stream(twt.split(" ")).filter(str-> str.contains("#")).collect(Collectors.toList()).iterator() );
 
      hashtagRDD.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(x)));
      
      JavaPairDStream<String, Long> cntByVal = hashtagRDD.countByValue();
      
      cntByVal.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The count tag is ::"+x._1() +" and the val is ::"+x._2())));
      
     /* hashtagRDD.window(Durations.seconds(60), Durations.seconds(30))
                .countByValue()
               .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
      
     hashtagRDD.countByValueAndWindow(Durations.seconds(60), Durations.seconds(30))
               .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println("The window&count tag is ::"+x._1() +" and the val is ::"+x._2())));
      */
     hashtagRDD.window(Durations.minutes(8)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     hashtagRDD.window(Durations.minutes(8),Durations.minutes(2)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     hashtagRDD.window(Durations.minutes(12),Durations.minutes(8)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     hashtagRDD.window(Durations.minutes(2),Durations.minutes(2)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     hashtagRDD.window(Durations.minutes(12),Durations.minutes(12)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     
     /*hashtagRDD.window(Durations.minutes(5),Durations.minutes(2)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));*/
     /* hashtagRDD.window(Durations.minutes(10),Durations.minutes(1)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));*/
     
      streamingContext.start();
      try {
          streamingContext.awaitTermination();
      } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
      }
  }
 
Example 13
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
  return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
          LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
}
 
Example 14
private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {
  return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
          LocationStrategies.PreferConsistent()).map(obj -> (GenericRecord) obj.value());
}
 
Example 15
public static void main(String[] args) throws InterruptedException {

        Logger.getLogger("org")
            .setLevel(Level.OFF);
        Logger.getLogger("akka")
            .setLevel(Level.OFF);

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("messages");

        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("WordCountingAppWithCheckpoint");
        sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));

        sparkContext = streamingContext.sparkContext();

        streamingContext.checkpoint("./.checkpoint");

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

        JavaDStream<String> lines = results.map(tuple2 -> tuple2._2());

        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+"))
            .iterator());

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);

        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((word, one, state) -> {
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            state.update(sum);
            return output;
        }));

        cumulativeWordCounts.foreachRDD(javaRdd -> {
            List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
            for (Tuple2<String, Integer> tuple : wordCountList) {
                List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));
                JavaRDD<Word> rdd = sparkContext.parallelize(wordList);
                javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
                    .saveToCassandra();
            }
        });

        streamingContext.start();
        streamingContext.awaitTermination();
    }