blob: 10b78f2886e1d28f17f4c8fdd0a36043577c8d9e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.bind.annotation.XmlElement;
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StringCodec;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.BaseContext;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.appdata.AppDataPushAgent;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.OperatorStatus.PortStatus;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.security.StramDelegationTokenIdentifier;
import com.datatorrent.stram.security.StramDelegationTokenManager;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.security.StramWSFilterInitializer;
import com.datatorrent.stram.webapp.AppInfo;
import com.datatorrent.stram.webapp.StramWebApp;
import static java.lang.Thread.sleep;
/**
* Streaming Application Master
*
* @since 0.3.2
*/
public class StreamingAppMasterService extends CompositeService
{
private static final Logger LOG = LoggerFactory.getLogger(StreamingAppMasterService.class);
// Delegation token manager timings (ms): master keys rotate daily, token max lifetime and
// renew interval are effectively unbounded (Long.MAX_VALUE / 2), expired-token scan runs daily.
private static final long DELEGATION_KEY_UPDATE_INTERVAL = 24 * 60 * 60 * 1000;
private static final long DELEGATION_TOKEN_MAX_LIFETIME = Long.MAX_VALUE / 2;
private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2;
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000;
// number of main-loop iterations after which an unanswered container request is re-issued
private static final int NUMBER_MISSED_HEARTBEATS = 30;
// default consecutive container failures on a node before it is blacklisted
private static final int MAX_CONTAINER_FAILURES_PER_NODE = 3;
// default time (ms) after which a blacklisted node becomes eligible for allocation again
private static final long BLACKLIST_REMOVAL_TIME = 60 * 60 * 1000;
// feature toggle for blacklisting nodes with repeated container failures
private boolean enableFailedNodeBlacklisting = false;
private AMRMClient<ContainerRequest> amRmClient;
private NMClientAsync nmClient;
// logical plan of the running application; replaced in serviceInit by the manager's copy
private LogicalPlan dag;
// Application Attempt Id ( combination of attemptId and fail count )
private final ApplicationAttemptId appAttemptID;
// Hostname of the container
private final String appMasterHostname = "";
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
// Simple flag to denote whether all works is done
private boolean appDone = false;
// Counter for completed containers ( complete denotes successful or failed )
private final AtomicInteger numCompletedContainers = new AtomicInteger();
// Containers that the RM has allocated to us
private final ConcurrentMap<String, AllocatedContainer> allocatedContainers = Maps.newConcurrentMap();
// consecutive container failure count per node host name
private final ConcurrentMap<String, AtomicInteger> failedContainersMap = Maps.newConcurrentMap();
// blacklisted nodes paired with the time they were blacklisted, oldest entries first
private final Queue<Pair<Long, List<String>>> blacklistedNodesQueueWithTimeStamp = new ConcurrentLinkedQueue<Pair<Long, List<String>>>();
// Count of failed containers
private final AtomicInteger numFailedContainers = new AtomicInteger();
// tasks queued by other threads; drained and executed by the main loop in execute()
private final ConcurrentLinkedQueue<Runnable> pendingTasks = new ConcurrentLinkedQueue<Runnable>();
// child container callback
private StreamingContainerParent heartbeatListener;
private StreamingContainerManager dnmgr;
private StramAppContext appContext;
private final Clock clock = new SystemClock();
private final long startTime = clock.getTime();
private final ClusterAppStats stats = new ClusterAppStats();
// created in serviceInit only when security is enabled; null otherwise
private StramDelegationTokenManager delegationTokenManager = null;
// pushes metrics to the configured transport; null when no transport is configured
private AppDataPushAgent appDataPushAgent;

/**
 * @param appAttemptID the YARN application attempt this app master instance serves
 */
public StreamingAppMasterService(ApplicationAttemptId appAttemptID)
{
  super(StreamingAppMasterService.class.getName());
  this.appAttemptID = appAttemptID;
}
/**
 * Live application statistics. Each getter computes its value on demand from
 * the physical plan and the app master's container bookkeeping, so callers
 * always observe current state rather than a cached snapshot.
 */
protected class ClusterAppStats extends AppInfo.AppStats
{
  @AutoMetric
  @Override
  public int getAllocatedContainers()
  {
    return allocatedContainers.size();
  }

  @AutoMetric
  @Override
  public int getPlannedContainers()
  {
    return dnmgr.getPhysicalPlan().getContainers().size();
  }

  @AutoMetric
  @Override
  @XmlElement
  public int getFailedContainers()
  {
    return numFailedContainers.get();
  }

  @AutoMetric
  @Override
  public int getNumOperators()
  {
    return dnmgr.getPhysicalPlan().getAllOperators().size();
  }

  @Override
  public long getCurrentWindowId()
  {
    // the application-wide current window is that of the slowest operator
    long slowest = Long.MAX_VALUE;
    for (PTOperator oper : dnmgr.getPhysicalPlan().getAllOperators().values()) {
      slowest = Math.min(slowest, oper.stats.currentWindowId.get());
    }
    // no operators yet -> report window 0
    return StreamingContainerManager.toWsWindowId(slowest == Long.MAX_VALUE ? 0 : slowest);
  }

