/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter.TimelineFilterType;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* A bunch of utility functions used across TimelineReader and TimelineWriter.
*/
@Public
@Unstable
public final class TimelineStorageUtils {
private TimelineStorageUtils() {
}
private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class);
/** milliseconds in one day. */
public static final long MILLIS_ONE_DAY = 86400000L;
/**
* Converts an int into its inverse int to be used in (row) keys
* where we want to have the largest int value at the top of the table
* (scans start at the largest int first).
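* For example, {@code invertInt(Integer.MAX_VALUE)} is 0, so the largest
* original value sorts first in an ascending scan.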
*
* @param key value to be inverted so that the latest version will be first in
* a scan.
* @return inverted int
*/
public static int invertInt(int key) {
return Integer.MAX_VALUE - key;
}
/**
* Returns the timestamp of the start (midnight, 00:00:00) of the day that
* contains the given input timestamp.
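* The day boundary is computed in UTC, since the epoch millisecond value is
* used directly without any time zone adjustment.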
*
* @param ts Timestamp.
* @return timestamp of that day's beginning (midnight)
*/
public static long getTopOfTheDayTimestamp(long ts) {
long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
return dayTimestamp;
}
/**
* Combines the input array of attributes and the input aggregation operation
* into a new array of attributes.
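* For example, combining a two-element attributes array with a non-null
* aggregation operation yields a three-element array whose last entry is the
* operation's attribute.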
*
* @param attributes Attributes to be combined.
* @param aggOp Aggregation operation.
* @return array of combined attributes.
*/
public static Attribute[] combineAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
Attribute[] combinedAttributes = new Attribute[newLength];
if (attributes != null) {
System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
}
if (aggOp != null) {
Attribute a2 = aggOp.getAttribute();
combinedAttributes[newLength - 1] = a2;
}
return combinedAttributes;
}
/**
* Returns the size of the new array that combines the input array of
* attributes with the input aggregation operation.
*
* @param attributes Attributes.
* @param aggOp Aggregation operation.
* @return the size for the new array
*/
private static int getNewLengthCombinedAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int oldLength = getAttributesLength(attributes);
int aggLength = getAppOpLength(aggOp);
return oldLength + aggLength;
}
private static int getAppOpLength(AggregationOperation aggOp) {
if (aggOp != null) {
return 1;
}
return 0;
}
private static int getAttributesLength(Attribute[] attributes) {
if (attributes != null) {
return attributes.length;
}
return 0;
}
/**
* Returns the first {@link AggregationOperation} (in the order the operations
* are declared) whose tag type matches one of the input tags, or null if none
* matches.
*
* @param tags list of HBase tags.
* @return AggregationOperation
*/
public static AggregationOperation getAggregationOperationFromTagsList(
List<Tag> tags) {
for (AggregationOperation aggOp : AggregationOperation.values()) {
for (Tag tag : tags) {
if (tag.getType() == aggOp.getTagType()) {
return aggOp;
}
}
}
return null;
}
/**
* Creates a {@link Tag} from the input attribute.
*
* @param attribute Attribute from which the tag is to be created.
* @return an HBase Tag, or null if the attribute key does not map to a known
* tag type.
*/
public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
// attribute could be either an Aggregation Operation or
// an Aggregation Dimension
// Get the Tag type from either
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
return t;
}
AggregationCompactionDimension aggCompactDim =
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
}
/**
* Matches key-values filter. Used for relatesTo/isRelatedTo filters.
*
* @param entity entity which holds relatesTo/isRelatedTo relations which we
* will match against.
* @param keyValuesFilter key-values filter.
* @param entityFiltersType type of filters we are trying to match.
* @return true, if filter matches, false otherwise.
*/
private static boolean matchKeyValuesFilter(TimelineEntity entity,
TimelineKeyValuesFilter keyValuesFilter,
TimelineEntityFiltersType entityFiltersType) {
Map<String, Set<String>> relations = null;
if (entityFiltersType == TimelineEntityFiltersType.IS_RELATED_TO) {
relations = entity.getIsRelatedToEntities();
} else if (entityFiltersType == TimelineEntityFiltersType.RELATES_TO) {
relations = entity.getRelatesToEntities();
}
if (relations == null) {
return false;
}
Set<String> ids = relations.get(keyValuesFilter.getKey());
if (ids == null) {
return false;
}
boolean matched = false;
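// Every value in the filter must satisfy the condition below for the filter
// to match (an implicit AND across the filter's values).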
for (Object id : keyValuesFilter.getValues()) {
// Matches if id is found amongst the relationships for an entity and
// filter's compare op is EQUAL.
// If compare op is NOT_EQUAL, for a match to occur, id should not be
// found amongst relationships for an entity.
matched = !(ids.contains(id) ^
keyValuesFilter.getCompareOp() == TimelineCompareOp.EQUAL);
if (!matched) {
return false;
}
}
return true;
}
/**
* Matches relatesTo.
*
* @param entity entity which holds relatesTo relations.
* @param relatesTo the relations for filtering.
* @return true, if filter matches, false otherwise.
* @throws IOException if an unsupported filter for matching relations is
* being matched.
*/
public static boolean matchRelatesTo(TimelineEntity entity,
TimelineFilterList relatesTo) throws IOException {
return matchFilters(
entity, relatesTo, TimelineEntityFiltersType.RELATES_TO);
}
/**
* Matches isRelatedTo.
*
* @param entity entity which holds isRelatedTo relations.
* @param isRelatedTo the relations for filtering.
* @return true, if filter matches, false otherwise.
* @throws IOException if an unsupported filter for matching relations is
* being matched.
*/
public static boolean matchIsRelatedTo(TimelineEntity entity,
TimelineFilterList isRelatedTo) throws IOException {
return matchFilters(
entity, isRelatedTo, TimelineEntityFiltersType.IS_RELATED_TO);
}
/**
* Matches key-value filter. Used for config and info filters.
*
* @param entity entity which holds the config/info which we will match
* against.
* @param kvFilter a key-value filter.
* @param entityFiltersType type of filters we are trying to match.
* @return true, if filter matches, false otherwise.
*/
private static boolean matchKeyValueFilter(TimelineEntity entity,
TimelineKeyValueFilter kvFilter,
TimelineEntityFiltersType entityFiltersType) {
Map<String, ? extends Object> map = null;
// Supported only for config and info filters.
if (entityFiltersType == TimelineEntityFiltersType.CONFIG) {
map = entity.getConfigs();
} else if (entityFiltersType == TimelineEntityFiltersType.INFO) {
map = entity.getInfo();
}
if (map == null) {
return false;
}
Object value = map.get(kvFilter.getKey());
if (value == null) {
return false;
}
// Matches if filter's value is equal to the value of the key and filter's
// compare op is EQUAL.
// If compare op is NOT_EQUAL, for a match to occur, value should not be
// equal to the value of the key.
return !(value.equals(kvFilter.getValue()) ^
kvFilter.getCompareOp() == TimelineCompareOp.EQUAL);
}
/**
* Matches config filters.
*
* @param entity entity which holds a map of config key-value pairs.
* @param configFilters list of config filters.
* @return a boolean flag to indicate if the config filters match.
* @throws IOException if an unsupported filter for matching config filters is
* being matched.
*/
public static boolean matchConfigFilters(TimelineEntity entity,
TimelineFilterList configFilters) throws IOException {
return
matchFilters(entity, configFilters, TimelineEntityFiltersType.CONFIG);
}
/**
* Matches info filters.
*
* @param entity entity which holds a map of info key-value pairs.
* @param infoFilters list of info filters.
* @return a boolean flag to indicate if the info filters match.
* @throws IOException if an unsupported filter for matching info filters is
* being matched.
*/
public static boolean matchInfoFilters(TimelineEntity entity,
TimelineFilterList infoFilters) throws IOException {
return matchFilters(entity, infoFilters, TimelineEntityFiltersType.INFO);
}
/**
* Matches exists filter. Used for event filters.
*
* @param entity entity which holds the events which we will match against.
* @param existsFilter exists filter.
* @param entityFiltersType type of filters we are trying to match.
* @return true, if filter matches, false otherwise.
*/
private static boolean matchExistsFilter(TimelineEntity entity,
TimelineExistsFilter existsFilter,
TimelineEntityFiltersType entityFiltersType) {
// Currently, the exists filter is only supported for event filters.
if (entityFiltersType != TimelineEntityFiltersType.EVENT) {
return false;
}
Set<String> eventIds = new HashSet<String>();
for (TimelineEvent event : entity.getEvents()) {
eventIds.add(event.getId());
}
// Matches if the filter's value is contained in the list of events and the
// filter's compare op is EQUAL.
// If compare op is NOT_EQUAL, for a match to occur, value should not be
// contained in the list of events.
return !(eventIds.contains(existsFilter.getValue()) ^
existsFilter.getCompareOp() == TimelineCompareOp.EQUAL);
}
/**
* Matches event filters.
*
* @param entity entity which holds a set of event objects.
* @param eventFilters the set of event Ids for filtering.
* @return a boolean flag to indicate if the event filters match.
* @throws IOException if an unsupported filter for matching event filters is
* being matched.
*/
public static boolean matchEventFilters(TimelineEntity entity,
TimelineFilterList eventFilters) throws IOException {
return matchFilters(entity, eventFilters, TimelineEntityFiltersType.EVENT);
}
/**
* Compares two values using the given comparison operator.
*
* @param compareOp comparison operator.
* @param val1 value 1.
* @param val2 value 2.
* @return true if the relation holds, false otherwise.
*/
private static boolean compareValues(TimelineCompareOp compareOp,
long val1, long val2) {
switch (compareOp) {
case LESS_THAN:
return val1 < val2;
case LESS_OR_EQUAL:
return val1 <= val2;
case EQUAL:
return val1 == val2;
case NOT_EQUAL:
return val1 != val2;
case GREATER_OR_EQUAL:
return val1 >= val2;
case GREATER_THAN:
return val1 > val2;
default:
throw new RuntimeException("Unknown TimelineCompareOp " +
compareOp.name());
}
}
/**
* Matches compare filter. Used for metric filters.
*
* @param entity entity which holds the metrics which we will match against.
* @param compareFilter compare filter.
* @param entityFiltersType type of filters we are trying to match.
* @return true, if filter matches, false otherwise.
* @throws IOException if the metric filter holds a non-integral value.
*/
private static boolean matchCompareFilter(TimelineEntity entity,
TimelineCompareFilter compareFilter,
TimelineEntityFiltersType entityFiltersType) throws IOException {
// Currently, the compare filter is only supported for metric filters.
if (entityFiltersType != TimelineEntityFiltersType.METRIC) {
return false;
}
// We expect only integral values (short/int/long) for metric filters.
if (!isIntegralValue(compareFilter.getValue())) {
throw new IOException("Metric filter has a non-integral value");
}
Map<String, TimelineMetric> metricMap =
new HashMap<String, TimelineMetric>();
for (TimelineMetric metric : entity.getMetrics()) {
metricMap.put(metric.getId(), metric);
}
TimelineMetric metric = metricMap.get(compareFilter.getKey());
if (metric == null) {
return false;
}
// We will be using the latest value of metric to compare.
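// (TimelineMetric keeps its values sorted by timestamp in descending order,
// so firstEntry() holds the most recent value.)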
return compareValues(compareFilter.getCompareOp(),
metric.getValuesJAXB().firstEntry().getValue().longValue(),
((Number)compareFilter.getValue()).longValue());
}
/**
* Matches metric filters.
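* <p>A sketch of the intended use for a previously read {@code TimelineEntity
* entity}; the metric id is illustrative and the three-argument
* {@code TimelineCompareFilter} constructor is assumed:
* <pre>{@code
* TimelineFilterList metricFilters = new TimelineFilterList();
* metricFilters.addFilter(new TimelineCompareFilter(
*     TimelineCompareOp.GREATER_OR_EQUAL, "MAP_SLOT_MILLIS", 100L));
* boolean matched =
*     TimelineStorageUtils.matchMetricFilters(entity, metricFilters);
* }</pre>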
*
* @param entity entity which holds a set of metric objects.
* @param metricFilters list of metric filters.
* @return a boolean flag to indicate if the metric filters match.
* @throws IOException if an unsupported filter for matching metric filters is
* being matched.
*/
public static boolean matchMetricFilters(TimelineEntity entity,
TimelineFilterList metricFilters) throws IOException {
return matchFilters(
entity, metricFilters, TimelineEntityFiltersType.METRIC);
}
/**
* Common routine to match different filters. Iterates over a filter list and
* calls routines based on filter type.
*
* @param entity Timeline entity.
* @param filters filter list.
* @param entityFiltersType type of filters which are being matched.
* @return a boolean flag to indicate if filter matches.
* @throws IOException if an unsupported filter type is encountered while
* matching.
*/
private static boolean matchFilters(TimelineEntity entity,
TimelineFilterList filters, TimelineEntityFiltersType entityFiltersType)
throws IOException {
if (filters == null || filters.getFilterList().isEmpty()) {
return false;
}
TimelineFilterList.Operator operator = filters.getOperator();
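// For the AND operator every filter in the list must match, while for OR a
// single match is enough; the loop below short-circuits accordingly.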
for (TimelineFilter filter : filters.getFilterList()) {
TimelineFilterType filterType = filter.getFilterType();
if (!entityFiltersType.isValidFilter(filterType)) {
throw new IOException("Unsupported filter " + filterType);
}
boolean matched = false;
switch (filterType) {
case LIST:
matched = matchFilters(entity, (TimelineFilterList)filter,
entityFiltersType);
break;
case COMPARE:
matched = matchCompareFilter(entity, (TimelineCompareFilter)filter,
entityFiltersType);
break;
case EXISTS:
matched = matchExistsFilter(entity, (TimelineExistsFilter)filter,
entityFiltersType);
break;
case KEY_VALUE:
matched = matchKeyValueFilter(entity, (TimelineKeyValueFilter)filter,
entityFiltersType);
break;
case KEY_VALUES:
matched = matchKeyValuesFilter(entity, (TimelineKeyValuesFilter)filter,
entityFiltersType);
break;
default:
throw new IOException("Unsupported filter " + filterType);
}
if (!matched) {
if (operator == TimelineFilterList.Operator.AND) {
return false;
}
} else {
if (operator == TimelineFilterList.Operator.OR) {
return true;
}
}
}
return operator == TimelineFilterList.Operator.AND;
}
/**
* Checks if the passed object is of an integral type (Short/Integer/Long).
*
* @param obj Object to be checked.
* @return true if object passed is of type Short or Integer or Long, false
* otherwise.
*/
public static boolean isIntegralValue(Object obj) {
return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long);
}
/**
* Creates a new cell based on the input cell but with the new value.
*
* @param origCell Original cell
* @param newValue new cell value
* @return cell
* @throws IOException while creating new cell.
*/
public static Cell createNewCell(Cell origCell, byte[] newValue)
throws IOException {
return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
}
/**
* Creates a cell with the given inputs.
*
* @param row row of the cell to be created
* @param family column family name of the new cell
* @param qualifier qualifier for the new cell
* @param ts timestamp of the new cell
* @param newValue value of the new cell
* @param tags tags in the new cell
* @return cell
* @throws IOException while creating the cell.
*/
public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
long ts, byte[] newValue, byte[] tags) throws IOException {
return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
newValue, tags);
}
/**
* Returns the app id from the list of tags.
*
* @param tags cell tags to be looked into
* @return the application id stored in the APPLICATION_ID aggregation
* compaction dimension tag, or null if no such tag is present.
*/
public static String getAggregationCompactionDimension(List<Tag> tags) {
String appId = null;
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
appId = Bytes.toString(t.getValue());
return appId;
}
}
return appId;
}
/**
* Builds the HBase {@link Configuration} to be used by the timeline service
* storage, honoring
* {@link YarnConfiguration#TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE} if it
* points to an explicit hbase-site.xml URL.
*
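* <p>A minimal usage sketch (the hbase-site.xml URL below is illustrative and
* exception handling is omitted):
* <pre>{@code
* Configuration yarnConf = new YarnConfiguration();
* yarnConf.set(
*     YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE,
*     "file:/etc/hbase/conf/hbase-site.xml");
* Configuration hbaseConf =
*     TimelineStorageUtils.getTimelineServiceHBaseConf(yarnConf);
* }</pre>
*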
* @param conf Yarn configuration. Used to see if there is an explicit config
* pointing to the HBase config file to read. If null then a new
* HBase configuration will be returned.
* @return a configuration with the HBase configuration from the classpath,
* optionally overwritten by the timeline service configuration URL if
* specified.
* @throws MalformedURLException if a timeline service HBase configuration URL
* is specified but is a malformed URL.
*/
public static Configuration getTimelineServiceHBaseConf(Configuration conf)
throws MalformedURLException {
Configuration hbaseConf;
if (conf == null) {
return HBaseConfiguration.create();
}
String timelineServiceHBaseConfFileURL =
conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
if (timelineServiceHBaseConfFileURL != null
&& timelineServiceHBaseConfFileURL.length() > 0) {
// create a clone so that we don't mess with our input one
hbaseConf = new Configuration(conf);
Configuration plainHBaseConf = new Configuration(false);
URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
plainHBaseConf.addResource(hbaseSiteXML);
HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
} else {
// default to what is on the classpath
hbaseConf = HBaseConfiguration.create(conf);
}
return hbaseConf;
}
}