blob: fb02593e980adb74a0c1175f9ec9bf5c6c8597a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URL;
import java.util.List;
import java.util.Properties;
import javax.ws.rs.core.MediaType;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.common.util.JarHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.util.Records;
import com.google.common.collect.Lists;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.client.StramClientUtils.YarnClientHelper;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.webapp.StramWebServices;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
* The purpose of this test is to verify basic streaming application deployment
* on a distributed yarn cluster. Specifically this exercises the application master,
* which is not used in other tests that rely on local mode.
*/
public class StramMiniClusterTest
{
// Logger shared by all tests and helpers in this class.
private static final Logger LOG = LoggerFactory.getLogger(StramMiniClusterTest.class);
// Mini YARN cluster shared by every test; created in setup() and stopped/cleared in tearDown().
protected static MiniYARNCluster yarnCluster = null;
// Configuration used to initialize the cluster; setup() later replaces it with the cluster's own configuration.
protected static Configuration conf = new Configuration();
// Per-test rule; its toURI() value is used as the application path of the DAGs built by the tests.
@Rule
public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
/**
 * Starts the static {@code StreamingContainer.eventloop} before each test;
 * {@link #teardown()} stops it again afterwards.
 *
 * @throws IOException declared by the method signature
 */
@Before
public void setupEachTime() throws IOException
{
StreamingContainer.eventloop.start();
}
/**
 * Stops the static {@code StreamingContainer.eventloop} after each test,
 * undoing the start performed in {@link #setupEachTime()}.
 */
@After
public void teardown()
{
StreamingContainer.eventloop.stop();
}
/**
 * Configures and starts the shared {@link MiniYARNCluster} (unless one is already
 * present) and then overwrites the {@code yarn-site.xml} resource found on the
 * classpath with the running cluster's configuration.
 *
 * @throws InterruptedException declared for compatibility; an interrupt during the
 *         final sleep is logged and the thread's interrupt status is restored
 * @throws IOException if the configuration file cannot be written
 */
@BeforeClass
public static void setup() throws InterruptedException, IOException
{
  LOG.info("Starting up YARN cluster");
  conf = StramClientUtils.addDTDefaultResources(conf);
  conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 64);
  // workaround to avoid containers being killed because java allocated too much vmem
  conf.setInt("yarn.nodemanager.vmem-pmem-ratio", 20);
  conf.set("yarn.scheduler.capacity.root.queues", "default");
  conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
  conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false);
  conf.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100.0F);
  // Hand the test JVM's java.home and the generated test classpath to the node manager environment.
  conf.set(YarnConfiguration.NM_ADMIN_USER_ENV, String.format("JAVA_HOME=%s,CLASSPATH=%s", System.getProperty("java.home"), getTestRuntimeClasspath()));
  // Strip JAVA_HOME from the default whitelist so the value set above is the one in effect.
  conf.set(YarnConfiguration.NM_ENV_WHITELIST, YarnConfiguration.DEFAULT_NM_ENV_WHITELIST.replaceAll("JAVA_HOME,*", ""));

  if (yarnCluster == null) {
    // one node manager, one local dir, one log dir
    yarnCluster = new MiniYARNCluster(StramMiniClusterTest.class.getName(), 1, 1, 1);
    yarnCluster.init(conf);
    yarnCluster.start();
  }
  conf = yarnCluster.getConfig();

  URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
  if (url == null) {
    LOG.error("Could not find 'yarn-site.xml' dummy file in classpath");
    throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
  }
  File confFile = new File(url.getPath());
  yarnCluster.getConfig().set("yarn.application.classpath", confFile.getParent());
  LOG.debug("Conf file: {}", confFile);

  // Close the stream even when writeXml fails, so the file handle is not leaked.
  OutputStream os = new FileOutputStream(confFile);
  try {
    yarnCluster.getConfig().writeXml(os);
  } finally {
    os.close();
  }

  try {
    // NOTE(review): presumably gives the cluster time to settle before the first test - confirm
    Thread.sleep(2000);
  } catch (InterruptedException e) {
    LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
    // Preserve the interrupt so callers further up the stack can still observe it.
    Thread.currentThread().interrupt();
  }
}
/**
 * Stops the shared mini YARN cluster, if one was started, and clears the
 * reference so that a later {@code setup()} creates a fresh cluster.
 *
 * @throws IOException declared by the method signature
 */
