blob: 2d8ab78b8f58ae735771c520852fff53f0b05145 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.myriad.executor;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.mesos.MesosExecutorDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Status;
import org.apache.mesos.Protos.TaskState;
import org.apache.mesos.Protos.TaskStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Auxillary service wrapper for MyriadExecutor
*/
public class MyriadExecutorAuxService extends AuxiliaryService {
private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class);
private static final String SERVICE_NAME = "myriad_service";
public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_";
public static final String YARN_CONTAINER_FULL_PREFIX = "yarn_task_";
private MesosExecutorDriver driver;
private Thread myriadExecutorThread;
// Storing container id strings as it is difficult to get access to
// NodeManager's NMContext object from an auxiliary service.
private Set<String> containerIds = new HashSet<>();
protected MyriadExecutorAuxService() {
super(SERVICE_NAME);
}
@Override
protected void serviceStart() throws Exception {
LOGGER.info("Starting MyriadExecutor...");
myriadExecutorThread = new Thread(new Runnable() {
public void run() {
driver = new MesosExecutorDriver(new MyriadExecutor(containerIds));
LOGGER.error("MyriadExecutor exit with status " + Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1));
}
});
myriadExecutorThread.start();
}
@Override
public void initializeApplication(ApplicationInitializationContext initAppContext) {
LOGGER.debug("initializeApplication");
}
@Override
public void stopApplication(ApplicationTerminationContext stopAppContext) {
LOGGER.debug("stopApplication");
}
@Override
public ByteBuffer getMetaData() {
LOGGER.debug("getMetaData");
return null;
}
@Override
public void initializeContainer(ContainerInitializationContext initContainerContext) {
ContainerId containerId = initContainerContext.getContainerId();
synchronized (containerIds) {
containerIds.add(containerId.toString());
}
sendStatus(containerId, TaskState.TASK_RUNNING);
}
@Override
public void stopContainer(ContainerTerminationContext stopContainerContext) {
ContainerId containerId = stopContainerContext.getContainerId();
synchronized (containerIds) {
containerIds.remove(containerId.toString());
}
sendStatus(stopContainerContext.getContainerId(), TaskState.TASK_FINISHED);
}
private void sendStatus(ContainerId containerId, TaskState taskState) {
Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString()).build();
TaskStatus status = TaskStatus.newBuilder().setTaskId(taskId).setState(taskState).build();
driver.sendStatusUpdate(status);
LOGGER.debug("Sent status " + taskState + " for taskId " + taskId);
}
}