/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.common;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hcatalog.data.Pair;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hcatalog.mapreduce.FosterStorageHandler;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.HCatStorageHandler;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.apache.hcatalog.mapreduce.PartInfo;
import org.apache.hcatalog.mapreduce.StorerInfo;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.LoginException;
public class HCatUtil {
private static final Logger LOG = LoggerFactory.getLogger(HCatUtil.class);
private static volatile HiveClientCache hiveClientCache;
private final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2 * 60;
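/**
* Check whether this job is running on the backend (i.e. from within a
* launched map/reduce task) rather than on the client. The check relies on
* "mapred.task.id", which is only set once a task is running.
*
* @param j the job context to inspect
* @return true if running from a backend task, false otherwise
*/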
public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
return false;
}
return true;
}
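/**
* Serialize an object into a string that can safely be stored in a
* Configuration, using Java serialization followed by encodeBytes().
*
* @param obj the object to serialize; null yields an empty string
* @return the encoded string form of the object
* @throws IOException if serialization fails
*/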
public static String serialize(Serializable obj) throws IOException {
if (obj == null) {
return "";
}
try {
ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
objStream.writeObject(obj);
objStream.close();
return encodeBytes(serialObj.toByteArray());
} catch (Exception e) {
throw new IOException("Serialization error: " + e.getMessage(), e);
}
}
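/**
* Deserialize an object previously produced by serialize().
*
* @param str the encoded string form; null or empty yields null
* @return the deserialized object
* @throws IOException if deserialization fails
*/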
public static Object deserialize(String str) throws IOException {
if (str == null || str.length() == 0) {
return null;
}
try {
ByteArrayInputStream serialObj = new ByteArrayInputStream(
decodeBytes(str));
ObjectInputStream objStream = new ObjectInputStream(serialObj);
return objStream.readObject();
} catch (Exception e) {
throw new IOException("Deserialization error: " + e.getMessage(), e);
}
}
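/**
* Encode a byte array as a printable string by mapping each nibble (4 bits)
* to a character in the range 'a'..'p', two characters per byte.
*
* @param bytes the bytes to encode
* @return the encoded string
*/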
public static String encodeBytes(byte[] bytes) {
StringBuilder strBuf = new StringBuilder(bytes.length * 2);
for (int i = 0; i < bytes.length; i++) {
strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ('a')));
strBuf.append((char) (((bytes[i]) & 0xF) + ('a')));
}
return strBuf.toString();
}
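/**
* Decode a string produced by encodeBytes() back into the original bytes.
*
* @param str the encoded string
* @return the decoded byte array
*/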
public static byte[] decodeBytes(String str) {
byte[] bytes = new byte[str.length() / 2];
for (int i = 0; i < str.length(); i += 2) {
char c = str.charAt(i);
bytes[i / 2] = (byte) ((c - 'a') << 4);
c = str.charAt(i + 1);
bytes[i / 2] += (c - 'a');
}
return bytes;
}
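/**
* Convert one or more metastore FieldSchema objects into HCatFieldSchema objects.
*
* @param fields the fields to convert
* @return the converted fields, in the same order
* @throws HCatException if a field cannot be converted
*/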
public static List<HCatFieldSchema> getHCatFieldSchemaList(
FieldSchema... fields) throws HCatException {
List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>(
fields.length);
for (FieldSchema f : fields) {
result.add(HCatSchemaUtils.getHCatFieldSchema(f));
}
return result;
}
public static List<HCatFieldSchema> getHCatFieldSchemaList(
List<FieldSchema> fields) throws HCatException {
if (fields == null) {
return null;
} else {
List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>();
for (FieldSchema f : fields) {
result.add(HCatSchemaUtils.getHCatFieldSchema(f));
}
return result;
}
}
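/**
* Extract the HCatSchema corresponding to a table's (non-partition) columns.
*/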
public static HCatSchema extractSchema(Table table) throws HCatException {
return new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols()));
}
public static HCatSchema extractSchema(Partition partition) throws HCatException {
return new HCatSchema(HCatUtil.getHCatFieldSchemaList(partition.getCols()));
}
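/**
* Convert a list of HCatFieldSchema objects back into metastore FieldSchema objects.
*/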
public static List<FieldSchema> getFieldSchemaList(
List<HCatFieldSchema> hcatFields) {
if (hcatFields == null) {
return null;
} else {
List<FieldSchema> result = new ArrayList<FieldSchema>();
for (HCatFieldSchema f : hcatFields) {
result.add(HCatSchemaUtils.getFieldSchema(f));
}
return result;
}
}
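/**
* Fetch table metadata from the metastore and wrap it in a Hive ql Table.
*
* @param client the metastore client to use
* @param dbName the database name
* @param tableName the table name
* @return the Table instance
*/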
public static Table getTable(HiveMetaStoreClient client, String dbName, String tableName)
throws NoSuchObjectException, TException, MetaException {
return new Table(client.getTable(dbName, tableName));
}
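/**
* Build the HCatSchema of a table with its partition key columns appended
* after the data columns.
*
* @param table the table to extract the schema from
* @return the table schema including partition columns
* @throws IOException
*/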
public static HCatSchema getTableSchemaWithPtnCols(Table table) throws IOException {
HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols()));
if (table.getPartitionKeys().size() != 0) {
// add partition keys to table schema
// NOTE : this assumes that we do not ever have ptn keys as columns
// inside the table schema as well!
for (FieldSchema fs : table.getPartitionKeys()) {
tableSchema.append(HCatSchemaUtils.getHCatFieldSchema(fs));
}
}
return tableSchema;
}
/**
* Return the partition columns from a table instance.
*
* @param table the instance to extract partition columns from
* @return HCatSchema instance which contains the partition columns
* @throws IOException
*/
public static HCatSchema getPartitionColumns(Table table) throws IOException {
HCatSchema cols = new HCatSchema(new LinkedList<HCatFieldSchema>());
if (table.getPartitionKeys().size() != 0) {
for (FieldSchema fs : table.getPartitionKeys()) {
cols.append(HCatSchemaUtils.getHCatFieldSchema(fs));
}
}
return cols;
}
/**
* Validate the partition schema: check that the column types match between the
* partition and the existing table schema. Returns the list of columns
* present in the partition but not in the table.
*
* @param table the table
* @param partitionSchema the partition schema
* @return the list of newly added fields
* @throws IOException Signals that an I/O exception has occurred.
*/
public static List<FieldSchema> validatePartitionSchema(Table table,
HCatSchema partitionSchema) throws IOException {
Map<String, FieldSchema> partitionKeyMap = new HashMap<String, FieldSchema>();
for (FieldSchema field : table.getPartitionKeys()) {
partitionKeyMap.put(field.getName().toLowerCase(), field);
}
List<FieldSchema> tableCols = table.getCols();
List<FieldSchema> newFields = new ArrayList<FieldSchema>();
for (int i = 0; i < partitionSchema.getFields().size(); i++) {
FieldSchema field = HCatSchemaUtils.getFieldSchema(partitionSchema
.getFields().get(i));
FieldSchema tableField;
if (i < tableCols.size()) {
tableField = tableCols.get(i);
if (!tableField.getName().equalsIgnoreCase(field.getName())) {
throw new HCatException(
ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH,
"Expected column <" + tableField.getName()
+ "> at position " + (i + 1)
+ ", found column <" + field.getName()
+ ">");
}
} else {
tableField = partitionKeyMap.get(field.getName().toLowerCase());
if (tableField != null) {
throw new HCatException(
ErrorType.ERROR_SCHEMA_PARTITION_KEY, "Key <"
+ field.getName() + ">");
}
}
if (tableField == null) {
// field present in partition but not in table
newFields.add(field);
} else {
// field present in both. validate type has not changed
TypeInfo partitionType = TypeInfoUtils
.getTypeInfoFromTypeString(field.getType());
TypeInfo tableType = TypeInfoUtils
.getTypeInfoFromTypeString(tableField.getType());
if (!partitionType.equals(tableType)) {
throw new HCatException(
ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, "Column <"
+ field.getName() + ">, expected <"
+ tableType.getTypeName() + ">, got <"
+ partitionType.getTypeName() + ">");
}
}
}
return newFields;
}
/**
* Test if the first FsAction is more permissive than the second. This is
* useful in cases where we want to ensure that a file owner has more
* permissions than the group they belong to. More completely (but
* potentially more cryptically):
* owner-r >= group-r >= world-r : bitwise and-masked with 0444 => 444 >= 440 >= 400 >= 000
* owner-w >= group-w >= world-w : bitwise and-masked with 0222 => 222 >= 220 >= 200 >= 000
* owner-x >= group-x >= world-x : bitwise and-masked with 0111 => 111 >= 110 >= 100 >= 000
*
* @return true if the first FsAction is more permissive than the second,
*         false if not.
*/
public static boolean validateMorePermissive(FsAction first, FsAction second) {
if ((first == FsAction.ALL) || (second == FsAction.NONE)
|| (first == second)) {
return true;
}
switch (first) {
case READ_EXECUTE:
return ((second == FsAction.READ) || (second == FsAction.EXECUTE));
case READ_WRITE:
return ((second == FsAction.READ) || (second == FsAction.WRITE));
case WRITE_EXECUTE:
return ((second == FsAction.WRITE) || (second == FsAction.EXECUTE));
}
return false;
}
/**
* Ensure that read or write permissions are not granted without also
* granting execute permissions. Essentially, r--, rw- and -w- are invalid;
* r-x, -wx, rwx, --- and --x are valid.
*
* @param perms The FsAction to verify
* @return true if the presence of read or write permission is accompanied
* by execute permissions
*/
public static boolean validateExecuteBitPresentIfReadOrWrite(FsAction perms) {
if ((perms == FsAction.READ) || (perms == FsAction.WRITE)
|| (perms == FsAction.READ_WRITE)) {
return false;
}
return true;
}
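/**
* Obtain a JobTracker delegation token.
*
* @param conf the configuration used to contact the JobTracker
* @param userName the user name passed to the JobTracker as the token renewer
* @return the delegation token
* @throws Exception if the token cannot be obtained
*/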
public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> getJobTrackerDelegationToken(
Configuration conf, String userName) throws Exception {
// LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl
.getDelegationToken(new Text(userName));
// LOG.info("got "+t);
return t;
// return null;
}
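/**
* Reconstruct a metastore (Thrift) delegation token from its URL-encoded
* string form and tag it with the given token signature as its service.
*
* @param tokenStrForm the URL-encoded string form of the token
* @param tokenSignature the service name to set on the token
* @return the decoded token
*/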
public static Token<? extends AbstractDelegationTokenIdentifier> extractThriftToken(
String tokenStrForm, String tokenSignature) throws MetaException,
TException, IOException {
// LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")");
Token<? extends AbstractDelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
t.decodeFromUrlString(tokenStrForm);
t.setService(new Text(tokenSignature));
// LOG.info("returning "+t);
return t;
}
/**
* Create an instance of a storage handler defined in storerInfo. If one cannot be found
* then FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
* This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
* @param conf the job's configuration, used to configure the Configurable StorageHandler
* @param storerInfo StorerInfo defining the StorageHandler, InputFormat, OutputFormat and SerDe
* @return storageHandler instance
* @throws IOException
*/
public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {
return getStorageHandler(conf,
storerInfo.getStorageHandlerClass(),
storerInfo.getSerdeClass(),
storerInfo.getIfClass(),
storerInfo.getOfClass());
}
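/**
* Create a storage handler instance for a partition, based on the class names
* recorded in its PartInfo. Falls back to FosterStorageHandler when no storage
* handler class is set.
*
* @param conf job configuration used to configure the handler
* @param partitionInfo the partition metadata
* @return storageHandler instance
* @throws IOException
*/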
public static HCatStorageHandler getStorageHandler(Configuration conf, PartInfo partitionInfo) throws IOException {
return HCatUtil.getStorageHandler(
conf,
partitionInfo.getStorageHandlerClassName(),
partitionInfo.getSerdeClassName(),
partitionInfo.getInputFormatClassName(),
partitionInfo.getOutputFormatClassName());
}
/**
* Create an instance of a storage handler. If storageHandler == null,
* then a surrogate FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
* This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
* @param conf the job's configuration, used to configure the Configurable StorageHandler
* @param storageHandler fully qualified class name of the desired StorageHandler instance
* @param serDe fully qualified class name of the desired SerDe instance
* @param inputFormat fully qualified class name of the desired InputFormat instance
* @param outputFormat fully qualified class name of the desired OutputFormat instance
* @return storageHandler instance
* @throws IOException
*/
public static HCatStorageHandler getStorageHandler(Configuration conf,
String storageHandler,
String serDe,
String inputFormat,
String outputFormat)
throws IOException {
if ((storageHandler == null) || (storageHandler.equals(FosterStorageHandler.class.getName()))) {
try {
FosterStorageHandler fosterStorageHandler =
new FosterStorageHandler(inputFormat, outputFormat, serDe);
fosterStorageHandler.setConf(conf);
return fosterStorageHandler;
} catch (ClassNotFoundException e) {
throw new IOException("Failed to load "
+ "foster storage handler", e);
}
}
try {
Class<? extends HCatStorageHandler> handlerClass =
(Class<? extends HCatStorageHandler>) Class
.forName(storageHandler, true, JavaUtils.getClassLoader());
return (HCatStorageHandler) ReflectionUtils.newInstance(
handlerClass, conf);
} catch (ClassNotFoundException e) {
throw new IOException("Error in loading storage handler."
+ e.getMessage(), e);
}
}
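/**
* Split a table name of the form "dbname.tablename" into its database and
* table components; a bare table name is resolved against the default database.
*
* @param tableName the table name, optionally qualified with a database name
* @return a Pair of (database name, table name)
* @throws IOException if the name contains more than one '.' separator
*/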
public static Pair<String, String> getDbAndTableName(String tableName) throws IOException {
String[] dbTableNametokens = tableName.split("\\.");
if (dbTableNametokens.length == 1) {
return new Pair<String, String>(MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName);
} else if (dbTableNametokens.length == 2) {
return new Pair<String, String>(dbTableNametokens[0], dbTableNametokens[1]);
} else {
throw new IOException("tableName expected in the form "
+ "<databasename>.<table name> or <table name>. Got " + tableName);
}
}
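/**
* Ask the storage handler for the job properties it needs on the input side.
* The serialized InputJobInfo is placed into the TableDesc job properties
* before the handler is invoked.
*
* @param storageHandler the storage handler of the table being read
* @param inputJobInfo the input job information to serialize
* @return the job properties produced by the storage handler
*/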
public static Map<String, String>
getInputJobProperties(HCatStorageHandler storageHandler,
InputJobInfo inputJobInfo) {
TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
storageHandler.getInputFormatClass(),
storageHandler.getOutputFormatClass(),
inputJobInfo.getTableInfo().getStorerInfo().getProperties());
if (tableDesc.getJobProperties() == null) {
tableDesc.setJobProperties(new HashMap<String, String>());
}
Map<String, String> jobProperties = new HashMap<String, String>();
try {
tableDesc.getJobProperties().put(
HCatConstants.HCAT_KEY_JOB_INFO,
HCatUtil.serialize(inputJobInfo));
storageHandler.configureInputJobProperties(tableDesc,
jobProperties);
} catch (IOException e) {
throw new IllegalStateException(
"Failed to configure StorageHandler", e);
}
return jobProperties;
}
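/**
* Configure the storage handler for output and copy the resulting job
* properties into the job's configuration. The serialized OutputJobInfo is
* placed into the TableDesc job properties before the handler is invoked.
*
* @param storageHandler the storage handler of the table being written
* @param context the job context whose configuration will be updated
* @param outputJobInfo the output job information to serialize
*/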
public static void
configureOutputStorageHandler(HCatStorageHandler storageHandler,
JobContext context,
OutputJobInfo outputJobInfo) {
//TODO replace IgnoreKeyTextOutputFormat with a
//HiveOutputFormatWrapper in StorageHandler
TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
storageHandler.getInputFormatClass(),
IgnoreKeyTextOutputFormat.class,
outputJobInfo.getTableInfo().getStorerInfo().getProperties());
if (tableDesc.getJobProperties() == null)
tableDesc.setJobProperties(new HashMap<String, String>());
for (Map.Entry<String, String> el : context.getConfiguration()) {
tableDesc.getJobProperties().put(el.getKey(), el.getValue());
}
Map<String, String> jobProperties = new HashMap<String, String>();
try {
tableDesc.getJobProperties().put(
HCatConstants.HCAT_KEY_OUTPUT_INFO,
HCatUtil.serialize(outputJobInfo));
storageHandler.configureOutputJobProperties(tableDesc,
jobProperties);
for (Map.Entry<String, String> el : jobProperties.entrySet()) {
context.getConfiguration().set(el.getKey(), el.getValue());
}
} catch (IOException e) {
throw new IllegalStateException(
"Failed to configure StorageHandler", e);
}
}
/**
* Replace the contents of dest with the contents of src
* @param src the Configuration to copy from
* @param dest the Configuration to overwrite; it is cleared before copying
*/
public static void copyConf(Configuration src, Configuration dest) {
dest.clear();
for (Map.Entry<String, String> el : src) {
dest.set(el.getKey(), el.getValue());
}
}
/**
* Get or create a Hive metastore client, depending on whether one already exists in the cache.
* @param hiveConf The hive configuration
* @return the client
* @throws MetaException When HiveMetaStoreClient couldn't be created
* @throws IOException
*/
public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf)
throws MetaException, IOException {
// Singleton behaviour: create the cache instance if required. The cache needs to be
// created lazily, using the expiry time available in hiveConf.
if (hiveClientCache == null) {
synchronized (HiveMetaStoreClient.class) {
if (hiveClientCache == null) {
hiveClientCache = new HiveClientCache(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME,
DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS));
}
}
}
try {
return hiveClientCache.get(hiveConf);
} catch (LoginException e) {
throw new IOException("Couldn't create hiveMetaStoreClient, Error getting UGI for user", e);
}
}
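/**
* Close a metastore client, logging and swallowing any error.
*
* @param client the client to close; may be null
*/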
public static void closeHiveClientQuietly(HiveMetaStoreClient client) {
try {
if (client != null)
client.close();
} catch (Exception e) {
LOG.debug("Error closing metastore client. Ignored the error.", e);
}
}
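/**
* Build a HiveConf from the given Configuration. On the client side the Hive
* properties are serialized into the configuration so they can be restored on
* the backend; on the backend the serialized properties are copied back into
* the returned HiveConf. The metastore token signature is propagated when present.
*
* @param conf the job configuration
* @return the HiveConf
* @throws IOException if (de)serialization of the Hive properties fails
*/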
public static HiveConf getHiveConf(Configuration conf)
throws IOException {
HiveConf hiveConf = new HiveConf(conf, HCatUtil.class);
//copy the hive conf into the job conf and restore it
//in the backend context
if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
HCatUtil.serialize(hiveConf.getAllProperties()));
} else {
//Copy configuration properties into the hive conf
Properties properties = (Properties) HCatUtil.deserialize(
conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
for (Map.Entry<Object, Object> prop : properties.entrySet()) {
if (prop.getValue() instanceof String) {
hiveConf.set((String) prop.getKey(), (String) prop.getValue());
} else if (prop.getValue() instanceof Integer) {
hiveConf.setInt((String) prop.getKey(),
(Integer) prop.getValue());
} else if (prop.getValue() instanceof Boolean) {
hiveConf.setBoolean((String) prop.getKey(),
(Boolean) prop.getValue());
} else if (prop.getValue() instanceof Long) {
hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
} else if (prop.getValue() instanceof Float) {
hiveConf.setFloat((String) prop.getKey(),
(Float) prop.getValue());
}
}
}
if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
hiveConf.set("hive.metastore.token.signature",
conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
}
return hiveConf;
}
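/**
* Convert a mapreduce JobContext into a mapred JobConf, as needed when calling
* into APIs (such as Hive's) that still use the older mapred interfaces.
*
* @param jobContext the context to convert
* @return a JobConf wrapping the context's configuration
*/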
public static JobConf getJobConfFromContext(JobContext jobContext) {
JobConf jobConf;
// we need to convert the jobContext into a jobConf
// 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
// begin conversion..
jobConf = new JobConf(jobContext.getConfiguration());
// ..end of conversion
return jobConf;
}
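/**
* Copy a map of job properties into a JobConf.
*
* @param jobProperties the properties to copy
* @param jobConf the JobConf to update
*/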
public static void copyJobPropertiesToJobConf(
Map<String, String> jobProperties, JobConf jobConf) {
for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
jobConf.set(entry.getKey(), entry.getValue());
}
}
}