// blob: b79d99728e882ff8d6128d3a106adfb88bd73c30 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import org.apache.oozie.test.XTestCase;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;
public class TestSparkConfigurationService extends XTestCase {
protected void setUp() throws Exception {
super.setUp();
Services services = new Services();
services.init();
}
protected void tearDown() throws Exception {
Services.get().destroy();
super.tearDown();
}
public void testSparkConfigsEmpty() throws Exception {
SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
scs.destroy();
ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", "");
scs.init(Services.get());
Properties sparkConfigs = scs.getSparkConfig("foo");
assertEquals(0, sparkConfigs.size());
}
public void testSparkConfigs() throws Exception {
File sparkConf1Dir = createSparkConfsInDir("spark-conf-1", "a", "A", "b", "B", "spark.yarn.jar", "foo");
File sparkConf3Dir = createSparkConfsInDir("spark-conf-3");
File sparkConf4Dir = createSparkConfsInDir("spark-conf-4", "y", "Y", "z", "Z", "spark.yarn.jars", "foo2");
SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
scs.destroy();
ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
"rm1=" + sparkConf1Dir.getAbsolutePath() + // absolute path
",rm2" + // invalid entry
",rm3=" + sparkConf3Dir.getAbsolutePath() + // missing file
",rm4=" + sparkConf4Dir.getName()); // relative path
scs.init(Services.get());
Properties sparkConfigs = scs.getSparkConfig("foo");
assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size());
sparkConfigs = scs.getSparkConfig("rm1");
assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size());
assertEquals("A", sparkConfigs.get("a"));
assertEquals("B", sparkConfigs.get("b"));
sparkConfigs = scs.getSparkConfig("rm2");
assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size());
sparkConfigs = scs.getSparkConfig("rm3");
assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size());
sparkConfigs = scs.getSparkConfig("rm4");
assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size());
assertEquals("Y", sparkConfigs.get("y"));
assertEquals("Z", sparkConfigs.get("z"));
scs.destroy();
// Setting this to false should make it not ignore spark.yarn.jar
ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar",
false);
ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations.blacklist", " ");
scs.init(Services.get());
sparkConfigs = scs.getSparkConfig("rm1");
assertEquals(sparkConfigs.toString(), 3, sparkConfigs.size());
assertEquals("A", sparkConfigs.get("a"));
assertEquals("B", sparkConfigs.get("b"));
assertEquals("foo", sparkConfigs.get("spark.yarn.jar"));
ConfigurationService.setBoolean(
"oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar", true);
ConfigurationService.set(
"oozie.service.SparkConfigurationService.spark.configurations.blacklist", "spark.yarn.jar,spark.yarn.jars");
scs.destroy();
ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
"rm1=" + sparkConf1Dir.getAbsolutePath() + // define
",*=" + sparkConf4Dir.getAbsolutePath()); // wildcard
scs.init(Services.get());
sparkConfigs = scs.getSparkConfig("rm1");
assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size());
assertEquals("A", sparkConfigs.get("a"));
assertEquals("B", sparkConfigs.get("b"));
sparkConfigs = scs.getSparkConfig("rm2");
assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size());
assertEquals("Y", sparkConfigs.get("y"));
assertEquals("Z", sparkConfigs.get("z"));
sparkConfigs = scs.getSparkConfig("foo");
assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size());
assertEquals("Y", sparkConfigs.get("y"));
assertEquals("Z", sparkConfigs.get("z"));
}
private File createSparkConfsInDir(String directory, String... props) throws IOException {
File sparkConf1Dir = new File(getTestCaseConfDir(), directory);
sparkConf1Dir.mkdirs();
File sparkConf1 = new File(sparkConf1Dir, "spark-defaults.conf");
Properties sparkConf1Props = new Properties();
for (int i = 0; i < props.length; i += 2) {
sparkConf1Props.setProperty(props[i], props[i + 1]);
}
if (!sparkConf1Props.isEmpty()) {
try (FileOutputStream fos = new FileOutputStream(sparkConf1)) {
sparkConf1Props.store(fos, "");
}
}
return sparkConf1Dir;
}
public void testBlackList() throws Exception {
File sparkConf1Dir = createSparkConfsInDir("spark-conf-1", "a", "A", "b", "B",
"spark.yarn.jar", "foo");
File sparkConf3Dir = createSparkConfsInDir("spark-conf-3");
File sparkConf4Dir = createSparkConfsInDir("spark-conf-4", "y", "Y", "z", "Z",
"spark.yarn.jars", "foo2");
SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
scs.destroy();
ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
"rm1=" + sparkConf1Dir.getAbsolutePath() + // absolute path
",rm2" + // invalid entry
",rm3=" + sparkConf3Dir.getAbsolutePath() + // missing file
",rm4=" + sparkConf4Dir.getName()); // relative path
ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar",
false);
ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations.blacklist", "a,z");
scs.init(Services.get());
Properties sparkConfigs = scs.getSparkConfig("foo");
assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size());
sparkConfigs = scs.getSparkConfig("rm1");
assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size());
assertEquals("B", sparkConfigs.get("b"));
assertEquals("foo", sparkConfigs.get("spark.yarn.jar"));
sparkConfigs = scs.getSparkConfig("rm2");
assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size());
sparkConfigs = scs.getSparkConfig("rm3");
assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size());
sparkConfigs = scs.getSparkConfig("rm4");
assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size());
assertEquals("Y", sparkConfigs.get("y"));
assertEquals("foo2", sparkConfigs.get("spark.yarn.jars"));
scs.destroy();
}
}