Java源码示例:org.apache.beam.sdk.metrics.MetricResult
示例1
/**
 * Runs {@code CountWords} over {@code INPUT_STRS}, asserts the resulting word counts, and then
 * verifies that every counter listed in {@code expectedCounters} was reported with the expected
 * committed value.
 */
@Test
@Category(NeedsRunner.class)
public void testWordCountSimple() {
  PCollection<KV<String, Long>> pc =
      pipeline.apply(Create.of(INPUT_STRS)).apply(new CountWords());
  PAssert.that(pc).containsInAnyOrder(KV.of("hello", 2L), KV.of(("world"), 1L));
  PipelineResult result = pipeline.run();
  result.waitUntilFinish();

  Map<String, Long> expectedCounters = new HashMap<>();
  expectedCounters.put("emptyLines", 2L);
  // Counters are parameterized with Long; using the typed form (instead of the raw type) keeps
  // the comparison below a Long-to-Long check that the compiler can verify.
  for (MetricResult<Long> c :
      result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) {
    String name = c.getName().getName();
    if (expectedCounters.containsKey(name)) {
      assertEquals(expectedCounters.get(name), c.getCommitted());
      // Each expected counter is ticked off once it has been seen.
      expectedCounters.remove(name);
    }
  }
  // Anything left over is an expected counter the pipeline never reported.
  assertTrue(expectedCounters.isEmpty());
}
示例2
/**
 * Return the current value for a long counter, or {@code ERRONEOUS_METRIC_VALUE} if it can't be
 * retrieved. Note this uses only attempted metrics because some runners don't support committed
 * metrics.
 *
 * @param name counter name, looked up within this reader's {@code namespace}
 * @return the attempted value of the single matching counter, or {@code ERRONEOUS_METRIC_VALUE}
 *     when no counter matches
 */
public long getCounterMetric(String name) {
  MetricQueryResults metrics =
      result
          .metrics()
          .queryMetrics(
              MetricsFilter.builder()
                  .addNameFilter(MetricNameFilter.named(namespace, name))
                  .build());
  Iterable<MetricResult<Long>> counters = metrics.getCounters();
  checkIfMetricResultIsUnique(name, counters);

  // Test for the empty case explicitly rather than relying on NoSuchElementException from
  // Iterator.next() as control flow.
  if (!counters.iterator().hasNext()) {
    LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
    return ERRONEOUS_METRIC_VALUE;
  }
  return counters.iterator().next().getAttempted();
}
示例3
/**
 * Folds every committed update into the result already stored under the same key.
 *
 * @param metricResultMap results keyed by metric key; entries are replaced in place
 * @param updates committed updates to merge
 * @param combine function handed to {@code addCommitted} to merge values
 * @throws IllegalStateException if an update's key has no entry in {@code metricResultMap}
 */
private static <T> void mergeCommittedResults(
    Map<MetricKey, MetricResult<T>> metricResultMap,
    Iterable<MetricUpdate<T>> updates,
    BiFunction<T, T, T> combine) {
  for (MetricUpdate<T> committed : updates) {
    MetricKey metricKey = committed.getKey();
    MetricResult<T> existing = metricResultMap.get(metricKey);
    if (existing != null) {
      metricResultMap.put(metricKey, existing.addCommitted(committed.getUpdate(), combine));
      continue;
    }
    // A committed value must always have an entry to attach to.
    String message =
        String.format(
            "%s: existing 'attempted' result not found for 'committed' value %s",
            metricKey, committed.getUpdate());
    throw new IllegalStateException(message);
  }
}
示例4
/**
 * Returns a {@link MetricResults} whose queries always report no counters, no distributions and
 * no gauges, regardless of the filter supplied.
 */
@Override
public MetricResults metrics() {
  return new MetricResults() {
    @Override
    public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
      // The filter is never consulted: every category is empty.
      MetricQueryResults nothingReported =
          new MetricQueryResults() {
            @Override
            public Iterable<MetricResult<Long>> getCounters() {
              return Collections.emptyList();
            }

            @Override
            public Iterable<MetricResult<DistributionResult>> getDistributions() {
              return Collections.emptyList();
            }

            @Override
            public Iterable<MetricResult<GaugeResult>> getGauges() {
              return Collections.emptyList();
            }
          };
      return nothingReported;
    }
  };
}
示例5
/**
 * Builds a dot-separated name from the step name (when present), the namespace and the metric
 * name of the given result. Every match of {@code ILLEGAL_CHARACTERS} in each piece is replaced
 * with an underscore, and a single trailing underscore is dropped from the step piece.
 *
 * @param metricResult result whose key supplies the step and metric name
 * @return the pieces joined with {@code "."}
 */
