blob: 8324afd92a8c8f999e338068cb00b1f0f4e2c01a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import com.google.common.base.Preconditions;
/**
 * Timeline entity reader for application entities that are stored in the
 * application table.
 *
 * <p>The reader works in one of two modes, distinguished by the inherited
 * {@code singleEntityRead} flag: a single-application read that fetches one
 * row with a {@link Get}, and a multi-application read that scans a row-key
 * prefix and filters the parsed entities. The flag itself is maintained by
 * the superclass (not visible in this file).
 */
class ApplicationEntityReader extends GenericEntityReader {
  private static final ApplicationTable APPLICATION_TABLE =
      new ApplicationTable();

  /**
   * Creates a reader that accepts a result limit, created/modified time
   * bounds and relation/info/config/metric/event filters. All arguments are
   * forwarded unchanged to the superclass constructor.
   *
   * <p>NOTE(review): presumably this is the multi-entity (query) form; the
   * meaning of the trailing {@code true} is defined by the superclass
   * constructor and should be confirmed there.
   */
  public ApplicationEntityReader(String userId, String clusterId,
      String flowId, Long flowRunId, String appId, String entityType,
      Long limit, Long createdTimeBegin, Long createdTimeEnd,
      Long modifiedTimeBegin, Long modifiedTimeEnd,
      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
      Map<String, Object> infoFilters, Map<String, String> configFilters,
      Set<String> metricFilters, Set<String> eventFilters,
      EnumSet<Field> fieldsToRetrieve) {
    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
        eventFilters, fieldsToRetrieve, true);
  }

  /**
   * Creates a reader that is given an explicit {@code entityId} and no
   * limit, time bounds or filters. All arguments are forwarded unchanged to
   * the superclass constructor.
   *
   * <p>NOTE(review): presumably this is the single-entity form; confirm
   * against the superclass constructor.
   */
  public ApplicationEntityReader(String userId, String clusterId,
      String flowId, Long flowRunId, String appId, String entityType,
      String entityId, EnumSet<Field> fieldsToRetrieve) {
    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
        fieldsToRetrieve);
  }

  /**
   * Uses the {@link ApplicationTable}.
   *
   * @return the shared {@link ApplicationTable} instance
   */
  @Override
  protected BaseTable<?> getTable() {
    return APPLICATION_TABLE;
  }

  /**
   * Fetches the single application row addressed by cluster, user, flow,
   * flow run and application id.
   *
   * @param hbaseConf HBase configuration passed through to the table
   * @param conn HBase connection passed through to the table
   * @return whatever the table returns for the {@link Get}
   * @throws IOException if the table read fails
   */
  @Override
  protected Result getResult(Configuration hbaseConf, Connection conn)
      throws IOException {
    byte[] rowKey =
        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
            appId);
    Get get = new Get(rowKey);
    // Ask for every stored version of each cell rather than only the latest
    // (presumably so that multi-valued data such as metrics is complete).
    get.setMaxVersions(Integer.MAX_VALUE);
    return table.getResult(hbaseConf, conn, get);
  }

  /**
   * Checks that the parameters required for the current read mode are
   * present: cluster id and entity type always, the application id for a
   * single-entity read, and user id plus flow id otherwise.
   *
   * @throws NullPointerException if a required parameter is {@code null}
   */
  @Override
  protected void validateParams() {
    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
    if (singleEntityRead) {
      Preconditions.checkNotNull(appId, "appId shouldn't be null");
    } else {
      Preconditions.checkNotNull(userId, "userId shouldn't be null");
      Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
    }
  }

  /**
   * Fills in parameters the caller left unspecified.
   *
   * <p>For a single-entity read, a missing flow id, flow run id or user id
   * causes all three to be replaced by the values looked up for the
   * application. For a multi-entity read, a missing or negative limit and
   * missing time bounds are replaced with the defaults. In both modes a
   * {@code null} field selection becomes an empty set.
   *
   * @param hbaseConf HBase configuration used for the flow context lookup
   * @param conn HBase connection used for the flow context lookup
   * @throws IOException if the flow context lookup fails
   */
  @Override
  protected void augmentParams(Configuration hbaseConf, Connection conn)
      throws IOException {
    if (singleEntityRead
        && (flowId == null || flowRunId == null || userId == null)) {
      // The complete row key needs flow, run and user; resolve them from the
      // application id whenever any one of them is missing.
      FlowContext context =
          lookupFlowContext(clusterId, appId, hbaseConf, conn);
      flowId = context.flowId;
      flowRunId = context.flowRunId;
      userId = context.userId;
    }
    if (fieldsToRetrieve == null) {
      fieldsToRetrieve = EnumSet.noneOf(Field.class);
    }
    if (singleEntityRead) {
      // Limit and time-window defaults only matter for multi-entity reads.
      return;
    }
    if (limit == null || limit < 0) {
      limit = TimelineReader.DEFAULT_LIMIT;
    }
    if (createdTimeBegin == null) {
      createdTimeBegin = DEFAULT_BEGIN_TIME;
    }
    if (createdTimeEnd == null) {
      createdTimeEnd = DEFAULT_END_TIME;
    }
    if (modifiedTimeBegin == null) {
      modifiedTimeBegin = DEFAULT_BEGIN_TIME;
    }
    if (modifiedTimeEnd == null) {
      modifiedTimeEnd = DEFAULT_END_TIME;
    }
  }

  /**
   * Opens a scan over the application rows of a flow, narrowed to one flow
   * run when {@code flowRunId} is set, with a {@link PageFilter} built from
   * {@code limit}.
   *
   * <p>NOTE(review): the page filter is part of the scan, whereas the time
   * window and the relation/info/config/event/metric filters are evaluated
   * later in {@link #parseEntity(Result)}; rows rejected there still appear
   * to count against the limit. Confirm whether that is intended.
   *
   * @param hbaseConf HBase configuration passed through to the table
   * @param conn HBase connection passed through to the table
   * @return the scanner returned by the table for the configured scan
   * @throws IOException if the scanner cannot be opened
   */
  @Override
  protected ResultScanner getResults(Configuration hbaseConf,
      Connection conn) throws IOException {
    Scan scan = new Scan();
    if (flowRunId != null) {
      scan.setRowPrefixFilter(ApplicationRowKey.
          getRowKeyPrefix(clusterId, userId, flowId, flowRunId));
    } else {
      scan.setRowPrefixFilter(ApplicationRowKey.
          getRowKeyPrefix(clusterId, userId, flowId));
    }
    scan.setFilter(new PageFilter(limit));
    return table.getResultScanner(hbaseConf, conn, scan);
  }

  /**
   * Converts one application-table row into a {@link TimelineEntity}.
   *
   * <p>Id, created time and modified time are always populated. Each of the
   * remaining sections (is-related-to, relates-to, info, configs, events,
   * metrics) is read when it is requested through {@code fieldsToRetrieve}
   * or when a filter on it is set. A section that was read only to evaluate
   * its filter is cleared again before the entity is returned.
   *
   * <p>NOTE(review): the id, created-time and modified-time values are
   * dereferenced without a null check, so a row lacking any of those
   * columns would raise a {@link NullPointerException}. Whether such rows
   * can exist depends on the writer, which is not visible here.
   *
   * @param result the row to convert
   * @return the parsed entity, or {@code null} if the row is {@code null}
   *     or empty, falls outside the created/modified time window of a
   *     multi-entity read, or fails one of the configured filters
   * @throws IOException if reading a column from the row fails
   */
  @Override
  protected TimelineEntity parseEntity(Result result) throws IOException {
    if (result == null || result.isEmpty()) {
      return null;
    }
    TimelineEntity entity = new TimelineEntity();
    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
    String entityId = ApplicationColumn.ID.readResult(result).toString();
    entity.setId(entityId);

    // fetch created time
    Number createdTime =
        (Number)ApplicationColumn.CREATED_TIME.readResult(result);
    entity.setCreatedTime(createdTime.longValue());
    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
        entity.getCreatedTime() > createdTimeEnd)) {
      return null;
    }

    // fetch modified time
    Number modifiedTime =
        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
    entity.setModifiedTime(modifiedTime.longValue());
    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
        entity.getModifiedTime() > modifiedTimeEnd)) {
      return null;
    }

    // fetch is related to entities
    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
    boolean keepIsRelatedTo = wantsField(Field.IS_RELATED_TO);
    if (keepIsRelatedTo || checkIsRelatedTo) {
      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
          true);
      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
          entity.getIsRelatedToEntities(), isRelatedTo)) {
        return null;
      }
      if (!keepIsRelatedTo) {
        // Loaded only to evaluate the filter; not part of the response.
        entity.getIsRelatedToEntities().clear();
      }
    }

    // fetch relates to entities
    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
    boolean keepRelatesTo = wantsField(Field.RELATES_TO);
    if (keepRelatesTo || checkRelatesTo) {
      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
          false);
      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
          entity.getRelatesToEntities(), relatesTo)) {
        return null;
      }
      if (!keepRelatesTo) {
        entity.getRelatesToEntities().clear();
      }
    }

    // fetch info
    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
    boolean keepInfo = wantsField(Field.INFO);
    if (keepInfo || checkInfo) {
      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
      if (checkInfo &&
          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
        return null;
      }
      if (!keepInfo) {
        entity.getInfo().clear();
      }
    }

    // fetch configs
    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
    boolean keepConfigs = wantsField(Field.CONFIGS);
    if (keepConfigs || checkConfigs) {
      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
      if (checkConfigs && !TimelineStorageUtils.matchFilters(
          entity.getConfigs(), configFilters)) {
        return null;
      }
      if (!keepConfigs) {
        entity.getConfigs().clear();
      }
    }

    // fetch events
    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
    boolean keepEvents = wantsField(Field.EVENTS);
    if (keepEvents || checkEvents) {
      readEvents(entity, result, true);
      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
          entity.getEvents(), eventFilters)) {
        return null;
      }
      if (!keepEvents) {
        entity.getEvents().clear();
      }
    }

    // fetch metrics
    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
    boolean keepMetrics = wantsField(Field.METRICS);
    if (keepMetrics || checkMetrics) {
      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
          entity.getMetrics(), metricFilters)) {
        return null;
      }
      if (!keepMetrics) {
        entity.getMetrics().clear();
      }
    }
    return entity;
  }

  /**
   * Tells whether the caller asked for the given section, either by naming
   * it directly or by requesting {@link Field#ALL}.
   */
  private boolean wantsField(Field field) {
    return fieldsToRetrieve.contains(Field.ALL)
        || fieldsToRetrieve.contains(field);
  }
}