  @Override
  public long getRecoveryWindowId()
  {
    return StreamingContainerManager.toWsWindowId(dnmgr.getCommittedWindowId());
  }

  @AutoMetric
  @Override
  public long getTuplesProcessedPSMA()
  {
    long total = 0;
    for (PTOperator oper : dnmgr.getPhysicalPlan().getAllOperators().values()) {
      total += oper.stats.tuplesProcessedPSMA.get();
    }
    return total;
  }

  @AutoMetric
  @Override
  public long getTotalTuplesProcessed()
  {
    long total = 0;
    for (PTOperator oper : dnmgr.getPhysicalPlan().getAllOperators().values()) {
      total += oper.stats.totalTuplesProcessed.get();
    }
    return total;
  }

  @AutoMetric
  @Override
  public long getTuplesEmittedPSMA()
  {
    long total = 0;
    for (PTOperator oper : dnmgr.getPhysicalPlan().getAllOperators().values()) {
      total += oper.stats.tuplesEmittedPSMA.get();
    }
    return total;
  }

  @AutoMetric
  @Override
  public long getTotalTuplesEmitted()
  {
    long total = 0;
    for (PTOperator oper : dnmgr.getPhysicalPlan().getAllOperators().values()) {
      total += oper.stats.totalTuplesEmitted.get();
    }
    return total;
  }

  @AutoMetric
  @Override
  public long getTotalMemoryAllocated()
  {
    long totalMb = 0;
    for (PTContainer c : dnmgr.getPhysicalPlan().getContainers()) {
      totalMb += c.getAllocatedMemoryMB();
    }
    return totalMb;
  }

  @AutoMetric
  @Override
  public long getMemoryRequired()
  {
    long pendingMb = 0;
    for (PTContainer c : dnmgr.getPhysicalPlan().getContainers()) {
      // only containers not yet allocated, or lost (KILLED), still need memory
      if (c.getExternalId() == null || c.getState() == PTContainer.State.KILLED) {
        pendingMb += c.getRequiredMemoryMB();
      }
    }
    return pendingMb;
  }

  @AutoMetric
  @Override
  public int getTotalVCoresAllocated()
  {
    int totalVcores = 0;
    for (PTContainer c : dnmgr.getPhysicalPlan().getContainers()) {
      totalVcores += c.getAllocatedVCores();
    }
    return totalVcores;
  }

  @AutoMetric
  @Override
  public int getVCoresRequired()
  {
    int pendingVcores = 0;
    for (PTContainer c : dnmgr.getPhysicalPlan().getContainers()) {
      // only containers not yet allocated, or lost (KILLED), still need vcores
      if (c.getExternalId() == null || c.getState() == PTContainer.State.KILLED) {
        int vcores = c.getRequiredVCores();
        // a request of zero vcores still counts as one core
        pendingVcores += (vcores == 0) ? 1 : vcores;
      }
    }
    return pendingVcores;
  }

  @AutoMetric
  @Override
  public long getTotalBufferServerReadBytesPSMA()
  {
    // per-millisecond moving average scaled to bytes per second
    long total = 0;
    for (PTOperator oper : dnmgr.getPhysicalPlan().getAllOperators().values()) {
      for (PortStatus port : oper.stats.inputPortStatusList.values()) {
        total += port.bufferServerBytesPMSMA.getAvg() * 1000;
      }
    }
    return total;
  }

  @AutoMetric
  @Override
  public long getTotalBufferServerWriteBytesPSMA()
  {
    // per-millisecond moving average scaled to bytes per second
    long total = 0;
    for (PTOperator oper : dnmgr.getPhysicalPlan().getAllOperators().values()) {
      for (PortStatus port : oper.stats.outputPortStatusList.values()) {
        total += port.bufferServerBytesPMSMA.getAvg() * 1000;
      }
    }
    return total;
  }

  @Override
  public List<Integer> getCriticalPath()
  {
    final StreamingContainerManager.CriticalPathInfo info = dnmgr.getCriticalPathInfo();
    return (info == null) ? null : info.path;
  }

  @AutoMetric
  @Override
  public long getLatency()
  {
    final StreamingContainerManager.CriticalPathInfo info = dnmgr.getCriticalPathInfo();
    return (info == null) ? 0 : info.latency;
  }

  @Override
  public long getWindowStartMillis()
  {
    return dnmgr.getWindowStartMillis();
  }
}
/**
 * Live {@link StramAppContext} implementation exposing the app master's runtime
 * state (application ids, tracking URL, stats, metrics) to the web services layer.
 */
private class ClusterAppContextImpl extends BaseContext implements StramAppContext
{
  @SuppressWarnings("FieldNameHidesFieldInSuperclass")
  private static final long serialVersionUID = 201309112304L;

  private ClusterAppContextImpl()
  {
    super(null, null);
  }

  ClusterAppContextImpl(Attribute.AttributeMap attributes)
  {
    super(attributes, null);
  }

  @Override
  public ApplicationId getApplicationID()
  {
    return appAttemptID.getApplicationId();
  }

  @Override
  public ApplicationAttemptId getApplicationAttemptId()
  {
    return appAttemptID;
  }

