blob: e2f4f0c1cd05f175c9e21398db512a8a7a63393e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.metricsmgr.sink;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.metricsmgr.MetricsUtil;
import org.apache.heron.spi.metricsmgr.metrics.MetricsInfo;
import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord;
import org.apache.heron.spi.metricsmgr.sink.SinkContext;
/**
* A metrics sink that publishes metrics on a http endpoint
*/
public class WebSink extends AbstractWebSink {
private static final Logger LOG = Logger.getLogger(WebSink.class.getName());
private static final ObjectMapper MAPPER = new ObjectMapper();
// If flat-metrics is false, the metrics will be grouped by the metrics
// source source,
// if flat-metrics is true, the metric names will be prefixed by the source
private static final String KEY_FLAT_METRICS = "flat-metrics";
private static final boolean DEFAULT_FLATTEN_METRICS = true;
// Include the topology name in the metric name
private static final String KEY_INCLUDE_TOPOLOGY_NAME = "include-topology-name";
private static final boolean DEFAULT_INCLUDE_TOPOLOGY_NAME = false;
// This is the cache that is used to serve the metrics
private Cache<String, Object> metricsCache;
private boolean flattenMetrics;
private boolean shouldIncludeTopologyName;
public WebSink() {
super();
}
@VisibleForTesting
WebSink(Ticker cacheTicker) {
super(cacheTicker);
}
Cache<String, Object> getMetricsCache() {
return metricsCache;
}
@Override
void initialize(Map<String, Object> configuration, SinkContext context) {
flattenMetrics = TypeUtils.getBoolean(
configuration.getOrDefault(KEY_FLAT_METRICS, DEFAULT_FLATTEN_METRICS));
shouldIncludeTopologyName = TypeUtils.getBoolean(
configuration.getOrDefault(KEY_INCLUDE_TOPOLOGY_NAME, DEFAULT_INCLUDE_TOPOLOGY_NAME));
metricsCache = createCache();
}
@Override
byte[] generateResponse() throws IOException {
metricsCache.cleanUp();
return MAPPER.writeValueAsString(metricsCache.asMap()).getBytes();
}
@Override
public void processRecord(MetricsRecord record) {
final String[] sources = MetricsUtil.splitRecordSource(record);
final String source;
if (sources.length > 2) {
source = shouldIncludeTopologyName
? String.format("%s/%s/%s", getTopologyName(), sources[1], sources[2])
: String.format("/%s/%s", sources[1], sources[2]);
} else {
source = shouldIncludeTopologyName
? String.format("%s/%s", getTopologyName(), record.getSource())
: String.format("/%s", record.getSource());
}
if (flattenMetrics) {
metricsCache.putAll(processMetrics(source + "/", record.getMetrics()));
} else {
Map<String, Double> sourceCache;
Object sourceObj = metricsCache.getIfPresent(source);
if (sourceObj instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Double> castObj = (Map<String, Double>) sourceObj;
sourceCache = castObj;
} else {
final Cache<String, Double> newSourceCache = createCache();
sourceCache = newSourceCache.asMap();
}
sourceCache.putAll(processMetrics("", record.getMetrics()));
metricsCache.put(source, sourceCache);
}
}
/**
* Helper to prefix metric names, convert metric value to double and return as map
*
* @param prefix
* @param metrics
* @return Map of metric name to metric value
*/
static Map<String, Double> processMetrics(String prefix, Iterable<MetricsInfo> metrics) {
Map<String, Double> map = new HashMap<>();
for (MetricsInfo r : metrics) {
try {
map.put(prefix + r.getName(), Double.valueOf(r.getValue()));
} catch (NumberFormatException ne) {
LOG.log(Level.SEVERE, "Could not parse metric, Name: "
+ r.getName() + " Value: " + r.getValue(), ne);
}
}
return map;
}
}