blob: 84e9fd60f6482506f10c0df432aa22e74892e939 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.resource.batchmgr;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.MesosExecutorDriver;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.FrameworkInfo;
import org.apache.mesos.Protos.SlaveInfo;
import org.apache.mesos.Protos.Status;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Protos.TaskState;
import org.apache.mesos.Protos.TaskStatus;
import org.apache.oodt.cas.resource.structs.JobInput;
import org.apache.oodt.cas.resource.structs.JobInstance;
import org.apache.oodt.cas.resource.structs.JobSpec;
import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
import org.apache.oodt.cas.resource.util.MesosUtilities;
/**
* @author starchmd
*
* This "Executor" is run by mesos to actually run the job.
*/
public class ResourceExecutor implements Executor {
PrintStream str = null;
String id = new SimpleDateFormat("HH:mm:ss").format(Calendar.getInstance().getTime())+" ";
public ResourceExecutor() {
try {
File tmp = new File("./executor-log.notalog");
//tmp.delete(); //With NIO then something must be caught
str = new PrintStream(new FileOutputStream(tmp));
str.println(id+"Starting up new<<<<<");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
/* (non-Javadoc)
* @see org.apache.mesos.Executor#disconnected(org.apache.mesos.ExecutorDriver)
*/
@Override
public void disconnected(ExecutorDriver driver) {
str.println(id+"Disconnected!");
}
/* (non-Javadoc)
* @see org.apache.mesos.Executor#error(org.apache.mesos.ExecutorDriver, java.lang.String)
*/
@Override
public void error(ExecutorDriver driver, String error) {
str.println(id+"Error: "+error);
}
/* (non-Javadoc)
* @see org.apache.mesos.Executor#frameworkMessage(org.apache.mesos.ExecutorDriver, byte[])
*/
@Override
public void frameworkMessage(ExecutorDriver arg0, byte[] arg1) {
str.println(id+"Message: "+new String(arg1));
}
/* (non-Javadoc)
* @see org.apache.mesos.Executor#killTask(org.apache.mesos.ExecutorDriver, org.apache.mesos.Protos.TaskID)
*/
@Override
public void killTask(ExecutorDriver arg0, TaskID arg1) {
str.println(id+"Kill");
}
/* (non-Javadoc)
* @see org.apache.mesos.Executor#launchTask(org.apache.mesos.ExecutorDriver, org.apache.mesos.Protos.TaskInfo)
*/
@Override
public void launchTask(final ExecutorDriver driver, final TaskInfo info) {
str.println(id+"Launch task!");
try {
JobSpec spec = MesosUtilities.byteStringToJobSpec(info.getData());
final JobInstance exec = GenericResourceManagerObjectFactory
.getJobInstanceFromClassName(spec.getJob().getJobInstanceClassName());
final JobInput in = spec.getIn();
Thread tmp = new Thread(new Runnable(){
public void run() {
TaskStatus status = null;
try {
exec.execute(in);
status = TaskStatus.newBuilder().setTaskId(info.getTaskId())
.setState(TaskState.TASK_FINISHED).build();
} catch (JobInputException e) {
e.printStackTrace();
status = TaskStatus.newBuilder().setTaskId(info.getTaskId())
.setState(TaskState.TASK_FAILED).build();
}
driver.sendStatusUpdate(status);
}
});
driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(info.getTaskId())
.setState(TaskState.TASK_STARTING).build());
tmp.start();
} catch (ClassNotFoundException e1) {
System.out.println("BAD DATA: ");
e1.printStackTrace();
driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(info.getTaskId())
.setState(TaskState.TASK_FAILED).build());
} catch (InstantiationException e2) {
System.out.println("BAD DATA: ");
e2.printStackTrace();
driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(info.getTaskId())
.setState(TaskState.TASK_FAILED).build());
} catch (IllegalAccessException e3) {
System.out.println("BAD DATA: ");
e3.printStackTrace();
driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(info.getTaskId())
.setState(TaskState.TASK_FAILED).build());
}
}
/* (non-Javadoc)
* @see org.apache.mesos.Executor#registered(org.apache.mesos.ExecutorDriver, org.apache.mesos.Protos.ExecutorInfo, org.apache.mesos.Protos.FrameworkInfo, org.apache.mesos.Protos.SlaveInfo)
*/
@Override
public void registered(ExecutorDriver arg0, ExecutorInfo arg1,
FrameworkInfo arg2, SlaveInfo arg3) {
System.out.println("Do-Wah-Do-Wah");
str.println(id+"Registered, Huzzah!");
}
/* (non-Javadoc)
* @see org.apache.mesos.Executor#reregistered(org.apache.mesos.ExecutorDriver, org.apache.mesos.Protos.SlaveInfo)
*/
@Override
public void reregistered(ExecutorDriver arg0, SlaveInfo arg1) {
System.out.println("Do-Wah-Do-Wah GO GO GO!!!!");
str.println(id+"Re-regged");
}
/* (non-Javadoc)
* @see org.apache.mesos.Executor#shutdown(org.apache.mesos.ExecutorDriver)
*/
@Override
public void shutdown(ExecutorDriver arg0) {
System.out.println("Down down down.");
str.println(id+"Shutdown");
}
public static void main(String[] args) throws Exception {
MesosExecutorDriver driver = new MesosExecutorDriver(new ResourceExecutor());
System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
}
}