Java源码示例:org.apache.flink.streaming.util.LatencyStats

示例1
/**
 * Builds the {@link LatencyStats} instance for this operator from the task manager configuration.
 *
 * <p>The history size is read from {@code MetricOptions.LATENCY_HISTORY_SIZE}; a value of zero or
 * less is logged and replaced by that option's default. The granularity is read from
 * {@code MetricOptions.LATENCY_SOURCE_GRANULARITY} and matched against the
 * {@code LatencyStats.Granularity} constants after upper-casing with {@link Locale#ROOT}; a name
 * that matches no constant is logged and replaced by {@code OPERATOR}.
 *
 * <p>If anything in the above throws an {@link Exception}, a warning is logged and a fallback
 * instance is returned instead, built on an unregistered metric group with history size 1,
 * subtask index 0, a new {@code OperatorID} and {@code SINGLE} granularity.
 *
 * @param taskManagerConfig configuration the latency options are read from
 * @param indexInSubtaskGroup subtask index handed through to the {@link LatencyStats} constructor
 * @return the configured latency stats, or the fallback instance when construction failed
 */
private LatencyStats createLatencyStats(Configuration taskManagerConfig, int indexInSubtaskGroup) {
	try {
		final int requestedHistorySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
		final int effectiveHistorySize;
		if (requestedHistorySize > 0) {
			effectiveHistorySize = requestedHistorySize;
		} else {
			// Non-positive sizes are not usable; report the bad value and use the option's default.
			LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, requestedHistorySize);
			effectiveHistorySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
		}

		final String granularityName = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
		LatencyStats.Granularity sourceGranularity;
		try {
			// Upper-case with a fixed locale so the lookup does not depend on the platform default.
			final String enumConstantName = granularityName.toUpperCase(Locale.ROOT);
			sourceGranularity = LatencyStats.Granularity.valueOf(enumConstantName);
		} catch (IllegalArgumentException unknownGranularity) {
			sourceGranularity = LatencyStats.Granularity.OPERATOR;
			LOG.warn(
				"Configured value {} option for {} is invalid. Defaulting to {}.",
				granularityName,
				MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
				sourceGranularity);
		}

		// Two levels up from this operator's metric group is the group the "latency" sub-group hangs off.
		final TaskManagerJobMetricGroup owningJobGroup = this.metrics.parent().parent();
		return new LatencyStats(
			owningJobGroup.addGroup("latency"),
			effectiveHistorySize,
			indexInSubtaskGroup,
			getOperatorID(),
			sourceGranularity);
	} catch (Exception e) {
		// Best effort: latency metrics must not prevent the operator from being set up.
		LOG.warn("An error occurred while instantiating latency metrics.", e);
		final int fallbackHistorySize = 1;
		final int fallbackSubtaskIndex = 0;
		return new LatencyStats(
			UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
			fallbackHistorySize,
			fallbackSubtaskIndex,
			new OperatorID(),
			LatencyStats.Granularity.SINGLE);
	}
}