Java source code examples: org.apache.spark.partial.BoundedDouble
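BoundedDouble is the value type returned by Spark's approximate actions such as countApprox, countByValueApprox, and countByKeyApprox: it wraps an estimated mean() together with a low()/high() interval and the confidence() attached to that interval. As a quick orientation before the examples, here is a minimal sketch of obtaining one from a JavaRDD; the JavaSparkContext "sc" and the "LOGGER" are assumptions for illustration, not part of the examples below.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;

import java.util.Arrays;

// Sketch only: "sc" is an existing JavaSparkContext and "LOGGER" an existing logger.
JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
// countApprox blocks for at most the timeout (2 s here) and returns an initial estimate
// at 95% confidence; the underlying job keeps running afterwards.
PartialResult<BoundedDouble> partial = numbers.countApprox(2000L, 0.95);
// getFinalValue() waits for the job to finish completely, so the result here is exact.
BoundedDouble estimate = partial.getFinalValue();
LOGGER.info("count ~ " + estimate.mean()
        + " in [" + estimate.low() + ", " + estimate.high() + "]"
        + " at confidence " + estimate.confidence());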
Example 1
static int getMaximumNumberOfGroups(BoundedDouble approxCountBoundedDouble, int maxGroupSize)
{
    // Use the mean of the approximate count as the expected number of rows.
    long countApprox = Math.round(approxCountBoundedDouble.mean());
    LOGGER.info("Approximate count of expected results: " + countApprox);
    LOGGER.info("Maximum group size: " + maxGroupSize);
    long maximumNumberOfGroups = Math.max(1, countApprox / maxGroupSize);
    if (maximumNumberOfGroups > Integer.MAX_VALUE) {
        // More groups than Integer.MAX_VALUE means the supplied maxGroupSize is far too small for this dataset.
        throw new IllegalStateException("Invalid max group size: " + maximumNumberOfGroups);
    }
    return (int) maximumNumberOfGroups;
}
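The helper above only reads mean() from the estimate, so it can be exercised without running a Spark job by constructing a BoundedDouble directly (its public constructor takes mean, confidence, low, and high). A hypothetical, purely illustrative call:

// Hypothetical usage: an estimated ~1,000,000 rows with a maximum group size of 250
// should yield at most 4,000 groups.
BoundedDouble estimate = new BoundedDouble(1_000_000d, 0.95, 950_000d, 1_050_000d);
int groups = getMaximumNumberOfGroups(estimate, 250); // 4000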
Example 2
@Override
public PartialResult<BoundedDouble> countApprox(final long timeout) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 3
@Override
public PartialResult<BoundedDouble> countApprox(final long timeout, final double confidence) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 4
@Override
public PartialResult<Map<T, BoundedDouble>> countByValueApprox(final long timeout) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 5
@Override
public PartialResult<Map<T, BoundedDouble>> countByValueApprox(final long timeout, final double confidence) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 6
@Override
public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
Example 7
@Override
public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout,
        final double confidence) {
    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
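Examples 2 through 7 are placeholder overrides mirroring the approximate-count signatures Spark exposes on its Java RDD API (countApprox, countByValueApprox, countByKeyApprox); the wrapping classes simply have not implemented them yet. For a wrapper that decorates another RDD, a working override would typically just delegate. A sketch, assuming a hypothetical field named "delegate" of type JavaPairRDD<K, V>:

// Delegating implementation for a hypothetical wrapper holding a JavaPairRDD<K, V> in "delegate".
@Override
public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout, final double confidence) {
    return delegate.countByKeyApprox(timeout, confidence);
}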
Example 8
/**
 * Compares two HDFS datasets and produces a detailed yet compact HTML break report.
 *
 * @param dataName the name to use in the output HTML
 * @param actualDataSupplier the actual data supplier
 * @param expectedDataSupplier the expected data supplier
 * @return a SparkResult containing pass/fail and the HTML report
 */
public SparkResult verify(String dataName, Supplier<DistributedTable> actualDataSupplier, Supplier<DistributedTable> expectedDataSupplier)
{
    DistributedTable actualDistributedTable = actualDataSupplier.get();
    if (!new HashSet<>(actualDistributedTable.getHeaders()).containsAll(this.groupKeyColumns)) {
        throw new IllegalArgumentException("Actual data does not contain all group key columns: " + this.groupKeyColumns);
    }
    DistributedTable expectedDistributedTable = expectedDataSupplier.get();
    if (!new HashSet<>(expectedDistributedTable.getHeaders()).containsAll(this.groupKeyColumns)) {
        throw new IllegalArgumentException("Expected data does not contain all group key columns: " + this.groupKeyColumns);
    }
    // Approximate the expected row count (5 s timeout, 90% confidence) to bound the number of groups.
    PartialResult<BoundedDouble> countApproxPartialResult = expectedDistributedTable.getRows().countApprox(TimeUnit.SECONDS.toMillis(5), 0.9);
    int maximumNumberOfGroups = getMaximumNumberOfGroups(countApproxPartialResult.getFinalValue(), maxGroupSize);
    LOGGER.info("Maximum number of groups : " + maximumNumberOfGroups);
    // Map each row to a group id derived from the group key columns, then collect the rows per group.
    Set<String> groupKeyColumnSet = new LinkedHashSet<>(this.groupKeyColumns);
    JavaPairRDD<Integer, Iterable<List<Object>>> actualGroups = actualDistributedTable.getRows()
            .mapToPair(new GroupRowsFunction(actualDistributedTable.getHeaders(), groupKeyColumnSet, maximumNumberOfGroups))
            .groupByKey();
    JavaPairRDD<Integer, Iterable<List<Object>>> expectedGroups = expectedDistributedTable.getRows()
            .mapToPair(new GroupRowsFunction(expectedDistributedTable.getHeaders(), groupKeyColumnSet, maximumNumberOfGroups))
            .groupByKey();
    // Full outer join so that groups present in only one dataset are still compared.
    JavaPairRDD<Integer, Tuple2<Optional<Iterable<List<Object>>>, Optional<Iterable<List<Object>>>>> joinedRdd = actualGroups.fullOuterJoin(expectedGroups);
    VerifyGroupFunction verifyGroupFunction = new VerifyGroupFunction(
            groupKeyColumnSet,
            actualDistributedTable.getHeaders(),
            expectedDistributedTable.getHeaders(),
            this.ignoreSurplusColumns,
            this.columnComparatorsBuilder.build(),
            this.columnsToIgnore);
    // Verify each joined group and fold the per-group results into a single summary.
    SummaryResultTable summaryResultTable = joinedRdd.map(verifyGroupFunction).reduce(new SummaryResultTableReducer());
    // Render the summary as an HTML break report.
    HtmlOptions htmlOptions = new HtmlOptions(false, HtmlFormatter.DEFAULT_ROW_LIMIT, false, false, false, Collections.emptySet());
    HtmlFormatter htmlFormatter = new HtmlFormatter(null, htmlOptions);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try
    {
        htmlFormatter.appendResults(dataName, Collections.singletonMap("Summary", summaryResultTable), metadata, 1, null, bytes);
        return new SparkResult(summaryResultTable.isSuccess(), new String(bytes.toByteArray(), StandardCharsets.UTF_8));
    }
    catch (Exception e)
    {
        throw new RuntimeException(e);
    }
}
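Example 8 blocks on getFinalValue() after a countApprox with a 5-second timeout and 0.9 confidence, which means it waits for the full count job even beyond that timeout. When blocking is undesirable, a PartialResult can also be inspected without waiting; a small sketch, assuming a PartialResult<BoundedDouble> named "partial" such as the one obtained earlier:

// Non-blocking inspection of an approximate count.
if (partial.isInitialValueFinal()) {
    // The count job finished within the timeout, so the value is exact.
    LOGGER.info("Exact count: " + partial.initialValue().mean());
} else {
    BoundedDouble rough = partial.initialValue();
    LOGGER.info("Rough count so far: " + rough.mean()
            + " in [" + rough.low() + ", " + rough.high() + "]");
}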
Example 9
@Override
public PartialResult<BoundedDouble> countApprox(final long timeout) {
    throw new UnsupportedOperationException("Operation not yet implemented.");
}
Example 10
@Override
public PartialResult<BoundedDouble> countApprox(final long timeout, final double confidence) {
    throw new UnsupportedOperationException("Operation not yet implemented.");
}
Example 11
@Override
public PartialResult<Map<T, BoundedDouble>> countByValueApprox(final long timeout) {
    throw new UnsupportedOperationException("Operation not yet implemented.");
}
Example 12
@Override
public PartialResult<Map<T, BoundedDouble>> countByValueApprox(final long timeout, final double confidence) {
    throw new UnsupportedOperationException("Operation not yet implemented.");
}