Java源码示例:org.apache.beam.sdk.metrics.MetricResult

示例1
@Test
@Category(NeedsRunner.class)
public void testWordCountSimple() {
  PCollection<KV<String, Long>> pc =
      pipeline.apply(Create.of(INPUT_STRS)).apply(new CountWords());
  PAssert.that(pc).containsInAnyOrder(KV.of("hello", 2L), KV.of(("world"), 1L));
  PipelineResult result = pipeline.run();
  result.waitUntilFinish();

  Map<String, Long> expectedCounters = new HashMap<>();
  expectedCounters.put("emptyLines", 2L);
  for (MetricResult c :
      result.metrics().queryMetrics(MetricsFilter.builder().build()).getCounters()) {
    String name = c.getName().getName();
    if (expectedCounters.containsKey(name)) {
      assertEquals(expectedCounters.get(name), c.getCommitted());
      expectedCounters.remove(name);
    }
  }
  assertTrue(expectedCounters.isEmpty());
}
 
示例2
/**
 * Return the current value for a long counter, or -1 if can't be retrieved. Note this uses only
 * attempted metrics because some runners don't support committed metrics.
 */
public long getCounterMetric(String name) {
  MetricQueryResults metrics =
      result
          .metrics()
          .queryMetrics(
              MetricsFilter.builder()
                  .addNameFilter(MetricNameFilter.named(namespace, name))
                  .build());
  Iterable<MetricResult<Long>> counters = metrics.getCounters();

  checkIfMetricResultIsUnique(name, counters);

  try {
    MetricResult<Long> metricResult = counters.iterator().next();
    return metricResult.getAttempted();
  } catch (NoSuchElementException e) {
    LOG.error("Failed to get metric {}, from namespace {}", name, namespace);
  }
  return ERRONEOUS_METRIC_VALUE;
}
 
示例3
private static <T> void mergeCommittedResults(
    Map<MetricKey, MetricResult<T>> metricResultMap,
    Iterable<MetricUpdate<T>> updates,
    BiFunction<T, T, T> combine) {
  for (MetricUpdate<T> metricUpdate : updates) {
    MetricKey key = metricUpdate.getKey();
    MetricResult<T> current = metricResultMap.get(key);
    if (current == null) {
      throw new IllegalStateException(
          String.format(
              "%s: existing 'attempted' result not found for 'committed' value %s",
              key, metricUpdate.getUpdate()));
    }
    metricResultMap.put(key, current.addCommitted(metricUpdate.getUpdate(), combine));
  }
}
 
示例4
@Override
public MetricResults metrics() {
  return new MetricResults() {
    @Override
    public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
      return new MetricQueryResults() {
        @Override
        public Iterable<MetricResult<Long>> getCounters() {
          return Collections.emptyList();
        }

        @Override
        public Iterable<MetricResult<DistributionResult>> getDistributions() {
          return Collections.emptyList();
        }

        @Override
        public Iterable<MetricResult<GaugeResult>> getGauges() {
          return Collections.emptyList();
        }
      };
    }
  };
}
 
示例5
@VisibleForTesting
String renderName(MetricResult<?> metricResult) {
  MetricKey key = metricResult.getKey();
  MetricName name = key.metricName();
  String step = key.stepName();

  ArrayList<String> pieces = new ArrayList<>();

  if (step != null) {
    step = step.replaceAll(ILLEGAL_CHARACTERS, "_");
    if (step.endsWith("_")) {
      step = step.substring(0, step.length() - 1);
    }
    pieces.add(step);
  }

  pieces.addAll(
      ImmutableList.of(name.getNamespace(), name.getName()).stream()
          .map(str -> str.replaceAll(ILLEGAL_CHARACTERS, "_"))
          .collect(toList()));

  return String.join(".", pieces);
}
 
