blob: 9ae356b153f86796fa7284a0924196625d49d253 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.s4.core;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.Inet4Address;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.HelixBasedCommModule;
import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.core.util.ParametersInjectionModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.io.Resources;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import com.google.inject.util.Modules.OverriddenModuleBuilder;
/**
* Bootstrap class for S4. It creates an S4 node.
*
*/
public class Main {
private static final Logger logger = LoggerFactory.getLogger(Main.class);
/**
* Starts an S4 server.
*
* @param args
*/
public static void main(String[] args) {
MainArgs mainArgs = new MainArgs();
JCommander jc = new JCommander(mainArgs);
try {
jc.parse(args);
} catch (Exception e) {
JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
jc.usage();
System.exit(1);
}
startNode(mainArgs);
}
private static void startNode(MainArgs mainArgs) {
try {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught exception in thread {}", t.getName(), e);
}
});
Injector injector;
InputStream commConfigFileInputStream;
InputStream coreConfigFileInputStream;
String commConfigString;
if (mainArgs.commConfigFilePath == null) {
commConfigFileInputStream = Resources.getResource("default.s4.comm.properties").openStream();
commConfigString = "default.s4.comm.properties from classpath";
} else {
commConfigFileInputStream = new FileInputStream(new File(mainArgs.commConfigFilePath));
commConfigString = mainArgs.commConfigFilePath;
}
String coreConfigString;
if (mainArgs.coreConfigFilePath == null) {
coreConfigFileInputStream = Resources.getResource("default.s4.core.properties").openStream();
coreConfigString = "default.s4.core.properties from classpath";
} else {
coreConfigFileInputStream = new FileInputStream(new File(mainArgs.coreConfigFilePath));
coreConfigString = mainArgs.coreConfigFilePath;
}
logger.info(
"Initializing S4 node with : \n- comm module class [{}]\n- comm configuration file [{}]\n- core module class [{}]\n- core configuration file[{}]\n- extra modules: {}\n- inline parameters: {}",
new String[] { mainArgs.commModuleClass, commConfigString, mainArgs.coreModuleClass,
coreConfigString, Arrays.toString(mainArgs.extraModulesClasses.toArray(new String[] {})),
Arrays.toString(mainArgs.extraNamedParameters.toArray(new String[] {})) });
AbstractModule commModule = (AbstractModule) Class.forName(mainArgs.commModuleClass)
.getConstructor(InputStream.class, String.class)
.newInstance(commConfigFileInputStream, mainArgs.clusterName);
AbstractModule coreModule = (AbstractModule) Class.forName(mainArgs.coreModuleClass)
.getConstructor(InputStream.class).newInstance(coreConfigFileInputStream);
List<com.google.inject.Module> extraModules = new ArrayList<com.google.inject.Module>();
for (String moduleClass : mainArgs.extraModulesClasses) {
extraModules.add((Module) Class.forName(moduleClass).newInstance());
}
Module combinedModule = Modules.combine(commModule, coreModule);
if (extraModules.size() > 0) {
OverriddenModuleBuilder overridenModuleBuilder = Modules.override(combinedModule);
combinedModule = overridenModuleBuilder.with(extraModules);
}
if (mainArgs.zkConnectionString != null) {
mainArgs.extraNamedParameters.add("s4.cluster.zk_address=" + mainArgs.zkConnectionString);
}
if (mainArgs.instanceName != null) {
mainArgs.extraNamedParameters.add("s4.instance.name=" + mainArgs.instanceName);
}
if (!mainArgs.extraNamedParameters.isEmpty()) {
logger.debug("Adding named parameters for injection : {}",
Arrays.toString(mainArgs.extraNamedParameters.toArray(new String[] {})));
Map<String, String> namedParameters = new HashMap<String, String>();
for (String namedParam : mainArgs.extraNamedParameters) {
namedParameters.put(namedParam.split("[=]")[0].trim(),
namedParam.substring(namedParam.indexOf('=') + 1).trim());
}
combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
}
injector = Guice.createInjector(combinedModule);
// start a HelixController to manage the cluster
String controllerName = Inet4Address.getLocalHost().getCanonicalHostName() + UUID.randomUUID().toString();
HelixControllerMain.startHelixController(mainArgs.zkConnectionString, mainArgs.clusterName, controllerName,
HelixControllerMain.STANDALONE);
if (mainArgs.appClass != null) {
logger.info("Starting S4 node with single application from class [{}]", mainArgs.appClass);
App app = (App) injector.getInstance(Class.forName(mainArgs.appClass));
app.init();
app.start();
} else {
logger.info("Starting S4 node. This node will automatically download applications published for the cluster it belongs to");
Server server = injector.getInstance(Server.class);
try {
server.start(injector);
} catch (Exception e) {
logger.error("Failed to start the controller.", e);
}
}
} catch (Exception e) {
logger.error("Cannot start S4 node", e);
System.exit(1);
}
}
/**
* Defines command parameters.
*
*/
@Parameters(separators = "=")
public static class MainArgs {
@Parameter(names = { "-c", "-cluster" }, description = "cluster name", required = true)
String clusterName = null;
@Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
String instanceName = null;
@Parameter(names = "-commModuleClass", description = "configuration module class for the communication layer", required = false)
String commModuleClass = HelixBasedCommModule.class.getName();
@Parameter(names = "-commConfig", description = "s4 communication layer configuration file", required = false)
String commConfigFilePath;
@Parameter(names = "-coreModuleClass", description = "s4-core configuration module class", required = false)
String coreModuleClass = DefaultCoreModule.class.getName();
@Parameter(names = "-coreConfig", description = "s4 core configuration file", required = false)
String coreConfigFilePath = null;
@Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
String appClass = null;
@Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false, hidden = false)
List<String> extraModulesClasses = new ArrayList<String>();
@Parameter(names = { "-namedStringParameters", "-p" }, description = "Comma-separated list of inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-p=name1=value1,name2=value2 '", hidden = false, converter = InlineConfigParameterConverter.class)
List<String> extraNamedParameters = new ArrayList<String>();
@Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
String zkConnectionString;
}
/**
* Parameters parsing utility.
*
*/
public static class InlineConfigParameterConverter implements IStringConverter<String> {
@Override
public String convert(String arg) {
Pattern parameterPattern = Pattern.compile("(\\S+=\\S+)");
logger.info("processing inline configuration parameter {}", arg);
Matcher parameterMatcher = parameterPattern.matcher(arg);
if (!parameterMatcher.find()) {
throw new IllegalArgumentException("Cannot understand parameter " + arg);
}
return parameterMatcher.group(1);
}
}
}