blob: c57d88827208ba7463e06ec7ce914af591c5c82e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component.instance;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.BoundedAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*;
import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
Comparable<ComponentInstance> {
private static final Logger LOG =
LoggerFactory.getLogger(ComponentInstance.class);
private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
ComponentInstanceEvent> stateMachine;
private Component component;
private final ReadLock readLock;
private final WriteLock writeLock;
private ComponentInstanceId compInstanceId = null;
private Path compInstanceDir;
private Container container;
private YarnRegistryViewForProviders yarnRegistryOperations;
private FileSystem fs;
private boolean timelineServiceEnabled = false;
private ServiceTimelinePublisher serviceTimelinePublisher;
private ServiceScheduler scheduler;
private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
private volatile ScheduledFuture containerStatusFuture;
private volatile ContainerStatus status;
private long containerStartedTime = 0;
// This container object is used for rest API query
private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
private static final StateMachineFactory<ComponentInstance,
ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent>
stateMachineFactory =
new StateMachineFactory<ComponentInstance, ComponentInstanceState,
ComponentInstanceEventType, ComponentInstanceEvent>(INIT)
.addTransition(INIT, STARTED, START,
new ContainerStartedTransition())
.addTransition(INIT, INIT, STOP,
// container failed before launching, nothing to cleanup from registry
// This could happen if NMClient#startContainerAsync failed, container
// will be completed, but COMP_INSTANCE is still at INIT.
new ContainerStoppedTransition(true))
//From Running
.addTransition(STARTED, INIT, STOP,
new ContainerStoppedTransition())
.addTransition(STARTED, READY, BECOME_READY,
new ContainerBecomeReadyTransition())
// FROM READY
.addTransition(READY, STARTED, BECOME_NOT_READY,
new ContainerBecomeNotReadyTransition())
.addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
.installTopology();
public ComponentInstance(Component component,
ComponentInstanceId compInstanceId) {
this.stateMachine = stateMachineFactory.make(this);
this.component = component;
this.compInstanceId = compInstanceId;
this.scheduler = component.getScheduler();
this.yarnRegistryOperations =
component.getScheduler().getYarnRegistryOperations();
this.serviceTimelinePublisher =
component.getScheduler().getServiceTimelinePublisher();
if (YarnConfiguration
.timelineServiceV2Enabled(component.getScheduler().getConfig())) {
this.timelineServiceEnabled = true;
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
this.fs = scheduler.getContext().fs.getFileSystem();
}
private static class ContainerStartedTransition extends BaseTransition {
@Override public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
// Query container status for ip and host
compInstance.containerStatusFuture =
compInstance.scheduler.executorService.scheduleAtFixedRate(
new ContainerStatusRetriever(compInstance.scheduler,
event.getContainerId(), compInstance), 0, 1,
TimeUnit.SECONDS);
long containerStartTime = System.currentTimeMillis();
try {
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(compInstance.getContainer()
.getContainerToken());
containerStartTime = containerTokenIdentifier.getCreationTime();
} catch (Exception e) {
LOG.info("Could not get container creation time, using current time");
}
org.apache.hadoop.yarn.service.api.records.Container container =
new org.apache.hadoop.yarn.service.api.records.Container();
container.setId(event.getContainerId().toString());
container.setLaunchTime(new Date(containerStartTime));
container.setState(ContainerState.RUNNING_BUT_UNREADY);
container.setBareHost(compInstance.getNodeId().getHost());
container.setComponentInstanceName(compInstance.getCompInstanceName());
if (compInstance.containerSpec != null) {
// remove the previous container.
compInstance.getCompSpec().removeContainer(compInstance.containerSpec);
}
compInstance.containerSpec = container;
compInstance.getCompSpec().addContainer(container);
compInstance.containerStartedTime = containerStartTime;
compInstance.component.incRunningContainers();
if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher
.componentInstanceStarted(container, compInstance);
}
}
}
private static class ContainerBecomeReadyTransition extends BaseTransition {
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
compInstance.containerSpec.setState(ContainerState.READY);
compInstance.component.incContainersReady();
if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher
.componentInstanceBecomeReady(compInstance.containerSpec);
}
}
}
private static class ContainerBecomeNotReadyTransition extends BaseTransition {
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
compInstance.component.decContainersReady();
}
}
private static class ContainerStoppedTransition extends BaseTransition {
// whether the container failed before launched by AM or not.
boolean failedBeforeLaunching = false;
public ContainerStoppedTransition(boolean failedBeforeLaunching) {
this.failedBeforeLaunching = failedBeforeLaunching;
}
public ContainerStoppedTransition() {
this(false);
}
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
// re-ask the failed container.
Component comp = compInstance.component;
comp.requestContainers(1);
String containerDiag =
compInstance.getCompInstanceId() + ": " + event.getStatus()
.getDiagnostics();
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
compInstance.cancelContainerStatusRetriever();
if (compInstance.getState().equals(READY)) {
compInstance.component.decContainersReady();
}
compInstance.component.decRunningContainers();
boolean shouldExit = false;
// check if it exceeds the failure threshold
if (comp.currentContainerFailure.get() > comp.maxContainerFailurePerComp) {
String exitDiag = MessageFormat.format(
"[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. Shutting down now... "
+ System.lineSeparator(),
comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
compInstance.diagnostics.append(exitDiag);
// append to global diagnostics that will be reported to RM.
comp.getScheduler().getDiagnostics().append(containerDiag);
comp.getScheduler().getDiagnostics().append(exitDiag);
LOG.warn(exitDiag);
shouldExit = true;
}
if (!failedBeforeLaunching) {
// clean up registry
// If the container failed before launching, no need to cleanup registry,
// because it was not registered before.
// hdfs dir content will be overwritten when a new container gets started,
// so no need remove.
compInstance.scheduler.executorService
.submit(() -> compInstance.cleanupRegistry(event.getContainerId()));
if (compInstance.timelineServiceEnabled) {
// record in ATS
compInstance.serviceTimelinePublisher
.componentInstanceFinished(event.getContainerId(),
event.getStatus().getExitStatus(), containerDiag);
}
compInstance.containerSpec.setState(ContainerState.STOPPED);
}
// remove the failed ContainerId -> CompInstance mapping
comp.getScheduler().removeLiveCompInstance(event.getContainerId());
comp.reInsertPendingInstance(compInstance);
LOG.info(compInstance.getCompInstanceId()
+ ": {} completed. Reinsert back to pending list and requested " +
"a new container." + System.lineSeparator() +
" exitStatus={}, diagnostics={}.",
event.getContainerId(), event.getStatus().getExitStatus(),
event.getStatus().getDiagnostics());
if (shouldExit) {
// Sleep for 5 seconds in hope that the state can be recorded in ATS.
// in case there's a client polling the comp state, it can be notified.
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.error("Interrupted on sleep while exiting.", e);
}
ExitUtil.terminate(-1);
}
}
}
public ComponentInstanceState getState() {
this.readLock.lock();
try {
return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
}
@Override
public void handle(ComponentInstanceEvent event) {
try {
writeLock.lock();
ComponentInstanceState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error(getCompInstanceId() + ": Invalid event " + event.getType() +
" at " + oldState, e);
}
if (oldState != getState()) {
LOG.info(getCompInstanceId() + " Transitioned from " + oldState + " to "
+ getState() + " on " + event.getType() + " event");
}
} finally {
writeLock.unlock();
}
}
public void setContainer(Container container) {
this.container = container;
this.compInstanceId.setContainerId(container.getId());
}
public String getCompInstanceName() {
return compInstanceId.getCompInstanceName();
}
public ContainerStatus getContainerStatus() {
return status;
}
public void updateContainerStatus(ContainerStatus status) {
this.status = status;
org.apache.hadoop.yarn.service.api.records.Container container =
getCompSpec().getContainer(status.getContainerId().toString());
if (container != null) {
container.setIp(StringUtils.join(",", status.getIPs()));
container.setHostname(status.getHost());
if (timelineServiceEnabled) {
serviceTimelinePublisher.componentInstanceIPHostUpdated(container);
}
}
updateServiceRecord(yarnRegistryOperations, status);
}
public String getCompName() {
return compInstanceId.getCompName();
}
public void setCompInstanceDir(Path dir) {
this.compInstanceDir = dir;
}
public Component getComponent() {
return component;
}
public Container getContainer() {
return container;
}
public ComponentInstanceId getCompInstanceId() {
return compInstanceId;
}
public NodeId getNodeId() {
return this.container.getNodeId();
}
public org.apache.hadoop.yarn.service.api.records.Component getCompSpec() {
return component.getComponentSpec();
}
private static class BaseTransition implements
SingleArcTransition<ComponentInstance, ComponentInstanceEvent> {
@Override public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
}
}
public ProbeStatus ping() {
if (component.getProbe() == null) {
ProbeStatus status = new ProbeStatus();
status.setSuccess(true);
return status;
}
return component.getProbe().ping(this);
}
// Write service record into registry
private void updateServiceRecord(
YarnRegistryViewForProviders yarnRegistry, ContainerStatus status) {
ServiceRecord record = new ServiceRecord();
String containerId = status.getContainerId().toString();
record.set(YARN_ID, containerId);
record.description = getCompInstanceName();
record.set(YARN_PERSISTENCE, PersistencePolicies.CONTAINER);
record.set(YARN_IP, status.getIPs().get(0));
record.set(YARN_HOSTNAME, status.getHost());
record.set(YARN_COMPONENT, component.getName());
try {
yarnRegistry
.putComponent(RegistryPathUtils.encodeYarnID(containerId), record);
} catch (IOException e) {
LOG.error(
"Failed to update service record in registry: " + containerId + "");
}
}
// Called when user flexed down the container and ContainerStoppedTransition
// is not executed in this case.
// Release the container, dec running,
// cleanup registry, hdfs dir, and send record to ATS
public void destroy() {
LOG.info(getCompInstanceId() + ": Flexed down by user, destroying.");
diagnostics.append(getCompInstanceId() + ": Flexed down by user");
// update metrics
if (getState() == STARTED) {
component.decRunningContainers();
}
if (getState() == READY) {
component.decContainersReady();
component.decRunningContainers();
}
getCompSpec().removeContainer(containerSpec);
if (container == null) {
LOG.info(getCompInstanceId() + " no container is assigned when " +
"destroying");
return;
}
ContainerId containerId = container.getId();
scheduler.removeLiveCompInstance(containerId);
component.getScheduler().getAmRMClient()
.releaseAssignedContainer(containerId);
if (timelineServiceEnabled) {
serviceTimelinePublisher.componentInstanceFinished(containerId,
KILLED_BY_APPMASTER, diagnostics.toString());
}
cancelContainerStatusRetriever();
scheduler.executorService.submit(() ->
cleanupRegistryAndCompHdfsDir(containerId));
}
private void cleanupRegistry(ContainerId containerId) {
String cid = RegistryPathUtils.encodeYarnID(containerId.toString());
try {
yarnRegistryOperations.deleteComponent(getCompInstanceId(), cid);
} catch (IOException e) {
LOG.error(getCompInstanceId() + ": Failed to delete registry", e);
}
}
//TODO Maybe have a dedicated cleanup service.
public void cleanupRegistryAndCompHdfsDir(ContainerId containerId) {
cleanupRegistry(containerId);
try {
if (compInstanceDir != null && fs.exists(compInstanceDir)) {
boolean deleted = fs.delete(compInstanceDir, true);
if (!deleted) {
LOG.error(getCompInstanceId()
+ ": Failed to delete component instance dir: "
+ compInstanceDir);
} else {
LOG.info(getCompInstanceId() + ": Deleted component instance dir: "
+ compInstanceDir);
}
}
} catch (IOException e) {
LOG.warn(getCompInstanceId() + ": Failed to delete directory", e);
}
}
// Query container status until ip and hostname are available and update
// the service record into registry service
private static class ContainerStatusRetriever implements Runnable {
private ContainerId containerId;
private NodeId nodeId;
private NMClient nmClient;
private ComponentInstance instance;
ContainerStatusRetriever(ServiceScheduler scheduler,
ContainerId containerId, ComponentInstance instance) {
this.containerId = containerId;
this.nodeId = instance.getNodeId();
this.nmClient = scheduler.getNmClient().getClient();
this.instance = instance;
}
@Override public void run() {
ContainerStatus status = null;
try {
status = nmClient.getContainerStatus(containerId, nodeId);
} catch (Exception e) {
if (e instanceof YarnException) {
throw new YarnRuntimeException(
instance.compInstanceId + " Failed to get container status on "
+ nodeId + " , cancelling.", e);
}
LOG.error(instance.compInstanceId + " Failed to get container status on "
+ nodeId + ", will try again", e);
return;
}
if (ServiceUtils.isEmpty(status.getIPs()) || ServiceUtils
.isUnset(status.getHost())) {
return;
}
instance.updateContainerStatus(status);
LOG.info(
instance.compInstanceId + " IP = " + status.getIPs() + ", host = "
+ status.getHost() + ", cancel container status retriever");
instance.containerStatusFuture.cancel(false);
}
}
private void cancelContainerStatusRetriever() {
if (containerStatusFuture != null && !containerStatusFuture.isDone()) {
containerStatusFuture.cancel(true);
}
}
public String getHostname() {
String domain = getComponent().getScheduler().getConfig()
.get(RegistryConstants.KEY_DNS_DOMAIN);
String hostname;
if (domain == null || domain.isEmpty()) {
hostname = MessageFormat
.format("{0}.{1}.{2}", getCompInstanceName(),
getComponent().getContext().service.getName(),
RegistryUtils.currentUser());
} else {
hostname = MessageFormat
.format("{0}.{1}.{2}.{3}", getCompInstanceName(),
getComponent().getContext().service.getName(),
RegistryUtils.currentUser(), domain);
}
return hostname;
}
@Override
public int compareTo(ComponentInstance to) {
long delta = containerStartedTime - to.containerStartedTime;
if (delta == 0) {
return getCompInstanceId().compareTo(to.getCompInstanceId());
} else if (delta < 0) {
return -1;
} else {
return 1;
}
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ComponentInstance instance = (ComponentInstance) o;
if (containerStartedTime != instance.containerStartedTime)
return false;
return compInstanceId.equals(instance.compInstanceId);
}
@Override public int hashCode() {
int result = compInstanceId.hashCode();
result = 31 * result + (int) (containerStartedTime ^ (containerStartedTime
>>> 32));
return result;
}
}