示例6
@VisibleForTesting
String renderName(MetricResult<?> metricResult) {
  MetricKey key = metricResult.getKey();
  MetricName name = key.metricName();
  String step = key.stepName();

  ArrayList<String> pieces = new ArrayList<>();

  if (step != null) {
    step = step.replaceAll(ILLEGAL_CHARACTERS, "_");
    if (step.endsWith("_")) {
      step = step.substring(0, step.length() - 1);
    }
    pieces.add(step);
  }

  pieces.addAll(
      ImmutableList.of(name.getNamespace(), name.getName()).stream()
          .map(str -> str.replaceAll(ILLEGAL_CHARACTERS, "_"))
          .collect(toList()));

  return String.join(".", pieces);
}
 
示例7
private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) {
  for (MetricResult<DistributionResult> metricResult : distributions) {
    String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());

    DistributionResult update = metricResult.getAttempted();

    // update flink metric
    FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName);
    if (gauge == null) {
      gauge =
          runtimeContext
              .getMetricGroup()
              .gauge(flinkMetricName, new FlinkDistributionGauge(update));
      flinkDistributionGaugeCache.put(flinkMetricName, gauge);
    } else {
      gauge.update(update);
    }
  }
}
 
示例8
private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
  for (MetricResult<GaugeResult> metricResult : gauges) {
    String flinkMetricName = getFlinkMetricNameString(metricResult.getKey());

    GaugeResult update = metricResult.getAttempted();

    // update flink metric
    FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName);
    if (gauge == null) {
      gauge = runtimeContext.getMetricGroup().gauge(flinkMetricName, new FlinkGauge(update));
      flinkGaugeCache.put(flinkMetricName, gauge);
    } else {
      gauge.update(update);
    }
  }
}
 
示例9
@Override
public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
  List<MetricUpdate> metricUpdates;
  ImmutableList<MetricResult<Long>> counters = ImmutableList.of();
  ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of();
  ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of();
  JobMetrics jobMetrics;
  try {
    jobMetrics = getJobMetrics();
  } catch (IOException e) {
    LOG.warn("Unable to query job metrics.\n");
    return MetricQueryResults.create(counters, distributions, gauges);
  }
  metricUpdates = firstNonNull(jobMetrics.getMetrics(), Collections.emptyList());
  return populateMetricQueryResults(metricUpdates, filter);
}
 
示例10
private static <T> String createNormalizedMetricName(
    MetricResult<T> metric,
    String metricType,
    String valueType,
    CommittedOrAttemped committedOrAttemped) {
  String metricName =
      String.format(
          "beam.%s.%s.%s.%s.%s",
          metricType,
          metric.getName().getNamespace(),
          metric.getName().getName(),
          committedOrAttemped,
          valueType);

  return WHITESPACE.matcher(metricName).replaceAll(SPACE_REPLACEMENT);
}
 
示例11
@Override
public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
  ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
  for (Entry<MetricKey, DirectMetric<Long, Long>> counter : counters.entries()) {
    maybeExtractResult(filter, counterResults, counter);
  }
  ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
      ImmutableList.builder();
  for (Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> distribution :
      distributions.entries()) {
    maybeExtractResult(filter, distributionResults, distribution);
  }
  ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder();
  for (Entry<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauge : gauges.entries()) {
    maybeExtractResult(filter, gaugeResults, gauge);
  }

  return MetricQueryResults.create(
      counterResults.build(), distributionResults.build(), gaugeResults.build());
}
 
示例12
private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) {
	for (MetricResult<DistributionResult> metricResult : distributions) {
		if (!isUserMetric(metricResult)) {
			continue;
		}
		// get identifier
		String flinkMetricIdentifier = getFlinkMetricIdentifierString(metricResult.getKey());
		DistributionResult update = metricResult.getAttempted();

		// update flink metric
		FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricIdentifier);
		if (gauge == null) {
			MetricGroup metricGroup =
				registerMetricGroup(metricResult.getKey(), baseMetricGroup);
			gauge = metricGroup.gauge(
				metricResult.getKey().metricName().getName(),
				new FlinkDistributionGauge(update));
			flinkDistributionGaugeCache.put(flinkMetricIdentifier, gauge);
		} else {
			gauge.update(update);
		}
	}
}
 