@AfterClass
public static void tearDown() throws IOException
{
  if (yarnCluster == null) {
    // nothing was started, nothing to stop
    return;
  }
  yarnCluster.stop();
  yarnCluster = null;
}
/**
 * Asks the resource manager of the mini cluster for its node reports and fails
 * the test unless at least one reported node is in a usable state.
 *
 * @throws YarnException if the cluster node query fails
 */
private void checkNodeState() throws YarnException
{
  GetClusterNodesRequest nodesRequest = Records.newRecord(GetClusterNodesRequest.class);
  ClientRMService rmService = yarnCluster.getResourceManager().getClientRMService();
  List<NodeReport> reports = rmService.getClusterNodes(nodesRequest).getNodeReports();
  LOG.info("{}", reports);

  boolean usableNodeFound = false;
  for (NodeReport report : reports) {
    if (!report.getNodeState().isUnusable()) {
      usableNodeFound = true;
      break;
    }
  }
  if (!usableNodeFound) {
    fail("Yarn Mini cluster should have at least one usable node.");
  }
}
/**
 * Launches a three operator application (generator -> module1 -> module2) that is
 * defined through properties, and expects the client to report a successful run.
 *
 * @throws Exception on any failure while building or running the application
 */
@Test
public void testSetupShutdown() throws Exception
{
  checkNodeState();

  JarHelper jars = new JarHelper();
  LOG.info("engine jar: {}", jars.getJar(StreamingAppMaster.class));
  LOG.info("engine test jar: {}", jars.getJar(StramMiniClusterTest.class));

  // create test application
  final String prefix = StreamingApplication.APEX_PREFIX;
  final String allOperators = prefix + "operator.*.";
  final String genericOperatorClass = GenericTestOperator.class.getName();
  Properties appProps = new Properties();

  // input module (ensure shutdown works while windows are generated)
  appProps.put(prefix + "operator.numGen.classname", TestGeneratorInputOperator.class.getName());
  appProps.put(prefix + "operator.numGen.maxTuples", "1");
  appProps.put(prefix + "operator.module1.classname", genericOperatorClass);
  appProps.put(prefix + "operator.module2.classname", genericOperatorClass);

  // wiring: numGen -> module1 -> module2
  appProps.put(prefix + "stream.fromNumGen.source", "numGen.outport");
  appProps.put(prefix + "stream.fromNumGen.sinks", "module1.inport1");
  appProps.put(prefix + "stream.n1n2.source", "module1.outport1");
  appProps.put(prefix + "stream.n1n2.sinks", "module2.inport1");

  // application level and per-operator attributes
  appProps.setProperty(prefix + LogicalPlan.MASTER_MEMORY_MB.getName(), "128");
  appProps.setProperty(prefix + LogicalPlan.CONTAINER_JVM_OPTIONS.getName(), "-Dlog4j.properties=custom_log4j.properties");
  appProps.setProperty(allOperators + OperatorContext.MEMORY_MB.getName(), "64");
  appProps.setProperty(allOperators + OperatorContext.VCORES.getName(), "1");
  appProps.setProperty(allOperators + "port.*." + Context.PortContext.BUFFER_MEMORY_MB.getName(), "32");
  appProps.setProperty(prefix + LogicalPlan.DEBUG.getName(), "true");
  LOG.info("dag properties: {}", appProps);

  LOG.info("Initializing Client");
  LogicalPlanConfiguration planConfig = new LogicalPlanConfiguration(conf);
  planConfig.addFromProperties(appProps, null);
  LogicalPlan plan = createDAG(planConfig);

  Configuration clusterConf = new Configuration(yarnCluster.getConfig());
  StramClient stramClient = new StramClient(clusterConf, plan);
  try {
    stramClient.start();
    LOG.info("Running client");
    stramClient.startApplication();
    boolean succeeded = stramClient.monitorApplication();
    LOG.info("Client run completed. Result=" + succeeded);
    Assert.assertTrue(succeeded);
  } finally {
    stramClient.stop();
  }
}
/**
 * Builds and validates a logical plan named "testApp" from the given plan
 * configuration, using the per-test directory as application path, and checks
 * that the master memory and container JVM options carry the expected values.
 *
 * @param lpc plan configuration the DAG is populated from
 * @return the validated logical plan
 * @throws Exception if the plan cannot be prepared or fails validation
 */
