Java Code Examples: org.apache.spark.api.java.Optional
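The examples below show the two places this class typically appears in Java Spark code: as the value wrapper produced by outer joins, and as the per-batch value handed to a mapWithState mapping function. As a minimal standalone sketch (not taken from the examples below; it assumes Spark 2.0+, where this class mirrors both java.util.Optional and Guava's Optional):

import org.apache.spark.api.java.Optional;

public class OptionalSketch {
    public static void main(String[] args) {
        Optional<String> present = Optional.of("value");          // of() rejects null
        String maybeNull = null;
        Optional<String> absent = Optional.ofNullable(maybeNull); // empty for null input

        if (present.isPresent()) {
            System.out.println(present.get());         // value
        }
        System.out.println(absent.orElse("fallback")); // fallback
    }
}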
Example 1
@Override
public SummaryResultTable call(Tuple2<Integer, Tuple2<Optional<Iterable<List<Object>>>, Optional<Iterable<List<Object>>>>> v1)
{
    Integer shardNumber = v1._1();
    Optional<Iterable<List<Object>>> actualOptional = v1._2()._1();
    Optional<Iterable<List<Object>>> expectedOptional = v1._2()._2();
    // Unwrap each side of the join, substituting an empty list for a missing side
    Iterable<List<Object>> actualRows = actualOptional.isPresent() ? actualOptional.get() : Collections.emptyList();
    Iterable<List<Object>> expectedRows = expectedOptional.isPresent() ? expectedOptional.get() : Collections.emptyList();
    VerifiableTable actualTable = getVerifiableTable(actualRows, this.actualColumns);
    VerifiableTable expectedTable = getVerifiableTable(expectedRows, this.expectedColumns);
    IndexMapTableVerifier singleSingleTableVerifier = new IndexMapTableVerifier(
            this.columnComparators,
            false,
            IndexMapTableVerifier.DEFAULT_BEST_MATCH_THRESHOLD,
            false,
            false,
            this.ignoreSurplusColumns,
            false,
            0);
    ResultTable resultTable = singleSingleTableVerifier.verify(actualTable, expectedTable);
    LOGGER.info("Verification of shard {} {}", shardNumber, resultTable.isSuccess() ? "PASSED" : "FAILED");
    return new SummaryResultTable(resultTable);
}
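This call method consumes the output of a full outer join (the verify method in Example 18 builds exactly this Tuple2<Optional<...>, Optional<...>> pair per shard): either side may be absent, so each Optional is unwrapped with an empty-list fallback before the two tables are compared.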
Example 2
public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir", "E:\\hadoop");
    SparkConf sparkConf = new SparkConf().setAppName("WordCountSocketEx").setMaster("local[*]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    streamingContext.checkpoint("E:\\hadoop\\checkpoint");

    // Initial state RDD input to mapWithState
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

    JavaReceiverInputDStream<String> streamingLines = streamingContext.socketTextStream("10.0.75.1", 9000, StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = streamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str -> new Tuple2<>(str, 1)).reduceByKey((count1, count2) -> count1 + count2);

    // Update function: add the batch's count for the word to the running total held in State
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
            new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
                    int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                    Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                    state.update(sum);
                    return output;
                }
            };

    // DStream of cumulative counts, updated in every batch
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
            wordCounts.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
    stateDstream.print();
    streamingContext.start();
    streamingContext.awaitTermination();
}
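Note that inside the mapping function the Optional<Integer> argument is empty for a key that has saved state but no new occurrence in the current batch, which is why the batch count is read with one.orElse(0).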
Example 3
protected static JavaStreamingContext createContext(String ip, int port, String checkpointDirectory) {
    SparkConf sparkConf = new SparkConf().setAppName("WordCountRecoverableEx").setMaster("local[*]");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    streamingContext.checkpoint(checkpointDirectory);

    // Initial state RDD input to mapWithState
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = streamingContext.sparkContext().parallelizePairs(tuples);

    JavaReceiverInputDStream<String> streamingLines = streamingContext.socketTextStream(ip, port, StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = streamingLines.flatMap(str -> Arrays.asList(str.split(" ")).iterator());
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(str -> new Tuple2<>(str, 1))
            .reduceByKey((count1, count2) -> count1 + count2);

    // Update function: add the batch's count for the word to the running total held in State
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
            new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
                    int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                    Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                    state.update(sum);
                    return output;
                }
            };

    // DStream of cumulative counts, updated in every batch
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream = wordCounts
            .mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
    stateDstream.print();
    return streamingContext;
}
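Example 3 is the checkpoint-recoverable variant of Example 2: a createContext factory like this is typically passed to JavaStreamingContext.getOrCreate, so the context is rebuilt from the checkpoint directory after a failure and constructed fresh only when no checkpoint exists.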
Example 4
/**
 * Tagging for a global index should only consider the record key.
 */
@Override
protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
        JavaPairRDD<HoodieKey, HoodieRecordLocation> keyLocationPairRDD, JavaRDD<HoodieRecord<T>> recordRDD) {
    JavaPairRDD<String, HoodieRecord<T>> incomingRowKeyRecordPairRDD =
            recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
    JavaPairRDD<String, Tuple2<HoodieRecordLocation, HoodieKey>> existingRecordKeyToRecordLocationHoodieKeyMap =
            keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));
    // recordRDD may contain more records than keyLocationPairRDD (some record keys have no file ID), so use a left outer join
    return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record -> {
        final HoodieRecord<T> hoodieRecord = record._1;
        final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
        if (recordLocationHoodieKeyPair.isPresent()) {
            // Record key matched to a file
            if (config.getBloomIndexUpdatePartitionPath()
                    && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
                // Create an empty record to delete the record in the old partition
                HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
                        new EmptyHoodieRecordPayload());
                // Tag the incoming record for insertion into the new partition
                HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
                return Arrays.asList(emptyRecord, taggedRecord).iterator();
            } else {
                // Ignore the incoming record's partition even when it differs from the old one:
                // the record will still be updated in its old partition
                return Collections.singletonList(
                        (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
                                Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
            }
        } else {
            // No existing location: tag as a new insert
            return Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty())).iterator();
        }
    });
}
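In this Hudi example the Optional comes from leftOuterJoin rather than mapWithState: every incoming record survives the join, and the right-hand Optional is present only when the record key already has a known location, so isPresent() distinguishes updates from new inserts.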
Example 5
@Override
public Optional<String> getCheckpointFile() {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 6
@Override
public org.apache.spark.api.java.Optional<Partitioner> partitioner() {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 7
@Override
public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other,
        final Partitioner partitioner) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 8
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other,
        final Partitioner partitioner) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 9
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other,
        final Partitioner partitioner) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 10
@Override
public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 11
@Override
public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other,
        final int numPartitions) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 12
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 13
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other,
        final int numPartitions) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 14
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 15
@Override
public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(
        final org.apache.spark.api.java.JavaPairRDD<K, W> other,
        final int numPartitions) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
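Examples 7 through 15 only stub out the Optional-bearing join signatures. For comparison, here is a minimal runnable sketch of how the corresponding org.apache.spark.api.java.JavaPairRDD methods actually populate those Optionals (class name and sample data are invented for illustration; assumes Spark 2.x):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;

import scala.Tuple2;

public class OuterJoinSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("OuterJoinSketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaPairRDD<String, Integer> left = sc.parallelizePairs(
                    Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)));
            JavaPairRDD<String, Integer> right = sc.parallelizePairs(
                    Arrays.asList(new Tuple2<>("b", 20), new Tuple2<>("c", 30)));

            // leftOuterJoin keeps every left key; the right value is wrapped in Optional
            JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> leftJoined = left.leftOuterJoin(right);
            leftJoined.collect().forEach(t -> System.out.println(
                    t._1() + " -> (" + t._2()._1() + ", " + t._2()._2().orElse(-1) + ")"));
            // prints: a -> (1, -1) and b -> (2, 20), in some order

            // fullOuterJoin wraps both sides; at least one Optional is always present
            JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> fullJoined = left.fullOuterJoin(right);
            System.out.println(fullJoined.count()); // 3 keys: a, b, c
        }
    }
}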
Example 16
public static void main(String[] args) throws InterruptedException {
    System.setProperty("hadoop.home.dir", "C:\\softwares\\Winutils");
    SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Stateful Streaming Example")
            .config("spark.sql.warehouse.dir", "file:////C:/Users/sgulati/spark-warehouse").getOrCreate();
    JavaStreamingContext jssc = new JavaStreamingContext(new JavaSparkContext(sparkSession.sparkContext()),
            Durations.milliseconds(1000));
    JavaReceiverInputDStream<String> inStream = jssc.socketTextStream("10.204.136.223", 9999);
    jssc.checkpoint("C:\\Users\\sgulati\\spark-checkpoint");

    // Deserialize each incoming JSON line into a FlightDetails object
    JavaDStream<FlightDetails> flightDetailsStream = inStream.map(x -> {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(x, FlightDetails.class);
    });
    JavaPairDStream<String, FlightDetails> flightDetailsPairStream = flightDetailsStream
            .mapToPair(f -> new Tuple2<>(f.getFlightId(), f));

    // For each flight, accumulate its details in state and emit the running average temperature;
    // drop the state once the flight has landed
    Function3<String, Optional<FlightDetails>, State<List<FlightDetails>>, Tuple2<String, Double>> mappingFunc = (
            flightId, curFlightDetail, state) -> {
        List<FlightDetails> details = state.exists() ? state.get() : new ArrayList<>();
        boolean isLanded = false;
        if (curFlightDetail.isPresent()) {
            details.add(curFlightDetail.get());
            if (curFlightDetail.get().isLanded()) {
                isLanded = true;
            }
        }
        double avgTemperature = details.stream().mapToDouble(f -> f.getTemperature()).average().orElse(0.0);
        if (isLanded) {
            state.remove();
        } else {
            state.update(details);
        }
        return new Tuple2<>(flightId, avgTemperature);
    };
    JavaMapWithStateDStream<String, FlightDetails, List<FlightDetails>, Tuple2<String, Double>> streamWithState = flightDetailsPairStream
            .mapWithState(StateSpec.function(mappingFunc).timeout(Durations.minutes(5)));
    streamWithState.print();
    jssc.start();
    jssc.awaitTermination();
}
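A note on the timeout above: with StateSpec.timeout, a key that stays idle for the timeout period receives one final invocation of the mapping function with an empty Optional (and state.isTimingOut() true) before its state is discarded, so the isPresent() check also guards that final call.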
Example 17
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>");
        System.exit(1);
    }
    StreamingExamples.setStreamingLogLevels();

    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    ssc.checkpoint(".");

    // Initial state RDD input to mapWithState
    @SuppressWarnings("unchecked")
    List<Tuple2<String, Integer>> tuples =
            Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
    JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);

    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
            args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
    // SPACE is a precompiled Pattern, e.g. Pattern.compile(" ")
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String x) {
            return Arrays.asList(SPACE.split(x)).iterator();
        }
    });
    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<>(s, 1);
                }
            });

    // Update function: add the batch's count for the word to the running total held in State
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
            new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
                    int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                    Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                    state.update(sum);
                    return output;
                }
            };

    // DStream of cumulative counts, updated in every batch
    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
            wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
    stateDstream.print();
    ssc.start();
    ssc.awaitTermination();
}
Example 18
/**
 * Compares two HDFS datasets and produces a detailed yet compact HTML break report.
 * @param dataName the name to use in the output HTML
 * @param actualDataSupplier the actual data supplier
 * @param expectedDataSupplier the expected data supplier
 * @return a SparkResult containing pass/fail and the HTML report
 */
public SparkResult verify(String dataName, Supplier<DistributedTable> actualDataSupplier, Supplier<DistributedTable> expectedDataSupplier)
{
    DistributedTable actualDistributedTable = actualDataSupplier.get();
    if (!new HashSet<>(actualDistributedTable.getHeaders()).containsAll(this.groupKeyColumns)) {
        throw new IllegalArgumentException("Actual data does not contain all group key columns: " + this.groupKeyColumns);
    }
    DistributedTable expectedDistributedTable = expectedDataSupplier.get();
    if (!new HashSet<>(expectedDistributedTable.getHeaders()).containsAll(this.groupKeyColumns)) {
        throw new IllegalArgumentException("Expected data does not contain all group key columns: " + this.groupKeyColumns);
    }

    // Approximate the expected row count to decide how many groups to shard the data into
    PartialResult<BoundedDouble> countApproxPartialResult = expectedDistributedTable.getRows().countApprox(TimeUnit.SECONDS.toMillis(5), 0.9);
    int maximumNumberOfGroups = getMaximumNumberOfGroups(countApproxPartialResult.getFinalValue(), maxGroupSize);
    LOGGER.info("Maximum number of groups: " + maximumNumberOfGroups);

    Set<String> groupKeyColumnSet = new LinkedHashSet<>(this.groupKeyColumns);
    JavaPairRDD<Integer, Iterable<List<Object>>> actualGroups = actualDistributedTable.getRows()
            .mapToPair(new GroupRowsFunction(actualDistributedTable.getHeaders(), groupKeyColumnSet, maximumNumberOfGroups))
            .groupByKey();
    JavaPairRDD<Integer, Iterable<List<Object>>> expectedGroups = expectedDistributedTable.getRows()
            .mapToPair(new GroupRowsFunction(expectedDistributedTable.getHeaders(), groupKeyColumnSet, maximumNumberOfGroups))
            .groupByKey();

    // Full outer join: each side is an Optional, empty when a group exists only on the other side
    JavaPairRDD<Integer, Tuple2<Optional<Iterable<List<Object>>>, Optional<Iterable<List<Object>>>>> joinedRdd = actualGroups.fullOuterJoin(expectedGroups);
    VerifyGroupFunction verifyGroupFunction = new VerifyGroupFunction(
            groupKeyColumnSet,
            actualDistributedTable.getHeaders(),
            expectedDistributedTable.getHeaders(),
            this.ignoreSurplusColumns,
            this.columnComparatorsBuilder.build(),
            this.columnsToIgnore);
    SummaryResultTable summaryResultTable = joinedRdd.map(verifyGroupFunction).reduce(new SummaryResultTableReducer());

    HtmlOptions htmlOptions = new HtmlOptions(false, HtmlFormatter.DEFAULT_ROW_LIMIT, false, false, false, Collections.emptySet());
    HtmlFormatter htmlFormatter = new HtmlFormatter(null, htmlOptions);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try
    {
        htmlFormatter.appendResults(dataName, Collections.singletonMap("Summary", summaryResultTable), metadata, 1, null, bytes);
        return new SparkResult(summaryResultTable.isSuccess(), new String(bytes.toByteArray(), StandardCharsets.UTF_8));
    }
    catch (Exception e)
    {
        throw new RuntimeException(e);
    }
}
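This is the producer side of Example 1: the fullOuterJoin here yields the Tuple2<Optional<Iterable<List<Object>>>, Optional<Iterable<List<Object>>>> pairs that the VerifyGroupFunction unwraps shard by shard.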
Example 19
@Override
public Optional<String> getCheckpointFile() {
    throw new UnsupportedOperationException("Operation not yet implemented.");
}
Example 20
@Override
public org.apache.spark.api.java.Optional<Partitioner> partitioner() {
    throw new UnsupportedOperationException("Operation not yet implemented.");
}