示例13
private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
	for (MetricResult<GaugeResult> metricResult : gauges) {
		if (!isUserMetric(metricResult)) {
			continue;
		}
		// get identifier
		String flinkMetricIdentifier = getFlinkMetricIdentifierString(metricResult.getKey());

		GaugeResult update = metricResult.getAttempted();

		// update flink metric
		FlinkGauge gauge = flinkGaugeCache.get(flinkMetricIdentifier);
		if (gauge == null) {
			MetricGroup metricGroup = registerMetricGroup(metricResult.getKey(), baseMetricGroup);
			gauge = metricGroup.gauge(
				metricResult.getKey().metricName().getName(),
				new FlinkGauge(update));
			flinkGaugeCache.put(flinkMetricIdentifier, gauge);
		} else {
			gauge.update(update);
		}
	}
}
 
示例14
@Ignore
public void createRunPipeline( PipelineMeta pipelineMeta ) throws Exception {

  /*
  FileOutputStream fos = new FileOutputStream( "/tmp/"+pipelineMeta.getName()+".ktr" );
  fos.write( pipelineMeta.getXML().getBytes() );
  fos.close();
  */

  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();

  pipelineOptions.setJobName( pipelineMeta.getName() );
  pipelineOptions.setUserAgent( BeamConst.STRING_HOP_BEAM );

  BeamDirectPipelineRunConfiguration beamRunConfig = new BeamDirectPipelineRunConfiguration();
  beamRunConfig.setTempLocation( System.getProperty( "java.io.tmpdir" ) );

  // No extra plugins to load : null option
  HopPipelineMetaToBeamPipelineConverter converter = new HopPipelineMetaToBeamPipelineConverter( pipelineMeta, metadataProvider, beamRunConfig );
  Pipeline pipeline = converter.createPipeline();

  PipelineResult pipelineResult = pipeline.run();
  pipelineResult.waitUntilFinish();

  MetricResults metricResults = pipelineResult.metrics();

  MetricQueryResults allResults = metricResults.queryMetrics( MetricsFilter.builder().build() );
  for ( MetricResult<Long> result : allResults.getCounters() ) {
    System.out.println( "Name: " + result.getName() + " Attempted: " + result.getAttempted() );
  }
}
 
示例15
private void logMetrics( PipelineResult pipelineResult ) {
  MetricResults metricResults = pipelineResult.metrics();

  logChannel.logBasic( "  ----------------- Metrics refresh @ " + new SimpleDateFormat( "yyyy/MM/dd HH:mm:ss" ).format( new Date() ) + " -----------------------" );

  MetricQueryResults allResults = metricResults.queryMetrics( MetricsFilter.builder().build() );
  for ( MetricResult<Long> result : allResults.getCounters() ) {
    logChannel.logBasic( "Name: " + result.getName() + " Attempted: " + result.getAttempted() );
  }
}
 
示例16
@Ignore
public void createRunPipeline( TransMeta transMeta ) throws Exception {

  /*
  FileOutputStream fos = new FileOutputStream( "/tmp/"+transMeta.getName()+".ktr" );
  fos.write( transMeta.getXML().getBytes() );
  fos.close();
  */

  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();

  pipelineOptions.setJobName( transMeta.getName() );
  pipelineOptions.setUserAgent( BeamConst.STRING_KETTLE_BEAM );

  BeamJobConfig jobConfig = new BeamJobConfig();
  jobConfig.setName("Direct runner test");
  jobConfig.setRunnerTypeName( RunnerType.Direct.name() );

  // No extra plugins to load : null option
  TransMetaPipelineConverter converter = new TransMetaPipelineConverter( transMeta, metaStore, (String) null, jobConfig );
  Pipeline pipeline = converter.createPipeline( pipelineOptions );

  PipelineResult pipelineResult = pipeline.run();
  pipelineResult.waitUntilFinish();

  MetricResults metricResults = pipelineResult.metrics();

  MetricQueryResults allResults = metricResults.queryMetrics( MetricsFilter.builder().build() );
  for ( MetricResult<Long> result : allResults.getCounters() ) {
    System.out.println( "Name: " + result.getName() + " Attempted: " + result.getAttempted() );
  }
}
 
