blob: 4ef4c98f75ff03f7f6f202403090cc5bb2d8e0d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.executor;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nemo.common.coder.BytesDecoderFactory;
import org.apache.nemo.common.coder.BytesEncoderFactory;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.exception.IllegalMessageException;
import org.apache.nemo.common.exception.UnknownFailureCauseException;
import org.apache.nemo.common.ir.edge.executionproperty.CompressionProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DecompressionProperty;
import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageContext;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageListener;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
import org.apache.nemo.runtime.executor.data.SerializerManager;
import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
import org.apache.nemo.runtime.executor.datatransfer.NemoEventDecoderFactory;
import org.apache.nemo.runtime.executor.datatransfer.NemoEventEncoderFactory;
import org.apache.nemo.runtime.executor.task.TaskExecutor;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Executor.
*/
public final class Executor {
private static final Logger LOG = LoggerFactory.getLogger(Executor.class.getName());
private final String executorId;
/**
* To be used for a thread pool to execute tasks.
*/
private final ExecutorService executorService;
/**
* In charge of this executor's intermediate data transfer.
*/
private final SerializerManager serializerManager;
/**
* Factory of InputReader/OutputWriter for executing tasks groups.
*/
private final IntermediateDataIOFactory intermediateDataIOFactory;
private final BroadcastManagerWorker broadcastManagerWorker;
private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
private final MetricMessageSender metricMessageSender;
@Inject
private Executor(@Parameter(JobConf.ExecutorId.class) final String executorId,
final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
final MessageEnvironment messageEnvironment,
final SerializerManager serializerManager,
final IntermediateDataIOFactory intermediateDataIOFactory,
final BroadcastManagerWorker broadcastManagerWorker,
final MetricManagerWorker metricMessageSender) {
this.executorId = executorId;
this.executorService = Executors.newCachedThreadPool(new BasicThreadFactory.Builder()
.namingPattern("TaskExecutor thread-%d")
.build());
this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
this.serializerManager = serializerManager;
this.intermediateDataIOFactory = intermediateDataIOFactory;
this.broadcastManagerWorker = broadcastManagerWorker;
this.metricMessageSender = metricMessageSender;
messageEnvironment.setupListener(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID, new ExecutorMessageReceiver());
}
public String getExecutorId() {
return executorId;
}
private synchronized void onTaskReceived(final Task task) {
LOG.debug("Executor [{}] received Task [{}] to execute.",
new Object[]{executorId, task.getTaskId()});
executorService.execute(() -> launchTask(task));
}
/**
* Launches the Task, and keeps track of the execution state with taskStateManager.
*
* @param task to launch.
*/
private void launchTask(final Task task) {
LOG.info("Launch task: {}", task.getTaskId());
try {
final long deserializationStartTime = System.currentTimeMillis();
final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
SerializationUtils.deserialize(task.getSerializedIRDag());
metricMessageSender.send("TaskMetric", task.getTaskId(), "taskDeserializationTime",
SerializationUtils.serialize(System.currentTimeMillis() - deserializationStartTime));
final TaskStateManager taskStateManager =
new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
irDag.getVertices().forEach(v -> {
irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
});
new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
metricMessageSender, persistentConnectionToMasterMap).execute();
} catch (final Exception e) {
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
ControlMessage.Message.newBuilder()
.setId(RuntimeIdManager.generateMessageId())
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
.setType(ControlMessage.MessageType.ExecutorFailed)
.setExecutorFailedMsg(ControlMessage.ExecutorFailedMsg.newBuilder()
.setExecutorId(executorId)
.setException(ByteString.copyFrom(SerializationUtils.serialize(e)))
.build())
.build());
throw e;
}
}
/**
* This wraps the encoder with NemoEventEncoder.
* If the encoder is BytesEncoderFactory, we do not wrap the encoder.
* TODO #276: Add NoCoder property value in Encoder/DecoderProperty
*
* @param encoderFactory encoder factory
* @return wrapped encoder
*/
private EncoderFactory getEncoderFactory(final EncoderFactory encoderFactory) {
if (encoderFactory instanceof BytesEncoderFactory) {
return encoderFactory;
} else {
return new NemoEventEncoderFactory(encoderFactory);
}
}
/**
* This wraps the encoder with NemoEventDecoder.
* If the decoder is BytesDecoderFactory, we do not wrap the decoder.
* TODO #276: Add NoCoder property value in Encoder/DecoderProperty
*
* @param decoderFactory decoder factory
* @return wrapped decoder
*/
private DecoderFactory getDecoderFactory(final DecoderFactory decoderFactory) {
if (decoderFactory instanceof BytesDecoderFactory) {
return decoderFactory;
} else {
return new NemoEventDecoderFactory(decoderFactory);
}
}
public void terminate() {
try {
metricMessageSender.close();
} catch (final UnknownFailureCauseException e) {
throw new UnknownFailureCauseException(
new Exception("Closing MetricManagerWorker failed in executor " + executorId));
}
}
/**
* MessageListener for Executor.
*/
private final class ExecutorMessageReceiver implements MessageListener<ControlMessage.Message> {
@Override
public void onMessage(final ControlMessage.Message message) {
switch (message.getType()) {
case ScheduleTask:
final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
final Task task =
SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
onTaskReceived(task);
break;
case RequestMetricFlush:
metricMessageSender.flush();
break;
default:
throw new IllegalMessageException(
new Exception("This message should not be received by an executor :" + message.getType()));
}
}
@Override
public void onMessageWithContext(final ControlMessage.Message message, final MessageContext messageContext) {
switch (message.getType()) {
default:
throw new IllegalMessageException(
new Exception("This message should not be requested to an executor :" + message.getType()));
}
}
}
}