Java源码示例:org.apache.beam.sdk.metrics.Counter
示例1
/**
 * Factory method to create mapper instance.
 *
 * <p>Messages whose payload length is at most {@code maxBytes} are passed through. For a larger
 * payload the {@code payload_too_large} counter is incremented and a
 * {@code PayloadTooLargeException} is thrown, which the failure handler converts via
 * {@code FailureMessage.of}.
 *
 * @param maxBytes largest payload length, in bytes, that is passed through
 * @return the transform with its failure handling configured
 */
public static MapWithFailures<PubsubMessage, PubsubMessage, PubsubMessage> of(int maxBytes) {
  final Counter countPayloadTooLarge = Metrics.counter(LimitPayloadSize.class,
      "payload_too_large");
  return MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via((PubsubMessage msg) -> {
    msg = PubsubConstraints.ensureNonNull(msg);
    int numBytes = msg.getPayload().length;
    if (numBytes > maxBytes) {
      countPayloadTooLarge.inc();
      // Note the leading space before "bytes" so the size and its unit are not run together.
      throw new PayloadTooLargeException("Message payload is " + numBytes
          + " bytes, larger than the configured limit of " + maxBytes);
    }
    return msg;
  }).exceptionsInto(TypeDescriptor.of(PubsubMessage.class))
      .exceptionsVia((ExceptionElement<PubsubMessage> ee) -> {
        // Rethrow so that only PayloadTooLargeException is handled here; any other exception
        // type is not caught and propagates out of this handler.
        try {
          throw ee.exception();
        } catch (PayloadTooLargeException e) {
          return FailureMessage.of(LimitPayloadSize.class.getSimpleName(), ee.element(),
              ee.exception());
        }
      });
}
示例2
/**
 * Looks up the counter for the given attributes and metric name, creating it on first use.
 *
 * <p>The lookup key has the form {@code namespace/docType/name}, where {@code docType} gets a
 * {@code _v<version>} suffix when a {@code document_version} attribute is present. Every dash in
 * the key is replaced by an underscore.
 *
 * @param attributes message attributes; {@code null} is treated as {@code EMPTY_ATTRIBUTES}
 * @param name last path component of the key
 * @return the counter stored in {@code counters} under the computed key
 */
@VisibleForTesting
static Counter getOrCreateCounter(@Nullable Map<String, String> attributes, String name) {
  final Map<String, String> attrs = (attributes != null) ? attributes : EMPTY_ATTRIBUTES;
  final String namespace = attrs.getOrDefault("document_namespace", "unknown_namespace");

  // Dataflow's UI collapses the metric name, but always shows the penultimate path component
  // as part of "Step", so we're a bit more verbose with the default doctype value.
  final String baseDocType = attrs.getOrDefault("document_type", "unknown_doctype");
  final String version = attrs.get("document_version");
  final String docType = (version == null) ? baseDocType : baseDocType + "_v" + version;

  final String rawKey = String.format("%s/%s/%s", namespace, docType, name);
  // We change dashes to underscores to make sure we're following Stackdriver's naming scheme:
  // https://cloud.google.com/monitoring/api/v3/metrics-details#label_names
  final String key = rawKey.replace("-", "_");

  return counters.computeIfAbsent(key, k -> Metrics.counter(PerDocTypeCounter.class, k));
}
示例3
/**
 * Sends the key/value pair of {@code record} to the topic from {@code spec} and increments
 * {@code sendCounter} once the send call has returned.
 *
 * <p>The record timestamp is taken from the spec's publish-timestamp function when one is
 * configured, and is {@code null} otherwise. If a {@code KafkaException} is thrown, the producer's
 * transaction is aborted and the exception is rethrown.
 *
 * @param record the timestamped key/value pair to publish
 * @param sendCounter counter incremented after the send call returns
 * @return the future returned by the producer
 */