  @Override
  public String getApplicationName()
  {
    return getValue(LogicalPlan.APPLICATION_NAME);
  }

  @Override
  public String getApplicationDocLink()
  {
    return getValue(LogicalPlan.APPLICATION_DOC_LINK);
  }

  @Override
  public long getStartTime()
  {
    return startTime;
  }

  @Override
  public String getApplicationPath()
  {
    return getValue(LogicalPlan.APPLICATION_PATH);
  }

  @Override
  public CharSequence getUser()
  {
    // user name is propagated by YARN through the container environment
    return System.getenv(ApplicationConstants.Environment.USER.toString());
  }

  @Override
  public Clock getClock()
  {
    return clock;
  }

  @Override
  public String getAppMasterTrackingUrl()
  {
    return appMasterTrackingUrl;
  }

  @Override
  public ClusterAppStats getStats()
  {
    return stats;
  }

  @Override
  public String getGatewayAddress()
  {
    return getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
  }

  @Override
  public boolean isGatewayConnected()
  {
    // the manager may not exist yet during early startup
    final StreamingContainerManager mgr = StreamingAppMasterService.this.dnmgr;
    return mgr != null && mgr.isGatewayConnected();
  }

  @Override
  public List<AppDataSource> getAppDataSources()
  {
    final StreamingContainerManager mgr = StreamingAppMasterService.this.dnmgr;
    return (mgr == null) ? null : mgr.getAppDataSources();
  }

  @Override
  public Map<String, Object> getMetrics()
  {
    final StreamingContainerManager mgr = StreamingAppMasterService.this.dnmgr;
    return (mgr == null) ? null : (Map)mgr.getLatestLogicalMetrics();
  }
}
/**
 * Dump out contents of $CWD and the environment to the log/stdout for debugging.
 * Invoked from serviceInit when the DAG is configured in debug mode. Best effort:
 * failures to list the directory or dump the configuration are logged and ignored.
 */
@SuppressWarnings("UseOfSystemOutOrSystemErr")
public void dumpOutDebugInfo()
{
  LOG.info("Dump debug output");
  LOG.info("\nDumping System Env: begin");
  for (Map.Entry<String, String> env : System.getenv().entrySet()) {
    LOG.info("System env: key={}, val={}", env.getKey(), env.getValue());
  }
  LOG.info("Dumping System Env: end");
  try {
    // list the container working directory; ProcessBuilder avoids the
    // whitespace-splitting pitfalls of Runtime.exec(String)
    Process pr = new ProcessBuilder("ls", "-al").start();
    pr.waitFor();
    LOG.info("\nDumping files in local dir: begin");
    try (BufferedReader buf = new BufferedReader(
        new InputStreamReader(pr.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = buf.readLine()) != null) {
        LOG.info("System CWD content: {}", line);
      }
    }
    LOG.info("Dumping files in local dir: end");
  } catch (IOException e) {
    LOG.debug("Exception", e);
  } catch (InterruptedException e) {
    // restore the interrupt flag so callers can observe the interruption
    Thread.currentThread().interrupt();
    LOG.info("Interrupted", e);
  }
  LOG.info("Classpath: {}", System.getProperty("java.class.path"));
  LOG.info("Config resources: {}", getConfig().toString());
  try {
    // find a better way of logging this using the logger.
    Configuration.dumpConfiguration(getConfig(), new PrintWriter(System.out));
  } catch (Exception e) {
    LOG.error("Error dumping configuration.", e);
  }
}
/**
 * Reads the serialized logical plan from the AM container's working directory,
 * restores or creates the {@link StreamingContainerManager}, and wires up all
 * sub-services (NM/RM clients, heartbeat RPC server, optional metrics push agent).
 * The services added here are initialized by {@code super.serviceInit(conf)} at the end.
 *
 * @param conf Hadoop/YARN configuration supplied by the service framework
 * @throws Exception if the plan cannot be read or a sub-service fails to initialize
 */
@Override
protected void serviceInit(Configuration conf) throws Exception
{
  LOG.info("Application master, appId={}, clustertimestamp={}, attemptId={}",
      appAttemptID.getApplicationId().getId(), appAttemptID.getApplicationId().getClusterTimestamp(), appAttemptID.getAttemptId());
  // deserialize the logical plan shipped alongside the AM (try-with-resources closes the stream)
  try (FileInputStream fis = new FileInputStream("./" + LogicalPlan.SER_FILE_NAME)) {
    this.dag = LogicalPlan.read(fis);
  }
  // "debug" simply dumps all data using LOG.info
  if (dag.isDebug()) {
    dumpOutDebugInfo();
  }
  dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
  FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), conf);
  this.dnmgr = StreamingContainerManager.getInstance(recoveryHandler, dag, true);
  // the manager may have recovered a plan from a previous attempt; use its copy from here on
  dag = this.dnmgr.getLogicalPlan();
  this.appContext = new ClusterAppContextImpl(dag.getAttributes());
  Map<Class<?>, Class<? extends StringCodec<?>>> codecs = dag.getAttributes().get(DAG.STRING_CODECS);
  StringCodecs.loadConverters(codecs);
  LOG.info("Starting application with {} operators in {} containers", dnmgr.getPhysicalPlan().getAllOperators().size(), dnmgr.getPhysicalPlan().getContainers().size());
  if (UserGroupInformation.isSecurityEnabled()) {
    // TODO :- Need to perform token renewal
    delegationTokenManager = new StramDelegationTokenManager(DELEGATION_KEY_UPDATE_INTERVAL, DELEGATION_TOKEN_MAX_LIFETIME, DELEGATION_TOKEN_RENEW_INTERVAL, DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL);
  }
  this.nmClient = new NMClientAsyncImpl(new NMCallbackHandler());
  addService(nmClient);
  this.amRmClient = AMRMClient.createAMRMClient();
  addService(amRmClient);
  // start RPC server
  int rpcListenerCount = dag.getValue(DAGContext.HEARTBEAT_LISTENER_THREAD_COUNT);
  this.heartbeatListener = new StreamingContainerParent(this.getClass().getName(), dnmgr, delegationTokenManager, rpcListenerCount);
  addService(heartbeatListener);
  String appDataPushTransport = dag.getValue(LogicalPlan.METRICS_TRANSPORT);
  if (appDataPushTransport != null) {
    // metrics are pushed only when a transport is configured on the DAG
    this.appDataPushAgent = new AppDataPushAgent(dnmgr, appContext);
    addService(this.appDataPushAgent);
  }
  // initialize all services added above
  super.serviceInit(conf);
}
/**
 * Starts the sub-services, publishes the heartbeat RPC connect address to DFS
 * so child containers can find the AM after restart, and brings up the embedded
 * web service that backs the tracking URL. A web service failure is non-fatal.
 */
