blob: bc04220becb4ea4785b9f280e6d43acaeaa8a085 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.providers.agent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ProtocolTypes;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.slider.api.ClusterDescription;
import org.apache.slider.api.ClusterNode;
import org.apache.slider.api.InternalKeys;
import org.apache.slider.api.OptionKeys;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.StatusKeys;
import org.apache.slider.common.SliderExitCodes;
import org.apache.slider.common.SliderKeys;
import org.apache.slider.common.SliderXmlConfKeys;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.conf.AggregateConf;
import org.apache.slider.core.conf.ConfTreeOperations;
import org.apache.slider.core.conf.MapOperations;
import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.NoSuchNodeException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.launch.CommandLineBuilder;
import org.apache.slider.core.launch.ContainerLauncher;
import org.apache.slider.core.registry.docstore.ConfigFormat;
import org.apache.slider.core.registry.docstore.ConfigUtils;
import org.apache.slider.core.registry.docstore.ExportEntry;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
import org.apache.slider.core.registry.docstore.PublishedConfigurationOutputter;
import org.apache.slider.core.registry.docstore.PublishedExports;
import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.providers.AbstractProviderService;
import org.apache.slider.providers.MonitorDetail;
import org.apache.slider.providers.ProviderCore;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
import org.apache.slider.providers.agent.application.metadata.AbstractComponent;
import org.apache.slider.providers.agent.application.metadata.Application;
import org.apache.slider.providers.agent.application.metadata.CommandScript;
import org.apache.slider.providers.agent.application.metadata.Component;
import org.apache.slider.providers.agent.application.metadata.ComponentCommand;
import org.apache.slider.providers.agent.application.metadata.ComponentExport;
import org.apache.slider.providers.agent.application.metadata.ComponentsInAddonPackage;
import org.apache.slider.providers.agent.application.metadata.ConfigFile;
import org.apache.slider.providers.agent.application.metadata.DefaultConfig;
import org.apache.slider.providers.agent.application.metadata.DockerContainer;
import org.apache.slider.providers.agent.application.metadata.Export;
import org.apache.slider.providers.agent.application.metadata.ExportGroup;
import org.apache.slider.providers.agent.application.metadata.Metainfo;
import org.apache.slider.providers.agent.application.metadata.OSPackage;
import org.apache.slider.providers.agent.application.metadata.OSSpecific;
import org.apache.slider.providers.agent.application.metadata.Package;
import org.apache.slider.providers.agent.application.metadata.PropertyInfo;
import org.apache.slider.server.appmaster.actions.ProviderReportedContainerLoss;
import org.apache.slider.server.appmaster.actions.RegisterComponentInstance;
import org.apache.slider.server.appmaster.state.ContainerPriority;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType;
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
import org.apache.slider.server.appmaster.web.rest.agent.CommandReport;
import org.apache.slider.server.appmaster.web.rest.agent.ComponentStatus;
import org.apache.slider.server.appmaster.web.rest.agent.ExecutionCommand;
import org.apache.slider.server.appmaster.web.rest.agent.HeartBeat;
import org.apache.slider.server.appmaster.web.rest.agent.HeartBeatResponse;
import org.apache.slider.server.appmaster.web.rest.agent.Register;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand;
import org.apache.slider.server.services.security.CertificateManager;
import org.apache.slider.server.services.security.SecurityStore;
import org.apache.slider.server.services.security.StoresGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.slider.server.appmaster.web.rest.RestPaths.SLIDER_PATH_AGENTS;
/**
* This class implements the server-side logic for application deployment through Slider application package
*/
public class AgentProviderService extends AbstractProviderService implements
ProviderCore,
AgentKeys,
SliderKeys, AgentRestOperations {
protected static final Logger log =
LoggerFactory.getLogger(AgentProviderService.class);
private static final ProviderUtils providerUtils = new ProviderUtils(log);
private static final String LABEL_MAKER = "___";
private static final String CONTAINER_ID = "container_id";
private static final String GLOBAL_CONFIG_TAG = "global";
private static final String LOG_FOLDERS_TAG = "LogFolders";
private static final String HOST_FOLDER_FORMAT = "%s:%s";
private static final String CONTAINER_LOGS_TAG = "container_log_dirs";
private static final String CONTAINER_PWDS_TAG = "container_work_dirs";
private static final String COMPONENT_TAG = "component";
private static final String APPLICATION_TAG = "application";
private static final String COMPONENT_DATA_TAG = "ComponentInstanceData";
private static final String SHARED_PORT_TAG = "SHARED";
private static final String PER_CONTAINER_TAG = "{PER_CONTAINER}";
private static final int MAX_LOG_ENTRIES = 40;
private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000;
private final Object syncLock = new Object();
private final ComponentTagProvider tags = new ComponentTagProvider();
private int heartbeatMonitorInterval = 0;
private AgentClientProvider clientProvider;
private AtomicInteger taskId = new AtomicInteger(0);
private volatile Metainfo metaInfo = null;
private AggregateConf instanceDefinition = null;
private SliderFileSystem fileSystem = null;
private Map<String, DefaultConfig> defaultConfigs = null;
private ComponentCommandOrder commandOrder = null;
private HeartbeatMonitor monitor;
private Boolean canAnyMasterPublish = null;
private AgentLaunchParameter agentLaunchParameter = null;
private String clusterName = null;
private boolean isInUpgradeMode;
private Set<String> upgradeContainers = new HashSet<String>();
private boolean appStopInitiated;
private final Map<String, ComponentInstanceState> componentStatuses =
new ConcurrentHashMap<String, ComponentInstanceState>();
private final Map<String, Map<String, String>> componentInstanceData =
new ConcurrentHashMap<String, Map<String, String>>();
private final Map<String, Map<String, List<ExportEntry>>> exportGroups =
new ConcurrentHashMap<String, Map<String, List<ExportEntry>>>();
private final Map<String, Map<String, String>> allocatedPorts =
new ConcurrentHashMap<String, Map<String, String>>();
private final Map<String, Metainfo> packageMetainfo =
new ConcurrentHashMap<String, Metainfo>();
private final Map<String, ExportEntry> logFolderExports =
Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) {
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > MAX_LOG_ENTRIES;
}
});
private final Map<String, ExportEntry> workFolderExports =
Collections.synchronizedMap(new LinkedHashMap<String, ExportEntry>(MAX_LOG_ENTRIES, 0.75f, false) {
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > MAX_LOG_ENTRIES;
}
});
private final Map<String, Set<String>> containerExportsMap =
new HashMap<String, Set<String>>();
/**
* Create an instance of AgentProviderService
*/
public AgentProviderService() {
super("AgentProviderService");
setAgentRestOperations(this);
setHeartbeatMonitorInterval(DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
}
@Override
public String getHumanName() {
return "Slider Agent";
}
@Override
public List<ProviderRole> getRoles() {
return AgentRoles.getRoles();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
clientProvider = new AgentClientProvider(conf);
}
@Override
public Configuration loadProviderConfigurationInformation(File confDir) throws
BadCommandArgumentsException,
IOException {
return new Configuration(false);
}
@Override
public void validateInstanceDefinition(AggregateConf instanceDefinition)
throws
SliderException {
clientProvider.validateInstanceDefinition(instanceDefinition, null);
ConfTreeOperations resources =
instanceDefinition.getResourceOperations();
Set<String> names = resources.getComponentNames();
names.remove(SliderKeys.COMPONENT_AM);
for (String name : names) {
Component componentDef = getMetaInfo().getApplicationComponent(name);
if (componentDef == null) {
throw new BadConfigException(
"Component %s is not a member of application.", name);
}
MapOperations componentConfig = resources.getMandatoryComponent(name);
int count =
componentConfig.getMandatoryOptionInt(ResourceKeys.COMPONENT_INSTANCES);
int definedMinCount = componentDef.getMinInstanceCountInt();
int definedMaxCount = componentDef.getMaxInstanceCountInt();
if (count < definedMinCount || count > definedMaxCount) {
throw new BadConfigException("Component %s, %s value %d out of range. "
+ "Expected minimum is %d and maximum is %d",
name,
ResourceKeys.COMPONENT_INSTANCES,
count,
definedMinCount,
definedMaxCount);
}
}
}
// Reads the metainfo.xml in the application package and loads it
private void buildMetainfo(AggregateConf instanceDefinition,
SliderFileSystem fileSystem) throws IOException, SliderException {
String appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition
.getAppConfOperations());
if (metaInfo == null) {
synchronized (syncLock) {
if (metaInfo == null) {
this.instanceDefinition = instanceDefinition;
this.fileSystem = fileSystem;
readAndSetHeartbeatMonitoringInterval(instanceDefinition);
initializeAgentDebugCommands(instanceDefinition);
metaInfo = getApplicationMetainfo(fileSystem, appDef, false);
log.info("Master package metainfo: {}", metaInfo.toString());
if (metaInfo == null || metaInfo.getApplication() == null) {
log.error("metainfo.xml is unavailable or malformed at {}.", appDef);
throw new SliderException(
"metainfo.xml is required in app package.");
}
commandOrder = new ComponentCommandOrder(metaInfo.getApplication().getCommandOrders());
defaultConfigs = initializeDefaultConfigs(fileSystem, appDef, metaInfo);
monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval());
monitor.start();
// build a map from component to metainfo
String addonAppDefString = instanceDefinition.getAppConfOperations()
.getGlobalOptions().getOption(AgentKeys.ADDONS, null);
log.debug("All addon appdefs: {}", addonAppDefString);
if (addonAppDefString != null) {
Scanner scanner = new Scanner(addonAppDefString).useDelimiter(",");
while (scanner.hasNext()) {
String addonAppDef = scanner.next();
String addonAppDefPath = instanceDefinition
.getAppConfOperations().getGlobalOptions().get(addonAppDef);
log.debug("Addon package {} is stored at: {}", addonAppDef
+ addonAppDefPath);
Metainfo addonMetaInfo = getApplicationMetainfo(fileSystem,
addonAppDefPath, true);
addonMetaInfo.validate();
packageMetainfo.put(addonMetaInfo.getApplicationPackage()
.getName(), addonMetaInfo);
}
log.info("Metainfo map for master and addon: {}",
packageMetainfo.toString());
}
}
}
}
}
@Override
public void initializeApplicationConfiguration(
AggregateConf instanceDefinition, SliderFileSystem fileSystem)
throws IOException, SliderException {
buildMetainfo(instanceDefinition, fileSystem);
}
@Override
public void buildContainerLaunchContext(ContainerLauncher launcher,
AggregateConf instanceDefinition,
Container container,
ProviderRole providerRole,
SliderFileSystem fileSystem,
Path generatedConfPath,
MapOperations resourceComponent,
MapOperations appComponent,
Path containerTmpDirPath) throws
IOException,
SliderException {
String roleName = providerRole.name;
String roleGroup = providerRole.group;
String appDef = SliderUtils.getApplicationDefinitionPath(instanceDefinition
.getAppConfOperations());
initializeApplicationConfiguration(instanceDefinition, fileSystem);
log.info("Build launch context for Agent");
log.debug(instanceDefinition.toString());
//if we are launching docker based app on yarn, then we need to pass docker image
if (isYarnDockerContainer(roleGroup)) {
launcher.setYarnDockerMode(true);
launcher.setDockerImage(getConfigFromMetaInfo(roleGroup, "image"));
launcher.setRunPrivilegedContainer(getConfigFromMetaInfo(roleGroup, "runPriviledgedContainer"));
launcher
.setYarnContainerMountPoints(getConfigFromMetaInfoWithAppConfigOverriding(
roleGroup, "yarn.container.mount.points"));
}
// Set the environment
launcher.putEnv(SliderUtils.buildEnvMap(appComponent,
getStandardTokenMap(getAmState().getAppConfSnapshot(), roleName, roleGroup)));
String workDir = ApplicationConstants.Environment.PWD.$();
launcher.setEnv("AGENT_WORK_ROOT", workDir);
log.info("AGENT_WORK_ROOT set to {}", workDir);
String logDir = ApplicationConstants.LOG_DIR_EXPANSION_VAR;
launcher.setEnv("AGENT_LOG_ROOT", logDir);
log.info("AGENT_LOG_ROOT set to {}", logDir);
if (System.getenv(HADOOP_USER_NAME) != null) {
launcher.setEnv(HADOOP_USER_NAME, System.getenv(HADOOP_USER_NAME));
}
// for 2-Way SSL
launcher.setEnv(SLIDER_PASSPHRASE, instanceDefinition.getPassphrase());
//add english env
launcher.setEnv("LANG", "en_US.UTF-8");
launcher.setEnv("LC_ALL", "en_US.UTF-8");
launcher.setEnv("LANGUAGE", "en_US.UTF-8");
//local resources
// TODO: Should agent need to support App Home
String scriptPath = new File(AgentKeys.AGENT_MAIN_SCRIPT_ROOT, AgentKeys.AGENT_MAIN_SCRIPT).getPath();
String appHome = instanceDefinition.getAppConfOperations().
getGlobalOptions().get(AgentKeys.PACKAGE_PATH);
if (SliderUtils.isSet(appHome)) {
scriptPath = new File(appHome, AgentKeys.AGENT_MAIN_SCRIPT).getPath();
}
// set PYTHONPATH
List<String> pythonPaths = new ArrayList<String>();
pythonPaths.add(AgentKeys.AGENT_MAIN_SCRIPT_ROOT);
pythonPaths.add(AgentKeys.AGENT_JINJA2_ROOT);
String pythonPath = StringUtils.join(File.pathSeparator, pythonPaths);
launcher.setEnv(PYTHONPATH, pythonPath);
log.info("PYTHONPATH set to {}", pythonPath);
Path agentImagePath = null;
String agentImage = instanceDefinition.getInternalOperations().
get(InternalKeys.INTERNAL_APPLICATION_IMAGE_PATH);
if (SliderUtils.isUnset(agentImage)) {
agentImagePath =
new Path(new Path(new Path(instanceDefinition.getInternalOperations().get(InternalKeys.INTERNAL_TMP_DIR),
container.getId().getApplicationAttemptId().getApplicationId().toString()),
AgentKeys.PROVIDER_AGENT),
SliderKeys.AGENT_TAR);
} else {
agentImagePath = new Path(agentImage);
}
if (fileSystem.getFileSystem().exists(agentImagePath)) {
LocalResource agentImageRes = fileSystem.createAmResource(agentImagePath, LocalResourceType.ARCHIVE);
launcher.addLocalResource(AgentKeys.AGENT_INSTALL_DIR, agentImageRes);
} else {
String msg =
String.format("Required agent image slider-agent.tar.gz is unavailable at %s", agentImagePath.toString());
MapOperations compOps = appComponent;
boolean relaxVerificationForTest = compOps != null ? Boolean.valueOf(compOps.
getOptionBool(AgentKeys.TEST_RELAX_VERIFICATION, false)) : false;
log.error(msg);
if (!relaxVerificationForTest) {
throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, msg);
}
}
log.info("Using {} for agent.", scriptPath);
LocalResource appDefRes = fileSystem.createAmResource(
fileSystem.getFileSystem().resolvePath(new Path(appDef)),
LocalResourceType.ARCHIVE);
launcher.addLocalResource(AgentKeys.APP_DEFINITION_DIR, appDefRes);
for (Package pkg : getMetaInfo().getApplication().getPackages()) {
Path pkgPath = fileSystem.buildResourcePath(pkg.getName());
if (!fileSystem.isFile(pkgPath)) {
pkgPath = fileSystem.buildResourcePath(getClusterName(),
pkg.getName());
}
if (!fileSystem.isFile(pkgPath)) {
throw new IOException("Package doesn't exist as a resource: " +
pkg.getName());
}
log.info("Adding resource {}", pkg.getName());
LocalResourceType type = LocalResourceType.FILE;
if ("archive".equals(pkg.getType())) {
type = LocalResourceType.ARCHIVE;
}
LocalResource packageResource = fileSystem.createAmResource(
pkgPath, type);
launcher.addLocalResource(AgentKeys.APP_PACKAGES_DIR, packageResource);
}
String agentConf = instanceDefinition.getAppConfOperations().
getGlobalOptions().getOption(AgentKeys.AGENT_CONF, "");
if (SliderUtils.isSet(agentConf)) {
LocalResource agentConfRes = fileSystem.createAmResource(fileSystem
.getFileSystem().resolvePath(new Path(agentConf)),
LocalResourceType.FILE);
launcher.addLocalResource(AgentKeys.AGENT_CONFIG_FILE, agentConfRes);
}
String agentVer = instanceDefinition.getAppConfOperations().
getGlobalOptions().getOption(AgentKeys.AGENT_VERSION, null);
if (agentVer != null) {
LocalResource agentVerRes = fileSystem.createAmResource(
fileSystem.getFileSystem().resolvePath(new Path(agentVer)),
LocalResourceType.FILE);
launcher.addLocalResource(AgentKeys.AGENT_VERSION_FILE, agentVerRes);
}
if (SliderUtils.isHadoopClusterSecure(getConfig())) {
localizeServiceKeytabs(launcher, instanceDefinition, fileSystem);
}
MapOperations amComponent = instanceDefinition.
getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM);
boolean twoWayEnabled = amComponent != null ? Boolean.valueOf(amComponent.
getOptionBool(AgentKeys.KEY_AGENT_TWO_WAY_SSL_ENABLED, false)) : false;
if (twoWayEnabled) {
localizeContainerSSLResources(launcher, container, fileSystem);
}
MapOperations compOps = appComponent;
if (areStoresRequested(compOps)) {
localizeContainerSecurityStores(launcher, container, roleName, fileSystem,
instanceDefinition, compOps);
}
//add the configuration resources
launcher.addLocalResources(fileSystem.submitDirectory(
generatedConfPath,
SliderKeys.PROPAGATED_CONF_DIR_NAME));
if (appComponent.getOptionBool(AgentKeys.AM_CONFIG_GENERATION, false)) {
// build and localize configuration files
Map<String, Map<String, String>> configurations =
buildCommandConfigurations(instanceDefinition.getAppConfOperations(),
container.getId().toString(), roleName, roleGroup);
localizeConfigFiles(launcher, roleName, roleGroup, getMetaInfo(),
configurations, launcher.getEnv(), fileSystem);
}
String label = getContainerLabel(container, roleName, roleGroup);
CommandLineBuilder operation = new CommandLineBuilder();
String pythonExec = instanceDefinition.getAppConfOperations()
.getGlobalOptions().getOption(SliderXmlConfKeys.PYTHON_EXECUTABLE_PATH,
AgentKeys.PYTHON_EXE);
operation.add(pythonExec);
operation.add(scriptPath);
operation.add(ARG_LABEL, label);
operation.add(ARG_ZOOKEEPER_QUORUM);
operation.add(getClusterOptionPropertyValue(OptionKeys.ZOOKEEPER_QUORUM));
operation.add(ARG_ZOOKEEPER_REGISTRY_PATH);
operation.add(getZkRegistryPath());
String debugCmd = agentLaunchParameter.getNextLaunchParameter(roleGroup);
if (SliderUtils.isSet(debugCmd)) {
operation.add(ARG_DEBUG);
operation.add(debugCmd);
}
operation.add("> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/"
+ AgentKeys.AGENT_OUT_FILE + " 2>&1");
launcher.addCommand(operation.build());
// localize addon package
String addonAppDefString = instanceDefinition.getAppConfOperations()
.getGlobalOptions().getOption(AgentKeys.ADDONS, null);
log.debug("All addon appdefs: {}", addonAppDefString);
if (addonAppDefString != null) {
Scanner scanner = new Scanner(addonAppDefString).useDelimiter(",");
while (scanner.hasNext()) {
String addonAppDef = scanner.next();
String addonAppDefPath = instanceDefinition
.getAppConfOperations().getGlobalOptions().get(addonAppDef);
log.debug("Addon package {} is stored at: {}", addonAppDef, addonAppDefPath);
LocalResource addonPkgRes = fileSystem.createAmResource(
fileSystem.getFileSystem().resolvePath(new Path(addonAppDefPath)),
LocalResourceType.ARCHIVE);
launcher.addLocalResource(AgentKeys.ADDON_DEFINITION_DIR + "/" + addonAppDef, addonPkgRes);
}
log.debug("Metainfo map for master and addon: {}",
packageMetainfo.toString());
}
// Additional files to localize in addition to the application def
String appResourcesString = instanceDefinition.getAppConfOperations()
.getGlobalOptions().getOption(AgentKeys.APP_RESOURCES, null);
log.info("Configuration value for extra resources to localize: {}", appResourcesString);
if (null != appResourcesString) {
try (Scanner scanner = new Scanner(appResourcesString).useDelimiter(",")) {
while (scanner.hasNext()) {
String resource = scanner.next();
Path resourcePath = new Path(resource);
LocalResource extraResource = fileSystem.createAmResource(
fileSystem.getFileSystem().resolvePath(resourcePath),
LocalResourceType.FILE);
String destination = AgentKeys.APP_RESOURCES_DIR + "/" + resourcePath.getName();
log.info("Localizing {} to {}", resourcePath, destination);
// TODO Can we try harder to avoid collisions?
launcher.addLocalResource(destination, extraResource);
}
}
}
// initialize addon pkg states for all componentInstanceStatus
Map<String, State> pkgStatuses = new TreeMap<>();
for (Metainfo appPkg : packageMetainfo.values()) {
// check each component of that addon to see if they apply to this
// component 'role'
for (ComponentsInAddonPackage comp : appPkg.getApplicationPackage()
.getComponents()) {
log.debug("Current component: {} component in metainfo: {}", roleName,
comp.getName());
if (comp.getName().equals(roleGroup)
|| comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) {
pkgStatuses.put(appPkg.getApplicationPackage().getName(), State.INIT);
}
}
}
log.debug("For component: {} pkg status map: {}", roleName,
pkgStatuses.toString());
// initialize the component instance state
getComponentStatuses().put(label,
new ComponentInstanceState(
roleName,
container.getId(),
getClusterInfoPropertyValue(OptionKeys.APPLICATION_NAME),
pkgStatuses));
}
private void localizeContainerSecurityStores(ContainerLauncher launcher,
Container container,
String role,
SliderFileSystem fileSystem,
AggregateConf instanceDefinition,
MapOperations compOps)
throws SliderException, IOException {
// generate and localize security stores
SecurityStore[] stores = generateSecurityStores(container, role,
instanceDefinition, compOps);
for (SecurityStore store : stores) {
LocalResource keystoreResource = fileSystem.createAmResource(
uploadSecurityResource(store.getFile(), fileSystem), LocalResourceType.FILE);
launcher.addLocalResource(String.format("secstores/%s-%s.p12",
store.getType(), role),
keystoreResource);
}
}
private SecurityStore[] generateSecurityStores(Container container,
String role,
AggregateConf instanceDefinition,
MapOperations compOps)
throws SliderException, IOException {
return StoresGenerator.generateSecurityStores(container.getNodeId().getHost(),
container.getId().toString(), role,
instanceDefinition, compOps);
}
private boolean areStoresRequested(MapOperations compOps) {
return compOps != null ? compOps.
getOptionBool(SliderKeys.COMP_STORES_REQUIRED_KEY, false) : false;
}
private void localizeContainerSSLResources(ContainerLauncher launcher,
Container container,
SliderFileSystem fileSystem)
throws SliderException {
try {
// localize server cert
Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName());
LocalResource certResource = fileSystem.createAmResource(
new Path(certsDir, SliderKeys.CRT_FILE_NAME),
LocalResourceType.FILE);
launcher.addLocalResource(AgentKeys.CERT_FILE_LOCALIZATION_PATH,
certResource);
// generate and localize agent cert
CertificateManager certMgr = new CertificateManager();
String hostname = container.getNodeId().getHost();
String containerId = container.getId().toString();
certMgr.generateContainerCertificate(hostname, containerId);
LocalResource agentCertResource = fileSystem.createAmResource(
uploadSecurityResource(
CertificateManager.getAgentCertficateFilePath(containerId),
fileSystem), LocalResourceType.FILE);
// still using hostname as file name on the agent side, but the files
// do end up under the specific container's file space
launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname +
".crt", agentCertResource);
LocalResource agentKeyResource = fileSystem.createAmResource(
uploadSecurityResource(
CertificateManager.getAgentKeyFilePath(containerId), fileSystem),
LocalResourceType.FILE);
launcher.addLocalResource(AgentKeys.INFRA_RUN_SECURITY_DIR + hostname +
".key", agentKeyResource);
} catch (Exception e) {
throw new SliderException(SliderExitCodes.EXIT_DEPLOYMENT_FAILED, e,
"Unable to localize certificates. Two-way SSL cannot be enabled");
}
}
private Path uploadSecurityResource(File resource, SliderFileSystem fileSystem)
throws IOException {
Path certsDir = fileSystem.buildClusterSecurityDirPath(getClusterName());
return uploadResource(resource, fileSystem, certsDir);
}
private Path uploadResource(File resource, SliderFileSystem fileSystem,
String roleName) throws IOException {
Path dir;
if (roleName == null) {
dir = fileSystem.buildClusterResourcePath(getClusterName());
} else {
dir = fileSystem.buildClusterResourcePath(getClusterName(), roleName);
}
return uploadResource(resource, fileSystem, dir);
}
private static synchronized Path uploadResource(File resource,
SliderFileSystem fileSystem, Path parentDir) throws IOException {
if (!fileSystem.getFileSystem().exists(parentDir)) {
fileSystem.getFileSystem().mkdirs(parentDir,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
}
Path destPath = new Path(parentDir, resource.getName());
if (!fileSystem.getFileSystem().exists(destPath)) {
FSDataOutputStream os = fileSystem.getFileSystem().create(destPath);
byte[] contents = FileUtils.readFileToByteArray(resource);
os.write(contents, 0, contents.length);
os.flush();
os.close();
log.info("Uploaded {} to localization path {}", resource, destPath);
} else {
log.info("Resource {} already existed at localization path {}", resource,
destPath);
}
while (!fileSystem.getFileSystem().exists(destPath)) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
}
fileSystem.getFileSystem().setPermission(destPath,
new FsPermission(FsAction.READ, FsAction.NONE, FsAction.NONE));
return destPath;
}
private void localizeServiceKeytabs(ContainerLauncher launcher,
AggregateConf instanceDefinition,
SliderFileSystem fileSystem)
throws IOException {
String keytabPathOnHost = instanceDefinition.getAppConfOperations()
.getComponent(SliderKeys.COMPONENT_AM).get(
SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
if (SliderUtils.isUnset(keytabPathOnHost)) {
String amKeytabName = instanceDefinition.getAppConfOperations()
.getComponent(SliderKeys.COMPONENT_AM).get(
SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
String keytabDir = instanceDefinition.getAppConfOperations()
.getComponent(SliderKeys.COMPONENT_AM).get(
SliderXmlConfKeys.KEY_HDFS_KEYTAB_DIR);
// we need to localize the keytab files in the directory
Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null,
getClusterName());
boolean serviceKeytabsDeployed = false;
if (fileSystem.getFileSystem().exists(keytabDirPath)) {
FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(keytabDirPath);
LocalResource keytabRes;
for (FileStatus keytab : keytabs) {
if (!amKeytabName.equals(keytab.getPath().getName())
&& keytab.getPath().getName().endsWith(".keytab")) {
serviceKeytabsDeployed = true;
log.info("Localizing keytab {}", keytab.getPath().getName());
keytabRes = fileSystem.createAmResource(keytab.getPath(),
LocalResourceType.FILE);
launcher.addLocalResource(SliderKeys.KEYTAB_DIR + "/" +
keytab.getPath().getName(),
keytabRes);
}
}
}
if (!serviceKeytabsDeployed) {
log.warn("No service keytabs for the application have been localized. "
+ "If the application requires keytabs for secure operation, "
+ "please ensure that the required keytabs have been uploaded "
+ "to the folder {}", keytabDirPath);
}
}
}
private void createConfigFile(SliderFileSystem fileSystem, File file,
ConfigFile configFile, Map<String, String> config)
throws IOException {
ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType());
log.info("Writing {} file {}", configFormat, file);
ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
fileSystem, getClusterName(), file.getName());
PublishedConfiguration publishedConfiguration =
new PublishedConfiguration(configFile.getDictionaryName(),
config.entrySet());
PublishedConfigurationOutputter configurationOutputter =
PublishedConfigurationOutputter.createOutputter(configFormat,
publishedConfiguration);
configurationOutputter.save(file);
}
@VisibleForTesting
protected void localizeConfigFiles(ContainerLauncher launcher,
String roleName, String roleGroup,
Metainfo metainfo,
Map<String, Map<String, String>> configs,
MapOperations env,
SliderFileSystem fileSystem)
throws IOException {
for (ConfigFile configFile : metainfo.getComponentConfigFiles(roleGroup)) {
Map<String, String> config = ConfigUtils.replacePropsInConfig(
configs.get(configFile.getDictionaryName()), env.options);
String fileName = ConfigUtils.replaceProps(config,
configFile.getFileName());
File localFile = new File(SliderKeys.RESOURCE_DIR);
if (!localFile.exists()) {
localFile.mkdir();
}
localFile = new File(localFile, new File(fileName).getName());
String folder = null;
if ("true".equals(config.get(PER_COMPONENT))) {
folder = roleName;
} else if ("true".equals(config.get(PER_GROUP))) {
folder = roleGroup;
}
log.info("Localizing {} configs to config file {} (destination {}) " +
"based on {} configs", config.size(), localFile, fileName,
configFile.getDictionaryName());
createConfigFile(fileSystem, localFile, configFile, config);
Path destPath = uploadResource(localFile, fileSystem, folder);
LocalResource configResource = fileSystem.createAmResource(destPath,
LocalResourceType.FILE);
File destFile = new File(fileName);
if (destFile.isAbsolute()) {
launcher.addLocalResource(
SliderKeys.RESOURCE_DIR + "/" + destFile.getName(),
configResource, fileName);
} else {
launcher.addLocalResource(AgentKeys.APP_CONF_DIR + "/" + fileName,
configResource);
}
}
}
/**
* build the zookeeper registry path.
*
* @return the path the service registered at
* @throws NullPointerException if the service has not yet registered
*/
private String getZkRegistryPath() {
Preconditions.checkNotNull(yarnRegistry, "Yarn registry not bound");
String path = yarnRegistry.getAbsoluteSelfRegistrationPath();
Preconditions.checkNotNull(path, "Service record path not defined");
return path;
}
@Override
public void rebuildContainerDetails(List<Container> liveContainers,
String applicationId, Map<Integer, ProviderRole> providerRoleMap) {
for (Container container : liveContainers) {
// get the role name and label
ProviderRole role = providerRoleMap.get(ContainerPriority
.extractRole(container));
if (role != null) {
String roleName = role.name;
String label = getContainerLabel(container, roleName, role.group);
log.info("Rebuilding in-memory: container {} in role {} in cluster {}",
container.getId(), roleName, applicationId);
getComponentStatuses().put(label,
new ComponentInstanceState(roleName, container.getId(),
applicationId));
} else {
log.warn("Role not found for container {} in cluster {}",
container.getId(), applicationId);
}
}
}
@Override
public boolean isSupportedRole(String role) {
return true;
}
/**
* Handle registration calls from the agents
*
* @param registration registration entry
*
* @return response
*/
@Override
public RegistrationResponse handleRegistration(Register registration) {
log.info("Handling registration: {}", registration);
RegistrationResponse response = new RegistrationResponse();
String label = registration.getLabel();
String pkg = registration.getPkg();
State agentState = registration.getActualState();
String appVersion = registration.getAppVersion();
log.info("label: {} pkg: {}", label, pkg);
if (getComponentStatuses().containsKey(label)) {
response.setResponseStatus(RegistrationStatus.OK);
ComponentInstanceState componentStatus = getComponentStatuses().get(label);
componentStatus.heartbeat(System.currentTimeMillis());
updateComponentStatusWithAgentState(componentStatus, agentState);
String roleName = getRoleName(label);
String roleGroup = getRoleGroup(label);
String containerId = getContainerId(label);
if (SliderUtils.isSet(registration.getTags())) {
tags.recordAssignedTag(roleName, containerId, registration.getTags());
} else {
response.setTags(tags.getTag(roleName, containerId));
}
String hostFqdn = registration.getPublicHostname();
Map<String, String> ports = registration.getAllocatedPorts();
if (ports != null && !ports.isEmpty()) {
processAllocatedPorts(hostFqdn, roleName, roleGroup, containerId, ports);
}
Map<String, String> folders = registration.getLogFolders();
if (folders != null && !folders.isEmpty()) {
publishFolderPaths(folders, containerId, roleName, hostFqdn);
}
// Set app version if empty. It gets unset during upgrade - why?
checkAndSetContainerAppVersion(containerId, appVersion);
} else {
response.setResponseStatus(RegistrationStatus.FAILED);
response.setLog("Label not recognized.");
log.warn("Received registration request from unknown label {}", label);
}
log.info("Registration response: {}", response);
return response;
}
// Checks if app version is empty. Sets it to the version as reported by the
// container during registration phase.
private void checkAndSetContainerAppVersion(String containerId,
String appVersion) {
StateAccessForProviders amState = getAmState();
try {
RoleInstance role = amState.getOwnedContainer(containerId);
if (role != null) {
String currentAppVersion = role.appVersion;
log.debug("Container = {}, app version current = {} new = {}",
containerId, currentAppVersion, appVersion);
if (currentAppVersion == null
|| currentAppVersion.equals(APP_VERSION_UNKNOWN)) {
amState.getOwnedContainer(containerId).appVersion = appVersion;
}
}
} catch (NoSuchNodeException e) {
// ignore - there is nothing to do if we don't find a container
log.warn("Owned container {} not found - {}", containerId, e);
}
}
/**
* Handle heartbeat response from agents
*
* @param heartBeat incoming heartbeat from Agent
*
* @return response to send back
*/
@Override
public HeartBeatResponse handleHeartBeat(HeartBeat heartBeat) {
log.debug("Handling heartbeat: {}", heartBeat);
HeartBeatResponse response = new HeartBeatResponse();
long id = heartBeat.getResponseId();
response.setResponseId(id + 1L);
String label = heartBeat.getHostname();
String pkg = heartBeat.getPackage();
log.debug("package received: " + pkg);
String roleName = getRoleName(label);
String roleGroup = getRoleGroup(label);
String containerId = getContainerId(label);
boolean doUpgrade = false;
if (isInUpgradeMode && upgradeContainers.contains(containerId)) {
doUpgrade = true;
}
StateAccessForProviders accessor = getAmState();
CommandScript cmdScript = getScriptPathForMasterPackage(roleGroup);
List<ComponentCommand> commands = getMetaInfo().getApplicationComponent(roleGroup).getCommands();
if (!isDockerContainer(roleGroup) && !isYarnDockerContainer(roleGroup)
&& (cmdScript == null || cmdScript.getScript() == null)
&& commands.size() == 0) {
log.error(
"role.script is unavailable for {}. Commands will not be sent.",
roleName);
return response;
}
String scriptPath = null;
long timeout = 600L;
if (cmdScript != null) {
scriptPath = cmdScript.getScript();
timeout = cmdScript.getTimeout();
}
if (timeout == 0L) {
timeout = 600L;
}
if (!getComponentStatuses().containsKey(label)) {
// container is completed but still heart-beating, send terminate signal
log.info(
"Sending terminate signal to completed container (still heartbeating): {}",
label);
response.setTerminateAgent(true);
return response;
}
List<ComponentStatus> statuses = heartBeat.getComponentStatus();
if (statuses != null && !statuses.isEmpty()) {
log.info("status from agent: " + statuses.toString());
try {
for(ComponentStatus status : statuses){
RoleInstance role = null;
if(status.getIp() != null && !status.getIp().isEmpty()){
role = amState.getOwnedContainer(containerId);
role.ip = status.getIp();
}
if(status.getHostname() != null && !status.getHostname().isEmpty()){
role = amState.getOwnedContainer(containerId);
role.hostname = status.getHostname();
}
if (role != null) {
// create an updated service record (including hostname and ip) and publish...
ServiceRecord record = new ServiceRecord();
record.set(YarnRegistryAttributes.YARN_ID, containerId);
record.description = roleName;
record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
PersistencePolicies.CONTAINER);
// TODO: switch record attributes to use constants from YarnRegistryAttributes
// when it's been updated.
if (role.ip != null) {
record.set("yarn:ip", role.ip);
}
if (role.hostname != null) {
record.set("yarn:hostname", role.hostname);
}
yarnRegistry.putComponent(
RegistryPathUtils.encodeYarnID(containerId), record);
}
}
} catch (NoSuchNodeException e) {
// ignore - there is nothing to do if we don't find a container
log.warn("Owned container {} not found - {}", containerId, e);
} catch (IOException e) {
log.warn("Error updating container {} service record in registry",
containerId, e);
}
}
Boolean isMaster = isMaster(roleGroup);
ComponentInstanceState componentStatus = getComponentStatuses().get(label);
componentStatus.heartbeat(System.currentTimeMillis());
if (doUpgrade) {
switch (componentStatus.getState()) {
case STARTED:
componentStatus.setTargetState(State.UPGRADED);
break;
case UPGRADED:
componentStatus.setTargetState(State.STOPPED);
break;
case STOPPED:
componentStatus.setTargetState(State.TERMINATING);
break;
default:
break;
}
log.info("Current state = {} target state {}",
componentStatus.getState(), componentStatus.getTargetState());
}
if (appStopInitiated && !componentStatus.isStopInitiated()) {
log.info("Stop initiated for label {}", label);
componentStatus.setTargetState(State.STOPPED);
componentStatus.setStopInitiated(true);
}
publishConfigAndExportGroups(heartBeat, componentStatus, roleGroup);
CommandResult result = null;
List<CommandReport> reports = heartBeat.getReports();
if (SliderUtils.isNotEmpty(reports)) {
CommandReport report = reports.get(0);
Map<String, String> ports = report.getAllocatedPorts();
if (SliderUtils.isNotEmpty(ports)) {
processAllocatedPorts(heartBeat.getFqdn(), roleName, roleGroup, containerId, ports);
}
result = CommandResult.getCommandResult(report.getStatus());
Command command = Command.getCommand(report.getRoleCommand());
componentStatus.applyCommandResult(result, command, pkg);
log.info("Component operation. Status: {}; new container state: {};"
+ " new component state: {}", result,
componentStatus.getContainerState(), componentStatus.getState());
if (command == Command.INSTALL && SliderUtils.isNotEmpty(report.getFolders())) {
publishFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn());
}
}
int waitForCount = accessor.getInstanceDefinitionSnapshot().
getAppConfOperations().getComponentOptInt(roleGroup, AgentKeys.WAIT_HEARTBEAT, 0);
if (id < waitForCount) {
log.info("Waiting until heartbeat count {}. Current val: {}", waitForCount, id);
getComponentStatuses().put(label, componentStatus);
return response;
}
Command command = componentStatus.getNextCommand(doUpgrade);
try {
if (Command.NOP != command) {
log.debug("For comp {} pkg {} issuing {}", roleName,
componentStatus.getNextPkgToInstall(), command.toString());
if (command == Command.INSTALL) {
log.info("Installing {} on {}.", roleName, containerId);
if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){
addInstallDockerCommand(roleName, roleGroup, containerId,
response, null, timeout);
} else if (scriptPath != null) {
addInstallCommand(roleName, roleGroup, containerId, response,
scriptPath, null, timeout, null);
} else {
// commands
ComponentCommand installCmd = null;
for (ComponentCommand compCmd : commands) {
if (compCmd.getName().equals("INSTALL")) {
installCmd = compCmd;
}
}
addInstallCommand(roleName, roleGroup, containerId, response, null,
installCmd, timeout, null);
}
componentStatus.commandIssued(command);
} else if (command == Command.INSTALL_ADDON) {
String nextPkgToInstall = componentStatus.getNextPkgToInstall();
// retrieve scriptPath or command of that package for the component
for (ComponentsInAddonPackage comp : packageMetainfo
.get(nextPkgToInstall).getApplicationPackage().getComponents()) {
// given nextPkgToInstall and roleName is determined, the if below
// should only execute once per heartbeat
log.debug("Addon component: {} pkg: {} script: {}", comp.getName(),
nextPkgToInstall, comp.getCommandScript().getScript());
if (comp.getName().equals(roleGroup)
|| comp.getName().equals(AgentKeys.ADDON_FOR_ALL_COMPONENTS)) {
scriptPath = comp.getCommandScript().getScript();
if (scriptPath != null) {
addInstallCommand(roleName, roleGroup, containerId, response,
scriptPath, null, timeout, nextPkgToInstall);
} else {
ComponentCommand installCmd = null;
for (ComponentCommand compCmd : comp.getCommands()) {
if (compCmd.getName().equals("INSTALL")) {
installCmd = compCmd;
}
}
addInstallCommand(roleName, roleGroup, containerId, response,
null, installCmd, timeout, nextPkgToInstall);
}
}
}
componentStatus.commandIssued(command);
} else if (command == Command.START) {
// check against dependencies
boolean canExecute = commandOrder.canExecute(roleGroup, command, getComponentStatuses().values());
if (canExecute) {
log.info("Starting {} on {}.", roleName, containerId);
if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){
addStartDockerCommand(roleName, roleGroup, containerId,
response, null, timeout, false);
} else if (scriptPath != null) {
addStartCommand(roleName,
roleGroup,
containerId,
response,
scriptPath,
null,
null,
timeout,
isMarkedAutoRestart(roleGroup));
} else {
ComponentCommand startCmd = null;
for (ComponentCommand compCmd : commands) {
if (compCmd.getName().equals("START")) {
startCmd = compCmd;
}
}
ComponentCommand stopCmd = null;
for (ComponentCommand compCmd : commands) {
if (compCmd.getName().equals("STOP")) {
stopCmd = compCmd;
}
}
addStartCommand(roleName, roleGroup, containerId, response, null,
startCmd, stopCmd, timeout, false);
}
componentStatus.commandIssued(command);
} else {
log.info("Start of {} on {} delayed as dependencies have not started.", roleName, containerId);
}
} else if (command == Command.UPGRADE) {
addUpgradeCommand(roleName, roleGroup, containerId, response,
scriptPath, timeout);
componentStatus.commandIssued(command, true);
} else if (command == Command.STOP) {
log.info("Stop command being sent to container with id {}",
containerId);
addStopCommand(roleName, roleGroup, containerId, response, scriptPath,
timeout, doUpgrade);
componentStatus.commandIssued(command);
} else if (command == Command.TERMINATE) {
log.info("A formal terminate command is being sent to container {}"
+ " in state {}", label, componentStatus.getState());
response.setTerminateAgent(true);
}
}
// if there is no outstanding command then retrieve config
if (isMaster && componentStatus.getState() == State.STARTED
&& command == Command.NOP) {
if (!componentStatus.getConfigReported()) {
log.info("Requesting applied config for {} on {}.", roleName, containerId);
if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)){
addGetConfigDockerCommand(roleName, roleGroup, containerId, response);
} else {
addGetConfigCommand(roleName, roleGroup, containerId, response);
}
}
}
// if restart is required then signal
response.setRestartEnabled(false);
if (componentStatus.getState() == State.STARTED
&& command == Command.NOP && isMarkedAutoRestart(roleGroup)) {
response.setRestartEnabled(true);
}
//If INSTALL_FAILED and no INSTALL is scheduled let the agent fail
if (componentStatus.getState() == State.INSTALL_FAILED
&& command == Command.NOP) {
log.warn("Sending terminate signal to container that failed installation: {}", label);
response.setTerminateAgent(true);
}
} catch (SliderException e) {
log.warn("Component instance failed operation.", e);
componentStatus.applyCommandResult(CommandResult.FAILED, command, null);
}
log.debug("Heartbeat response: " + response);
return response;
}
private boolean isDockerContainer(String roleGroup) {
String type = getMetaInfo().getApplicationComponent(roleGroup).getType();
if (SliderUtils.isSet(type)) {
return type.toLowerCase().equals(SliderUtils.DOCKER) || type.toLowerCase().equals(SliderUtils.DOCKER_YARN);
}
return false;
}
private boolean isYarnDockerContainer(String roleGroup) {
String type = getMetaInfo().getApplicationComponent(roleGroup).getType();
if (SliderUtils.isSet(type)) {
return type.toLowerCase().equals(SliderUtils.DOCKER_YARN);
}
return false;
}
protected void processAllocatedPorts(String fqdn,
String roleName,
String roleGroup,
String containerId,
Map<String, String> ports) {
RoleInstance instance;
try {
instance = getAmState().getOwnedContainer(containerId);
} catch (NoSuchNodeException e) {
log.warn("Failed to locate instance of container {}", containerId, e);
instance = null;
}
for (Map.Entry<String, String> port : ports.entrySet()) {
String portname = port.getKey();
String portNo = port.getValue();
log.info("Recording allocated port for {} as {}", portname, portNo);
// add the allocated ports to the global list as well as per container list
// per container allocation will over-write each other in the global
this.getAllocatedPorts().put(portname, portNo);
this.getAllocatedPorts(containerId).put(portname, portNo);
if (instance != null) {
try {
// if the returned value is not a single port number then there are no
// meaningful way for Slider to use it during export
// No need to error out as it may not be the responsibility of the component
// to allocate port or the component may need an array of ports
instance.registerPortEndpoint(Integer.valueOf(portNo), portname);
} catch (NumberFormatException e) {
log.warn("Failed to parse {}", portNo, e);
}
}
}
processAndPublishComponentSpecificData(ports, containerId, fqdn, roleGroup);
processAndPublishComponentSpecificExports(ports, containerId, fqdn, roleName, roleGroup);
// and update registration entries
if (instance != null) {
queueAccess.put(new RegisterComponentInstance(instance.getId(),
roleName, roleGroup, 0, TimeUnit.MILLISECONDS));
}
}
private void updateComponentStatusWithAgentState(
ComponentInstanceState componentStatus, State agentState) {
if (agentState != null) {
componentStatus.setState(agentState);
}
}
@Override
public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) {
Map<String, MonitorDetail> details = super.buildMonitorDetails(clusterDesc);
buildRoleHostDetails(details);
return details;
}
@Override
public void applyInitialRegistryDefinitions(URL amWebURI,
URL agentOpsURI,
URL agentStatusURI,
ServiceRecord serviceRecord)
throws IOException {
super.applyInitialRegistryDefinitions(amWebURI,
agentOpsURI,
agentStatusURI,
serviceRecord);
try {
URL restURL = new URL(agentOpsURI, SLIDER_PATH_AGENTS);
URL agentStatusURL = new URL(agentStatusURI, SLIDER_PATH_AGENTS);
serviceRecord.addInternalEndpoint(
new Endpoint(CustomRegistryConstants.AGENT_SECURE_REST_API,
ProtocolTypes.PROTOCOL_REST,
restURL.toURI()));
serviceRecord.addInternalEndpoint(
new Endpoint(CustomRegistryConstants.AGENT_ONEWAY_REST_API,
ProtocolTypes.PROTOCOL_REST,
agentStatusURL.toURI()));
} catch (URISyntaxException e) {
throw new IOException(e);
}
// identify client component
Component client = null;
for (Component component : getMetaInfo().getApplication().getComponents()) {
if (component != null && component.getCategory().equals("CLIENT")) {
client = component;
break;
}
}
if (client == null) {
log.info("No client component specified, not publishing client configs");
return;
}
// register AM-generated client configs
ConfTreeOperations appConf = instanceDefinition.getAppConfOperations();
MapOperations clientOperations = appConf.getOrAddComponent(client.getName());
appConf.resolve();
if (!clientOperations.getOptionBool(AgentKeys.AM_CONFIG_GENERATION,
false)) {
log.info("AM config generation is false, not publishing client configs");
return;
}
// build and localize configuration files
Map<String, Map<String, String>> configurations = new TreeMap<String, Map<String, String>>();
Map<String, String> tokens = null;
try {
tokens = getStandardTokenMap(appConf, client.getName(), client.getName());
} catch (SliderException e) {
throw new IOException(e);
}
for (ConfigFile configFile : getMetaInfo()
.getComponentConfigFiles(client.getName())) {
addNamedConfiguration(configFile.getDictionaryName(),
appConf.getGlobalOptions().options, configurations, tokens, null,
client.getName());
if (appConf.getComponent(client.getName()) != null) {
addNamedConfiguration(configFile.getDictionaryName(),
appConf.getComponent(client.getName()).options, configurations,
tokens, null, client.getName());
}
}
//do a final replacement of re-used configs
dereferenceAllConfigs(configurations);
for (ConfigFile configFile : getMetaInfo()
.getComponentConfigFiles(client.getName())) {
ConfigFormat configFormat = ConfigFormat.resolve(configFile.getType());
Map<String, String> config = configurations.get(configFile.getDictionaryName());
ConfigUtils.prepConfigForTemplateOutputter(configFormat, config,
fileSystem, getClusterName(),
new File(configFile.getFileName()).getName());
PublishedConfiguration publishedConfiguration =
new PublishedConfiguration(configFile.getDictionaryName(),
config.entrySet());
getAmState().getPublishedSliderConfigurations().put(
configFile.getDictionaryName(), publishedConfiguration);
log.info("Publishing AM configuration {}", configFile.getDictionaryName());
}
}
@Override
public void notifyContainerCompleted(ContainerId containerId) {
// containers get allocated and free'ed without being assigned to any
// component - so many of the data structures may not be initialized
if (containerId != null) {
String containerIdStr = containerId.toString();
if (getComponentInstanceData().containsKey(containerIdStr)) {
getComponentInstanceData().remove(containerIdStr);
log.info("Removing container specific data for {}", containerIdStr);
publishComponentInstanceData();
}
if (this.allocatedPorts.containsKey(containerIdStr)) {
Map<String, String> portsByContainerId = getAllocatedPorts(containerIdStr);
this.allocatedPorts.remove(containerIdStr);
// free up the allocations from global as well
// if multiple containers allocate global ports then last one
// wins and similarly first one removes it - its not supported anyway
for(String portName : portsByContainerId.keySet()) {
getAllocatedPorts().remove(portName);
}
}
String componentName = null;
synchronized (this.componentStatuses) {
for (String label : getComponentStatuses().keySet()) {
if (label.startsWith(containerIdStr)) {
componentName = getRoleName(label);
log.info("Removing component status for label {}", label);
getComponentStatuses().remove(label);
}
}
}
tags.releaseTag(componentName, containerIdStr);
synchronized (this.containerExportsMap) {
Set<String> containerExportSets = containerExportsMap.get(containerIdStr);
if (containerExportSets != null) {
for (String containerExportStr : containerExportSets) {
String[] parts = containerExportStr.split(":");
Map<String, List<ExportEntry>> exportGroup = getCurrentExports(parts[0]);
List<ExportEntry> exports = exportGroup.get(parts[1]);
List<ExportEntry> exportToRemove = new ArrayList<ExportEntry>();
for (ExportEntry export : exports) {
if (containerIdStr.equals(export.getContainerId())) {
exportToRemove.add(export);
}
}
exports.removeAll(exportToRemove);
}
log.info("Removing container exports for {}", containerIdStr);
containerExportsMap.remove(containerIdStr);
}
}
}
}
/**
* Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default.
*
* @param instanceDefinition
*/
private void readAndSetHeartbeatMonitoringInterval(AggregateConf instanceDefinition) {
String hbMonitorInterval = instanceDefinition.getAppConfOperations().
getGlobalOptions().getOption(AgentKeys.HEARTBEAT_MONITOR_INTERVAL,
Integer.toString(DEFAULT_HEARTBEAT_MONITOR_INTERVAL));
try {
setHeartbeatMonitorInterval(Integer.parseInt(hbMonitorInterval));
} catch (NumberFormatException e) {
log.warn(
"Bad value {} for {}. Defaulting to ",
hbMonitorInterval,
HEARTBEAT_MONITOR_INTERVAL,
DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
}
}
/**
* Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default.
*
* @param instanceDefinition
*/
private void initializeAgentDebugCommands(AggregateConf instanceDefinition) {
String launchParameterStr = instanceDefinition.getAppConfOperations().
getGlobalOptions().getOption(AgentKeys.AGENT_INSTANCE_DEBUG_DATA, "");
agentLaunchParameter = new AgentLaunchParameter(launchParameterStr);
}
@VisibleForTesting
protected Map<String, ExportEntry> getLogFolderExports() {
return logFolderExports;
}
@VisibleForTesting
protected Map<String, ExportEntry> getWorkFolderExports() {
return workFolderExports;
}
@VisibleForTesting
protected Metainfo getMetaInfo() {
return this.metaInfo;
}
@VisibleForTesting
protected Map<String, ComponentInstanceState> getComponentStatuses() {
return componentStatuses;
}
@VisibleForTesting
protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem,
String appDef, boolean addonPackage) throws IOException,
BadConfigException {
return AgentUtils.getApplicationMetainfo(fileSystem, appDef, addonPackage);
}
@VisibleForTesting
protected Metainfo getApplicationMetainfo(SliderFileSystem fileSystem,
String appDef) throws IOException, BadConfigException {
return getApplicationMetainfo(fileSystem, appDef, false);
}
@VisibleForTesting
protected void setHeartbeatMonitorInterval(int heartbeatMonitorInterval) {
this.heartbeatMonitorInterval = heartbeatMonitorInterval;
}
public void setInUpgradeMode(boolean inUpgradeMode) {
this.isInUpgradeMode = inUpgradeMode;
}
public void addUpgradeContainers(Set<String> upgradeContainers) {
this.upgradeContainers.addAll(upgradeContainers);
}
public void setAppStopInitiated(boolean appStopInitiated) {
this.appStopInitiated = appStopInitiated;
}
/**
* Read all default configs
*
* @param fileSystem fs
* @param appDef app default path
* @param metainfo metadata
*
* @return configuration maps
*
* @throws IOException
*/
protected Map<String, DefaultConfig> initializeDefaultConfigs(SliderFileSystem fileSystem,
String appDef, Metainfo metainfo) throws IOException {
Map<String, DefaultConfig> defaultConfigMap = new HashMap<>();
if (SliderUtils.isNotEmpty(metainfo.getApplication().getConfigFiles())) {
for (ConfigFile configFile : metainfo.getApplication().getConfigFiles()) {
DefaultConfig config = null;
try {
config = AgentUtils.getDefaultConfig(fileSystem, appDef, configFile.getDictionaryName() + ".xml");
} catch (IOException e) {
log.warn("Default config file not found. Only the config as input during create will be applied for {}",
configFile.getDictionaryName());
}
if (config != null) {
defaultConfigMap.put(configFile.getDictionaryName(), config);
}
}
}
return defaultConfigMap;
}
protected Map<String, DefaultConfig> getDefaultConfigs() {
return defaultConfigs;
}
private int getHeartbeatMonitorInterval() {
return this.heartbeatMonitorInterval;
}
private String getClusterName() {
if (SliderUtils.isUnset(clusterName)) {
clusterName = getAmState().getInternalsSnapshot().get(OptionKeys.APPLICATION_NAME);
}
return clusterName;
}
/**
* Publish a named property bag that may contain name-value pairs for app configurations such as hbase-site
*
* @param name
* @param description
* @param entries
*/
protected void publishApplicationInstanceData(String name, String description,
Iterable<Map.Entry<String, String>> entries) {
PublishedConfiguration pubconf = new PublishedConfiguration();
pubconf.description = description;
pubconf.putValues(entries);
log.info("publishing {}", pubconf);
getAmState().getPublishedSliderConfigurations().put(name, pubconf);
}
/**
* Get a list of all hosts for all role/container per role
*
* @return the map of role->node
*/
protected Map<String, Map<String, ClusterNode>> getRoleClusterNodeMapping() {
return amState.getRoleClusterNodeMapping();
}
private String getContainerLabel(Container container, String role, String group) {
if (role.equals(group)) {
return container.getId().toString() + LABEL_MAKER + role;
} else {
return container.getId().toString() + LABEL_MAKER + role + LABEL_MAKER +
group;
}
}
protected String getClusterInfoPropertyValue(String name) {
StateAccessForProviders accessor = getAmState();
assert accessor.isApplicationLive();
ClusterDescription description = accessor.getClusterStatus();
return description.getInfo(name);
}
protected String getClusterOptionPropertyValue(String name)
throws BadConfigException {
StateAccessForProviders accessor = getAmState();
assert accessor.isApplicationLive();
ClusterDescription description = accessor.getClusterStatus();
return description.getMandatoryOption(name);
}
/**
* Lost heartbeat from the container - release it and ask for a replacement (async operation)
*
* @param label
* @param containerId
*/
protected void lostContainer(
String label,
ContainerId containerId) {
getComponentStatuses().remove(label);
getQueueAccess().put(new ProviderReportedContainerLoss(containerId));
}
/**
* Build the provider status, can be empty
*
* @return the provider status - map of entries to add to the info section
*/
public Map<String, String> buildProviderStatus() {
Map<String, String> stats = new HashMap<String, String>();
return stats;
}
/**
* Format the folder locations and publish in the registry service
*
* @param folders
* @param containerId
* @param hostFqdn
* @param componentName
*/
protected void publishFolderPaths(
Map<String, String> folders, String containerId, String componentName, String hostFqdn) {
Date now = new Date();
for (Map.Entry<String, String> entry : folders.entrySet()) {
ExportEntry exportEntry = new ExportEntry();
exportEntry.setValue(String.format(HOST_FOLDER_FORMAT, hostFqdn, entry.getValue()));
exportEntry.setContainerId(containerId);
exportEntry.setLevel(COMPONENT_TAG);
exportEntry.setTag(componentName);
exportEntry.setUpdatedTime(now.toString());
if (entry.getKey().equals("AGENT_LOG_ROOT")) {
synchronized (logFolderExports) {
getLogFolderExports().put(containerId, exportEntry);
}
} else {
synchronized (workFolderExports) {
getWorkFolderExports().put(containerId, exportEntry);
}
}
log.info("Updating log and pwd folders for container {}", containerId);
}
PublishedExports exports = new PublishedExports(CONTAINER_LOGS_TAG);
exports.setUpdated(now.getTime());
synchronized (logFolderExports) {
updateExportsFromList(exports, getLogFolderExports());
}
getAmState().getPublishedExportsSet().put(CONTAINER_LOGS_TAG, exports);
exports = new PublishedExports(CONTAINER_PWDS_TAG);
exports.setUpdated(now.getTime());
synchronized (workFolderExports) {
updateExportsFromList(exports, getWorkFolderExports());
}
getAmState().getPublishedExportsSet().put(CONTAINER_PWDS_TAG, exports);
}
/**
* Update the export data from the map
* @param exports
* @param folderExports
*/
private void updateExportsFromList(PublishedExports exports, Map<String, ExportEntry> folderExports) {
Map<String, List<ExportEntry>> perComponentList = new HashMap<String, List<ExportEntry>>();
for(Map.Entry<String, ExportEntry> logEntry : folderExports.entrySet())
{
String componentName = logEntry.getValue().getTag();
if (!perComponentList.containsKey(componentName)) {
perComponentList.put(componentName, new ArrayList<ExportEntry>());
}
perComponentList.get(componentName).add(logEntry.getValue());
}
exports.putValues(perComponentList.entrySet());
}
/**
* Process return status for component instances
*
* @param heartBeat
* @param componentStatus
*/
protected void publishConfigAndExportGroups(HeartBeat heartBeat,
ComponentInstanceState componentStatus, String componentGroup) {
List<ComponentStatus> statuses = heartBeat.getComponentStatus();
if (statuses != null && !statuses.isEmpty()) {
log.info("Processing {} status reports.", statuses.size());
for (ComponentStatus status : statuses) {
log.info("Status report: {}", status.toString());
if (status.getConfigs() != null) {
Application application = getMetaInfo().getApplication();
if ((!canAnyMasterPublishConfig() || canPublishConfig(componentGroup)) &&
!instanceDefinition.getAppConfOperations().getComponentOptBool(
componentGroup, AgentKeys.AM_CONFIG_GENERATION, false)) {
// If no Master can explicitly publish then publish if its a master
// Otherwise, wait till the master that can publish is ready
Set<String> exportedConfigs = new HashSet();
String exportedConfigsStr = application.getExportedConfigs();
boolean exportedAllConfigs = exportedConfigsStr == null || exportedConfigsStr.isEmpty();
if (!exportedAllConfigs) {
for (String exportedConfig : exportedConfigsStr.split(",")) {
if (exportedConfig.trim().length() > 0) {
exportedConfigs.add(exportedConfig.trim());
}
}
}
for (String key : status.getConfigs().keySet()) {
if ((!exportedAllConfigs && exportedConfigs.contains(key)) ||
exportedAllConfigs) {
Map<String, String> configs = status.getConfigs().get(key);
publishApplicationInstanceData(key, key, configs.entrySet());
}
}
}
List<ExportGroup> appExportGroups = application.getExportGroups();
boolean hasExportGroups = SliderUtils.isNotEmpty(appExportGroups);
Set<String> appExports = new HashSet();
String appExportsStr = getApplicationComponent(componentGroup).getAppExports();
if (SliderUtils.isSet(appExportsStr)) {
for (String appExport : appExportsStr.split(",")) {
if (!appExport.trim().isEmpty()) {
appExports.add(appExport.trim());
}
}
}
if (hasExportGroups && !appExports.isEmpty()) {
String configKeyFormat = "${site.%s.%s}";
String hostKeyFormat = "${%s_HOST}";
// publish export groups if any
Map<String, String> replaceTokens = new HashMap<String, String>();
for (Map.Entry<String, Map<String, ClusterNode>> entry : getRoleClusterNodeMapping().entrySet()) {
String hostName = getHostsList(entry.getValue().values(), true).iterator().next();
replaceTokens.put(String.format(hostKeyFormat, entry.getKey().toUpperCase(Locale.ENGLISH)), hostName);
}
for (String key : status.getConfigs().keySet()) {
Map<String, String> configs = status.getConfigs().get(key);
for (String configKey : configs.keySet()) {
String lookupKey = String.format(configKeyFormat, key, configKey);
replaceTokens.put(lookupKey, configs.get(configKey));
}
}
Set<String> modifiedGroups = new HashSet<String>();
for (ExportGroup exportGroup : appExportGroups) {
List<Export> exports = exportGroup.getExports();
if (SliderUtils.isNotEmpty(exports)) {
String exportGroupName = exportGroup.getName();
ConcurrentHashMap<String, List<ExportEntry>> map =
(ConcurrentHashMap<String, List<ExportEntry>>)getCurrentExports(exportGroupName);
for (Export export : exports) {
if (canBeExported(exportGroupName, export.getName(), appExports)) {
String value = export.getValue();
// replace host names
for (String token : replaceTokens.keySet()) {
if (value.contains(token)) {
value = value.replace(token, replaceTokens.get(token));
}
}
ExportEntry entry = new ExportEntry();
entry.setLevel(APPLICATION_TAG);
entry.setValue(value);
entry.setUpdatedTime(new Date().toString());
// over-write, app exports are singletons
map.put(export.getName(), new ArrayList(Arrays.asList(entry)));
log.info("Preparing to publish. Key {} and Value {}", export.getName(), value);
}
}
modifiedGroups.add(exportGroupName);
}
}
publishModifiedExportGroups(modifiedGroups);
}
log.info("Received and processed config for {}", heartBeat.getHostname());
componentStatus.setConfigReported(true);
}
}
}
}
private boolean canBeExported(String exportGroupName, String name, Set<String> appExports) {
return appExports.contains(String.format("%s-%s", exportGroupName, name));
}
protected Map<String, List<ExportEntry>> getCurrentExports(String groupName) {
if (!this.exportGroups.containsKey(groupName)) {
synchronized (this.exportGroups) {
if (!this.exportGroups.containsKey(groupName)) {
this.exportGroups.put(groupName, new ConcurrentHashMap<String, List<ExportEntry>>());
}
}
}
return this.exportGroups.get(groupName);
}
private void publishModifiedExportGroups(Set<String> modifiedGroups) {
for (String groupName : modifiedGroups) {
Map<String, List<ExportEntry>> entries = this.exportGroups.get(groupName);
// Publish in old format for the time being
Map<String, String> simpleEntries = new HashMap<String, String>();
for (Map.Entry<String, List<ExportEntry>> entry : entries.entrySet()) {
List<ExportEntry> exports = entry.getValue();
if (SliderUtils.isNotEmpty(exports)) {
// there is no support for multiple exports per name - so extract only the first one
simpleEntries.put(entry.getKey(), entry.getValue().get(0).getValue());
}
}
if (!instanceDefinition.getAppConfOperations().getComponentOptBool(
groupName, AgentKeys.AM_CONFIG_GENERATION, false)) {
publishApplicationInstanceData(groupName, groupName,
simpleEntries.entrySet());
}
PublishedExports exports = new PublishedExports(groupName);
exports.setUpdated(new Date().getTime());
exports.putValues(entries.entrySet());
getAmState().getPublishedExportsSet().put(groupName, exports);
}
}
/** Publish component instance specific data if the component demands it */
protected void processAndPublishComponentSpecificData(Map<String, String> ports,
String containerId,
String hostFqdn,
String componentGroup) {
String portVarFormat = "${site.%s}";
String hostNamePattern = "${THIS_HOST}";
Map<String, String> toPublish = new HashMap<String, String>();
Application application = getMetaInfo().getApplication();
for (Component component : application.getComponents()) {
if (component.getName().equals(componentGroup)) {
if (component.getComponentExports().size() > 0) {
for (ComponentExport export : component.getComponentExports()) {
String templateToExport = export.getValue();
for (String portName : ports.keySet()) {
boolean publishData = false;
String portValPattern = String.format(portVarFormat, portName);
if (templateToExport.contains(portValPattern)) {
templateToExport = templateToExport.replace(portValPattern, ports.get(portName));
publishData = true;
}
if (templateToExport.contains(hostNamePattern)) {
templateToExport = templateToExport.replace(hostNamePattern, hostFqdn);
publishData = true;
}
if (publishData) {
toPublish.put(export.getName(), templateToExport);
log.info("Publishing {} for name {} and container {}",
templateToExport, export.getName(), containerId);
}
}
}
}
}
}
if (toPublish.size() > 0) {
Map<String, String> perContainerData = null;
if (!getComponentInstanceData().containsKey(containerId)) {
perContainerData = new ConcurrentHashMap<String, String>();
} else {
perContainerData = getComponentInstanceData().get(containerId);
}
perContainerData.putAll(toPublish);
getComponentInstanceData().put(containerId, perContainerData);
publishComponentInstanceData();
}
}
/** Publish component instance specific data if the component demands it */
protected void processAndPublishComponentSpecificExports(Map<String, String> ports,
String containerId,
String hostFqdn,
String compName,
String compGroup) {
String portVarFormat = "${site.%s}";
String hostNamePattern = "${" + compGroup + "_HOST}";
List<ExportGroup> appExportGroups = getMetaInfo().getApplication().getExportGroups();
Component component = getMetaInfo().getApplicationComponent(compGroup);
if (component != null && SliderUtils.isSet(component.getCompExports())
&& SliderUtils.isNotEmpty(appExportGroups)) {
Set<String> compExports = new HashSet();
String compExportsStr = component.getCompExports();
for (String compExport : compExportsStr.split(",")) {
if (!compExport.trim().isEmpty()) {
compExports.add(compExport.trim());
}
}
Date now = new Date();
Set<String> modifiedGroups = new HashSet<String>();
for (ExportGroup exportGroup : appExportGroups) {
List<Export> exports = exportGroup.getExports();
if (SliderUtils.isNotEmpty(exports)) {
String exportGroupName = exportGroup.getName();
ConcurrentHashMap<String, List<ExportEntry>> map =
(ConcurrentHashMap<String, List<ExportEntry>>) getCurrentExports(exportGroupName);
for (Export export : exports) {
if (canBeExported(exportGroupName, export.getName(), compExports)) {
log.info("Attempting to publish {} of group {} for component type {}",
export.getName(), exportGroupName, compName);
String templateToExport = export.getValue();
for (String portName : ports.keySet()) {
boolean publishData = false;
String portValPattern = String.format(portVarFormat, portName);
if (templateToExport.contains(portValPattern)) {
templateToExport = templateToExport.replace(portValPattern, ports.get(portName));
publishData = true;
}
if (templateToExport.contains(hostNamePattern)) {
templateToExport = templateToExport.replace(hostNamePattern, hostFqdn);
publishData = true;
}
if (publishData) {
ExportEntry entryToAdd = new ExportEntry();
entryToAdd.setLevel(COMPONENT_TAG);
entryToAdd.setValue(templateToExport);
entryToAdd.setUpdatedTime(now.toString());
entryToAdd.setContainerId(containerId);
entryToAdd.setTag(tags.getTag(compName, containerId));
List<ExportEntry> existingList =
map.putIfAbsent(export.getName(), new CopyOnWriteArrayList(Arrays.asList(entryToAdd)));
// in-place edit, no lock needed
if (existingList != null) {
boolean updatedInPlace = false;
for (ExportEntry entry : existingList) {
if (containerId.toLowerCase(Locale.ENGLISH)
.equals(entry.getContainerId())) {
entryToAdd.setValue(templateToExport);
entryToAdd.setUpdatedTime(now.toString());
updatedInPlace = true;
}
}
if (!updatedInPlace) {
existingList.add(entryToAdd);
}
}
log.info("Publishing {} for name {} and container {}",
templateToExport, export.getName(), containerId);
modifiedGroups.add(exportGroupName);
synchronized (containerExportsMap) {
if (!containerExportsMap.containsKey(containerId)) {
containerExportsMap.put(containerId, new HashSet<String>());
}
Set<String> containerExportMaps = containerExportsMap.get(containerId);
containerExportMaps.add(String.format("%s:%s", exportGroupName, export.getName()));
}
}
}
}
}
}
}
publishModifiedExportGroups(modifiedGroups);
}
}
private void publishComponentInstanceData() {
Map<String, String> dataToPublish = new HashMap<String, String>();
for (String container : getComponentInstanceData().keySet()) {
for (String prop : getComponentInstanceData().get(container).keySet()) {
dataToPublish.put(
container + "." + prop, getComponentInstanceData().get(container).get(prop));
}
}
publishApplicationInstanceData(COMPONENT_DATA_TAG, COMPONENT_DATA_TAG, dataToPublish.entrySet());
}
/**
* Return Component based on group
*
* @param roleGroup component group
*
* @return the component entry or null for no match
*/
protected Component getApplicationComponent(String roleGroup) {
return getMetaInfo().getApplicationComponent(roleGroup);
}
/**
* Extract script path from the application metainfo
*
* @param roleGroup component group
* @return the script path or null for no match
*/
protected CommandScript getScriptPathForMasterPackage(String roleGroup) {
Component component = getApplicationComponent(roleGroup);
if (component != null) {
return component.getCommandScript();
}
return null;
}
/**
* Is the role of type MASTER
*
* @param roleGroup component group
*
* @return true if the role category is MASTER
*/
protected boolean isMaster(String roleGroup) {
Component component = getApplicationComponent(roleGroup);
if (component != null) {
if (component.getCategory().equals("MASTER")) {
return true;
}
}
return false;
}
/**
* Can the role publish configuration
*
* @param roleGroup component group
*
* @return true if it can be pubished
*/
protected boolean canPublishConfig(String roleGroup) {
Component component = getApplicationComponent(roleGroup);
if (component != null) {
return Boolean.TRUE.toString().equals(component.getPublishConfig());
}
return false;
}
/**
* Checks if the role is marked auto-restart
*
* @param roleGroup component group
*
* @return true if it is auto-restart
*/
protected boolean isMarkedAutoRestart(String roleGroup) {
Component component = getApplicationComponent(roleGroup);
if (component != null) {
return component.getAutoStartOnFailureBoolean();
}
return false;
}
/**
* Can any master publish config explicitly, if not a random master is used
*
* @return true if the condition holds
*/
protected boolean canAnyMasterPublishConfig() {
if (canAnyMasterPublish == null) {
Application application = getMetaInfo().getApplication();
if (application == null) {
log.error("Malformed app definition: Expect application as root element in the metainfo.xml");
} else {
for (Component component : application.getComponents()) {
if (Boolean.TRUE.toString().equals(component.getPublishConfig()) &&
component.getCategory().equals("MASTER")) {
canAnyMasterPublish = true;
}
}
}
}
if (canAnyMasterPublish == null) {
canAnyMasterPublish = false;
}
return canAnyMasterPublish;
}
private String getRoleName(String label) {
int index1 = label.indexOf(LABEL_MAKER);
int index2 = label.lastIndexOf(LABEL_MAKER);
if (index1 == index2) {
return label.substring(index1 + LABEL_MAKER.length());
} else {
return label.substring(index1 + LABEL_MAKER.length(), index2);
}
}
private String getRoleGroup(String label) {
return label.substring(label.lastIndexOf(LABEL_MAKER) + LABEL_MAKER.length());
}
private String getContainerId(String label) {
return label.substring(0, label.indexOf(LABEL_MAKER));
}
/**
* Add install command to the heartbeat response
*
* @param roleName
* @param roleGroup
* @param containerId
* @param response
* @param scriptPath
* @param pkg
* when this field is null, it indicates the command is for the
* master package; while not null, for the package named by this
* field
* @throws SliderException
*/
@VisibleForTesting
protected void addInstallCommand(String roleName,
String roleGroup,
String containerId,
HeartBeatResponse response,
String scriptPath,
ComponentCommand compCmd,
long timeout,
String pkg)
throws SliderException {
assert getAmState().isApplicationLive();
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
ExecutionCommand cmd = new ExecutionCommand(AgentCommandType.EXECUTION_COMMAND);
prepareExecutionCommand(cmd);
String clusterName = getClusterName();
cmd.setClusterName(clusterName);
cmd.setRoleCommand(Command.INSTALL.toString());
cmd.setServiceName(clusterName);
cmd.setComponentName(roleName);
cmd.setRole(roleName);
cmd.setPkg(pkg);
Map<String, String> hostLevelParams = new TreeMap<String, String>();
hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions().getOption(JAVA_HOME, getJDKDir()));
hostLevelParams.put(PACKAGE_LIST, getPackageList());
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
Map<String, Map<String, String>> configurations =
buildCommandConfigurations(appConf, containerId, roleName, roleGroup);
cmd.setConfigurations(configurations);
Map<String, Map<String, String>> componentConfigurations = buildComponentConfigurations(appConf);
cmd.setComponentConfigurations(componentConfigurations);
if (SliderUtils.isSet(scriptPath)) {
cmd.setCommandParams(commandParametersSet(scriptPath, timeout, false));
} else {
// assume it to be default shell command
ComponentCommand effectiveCommand = compCmd;
if (effectiveCommand == null) {
effectiveCommand = ComponentCommand.getDefaultComponentCommand("INSTALL");
}
cmd.setCommandParams(commandParametersSet(effectiveCommand, timeout, false));
configurations.get("global").put("exec_cmd", effectiveCommand.getExec());
}
cmd.setHostname(getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME));
response.addExecutionCommand(cmd);
log.debug("command looks like: {} ", cmd);
}
@VisibleForTesting
protected void addInstallDockerCommand(String roleName,
String roleGroup,
String containerId,
HeartBeatResponse response,
ComponentCommand compCmd,
long timeout)
throws SliderException {
assert getAmState().isApplicationLive();
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
ExecutionCommand cmd = new ExecutionCommand(AgentCommandType.EXECUTION_COMMAND);
prepareExecutionCommand(cmd);
String clusterName = getClusterName();
cmd.setClusterName(clusterName);
cmd.setRoleCommand(Command.INSTALL.toString());
cmd.setServiceName(clusterName);
cmd.setComponentName(roleName);
cmd.setRole(roleName);
Map<String, String> hostLevelParams = new TreeMap<String, String>();
hostLevelParams.put(PACKAGE_LIST, getPackageList());
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
appConf, containerId, roleName, roleGroup);
cmd.setConfigurations(configurations);
Map<String, Map<String, String>> componentConfigurations = buildComponentConfigurations(appConf);
cmd.setComponentConfigurations(componentConfigurations);
ComponentCommand effectiveCommand = compCmd;
if (compCmd == null) {
effectiveCommand = new ComponentCommand();
effectiveCommand.setName("INSTALL");
effectiveCommand.setExec("DEFAULT");
}
cmd.setCommandParams(setCommandParameters(effectiveCommand, timeout, false));
configurations.get("global").put("exec_cmd", effectiveCommand.getExec());
cmd.setHostname(getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME));
cmd.addContainerDetails(roleGroup, getMetaInfo());
Map<String, String> dockerConfig = new HashMap<String, String>();
if(isYarnDockerContainer(roleGroup)){
//put nothing
cmd.setYarnDockerMode(true);
} else {
dockerConfig.put(
"docker.command_path",
getConfigFromMetaInfoWithAppConfigOverriding(roleGroup,
"commandPath"));
dockerConfig.put("docker.image_name",
getConfigFromMetaInfo(roleGroup, "image"));
}
configurations.put("docker", dockerConfig);
log.debug("Docker- command: {}", cmd.toString());
response.addExecutionCommand(cmd);
}
private Map<String, String> setCommandParameters(String scriptPath,
long timeout, boolean recordConfig) {
Map<String, String> cmdParams = new TreeMap<String, String>();
cmdParams.put("service_package_folder",
"${AGENT_WORK_ROOT}/work/app/definition/package");
cmdParams.put("script", scriptPath);
cmdParams.put("schema_version", "2.0");
cmdParams.put("command_timeout", Long.toString(timeout));
cmdParams.put("script_type", AbstractComponent.TYPE_PYTHON);
cmdParams.put("record_config", Boolean.toString(recordConfig));
return cmdParams;
}
private Map<String, String> setCommandParameters(ComponentCommand compCmd,
long timeout, boolean recordConfig) {
Map<String, String> cmdParams = new TreeMap<String, String>();
cmdParams.put("service_package_folder",
"${AGENT_WORK_ROOT}/work/app/definition/package");
cmdParams.put("command", compCmd.getExec());
cmdParams.put("schema_version", "2.0");
cmdParams.put("command_timeout", Long.toString(timeout));
cmdParams.put("script_type", compCmd.getType());
cmdParams.put("record_config", Boolean.toString(recordConfig));
return cmdParams;
}
private Map<String, Map<String, String>> buildComponentConfigurations(
ConfTreeOperations appConf) {
return appConf.getComponents();
}
protected static String getPackageListFromApplication(Application application) {
String pkgFormatString = "{\"type\":\"%s\",\"name\":\"%s\"}";
String pkgListFormatString = "[%s]";
List<String> packages = new ArrayList<>();
if (application != null) {
if (application.getPackages().size() > 0) {
// no-op if there are packages that are not OS-specific, as these
// will be localized by AM rather than the Agent
// this should be backwards compatible, as there was previously an
// XML parsing bug that ensured non-OS-specific packages did not exist
} else {
List<OSSpecific> osSpecifics = application.getOSSpecifics();
if (osSpecifics != null && osSpecifics.size() > 0) {
for (OSSpecific osSpecific : osSpecifics) {
if (osSpecific.getOsType().equals("any")) {
for (OSPackage osPackage : osSpecific.getPackages()) {
packages.add(String.format(pkgFormatString, osPackage.getType(), osPackage.getName()));
}
}
}
}
}
}
if (!packages.isEmpty()) {
return "[" + SliderUtils.join(packages, ",", false) + "]";
} else {
return "[]";
}
}
private String getPackageList() {
return getPackageListFromApplication(getMetaInfo().getApplication());
}
private void prepareExecutionCommand(ExecutionCommand cmd) {
cmd.setTaskId(taskId.incrementAndGet());
cmd.setCommandId(cmd.getTaskId() + "-1");
}
private Map<String, String> commandParametersSet(String scriptPath, long timeout, boolean recordConfig) {
Map<String, String> cmdParams = new TreeMap<>();
cmdParams.put("service_package_folder",
"${AGENT_WORK_ROOT}/work/app/definition/package");
cmdParams.put("script", scriptPath);
cmdParams.put("schema_version", "2.0");
cmdParams.put("command_timeout", Long.toString(timeout));
cmdParams.put("script_type", "PYTHON");
cmdParams.put("record_config", Boolean.toString(recordConfig));
return cmdParams;
}
private Map<String, String> commandParametersSet(ComponentCommand compCmd, long timeout, boolean recordConfig) {
Map<String, String> cmdParams = new TreeMap<>();
cmdParams.put("service_package_folder",
"${AGENT_WORK_ROOT}/work/app/definition/package");
cmdParams.put("command", compCmd.getExec());
cmdParams.put("schema_version", "2.0");
cmdParams.put("command_timeout", Long.toString(timeout));
cmdParams.put("script_type", compCmd.getType());
cmdParams.put("record_config", Boolean.toString(recordConfig));
return cmdParams;
}
@VisibleForTesting
protected void addStatusCommand(String roleName,
String roleGroup,
String containerId,
HeartBeatResponse response,
String scriptPath,
long timeout)
throws SliderException {
assert getAmState().isApplicationLive();
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
if (isDockerContainer(roleGroup) || isYarnDockerContainer(roleGroup)) {
addStatusDockerCommand(roleName, roleGroup, containerId, response,
scriptPath, timeout);
return;
}
StatusCommand cmd = new StatusCommand();
String clusterName = getClusterName();
cmd.setCommandType(AgentCommandType.STATUS_COMMAND);
cmd.setComponentName(roleName);
cmd.setServiceName(clusterName);
cmd.setClusterName(clusterName);
cmd.setRoleCommand(StatusCommand.STATUS_COMMAND);
Map<String, String> hostLevelParams = new TreeMap<String, String>();
hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions().getOption(JAVA_HOME, getJDKDir()));
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
cmd.setCommandParams(commandParametersSet(scriptPath, timeout, false));
Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId, roleName, roleGroup);
cmd.setConfigurations(configurations);
response.addStatusCommand(cmd);
}
@VisibleForTesting
protected void addStatusDockerCommand(String roleName,
String roleGroup,
String containerId,
HeartBeatResponse response,
String scriptPath,
long timeout)
throws SliderException {
assert getAmState().isApplicationLive();
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
StatusCommand cmd = new StatusCommand();
String clusterName = getClusterName();
cmd.setCommandType(AgentCommandType.STATUS_COMMAND);
cmd.setComponentName(roleName);
cmd.setServiceName(clusterName);
cmd.setClusterName(clusterName);
cmd.setRoleCommand(StatusCommand.STATUS_COMMAND);
Map<String, String> hostLevelParams = new TreeMap<String, String>();
hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions().getMandatoryOption(JAVA_HOME));
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
cmd.setCommandParams(setCommandParameters(scriptPath, timeout, false));
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
appConf, containerId, roleName, roleGroup);
Map<String, String> dockerConfig = new HashMap<String, String>();
String statusCommand = getConfigFromMetaInfoWithAppConfigOverriding(roleGroup, "statusCommand");
if (statusCommand == null) {
if(isYarnDockerContainer(roleGroup)){
//should complain the required field is null
cmd.setYarnDockerMode(true);
} else {
statusCommand = "docker top "
+ containerId
+ " | grep \"\"";// default value
}
}
dockerConfig.put("docker.status_command",statusCommand);
configurations.put("docker", dockerConfig);
cmd.setConfigurations(configurations);
log.debug("Docker- status {}", cmd);
response.addStatusCommand(cmd);
}
@VisibleForTesting
protected void addGetConfigDockerCommand(String roleName, String roleGroup,
String containerId, HeartBeatResponse response) throws SliderException {
assert getAmState().isApplicationLive();
StatusCommand cmd = new StatusCommand();
String clusterName = getClusterName();
cmd.setCommandType(AgentCommandType.STATUS_COMMAND);
cmd.setComponentName(roleName);
cmd.setServiceName(clusterName);
cmd.setClusterName(clusterName);
cmd.setRoleCommand(StatusCommand.GET_CONFIG_COMMAND);
Map<String, String> hostLevelParams = new TreeMap<String, String>();
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
hostLevelParams.put(CONTAINER_ID, containerId);
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
appConf, containerId, roleName, roleGroup);
Map<String, String> dockerConfig = new HashMap<String, String>();
String statusCommand = getConfigFromMetaInfoWithAppConfigOverriding(roleGroup, "statusCommand");
if (statusCommand == null) {
if(isYarnDockerContainer(roleGroup)){
//should complain the required field is null
cmd.setYarnDockerMode(true);
} else {
statusCommand = "docker top "
+ containerId
+ " | grep \"\"";// default value
}
}
dockerConfig.put("docker.status_command",statusCommand);
configurations.put("docker", dockerConfig);
cmd.setConfigurations(configurations);
log.debug("Docker- getconfig command {}", cmd);
response.addStatusCommand(cmd);
}
private String getConfigFromMetaInfoWithAppConfigOverriding(String roleGroup,
String configName){
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
String containerName = getMetaInfo().getApplicationComponent(roleGroup)
.getDockerContainers().get(0).getName();
String composedConfigName = null;
String appConfigValue = null;
//if the configName is about port , mount, inputfile, then check differently
if (configName.equals("containerPort") || configName.equals("hostPort")){
composedConfigName = containerName + ".ports." + configName;
} else
if (configName.equals("containerMount")
|| configName.equals("hostMount")){
composedConfigName = containerName + ".mounts." + configName;
} else
if (configName.equals("containerPath")
|| configName.equals("fileLocalPath")) {
composedConfigName = containerName + ".inputFiles." + configName;
} else {
composedConfigName = containerName + "." + configName;
}
appConfigValue = appConf.getComponentOpt(roleGroup, composedConfigName,
null);
log.debug(
"Docker- value from appconfig component: {} configName: {} value: {}",
roleGroup, composedConfigName, appConfigValue);
if (appConfigValue == null) {
appConfigValue = getConfigFromMetaInfo(roleGroup, configName);
log.debug(
"Docker- value from metainfo component: {} configName: {} value: {}",
roleGroup, configName, appConfigValue);
}
return appConfigValue;
}
@VisibleForTesting
protected void addStartDockerCommand(String roleName, String roleGroup,
String containerId, HeartBeatResponse response,
ComponentCommand startCommand, long timeout, boolean isMarkedAutoRestart)
throws
SliderException {
assert getAmState().isApplicationLive();
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot();
ExecutionCommand cmd = new ExecutionCommand(AgentCommandType.EXECUTION_COMMAND);
prepareExecutionCommand(cmd);
String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME);
String hostName = getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME);
cmd.setHostname(hostName);
cmd.setClusterName(clusterName);
cmd.setRoleCommand(Command.START.toString());
cmd.setServiceName(clusterName);
cmd.setComponentName(roleName);
cmd.setRole(roleName);
Map<String, String> hostLevelParams = new TreeMap<>();
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
Map<String, String> roleParams = new TreeMap<>();
cmd.setRoleParams(roleParams);
cmd.getRoleParams().put("auto_restart", Boolean.toString(isMarkedAutoRestart));
startCommand = new ComponentCommand();
startCommand.setName("START");
startCommand.setType("docker");
startCommand.setExec("exec");
cmd.setCommandParams(setCommandParameters(startCommand, timeout, true));
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
appConf, containerId, roleName, roleGroup);
Map<String, Map<String, String>> componentConfigurations = buildComponentConfigurations(appConf);
cmd.setComponentConfigurations(componentConfigurations);
Map<String, String> dockerConfig = new HashMap<String, String>();
if (isYarnDockerContainer(roleGroup)) {
dockerConfig.put(
"docker.startCommand",
getConfigFromMetaInfoWithAppConfigOverriding(roleGroup,
"start_command"));
cmd.setYarnDockerMode(true);
} else {
dockerConfig.put(
"docker.command_path",
getConfigFromMetaInfoWithAppConfigOverriding(roleGroup,
"commandPath"));
dockerConfig.put("docker.image_name",
getConfigFromMetaInfo(roleGroup, "image"));
// options should always have -d
String options = getConfigFromMetaInfoWithAppConfigOverriding(
roleGroup, "options");
if(options != null && !options.isEmpty()){
options = options + " -d";
} else {
options = "-d";
}
dockerConfig.put("docker.options", options);
// options should always have -d
dockerConfig.put(
"docker.containerPort",
getConfigFromMetaInfoWithAppConfigOverriding(roleGroup,
"containerPort"));
dockerConfig
.put(
"docker.hostPort",
getConfigFromMetaInfoWithAppConfigOverriding(roleGroup,
"hostPort"));
dockerConfig.put(
"docker.mounting_directory",
getConfigFromMetaInfoWithAppConfigOverriding(roleGroup,
"containerMount"));
dockerConfig
.put(
"docker.host_mounting_directory",
getConfigFromMetaInfoWithAppConfigOverriding(roleGroup,
"hostMount"));
dockerConfig.put("docker.additional_param",
getConfigFromMetaInfoWithAppConfigOverriding(roleGroup, "additionalParam"));
dockerConfig.put("docker.input_file.mount_path", getConfigFromMetaInfo(
roleGroup, "containerPath"));
}
String lifetime = getConfigFromMetaInfoWithAppConfigOverriding(
roleGroup, "lifetime");
dockerConfig.put("docker.lifetime", lifetime);
configurations.put("docker", dockerConfig);
String statusCommand = getConfigFromMetaInfoWithAppConfigOverriding(
roleGroup, "statusCommand");
if (statusCommand == null) {
if(isYarnDockerContainer(roleGroup)){
//should complain the required field is null
} else {
statusCommand = "docker top "
+ containerId + " | grep \"\"";
}
}
dockerConfig.put("docker.status_command",statusCommand);
cmd.setConfigurations(configurations);
// configurations.get("global").put("exec_cmd", startCommand.getExec());
cmd.addContainerDetails(roleGroup, getMetaInfo());
log.info("Docker- command: {}", cmd.toString());
response.addExecutionCommand(cmd);
}
private String getConfigFromMetaInfo(String roleGroup, String configName) {
String result = null;
List<DockerContainer> containers = getMetaInfo().getApplicationComponent(
roleGroup).getDockerContainers();// to support multi container per
// component later
log.debug("Docker- containers metainfo: {}", containers.toString());
if (containers.size() > 0) {
DockerContainer container = containers.get(0);
switch (configName) {
case "start_command":
result = container.getStartCommand();
break;
case "image":
result = container.getImage();
break;
case "network":
if (container.getNetwork() == null || container.getNetwork().isEmpty()) {
result = "none";
} else {
result = container.getNetwork();
}
break;
case "useNetworkScript":
if (container.getUseNetworkScript() == null || container.getUseNetworkScript().isEmpty()) {
result = "yes";
} else {
result = container.getUseNetworkScript();
}
break;
case "statusCommand":
result = container.getStatusCommand();
break;
case "commandPath":
result = container.getCommandPath();
break;
case "options":
result = container.getOptions();
break;
case "containerPort":
result = container.getPorts().size() > 0 ? container.getPorts().get(0)
.getContainerPort() : null;// to support
// multi port
// later
break;
case "hostPort":
result = container.getPorts().size() > 0 ? container.getPorts().get(0)
.getHostPort() : null;// to support multi
// port later
break;
case "containerMount":
result = container.getMounts().size() > 0 ? container.getMounts()
.get(0).getContainerMount() : null;// to support
// multi port
// later
break;
case "hostMount":
result = container.getMounts().size() > 0 ? container.getMounts()
.get(0).getHostMount() : null;// to support multi
// port later
break;
case "additionalParam":
result = container.getAdditionalParam();// to support multi port later
break;
case "runPriviledgedContainer":
if (container.getRunPrivilegedContainer() == null) {
result = "false";
} else {
result = container.getRunPrivilegedContainer();
}
break;
default:
break;
}
}
log.debug("Docker- component: {} configName: {} value: {}", roleGroup, configName, result);
return result;
}
@VisibleForTesting
protected void addGetConfigCommand(String roleName, String roleGroup,
String containerId, HeartBeatResponse response) throws SliderException {
assert getAmState().isApplicationLive();
StatusCommand cmd = new StatusCommand();
String clusterName = getClusterName();
cmd.setCommandType(AgentCommandType.STATUS_COMMAND);
cmd.setComponentName(roleName);
cmd.setServiceName(clusterName);
cmd.setClusterName(clusterName);
cmd.setRoleCommand(StatusCommand.GET_CONFIG_COMMAND);
Map<String, String> hostLevelParams = new TreeMap<String, String>();
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
hostLevelParams.put(CONTAINER_ID, containerId);
response.addStatusCommand(cmd);
}
@VisibleForTesting
protected void addStartCommand(String roleName, String roleGroup, String containerId,
HeartBeatResponse response,
String scriptPath, ComponentCommand startCommand,
ComponentCommand stopCommand,
long timeout, boolean isMarkedAutoRestart)
throws
SliderException {
assert getAmState().isApplicationLive();
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot();
ExecutionCommand cmd = new ExecutionCommand(AgentCommandType.EXECUTION_COMMAND);
prepareExecutionCommand(cmd);
String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME);
String hostName = getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME);
cmd.setHostname(hostName);
cmd.setClusterName(clusterName);
cmd.setRoleCommand(Command.START.toString());
cmd.setServiceName(clusterName);
cmd.setComponentName(roleName);
cmd.setRole(roleName);
Map<String, String> hostLevelParams = new TreeMap<>();
hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions().getOption(JAVA_HOME, getJDKDir()));
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
Map<String, String> roleParams = new TreeMap<>();
cmd.setRoleParams(roleParams);
cmd.getRoleParams().put("auto_restart", Boolean.toString(isMarkedAutoRestart));
Map<String, Map<String, String>> configurations = buildCommandConfigurations(appConf, containerId, roleName, roleGroup);
cmd.setConfigurations(configurations);
Map<String, Map<String, String>> componentConfigurations = buildComponentConfigurations(appConf);
cmd.setComponentConfigurations(componentConfigurations);
if (SliderUtils.isSet(scriptPath)) {
cmd.setCommandParams(commandParametersSet(scriptPath, timeout, true));
} else {
if (startCommand == null) {
throw new SliderException("Expected START command not found for component " + roleName);
}
cmd.setCommandParams(commandParametersSet(startCommand, timeout, true));
configurations.get("global").put("exec_cmd", startCommand.getExec());
}
response.addExecutionCommand(cmd);
log.debug("command looks like: {}", cmd);
// With start command, the corresponding command for graceful stop needs to
// be sent. This will be used when a particular container is lost as per RM,
// but then the agent is still running and heart-beating to the Slider AM.
ExecutionCommand cmdStop = new ExecutionCommand(
AgentCommandType.EXECUTION_COMMAND);
cmdStop.setTaskId(taskId.get());
cmdStop.setCommandId(cmdStop.getTaskId() + "-1");
cmdStop.setHostname(hostName);
cmdStop.setClusterName(clusterName);
cmdStop.setRoleCommand(Command.STOP.toString());
cmdStop.setServiceName(clusterName);
cmdStop.setComponentName(roleName);
cmdStop.setRole(roleName);
Map<String, String> hostLevelParamsStop = new TreeMap<String, String>();
hostLevelParamsStop.put(JAVA_HOME, appConf.getGlobalOptions()
.getOption(JAVA_HOME, ""));
hostLevelParamsStop.put(CONTAINER_ID, containerId);
cmdStop.setHostLevelParams(hostLevelParamsStop);
Map<String, String> roleParamsStop = new TreeMap<String, String>();
cmdStop.setRoleParams(roleParamsStop);
cmdStop.getRoleParams().put("auto_restart",
Boolean.toString(isMarkedAutoRestart));
if (SliderUtils.isSet(scriptPath)) {
cmdStop.setCommandParams(commandParametersSet(scriptPath, timeout, true));
} else {
if (stopCommand == null) {
stopCommand = ComponentCommand.getDefaultComponentCommand("STOP");
}
cmd.setCommandParams(commandParametersSet(stopCommand, timeout, true));
configurations.get("global").put("exec_cmd", startCommand.getExec());
}
Map<String, Map<String, String>> configurationsStop = buildCommandConfigurations(
appConf, containerId, roleName, roleGroup);
cmdStop.setConfigurations(configurationsStop);
response.addExecutionCommand(cmdStop);
}
@VisibleForTesting
protected void addUpgradeCommand(String roleName, String roleGroup, String containerId,
HeartBeatResponse response, String scriptPath, long timeout)
throws SliderException {
assert getAmState().isApplicationLive();
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot();
ExecutionCommand cmd = new ExecutionCommand(
AgentCommandType.EXECUTION_COMMAND);
prepareExecutionCommand(cmd);
String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME);
String hostName = getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME);
cmd.setHostname(hostName);
cmd.setClusterName(clusterName);
cmd.setRoleCommand(Command.UPGRADE.toString());
cmd.setServiceName(clusterName);
cmd.setComponentName(roleName);
cmd.setRole(roleName);
Map<String, String> hostLevelParams = new TreeMap<String, String>();
hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions()
.getMandatoryOption(JAVA_HOME));
hostLevelParams.put(CONTAINER_ID, containerId);
cmd.setHostLevelParams(hostLevelParams);
cmd.setCommandParams(commandParametersSet(scriptPath, timeout, true));
Map<String, Map<String, String>> configurations = buildCommandConfigurations(
appConf, containerId, roleName, roleGroup);
cmd.setConfigurations(configurations);
response.addExecutionCommand(cmd);
}
protected void addStopCommand(String roleName, String roleGroup, String containerId,
HeartBeatResponse response, String scriptPath, long timeout,
boolean isInUpgradeMode) throws SliderException {
assert getAmState().isApplicationLive();
ConfTreeOperations appConf = getAmState().getAppConfSnapshot();
ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot();
ExecutionCommand cmdStop = new ExecutionCommand(
AgentCommandType.EXECUTION_COMMAND);
cmdStop.setTaskId(taskId.get());
cmdStop.setCommandId(cmdStop.getTaskId() + "-1");
String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME);
String hostName = getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME);
cmdStop.setHostname(hostName);
cmdStop.setClusterName(clusterName);
// Upgrade stop is differentiated by passing a transformed role command -
// UPGRADE_STOP
cmdStop.setRoleCommand(Command.transform(Command.STOP, isInUpgradeMode));
cmdStop.setServiceName(clusterName);
cmdStop.setComponentName(roleName);
cmdStop.setRole(roleName);
Map<String, String> hostLevelParamsStop = new TreeMap<String, String>();
hostLevelParamsStop.put(JAVA_HOME, appConf.getGlobalOptions()
.getMandatoryOption(JAVA_HOME));
hostLevelParamsStop.put(CONTAINER_ID, containerId);
cmdStop.setHostLevelParams(hostLevelParamsStop);
cmdStop.setCommandParams(commandParametersSet(scriptPath, timeout, true));
Map<String, Map<String, String>> configurationsStop = buildCommandConfigurations(
appConf, containerId, roleName, roleGroup);
cmdStop.setConfigurations(configurationsStop);
response.addExecutionCommand(cmdStop);
}
protected static String getJDKDir() {
File javaHome = new File(System.getProperty("java.home")).getParentFile();
File jdkDirectory = null;
if (javaHome.getName().contains("jdk")) {
jdkDirectory = javaHome;
}
if (jdkDirectory != null) {
return jdkDirectory.getAbsolutePath();
} else {
return "";
}
}
protected Map<String, String> getAllocatedPorts() {
return getAllocatedPorts(SHARED_PORT_TAG);
}
protected Map<String, Map<String, String>> getComponentInstanceData() {
return this.componentInstanceData;
}
protected Map<String, String> getAllocatedPorts(String containerId) {
if (!this.allocatedPorts.containsKey(containerId)) {
synchronized (this.allocatedPorts) {
if (!this.allocatedPorts.containsKey(containerId)) {
this.allocatedPorts.put(containerId,
new ConcurrentHashMap<String, String>());
}
}
}
return this.allocatedPorts.get(containerId);
}
private Map<String, Map<String, String>> buildCommandConfigurations(
ConfTreeOperations appConf, String containerId, String roleName, String roleGroup)
throws SliderException {
Map<String, Map<String, String>> configurations =
new TreeMap<String, Map<String, String>>();
Map<String, String> tokens = getStandardTokenMap(appConf, roleName, roleGroup);
tokens.put("${CONTAINER_ID}", containerId);
Set<String> configs = new HashSet<String>();
configs.addAll(getApplicationConfigurationTypes(roleGroup));
configs.addAll(getSystemConfigurationsRequested(appConf));
for (String configType : configs) {
addNamedConfiguration(configType, appConf.getGlobalOptions().options,
configurations, tokens, containerId, roleName);
if (appConf.getComponent(roleGroup) != null) {
addNamedConfiguration(configType, appConf.getComponent(roleGroup).options,
configurations, tokens, containerId, roleName);
}
}
//do a final replacement of re-used configs
dereferenceAllConfigs(configurations);
return configurations;
}
protected void dereferenceAllConfigs(Map<String, Map<String, String>> configurations) {
Map<String, String> allConfigs = new HashMap<String, String>();
String lookupFormat = "${@//site/%s/%s}";
for (String configType : configurations.keySet()) {
Map<String, String> configBucket = configurations.get(configType);
for (String configName : configBucket.keySet()) {
allConfigs.put(String.format(lookupFormat, configType, configName), configBucket.get(configName));
}
}
boolean finished = false;
while (!finished) {
finished = true;
for (Map.Entry<String, String> entry : allConfigs.entrySet()) {
String configValue = entry.getValue();
for (Map.Entry<String, String> lookUpEntry : allConfigs.entrySet()) {
String lookUpValue = lookUpEntry.getValue();
if (lookUpValue.contains("${@//site/")) {
continue;
}
String lookUpKey = lookUpEntry.getKey();
if (configValue != null && configValue.contains(lookUpKey)) {
configValue = configValue.replace(lookUpKey, lookUpValue);
}
}
if (!configValue.equals(entry.getValue())) {
finished = false;
allConfigs.put(entry.getKey(), configValue);
}
}
}
for (String configType : configurations.keySet()) {
Map<String, String> configBucket = configurations.get(configType);
for (Map.Entry<String, String> entry: configBucket.entrySet()) {
String configName = entry.getKey();
String configValue = entry.getValue();
for (Map.Entry<String, String> lookUpEntry : allConfigs.entrySet()) {
String lookUpValue = lookUpEntry.getValue();
if (lookUpValue.contains("${@//site/")) {
continue;
}
String lookUpKey = lookUpEntry.getKey();
if (configValue != null && configValue.contains(lookUpKey)) {
configValue = configValue.replace(lookUpKey, lookUpValue);
}
}
configBucket.put(configName, configValue);
}
}
}
private Map<String, String> getStandardTokenMap(ConfTreeOperations appConf,
String componentName, String componentGroup) throws SliderException {
Map<String, String> tokens = new HashMap<String, String>();
String nnuri = appConf.get("site.fs.defaultFS");
tokens.put("${NN_URI}", nnuri);
tokens.put("${NN_HOST}", URI.create(nnuri).getHost());
tokens.put("${ZK_HOST}", appConf.get(OptionKeys.ZOOKEEPER_HOSTS));
tokens.put("${DEFAULT_ZK_PATH}", appConf.get(OptionKeys.ZOOKEEPER_PATH));
tokens.put("${DEFAULT_DATA_DIR}", getAmState()
.getInternalsSnapshot()
.getGlobalOptions()
.getMandatoryOption(InternalKeys.INTERNAL_DATA_DIR_PATH));
tokens.put("${JAVA_HOME}", appConf.get(AgentKeys.JAVA_HOME));
tokens.put("${COMPONENT_NAME}", componentName);
if (!componentName.equals(componentGroup) && componentName.startsWith(componentGroup)) {
tokens.put("${COMPONENT_ID}", componentName.substring(componentGroup.length()));
}
return tokens;
}
@VisibleForTesting
protected List<String> getSystemConfigurationsRequested(ConfTreeOperations appConf) {
List<String> configList = new ArrayList<String>();
String configTypes = appConf.get(AgentKeys.SYSTEM_CONFIGS);
if (configTypes != null && configTypes.length() > 0) {
String[] configs = configTypes.split(",");
for (String config : configs) {
configList.add(config.trim());
}
}
return new ArrayList<String>(new HashSet<String>(configList));
}
@VisibleForTesting
protected List<String> getApplicationConfigurationTypes(String roleGroup) {
List<String> configList = new ArrayList<String>();
configList.add(GLOBAL_CONFIG_TAG);
List<ConfigFile> configFiles = getMetaInfo().getApplication().getConfigFiles();
for (ConfigFile configFile : configFiles) {
log.info("Expecting config type {}.", configFile.getDictionaryName());
configList.add(configFile.getDictionaryName());
}
for (Component component : getMetaInfo().getApplication().getComponents()) {
if (!component.getName().equals(roleGroup)) {
continue;
}
if (component.getDockerContainers() == null) {
continue;
}
for (DockerContainer container : component.getDockerContainers()) {
if (container.getConfigFiles() == null) {
continue;
}
for (ConfigFile configFile : container.getConfigFiles()) {
log.info("Expecting config type {}.", configFile.getDictionaryName());
configList.add(configFile.getDictionaryName());
}
}
}
// remove duplicates. mostly worried about 'global' being listed
return new ArrayList<String>(new HashSet<String>(configList));
}
private void addNamedConfiguration(String configName, Map<String, String> sourceConfig,
Map<String, Map<String, String>> configurations,
Map<String, String> tokens, String containerId,
String roleName) {
Map<String, String> config = new HashMap<String, String>();
if (configName.equals(GLOBAL_CONFIG_TAG)) {
addDefaultGlobalConfig(config, containerId, roleName);
}
// add role hosts to tokens
addRoleRelatedTokens(tokens);
providerUtils.propagateSiteOptions(sourceConfig, config, configName, tokens);
//apply any port updates
if (!this.getAllocatedPorts().isEmpty()) {
for (String key : config.keySet()) {
String value = config.get(key);
String lookupKey = configName + "." + key;
if (!value.contains(PER_CONTAINER_TAG)) {
// If the config property is shared then pass on the already allocated value
// from any container
if (this.getAllocatedPorts().containsKey(lookupKey)) {
config.put(key, getAllocatedPorts().get(lookupKey));
}
} else {
if (this.getAllocatedPorts(containerId).containsKey(lookupKey)) {
config.put(key, getAllocatedPorts(containerId).get(lookupKey));
}
}
}
}
//apply defaults only if the key is not present and value is not empty
if (getDefaultConfigs().containsKey(configName)) {
log.info("Adding default configs for type {}.", configName);
for (PropertyInfo defaultConfigProp : getDefaultConfigs().get(configName).getPropertyInfos()) {
if (!config.containsKey(defaultConfigProp.getName())) {
if (!defaultConfigProp.getName().isEmpty() &&
defaultConfigProp.getValue() != null &&
!defaultConfigProp.getValue().isEmpty()) {
config.put(defaultConfigProp.getName(), defaultConfigProp.getValue());
}
}
}
}
configurations.put(configName, config);
}
protected void addRoleRelatedTokens(Map<String, String> tokens) {
for (Map.Entry<String, Map<String, ClusterNode>> entry : getRoleClusterNodeMapping().entrySet()) {
String tokenName = entry.getKey().toUpperCase(Locale.ENGLISH) + "_HOST";
String hosts = StringUtils.join(",", getHostsList(entry.getValue().values(), true));
tokens.put("${" + tokenName + "}", hosts);
}
}
private Iterable<String> getHostsList(Collection<ClusterNode> values,
boolean hostOnly) {
List<String> hosts = new ArrayList<String>();
for (ClusterNode cn : values) {
hosts.add(hostOnly ? cn.host : cn.host + "/" + cn.name);
}
return hosts;
}
private void addDefaultGlobalConfig(Map<String, String> config, String containerId, String roleName) {
config.put("app_log_dir", "${AGENT_LOG_ROOT}");
config.put("app_pid_dir", "${AGENT_WORK_ROOT}/app/run");
config.put("app_install_dir", "${AGENT_WORK_ROOT}/app/install");
config.put("app_conf_dir", "${AGENT_WORK_ROOT}/" + AgentKeys.APP_CONF_DIR);
config.put("app_input_conf_dir", "${AGENT_WORK_ROOT}/" + SliderKeys.PROPAGATED_CONF_DIR_NAME);
config.put("app_container_id", containerId);
config.put("app_container_tag", tags.getTag(roleName, containerId));
// add optional parameters only if they are not already provided
if (!config.containsKey("pid_file")) {
config.put("pid_file", "${AGENT_WORK_ROOT}/app/run/component.pid");
}
if (!config.containsKey("app_root")) {
config.put("app_root", "${AGENT_WORK_ROOT}/app/install");
}
}
private void buildRoleHostDetails(Map<String, MonitorDetail> details) {
for (Map.Entry<String, Map<String, ClusterNode>> entry :
getRoleClusterNodeMapping().entrySet()) {
details.put(entry.getKey() + " Host(s)/Container(s)",
new MonitorDetail(getHostsList(entry.getValue().values(), false).toString(), false));
}
}
}