blob: 39897f6201187cd8ced773c60ce6ae6dfafed01a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.ServiceMetrics;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.monitor.probe.Probe;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Apps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
import static org.apache.hadoop.yarn.service.component.ComponentState.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_THRESHOLD;
/**
 * Represents one component of a YARN service: holds the user-facing spec,
 * the set of {@link ComponentInstance}s, container-failure bookkeeping, and
 * a state machine (INIT / FLEXING / STABLE / UPGRADING) driven by
 * {@link ComponentEvent}s delivered through {@link #handle}.
 */
public class Component implements EventHandler<ComponentEvent> {
private static final Logger LOG = LoggerFactory.getLogger(Component.class);
// The user-provided spec this component was created from.
private org.apache.hadoop.yarn.service.api.records.Component componentSpec;
// Allocation-request id used to correlate RM allocations with this
// component; also reused (cast to int) as the request priority.
private long allocateId;
private Priority priority;
private ServiceMetrics componentMetrics;
private ServiceScheduler scheduler;
private ServiceContext context;
private AMRMClientAsync<ContainerRequest> amrmClient;
// Source of monotonically increasing ids for new component instances.
private AtomicLong instanceIdCounter = new AtomicLong();
// All instances of this component, keyed by instance name.
private Map<String, ComponentInstance> compInstances =
new ConcurrentHashMap<>();
// component instances to be assigned with a container
private List<ComponentInstance> pendingInstances =
Collections.synchronizedList(new LinkedList<>());
// Tracks per-node container failures for blacklisting decisions.
private ContainerFailureTracker failureTracker;
// Readiness probe built from the spec's readiness check (see constructor).
private Probe probe;
// Guard the state machine: reads via getState(), transitions via handle().
private final ReentrantReadWriteLock.ReadLock readLock;
private final ReentrantReadWriteLock.WriteLock writeLock;
public int maxContainerFailurePerComp;
// The number of containers failed since last reset. This excludes preempted,
// disk_failed containers etc. This will be reset to 0 periodically.
public AtomicInteger currentContainerFailure = new AtomicInteger(0);
private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
stateMachine;
private AsyncDispatcher dispatcher;
// Shared transition table for all Component instances; start state is INIT.
private static final StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>
stateMachineFactory =
new StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>(
INIT)
// INIT will only go to FLEXING
.addTransition(INIT, EnumSet.of(STABLE, FLEXING, INIT),
FLEX, new FlexComponentTransition())
// container recovered on AM restart
.addTransition(INIT, INIT, CONTAINER_RECOVERED,
new ContainerRecoveredTransition())
// container recovered in AM heartbeat
.addTransition(FLEXING, FLEXING, CONTAINER_RECOVERED,
new ContainerRecoveredTransition())
// container allocated by RM
.addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
new ContainerAllocatedTransition())
// container launched on NM
.addTransition(FLEXING, EnumSet.of(STABLE, FLEXING),
CONTAINER_STARTED, new ContainerStartedTransition())
// container failed while flexing
.addTransition(FLEXING, FLEXING, CONTAINER_COMPLETED,
new ContainerCompletedTransition())
// Flex while previous flex is still in progress
.addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX,
new FlexComponentTransition())
// container failed while stable
.addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
new ContainerCompletedTransition())
// Ignore surplus container
.addTransition(STABLE, STABLE, CONTAINER_ALLOCATED,
new ContainerAllocatedTransition())
// Flex by user
// For flex up, go to FLEXING state
// For flex down, go to STABLE state
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
FLEX, new FlexComponentTransition())
.addTransition(STABLE, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(FLEXING, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(UPGRADING, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.installTopology();
/**
 * Builds a component from its spec and wires it into the service runtime.
 *
 * @param component the user-provided component spec
 * @param allocateId id used as the allocation-request id and, cast to int,
 *                   as the container-request priority for this component
 * @param context shared service context (scheduler, dispatcher, fs, ...)
 */
public Component(
org.apache.hadoop.yarn.service.api.records.Component component,
long allocateId, ServiceContext context) {
this.allocateId = allocateId;
this.priority = Priority.newInstance((int) allocateId);
this.componentSpec = component;
componentMetrics = ServiceMetrics.register(component.getName(),
"Metrics for component " + component.getName());
componentMetrics
.tag("type", "Metrics type [component or service]", "component");
this.scheduler = context.scheduler;
this.context = context;
amrmClient = scheduler.getAmRMClient();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
dispatcher = scheduler.getDispatcher();
failureTracker =
new ContainerFailureTracker(context, this);
// Probe derived from the spec's readiness check (presumably null when no
// readiness check is configured -- confirm against MonitorUtils.getProbe).
probe = MonitorUtils.getProbe(componentSpec.getReadinessCheck());
// Container failures tolerated per component before action is taken;
// defaults to 10 when the property is unset.
maxContainerFailurePerComp = componentSpec.getConfiguration()
.getPropertyInt(CONTAINER_FAILURE_THRESHOLD, 10);
createNumCompInstances(component.getNumberOfContainers());
setDesiredContainers(component.getNumberOfContainers().intValue());
}
/**
 * Creates {@code count} fresh component instances, each registered in the
 * instance map and queued to receive a container.
 */
private void createNumCompInstances(long count) {
  for (long created = 0; created < count; created++) {
    createOneCompInstance();
  }
}

/**
 * Creates a single instance with the next monotonically increasing id and
 * adds it to both the instance map and the pending (container-less) queue.
 */
private void createOneCompInstance() {
  ComponentInstanceId instanceId = new ComponentInstanceId(
      instanceIdCounter.getAndIncrement(), componentSpec.getName());
  ComponentInstance newInstance = new ComponentInstance(this, instanceId);
  compInstances.put(newInstance.getCompInstanceName(), newInstance);
  pendingInstances.add(newInstance);
}
/**
 * Handles FLEX events: compares the desired container count with the
 * current spec count and either requests more containers (flex up),
 * destroys the most recently created instances (flex down), or leaves the
 * component unchanged.
 */
private static class FlexComponentTransition implements
MultipleArcTransition<Component, ComponentEvent, ComponentState> {
// For flex up, go to FLEXING state
// For flex down, go to STABLE state
@Override
public ComponentState transition(Component component,
ComponentEvent event) {
component.setDesiredContainers((int)event.getDesired());
if (!component.areDependenciesReady()) {
// Desired count was recorded above, so the flex can proceed later
// once dependent components become ready.
LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not"
+ " satisfied.", component.getName());
return component.getState();
}
if (component.getState() == INIT) {
// This happens on init
LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event
.getDesired() + " instances.");
component.requestContainers(component.pendingInstances.size());
return checkIfStable(component);
}
long before = component.getComponentSpec().getNumberOfContainers();
long delta = event.getDesired() - before;
component.getComponentSpec().setNumberOfContainers(event.getDesired());
if (delta > 0) {
// Scale up
LOG.info("[FLEX UP COMPONENT " + component.getName() + "]: scaling up from "
+ before + " to " + event.getDesired());
component.requestContainers(delta);
component.createNumCompInstances(delta);
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
component.getScheduler().getApp().setState(ServiceState.STARTED);
return FLEXING;
} else if (delta < 0){
delta = 0 - delta;
// scale down
LOG.info("[FLEX DOWN COMPONENT " + component.getName()
+ "]: scaling down from " + before + " to " + event.getDesired());
List<ComponentInstance> list =
new ArrayList<>(component.getAllComponentInstances());
// sort in Most recent -> oldest order, destroy most recent ones.
list.sort(Collections.reverseOrder());
for (int i = 0; i < delta; i++) {
ComponentInstance instance = list.get(i);
// remove the instance
component.compInstances.remove(instance.getCompInstanceName());
component.pendingInstances.remove(instance);
// decrement id counter
// (the destroyed instances hold the highest ids since the list is
// sorted most-recent-first, so the counter stays consistent)
component.instanceIdCounter.decrementAndGet();
instance.destroy();
}
checkAndUpdateComponentState(component, false);
return STABLE;
} else {
// delta == 0: nothing to do.
LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " +
event.getDesired() + " instances, ignoring");
return STABLE;
}
}
}
/**
 * The RM granted a container: hand it to the next pending instance (or
 * release it as surplus if none are waiting).
 */
