// blob: 07a5cf9efcf5bca5fa8b7273904c8d4a9af97eee
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.state;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.slider.api.types.ComponentInformation;
import org.apache.slider.api.types.RoleStatistics;
import org.apache.slider.providers.PlacementPolicy;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.server.appmaster.management.BoolMetricPredicate;
import org.apache.slider.server.appmaster.management.LongGauge;
import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * Models the ongoing status of all nodes in an application.
 *
 * <p>These structures are shared across the {@link AppState} and {@link RoleHistory} structures,
 * and must be designed for synchronous access. Atomic counters are preferred to anything which
 * requires synchronization. Where synchronized access is good is that it allows for
 * the whole instance to be locked, for updating multiple entries.
 *
 * <p>Within this class the failure message and the set of failed containers are only
 * read or written by methods that are {@code synchronized} on the instance; the counters
 * are {@link LongGauge} instances and are accessed without locking.
 */
public final class RoleStatus implements Cloneable, MetricSet {

  private final String name;
  private final String group;

  /**
   * Role priority; initialized from {@code providerRole.id}.
   */
  private final int key;
  private final ProviderRole providerRole;

  private final LongGauge actual = new LongGauge();
  private final LongGauge completed = new LongGauge();
  private final LongGauge desired = new LongGauge();
  private final LongGauge failed = new LongGauge();
  private final LongGauge failedRecently = new LongGauge(0);
  private final LongGauge limitsExceeded = new LongGauge(0);
  private final LongGauge nodeFailed = new LongGauge(0);

  /** Number of AA requests queued. */
  private final LongGauge pendingAntiAffineRequests = new LongGauge(0);
  private final LongGauge preempted = new LongGauge(0);
  private final LongGauge releasing = new LongGauge();
  private final LongGauge requested = new LongGauge();
  private final LongGauge started = new LongGauge();
  private final LongGauge startFailed = new LongGauge();
  private final LongGauge totalRequested = new LongGauge();

  /** Resource requirements; null until {@link #setResourceRequirements(Resource)} is called. */
  private Resource resourceRequirements;

  /** Any pending AA request; null when there is none. */
  private volatile OutstandingRequest outstandingAArequest = null;

  /** Text of the most recent failure which supplied one; accessed under the instance lock. */
  private String failureMessage = "";

  /** Containers noted as failed since the last reset; accessed under the instance lock. */
  private final Set<ContainerId> failedContainers = new HashSet<>();

  /**
   * Create a role status from a provider role. The name, group and key
   * are copied from the role's {@code name}, {@code group} and {@code id} fields.
   * @param providerRole role definition; must not be null
   */
  public RoleStatus(ProviderRole providerRole) {
    this.providerRole = providerRole;
    this.name = providerRole.name;
    this.group = providerRole.group;
    this.key = providerRole.id;
  }

  /**
   * Build the map of metrics exposed by this role: the counters, keyed by
   * field name, plus a boolean predicate wrapping {@link #isAARequestOutstanding()}.
   * A new map is built on every call.
   * @return a new, mutable map of metric name to metric
   */
  @Override
  public Map<String, Metric> getMetrics() {
    // NOTE(review): failedRecently is the one counter not exported here;
    // confirm whether that omission is deliberate.
    Map<String, Metric> metrics = new HashMap<>(15);
    metrics.put("actual", actual);
    metrics.put("completed", completed);
    metrics.put("desired", desired);
    metrics.put("failed", failed);
    metrics.put("limitsExceeded", limitsExceeded);
    metrics.put("nodeFailed", nodeFailed);
    metrics.put("preempted", preempted);
    metrics.put("pendingAntiAffineRequests", pendingAntiAffineRequests);
    metrics.put("releasing", releasing);
    metrics.put("requested", requested);
    metrics.put("started", started);
    metrics.put("startFailed", startFailed);
    metrics.put("totalRequested", totalRequested);
    metrics.put("outstandingAArequest",
        new BoolMetricPredicate(new BoolMetricPredicate.Eval() {
          @Override
          public boolean eval() {
            return isAARequestOutstanding();
          }
        }));
    return metrics;
  }

  /**
   * Get the role name.
   * @return the name taken from the provider role
   */
  public String getName() {
    return name;
  }

  /**
   * Get the role group.
   * @return the group taken from the provider role
   */
  public String getGroup() {
    return group;
  }

  /**
   * Get the role key.
   * @return the key (the provider role's id)
   */
  public int getKey() {
    return key;
  }

