/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.catalog;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.io.Text;
import org.apache.hive.hcatalog.api.HCatClient;
import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* An implementation of CatalogService that uses Hive Meta Store (HCatalog)
* as the backing Catalog registry.
*/
public class HiveCatalogService extends AbstractCatalogService {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogService.class);
public static final String CREATE_TIME = "falcon.create_time";
public static final String UPDATE_TIME = "falcon.update_time";
public static final String PARTITION_DOES_NOT_EXIST = "Partition does not exist";
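    /**
     * Builds a HiveConf that points at the given metastore over thrift, with
     * client-side connection retries enabled, and concurrency support and
     * exec hooks disabled since the conf is only used for metastore access.
     *
     * A minimal usage sketch (the metastore uri below is a placeholder):
     * <pre>{@code
     * HiveConf hiveConf = HiveCatalogService.createHiveConf(
     *         new Configuration(), "thrift://localhost:9083");
     * }</pre>
     *
     * @param conf base hadoop configuration to copy settings from
     * @param metastoreUrl metastore thrift uri
     * @return hive conf configured for remote metastore access
     */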
public static HiveConf createHiveConf(Configuration conf,
String metastoreUrl) throws IOException {
HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
hcatConf.set("hive.metastore.local", "false");
hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
HCatSemanticAnalyzer.class.getName());
hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
return hcatConf;
}
    /**
     * This is used from within an Oozie job.
     *
     * @param conf conf object
     * @param metastoreUrl metastore uri
     * @return hive metastore client handle
     * @throws FalconException if the client cannot be created
     */
private static HiveMetaStoreClient createClient(Configuration conf,
String metastoreUrl) throws FalconException {
try {
            LOG.info("Creating HCatalog client object for metastore {} using conf {}",
                    metastoreUrl, conf);
final Credentials credentials = getCredentials(conf);
Configuration jobConf = credentials != null ? copyCredentialsToConf(conf, credentials) : conf;
HiveConf hcatConf = createHiveConf(jobConf, metastoreUrl);
if (UserGroupInformation.isSecurityEnabled()) {
hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
conf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname));
hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
                if (credentials == null) { // fail fast with a clear message instead of an NPE
                    throw new IOException("Security is enabled but no credentials were found in "
                            + "HADOOP_TOKEN_FILE_LOCATION");
                }
                ugi.addCredentials(credentials);
}
return new HiveMetaStoreClient(hcatConf);
} catch (Exception e) {
throw new FalconException("Exception creating HiveMetaStoreClient: " + e.getMessage(), e);
}
}
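    /**
     * Returns a copy of the given conf as a JobConf with the credentials
     * (delegation tokens) attached, leaving the original conf untouched.
     */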
private static JobConf copyCredentialsToConf(Configuration conf, Credentials credentials) {
JobConf jobConf = new JobConf(conf);
jobConf.setCredentials(credentials);
return jobConf;
}
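    /**
     * Reads credentials from the token file that the launching framework
     * (e.g. the Oozie launcher) exposes through the HADOOP_TOKEN_FILE_LOCATION
     * environment variable; returns null when no token file is present.
     */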
private static Credentials getCredentials(Configuration conf) throws IOException {
final String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
if (tokenFile == null) {
return null;
}
try {
LOG.info("Adding credentials/delegation tokens from token file={} to conf", tokenFile);
Credentials credentials = Credentials.readTokenStorageFile(new File(tokenFile), conf);
LOG.info("credentials numberOfTokens={}, numberOfSecretKeys={}",
credentials.numberOfTokens(), credentials.numberOfSecretKeys());
return credentials;
} catch (IOException e) {
            LOG.warn("Error while fetching credentials from {}", tokenFile, e);
}
return null;
}
    /**
     * This is used from within the Falcon namespace.
     *
     * @param conf conf
     * @param catalogUrl metastore uri
     * @return hive metastore client handle
     * @throws FalconException if the proxied client cannot be created
     */
private static HiveMetaStoreClient createProxiedClient(Configuration conf,
String catalogUrl) throws FalconException {
try {
final HiveConf hcatConf = createHiveConf(conf, catalogUrl);
UserGroupInformation proxyUGI = CurrentUser.getProxyUGI();
addSecureCredentialsAndToken(conf, hcatConf, proxyUGI);
LOG.info("Creating HCatalog client object for {}", catalogUrl);
return proxyUGI.doAs(new PrivilegedExceptionAction<HiveMetaStoreClient>() {
public HiveMetaStoreClient run() throws Exception {
return new HiveMetaStoreClient(hcatConf);
}
});
} catch (Exception e) {
throw new FalconException("Exception creating Proxied HiveMetaStoreClient: " + e.getMessage(), e);
}
}
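    /**
     * On a secure (kerberized) cluster, configures SASL access to the
     * metastore and attaches a metastore delegation token, obtained on behalf
     * of the current user, to the proxy UGI. A no-op when security is
     * disabled.
     */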
private static void addSecureCredentialsAndToken(Configuration conf,
HiveConf hcatConf,
UserGroupInformation proxyUGI) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
String metaStoreServicePrincipal = conf.get(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
metaStoreServicePrincipal);
hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
Token<DelegationTokenIdentifier> delegationTokenId = getDelegationToken(
hcatConf, metaStoreServicePrincipal);
proxyUGI.addToken(delegationTokenId);
}
}
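    /**
     * Fetches a metastore delegation token for the current user and tags it
     * with the "FalconService" token signature/service name so the same
     * signature can be used to look it up later.
     */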
private static Token<DelegationTokenIdentifier> getDelegationToken(HiveConf hcatConf,
String metaStoreServicePrincipal)
throws IOException {
LOG.debug("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
        HCatClient hcatClient = HCatClient.create(hcatConf);
        final String delegationToken;
        try {
            delegationToken = hcatClient.getDelegationToken(
                    CurrentUser.getUser(), metaStoreServicePrincipal);
        } finally {
            hcatClient.close(); // avoid leaking the metastore connection
        }
hcatConf.set("hive.metastore.token.signature", "FalconService");
Token<DelegationTokenIdentifier> delegationTokenId = new Token<DelegationTokenIdentifier>();
delegationTokenId.decodeFromUrlString(delegationToken);
delegationTokenId.setService(new Text("FalconService"));
LOG.info("Created delegation token={}", delegationToken);
return delegationTokenId;
}
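    /**
     * Verifies that the metastore behind the given uri is reachable by
     * fetching the "default" database as the proxied user. A minimal usage
     * sketch, assuming a Configuration named conf is in scope and the uri is
     * a placeholder:
     * <pre>{@code
     * AbstractCatalogService catalog = new HiveCatalogService();
     * boolean alive = catalog.isAlive(conf, "thrift://localhost:9083");
     * }</pre>
     */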
@Override
public boolean isAlive(Configuration conf, final String catalogUrl) throws FalconException {
LOG.info("Checking if the service is alive for: {}", catalogUrl);
try {
HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
Database database = client.getDatabase("default");
return database != null;
} catch (Exception e) {
throw new FalconException("Exception checking if the service is alive:" + e.getMessage(), e);
}
}
@Override
public boolean dbExists(Configuration conf, final String catalogUrl,
final String databaseName) throws FalconException {
LOG.info("Checking if the db exists: {}", databaseName);
try {
HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
Database db = client.getDatabase(databaseName);
return db != null;
} catch (NoSuchObjectException e) {
return false;
} catch (Exception e) {
throw new FalconException("Exception checking if the db exists:" + e.getMessage(), e);
}
}
@Override
public boolean tableExists(Configuration conf, final String catalogUrl, final String database,
final String tableName) throws FalconException {
LOG.info("Checking if the table exists: {}", tableName);
try {
HiveMetaStoreClient client = createProxiedClient(conf, catalogUrl);
Table table = client.getTable(database, tableName);
return table != null;
} catch (NoSuchObjectException e) {
return false;
} catch (Exception e) {
throw new FalconException("Exception checking if the table exists:" + e.getMessage(), e);
}
}
@Override
public boolean isTableExternal(Configuration conf, String catalogUrl, String database,
String tableName) throws FalconException {
LOG.info("Checking if the table is external: {}", tableName);
try {
HiveMetaStoreClient client = createClient(conf, catalogUrl);
Table table = client.getTable(database, tableName);
            return TableType.EXTERNAL_TABLE.name().equals(table.getTableType());
} catch (Exception e) {
throw new FalconException("Exception checking if the table is external:" + e.getMessage(), e);
}
}
@Override
public List<CatalogPartition> listPartitions(Configuration conf, String catalogUrl,
String database, String tableName,
List<String> values) throws FalconException {
LOG.info("List partitions for: {}, partition filter: {}", tableName, values);
try {
List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
HiveMetaStoreClient client = createClient(conf, catalogUrl);
List<Partition> hCatPartitions = client.listPartitions(database, tableName, values, (short) -1);
for (Partition hCatPartition : hCatPartitions) {
                LOG.debug("Partition: {}", hCatPartition.getValues());
CatalogPartition partition = createCatalogPartition(hCatPartition);
catalogPartitionList.add(partition);
}
return catalogPartitionList;
} catch (Exception e) {
throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
}
}
@Override
public List<CatalogPartition> listPartitionsByFilter(Configuration conf, String catalogUrl,
String database, String tableName,
String filter) throws FalconException {
LOG.info("List partitions for: {}, partition filter: {}", tableName, filter);
try {
List<CatalogPartition> catalogPartitionList = new ArrayList<CatalogPartition>();
HiveMetaStoreClient client = createClient(conf, catalogUrl);
List<Partition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter, (short) -1);
for (Partition hCatPartition : hCatPartitions) {
                LOG.debug("Partition: {}", hCatPartition.getValues());
CatalogPartition partition = createCatalogPartition(hCatPartition);
catalogPartitionList.add(partition);
}
return catalogPartitionList;
} catch (Exception e) {
throw new FalconException("Exception listing partitions:" + e.getMessage(), e);
}
}
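    /**
     * Maps a metastore Partition onto Falcon's CatalogPartition bean,
     * copying storage, serde and timing details, plus the total size when
     * the "totalSize" parameter is present.
     */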
private CatalogPartition createCatalogPartition(Partition hCatPartition) {
final CatalogPartition catalogPartition = new CatalogPartition();
catalogPartition.setDatabaseName(hCatPartition.getDbName());
catalogPartition.setTableName(hCatPartition.getTableName());
catalogPartition.setValues(hCatPartition.getValues());
catalogPartition.setInputFormat(hCatPartition.getSd().getInputFormat());
catalogPartition.setOutputFormat(hCatPartition.getSd().getOutputFormat());
catalogPartition.setLocation(hCatPartition.getSd().getLocation());
catalogPartition.setSerdeInfo(hCatPartition.getSd().getSerdeInfo().getSerializationLib());
catalogPartition.setCreateTime(hCatPartition.getCreateTime());
catalogPartition.setLastAccessTime(hCatPartition.getLastAccessTime());
Map<String, String> params = hCatPartition.getParameters();
if (params != null) {
            String size = params.get("totalSize");
if (StringUtils.isNotBlank(size)) {
catalogPartition.setSize(Long.parseLong(size));
}
}
return catalogPartition;
}
    // Drops a single partition.
@Override
public boolean dropPartition(Configuration conf, String catalogUrl,
String database, String tableName,
List<String> partitionValues, boolean deleteData) throws FalconException {
LOG.info("Dropping partition for: {}, partition: {}", tableName, partitionValues);
try {
HiveMetaStoreClient client = createClient(conf, catalogUrl);
return client.dropPartition(database, tableName, partitionValues, deleteData);
} catch (Exception e) {
throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
}
}
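    /**
     * Drops every partition matching the given partial specification;
     * listPartitions treats a partial value list as a prefix filter, so all
     * partitions whose leading values match are removed.
     */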
@Override
public void dropPartitions(Configuration conf, String catalogUrl,
String database, String tableName,
List<String> partitionValues, boolean deleteData) throws FalconException {
LOG.info("Dropping partitions for: {}, partitions: {}", tableName, partitionValues);
try {
HiveMetaStoreClient client = createClient(conf, catalogUrl);
List<Partition> partitions = client.listPartitions(database, tableName, partitionValues, (short) -1);
for (Partition part : partitions) {
LOG.info("Dropping partition for: {}, partition: {}", tableName, part.getValues());
client.dropPartition(database, tableName, part.getValues(), deleteData);
}
} catch (Exception e) {
throw new FalconException("Exception dropping partitions:" + e.getMessage(), e);
}
}
@Override
public CatalogPartition getPartition(Configuration conf, String catalogUrl,
String database, String tableName,
List<String> partitionValues) throws FalconException {
LOG.info("Fetch partition for: {}, partition spec: {}", tableName, partitionValues);
try {
HiveMetaStoreClient client = createClient(conf, catalogUrl);
Partition hCatPartition = client.getPartition(database, tableName, partitionValues);
return createCatalogPartition(hCatPartition);
} catch (NoSuchObjectException nsoe) {
throw new FalconException(PARTITION_DOES_NOT_EXIST + ":" + nsoe.getMessage(), nsoe);
} catch (Exception e) {
throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
}
}
@Override
public List<String> getPartitionColumns(Configuration conf, String catalogUrl, String database,
String tableName) throws FalconException {
        LOG.info("Fetching partition columns of table: {}", tableName);
try {
HiveMetaStoreClient client = createClient(conf, catalogUrl);
Table table = client.getTable(database, tableName);
List<String> partCols = new ArrayList<String>();
for (FieldSchema part : table.getPartitionKeys()) {
partCols.add(part.getName());
}
return partCols;
} catch (Exception e) {
throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e);
}
}
    @Override
    public void addPartition(Configuration conf, String catalogUrl, String database,
                             String tableName, List<String> partValues, String location) throws FalconException {
        LOG.info("Adding partition {} for {}.{} with location {}", partValues, database, tableName, location);
        try {
            HiveMetaStoreClient client = createClient(conf, catalogUrl);
            Table table = client.getTable(database, tableName);
            Partition part = buildPartition(table, database, tableName, partValues, location);
            part.getParameters().put(CREATE_TIME, String.valueOf(System.currentTimeMillis()));
            client.add_partition(part);
        } catch (Exception e) {
            throw new FalconException("Exception adding partition: " + e.getMessage(), e);
        }
    }
    @Override
    public void updatePartition(Configuration conf, String catalogUrl, String database,
                                String tableName, List<String> partValues, String location) throws FalconException {
        LOG.info("Updating partition {} of {}.{} with location {}", partValues, database, tableName, location);
        try {
            HiveMetaStoreClient client = createClient(conf, catalogUrl);
            Table table = client.getTable(database, tableName);
            Partition part = buildPartition(table, database, tableName, partValues, location);
            part.getParameters().put(UPDATE_TIME, String.valueOf(System.currentTimeMillis()));
            client.alter_partition(database, tableName, part);
        } catch (Exception e) {
            throw new FalconException("Exception updating partition: " + e.getMessage(), e);
        }
    }
    private static Partition buildPartition(Table table, String database, String tableName,
                                            List<String> partValues, String location) {
        Partition part = new Partition();
        part.setDbName(database);
        part.setTableName(tableName);
        part.setValues(partValues);
        // Copy the storage descriptor and parameters so that setting the
        // location and adding the falcon timestamp do not mutate the table's own copies.
        part.setSd(new StorageDescriptor(table.getSd()));
        part.getSd().setLocation(location);
        part.setParameters(table.getParameters() == null
                ? new HashMap<String, String>()
                : new HashMap<String, String>(table.getParameters()));
        return part;
    }
}