private LogicalPlan createDAG(LogicalPlanConfiguration lpc) throws Exception
{
  final Integer expectedMasterMemoryMb = Integer.valueOf(128);
  final String expectedJvmOptions = "-Dlog4j.properties=custom_log4j.properties";

  LogicalPlan plan = new LogicalPlan();
  plan.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.toURI().toString());
  lpc.prepareDAG(plan, null, "testApp");
  plan.validate();

  Assert.assertEquals("", expectedMasterMemoryMb, plan.getValue(DAG.MASTER_MEMORY_MB));
  Assert.assertEquals("", expectedJvmOptions, plan.getValue(DAG.CONTAINER_JVM_OPTIONS));
  return plan;
}
/**
 * Verifies that the application master's web service answers with JSON on the
 * info and physical-plan-operators resources. The application is killed at the
 * end instead of being monitored to completion.
 *
 * @throws Exception on any failure while launching or querying the application
 */
@Ignore //disabled due to web service init delay issue
@Test
public void testWebService() throws Exception
{
  // single container topology of inline input and module
  final String prefix = StreamingApplication.APEX_PREFIX;
  Properties appProps = new Properties();
  appProps.put(prefix + "stream.input.classname", TestGeneratorInputOperator.class.getName());
  appProps.put(prefix + "stream.input.outputNode", "module1");
  appProps.put(prefix + "module.module1.classname", GenericTestOperator.class.getName());

  LOG.info("Initializing Client");
  LogicalPlanConfiguration planConfig = new LogicalPlanConfiguration(new Configuration(false));
  planConfig.addFromProperties(appProps, null);

  Configuration clusterConf = new Configuration(yarnCluster.getConfig());
  LogicalPlan plan = createDAG(planConfig);
  StramClient stramClient = new StramClient(clusterConf, plan);
  try {
    stramClient.start();
    stramClient.startApplication();

    // attempt web service connection
    ApplicationReport report = stramClient.getApplicationReport();
    Thread.sleep(5000); // delay to give web service time to fully initialize
    Client httpClient = Client.create();
    httpClient.setFollowRedirects(true);

    // application info resource
    WebResource resource = httpClient.resource("http://" + report.getTrackingUrl())
        .path(StramWebServices.PATH)
        .path(StramWebServices.PATH_INFO);
    LOG.info("Requesting: " + resource.getURI());
    ClientResponse reply = resource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
    JSONObject payload = reply.getEntity(JSONObject.class);
    LOG.info("Got response: " + payload.toString());
    assertEquals("incorrect number of elements", 1, payload.length());
    assertEquals("appId", report.getApplicationId().toString(), payload.get("id"));

    // physical plan operators resource
    resource = httpClient.resource("http://" + report.getTrackingUrl())
        .path(StramWebServices.PATH)
        .path(StramWebServices.PATH_PHYSICAL_PLAN_OPERATORS);
    LOG.info("Requesting: " + resource.getURI());
    reply = resource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    assertEquals(MediaType.APPLICATION_JSON_TYPE, reply.getType());
    payload = reply.getEntity(JSONObject.class);
    LOG.info("Got response: " + payload.toString());
  } finally {
    // the application is not expected to finish on its own here, so kill it
    stramClient.killApplication();
    stramClient.stop();
  }
}
/**
 * Builds the classpath string handed to launched containers from the
 * {@code mvn-generated-classpath} resource (see the maven pom.xml for how that
 * file is generated). The result is the first line of that resource followed by
 * {@code ":"} and the location of the resource file itself.
 *
 * Works in tests where the compile time environment is the same as the runtime
 * environment.
 *
 * @return the assembled classpath; an empty string when the resource is not on
 *         the classpath, or a partial value when reading it fails
 */
