blob: 0956f1e5676934f0ef4bcac4d4e3fa5cae4e3f78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper;
import org.apache.hadoop.yarn.webapp.NotFoundException;
/**
* The base class for reading timeline data from the HBase storage. This class
* provides basic support to validate and augment reader context.
*/
public abstract class AbstractTimelineStorageReader {
private final TimelineReaderContext context;
/**
* Used to look up the flow context.
*/
private final AppToFlowTableRW appToFlowTable = new AppToFlowTableRW();
public AbstractTimelineStorageReader(TimelineReaderContext ctxt) {
context = ctxt;
}
protected TimelineReaderContext getContext() {
return context;
}
/**
* Looks up flow context from AppToFlow table.
*
* @param appToFlowRowKey to identify Cluster and App Ids.
* @param clusterId the cluster id.
* @param hbaseConf HBase configuration.
* @param conn HBase Connection.
* @return flow context information.
* @throws IOException if any problem occurs while fetching flow information.
*/
protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
String clusterId, Configuration hbaseConf, Connection conn)
throws IOException {
byte[] rowKey = appToFlowRowKey.getRowKey();
Get get = new Get(rowKey);
Result result = appToFlowTable.getResult(hbaseConf, conn, get);
if (result != null && !result.isEmpty()) {
Object flowName = ColumnRWHelper.readResult(
result, AppToFlowColumnPrefix.FLOW_NAME, clusterId);
Object flowRunId = ColumnRWHelper.readResult(
result, AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId);
Object userId = ColumnRWHelper.readResult(
result, AppToFlowColumnPrefix.USER_ID, clusterId);
if (flowName == null || userId == null || flowRunId == null) {
throw new NotFoundException(
"Unable to find the context flow name, and flow run id, "
+ "and user id for clusterId=" + clusterId
+ ", appId=" + appToFlowRowKey.getAppId());
}
return new FlowContext((String)userId, (String)flowName,
((Number)flowRunId).longValue());
} else {
throw new NotFoundException(
"Unable to find the context flow name, and flow run id, "
+ "and user id for clusterId=" + clusterId
+ ", appId=" + appToFlowRowKey.getAppId());
}
}
/**
* Sets certain parameters to defaults if the values are not provided.
*
* @param hbaseConf HBase Configuration.
* @param conn HBase Connection.
* @throws IOException if any exception is encountered while setting params.
*/
protected void augmentParams(Configuration hbaseConf, Connection conn)
throws IOException {
defaultAugmentParams(hbaseConf, conn);
}
/**
* Default behavior for all timeline readers to augment parameters.
*
* @param hbaseConf HBase Configuration.
* @param conn HBase Connection.
* @throws IOException if any exception is encountered while setting params.
*/
final protected void defaultAugmentParams(Configuration hbaseConf,
Connection conn) throws IOException {
// In reality all three should be null or neither should be null
if (context.getFlowName() == null || context.getFlowRunId() == null
|| context.getUserId() == null) {
// Get flow context information from AppToFlow table.
AppToFlowRowKey appToFlowRowKey =
new AppToFlowRowKey(context.getAppId());
FlowContext flowContext =
lookupFlowContext(appToFlowRowKey, context.getClusterId(), hbaseConf,
conn);
context.setFlowName(flowContext.flowName);
context.setFlowRunId(flowContext.flowRunId);
context.setUserId(flowContext.userId);
}
}
/**
* Validates the required parameters to read the entities.
*/
protected abstract void validateParams();
/**
* Encapsulates flow context information.
*/
protected static class FlowContext {
private final String userId;
private final String flowName;
private final Long flowRunId;
public FlowContext(String user, String flowName, Long flowRunId) {
this.userId = user;
this.flowName = flowName;
this.flowRunId = flowRunId;
}
protected String getUserId() {
return userId;
}
protected String getFlowName() {
return flowName;
}
protected Long getFlowRunId() {
return flowRunId;
}
}
}