blob: 845a27b091f1bb4aa4ad7dfd024c880f9663bbbc [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.launcher;
import java.io.IOException;
import java.net.InetSocketAddress;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.common.TezUtils;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerLauncher;
import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.TezTestServiceCommunicator;
import org.apache.tez.service.TezTestServiceConfConstants;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TezTestServiceContainerLauncher extends ContainerLauncher {
// TODO Support interruptability of tasks which haven't yet been launched.
// TODO May need multiple connections per target machine, depending upon how synchronization is handled in the RPC layer
static final Logger LOG = LoggerFactory.getLogger(TezTestServiceContainerLauncher.class);
private final String tokenIdentifier;
private final int servicePort;
private final TezTestServiceCommunicator communicator;
private final ApplicationAttemptId appAttemptId;
private final Configuration conf;
// Configuration passed in here to set up final parameters
public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) {
super(containerLauncherContext);
try {
conf = TezUtils.createConfFromUserPayload(getContext().getInitialUserPayload());
} catch (IOException e) {
throw new RuntimeException(e);
}
int numThreads = conf.getInt(
TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
this.servicePort = conf.getInt(
TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
Preconditions.checkArgument(servicePort > 0,
TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
this.communicator = new TezTestServiceCommunicator(numThreads);
this.tokenIdentifier = getContext().getApplicationAttemptId().getApplicationId().toString();
this.appAttemptId = getContext().getApplicationAttemptId();
}
@Override
public void start() {
communicator.init(conf);
communicator.start();
}
@Override
public void shutdown() {
communicator.stop();
}
@Override
public void launchContainer(final ContainerLaunchRequest launchRequest) {
RunContainerRequestProto runRequest = null;
try {
runRequest = constructRunContainerRequest(launchRequest);
} catch (IOException e) {
getContext().containerLaunchFailed(launchRequest.getContainerId(),
"Failed to construct launch request, " + StringUtils.stringifyException(e));
return;
}
communicator.runContainer(runRequest, launchRequest.getNodeId().getHost(),
launchRequest.getNodeId().getPort(),
new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
@Override
public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
LOG.info(
"Container: " + launchRequest.getContainerId() + " launch succeeded on host: " +
launchRequest.getNodeId());
getContext().containerLaunched(launchRequest.getContainerId());
}
@Override
public void indicateError(Throwable t) {
LOG.error(
"Failed to launch container: " + launchRequest.getContainerId() + " on host: " +
launchRequest.getNodeId(), t);
sendContainerLaunchFailedMsg(launchRequest.getContainerId(), t);
}
});
}
@Override
public void stopContainer(ContainerStopRequest stopRequest) {
LOG.info("Ignoring stopContainer for event: " + stopRequest);
// that the container is actually done (normally received from RM)
// TODO Sending this out for an un-launched container is invalid
getContext().containerStopRequested(stopRequest.getContainerId());
}
private RunContainerRequestProto constructRunContainerRequest(ContainerLaunchRequest launchRequest) throws
IOException {
RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(
TezConstants.getTezYarnServicePluginName()));
InetSocketAddress address = (InetSocketAddress) getContext().getTaskCommunicatorMetaInfo(launchRequest.getTaskCommunicatorName());
builder.setAmHost(address.getHostName()).setAmPort(address.getPort());
builder.setAppAttemptNumber(appAttemptId.getAttemptId());
builder.setApplicationIdString(appAttemptId.getApplicationId().toString());
builder.setTokenIdentifier(tokenIdentifier);
builder.setContainerIdString(launchRequest.getContainerId().toString());
builder.setCredentialsBinary(
ByteString.copyFrom(launchRequest.getContainerLaunchContext().getTokens()));
// TODO Avoid reading this from the environment
builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
return builder.build();
}
@SuppressWarnings("unchecked")
void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) {
getContext().containerLaunchFailed(containerId, t == null ? "" : t.getMessage());
}
}