Future<RecordMetadata> sendRecord(TimestampedValue<KV<K, V>> record, Counter sendCounter) {
  try {
    Long publishMillis = null;
    if (spec.getPublishTimestampFunction() != null) {
      publishMillis =
          spec.getPublishTimestampFunction()
              .getTimestamp(record.getValue(), record.getTimestamp())
              .getMillis();
    }

    final Future<RecordMetadata> pending =
        producer.send(
            new ProducerRecord<>(
                spec.getTopic(),
                null,
                publishMillis,
                record.getValue().getKey(),
                record.getValue().getValue()));
    sendCounter.inc();
    return pending;
  } catch (KafkaException e) {
    ProducerSpEL.abortTransaction(producer);
    throw e;
  }
}
示例4
/**
 * Commits the current producer transaction, recording {@code lastRecordId} and the writer id as
 * JSON metadata on the shard's topic partition.
 *
 * <p>After a successful commit, {@code numTransactions} is incremented and {@code committedId}
 * is advanced to {@code lastRecordId}. If a {@code KafkaException} is thrown, the transaction is
 * aborted and the exception is rethrown.
 *
 * @param lastRecordId id of the last record included in this transaction
 * @param numTransactions counter incremented after the commit succeeds
 * @throws IOException declared for the JSON serialization of the shard metadata
 */
void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
  try {
    // Store id in consumer group metadata for the partition.
    // NOTE: Kafka keeps this metadata for 24 hours since the last update. This limits
    // how long the pipeline could be down before resuming it. It does not look like
    // this TTL can be adjusted (asked about it on Kafka users list).
    final TopicPartition partition = new TopicPartition(spec.getTopic(), shard);
    final String shardMetadataJson =
        JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId, writerId));
    final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L, shardMetadataJson);

    ProducerSpEL.sendOffsetsToTransaction(
        producer, ImmutableMap.of(partition, offsetAndMetadata), spec.getSinkGroupId());
    ProducerSpEL.commitTransaction(producer);

    numTransactions.inc();
    LOG.debug("{} : committed {} records", shard, lastRecordId - committedId);
    committedId = lastRecordId;
  } catch (KafkaException e) {
    ProducerSpEL.abortTransaction(producer);
    throw e;
  }
}
示例5
/**
 * Generates the Avro schema and publishes the time it took as a Beam counter.
 *
 * <p>The elapsed time is measured around {@code generateAvroSchema} and logged in seconds. A
 * single-element step is added to the pipeline whose only job is to add the elapsed milliseconds
 * to the {@code schemaElapsedTimeMs} counter.
 *
 * @param pipeline pipeline to which the counter-exposing steps are applied
 * @param args export arguments forwarded to {@code generateAvroSchema}
 * @param connection JDBC connection forwarded to {@code generateAvroSchema}
 * @return the generated schema
 * @throws Exception declared by this method; the visible body does not catch anything
 */
public static Schema createSchema(
    final Pipeline pipeline, final JdbcExportArgs args, final Connection connection)
    throws Exception {
  final long startedAtNanos = System.nanoTime();
  final Schema schema = generateAvroSchema(args, connection);
  final long elapsedMillis = (System.nanoTime() - startedAtNanos) / 1000000;
  LOGGER.info("Elapsed time to schema {} seconds", elapsedMillis / 1000.0);

  final Counter elapsedCounter =
      Metrics.counter(BeamJdbcAvroSchema.class.getCanonicalName(), "schemaElapsedTimeMs");

  pipeline
      .apply(
          "ExposeSchemaCountersSeed",
          Create.of(Collections.singletonList(0)).withType(TypeDescriptors.integers()))
      .apply(
          "ExposeSchemaCounters",
          MapElements.into(TypeDescriptors.integers())
              .via(
                  seed -> {
                    elapsedCounter.inc(elapsedMillis);
                    return seed;
                  }));

  return schema;
}
示例6
/**
 * Passes every event through an encode/decode round trip using the input collection's coder and
 * adds the encoded size to the {@code bytes} counter.
 *
 * @param events the input events
 * @return the decoded events
 */