private static class ContainerAllocatedTransition extends BaseTransition {
  @Override
  public void transition(Component component, ComponentEvent event) {
    Container allocated = event.getContainer();
    component.assignContainerToCompInstance(allocated);
  }
}
/**
 * Handles a container recovered after an AM restart or via heartbeat:
 * re-attaches the container to the instance named in the event, or releases
 * it when the event carries no instance.
 */
private static class ContainerRecoveredTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
ComponentInstance instance = event.getInstance();
Container container = event.getContainer();
if (instance == null) {
// No instance to attach to -- treat the container as surplus.
LOG.info("[COMPONENT {}]: Trying to recover {} but event did not " +
"specify component instance",
component.getName(), container.getId());
component.releaseContainer(container);
return;
}
component.pendingInstances.remove(instance);
instance.setContainer(container);
// Re-create the instance's working directory on the filesystem.
ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
component.getScheduler().addLiveCompInstance(container.getId(), instance);
LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
"host {}, num pending component instances reduced to {} ",
component.getName(), container.getId(),
instance.getCompInstanceName(), container.getNodeId(),
component.pendingInstances.size());
// Kick the instance's own state machine into the started path.
component.dispatcher.getEventHandler().handle(
new ComponentInstanceEvent(container.getId(), START));
}
}
/**
 * A container launched on an NM: forward a START event to its instance and
 * re-evaluate whether the component has reached its desired count.
 */
