blob: 5961d7af80d95c2b30de98a735dfc9aed3995552 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics.reporter;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import com.google.common.annotations.VisibleForTesting;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.Metric;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.metrics.MetricsVisitor;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of {@link MetricsReporter} which logs metrics which match a regex.
* The regex is checked against "[source name]-[group name]-[metric name]".
*/
public class LoggingMetricsReporter implements MetricsReporter {
private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);
/**
* First part is source, second part is group name, third part is metric name
*/
private static final String FULL_METRIC_FORMAT = "%s-%s-%s";
private final ScheduledExecutorService scheduledExecutorService;
private final Pattern metricsToLog;
private final long loggingIntervalSeconds;
private final Queue<Runnable> loggingTasks = new ConcurrentLinkedQueue<>();
/**
* @param scheduledExecutorService executes the logging tasks
* @param metricsToLog Only log the metrics which match this regex. The strings for matching against this metric are
* constructed by concatenating source name, group name, and metric name, delimited by dashes.
* @param loggingIntervalSeconds interval at which to log metrics
*/
public LoggingMetricsReporter(ScheduledExecutorService scheduledExecutorService, Pattern metricsToLog,
long loggingIntervalSeconds) {
this.scheduledExecutorService = scheduledExecutorService;
this.metricsToLog = metricsToLog;
this.loggingIntervalSeconds = loggingIntervalSeconds;
}
@Override
public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(() -> this.loggingTasks.forEach(Runnable::run),
this.loggingIntervalSeconds, this.loggingIntervalSeconds, TimeUnit.SECONDS);
}
@Override
public void register(String source, ReadableMetricsRegistry registry) {
this.loggingTasks.add(buildLoggingTask(source, registry));
}
@Override
public void stop() {
this.scheduledExecutorService.shutdown();
try {
this.scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("Interrupted while shutting down executor", e);
}
if (!this.scheduledExecutorService.isTerminated()) {
LOG.warn("Unable to shutdown executor");
}
}
/**
* VisibleForTesting so that the logging call can be verified in unit tests.
*/
@VisibleForTesting
void doLog(String logString) {
LOG.info(logString);
}
private Runnable buildLoggingTask(String source, ReadableMetricsRegistry registry) {
return () -> {
for (String group : registry.getGroups()) {
for (Map.Entry<String, Metric> metricGroupEntry : registry.getGroup(group).entrySet()) {
metricGroupEntry.getValue().visit(new MetricsVisitor() {
@Override
public void counter(Counter counter) {
logMetric(source, group, counter.getName(), counter.getCount());
}
@Override
public <T> void gauge(Gauge<T> gauge) {
logMetric(source, group, gauge.getName(), gauge.getValue());
}
@Override
public void timer(Timer timer) {
logMetric(source, group, timer.getName(), timer.getSnapshot().getAverage());
}
});
}
}
};
}
private <T> void logMetric(String source, String group, String metricName, T value) {
String fullMetricName = String.format(FULL_METRIC_FORMAT, source, group, metricName);
if (this.metricsToLog.matcher(fullMetricName).matches()) {
doLog(String.format("Metric: %s, Value: %s", fullMetricName, value));
}
}
}