@Override
public PCollection<Event> expand(PCollection<Event> events) {
  final Coder<Event> coder = events.getCoder();

  // Force round trip through coder.
  final DoFn<Event, Event> roundTripFn =
      new DoFn<Event, Event>() {
        private final Counter bytesMetric = Metrics.counter(name, "bytes");

        @ProcessElement
        public void processElement(ProcessContext c) throws CoderException, IOException {
          final ByteArrayOutputStream encodedOut = new ByteArrayOutputStream();
          coder.encode(c.element(), encodedOut, Coder.Context.OUTER);
          final byte[] encoded = encodedOut.toByteArray();

          final long encodedLength = encoded.length;
          bytesMetric.inc(encodedLength);

          final Event decoded =
              coder.decode(new ByteArrayInputStream(encoded), Coder.Context.OUTER);
          c.output(decoded);
        }
      };

  return events.apply(name + ".Serialize", ParDo.of(roundTripFn));
}
示例7
/**
 * Builds a transform that encodes and decodes each row with {@code coder} and adds the encoded
 * size to the {@code bytes} counter.
 *
 * @param coder coder used for the round trip
 * @return a transform emitting the decoded rows
 */
private PTransform<? super PCollection<Row>, PCollection<Row>> logBytesMetric(
    final Coder<Row> coder) {
  final DoFn<Row, Row> roundTripFn =
      new DoFn<Row, Row>() {
        private final Counter bytesMetric = Metrics.counter(name, "bytes");

        @ProcessElement
        public void processElement(@Element Row element, OutputReceiver<Row> o)
            throws IOException {
          final ByteArrayOutputStream encodedOut = new ByteArrayOutputStream();
          coder.encode(element, encodedOut, Coder.Context.OUTER);
          final byte[] encoded = encodedOut.toByteArray();

          final long encodedLength = encoded.length;
          bytesMetric.inc(encodedLength);

          final Row decoded =
              coder.decode(new ByteArrayInputStream(encoded), Coder.Context.OUTER);
          o.output(decoded);
        }
      };
  return ParDo.of(roundTripFn);
}
示例8
/**
 * Sends the key and value of the wrapped {@code ProducerRecord} to the topic from {@code spec}
 * and increments {@code sendCounter} once the send call has returned.
 *
 * <p>Only the key and value of the incoming record are copied into the outgoing record; the
 * topic comes from {@code spec} and the partition is passed as {@code null}. The timestamp is
 * taken from the spec's publish-timestamp function when one is configured, and is {@code null}
 * otherwise. If a {@code KafkaException} is thrown, the producer's transaction is aborted and the
 * exception is rethrown.
 *
 * @param record the timestamped record to publish
 * @param sendCounter counter incremented after the send call returns
 * @return the future returned by the producer
 */
Future<RecordMetadata> sendRecord(
    TimestampedValue<ProducerRecord<K, V>> record, Counter sendCounter) {
  try {
    Long publishMillis = null;
    if (spec.getPublishTimestampFunction() != null) {
      publishMillis =
          spec.getPublishTimestampFunction()
              .getTimestamp(record.getValue(), record.getTimestamp())
              .getMillis();
    }

    final Future<RecordMetadata> pending =
        producer.send(
            new ProducerRecord<>(
                spec.getTopic(),
                null,
                publishMillis,
                record.getValue().key(),
                record.getValue().value()));
    sendCounter.inc();
    return pending;
  } catch (KafkaException e) {
    ProducerSpEL.abortTransaction(producer);
    throw e;
  }
}
示例9
/**
 * Sends the shard's offset metadata to the open transaction and commits it.
 *
 * <p>The metadata is the JSON form of a {@code ShardMetadata} built from {@code lastRecordId}
 * and {@code writerId}. On success {@code numTransactions} is incremented and
 * {@code committedId} becomes {@code lastRecordId}. A {@code KafkaException} causes the
 * transaction to be aborted before the exception is rethrown.
 *
 * @param lastRecordId id of the last record included in this transaction
 * @param numTransactions counter incremented after the commit succeeds
 * @throws IOException declared for the JSON serialization of the shard metadata
 */
