blob: 05570f1f530d3ea5384190d25a41a07c59c80901 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
/**
* Timeline entity reader for listing all available entity types given one
* reader context. Right now only supports listing all entity types within one
* YARN application.
*/
public final class EntityTypeReader extends AbstractTimelineStorageReader {
private static final Log LOG = LogFactory.getLog(EntityTypeReader.class);
private static final EntityTable ENTITY_TABLE = new EntityTable();
public EntityTypeReader(TimelineReaderContext context) {
super(context);
}
/**
* Reads a set of timeline entity types from the HBase storage for the given
* context.
*
* @param hbaseConf HBase Configuration.
* @param conn HBase Connection.
* @return a set of <cite>TimelineEntity</cite> objects, with only type field
* set.
* @throws IOException if any exception is encountered while reading entities.
*/
public Set<String> readEntityTypes(Configuration hbaseConf,
Connection conn) throws IOException {
validateParams();
augmentParams(hbaseConf, conn);
Set<String> types = new TreeSet<>();
TimelineReaderContext context = getContext();
EntityRowKeyPrefix prefix = new EntityRowKeyPrefix(context.getClusterId(),
context.getUserId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId());
byte[] currRowKey = prefix.getRowKeyPrefix();
byte[] nextRowKey = prefix.getRowKeyPrefix();
nextRowKey[nextRowKey.length - 1]++;
FilterList typeFilterList = new FilterList();
typeFilterList.addFilter(new FirstKeyOnlyFilter());
typeFilterList.addFilter(new KeyOnlyFilter());
typeFilterList.addFilter(new PageFilter(1));
if (LOG.isDebugEnabled()) {
LOG.debug("FilterList created for scan is - " + typeFilterList);
}
int counter = 0;
while (true) {
try (ResultScanner results =
getResult(hbaseConf, conn, typeFilterList, currRowKey, nextRowKey)) {
TimelineEntity entity = parseEntityForType(results.next());
if (entity == null) {
break;
}
++counter;
if (!types.add(entity.getType())) {
LOG.warn("Failed to add type " + entity.getType()
+ " to the result set because there is a duplicated copy. ");
}
String currType = entity.getType();
if (LOG.isDebugEnabled()) {
LOG.debug("Current row key: " + Arrays.toString(currRowKey));
LOG.debug("New entity type discovered: " + currType);
}
currRowKey = getNextRowKey(prefix.getRowKeyPrefix(), currType);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Scanned " + counter + "records for "
+ types.size() + "types");
}
return types;
}
@Override
protected void validateParams() {
Preconditions.checkNotNull(getContext(), "context shouldn't be null");
Preconditions.checkNotNull(getContext().getClusterId(),
"clusterId shouldn't be null");
Preconditions.checkNotNull(getContext().getAppId(),
"appId shouldn't be null");
}
/**
* Gets the possibly next row key prefix given current prefix and type.
*
* @param currRowKeyPrefix The current prefix that contains user, cluster,
* flow, run, and application id.
* @param entityType Current entity type.
* @return A new prefix for the possibly immediately next row key.
*/
private static byte[] getNextRowKey(byte[] currRowKeyPrefix,
String entityType) {
if (currRowKeyPrefix == null || entityType == null) {
return null;
}
byte[] entityTypeEncoded = Separator.QUALIFIERS.join(
Separator.encode(entityType, Separator.SPACE, Separator.TAB,
Separator.QUALIFIERS),
Separator.EMPTY_BYTES);
byte[] currRowKey
= new byte[currRowKeyPrefix.length + entityTypeEncoded.length];
System.arraycopy(currRowKeyPrefix, 0, currRowKey, 0,
currRowKeyPrefix.length);
System.arraycopy(entityTypeEncoded, 0, currRowKey, currRowKeyPrefix.length,
entityTypeEncoded.length);
return HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
currRowKey);
}
private ResultScanner getResult(Configuration hbaseConf, Connection conn,
FilterList filterList, byte[] startPrefix, byte[] endPrefix)
throws IOException {
Scan scan = new Scan(startPrefix, endPrefix);
scan.setFilter(filterList);
scan.setSmall(true);
return ENTITY_TABLE.getResultScanner(hbaseConf, conn, scan);
}
private TimelineEntity parseEntityForType(Result result)
throws IOException {
if (result == null || result.isEmpty()) {
return null;
}
TimelineEntity entity = new TimelineEntity();
EntityRowKey newRowKey = EntityRowKey.parseRowKey(result.getRow());
entity.setType(newRowKey.getEntityType());
return entity;
}
}