/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.hbase.HBaseBulkOutputFormat.HBaseBulkOutputCommitter;
import org.apache.hcatalog.hbase.HBaseDirectOutputFormat.HBaseDirectOutputCommitter;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.RevisionManagerConfiguration;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.apache.hcatalog.mapreduce.HCatStorageHandler;
import org.apache.thrift.TBase;
import org.apache.zookeeper.ZooKeeper;
import com.facebook.fb303.FacebookBase;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * HBaseHCatStorageHandler provides functionality to create HBase tables through
 * HCatalog and to configure MapReduce jobs that read from and write to them. The
 * implementation closely follows Hive's HBaseStorageHandler, with additions to
 * suit HCatalog.
 */
public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Configurable {
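    /*
     * Illustrative usage (a sketch only; exact DDL syntax depends on the HCatalog/Hive
     * version in use):
     *
     *   CREATE TABLE hbase_table (key int, value string)
     *   STORED BY 'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'
     *   TBLPROPERTIES ('hbase.columns.mapping' = ':key,cf1:val');
     *
     * The table name, column names and column family above are hypothetical.
     */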
public final static String DEFAULT_PREFIX = "default.";
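    /** Job property holding the location where bulk mode writes its intermediate output. */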
private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
private Configuration hbaseConf;
private Configuration jobConf;
private HBaseAdmin admin;
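    /**
     * Configures a map-reduce read job for an HBase-backed table: resolves the HBase
     * table name, adds the hbase-default.xml/hbase-site.xml properties that are not
     * already set, obtains an HBase delegation token, sets the scan columns, creates a
     * revision manager snapshot if one is not already present, and registers the
     * dependency jars on "tmpjars".
     */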
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
// Populate jobProperties with input table name, table columns, RM snapshot,
// hbase-default.xml and hbase-site.xml
Map<String, String> tableJobProperties = tableDesc.getJobProperties();
String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
try {
InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
Configuration jobConf = getJobConf();
addResources(jobConf, jobProperties);
JobConf copyOfConf = new JobConf(jobConf);
HBaseConfiguration.addHbaseResources(copyOfConf);
            //Getting the HBase delegation token in getInputSplits does not work with Pig,
            //so it needs to be done here.
if (jobConf instanceof JobConf) { //Should be the case
HBaseUtil.addHBaseDelegationToken(copyOfConf);
((JobConf) jobConf).getCredentials().addAll(copyOfConf.getCredentials());
}
String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
String serSnapshot = (String) inputJobInfo.getProperties().get(
HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
if (serSnapshot == null) {
HCatTableSnapshot snapshot =
HBaseRevisionManagerUtil.createSnapshot(
RevisionManagerConfiguration.create(copyOfConf),
qualifiedTableName, tableInfo);
jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
HCatUtil.serialize(snapshot));
}
            //The dependency jars are added directly to the jobConf: setting them in
            //jobProperties does not get propagated to the JobConf yet, since jobProperties
            //are maintained per partition.
            //TODO: Remove when HCAT-308 is fixed
addOutputDependencyJars(jobConf);
jobProperties.put("tmpjars", jobConf.get("tmpjars"));
} catch (IOException e) {
throw new IllegalStateException("Error while configuring job properties", e);
}
}
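    /**
     * Configures a map-reduce write job for an HBase-backed table: resolves the HBase
     * table name, adds the hbase-default.xml/hbase-site.xml properties that are not
     * already set, begins a revision manager write transaction if one is not already
     * present, selects the direct or bulk output committer, and registers the
     * dependency jars on "tmpjars".
     */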
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
// Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
// Populate RM transaction in OutputJobInfo
// In case of bulk mode, populate intermediate output location
Map<String, String> tableJobProperties = tableDesc.getJobProperties();
String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
try {
OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedHBaseTableName(tableInfo);
jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
jobProperties.put(TableOutputFormat.OUTPUT_TABLE, qualifiedTableName);
Configuration jobConf = getJobConf();
addResources(jobConf, jobProperties);
Configuration copyOfConf = new Configuration(jobConf);
HBaseConfiguration.addHbaseResources(copyOfConf);
String txnString = outputJobInfo.getProperties().getProperty(
HBaseConstants.PROPERTY_WRITE_TXN_KEY);
Transaction txn = null;
if (txnString == null) {
txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo,
RevisionManagerConfiguration.create(copyOfConf));
String serializedTxn = HCatUtil.serialize(txn);
outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
serializedTxn);
} else {
txn = (Transaction) HCatUtil.deserialize(txnString);
}
if (isBulkMode(outputJobInfo)) {
String tableLocation = tableInfo.getTableLocation();
String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
.toString();
outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
                // We write out an intermediate sequence file, hence the
                // location is not the one passed in OutputJobInfo.getLocation()
// TODO replace this with a mapreduce constant when available
jobProperties.put("mapred.output.dir", location);
jobProperties.put("mapred.output.committer.class", HBaseBulkOutputCommitter.class.getName());
} else {
jobProperties.put("mapred.output.committer.class", HBaseDirectOutputCommitter.class.getName());
}
jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
addOutputDependencyJars(jobConf);
jobProperties.put("tmpjars", jobConf.get("tmpjars"));
} catch (IOException e) {
throw new IllegalStateException("Error while configuring job properties", e);
}
}
/*
* @return instance of HiveAuthorizationProvider
*
* @throws HiveException
*
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler#
* getAuthorizationProvider()
*/
@Override
public HiveAuthorizationProvider getAuthorizationProvider()
throws HiveException {
HBaseAuthorizationProvider hbaseAuth = new HBaseAuthorizationProvider();
hbaseAuth.init(getConf());
return hbaseAuth;
}
/*
* @param table
*
* @throws MetaException
*
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler
* #commitCreateTable(org.apache.hadoop.hive.metastore.api.Table)
*/
@Override
public void commitCreateTable(Table table) throws MetaException {
}
    /*
     * @param tbl the table being dropped
     *
     * @param deleteData whether the underlying data should also be deleted
     *
     * @throws MetaException
     *
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler
     * #commitDropTable(org.apache.hadoop.hive.metastore.api.Table, boolean)
     */
@Override
public void commitDropTable(Table tbl, boolean deleteData)
throws MetaException {
checkDeleteTable(tbl);
}
    /*
     * @param tbl the table being created
     *
     * @throws MetaException
     *
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler
     * #preCreateTable(org.apache.hadoop.hive.metastore.api.Table)
     */
@Override
public void preCreateTable(Table tbl) throws MetaException {
boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
hbaseConf = getConf();
if (tbl.getSd().getLocation() != null) {
throw new MetaException("LOCATION may not be specified for HBase.");
}
try {
String tableName = getFullyQualifiedHBaseTableName(tbl);
String hbaseColumnsMapping = tbl.getParameters().get(
HBaseSerDe.HBASE_COLUMNS_MAPPING);
if (hbaseColumnsMapping == null) {
throw new MetaException(
"No hbase.columns.mapping defined in table"
+ " properties.");
}
List<String> hbaseColumnFamilies = new ArrayList<String>();
List<String> hbaseColumnQualifiers = new ArrayList<String>();
List<byte[]> hbaseColumnFamiliesBytes = new ArrayList<byte[]>();
int iKey = HBaseUtil.parseColumnMapping(hbaseColumnsMapping,
hbaseColumnFamilies, hbaseColumnFamiliesBytes,
hbaseColumnQualifiers, null);
HTableDescriptor tableDesc;
Set<String> uniqueColumnFamilies = new HashSet<String>();
if (!getHBaseAdmin().tableExists(tableName)) {
// if it is not an external table then create one
if (!isExternal) {
// Create the column descriptors
tableDesc = new HTableDescriptor(tableName);
uniqueColumnFamilies.addAll(hbaseColumnFamilies);
uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
for (String columnFamily : uniqueColumnFamilies) {
HColumnDescriptor familyDesc = new HColumnDescriptor(Bytes
.toBytes(columnFamily));
familyDesc.setMaxVersions(Integer.MAX_VALUE);
tableDesc.addFamily(familyDesc);
}
getHBaseAdmin().createTable(tableDesc);
} else {
// an external table
throw new MetaException("HBase table " + tableName
+ " doesn't exist while the table is "
+ "declared as an external table.");
}
} else {
if (!isExternal) {
throw new MetaException("Table " + tableName
+ " already exists within HBase."
+ " Use CREATE EXTERNAL TABLE instead to"
+ " register it in HCatalog.");
}
// make sure the schema mapping is right
tableDesc = getHBaseAdmin().getTableDescriptor(
Bytes.toBytes(tableName));
for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
if (i == iKey) {
continue;
}
if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) {
throw new MetaException("Column Family "
+ hbaseColumnFamilies.get(i)
+ " is not defined in hbase table " + tableName);
}
}
}
// ensure the table is online
new HTable(hbaseConf, tableDesc.getName());
//Set up table in revision manager.
RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
rm.createTable(tableName, new ArrayList<String>(uniqueColumnFamilies));
} catch (MasterNotRunningException mnre) {
throw new MetaException(StringUtils.stringifyException(mnre));
} catch (IOException ie) {
throw new MetaException(StringUtils.stringifyException(ie));
} catch (IllegalArgumentException iae) {
throw new MetaException(StringUtils.stringifyException(iae));
}
}
/*
* @param table
*
* @throws MetaException
*
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler
* #preDropTable(org.apache.hadoop.hive.metastore.api.Table)
*/
@Override
public void preDropTable(Table table) throws MetaException {
}
/*
* @param table
*
* @throws MetaException
*
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler
* #rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table)
*/
@Override
public void rollbackCreateTable(Table table) throws MetaException {
checkDeleteTable(table);
}
/*
* @param table
*
* @throws MetaException
*
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler
* #rollbackDropTable(org.apache.hadoop.hive.metastore.api.Table)
*/
@Override
public void rollbackDropTable(Table table) throws MetaException {
}
/*
* @return instance of HiveMetaHook
*
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler#getMetaHook()
*/
@Override
public HiveMetaHook getMetaHook() {
return this;
}
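    /**
     * Lazily creates and caches an HBaseAdmin for the current configuration.
     *
     * @throws MetaException if the HBase master or ZooKeeper cannot be reached
     */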
private HBaseAdmin getHBaseAdmin() throws MetaException {
try {
if (admin == null) {
admin = new HBaseAdmin(this.getConf());
}
return admin;
} catch (MasterNotRunningException mnre) {
throw new MetaException(StringUtils.stringifyException(mnre));
} catch (ZooKeeperConnectionException zkce) {
throw new MetaException(StringUtils.stringifyException(zkce));
}
}
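    /**
     * Resolves the HBase table name for a metastore table: the hbase.table.name property
     * (table parameters first, then SerDe parameters) wins if present; otherwise the name
     * is derived from the database and table names, lower-cased, with the default database
     * prefix omitted.
     */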
private String getFullyQualifiedHBaseTableName(Table tbl) {
String tableName = tbl.getParameters().get(HBaseSerDe.HBASE_TABLE_NAME);
if (tableName == null) {
tableName = tbl.getSd().getSerdeInfo().getParameters()
.get(HBaseSerDe.HBASE_TABLE_NAME);
}
if (tableName == null) {
if (tbl.getDbName().equals(MetaStoreUtils.DEFAULT_DATABASE_NAME)) {
tableName = tbl.getTableName();
} else {
tableName = tbl.getDbName() + "." + tbl.getTableName();
}
tableName = tableName.toLowerCase();
}
return tableName;
}
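    /**
     * Resolves the HBase table name for an HCatalog table: the hbase.table.name storer
     * property wins if present; otherwise the name is derived from the database and table
     * names, lower-cased, with the default database prefix omitted.
     */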
static String getFullyQualifiedHBaseTableName(HCatTableInfo tableInfo) {
String qualifiedName = tableInfo.getStorerInfo().getProperties()
.getProperty(HBaseSerDe.HBASE_TABLE_NAME);
if (qualifiedName == null) {
String databaseName = tableInfo.getDatabaseName();
String tableName = tableInfo.getTableName();
if ((databaseName == null)
|| (databaseName.equals(MetaStoreUtils.DEFAULT_DATABASE_NAME))) {
qualifiedName = tableName;
} else {
qualifiedName = databaseName + "." + tableName;
}
qualifiedName = qualifiedName.toLowerCase();
}
return qualifiedName;
}
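    /*
     * @return the InputFormat used to read from HBase-backed tables
     *
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler#getInputFormatClass()
     */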
@Override
public Class<? extends InputFormat> getInputFormatClass() {
return HBaseInputFormat.class;
}
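    /*
     * @return the OutputFormat used to write to HBase-backed tables
     *
     * @see org.apache.hcatalog.mapreduce.HCatStorageHandler#getOutputFormatClass()
     */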
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
return HBaseBaseOutputFormat.class;
}
/*
* @return subclass of SerDe
*
* @throws UnsupportedOperationException
*
* @see
     * org.apache.hcatalog.mapreduce.HCatStorageHandler#getSerDeClass()
*/
@Override
public Class<? extends SerDe> getSerDeClass()
throws UnsupportedOperationException {
return HBaseSerDe.class;
}
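    /**
     * Returns the job configuration captured in setConf; null until setConf is called.
     */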
public Configuration getJobConf() {
return jobConf;
}
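    /*
     * @return the HBase configuration, created lazily if setConf has not been called
     *
     * @see org.apache.hadoop.conf.Configurable#getConf()
     */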
@Override
public Configuration getConf() {
if (hbaseConf == null) {
hbaseConf = HBaseConfiguration.create();
}
return hbaseConf;
}
@Override
public void setConf(Configuration conf) {
        //setConf is called both during DDL operations and mapred read/write jobs.
        //For DDL we create a copy of conf and add hbase-default.xml and hbase-site.xml to it.
        //For jobs we keep a reference instead of cloning, because we need to
        // 1) add the hbase delegation token to the Credentials, and
        // 2) set tmpjars on it; putting them in jobProperties does not get propagated to the
        //    JobConf in the case of InputFormat, as jobProperties are maintained per partition.
        //We do not add hbase-default.xml and hbase-site.xml to jobConf, as that would override
        //any hbase properties the user set in the JobConf. configureInputJobProperties and
        //configureOutputJobProperties take care of adding the default properties
        //that are not already present. TODO: Change to a copy for jobs after HCAT-308 is fixed.
jobConf = conf;
hbaseConf = RevisionManagerConfiguration.create(HBaseConfiguration.create(conf));
}
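    /**
     * Deletes the underlying HBase table, and its revision manager entry, when a managed
     * (non-external) table is dropped or its creation is rolled back.
     *
     * @throws MetaException if the HBase table cannot be disabled or deleted
     */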
private void checkDeleteTable(Table table) throws MetaException {
boolean isExternal = MetaStoreUtils.isExternalTable(table);
String tableName = getFullyQualifiedHBaseTableName(table);
RevisionManager rm = null;
try {
if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
                // the HBase table is managed by HCatalog, so delete it as part of the drop/rollback
if (getHBaseAdmin().isTableEnabled(tableName)) {
getHBaseAdmin().disableTable(tableName);
}
getHBaseAdmin().deleteTable(tableName);
//Drop table in revision manager.
rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
rm.dropTable(tableName);
}
} catch (IOException ie) {
throw new MetaException(StringUtils.stringifyException(ie));
} finally {
HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
}
}
    /**
     * Helper method to add the required dependency jars to the distributed cache.
     * @param conf job configuration to which the "tmpjars" entries are added
     * @throws IOException if the jars cannot be located or added
     */
private void addOutputDependencyJars(Configuration conf) throws IOException {
TableMapReduceUtil.addDependencyJars(conf,
//ZK
ZooKeeper.class,
//HBase
HTable.class,
//Hive
HiveException.class,
//HCatalog jar
HCatOutputFormat.class,
//hcat hbase storage handler jar
HBaseHCatStorageHandler.class,
//hive hbase storage handler jar
HBaseSerDe.class,
//hive jar
Table.class,
//libthrift jar
TBase.class,
//hbase jar
Bytes.class,
            //thrift fb303 jar
FacebookBase.class,
//guava jar
ThreadFactoryBuilder.class);
}
    /**
     * Utility method to add hbase-default.xml, hbase-site.xml and revision manager
     * properties to a new map if they are not already present in the jobConf.
     * @param jobConf Job configuration
     * @param newJobProperties Map to which new properties should be added
     */
private void addResources(Configuration jobConf,
Map<String, String> newJobProperties) {
Configuration conf = new Configuration(false);
HBaseConfiguration.addHbaseResources(conf);
RevisionManagerConfiguration.addResources(conf);
for (Entry<String, String> entry : conf) {
if (jobConf.get(entry.getKey()) == null)
newJobProperties.put(entry.getKey(), entry.getValue());
}
}
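    /**
     * Returns true if the table's storer properties request bulk output mode; defaults
     * to false (direct writes).
     */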
public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
//Default is false
String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties()
.getProperty(HBaseConstants.PROPERTY_BULK_OUTPUT_MODE_KEY,
"false");
return "true".equals(bulkMode);
}
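    /**
     * Builds the space-separated "family:qualifier" list for TableInputFormat.SCAN_COLUMNS
     * from the table's hbase.columns.mapping, restricted to the requested output schema
     * when one is given; the row key column is always skipped.
     */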
private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
StringBuilder builder = new StringBuilder();
String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
if (outputColSchema == null) {
String[] splits = hbaseColumnMapping.split("[,]");
for (int i = 0; i < splits.length; i++) {
if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL))
builder.append(splits[i]).append(" ");
}
} else {
HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
HCatSchema tableSchema = tableInfo.getDataColumns();
List<String> outputFieldNames = outputSchema.getFieldNames();
List<Integer> outputColumnMapping = new ArrayList<Integer>();
for (String fieldName : outputFieldNames) {
int position = tableSchema.getPosition(fieldName);
outputColumnMapping.add(position);
}
List<String> columnFamilies = new ArrayList<String>();
List<String> columnQualifiers = new ArrayList<String>();
HBaseUtil.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
columnQualifiers, null);
for (int i = 0; i < outputColumnMapping.size(); i++) {
int cfIndex = outputColumnMapping.get(i);
String cf = columnFamilies.get(cfIndex);
                // We skip the key column.
                if (!cf.equals(HBaseSerDe.HBASE_KEY_COL)) {
String qualifier = columnQualifiers.get(i);
builder.append(cf);
builder.append(":");
if (qualifier != null) {
builder.append(qualifier);
}
builder.append(" ");
}
}
}
        //Remove the trailing space delimiter, if any
        if (builder.length() > 0) {
            builder.deleteCharAt(builder.length() - 1);
        }
return builder.toString();
}
}