void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
  try {
    // Store id in consumer group metadata for the partition.
    // NOTE: Kafka keeps this metadata for 24 hours since the last update. This limits
    // how long the pipeline could be down before resuming it. It does not look like
    // this TTL can be adjusted (asked about it on Kafka users list).
    final ImmutableMap<TopicPartition, OffsetAndMetadata> offsets =
        ImmutableMap.of(
            new TopicPartition(spec.getTopic(), shard),
            new OffsetAndMetadata(
                0L,
                JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId, writerId))));
    ProducerSpEL.sendOffsetsToTransaction(producer, offsets, spec.getSinkGroupId());
    ProducerSpEL.commitTransaction(producer);

    numTransactions.inc();
    final long newlyCommitted = lastRecordId - committedId;
    LOG.debug("{} : committed {} records", shard, newlyCommitted);
    committedId = lastRecordId;
  } catch (KafkaException e) {
    ProducerSpEL.abortTransaction(producer);
    throw e;
  }
}
示例10
/**
 * Checks that a labeled counter forwards increments and decrements to the counter supplied by
 * the current metrics container, with a decrement arriving as a negative increment.
 */
@Test
public void testOperationsUpdateCounterFromContainerWhenContainerIsPresent() {
  final HashMap<String, String> labels = new HashMap<>();
  final MonitoringInfoMetricName name =
      MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);

  final MetricsContainer container = Mockito.mock(MetricsContainer.class);
  final Counter delegate = Mockito.mock(Counter.class);
  when(container.getCounter(name)).thenReturn(delegate);

  final Counter counter = LabeledMetrics.counter(name);
  MetricsEnvironment.setCurrentContainer(container);

  // Default increment is forwarded as +1.
  counter.inc();
  verify(delegate).inc(1);

  // Explicit increment is forwarded unchanged.
  counter.inc(47L);
  verify(delegate).inc(47);

  // Decrement is forwarded as a negative increment.
  counter.dec(5L);
  verify(delegate).inc(-5);
}
示例11
/**
 * Checks that increments to the Datastore throttle-time counter are summed by
 * {@code extractThrottleTime}.
 */
@Test
public void extractThrottleTimeCounters() {
  final BatchModeExecutionContext context =
      BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage");
  final DataflowOperationContext opContext =
      context.createOperationContext(NameContextsForTests.nameContextForTest());

  final MetricName throttleMetric =
      MetricName.named(
          BatchModeExecutionContext.DATASTORE_THROTTLE_TIME_NAMESPACE,
          "cumulativeThrottlingSeconds");
  final Counter throttleCounter = opContext.metricsContainer().getCounter(throttleMetric);

  // 12 + 17 + 1 = 30
  throttleCounter.inc(12);
  throttleCounter.inc(17);
  throttleCounter.inc(1);

  assertEquals(30L, (long) context.extractThrottleTime());
}
示例12
/** Checks the counter name built from namespace, doctype, version, and a dashed metric name. */
@Test
public void getCounterTest() {
  final Map<String, String> attributes =
      ImmutableMap.of(
          "document_namespace", "telemetry",
          "document_type", "core",
          "document_version", "10");

  final Counter counter = PerDocTypeCounter.getOrCreateCounter(attributes, "my-name");
  final String actualName = counter.getName().getName();

  assertEquals("telemetry/core_v10/my_name", actualName);
}
示例13
/** Checks the counter name when the attributes carry no {@code document_version}. */
@Test
public void getCounterVersionlessTest() {
  final Map<String, String> attributes =
      ImmutableMap.of(
          "document_namespace", "telemetry",
          "document_type", "core");

  final Counter counter = PerDocTypeCounter.getOrCreateCounter(attributes, "my-name");
  final String actualName = counter.getName().getName();

  assertEquals("telemetry/core/my_name", actualName);
}
示例14
/**
 * Return a transform to pass-through events, but count them as they go by.
 *
 * <p>Each event increments the {@code events} counter plus exactly one of {@code newPersons},
 * {@code newAuctions}, {@code bids}, or {@code endOfStream}, chosen by the first non-null field
 * among {@code newPerson}, {@code newAuction}, and {@code bid}.
 *
 * @param name metric namespace, also included in the logged message
 */
