blob: d3b48f266a23dba0a985367622b56b532b419de7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.master;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.Util;
import org.apache.nemo.common.exception.ContainerException;
import org.apache.nemo.common.exception.IllegalMessageException;
import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.*;
import org.apache.nemo.runtime.common.metric.JobMetric;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.master.metric.MetricManagerMaster;
import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
import org.apache.nemo.runtime.master.metric.MetricStore;
import org.apache.nemo.runtime.master.resource.ContainerManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
import org.apache.nemo.runtime.master.scheduler.ExecutorRegistry;
import org.apache.nemo.runtime.master.scheduler.Scheduler;
import org.apache.nemo.runtime.master.servlet.*;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Parameter;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* (WARNING) Use runtimeMasterThread for all public methods to avoid race conditions.
* See comments in the {@link Scheduler} for avoiding race conditions.
* <p>
* Runtime Master is the central controller of Runtime.
* Compiler submits an {@link PhysicalPlan} to Runtime Master to execute a job.
* Runtime Master handles:
* a) Scheduling the plan with {@link Scheduler}.
* b) Managing resources with {@link ContainerManager}.
* c) Managing blocks with {@link BlockManagerMaster}.
* d) Receiving and sending control messages with {@link MessageEnvironment}.
* e) Metric using {@link MetricMessageHandler}.
*/
@DriverSide
public final class RuntimeMaster {
private static final Logger LOG = LoggerFactory.getLogger(RuntimeMaster.class.getName());
private static final int DAG_LOGGING_PERIOD = 3000;
private static final int METRIC_ARRIVE_TIMEOUT = 10000;
private static final int REST_SERVER_PORT = 10101;
private static final int SPECULATION_CHECKING_PERIOD_MS = 100;
private final ExecutorService runtimeMasterThread;
private final ScheduledExecutorService speculativeTaskCloningThread;
private final Scheduler scheduler;
private final ContainerManager containerManager;
private final ExecutorRegistry executorRegistry;
private final MetricMessageHandler metricMessageHandler;
private final MessageEnvironment masterMessageEnvironment;
private final ClientRPC clientRPC;
private final MetricManagerMaster metricManagerMaster;
private final PlanStateManager planStateManager;
// For converting json data. This is a thread safe.
private final ObjectMapper objectMapper;
private final String jobId;
private final String dagDirectory;
private final Boolean dbEnabled;
private final String dbAddress;
private final String dbId;
private final String dbPassword;
private final Set<IRVertex> irVertices;
private final AtomicInteger resourceRequestCount;
private CountDownLatch metricCountDownLatch;
// REST API server for web metric visualization ui.
private final Server metricServer;
private final MetricStore metricStore;
/**
* Constructor.
*
* @param scheduler the scheduler implementation.
* @param containerManager the container manager, in charge of the available containers.
* @param metricMessageHandler the handler for metric messages.
* @param masterMessageEnvironment message environment for the runtime master.
* @param metricManagerMaster metric manager master.
* @param clientRPC the RPC channel to communicate with the client.
* @param planStateManager the manager that keeps track of the plan state.
* @param jobId the Job ID, provided by the user.
* @param dbEnabled whether or not the DB is enabled, provided by the user.
* @param dbAddress the DB Address, provided by the user.
* @param dbId the ID for the given DB.
* @param dbPassword the password for the given DB.
* @param dagDirectory directory of the DAG to save the json files and metrics into.
*/
@Inject
private RuntimeMaster(final Scheduler scheduler,
final ContainerManager containerManager,
final ExecutorRegistry executorRegistry,
final MetricMessageHandler metricMessageHandler,
final MessageEnvironment masterMessageEnvironment,
final MetricManagerMaster metricManagerMaster,
final ClientRPC clientRPC,
final PlanStateManager planStateManager,
@Parameter(JobConf.JobId.class) final String jobId,
@Parameter(JobConf.DBEnabled.class) final Boolean dbEnabled,
@Parameter(JobConf.DBAddress.class) final String dbAddress,
@Parameter(JobConf.DBId.class) final String dbId,
@Parameter(JobConf.DBPasswd.class) final String dbPassword,
@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
// We would like to use a single thread for runtime master operations
// since the processing logic in master takes a very short amount of time
// compared to the job completion times of executed jobs
// and keeping it single threaded removes the complexity of multi-thread synchronization.
this.runtimeMasterThread =
Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster thread"));
// Check for speculative execution every second.
this.speculativeTaskCloningThread = Executors
.newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "SpeculativeTaskCloning thread"));
this.speculativeTaskCloningThread.scheduleAtFixedRate(
() -> this.runtimeMasterThread.submit(scheduler::onSpeculativeExecutionCheck),
SPECULATION_CHECKING_PERIOD_MS,
SPECULATION_CHECKING_PERIOD_MS,
TimeUnit.MILLISECONDS);
this.scheduler = scheduler;
this.containerManager = containerManager;
this.executorRegistry = executorRegistry;
this.metricMessageHandler = metricMessageHandler;
this.masterMessageEnvironment = masterMessageEnvironment;
this.masterMessageEnvironment
.setupListener(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID, new MasterControlMessageReceiver());
this.clientRPC = clientRPC;
this.metricManagerMaster = metricManagerMaster;
this.jobId = jobId;
this.dagDirectory = dagDirectory;
this.dbEnabled = dbEnabled;
this.dbAddress = dbAddress;
this.dbId = dbId;
this.dbPassword = dbPassword;
this.irVertices = new HashSet<>();
this.resourceRequestCount = new AtomicInteger(0);
this.objectMapper = new ObjectMapper();
this.metricServer = startRestMetricServer();
this.metricStore = MetricStore.getStore();
this.planStateManager = planStateManager;
this.metricCountDownLatch = new CountDownLatch(0);
}
/**
* Start Metric Server.
*
* @return the metric server.
*/
private Server startRestMetricServer() {
final Server server = new Server(REST_SERVER_PORT);
final ServletHandler servletHandler = new ServletHandler();
server.setHandler(servletHandler);
servletHandler.addServletWithMapping(JobMetricServlet.class, "/api/job");
servletHandler.addServletWithMapping(TaskMetricServlet.class, "/api/task");
servletHandler.addServletWithMapping(StageMetricServlet.class, "/api/stage");
servletHandler.addServletWithMapping(AllMetricServlet.class, "/api");
servletHandler.addServletWithMapping(WebSocketMetricServlet.class, "/api/websocket");
try {
server.start();
} catch (final Exception e) {
throw new MetricException("Failed to start REST API server: " + e);
}
return server;
}
/**
* Record IR DAG related metrics.
*
* @param irdag the IR DAG to record.
* @param planId the ID of the IR DAG Physical Plan.
*/
public void recordIRDAGMetrics(final IRDAG irdag, final String planId) {
metricStore.getOrCreateMetric(JobMetric.class, planId).setIRDAG(irdag);
}
/**
* Flush metrics.
*/
public void flushMetrics() {
if (metricCountDownLatch.getCount() == 0) {
metricCountDownLatch = new CountDownLatch(executorRegistry.getNumberOfRunningExecutors());
// send metric flush request to all executors
metricManagerMaster.sendMetricFlushRequest();
}
try {
if (!metricCountDownLatch.await(METRIC_ARRIVE_TIMEOUT, TimeUnit.MILLISECONDS)) {
LOG.warn("Write Metric before all metric messages arrived.");
}
} catch (InterruptedException e) {
LOG.warn("Waiting Save Metric Process interrupted: ", e);
// clean up state...
Thread.currentThread().interrupt();
}
// save metric to file
metricStore.dumpAllMetricToFile(Paths.get(dagDirectory,
"Metric_" + jobId + "_" + System.currentTimeMillis() + ".json").toString());
// save metric to database
if (this.dbEnabled) {
metricStore.saveOptimizationMetricsToDB(dbAddress, jobId, dbId, dbPassword);
}
}
/**
* Submits the {@link PhysicalPlan} to Runtime.
* At now, we are assuming that a single job submit multiple plans.
*
* @param plan to execute
* @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
* @return pair of {@link PlanStateManager} and {@link ScheduledExecutorService}
*/
public Pair<PlanStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
final int maxScheduleAttempt) {
final Callable<Pair<PlanStateManager, ScheduledExecutorService>> planExecutionCallable = () -> {
this.irVertices.addAll(plan.getIdToIRVertex().values());
try {
scheduler.schedulePlan(plan, maxScheduleAttempt);
final ScheduledExecutorService dagLoggingExecutor = scheduleDagLogging();
return Pair.of(planStateManager, dagLoggingExecutor);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
try {
return runtimeMasterThread.submit(planExecutionCallable).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Terminates the RuntimeMaster.
*/
public void terminate() {
// No need to speculate anymore
speculativeTaskCloningThread.shutdown();
try {
// wait for metric flush
if (!metricCountDownLatch.await(METRIC_ARRIVE_TIMEOUT, TimeUnit.MILLISECONDS)) {
LOG.warn("Terminating master before all executor terminated messages arrived.");
}
} catch (final InterruptedException e) {
LOG.warn("Waiting executor terminating process interrupted: ", e);
// clean up state...
Thread.currentThread().interrupt();
}
runtimeMasterThread.execute(() -> {
scheduler.terminate();
try {
masterMessageEnvironment.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
metricMessageHandler.terminate();
containerManager.terminate();
try {
metricServer.stop();
} catch (final Exception e) {
throw new MetricException("Failed to stop rest api server: " + e);
}
});
// Do not shutdown runtimeMasterThread. We need it to clean things up.
}
/**
* Requests a container with resource specification.
*
* @param resourceSpecificationString the resource specification.
*/
public void requestContainer(final String resourceSpecificationString) {
final Future<?> containerRequestEventResult = runtimeMasterThread.submit(() -> {
try {
final List<Pair<Integer, ResourceSpecification>> resourceSpecificationList = // pair of (# of executors, specs)
Util.parseResourceSpecificationString(resourceSpecificationString);
for (final Pair<Integer, ResourceSpecification> resourceSpecification: resourceSpecificationList) {
resourceRequestCount.getAndAdd(resourceSpecification.left());
containerManager.requestContainer(resourceSpecification.left(), resourceSpecification.right());
}
} catch (final Exception e) {
throw new ContainerException(e);
}
});
try {
containerRequestEventResult.get();
} catch (final Exception e) {
LOG.error("Exception while requesting for a container: ", e);
throw new ContainerException(e);
}
}
/**
* Called when a container is allocated for this runtime.
* A wrapper function for {@link ContainerManager}.
*
* @param executorId to use for the executor to be launched on this container.
* @param allocatedEvaluator to be used as the container.
* @param executorConfiguration to use for the executor to be launched on this container.
*/
public void onContainerAllocated(final String executorId,
final AllocatedEvaluator allocatedEvaluator,
final Configuration executorConfiguration) {
runtimeMasterThread.execute(() ->
containerManager.onContainerAllocated(executorId, allocatedEvaluator, executorConfiguration));
}
/**
* Called when an executor is launched on a container for this runtime.
*
* @param activeContext of the launched executor.
* @return true if all requested executors have been launched, false otherwise.
*/
public boolean onExecutorLaunched(final ActiveContext activeContext) {
final Callable<Boolean> processExecutorLaunchedEvent = () -> {
final Optional<ExecutorRepresenter> executor = containerManager.onContainerLaunched(activeContext);
if (executor.isPresent()) {
scheduler.onExecutorAdded(executor.get());
return (resourceRequestCount.decrementAndGet() == 0);
} else {
return false;
}
};
final boolean eventResult;
try {
eventResult = runtimeMasterThread.submit(processExecutorLaunchedEvent).get();
} catch (final Exception e) {
throw new ContainerException(e);
}
return eventResult;
}
/**
* Called when an executor fails due to container failure on this runtime.
*
* @param failedEvaluator that failed.
*/
public void onExecutorFailed(final FailedEvaluator failedEvaluator) {
runtimeMasterThread.execute(() -> {
metricCountDownLatch.countDown();
// Note that getFailedContextList() can be empty if the failure occurred
// prior to launching an Executor on the Evaluator.
failedEvaluator.getFailedContextList().forEach(failedContext -> {
final String failedExecutorId = failedContext.getId();
scheduler.onExecutorRemoved(failedExecutorId);
});
containerManager.onContainerFailed(failedEvaluator.getId());
});
}
/**
* Handler for control messages received by Master.
*/
public final class MasterControlMessageReceiver implements MessageListener<ControlMessage.Message> {
@Override
public void onMessage(final ControlMessage.Message message) {
runtimeMasterThread.execute(() ->
handleControlMessage(message));
}
@Override
public void onMessageWithContext(final ControlMessage.Message message, final MessageContext messageContext) {
switch (message.getType()) {
case RequestBroadcastVariable:
final Serializable broadcastId =
SerializationUtils.deserialize(message.getRequestbroadcastVariableMsg().getBroadcastId().toByteArray());
final Object broadcastVariable = BroadcastManagerMaster.getBroadcastVariable(broadcastId);
if (broadcastVariable == null) {
throw new IllegalStateException(broadcastId.toString());
}
messageContext.reply(
ControlMessage.Message.newBuilder()
.setId(RuntimeIdManager.generateMessageId())
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
.setType(ControlMessage.MessageType.InMasterBroadcastVariable)
.setBroadcastVariableMsg(ControlMessage.InMasterBroadcastVariableMessage.newBuilder()
.setRequestId(message.getId())
// TODO #206: Efficient Broadcast Variable Serialization
.setVariable(ByteString.copyFrom(SerializationUtils.serialize((Serializable) broadcastVariable)))
.build())
.build());
break;
default:
throw new IllegalMessageException(
new Exception("This message should not be requested to Master :" + message.getType()));
}
}
}
private void handleControlMessage(final ControlMessage.Message message) {
switch (message.getType()) {
case TaskStateChanged:
final ControlMessage.TaskStateChangedMsg taskStateChangedMsg
= message.getTaskStateChangedMsg();
scheduler.onTaskStateReportFromExecutor(taskStateChangedMsg.getExecutorId(),
taskStateChangedMsg.getTaskId(),
taskStateChangedMsg.getAttemptIdx(),
MessageUtils.convertTaskState(taskStateChangedMsg.getState()),
taskStateChangedMsg.getVertexPutOnHoldId(),
MessageUtils.convertFailureCause(taskStateChangedMsg.getFailureCause()));
break;
case ExecutorFailed:
// Executor failed due to user code.
final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
final String failedExecutorId = executorFailedMsg.getExecutorId();
final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
throw new RuntimeException(exception);
case RunTimePassMessage:
((BatchScheduler) scheduler).onRunTimePassMessage(
message.getRunTimePassMessageMsg().getTaskId(),
message.getRunTimePassMessageMsg().getEntryList());
break;
case MetricMessageReceived:
final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
metricList.forEach(metric -> metricMessageHandler.onMetricMessageReceived(
metric.getMetricType(), metric.getMetricId(),
metric.getMetricField(), metric.getMetricValue().toByteArray()));
break;
case ExecutorDataCollected:
final String serializedData = message.getDataCollected().getData();
clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
.setType(ControlMessage.DriverToClientMessageType.DataCollected)
.setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(serializedData).build())
.build());
break;
case MetricFlushed:
metricCountDownLatch.countDown();
break;
default:
throw new IllegalMessageException(
new Exception("This message should not be received by Master :" + message.getType()));
}
}
/**
* Schedules a periodic DAG logging thread.
* TODO #20: RESTful APIs to Access Job State and Metric.
*
* @return the scheduled executor service.
*/
private ScheduledExecutorService scheduleDagLogging() {
final ScheduledExecutorService dagLoggingExecutor = Executors.newSingleThreadScheduledExecutor();
dagLoggingExecutor.scheduleAtFixedRate(new Runnable() {
public void run() {
flushMetrics();
planStateManager.storeJSON("periodic");
}
}, DAG_LOGGING_PERIOD, DAG_LOGGING_PERIOD, TimeUnit.MILLISECONDS);
return dagLoggingExecutor;
}
}