/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.service;

import org.apache.oozie.util.XLog;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
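
/**
 * Service that loads cluster-specific Spark default configuration. Each entry of
 * {@code oozie.service.SparkConfigurationService.spark.configurations} maps a
 * ResourceManager host:port (or the {@code *} wildcard) to a configuration directory
 * whose {@code spark-defaults.conf} is read, filtered against the configured
 * blacklist and kept in memory for lookup via {@link #getSparkConfig(String)}.
 */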
public class SparkConfigurationService implements Service {
    private static final XLog LOG = XLog.getLog(SparkConfigurationService.class);

    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SparkConfigurationService.";
    public static final String SPARK_CONFIGURATIONS = CONF_PREFIX + "spark.configurations";
    public static final String SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR
            = SPARK_CONFIGURATIONS + ".ignore.spark.yarn.jar";
    public static final String SPARK_CONFIGURATIONS_BLACKLIST = SPARK_CONFIGURATIONS + ".blacklist";

    private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
    private static final String SPARK_YARN_JAR_PROP = "spark.yarn.jar";
    private static final String HOST_WILDCARD = "*";

    private Map<String, Properties> sparkConfigs;
    private Set<String> blacklist;

    @Override
    public void init(Services services) throws ServiceException {
        loadBlacklist();
        loadSparkConfigs();
    }

    @Override
    public void destroy() {
        sparkConfigs.clear();
        blacklist.clear();
    }

    @Override
    public Class<? extends Service> getInterface() {
        return SparkConfigurationService.class;
    }
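
    /**
     * Builds the set of Spark property names that must never be taken from
     * spark-defaults.conf. The deprecated ignore.spark.yarn.jar switch is still
     * honored by adding spark.yarn.jar to the blacklist.
     */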
    private void loadBlacklist() {
        blacklist = new HashSet<>();
        for (String s : ConfigurationService.getStrings(SPARK_CONFIGURATIONS_BLACKLIST)) {
            blacklist.add(s.trim());
        }
        // spark.yarn.jar is added if the old property to ignore it is set.
        if (ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR)) {
            LOG.warn("Deprecated property found in configuration: " + SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR +
                    ". Use " + SPARK_CONFIGURATIONS_BLACKLIST + " instead.");
            blacklist.add(SPARK_YARN_JAR_PROP);
        }
    }
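
    /**
     * Parses every entry of the spark.configurations property and loads the
     * corresponding Spark configuration directory.
     */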
    private void loadSparkConfigs() throws ServiceException {
        sparkConfigs = new HashMap<>();
        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATIONS);
        for (String confDef : confDefs) {
            readEntry(confDef.trim());
        }
    }
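
    /**
     * Reads a single "host:port=confDir" entry ("*" may be used as the host:port
     * wildcard) and stores the filtered spark-defaults.conf properties found in
     * confDir. Malformed entries and missing directories are logged and skipped.
     */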
    private void readEntry(String confDef) throws ServiceException {
        String[] parts = confDef.split("=");
        if (parts.length == 2) {
            String hostPort = parts[0];
            String confDir = parts[1];
            File dir = getAbsoluteDir(confDir);
            if (dir.exists()) {
                Properties sparkDefaults = readSparkConfigFile(hostPort, dir);
                filterBlackList(sparkDefaults);
                if (!sparkDefaults.isEmpty()) {
                    sparkConfigs.put(hostPort, sparkDefaults);
                }
            } else {
                LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
                        hostPort, dir.getAbsolutePath());
            }
        } else {
            LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef);
        }
    }
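
    /**
     * Resolves a configuration directory: relative paths are interpreted
     * against the Oozie configuration directory.
     */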
    private File getAbsoluteDir(String confDir) throws ServiceException {
        File dir = new File(confDir);
        if (!dir.isAbsolute()) {
            File configDir = new File(ConfigurationService.getConfigurationDirectory());
            dir = new File(configDir, confDir);
        }
        return dir;
    }
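
    /**
     * Removes every blacklisted property from the loaded Spark defaults.
     */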
    private void filterBlackList(Properties sparkDefaults) {
        for (String property : blacklist) {
            sparkDefaults.remove(property);
        }
    }
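
    /**
     * Loads the spark-defaults.conf file from the given directory as a UTF-8
     * properties file; an empty Properties object is returned if the file is
     * missing or unreadable.
     */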
    private Properties readSparkConfigFile(String hostPort, File dir) {
        File file = new File(dir, SPARK_CONFIG_FILE);
        Properties props = new Properties();
        if (file.exists()) {
            try (FileInputStream stream = new FileInputStream(file);
                 InputStreamReader reader = new InputStreamReader(stream, StandardCharsets.UTF_8)) {
                props.load(reader);
                LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
            } catch (IOException ioe) {
                LOG.warn("Spark Configuration could not be loaded for {0}: {1}",
                        hostPort, ioe.getMessage(), ioe);
            }
        } else {
            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
                    hostPort, file.getAbsolutePath());
        }
        return props;
    }
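
    /**
     * Returns the Spark configuration for the given ResourceManager host:port.
     * Falls back to the "*" wildcard entry and finally to an empty Properties
     * object when no configuration matches.
     *
     * @param resourceManagerHostPort the host:port of the ResourceManager (matched case-insensitively)
     * @return the matching Spark properties, never null
     */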
    public Properties getSparkConfig(String resourceManagerHostPort) {
        resourceManagerHostPort = (resourceManagerHostPort != null) ? resourceManagerHostPort.toLowerCase() : null;
        Properties config = sparkConfigs.get(resourceManagerHostPort);
        if (config == null) {
            config = sparkConfigs.get(HOST_WILDCARD);
            if (config == null) {
                config = new Properties();
            }
        }
        return config;
    }
}