private static String getTestRuntimeClasspath()
{
  LOG.info("Trying to generate classpath for app master from current thread's classpath");
  final String generatedClasspathFile = "mvn-generated-classpath";
  ClassLoader thisClassLoader = Thread.currentThread().getContextClassLoader();

  InputStream classpathFileStream = thisClassLoader.getResourceAsStream(generatedClasspathFile);
  if (classpathFileStream == null) {
    LOG.info("Could not load classpath resource " + generatedClasspathFile);
    return "";
  }

  String envClassPath = "";
  BufferedReader reader = null;
  try {
    LOG.info("Readable bytes from stream=" + classpathFileStream.available());
    reader = new BufferedReader(new InputStreamReader(classpathFileStream));
    String cp = reader.readLine();
    if (cp != null) {
      envClassPath += cp.trim() + ":";
    }
    // Put the file itself on classpath for tasks.
    envClassPath += thisClassLoader.getResource(generatedClasspathFile).getFile();
  } catch (IOException e) {
    LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage(), e);
  } finally {
    // Always release the resource, also on unchecked exceptions. Closing the reader
    // closes the wrapped stream; fall back to the raw stream if no reader was created.
    try {
      if (reader != null) {
        reader.close();
      } else {
        classpathFileStream.close();
      }
    } catch (IOException e) {
      LOG.info("Failed to close class path file stream or reader. Error=" + e.getMessage(), e);
    }
  }
  return envClassPath;
}
/**
 * Input operator that throws a {@link RuntimeException} every time
 * {@code emitTuples()} is called; used by testOperatorFailureRecovery to
 * provoke an application failure.
 */
public static class FailingOperator extends BaseOperator implements InputOperator
{
/**
 * Always fails.
 *
 * @throws RuntimeException unconditionally, with message "Operator failure"
 */
@Override
public void emitTuples()
{
throw new RuntimeException("Operator failure");
}
}
/**
 * Runs an application whose only operator always throws, with a single recovery
 * attempt configured, and expects both the client result and the final
 * application status to indicate failure.
 *
 * @throws Exception on any failure while launching or monitoring the application
 */
@Test
public void testOperatorFailureRecovery() throws Exception
{
  LogicalPlan plan = new LogicalPlan();
  plan.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.toURI().toString());
  FailingOperator failing = plan.addOperator("badOperator", FailingOperator.class);
  plan.getContextAttributes(failing).put(OperatorContext.RECOVERY_ATTEMPTS, 1);

  LOG.info("Initializing Client");
  StramClient stramClient = new StramClient(conf, plan);
  try {
    stramClient.start();
    stramClient.startApplication();
    stramClient.setClientTimeout(120000);

    boolean succeeded = stramClient.monitorApplication();
    LOG.info("Client run completed. Result=" + succeeded);
    Assert.assertFalse("should fail", succeeded);

    ApplicationReport report = stramClient.getApplicationReport();
    FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
    Assert.assertEquals("should fail", FinalApplicationStatus.FAILED, finalStatus);
    // unable to get the diagnostics message set by the AM here - see YARN-208
    // diagnostics message does not make it here even with Hadoop 2.2 (but works on standalone cluster),
    // so there is no assertion that the diagnostics mention "badOperator"
  } finally {
    stramClient.stop();
  }
}
/**
 * Empty operator base class; the only visible use in this file is as the
 * superclass of {@code ShipJarsOperator}.
 */
