| // Copyright 2016 Twitter. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.twitter.heron.metricsmgr; |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.time.Duration; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.logging.Handler; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import com.twitter.heron.api.metric.MultiCountMetric; |
| import com.twitter.heron.common.basics.Communicator; |
| import com.twitter.heron.common.basics.NIOLooper; |
| import com.twitter.heron.common.basics.SingletonRegistry; |
| import com.twitter.heron.common.basics.SlaveLooper; |
| import com.twitter.heron.common.basics.SysUtils; |
| import com.twitter.heron.common.basics.TypeUtils; |
| import com.twitter.heron.common.config.SystemConfig; |
| import com.twitter.heron.common.network.HeronSocketOptions; |
| import com.twitter.heron.common.utils.logging.ErrorReportLoggingHandler; |
| import com.twitter.heron.common.utils.logging.LoggingHelper; |
| import com.twitter.heron.common.utils.metrics.JVMMetrics; |
| import com.twitter.heron.common.utils.metrics.MetricsCollector; |
| import com.twitter.heron.metricsmgr.executor.SinkExecutor; |
| import com.twitter.heron.metricsmgr.sink.SinkContextImpl; |
| import com.twitter.heron.proto.system.Metrics; |
| import com.twitter.heron.spi.metricsmgr.metrics.MetricsRecord; |
| import com.twitter.heron.spi.metricsmgr.sink.IMetricsSink; |
| import com.twitter.heron.spi.metricsmgr.sink.SinkContext; |
| |
| /** |
| * Main entry to start the Metrics Manager |
| * 1. It would read and parse the sink-configs, and then instantiate new IMetricsSink |
| * instance by reflection. |
| * <p> |
| * 2. Every IMetricsSink would be driven by an individual thread; and in fact it is |
| * an executor running within a fixed-size thread's ExecutorService. |
| * We would set the thread name of running executor driving the IMetricsSink, for restart when |
| * uncaught exceptions of that IMetricsSink are caught. |
| * <p> |
| * 3. When one IMetricsSink throws uncaught exceptions, we would try to restart this sink unless |
| * we have hit the # of retry attempts. |
| * <p> |
| * 4. When Metrics Manager internal exceptions are caught or we have restart IMetricsSink with |
| * too many attempts, Metrics Manager would flush any remain log and exit. |
| */ |
| public class MetricsManager { |
| private static final Logger LOG = Logger.getLogger(MetricsManager.class.getName()); |
| |
| // Pre-defined value |
| private static final String METRICS_MANAGER_HOST = "127.0.0.1"; |
| private static final String METRICS_MANAGER_COMPONENT_NAME = "__metricsmgr__"; |
| private static final int METRICS_MANAGER_INSTANCE_ID = -1; |
| |
| private final MetricsSinksConfig config; |
| |
| private final ExecutorService executors; |
| |
| // A ConcurrentHashMap from sinkId to SinkExecutor |
| private final Map<String, SinkExecutor> sinkExecutors; |
| |
| private final MetricsManagerServer metricsManagerServer; |
| |
| // # of attempts for different IMetricsSink to restart when failure happens |
| private final Map<String, Integer> sinksRetryAttempts; |
| |
| // The looper drives MetricsManagerServer |
| private final NIOLooper metricsManagerServerLoop; |
| |
| private final JVMMetrics jvmMetrics; |
| |
| // MetricsCollector used to collect internal metrics of Metrics Manager |
| private final MetricsCollector metricsCollector; |
| // Communicator to be bind with MetricsCollector to collect metrics |
| private final Communicator<Metrics.MetricPublisherPublishMessage> metricsQueue; |
| private final Metrics.MetricPublisher metricsManagerPublisher; |
| |
| private final Duration heronMetricsExportInterval; |
| private final String topologyName; |
| private final String metricsmgrId; |
| |
| private final long mainThreadId; |
| |
| /** |
| * Metrics manager constructor |
| */ |
| public MetricsManager(String topologyName, String serverHost, |
| int serverPort, String metricsmgrId, |
| SystemConfig systemConfig, MetricsSinksConfig config) |
| throws IOException { |
| this.topologyName = topologyName; |
| this.metricsmgrId = metricsmgrId; |
| this.config = config; |
| this.metricsManagerServerLoop = new NIOLooper(); |
| |
| // Init the Internal Metrics Export related stuff |
| this.metricsManagerPublisher = |
| Metrics.MetricPublisher.newBuilder(). |
| setHostname(getLocalHostName()).setPort(serverPort). |
| setComponentName(METRICS_MANAGER_COMPONENT_NAME). |
| setInstanceId(metricsmgrId). |
| setInstanceIndex(METRICS_MANAGER_INSTANCE_ID).build(); |
| this.jvmMetrics = new JVMMetrics(); |
| this.metricsQueue = |
| new Communicator<Metrics.MetricPublisherPublishMessage>(null, |
| this.metricsManagerServerLoop); |
| this.metricsCollector = new MetricsCollector(metricsManagerServerLoop, metricsQueue); |
| this.heronMetricsExportInterval = systemConfig.getHeronMetricsExportInterval(); |
| |
| this.mainThreadId = Thread.currentThread().getId(); |
| |
| // Init the ErrorReportHandler |
| ErrorReportLoggingHandler.init(metricsCollector, heronMetricsExportInterval, |
| systemConfig.getHeronMetricsMaxExceptionsPerMessageCount()); |
| |
| // Set up the internal Metrics Export routine |
| setupInternalMetricsExport(); |
| |
| // Set up JVM metrics |
| // TODO -- change the config name |
| setupJVMMetrics(systemConfig.getInstanceMetricsSystemSampleInterval()); |
| |
| // Init the HeronSocketOptions |
| HeronSocketOptions serverSocketOptions = |
| new HeronSocketOptions(systemConfig.getMetricsMgrNetworkWriteBatchSize(), |
| systemConfig.getMetricsMgrNetworkWriteBatchTime(), |
| systemConfig.getMetricsMgrNetworkReadBatchSize(), |
| systemConfig.getMetricsMgrNetworkReadBatchTime(), |
| systemConfig.getMetricsMgrNetworkOptionsSocketSendBufferSize(), |
| systemConfig.getMetricsMgrNetworkOptionsSocketReceivedBufferSize(), |
| systemConfig.getMetricsMgrNetworkOptionsMaximumPacketSize()); |
| |
| // Set the MultiCountMetric for MetricsManagerServer |
| MultiCountMetric serverCounters = new MultiCountMetric(); |
| metricsCollector.registerMetric(METRICS_MANAGER_COMPONENT_NAME, |
| serverCounters, (int) heronMetricsExportInterval.getSeconds()); |
| |
| // Construct the MetricsManagerServer |
| metricsManagerServer = new MetricsManagerServer(metricsManagerServerLoop, serverHost, |
| serverPort, serverSocketOptions, serverCounters); |
| |
| executors = Executors.newFixedThreadPool(config.getNumberOfSinks()); |
| sinkExecutors = new ConcurrentHashMap<>(config.getNumberOfSinks()); |
| sinksRetryAttempts = new ConcurrentHashMap<>(config.getNumberOfSinks()); |
| // Add exception handler for any uncaught exception here. |
| Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler()); |
| |
| // Init the Sinks |
| for (String sinkId : config.getSinkIds()) { |
| // Instantiate a new instance by using reflection |
| SinkExecutor sinkExecutor = initSinkExecutor(sinkId); |
| |
| // Update the SinkExecutor in sinkExecutors |
| sinkExecutors.put(sinkId, sinkExecutor); |
| |
| // Set the retry attempts |
| Object restartAttempts = config.getConfigForSink(sinkId). |
| get(MetricsSinksConfig.CONFIG_KEY_SINK_RESTART_ATTEMPTS); |
| // Supply with default value is config is null |
| sinksRetryAttempts.put(sinkId, |
| restartAttempts == null |
| ? MetricsSinksConfig.DEFAULT_SINK_RESTART_ATTEMPTS |
| : TypeUtils.getInteger(restartAttempts)); |
| |
| // Update the list of Communicator in Metrics Manager Server |
| metricsManagerServer.addSinkCommunicator(sinkExecutor.getCommunicator()); |
| } |
| } |
| |
| private static String getLocalHostName() { |
| String hostName; |
| try { |
| hostName = InetAddress.getLocalHost().getHostName(); |
| } catch (UnknownHostException e) { |
| LOG.severe("Unknown host."); |
| hostName = METRICS_MANAGER_HOST; |
| } |
| |
| return hostName; |
| } |
| |
| public static void main(String[] args) throws IOException { |
| if (args.length != 7) { |
| throw new RuntimeException( |
| "Invalid arguments; Usage: java com.twitter.heron.metricsmgr.MetricsManager " |
| + "<id> <port> <topname> <topid> <heron_internals_config_filename> " |
| + "<override_config_filename> <metrics_sinks_config_filename>"); |
| } |
| |
| String metricsmgrId = args[0]; |
| int metricsPort = Integer.parseInt(args[1]); |
| String topologyName = args[2]; |
| String topologyId = args[3]; |
| String systemConfigFilename = args[4]; |
| String overrideConfigFilename = args[5]; |
| String metricsSinksConfigFilename = args[6]; |
| |
| SystemConfig systemConfig = SystemConfig.newBuilder(true) |
| .putAll(systemConfigFilename, true) |
| .putAll(overrideConfigFilename, true) |
| .build(); |
| |
| // Add the SystemConfig into SingletonRegistry |
| SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, systemConfig); |
| |
| // Init the logging setting and redirect the stdout and stderr to logging |
| // For now we just set the logging level as INFO; later we may accept an argument to set it. |
| Level loggingLevel = Level.INFO; |
| String loggingDir = systemConfig.getHeronLoggingDirectory(); |
| |
| // Log to file and TMaster |
| LoggingHelper.loggerInit(loggingLevel, true); |
| LoggingHelper.addLoggingHandler( |
| LoggingHelper.getFileHandler(metricsmgrId, loggingDir, true, |
| systemConfig.getHeronLoggingMaximumSize(), |
| systemConfig.getHeronLoggingMaximumFiles())); |
| LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler()); |
| |
| LOG.info(String.format("Starting Metrics Manager for topology %s with topologyId %s with " |
| + "Metrics Manager Id %s, Merics Manager Port: %d.", |
| topologyName, topologyId, metricsmgrId, metricsPort)); |
| |
| LOG.info("System Config: " + systemConfig); |
| |
| // Populate the config |
| MetricsSinksConfig sinksConfig = new MetricsSinksConfig(metricsSinksConfigFilename); |
| |
| LOG.info("Sinks Config:" + sinksConfig.toString()); |
| |
| MetricsManager metricsManager = new MetricsManager( |
| topologyName, METRICS_MANAGER_HOST, metricsPort, metricsmgrId, systemConfig, sinksConfig); |
| metricsManager.start(); |
| |
| LOG.info("Loops terminated. Metrics Manager exits."); |
| } |
| |
| private void setupJVMMetrics(Duration systemMetricsSampleInterval) { |
| this.jvmMetrics.registerMetrics(metricsCollector); |
| |
| // Attach sample Runnable to gatewayMetricsCollector |
| this.metricsCollector.registerMetricSampleRunnable(jvmMetrics.getJVMSampleRunnable(), |
| systemMetricsSampleInterval); |
| } |
| |
| private void setupInternalMetricsExport() { |
| Runnable gatherInternalMetrics = new Runnable() { |
| @Override |
| public void run() { |
| while (!metricsQueue.isEmpty()) { |
| Metrics.MetricPublisherPublishMessage message = metricsQueue.poll(); |
| metricsManagerServer.onInternalMessage(metricsManagerPublisher, message); |
| } |
| |
| // It schedules itself in future |
| metricsManagerServerLoop.registerTimerEvent(heronMetricsExportInterval, |
| this); |
| } |
| }; |
| |
| metricsManagerServerLoop.registerTimerEvent(heronMetricsExportInterval, |
| gatherInternalMetrics); |
| } |
| |
| private SinkExecutor initSinkExecutor(String sinkId) { |
| IMetricsSink sink; |
| String classname = |
| (String) config.getConfigForSink(sinkId).get(MetricsSinksConfig.CONFIG_KEY_CLASSNAME); |
| try { |
| sink = (IMetricsSink) Class.forName(classname).newInstance(); |
| } catch (InstantiationException e) { |
| throw new RuntimeException(e + " IMetricsSink class must have a no-arg constructor."); |
| } catch (IllegalAccessException e) { |
| throw new RuntimeException(e + " IMetricsSink class must be concrete."); |
| } catch (ClassNotFoundException e) { |
| throw new RuntimeException(e + " IMetricsSink class must be a class path."); |
| } |
| SlaveLooper sinkExecutorLoop = new SlaveLooper(); |
| Communicator<MetricsRecord> executorInMetricsQueue = |
| new Communicator<MetricsRecord>(null, sinkExecutorLoop); |
| |
| // Since MetricsCollector is not thread-safe, |
| // we need to specify individual MetricsCollector and MultiCountMetric |
| // for different SinkExecutor |
| MetricsCollector sinkMetricsCollector = new MetricsCollector(sinkExecutorLoop, metricsQueue); |
| MultiCountMetric internalCounters = new MultiCountMetric(); |
| sinkMetricsCollector |
| .registerMetric(sinkId, internalCounters, (int) heronMetricsExportInterval.getSeconds()); |
| |
| // Set up the SinkContext |
| SinkContext sinkContext = |
| new SinkContextImpl(topologyName, metricsmgrId, sinkId, internalCounters); |
| |
| SinkExecutor sinkExecutor = |
| new SinkExecutor(sinkId, sink, sinkExecutorLoop, executorInMetricsQueue, sinkContext); |
| sinkExecutor.setPropertyMap(config.getConfigForSink(sinkId)); |
| |
| return sinkExecutor; |
| } |
| |
| public void start() { |
| LOG.info("Starting the Executors."); |
| // Execute the SinkExecutor in separate threads |
| for (SinkExecutor executor : sinkExecutors.values()) { |
| executors.execute(executor); |
| } |
| |
| // The MetricsManagerServer would run in the main thread |
| // We do it in the final step since it would await the main thread |
| LOG.info("Starting Metrics Manager Server"); |
| metricsManagerServer.start(); |
| metricsManagerServerLoop.loop(); |
| } |
| |
| /** |
| * Handler for catching exceptions thrown by any threads (owned either by topology or heron |
| * infrastructure). |
| * When one IMetricsSink throws uncaught exceptions, we would try to restart this sink unless |
| * we have hit the # of retry attempts. |
| * When Metrics Manager internal exceptions are caught or we have restart IMetricsSink with |
| * too many attempts, Metrics Manager would flush any remain logs and exit. |
| */ |
| public class DefaultExceptionHandler implements Thread.UncaughtExceptionHandler { |
| |
| /** |
| * Handler for uncaughtException |
| */ |
| public void uncaughtException(Thread thread, Throwable exception) { |
| // Add try and catch block to prevent new exceptions stop the handling thread |
| try { |
| // Delegate to the actual one |
| handleException(thread, exception); |
| |
| // SUPPRESS CHECKSTYLE IllegalCatch |
| } catch (Throwable t) { |
| LOG.log(Level.SEVERE, "Failed to handle exception. Process halting", t); |
| Runtime.getRuntime().halt(1); |
| } |
| } |
| |
| // The actual uncaught exceptions handing logic |
| private void handleException(Thread thread, Throwable exception) { |
| // We would fail fast when errors occur |
| if (exception instanceof Error) { |
| LOG.log(Level.SEVERE, |
| "Error caught in thread: " + thread.getName() |
| + " with thread id: " + thread.getId() + ". Process halting...", |
| exception); |
| Runtime.getRuntime().halt(1); |
| } |
| |
| // We would fail fast when exceptions happen in main thread |
| if (thread.getId() == mainThreadId) { |
| LOG.log(Level.SEVERE, |
| "Exception caught in main thread. Process halting...", |
| exception); |
| Runtime.getRuntime().halt(1); |
| } |
| |
| LOG.log(Level.SEVERE, |
| "Exception caught in thread: " + thread.getName() |
| + " with thread id: " + thread.getId(), |
| exception); |
| |
| String sinkId = null; |
| Integer thisSinkRetryAttempts = 0; |
| |
| // We enforced the name of thread running particular IMetricsSink equal to its sink-id |
| // If the thread name is a key of SinkExecutors, then it is a thread running IMetricsSink |
| if (sinkExecutors.containsKey(thread.getName())) { |
| sinkId = thread.getName(); |
| // Remove the old sink executor |
| SinkExecutor oldSinkExecutor = sinkExecutors.remove(sinkId); |
| // Remove the unneeded Communicator bind with Metrics Manager Server |
| metricsManagerServer.removeSinkCommunicator(oldSinkExecutor.getCommunicator()); |
| |
| // Close the sink |
| SysUtils.closeIgnoringExceptions(oldSinkExecutor); |
| |
| thisSinkRetryAttempts = sinksRetryAttempts.remove(sinkId); |
| } |
| |
| if (sinkId != null && thisSinkRetryAttempts != 0) { |
| LOG.info(String.format("Restarting IMetricsSink: %s with %d available retries", |
| sinkId, thisSinkRetryAttempts)); |
| |
| // That means it was a sinkExecutor throwing exceptions and threadName is sinkId |
| SinkExecutor newSinkExecutor = initSinkExecutor(sinkId); |
| |
| // Update the SinkExecutor in sinkExecutors |
| sinkExecutors.put(sinkId, newSinkExecutor); |
| |
| // Update the retry attempts if it is > 0 |
| if (thisSinkRetryAttempts > 0) { |
| thisSinkRetryAttempts--; |
| } |
| sinksRetryAttempts.put(sinkId, thisSinkRetryAttempts); |
| |
| // Update the list of Communicator in Metrics Manager Server |
| metricsManagerServer.addSinkCommunicator(newSinkExecutor.getCommunicator()); |
| |
| // Restart it |
| executors.execute(newSinkExecutor); |
| } else if (sinkId != null |
| && thisSinkRetryAttempts == 0 |
| && sinkExecutors.size() > 0) { |
| // If the dead executor is the only one executor and it is removed, |
| // e.g. sinkExecutors.size() == 0, we would exit the process directly |
| |
| LOG.severe("Failed to recover from exceptions for IMetricsSink: " + sinkId); |
| LOG.info(sinkId + " would close and keep running rest sinks: " + sinkExecutors.keySet()); |
| } else { |
| // It is not recoverable (retried too many times, or not an exception from IMetricsSink) |
| // So we would do basic cleaning and exit |
| LOG.info("Failed to recover from exceptions; Metrics Manager Exiting"); |
| for (Handler handler : java.util.logging.Logger.getLogger("").getHandlers()) { |
| handler.close(); |
| } |
| // Attempts to shutdown all the thread in threadsPool. This will send Interrupt to every |
| // thread in the pool. Threads may implement a clean Interrupt logic. |
| executors.shutdownNow(); |
| |
| // (including threads not owned by HeronInstance). To be safe, not sending these |
| // interrupts. |
| Runtime.getRuntime().halt(1); |
| } |
| } |
| } |
| } |