blob: 8d4ab5ea9fc3278985f0717229699dec4d0df50d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.metricsmgr;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.heron.api.metric.MultiCountMetric;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.NIOLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.basics.SysUtils;
import org.apache.heron.common.basics.TypeUtils;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.network.HeronSocketOptions;
import org.apache.heron.common.utils.logging.ErrorReportLoggingHandler;
import org.apache.heron.common.utils.logging.LoggingHelper;
import org.apache.heron.common.utils.metrics.JVMMetrics;
import org.apache.heron.common.utils.metrics.MetricsCollector;
import org.apache.heron.metricsmgr.executor.SinkExecutor;
import org.apache.heron.metricsmgr.sink.SinkContextImpl;
import org.apache.heron.proto.system.Metrics;
import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord;
import org.apache.heron.spi.metricsmgr.sink.IMetricsSink;
import org.apache.heron.spi.metricsmgr.sink.SinkContext;
/**
* Main entry to start the Metrics Manager
* 1. It would read and parse the sink-configs, and then instantiate new IMetricsSink
* instance by reflection.
* <p>
* 2. Every IMetricsSink would be driven by an individual thread; and in fact it is
* an executor running within a fixed-size thread's ExecutorService.
* We would set the thread name of running executor driving the IMetricsSink, for restart when
* uncaught exceptions of that IMetricsSink are caught.
* <p>
* 3. When one IMetricsSink throws uncaught exceptions, we would try to restart this sink unless
* we have hit the # of retry attempts.
* <p>
* 4. When Metrics Manager internal exceptions are caught or we have restart IMetricsSink with
* too many attempts, Metrics Manager would flush any remain log and exit.
*/
public class MetricsManager {
private static final Logger LOG = Logger.getLogger(MetricsManager.class.getName());
// Pre-defined value
private static final String METRICS_MANAGER_HOST = "127.0.0.1";
private static final String METRICS_MANAGER_COMPONENT_NAME = "__metricsmgr__";
private static final int METRICS_MANAGER_INSTANCE_ID = -1;
private final MetricsSinksConfig config;
private final ExecutorService executors;
// A ConcurrentHashMap from sinkId to SinkExecutor
private final Map<String, SinkExecutor> sinkExecutors;
private final MetricsManagerServer metricsManagerServer;
// # of attempts for different IMetricsSink to restart when failure happens
private final Map<String, Integer> sinksRetryAttempts;
// The looper drives MetricsManagerServer
private final NIOLooper metricsManagerServerLoop;
private final JVMMetrics jvmMetrics;
// MetricsCollector used to collect internal metrics of Metrics Manager
private final MetricsCollector metricsCollector;
// Communicator to be bind with MetricsCollector to collect metrics
private final Communicator<Metrics.MetricPublisherPublishMessage> metricsQueue;
private final Metrics.MetricPublisher metricsManagerPublisher;
private final Duration heronMetricsExportInterval;
private final String topologyName;
private final String cluster;
private final String role;
private final String environment;
private final String metricsmgrId;
private final long mainThreadId;
/**
* Metrics manager constructor
*/
public MetricsManager(String topologyName, String cluster, String role, String environment,
String serverHost, int serverPort, String metricsmgrId,
SystemConfig systemConfig, MetricsSinksConfig config)
throws IOException {
this.topologyName = topologyName;
this.cluster = cluster;
this.role = role;
this.environment = environment;
this.metricsmgrId = metricsmgrId;
this.config = config;
this.metricsManagerServerLoop = new NIOLooper();
// Init the Internal Metrics Export related stuff
this.metricsManagerPublisher =
Metrics.MetricPublisher.newBuilder().
setHostname(getLocalHostName()).setPort(serverPort).
setComponentName(METRICS_MANAGER_COMPONENT_NAME).
setInstanceId(metricsmgrId).
setInstanceIndex(METRICS_MANAGER_INSTANCE_ID).build();
this.jvmMetrics = new JVMMetrics();
this.metricsQueue =
new Communicator<Metrics.MetricPublisherPublishMessage>(null,
this.metricsManagerServerLoop);
this.metricsCollector = new MetricsCollector(metricsManagerServerLoop, metricsQueue);
this.heronMetricsExportInterval = systemConfig.getHeronMetricsExportInterval();
this.mainThreadId = Thread.currentThread().getId();
// Init the ErrorReportHandler
ErrorReportLoggingHandler.init(metricsCollector, heronMetricsExportInterval,
systemConfig.getHeronMetricsMaxExceptionsPerMessageCount());
// Set up the internal Metrics Export routine
setupInternalMetricsExport();
// Set up JVM metrics
// TODO -- change the config name
setupJVMMetrics(systemConfig.getInstanceMetricsSystemSampleInterval());
// Init the HeronSocketOptions
HeronSocketOptions serverSocketOptions =
new HeronSocketOptions(systemConfig.getMetricsMgrNetworkWriteBatchSize(),
systemConfig.getMetricsMgrNetworkWriteBatchTime(),
systemConfig.getMetricsMgrNetworkReadBatchSize(),
systemConfig.getMetricsMgrNetworkReadBatchTime(),
systemConfig.getMetricsMgrNetworkOptionsSocketSendBufferSize(),
systemConfig.getMetricsMgrNetworkOptionsSocketReceivedBufferSize(),
systemConfig.getMetricsMgrNetworkOptionsMaximumPacketSize());
// Set the MultiCountMetric for MetricsManagerServer
MultiCountMetric serverCounters = new MultiCountMetric();
metricsCollector.registerMetric(METRICS_MANAGER_COMPONENT_NAME,
serverCounters, (int) heronMetricsExportInterval.getSeconds());
// Construct the MetricsManagerServer
metricsManagerServer = new MetricsManagerServer(metricsManagerServerLoop, serverHost,
serverPort, serverSocketOptions, serverCounters);
executors = Executors.newFixedThreadPool(config.getNumberOfSinks());
sinkExecutors = new ConcurrentHashMap<>(config.getNumberOfSinks());
sinksRetryAttempts = new ConcurrentHashMap<>(config.getNumberOfSinks());
// Add exception handler for any uncaught exception here.
Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler());
// Init the Sinks
for (String sinkId : config.getSinkIds()) {
// Instantiate a new instance by using reflection
SinkExecutor sinkExecutor = initSinkExecutor(sinkId);
// Update the SinkExecutor in sinkExecutors
sinkExecutors.put(sinkId, sinkExecutor);
// Set the retry attempts
Object restartAttempts = config.getConfigForSink(sinkId).
get(MetricsSinksConfig.CONFIG_KEY_SINK_RESTART_ATTEMPTS);
// Supply with default value is config is null
sinksRetryAttempts.put(sinkId,
restartAttempts == null
? MetricsSinksConfig.DEFAULT_SINK_RESTART_ATTEMPTS
: TypeUtils.getInteger(restartAttempts));
// Update the list of Communicator in Metrics Manager Server
metricsManagerServer.addSinkCommunicator(sinkExecutor.getCommunicator());
}
}
private static String getLocalHostName() {
String hostName;
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.severe("Unknown host.");
hostName = METRICS_MANAGER_HOST;
}
return hostName;
}
// Construct all required command line options
private static Options constructOptions() {
Option id = Option.builder()
.desc("Metrics manager id")
.longOpt("id")
.hasArgs()
.argName("id")
.required()
.build();
Option port = Option.builder()
.desc("Metrics manager port")
.longOpt("port")
.hasArgs()
.argName("port")
.required()
.build();
Option topology = Option.builder()
.desc("The name of the topology to collect metrics from")
.longOpt("topology")
.hasArgs()
.argName("topology")
.required()
.build();
Option topologyId = Option.builder()
.desc("The name of the topology to collect metrics from")
.longOpt("topology-id")
.hasArgs()
.argName("topologyId")
.required()
.build();
Option cluster = Option.builder()
.desc("The name of the topology to collect metrics from")
.longOpt("cluster")
.hasArgs()
.argName("cluster")
.required()
.build();
Option role = Option.builder()
.desc("The name of the topology to collect metrics from")
.longOpt("role")
.hasArgs()
.argName("role")
.required()
.build();
Option environment = Option.builder()
.desc("The name of the topology to collect metrics from")
.longOpt("environment")
.hasArgs()
.argName("environment")
.required()
.build();
Option sinkConfig = Option.builder()
.desc("The name of the topology to collect metrics from")
.longOpt("sink-config-file")
.hasArgs()
.argName("sink config file")
.required()
.build();
Option systemConfig = Option.builder()
.desc("The name of the topology to collect metrics from")
.longOpt("system-config-file")
.hasArgs()
.argName("system config file")
.required()
.build();
Option overrideConfig = Option.builder()
.desc("The name of the topology to collect metrics from")
.longOpt("override-config-file")
.hasArgs()
.argName("override config file")
.required()
.build();
return new Options()
.addOption(id)
.addOption(port)
.addOption(topology)
.addOption(topologyId)
.addOption(cluster)
.addOption(role)
.addOption(environment)
.addOption(systemConfig)
.addOption(overrideConfig)
.addOption(sinkConfig);
}
// construct command line help options
private static Options constructHelpOptions() {
Options options = new Options();
Option help = Option.builder("h")
.desc("List all options and their description")
.longOpt("help")
.build();
options.addOption(help);
return options;
}
// Print usage options
private static void usage(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("MetricsManager", options);
}
public static void main(String[] args) throws Exception {
final Options options = constructOptions();
final Options helpOptions = constructHelpOptions();
final CommandLineParser parser = new DefaultParser();
// parse the help options first.
CommandLine cmd = parser.parse(helpOptions, args, true);
if (cmd.hasOption("h")) {
usage(options);
return;
}
try {
// Now parse the required options
cmd = parser.parse(options, args);
} catch (ParseException pe) {
usage(options);
throw new RuntimeException("Error parsing command line options: ", pe);
}
String metricsmgrId = cmd.getOptionValue("id");
int metricsPort = Integer.parseInt(cmd.getOptionValue("port"));
String topologyName = cmd.getOptionValue("topology");
String topologyId = cmd.getOptionValue("topology-id");
String systemConfigFilename = cmd.getOptionValue("system-config-file");
String overrideConfigFilename = cmd.getOptionValue("override-config-file");
String metricsSinksConfigFilename = cmd.getOptionValue("sink-config-file");
String cluster = cmd.getOptionValue("cluster");
String role = cmd.getOptionValue("role");
String environment = cmd.getOptionValue("environment");
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(systemConfigFilename, true)
.putAll(overrideConfigFilename, true)
.build();
// Add the SystemConfig into SingletonRegistry
SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig);
// Init the logging setting and redirect the stdout and stderr to logging
// For now we just set the logging level as INFO; later we may accept an argument to set it.
Level loggingLevel = Level.INFO;
String loggingDir = systemConfig.getHeronLoggingDirectory();
// Log to file and TMaster
LoggingHelper.loggerInit(loggingLevel, true);
LoggingHelper.addLoggingHandler(
LoggingHelper.getFileHandler(metricsmgrId, loggingDir, true,
systemConfig.getHeronLoggingMaximumSize(),
systemConfig.getHeronLoggingMaximumFiles()));
LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler());
LOG.info(String.format("Starting Metrics Manager for topology %s with topologyId %s with "
+ "Metrics Manager Id %s, Metrics Manager Port: %d, for cluster/role/env %s.",
topologyName, topologyId, metricsmgrId, metricsPort,
String.format("%s/%s/%s", cluster, role, environment)));
LOG.info("System Config: " + systemConfig);
// Populate the config
MetricsSinksConfig sinksConfig = new MetricsSinksConfig(metricsSinksConfigFilename,
overrideConfigFilename);
LOG.info("Sinks Config:" + sinksConfig.toString());
MetricsManager metricsManager =
new MetricsManager(topologyName, cluster, role, environment,
METRICS_MANAGER_HOST, metricsPort, metricsmgrId, systemConfig, sinksConfig);
metricsManager.start();
LOG.info("Loops terminated. Metrics Manager exits.");
}
private void setupJVMMetrics(Duration systemMetricsSampleInterval) {
this.jvmMetrics.registerMetrics(metricsCollector);
// Attach sample Runnable to gatewayMetricsCollector
this.metricsCollector.registerMetricSampleRunnable(jvmMetrics.getJVMSampleRunnable(),
systemMetricsSampleInterval);
}
private void setupInternalMetricsExport() {
Runnable gatherInternalMetrics = new Runnable() {
@Override
public void run() {
while (!metricsQueue.isEmpty()) {
Metrics.MetricPublisherPublishMessage message = metricsQueue.poll();
metricsManagerServer.onInternalMessage(metricsManagerPublisher, message);
}
// It schedules itself in future
metricsManagerServerLoop.registerTimerEvent(heronMetricsExportInterval,
this);
}
};
metricsManagerServerLoop.registerTimerEvent(heronMetricsExportInterval,
gatherInternalMetrics);
}
private SinkExecutor initSinkExecutor(String sinkId) {
IMetricsSink sink;
String classname =
(String) config.getConfigForSink(sinkId).get(MetricsSinksConfig.CONFIG_KEY_CLASSNAME);
try {
sink = (IMetricsSink) Class.forName(classname).newInstance();
} catch (InstantiationException e) {
throw new RuntimeException(e + " IMetricsSink class must have a no-arg constructor.");
} catch (IllegalAccessException e) {
throw new RuntimeException(e + " IMetricsSink class must be concrete.");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e + " IMetricsSink class must be a class path.");
}
SlaveLooper sinkExecutorLoop = new SlaveLooper();
Communicator<MetricsRecord> executorInMetricsQueue =
new Communicator<MetricsRecord>(null, sinkExecutorLoop);
// Since MetricsCollector is not thread-safe,
// we need to specify individual MetricsCollector and MultiCountMetric
// for different SinkExecutor
MetricsCollector sinkMetricsCollector = new MetricsCollector(sinkExecutorLoop, metricsQueue);
MultiCountMetric internalCounters = new MultiCountMetric();
sinkMetricsCollector
.registerMetric(sinkId, internalCounters, (int) heronMetricsExportInterval.getSeconds());
// Set up the SinkContext
SinkContext sinkContext =
new SinkContextImpl(topologyName, cluster, role, environment,
metricsmgrId, sinkId, internalCounters);
SinkExecutor sinkExecutor =
new SinkExecutor(sinkId, sink, sinkExecutorLoop, executorInMetricsQueue, sinkContext);
sinkExecutor.setPropertyMap(config.getConfigForSink(sinkId));
return sinkExecutor;
}
public void start() {
LOG.info("Starting the Executors.");
// Execute the SinkExecutor in separate threads
for (SinkExecutor executor : sinkExecutors.values()) {
executors.execute(executor);
}
// The MetricsManagerServer would run in the main thread
// We do it in the final step since it would await the main thread
LOG.info("Starting Metrics Manager Server");
metricsManagerServer.start();
metricsManagerServerLoop.loop();
}
/**
* Handler for catching exceptions thrown by any threads (owned either by topology or heron
* infrastructure).
* When one IMetricsSink throws uncaught exceptions, we would try to restart this sink unless
* we have hit the # of retry attempts.
* When Metrics Manager internal exceptions are caught or we have restart IMetricsSink with
* too many attempts, Metrics Manager would flush any remain logs and exit.
*/
public class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler {
/**
* Handler for uncaughtException
*/
public void uncaughtException(Thread thread, Throwable exception) {
// Add try and catch block to prevent new exceptions stop the handling thread
try {
// Delegate to the actual one
handleException(thread, exception);
// SUPPRESS CHECKSTYLE IllegalCatch
} catch (Throwable t) {
LOG.log(Level.SEVERE, "Failed to handle exception. Process halting", t);
Runtime.getRuntime().halt(1);
}
}
// The actual uncaught exceptions handing logic
private void handleException(Thread thread, Throwable exception) {
// We would fail fast when errors occur
if (exception instanceof Error) {
LOG.log(Level.SEVERE,
"Error caught in thread: " + thread.getName()
+ " with thread id: " + thread.getId() + ". Process halting...",
exception);
Runtime.getRuntime().halt(1);
}
// We would fail fast when exceptions happen in main thread
if (thread.getId() == mainThreadId) {
LOG.log(Level.SEVERE,
"Exception caught in main thread. Process halting...",
exception);
Runtime.getRuntime().halt(1);
}
LOG.log(Level.SEVERE,
"Exception caught in thread: " + thread.getName()
+ " with thread id: " + thread.getId(),
exception);
String sinkId = null;
Integer thisSinkRetryAttempts = 0;
// We enforced the name of thread running particular IMetricsSink equal to its sink-id
// If the thread name is a key of SinkExecutors, then it is a thread running IMetricsSink
if (sinkExecutors.containsKey(thread.getName())) {
sinkId = thread.getName();
// Remove the old sink executor
SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId);
// Remove the unneeded Communicator bind with Metrics Manager Server
metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator());
// Close the sink
SysUtils.closeIgnoringExceptions(oldSinkExecutor);
thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId);
}
if (sinkId != null && thisSinkRetryAttempts != 0) {
LOG.info(String.format("Restarting IMetricsSink: %s with %d available retries",
sinkId, thisSinkRetryAttempts));
// That means it was a sinkExecutor throwing exceptions and threadName is sinkId
SinkExecutor newSinkExecutor = initSinkExecutor(sinkId);
// Update the SinkExecutor in sinkExecutors
sinkExecutors.put(sinkId, newSinkExecutor);
// Update the retry attempts if it is > 0
if (thisSinkRetryAttempts > 0) {
thisSinkRetryAttempts--;
}
sinksRetryAttempts.put(sinkId, thisSinkRetryAttempts);
// Update the list of Communicator in Metrics Manager Server
metricsManagerServer.addSinkCommunicator(newSinkExecutor.getCommunicator());
// Restart it
executors.execute(newSinkExecutor);
} else if (sinkId != null
&& thisSinkRetryAttempts == 0
&& sinkExecutors.size() > 0) {
// If the dead executor is the only one executor and it is removed,
// e.g. sinkExecutors.size() == 0, we would exit the process directly
LOG.severe("Failed to recover from exceptions for IMetricsSink: " + sinkId);
LOG.info(sinkId + " would close and keep running rest sinks: " + sinkExecutors.keySet());
} else {
// It is not recoverable (retried too many times, or not an exception from IMetricsSink)
// So we would do basic cleaning and exit
LOG.info("Failed to recover from exceptions; Metrics Manager Exiting");
for (Handler handler : java.util.logging.Logger.getLogger("").getHandlers()) {
handler.close();
}
// Attempts to shutdown all the thread in threadsPool. This will send Interrupt to every
// thread in the pool. Threads may implement a clean Interrupt logic.
executors.shutdownNow();
// (including threads not owned by HeronInstance). To be safe, not sending these
// interrupts.
Runtime.getRuntime().halt(1);
}
}
}
}