@VisibleForTesting
String renderName(MetricResult<?> metricResult) {
  MetricKey metricKey = metricResult.getKey();
  MetricName metricName = metricKey.metricName();
  String stepName = metricKey.stepName();
  ArrayList<String> parts = new ArrayList<>();

  if (stepName != null) {
    String cleanedStep = stepName.replaceAll(ILLEGAL_CHARACTERS, "_");
    // Only one trailing underscore is removed, even if several are present.
    parts.add(
        cleanedStep.endsWith("_")
            ? cleanedStep.substring(0, cleanedStep.length() - 1)
            : cleanedStep);
  }

  parts.add(metricName.getNamespace().replaceAll(ILLEGAL_CHARACTERS, "_"));
  parts.add(metricName.getName().replaceAll(ILLEGAL_CHARACTERS, "_"));
  return String.join(".", parts);
}
示例6
/**
 * Renders a metric result as {@code [step.]namespace.name}. Each piece has every match of
 * {@code ILLEGAL_CHARACTERS} replaced by {@code "_"}; the step piece additionally loses one
 * trailing underscore if it ends with one. The step piece is omitted when the key has no step.
 *
 * @param metricResult result whose key is rendered
 * @return the sanitized pieces joined with dots
 */
@VisibleForTesting
String renderName(MetricResult<?> metricResult) {
  MetricKey resultKey = metricResult.getKey();
  MetricName resultName = resultKey.metricName();
  String rawStep = resultKey.stepName();
  ArrayList<String> segments = new ArrayList<>();

  if (rawStep != null) {
    String sanitizedStep = rawStep.replaceAll(ILLEGAL_CHARACTERS, "_");
    if (sanitizedStep.endsWith("_")) {
      sanitizedStep = sanitizedStep.substring(0, sanitizedStep.length() - 1);
    }
    segments.add(sanitizedStep);
  }

  // Namespace first, then the short name, each sanitized the same way.
  for (String segment : ImmutableList.of(resultName.getNamespace(), resultName.getName())) {
    segments.add(segment.replaceAll(ILLEGAL_CHARACTERS, "_"));
  }
  return String.join(".", segments);
}
示例7
/**
 * Pushes the attempted value of each distribution into its Flink gauge. A gauge is registered
 * with the runtime context's metric group and cached the first time its name is seen; later
 * occurrences update the cached gauge.
 *
 * @param distributions distribution results to publish
 */
private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) {
  for (MetricResult<DistributionResult> distribution : distributions) {
    String gaugeName = getFlinkMetricNameString(distribution.getKey());
    DistributionResult attempted = distribution.getAttempted();

    FlinkDistributionGauge cached = flinkDistributionGaugeCache.get(gaugeName);
    if (cached != null) {
      cached.update(attempted);
      continue;
    }

    // First sighting: register a gauge seeded with the current value and remember it.
    FlinkDistributionGauge registered =
        runtimeContext.getMetricGroup().gauge(gaugeName, new FlinkDistributionGauge(attempted));
    flinkDistributionGaugeCache.put(gaugeName, registered);
  }
}
示例8
/**
 * Pushes the attempted value of each gauge result into its Flink gauge. A gauge is registered
 * with the runtime context's metric group and cached the first time its name is seen; later
 * occurrences update the cached gauge.
 *
 * @param gauges gauge results to publish
 */
private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
  for (MetricResult<GaugeResult> gaugeResult : gauges) {
    String gaugeName = getFlinkMetricNameString(gaugeResult.getKey());
    GaugeResult attempted = gaugeResult.getAttempted();

    FlinkGauge cached = flinkGaugeCache.get(gaugeName);
    if (cached != null) {
      cached.update(attempted);
      continue;
    }

    // First sighting: register a gauge seeded with the current value and remember it.
    FlinkGauge registered =
        runtimeContext.getMetricGroup().gauge(gaugeName, new FlinkGauge(attempted));
    flinkGaugeCache.put(gaugeName, registered);
  }
}
示例9
/**
 * Fetches the job's metrics and converts them into query results restricted by {@code filter}.
 *
 * <p>If the job metrics cannot be fetched, the failure is logged together with its cause and
 * empty results are returned instead of propagating the exception.
 *
 * @param filter filter forwarded to {@code populateMetricQueryResults}; may be null
 * @return the populated results, or empty results when fetching fails
 */