@Override
protected void serviceStart() throws Exception
{
  super.serviceStart();
  if (UserGroupInformation.isSecurityEnabled()) {
    // delegationTokenManager is created in serviceInit only for secure clusters
    delegationTokenManager.startThreads();
  }
  // write the connect address for containers to DFS
  InetSocketAddress connectAddress = NetUtils.getConnectAddress(this.heartbeatListener.getAddress());
  URI connectUri = RecoverableRpcProxy.toConnectURI(connectAddress);
  FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), getConfig());
  recoveryHandler.writeConnectUri(connectUri.toString());
  // start web service
  try {
    org.mortbay.log.Log.setLog(null);
  }
  catch (Throwable throwable) {
    // deliberately ignored:
    // SPOI-2687. As part of Pivotal Certification, we need to catch ClassNotFoundException as Pivotal was using Jetty 7 whereas other distros are using Jetty 6.
    // LOG.error("can't set the log to null: ", throwable);
  }
  try {
    Configuration config = getConfig();
    if (UserGroupInformation.isSecurityEnabled()) {
      // copy the config so the filter initializer does not leak into the shared instance
      config = new Configuration(config);
      config.set("hadoop.http.filter.initializers", StramWSFilterInitializer.class.getCanonicalName());
    }
    WebApp webApp = WebApps.$for("stram", StramAppContext.class, appContext, "ws").with(config).start(new StramWebApp(this.dnmgr));
    LOG.info("Started web service at port: " + webApp.port());
    this.appMasterTrackingUrl = NetUtils.getConnectAddress(webApp.getListenerAddress()).getHostName() + ":" + webApp.port();
    LOG.info("Setting tracking URL to: " + appMasterTrackingUrl);
  }
  catch (Exception e) {
    // the application can run without its web UI; log and continue
    LOG.error("Webapps failed to start. Ignoring for now:", e);
  }
}
/**
 * Stops all sub-components. Every component is guarded by a null check so a
 * partially initialized app master (e.g. serviceInit failed midway) can still
 * be shut down without a NullPointerException.
 *
 * @throws Exception propagated from {@code CompositeService.serviceStop()}
 */
@Override
protected void serviceStop() throws Exception
{
  super.serviceStop();
  // null check instead of re-testing isSecurityEnabled(): the manager exists
  // iff security was enabled during init, and this is consistent with the
  // guards on the other components below
  if (delegationTokenManager != null) {
    delegationTokenManager.stopThreads();
  }
  if (nmClient != null) {
    nmClient.stop();
  }
  if (amRmClient != null) {
    amRmClient.stop();
  }
  if (dnmgr != null) {
    dnmgr.teardown();
  }
}
/**
 * Runs the application master: starts the shared container event loop, drives
 * the main allocation/monitoring loop and guarantees the event loop is stopped
 * again even when execution fails.
 *
 * @return always {@code true}; failures surface as exceptions
 * @throws Exception if the main execution loop fails
 */