示例17
public static Map<String, Long> getMetrics(final PipelineResult result) {
  final MetricQueryResults metricQueryResults =
      result.metrics().queryMetrics(MetricsFilter.builder().build());

  final Map<String, Long> gauges =
      StreamSupport.stream(metricQueryResults.getGauges().spliterator(), false)
          .collect(
              Collectors.groupingBy(
                  MetricResult::getName,
                  Collectors.reducing(
                      GaugeResult.empty(),
                      GET_COMMITTED_GAUGE,
                      BinaryOperator.maxBy(Comparator.comparing(GaugeResult::getTimestamp)))))
          .entrySet()
          .stream()
          .collect(Collectors.toMap(e -> e.getKey().getName(), e -> e.getValue().getValue()));

  final Map<String, Long> counters =
      StreamSupport.stream(metricQueryResults.getCounters().spliterator(), false)
          .collect(
              Collectors.groupingBy(
                  m -> m.getName().getName(), Collectors.summingLong(GET_COMMITTED_COUNTER)));
  Map<String, Long> ret = new HashMap<>();
  ret.putAll(gauges);
  ret.putAll(counters);
  addCalculatedMetrics(counters, ret);
  return Collections.unmodifiableMap(ret);
}
 
示例18
private Long getLowestMin(Iterable<MetricResult<DistributionResult>> distributions) {
  Optional<Long> lowestMin =
      StreamSupport.stream(distributions.spliterator(), true)
          .map(element -> element.getAttempted().getMin())
          .filter(this::isCredible)
          .min(Long::compareTo);

  return lowestMin.orElse(ERRONEOUS_METRIC_VALUE);
}
 
示例19
private Long getGreatestMax(Iterable<MetricResult<DistributionResult>> distributions) {
  Optional<Long> greatestMax =
      StreamSupport.stream(distributions.spliterator(), true)
          .map(element -> element.getAttempted().getMax())
          .filter(this::isCredible)
          .max(Long::compareTo);

  return greatestMax.orElse(ERRONEOUS_METRIC_VALUE);
}
 
示例20
private Iterable<MetricResult<DistributionResult>> getDistributions(String name) {
  MetricQueryResults metrics =
      result
          .metrics()
          .queryMetrics(
              MetricsFilter.builder()
                  .addNameFilter(MetricNameFilter.named(namespace, name))
                  .build());
  return metrics.getDistributions();
}
 
示例21
private <T> void checkIfMetricResultIsUnique(String name, Iterable<MetricResult<T>> metricResult)
    throws IllegalStateException {

  int resultCount = Iterables.size(metricResult);
  Preconditions.checkState(
      resultCount <= 1,
      "More than one metric result matches name: %s in namespace %s. Metric results count: %s",
      name,
      namespace,
      resultCount);
}
 
示例22
/**
 * Verifies all {{@link PAssert PAsserts}} in the pipeline have been executed and were successful.
 *
 * <p>Note this only runs for runners which support Metrics. Runners which do not should verify
 * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001
 */