  /**
   * Get the role priority; this is the same value as {@link #getKey()}.
   * @return the priority
   */
  public int getPriority() {
    return getKey();
  }

  /**
   * Get the placement policy enum, from the values in
   * {@link PlacementPolicy}
   * @return the placement policy for this role
   */
  public int getPlacementPolicy() {
    return providerRole.placementPolicy;
  }

  /**
   * Get the placement timeout of the provider role.
   * @return the timeout in seconds
   */
  public long getPlacementTimeoutSeconds() {
    return providerRole.placementTimeoutSeconds;
  }

  /**
   * The number of failures on a specific node that can be tolerated
   * before selecting a different node for placement
   * @return the node failure threshold of the provider role
   */
  public int getNodeFailureThreshold() {
    return providerRole.nodeFailureThreshold;
  }

  /**
   * Probe for the {@link PlacementPolicy#EXCLUDE_FROM_FLEXING} policy bit.
   * @return true if the bit is set
   */
  public boolean isExcludeFromFlexing() {
    return hasPlacementPolicy(PlacementPolicy.EXCLUDE_FROM_FLEXING);
  }

  /**
   * Probe for the {@link PlacementPolicy#STRICT} policy bit.
   * @return true if the bit is set
   */
  public boolean isStrictPlacement() {
    return hasPlacementPolicy(PlacementPolicy.STRICT);
  }

  /**
   * Probe for the {@link PlacementPolicy#ANTI_AFFINITY_REQUIRED} policy bit.
   * @return true if the bit is set
   */
  public boolean isAntiAffinePlacement() {
    return hasPlacementPolicy(PlacementPolicy.ANTI_AFFINITY_REQUIRED);
  }

  /**
   * Test the placement policy against a bit mask.
   * @param policy mask of policy bits to look for
   * @return true if any bit of the mask is set in the role's placement policy
   */
  public boolean hasPlacementPolicy(int policy) {
    return 0 != (getPlacementPolicy() & policy);
  }

  /**
   * Probe for placement being desired.
   * @return true unless the {@link PlacementPolicy#ANYWHERE} bit is set
   */
  public boolean isPlacementDesired() {
    return !hasPlacementPolicy(PlacementPolicy.ANYWHERE);
  }

  /**
   * Get the desired count.
   * @return the current value of the desired counter
   */
  public long getDesired() {
    return desired.get();
  }

  /**
   * Set the desired count.
   * @param desired new value
   */
  public void setDesired(long desired) {
    this.desired.set(desired);
  }

  /**
   * Get the actual count.
   * @return the current value of the actual counter
   */
  public long getActual() {
    return actual.get();
  }

  /**
   * Increment the actual count.
   * @return the value returned by the counter's increment-and-get
   */
  public long incActual() {
    return actual.incrementAndGet();
  }

  /**
   * Decrement the actual count by one via {@code decToFloor}.
   * @return the value returned by the counter
   */
  public long decActual() {
    return actual.decToFloor(1);
  }

  /**
   * Get the request count.
   * @return a count of requested containers
   */
  public long getRequested() {
    return requested.get();
  }

  /**
   * Increment the request count; the total-requested counter is
   * incremented as well.
   * @return the value returned by the request counter's increment-and-get
   */
  public long incRequested() {
    totalRequested.incrementAndGet();
    return requested.incrementAndGet();
  }

  /**
   * Reduce the request count via {@code decToFloor}; the total-requested
   * counter is not changed.
   * @param count number of requests to cancel
   */
  public void cancel(long count) {
    requested.decToFloor(count);
  }

  /**
   * Cancel a single request; equivalent to {@code cancel(1)}.
   */
  public void decRequested() {
    cancel(1);
  }

  /**
   * Get the releasing count.
   * @return the current value of the releasing counter
   */
  public long getReleasing() {
    return releasing.get();
  }

  /**
   * Increment the releasing count.
   * @return the value returned by the counter's increment-and-get
   */
  public long incReleasing() {
    return releasing.incrementAndGet();
  }

  /**
   * Decrement the releasing count by one via {@code decToFloor}.
   * @return the value returned by the counter
   */
  public long decReleasing() {
    return releasing.decToFloor(1);
  }

  /**
   * Get the failure count.
   * @return the current value of the failed counter
   */
  public long getFailed() {
    return failed.get();
  }

  /**
   * Get the recent failure count.
   * @return the current value of the failed-recently counter
   */
  public long getFailedRecently() {
    return failedRecently.get();
  }

