blob: f4f83336feb50bedd34ae336bd25441aa6a12d05 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.sink.kafka;
import java.io.Closeable;
import java.util.Locale;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.ConsoleReporter;
import com.yammer.metrics.reporting.CsvReporter;
/**
* The abstract base class for all scheduled reporters (i.e., reporters which
* process a registry's metrics periodically).
*
* @see ConsoleReporter
* @see CsvReporter
* @see Slf4jReporter
*/
public abstract class ScheduledReporter implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ScheduledReporter.class);
/**
* A simple named thread factory.
*/
@SuppressWarnings("NullableProblems")
private static class NamedThreadFactory implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private NamedThreadFactory(String name) {
final SecurityManager s = System.getSecurityManager();
this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = "metrics-" + name + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
t.setDaemon(true);
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
private static final AtomicInteger FACTORY_ID = new AtomicInteger();
private final MetricsRegistry registry;
private final ScheduledExecutorService executor;
private final double durationFactor;
private final String durationUnit;
private final double rateFactor;
private final String rateUnit;
/**
* Creates a new {@link ScheduledReporter} instance.
*
* @param registry
* the {@link com.codahale.metrics.MetricRegistry} containing the
* metrics this reporter will report
* @param name
* the reporter's name
* @param rateUnit
* a unit of time
* @param durationUnit
* a unit of time
*/
protected ScheduledReporter(MetricsRegistry registry, String name, TimeUnit rateUnit, TimeUnit durationUnit) {
this(registry, rateUnit, durationUnit, Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-'
+ FACTORY_ID.incrementAndGet())));
}
/**
* Creates a new {@link ScheduledReporter} instance.
*
* @param registry
* the {@link com.codahale.metrics.MetricRegistry} containing the
* metrics this reporter will report
* @param executor
* the executor to use while scheduling reporting of metrics.
*/
protected ScheduledReporter(MetricsRegistry registry, TimeUnit rateUnit, TimeUnit durationUnit,
ScheduledExecutorService executor) {
this.registry = registry;
this.executor = executor;
this.rateFactor = rateUnit.toSeconds(1);
this.rateUnit = calculateRateUnit(rateUnit);
this.durationFactor = 1.0 / durationUnit.toNanos(1);
this.durationUnit = durationUnit.toString().toLowerCase(Locale.US);
}
/**
* Starts the reporter polling at the given period.
*
* @param period
* the amount of time between polls
* @param unit
* the unit for {@code period}
*/
public void start(long period, TimeUnit unit) {
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
report();
} catch (RuntimeException ex) {
LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this
.getClass().getSimpleName(), ex);
}
}
}, period, period, unit);
}
/**
* Stops the reporter and shuts down its thread of execution.
*
* Uses the shutdown pattern from
* http://docs.oracle.com/javase/7/docs/api/java
* /util/concurrent/ExecutorService.html
*/
public void stop() {
executor.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
executor.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
/**
* Stops the reporter and shuts down its thread of execution.
*/
@Override
public void close() {
stop();
}
/**
* Report the current values of all metrics in the registry.
*/
public void report() {
synchronized (this) {
report(registry.allMetrics().entrySet());
}
}
/**
* Called periodically by the polling thread. Subclasses should report all the
* given metrics.
*
* @param metrics
* all the metrics in the registry
*/
public abstract void report(Set<Entry<MetricName, Metric>> metrics);
protected String getRateUnit() {
return rateUnit;
}
protected String getDurationUnit() {
return durationUnit;
}
protected double convertDuration(double duration) {
return duration * durationFactor;
}
protected double convertRate(double rate) {
return rate * rateFactor;
}
private String calculateRateUnit(TimeUnit unit) {
final String s = unit.toString().toLowerCase(Locale.US);
return s.substring(0, s.length() - 1);
}
}