blob: 6308b41058a759962b115d2cc537fd9cebca9604 [file] [log] [blame]
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* In memory implementation of the {@link PlacementConstraintManagerService}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MemoryPlacementConstraintManager
extends PlacementConstraintManagerService {
private static final Logger LOG =
LoggerFactory.getLogger(MemoryPlacementConstraintManager.class);
private ReentrantReadWriteLock.ReadLock readLock;
private ReentrantReadWriteLock.WriteLock writeLock;
/**
* Stores the global constraints that will be manipulated by the cluster
* admin. The key of each entry is the tag that will enable the corresponding
* constraint.
*/
private Map<String, PlacementConstraint> globalConstraints;
/**
* Stores the constraints for each application, along with the allocation tags
* that will enable each of the constraints for a given application.
*/
private Map<ApplicationId, Map<String, PlacementConstraint>> appConstraints;
public MemoryPlacementConstraintManager() {
this.globalConstraints = new HashMap<>();
this.appConstraints = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
public void registerApplication(ApplicationId appId,
Map<Set<String>, PlacementConstraint> constraintMap) {
// Check if app already exists. If not, prepare its constraint map.
Map<String, PlacementConstraint> constraintsForApp = new HashMap<>();
readLock.lock();
try {
if (appConstraints.get(appId) != null) {
LOG.warn("Application {} has already been registered.", appId);
return;
}
// Go over each sourceTag-constraint pair, validate it, and add it to the
// constraint map for this app.
for (Map.Entry<Set<String>, PlacementConstraint> entry : constraintMap
.entrySet()) {
Set<String> sourceTags = entry.getKey();
PlacementConstraint constraint = entry.getValue();
if (validateConstraint(sourceTags, constraint)) {
String sourceTag = getValidSourceTag(sourceTags);
constraintsForApp.put(sourceTag, constraint);
}
}
} finally {
readLock.unlock();
}
if (constraintsForApp.isEmpty()) {
LOG.info("Application {} was registered, but no constraints were added.",
appId);
}
// Update appConstraints.
writeLock.lock();
try {
appConstraints.put(appId, constraintsForApp);
} finally {
writeLock.unlock();
}
}
@Override
public void addConstraint(ApplicationId appId, Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
writeLock.lock();
try {
Map<String, PlacementConstraint> constraintsForApp =
appConstraints.get(appId);
if (constraintsForApp == null) {
LOG.info("Cannot add constraint to application {}, as it has not "
+ "been registered yet.", appId);
return;
}
addConstraintToMap(constraintsForApp, sourceTags, placementConstraint,
replace);
} finally {
writeLock.unlock();
}
}
@Override
public void addGlobalConstraint(Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
writeLock.lock();
try {
addConstraintToMap(globalConstraints, sourceTags, placementConstraint,
replace);
} finally {
writeLock.unlock();
}
}
/**
* Helper method that adds a constraint to a map for a given source tag.
* Assumes there is already a lock on the constraint map.
*
* @param constraintMap constraint map to which the constraint will be added
* @param sourceTags the source tags that will enable this constraint
* @param placementConstraint the new constraint to be added
* @param replace if true, an existing constraint for these sourceTags will be
* replaced with the new one
*/
private void addConstraintToMap(
Map<String, PlacementConstraint> constraintMap, Set<String> sourceTags,
PlacementConstraint placementConstraint, boolean replace) {
if (validateConstraint(sourceTags, placementConstraint)) {
String sourceTag = getValidSourceTag(sourceTags);
if (constraintMap.get(sourceTag) == null || replace) {
if (replace) {
LOG.info("Replacing the constraint associated with tag {} with {}.",
sourceTag, placementConstraint);
}
constraintMap.put(sourceTag, placementConstraint);
} else {
LOG.info("Constraint {} will not be added. There is already a "
+ "constraint associated with tag {}.",
placementConstraint, sourceTag);
}
}
}
@Override
public Map<Set<String>, PlacementConstraint> getConstraints(
ApplicationId appId) {
readLock.lock();
try {
if (appConstraints.get(appId) == null) {
LOG.debug("Application {} is not registered in the Placement "
+ "Constraint Manager.", appId);
return null;
}
// Copy to a new map and return an unmodifiable version of it.
// Each key of the map is a set with a single source tag.
Map<Set<String>, PlacementConstraint> constraintMap =
appConstraints.get(appId).entrySet().stream()
.collect(Collectors.toMap(
e -> Stream.of(e.getKey()).collect(Collectors.toSet()),
e -> e.getValue()));
return Collections.unmodifiableMap(constraintMap);
} finally {
readLock.unlock();
}
}
@Override
public PlacementConstraint getConstraint(ApplicationId appId,
Set<String> sourceTags) {
if (!validateSourceTags(sourceTags)) {
return null;
}
String sourceTag = getValidSourceTag(sourceTags);
readLock.lock();
try {
if (appConstraints.get(appId) == null) {
LOG.debug("Application {} is not registered in the Placement "
+ "Constraint Manager.", appId);
return null;
}
// TODO: Merge this constraint with the global one for this tag, if one
// exists.
return appConstraints.get(appId).get(sourceTag);
} finally {
readLock.unlock();
}
}
@Override
public PlacementConstraint getGlobalConstraint(Set<String> sourceTags) {
if (!validateSourceTags(sourceTags)) {
return null;
}
String sourceTag = getValidSourceTag(sourceTags);
readLock.lock();
try {
return globalConstraints.get(sourceTag);
} finally {
readLock.unlock();
}
}
@Override
public PlacementConstraint getMultilevelConstraint(ApplicationId appId,
Set<String> sourceTags, PlacementConstraint schedulingRequestConstraint) {
List<PlacementConstraint> constraints = new ArrayList<>();
// Add scheduling request-level constraint.
if (schedulingRequestConstraint != null) {
constraints.add(schedulingRequestConstraint);
}
// Add app-level constraint if appId is given.
if (appId != null && sourceTags != null
&& !sourceTags.isEmpty()) {
constraints.add(getConstraint(appId, sourceTags));
}
// Add global constraint.
if (sourceTags != null && !sourceTags.isEmpty()) {
constraints.add(getGlobalConstraint(sourceTags));
}
// Remove all null or duplicate constraints.
List<PlacementConstraint.AbstractConstraint> allConstraints =
constraints.stream()
.filter(placementConstraint -> placementConstraint != null
&& placementConstraint.getConstraintExpr() != null)
.map(PlacementConstraint::getConstraintExpr)
.distinct()
.collect(Collectors.toList());
// Compose an AND constraint
// When merge request(RC), app(AC) and global constraint(GC),
// we do a merge on them with CC=AND(GC, AC, RC) and returns a
// composite AND constraint. Subsequently we check if CC could
// be satisfied. This ensures that every level of constraint
// is satisfied.
PlacementConstraint.And andConstraint = PlacementConstraints.and(
allConstraints.toArray(new PlacementConstraint
.AbstractConstraint[allConstraints.size()]));
return andConstraint.build();
}
@Override
public void unregisterApplication(ApplicationId appId) {
writeLock.lock();
try {
appConstraints.remove(appId);
} finally {
writeLock.unlock();
}
}
@Override
public void removeGlobalConstraint(Set<String> sourceTags) {
if (!validateSourceTags(sourceTags)) {
return;
}
String sourceTag = getValidSourceTag(sourceTags);
writeLock.lock();
try {
globalConstraints.remove(sourceTag);
} finally {
writeLock.unlock();
}
}
@Override
public int getNumRegisteredApplications() {
readLock.lock();
try {
return appConstraints.size();
} finally {
readLock.unlock();
}
}
@Override
public int getNumGlobalConstraints() {
readLock.lock();
try {
return globalConstraints.size();
} finally {
readLock.unlock();
}
}
}