blob: 76eaa25ea0e8d1d963e6b9fd6b6fd921cb993083 [file] [log] [blame]
package org.apache.s4.tools.yarn;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.deploy.AppConstants;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.s4.tools.Tools;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.io.Files;
import com.google.inject.Injector;
// NOTE: you must have an updated s4-yarn installed app (this is automatic when tests are run from the command line, otherwise do: gradlew s4-yarn:installApp)
public class TestYarnDeployment extends ZkBasedTest {
private static Logger logger = LoggerFactory.getLogger(TestYarnDeployment.class);
protected static MiniYARNCluster yarnCluster = null;
protected static MiniDFSCluster dfsCluster = null;
protected static Configuration conf = new Configuration();
@BeforeClass
public static void setup() throws InterruptedException, IOException {
logger.info("Starting up YARN cluster");
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
// NOTE : we use default ports
dfsCluster = new MiniDFSCluster.Builder(conf).nameNodePort(9000).build();
dfsCluster.waitActive();
String fsDefaultName = "hdfs://localhost:" + dfsCluster.getNameNodePort();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, fsDefaultName);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
yarnCluster = new MiniYARNCluster(TestYarnDeployment.class.getName(), 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
logger.info("setup thread sleep interrupted. message=" + e.getMessage());
}
}
@AfterClass
public static void tearDown() throws IOException {
if (yarnCluster != null) {
yarnCluster.stop();
yarnCluster = null;
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}
private ZkClient zkClient;
private File tmpAppsDir;
@Test
public void testDeployment() throws Exception {
tmpAppsDir = Files.createTempDir();
File gradlewFile = CoreTestUtils.findGradlewInRootDir();
CoreTestUtils.callGradleTask(new File(gradlewFile.getParentFile().getAbsolutePath()
+ "/test-apps/simple-deployable-app-1/build.gradle"), "installS4R", new String[] { "appsDir="
+ tmpAppsDir.getAbsolutePath() });
zkClient = new ZkClient("localhost:" + CommTestUtils.ZK_PORT);
zkClient.setZkSerializer(new ZNRecordSerializer());
List<String> processes = zkClient.getChildren("/s4/clusters/cluster1/process");
Assert.assertTrue(processes.size() == 0);
final CountDownLatch signalProcessesReady = new CountDownLatch(1);
zkClient.subscribeChildChanges("/s4/clusters/cluster1/process", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
if (currentChilds.size() == 2) {
signalProcessesReady.countDown();
}
}
});
CountDownLatch signalAppInitialized = new CountDownLatch(1);
CountDownLatch signalAppStarted = new CountDownLatch(1);
CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppInitialized,
CommTestUtils.createZkClient());
CommTestUtils.watchAndSignalCreation(AppConstants.INITIALIZED_ZNODE_1, signalAppStarted,
CommTestUtils.createZkClient());
Assert.assertFalse(zkClient.exists(AppConstants.INITIALIZED_ZNODE_1));
FileSystem fs = FileSystem.get(conf);
Path destS4rPath = new Path(fs.getHomeDirectory() + "/simpleDeployableApp.s4r");
fs.copyFromLocalFile(new Path(new File(tmpAppsDir.getAbsolutePath()
+ "/simple-deployable-app-1-0.0.0-SNAPSHOT.s4r").toURI()), destS4rPath);
fs.close();
// deploy with Yarn Client
final String[] params = ("-cluster=cluster1 -nbTasks=2 -flp=14000 -s4r=" + destS4rPath.toUri().toString()
+ " -zk=localhost:2181 -s4Dir=" + gradlewFile.getParentFile().getAbsolutePath()).split("[ ]");
S4CLIYarnArgs yarnArgs = new S4CLIYarnArgs();
Tools.parseArgs(yarnArgs, params);
final S4YarnClient client = new S4YarnClient(yarnArgs, conf);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
boolean result = client.run(true);
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
}
}
}, "Yarn Client");
t.start();
Assert.assertTrue(signalAppInitialized.await(200, TimeUnit.SECONDS));
Assert.assertTrue(signalAppStarted.await(200, TimeUnit.SECONDS));
String time1 = String.valueOf(System.currentTimeMillis());
CountDownLatch signalEvent1Processed = new CountDownLatch(1);
CommTestUtils
.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, CommTestUtils.createZkClient());
Injector injector = CoreTestUtils.createInjectorWithNonFailFastZKClients();
TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
Event event = new Event();
event.put("line", String.class, time1);
emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
.serialize(event)));
// check event processed
Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
// cleanup
client.killApplication(client.getApplicationList().get(0).getApplicationId());
}
}