| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ambari.server.controller.internal; |
| |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.AGENT_STACK_RETRY_COUNT; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.AGENT_STACK_RETRY_ON_UNAVAILABILITY; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.DB_NAME; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.GPL_LICENSE_ACCEPTED; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.GROUP_LIST; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.HOST_SYS_PREPPED; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.JAVA_HOME; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.JAVA_VERSION; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.JCE_NAME; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.JDK_LOCATION; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.JDK_NAME; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.MYSQL_JDBC_URL; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.NOT_MANAGED_HDFS_PATH_LIST; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.ORACLE_JDBC_URL; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.PACKAGE_LIST; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SERVICE_REPO_INFO; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_NAME; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_VERSION; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.USER_GROUPS; |
| import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.USER_LIST; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.PrintWriter; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| |
| import org.apache.ambari.server.AmbariException; |
| import org.apache.ambari.server.api.services.AmbariMetaInfo; |
| import org.apache.ambari.server.configuration.Configuration; |
| import org.apache.ambari.server.controller.AmbariManagementController; |
| import org.apache.ambari.server.controller.MaintenanceStateHelper; |
| import org.apache.ambari.server.controller.ServiceComponentHostRequest; |
| import org.apache.ambari.server.controller.ServiceComponentHostResponse; |
| import org.apache.ambari.server.controller.spi.NoSuchParentResourceException; |
| import org.apache.ambari.server.controller.spi.NoSuchResourceException; |
| import org.apache.ambari.server.controller.spi.Predicate; |
| import org.apache.ambari.server.controller.spi.Request; |
| import org.apache.ambari.server.controller.spi.RequestStatus; |
| import org.apache.ambari.server.controller.spi.Resource; |
| import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException; |
| import org.apache.ambari.server.controller.spi.SystemException; |
| import org.apache.ambari.server.controller.spi.UnsupportedPropertyException; |
| import org.apache.ambari.server.controller.utilities.PropertyHelper; |
| import org.apache.ambari.server.state.ClientConfigFileDefinition; |
| import org.apache.ambari.server.state.Cluster; |
| import org.apache.ambari.server.state.Clusters; |
| import org.apache.ambari.server.state.ComponentInfo; |
| import org.apache.ambari.server.state.Config; |
| import org.apache.ambari.server.state.ConfigHelper; |
| import org.apache.ambari.server.state.DesiredConfig; |
| import org.apache.ambari.server.state.PropertyInfo.PropertyType; |
| import org.apache.ambari.server.state.ServiceInfo; |
| import org.apache.ambari.server.state.ServiceOsSpecific; |
| import org.apache.ambari.server.state.StackId; |
| import org.apache.ambari.server.utils.SecretReference; |
| import org.apache.ambari.server.utils.StageUtils; |
| import org.apache.commons.compress.archivers.tar.TarArchiveEntry; |
| import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; |
| import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; |
| import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; |
| import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; |
| import org.apache.commons.compress.utils.IOUtils; |
| import org.apache.commons.lang.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.gson.Gson; |
| import com.google.inject.assistedinject.Assisted; |
| import com.google.inject.assistedinject.AssistedInject; |
| |
| /** |
| * Resource provider for client config resources. |
| */ |
| public class ClientConfigResourceProvider extends AbstractControllerResourceProvider { |
| |
| |
| // ----- Property ID constants --------------------------------------------- |
| |
| protected static final String COMPONENT_CLUSTER_NAME_PROPERTY_ID = "ServiceComponentInfo/cluster_name"; |
| protected static final String COMPONENT_SERVICE_NAME_PROPERTY_ID = "ServiceComponentInfo/service_name"; |
| protected static final String COMPONENT_COMPONENT_NAME_PROPERTY_ID = "ServiceComponentInfo/component_name"; |
| protected static final String HOST_COMPONENT_HOST_NAME_PROPERTY_ID = |
| PropertyHelper.getPropertyId("HostRoles", "host_name"); |
| |
| private final Gson gson; |
| |
| private static Set<String> pkPropertyIds = |
| new HashSet<>(Arrays.asList(new String[]{ |
| COMPONENT_CLUSTER_NAME_PROPERTY_ID, |
| COMPONENT_SERVICE_NAME_PROPERTY_ID, |
| COMPONENT_COMPONENT_NAME_PROPERTY_ID})); |
| |
| private MaintenanceStateHelper maintenanceStateHelper; |
| private static final Logger LOG = LoggerFactory.getLogger(ClientConfigResourceProvider.class); |
| |
| // ----- Constructors ---------------------------------------------------- |
| |
| /** |
| * Create a new resource provider for the given management controller. |
| * |
| * @param propertyIds the property ids |
| * @param keyPropertyIds the key property ids |
| * @param managementController the management controller |
| */ |
| @AssistedInject |
| ClientConfigResourceProvider(@Assisted Set<String> propertyIds, |
| @Assisted Map<Resource.Type, String> keyPropertyIds, |
| @Assisted AmbariManagementController managementController) { |
| super(propertyIds, keyPropertyIds, managementController); |
| gson = new Gson(); |
| } |
| |
| // ----- ResourceProvider ------------------------------------------------ |
| |
| @Override |
| public RequestStatus createResources(Request request) |
| throws SystemException, |
| UnsupportedPropertyException, |
| ResourceAlreadyExistsException, |
| NoSuchParentResourceException { |
| |
| throw new SystemException("The request is not supported"); |
| } |
| |
| @Override |
| public Set<Resource> getResources(Request request, Predicate predicate) |
| throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { |
| |
| Set<Resource> resources = new HashSet<>(); |
| |
| final Set<ServiceComponentHostRequest> requests = new HashSet<>(); |
| |
| for (Map<String, Object> propertyMap : getPropertyMaps(predicate)) { |
| requests.add(getRequest(propertyMap)); |
| } |
| |
| Set<ServiceComponentHostResponse> responses = null; |
| try { |
| responses = getResources(new Command<Set<ServiceComponentHostResponse>>() { |
| @Override |
| public Set<ServiceComponentHostResponse> invoke() throws AmbariException { |
| return getManagementController().getHostComponents(requests); |
| } |
| }); |
| } catch (Exception e) { |
| throw new SystemException("Failed to get components ", e); |
| } |
| |
| Map<String,ServiceComponentHostResponse> componentMap = new HashMap<>(); |
| |
| // reduce set of sch responses to one sch response for every service component |
| for (ServiceComponentHostResponse resp: responses) { |
| String componentName = resp.getComponentName(); |
| if (!componentMap.containsKey(componentName)) { |
| componentMap.put(resp.getComponentName(),resp); |
| } |
| } |
| |
| ServiceComponentHostRequest schRequest = requests.iterator().next(); |
| String requestComponentName = schRequest.getComponentName(); |
| String requestServiceName = schRequest.getServiceName(); |
| String requestHostName = schRequest.getHostname(); |
| |
| Map<String,List<ServiceComponentHostResponse>> serviceToComponentMap = new HashMap<>(); |
| |
| // sch response for the service components that have configFiles defined in the stack definition of the service |
| List <ServiceComponentHostResponse> schWithConfigFiles = new ArrayList<>(); |
| |
| Configuration configs = new Configuration(); |
| Map<String, String> configMap = configs.getConfigsMap(); |
| String TMP_PATH = configMap.get(Configuration.SERVER_TMP_DIR.getKey()); |
| String pythonCmd = configMap.get(Configuration.AMBARI_PYTHON_WRAP.getKey()); |
| List<String> pythonCompressFilesCmds = new ArrayList<>(); |
| List<File> commandFiles = new ArrayList<>(); |
| |
| for (ServiceComponentHostResponse response : componentMap.values()){ |
| |
| AmbariManagementController managementController = getManagementController(); |
| ConfigHelper configHelper = managementController.getConfigHelper(); |
| Cluster cluster = null; |
| Clusters clusters = managementController.getClusters(); |
| try { |
| cluster = clusters.getCluster(response.getClusterName()); |
| |
| StackId stackId = cluster.getCurrentStackVersion(); |
| String serviceName = response.getServiceName(); |
| String componentName = response.getComponentName(); |
| String hostName = response.getHostname(); |
| String publicHostName = response.getPublicHostname(); |
| ComponentInfo componentInfo = null; |
| String packageFolder = null; |
| |
| componentInfo = managementController.getAmbariMetaInfo(). |
| getComponent(stackId.getStackName(), stackId.getStackVersion(), serviceName, componentName); |
| packageFolder = managementController.getAmbariMetaInfo(). |
| getService(stackId.getStackName(), stackId.getStackVersion(), serviceName).getServicePackageFolder(); |
| |
| String commandScript = componentInfo.getCommandScript().getScript(); |
| List<ClientConfigFileDefinition> clientConfigFiles = componentInfo.getClientConfigFiles(); |
| |
| if (clientConfigFiles == null) { |
| if (componentMap.size() == 1) { |
| throw new SystemException("No configuration files defined for the component " + componentInfo.getName()); |
| } else { |
| LOG.debug(String.format("No configuration files defined for the component %s",componentInfo.getName())); |
| continue; |
| } |
| } |
| |
| // service component hosts that have configFiles defined in the stack definition of the service |
| schWithConfigFiles.add(response); |
| |
| if (serviceToComponentMap.containsKey(response.getServiceName())) { |
| List <ServiceComponentHostResponse> schResponseList = serviceToComponentMap.get(serviceName); |
| schResponseList.add(response); |
| } else { |
| List <ServiceComponentHostResponse> schResponseList = new ArrayList<>(); |
| schResponseList.add(response); |
| serviceToComponentMap.put(serviceName,schResponseList); |
| } |
| |
| String resourceDirPath = configs.getResourceDirPath(); |
| String packageFolderAbsolute = resourceDirPath + File.separator + packageFolder; |
| |
| String commandScriptAbsolute = packageFolderAbsolute + File.separator + commandScript; |
| |
| |
| Map<String, Map<String, String>> configurations = new TreeMap<>(); |
| Map<String, Long> configVersions = new TreeMap<>(); |
| Map<String, Map<PropertyType, Set<String>>> configPropertiesTypes = new TreeMap<>(); |
| Map<String, Map<String, Map<String, String>>> configurationAttributes = new TreeMap<>(); |
| |
| Map<String, DesiredConfig> desiredClusterConfigs = cluster.getDesiredConfigs(); |
| |
| //Get configurations and configuration attributes |
| for (Map.Entry<String, DesiredConfig> desiredConfigEntry : desiredClusterConfigs.entrySet()) { |
| |
| String configType = desiredConfigEntry.getKey(); |
| DesiredConfig desiredConfig = desiredConfigEntry.getValue(); |
| Config clusterConfig = cluster.getConfig(configType, desiredConfig.getTag()); |
| |
| if (clusterConfig != null) { |
| Map<String, String> props = new HashMap<>(clusterConfig.getProperties()); |
| |
| // Apply global properties for this host from all config groups |
| Map<String, Map<String, String>> allConfigTags = null; |
| allConfigTags = configHelper |
| .getEffectiveDesiredTags(cluster, schRequest.getHostname()); |
| |
| Map<String, Map<String, String>> configTags = new HashMap<>(); |
| |
| for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) { |
| if (entry.getKey().equals(clusterConfig.getType())) { |
| configTags.put(clusterConfig.getType(), entry.getValue()); |
| } |
| } |
| |
| Map<String, Map<String, String>> properties = configHelper |
| .getEffectiveConfigProperties(cluster, configTags); |
| |
| if (!properties.isEmpty()) { |
| for (Map<String, String> propertyMap : properties.values()) { |
| props.putAll(propertyMap); |
| } |
| } |
| |
| configurations.put(clusterConfig.getType(), props); |
| configVersions.put(clusterConfig.getType(), clusterConfig.getVersion()); |
| configPropertiesTypes.put(clusterConfig.getType(), clusterConfig.getPropertiesTypes()); |
| |
| Map<String, Map<String, String>> attrs = new TreeMap<>(); |
| configHelper.cloneAttributesMap(clusterConfig.getPropertiesAttributes(), attrs); |
| |
| Map<String, Map<String, Map<String, String>>> attributes = configHelper |
| .getEffectiveConfigAttributes(cluster, configTags); |
| for (Map<String, Map<String, String>> attributesMap : attributes.values()) { |
| configHelper.cloneAttributesMap(attributesMap, attrs); |
| } |
| configurationAttributes.put(clusterConfig.getType(), attrs); |
| } |
| } |
| |
| ConfigHelper.processHiddenAttribute(configurations, configurationAttributes, componentName, true); |
| |
| for (Map.Entry<String, Map<String, Map<String, String>>> configurationAttributesEntry : configurationAttributes.entrySet()) { |
| Map<String, Map<String, String>> attrs = configurationAttributesEntry.getValue(); |
| // remove internal attributes like "hidden" |
| attrs.remove("hidden"); |
| } |
| |
| // replace passwords on password references |
| for (Map.Entry<String, Map<String, String>> configEntry : configurations.entrySet()) { |
| String configType = configEntry.getKey(); |
| Map<String, String> configProperties = configEntry.getValue(); |
| Long configVersion = configVersions.get(configType); |
| Map<PropertyType, Set<String>> propertiesTypes = configPropertiesTypes.get(configType); |
| SecretReference.replacePasswordsWithReferences(propertiesTypes, configProperties, configType, configVersion); |
| } |
| |
| Map<String, Set<String>> clusterHostInfo = null; |
| ServiceInfo serviceInfo = null; |
| String osFamily = null; |
| clusterHostInfo = StageUtils.getClusterHostInfo(cluster); |
| serviceInfo = managementController.getAmbariMetaInfo().getService(stackId.getStackName(), |
| stackId.getStackVersion(), serviceName); |
| try { |
| clusterHostInfo = StageUtils.substituteHostIndexes(clusterHostInfo); |
| } catch (AmbariException e) { |
| // Before moving substituteHostIndexes to StageUtils, a SystemException was thrown in the |
| // event an index could not be mapped to a host. After the move, this was changed to an |
| // AmbariException for consistency in the StageUtils class. To keep this method consistent |
| // with how it behaved in the past, if an AmbariException is thrown, it is caught and |
| // translated to a SystemException. |
| throw new SystemException(e.getMessage(), e); |
| } |
| osFamily = clusters.getHost(hostName).getOsFamily(); |
| |
| TreeMap<String, String> hostLevelParams = new TreeMap<>(); |
| hostLevelParams.put(JDK_LOCATION, managementController.getJdkResourceUrl()); |
| hostLevelParams.put(JAVA_HOME, managementController.getJavaHome()); |
| hostLevelParams.put(JAVA_VERSION, String.valueOf(configs.getJavaVersion())); |
| hostLevelParams.put(JDK_NAME, managementController.getJDKName()); |
| hostLevelParams.put(JCE_NAME, managementController.getJCEName()); |
| hostLevelParams.put(STACK_NAME, stackId.getStackName()); |
| hostLevelParams.put(STACK_VERSION, stackId.getStackVersion()); |
| hostLevelParams.put(DB_NAME, managementController.getServerDB()); |
| hostLevelParams.put(MYSQL_JDBC_URL, managementController.getMysqljdbcUrl()); |
| hostLevelParams.put(ORACLE_JDBC_URL, managementController.getOjdbcUrl()); |
| hostLevelParams.put(HOST_SYS_PREPPED, configs.areHostsSysPrepped()); |
| hostLevelParams.putAll(managementController.getRcaParameters()); |
| hostLevelParams.put(AGENT_STACK_RETRY_ON_UNAVAILABILITY, configs.isAgentStackRetryOnInstallEnabled()); |
| hostLevelParams.put(AGENT_STACK_RETRY_COUNT, configs.getAgentStackRetryOnInstallCount()); |
| hostLevelParams.put(GPL_LICENSE_ACCEPTED, configs.getGplLicenseAccepted().toString()); |
| |
| // Write down os specific info for the service |
| ServiceOsSpecific anyOs = null; |
| if (serviceInfo.getOsSpecifics().containsKey(AmbariMetaInfo.ANY_OS)) { |
| anyOs = serviceInfo.getOsSpecifics().get(AmbariMetaInfo.ANY_OS); |
| } |
| |
| ServiceOsSpecific hostOs = populateServicePackagesInfo(serviceInfo, hostLevelParams, osFamily); |
| |
| // Build package list that is relevant for host |
| List<ServiceOsSpecific.Package> packages = |
| new ArrayList<>(); |
| if (anyOs != null) { |
| packages.addAll(anyOs.getPackages()); |
| } |
| |
| if (hostOs != null) { |
| packages.addAll(hostOs.getPackages()); |
| } |
| String packageList = gson.toJson(packages); |
| hostLevelParams.put(PACKAGE_LIST, packageList); |
| |
| Set<String> userSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.USER, cluster, desiredClusterConfigs); |
| String userList = gson.toJson(userSet); |
| hostLevelParams.put(USER_LIST, userList); |
| |
| //Create a user_group mapping and send it as part of the hostLevelParams |
| Map<String, Set<String>> userGroupsMap = configHelper.createUserGroupsMap( |
| stackId, cluster, desiredClusterConfigs); |
| String userGroups = gson.toJson(userGroupsMap); |
| hostLevelParams.put(USER_GROUPS,userGroups); |
| |
| Set<String> groupSet = configHelper.getPropertyValuesWithPropertyType(stackId, PropertyType.GROUP, cluster, desiredClusterConfigs); |
| String groupList = gson.toJson(groupSet); |
| hostLevelParams.put(GROUP_LIST, groupList); |
| |
| Map<org.apache.ambari.server.state.PropertyInfo, String> notManagedHdfsPathMap = configHelper.getPropertiesWithPropertyType(stackId, PropertyType.NOT_MANAGED_HDFS_PATH, cluster, desiredClusterConfigs); |
| Set<String> notManagedHdfsPathSet = configHelper.filterInvalidPropertyValues(notManagedHdfsPathMap, NOT_MANAGED_HDFS_PATH_LIST); |
| String notManagedHdfsPathList = gson.toJson(notManagedHdfsPathSet); |
| hostLevelParams.put(NOT_MANAGED_HDFS_PATH_LIST, notManagedHdfsPathList); |
| |
| String jsonConfigurations = null; |
| Map<String, Object> commandParams = new HashMap<>(); |
| List<Map<String, String>> xmlConfigs = new LinkedList<>(); |
| List<Map<String, String>> envConfigs = new LinkedList<>(); |
| List<Map<String, String>> propertiesConfigs = new LinkedList<>(); |
| |
| //Fill file-dictionary configs from metainfo |
| for (ClientConfigFileDefinition clientConfigFile : clientConfigFiles) { |
| Map<String, String> fileDict = new HashMap<>(); |
| fileDict.put(clientConfigFile.getFileName(), clientConfigFile.getDictionaryName()); |
| if (clientConfigFile.getType().equals("xml")) { |
| xmlConfigs.add(fileDict); |
| } else if (clientConfigFile.getType().equals("env")) { |
| envConfigs.add(fileDict); |
| } else if (clientConfigFile.getType().equals("properties")) { |
| propertiesConfigs.add(fileDict); |
| } |
| } |
| |
| commandParams.put("xml_configs_list", xmlConfigs); |
| commandParams.put("env_configs_list", envConfigs); |
| commandParams.put("properties_configs_list", propertiesConfigs); |
| commandParams.put("output_file", componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION); |
| |
| Map<String, Object> jsonContent = new TreeMap<>(); |
| jsonContent.put("configurations", configurations); |
| jsonContent.put("configuration_attributes", configurationAttributes); |
| jsonContent.put("commandParams", commandParams); |
| jsonContent.put("clusterHostInfo", clusterHostInfo); |
| jsonContent.put("hostLevelParams", hostLevelParams); |
| jsonContent.put("hostname", hostName); |
| jsonContent.put("public_hostname", publicHostName); |
| jsonContent.put("clusterName", cluster.getClusterName()); |
| jsonContent.put("serviceName", serviceName); |
| jsonContent.put("role", componentName); |
| jsonConfigurations = gson.toJson(jsonContent); |
| |
| File tmpDirectory = new File(TMP_PATH); |
| if (!tmpDirectory.exists()) { |
| try { |
| tmpDirectory.mkdirs(); |
| tmpDirectory.setWritable(true, true); |
| tmpDirectory.setReadable(true, true); |
| } catch (SecurityException se) { |
| throw new SystemException("Failed to get temporary directory to store configurations", se); |
| } |
| } |
| File jsonFile = File.createTempFile(componentName, "-configuration.json", tmpDirectory); |
| try { |
| jsonFile.setWritable(true, true); |
| jsonFile.setReadable(true, true); |
| } catch (SecurityException e) { |
| throw new SystemException("Failed to set permission", e); |
| } |
| |
| PrintWriter printWriter = null; |
| try { |
| printWriter = new PrintWriter(jsonFile.getAbsolutePath()); |
| printWriter.print(jsonConfigurations); |
| printWriter.close(); |
| } catch (FileNotFoundException e) { |
| throw new SystemException("Failed to write configurations to json file ", e); |
| } |
| |
| String cmd = pythonCmd + " " + commandScriptAbsolute + " generate_configs " + jsonFile.getAbsolutePath() + " " + |
| packageFolderAbsolute + " " + TMP_PATH + File.separator + "structured-out.json" + " INFO " + TMP_PATH; |
| |
| commandFiles.add(jsonFile); |
| pythonCompressFilesCmds.add(cmd); |
| |
| } catch (AmbariException e) { |
| throw new SystemException("Controller error ", e); |
| } catch (IOException e) { |
| throw new SystemException("Controller error ", e); |
| } |
| } |
| |
| if (schWithConfigFiles.isEmpty()) { |
| throw new SystemException("No configuration files defined for any component" ); |
| } |
| |
| Integer totalCommands = pythonCompressFilesCmds.size() * 2; |
| Integer threadPoolSize = Math.min(totalCommands,configs.getExternalScriptThreadPoolSize()); |
| ExecutorService processExecutor = Executors.newFixedThreadPool(threadPoolSize); |
| |
| // put all threads that starts process to compress each component config files in the executor |
| try { |
| List<CommandLineThreadWrapper> pythonCmdThreads = executeCommands(processExecutor, pythonCompressFilesCmds); |
| |
| // wait for all threads to finish |
| Integer timeout = configs.getExternalScriptTimeout(); |
| waitForAllThreadsToJoin(processExecutor, pythonCmdThreads, timeout); |
| } finally { |
| for (File each : commandFiles) { |
| each.delete(); |
| } |
| } |
| |
| if (StringUtils.isEmpty(requestComponentName)) { |
| TarUtils tarUtils; |
| String fileName; |
| List <ServiceComponentHostResponse> schToTarConfigFiles = schWithConfigFiles; |
| if (StringUtils.isNotEmpty(requestHostName)) { |
| fileName = requestHostName + "(" + Resource.InternalType.Host.toString().toUpperCase()+")"; |
| } else if (StringUtils.isNotEmpty(requestServiceName)) { |
| fileName = requestServiceName + "(" + Resource.InternalType.Service.toString().toUpperCase()+")"; |
| schToTarConfigFiles = serviceToComponentMap.get(requestServiceName); |
| } else { |
| fileName = schRequest.getClusterName() + "(" + Resource.InternalType.Cluster.toString().toUpperCase()+")"; |
| } |
| tarUtils = new TarUtils(TMP_PATH, fileName, schToTarConfigFiles); |
| tarUtils.tarConfigFiles(); |
| } |
| |
| Resource resource = new ResourceImpl(Resource.Type.ClientConfig); |
| resources.add(resource); |
| return resources; |
| } |
| |
| /** |
| * Execute all external script commands |
| * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool |
| * @param commandLines List of {String} commands that starts the python process to compress config files |
| * @return {@link CommandLineThreadWrapper} |
| * @throws SystemException |
| */ |
| private List<CommandLineThreadWrapper> executeCommands(final ExecutorService processExecutor, List<String> commandLines) |
| throws SystemException { |
| List <CommandLineThreadWrapper> commandLineThreadWrappers = new ArrayList<>(); |
| try { |
| for (String commandLine : commandLines) { |
| CommandLineThreadWrapper commandLineThreadWrapper = executeCommand(processExecutor,commandLine); |
| commandLineThreadWrappers.add(commandLineThreadWrapper); |
| } |
| } catch (IOException e) { |
| LOG.error("Failed to run generate client configs script for components"); |
| processExecutor.shutdownNow(); |
| throw new SystemException("Failed to run generate client configs script for components"); |
| } |
| return commandLineThreadWrappers; |
| } |
| |
| |
| /** |
| * Execute external script command |
| * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool |
| * @param commandLine {String} command that starts the python process to compress config files |
| * @return {@link CommandLineThreadWrapper} |
| * @throws IOException |
| */ |
| private CommandLineThreadWrapper executeCommand(final ExecutorService processExecutor, final String commandLine) |
| throws IOException { |
| ProcessBuilder builder = new ProcessBuilder(Arrays.asList(commandLine.split("\\s+"))); |
| builder.redirectErrorStream(true); |
| Process process = builder.start(); |
| CommandLineThread commandLineThread = new CommandLineThread(process); |
| LogStreamReader logStream = new LogStreamReader(process.getInputStream()); |
| Thread logStreamThread = new Thread(logStream, "LogStreamReader"); |
| // log collecting thread should be always put first in the executor |
| processExecutor.execute(logStreamThread); |
| processExecutor.execute(commandLineThread); |
| return new CommandLineThreadWrapper(commandLine, commandLineThread, |
| logStreamThread, logStream, process); |
| } |
| |
| |
| /** |
| * Waits for all threads to join that have started python process to tar config files for component |
| * @param processExecutor {@link ExecutorService} executes the process when threads are available in the pool |
| * @param pythonCmdThreads list of {@link CommandLineThreadWrapper} |
| * @param timeout {Integer} time to wait for the threads to join |
| * @throws SystemException |
| */ |
| private void waitForAllThreadsToJoin(ExecutorService processExecutor, List <CommandLineThreadWrapper> pythonCmdThreads, Integer timeout) |
| throws SystemException { |
| processExecutor.shutdown(); |
| try { |
| processExecutor.awaitTermination(timeout, TimeUnit.MILLISECONDS); |
| processExecutor.shutdownNow(); |
| for (CommandLineThreadWrapper commandLineThreadWrapper: pythonCmdThreads) { |
| CommandLineThread commandLineThread = commandLineThreadWrapper.getCommandLineThread(); |
| try { |
| Integer returnCode = commandLineThread.getReturnCode(); |
| if (returnCode == null) { |
| throw new TimeoutException(); |
| } else if (returnCode != 0) { |
| throw new ExecutionException(String.format("Execution of \"%s\" returned %d.", commandLineThreadWrapper.getCommandLine(), returnCode), |
| new Throwable(commandLineThreadWrapper.getLogStream().getOutput())); |
| } |
| } catch (TimeoutException e) { |
| LOG.error("Generate client configs script was killed due to timeout ", e); |
| throw new SystemException("Generate client configs script was killed due to timeout ", e); |
| } catch (ExecutionException e) { |
| LOG.error(e.getMessage(), e); |
| throw new SystemException(e.getMessage() + " " + e.getCause()); |
| } finally { |
| commandLineThreadWrapper.getProcess().destroy(); |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| processExecutor.shutdownNow(); |
| LOG.error("Failed to run generate client configs script for components"); |
| throw new SystemException("Failed to run generate client configs script for components"); |
| } |
| } |
| |
| |
| /** |
| * wrapper class that holds all information and references to the thread and python process |
| * started to create compressed configuration config files |
| */ |
| private static class CommandLineThreadWrapper { |
| |
| private String commandLine; |
| |
| private CommandLineThread commandLineThread; |
| |
| private Thread logStreamThread; |
| |
| private LogStreamReader logStream; |
| |
| private Process process; |
| |
| private CommandLineThreadWrapper(String commandLine, CommandLineThread commandLineThread, |
| Thread logStreamThread, LogStreamReader logStream, Process process) { |
| this.commandLine = commandLine; |
| this.commandLineThread = commandLineThread; |
| this.logStreamThread = logStreamThread; |
| this.logStream = logStream; |
| this.process = process; |
| } |
| |
| /** |
| * Returns commandLine that starts pyton process |
| * @return {@link #commandLine} |
| */ |
| private String getCommandLine() { |
| return commandLine; |
| } |
| |
| /** |
| * Sets {@link #commandLine} |
| * @param commandLine {String} |
| */ |
| private void setCommandLine(String commandLine) { |
| this.commandLine = commandLine; |
| } |
| |
| /** |
| * Returns thread that starts and waits for python process to complete |
| * @return {@link #commandLineThread} |
| */ |
| private CommandLineThread getCommandLineThread() { |
| return commandLineThread; |
| } |
| |
| /** |
| * Sets {@link #commandLineThread} |
| * @param commandLineThread {@link CommandLineThread} |
| */ |
| private void setCommandLineThread(CommandLineThread commandLineThread) { |
| this.commandLineThread = commandLineThread; |
| } |
| |
| /** |
| * Returns thread that starts and waits to get the output and error log stream from the python process |
| * @return {@link #logStreamThread} |
| */ |
| private Thread getLogStreamThread() { |
| return logStreamThread; |
| } |
| |
| /** |
| * Sets {@link #logStreamThread} |
| * @param logStreamThread {@link Thread} |
| */ |
| private void setLogStreamThread(Thread logStreamThread) { |
| this.logStreamThread = logStreamThread; |
| } |
| |
| /** |
| * Returns log stream from the python subprocess |
| * @return {@link #logStream} |
| */ |
| private LogStreamReader getLogStream() { |
| return logStream; |
| } |
| |
| /** |
| * Sets {@link #logStream} |
| * @param logStream {@link LogStreamReader} |
| */ |
| private void setLogStream(LogStreamReader logStream) { |
| this.logStream = logStream; |
| } |
| |
| /** |
| * Returns python process |
| * @return {@link #process} |
| */ |
| private Process getProcess() { |
| return process; |
| } |
| |
| /** |
| * Sets {@link #process} |
| * @param process {@link Process} |
| */ |
| private void setProcess(Process process) { |
| this.process = process; |
| } |
| } |
| |
| /** |
| * Class to run python process to compress config files as seperate thread |
| */ |
| private static class CommandLineThread extends Thread { |
| private final Process process; |
| private Integer returnCode; |
| |
| private void setReturnCode(Integer exit) { |
| returnCode = exit; |
| } |
| |
| private Integer getReturnCode() { |
| return returnCode; |
| } |
| |
| private CommandLineThread(Process process) { |
| this.process = process; |
| } |
| |
| |
| @Override |
| public void run() { |
| try { |
| setReturnCode(process.waitFor()); |
| } catch (InterruptedException ignore) { |
| return; |
| } |
| } |
| |
| } |
| |
| /** |
| * Class to collect output and error stream of python subprocess |
| */ |
| private class LogStreamReader implements Runnable { |
| |
| private BufferedReader reader; |
| private StringBuilder output; |
| |
| public LogStreamReader(InputStream is) { |
| reader = new BufferedReader(new InputStreamReader(is)); |
| output = new StringBuilder(""); |
| } |
| |
| public String getOutput() { |
| return output.toString(); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| String line = reader.readLine(); |
| while (line != null) { |
| output.append(line); |
| output.append("\n"); |
| line = reader.readLine(); |
| } |
| reader.close(); |
| } catch (IOException e) { |
| LOG.warn(e.getMessage()); |
| } |
| } |
| } |
| |
| |
| /** |
| * This is the utility class to do further compression related operations |
| * on already compressed component configuration files |
| */ |
| protected static class TarUtils { |
| |
| /** |
| * temporary dir where tar files are saved on ambari server |
| */ |
| private String tmpDir; |
| |
| /** |
| * name of the compressed file that should be created |
| */ |
| private String fileName; |
| |
| |
| private List<ServiceComponentHostResponse> serviceComponentHostResponses; |
| |
| /** |
| * Constructor sets all the fields of the class |
| * @param tmpDir {String} |
| * @param fileName {String} |
| * @param serviceComponentHostResponses {List} |
| */ |
| TarUtils(String tmpDir, String fileName, List<ServiceComponentHostResponse> serviceComponentHostResponses) { |
| this.tmpDir = tmpDir; |
| this.fileName = fileName; |
| this.serviceComponentHostResponses = serviceComponentHostResponses; |
| } |
| |
| /** |
| * creates single compressed file from the list of existing compressed file |
| * @throws SystemException |
| */ |
| protected void tarConfigFiles() |
| throws SystemException { |
| |
| try { |
| File compressedOutputFile = new File(tmpDir, fileName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION); |
| FileOutputStream fOut = new FileOutputStream(compressedOutputFile); |
| BufferedOutputStream bOut = new BufferedOutputStream(fOut); |
| GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bOut); |
| TarArchiveOutputStream tOut = new TarArchiveOutputStream(gzOut); |
| |
| try { |
| for (ServiceComponentHostResponse schResponse : serviceComponentHostResponses) { |
| String componentName = schResponse.getComponentName(); |
| File compressedInputFile = new File(tmpDir, componentName + "-configs" + Configuration.DEF_ARCHIVE_EXTENSION); |
| FileInputStream fin = new FileInputStream(compressedInputFile); |
| BufferedInputStream bIn = new BufferedInputStream(fin); |
| GzipCompressorInputStream gzIn = new GzipCompressorInputStream(bIn); |
| TarArchiveInputStream tarIn = new TarArchiveInputStream(gzIn); |
| TarArchiveEntry entry = null; |
| try { |
| while ((entry = tarIn.getNextTarEntry()) != null) { |
| entry.setName(componentName + File.separator + entry.getName()); |
| tOut.putArchiveEntry(entry); |
| if (entry.isFile()) { |
| IOUtils.copy(tarIn, tOut); |
| } |
| tOut.closeArchiveEntry(); |
| } |
| } catch (Exception e) { |
| throw new SystemException(e.getMessage(), e); |
| } finally { |
| tarIn.close(); |
| gzIn.close(); |
| bIn.close(); |
| fin.close(); |
| } |
| } |
| } finally { |
| tOut.finish(); |
| tOut.close(); |
| } |
| } catch (Exception e) { |
| throw new SystemException(e.getMessage(), e); |
| } |
| } |
| } |
| |
| @Override |
| public RequestStatus updateResources(final Request request, Predicate predicate) |
| throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { |
| |
| throw new SystemException("The request is not supported"); |
| } |
| |
| @Override |
| public RequestStatus deleteResources(Request request, Predicate predicate) |
| throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException { |
| |
| throw new SystemException("The request is not supported"); |
| } |
| |
| |
| // ----- AbstractResourceProvider ------------------------------------------ |
| |
| @Override |
| protected Set<String> getPKPropertyIds() { |
| return pkPropertyIds; |
| } |
| |
| |
| // ----- utility methods --------------------------------------------------- |
| |
| /** |
| * Get a component request object from a map of property values. |
| * |
| * @param properties the predicate |
| * @return the component request object |
| */ |
| |
| private ServiceComponentHostRequest getRequest(Map<String, Object> properties) { |
| return new ServiceComponentHostRequest( |
| (String) properties.get(COMPONENT_CLUSTER_NAME_PROPERTY_ID), |
| (String) properties.get(COMPONENT_SERVICE_NAME_PROPERTY_ID), |
| (String) properties.get(COMPONENT_COMPONENT_NAME_PROPERTY_ID), |
| (String) properties.get(HOST_COMPONENT_HOST_NAME_PROPERTY_ID), |
| null); |
| } |
| |
| |
| protected ServiceOsSpecific populateServicePackagesInfo(ServiceInfo serviceInfo, Map<String, String> hostParams, |
| String osFamily) { |
| ServiceOsSpecific hostOs = new ServiceOsSpecific(osFamily); |
| List<ServiceOsSpecific> foundedOSSpecifics = getOSSpecificsByFamily(serviceInfo.getOsSpecifics(), osFamily); |
| if (!foundedOSSpecifics.isEmpty()) { |
| for (ServiceOsSpecific osSpecific : foundedOSSpecifics) { |
| hostOs.addPackages(osSpecific.getPackages()); |
| } |
| // Choose repo that is relevant for host |
| ServiceOsSpecific.Repo serviceRepo = hostOs.getRepo(); |
| if (serviceRepo != null) { |
| String serviceRepoInfo = gson.toJson(serviceRepo); |
| hostParams.put(SERVICE_REPO_INFO, serviceRepoInfo); |
| } |
| } |
| |
| return hostOs; |
| } |
| |
| private List<ServiceOsSpecific> getOSSpecificsByFamily(Map<String, ServiceOsSpecific> osSpecifics, String osFamily) { |
| List<ServiceOsSpecific> foundedOSSpecifics = new ArrayList<>(); |
| for (Map.Entry<String, ServiceOsSpecific> osSpecific : osSpecifics.entrySet()) { |
| if (osSpecific.getKey().indexOf(osFamily) != -1) { |
| foundedOSSpecifics.add(osSpecific.getValue()); |
| } |
| } |
| return foundedOSSpecifics; |
| } |
| |
| } |