public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
  final DoFn<Event, Event> snoopFn =
      new DoFn<Event, Event>() {
        final Counter eventCounter = Metrics.counter(name, "events");
        final Counter newPersonCounter = Metrics.counter(name, "newPersons");
        final Counter newAuctionCounter = Metrics.counter(name, "newAuctions");
        final Counter bidCounter = Metrics.counter(name, "bids");
        final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream");

        @ProcessElement
        public void processElement(ProcessContext c) {
          final Event event = c.element();
          eventCounter.inc();

          // Pick the single per-kind counter that matches this event.
          final Counter kindCounter;
          if (event.newPerson != null) {
            kindCounter = newPersonCounter;
          } else if (event.newAuction != null) {
            kindCounter = newAuctionCounter;
          } else if (event.bid != null) {
            kindCounter = bidCounter;
          } else {
            kindCounter = endOfStreamCounter;
          }
          kindCounter.inc();

          info("%s snooping element %s", name, event);
          c.output(event);
        }
      };
  return ParDo.of(snoopFn);
}
示例15
/**
 * Return a transform to count and discard each element.
 *
 * @param name metric namespace of the {@code discarded} counter
 */
public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
  final DoFn<T, Void> discardFn =
      new DoFn<T, Void>() {
        final Counter discarded = Metrics.counter(name, "discarded");

        @ProcessElement
        public void processElement(ProcessContext c) {
          // Nothing is emitted; the element is only counted.
          discarded.inc();
        }
      };
  return ParDo.of(discardFn);
}
示例16
/**
 * Return a transform to format each element as a string.
 *
 * <p>Each element increments the {@code records} counter and is emitted as its
 * {@code toString()} value.
 *
 * @param name metric namespace of the {@code records} counter
 */
public static <T> ParDo.SingleOutput<T, String> format(final String name) {
  final DoFn<T, String> formatFn =
      new DoFn<T, String>() {
        final Counter records = Metrics.counter(name, "records");

        @ProcessElement
        public void processElement(ProcessContext c) {
          records.inc();
          final String formatted = c.element().toString();
          c.output(formatted);
        }
      };
  return ParDo.of(formatFn);
}
示例17
/**
 * Increments every counter in {@code counters} once per round, for {@code numberOfOperations}
 * rounds, then emits the input element unchanged.
 */
@ProcessElement
public void processElement(ProcessContext processContext) {
  int round = 0;
  while (round < numberOfOperations) {
    for (Counter counter : counters) {
      counter.inc();
    }
    round++;
  }
  processContext.output(processContext.element());
}
示例18
/**
 * Runs a small transform through {@code PipelineRunner.run} and checks, via the {@code count}
 * counter attributed to the {@code ScaleByTwo} step, that the pipeline actually ran.
 */
@Test
@Category({NeedsRunner.class, UsesCommittedMetrics.class, UsesCounterMetrics.class})
public void testRunPTransform() {
  final String namespace = PipelineRunnerTest.class.getName();
  final Counter counter = Metrics.counter(namespace, "count");

  final PTransform<PBegin, POutput> scaleTransform =
      new PTransform<PBegin, POutput>() {
        @Override
        public POutput expand(PBegin input) {
          PCollection<Double> scaled =
              input
                  .apply(Create.of(1, 2, 3, 4))
                  .apply("ScaleByTwo", MapElements.via(new ScaleFn<>(2.0, counter)));
          PAssert.that(scaled).containsInAnyOrder(2.0, 4.0, 6.0, 8.0);
          return scaled;
        }
      };

  final PipelineResult result = PipelineRunner.fromOptions(p.getOptions()).run(scaleTransform);

  // Checking counters to verify the pipeline actually ran.
  final MetricsFilter namespaceFilter =
      MetricsFilter.builder().addNameFilter(MetricNameFilter.inNamespace(namespace)).build();
  assertThat(
      result.metrics().queryMetrics(namespaceFilter).getCounters(),
      hasItem(metricsResult(namespace, "count", "ScaleByTwo", 4L, true)));
}
示例19
/**
 * Checks that counter operations complete without throwing when the current metrics container
 * has been set to {@code null}.
 */
