/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.kudu;

import java.io.IOException;
import java.security.AccessController;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.security.auth.Subject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.kudu.KuduStorageHandler.KUDU_MASTER_ADDRS_KEY;

/**
 * A collection of static utility methods shared by the classes of the Kudu Hive integration.
 */
public final class KuduHiveUtils {

  private static final Logger LOG = LoggerFactory.getLogger(KuduHiveUtils.class);

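  // "kudu-authn-data" is the token kind under which the Kudu client stores its
  // authentication data in Hadoop Credentials.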
  private static final Text KUDU_TOKEN_KIND = new Text("kudu-authn-data");

  private KuduHiveUtils() {}

  /**
   * Returns the union of the configuration and table properties with the
   * table properties taking precedence.
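   *
   * <p>Illustrative usage (the master address below is an example value):
   * <pre>{@code
   *   Properties tblProps = new Properties();
   *   tblProps.setProperty(KuduStorageHandler.KUDU_MASTER_ADDRS_KEY, "master1:7051");
   *   Configuration overlayed = KuduHiveUtils.createOverlayedConf(conf, tblProps);
   *   // The table property value now wins over any value in the base conf.
   * }</pre>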
   */
  public static Configuration createOverlayedConf(Configuration conf, Properties tblProps) {
    Configuration newConf = new Configuration(conf);
    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
      newConf.set((String) prop.getKey(), (String) prop.getValue());
    }
    return newConf;
  }

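  /**
   * Resolves the Kudu master addresses, preferring the table-level property over
   * the default value in the Hive configuration.
   *
   * @throws IOException if neither source specifies the master addresses
   */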
  public static String getMasterAddresses(Configuration conf) throws IOException {
    // Use the table property if it exists.
    String masterAddresses = conf.get(KUDU_MASTER_ADDRS_KEY);
    if (StringUtils.isEmpty(masterAddresses)) {
      // Fall back to the default configuration.
      masterAddresses = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_KUDU_MASTER_ADDRESSES_DEFAULT);
    }
    if (StringUtils.isEmpty(masterAddresses)) {
      throw new IOException("Kudu master addresses are not specified in the table property (" +
          KUDU_MASTER_ADDRS_KEY + ") or the default configuration (" +
          HiveConf.ConfVars.HIVE_KUDU_MASTER_ADDRESSES_DEFAULT.varname + ").");
    }
    return masterAddresses;
  }

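  /**
   * Builds a {@link KuduClient} for the configured master addresses and imports
   * any Kudu credentials found on the current JAAS subject.
   *
   * <p>The returned client owns network resources, so callers should close it,
   * for example:
   * <pre>{@code
   *   try (KuduClient client = KuduHiveUtils.getKuduClient(conf)) {
   *     // ... use the client ...
   *   }
   * }</pre>
   */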
  public static KuduClient getKuduClient(Configuration conf) throws IOException {
    String masterAddresses = getMasterAddresses(conf);
    KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses).build();
    importCredentialsFromCurrentSubject(client);
    return client;
  }

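  /**
   * Imports Kudu authentication credentials from the JAAS subject attached to the
   * current access-control context into {@code client}. This is a no-op when there
   * is no subject, or when the subject holds no Kudu token for this client's masters.
   */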
  public static void importCredentialsFromCurrentSubject(KuduClient client) {
    Subject subj = Subject.getSubject(AccessController.getContext());
    if (subj == null) {
      return;
    }
    Text service = new Text(client.getMasterAddressesAsString());
    // Find the Hadoop credentials stored within the JAAS subject.
    Set<Credentials> credSet = subj.getPrivateCredentials(Credentials.class);
    for (Credentials creds : credSet) {
      for (Token<?> tok : creds.getAllTokens()) {
        if (!tok.getKind().equals(KUDU_TOKEN_KIND)) {
          continue;
        }
        // Only import credentials relevant to the service corresponding to
        // 'client'. This is necessary if we want to support a job which
        // reads from one cluster and writes to another.
        if (!tok.getService().equals(service)) {
          LOG.debug("Not importing credentials for service " + tok.getService() +
              " (expecting service " + service + ")");
          continue;
        }
        LOG.debug("Importing credentials for service " + service);
        client.importAuthenticationCredentials(tok.getPassword());
        return;
      }
    }
  }

  /**
   * Converts a Kudu column type to the corresponding Hive primitive type. The
   * column attributes supply the precision and scale for decimal columns.
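   *
   * <p>For example, a Kudu {@code DECIMAL(10, 2)} column maps to Hive's
   * {@code decimal(10,2)}:
   * <pre>{@code
   *   ColumnTypeAttributes attrs = new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
   *       .precision(10).scale(2).build();
   *   PrimitiveTypeInfo info = KuduHiveUtils.toHiveType(Type.DECIMAL, attrs);
   * }</pre>
   */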
  public static PrimitiveTypeInfo toHiveType(Type kuduType, ColumnTypeAttributes attributes)
      throws SerDeException {
    switch (kuduType) {
      case BOOL:
        return TypeInfoFactory.booleanTypeInfo;
      case INT8:
        return TypeInfoFactory.byteTypeInfo;
      case INT16:
        return TypeInfoFactory.shortTypeInfo;
      case INT32:
        return TypeInfoFactory.intTypeInfo;
      case INT64:
        return TypeInfoFactory.longTypeInfo;
      case UNIXTIME_MICROS:
        return TypeInfoFactory.timestampTypeInfo;
      case DECIMAL:
        return TypeInfoFactory.getDecimalTypeInfo(
            attributes.getPrecision(), attributes.getScale());
      case FLOAT:
        return TypeInfoFactory.floatTypeInfo;
      case DOUBLE:
        return TypeInfoFactory.doubleTypeInfo;
      case STRING:
        return TypeInfoFactory.stringTypeInfo;
      case BINARY:
        return TypeInfoFactory.binaryTypeInfo;
      default:
        throw new SerDeException("Unsupported column type: " + kuduType);
    }
  }
}