  /**
   * Reset the recent failure count to zero and clear the set of failed containers.
   * @return the number of failures in the "recent" window before the reset
   */
  public synchronized long resetFailedRecently() {
    failedContainers.clear();
    return failedRecently.getAndSet(0);
  }

  /**
   * Get the limits-exceeded count.
   * @return the current value of the limits-exceeded counter
   */
  public long getLimitsExceeded() {
    return limitsExceeded.get();
  }

  /**
   * Add a value to the count of pending anti-affine requests.
   * @param v value to add
   * @return the value returned by the counter's add-and-get
   */
  public long incPendingAntiAffineRequests(long v) {
    return pendingAntiAffineRequests.addAndGet(v);
  }

  /**
   * Probe for an outstanding AA request being true
   * @return true if there is an outstanding AA Request
   */
  public boolean isAARequestOutstanding() {
    return outstandingAArequest != null;
  }

  /**
   * expose the predicate {@link #isAARequestOutstanding()} as an integer,
   * which is very convenient in tests
   * @return 1 if there is an outstanding request; 0 if not
   */
  public int getOutstandingAARequestCount() {
    return isAARequestOutstanding() ? 1 : 0;
  }

  /**
   * Note that a role failed, text will
   * be used in any diagnostics if an exception
   * is later raised.
   *
   * <p>Counter updates by outcome: {@code Preempted} increments only the
   * preempted counter; {@code Node_failure} increments the node-failed and
   * failed counters; {@code Failed_limits_exceeded} increments the
   * limits-exceeded counter and is then handled like every remaining outcome,
   * which increments the failed and failed-recently counters and, for a
   * startup failure, the start-failed counter.
   *
   * @param startupFailure flag to indicate this was a startup event
   * @param text text about the failure; ignored if null
   * @param outcome outcome of the container
   * @param containerId ID of the failed container, added to the set of
   * failed containers; ignored if null
   */
  public synchronized void noteFailed(boolean startupFailure, String text,
      ContainerOutcome outcome, ContainerId containerId) {
    if (text != null) {
      failureMessage = text;
    }
    if (containerId != null) {
      failedContainers.add(containerId);
    }
    switch (outcome) {
      case Preempted:
        preempted.incrementAndGet();
        break;

      case Node_failure:
        nodeFailed.incrementAndGet();
        failed.incrementAndGet();
        break;

      case Failed_limits_exceeded: // exceeded memory or CPU; app/configuration related
        limitsExceeded.incrementAndGet();
        // fall through
      case Failed: // application failure, possibly node related, possibly not
      default: // anything else (future-proofing)
        failed.incrementAndGet();
        failedRecently.incrementAndGet();
        //have a look to see if it short lived
        if (startupFailure) {
          incStartFailed();
        }
        break;
    }
  }

  /**
   * Get the start-failure count.
   * @return the current value of the start-failed counter
   */
  public long getStartFailed() {
    return startFailed.get();
  }

  /**
   * Increment the start-failure count.
   */
  public synchronized void incStartFailed() {
    startFailed.getAndIncrement();
  }

  /**
   * Get the most recently recorded failure message.
   * @return the message; the empty string if none has been recorded
   */
  public synchronized String getFailureMessage() {
    return failureMessage;
  }

  /**
   * Get the containers noted as failed since the last call to
   * {@link #resetFailedRecently()}.
   *
   * <p>The result is a snapshot taken while holding the instance lock, so it
   * can be iterated safely while other threads note further failures or reset
   * the set; later changes are not reflected in it.
   * @return an unmodifiable copy of the set of failed containers
   */
  public synchronized Set<ContainerId> getFailedContainers() {
    return Collections.unmodifiableSet(new HashSet<>(failedContainers));
  }

  /**
   * Get the completed count.
   * @return the current value of the completed counter
   */
  public long getCompleted() {
    return completed.get();
  }

  /**
   * Set the completed count.
   * @param completed new value
   */
  public synchronized void setCompleted(int completed) {
    this.completed.set(completed);
  }

  /**
   * Increment the completed count.
   * @return the value returned by the counter's increment-and-get
   */
  public long incCompleted() {
    return completed.incrementAndGet();
  }

  /**
   * Get the started count.
   * @return the current value of the started counter
   */
  public long getStarted() {
    return started.get();
  }

  /**
   * Increment the started count.
   */
  public synchronized void incStarted() {
    started.incrementAndGet();
  }

  /**
   * Get the total number of requests made; this counter is incremented by
   * {@link #incRequested()} and not reduced by cancellation.
   * @return the current value of the total-requested counter
   */
  public long getTotalRequested() {
    return totalRequested.get();
  }