@Test
public void testCounterDoesNotFailOperationsWhenNoMetricsContainerPresent() {
  MetricsEnvironment.setCurrentContainer(null);
  assertNull(MetricsEnvironment.getCurrentContainer());

  final HashMap<String, String> labels = new HashMap<>();
  final MonitoringInfoMetricName name =
      MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.ELEMENT_COUNT, labels);
  final Counter counter = LabeledMetrics.counter(name);

  // There are no assertions below: the test passes as long as none of these calls throws.
  counter.inc();
  counter.inc(5L);
  counter.dec();
  counter.dec(5L);
}
示例20
/** Checks that {@code null} attributes yield the default namespace and doctype in the name. */
@Test
public void getCounterNullTest() {
  final Counter counter = PerDocTypeCounter.getOrCreateCounter(null, "my-name");
  final String actualName = counter.getName().getName();

  assertEquals("unknown_namespace/unknown_doctype/my_name", actualName);
}
示例21
/**
 * Sums scores per user and keeps only the users whose total exceeds the global mean total
 * multiplied by {@code SCORE_WEIGHT}.
 *
 * <p>Every user that is kept is logged and counted in the {@code SpammerUsers} counter.
 *
 * @param userScores per-user score contributions
 * @return the per-user totals that passed the filter
 */
@Override
public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> userScores) {
  // Get the sum of scores for each user.
  final PCollection<KV<String, Integer>> sumScores =
      userScores.apply("UserSum", Sum.integersPerKey());

  // Extract the score from each element, and use it to find the global mean.
  final PCollectionView<Double> globalMeanScore =
      sumScores.apply(Values.create()).apply(Mean.<Integer>globally().asSingletonView());

  final DoFn<KV<String, Integer>, KV<String, Integer>> spammerFilterFn =
      new DoFn<KV<String, Integer>, KV<String, Integer>>() {
        private final Counter numSpammerUsers = Metrics.counter("main", "SpammerUsers");

        @ProcessElement
        public void processElement(ProcessContext c) {
          final Integer score = c.element().getValue();
          final Double gmc = c.sideInput(globalMeanScore);
          // Anything not strictly above the weighted mean is dropped.
          if (!(score > (gmc * SCORE_WEIGHT))) {
            return;
          }
          LOG.info(
              "user " + c.element().getKey() + " spammer score " + score + " with mean " + gmc);
          numSpammerUsers.inc();
          c.output(c.element());
        }
      };

  // Filter the user sums using the global mean.
  // use the derived mean total score as a side input
  return sumScores.apply(
      "ProcessAndFilter", ParDo.of(spammerFilterFn).withSideInputs(globalMeanScore));
}
示例22
/**
 * Computes each user's total score and emits the totals that are strictly greater than the
 * global mean total times {@code SCORE_WEIGHT}.
 *
 * <p>The global mean is supplied to the filtering step as a singleton side input. Each emitted
 * user is logged and increments the {@code SpammerUsers} counter.
 *
 * @param userScores per-user score contributions
 * @return the per-user totals that passed the filter
 */
