// blob: c56e0649095290352ffde57ec873394b13250f60 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.mesos.*;
import org.apache.mesos.Protos.*;
public class TestMultipleExecutorsFramework {
/**
 * Scheduler that launches two tasks, "foo" and "bar", each under its own
 * executor ("executor-foo" and "executor-bar"), and stops the driver once
 * both tasks have reported {@code TASK_FINISHED}.
 *
 * <p>If either task reports {@code TASK_LOST}, {@code TASK_KILLED} or
 * {@code TASK_FAILED} the driver is aborted, because the "both finished"
 * condition may then never be met and the framework would otherwise keep
 * running indefinitely.
 */
static class TestScheduler implements Scheduler {
  /** Relative path of the executor binary; resolved to a canonical path per task. */
  private static final String EXECUTOR_PATH = "./test-executor";

  /**
   * Payload attached to every task. How it is interpreted is up to the
   * executor binary, which is not part of this file.
   */
  private static final String TASK_DATA = "100000";

  private boolean fooLaunched = false;
  private boolean barLaunched = false;
  private boolean fooFinished = false;
  private boolean barFinished = false;

  /** Logs the framework id assigned at registration. */
  @Override
  public void registered(SchedulerDriver driver,
                         FrameworkID frameworkId,
                         MasterInfo masterInfo) {
    System.out.println("Registered! ID = " + frameworkId.getValue());
  }

  /** Intentionally a no-op. */
  @Override
  public void reregistered(SchedulerDriver driver, MasterInfo masterInfo) {}

  /** Intentionally a no-op. */
  @Override
  public void disconnected(SchedulerDriver driver) {}

  /**
   * Launches "foo" and "bar" against the first offer(s) received. Each task
   * is launched at most once; later offers are answered with an empty task
   * list.
   *
   * @throws RuntimeException if the executor path cannot be resolved
   */
  @Override
  public void resourceOffers(SchedulerDriver driver,
                             List<Offer> offers) {
    try {
      File executor = new File(EXECUTOR_PATH);

      for (Offer offer : offers) {
        List<TaskInfo> tasks = new ArrayList<TaskInfo>();

        if (!fooLaunched) {
          tasks.add(buildTask("foo", offer, executor));
          fooLaunched = true;
        }

        if (!barLaunched) {
          tasks.add(buildTask("bar", offer, executor));
          barLaunched = true;
        }

        // NOTE(review): once both tasks are launched this passes an empty
        // list; presumably Mesos treats that as declining the offer -- confirm.
        driver.launchTasks(offer.getId(), tasks);
      }
    } catch (IOException e) {
      // Only File.getCanonicalPath() throws a checked exception here; the
      // callback signature cannot declare it, so rethrow unchecked with cause.
      throw new RuntimeException(
          "Unable to resolve executor path " + EXECUTOR_PATH, e);
    }
  }

  /** Intentionally a no-op. */
  @Override
  public void offerRescinded(SchedulerDriver driver, OfferID offerId) {}

  /**
   * Tracks completion of "foo" and "bar". Stops the driver when both have
   * finished, and aborts it when a task reports a failure state.
   */
  @Override
  public void statusUpdate(SchedulerDriver driver, TaskStatus status) {
    String taskId = status.getTaskId().getValue();
    TaskState state = status.getState();

    System.out.println("Status update: task " + taskId +
                       " is in state " + state);

    if (state == TaskState.TASK_LOST ||
        state == TaskState.TASK_KILLED ||
        state == TaskState.TASK_FAILED) {
      // A task in one of these states will not report TASK_FINISHED, so
      // waiting for both tasks to finish could block forever.
      System.err.println("Aborting because task " + taskId +
                         " is in unexpected state " + state);
      driver.abort();
      return;
    }

    if (state == TaskState.TASK_FINISHED) {
      if ("foo".equals(taskId)) {
        fooFinished = true;
      }
      if ("bar".equals(taskId)) {
        barFinished = true;
      }
    }

    if (fooFinished && barFinished) {
      driver.stop();
    }
  }

