blob: 523b4d835d1d684af64dada2fc5a8a498f6409cd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* A bunch of utility functions used in HBase TimelineService backend.
*/
public final class HBaseTimelineStorageUtils {
/** milliseconds in one day. */
public static final long MILLIS_ONE_DAY = 86400000L;
private static final Log LOG =
LogFactory.getLog(HBaseTimelineStorageUtils.class);
private HBaseTimelineStorageUtils() {
}
/**
* Combines the input array of attributes and the input aggregation operation
* into a new array of attributes.
*
* @param attributes Attributes to be combined.
* @param aggOp Aggregation operation.
* @return array of combined attributes.
*/
public static Attribute[] combineAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
Attribute[] combinedAttributes = new Attribute[newLength];
if (attributes != null) {
System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
}
if (aggOp != null) {
Attribute a2 = aggOp.getAttribute();
combinedAttributes[newLength - 1] = a2;
}
return combinedAttributes;
}
/**
* Returns a number for the new array size. The new array is the combination
* of input array of attributes and the input aggregation operation.
*
* @param attributes Attributes.
* @param aggOp Aggregation operation.
* @return the size for the new array
*/
private static int getNewLengthCombinedAttributes(Attribute[] attributes,
AggregationOperation aggOp) {
int oldLength = getAttributesLength(attributes);
int aggLength = getAppOpLength(aggOp);
return oldLength + aggLength;
}
private static int getAppOpLength(AggregationOperation aggOp) {
if (aggOp != null) {
return 1;
}
return 0;
}
private static int getAttributesLength(Attribute[] attributes) {
if (attributes != null) {
return attributes.length;
}
return 0;
}
/**
* Returns the first seen aggregation operation as seen in the list of input
* tags or null otherwise.
*
* @param tags list of HBase tags.
* @return AggregationOperation
*/
public static AggregationOperation getAggregationOperationFromTagsList(
List<Tag> tags) {
for (AggregationOperation aggOp : AggregationOperation.values()) {
for (Tag tag : tags) {
if (tag.getType() == aggOp.getTagType()) {
return aggOp;
}
}
}
return null;
}
/**
* Creates a {@link Tag} from the input attribute.
*
* @param attribute Attribute from which tag has to be fetched.
* @return a HBase Tag.
*/
public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
// attribute could be either an Aggregation Operation or
// an Aggregation Dimension
// Get the Tag type from either
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
return t;
}
AggregationCompactionDimension aggCompactDim =
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
}
/**
* creates a new cell based on the input cell but with the new value.
*
* @param origCell Original cell
* @param newValue new cell value
* @return cell
* @throws IOException while creating new cell.
*/
public static Cell createNewCell(Cell origCell, byte[] newValue)
throws IOException {
return CellUtil.createCell(CellUtil.cloneRow(origCell),
CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
}
/**
* creates a cell with the given inputs.
*
* @param row row of the cell to be created
* @param family column family name of the new cell
* @param qualifier qualifier for the new cell
* @param ts timestamp of the new cell
* @param newValue value of the new cell
* @param tags tags in the new cell
* @return cell
* @throws IOException while creating the cell.
*/
public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
long ts, byte[] newValue, byte[] tags) throws IOException {
return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
newValue, tags);
}
/**
* returns app id from the list of tags.
*
* @param tags cell tags to be looked into
* @return App Id as the AggregationCompactionDimension
*/
public static String getAggregationCompactionDimension(List<Tag> tags) {
String appId = null;
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
appId = Bytes.toString(t.getValue());
return appId;
}
}
return appId;
}
/**
* Converts an int into it's inverse int to be used in (row) keys
* where we want to have the largest int value in the top of the table
* (scans start at the largest int first).
*
* @param key value to be inverted so that the latest version will be first in
* a scan.
* @return inverted int
*/
public static int invertInt(int key) {
return Integer.MAX_VALUE - key;
}
/**
* returns the timestamp of that day's start (which is midnight 00:00:00 AM)
* for a given input timestamp.
*
* @param ts Timestamp.
* @return timestamp of that day's beginning (midnight)
*/
public static long getTopOfTheDayTimestamp(long ts) {
long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
return dayTimestamp;
}
/**
* @param conf Yarn configuration. Used to see if there is an explicit config
* pointing to the HBase config file to read. It should not be null
* or a NullPointerException will be thrown.
* @return a configuration with the HBase configuration from the classpath,
* optionally overwritten by the timeline service configuration URL if
* specified.
* @throws MalformedURLException if a timeline service HBase configuration URL
* is specified but is a malformed URL.
*/
public static Configuration getTimelineServiceHBaseConf(Configuration conf)
throws MalformedURLException {
if (conf == null) {
throw new NullPointerException();
}
Configuration hbaseConf;
String timelineServiceHBaseConfFileURL =
conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
if (timelineServiceHBaseConfFileURL != null
&& timelineServiceHBaseConfFileURL.length() > 0) {
LOG.info("Using hbase configuration at " +
timelineServiceHBaseConfFileURL);
// create a clone so that we don't mess with out input one
hbaseConf = new Configuration(conf);
Configuration plainHBaseConf = new Configuration(false);
URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
plainHBaseConf.addResource(hbaseSiteXML);
HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
} else {
// default to what is on the classpath
hbaseConf = HBaseConfiguration.create(conf);
}
return hbaseConf;
}
/**
* Given a row key prefix stored in a byte array, return a byte array for its
* immediate next row key.
*
* @param rowKeyPrefix The provided row key prefix, represented in an array.
* @return the closest next row key of the provided row key.
*/
public static byte[] calculateTheClosestNextRowKeyForPrefix(
byte[] rowKeyPrefix) {
// Essentially we are treating it like an 'unsigned very very long' and
// doing +1 manually.
// Search for the place where the trailing 0xFFs start
int offset = rowKeyPrefix.length;
while (offset > 0) {
if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
break;
}
offset--;
}
if (offset == 0) {
// We got an 0xFFFF... (only FFs) stopRow value which is
// the last possible prefix before the end of the table.
// So set it to stop at the 'end of the table'
return HConstants.EMPTY_END_ROW;
}
// Copy the right length of the original
byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
// And increment the last one
newStopRow[newStopRow.length - 1]++;
return newStopRow;
}
/**
* Checks if passed object is of integral type(Short/Integer/Long).
*
* @param obj Object to be checked.
* @return true if object passed is of type Short or Integer or Long, false
* otherwise.
*/
public static boolean isIntegralValue(Object obj) {
return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long);
}
}