/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap.tezplugins;
import org.apache.hadoop.hive.conf.Validator.RangeValidator;
import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService.NodeInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.BooleanArray;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray;
import java.io.IOException;
import java.net.BindException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryIdentifierProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.RegisterDagResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFragmentRequestProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UpdateFragmentResponseProto;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
private static final Joiner JOINER = Joiner.on("");
private static final Joiner PATH_JOINER = Joiner.on("/");
private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap;
// Tracks containerIds and taskAttemptIds, so it can be kept independent of the running DAG.
// When DAG-specific cleanup happens, it'll be better to link this to a DAG though.
private final EntityTracker entityTracker = new EntityTracker();
private final SourceStateTracker sourceStateTracker;
private final Set<LlapNodeId> nodesForQuery = new HashSet<>();
private LlapTaskSchedulerService scheduler;
private LlapProtocolClientProxy communicator;
private long deleteDelayOnDagComplete;
private final LlapTaskUmbilicalProtocol umbilical;
private final String user;
private String amHost;
private String timelineServerUri;
// These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats.
// Primarily for debugging purposes at the moment, since there are some unexplained TASK_TIMEOUTs currently being observed.
private final ConcurrentMap<LlapNodeId, Long> knownNodeMap = new ConcurrentHashMap<>();
private final ConcurrentMap<LlapNodeId, PingingNodeInfo> pingedNodeMap = new ConcurrentHashMap<>();
private final LlapRegistryService serviceRegistry;
private volatile QueryIdentifierProto currentQueryIdentifierProto;
private volatile String currentHiveQueryId;
// TODO: this is an ugly hack because Tez plugin isolation does not make sense for LLAP plugins.
// We are going to register a thread-local here for now, so that the scheduler, initializing
// in the same thread after the communicator, will pick it up; or the other way around.
// This only lives for the duration of the service init.
static final Object pluginInitLock = new Object();
static LlapTaskCommunicator instance = null;
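// Whichever of the two plugins finishes construction second finds the other's static
// instance under pluginInitLock and wires the cross-references; see the constructor below
// and the corresponding logic in LlapTaskSchedulerService.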
public LlapTaskCommunicator(
TaskCommunicatorContext taskCommunicatorContext) {
super(taskCommunicatorContext);
// Not closing this at shutdown for now, since this could be a shared instance.
serviceRegistry = LlapRegistryService.getClient(conf);
umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical());
// TODO Avoid reading this from the environment
user = System.getenv(ApplicationConstants.Environment.USER.name());
credentialMap = new ConcurrentHashMap<>();
sourceStateTracker = new SourceStateTracker(getContext(), this);
synchronized (pluginInitLock) {
LlapTaskSchedulerService peer = LlapTaskSchedulerService.instance;
if (peer != null) {
// We are the last to initialize.
peer.setTaskCommunicator(this);
this.setScheduler(peer);
LlapTaskSchedulerService.instance = null;
} else {
instance = this;
}
}
}
@SuppressWarnings("unchecked")
private Token<LlapTokenIdentifier> getLlapToken() {
Token<LlapTokenIdentifier> token = null;
Credentials credentials = getContext().getAMCredentials();
if (credentials != null) {
token = (Token<LlapTokenIdentifier>) credentials.getToken(LlapTokenIdentifier.KIND_NAME);
}
// supposed to have a token only if security is enabled (any mismatch here implies a configuration problem)
Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled());
if (token != null) {
LOG.info("Task communicator with a token {}", token);
}
return token;
}
void setScheduler(LlapTaskSchedulerService peer) {
this.scheduler = peer;
}
private static final String LLAP_TOKEN_NAME = LlapTokenIdentifier.KIND_NAME.toString();
private void processSendError(Throwable t) {
Throwable cause = t;
while (cause != null) {
if (isInvalidTokenError(cause)) {
handleInvalidToken();
return;
}
cause = cause.getCause();
}
}
private boolean isInvalidTokenError(Throwable cause) {
LOG.debug("Checking for invalid token error, cause: {}, cause.getCause(): {}", cause, cause.getCause());
/*
* The latest message discovered (while doing HIVE-26569) for a RemoteException is:
* "Current (LLAP_TOKEN; LLAP_TOKEN owner=hive/***, renewer=hive, realUser=, issueDate=1670317803579, maxDate=1671527403579,
* sequenceNumber=297, masterKeyId=296, cluster ***, app ID , signing false) can't be found in cache"
*/
return ((cause instanceof InvalidToken && cause.getMessage() != null) || (cause instanceof RemoteException
&& cause.getCause() == null && cause.getMessage() != null && cause.getMessage().contains(LLAP_TOKEN_NAME)
&& (cause.getMessage().contains("InvalidToken") || cause.getMessage().contains("can't be found in cache"))));
}
@Override
public void initialize() throws Exception {
super.initialize();
Configuration conf = getConf();
int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
this.communicator = createLlapProtocolClientProxy(numThreads, conf);
this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
LOG.info("Running LlapTaskCommunicator with "
+ "fileCleanupDelay=" + deleteDelayOnDagComplete
+ ", numCommunicatorThreads=" + numThreads);
this.communicator.init(conf);
String scheme = WebAppUtils.getHttpSchemePrefix(conf);
String ahsUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
this.timelineServerUri = WebAppUtils.getURLWithScheme(scheme, ahsUrl);
}
@Override
public void start() {
super.start();
this.communicator.start();
}
@Override
public void shutdown() {
super.shutdown();
if (this.communicator != null) {
this.communicator.stop();
}
}
@Override
protected void startRpcServer() {
Configuration conf = getConf();
try {
JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
int numHandlers =
HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_COMMUNICATOR_LISTENER_THREAD_COUNT);
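// The umbilical server port config is expected to be either a single port (e.g. "30000")
// or a min-max pair separated by "-" (e.g. "30000-30010"); the example values here are
// illustrative only.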
String[] portRange =
conf.get(HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT.varname)
.split("-");
boolean isHadoopSecurityAuthorizationEnabled = conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
boolean portFound = false;
IOException ioe = null;
int minPort = Integer.parseInt(portRange[0]);
if (portRange.length == 1) {
// Single port specified, not range.
startServerInternal(conf, minPort, numHandlers, jobTokenSecretManager,
isHadoopSecurityAuthorizationEnabled);
portFound = true;
LOG.info("Successfully bound to port {}", minPort);
} else {
int maxPort = Integer.parseInt(portRange[1]);
// Validate that the specified range is valid, i.e. the ports lie between
// 1024 and 65535.
validatePortRange(portRange[0], portRange[1]);
for (int i = minPort; i < maxPort; i++) {
try {
startServerInternal(conf, i, numHandlers, jobTokenSecretManager,
isHadoopSecurityAuthorizationEnabled);
portFound = true;
LOG.info("Successfully bound to port {}", i);
break;
} catch (BindException be) {
// Ignore and move ahead, in search of a free port.
LOG.warn("Unable to bind to port {}", i, be);
ioe = be;
}
}
}
if (!portFound) {
throw ioe;
}
this.address = NetUtils.getConnectAddress(server);
this.amHost = LlapUtil.getAmHostNameFromAddress(address, conf);
LOG.info("Started LlapUmbilical: " + umbilical.getClass().getName() + " at address: "
+ address + " with numHandlers=" + numHandlers + " using the host name " + amHost);
} catch (IOException e) {
throw new TezUncheckedException(e);
}
}
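/**
* Checks that both ends of the configured umbilical port range lie within [1024, 65535].
* This assumes {@link RangeValidator#validate} returns null for an in-range value and a
* non-null error message otherwise, so a non-null result below marks that bound as invalid.
*/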
private void validatePortRange(String minPort, String maxPort)
throws IOException {
RangeValidator rangeValidator = new RangeValidator(1024L, 65535L);
String valMin = rangeValidator.validate(minPort);
String valMax = rangeValidator.validate(maxPort);
if (valMin != null && valMax != null) {
throw new IOException("Invalid minimum range value: " + minPort + " and "
+ "maximum range value: " + maxPort + " for "
+ HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT.varname
+ ". The value should be between 1024 and 65535.");
}
if (valMin != null) {
throw new IOException("Invalid minimum range value :" + minPort + " for "
+ HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT.varname
+ ". The value should be between 1024 and 65535.");
}
if (valMax != null) {
throw new IOException("Invalid maximum range value:" + maxPort + " for "
+ HiveConf.ConfVars.LLAP_TASK_UMBILICAL_SERVER_PORT.varname
+ ". The value should be between 1024 and 65535.");
}
}
private void startServerInternal(Configuration conf, int umbilicalPort,
int numHandlers, JobTokenSecretManager jobTokenSecretManager,
boolean isHadoopSecurityAuthorizationEnabled) throws IOException {
server = new RPC.Builder(conf).setProtocol(LlapTaskUmbilicalProtocol.class)
.setBindAddress("0.0.0.0").setPort(umbilicalPort).setInstance(umbilical)
.setNumHandlers(numHandlers).setSecretManager(jobTokenSecretManager)
.build();
if (isHadoopSecurityAuthorizationEnabled) {
server.refreshServiceAcl(conf, new LlapUmbilicalPolicyProvider());
}
server.start();
}
@VisibleForTesting
protected LlapProtocolClientProxy createLlapProtocolClientProxy(int numThreads, Configuration conf) {
return new LlapProtocolClientProxy(numThreads, conf, getLlapToken());
}
@Override
public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
super.registerRunningContainer(containerId, hostname, port);
entityTracker.registerContainer(containerId, hostname, port);
}
@Override
public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
super.registerContainerEnd(containerId, endReason, diagnostics);
if (endReason == ContainerEndReason.INTERNAL_PREEMPTION) {
LOG.info("Processing containerEnd for container {} caused by internal preemption", containerId);
TezTaskAttemptID taskAttemptId = entityTracker.getTaskAttemptIdForContainer(containerId);
if (taskAttemptId != null) {
sendTaskTerminated(taskAttemptId, true);
}
}
entityTracker.unregisterContainer(containerId);
}
public interface OperationCallback<ResultType, CtxType> {
void setDone(CtxType ctx, ResultType result);
void setError(CtxType ctx, Throwable t);
}
/**
* @param node
* @param callback
* @return whether it was possible to attempt the registration. Sometimes it's
* not possible because a DAG is not running
*/
public boolean registerDag(NodeInfo node, final OperationCallback<QueryIdentifierProto, Void> callback) {
RegisterDagRequestProto.Builder builder = RegisterDagRequestProto.newBuilder();
if (currentQueryIdentifierProto == null) {
return false;
}
try {
RegisterDagRequestProto request = builder
.setQueryIdentifier(currentQueryIdentifierProto)
.setUser(user)
.setCredentialsBinary(
getCredentials(getContext()
.getCurrentDagInfo().getCredentials())).build();
communicator.registerDag(request, node.getHost(), node.getRpcPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<RegisterDagResponseProto>() {
@Override
public void setResponse(RegisterDagResponseProto response) {
callback.setDone(null, currentQueryIdentifierProto);
}
@Override
public void indicateError(Throwable t) {
LOG.info("Error registering dag with"
+ " appId=" + currentQueryIdentifierProto.getApplicationIdString()
+ " dagId=" + currentQueryIdentifierProto.getDagIndex()
+ " to node " + node.getHost());
processSendError(t);
callback.setError(null, t);
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
return true;
}
public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assignedNode,
boolean newState, final OperationCallback<Boolean, T> callback, final T ctx) {
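// Sends an asynchronous UpdateFragment request telling the daemon running this attempt
// whether the fragment is now guaranteed; the callback receives the daemon's accept/reject
// result (response.getResult()).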
LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(attemptId);
if (nodeId == null) {
if (assignedNode != null) {
nodeId = LlapNodeId.getInstance(assignedNode.getHost(), assignedNode.getRpcPort());
}
LOG.warn("Untracked node for " + attemptId + "; NodeInfo points to " + nodeId);
if (nodeId == null) {
callback.setDone(ctx, false); // TODO: danger of stack overflow... needs a retry limit?
return;
}
}
UpdateFragmentRequestProto request = UpdateFragmentRequestProto.newBuilder()
.setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString())
.setQueryIdentifier(constructQueryIdentifierProto(
attemptId.getDAGID().getId())).build();
communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<UpdateFragmentResponseProto>() {
@Override
public void setResponse(UpdateFragmentResponseProto response) {
callback.setDone(ctx, response.getResult());
}
@Override
public void indicateError(Throwable t) {
LOG.warn("Failed to send update fragment request for {}", attemptId.toString());
processSendError(t);
callback.setError(ctx, t);
}
});
}
@Override
public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec,
Map<String, LocalResource> additionalResources,
Credentials credentials,
boolean credentialsChanged,
int priority) {
super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
credentialsChanged, priority);
int dagId = taskSpec.getDAGID().getId();
if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) {
String hiveQueryId = extractQueryIdFromContext();
try {
hiveQueryId = (hiveQueryId == null) ? extractQueryId(taskSpec) : hiveQueryId;
} catch (IOException e) {
throw new RuntimeException("Failed to extract query id from task spec: " + taskSpec, e);
}
Preconditions.checkNotNull(hiveQueryId, "Unexpected null query id");
resetCurrentDag(dagId, hiveQueryId);
}
ContainerInfo containerInfo = getContainerInfo(containerId);
String host;
int port;
if (containerInfo != null) {
synchronized (containerInfo) {
host = containerInfo.host;
port = containerInfo.port;
}
} else {
// TODO Handle this properly
throw new RuntimeException("ContainerInfo not found for container: " + containerId +
", while trying to launch task: " + taskSpec.getTaskAttemptID());
}
LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
registerKnownNode(nodeId);
entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
nodesForQuery.add(nodeId);
sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs());
FragmentRuntimeInfo fragmentRuntimeInfo;
try {
fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(
taskSpec.getVertexName(),
taskSpec.getTaskAttemptID().getTaskID().getId(), priority);
} catch (Exception e) {
LOG.error(
"Error while trying to get runtimeFragmentInfo for fragmentId={}, containerId={}, currentQI={}, currentQueryId={}",
taskSpec.getTaskAttemptID(), containerId, currentQueryIdentifierProto,
currentHiveQueryId, e);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
SubmitWorkRequestProto requestProto;
try {
requestProto = constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo, currentHiveQueryId);
} catch (IOException e) {
throw new RuntimeException("Failed to construct request", e);
}
// Have to register this up front right now. Otherwise, it's possible for the task to start
// sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
getContext().taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
communicator.sendSubmitWork(requestProto, host, port,
new LlapProtocolClientProxy.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@Override
public void setResponse(SubmitWorkResponseProto response) {
if (response.hasSubmissionState()) {
LlapDaemonProtocolProtos.SubmissionStateProto ss = response.getSubmissionState();
if (ss.equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
LOG.info(
"Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId + ", Service Busy");
getContext().taskKilled(taskSpec.getTaskAttemptID(),
TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
return;
}
} else {
// TODO: Provide support for reporting errors
// This should never happen as the server always returns a valid status on success
throw new RuntimeException("SubmissionState in response is expected!");
}
if (response.hasUniqueNodeId()) {
entityTracker.registerTaskSubmittedToNode(
taskSpec.getTaskAttemptID(), response.getUniqueNodeId());
}
LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
scheduler.notifyStarted(taskSpec.getTaskAttemptID());
}
@Override
public void indicateError(Throwable t) {
Throwable originalError = t;
if (t instanceof ServiceException) {
ServiceException se = (ServiceException) t;
t = se.getCause();
}
if (t instanceof RemoteException) {
// All others from the remote service cause the task to FAIL.
LOG.info(
"Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId, t);
processSendError(originalError);
getContext()
.taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER,
t.toString());
} else {
// Exception from the RPC layer - communication failure, consider as KILLED / service down.
if (t instanceof IOException) {
LOG.info(
"Unable to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId + ", Communication Error");
processSendError(originalError);
getContext().taskKilled(taskSpec.getTaskAttemptID(),
TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
} else {
// Anything else is a FAIL.
LOG.info(
"Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " +
containerId, t);
processSendError(originalError);
getContext()
.taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER,
t.getMessage());
}
}
}
});
}
/**
* This is a best-effort action in case of an invalid LLAP_TOKEN. The communicator (LlapProtocolClientProxy) periodically
* refreshes tokens, but there is a relatively small time window (compared to the whole token maxLifetime),
* when the token is cancelled and it's not yet fetched from daemons. By the time we detect this problem, the current
* task attempt has already failed, but usually there are at least 3 task attempts before failing the task and dag, therefore
* fetching an LLAP_TOKEN synchronously here definitely solves the invalid token problem (as the next task attempt will use it).
*/
private void handleInvalidToken() {
this.communicator.refreshToken();
}
@Override
public void unregisterRunningTaskAttempt(final TezTaskAttemptID taskAttemptId,
TaskAttemptEndReason endReason,
String diagnostics) {
super.unregisterRunningTaskAttempt(taskAttemptId, endReason, diagnostics);
if (endReason == TaskAttemptEndReason.INTERNAL_PREEMPTION) {
LOG.info("Processing taskEnd for task {} caused by internal preemption", taskAttemptId);
sendTaskTerminated(taskAttemptId, false);
}
entityTracker.unregisterTaskAttempt(taskAttemptId);
// This will also be invoked for tasks which have been KILLED / rejected by the daemon.
// Informing the daemon becomes necessary once the LlapScheduler supports preemption
// and/or starts attempting to kill tasks which may be running on a node.
}
private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
boolean invokedByContainerEnd) {
LOG.info(
"Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}",
taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd");
LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
// NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
if (nodeId != null) {
TerminateFragmentRequestProto request =
TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(
constructQueryIdentifierProto(
taskAttemptId.getDAGID().getId()))
.setFragmentIdentifierString(taskAttemptId.toString()).build();
communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
@Override
public void setResponse(TerminateFragmentResponseProto response) {
}
@Override
public void indicateError(Throwable t) {
LOG.warn("Failed to send terminate fragment request for {}",
taskAttemptId.toString());
processSendError(t);
}
});
} else {
LOG.info(
"Not sending terminate request for fragment {} since it's node is not known. Already unregistered",
taskAttemptId.toString());
}
}
@Override
public void dagComplete(final int dagIdentifier) {
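// Broadcast a QueryComplete message to every daemon that ran fragments for this DAG so
// query-scoped state (and files, after deleteDelayOnDagComplete seconds) can be cleaned up.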
QueryIdentifierProto queryIdentifierProto = constructQueryIdentifierProto(dagIdentifier);
QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder()
.setQueryIdentifier(queryIdentifierProto).setDeleteDelay(deleteDelayOnDagComplete).build();
for (final LlapNodeId llapNodeId : nodesForQuery) {
LOG.info("Sending dagComplete message for {}, to {}", dagIdentifier, llapNodeId);
communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(),
new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>() {
@Override
public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
}
@Override
public void indicateError(Throwable t) {
LOG.warn("Failed to indicate dag complete dagId={} to node {}",
dagIdentifier, llapNodeId);
processSendError(t);
}
});
}
nodesForQuery.clear();
// TODO Ideally move some of the other cleanup code from resetCurrentDag over here
}
@Override
public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
// Delegate updates over to the source state tracker.
sourceStateTracker
.sourceStateUpdated(vertexStateUpdate.getVertexName(), vertexStateUpdate.getVertexState());
}
// TODO HIVE-15163. Handle cases where nodes go down and come back on the same port. Historic information
// can prevent updates from being sent out to the new node.
public void sendStateUpdate(final LlapNodeId nodeId,
final SourceStateUpdatedRequestProto request) {
communicator.sendSourceStateUpdate(request, nodeId,
new LlapProtocolClientProxy.ExecuteRequestCallback<SourceStateUpdatedResponseProto>() {
@Override
public void setResponse(SourceStateUpdatedResponseProto response) {
}
@Override
public void indicateError(Throwable t) {
// Re-attempts are left up to the RPC layer. If there's a failure reported after this,
// mark all attempts running on this node as KILLED. The node itself cannot be killed from
// here, that's only possible via the scheduler.
// The assumption is that if there's a failure to communicate with the node - it will
// eventually timeout - and no more tasks will be allocated on it.
LOG.error("Failed to send state update to node: {}, Killing all attempts running on "
+ "node. Attempted StateUpdate={}", nodeId, request, t);
processSendError(t);
BiMap<ContainerId, TezTaskAttemptID> biMap =
entityTracker.getContainerAttemptMapForNode(nodeId);
if (biMap != null) {
synchronized (biMap) {
for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
LOG.info(
"Sending a kill for attempt {}, due to a communication failure while sending a finishable state update",
entry.getValue());
getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED,
"Failed to send finishable state update to node " + nodeId);
}
}
}
}
});
}
@Override
public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
return constructLogUrl(attemptID, containerNodeId, false);
}
@Override
public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
return constructLogUrl(attemptID, containerNodeId, true);
}
private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId containerNodeId, final boolean isDone) {
if (timelineServerUri == null || containerNodeId == null) {
return null;
}
Set<LlapServiceInstance> instanceSet;
try {
instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost());
} catch (IOException e) {
// Not failing the job due to a failure constructing the log url
LOG.warn(
"Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}",
containerNodeId, e.getMessage());
return null;
}
// Once NodeId includes fragmentId - this becomes a lot more reliable.
if (instanceSet != null) {
LlapServiceInstance matchedInstance = null;
for (LlapServiceInstance instance : instanceSet) {
if (instance.getRpcPort() == containerNodeId.getPort()) {
matchedInstance = instance;
break;
}
}
if (matchedInstance != null) {
String containerIdString = matchedInstance.getProperties()
.get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
String nmNodeAddress = matchedInstance.getProperties().get(ConfVars.LLAP_DAEMON_NM_ADDRESS.varname);
if (!StringUtils.isBlank(containerIdString) && !StringUtils.isBlank(nmNodeAddress)) {
return constructLlapLogUrl(attemptID, containerIdString, isDone, nmNodeAddress);
}
}
}
return null;
}
private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString,
final boolean isDone, final String nmAddress) {
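// Illustrative shape of the resulting URL (names in braces are placeholders):
// {timelineServerUri}/ws/v1/applicationhistory/containers/{containerId}/logs/{queryId}-{dagId}.log[.done]?nm.id={nmAddress}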
String dagId = attemptID.getDAGID().toString();
String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""),
"?nm.id=", nmAddress);
String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers",
containerIdString, "logs", filename);
return url;
}
private static class PingingNodeInfo {
final AtomicLong logTimestamp;
final AtomicInteger pingCount;
PingingNodeInfo(long currentTs) {
logTimestamp = new AtomicLong(currentTs);
pingCount = new AtomicInteger(1);
}
}
public void registerKnownNode(LlapNodeId nodeId) {
Long old = knownNodeMap.putIfAbsent(nodeId,
TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS));
if (old == null) {
LOG.info("Added new known node: {}", nodeId);
}
}
public void registerPingingNode(LlapNodeId nodeId, String uniqueId) {
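// The timestamps used here are derived from System.nanoTime() converted to milliseconds,
// i.e. monotonic elapsed time rather than wall-clock time; they are only compared against
// each other for the rate-limited "unknown node" logging below.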
long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
PingingNodeInfo ni = new PingingNodeInfo(currentTs);
PingingNodeInfo old = pingedNodeMap.put(nodeId, ni);
if (old == null) {
LOG.info("Added new pinging node: [{}] with uniqueId: {}", nodeId, uniqueId);
} else {
old.pingCount.incrementAndGet();
}
// The node should always be known by this point. Log occasionally if it is not known.
if (!knownNodeMap.containsKey(nodeId)) {
if (old == null) {
// First time this is seen. Log it.
LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, ni.pingCount.get());
} else {
// Pinged before. Log only occasionally.
if (currentTs > old.logTimestamp.get() + 5000L) { // 5 seconds elapsed. Log again.
LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, old.pingCount.get());
old.logTimestamp.set(currentTs);
}
}
}
}
private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
void nodePinged(String hostname, String uniqueId, int port,
TezAttemptArray tasks, BooleanArray guaranteed) {
// TODO: do we ever need the port? we could just do away with nodeId altogether.
LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
registerPingingNode(nodeId, uniqueId);
BiMap<ContainerId, TezTaskAttemptID> biMap =
entityTracker.getContainerAttemptMapForNode(nodeId);
if (biMap != null) {
Set<TezTaskAttemptID> error = new HashSet<>();
synchronized (biMap) {
for (int i = 0; i < tasks.get().length; ++i) {
boolean isGuaranteed = false;
if (guaranteed != null) {
isGuaranteed = ((BooleanWritable) guaranteed.get()[i]).get();
}
TezTaskAttemptID attemptID = (TezTaskAttemptID) tasks.get()[i];
// Check if the taskAttempt is present in AM view
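// A uniqueId mismatch below means a daemon was restarted on the same host:port and is
// reporting an attempt that the AM had assigned to the previous daemon instance; such
// attempts are collected and killed after the loop.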
if (biMap.containsValue(attemptID)) {
String taskNodeId = entityTracker.getUniqueNodeId(attemptID);
if (taskNodeId != null && taskNodeId.equals(uniqueId)) {
getContext().taskAlive(attemptID);
scheduler.taskInfoUpdated(attemptID, isGuaranteed);
getContext().containerAlive(biMap.inverse().get(attemptID));
} else {
error.add(attemptID);
}
}
}
}
if (!error.isEmpty()) {
LOG.info("The tasks we expected to be on the node are not there: " + error);
for (TezTaskAttemptID attempt: error) {
LOG.info("Sending a kill for attempt {}, due to a ping from "
+ "node with same host and same port but " +
"registered with different unique ID", attempt);
getContext().taskKilled(attempt, TaskAttemptEndReason.NODE_FAILED,
"Node with same host and port but with new unique ID pinged");
}
}
} else {
long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
if (currentTs > nodeNotFoundLogTime.get() + 5000L) {
LOG.warn("Received ping from node without any registered tasks or containers: " + hostname +
":" + port +
". Could be caused by pre-emption by the AM," +
" or a mismatched hostname. Enable debug logging for mismatched host names");
nodeNotFoundLogTime.set(currentTs);
}
}
}
private void resetCurrentDag(int newDagId, String hiveQueryId) {
// Working on the assumption that a single DAG runs at a time per AM.
currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId);
currentHiveQueryId = hiveQueryId;
sourceStateTracker.resetState(currentQueryIdentifierProto);
nodesForQuery.clear();
LOG.info("CurrentDagId set to: " + newDagId + ", name=" +
getContext().getCurrentDagInfo().getName() + ", queryId=" + hiveQueryId);
// TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which
// is likely already happening.
}
// Needed for GenericUDTFGetSplits, where TaskSpecs are generated
private String extractQueryId(TaskSpec taskSpec) throws IOException {
UserPayload processorPayload = taskSpec.getProcessorDescriptor().getUserPayload();
Configuration conf = TezUtils.createConfFromUserPayload(processorPayload);
return HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID);
}
private String extractQueryIdFromContext() {
// TODO: Remove the following instanceof check when TEZ-2672 exposes getConf from DagInfo
DagInfo dagInfo = getContext().getCurrentDagInfo();
if (dagInfo instanceof DAG) {
return ((DAG)dagInfo).getConf().get(ConfVars.HIVEQUERYID.varname);
}
return null;
}
private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId,
TaskSpec taskSpec,
FragmentRuntimeInfo fragmentRuntimeInfo,
String hiveQueryId) throws
IOException {
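// The request carries the fragment coordinates (fragment/attempt numbers and containerId),
// the AM host/port for the umbilical, the serialized DAG credentials, the signable vertex
// spec, the scheduler's FragmentRuntimeInfo, and the guaranteed flag.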
SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
builder.setContainerIdString(containerId.toString());
builder.setAmHost(getAmHostString());
builder.setAmPort(getAddress().getPort());
Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
taskSpec.getDAGID().getId());
builder.setCredentialsBinary(
getCredentials(getContext().getCurrentDagInfo().getCredentials()));
builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.constructSignableVertexSpec(
taskSpec, currentQueryIdentifierProto, getTokenIdentifier(), user, hiveQueryId)).build());
// Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments
builder.setFragmentRuntimeInfo(fragmentRuntimeInfo);
builder.setIsGuaranteed(ContainerFactory.isContainerInitializedAsGuaranteed(containerId));
return builder.build();
}
private ByteString getCredentials(Credentials credentials) throws IOException {
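// Credentials are serialized once per query and cached; duplicate() is used so that each
// caller gets an independent position/limit on the shared buffer contents.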
ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
if (credentialsBinary == null) {
credentialsBinary = LlapTezUtils.serializeCredentials(credentials);
credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
} else {
credentialsBinary = credentialsBinary.duplicate();
}
return ByteString.copyFrom(credentialsBinary);
}
protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol {
private final TezTaskUmbilicalProtocol tezUmbilical;
public LlapTaskUmbilicalProtocolImpl(TezTaskUmbilicalProtocol tezUmbilical) {
this.tezUmbilical = tezUmbilical;
}
@Override
public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
return tezUmbilical.canCommit(taskid);
}
@Override
public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
TezException {
return tezUmbilical.heartbeat(request);
}
@Override
public void nodeHeartbeat(Text hostname, Text uniqueId, int port,
TezAttemptArray aw, BooleanArray guaranteed) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
}
nodePinged(hostname.toString(), uniqueId.toString(), port, aw, guaranteed);
}
@Override
public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
// TODO Unregister the task for state updates, which could in turn unregister the node.
getContext().taskKilled(taskAttemptId,
TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");
entityTracker.unregisterTaskAttempt(taskAttemptId);
}
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return versionID;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
int clientMethodsHash) throws IOException {
return ProtocolSignature.getProtocolSignature(this, protocol,
clientVersion, clientMethodsHash);
}
}
/**
* Track the association between known containers and taskAttempts, along with the nodes they are assigned to.
*/
@VisibleForTesting
static final class EntityTracker {
// TODO: need the description of how these maps are kept consistent.
@VisibleForTesting
final ConcurrentMap<TezTaskAttemptID, LlapNodeId> attemptToNodeMap = new ConcurrentHashMap<>();
@VisibleForTesting
final ConcurrentMap<ContainerId, LlapNodeId> containerToNodeMap = new ConcurrentHashMap<>();
@VisibleForTesting
final ConcurrentMap<LlapNodeId, BiMap<ContainerId, TezTaskAttemptID>> nodeMap = new ConcurrentHashMap<>();
// TODO: we currently put task info everywhere before we submit it and know the "real" node id.
// Therefore, we are going to store this separately. Ideally, we should roll uniqueness
// into LlapNodeId. We get node info from registry; that should (or can) include it.
private final ConcurrentMap<TezTaskAttemptID, String> uniqueNodeMap = new ConcurrentHashMap<>();
void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
if (LOG.isDebugEnabled()) {
LOG.debug("Registering " + containerId + ", " + taskAttemptId + " for node: " + host + ":" + port);
}
LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port);
attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
registerContainer(containerId, host, port);
// nodeMap registration.
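// putIfAbsent ensures that concurrent registrations for the same node share a single BiMap:
// if another thread won the race, its map ("old") is used and the local tmpMap is discarded.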
BiMap<ContainerId, TezTaskAttemptID> tmpMap = HashBiMap.create();
BiMap<ContainerId, TezTaskAttemptID> old = nodeMap.putIfAbsent(llapNodeId, tmpMap);
BiMap<ContainerId, TezTaskAttemptID> usedInstance;
usedInstance = old == null ? tmpMap : old;
synchronized(usedInstance) {
usedInstance.put(containerId, taskAttemptId);
}
// Make sure to put the instance back again, in case it was removed as part of a
// containerEnd/taskEnd invocation.
nodeMap.putIfAbsent(llapNodeId, usedInstance);
}
public String getUniqueNodeId(TezTaskAttemptID attemptId) {
return uniqueNodeMap.get(attemptId);
}
public void registerTaskSubmittedToNode(
TezTaskAttemptID taskAttemptID, String uniqueNodeId) {
synchronized (attemptToNodeMap) {
if (attemptToNodeMap.containsKey(taskAttemptID)) {
// Register only if the attempt is known. In case an unregister call
// came in before the register call.
String prev = uniqueNodeMap.putIfAbsent(taskAttemptID, uniqueNodeId);
if (prev != null) {
LOG.warn("Replaced the unique node mapping for task from " + prev +
" to " + uniqueNodeId);
}
}
}
}
void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
uniqueNodeMap.remove(attemptId);
LlapNodeId llapNodeId;
synchronized (attemptToNodeMap) {
llapNodeId = attemptToNodeMap.remove(attemptId);
if (llapNodeId == null) {
// Possible since either container / task can be unregistered.
return;
}
}
BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
ContainerId matched = null;
if (bMap != null) {
synchronized(bMap) {
matched = bMap.inverse().remove(attemptId);
}
if (bMap.isEmpty()) {
nodeMap.remove(llapNodeId);
}
}
// Remove the container mapping
if (matched != null) {
containerToNodeMap.remove(matched);
}
}
void registerContainer(ContainerId containerId, String hostname, int port) {
if (LOG.isDebugEnabled()) {
LOG.debug("Registering " + containerId + " for node: " + hostname + ":" + port);
}
containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
// nodeMap registration is not required, since there's no taskId association.
}
LlapNodeId getNodeIdForContainer(ContainerId containerId) {
return containerToNodeMap.get(containerId);
}
LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) {
return attemptToNodeMap.get(taskAttemptId);
}
ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) {
LlapNodeId llapNodeId = getNodeIdForTaskAttempt(taskAttemptId);
if (llapNodeId != null) {
BiMap<TezTaskAttemptID, ContainerId> bMap = nodeMap.get(llapNodeId).inverse();
if (bMap != null) {
synchronized (bMap) {
return bMap.get(taskAttemptId);
}
} else {
return null;
}
} else {
return null;
}
}
TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) {
LlapNodeId llapNodeId = getNodeIdForContainer(containerId);
if (llapNodeId != null) {
BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
if (bMap != null) {
synchronized (bMap) {
return bMap.get(containerId);
}
} else {
return null;
}
} else {
return null;
}
}
void unregisterContainer(ContainerId containerId) {
LlapNodeId llapNodeId = containerToNodeMap.remove(containerId);
if (llapNodeId == null) {
// Possible since either container / task can be unregistered.
return;
}
BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId);
TezTaskAttemptID matched = null;
if (bMap != null) {
synchronized(bMap) {
matched = bMap.remove(containerId);
}
if (bMap.isEmpty()) {
nodeMap.remove(llapNodeId);
}
}
// Remove the container mapping
if (matched != null) {
attemptToNodeMap.remove(matched);
uniqueNodeMap.remove(matched);
}
}
/**
* Return a {@link BiMap} containing the container -> taskAttemptId mapping for the node specified.
* <p>
* This method returns the internal structure used by the EntityTracker. Users must synchronize
* on the structure to ensure correct usage.
*
* @param llapNodeId the node whose mapping is requested
* @return the mapping for the node, or null if nothing is registered for it
*/
BiMap<ContainerId, TezTaskAttemptID> getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
return nodeMap.get(llapNodeId);
}
}
private QueryIdentifierProto constructQueryIdentifierProto(int dagIdentifier) {
return QueryIdentifierProto.newBuilder()
.setApplicationIdString(getContext().getCurrentAppIdentifier()).setDagIndex(dagIdentifier)
.setAppAttemptNumber(getContext().getApplicationAttemptId().getAttemptId())
.build();
}
public String getAmHostString() {
return amHost;
}
}