public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) {
  if (MetricsEnvironment.isMetricsSupported()) {
    long expectedNumberOfAssertions = (long) PAssert.countAsserts(pipeline);

    long successfulAssertions = 0;
    Iterable<MetricResult<Long>> successCounterResults =
        pipelineResult
            .metrics()
            .queryMetrics(
                MetricsFilter.builder()
                    .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
                    .build())
            .getCounters();
    for (MetricResult<Long> counter : successCounterResults) {
      if (counter.getAttempted() > 0) {
        successfulAssertions++;
      }
    }

    assertThat(
        String.format(
            "Expected %d successful assertions, but found %d.",
            expectedNumberOfAssertions, successfulAssertions),
        successfulAssertions,
        is(expectedNumberOfAssertions));
  }
}
 
示例23
@SuppressWarnings("ConstantConditions")
private static <T> void mergeAttemptedResults(
    Map<MetricKey, MetricResult<T>> metricResultMap,
    Iterable<MetricUpdate<T>> updates,
    BiFunction<T, T, T> combine) {
  for (MetricUpdate<T> metricUpdate : updates) {
    MetricKey key = metricUpdate.getKey();
    MetricResult<T> current = metricResultMap.get(key);
    if (current == null) {
      metricResultMap.put(key, MetricResult.attempted(key, metricUpdate.getUpdate()));
    } else {
      metricResultMap.put(key, current.addAttempted(metricUpdate.getUpdate(), combine));
    }
  }
}
 
示例24
public DefaultMetricResults(
    Iterable<MetricResult<Long>> counters,
    Iterable<MetricResult<DistributionResult>> distributions,
    Iterable<MetricResult<GaugeResult>> gauges) {
  this.counters = counters;
  this.distributions = distributions;
  this.gauges = gauges;
}
 
示例25
public static long getCounterValue(String counterName) {
  for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
    if (metricResult.getName().getName().equals(counterName)) {
      return metricResult.getAttempted();
    }
  }
  return 0L;
}
 
示例26
public static List<MetricResult<Long>> getSystemCounters() {
  List<MetricResult<Long>> counters =
      StreamSupport.stream(metricQueryResults.getCounters().spliterator(), false)
          .filter(result -> result.getKey().metricName().getName().contains(SYSTEM_METRIC_PREFIX))
          .collect(Collectors.toList());
  return counters;
}
 
示例27
private PortableMetrics(
    Iterable<MetricResult<Long>> counters,
    Iterable<MetricResult<DistributionResult>> distributions,
    Iterable<MetricResult<GaugeResult>> gauges) {
  this.counters = counters;
  this.distributions = distributions;
  this.gauges = gauges;
}
 
示例28
private static PortableMetrics convertMonitoringInfosToMetricResults(
    JobApi.MetricResults jobMetrics) {
  List<MetricsApi.MonitoringInfo> monitoringInfoList = new ArrayList<>();
  monitoringInfoList.addAll(jobMetrics.getAttemptedList());
  monitoringInfoList.addAll(jobMetrics.getCommittedList());
  Iterable<MetricResult<Long>> countersFromJobMetrics =
      extractCountersFromJobMetrics(monitoringInfoList);
  Iterable<MetricResult<DistributionResult>> distributionsFromMetrics =
      extractDistributionMetricsFromJobMetrics(monitoringInfoList);
  Iterable<MetricResult<GaugeResult>> gaugesFromMetrics =
      extractGaugeMetricsFromJobMetrics(monitoringInfoList);
  return new PortableMetrics(countersFromJobMetrics, distributionsFromMetrics, gaugesFromMetrics);
}
 
示例29
private static Iterable<MetricResult<DistributionResult>>
    extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
  return monitoringInfoList.stream()
      .filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
      .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
      .map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
      .collect(Collectors.toList());
}
 
示例30
private static Iterable<MetricResult<GaugeResult>> extractGaugeMetricsFromJobMetrics(
    List<MetricsApi.MonitoringInfo> monitoringInfoList) {
  return monitoringInfoList.stream()
      .filter(item -> LATEST_INT64_TYPE.equals(item.getType()))
      .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
      .map(PortableMetrics::convertGaugeMonitoringInfoToGauge)
      .collect(Collectors.toList());
}