private static class ContainerStartedTransition implements
    MultipleArcTransition<Component, ComponentEvent, ComponentState> {
  @Override
  public ComponentState transition(Component component,
      ComponentEvent event) {
    ComponentInstanceEvent startEvent =
        new ComponentInstanceEvent(event.getContainerId(), START);
    component.dispatcher.getEventHandler().handle(startEvent);
    return checkIfStable(component);
  }
}
/**
 * The component is stable exactly when the number of READY containers
 * matches the number of containers requested in the spec. Mirrors the
 * verdict into the spec's state and returns the state-machine state.
 */
private static ComponentState checkIfStable(Component component) {
  long desired = component.getComponentSpec().getNumberOfContainers();
  boolean stable =
      component.componentMetrics.containersReady.value() == desired;
  if (stable) {
    component.componentSpec.setState(
        org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
    return STABLE;
  }
  component.componentSpec.setState(
      org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
  return FLEXING;
}
// This method should be called whenever there is an increment or decrement
// of a READY state container of a component
/**
 * Re-evaluates the component's spec state after a READY-count change.
 * On an increment, the component becomes STABLE once every desired
 * container is READY; on a decrement, it becomes FLEXING whenever fewer
 * than the desired number are READY (a flex down shrinks the desired count
 * first, so it does not trigger this). The two branches previously
 * duplicated the log-and-recheck logic; it is factored into a helper.
 */
public static synchronized void checkAndUpdateComponentState(
    Component component, boolean isIncrement) {
  org.apache.hadoop.yarn.service.api.records.ComponentState curState =
      component.componentSpec.getState();
  int ready = component.componentMetrics.containersReady.value();
  int desired = component.componentMetrics.containersDesired.value();
  if (isIncrement) {
    // check if all containers are in READY state
    if (ready == desired) {
      applyStateAndRecheckService(component, curState,
          org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE,
          isIncrement);
    }
  } else {
    // container moving out of READY state could be because of FLEX down so
    // still need to verify the count before changing the component state
    if (ready < desired) {
      applyStateAndRecheckService(component, curState,
          org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING,
          isIncrement);
    }
  }
}

/**
 * Sets the spec state, logs the transition if the state actually changed,
 * and triggers a re-check of the overall service state.
 */
private static void applyStateAndRecheckService(Component component,
    org.apache.hadoop.yarn.service.api.records.ComponentState curState,
    org.apache.hadoop.yarn.service.api.records.ComponentState newState,
    boolean isIncrement) {
  component.componentSpec.setState(newState);
  if (curState != newState) {
    LOG.info("[COMPONENT {}] state changed from {} -> {}",
        component.componentSpec.getName(), curState, newState);
  }
  // component state change will trigger re-check of service state
  ServiceMaster.checkAndUpdateServiceState(component.scheduler, isIncrement);
}
/**
 * A container completed: record it in the metrics, tell the owning instance
 * to stop, and mark both component and service as not-yet-stable so a
 * replacement container will be pursued.
 */
private static class ContainerCompletedTransition extends BaseTransition {
  @Override
  public void transition(Component component, ComponentEvent event) {
    ContainerStatus completedStatus = event.getStatus();
    component.updateMetrics(completedStatus);
    component.dispatcher.getEventHandler().handle(
        new ComponentInstanceEvent(completedStatus.getContainerId(), STOP)
            .setStatus(completedStatus));
    component.componentSpec.setState(
        org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
    component.getScheduler().getApp().setState(ServiceState.STARTED);
  }
}
/** Marks the component spec as awaiting an upgrade. */
private static class ComponentNeedsUpgradeTransition extends BaseTransition {
  @Override
  public void transition(Component component, ComponentEvent event) {
    org.apache.hadoop.yarn.service.api.records.ComponentState upgradeState =
        org.apache.hadoop.yarn.service.api.records
            .ComponentState.NEEDS_UPGRADE;
    component.componentSpec.setState(upgradeState);
  }
}
/** Removes an instance from the queue waiting to receive a container. */
public void removePendingInstance(ComponentInstance instance) {
pendingInstances.remove(instance);
}
/** Re-queues an instance so the next allocated container is assigned to it. */
public void reInsertPendingInstance(ComponentInstance instance) {
pendingInstances.add(instance);
}
/** Returns an unneeded container to the RM and counts it as surplus. */
private void releaseContainer(Container container) {
scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
componentMetrics.surplusContainers.incr();
scheduler.getServiceMetrics().surplusContainers.incr();
}
/**
 * Hands a newly allocated container to the longest-waiting pending
 * instance (FIFO) and launches it; containers that arrive when no instance
 * is waiting are released back to the RM as surplus.
 */
private void assignContainerToCompInstance(Container container) {
  if (pendingInstances.isEmpty()) {
    LOG.info(
        "[COMPONENT {}]: No pending component instance left, release surplus container {}",
        getName(), container.getId());
    releaseContainer(container);
    return;
  }
  ComponentInstance target = pendingInstances.remove(0);
  LOG.info(
      "[COMPONENT {}]: {} allocated, num pending component instances reduced to {}",
      getName(), container.getId(), pendingInstances.size());
  target.setContainer(container);
  scheduler.addLiveCompInstance(container.getId(), target);
  LOG.info(
      "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
      getName(), container.getId(), target.getCompInstanceName(),
      container.getNodeId());
  scheduler.getContainerLaunchService()
      .launchCompInstance(scheduler.getApp(), target, container);
}
/**
 * Asks the RM for {@code count} more containers for this component.
 * When no component in the service declares placement constraints, plain
 * relax-locality {@link ContainerRequest}s are used; otherwise a single
 * {@link SchedulingRequest} carrying the AND-ed composite of this
 * component's constraints is submitted. Decomposed into helpers; behavior
 * is unchanged.
 */
@SuppressWarnings({ "unchecked" })
public void requestContainers(long count) {
  LOG.info("[COMPONENT {}] Requesting for {} container(s)",
      componentSpec.getName(), count);
  Resource resource = buildResource();
  if (!scheduler.hasAtLeastOnePlacementConstraint()) {
    requestPlainContainers(resource, count);
  } else {
    requestConstrainedContainers(resource, count);
  }
}

/**
 * Translates the spec's resource section (memory, vcores, and any
 * additional resource types) into a YARN {@link Resource}.
 */
private Resource buildResource() {
  org.apache.hadoop.yarn.service.api.records.Resource componentResource =
      componentSpec.getResource();
  Resource resource = Resource.newInstance(componentResource.calcMemoryMB(),
      componentResource.getCpus());
  if (componentResource.getAdditional() != null) {
    for (Map.Entry<String, ResourceInformation> entry : componentResource
        .getAdditional().entrySet()) {
      String resourceName = entry.getKey();
      // Avoid setting memory/cpu under "additional"
      if (resourceName.equals(
          org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI)
          || resourceName.equals(
          org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI)) {
        LOG.warn("Please set memory/vcore in the main section of resource, "
            + "ignoring this entry=" + resourceName);
        continue;
      }
      ResourceInformation specInfo = entry.getValue();
      org.apache.hadoop.yarn.api.records.ResourceInformation ri =
          org.apache.hadoop.yarn.api.records.ResourceInformation.newInstance(
              entry.getKey(),
              specInfo.getUnit(),
              specInfo.getValue());
      resource.setResourceInformation(resourceName, ri);
    }
  }
  return resource;
}

/** Submits one relax-locality container request per container needed. */
private void requestPlainContainers(Resource resource, long count) {
  for (int i = 0; i < count; i++) {
    ContainerRequest request = ContainerRequest.newBuilder()
        .capability(resource).priority(priority)
        .allocationRequestId(allocateId).relaxLocality(true).build();
    LOG.info("[COMPONENT {}] Submitting container request : {}",
        componentSpec.getName(), request);
    amrmClient.addContainerRequest(request);
  }
}

/**
 * Builds a single scheduling request sized for {@code count} containers,
 * tagged with the component name and carrying the AND-ed composite of all
 * configured placement constraints.
 */
private void requestConstrainedContainers(Resource resource, long count) {
  // Validation of non-null target tags and that they refer to existing
  // component names are already done. So, no need to validate here.
  PlacementPolicy placementPolicy = componentSpec.getPlacementPolicy();
  // We prepare an AND-ed composite constraint to be the final composite
  // constraint. If placement expressions are specified to create advanced
  // composite constraints then this AND-ed composite constraint is not
  // used.
  PlacementConstraint finalConstraint = null;
  for (org.apache.hadoop.yarn.service.api.records.PlacementConstraint
      yarnServiceConstraint : placementPolicy.getConstraints()) {
    PlacementConstraint constraint = buildConstraint(yarnServiceConstraint);
    // The default AND-ed final composite constraint
    if (finalConstraint != null) {
      finalConstraint = PlacementConstraints
          .and(constraint.getConstraintExpr(),
              finalConstraint.getConstraintExpr())
          .build();
    } else {
      finalConstraint = constraint;
    }
    LOG.debug("[COMPONENT {}] Placement constraint: {}",
        componentSpec.getName(), constraint.getConstraintExpr().toString());
  }
  ResourceSizing resourceSizing = ResourceSizing.newInstance((int) count,
      resource);
  LOG.debug("[COMPONENT {}] Resource sizing: {}", componentSpec.getName(),
      resourceSizing);
  SchedulingRequest request = SchedulingRequest.newBuilder()
      .priority(priority).allocationRequestId(allocateId)
      .allocationTags(Collections.singleton(componentSpec.getName()))
      .executionType(
          ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true))
      .placementConstraintExpression(finalConstraint)
      .resourceSizing(resourceSizing).build();
  LOG.info("[COMPONENT {}] Submitting scheduling request: {}",
      componentSpec.getName(), request);
  Collection<SchedulingRequest> schedulingRequests = new HashSet<>();
  schedulingRequests.add(request);
  amrmClient.addSchedulingRequests(schedulingRequests);
}

/**
 * Converts one service-spec placement constraint (affinity, anti-affinity
 * or cardinality over tags / node attributes / node partitions) into a
 * YARN {@link PlacementConstraint}.
 */
private PlacementConstraint buildConstraint(
    org.apache.hadoop.yarn.service.api.records.PlacementConstraint
        yarnServiceConstraint) {
  List<TargetExpression> targetExpressions = new ArrayList<>();
  // Currently only intra-application allocation tags are supported.
  if (!yarnServiceConstraint.getTargetTags().isEmpty()) {
    targetExpressions.add(PlacementTargets.allocationTagToIntraApp(
        yarnServiceConstraint.getTargetTags().toArray(new String[0])));
  }
  // Add all node attributes
  for (Map.Entry<String, List<String>> attribute : yarnServiceConstraint
      .getNodeAttributes().entrySet()) {
    targetExpressions.add(PlacementTargets.nodeAttribute(
        attribute.getKey(), attribute.getValue().toArray(new String[0])));
  }
  // Add all node partitions
  if (!yarnServiceConstraint.getNodePartitions().isEmpty()) {
    targetExpressions
        .add(PlacementTargets.nodePartition(yarnServiceConstraint
            .getNodePartitions().toArray(new String[0])));
  }
  PlacementConstraint constraint = null;
  switch (yarnServiceConstraint.getType()) {
  case AFFINITY:
    constraint = PlacementConstraints
        .targetIn(yarnServiceConstraint.getScope().getValue(),
            targetExpressions.toArray(new TargetExpression[0]))
        .build();
    break;
  case ANTI_AFFINITY:
    constraint = PlacementConstraints
        .targetNotIn(yarnServiceConstraint.getScope().getValue(),
            targetExpressions.toArray(new TargetExpression[0]))
        .build();
    break;
  case AFFINITY_WITH_CARDINALITY:
    constraint = PlacementConstraints.targetCardinality(
        yarnServiceConstraint.getScope().name().toLowerCase(),
        yarnServiceConstraint.getMinCardinality() == null ? 0
            : yarnServiceConstraint.getMinCardinality().intValue(),
        yarnServiceConstraint.getMaxCardinality() == null
            ? Integer.MAX_VALUE
            : yarnServiceConstraint.getMaxCardinality().intValue(),
        targetExpressions.toArray(new TargetExpression[0])).build();
    break;
  }
  return constraint;
}
/**
 * Records the new desired container count in both the component gauge and
 * the service-level gauge.
 *
 * Bug fix: the old code called {@code decr(delta)} with an already-negative
 * delta on flex down; subtracting a negative delta *increased* the service
 * gauge. {@code incr} with a signed delta adjusts correctly in both
 * directions.
 */
private void setDesiredContainers(int n) {
  int delta = n - scheduler.getServiceMetrics().containersDesired.value();
  if (delta != 0) {
    // Signed delta: positive on flex up, negative on flex down.
    scheduler.getServiceMetrics().containersDesired.incr(delta);
  }
  componentMetrics.containersDesired.set(n);
}
/**
 * Updates component- and service-level metrics for a completed container.
 * A SUCCESS exit counts only as "succeeded" and returns early; every other
 * exit falls through to the failure counters (with preemption and disk
 * failure additionally tracked under their own gauges), and exits that
 * count toward node blacklisting also bump the failure trackers.
 */
private void updateMetrics(ContainerStatus status) {
switch (status.getExitStatus()) {
case SUCCESS:
componentMetrics.containersSucceeded.incr();
scheduler.getServiceMetrics().containersSucceeded.incr();
// A clean exit is not a failure; skip the failure accounting below.
return;
case PREEMPTED:
componentMetrics.containersPreempted.incr();
scheduler.getServiceMetrics().containersPreempted.incr();
break;
case DISKS_FAILED:
componentMetrics.containersDiskFailure.incr();
scheduler.getServiceMetrics().containersDiskFailure.incr();
break;
default:
break;
}
// containersFailed include preempted, disks_failed etc.
componentMetrics.containersFailed.incr();
scheduler.getServiceMetrics().containersFailed.incr();
if (Apps.shouldCountTowardsNodeBlacklisting(status.getExitStatus())) {
// NOTE(review): assumes the container is still in the live-instance map
// at this point; confirm a concurrent removal cannot NPE this lookup.
String host = scheduler.getLiveInstances().get(status.getContainerId())
.getNodeId().getHost();
failureTracker.incNodeFailure(host);
currentContainerFailure.getAndIncrement() ;
}
}
/**
 * Whether every component this one depends on has all of its desired
 * instances READY. A component with no declared dependencies is trivially
 * ready; unknown dependency names are logged and skipped.
 */
public boolean areDependenciesReady() {
  List<String> dependencies = componentSpec.getDependencies();
  if (ServiceUtils.isEmpty(dependencies)) {
    return true;
  }
  for (String dependency : dependencies) {
    Component dependent = scheduler.getAllComponents().get(dependency);
    if (dependent == null) {
      LOG.error("Couldn't find dependency {} for {} (should never happen)",
          dependency, getName());
      continue;
    }
    int readyCount = dependent.getNumReadyInstances();
    int desiredCount = dependent.getNumDesiredInstances();
    if (readyCount < desiredCount) {
      LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}"
          + " instances are ready.", getName(), dependency, readyCount,
          desiredCount);
      return false;
    }
  }
  return true;
}
/**
 * Builds the substitution tokens (IP and host per instance, keyed by the
 * upper-cased instance name) for every instance of every dependency whose
 * container has already reported an IP and hostname.
 */