@Override
public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
  JobMetrics jobMetrics;
  try {
    jobMetrics = getJobMetrics();
  } catch (IOException e) {
    // Best effort: keep returning empty results, but log the cause so the failure is diagnosable.
    LOG.warn("Unable to query job metrics.\n", e);
    ImmutableList<MetricResult<Long>> counters = ImmutableList.of();
    ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of();
    ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of();
    return MetricQueryResults.create(counters, distributions, gauges);
  }
  // A null metrics list is treated as "no updates".
  List<MetricUpdate> metricUpdates =
      firstNonNull(jobMetrics.getMetrics(), Collections.emptyList());
  return populateMetricQueryResults(metricUpdates, filter);
}
示例10
/**
 * Assembles a name of the form
 * {@code beam.<metricType>.<namespace>.<name>.<committedOrAttemped>.<valueType>} and then
 * replaces every match of {@code WHITESPACE} with {@code SPACE_REPLACEMENT}.
 *
 * @param metric result supplying the namespace and name
 * @param metricType second segment of the name
 * @param valueType last segment of the name
 * @param committedOrAttemped rendered via its string form as the fifth segment
 * @return the normalized metric name
 */
private static <T> String createNormalizedMetricName(
    MetricResult<T> metric,
    String metricType,
    String valueType,
    CommittedOrAttemped committedOrAttemped) {
  String dotted =
      "beam."
          + metricType
          + "."
          + metric.getName().getNamespace()
          + "."
          + metric.getName().getName()
          + "."
          + committedOrAttemped
          + "."
          + valueType;
  return WHITESPACE.matcher(dotted).replaceAll(SPACE_REPLACEMENT);
}
示例11
/**
 * Walks every tracked counter, distribution and gauge, hands each entry to
 * {@code maybeExtractResult} together with {@code filter}, and bundles whatever was collected
 * into a {@link MetricQueryResults}.
 *
 * @param filter filter passed through to {@code maybeExtractResult}; may be null
 * @return the collected counter, distribution and gauge results
 */
@Override
public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
  ImmutableList.Builder<MetricResult<Long>> matchedCounters = ImmutableList.builder();
  ImmutableList.Builder<MetricResult<DistributionResult>> matchedDistributions =
      ImmutableList.builder();
  ImmutableList.Builder<MetricResult<GaugeResult>> matchedGauges = ImmutableList.builder();

  for (Entry<MetricKey, DirectMetric<Long, Long>> entry : counters.entries()) {
    maybeExtractResult(filter, matchedCounters, entry);
  }
  for (Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> entry :
      distributions.entries()) {
    maybeExtractResult(filter, matchedDistributions, entry);
  }
  for (Entry<MetricKey, DirectMetric<GaugeData, GaugeResult>> entry : gauges.entries()) {
    maybeExtractResult(filter, matchedGauges, entry);
  }

  return MetricQueryResults.create(
      matchedCounters.build(), matchedDistributions.build(), matchedGauges.build());
}
示例12
/**
 * Publishes the attempted value of every user distribution to Flink. Results rejected by
 * {@code isUserMetric} are skipped. A gauge is registered under the metric's short name in the
 * group returned by {@code registerMetricGroup} and cached by its Flink identifier the first
 * time it is seen; afterwards the cached gauge is updated.
 *
 * @param distributions distribution results to publish
 */