protected class ShipJarsBaseOperator extends BaseOperator
{
}
/**
 * Empty operator extending {@code ShipJarsBaseOperator}.
 * NOTE(review): not referenced by any test visible in this file - confirm whether it is still needed.
 */
protected class ShipJarsOperator extends ShipJarsBaseOperator
{
}
/**
 * Runs an inline application master that talks to the resource manager through
 * the raw {@link ApplicationMasterProtocol}: it registers, submits one ask list,
 * sends 100 further allocate calls one second apart and finally unregisters.
 * A variant driven through {@code AMRMClient} lives in testUnmanagedAM2.
 *
 * @throws Exception on any failure of the inline application master
 */
@Ignore
@Test
public void testUnmanagedAM() throws Exception
{
  new InlineAM(conf)
  {
    @Override
    @SuppressWarnings("SleepWhileInLoop")
    public void runAM(ApplicationAttemptId attemptId) throws Exception
    {
      LOG.debug("AM running {}", attemptId);

      YarnClientHelper rmHelper = new YarnClientHelper(conf);
      ApplicationMasterProtocol rm = rmHelper.connectToRM();

      // register with the RM (LAUNCHED -> RUNNING)
      RegisterApplicationMasterRequest registration = Records.newRecord(RegisterApplicationMasterRequest.class);
      rm.registerApplicationMaster(registration);

      // AM specific logic: the first allocate call carries the container asks
      int nextResponseId = 0;
      AllocateRequest initialRequest = Records.newRecord(AllocateRequest.class);
      initialRequest.setResponseId(nextResponseId++);
      List<ResourceRequest> asks = Lists.newArrayList();
      asks.add(setupContainerAskForRM("hdev-vm", 1, 128, 10));
      asks.add(setupContainerAskForRM("/default-rack", 1, 128, 10));
      asks.add(setupContainerAskForRM("*", 1, 128, 10));
      initialRequest.setAskList(asks);
      LOG.info("Requesting: " + initialRequest.getAskList());
      rm.allocate(initialRequest);

      // 100 follow-up allocate calls without asks, using response ids 1..100
      while (nextResponseId <= 100) {
        AllocateRequest followUp = Records.newRecord(AllocateRequest.class);
        followUp.setResponseId(nextResponseId++);
        AllocateResponse allocation = rm.allocate(followUp);
        sleep(1000);
        LOG.debug("allocateResponse: {}", allocation);
      }

      // unregister from RM
      FinishApplicationMasterRequest finish = Records.newRecord(FinishApplicationMasterRequest.class);
      finish.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
      finish.setDiagnostics("testUnmanagedAM finished");
      rm.finishApplicationMaster(finish);
    }

    /**
     * Creates a resource request for the given resource name (a host, a rack,
     * or "*" for any location).
     */
    private ResourceRequest setupContainerAskForRM(String resourceName, int numContainers, int containerMemory, int priority)
    {
      // Set up resource type requirements; only memory is specified here
      Resource memoryOnly = Records.newRecord(Resource.class);
      memoryOnly.setMemory(containerMemory);

      // the priority for the request
      Priority askPriority = Records.newRecord(Priority.class);
      askPriority.setPriority(priority);

      ResourceRequest ask = Records.newRecord(ResourceRequest.class);
      // which host/rack is wanted; refer to the apis under org.apache.hadoop.net
      // for details on rack/host mapping
      ask.setResourceName(resourceName);
      // no. of containers needed
      ask.setNumContainers(numContainers);
      ask.setPriority(askPriority);
      ask.setCapability(memoryOnly);
      return ask;
    }
  }.run();
}
/**
 * Runs an inline application master that talks to the resource manager through
 * {@link AMRMClient}: it registers, adds the same container request twice, polls
 * {@code allocate} 100 times one second apart (removing the request once per
 * allocated container) and finally unregisters. The client is always stopped,
 * even when the exchange with the resource manager fails.
 *
 * @throws Exception on any failure of the inline application master
 */
