blob: 1d76d7474afd3982f4169a1085ee537928cb9997 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.util.SystemClock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.ArrayList;
/**
* A class to store node or application allocations.
* It mainly contains operations for allocation start, add, update and finish.
*/
public class ActivitiesManager extends AbstractService {
private static final Log LOG = LogFactory.getLog(ActivitiesManager.class);
private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
private Set<NodeId> activeRecordedNodes;
private ConcurrentMap<ApplicationId, Long>
recordingAppActivitiesUntilSpecifiedTime;
private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
private ConcurrentMap<ApplicationId, List<AppAllocation>>
completedAppAllocations;
private boolean recordNextAvailableNode = false;
private List<NodeAllocation> lastAvailableNodeActivities = null;
private Thread cleanUpThread;
private int timeThreshold = 600 * 1000;
private final RMContext rmContext;
public ActivitiesManager(RMContext rmContext) {
super(ActivitiesManager.class.getName());
recordingNodesAllocation = new ConcurrentHashMap<>();
completedNodeAllocations = new ConcurrentHashMap<>();
appsAllocation = new ConcurrentHashMap<>();
completedAppAllocations = new ConcurrentHashMap<>();
activeRecordedNodes = Collections.newSetFromMap(
new ConcurrentHashMap<NodeId, Boolean>());
recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
this.rmContext = rmContext;
}
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus()
== FinalApplicationStatus.UNDEFINED) {
List<AppAllocation> allocations = completedAppAllocations.get(
applicationId);
return new AppActivitiesInfo(allocations, applicationId);
} else {
return new AppActivitiesInfo(
"fail to get application activities after finished",
applicationId.toString());
}
}
public ActivitiesInfo getActivitiesInfo(String nodeId) {
List<NodeAllocation> allocations;
if (nodeId == null) {
allocations = lastAvailableNodeActivities;
} else {
allocations = completedNodeAllocations.get(NodeId.fromString(nodeId));
}
return new ActivitiesInfo(allocations, nodeId);
}
public void recordNextNodeUpdateActivities(String nodeId) {
if (nodeId == null) {
recordNextAvailableNode = true;
} else {
activeRecordedNodes.add(NodeId.fromString(nodeId));
}
}
public void turnOnAppActivitiesRecording(ApplicationId applicationId,
double maxTime) {
long startTS = SystemClock.getInstance().getTime();
long endTS = startTS + (long) (maxTime * 1000);
recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS);
}
@Override
protected void serviceStart() throws Exception {
cleanUpThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
completedNodeAllocations.entrySet().iterator();
while (ite.hasNext()) {
Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation = ite.next();
List<NodeAllocation> allocations = nodeAllocation.getValue();
long currTS = SystemClock.getInstance().getTime();
if (allocations.size() > 0 && allocations.get(0).getTimeStamp()
- currTS > timeThreshold) {
ite.remove();
}
}
Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp =
completedAppAllocations.entrySet().iterator();
while (iteApp.hasNext()) {
Map.Entry<ApplicationId, List<AppAllocation>> appAllocation =
iteApp.next();
if (rmContext.getRMApps().get(appAllocation.getKey())
.getFinalApplicationStatus()
!= FinalApplicationStatus.UNDEFINED) {
iteApp.remove();
}
}
try {
Thread.sleep(5000);
} catch (Exception e) {
// ignore
}
}
}
});
cleanUpThread.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
cleanUpThread.interrupt();
super.serviceStop();
}
void startNodeUpdateRecording(NodeId nodeID) {
if (recordNextAvailableNode) {
recordNextNodeUpdateActivities(nodeID.toString());
}
if (activeRecordedNodes.contains(nodeID)) {
List<NodeAllocation> nodeAllocation = new ArrayList<>();
recordingNodesAllocation.put(nodeID, nodeAllocation);
}
}
void startAppAllocationRecording(NodeId nodeID, long currTS,
SchedulerApplicationAttempt application) {
ApplicationId applicationId = application.getApplicationId();
if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
&& recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
> currTS) {
appsAllocation.put(applicationId,
new AppAllocation(application.getPriority(), nodeID,
application.getQueueName()));
}
if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
&& recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
<= currTS) {
turnOffActivityMonitoringForApp(applicationId);
}
}
// Add queue, application or container activity into specific node allocation.
void addSchedulingActivityForNode(NodeId nodeID, String parentName,
String childName, String priority, ActivityState state, String diagnostic,
String type) {
if (shouldRecordThisNode(nodeID)) {
NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
nodeAllocation.addAllocationActivity(parentName, childName, priority,
state, diagnostic, type);
}
}
// Add queue, application or container activity into specific application
// allocation.
void addSchedulingActivityForApp(ApplicationId applicationId,
ContainerId containerId, String priority, ActivityState state,
String diagnostic, String type) {
if (shouldRecordThisApp(applicationId)) {
AppAllocation appAllocation = appsAllocation.get(applicationId);
appAllocation.addAppAllocationActivity(containerId == null ?
"Container-Id-Not-Assigned" :
containerId.toString(), priority, state, diagnostic, type);
}
}
// Update container allocation meta status for this node allocation.
// It updates general container status but not the detailed activity state
// in updateActivityState.
void updateAllocationFinalState(NodeId nodeID, ContainerId containerId,
AllocationState containerState) {
if (shouldRecordThisNode(nodeID)) {
NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
nodeAllocation.updateContainerState(containerId, containerState);
}
}
void finishAppAllocationRecording(ApplicationId applicationId,
ContainerId containerId, ActivityState appState, String diagnostic) {
if (shouldRecordThisApp(applicationId)) {
long currTS = SystemClock.getInstance().getTime();
AppAllocation appAllocation = appsAllocation.remove(applicationId);
appAllocation.updateAppContainerStateAndTime(containerId, appState,
currTS, diagnostic);
List<AppAllocation> appAllocations;
if (completedAppAllocations.containsKey(applicationId)) {
appAllocations = completedAppAllocations.get(applicationId);
} else {
appAllocations = new ArrayList<>();
completedAppAllocations.put(applicationId, appAllocations);
}
if (appAllocations.size() == 1000) {
appAllocations.remove(0);
}
appAllocations.add(appAllocation);
if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
<= currTS) {
turnOffActivityMonitoringForApp(applicationId);
}
}
}
void finishNodeUpdateRecording(NodeId nodeID) {
List<NodeAllocation> value = recordingNodesAllocation.get(nodeID);
long timeStamp = SystemClock.getInstance().getTime();
if (value != null) {
if (value.size() > 0) {
lastAvailableNodeActivities = value;
for (NodeAllocation allocation : lastAvailableNodeActivities) {
allocation.transformToTree();
allocation.setTimeStamp(timeStamp);
}
if (recordNextAvailableNode) {
recordNextAvailableNode = false;
}
}
if (shouldRecordThisNode(nodeID)) {
recordingNodesAllocation.remove(nodeID);
completedNodeAllocations.put(nodeID, value);
stopRecordNodeUpdateActivities(nodeID);
}
}
}
boolean shouldRecordThisApp(ApplicationId applicationId) {
return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
&& appsAllocation.containsKey(applicationId);
}
boolean shouldRecordThisNode(NodeId nodeID) {
return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation
.containsKey(nodeID);
}
private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID);
NodeAllocation nodeAllocation;
// When this node has already stored allocation activities, get the
// last allocation for this node.
if (nodeAllocations.size() != 0) {
nodeAllocation = nodeAllocations.get(nodeAllocations.size() - 1);
// When final state in last allocation is not DEFAULT, it means
// last allocation has finished. Create a new allocation for this node,
// and add it to the allocation list. Return this new allocation.
//
// When final state in last allocation is DEFAULT,
// it means last allocation has not finished. Just get last allocation.
if (nodeAllocation.getFinalAllocationState() != AllocationState.DEFAULT) {
nodeAllocation = new NodeAllocation(nodeID);
nodeAllocations.add(nodeAllocation);
}
}
// When this node has not stored allocation activities,
// create a new allocation for this node, and add it to the allocation list.
// Return this new allocation.
else {
nodeAllocation = new NodeAllocation(nodeID);
nodeAllocations.add(nodeAllocation);
}
return nodeAllocation;
}
private void stopRecordNodeUpdateActivities(NodeId nodeId) {
activeRecordedNodes.remove(nodeId);
}
private void turnOffActivityMonitoringForApp(ApplicationId applicationId) {
recordingAppActivitiesUntilSpecifiedTime.remove(applicationId);
}
}