private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) {
  for (MetricResult<DistributionResult> distribution : distributions) {
    if (!isUserMetric(distribution)) {
      continue;
    }

    String identifier = getFlinkMetricIdentifierString(distribution.getKey());
    DistributionResult attempted = distribution.getAttempted();

    FlinkDistributionGauge cached = flinkDistributionGaugeCache.get(identifier);
    if (cached != null) {
      cached.update(attempted);
      continue;
    }

    // Not seen before: create the gauge inside the metric's own group and cache it.
    MetricGroup group = registerMetricGroup(distribution.getKey(), baseMetricGroup);
    FlinkDistributionGauge registered =
        group.gauge(
            distribution.getKey().metricName().getName(),
            new FlinkDistributionGauge(attempted));
    flinkDistributionGaugeCache.put(identifier, registered);
  }
}
示例13
/**
 * Publishes the attempted value of every user gauge to Flink. Results rejected by
 * {@code isUserMetric} are skipped. A gauge is registered under the metric's short name in the
 * group returned by {@code registerMetricGroup} and cached by its Flink identifier the first
 * time it is seen; afterwards the cached gauge is updated.
 *
 * @param gauges gauge results to publish
 */
private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
  for (MetricResult<GaugeResult> gaugeResult : gauges) {
    if (!isUserMetric(gaugeResult)) {
      continue;
    }

    String identifier = getFlinkMetricIdentifierString(gaugeResult.getKey());
    GaugeResult attempted = gaugeResult.getAttempted();

    FlinkGauge cached = flinkGaugeCache.get(identifier);
    if (cached != null) {
      cached.update(attempted);
      continue;
    }

    // Not seen before: create the gauge inside the metric's own group and cache it.
    MetricGroup group = registerMetricGroup(gaugeResult.getKey(), baseMetricGroup);
    FlinkGauge registered =
        group.gauge(gaugeResult.getKey().metricName().getName(), new FlinkGauge(attempted));
    flinkGaugeCache.put(identifier, registered);
  }
}
示例14
/**
 * Converts the given pipeline metadata into a Beam pipeline, runs it to completion and prints
 * the name and attempted value of every counter to standard output.
 *
 * @param pipelineMeta metadata of the pipeline to convert and run
 * @throws Exception if conversion or execution fails
 */
@Ignore
public void createRunPipeline( PipelineMeta pipelineMeta ) throws Exception {

  /* Debugging aid, disabled:
  FileOutputStream fos = new FileOutputStream( "/tmp/"+pipelineMeta.getName()+".ktr" );
  fos.write( pipelineMeta.getXML().getBytes() );
  fos.close();
  */

  // NOTE(review): these options are configured but not handed to the converter in this method —
  // confirm whether that is intentional.
  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
  pipelineOptions.setJobName( pipelineMeta.getName() );
  pipelineOptions.setUserAgent( BeamConst.STRING_HOP_BEAM );

  BeamDirectPipelineRunConfiguration runConfiguration = new BeamDirectPipelineRunConfiguration();
  runConfiguration.setTempLocation( System.getProperty( "java.io.tmpdir" ) );

  HopPipelineMetaToBeamPipelineConverter converter =
    new HopPipelineMetaToBeamPipelineConverter( pipelineMeta, metadataProvider, runConfiguration );
  Pipeline beamPipeline = converter.createPipeline();

  PipelineResult runResult = beamPipeline.run();
  runResult.waitUntilFinish();

  // Dump every counter once the run has finished.
  MetricQueryResults everything = runResult.metrics().queryMetrics( MetricsFilter.builder().build() );
  for ( MetricResult<Long> counter : everything.getCounters() ) {
    System.out.println( "Name: " + counter.getName() + " Attempted: " + counter.getAttempted() );
  }
}
示例15
/**
 * Writes a timestamped header followed by the name and attempted value of every counter of the
 * given pipeline result to the log channel.
 *
 * @param pipelineResult result whose counters are logged
 */
private void logMetrics( PipelineResult pipelineResult ) {
  MetricResults metrics = pipelineResult.metrics();

  String refreshedAt = new SimpleDateFormat( "yyyy/MM/dd HH:mm:ss" ).format( new Date() );
  logChannel.logBasic( " ----------------- Metrics refresh @ " + refreshedAt + " -----------------------" );

  // An empty filter selects every metric.
  MetricQueryResults everything = metrics.queryMetrics( MetricsFilter.builder().build() );
  for ( MetricResult<Long> counter : everything.getCounters() ) {
    logChannel.logBasic( "Name: " + counter.getName() + " Attempted: " + counter.getAttempted() );
  }
}
示例16
/**
 * Converts the given transformation metadata into a Beam pipeline configured for the direct
 * runner, runs it to completion and prints the name and attempted value of every counter to
 * standard output.
 *
 * @param transMeta metadata of the transformation to convert and run
 * @throws Exception if conversion or execution fails
 */
