blob: 747f6abfa23d866043564288c29852c3128f0fbf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineSchemaUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
/**
* Represents a rowkey for the flow activity table.
*/
public class FlowActivityRowKey {
private final String clusterId;
private final Long dayTs;
private final String userId;
private final String flowName;
private final FlowActivityRowKeyConverter
flowActivityRowKeyConverter = new FlowActivityRowKeyConverter();
/**
* @param clusterId identifying the cluster
* @param dayTs to be converted to the top of the day timestamp
* @param userId identifying user
* @param flowName identifying the flow
*/
public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
String flowName) {
this(clusterId, dayTs, userId, flowName, true);
}
/**
* @param clusterId identifying the cluster
* @param timestamp when the flow activity happened. May be converted to the
* top of the day depending on the convertDayTsToTopOfDay argument.
* @param userId identifying user
* @param flowName identifying the flow
* @param convertDayTsToTopOfDay if true and timestamp isn't null, then
* timestamp will be converted to the top-of-the day timestamp
*/
protected FlowActivityRowKey(String clusterId, Long timestamp, String userId,
String flowName, boolean convertDayTsToTopOfDay) {
this.clusterId = clusterId;
if (convertDayTsToTopOfDay && (timestamp != null)) {
this.dayTs = HBaseTimelineSchemaUtils.getTopOfTheDayTimestamp(timestamp);
} else {
this.dayTs = timestamp;
}
this.userId = userId;
this.flowName = flowName;
}
public String getClusterId() {
return clusterId;
}
public Long getDayTimestamp() {
return dayTs;
}
public String getUserId() {
return userId;
}
public String getFlowName() {
return flowName;
}
/**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowName}.
*
* @return byte array for the row key
*/
public byte[] getRowKey() {
return flowActivityRowKeyConverter.encode(this);
}
/**
* Given the raw row key as bytes, returns the row key as an object.
*
* @param rowKey Byte representation of row key.
* @return A <cite>FlowActivityRowKey</cite> object.
*/
public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
return new FlowActivityRowKeyConverter().decode(rowKey);
}
/**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowName}.
* @return String representation of row key
*/
public String getRowKeyAsString() {
return flowActivityRowKeyConverter.encodeAsString(this);
}
/**
* Given the raw row key as string, returns the row key as an object.
* @param encodedRowKey String representation of row key.
* @return A <cite>FlowActivityRowKey</cite> object.
*/
public static FlowActivityRowKey parseRowKeyFromString(String encodedRowKey) {
return new FlowActivityRowKeyConverter().decodeFromString(encodedRowKey);
}
/**
* Encodes and decodes row key for flow activity table. The row key is of the
* form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day
* timestamp) is a long and rest are strings.
* <p>
*/
final private static class FlowActivityRowKeyConverter
implements KeyConverter<FlowActivityRowKey>,
KeyConverterToString<FlowActivityRowKey> {
private FlowActivityRowKeyConverter() {
}
/**
* The flow activity row key is of the form
* clusterId!dayTimestamp!user!flowName with each segment separated by !.
* The sizes below indicate sizes of each one of these segements in
* sequence. clusterId, user and flowName are strings. Top of the day
* timestamp is a long hence 8 bytes in size. Strings are variable in size
* (i.e. they end whenever separator is encountered). This is used while
* decoding and helps in determining where to split.
*/
private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };
/*
* (non-Javadoc)
*
* Encodes FlowActivityRowKey object into a byte array with each
* component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
* This leads to an flow activity table row key of the form
* clusterId!dayTimestamp!user!flowName. If dayTimestamp in passed
* FlowActivityRowKey object is null and clusterId is not null, then this
* returns a row key prefix as clusterId! and if userId in
* FlowActivityRowKey is null (and the fields preceding it i.e. clusterId
* and dayTimestamp are not null), this returns a row key prefix as
* clusterId!dayTimeStamp! dayTimestamp is inverted while encoding as it
* helps maintain a descending order for row keys in flow activity table.
*
* @see org.apache.hadoop.yarn.server.timelineservice.storage.common
* .KeyConverter#encode(java.lang.Object)
*/
@Override
public byte[] encode(FlowActivityRowKey rowKey) {
if (rowKey.getDayTimestamp() == null) {
return Separator.QUALIFIERS.join(Separator.encode(
rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS), Separator.EMPTY_BYTES);
}
if (rowKey.getUserId() == null) {
return Separator.QUALIFIERS.join(Separator.encode(
rowKey.getClusterId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS), Bytes.toBytes(LongConverter
.invertLong(rowKey.getDayTimestamp())), Separator.EMPTY_BYTES);
}
return Separator.QUALIFIERS.join(Separator.encode(rowKey.getClusterId(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS), Bytes
.toBytes(LongConverter.invertLong(rowKey.getDayTimestamp())),
Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS), Separator.encode(rowKey.getFlowName(),
Separator.SPACE, Separator.TAB, Separator.QUALIFIERS));
}
/*
* (non-Javadoc)
*
* @see
* org.apache.hadoop.yarn.server.timelineservice.storage.common
* .KeyConverter#decode(byte[])
*/
@Override
public FlowActivityRowKey decode(byte[] rowKey) {
byte[][] rowKeyComponents =
Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
if (rowKeyComponents.length != 4) {
throw new IllegalArgumentException("the row key is not valid for "
+ "a flow activity");
}
String clusterId =
Separator.decode(Bytes.toString(rowKeyComponents[0]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
Long dayTs = LongConverter.invertLong(Bytes.toLong(rowKeyComponents[1]));
String userId =
Separator.decode(Bytes.toString(rowKeyComponents[2]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
String flowName =
Separator.decode(Bytes.toString(rowKeyComponents[3]),
Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
}
@Override
public String encodeAsString(FlowActivityRowKey key) {
if (key.getDayTimestamp() == null) {
return TimelineReaderUtils
.joinAndEscapeStrings(new String[] {key.clusterId});
} else if (key.getUserId() == null) {
return TimelineReaderUtils.joinAndEscapeStrings(
new String[] {key.clusterId, key.dayTs.toString()});
} else if (key.getFlowName() == null) {
return TimelineReaderUtils.joinAndEscapeStrings(
new String[] {key.clusterId, key.dayTs.toString(), key.userId});
}
return TimelineReaderUtils.joinAndEscapeStrings(new String[] {
key.clusterId, key.dayTs.toString(), key.userId, key.flowName});
}
@Override
public FlowActivityRowKey decodeFromString(String encodedRowKey) {
List<String> split = TimelineReaderUtils.split(encodedRowKey);
if (split == null || split.size() != 4) {
throw new IllegalArgumentException(
"Invalid row key for flow activity.");
}
Long dayTs = Long.valueOf(split.get(1));
return new FlowActivityRowKey(split.get(0), dayTs, split.get(2),
split.get(3));
}
}
}