blob: 779771f75272d4b68ec9491be4df6ca269c416b0 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.metricscachemgr;
import java.io.IOException;
import java.net.InetAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import com.twitter.heron.common.basics.NIOLooper;
import com.twitter.heron.common.basics.SysUtils;
import com.twitter.heron.common.config.SystemConfig;
import com.twitter.heron.common.network.HeronSocketOptions;
import com.twitter.heron.common.utils.logging.ErrorReportLoggingHandler;
import com.twitter.heron.common.utils.logging.LoggingHelper;
import com.twitter.heron.metricscachemgr.metricscache.MetricsCache;
import com.twitter.heron.metricsmgr.MetricsSinksConfig;
import com.twitter.heron.proto.tmaster.TopologyMaster;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.ConfigLoader;
import com.twitter.heron.spi.common.Context;
import com.twitter.heron.spi.common.Key;
import com.twitter.heron.spi.statemgr.IStateManager;
import com.twitter.heron.spi.utils.ReflectionUtils;
/**
* main entry for metrics cache manager
* MetricsCacheManager holds three major objects:
* 1.MetricsCache metricsCache: it is the store where metrics and exceptions are cached.
* 2.MetricsCacheManagerServer metricsCacheManagerServer: it accepts the metric message from sink.
* 3.MetricsCacheManagerHttpServer metricsCacheManagerHttpServer: it responds to queries.
*/
public class MetricsCacheManager {
private static final Logger LOG = Logger.getLogger(MetricsCacheManager.class.getName());
private static final String METRICS_CACHE_HOST = "0.0.0.0";
private static final String METRICS_CACHE_COMPONENT_NAME = "__metricscachemgr__";
private static final int METRICS_CACHE_INSTANCE_ID = -1;
// accepts messages from sinks
private MetricsCacheManagerServer metricsCacheManagerServer;
// The looper drives MetricsManagerServer
private NIOLooper metricsCacheManagerServerLoop;
// respond to query requests
private MetricsCacheManagerHttpServer metricsCacheManagerHttpServer;
private String topologyName;
private MetricsCache metricsCache;
private Config config;
private TopologyMaster.MetricsCacheLocation metricsCacheLocation;
/**
* Constructor: MetricsCacheManager needs 4 type information:
* 1. Servers: host and 2 ports
* 2. Topology: name
* 3. Config: heron config, sink config, and cli config
* 4. State: location node to be put in the state-mgr
*
* @param topologyName topology name
* @param serverHost server host
* @param masterPort port to accept message from sink
* @param statsPort port to respond to query request
* @param systemConfig heron config
* @param metricsSinkConfig sink config
* @param configExpand other config
* @param metricsCacheLocation location for state mgr
*/
public MetricsCacheManager(String topologyName,
String serverHost, int masterPort, int statsPort,
SystemConfig systemConfig, MetricsSinksConfig metricsSinkConfig,
Config configExpand,
TopologyMaster.MetricsCacheLocation metricsCacheLocation)
throws IOException {
this.topologyName = topologyName;
this.config = configExpand;
this.metricsCacheLocation = metricsCacheLocation;
metricsCacheManagerServerLoop = new NIOLooper();
// initialize cache and hook to the shared nio-looper
metricsCache = new MetricsCache(systemConfig, metricsSinkConfig, metricsCacheManagerServerLoop);
// Init the HeronSocketOptions
HeronSocketOptions serverSocketOptions =
new HeronSocketOptions(systemConfig.getMetricsMgrNetworkWriteBatchSize(),
systemConfig.getMetricsMgrNetworkWriteBatchTime(),
systemConfig.getMetricsMgrNetworkReadBatchSize(),
systemConfig.getMetricsMgrNetworkReadBatchTime(),
systemConfig.getMetricsMgrNetworkOptionsSocketSendBufferSize(),
systemConfig.getMetricsMgrNetworkOptionsSocketReceivedBufferSize(),
systemConfig.getMetricsMgrNetworkOptionsMaximumPacketSize());
// Construct the server to accepts messages from sinks
metricsCacheManagerServer = new MetricsCacheManagerServer(metricsCacheManagerServerLoop,
serverHost, masterPort, serverSocketOptions, metricsCache);
metricsCacheManagerServer.registerOnMessage(TopologyMaster.PublishMetrics.newBuilder());
metricsCacheManagerServer.registerOnRequest(TopologyMaster.MetricRequest.newBuilder());
metricsCacheManagerServer.registerOnRequest(TopologyMaster.ExceptionLogRequest.newBuilder());
// Construct the server to respond to query request
metricsCacheManagerHttpServer = new MetricsCacheManagerHttpServer(metricsCache, statsPort);
}
// Construct all required command line options
private static Options constructOptions() {
Options options = new Options();
Option cluster = Option.builder("c")
.desc("Cluster name in which the topology needs to run on")
.longOpt("cluster")
.hasArgs()
.argName("cluster")
.required()
.build();
Option role = Option.builder("r")
.desc("Role under which the topology needs to run")
.longOpt("role")
.hasArgs()
.argName("role")
.required()
.build();
Option environment = Option.builder("e")
.desc("Environment under which the topology needs to run")
.longOpt("environment")
.hasArgs()
.argName("environment")
.required()
.build();
Option masterPort = Option.builder("m")
.desc("Master port to accept the metric/exception messages from sinks")
.longOpt("master_port")
.hasArgs()
.argName("master port")
.required()
.build();
Option statsPort = Option.builder("s")
.desc("Stats port to respond the queries on metrics/exceptions")
.longOpt("stats_port")
.hasArgs()
.argName("stats port")
.required()
.build();
Option systemConfig = Option.builder("y")
.desc("System configuration file path")
.longOpt("system_config_file")
.hasArgs()
.argName("system config file")
.build();
Option overrideConfig = Option.builder("o")
.desc("Override configuration file path")
.longOpt("override_config_file")
.hasArgs()
.argName("override config file")
.build();
Option sinkConfig = Option.builder("i")
.desc("Sink configuration file path")
.longOpt("sink_config_file")
.hasArgs()
.argName("sink config file")
.build();
Option topologyName = Option.builder("n")
.desc("The topology name")
.longOpt("topology_name")
.hasArgs()
.argName("topology name")
.required()
.build();
Option topologyId = Option.builder("t")
.desc("The topology id")
.longOpt("topology_id")
.hasArgs()
.argName("topology id")
.required()
.build();
Option metricsCacheId = Option.builder("r")
.desc("The MetricsCache id")
.longOpt("metricscache_id")
.hasArgs()
.argName("metricscache id")
.required()
.build();
Option verbose = Option.builder("v")
.desc("Enable debug logs")
.longOpt("verbose")
.build();
options.addOption(cluster);
options.addOption(role);
options.addOption(environment);
options.addOption(masterPort);
options.addOption(statsPort);
options.addOption(systemConfig);
options.addOption(overrideConfig);
options.addOption(sinkConfig);
options.addOption(topologyName);
options.addOption(topologyId);
options.addOption(metricsCacheId);
options.addOption(verbose);
return options;
}
// construct command line help options
private static Options constructHelpOptions() {
Options options = new Options();
Option help = Option.builder("h")
.desc("List all options and their description")
.longOpt("help")
.build();
options.addOption(help);
return options;
}
// Print usage options
private static void usage(Options options) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("MetricsCache", options);
}
public static void main(String[] args) throws Exception {
Options options = constructOptions();
Options helpOptions = constructHelpOptions();
CommandLineParser parser = new DefaultParser();
// parse the help options first.
CommandLine cmd = parser.parse(helpOptions, args, true);
if (cmd.hasOption("h")) {
usage(options);
return;
}
try {
// Now parse the required options
cmd = parser.parse(options, args);
} catch (ParseException e) {
usage(options);
throw new RuntimeException("Error parsing command line options: ", e);
}
Level logLevel = Level.INFO;
if (cmd.hasOption("v")) {
logLevel = Level.ALL;
}
String cluster = cmd.getOptionValue("cluster");
String role = cmd.getOptionValue("role");
String environ = cmd.getOptionValue("environment");
int masterPort = Integer.valueOf(cmd.getOptionValue("master_port"));
int statsPort = Integer.valueOf(cmd.getOptionValue("stats_port"));
String systemConfigFilename = cmd.getOptionValue("system_config_file");
String overrideConfigFilename = cmd.getOptionValue("override_config_file");
String metricsSinksConfigFilename = cmd.getOptionValue("sink_config_file");
String topologyName = cmd.getOptionValue("topology_name");
String topologyId = cmd.getOptionValue("topology_id");
String metricsCacheMgrId = cmd.getOptionValue("metricscache_id");
// read heron internal config file
SystemConfig systemConfig = SystemConfig.newBuilder(true)
.putAll(systemConfigFilename, true)
.putAll(overrideConfigFilename, true)
.build();
// Log to file and sink(exception)
LoggingHelper.loggerInit(logLevel, true);
LoggingHelper.addLoggingHandler(LoggingHelper.getFileHandler(metricsCacheMgrId,
systemConfig.getHeronLoggingDirectory(), true,
systemConfig.getHeronLoggingMaximumSize(),
systemConfig.getHeronLoggingMaximumFiles()));
LoggingHelper.addLoggingHandler(new ErrorReportLoggingHandler());
LOG.info(String.format("Starting MetricsCache for topology %s with topologyId %s with "
+ "MetricsCache Id %s, master port: %d.",
topologyName, topologyId, metricsCacheMgrId, masterPort));
LOG.info("System Config: " + systemConfig);
// read sink config file
MetricsSinksConfig sinksConfig = new MetricsSinksConfig(metricsSinksConfigFilename);
LOG.info("Sinks Config: " + sinksConfig.toString());
// build config from cli
Config config = Config.toClusterMode(Config.newBuilder()
.putAll(ConfigLoader.loadClusterConfig())
.putAll(Config.newBuilder()
.put(Key.CLUSTER, cluster).put(Key.ROLE, role).put(Key.ENVIRON, environ)
.build())
.putAll(Config.newBuilder()
.put(Key.TOPOLOGY_NAME, topologyName).put(Key.TOPOLOGY_ID, topologyId)
.build())
.build());
LOG.info("Cli Config: " + config.toString());
// build metricsCache location
TopologyMaster.MetricsCacheLocation metricsCacheLocation =
TopologyMaster.MetricsCacheLocation.newBuilder()
.setTopologyName(topologyName)
.setTopologyId(topologyId)
.setHost(InetAddress.getLocalHost().getHostName())
.setControllerPort(-1) // not used for metricscache
.setMasterPort(masterPort)
.setStatsPort(statsPort)
.build();
MetricsCacheManager metricsCacheManager = new MetricsCacheManager(
topologyName, METRICS_CACHE_HOST, masterPort, statsPort,
systemConfig, sinksConfig, config, metricsCacheLocation);
metricsCacheManager.start();
LOG.info("Loops terminated. MetricsCache Manager exits.");
}
public void start() throws Exception {
// 1. Do prepare work
// create an instance of state manager
String statemgrClass = Context.stateManagerClass(config);
LOG.info("Context.stateManagerClass " + statemgrClass);
IStateManager statemgr;
try {
statemgr = ReflectionUtils.newInstance(statemgrClass);
} catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
throw new Exception(String.format(
"Failed to instantiate state manager class '%s'",
statemgrClass), e);
}
// Put it in a try block so that we can always clean resources
try {
// initialize the statemgr
statemgr.initialize(config);
statemgr.setMetricsCacheLocation(metricsCacheLocation, topologyName);
LOG.info("metricsCacheLocation " + metricsCacheLocation.toString());
LOG.info("topologyName " + topologyName.toString());
LOG.info("Starting Metrics Cache HTTP Server");
metricsCacheManagerHttpServer.start();
// 2. The MetricsCacheServer would run in the main thread
// We do it in the final step since it would await the main thread
LOG.info("Starting Metrics Cache Server");
metricsCacheManagerServer.start();
metricsCacheManagerServerLoop.loop();
} finally {
// 3. Do post work basing on the result
// Currently nothing to do here
// 4. Close the resources
SysUtils.closeIgnoringExceptions(statemgr);
}
}
}