blob: 6c73ebb8d673179ef1267d03109afb2b8a1e10d3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.timelineservice;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.*;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import static org.apache.hadoop.yarn.service.api.records.ContainerState.READY;
import static org.apache.hadoop.yarn.service.api.records.ContainerState.STOPPED;
import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO;
/**
* A single service that publishes all the Timeline Entities.
*/
public class ServiceTimelinePublisher extends CompositeService {
// Number of bytes of config which can be published in one shot to ATSv2.
public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024;
private TimelineV2Client timelineClient;
private volatile boolean stopped = false;
private static final Logger log =
LoggerFactory.getLogger(ServiceTimelinePublisher.class);
@Override
protected void serviceInit(org.apache.hadoop.conf.Configuration configuration)
throws Exception {
addService(timelineClient);
super.serviceInit(configuration);
}
@Override
protected void serviceStop() throws Exception {
stopped = true;
super.serviceStop();
}
public boolean isStopped() {
return stopped;
}
public ServiceTimelinePublisher(TimelineV2Client client) {
super(ServiceTimelinePublisher.class.getName());
timelineClient = client;
}
public void serviceAttemptRegistered(Service service,
org.apache.hadoop.conf.Configuration systemConf) {
long currentTimeMillis = service.getLaunchTime() == null
? System.currentTimeMillis() : service.getLaunchTime().getTime();
TimelineEntity entity = createServiceAttemptEntity(service.getId());
entity.setCreatedTime(currentTimeMillis);
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.NAME, service.getName());
entityInfos.put(ServiceTimelineMetricsConstants.STATE,
ServiceState.STARTED.toString());
entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_TIME,
currentTimeMillis);
entity.addInfo(ServiceTimelineMetricsConstants.QUICK_LINKS,
service.getQuicklinks());
entity.addInfo(entityInfos);
// add an event
TimelineEvent startEvent = new TimelineEvent();
startEvent.setId(ServiceTimelineEvent.SERVICE_ATTEMPT_REGISTERED.toString());
startEvent.setTimestamp(currentTimeMillis);
entity.addEvent(startEvent);
// publish before configurations published
putEntity(entity);
// publish system config - YarnConfiguration
populateTimelineEntity(systemConf.iterator(), service.getId(),
ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
// publish container conf
publishContainerConf(service.getConfiguration(), service.getId(),
ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
// publish component as separate entity.
publishComponents(service.getComponents());
}
public void serviceAttemptUpdated(Service service) {
TimelineEntity entity = createServiceAttemptEntity(service.getId());
entity.addInfo(ServiceTimelineMetricsConstants.QUICK_LINKS,
service.getQuicklinks());
putEntity(entity);
}
public void serviceAttemptUnregistered(ServiceContext context,
String diagnostics) {
TimelineEntity entity = createServiceAttemptEntity(
context.attemptId.getApplicationId().toString());
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.STATE,
FinalApplicationStatus.ENDED);
entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
entity.addInfo(entityInfos);
// add an event
TimelineEvent finishEvent = new TimelineEvent();
finishEvent
.setId(ServiceTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString());
finishEvent.setTimestamp(System.currentTimeMillis());
entity.addEvent(finishEvent);
putEntity(entity);
}
public void componentInstanceStarted(Container container,
ComponentInstance instance) {
TimelineEntity entity = createComponentInstanceEntity(container.getId());
entity.setCreatedTime(container.getLaunchTime().getTime());
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.BARE_HOST,
container.getBareHost());
entityInfos.put(ServiceTimelineMetricsConstants.STATE,
container.getState().toString());
entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_TIME,
container.getLaunchTime().getTime());
entityInfos.put(ServiceTimelineMetricsConstants.COMPONENT_NAME,
instance.getCompName());
entityInfos.put(ServiceTimelineMetricsConstants.COMPONENT_INSTANCE_NAME,
instance.getCompInstanceName());
entity.addInfo(entityInfos);
// add an event
TimelineEvent startEvent = new TimelineEvent();
startEvent
.setId(ServiceTimelineEvent.COMPONENT_INSTANCE_REGISTERED.toString());
startEvent.setTimestamp(container.getLaunchTime().getTime());
entity.addEvent(startEvent);
putEntity(entity);
}
public void componentInstanceFinished(ContainerId containerId,
int exitCode, String diagnostics) {
TimelineEntity entity = createComponentInstanceEntity(
containerId.toString());
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
exitCode);
entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
entityInfos.put(ServiceTimelineMetricsConstants.STATE, STOPPED);
entity.addInfo(entityInfos);
// add an event
TimelineEvent startEvent = new TimelineEvent();
startEvent
.setId(ServiceTimelineEvent.COMPONENT_INSTANCE_UNREGISTERED.toString());
startEvent.setTimestamp(System.currentTimeMillis());
entity.addEvent(startEvent);
putEntity(entity);
}
public void componentInstanceIPHostUpdated(Container container) {
TimelineEntity entity = createComponentInstanceEntity(container.getId());
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
entityInfos.put(ServiceTimelineMetricsConstants.IP, container.getIp());
entityInfos.put(ServiceTimelineMetricsConstants.HOSTNAME,
container.getHostname());
entityInfos.put(ServiceTimelineMetricsConstants.STATE,
container.getState().toString());
entity.addInfo(entityInfos);
TimelineEvent updateEvent = new TimelineEvent();
updateEvent.setId(ServiceTimelineEvent.COMPONENT_INSTANCE_IP_HOST_UPDATE
.toString());
updateEvent.setTimestamp(System.currentTimeMillis());
entity.addEvent(updateEvent);
putEntity(entity);
}
public void componentInstanceBecomeReady(Container container) {
TimelineEntity entity = createComponentInstanceEntity(container.getId());
Map<String, Object> entityInfo = new HashMap<>();
entityInfo.put(ServiceTimelineMetricsConstants.STATE, READY);
entity.addInfo(entityInfo);
TimelineEvent updateEvent = new TimelineEvent();
updateEvent.setId(ServiceTimelineEvent.COMPONENT_INSTANCE_BECOME_READY
.toString());
updateEvent.setTimestamp(System.currentTimeMillis());
entity.addEvent(updateEvent);
putEntity(entity);
}
private void publishComponents(List<Component> components) {
long currentTimeMillis = System.currentTimeMillis();
for (Component component : components) {
TimelineEntity entity = createComponentEntity(component.getName());
entity.setCreatedTime(currentTimeMillis);
// create info keys
Map<String, Object> entityInfos = new HashMap<String, Object>();
if (component.getArtifact() != null) {
entityInfos.put(ServiceTimelineMetricsConstants.ARTIFACT_ID,
component.getArtifact().getId());
entityInfos.put(ServiceTimelineMetricsConstants.ARTIFACT_TYPE,
component.getArtifact().getType().toString());
}
if (component.getResource() != null) {
entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_CPU,
component.getResource().getCpus());
entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_MEMORY,
component.getResource().getMemory());
if (component.getResource().getProfile() != null) {
entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_PROFILE,
component.getResource().getProfile());
}
}
if (component.getLaunchCommand() != null) {
entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_COMMAND,
component.getLaunchCommand());
}
entityInfos.put(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER,
component.getRunPrivilegedContainer().toString());
entity.addInfo(entityInfos);
putEntity(entity);
// publish container specific configurations
publishContainerConf(component.getConfiguration(), component.getName(),
ServiceTimelineEntityType.COMPONENT.toString());
}
}
private void publishContainerConf(Configuration configuration,
String entityId, String entityType) {
populateTimelineEntity(configuration.getEnv().entrySet().iterator(),
entityId, entityType);
for (ConfigFile configFile : configuration.getFiles()) {
populateTimelineEntity(configFile.getProperties().entrySet().iterator(),
entityId, entityType);
}
}
private void populateTimelineEntity(Iterator<Entry<String, String>> iterator,
String entityId, String entityType) {
int configSize = 0;
TimelineEntity entity = createTimelineEntity(entityId, entityType);
while (iterator.hasNext()) {
Entry<String, String> entry = iterator.next();
int size = entry.getKey().length() + entry.getValue().length();
configSize += size;
// Configs are split into multiple entities if they exceed 100kb in size.
if (configSize > ATS_CONFIG_PUBLISH_SIZE_BYTES) {
if (entity.getConfigs().size() > 0) {
putEntity(entity);
entity = createTimelineEntity(entityId, entityType);
}
configSize = size;
}
entity.addConfig(entry.getKey(), entry.getValue());
}
if (configSize > 0) {
putEntity(entity);
}
}
/**
* Called from ServiceMetricsSink at regular interval of time.
* @param metrics of service or components
* @param entityId Id of entity
* @param entityType Type of entity
* @param timestamp
*/
public void publishMetrics(Iterable<AbstractMetric> metrics, String entityId,
String entityType, long timestamp) {
TimelineEntity entity = createTimelineEntity(entityId, entityType);
Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>();
for (AbstractMetric metric : metrics) {
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setId(metric.name());
timelineMetric.addValue(timestamp, metric.value());
entityMetrics.add(timelineMetric);
}
entity.setMetrics(entityMetrics);
putEntity(entity);
}
private TimelineEntity createServiceAttemptEntity(String serviceId) {
TimelineEntity entity = createTimelineEntity(serviceId,
ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
return entity;
}
private TimelineEntity createComponentInstanceEntity(String instanceId) {
TimelineEntity entity = createTimelineEntity(instanceId,
ServiceTimelineEntityType.COMPONENT_INSTANCE.toString());
return entity;
}
private TimelineEntity createComponentEntity(String componentId) {
TimelineEntity entity = createTimelineEntity(componentId,
ServiceTimelineEntityType.COMPONENT.toString());
return entity;
}
private TimelineEntity createTimelineEntity(String entityId,
String entityType) {
TimelineEntity entity = new TimelineEntity();
entity.setId(entityId);
entity.setType(entityType);
return entity;
}
private void putEntity(TimelineEntity entity) {
try {
if (log.isDebugEnabled()) {
log.debug("Publishing the entity " + entity + ", JSON-style content: "
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
}
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
log.error("Seems like client has been removed before the entity "
+ "could be published for " + entity);
}
} catch (Exception e) {
log.error("Error when publishing entity " + entity, e);
}
}
}