blob: cf32880920345456e30c0547cf6608d3003ec6c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_PMEM_CHECK_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLED;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_ENABLED;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.AM_RESOURCE_MEM;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Shared helpers for YARN service tests.
 *
 * <p>Provides factory methods for sample {@link Service}/{@link Component}
 * definitions, a Mockito-backed {@link SliderFileSystem}, and lifecycle
 * methods ({@link #setupInternal(int)} / {@link #shutdown()}) that start and
 * stop an in-process ZooKeeper cluster, a {@link MiniYARNCluster} and a
 * {@link MiniDFSCluster}.
 */
public class ServiceTestUtils {

  private static final Logger LOG =
      LoggerFactory.getLogger(ServiceTestUtils.class);

  /** Node-manager count that callers can pass to {@link #setupInternal(int)}. */
  public static final int NUM_NMS = 1;

  /** Serializer for {@link Service} specs using lower_case_with_underscores names. */
  public static final JsonSerDeser<Service> JSON_SER_DESER =
      new JsonSerDeser<>(Service.class,
          PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);

  /** Upper bound, in seconds, on the wait for node managers to register. */
  private static final int NM_REGISTRATION_TIMEOUT_SEC = 60;

  private MiniYARNCluster yarnCluster = null;
  private MiniDFSCluster hdfsCluster = null;
  // Kept as a field (not a local) so that shutdown() can stop it.
  private TestingCluster zkCluster = null;
  private FileSystem fs = null;
  private Configuration conf = null;
  private File basedir;

  /**
   * Builds an example service definition named "example-app" with two
   * components ("compa" and "compb"), each of which has 2 containers.
   *
   * @return the example service
   */
  protected Service createExampleApplication() {
    Service exampleApp = new Service();
    exampleApp.setName("example-app");
    exampleApp.addComponent(createComponent("compa"));
    exampleApp.addComponent(createComponent("compb"));
    return exampleApp;
  }

  /**
   * Creates a component with 2 containers that runs {@code sleep 1000}.
   *
   * @param name component name
   * @return the new component
   */
  public static Component createComponent(String name) {
    return createComponent(name, 2L, "sleep 1000");
  }

  /**
   * Creates a component with the given container count and launch command.
   * The resource request is fixed at memory "128" and 1 cpu.
   *
   * @param name component name
   * @param numContainers number of containers to request
   * @param command launch command of the component
   * @return the new component
   */
  protected static Component createComponent(String name, long numContainers,
      String command) {
    Component comp1 = new Component();
    comp1.setNumberOfContainers(numContainers);
    comp1.setLaunchCommand(command);
    comp1.setName(name);
    Resource resource = new Resource();
    resource.setMemory("128");
    resource.setCpus(1);
    comp1.setResource(resource);
    return comp1;
  }

  /**
   * Same as {@link #initMockFs(Service)} with no pre-loaded service.
   *
   * @return the mocked file system
   * @throws IOException declared by the mocked calls
   */
  public static SliderFileSystem initMockFs() throws IOException {
    return initMockFs(null);
  }

  /**
   * Creates a mocked {@link SliderFileSystem} whose cluster dir path is
   * always {@code cluster_dir_path}.
   *
   * <p>Side effect: installs a mocked {@link JsonSerDeser} into
   * {@link ServiceApiUtil} via its static setter. When {@code ext} is
   * non-null, every {@code load} call on that mock returns {@code ext}.
   *
   * @param ext service to hand back from the mocked loader, or {@code null}
   * @return the mocked file system
   * @throws IOException declared by the mocked calls
   */
  public static SliderFileSystem initMockFs(Service ext) throws IOException {
    SliderFileSystem sfs = mock(SliderFileSystem.class);
    FileSystem mockFs = mock(FileSystem.class);
    // mock(Class) cannot carry the type argument, so the assignment below is
    // an unchecked conversion; it is safe because the mock is only stubbed.
    @SuppressWarnings("unchecked")
    JsonSerDeser<Service> jsonSerDeser = mock(JsonSerDeser.class);
    when(sfs.getFileSystem()).thenReturn(mockFs);
    when(sfs.buildClusterDirPath(anyObject())).thenReturn(
        new Path("cluster_dir_path"));
    if (ext != null) {
      when(jsonSerDeser.load(anyObject(), anyObject())).thenReturn(ext);
    }
    ServiceApiUtil.setJsonSerDeser(jsonSerDeser);
    return sfs;
  }

  /**
   * Sets the configuration used by {@link #setupInternal(int)}.
   *
   * @param conf configuration to use
   */
  protected void setConf(YarnConfiguration conf) {
    this.conf = conf;
  }

  /** @return the current configuration, or {@code null} if none was set. */
  protected Configuration getConf() {
    return conf;
  }

  /** @return the file system obtained during setup, or {@code null}. */
  protected FileSystem getFS() {
    return fs;
  }

  /** @return the mini YARN cluster, or {@code null} if it is not running. */
  protected MiniYARNCluster getYarnCluster() {
    return yarnCluster;
  }

  /**
   * Starts the test environment: a single-node ZooKeeper cluster, a
   * {@link MiniYARNCluster} with {@code numNodeManager} node managers and a
   * one-datanode {@link MiniDFSCluster}. Clusters that are already running
   * are reused. A default {@link YarnConfiguration} is created when no
   * configuration was set beforehand.
   *
   * @param numNodeManager number of node managers to start
   * @throws Exception if any of the clusters fails to start or the
   *     yarn-site.xml file cannot be located or written
   */
  protected void setupInternal(int numNodeManager)
      throws Exception {
    LOG.info("Starting up YARN cluster");
    if (conf == null) {
      setConf(new YarnConfiguration());
    }
    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
    // reduce the teardown waiting time
    conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
    conf.set("yarn.log.dir", "target");
    // disable aux-service based timeline aggregators
    conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
    conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
    // Enable ContainersMonitorImpl
    conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
        LinuxResourceCalculatorPlugin.class.getName());
    conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
        ProcfsBasedProcessTree.class.getName());
    conf.setBoolean(
        YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true);
    conf.setBoolean(TIMELINE_SERVICE_ENABLED, false);
    conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
    conf.setLong(DEBUG_NM_DELETE_DELAY_SEC, 60000);
    conf.setLong(AM_RESOURCE_MEM, 526);
    conf.setLong(YarnServiceConf.READINESS_CHECK_INTERVAL, 5);
    // Disable the vmem and pmem checks so the NM does not kill containers
    conf.setBoolean(NM_VMEM_CHECK_ENABLED, false);
    conf.setBoolean(NM_PMEM_CHECK_ENABLED, false);

    // setup zk cluster; reuse a running one so repeated calls do not leak
    // ZooKeeper servers
    if (zkCluster == null) {
      zkCluster = new TestingCluster(1);
      zkCluster.start();
    }
    String zkQuorum = zkCluster.getConnectString();
    conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkQuorum);
    conf.set(KEY_REGISTRY_ZK_QUORUM, zkQuorum);
    LOG.info("ZK cluster: {}", zkQuorum);

    fs = FileSystem.get(conf);

    // Start from an empty base directory: wipe leftovers of a previous run,
    // then always (re)create it so its existence does not depend on whether
    // an earlier run left it behind.
    basedir = new File("target", "apps");
    if (basedir.exists()) {
      FileUtils.deleteDirectory(basedir);
    }
    if (!basedir.mkdirs() && !basedir.isDirectory()) {
      LOG.warn("Could not create base directory {}",
          basedir.getAbsolutePath());
    }
    conf.set(YARN_SERVICE_BASE_PATH, basedir.getAbsolutePath());

    if (yarnCluster == null) {
      yarnCluster =
          new MiniYARNCluster(TestYarnNativeServices.class.getSimpleName(), 1,
              numNodeManager, 1, 1);
      yarnCluster.init(conf);
      yarnCluster.start();
      waitForNMsToRegister(numNodeManager);
      writeClusterConfigToClasspath();
    }

    if (hdfsCluster == null) {
      HdfsConfiguration hdfsConfig = new HdfsConfiguration();
      hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
          .numDataNodes(1).build();
    }

    // Fixed grace period after start-up.
    // NOTE(review): presumably lets the clusters settle - confirm if needed.
    try {
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      LOG.info("setup thread sleep interrupted. message={}", e.getMessage());
      // restore the flag so callers can still observe the interruption
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Stops the YARN, HDFS and ZooKeeper clusters (when running), deletes the
   * local base directory and removes the base application path from the
   * file system. Does no file-system cleanup when no configuration exists,
   * i.e. when setup never ran.
   *
   * @throws IOException if stopping ZooKeeper or deleting files fails
   */
  public void shutdown() throws IOException {
    if (yarnCluster != null) {
      try {
        yarnCluster.stop();
      } finally {
        yarnCluster = null;
      }
    }
    if (hdfsCluster != null) {
      try {
        hdfsCluster.shutdown();
      } finally {
        hdfsCluster = null;
      }
    }
    // Stopped after YARN, which was configured to use it as RM_ZK_ADDRESS.
    if (zkCluster != null) {
      try {
        zkCluster.stop();
      } finally {
        zkCluster = null;
      }
    }
    if (basedir != null) {
      FileUtils.deleteDirectory(basedir);
    }
    if (conf != null) {
      SliderFileSystem sfs = new SliderFileSystem(conf);
      Path appDir = sfs.getBaseApplicationPath();
      sfs.getFileSystem().delete(appDir, true);
    }
  }

  /**
   * Points the mini cluster's application classpath at the directory that
   * holds the dummy {@code yarn-site.xml} found on the classpath, then
   * overwrites that file with the mini cluster's configuration.
   */
  private void writeClusterConfigToClasspath() throws Exception {
    URL url = Thread.currentThread().getContextClassLoader()
        .getResource("yarn-site.xml");
    if (url == null) {
      throw new RuntimeException(
          "Could not find 'yarn-site.xml' dummy file in classpath");
    }
    // URL#getPath() keeps percent-escapes (e.g. %20 for a space); going
    // through the URI yields the decoded local path.
    File yarnSiteFile = new File(url.toURI());
    Configuration yarnClusterConfig = yarnCluster.getConfig();
    yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        yarnSiteFile.getParent());
    //write the document to a buffer (not directly to the file, as that
    //can cause the file being written to get read -which will then fail.
    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
    yarnClusterConfig.writeXml(bytesOut);
    //write the bytes to the file in the classpath
    try (OutputStream os = new FileOutputStream(yarnSiteFile)) {
      os.write(bytesOut.toByteArray());
    }
    LOG.info("Write yarn-site.xml configs to: {}", url);
  }

  /**
   * Polls the resource manager once per second until at least
   * {@code expectedNMs} node managers are registered or the timeout elapses.
   * A timeout is logged as a warning and does not fail the setup.
   */
  private void waitForNMsToRegister(int expectedNMs) throws Exception {
    for (int waited = 0; waited <= NM_REGISTRATION_TIMEOUT_SEC; waited++) {
      if (registeredNMCount() >= expectedNMs) {
        return;
      }
      Thread.sleep(1000);
    }
    int registered = registeredNMCount();
    if (registered < expectedNMs) {
      LOG.warn("Only {} of {} node managers registered within {} seconds",
          registered, expectedNMs, NM_REGISTRATION_TIMEOUT_SEC);
    }
  }

  /** Number of nodes currently known to the resource manager. */
  private int registeredNMCount() {
    return yarnCluster.getResourceManager().getRMContext().getRMNodes().size();
  }
}