@Override
public PCollection<KV<String, Integer>> expand(PCollection<KV<String, Integer>> userScores) {
  // Get the sum of scores for each user.
  PCollection<KV<String, Integer>> totals = userScores.apply("UserSum", Sum.integersPerKey());

  // Extract the score from each element, and use it to find the global mean.
  final PCollectionView<Double> meanView =
      totals.apply(Values.create()).apply(Mean.<Integer>globally().asSingletonView());

  // Filter the user sums using the global mean.
  return totals.apply(
      "ProcessAndFilter",
      ParDo
          // use the derived mean total score as a side input
          .of(
              new DoFn<KV<String, Integer>, KV<String, Integer>>() {
                private final Counter spammerCount = Metrics.counter("main", "SpammerUsers");

                @ProcessElement
                public void processElement(ProcessContext c) {
                  Integer total = c.element().getValue();
                  Double mean = c.sideInput(meanView);
                  boolean aboveThreshold = total > (mean * SCORE_WEIGHT);
                  if (aboveThreshold) {
                    String message =
                        "user "
                            + c.element().getKey()
                            + " spammer score "
                            + total
                            + " with mean "
                            + mean;
                    LOG.info(message);
                    spammerCount.inc();
                    c.output(c.element());
                  }
                }
              })
          .withSideInputs(meanView));
}
示例23
/**
 * Joins auctions with their bids and emits, for each auction, the winning bid.
 *
 * <p>Bids below the auction's reserve are skipped and counted in {@code underReserve}. Groups
 * with no auction are counted in {@code noAuction}, and auctions left without any acceptable bid
 * are counted in {@code noValidBids}; neither produces output.
 *
 * @param events the incoming event stream
 * @return one {@code AuctionBid} per auction that has a winning bid
 */
@Override
public PCollection<AuctionBid> expand(PCollection<Event> events) {
  // Window auctions and bids into custom auction windows. New people events will be discarded.
  // This will allow us to bring bids and auctions together irrespective of how long
  // each auction is open for.
  final PCollection<Event> windowed = events.apply("Window", Window.into(auctionOrBidWindowFn));

  // Key auctions by their id.
  final PCollection<KV<Long, Auction>> auctionsById =
      windowed
          .apply(NexmarkQueryUtil.JUST_NEW_AUCTIONS)
          .apply("AuctionById:", NexmarkQueryUtil.AUCTION_BY_ID);

  // Key bids by their auction id.
  final PCollection<KV<Long, Bid>> bidsByAuctionId =
      windowed
          .apply(NexmarkQueryUtil.JUST_BIDS)
          .apply("BidByAuction", NexmarkQueryUtil.BID_BY_AUCTION);

  // Find the highest price valid bid for each closed auction.
  final DoFn<KV<Long, CoGbkResult>, AuctionBid> selectWinnerFn =
      new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
        private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
        private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
        private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");

        @ProcessElement
        public void processElement(ProcessContext c) {
          final CoGbkResult joined = c.element().getValue();
          @Nullable Auction auction = joined.getOnly(NexmarkQueryUtil.AUCTION_TAG, null);
          if (auction == null) {
            // We have bids without a matching auction. Give up.
            noAuctionCounter.inc();
            return;
          }

          // Find the current winning bid for auction.
          // The earliest bid with the maximum price above the reserve wins.
          Bid winner = null;
          for (Bid candidate : joined.getAll(NexmarkQueryUtil.BID_TAG)) {
            // Bids too late for their auction will have been
            // filtered out by the window merge function.
            checkState(candidate.dateTime.compareTo(auction.expires) < 0);
            if (candidate.price < auction.reserve) {
              // Bid price is below auction reserve.
              underReserveCounter.inc();
            } else if (winner == null
                || Bid.PRICE_THEN_DESCENDING_TIME.compare(candidate, winner) > 0) {
              winner = candidate;
            }
          }

          if (winner == null) {
            // We don't have any valid bids for auction.
            noValidBidsCounter.inc();
          } else {
            c.output(new AuctionBid(auction, winner));
          }
        }
      };

  return
  // Join auctions and bids.
  KeyedPCollectionTuple.of(NexmarkQueryUtil.AUCTION_TAG, auctionsById)
      .and(NexmarkQueryUtil.BID_TAG, bidsByAuctionId)
      .apply(CoGroupByKey.create())
      // Filter and select.
      .apply(name + ".Join", ParDo.of(selectWinnerFn));
}
示例24
/**
 * Creates the function with the given scalar and counter.
 *
 * @param scalar value stored in the {@code scalar} field
 * @param counter counter stored in the {@code counter} field
 */
public ScaleFn(double scalar, Counter counter) {
  this.counter = counter;
  this.scalar = scalar;
}
示例25
/**
 * Create a metric that can be incremented and decremented, and is aggregated by taking the sum.
 *
 * @param metricName name handed to the new {@code DelegatingCounter}
 * @return a new {@code DelegatingCounter} for {@code metricName}
 */