public boolean run() throws Exception
{
  try {
    StreamingContainer.eventloop.start();
    execute();
  } finally {
    StreamingContainer.eventloop.stop();
  }
  return true;
}
/**
* Main run function for the application master
*
* @throws YarnException
*/
@SuppressWarnings("SleepWhileInLoop")
private void execute() throws YarnException, IOException
{
LOG.info("Starting ApplicationMaster");
final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
LOG.info("number of tokens: {}", credentials.getAllTokens().size());
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
while (iter.hasNext()) {
Token<?> token = iter.next();
LOG.debug("token: {}", token);
}
final Configuration conf = getConfig();
long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
long expiryTime = System.currentTimeMillis() + tokenLifeTime;
LOG.debug(" expiry token time {}", tokenLifeTime);
String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE);
// Register self with ResourceManager
RegisterApplicationMasterResponse response = amRmClient.registerApplicationMaster(appMasterHostname, 0, appMasterTrackingUrl);
// Dump out information about cluster capability as seen by the resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
int maxVcores = response.getMaximumResourceCapability().getVirtualCores();
int minMem = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
int minVcores = conf.getInt("yarn.scheduler.minimum-allocation-vcores", 0);
LOG.info("Max mem {}m, Min mem {}m, Max vcores {} and Min vcores {} capabililty of resources in this cluster ", maxMem, minMem, maxVcores, minVcores);
int maxConsecutiveContainerFailures = conf.getInt("MAX_CONSECUTIVE_CONTAINER_FAILURES", MAX_CONTAINER_FAILURES_PER_NODE);
long blacklistRemovalTime = conf.getLong("BLACKLIST_REMOVAL_TIME", BLACKLIST_REMOVAL_TIME);
// for locality relaxation fall back
Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources = Maps.newHashMap();
// Setup heartbeat emitter
// TODO poll RM every now and then with an empty request to let RM know that we are alive
// The heartbeat interval after which an AM is timed out by the RM is defined by a config setting:
// RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
// The allocate calls to the RM count as heartbeat so, for now, this additional heartbeat emitter
// is not required.
int loopCounter = -1;
List<ContainerId> releasedContainers = new ArrayList<ContainerId>();
int numTotalContainers = 0;
// keep track of already requested containers to not request them again while waiting for allocation
int numRequestedContainers = 0;
int numReleasedContainers = 0;
int nextRequestPriority = 0;
ResourceRequestHandler resourceRequestor = new ResourceRequestHandler();
YarnClient clientRMService = YarnClient.createYarnClient();
try {
// YARN-435
// we need getClusterNodes to populate the initial node list,
// subsequent updates come through the heartbeat response
clientRMService.init(conf);
clientRMService.start();
ApplicationReport ar = StramClientUtils.getStartedAppInstanceByName(clientRMService,
dag.getAttributes().get(DAG.APPLICATION_NAME),
UserGroupInformation.getLoginUser().getUserName(),
dag.getAttributes().get(DAG.APPLICATION_ID));
if (ar != null) {
appDone = true;
dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.",
ar.getApplicationId().toString(), ar.getName(), ar.getUser());
LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
finishApplication(FinalApplicationStatus.FAILED, numTotalContainers);
return;
}
resourceRequestor.updateNodeReports(clientRMService.getNodeReports());
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve cluster nodes report.", e);
} finally {
clientRMService.stop();
}
// check for previously allocated containers
// as of 2.2, containers won't survive AM restart, but this will change in the future - YARN-1490
checkContainerStatus();
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
while (!appDone) {
loopCounter++;
if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
String applicationId = appAttemptID.getApplicationId().toString();
expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
}
Runnable r;
while ((r = this.pendingTasks.poll()) != null) {
r.run();
}
// log current state
/*
* LOG.info("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" +
* numTotalContainers + ", requested=" + numRequestedContainers + ", completed=" + numCompletedContainers +
* ", failed=" + numFailedContainers + ", currentAllocated=" + this.allAllocatedContainers.size());
*/
// Sleep before each loop when asking RM for containers
// to avoid flooding RM with spurious requests when it
// need not have any available containers
try {
sleep(1000);
}
catch (InterruptedException e) {
LOG.info("Sleep interrupted " + e.getMessage());
}
// Setup request to be sent to RM to allocate containers
List<ContainerRequest> containerRequests = new ArrayList<ContainerRequest>();
List<ContainerRequest> removedContainerRequests = new ArrayList<ContainerRequest>();
// request containers for pending deploy requests
if (!dnmgr.containerStartRequests.isEmpty()) {
StreamingContainerAgent.ContainerStartRequest csr;
while ((csr = dnmgr.containerStartRequests.poll()) != null) {
if (csr.container.getRequiredMemoryMB() > maxMem) {
LOG.warn("Container memory {}m above max threshold of cluster. Using max value {}m.", csr.container.getRequiredMemoryMB(), maxMem);
csr.container.setRequiredMemoryMB(maxMem);
}
if(csr.container.getRequiredMemoryMB() < minMem){
csr.container.setRequiredMemoryMB(minMem);
}
if (csr.container.getRequiredVCores() > maxVcores) {
LOG.warn("Container vcores {} above max threshold of cluster. Using max value {}.", csr.container.getRequiredVCores(), maxVcores);
csr.container.setRequiredVCores(maxVcores);
}
if(csr.container.getRequiredVCores() < minVcores){
csr.container.setRequiredVCores(minVcores);
}
csr.container.setResourceRequestPriority(nextRequestPriority++);
ContainerRequest cr = resourceRequestor.createContainerRequest(csr, true);
MutablePair<Integer, ContainerRequest> pair = new MutablePair<Integer, ContainerRequest>(loopCounter, cr);
requestedResources.put(csr, pair);
containerRequests.add(cr);
}
}
if (!requestedResources.isEmpty()) {
//resourceRequestor.clearNodeMapping();
for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) {
StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
LOG.debug("Request for container {} timed out. Re-requesting container", csr.container);
removedContainerRequests.add(entry.getValue().getRight());
ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false);
entry.getValue().setLeft(loopCounter);
entry.getValue().setRight(cr);
containerRequests.add(cr);
}
}
}
/* Remove nodes from blacklist after timeout */
long currentTime = System.currentTimeMillis();
List<String> blacklistRemovals = new ArrayList<String>();
for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) {
Pair<Long, List<String>> entry = it.next();
Long timeDiff = currentTime - entry.getFirst();
if (timeDiff > blacklistRemovalTime) {
blacklistRemovals.addAll(entry.getSecond());
it.remove();
} else {
break;
}
}
if (!blacklistRemovals.isEmpty()) {
amRmClient.updateBlacklist(null, blacklistRemovals);
}
numTotalContainers += containerRequests.size();
numRequestedContainers += containerRequests.size() - removedContainerRequests.size();
AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers);
if (amResp.getAMCommand() != null) {
LOG.info(" statement executed:{}", amResp.getAMCommand());
switch (amResp.getAMCommand()) {
case AM_RESYNC:
case AM_SHUTDOWN:
throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
default:
throw new YarnRuntimeException("Received the " + amResp.getAMCommand() + " command from RM");
}
}
releasedContainers.clear();
// Retrieve list of allocated containers from the response
List<Container> newAllocatedContainers = amResp.getAllocatedContainers();
// LOG.info("Got response from RM for container ask, allocatedCnt=" + newAllocatedContainers.size());
numRequestedContainers -= newAllocatedContainers.size();
long timestamp = System.currentTimeMillis();
for (Container allocatedContainer : newAllocatedContainers) {
LOG.info("Got new container." + ", containerId=" + allocatedContainer.getId() + ", containerNode=" + allocatedContainer.getNodeId() + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory" + allocatedContainer.getResource().getMemory() + ", priority" + allocatedContainer.getPriority());
// + ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
boolean alreadyAllocated = true;
StreamingContainerAgent.ContainerStartRequest csr = null;
for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
if (entry.getKey().container.getResourceRequestPriority() == allocatedContainer.getPriority().getPriority()) {
alreadyAllocated = false;
csr = entry.getKey();
break;
}
}
if (alreadyAllocated) {
LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
releasedContainers.add(allocatedContainer.getId());
numReleasedContainers++;
numRequestedContainers--;
continue;
}
if (csr != null) {
requestedResources.remove(csr);
}
// allocate resource to container
ContainerResource resource = new ContainerResource(allocatedContainer.getPriority().getPriority(), allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString(), allocatedContainer.getResource().getMemory(), allocatedContainer.getResource().getVirtualCores(), allocatedContainer.getNodeHttpAddress());
StreamingContainerAgent sca = dnmgr.assignContainer(resource, null);
if (sca == null) {
// allocated container no longer needed, add release request
LOG.warn("Container {} allocated but nothing to deploy, going to release this container.", allocatedContainer.getId());
releasedContainers.add(allocatedContainer.getId());
}
else {
AllocatedContainer allocatedContainerHolder = new AllocatedContainer(allocatedContainer);
this.allocatedContainers.put(allocatedContainer.getId().toString(), allocatedContainerHolder);
ByteBuffer tokens = null;
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
Token<StramDelegationTokenIdentifier> delegationToken = allocateDelegationToken(ugi.getUserName(), heartbeatListener.getAddress());
allocatedContainerHolder.delegationToken = delegationToken;
//ByteBuffer tokens = LaunchContainerRunnable.getTokens(delegationTokenManager, heartbeatListener.getAddress());
tokens = LaunchContainerRunnable.getTokens(ugi, delegationToken);
}
LaunchContainerRunnable launchContainer = new LaunchContainerRunnable(allocatedContainer, nmClient, sca, tokens);
// Thread launchThread = new Thread(runnableLaunchContainer);
// launchThreads.add(launchThread);
// launchThread.start();
launchContainer.run(); // communication with NMs is now async
// record container start event
StramEvent ev = new StramEvent.StartContainerEvent(allocatedContainer.getId().toString(), allocatedContainer.getNodeId().toString());
ev.setTimestamp(timestamp);
dnmgr.recordEventAsync(ev);
}
}
// track node updates for future locality constraint allocations
// TODO: it seems 2.0.4-alpha doesn't give us any updates
resourceRequestor.updateNodeReports(amResp.getUpdatedNodes());
// Check the completed containers
List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
// LOG.debug("Got response from RM for container ask, completedCnt=" + completedContainers.size());
List<String> blacklistAdditions = new ArrayList<String>();
for (ContainerStatus containerStatus : completedContainers) {
LOG.info("Completed containerId=" + containerStatus.getContainerId() + ", state=" + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getDiagnostics());
// non complete containers should not be here
assert (containerStatus.getState() == ContainerState.COMPLETE);
AllocatedContainer allocatedContainer = allocatedContainers.remove(containerStatus.getContainerId().toString());
if (allocatedContainer != null && allocatedContainer.delegationToken != null) {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
delegationTokenManager.cancelToken(allocatedContainer.delegationToken, ugi.getUserName());
}
int exitStatus = containerStatus.getExitStatus();
if (0 != exitStatus) {
if (allocatedContainer != null) {
numFailedContainers.incrementAndGet();
if (exitStatus != 1 && enableFailedNodeBlacklisting) {
// If container failure due to framework
String hostname = allocatedContainer.container.getNodeId().getHost();
int failedTimes = 1;
AtomicInteger failed = failedContainersMap.putIfAbsent(hostname, new AtomicInteger(1));
if (failed != null) {
failedTimes = failed.incrementAndGet();
}
if (failedTimes >= maxConsecutiveContainerFailures) {
// Blacklist the node
LOG.info("Node {} failed {} times consecutively, marking the node blacklisted", hostname, failedTimes);
blacklistAdditions.add(hostname);
}
}
}
// if (exitStatus == 1) {
// // non-recoverable StreamingContainer failure
// appDone = true;
// finalStatus = FinalApplicationStatus.FAILED;
// dnmgr.shutdownDiagnosticsMessage = "Unrecoverable failure " + containerStatus.getContainerId();
// LOG.info("Exiting due to: {}", dnmgr.shutdownDiagnosticsMessage);
// }
// else {
// Recoverable failure or process killed (externally or via stop request by AM)
// also occurs when a container was released by the application but never assigned/launched
LOG.debug("Container {} failed or killed.", containerStatus.getContainerId());
dnmgr.scheduleContainerRestart(containerStatus.getContainerId().toString());
// }
}
else {
// container completed successfully
numCompletedContainers.incrementAndGet();
LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
// Reset counter for node failure, if exists
String hostname = allocatedContainer.container.getNodeId().getHost();
AtomicInteger failedTimes = failedContainersMap.get(hostname);
if(failedTimes != null) {
failedTimes.set(0);
}
}
String containerIdStr = containerStatus.getContainerId().toString();
dnmgr.removeContainerAgent(containerIdStr);
// record container stop event
StramEvent ev = new StramEvent.StopContainerEvent(containerIdStr, containerStatus.getExitStatus());
ev.setReason(containerStatus.getDiagnostics());
dnmgr.recordEventAsync(ev);
}
if (!blacklistAdditions.isEmpty()) {
amRmClient.updateBlacklist(blacklistAdditions, null);
blacklistedNodesQueueWithTimeStamp.add(new Pair<Long, List<String>>(System.currentTimeMillis(), blacklistAdditions));
}
if (dnmgr.forcedShutdown) {
LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage);
finalStatus = FinalApplicationStatus.FAILED;
appDone = true;
}
else if (allocatedContainers.isEmpty() && numRequestedContainers == 0 && dnmgr.containerStartRequests.isEmpty()) {
LOG.debug("Exiting as no more containers are allocated or requested");
finalStatus = FinalApplicationStatus.SUCCEEDED;
appDone = true;
}
LOG.debug("Current application state: loop={}, appDone={}, total={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}",
loopCounter, appDone, numTotalContainers, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests);
// monitor child containers
dnmgr.monitorHeartbeat();
}
finishApplication(finalStatus, numTotalContainers);
}
/**
 * Unregister this application master with the RM, reporting the given final
 * status. For any non-successful outcome a diagnostics summary is attached:
 * container counters plus the stream manager's shutdown message, if set.
 *
 * @param finalStatus final application status to report to the RM
 * @param numTotalContainers total number of containers, included in diagnostics
 * @throws YarnException on RM communication failure
 * @throws IOException on RM communication failure
 */
