* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
package org.apache.fluo.yarn.core;
import java.util.Collection;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.TwillApplication;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.ext.BundledJarRunnable;
import org.apache.twill.ext.BundledJarRunner;
import org.apache.twill.yarn.YarnTwillRunnerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FluoYarnLauncher {
private static final Logger log = LoggerFactory.getLogger(FluoYarnLauncher.class);
private static final String ORACLE_ID = "Oracle";
private static final String WORKER_ID = "Worker";
private static final String CONN_PROPS = "";
private static final String LOG4J_PROPS = "";
private static class FluoYarnApp implements TwillApplication {
private FluoYarnEnv env;
FluoYarnApp(FluoYarnEnv env) {
this.env = env;
public TwillSpecification configure() {
ResourceSpecification oracleResources =
.setMemory(env.getOracleMaxMemory(), ResourceSpecification.SizeUnit.MEGA)
ResourceSpecification workerResources =
.setMemory(env.getWorkerMaxMemory(), ResourceSpecification.SizeUnit.MEGA)
return TwillSpecification.Builder.with().setName("fluo-app-" + env.getApplicationName())
.withRunnable().add(ORACLE_ID, new BundledJarRunnable(), oracleResources).withLocalFiles()
.add(env.getBundledJarName(), new File(env.getBundledJarPath()), false)
.add(CONN_PROPS, new File(env.getConnPropsPath()), false)
.add(LOG4J_PROPS, new File(env.getLogPropsPath()), false).apply()
.add(WORKER_ID, new BundledJarRunnable(), workerResources).withLocalFiles()
.add(env.getBundledJarName(), new File(env.getBundledJarPath()), false)
.add(CONN_PROPS, new File(env.getConnPropsPath()), false)
.add(LOG4J_PROPS, new File(env.getLogPropsPath()), false).apply().anyOrder().build();
private static int getNumRunning(TwillController controller) {
ResourceReport report = controller.getResourceReport();
if (report == null) {
return 0;
int total = 0;
Collection<TwillRunResources> resources = report.getRunnableResources(ORACLE_ID);
if (resources != null) {
total += resources.size();
resources = report.getRunnableResources(WORKER_ID);
if (resources != null) {
total += resources.size();
return total;
public static void main(String[] args) throws Exception {
if (args.length != 5) {
System.err.println("Invalid arguments");
String connProps = args[0];
String yarnProps = args[1];
String logProps = args[2];
String appName = args[3];
String jarPath = args[4];
FluoYarnEnv env = new FluoYarnEnv(yarnProps, connProps, logProps, appName, jarPath);
BundledJarRunner.Arguments oracleArgs =
new BundledJarRunner.Arguments.Builder().setJarFileName(env.getBundledJarName())
.setMainArgs(new String[] {"-a", appName}).createArguments();
BundledJarRunner.Arguments workerArgs =
new BundledJarRunner.Arguments.Builder().setJarFileName(env.getBundledJarName())
.setMainArgs(new String[] {"-a", appName}).createArguments();
TwillRunnerService twillRunner =
new YarnTwillRunnerService(env.getYarnConfiguration(), env.getZookeepers());
TwillPreparer preparer = twillRunner.prepare(new FluoYarnApp(env))
.addJVMOptions("-Dlog4j.configuration=file:$PWD/" + LOG4J_PROPS)
.addJVMOptions(("-Dfluo.conn.props=" + CONN_PROPS))
.withArguments(ORACLE_ID, oracleArgs.toArray())
.withArguments(WORKER_ID, workerArgs.toArray());
TwillController controller = preparer.start();
ResourceReport report = controller.getResourceReport();"Waiting for Fluo application '{}' to start in YARN...", appName);
while (report == null) {
report = controller.getResourceReport();
String appID = report.getApplicationId();"Fluo application '{}' has started in YARN with ID '{}'", appName, appID);"Waiting for all containers of Fluo application '{}' to start in YARN...", appName);
int numRunning = getNumRunning(controller);
while (numRunning != env.getTotalInstances()) {"{} of {} containers have started in YARN", numRunning, env.getTotalInstances());
numRunning = getNumRunning(controller);
}"{} of {} containers have started in YARN", numRunning, env.getTotalInstances());"Fluo application '{}' has successfully started in YARN with ID '{}'", appName, appID);