@Ignore
public void createRunPipeline( TransMeta transMeta ) throws Exception {

  /* Debugging aid, disabled:
  FileOutputStream fos = new FileOutputStream( "/tmp/"+transMeta.getName()+".ktr" );
  fos.write( transMeta.getXML().getBytes() );
  fos.close();
  */

  PipelineOptions options = PipelineOptionsFactory.create();
  options.setJobName( transMeta.getName() );
  options.setUserAgent( BeamConst.STRING_KETTLE_BEAM );

  BeamJobConfig directRunnerConfig = new BeamJobConfig();
  directRunnerConfig.setName("Direct runner test");
  directRunnerConfig.setRunnerTypeName( RunnerType.Direct.name() );

  // No extra plugins to load : null option
  TransMetaPipelineConverter converter =
    new TransMetaPipelineConverter( transMeta, metaStore, (String) null, directRunnerConfig );
  Pipeline beamPipeline = converter.createPipeline( options );

  PipelineResult runResult = beamPipeline.run();
  runResult.waitUntilFinish();

  // Dump every counter once the run has finished.
  MetricQueryResults everything = runResult.metrics().queryMetrics( MetricsFilter.builder().build() );
  for ( MetricResult<Long> counter : everything.getCounters() ) {
    System.out.println( "Name: " + counter.getName() + " Attempted: " + counter.getAttempted() );
  }
}
示例17
/**
 * Flattens the gauges and counters of a pipeline result into one read-only map keyed by the
 * short metric name.
 *
 * <p>Gauges are grouped by their full metric name and reduced to the entry with the greatest
 * timestamp (values are obtained through {@code GET_COMMITTED_GAUGE}). Counters are grouped by
 * short name and summed (values are obtained through {@code GET_COMMITTED_COUNTER}). Counters
 * are copied into the result after gauges, so a counter wins when both share a short name.
 * {@code addCalculatedMetrics} is then invoked with the counters and the combined map.
 *
 * <p>NOTE(review): gauges are grouped by full name but re-keyed by short name; two gauges with
 * the same short name in different namespaces would make {@code Collectors.toMap} reject the
 * duplicate key — confirm whether that can occur.
 *
 * @param result pipeline result to read metrics from
 * @return an unmodifiable map from short metric name to value
 */
public static Map<String, Long> getMetrics(final PipelineResult result) {
  final MetricQueryResults queried =
      result.metrics().queryMetrics(MetricsFilter.builder().build());

  final Map<String, Long> gauges =
      StreamSupport.stream(queried.getGauges().spliterator(), false)
          .collect(
              Collectors.groupingBy(
                  MetricResult::getName,
                  Collectors.reducing(
                      GaugeResult.empty(),
                      GET_COMMITTED_GAUGE,
                      BinaryOperator.maxBy(Comparator.comparing(GaugeResult::getTimestamp)))))
          .entrySet()
          .stream()
          .collect(
              Collectors.toMap(
                  latest -> latest.getKey().getName(), latest -> latest.getValue().getValue()));

  final Map<String, Long> counters =
      StreamSupport.stream(queried.getCounters().spliterator(), false)
          .collect(
              Collectors.groupingBy(
                  counter -> counter.getName().getName(),
                  Collectors.summingLong(GET_COMMITTED_COUNTER)));

  // Start from the gauges, then let counters overwrite entries with the same short name.
  Map<String, Long> combined = new HashMap<>(gauges);
  combined.putAll(counters);
  addCalculatedMetrics(counters, combined);
  return Collections.unmodifiableMap(combined);
}
示例18
/**
 * Finds the smallest attempted minimum among the given distributions, considering only values
 * accepted by {@code isCredible}.
 *
 * @param distributions distribution results to scan
 * @return the lowest credible minimum, or {@code ERRONEOUS_METRIC_VALUE} if there is none
 */
