blob: 52fccfdc5f9e773cceb6600bff1b929205606319 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.state;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.server.appmaster.operations.CancelSingleRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Tracks an outstanding request. This is used to correlate an allocation response
 * with the node and role used in the request.
 * <p>
 * The node identifier may be null, which indicates that a request was made without
 * a specific target node.
 * <p>
 * Equality and the hash code are based <i>only</i> on the role and hostname,
 * which are fixed in the constructor. This means that a simple
 * instance constructed with (role, hostname) can be used to look up
 * a complete request instance in the {@link OutstandingRequestTracker} map.
 */
public final class OutstandingRequest extends RoleHostnamePair {
protected static final Logger log =
LoggerFactory.getLogger(OutstandingRequest.class);
/**
* Node the request is for -may be null
*/
public final NodeInstance node;
/**
* A list of all possible nodes to list in an AA request. For a non-AA
* request where {@link #node} is set, element 0 of the list is the same
* value.
*/
public final List<NodeInstance> nodes = new ArrayList<>(1);
/**
* Optional label. This is cached as the request option (explicit-location + label) is forbidden,
* yet the label needs to be retained for escalation.
*/
public String label;
/**
* Requested time in millis.
* <p>
* Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
*/
private AMRMClient.ContainerRequest issuedRequest;
/**
* Requested time in millis.
* <p>
* Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
*/
private long requestedTimeMillis;
/**
* Time in millis after which escalation should be triggered..
* <p>
* Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long)}
*/
private long escalationTimeoutMillis;
/**
* Has the placement request been escalated?
*/
private boolean escalated;
/**
* Flag to indicate that escalation is allowed
*/
private boolean mayEscalate;
/**
* Priority of request; only valid after the request is built up
*/
private int priority = -1;
/**
* Is this an Anti-affine request which should be cancelled on
* a cluster resize?
*/
private boolean antiAffine = false;
/**
* Create a request
* @param roleId role
* @param node node -can be null
*/
public OutstandingRequest(int roleId,
NodeInstance node) {
super(roleId, node != null ? node.hostname : null);
this.node = node;
nodes.add(node);
}
/**
* Create an outstanding request with the given role and hostname
* Important: this is useful only for map lookups -the other constructor
* with the NodeInstance parameter is needed to generate node-specific
* container requests
* @param roleId role
* @param hostname hostname
*/
public OutstandingRequest(int roleId, String hostname) {
super(roleId, hostname);
this.node = null;
}
/**
* Create an Anti-affine reques, including all listed nodes (there must be one)
* as targets.
* @param roleId role
* @param nodes list of nodes
*/
public OutstandingRequest(int roleId, List<NodeInstance> nodes) {
super(roleId, nodes.get(0).hostname);
this.node = null;
this.antiAffine = true;
this.nodes.addAll(nodes);
}
/**
* Is the request located in the cluster, that is: does it have a node.
* @return true if a node instance was supplied in the constructor
*/
public boolean isLocated() {
return node != null;
}
public long getRequestedTimeMillis() {
return requestedTimeMillis;
}
public long getEscalationTimeoutMillis() {
return escalationTimeoutMillis;
}
public boolean isEscalated() {
return escalated;
}
public boolean mayEscalate() {
return mayEscalate;
}
public AMRMClient.ContainerRequest getIssuedRequest() {
return issuedRequest;
}
public int getPriority() {
return priority;
}
public boolean isAntiAffine() {
return antiAffine;
}
public void setAntiAffine(boolean antiAffine) {
this.antiAffine = antiAffine;
}
/**
* Build a container request.
* <p>
* The value of {@link #node} is used to direct a lot of policy. If null,
* placement is relaxed.
* If not null, the choice of whether to use the suggested node
* is based on the placement policy and failure history.
* <p>
* If the request has an address, it is set in the container request
* (with a flag to enable relaxed priorities).
* <p>
* This operation sets the requested time flag, used for tracking timeouts
* on outstanding requests
* @param resource resource
* @param role role
* @param time time in millis to record as request time
* @return the request to raise
*/
public synchronized AMRMClient.ContainerRequest buildContainerRequest(
Resource resource, RoleStatus role, long time) {
Preconditions.checkArgument(resource != null, "null `resource` arg");
Preconditions.checkArgument(role != null, "null `role` arg");
// cache label for escalation
label = role.getLabelExpression();
requestedTimeMillis = time;
escalationTimeoutMillis = time + role.getPlacementTimeoutSeconds() * 1000;
String[] hosts;
boolean relaxLocality;
boolean strictPlacement = role.isStrictPlacement();
NodeInstance target = this.node;
String nodeLabels;
if (isAntiAffine()) {
int size = nodes.size();
log.info("Creating anti-affine request across {} nodes; first node = {}",
size, hostname);
hosts = new String[size];
StringBuilder builder = new StringBuilder(size * 16);
int c = 0;
for (NodeInstance nodeInstance : nodes) {
hosts[c++] = nodeInstance.hostname;
builder.append(nodeInstance.hostname).append(" ");
}
log.debug("Full host list: [ {}]", builder);
escalated = false;
mayEscalate = false;
relaxLocality = false;
nodeLabels = null;
} else if (target != null) {
// placed request. Hostname is used in request
hosts = new String[1];
hosts[0] = target.hostname;
// and locality flag is set to false; Slider will decide when
// to relax things
relaxLocality = false;
log.info("Submitting request for container on {}", hosts[0]);
// enable escalation for all but strict placements.
escalated = false;
mayEscalate = !strictPlacement;
nodeLabels = null;
} else {
// no hosts
hosts = null;
// relax locality is mandatory on an unconstrained placement
relaxLocality = true;
// declare that the the placement is implicitly escalated.
escalated = true;
// and forbid it happening
mayEscalate = false;
nodeLabels = label;
}
Priority pri = ContainerPriority.createPriority(roleId, !relaxLocality);
priority = pri.getPriority();
issuedRequest = new AMRMClient.ContainerRequest(resource,
hosts,
null,
pri,
relaxLocality,
nodeLabels);
validate();
return issuedRequest;
}
/**
* Build an escalated container request, updating {@link #issuedRequest} with
* the new value.
* @return the new container request, which has the same resource and label requirements
* as the original one, and the same host, but: relaxed placement, and a changed priority
* so as to place it into the relaxed list.
*/
public synchronized AMRMClient.ContainerRequest escalate() {
Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued " + this);
escalated = true;
// this is now the priority
// it is tagged as unlocated because it needs to go into a different
// set of outstanding requests from the strict placements
Priority pri = ContainerPriority.createPriority(roleId, false);
// update the field
priority = pri.getPriority();
String[] nodes;
List<String> issuedRequestNodes = issuedRequest.getNodes();
if (SliderUtils.isUnset(label) && issuedRequestNodes != null) {
nodes = issuedRequestNodes.toArray(new String[issuedRequestNodes.size()]);
} else {
nodes = null;
}
issuedRequest = new AMRMClient.ContainerRequest(issuedRequest.getCapability(),
nodes,
null,
pri,
true,
label);
validate();
return issuedRequest;
}
/**
* Mark the request as completed (or canceled).
* <p>
* Current action: if a node is defined, its request count is decremented
*/
public void completed() {
if (node != null) {
node.getOrCreate(roleId).requestCompleted();
}
}
/**
* Query to see if the request is available and ready to be escalated
* @param time time to check against
* @return true if escalation should begin
*/
public synchronized boolean shouldEscalate(long time) {
return mayEscalate
&& !escalated
&& issuedRequest != null
&& escalationTimeoutMillis < time;
}
/**
* Query for the resource requirements matching; always false before a request is issued
* @param resource
* @return
*/
public synchronized boolean resourceRequirementsMatch(Resource resource) {
return issuedRequest != null && Resources.fitsIn(issuedRequest.getCapability(), resource);
}
@Override
public String toString() {
boolean requestHasLocation = ContainerPriority.hasLocation(getPriority());
final StringBuilder sb = new StringBuilder("OutstandingRequest{");
sb.append("roleId=").append(roleId);
if (hostname != null) {
sb.append(", hostname='").append(hostname).append('\'');
}
sb.append(", node=").append(node);
sb.append(", hasLocation=").append(requestHasLocation);
sb.append(", requestedTimeMillis=").append(requestedTimeMillis);
sb.append(", mayEscalate=").append(mayEscalate);
sb.append(", escalated=").append(escalated);
sb.append(", escalationTimeoutMillis=").append(escalationTimeoutMillis);
sb.append(", issuedRequest=").append(
issuedRequest != null ? SliderUtils.requestToString(issuedRequest) : "(null)");
sb.append('}');
return sb.toString();
}
/**
* Create a cancel operation
* @return an operation that can be used to cancel the request
*/
public CancelSingleRequest createCancelOperation() {
Preconditions.checkState(issuedRequest != null, "No issued request to cancel");
return new CancelSingleRequest(issuedRequest);
}
/**
* Valid if a node label expression specified on container request is valid or
* not. Mimics the logic in AMRMClientImpl, so can be used for preflight checking
* and in mock tests
*
*/
public void validate() throws InvalidContainerRequestException {
Preconditions.checkNotNull(issuedRequest, "request has not yet been built up");
AMRMClient.ContainerRequest containerRequest = issuedRequest;
String exp = containerRequest.getNodeLabelExpression();
if (null == exp || exp.isEmpty()) {
return;
}
// Don't support specifying >= 2 node labels in a node label expression now
if (exp.contains("&&") || exp.contains("||")) {
throw new InvalidContainerRequestException(
"Cannot specify more than two node labels"
+ " in a single node label expression: " + this);
}
// Don't allow specify node label against ANY request listing hosts or racks
if ((containerRequest.getRacks() != null &&
(!containerRequest.getRacks().isEmpty()))
||
(containerRequest.getNodes() != null &&
(!containerRequest.getNodes().isEmpty()))) {
throw new InvalidContainerRequestException(
"Cannot specify node label with rack and node: " + this);
}
// relax priority
boolean hasLocation = ContainerPriority.hasLocation(priority);
if (containerRequest.getRelaxLocality() != !hasLocation) {
throw new InvalidContainerRequestException(
"relax location flag doesn't match container priority: "
+ this);
}
}
/**
* Create a new role/hostname pair for indexing.
* @return a new index.
*/
public RoleHostnamePair getIndex() {
return new RoleHostnamePair(roleId, hostname);
}
}