  /**
   * Get the preemption count.
   * @return the current value of the preempted counter
   */
  public long getPreempted() {
    return preempted.get();
  }

  /**
   * Get the node failure count.
   * @return the current value of the node-failed counter
   */
  public long getNodeFailed() {
    return nodeFailed.get();
  }

  /**
   * Get the number of pending anti-affine requests.
   * @return the current value of the counter
   */
  public long getPendingAntiAffineRequests() {
    return pendingAntiAffineRequests.get();
  }

  /**
   * Set the number of pending anti-affine requests.
   * @param pendingAntiAffineRequests new value
   */
  public void setPendingAntiAffineRequests(long pendingAntiAffineRequests) {
    this.pendingAntiAffineRequests.set(pendingAntiAffineRequests);
  }

  /**
   * Decrement the number of pending anti-affine requests by one via {@code decToFloor}.
   * @return the value returned by the counter
   */
  public long decPendingAntiAffineRequests() {
    return pendingAntiAffineRequests.decToFloor(1);
  }

  /**
   * Get the outstanding AA request.
   * @return the request; null if there is none
   */
  public OutstandingRequest getOutstandingAArequest() {
    return outstandingAArequest;
  }

  /**
   * Set the outstanding AA request.
   * @param outstandingAArequest the request; null to mark that there is none
   */
  public void setOutstandingAArequest(OutstandingRequest outstandingAArequest) {
    this.outstandingAArequest = outstandingAArequest;
  }

  /**
   * Complete the outstanding AA request (there's no check for one in progress, caller
   * expected to have done that).
   */
  public void completeOutstandingAARequest() {
    setOutstandingAArequest(null);
  }

  /**
   * Cancel any outstanding AA request. Harmless if the role is non-AA, or
   * if there are no outstanding requests.
   * When there is one, it is cleared, the pending anti-affine request count is
   * set to zero and the request count is decremented.
   */
  public void cancelOutstandingAARequest() {
    if (outstandingAArequest != null) {
      setOutstandingAArequest(null);
      setPendingAntiAffineRequests(0);
      decRequested();
    }
  }

  /**
   * Get the number of roles we are short of.
   * nodes released are ignored.
   * @return the positive or negative number of roles to add/release.
   * 0 means "do nothing".
   */
  public long getDelta() {
    long inuse = getActualAndRequested();
    long delta = desired.get() - inuse;
    if (delta < 0) {
      //if we are releasing, remove the number that are already released.
      delta += releasing.get();
      //but never switch to a positive
      delta = Math.min(delta, 0);
    }
    return delta;
  }

  /**
   * Get count of actual and requested containers. This includes pending ones
   * @return the size of the application when outstanding requests are included.
   */
  public long getActualAndRequested() {
    return actual.get() + requested.get();
  }

  /**
   * Build a diagnostic string of the role's fields. The anti-affinity
   * request details are only included for anti-affine roles.
   * Synchronized because it reads the failure message and iterates over the
   * set of failed containers, both of which are updated under the instance lock.
   * @return a string describing this instance
   */
  @Override
  public synchronized String toString() {
    final StringBuilder sb = new StringBuilder("RoleStatus{");
    sb.append("name='").append(name).append('\'');
    sb.append(", group=").append(group);
    sb.append(", key=").append(key);
    sb.append(", desired=").append(desired);
    sb.append(", actual=").append(actual);
    sb.append(", requested=").append(requested);
    sb.append(", releasing=").append(releasing);
    sb.append(", failed=").append(failed);
    sb.append(", startFailed=").append(startFailed);
    sb.append(", started=").append(started);
    sb.append(", completed=").append(completed);
    sb.append(", totalRequested=").append(totalRequested);
    sb.append(", preempted=").append(preempted);
    sb.append(", nodeFailed=").append(nodeFailed);
    sb.append(", failedRecently=").append(failedRecently);
    sb.append(", limitsExceeded=").append(limitsExceeded);
    sb.append(", resourceRequirements=").append(resourceRequirements);
    sb.append(", isAntiAffinePlacement=").append(isAntiAffinePlacement());
    if (isAntiAffinePlacement()) {
      sb.append(", pendingAntiAffineRequests=").append(pendingAntiAffineRequests);
      sb.append(", outstandingAArequest=").append(outstandingAArequest);
    }
    sb.append(", failureMessage='").append(failureMessage).append('\'');
    sb.append(", providerRole=").append(providerRole);
    sb.append(", failedContainers=").append(failedContainers);
    sb.append('}');
    return sb.toString();
  }