private void finishApplication(FinalApplicationStatus finalStatus, int numTotalContainers) throws YarnException, IOException
{
  LOG.info("Application completed. Signalling finish to RM");
  FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
  finishReq.setFinalApplicationStatus(finalStatus);
  if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
    StringBuilder diagnostics = new StringBuilder("Diagnostics.")
        .append(", total=").append(numTotalContainers)
        .append(", completed=").append(numCompletedContainers.get())
        .append(", allocated=").append(allocatedContainers.size())
        .append(", failed=").append(numFailedContainers.get());
    if (!StringUtils.isEmpty(dnmgr.shutdownDiagnosticsMessage)) {
      diagnostics.append('\n').append(dnmgr.shutdownDiagnosticsMessage);
    }
    // YARN-208 - as of 2.0.1-alpha dropped by the RM
    finishReq.setDiagnostics(diagnostics.toString());
    // expected termination of the master process
    // application status and diagnostics message are set above
  }
  LOG.info("diagnostics: " + finishReq.getDiagnostics());
  amRmClient.unregisterApplicationMaster(finishReq.getFinalApplicationStatus(), finishReq.getDiagnostics(), null);
}
/**
 * Create a STRAM delegation token for the given user, with its service field
 * set to the supplied RPC endpoint (numeric host address + ":" + port).
 *
 * @param username owner recorded in the token identifier
 * @param address endpoint the token is scoped to
 * @return the new delegation token, signed by the token manager
 */