public Map<String, String> getDependencyHostIpTokens() {
  Map<String, String> tokens = new HashMap<>();
  List<String> dependencies = componentSpec.getDependencies();
  if (ServiceUtils.isEmpty(dependencies)) {
    return tokens;
  }
  for (String dependency : dependencies) {
    Collection<ComponentInstance> instances = scheduler.getAllComponents()
        .get(dependency).getAllComponentInstances();
    for (ComponentInstance instance : instances) {
      ContainerStatus status = instance.getContainerStatus();
      // Skip instances whose container has not yet reported IP/host.
      if (status == null || ServiceUtils.isEmpty(status.getIPs())
          || ServiceUtils.isUnset(status.getHost())) {
        continue;
      }
      String upperName = instance.getCompInstanceName().toUpperCase();
      tokens.put(String.format(COMPONENT_INSTANCE_IP, upperName),
          status.getIPs().get(0));
      tokens.put(String.format(COMPONENT_INSTANCE_HOST, upperName),
          status.getHost());
    }
  }
  return tokens;
}
/** Increments the running-container gauges at component and service level. */
public void incRunningContainers() {
componentMetrics.containersRunning.incr();
scheduler.getServiceMetrics().containersRunning.incr();
}
/** Decrements the running-container gauges at component and service level. */
public void decRunningContainers() {
componentMetrics.containersRunning.decr();
scheduler.getServiceMetrics().containersRunning.decr();
}
/** A container became READY; may flip the component to STABLE. */
public void incContainersReady() {
componentMetrics.containersReady.incr();
scheduler.getServiceMetrics().containersReady.incr();
checkAndUpdateComponentState(this, true);
}
/** A container left READY; may flip the component back to FLEXING. */
public void decContainersReady() {
componentMetrics.containersReady.decr();
scheduler.getServiceMetrics().containersReady.decr();
checkAndUpdateComponentState(this, false);
}
/** @return the number of containers currently in READY state. */
public int getNumReadyInstances() {
return componentMetrics.containersReady.value();
}
/** @return the number of containers currently running. */
public int getNumRunningInstances() {
return componentMetrics.containersRunning.value();
}
/** @return the number of containers this component should have. */
public int getNumDesiredInstances() {
return componentMetrics.containersDesired.value();
}
/** @return the instance with the given instance name, or null if unknown. */
public ComponentInstance getComponentInstance(String componentInstanceName) {
return compInstances.get(componentInstanceName);
}
/** @return a live view of all instances of this component. */
public Collection<ComponentInstance> getAllComponentInstances() {
return compInstances.values();
}
/** @return the user-facing spec backing this component. */
public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() {
return this.componentSpec;
}
/** Resets the windowed container-failure counter and the per-node tracker. */
public void resetCompFailureCount() {
LOG.info("[COMPONENT {}]: Reset container failure count from {} to 0.",
getName(), currentContainerFailure.get());
currentContainerFailure.set(0);
failureTracker.resetContainerFailures();
}
/** @return the readiness probe (presumably null when no readiness check is configured -- confirm). */
public Probe getProbe() {
return probe;
}
/** @return the container-request priority (derived from the allocate id). */
public Priority getPriority() {
return priority;
}
/** @return the allocation-request id used for this component's requests. */
public long getAllocateId() {
return allocateId;
}
/** @return the component name from the spec. */
public String getName () {
return componentSpec.getName();
}
/** @return the current state-machine state, read under the read lock. */
public ComponentState getState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
/** @return the scheduler shared by all components of this service. */
public ServiceScheduler getScheduler() {
return scheduler;
}
/**
 * Feeds a component event through the state machine under the write lock,
 * logging invalid transitions instead of propagating them, and logging any
 * resulting state change.
 *
 * Fix: the lock is now acquired *before* the try block (standard Lock
 * idiom) so the finally-unlock can never run for a lock that was not
 * actually acquired.
 */
@Override
public void handle(ComponentEvent event) {
  writeLock.lock();
  try {
    ComponentState oldState = getState();
    try {
      stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitionException e) {
      // Invalid events are logged, not fatal: the component stays in its
      // current state.
      LOG.error(MessageFormat.format("[COMPONENT {0}]: Invalid event {1} at {2}",
          componentSpec.getName(), event.getType(), oldState), e);
    }
    if (oldState != getState()) {
      LOG.info("[COMPONENT {}] Transitioned from {} to {} on {} event.",
          componentSpec.getName(), oldState, getState(), event.getType());
    }
  } finally {
    writeLock.unlock();
  }
}
/**
 * No-op default transition; concrete transitions extend this and override
 * {@link #transition}.
 */
private static class BaseTransition implements
SingleArcTransition<Component, ComponentEvent> {
@Override public void transition(Component component,
ComponentEvent event) {
}
}
/** @return the shared service context. */
public ServiceContext getContext() {
return context;
}
// Only for testing
public List<ComponentInstance> getPendingInstances() {
return pendingInstances;
}
}