blob: ba18ba51efd61c9372f2dc96798091d0872b8767 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import org.junit.jupiter.api.Test;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Class to test the row key structures for various tables.
*
*/
public class TestRowKeys {
private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
private final static byte[] QUALIFIER_SEP_BYTES = Bytes
.toBytes(QUALIFIER_SEP);
private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
private final static String USER = QUALIFIER_SEP + "user";
private final static String SUB_APP_USER = QUALIFIER_SEP + "subAppUser";
private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow"
+ QUALIFIER_SEP;
private final static Long FLOW_RUN_ID;
private final static String APPLICATION_ID;
static {
long runid = Long.MAX_VALUE - 900L;
byte[] longMaxByteArr = Bytes.toBytes(Long.MAX_VALUE);
byte[] byteArr = Bytes.toBytes(runid);
int sepByteLen = QUALIFIER_SEP_BYTES.length;
if (sepByteLen <= byteArr.length) {
for (int i = 0; i < sepByteLen; i++) {
byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
}
}
FLOW_RUN_ID = Bytes.toLong(byteArr);
long clusterTs = System.currentTimeMillis();
byteArr = Bytes.toBytes(clusterTs);
if (sepByteLen <= byteArr.length) {
for (int i = 0; i < sepByteLen; i++) {
byteArr[byteArr.length - sepByteLen + i] =
(byte) (longMaxByteArr[byteArr.length - sepByteLen + i] -
QUALIFIER_SEP_BYTES[i]);
}
}
clusterTs = Bytes.toLong(byteArr);
int seqId = 222;
APPLICATION_ID = ApplicationId.newInstance(clusterTs, seqId).toString();
}
private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
int sepLen = QUALIFIER_SEP_BYTES.length;
for (int i = 0; i < sepLen; i++) {
assertTrue(
byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
QUALIFIER_SEP_BYTES[i],
"Row key prefix not encoded properly.");
}
}
@Test
void testApplicationRowKey() {
byte[] byteRowKey =
new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID).getRowKey();
ApplicationRowKey rowKey = ApplicationRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
byte[] byteRowKeyPrefix =
new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
.getRowKeyPrefix();
byte[][] splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix,
new int[]{Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
Separator.VARIABLE_SIZE});
assertEquals(5, splits.length);
assertEquals(0, splits[4].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
assertEquals(FLOW_RUN_ID,
(Long) LongConverter.invertLong(Bytes.toLong(splits[3])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix =
new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME).getRowKeyPrefix();
splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[]{
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE});
assertEquals(4, splits.length);
assertEquals(0, splits[3].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
/**
* Tests the converters indirectly through the public methods of the
* corresponding rowkey.
*/
@Test
void testAppToFlowRowKey() {
byte[] byteRowKey = new AppToFlowRowKey(APPLICATION_ID).getRowKey();
AppToFlowRowKey rowKey = AppToFlowRowKey.parseRowKey(byteRowKey);
assertEquals(APPLICATION_ID, rowKey.getAppId());
}
@Test
void testEntityRowKey() {
TimelineEntity entity = new TimelineEntity();
entity.setId("!ent!ity!!id!");
entity.setType("entity!Type");
entity.setIdPrefix(54321);
byte[] byteRowKey =
new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
entity.getType(), entity.getIdPrefix(),
entity.getId()).getRowKey();
EntityRowKey rowKey = EntityRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
assertEquals(APPLICATION_ID, rowKey.getAppId());
assertEquals(entity.getType(), rowKey.getEntityType());
assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue());
assertEquals(entity.getId(), rowKey.getEntityId());
byte[] byteRowKeyPrefix =
new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID, entity.getType(), null, null)
.getRowKeyPrefix();
byte[][] splits =
Separator.QUALIFIERS.split(
byteRowKeyPrefix,
new int[]{Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE});
assertEquals(7, splits.length);
assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
assertEquals(entity.getType(),
Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix =
new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
APPLICATION_ID).getRowKeyPrefix();
splits =
Separator.QUALIFIERS.split(
byteRowKeyPrefix,
new int[]{Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE});
assertEquals(6, splits.length);
assertEquals(0, splits[5].length);
AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4]));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
void testFlowActivityRowKey() {
Long ts = 1459900830000L;
Long dayTimestamp = HBaseTimelineSchemaUtils.getTopOfTheDayTimestamp(ts);
byte[] byteRowKey =
new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(dayTimestamp, rowKey.getDayTimestamp());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
byte[] byteRowKeyPrefix =
new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix();
byte[][] splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[]{
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE});
assertEquals(2, splits.length);
assertEquals(0, splits[1].length);
assertEquals(CLUSTER,
Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
verifyRowPrefixBytes(byteRowKeyPrefix);
byteRowKeyPrefix =
new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix();
splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix,
new int[]{Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
Separator.VARIABLE_SIZE});
assertEquals(3, splits.length);
assertEquals(0, splits[2].length);
assertEquals(CLUSTER,
Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
assertEquals(ts,
(Long) LongConverter.invertLong(Bytes.toLong(splits[1])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
void testFlowRunRowKey() {
byte[] byteRowKey =
new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID).getRowKey();
FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(USER, rowKey.getUserId());
assertEquals(FLOW_NAME, rowKey.getFlowName());
assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
byte[] byteRowKeyPrefix =
new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null).getRowKey();
byte[][] splits =
Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[]{
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE});
assertEquals(4, splits.length);
assertEquals(0, splits[3].length);
assertEquals(FLOW_NAME,
Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
verifyRowPrefixBytes(byteRowKeyPrefix);
}
@Test
void testSubAppRowKey() {
TimelineEntity entity = new TimelineEntity();
entity.setId("entity1");
entity.setType("DAG");
entity.setIdPrefix(54321);
byte[] byteRowKey =
new SubApplicationRowKey(SUB_APP_USER, CLUSTER,
entity.getType(), entity.getIdPrefix(),
entity.getId(), USER).getRowKey();
SubApplicationRowKey rowKey = SubApplicationRowKey.parseRowKey(byteRowKey);
assertEquals(CLUSTER, rowKey.getClusterId());
assertEquals(SUB_APP_USER, rowKey.getSubAppUserId());
assertEquals(entity.getType(), rowKey.getEntityType());
assertEquals(entity.getIdPrefix(), rowKey.getEntityIdPrefix().longValue());
assertEquals(entity.getId(), rowKey.getEntityId());
assertEquals(USER, rowKey.getUserId());
}
@Test
void testDomainRowKey() {
String clusterId = "cluster1@dc1";
String domainId = "helloworld";
byte[] byteRowKey =
new DomainRowKey(clusterId, domainId).getRowKey();
DomainRowKey rowKey = DomainRowKey.parseRowKey(byteRowKey);
assertEquals(clusterId, rowKey.getClusterId());
assertEquals(domainId, rowKey.getDomainId());
String rowKeyStr = rowKey.getRowKeyAsString();
DomainRowKey drk = DomainRowKey.parseRowKeyFromString(rowKeyStr);
assertEquals(drk.getClusterId(), rowKey.getClusterId());
assertEquals(drk.getDomainId(), rowKey.getDomainId());
}
@Test
void testDomainRowKeySpecialChars() {
String clusterId = "cluster1!temp!dc1";
String domainId = "hello=world";
byte[] byteRowKey =
new DomainRowKey(clusterId, domainId).getRowKey();
DomainRowKey rowKey = DomainRowKey.parseRowKey(byteRowKey);
assertEquals(clusterId, rowKey.getClusterId());
assertEquals(domainId, rowKey.getDomainId());
String rowKeyStr = rowKey.getRowKeyAsString();
DomainRowKey drk = DomainRowKey.parseRowKeyFromString(rowKeyStr);
assertEquals(drk.getClusterId(), rowKey.getClusterId());
assertEquals(drk.getDomainId(), rowKey.getDomainId());
}
}