blob: 17d5fc2369cfed5a5270e97ea4577f30dded1f5b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.*;
import org.apache.mesos.v1.*;
import org.apache.mesos.v1.Protos.*;
import org.apache.mesos.v1.scheduler.Mesos;
import org.apache.mesos.v1.scheduler.Scheduler;
import org.apache.mesos.v1.scheduler.V0Mesos;
import org.apache.mesos.v1.scheduler.V1Mesos;
import org.apache.mesos.v1.scheduler.Protos.Call;
import org.apache.mesos.v1.scheduler.Protos.Event;
public class V1TestFramework {
static class TestScheduler implements Scheduler {
public TestScheduler(
String master,
FrameworkInfo framework,
ExecutorInfo executor) {
this(master, framework, executor, 5);
}
public TestScheduler(
String master,
FrameworkInfo framework,
ExecutorInfo executor,
int totalTasks) {
this.framework = framework;
this.executor = executor;
this.totalTasks = totalTasks;
this.state = State.DISCONNECTED;
}
// TODO(anand): Synchronize on `state` instead.
@Override
public synchronized void connected(final Mesos mesos) {
System.out.println("Connected");
state = State.CONNECTED;
retryTimer = new Timer();
retryTimer.schedule(new TimerTask() {
@Override
public void run() {
doReliableRegistration(mesos);
}
}, 0, 1000); // Repeat every 1 second
}
@Override
public synchronized void disconnected(Mesos mesos) {
System.out.println("Disconnected");
state = state.DISCONNECTED;
cancelRetryTimer();
}
@Override
public synchronized void received(Mesos mesos, Event event) {
switch (event.getType()) {
case SUBSCRIBED: {
frameworkId = event.getSubscribed().getFrameworkId();
state = State.SUBSCRIBED;
System.out.println(
"Subscribed with ID " + frameworkId + " to master " +
event.getSubscribed().getMasterInfo().getId());
break;
}
case OFFERS: {
System.out.println("Received an OFFERS event");
offers(mesos, event.getOffers().getOffersList());
break;
}
case RESCIND: {
System.out.println("Received an RESCIND event");
break;
}
case UPDATE: {
System.out.println("Received an UPDATE event");
update(mesos, event.getUpdate().getStatus());
break;
}
case MESSAGE: {
System.out.println("Received a MESSAGE event");
break;
}
case FAILURE: {
System.out.println("Received a FAILURE event");
break;
}
case ERROR: {
System.out.println("Received an ERROR event");
System.exit(1);
}
case HEARTBEAT: {
// TODO(anand): Force reconnection with the master upon lack
// of heartbeats.
System.out.println("Received a HEARTBEAT event");
break;
}
case UNKNOWN: {
System.out.println("Received an UNKNOWN event");
break;
}
}
}
public synchronized void doReliableRegistration(Mesos mesos) {
if (state == State.SUBSCRIBED || state == State.DISCONNECTED) {
cancelRetryTimer();
return;
}
Call.Builder callBuilder = Call.newBuilder()
.setType(Call.Type.SUBSCRIBE)
.setSubscribe(Call.Subscribe.newBuilder()
.setFrameworkInfo(framework)
.build());
mesos.send(callBuilder.build());
}
private void cancelRetryTimer() {
// Cancel previously active timer (if one exists).
if (retryTimer != null) {
retryTimer.cancel();
retryTimer.purge();
}
retryTimer = null;
}
public void offers(Mesos mesos, List<Offer> offers) {
double CPUS_PER_TASK = 1;
double MEM_PER_TASK = 128;
for (Offer offer : offers) {
Offer.Operation.Launch.Builder launch =
Offer.Operation.Launch.newBuilder();
double offerCpus = 0;
double offerMem = 0;
for (Resource resource : offer.getResourcesList()) {
if (resource.getName().equals("cpus")) {
offerCpus += resource.getScalar().getValue();
} else if (resource.getName().equals("mem")) {
offerMem += resource.getScalar().getValue();
}
}
System.out.println(
"Received offer " + offer.getId().getValue() + " with cpus: " +
offerCpus + " and mem: " + offerMem);
double remainingCpus = offerCpus;
double remainingMem = offerMem;
while (launchedTasks < totalTasks &&
remainingCpus >= CPUS_PER_TASK &&
remainingMem >= MEM_PER_TASK) {
TaskID taskId = TaskID.newBuilder()
.setValue(Integer.toString(launchedTasks++)).build();
System.out.println("Launching task " + taskId.getValue() +
" using offer " + offer.getId().getValue());
TaskInfo task = TaskInfo.newBuilder()
.setName("task " + taskId.getValue())
.setTaskId(taskId)
.setAgentId(offer.getAgentId())
.addResources(Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(CPUS_PER_TASK)
.build()))
.addResources(Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(MEM_PER_TASK)
.build()))
.setExecutor(ExecutorInfo.newBuilder(executor))
.build();
launch.addTaskInfos(TaskInfo.newBuilder(task));
remainingCpus -= CPUS_PER_TASK;
remainingMem -= MEM_PER_TASK;
}
mesos.send(Call.newBuilder()
.setType(Call.Type.ACCEPT)
.setFrameworkId(frameworkId)
.setAccept(Call.Accept.newBuilder()
.addOfferIds(offer.getId())
.addOperations(Offer.Operation.newBuilder()
.setType(Offer.Operation.Type.LAUNCH)
.setLaunch(launch)
.build())
.setFilters(Filters.newBuilder()
.setRefuseSeconds(1)
.build()))
.build());
}
}
public void update(Mesos mesos, TaskStatus status) {
System.out.println(
"Status update: task " + status.getTaskId().getValue() +
" is in state " + status.getState().getValueDescriptor().getName());
if (status.getState() == TaskState.TASK_FINISHED) {
finishedTasks++;
System.out.println("Finished tasks: " + finishedTasks);
if (finishedTasks == totalTasks) {
lock.lock();
try {
finished = true;
finishedCondtion.signal();
} finally {
lock.unlock();
}
}
}
if (status.getState() == TaskState.TASK_LOST ||
status.getState() == TaskState.TASK_KILLED ||
status.getState() == TaskState.TASK_FAILED) {
System.err.println(
"Aborting because task " + status.getTaskId().getValue() +
" is in unexpected state " +
status.getState().getValueDescriptor().getName() +
" with reason '" +
status.getReason().getValueDescriptor().getName() + "'" +
" from source '" +
status.getSource().getValueDescriptor().getName() + "'" +
" with message '" + status.getMessage() + "'");
System.exit(1);
}
mesos.send(Call.newBuilder()
.setType(Call.Type.ACKNOWLEDGE)
.setFrameworkId(frameworkId)
.setAcknowledge(Call.Acknowledge.newBuilder()
.setAgentId(status.getAgentId())
.setTaskId(status.getTaskId())
.setUuid(status.getUuid())
.build())
.build());
}
private enum State {
DISCONNECTED,
CONNECTED,
SUBSCRIBED
}
private FrameworkInfo framework;
private FrameworkID frameworkId;
private final ExecutorInfo executor;
private final int totalTasks;
private int launchedTasks = 0;
private int finishedTasks = 0;
private State state;
private Timer retryTimer = null;
}
private static void usage() {
String name = V1TestFramework.class.getName();
System.err.println("Usage: " + name + " master version{0,1}");
}
public static void main(String[] args) throws Exception {
if (args.length < 2 || args.length > 3) {
usage();
System.exit(1);
}
int version = Integer.parseInt(args[1]);
if (version != 0 && version != 1) {
usage();
System.exit(1);
}
String uri = new File("./test-executor").getCanonicalPath();
ExecutorInfo executor = ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue("default"))
.setCommand(CommandInfo.newBuilder().setValue(uri))
.setName("Test Executor (Java)")
.setSource("java_test")
.build();
FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder()
.setUser(System.getProperty("user.name", "default-user"))
.setName("V" + version + " Test Framework (Java)");
Credential.Builder credentialBuilder = null;
if (System.getenv("MESOS_EXAMPLE_PRINCIPAL") != null) {
frameworkBuilder.setPrincipal(System.getenv("MESOS_EXAMPLE_PRINCIPAL"));
if (System.getenv("MESOS_EXAMPLE_SECRET") != null) {
credentialBuilder = Credential.newBuilder()
.setPrincipal(System.getenv("MESOS_EXAMPLE_PRINCIPAL"))
.setSecret(System.getenv("MESOS_EXAMPLE_SECRET"));
}
}
Scheduler scheduler = args.length == 2
? new TestScheduler(args[0], frameworkBuilder.build(), executor)
: new TestScheduler(args[0],
frameworkBuilder.build(),
executor,
Integer.parseInt(args[2]));
Mesos mesos;
if (credentialBuilder != null) {
mesos = version == 1
? new V1Mesos(scheduler, args[0], credentialBuilder.build())
: new V0Mesos(scheduler,
frameworkBuilder.build(),
args[0],
credentialBuilder.build());
} else {
mesos = version == 1
? new V1Mesos(scheduler, args[0])
: new V0Mesos(scheduler, frameworkBuilder.build(), args[0]);
}
lock.lock();
try {
while (!finished) {
finishedCondtion.await();
}
} finally {
lock.unlock();
}
// NOTE: Copied from src/examples/java/TestFramework.java:
// For this test to pass reliably on some platforms, this sleep is
// required to ensure that the SchedulerDriver teardown is complete
// before the JVM starts running native object destructors after
// System.exit() is called. 500ms proved successful in test runs,
// but on a heavily loaded machine it might not.
// TODO(greg): Ideally, we would inspect the status of the driver
// and its associated tasks via the Java API and wait until their
// teardown is complete to exit.
Thread.sleep(500);
System.exit(0);
}
static boolean finished = false;
final static Lock lock = new ReentrantLock();
final static Condition finishedCondtion = lock.newCondition();
}