/**
 * Creates the {@link LatencyStats} for this operator from the given configuration.
 *
 * <p>The history size is read from {@link MetricOptions#LATENCY_HISTORY_SIZE}; a value of zero
 * or less is replaced by that option's default. The granularity is read from
 * {@link MetricOptions#LATENCY_SOURCE_GRANULARITY} and matched case-insensitively against the
 * {@link LatencyStats.Granularity} constants; a value that matches none of them falls back to
 * {@link LatencyStats.Granularity#OPERATOR}. Both corrections are logged as warnings.
 *
 * <p>If anything in that process throws, the error is logged and a {@code LatencyStats} backed
 * by an unregistered metric group is returned instead.
 *
 * @param taskManagerConfig configuration the latency options are read from
 * @param indexInSubtaskGroup subtask index handed to the created {@code LatencyStats}
 * @return the configured latency stats, or the unregistered fallback described above
 */
private LatencyStats createLatencyStats(Configuration taskManagerConfig, int indexInSubtaskGroup) {
    try {
        int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
        if (historySize <= 0) {
            // Log the option key (as the granularity warning below does) rather than the
            // option object itself, so both warnings identify the setting the same way.
            LOG.warn(
                "{} has been set to a value equal or below 0: {}. Using default.",
                MetricOptions.LATENCY_HISTORY_SIZE.key(),
                historySize);
            historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
        }

        final String configuredGranularity =
            taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
        LatencyStats.Granularity granularity;
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
            granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException iae) {
            // The configured name matches no Granularity constant: warn and use OPERATOR.
            granularity = LatencyStats.Granularity.OPERATOR;
            LOG.warn(
                "Configured value {} option for {} is invalid. Defaulting to {}.",
                configuredGranularity,
                MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
                granularity);
        }

        // The "latency" group is attached two levels above this operator's own metric group.
        TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
        return new LatencyStats(
            jobMetricGroup.addGroup("latency"),
            historySize,
            indexInSubtaskGroup,
            getOperatorID(),
            granularity);
    } catch (Exception e) {
        // Deliberate best-effort: a failure while setting up latency metrics is logged and
        // answered with a stand-in built on an unregistered group, instead of being rethrown.
        LOG.warn("An error occurred while instantiating latency metrics.", e);
        return new LatencyStats(
            UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
            1,
            0,
            new OperatorID(),
            LatencyStats.Granularity.SINGLE);
    }
}