blob: ac25d0e72ba743750d1da93c7c1ee7ee1f8241a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.yarn.appMaster;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.drill.yarn.core.ContainerRequestSpec;
import org.apache.drill.yarn.core.DrillOnYarnConfig;
import org.apache.drill.yarn.core.LaunchSpec;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Wrapper around the asynchronous versions of the YARN AM-RM and AM-NM
* interfaces. Allows strategy code to factor out the YARN-specific bits so that
* strategy code is simpler. Also allows replacing the actual YARN code with a
* mock for unit testing.
*/
public class AMYarnFacadeImpl implements AMYarnFacade {
private static final Log LOG = LogFactory.getLog(AMYarnFacadeImpl.class);
private YarnConfiguration conf;
private AMRMClientAsync<ContainerRequest> resourceMgr;
private NMClientAsync nodeMgr;
private RegisterApplicationMasterResponse registration;
private YarnClient client;
private int pollPeriodMs;
private String appMasterTrackingUrl;
private ApplicationId appId;
private ApplicationReport appReport;
private String amHost;
private boolean supportsDisks;
public AMYarnFacadeImpl(int pollPeriodMs) {
this.pollPeriodMs = pollPeriodMs;
}
@Override
public void start(CallbackHandler resourceCallback,
org.apache.hadoop.yarn.client.api.async.NMClientAsync.CallbackHandler nodeCallback ) {
conf = new YarnConfiguration();
resourceMgr = AMRMClientAsync.createAMRMClientAsync(pollPeriodMs, resourceCallback);
resourceMgr.init(conf);
resourceMgr.start();
// Create the asynchronous node manager client
nodeMgr = NMClientAsync.createNMClientAsync(nodeCallback);
nodeMgr.init(conf);
nodeMgr.start();
client = YarnClient.createYarnClient();
client.init(conf);
client.start();
String appIdStr = System.getenv(DrillOnYarnConfig.APP_ID_ENV_VAR);
if (appIdStr != null) {
appId = ConverterUtils.toApplicationId(appIdStr);
try {
appReport = client.getApplicationReport(appId);
} catch (YarnException | IOException e) {
LOG.error(
"Failed to get YARN applicaiton report for App ID: " + appIdStr, e);
}
}
}
@Override
public void register(String trackingUrl) throws YarnFacadeException {
String thisHostName = NetUtils.getHostname();
LOG.debug("Host Name from YARN: " + thisHostName);
if (trackingUrl != null) {
// YARN seems to provide multiple names: MACHNAME.local/10.250.56.235
// The second seems to be the IP address, which is what we want.
String names[] = thisHostName.split("/");
amHost = names[names.length - 1];
appMasterTrackingUrl = trackingUrl.replace("<host>", amHost);
LOG.info("Tracking URL: " + appMasterTrackingUrl);
}
try {
LOG.trace("Registering with YARN");
registration = resourceMgr.registerApplicationMaster(thisHostName, 0,
appMasterTrackingUrl);
} catch (YarnException | IOException e) {
throw new YarnFacadeException("Register AM failed", e);
}
// Some distributions (but not the stock YARN) support Disk
// resources. Since Drill compiles against Apache YARN, without disk
// resources, we have to use an indirect mechnanism to look for the
// disk enum at runtime when we don't have that enum value at compile time.
for (SchedulerResourceTypes type : registration
.getSchedulerResourceTypes()) {
if (type.name().equals("DISK")) {
supportsDisks = true;
}
}
}
@Override
public String getTrackingUrl( ) { return appMasterTrackingUrl; }
@Override
public boolean supportsDiskResource( ) { return supportsDisks; }
@Override
public ContainerRequest requestContainer(ContainerRequestSpec containerSpec) {
ContainerRequest request = containerSpec.makeRequest();
resourceMgr.addContainerRequest(containerSpec.makeRequest());
return request;
}
@Override
public void launchContainer(Container container, LaunchSpec taskSpec)
throws YarnFacadeException {
ContainerLaunchContext context = createLaunchContext(taskSpec);
startContainerAsync(container, context);
}
private ContainerLaunchContext createLaunchContext(LaunchSpec task)
throws YarnFacadeException {
try {
return task.createLaunchContext(conf);
} catch (IOException e) {
throw new YarnFacadeException("Failed to create launch context", e);
}
}
private void startContainerAsync(Container container,
ContainerLaunchContext context) {
nodeMgr.startContainerAsync(container, context);
}
@Override
public void finish(boolean succeeded, String msg) throws YarnFacadeException {
// Stop the Node Manager client.
nodeMgr.stop();
// Deregister the app from YARN.
String appMsg = "Drill Cluster Shut-Down";
FinalApplicationStatus status = FinalApplicationStatus.SUCCEEDED;
if (!succeeded) {
appMsg = "Drill Cluster Fatal Error - check logs";
status = FinalApplicationStatus.FAILED;
}
if (msg != null) {
appMsg = msg;
}
try {
resourceMgr.unregisterApplicationMaster(status, appMsg, "");
} catch (YarnException | IOException e) {
throw new YarnFacadeException("Deregister AM failed", e);
}
// Stop the Resource Manager client
resourceMgr.stop();
}
@Override
public void releaseContainer(Container container) {
resourceMgr.releaseAssignedContainer(container.getId());
}
@Override
public void killContainer(Container container) {
nodeMgr.stopContainerAsync(container.getId(), container.getNodeId());
}
@Override
public int getNodeCount() {
return resourceMgr.getClusterNodeCount();
}
@Override
public Resource getResources() {
return resourceMgr.getAvailableResources();
}
@Override
public void removeContainerRequest(ContainerRequest containerRequest) {
resourceMgr.removeContainerRequest(containerRequest);
}
@Override
public RegisterApplicationMasterResponse getRegistrationResponse() {
return registration;
}
@Override
public void blacklistNode(String nodeName) {
resourceMgr.updateBlacklist(Collections.singletonList(nodeName), null);
}
@Override
public void removeBlacklist(String nodeName) {
resourceMgr.updateBlacklist(null, Collections.singletonList(nodeName));
}
@Override
public List<NodeReport> getNodeReports() throws YarnFacadeException {
try {
return client.getNodeReports(NodeState.RUNNING);
} catch (Exception e) {
throw new YarnFacadeException("getNodeReports failed", e);
}
}
@Override
public YarnAppHostReport getAppHostReport() {
// Cobble together YARN links to simplify debugging.
YarnAppHostReport hostRpt = new YarnAppHostReport();
hostRpt.amHost = amHost;
if (appId != null) {
hostRpt.appId = appId.toString();
}
if (appReport == null) {
return hostRpt;
}
try {
String rmLink = appReport.getTrackingUrl();
URL url = new URL(rmLink);
hostRpt.rmHost = url.getHost();
hostRpt.rmUrl = "http://" + hostRpt.rmHost + ":" + url.getPort() + "/";
hostRpt.rmAppUrl = hostRpt.rmUrl + "cluster/app/" + appId.toString();
} catch (MalformedURLException e) {
return null;
}
hostRpt.nmHost = System.getenv("NM_HOST");
String nmPort = System.getenv("NM_HTTP_PORT");
if (hostRpt.nmHost != null || nmPort != null) {
hostRpt.nmUrl = "http://" + hostRpt.nmHost + ":" + nmPort + "/";
hostRpt.nmAppUrl = hostRpt.nmUrl + "node/application/" + hostRpt.appId;
}
return hostRpt;
}
}