private Long getLowestMin(Iterable<MetricResult<DistributionResult>> distributions) {
  return StreamSupport.stream(distributions.spliterator(), true)
      .map(distribution -> distribution.getAttempted().getMin())
      .filter(this::isCredible)
      .min(Long::compareTo)
      .orElse(ERRONEOUS_METRIC_VALUE);
}
示例19
/**
 * Finds the largest attempted maximum among the given distributions, considering only values
 * accepted by {@code isCredible}.
 *
 * @param distributions distribution results to scan
 * @return the greatest credible maximum, or {@code ERRONEOUS_METRIC_VALUE} if there is none
 */
private Long getGreatestMax(Iterable<MetricResult<DistributionResult>> distributions) {
  return StreamSupport.stream(distributions.spliterator(), true)
      .map(distribution -> distribution.getAttempted().getMax())
      .filter(this::isCredible)
      .max(Long::compareTo)
      .orElse(ERRONEOUS_METRIC_VALUE);
}
示例20
/**
 * Queries the pipeline result for distributions registered under the given name in this
 * reader's {@code namespace}.
 *
 * @param name distribution name to look up
 * @return the distribution results matching the name filter
 */
private Iterable<MetricResult<DistributionResult>> getDistributions(String name) {
  MetricsFilter byName =
      MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build();
  return result.metrics().queryMetrics(byName).getDistributions();
}
示例21
/**
 * Ensures that at most one metric result was returned for the given name.
 *
 * @param name metric name used in the failure message
 * @param metricResult results returned for that name
 * @throws IllegalStateException if more than one result is present
 */
private <T> void checkIfMetricResultIsUnique(String name, Iterable<MetricResult<T>> metricResult)
    throws IllegalStateException {
  int matches = Iterables.size(metricResult);
  boolean atMostOne = matches <= 1;
  Preconditions.checkState(
      atMostOne,
      "More than one metric result matches name: %s in namespace %s. Metric results count: %s",
      name,
      namespace,
      matches);
}
示例22
/**
 * Verifies all {@link PAssert PAsserts} in the pipeline have been executed and were successful.
 *
 * <p>The number of success counters with a positive attempted value must equal the number of
 * assertions counted in the pipeline.
 *
 * <p>Note this only runs for runners which support Metrics. Runners which do not should verify
 * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001
 *
 * @param pipeline pipeline whose assertions are counted
 * @param pipelineResult result of running that pipeline
 */
public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) {
  if (!MetricsEnvironment.isMetricsSupported()) {
    return;
  }

  long expectedNumberOfAssertions = PAssert.countAsserts(pipeline);

  MetricsFilter successFilter =
      MetricsFilter.builder()
          .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
          .build();
  Iterable<MetricResult<Long>> successCounters =
      pipelineResult.metrics().queryMetrics(successFilter).getCounters();

  // Each counter with a positive attempted value counts as one successful assertion.
  long successfulAssertions = 0;
  for (MetricResult<Long> successCounter : successCounters) {
    if (successCounter.getAttempted() > 0) {
      successfulAssertions++;
    }
  }

  String failureMessage =
      String.format(
          "Expected %d successful assertions, but found %d.",
          expectedNumberOfAssertions, successfulAssertions);
  assertThat(failureMessage, successfulAssertions, is(expectedNumberOfAssertions));
}
示例23
/**
 * Folds every attempted update into {@code metricResultMap}: a key seen for the first time gets
 * a fresh attempted-only result, while an existing entry is replaced by the outcome of its
 * {@code addAttempted}.
 *
 * @param metricResultMap results keyed by metric key; entries are added or replaced in place
 * @param updates attempted updates to merge
 * @param combine function handed to {@code addAttempted} to merge values
 */
@SuppressWarnings("ConstantConditions")
private static <T> void mergeAttemptedResults(
    Map<MetricKey, MetricResult<T>> metricResultMap,
    Iterable<MetricUpdate<T>> updates,
    BiFunction<T, T, T> combine) {
  for (MetricUpdate<T> attempted : updates) {
    MetricKey metricKey = attempted.getKey();
    MetricResult<T> existing = metricResultMap.get(metricKey);
    T value = attempted.getUpdate();
    MetricResult<T> merged =
        existing == null
            ? MetricResult.attempted(metricKey, value)
            : existing.addAttempted(value, combine);
    metricResultMap.put(metricKey, merged);
  }
}
示例24
/**
 * Creates metric results backed by the given iterables. The references are stored as passed; no
 * copy is made here.
 *
 * @param counters counter results
 * @param distributions distribution results
 * @param gauges gauge results
 */