  /** Intentionally a no-op. */
  @Override
  public void frameworkMessage(SchedulerDriver driver,
                               ExecutorID executorId,
                               SlaveID slaveId,
                               byte[] data) {}

  /** Intentionally a no-op. */
  @Override
  public void slaveLost(SchedulerDriver driver, SlaveID slaveId) {}

  /** Intentionally a no-op. */
  @Override
  public void executorLost(SchedulerDriver driver,
                           ExecutorID executorId,
                           SlaveID slaveId,
                           int status) {}

  /** Logs the error message reported through the driver. */
  @Override
  public void error(SchedulerDriver driver, String message) {
    System.out.println("Error: " + message);
  }

  /**
   * Builds the task named {@code name} for the slave that made
   * {@code offer}. The task requests 1 "cpus" and 128 "mem" and runs under
   * an executor with id {@code "executor-" + name}.
   *
   * @param name task id value, also used as the executor id suffix
   * @param offer offer whose slave the task is bound to
   * @param executor executor binary; its canonical path is the command
   * @return the fully built task description
   * @throws IOException if the canonical executor path cannot be resolved
   */
  private static TaskInfo buildTask(String name, Offer offer, File executor)
      throws IOException {
    TaskID taskId = TaskID.newBuilder()
      .setValue(name)
      .build();

    System.out.println("Launching task " + taskId.getValue());

    ExecutorInfo executorInfo = ExecutorInfo.newBuilder()
      .setExecutorId(ExecutorID.newBuilder()
                     .setValue("executor-" + name)
                     .build())
      .setCommand(CommandInfo.newBuilder()
                  .setValue(executor.getCanonicalPath())
                  .build())
      .build();

    return TaskInfo.newBuilder()
      .setName("task " + taskId.getValue())
      .setTaskId(taskId)
      .setSlaveId(offer.getSlaveId())
      .addResources(scalarResource("cpus", 1))
      .addResources(scalarResource("mem", 128))
      .setExecutor(executorInfo)
      // Encode explicitly as UTF-8 so both tasks carry identical bytes
      // regardless of the platform default charset.
      .setData(ByteString.copyFromUtf8(TASK_DATA))
      .build();
  }

  /** Builds a scalar resource called {@code name} with the given amount. */
  private static Resource scalarResource(String name, double amount) {
    return Resource.newBuilder()
      .setName(name)
      .setType(Value.Type.SCALAR)
      .setScalar(Value.Scalar.newBuilder()
                 .setValue(amount)
                 .build())
      .build();
  }
}
/** Prints the command-line synopsis of this framework to standard error. */
private static void usage() {
  System.err.println(
      String.format("Usage: %s master <tasks>",
                    TestMultipleExecutorsFramework.class.getName()));
}
/**
 * Entry point. Expects the master address as the first argument and accepts
 * one optional extra argument; any other argument count prints the usage
 * text and exits with status 1.
 *
 * <p>Runs a {@link TestScheduler} through a {@code MesosSchedulerDriver} and
 * exits with 0 if the driver ends in {@code DRIVER_STOPPED}, 1 otherwise.
 */
public static void main(String[] args) throws Exception {
  // Exactly one or two arguments are accepted.
  if (args.length != 1 && args.length != 2) {
    usage();
    System.exit(1);
  }

  FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder();
  frameworkBuilder.setUser(""); // Have Mesos fill in the current user.
  frameworkBuilder.setName("Test Multiple Executors Framework (Java)");
  frameworkBuilder.setPrincipal("test-multiple-executors-framework-java");
  FrameworkInfo framework = frameworkBuilder.build();

  String master = args[0];
  Scheduler scheduler = new TestScheduler();
  MesosSchedulerDriver driver =
      new MesosSchedulerDriver(scheduler, framework, master);

  // run() returns the driver's final status.
  Status finalStatus = driver.run();
  int exitCode = (finalStatus == Status.DRIVER_STOPPED) ? 0 : 1;
  System.exit(exitCode);
}
}