private Token<StramDelegationTokenIdentifier> allocateDelegationToken(String username, InetSocketAddress address)
{
  StramDelegationTokenIdentifier tokenIdentifier = new StramDelegationTokenIdentifier(new Text(username), new Text(""), new Text(""));
  Token<StramDelegationTokenIdentifier> token = new Token<StramDelegationTokenIdentifier>(tokenIdentifier, delegationTokenManager);
  String serviceAddress = address.getAddress().getHostAddress() + ":" + address.getPort();
  token.setService(new Text(serviceAddress));
  return token;
}
/**
 * Check for containers that were allocated in a previous attempt.
 * If the containers are still alive, wait for them to check in via heartbeat.
 */
private void checkContainerStatus()
{
  for (StreamingContainerAgent agent : this.dnmgr.getContainerAgents()) {
    ContainerId id = ConverterUtils.toContainerId(agent.container.getExternalId());
    NodeId node = ConverterUtils.toNodeId(agent.container.host);
    Resource capability = Resource.newInstance(agent.container.getAllocatedMemoryMB(), agent.container.getAllocatedVCores());
    Priority priority = Priority.newInstance(agent.container.getResourceRequestPriority());
    // rebuild the YARN container handle; no container token is available for
    // containers carried over from a prior attempt
    Container restored = Container.newInstance(id, node, agent.container.nodeHttpAddress, capability, priority, null);
    // put container back into the allocated list
    this.allocatedContainers.put(id.toString(), new AllocatedContainer(restored));
    // query current status from the NM; errors are handled by the async callback
    nmClient.getContainerStatusAsync(id, node);
  }
}
/**
 * Ask RM to allocate given no. of containers to this Application Master
 *
 * @param containerRequests Containers to ask for from RM
 * @param removedContainerRequests Container requests to be removed
 * @param releasedContainers containers to hand back to the RM
 * @return Response from RM to AM with allocated containers
 * @throws YarnException
 */