public static Counter counter(MonitoringInfoMetricName metricName) {
  final Counter delegating = new DelegatingCounter(metricName);
  return delegating;
}
示例26
/**
 * Returns the counter stored under {@code metricName}, creating and storing a new
 * {@code CounterImpl} for it when none is present.
 */
@Override
public Counter getCounter(MetricName metricName) {
  return counters.computeIfAbsent(metricName, missingName -> new CounterImpl(missingName));
}
示例27
/** Returns whatever {@code counters.get} yields for {@code metricName}; nothing is created. */
@Override
public Counter getCounter(MetricName metricName) {
  final Counter existing = counters.get(metricName);
  return existing;
}
示例28
/** Delegates the lookup of {@code metricName} to the container from {@code getCurrentContainer}. */
@Override
public Counter getCounter(MetricName metricName) {
  final Counter fromCurrentContainer = getCurrentContainer().getCounter(metricName);
  return fromCurrentContainer;
}
示例29
/**
 * Checks which counter updates {@code extractMetricUpdates} reports before and after
 * {@code commitMetricUpdates}, for both the non-final and the final extraction.
 */
@Test
public void extractMetricUpdatesCounter() {
  final BatchModeExecutionContext context =
      BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage");
  final DataflowOperationContext opContext =
      context.createOperationContext(NameContextsForTests.nameContextForTest());

  // First counter: 1 + 41 + 1 - 1 = 42.
  final Counter committedCounter =
      opContext.metricsContainer().getCounter(MetricName.named("namespace", "some-counter"));
  committedCounter.inc(1);
  committedCounter.inc(41);
  committedCounter.inc(1);
  committedCounter.inc(-1);

  final CounterStructuredName committedName =
      new CounterStructuredName()
          .setOrigin("USER")
          .setOriginNamespace("namespace")
          .setName("some-counter")
          .setOriginalStepName("originalName");
  final CounterStructuredNameAndMetadata committedNameAndMetadata =
      new CounterStructuredNameAndMetadata()
          .setName(committedName)
          .setMetadata(new CounterMetadata().setKind(Kind.SUM.toString()));
  final CounterUpdate expected =
      new CounterUpdate()
          .setStructuredNameAndMetadata(committedNameAndMetadata)
          .setCumulative(true)
          .setInteger(longToSplitInt(42));

  assertThat(context.extractMetricUpdates(false), containsInAnyOrder(expected));

  context.commitMetricUpdates();

  // Second counter is updated only after the first commit.
  final Counter uncommittedCounter =
      opContext
          .metricsContainer()
          .getCounter(MetricName.named("namespace", "uncommitted-counter"));
  uncommittedCounter.inc(64);

  final CounterStructuredName uncommittedName =
      new CounterStructuredName()
          .setOrigin("USER")
          .setOriginNamespace("namespace")
          .setName("uncommitted-counter")
          .setOriginalStepName("originalName");
  final CounterStructuredNameAndMetadata uncommittedNameAndMetadata =
      new CounterStructuredNameAndMetadata()
          .setName(uncommittedName)
          .setMetadata(new CounterMetadata().setKind(Kind.SUM.toString()));
  final CounterUpdate expectedUncommitted =
      new CounterUpdate()
          .setStructuredNameAndMetadata(uncommittedNameAndMetadata)
          .setCumulative(true)
          .setInteger(longToSplitInt(64));

  // Expect to get only the uncommitted metric, unless final update.
  assertThat(context.extractMetricUpdates(false), containsInAnyOrder(expectedUncommitted));
  assertThat(
      context.extractMetricUpdates(true), containsInAnyOrder(expected, expectedUncommitted));

  context.commitMetricUpdates();

  // All Metrics are committed, expect none unless final update.
  assertThat(context.extractMetricUpdates(false), emptyIterable());
  assertThat(
      context.extractMetricUpdates(true), containsInAnyOrder(expected, expectedUncommitted));
}