public DefaultMetricResults(
    Iterable<MetricResult<Long>> counters,
    Iterable<MetricResult<DistributionResult>> distributions,
    Iterable<MetricResult<GaugeResult>> gauges) {
  this.gauges = gauges;
  this.distributions = distributions;
  this.counters = counters;
}
示例25
/**
 * Looks up the attempted value of the first counter whose short name equals
 * {@code counterName}.
 *
 * @param counterName short counter name to search for
 * @return the attempted value of the first match, or {@code 0} when no counter matches
 */
public static long getCounterValue(String counterName) {
  for (MetricResult<Long> counter : metricQueryResults.getCounters()) {
    String shortName = counter.getName().getName();
    if (!shortName.equals(counterName)) {
      continue;
    }
    return counter.getAttempted();
  }
  return 0L;
}
示例26
/**
 * Collects the counters whose short metric name contains {@code SYSTEM_METRIC_PREFIX}.
 *
 * @return the matching counter results, in iteration order
 */
public static List<MetricResult<Long>> getSystemCounters() {
  return StreamSupport.stream(metricQueryResults.getCounters().spliterator(), false)
      .filter(counter -> counter.getKey().metricName().getName().contains(SYSTEM_METRIC_PREFIX))
      .collect(Collectors.toList());
}
示例27
/**
 * Creates portable metrics backed by the given iterables. The references are stored as passed;
 * no copy is made here.
 *
 * @param counters counter results
 * @param distributions distribution results
 * @param gauges gauge results
 */
private PortableMetrics(
    Iterable<MetricResult<Long>> counters,
    Iterable<MetricResult<DistributionResult>> distributions,
    Iterable<MetricResult<GaugeResult>> gauges) {
  this.gauges = gauges;
  this.distributions = distributions;
  this.counters = counters;
}
示例28
/**
 * Builds {@link PortableMetrics} from job metrics by concatenating the attempted monitoring
 * infos with the committed ones and extracting counters, distributions and gauges from the
 * combined list.
 *
 * @param jobMetrics job metrics holding the attempted and committed monitoring infos
 * @return metrics wrapping the extracted counters, distributions and gauges
 */
private static PortableMetrics convertMonitoringInfosToMetricResults(
    JobApi.MetricResults jobMetrics) {
  // Attempted entries come first, followed by committed entries.
  List<MetricsApi.MonitoringInfo> allInfos = new ArrayList<>(jobMetrics.getAttemptedList());
  allInfos.addAll(jobMetrics.getCommittedList());

  return new PortableMetrics(
      extractCountersFromJobMetrics(allInfos),
      extractDistributionMetricsFromJobMetrics(allInfos),
      extractGaugeMetricsFromJobMetrics(allInfos));
}
示例29
/**
 * Selects the monitoring infos whose type equals {@code DISTRIBUTION_INT64_TYPE} and that carry
 * a {@code NAMESPACE_LABEL} label, and converts each one into a distribution result.
 *
 * @param monitoringInfoList monitoring infos to scan
 * @return the converted distribution results, in list order
 */
private static Iterable<MetricResult<DistributionResult>>
    extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
  return monitoringInfoList.stream()
      .filter(
          info ->
              DISTRIBUTION_INT64_TYPE.equals(info.getType())
                  && info.getLabelsMap().get(NAMESPACE_LABEL) != null)
      .map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
      .collect(Collectors.toList());
}
示例30
/**
 * Selects the monitoring infos whose type equals {@code LATEST_INT64_TYPE} and that carry a
 * {@code NAMESPACE_LABEL} label, and converts each one into a gauge result.
 *
 * @param monitoringInfoList monitoring infos to scan
 * @return the converted gauge results, in list order
 */
private static Iterable<MetricResult<GaugeResult>> extractGaugeMetricsFromJobMetrics(
    List<MetricsApi.MonitoringInfo> monitoringInfoList) {
  return monitoringInfoList.stream()
      .filter(
          info ->
              LATEST_INT64_TYPE.equals(info.getType())
                  && info.getLabelsMap().get(NAMESPACE_LABEL) != null)
      .map(PortableMetrics::convertGaugeMonitoringInfoToGauge)
      .collect(Collectors.toList());
}