private AllocateResponse sendContainerAskToRM(List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests, List<ContainerId> releasedContainers) throws YarnException, IOException
{
  if (!removedContainerRequests.isEmpty()) {
    LOG.debug("Removing container request: {}", removedContainerRequests);
    for (ContainerRequest removed : removedContainerRequests) {
      amRmClient.removeContainerRequest(removed);
    }
  }
  if (!containerRequests.isEmpty()) {
    LOG.debug("Asking RM for containers: {}", containerRequests);
    for (ContainerRequest requested : containerRequests) {
      amRmClient.addContainerRequest(requested);
    }
  }
  for (ContainerId released : releasedContainers) {
    LOG.info("Released container, id={}", released.getId());
    amRmClient.releaseAssignedContainer(released);
  }
  // forward pending stop requests from the stream manager to the node managers,
  // at most once per container
  for (String stopId : dnmgr.containerStopRequests.values()) {
    AllocatedContainer candidate = this.allocatedContainers.get(stopId);
    if (candidate != null && !candidate.stopRequested) {
      nmClient.stopContainerAsync(candidate.container.getId(), candidate.container.getNodeId());
      LOG.info("Requested stop container {}", stopId);
      candidate.stopRequested = true;
    }
    dnmgr.containerStopRequests.remove(stopId);
  }
  return amRmClient.allocate(0);
}
private class NMCallbackHandler implements NMClientAsync.CallbackHandler
{
NMCallbackHandler()
{
}
@Override
public void onContainerStopped(ContainerId containerId)
{
LOG.debug("Succeeded to stop Container {}", containerId);
}
@Override
public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus)
{
LOG.debug("Container Status: id={}, status={}", containerId, containerStatus);
if (containerStatus.getState() != ContainerState.RUNNING) {
recoverContainer(containerId);
}
}
@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse)
{
LOG.debug("Succeeded to start Container {}", containerId);
}
@Override
public void onStartContainerError(ContainerId containerId, Throwable t)
{
LOG.error("Start container failed for: containerId={}", containerId, t);
}
@Override
public void onGetContainerStatusError(ContainerId containerId, Throwable t)
{
LOG.error("Failed to query the status of {}", containerId, t);
// if the NM is not reachable, consider container lost and recover (occurs during AM recovery)
recoverContainer(containerId);
}
@Override
public void onStopContainerError(ContainerId containerId, Throwable t)
{
LOG.warn("Failed to stop container {}", containerId, t);
// container could not be stopped, we won't receive a stop event from AM heartbeat
// short circuit and schedule recovery directly
recoverContainer(containerId);
}
private void recoverContainer(final ContainerId containerId)
{
pendingTasks.add(new Runnable()
{
@Override
public void run()
{
dnmgr.scheduleContainerRestart(containerId.toString());
allocatedContainers.remove(containerId.toString());
}
});
}
}
private class AllocatedContainer
{
final private Container container;
private boolean stopRequested;
private Token<StramDelegationTokenIdentifier> delegationToken;
private AllocatedContainer(Container c)
{
container = c;
}
}
}