/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.collector;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Service that handles writes to the timeline service and persists them to
 * the backing storage.
*
* Classes that extend this can add their own lifecycle management or
* customization of request handling.
*/
@Private
@Unstable
public abstract class TimelineCollector extends CompositeService {
private static final Logger LOG =
LoggerFactory.getLogger(TimelineCollector.class);
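  // Separator used when composing an aggregated metric id from the base
  // metric id and its aggregation group id (see AggregationStatusTable).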
public static final String SEPARATOR = "_";
private TimelineWriter writer;
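  // Aggregation state for this collector, keyed by timeline entity type.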
private ConcurrentMap<String, AggregationStatusTable> aggregationGroups
= new ConcurrentHashMap<>();
private static Set<String> entityTypesSkipAggregation
= new HashSet<>();
private volatile boolean readyToAggregate = false;
private volatile boolean isStopped = false;
public TimelineCollector(String name) {
super(name);
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
isStopped = true;
super.serviceStop();
}
boolean isStopped() {
return isStopped;
}
protected void setWriter(TimelineWriter w) {
this.writer = w;
}
protected Map<String, AggregationStatusTable> getAggregationGroups() {
return aggregationGroups;
}
protected void setReadyToAggregate() {
readyToAggregate = true;
}
protected boolean isReadyToAggregate() {
return readyToAggregate;
}
/**
   * Method to decide the set of timeline entity types the collector should
   * skip during aggregation. Subclasses may override this method to customize
   * the behavior.
*
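   * For example, a subclass might skip aggregating container entities; a
   * minimal sketch (the entity type string shown is hypothetical):
   * <pre>{@code
   * protected Set<String> getEntityTypesSkipAggregation() {
   *   return Collections.singleton("YARN_CONTAINER");
   * }
   * }</pre>
   *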
   * @return A set of strings consisting of all entity types the collector
   *         should skip.
*/
protected Set<String> getEntityTypesSkipAggregation() {
return entityTypesSkipAggregation;
}
public abstract TimelineCollectorContext getTimelineEntityContext();
/**
   * Handles entity writes. These writes are synchronous and are persisted to
   * the backing storage without buffering/batching. If an entity already
   * exists, the write results in an update of the entity.
*
* This method should be reserved for selected critical entities and events.
* For normal voluminous writes one should use the async method
* {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}.
*
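   * A minimal usage sketch, assuming a concrete collector instance
   * {@code collector} (the entity id and type shown are hypothetical):
   * <pre>{@code
   * TimelineEntities entities = new TimelineEntities();
   * TimelineEntity entity = new TimelineEntity();
   * entity.setId("application_1234567890_0001");
   * entity.setType("YARN_APPLICATION");
   * entities.addEntity(entity);
   * collector.putEntities(entities, UserGroupInformation.getCurrentUser());
   * }</pre>
   *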
* @param entities entities to post
* @param callerUgi the caller UGI
* @return the response that contains the result of the post.
* @throws IOException if there is any exception encountered while putting
* entities.
*/
public TimelineWriteResponse putEntities(TimelineEntities entities,
UserGroupInformation callerUgi) throws IOException {
    LOG.debug("putEntities(entities={}, callerUgi={})", entities, callerUgi);
TimelineWriteResponse response;
// synchronize on the writer object so that no other threads can
// flush the writer buffer concurrently and swallow any exception
    // caused by the timeline entities that are being put here.
synchronized (writer) {
response = writeTimelineEntities(entities, callerUgi);
flushBufferedTimelineEntities();
}
return response;
}
private TimelineWriteResponse writeTimelineEntities(
TimelineEntities entities, UserGroupInformation callerUgi)
throws IOException {
// Update application metrics for aggregation
updateAggregateStatus(entities, aggregationGroups,
getEntityTypesSkipAggregation());
final TimelineCollectorContext context = getTimelineEntityContext();
return writer.write(context, entities, callerUgi);
}
/**
* Flush buffered timeline entities, if any.
* @throws IOException if there is any exception encountered while
* flushing buffered entities.
*/
private void flushBufferedTimelineEntities() throws IOException {
writer.flush();
}
/**
* Handles entity writes in an asynchronous manner. The method returns as soon
* as validation is done. No promises are made on how quickly it will be
* written to the backing storage or if it will always be written to the
   * backing storage. Multiple writes to the same entities may be batched,
   * with the appropriate values updated, resulting in fewer writes to the
   * backing storage.
*
* @param entities entities to post
* @param callerUgi the caller UGI
   * @throws IOException if there is any exception encountered while putting
* entities.
*/
public void putEntitiesAsync(TimelineEntities entities,
UserGroupInformation callerUgi) throws IOException {
    LOG.debug("putEntitiesAsync(entities={}, callerUgi={})", entities,
        callerUgi);
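    // Unlike putEntities, do not flush the writer here; the entities may
    // remain buffered in the writer until a later flush.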
writeTimelineEntities(entities, callerUgi);
}
/**
   * Aggregate all metrics in the given timeline entities, with no predefined
   * aggregation state.
*
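   * A minimal usage sketch (the result entity id and type shown are
   * hypothetical):
   * <pre>{@code
   * TimelineEntity aggregated = TimelineCollector.aggregateEntities(
   *     entities, "flow_run_1", "YARN_FLOW_RUN", false);
   * }</pre>
   *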
* @param entities Entities to aggregate
* @param resultEntityId Id of the result entity
* @param resultEntityType Type of the result entity
   * @param needsGroupIdInResult Marks whether the aggregation group id should
   *          be included in each aggregated metric.
* @return A timeline entity that contains all aggregated TimelineMetric.
*/
public static TimelineEntity aggregateEntities(
TimelineEntities entities, String resultEntityId,
String resultEntityType, boolean needsGroupIdInResult) {
ConcurrentMap<String, AggregationStatusTable> aggregationGroups
= new ConcurrentHashMap<>();
updateAggregateStatus(entities, aggregationGroups, null);
if (needsGroupIdInResult) {
return aggregate(aggregationGroups, resultEntityId, resultEntityType);
} else {
return aggregateWithoutGroupId(
aggregationGroups, resultEntityId, resultEntityType);
}
}
/**
* Update the aggregation status table for a timeline collector.
*
* @param entities Entities to update
* @param aggregationGroups Aggregation status table
   * @param typesToSkip Entity types that can safely be skipped when updating
*/
static void updateAggregateStatus(
TimelineEntities entities,
ConcurrentMap<String, AggregationStatusTable> aggregationGroups,
Set<String> typesToSkip) {
for (TimelineEntity e : entities.getEntities()) {
if ((typesToSkip != null && typesToSkip.contains(e.getType()))
|| e.getMetrics().isEmpty()) {
continue;
}
AggregationStatusTable aggrTable = aggregationGroups.get(e.getType());
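      // Lazily create the table for this entity type; if another thread
      // creates it concurrently, putIfAbsent returns the existing instance.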
if (aggrTable == null) {
AggregationStatusTable table = new AggregationStatusTable();
aggrTable = aggregationGroups.putIfAbsent(e.getType(),
table);
if (aggrTable == null) {
aggrTable = table;
}
}
aggrTable.update(e);
}
}
/**
* Aggregate internal status and generate timeline entities for the
* aggregation results.
*
* @param aggregationGroups Aggregation status table
* @param resultEntityId Id of the result entity
* @param resultEntityType Type of the result entity
* @return A timeline entity that contains all aggregated TimelineMetric.
*/
static TimelineEntity aggregate(
Map<String, AggregationStatusTable> aggregationGroups,
String resultEntityId, String resultEntityType) {
TimelineEntity result = new TimelineEntity();
result.setId(resultEntityId);
result.setType(resultEntityType);
for (Map.Entry<String, AggregationStatusTable> entry
: aggregationGroups.entrySet()) {
entry.getValue().aggregateAllTo(result, entry.getKey());
}
return result;
}
/**
* Aggregate internal status and generate timeline entities for the
* aggregation results. The result metrics will not have aggregation group
* information.
*
* @param aggregationGroups Aggregation status table
* @param resultEntityId Id of the result entity
* @param resultEntityType Type of the result entity
* @return A timeline entity that contains all aggregated TimelineMetric.
*/
static TimelineEntity aggregateWithoutGroupId(
Map<String, AggregationStatusTable> aggregationGroups,
String resultEntityId, String resultEntityType) {
TimelineEntity result = new TimelineEntity();
result.setId(resultEntityId);
result.setType(resultEntityType);
for (Map.Entry<String, AggregationStatusTable> entry
: aggregationGroups.entrySet()) {
entry.getValue().aggregateAllTo(result, "");
}
return result;
}
// Note: In memory aggregation is performed in an eventually consistent
// fashion.
protected static class AggregationStatusTable {
// On aggregation, for each metric, aggregate all per-entity accumulated
// metrics. We only use the id and type for TimelineMetrics in the key set
// of this table.
private ConcurrentMap<TimelineMetric, Map<String, TimelineMetric>>
aggregateTable;
public AggregationStatusTable() {
aggregateTable = new ConcurrentHashMap<>();
}
public void update(TimelineEntity incoming) {
String entityId = incoming.getId();
for (TimelineMetric m : incoming.getMetrics()) {
// Skip if the metric does not need aggregation
if (m.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
continue;
}
// Update aggregateTable
Map<String, TimelineMetric> aggrRow = aggregateTable.get(m);
if (aggrRow == null) {
Map<String, TimelineMetric> tempRow = new HashMap<>();
aggrRow = aggregateTable.putIfAbsent(m, tempRow);
if (aggrRow == null) {
aggrRow = tempRow;
}
}
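        // Rows are plain HashMaps, so synchronize on the row; aggregateTo
        // also locks the row while draining it, keeping the two in step.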
synchronized (aggrRow) {
aggrRow.put(entityId, m);
}
}
}
public TimelineEntity aggregateTo(TimelineMetric metric, TimelineEntity e,
String aggregationGroupId) {
if (metric.getRealtimeAggregationOp() == TimelineMetricOperation.NOP) {
return e;
}
Map<String, TimelineMetric> aggrRow = aggregateTable.get(metric);
if (aggrRow != null) {
TimelineMetric aggrMetric = new TimelineMetric();
        if (!aggregationGroupId.isEmpty()) {
aggrMetric.setId(metric.getId() + SEPARATOR + aggregationGroupId);
} else {
aggrMetric.setId(metric.getId());
}
aggrMetric.setRealtimeAggregationOp(TimelineMetricOperation.NOP);
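        // Scratch state shared across the successive per-entity aggregateTo
        // calls below, for aggregation operations that need intermediate
        // values.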
Map<Object, Object> status = new HashMap<>();
synchronized (aggrRow) {
for (TimelineMetric m : aggrRow.values()) {
TimelineMetric.aggregateTo(m, aggrMetric, status);
// getRealtimeAggregationOp returns an enum so we can directly
// compare with "!=".
if (m.getRealtimeAggregationOp()
!= aggrMetric.getRealtimeAggregationOp()) {
aggrMetric.setRealtimeAggregationOp(m.getRealtimeAggregationOp());
}
}
aggrRow.clear();
}
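      // Per the key-set comment above, metric identity is based on id and
      // type, so remove-then-add swaps in the freshly aggregated metric.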
Set<TimelineMetric> metrics = e.getMetrics();
metrics.remove(aggrMetric);
metrics.add(aggrMetric);
}
return e;
}
public TimelineEntity aggregateAllTo(TimelineEntity e,
String aggregationGroupId) {
for (TimelineMetric m : aggregateTable.keySet()) {
aggregateTo(m, e, aggregationGroupId);
}
return e;
}
}
}