Java source code examples: org.apache.spark.api.java.Optional
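
The examples below all use org.apache.spark.api.java.Optional, the serializable Optional type that Spark's Java API returns from outer joins and passes into mapWithState functions. As a quick orientation, here is a minimal sketch of its core methods; it assumes Spark 2.x or later on the classpath, is illustrative only, and is not taken from any of the projects quoted below.

import org.apache.spark.api.java.Optional;

public class OptionalSketch {
    public static void main(String[] args) {
        Optional<Integer> present = Optional.of(42);           // wraps a non-null value
        Optional<Integer> missing = Optional.empty();          // no value (Guava-style alias: Optional.absent())
        Optional<Integer> maybe = Optional.ofNullable(null);   // a null input yields an empty Optional

        System.out.println(present.isPresent()); // true
        System.out.println(present.get());       // 42
        System.out.println(missing.orElse(0));   // 0, the supplied fallback
        System.out.println(maybe.orNull());      // null
    }
}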

Example 1
@Override
public SummaryResultTable call(Tuple2<Integer, Tuple2<Optional<Iterable<List<Object>>>, Optional<Iterable<List<Object>>>>> v1)
{
    Integer shardNumber = v1._1();
    Optional<Iterable<List<Object>>> actualOptional = v1._2()._1();
    Optional<Iterable<List<Object>>> expectedOptional = v1._2()._2();
    Iterable<List<Object>> actualRows = actualOptional.isPresent() ? actualOptional.get() : Collections.emptyList();
    Iterable<List<Object>> expectedRows = expectedOptional.isPresent() ? expectedOptional.get() : Collections.emptyList();
    VerifiableTable actualTable = getVerifiableTable(actualRows, this.actualColumns);
    VerifiableTable expectedTable = getVerifiableTable(expectedRows, this.expectedColumns);
    IndexMapTableVerifier singleSingleTableVerifier = new IndexMapTableVerifier(
            this.columnComparators,
            false,
            IndexMapTableVerifier.DEFAULT_BEST_MATCH_THRESHOLD,
            false,
            false,
            this.ignoreSurplusColumns,
            false,
            0);
    ResultTable resultTable = singleSingleTableVerifier.verify(actualTable, expectedTable);
    LOGGER.info("Verification of shard {} {}", shardNumber, resultTable.isSuccess() ? "PASSED" : "FAILED");
    return new SummaryResultTable(resultTable);
}
 
Example 2
public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir", "E:\\hadoop");

    SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    streamingContext.checkpoint("E:\\hadoop\\checkpoint");

    // Initial state RDD input to mapWithState
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

    JavaReceiverInputDStream<String> streamingLines = streamingContext.socketTextStream("10.0.75.1", Integer.parseInt("9000"), StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = streamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str -> new Tuple2<>(str, 1)).reduceByKey((count1, count2) -> count1 + count2);

    // Update the cumulative count function
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
        new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> call(String word, Optional<Integer> one,
              State<Integer> state) {
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            state.update(sum);
            return output;
          }
        };

    // DStream of cumulative counts that get updated in every batch
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
        wordCounts.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

    stateDstream.print();
    streamingContext.start();
    streamingContext.awaitTermination();
}
 
Example 3
protected static JavaStreamingContext createContext(String ip, int port, String checkpointDirectory) {
    SparkConf sparkConf = new SparkConf().setAppName("WordCountRecoverableEx").setMaster("local[*]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    streamingContext.checkpoint(checkpointDirectory);

    // Initial state RDD input to mapWithState
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

    JavaReceiverInputDStream<String> streamingLines = streamingContext.socketTextStream(ip, port, StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = streamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str -> new Tuple2<>(str, 1))
            .reduceByKey((count1, count2) -> count1 + count2);

    // Update the cumulative count function
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
            new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            state.update(sum);
            return output;
        }
    };

    // DStream of cumulative counts that get updated in every batch
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = wordCounts
            .mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

    stateDstream.print();
    return streamingContext;
}
 
Example 4
/**
 * Tagging for global index should only consider the record key.
 */
@Override
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
    JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {

  JavaPairRDD<String, HoodieRecord<T>> incomingRowKeyRecordPairRDD =
      recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));

  JavaPairRDD<String, Tuple2<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
      keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));

  // Since recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), we do a left outer join.
  return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record -> {
    final HoodieRecord<T> hoodieRecord = record._1;
    final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
    if (recordLocationHoodieKeyPair.isPresent()) {
      // Record key matched to file
      if (config.getBloomIndexUpdatePartitionPath()
          && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
        // Create an empty record to delete the record in the old partition
        HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
            new EmptyHoodieRecordPayload());
        // Tag the incoming record for inserting to the new partition
        HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
        return Arrays.asList(emptyRecord, taggedRecord).iterator();
      } else {
        // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
        // When it differs, the record will still be updated at its old partition.
        return Collections.singletonList(
            (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
                Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
      }
    } else {
      return Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty())).iterator();
    }
  });
}
 
Example 5
@Override
public Optional<String> getCheckpointFile() {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 6
@Override
public org.apache.spark.api.java.Optional<Partitioner> partitioner() {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 7
@Override
public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
              final Partitioner partitioner) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 8
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
               final Partitioner partitioner) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 9
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
              final Partitioner partitioner) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 10
@Override
public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 11
@Override
public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
              final int numPartitions) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 12
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 13
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
               final int numPartitions) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 14
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
 
Example 15
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
              final int numPartitions) {
  throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
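
Examples 7 through 15 above are thin wrappers that only expose signatures, but those signatures show exactly where Optional enters Spark's Java join API: the side of an outer join that may have no match is wrapped in Optional. The following is a minimal, self-contained sketch of consuming a left outer join on plain JavaPairRDDs (made-up class name and data, local master; it is not part of the SparkJavaPairRDD wrapper shown above):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

public class OuterJoinOptionalSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("OuterJoinOptionalSketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Integer> left = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
            JavaPairRDD<String, String> right = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>("a", "x"), new Tuple2<>("c", "y")));

            // Left outer join: the right-hand value is an Optional, absent when the key has no match
            JavaPairRDD<String, Tuple2<Integer, Optional<String>>> joined = left.leftOuterJoin(right);
            joined.foreach(kv -> {
                Optional<String> rightValue = kv._2()._2();
                System.out.println(kv._1() + " -> " + kv._2()._1() + ", " + rightValue.orElse("<none>"));
            });
        }
    }
}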
 
Example 16
public static void main(String[] args) throws InterruptedException {

    System.setProperty("hadoop.home.dir", "C:\\softwares\\Winutils");

    SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Stateful Streaming Example")
            .config("spark.sql.warehouse.dir", "file:////C:/Users/sgulati/spark-warehouse").getOrCreate();

    JavaStreamingContext jssc = new JavaStreamingContext(new JavaSparkContext(sparkSession.sparkContext()),
            Durations.milliseconds(1000));
    JavaReceiverInputDStream<String> inStream = jssc.socketTextStream("10.204.136.223", 9999);
    jssc.checkpoint("C:\\Users\\sgulati\\spark-checkpoint");

    // Deserialize each incoming JSON line into a FlightDetails object
    JavaDStream<FlightDetails> flightDetailsStream = inStream.map(x -> {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(x, FlightDetails.class);
    });

    JavaPairDStream<String, FlightDetails> flightDetailsPairStream = flightDetailsStream
            .mapToPair(f -> new Tuple2<String, FlightDetails>(f.getFlightId(), f));

    // Accumulate FlightDetails per flight id; drop the state once the flight has landed
    Function3<String, Optional<FlightDetails>, State<List<FlightDetails>>, Tuple2<String, Double>> mappingFunc = (
            flightId, curFlightDetail, state) -> {
        List<FlightDetails> details = state.exists() ? state.get() : new ArrayList<>();

        boolean isLanded = false;

        if (curFlightDetail.isPresent()) {
            details.add(curFlightDetail.get());
            if (curFlightDetail.get().isLanded()) {
                isLanded = true;
            }
        }
        Double avgSpeed = details.stream().mapToDouble(f -> f.getTemperature()).average().orElse(0.0);

        if (isLanded) {
            state.remove();
        } else {
            state.update(details);
        }
        return new Tuple2<String, Double>(flightId, avgSpeed);
    };

    JavaMapWithStateDStream<String, FlightDetails, List<FlightDetails>, Tuple2<String, Double>> streamWithState = flightDetailsPairStream
            .mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(5)));

    streamWithState.print();
    jssc.start();
    jssc.awaitTermination();
}
 
Example 17
public static void main(String[] args) throws Exception {
  if (args.length < 2) {
    System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>");
    System.exit(1);
  }

  StreamingExamples.setStreamingLogLevels();

  // Create the context with a 1 second batch size
  SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount");
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  ssc.checkpoint(".");

  // Initial state RDD input to mapWithState
  @SuppressWarnings("unchecked")
  List<Tuple2<String, Integer>> tuples =
      Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
  JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);

  JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
          args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);

  JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) {
      return Arrays.asList(SPACE.split(x)).iterator();
    }
  });

  JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      });

  // Update the cumulative count function
  Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
      new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> call(String word, Optional<Integer> one,
            State<Integer> state) {
          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
          state.update(sum);
          return output;
        }
      };

  // DStream of cumulative counts that get updated in every batch
  JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
      wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

  stateDstream.print();
  ssc.start();
  ssc.awaitTermination();
}
 
Example 18
/**
 * Compares two HDFS datasets and produces a detailed yet compact HTML break report
 * @param dataName the name to use in the output HTML
 * @param actualDataSupplier the actual data supplier
 * @param expectedDataSupplier the expected data supplier
 * @return a SparkResult containing pass/fail and the HTML report
 */
public SparkResult verify(String dataName, Supplier<DistributedTable> actualDataSupplier, Supplier<DistributedTable> expectedDataSupplier)
{
    DistributedTable actualDistributedTable = actualDataSupplier.get();
    if (!new HashSet<>(actualDistributedTable.getHeaders()).containsAll(this.groupKeyColumns)) {
        throw new IllegalArgumentException("Actual data does not contain all group key columns: " + this.groupKeyColumns);
    }
    DistributedTable expectedDistributedTable = expectedDataSupplier.get();
    if (!new HashSet<>(expectedDistributedTable.getHeaders()).containsAll(this.groupKeyColumns)) {
        throw new IllegalArgumentException("Expected data does not contain all group key columns: " + this.groupKeyColumns);
    }
    PartialResult<BoundedDouble> countApproxPartialResult = expectedDistributedTable.getRows().countApprox(TimeUnit.SECONDS.toMillis(5), 0.9);
    int maximumNumberOfGroups = getMaximumNumberOfGroups(countApproxPartialResult.getFinalValue(), maxGroupSize);
    LOGGER.info("Maximum number of groups : " + maximumNumberOfGroups);
    Set<String> groupKeyColumnSet = new LinkedHashSet<>(this.groupKeyColumns);
    JavaPairRDD<Integer, Iterable<List<Object>>> actualGroups = actualDistributedTable.getRows()
            .mapToPair(new GroupRowsFunction(actualDistributedTable.getHeaders(), groupKeyColumnSet, maximumNumberOfGroups))
            .groupByKey();
    JavaPairRDD<Integer, Iterable<List<Object>>> expectedGroups = expectedDistributedTable.getRows()
            .mapToPair(new GroupRowsFunction(expectedDistributedTable.getHeaders(), groupKeyColumnSet, maximumNumberOfGroups))
            .groupByKey();
    JavaPairRDD<Integer, Tuple2<Optional<Iterable<List<Object>>>, Optional<Iterable<List<Object>>>>> joinedRdd = actualGroups.fullOuterJoin(expectedGroups);
    VerifyGroupFunction verifyGroupFunction = new VerifyGroupFunction(
            groupKeyColumnSet,
            actualDistributedTable.getHeaders(),
            expectedDistributedTable.getHeaders(),
            this.ignoreSurplusColumns,
            this.columnComparatorsBuilder.build(),
            this.columnsToIgnore);
    SummaryResultTable summaryResultTable = joinedRdd.map(verifyGroupFunction).reduce(new SummaryResultTableReducer());
    HtmlOptions htmlOptions = new HtmlOptions(false, HtmlFormatter.DEFAULT_ROW_LIMIT, false, false, false, Collections.emptySet());
    HtmlFormatter htmlFormatter = new HtmlFormatter(null, htmlOptions);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try
    {
        htmlFormatter.appendResults(dataName, Collections.singletonMap("Summary", summaryResultTable), metadata, 1, null, bytes);
        return new SparkResult(summaryResultTable.isSuccess(), new String(bytes.toByteArray(), StandardCharsets.UTF_8));
    }
    catch (Exception e)
    {
        throw new RuntimeException(e);
    }
}
 
Example 19
@Override
public Optional<String> getCheckpointFile() {
  throw new UnsupportedOperationException("Operation not yet implemented.");
}
 
Example 20
@Override
public org.apache.spark.api.java.Optional<Partitioner> partitioner() {
  throw new UnsupportedOperationException("Operation not yet implemented.");
}