/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.accumulo;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableInputFormat;
import org.apache.hadoop.hive.accumulo.mr.HiveAccumuloTableOutputFormat;
import org.apache.hadoop.hive.accumulo.predicate.AccumuloPredicateHandler;
import org.apache.hadoop.hive.accumulo.serde.AccumuloIndexParameters;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe;
import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* Hive storage handler that maps Hive tables onto Accumulo tables and performs predicate
* pushdown (via Accumulo iterators) when enabled.
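* <p>
* Illustrative DDL for a table backed by this handler (a sketch; the SERDEPROPERTIES key
* assumes the default value of {@link AccumuloSerDeParameters#COLUMN_MAPPINGS}):
* <pre>
* CREATE TABLE accumulo_table (rowid STRING, name STRING)
* STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
* WITH SERDEPROPERTIES ('accumulo.columns.mapping' = ':rowid,cf:name');
* </pre>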
*/
public class AccumuloStorageHandler extends DefaultStorageHandler implements HiveMetaHook,
HiveStoragePredicateHandler {
private static final Logger LOG = LoggerFactory.getLogger(AccumuloStorageHandler.class);
private static final String DEFAULT_PREFIX = "default";
protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance();
protected AccumuloConnectionParameters connectionParams;
protected Configuration conf;
protected HiveAccumuloHelper helper = new HiveAccumuloHelper();
/**
* Push down table properties into the JobConf.
*
* @param desc
* Hive table description
* @param jobProps
* Properties that will be added to the JobConf by Hive
*/
@Override
public void configureTableJobProperties(TableDesc desc, Map<String, String> jobProps) {
// This method should not be invoked; Hive is expected to call configureInputJobProperties
// and configureOutputJobProperties instead. Delegate to both as a fallback.
configureInputJobProperties(desc, jobProps);
configureOutputJobProperties(desc, jobProps);
}
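
/**
* Resolves the Accumulo table name for a metastore table: first from TBLPROPERTIES, then from
* SERDEPROPERTIES, finally falling back to the Hive name itself, qualified with the database
* name unless the table is in the default database.
*/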
protected String getTableName(Table table) throws MetaException {
// Use TBLPROPERTIES
String tableName = table.getParameters().get(AccumuloSerDeParameters.TABLE_NAME);
if (null != tableName) {
return tableName;
}
// Then try SERDEPROPERTIES
tableName = table.getSd().getSerdeInfo().getParameters()
.get(AccumuloSerDeParameters.TABLE_NAME);
if (null != tableName) {
return tableName;
}
// Use the hive table name, ignoring the default database
if (DEFAULT_PREFIX.equals(table.getDbName())) {
return table.getTableName();
} else {
return table.getDbName() + "." + table.getTableName();
}
}
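
/**
* Resolves the Accumulo index table name from TBLPROPERTIES, falling back to SERDEPROPERTIES;
* returns null when no index table is configured.
*/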
protected String getIndexTableName(Table table) {
// Use TBLPROPERTIES
String idxTableName = table.getParameters().get(AccumuloIndexParameters.INDEXTABLE_NAME);
if (null != idxTableName) {
return idxTableName;
}
// Then try SERDEPROPERTIES
idxTableName = table.getSd().getSerdeInfo().getParameters()
.get(AccumuloIndexParameters.INDEXTABLE_NAME);
return idxTableName;
}
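
/**
* Resolves the Accumulo table name from the TableDesc properties, preferring an explicitly
* configured name and otherwise deriving it from the Hive table name.
*/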
protected String getTableName(TableDesc tableDesc) {
Properties props = tableDesc.getProperties();
String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
if (null != tableName) {
return tableName;
}
// Fall back to the Hive table name, which Hive sets on the TableDesc, stripping any
// "default." database prefix
tableName = props.getProperty(hive_metastoreConstants.META_TABLE_NAME);
if (tableName.startsWith(DEFAULT_PREFIX + ".")) {
return tableName.substring(DEFAULT_PREFIX.length() + 1);
}
return tableName;
}
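
/**
* Returns the serialized list of Hive column types stored in the TableDesc properties.
*/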
protected String getColumnTypes(TableDesc tableDesc) {
return tableDesc.getProperties().getProperty(serdeConstants.LIST_COLUMN_TYPES);
}
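
/**
* Returns the configured Accumulo index table name from the TableDesc properties, or null if
* none was set.
*/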
protected String getIndexTableName(TableDesc tableDesc) {
return tableDesc.getProperties().getProperty(AccumuloIndexParameters.INDEXTABLE_NAME);
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
connectionParams = new AccumuloConnectionParameters(conf);
}
@Override
public Class<? extends AbstractSerDe> getSerDeClass() {
return AccumuloSerDe.class;
}
@Override
public HiveMetaHook getMetaHook() {
return this;
}
@Override
public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
return null;
}
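
/**
* Copies the SerDe-related table properties into the map Hive applies to the input job,
* registers Accumulo's dependent jars with the job, and, when SASL/Kerberos is in use, obtains
* an Accumulo delegation token and adds it to the job credentials.
*/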
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
Properties props = tableDesc.getProperties();
jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS,
props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS));
String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
if (null == tableName) {
tableName = getTableName(tableDesc);
}
jobProperties.put(AccumuloSerDeParameters.TABLE_NAME, tableName);
String useIterators = props.getProperty(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY);
if (useIterators != null) {
if (!"true".equalsIgnoreCase(useIterators) && !"false".equalsIgnoreCase(useIterators)) {
throw new IllegalArgumentException("Expected value of true or false for "
+ AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY);
}
jobProperties.put(AccumuloSerDeParameters.ITERATOR_PUSHDOWN_KEY, useIterators);
}
String storageType = props.getProperty(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE);
if (null != storageType) {
jobProperties.put(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE, storageType);
}
String authValue = props.getProperty(AccumuloSerDeParameters.AUTHORIZATIONS_KEY);
if (null != authValue) {
jobProperties.put(AccumuloSerDeParameters.AUTHORIZATIONS_KEY, authValue);
}
LOG.info("Computed input job properties of " + jobProperties);
Configuration conf = getConf();
helper.loadDependentJars(conf);
// When Kerberos is enabled, we have to add the Accumulo delegation token to the
// Job so that it gets passed down to the YARN/Tez task.
if (connectionParams.useSasl()) {
try {
// Open an accumulo connection
Connector conn = connectionParams.getConnector();
// Convert the Accumulo token into a Hadoop token
Token<? extends TokenIdentifier> accumuloToken = helper.setConnectorInfoForInputAndOutput(connectionParams, conn, conf);
// Probably don't have a JobConf here, but we can still try...
if (conf instanceof JobConf) {
LOG.debug("Adding Hadoop Token for Accumulo to Job's Credentials: " + accumuloToken);
// Add the Hadoop token to the JobConf
JobConf jobConf = (JobConf) conf;
jobConf.getCredentials().addToken(accumuloToken.getService(), accumuloToken);
LOG.info("All job tokens: " + jobConf.getCredentials().getAllTokens());
} else {
LOG.info("Don't have a JobConf, so we cannot persist Tokens. Have to do it later.");
}
} catch (Exception e) {
throw new RuntimeException("Failed to obtain DelegationToken for "
+ connectionParams.getAccumuloUserName(), e);
}
}
}
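
/**
* Copies the SerDe, table name, index, and visibility-related table properties into the map
* Hive applies to the output job, making them available to the OutputFormat (e.g. in
* checkOutputSpecs).
*/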
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
Properties props = tableDesc.getProperties();
// Adding these job properties will make them available to the OutputFormat in checkOutputSpecs
String colMap = props.getProperty(AccumuloSerDeParameters.COLUMN_MAPPINGS);
jobProperties.put(AccumuloSerDeParameters.COLUMN_MAPPINGS, colMap);
String tableName = props.getProperty(AccumuloSerDeParameters.TABLE_NAME);
if (null == tableName) {
tableName = getTableName(tableDesc);
}
jobProperties.put(AccumuloSerDeParameters.TABLE_NAME, tableName);
String indexTable = props.getProperty(AccumuloIndexParameters.INDEXTABLE_NAME);
if (null == indexTable) {
indexTable = getIndexTableName(tableDesc);
}
if (null != indexTable) {
jobProperties.put(AccumuloIndexParameters.INDEXTABLE_NAME, indexTable);
String indexColumns = props.getProperty(AccumuloIndexParameters.INDEXED_COLUMNS);
jobProperties.put(AccumuloIndexParameters.INDEXED_COLUMNS,
getIndexedColFamQuals(tableDesc, indexColumns, colMap));
}
if (props.containsKey(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE)) {
jobProperties.put(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE,
props.getProperty(AccumuloSerDeParameters.DEFAULT_STORAGE_TYPE));
}
if (props.containsKey(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY)) {
jobProperties.put(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY,
props.getProperty(AccumuloSerDeParameters.VISIBILITY_LABEL_KEY));
}
}
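
/**
* Builds the comma-separated list of "colFam:colQual:rawType" triples for the indexed columns.
* When the index column spec is empty or "*", every Hive column except the leading rowid is
* indexed. Illustrative example: with Hive columns "id,name", column mapping ":rowid,cf:name"
* and an index on "name", this yields "cf:name:string".
*/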
private String getIndexedColFamQuals(TableDesc tableDesc, String indexColumns, String colMap) {
StringBuilder sb = new StringBuilder();
String cols = indexColumns;
String hiveColString = tableDesc.getProperties().getProperty(serdeConstants.LIST_COLUMNS);
// If no explicit index columns were configured (or "*" was given), index every Hive column
// except the first one, which is the rowid
if (cols == null || cols.isEmpty() || "*".equals(indexColumns)) {
cols = hiveColString.substring(hiveColString.indexOf(',') + 1);
}
String[] hiveTypes = tableDesc.getProperties()
.getProperty(serdeConstants.LIST_COLUMN_TYPES).split(":");
String[] accCols = colMap.split(",");
String[] hiveCols = hiveColString.split(",");
Set<String> indexSet = new HashSet<String>();
for (String idx : cols.split(",")) {
indexSet.add(idx.trim());
}
for (int i = 0; i < hiveCols.length; i++) {
if (indexSet.contains(hiveCols[i].trim())) {
if (sb.length() > 0) {
sb.append(",");
}
sb.append(accCols[i].trim() + ":" + AccumuloIndexLexicoder.getRawType(hiveTypes[i]));
}
}
return sb.toString();
}
@SuppressWarnings("rawtypes")
@Override
public Class<? extends InputFormat> getInputFormatClass() {
return HiveAccumuloTableInputFormat.class;
}
@Override
@SuppressWarnings("rawtypes")
public Class<? extends OutputFormat> getOutputFormatClass() {
return HiveAccumuloTableOutputFormat.class;
}
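
/**
* Validates the table definition and creates the backing Accumulo table (and the index table,
* if one is configured) before the metastore entry is written. A managed table must not
* already exist in Accumulo; an external table must already exist.
*/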
@Override
public void preCreateTable(Table table) throws MetaException {
boolean isExternal = isExternalTable(table);
if (table.getSd().getLocation() != null) {
throw new MetaException("Location can't be specified for Accumulo");
}
Map<String, String> serdeParams = table.getSd().getSerdeInfo().getParameters();
String columnMapping = serdeParams.get(AccumuloSerDeParameters.COLUMN_MAPPINGS);
if (columnMapping == null) {
throw new MetaException(AccumuloSerDeParameters.COLUMN_MAPPINGS
+ " missing from SERDEPROPERTIES");
}
try {
String tblName = getTableName(table);
Connector connector = connectionParams.getConnector();
TableOperations tableOpts = connector.tableOperations();
// Attempt to create the table, taking EXTERNAL into consideration
if (!tableOpts.exists(tblName)) {
if (!isExternal) {
tableOpts.create(tblName);
} else {
throw new MetaException("Accumulo table " + tblName
+ " doesn't exist even though declared external");
}
} else {
if (!isExternal) {
throw new MetaException("Table " + tblName
+ " already exists in Accumulo. Use CREATE EXTERNAL TABLE to register with Hive.");
}
}
String idxTable = getIndexTableName(table);
if (idxTable != null && !idxTable.isEmpty()) {
// create the index table if it does not exist
if (!tableOpts.exists(idxTable)) {
tableOpts.create(idxTable);
}
}
} catch (AccumuloSecurityException | TableExistsException | AccumuloException e) {
throw new MetaException(StringUtils.stringifyException(e));
}
}
protected boolean isExternalTable(Table table) {
return MetaStoreUtils.isExternalTable(table);
}
@Override
public void rollbackCreateTable(Table table) throws MetaException {
// Same as commitDropTable where we always delete the data (accumulo table)
commitDropTable(table, true);
}
@Override
public void commitCreateTable(Table table) throws MetaException {
// do nothing
}
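
/**
* Drops the backing Accumulo table for managed (non-external) tables when deleteData is true;
* external tables are left untouched.
*/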
@Override
public void commitDropTable(Table table, boolean deleteData) throws MetaException {
String tblName = getTableName(table);
if (!isExternalTable(table)) {
try {
if (deleteData) {
TableOperations tblOpts = connectionParams.getConnector().tableOperations();
if (tblOpts.exists(tblName)) {
tblOpts.delete(tblName);
}
}
} catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
throw new MetaException(StringUtils.stringifyException(e));
}
}
}
@Override
public void preDropTable(Table table) throws MetaException {
// do nothing
}
@Override
public void rollbackDropTable(Table table) throws MetaException {
// do nothing
}
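
/**
* Delegates predicate decomposition to the AccumuloPredicateHandler when iterator pushdown is
* enabled for the table; otherwise returns null so that Hive evaluates the full predicate
* itself.
*/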
@Override
public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deserializer,
ExprNodeDesc desc) {
if (!(deserializer instanceof AccumuloSerDe)) {
throw new RuntimeException("Expected an AccumuloSerDe but got "
+ deserializer.getClass().getName());
}
AccumuloSerDe serDe = (AccumuloSerDe) deserializer;
if (serDe.getIteratorPushdown()) {
return predicateHandler.decompose(conf, desc);
} else {
LOG.info("Set to ignore Accumulo iterator pushdown, skipping predicate handler.");
return null;
}
}
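
/**
* Registers the jars Accumulo needs on the job classpath, lets the configured rowid factory
* add its own dependencies, and, under SASL/Kerberos, merges an Accumulo delegation token into
* the JobConf's credentials.
*/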
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
helper.loadDependentJars(jobConf);
Properties tblProperties = tableDesc.getProperties();
AccumuloSerDeParameters serDeParams = null;
try {
serDeParams =
new AccumuloSerDeParameters(jobConf, tblProperties, AccumuloSerDe.class.getName());
} catch (SerDeException e) {
LOG.error("Could not instantiate AccumuloSerDeParameters", e);
return;
}
try {
serDeParams.getRowIdFactory().addDependencyJars(jobConf);
} catch (IOException e) {
LOG.error("Could not add necessary dependencies for "
+ serDeParams.getRowIdFactory().getClass(), e);
}
// When Kerberos is enabled, we have to add the Accumulo delegation token to the
// Job so that it gets passed down to the YARN/Tez task.
if (connectionParams.useSasl()) {
try {
// Open an accumulo connection
Connector conn = connectionParams.getConnector();
// Convert the Accumulo token into a Hadoop token
Token<? extends TokenIdentifier> accumuloToken = helper.setConnectorInfoForInputAndOutput(connectionParams, conn, jobConf);
LOG.debug("Adding Hadoop Token for Accumulo to Job's Credentials");
// Add the Hadoop token to the JobConf
helper.mergeTokenIntoJobConf(jobConf, accumuloToken);
LOG.debug("All job tokens: " + jobConf.getCredentials().getAllTokens());
} catch (Exception e) {
throw new RuntimeException("Failed to obtain DelegationToken for "
+ connectionParams.getAccumuloUserName(), e);
}
}
}
}