blob: 368b0604d5a89752d89754438aac760a5c1249b4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.junit.Test;
public class TestRowKeys {
private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
private final static byte[] QUALIFIER_SEP_BYTES = Bytes
.toBytes(QUALIFIER_SEP);
private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
private final static String USER = QUALIFIER_SEP + "user";
private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow"
+ QUALIFIER_SEP;
private final static Long FLOW_RUN_ID;
private final static String APPLICATION_ID;
static {
long runid = Long.MAX_VALUE - 900L;
byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
byte[] byteArr = Bytes.toBytes(runid);
int sepByteLen = QUALIFIER_SEP_BYTES.length;
if (sepByteLen <= byteArr.length) {
for (int i = 0; i < sepByteLen; i++) {
byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
}
}
FLOW_RUN_ID = Bytes.toLong(byteArr);
long clusterTs = System.currentTimeMillis();
byteArr = Bytes.toBytes(clusterTs);
if (sepByteLen <= byteArr.length) {
for (int i = 0; i < sepByteLen; i++) {
byteArr[byteArr.length - sepByteLen + i] =
(byte) (longMaxByteArr[byteArr.length - sepByteLen + i] -
QUALIFIER_SEP_BYTES[i]);
}
}
clusterTs = Bytes.toLong(byteArr);
int seqId = 222;
APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
}
private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
int sepLen = QUALIFIER_SEP_BYTES.length;
for (int i = 0; i < sepLen; i++) {
assertTrue(
"Row key prefix not encoded properly.",
byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
QUALIFIER_SEP_BYTES[i]);
}
}
@Test
public void testApplicationRowKey() {
byte[] byteRowKey =
new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID).getRowKey();
ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
byte[] byteRowKeyPrefix =
new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
.getRowKeyPrefix();
byte[][] splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix,
new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
Separator.VARIABLE_SIZE});
assertEquals(5, splits.length);
assertEquals(0, splits[4].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
assertEquals(FLOW_RUN_ID,
(Long) LongConverter.invertLong(Bytes.toLong(splits[3])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix =
new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME).getRowKeyPrefix();
splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
assertEquals(4, splits.length);
assertEquals(0, splits[3].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
/**
* Tests the converters indirectly through the public methods of the
* corresponding rowkey.
*/
@Test
public void testAppToFlowRowKey() {
byte[] byteRowKey = new AppToFlowRowKey(CLUSTER,
APPLICATION_ID).getRowKey();
AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
}
@Test
public void testEntityRowKey() {
String entityId = "!ent!ity!!id!";
String entityType = "entity!Type";
byte[] byteRowKey =
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
entityType, entityId).getRowKey();
EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
assertEquals(entityType, rowKey.getEntityType());
assertEquals(entityId, rowKey.getEntityId());
byte[] byteRowKeyPrefix =
new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID, entityType).getRowKeyPrefix();
byte[][] splits =
Separator.QUALIFIERS.split(
byteRowKeyPrefix,
new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE});
assertEquals(7, splits.length);
assertEquals(0, splits[6].length);
assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
assertEquals(entityType,
Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix =
new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID).getRowKeyPrefix();
splits =
Separator.QUALIFIERS.split(
byteRowKeyPrefix,
new int[] {Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE});
assertEquals(6, splits.length);
assertEquals(0, splits[5].length);
AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4]));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testFlowActivityRowKey() {
Long ts = 1459900830000L;
Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
byte[] byteRowKey =
new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(dayTimestamp, rowKey.getDayTimestamp());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
byte[] byteRowKeyPrefix =
new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix();
byte[][] splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
assertEquals(2, splits.length);
assertEquals(0, splits[1].length);
assertEquals(CLUSTER,
Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix =
new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix();
splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix,
new int[] {Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
Separator.VARIABLE_SIZE});
assertEquals(3, splits.length);
assertEquals(0, splits[2].length);
assertEquals(CLUSTER,
Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
assertEquals(ts,
(Long) LongConverter.invertLong(Bytes.toLong(splits[1])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
public void testFlowRunRowKey() {
byte[] byteRowKey =
new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID).getRowKey();
FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
byte[] byteRowKeyPrefix =
new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null).getRowKey();
byte[][] splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
assertEquals(4, splits.length);
assertEquals(0, splits[3].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
}