@Ignore
@Test
public void testUnmanagedAM2() throws Exception
{
  new InlineAM(conf)
  {
    @Override
    @SuppressWarnings("SleepWhileInLoop")
    public void runAM(ApplicationAttemptId attemptId) throws Exception
    {
      LOG.debug("AM running {}", attemptId);
      AMRMClient<ContainerRequest> amRmClient = AMRMClient.createAMRMClient();
      amRmClient.init(conf);
      amRmClient.start();
      try {
        // register with the RM (LAUNCHED -> RUNNING)
        amRmClient.registerApplicationMaster("", 0, null);

        // AM specific logic
        String[] hosts = {"vm1"};
        String[] racks = {"somerack"};
        Resource capability = Records.newRecord(Resource.class);
        capability.setMemory(1000);
        Priority priority = Records.newRecord(Priority.class);
        priority.setPriority(10);
        AMRMClient.ContainerRequest req = new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
        // the same request is added twice, i.e. two containers are asked for
        amRmClient.addContainerRequest(req);
        amRmClient.addContainerRequest(req);

        for (int i = 0; i < 100; i++) {
          AllocateResponse ar = amRmClient.allocate(0);
          sleep(1000);
          LOG.debug("allocateResponse: {}", ar);
          for (Container c : ar.getAllocatedContainers()) {
            LOG.debug("*** allocated {}", c.getResource());
            // drop one outstanding request for every container that was granted
            amRmClient.removeContainerRequest(req);
          }
        }

        // unregister from RM
        amRmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "testUnmanagedAM finished", null);
      } finally {
        // release the client's connection to the RM regardless of the outcome
        amRmClient.stop();
      }
    }
  }.run();
}
/**
 * Application name containing characters that typically need escaping
 * ({@code $}, backslash, double and single quote). Shared between the test and
 * {@code AddAttributeToArgsOperator}; constant, hence final.
 */
private static final String APP_NAME = "$test\\\"'";

/**
 * Launches an application named {@link #APP_NAME} with a single
 * {@code AddAttributeToArgsOperator} and no recovery attempts, and expects the
 * run to succeed. The operator only shuts down cleanly when the application name
 * is visible to it, unaltered, as a system property.
 *
 * @throws Exception on any failure while launching or monitoring the application
 */
@Test
public void testAddAttributeToArgs() throws Exception
{
  LogicalPlan dag = new LogicalPlan();
  dag.setAttribute(LogicalPlan.APPLICATION_NAME, APP_NAME);
  AddAttributeToArgsOperator operator = dag.addOperator("test", AddAttributeToArgsOperator.class);
  // no recovery: a wrong property value must surface as a failed application
  dag.getContextAttributes(operator).put(OperatorContext.RECOVERY_ATTEMPTS, 0);
  StramClient client = new StramClient(conf, dag);
  try {
    client.start();
    client.startApplication();
    Assert.assertTrue(client.monitorApplication());
  } finally {
    client.stop();
  }
}
/**
 * Input operator that checks, on its first {@code emitTuples()} call, whether
 * the system property named after the application name attribute equals
 * {@code APP_NAME}.
 */
public static class AddAttributeToArgsOperator extends BaseOperator implements InputOperator
{
  /**
   * Never emits anything: throws {@code ShutdownException} when the system
   * property matches {@code APP_NAME}, and a plain {@link RuntimeException}
   * otherwise.
   */
  @Override
  public void emitTuples()
  {
    String propertyValue = System.getProperty(LogicalPlan.APPLICATION_NAME.getLongName());
    if (APP_NAME.equals(propertyValue)) {
      throw new ShutdownException();
    }
    throw new RuntimeException();
  }
}
}