  /**
   * Clone this instance via {@link Object#clone()}.
   *
   * <p>This is a field-by-field (shallow) copy: the clone refers to the same
   * counter objects and the same set of failed containers as this instance.
   * NOTE(review): a clone is therefore not an independent snapshot of the
   * counters; confirm that callers do not rely on it being one.
   * @return the shallow copy
   * @throws CloneNotSupportedException declared by {@link Object#clone()}
   */
  @Override
  public synchronized Object clone() throws CloneNotSupportedException {
    return super.clone();
  }

  /**
   * Get the provider role
   * @return the provider role
   */
  public ProviderRole getProviderRole() {
    return providerRole;
  }

  /**
   * Build the statistics map from the current data
   * @return a map for use in statistics reports
   */
  public Map<String, Integer> buildStatistics() {
    ComponentInformation componentInformation = serialize();
    return componentInformation.buildStatistics();
  }

  /**
   * Produced a serialized form which can be served up as JSON
   * Counter values are narrowed to {@code int} through {@code intValue()}.
   * @return a summary of the current role status.
   */
  public synchronized ComponentInformation serialize() {
    ComponentInformation info = new ComponentInformation();
    info.name = name;
    info.priority = getPriority();
    info.desired = desired.intValue();
    info.actual = actual.intValue();
    info.requested = requested.intValue();
    info.releasing = releasing.intValue();
    info.failed = failed.intValue();
    info.startFailed = startFailed.intValue();
    info.placementPolicy = getPlacementPolicy();
    info.failureMessage = failureMessage;
    info.totalRequested = totalRequested.intValue();
    info.failedRecently = failedRecently.intValue();
    info.nodeFailed = nodeFailed.intValue();
    info.preempted = preempted.intValue();
    info.pendingAntiAffineRequestCount = pendingAntiAffineRequests.intValue();
    info.isAARequestOutstanding = isAARequestOutstanding();
    return info;
  }

  /**
   * Get the (possibly null) label expression for this role
   * @return a string or null
   */
  public String getLabelExpression() {
    return providerRole.labelExpression;
  }

  /**
   * Get the resource requirements of this role.
   * @return the requirements; null if they have not been set
   */
  public Resource getResourceRequirements() {
    return resourceRequirements;
  }

  /**
   * Set the resource requirements of this role.
   * @param resourceRequirements the new requirements
   */
  public void setResourceRequirements(Resource resourceRequirements) {
    this.resourceRequirements = resourceRequirements;
  }

  /**
   * Compare two role status entries by name
   */
  public static class CompareByName implements Comparator<RoleStatus>,
      Serializable {
    @Override
    public int compare(RoleStatus o1, RoleStatus o2) {
      return o1.getName().compareTo(o2.getName());
    }
  }

  /**
   * Compare two role status entries by key
   */
  public static class CompareByKey implements Comparator<RoleStatus>,
      Serializable {
    @Override
    public int compare(RoleStatus o1, RoleStatus o2) {
      return Integer.compare(o1.getKey(), o2.getKey());
    }
  }

  /**
   * Given a resource, set its requirements to those this role needs
   * @param resource resource to configure
   * @return the resource
   * @throws NullPointerException if the role's resource requirements have not been set
   */
  public Resource copyResourceRequirements(Resource resource) {
    // read the field once so the null check and the copies see the same object
    Resource requirements = Preconditions.checkNotNull(resourceRequirements,
        "Role resource requirements have not been set");
    resource.setMemory(requirements.getMemory());
    resource.setVirtualCores(requirements.getVirtualCores());
    return resource;
  }

  /**
   * Build a statistics record from the current counter values;
   * {@code activeAA} is taken from {@link #getOutstandingAARequestCount()}.
   * @return a new statistics instance
   */
  public synchronized RoleStatistics getStatistics() {
    RoleStatistics stats = new RoleStatistics();
    stats.activeAA = getOutstandingAARequestCount();
    stats.actual = actual.get();
    stats.desired = desired.get();
    stats.failed = failed.get();
    stats.limitsExceeded = limitsExceeded.get();
    stats.nodeFailed = nodeFailed.get();
    stats.preempted = preempted.get();
    stats.releasing = releasing.get();
    stats.requested = requested.get();
    stats.started = started.get();
    stats.startFailed = startFailed.get();
    stats.totalRequested = totalRequested.get();
    return stats;
  }
}