/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hcatalog.api;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.thrift.TException;

/**
 * The HCatClientHMSImpl is the Hive Metastore client based implementation of
 * HCatClient.
 */
public class HCatClientHMSImpl extends HCatClient {

    /** Metastore client that the operations in this class delegate to; assigned in {@code initialize}. */
    private HiveMetaStoreClient hmsClient;
    /** The configuration object handed to {@code initialize}. */
    private Configuration  config;
    /** Hive configuration obtained from {@code config} via {@code HCatUtil.getHiveConf}; also used by {@code createTable}. */
    private HiveConf hiveConfig;

    /**
     * Lists the names of the databases that match the given pattern.
     *
     * @param pattern the database-name pattern, passed unchanged to the metastore client
     * @return the list of names returned by the metastore client
     * @throws HCatException if the metastore client raises a MetaException
     */
    @Override
    public List<String> listDatabaseNamesByPattern(String pattern)
            throws HCatException {
        try {
            return hmsClient.getDatabases(pattern);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while listing db names", metaEx);
        }
    }

    /**
     * Fetches a database from the metastore and wraps it as an HCatDatabase.
     *
     * @param dbName the database name; an empty name is resolved by {@code checkDB}
     * @return the wrapped database, or {@code null} if the metastore client returned {@code null}
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when the
     *         database does not exist and a ConnectionFailureException on a Thrift error
     */
    @Override
    public HCatDatabase getDatabase(String dbName) throws HCatException {
        try {
            Database hiveDatabase = hmsClient.getDatabase(checkDB(dbName));
            return (hiveDatabase == null) ? null : new HCatDatabase(hiveDatabase);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while fetching database", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while fetching database",
                    metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while fetching database", thriftEx);
        }
    }

    /**
     * Creates a database from the given descriptor.
     *
     * @param dbInfo the descriptor; its {@code toHiveDb()} result is sent to the metastore
     * @throws HCatException on metastore failure. An "already exists" failure is ignored
     *         when {@code dbInfo.getIfNotExists()} is true. A ConnectionFailureException is
     *         raised on a Thrift error.
     */
    @Override
    public void createDatabase(HCatCreateDBDesc dbInfo) throws HCatException {
        try {
            hmsClient.createDatabase(dbInfo.toHiveDb());
        } catch (AlreadyExistsException alreadyExists) {
            // An existing database is only an error when the caller did not ask for "if not exists".
            if (dbInfo.getIfNotExists()) {
                return;
            }
            throw new HCatException(
                    "AlreadyExistsException while creating database", alreadyExists);
        } catch (InvalidObjectException invalid) {
            throw new HCatException(
                    "InvalidObjectException while creating database", invalid);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while creating database",
                    metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while creating database", thriftEx);
        }
    }

    /**
     * Drops a database.
     *
     * @param dbName   the database name; an empty name is resolved by {@code checkDB}
     * @param ifExists when true, a missing database is not treated as an error
     * @param mode     the drop mode; cascade is requested when its string form equals
     *                 "cascade" (case-insensitive)
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when the
     *         database is missing and {@code ifExists} is false, and a
     *         ConnectionFailureException on a Thrift error
     */
    @Override
    public void dropDatabase(String dbName, boolean ifExists, DropDBMode mode)
            throws HCatException {
        final boolean cascade = mode.toString().equalsIgnoreCase("cascade");
        final String resolvedDb = checkDB(dbName);
        try {
            // The second argument is fixed to true; NOTE(review): presumably the
            // client's "delete data" flag -- confirm against the Hive version in use.
            hmsClient.dropDatabase(resolvedDb, true, ifExists, cascade);
        } catch (NoSuchObjectException notFound) {
            if (ifExists) {
                return;
            }
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while dropping db.", notFound);
        } catch (InvalidOperationException invalidOp) {
            throw new HCatException(
                    "InvalidOperationException while dropping db.", invalidOp);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while dropping db.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException("TException while dropping db.",
                    thriftEx);
        }
    }

    /**
     * Lists the names of the tables in a database that match the given pattern.
     *
     * @param dbName       the database name; an empty name is resolved by {@code checkDB}
     * @param tablePattern the table-name pattern, passed unchanged to the metastore client
     * @return the list of names returned by the metastore client
     * @throws HCatException if the metastore client raises a MetaException
     */
    @Override
    public List<String> listTableNamesByPattern(String dbName,
            String tablePattern) throws HCatException {
        try {
            return hmsClient.getTables(checkDB(dbName), tablePattern);
        } catch (MetaException metaEx) {
            throw new HCatException(
                    "MetaException while fetching table names.", metaEx);
        }
    }

    /**
     * Fetches a table from the metastore and wraps it as an HCatTable.
     *
     * @param dbName    the database name; an empty name is resolved by {@code checkDB}
     * @param tableName the table name
     * @return the wrapped table, or {@code null} if the metastore client returned {@code null}
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when the
     *         table does not exist and a ConnectionFailureException on a Thrift error
     */
    @Override
    public HCatTable getTable(String dbName, String tableName)
            throws HCatException {
        try {
            Table hiveTable = hmsClient.getTable(checkDB(dbName), tableName);
            return (hiveTable == null) ? null : new HCatTable(hiveTable);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while fetching table.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while fetching table.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while fetching table.", thriftEx);
        }
    }

    /**
     * Creates a table from the given descriptor.
     *
     * @param createTableDesc the descriptor; it is converted with
     *                        {@code toHiveTable(hiveConfig)} before being sent to the metastore
     * @throws HCatException on conversion or metastore failure. An "already exists" failure is
     *         ignored when {@code createTableDesc.getIfNotExists()} is true. A
     *         ConnectionFailureException is raised on a Thrift error.
     */
    @Override
    public void createTable(HCatCreateTableDesc createTableDesc)
            throws HCatException {
        try {
            Table hiveTable = createTableDesc.toHiveTable(hiveConfig);
            hmsClient.createTable(hiveTable);
        } catch (AlreadyExistsException alreadyExists) {
            // An existing table is only an error when the caller did not ask for "if not exists".
            if (createTableDesc.getIfNotExists()) {
                return;
            }
            throw new HCatException(
                    "AlreadyExistsException while creating table.", alreadyExists);
        } catch (InvalidObjectException invalid) {
            throw new HCatException(
                    "InvalidObjectException while creating table.", invalid);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while creating table.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while creating table.", metaEx);
        } catch (IOException ioEx) {
            throw new HCatException("IOException while creating hive conf.", ioEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while creating table.", thriftEx);
        }
    }

    /**
     * Replaces the column list of an existing table.
     *
     * <p>The table is fetched, the columns of its storage descriptor are replaced with the
     * converted {@code columnSchema}, and the table is written back with {@code alter_table}.
     *
     * @param dbName       the database name; an empty name is resolved by {@code checkDB},
     *                     as in the other table operations of this class
     * @param tableName    the table whose columns are replaced
     * @param columnSchema the new column schema
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when the
     *         table does not exist and a ConnectionFailureException on a Thrift error
     */
    @Override
    public void updateTableSchema(String dbName, String tableName, List<HCatFieldSchema> columnSchema) throws HCatException {
        // Resolve the database name once so the read and the write target the same database.
        final String resolvedDb = checkDB(dbName);
        try {
            Table table = hmsClient.getTable(resolvedDb, tableName);
            table.getSd().setCols(HCatSchemaUtils.getFieldSchemas(columnSchema));
            hmsClient.alter_table(resolvedDb, tableName, table);
        }
        catch (InvalidOperationException e) {
            throw new HCatException("InvalidOperationException while updating table schema.", e);
        }
        catch (MetaException e) {
            throw new HCatException("MetaException while updating table schema.", e);
        }
        catch (NoSuchObjectException e) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while updating table schema.", e);
        }
        catch (TException e) {
            throw new ConnectionFailureException(
                    "TException while updating table schema.", e);
        }
    }

    /**
     * Creates a new table modelled on an existing one.
     *
     * @param dbName          the database name; an empty name is resolved by {@code checkDB}
     * @param existingTblName the table to copy from
     * @param newTableName    the name of the table to create
     * @param ifNotExists     when true, an already existing table with the new name is not an error
     * @param isExternal      whether the new table is marked as external
     * @param location        the location for the new table, or {@code null} to keep the
     *                        location copied from the existing table
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when a
     *         referenced object does not exist and a ConnectionFailureException on a Thrift error
     */
    @Override
    public void createTableLike(String dbName, String existingTblName,
            String newTableName, boolean ifNotExists, boolean isExternal,
            String location) throws HCatException {

        // Pass isExternal (not ifNotExists) so the external flag of the new table
        // reflects what the caller asked for.
        Table hiveTable = getHiveTableLike(checkDB(dbName), existingTblName,
                newTableName, isExternal, location);
        if (hiveTable == null) {
            return;
        }
        try {
            hmsClient.createTable(hiveTable);
        } catch (AlreadyExistsException e) {
            if (!ifNotExists) {
                throw new HCatException(
                        "A table already exists with the name "
                                + newTableName, e);
            }
        } catch (InvalidObjectException e) {
            throw new HCatException(
                    "InvalidObjectException in create table like command.",
                    e);
        } catch (MetaException e) {
            throw new HCatException(
                    "MetaException in create table like command.", e);
        } catch (NoSuchObjectException e) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException in create table like command.",
                    e);
        } catch (TException e) {
            throw new ConnectionFailureException(
                    "TException in create table like command.", e);
        }
    }

    /**
     * Drops a table.
     *
     * @param dbName    the database name; an empty name is resolved by {@code checkDB}
     * @param tableName the table to drop
     * @param ifExists  when true, a missing table is not treated as an error
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when the
     *         table is missing and {@code ifExists} is false, and a
     *         ConnectionFailureException on a Thrift error
     */
    @Override
    public void dropTable(String dbName, String tableName, boolean ifExists)
            throws HCatException {
        final String resolvedDb = checkDB(dbName);
        try {
            // The third argument is fixed to true; NOTE(review): presumably the
            // client's "delete data" flag -- confirm against the Hive version in use.
            hmsClient.dropTable(resolvedDb, tableName, true, ifExists);
        } catch (NoSuchObjectException notFound) {
            if (ifExists) {
                return;
            }
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while dropping table.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while dropping table.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while dropping table.", thriftEx);
        }
    }

    /**
     * Renames a table within its database.
     *
     * <p>The rename is refused when the table carries the
     * {@code META_TABLE_STORAGE} parameter. Nothing happens if the metastore client
     * returns {@code null} for the existing table.
     *
     * @param dbName  the database name; an empty name is resolved by {@code checkDB}
     * @param oldName the current table name
     * @param newName the new table name
     * @throws HCatException on metastore failure or when the table is non-native; an
     *         ObjectNotFoundException is raised when the table does not exist and a
     *         ConnectionFailureException on a Thrift error
     */
    @Override
    public void renameTable(String dbName, String oldName, String newName)
            throws HCatException {
        final String resolvedDb = checkDB(dbName);
        try {
            Table existing = hmsClient.getTable(resolvedDb, oldName);
            if (existing == null) {
                return;
            }
            // TODO : Should be moved out.
            String storageHandler = existing.getParameters().get(
                    org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE);
            if (storageHandler != null) {
                throw new HCatException(
                        "Cannot use rename command on a non-native table");
            }
            // Alter a copy of the table that differs from the original only in its name.
            Table renamed = new Table(existing);
            renamed.setTableName(newName);
            hmsClient.alter_table(resolvedDb, oldName, renamed);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while renaming table", notFound);
        } catch (InvalidOperationException invalidOp) {
            throw new HCatException(
                    "InvalidOperationException while renaming table", invalidOp);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while renaming table", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while renaming table", thriftEx);
        }
    }

    /**
     * Retrieves all partitions of a table.
     *
     * @param dbName  the database name; an empty name is resolved by {@code checkDB}
     * @param tblName the table name
     * @return the partitions returned by the metastore client, each wrapped as an HCatPartition
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when a
     *         referenced object does not exist and a ConnectionFailureException on a Thrift error
     */
    @Override
    public List<HCatPartition> getPartitions(String dbName, String tblName)
            throws HCatException {
        List<HCatPartition> wrapped = new ArrayList<HCatPartition>();
        try {
            // A limit of -1 is passed to the client; NOTE(review): presumably "no limit" -- confirm.
            List<Partition> hivePartitions =
                    hmsClient.listPartitions(checkDB(dbName), tblName, (short) -1);
            for (Partition hivePartition : hivePartitions) {
                wrapped.add(new HCatPartition(hivePartition));
            }
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while retrieving partition.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException(
                    "MetaException while retrieving partition.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while retrieving partition.", thriftEx);
        }
        return wrapped;
    }

    /**
     * Retrieves the partitions of a table that match a (possibly partial) partition spec.
     *
     * @param dbName        the database name
     * @param tblName       the table name
     * @param partitionSpec partition key/value pairs; converted to a filter string by
     *                      {@code getFilterString}
     * @return the matching partitions, as returned by {@code listPartitionsByFilter}
     * @throws HCatException if {@code listPartitionsByFilter} fails
     */
    @Override
    public List<HCatPartition> getPartitions(String dbName, String tblName, Map<String, String> partitionSpec) throws HCatException {
        String filter = getFilterString(partitionSpec);
        return listPartitionsByFilter(dbName, tblName, filter);
    }

    /**
     * Builds a filter expression of the form {@code key1="value1" AND key2="value2"}
     * from a partition spec, in the iteration order of the map.
     *
     * @param partitionSpec partition key/value pairs; must not be {@code null}
     * @return the filter expression, or an empty string for an empty spec
     */
    private static String getFilterString(Map<String, String> partitionSpec) {
        final String conjunction = " AND ";
        StringBuilder filter = new StringBuilder();
        boolean needsConjunction = false;
        for (Map.Entry<String, String> keyValue : partitionSpec.entrySet()) {
            // Join clauses with the conjunction instead of trimming a trailing one afterwards.
            if (needsConjunction) {
                filter.append(conjunction);
            }
            filter.append(keyValue.getKey())
                  .append('=')
                  .append('"').append(keyValue.getValue()).append('"');
            needsConjunction = true;
        }
        return filter.toString();
    }

    /**
     * Retrieves a single partition identified by a complete partition spec.
     *
     * <p>The spec must contain exactly one value for each partition column of the table.
     * The values are passed to the metastore in the order of the table's partition columns.
     *
     * @param dbName        the database name; an empty name is resolved by {@code checkDB}
     * @param tableName     the table name
     * @param partitionSpec partition key/value pairs covering every partition column
     * @return the wrapped partition, or {@code null} if the metastore client returned {@code null}
     * @throws HCatException if the spec does not match the table's partition columns or on
     *         metastore failure; an ObjectNotFoundException is raised when a referenced object
     *         does not exist and a ConnectionFailureException on a Thrift error
     */
    @Override
    public HCatPartition getPartition(String dbName, String tableName,
            Map<String, String> partitionSpec) throws HCatException {
        final String resolvedDb = checkDB(dbName);
        try {
            List<HCatFieldSchema> partitionColumns = getTable(resolvedDb, tableName).getPartCols();
            if (partitionColumns.size() != partitionSpec.size()) {
                throw new HCatException("Partition-spec doesn't have the right number of partition keys.");
            }

            List<String> orderedValues = new ArrayList<String>();
            for (HCatFieldSchema column : partitionColumns) {
                String columnName = column.getName();
                if (!partitionSpec.containsKey(columnName)) {
                    throw new HCatException("Invalid partition-key specified: " + columnName);
                }
                orderedValues.add(partitionSpec.get(columnName)); // Partition-keys added in order.
            }

            Partition hivePartition = hmsClient.getPartition(resolvedDb,
                    tableName, orderedValues);
            return (hivePartition == null) ? null : new HCatPartition(hivePartition);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while retrieving partition.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException(
                    "MetaException while retrieving partition.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while retrieving partition.", thriftEx);
        }
    }

    /**
     * Adds a single partition to a table.
     *
     * @param partInfo the partition descriptor; it names the database and table and is
     *                 converted with {@code toHivePartition} against the fetched table
     * @throws HCatException if the table has no partition keys or on metastore failure; an
     *         ObjectNotFoundException is raised when the table does not exist and a
     *         ConnectionFailureException on a Thrift error
     */
    @Override
    public void addPartition(HCatAddPartitionDesc partInfo)
            throws HCatException {
        try {
            Table hiveTable = hmsClient.getTable(partInfo.getDatabaseName(),
                    partInfo.getTableName());
            // TODO: Should be moved out.
            if (hiveTable.getPartitionKeysSize() == 0) {
                throw new HCatException("The table " + partInfo.getTableName()
                        + " is not partitioned.");
            }

            Partition hivePartition = partInfo.toHivePartition(hiveTable);
            hmsClient.add_partition(hivePartition);
        } catch (InvalidObjectException invalid) {
            throw new HCatException(
                    "InvalidObjectException while adding partition.", invalid);
        } catch (AlreadyExistsException alreadyExists) {
            throw new HCatException(
                    "AlreadyExistsException while adding partition.", alreadyExists);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException("The table " + partInfo.getTableName()
                    + " is could not be found.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while adding partition.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while adding partition.", thriftEx);
        }
    }

    /**
     * Drops every partition of a table that matches a (possibly partial) partition spec.
     *
     * <p>The matching partitions are listed with a filter built by {@code getFilterString}
     * and then dropped one at a time.
     *
     * @param dbName        the database name; an empty name is resolved by {@code checkDB}
     * @param tableName     the table name
     * @param partitionSpec partition key/value pairs selecting the partitions to drop
     * @param ifExists      when true, a partition that is already gone is not an error
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when a
     *         referenced object does not exist and a ConnectionFailureException on a Thrift error
     */
    @Override
    public void dropPartitions(String dbName, String tableName,
            Map<String, String> partitionSpec, boolean ifExists)
            throws HCatException {
        final String resolvedDb = checkDB(dbName);
        try {
            String filter = getFilterString(partitionSpec);
            List<Partition> matching = hmsClient.listPartitionsByFilter(
                    resolvedDb, tableName, filter, (short) -1);

            for (Partition candidate : matching) {
                dropPartition(candidate, ifExists);
            }
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while dropping partition. " +
                            "Either db(" + resolvedDb + ") or table(" + tableName + ") missing.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while dropping partition.",
                    metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while dropping partition.", thriftEx);
        }
    }

    /**
     * Drops one partition, identified by the database, table and values it carries.
     *
     * @param partition the partition to drop
     * @param ifExists  when true, a partition that no longer exists is silently skipped
     * @throws HCatException  as an ObjectNotFoundException when the partition is missing
     *                        and {@code ifExists} is false
     * @throws MetaException  propagated unchanged from the metastore client
     * @throws TException     propagated unchanged from the metastore client
     */
    private void dropPartition(Partition partition, boolean ifExists)
            throws HCatException, MetaException, TException {
        try {
            hmsClient.dropPartition(partition.getDbName(), partition.getTableName(), partition.getValues());
        } catch (NoSuchObjectException notFound) {
            if (ifExists) {
                return;
            }
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while dropping partition: " + partition.getValues(), notFound);
        }
    }

    /**
     * Retrieves the partitions of a table that satisfy a filter expression.
     *
     * @param dbName  the database name; an empty name is resolved by {@code checkDB}
     * @param tblName the table name
     * @param filter  the filter expression, passed unchanged to the metastore client
     * @return the partitions returned by the metastore client, each wrapped as an HCatPartition
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when a
     *         referenced object does not exist and a ConnectionFailureException on a Thrift error
     */
    @Override
    public List<HCatPartition> listPartitionsByFilter(String dbName,
            String tblName, String filter) throws HCatException {
        List<HCatPartition> wrapped = new ArrayList<HCatPartition>();
        try {
            List<Partition> hivePartitions = hmsClient.listPartitionsByFilter(
                    checkDB(dbName), tblName, filter, (short) -1);
            for (Partition hivePartition : hivePartitions) {
                wrapped.add(new HCatPartition(hivePartition));
            }
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while fetching partitions.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while fetching partitions.",
                    metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while fetching partitions.", thriftEx);
        }
        return wrapped;
    }

    /**
     * Marks a partition for the given event type.
     *
     * @param dbName    the database name; an empty name is resolved by {@code checkDB}
     * @param tblName   the table name
     * @param partKVs   partition key/value pairs identifying the partition
     * @param eventType the event type to mark
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised for a
     *         NoSuchObjectException and a ConnectionFailureException on a Thrift error
     */
    @Override
    public void markPartitionForEvent(String dbName, String tblName,
            Map<String, String> partKVs, PartitionEventType eventType)
            throws HCatException {
        final String resolvedDb = checkDB(dbName);
        try {
            hmsClient.markPartitionForEvent(resolvedDb, tblName, partKVs,
                    eventType);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while marking partition for event.",
                    notFound);
        } catch (UnknownTableException unknownTable) {
            throw new HCatException(
                    "UnknownTableException while marking partition for event.",
                    unknownTable);
        } catch (UnknownDBException unknownDb) {
            throw new HCatException(
                    "UnknownDBException while marking partition for event.", unknownDb);
        } catch (InvalidPartitionException invalidPartition) {
            throw new HCatException(
                    "InvalidPartitionException while marking partition for event.",
                    invalidPartition);
        } catch (UnknownPartitionException unknownPartition) {
            throw new HCatException(
                    "UnknownPartitionException while marking partition for event.",
                    unknownPartition);
        } catch (MetaException metaEx) {
            throw new HCatException(
                    "MetaException while marking partition for event.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while marking partition for event.", thriftEx);
        }
    }

    /**
     * Checks whether a partition is marked for the given event type.
     *
     * @param dbName    the database name; an empty name is resolved by {@code checkDB}
     * @param tblName   the table name
     * @param partKVs   partition key/value pairs identifying the partition
     * @param eventType the event type to check
     * @return the answer returned by the metastore client
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised for a
     *         NoSuchObjectException and a ConnectionFailureException on a Thrift error
     */
    @Override
    public boolean isPartitionMarkedForEvent(String dbName, String tblName,
            Map<String, String> partKVs, PartitionEventType eventType)
            throws HCatException {
        final String resolvedDb = checkDB(dbName);
        try {
            return hmsClient.isPartitionMarkedForEvent(resolvedDb,
                    tblName, partKVs, eventType);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while checking partition for event.",
                    notFound);
        } catch (UnknownTableException unknownTable) {
            throw new HCatException(
                    "UnknownTableException while checking partition for event.",
                    unknownTable);
        } catch (UnknownDBException unknownDb) {
            throw new HCatException(
                    "UnknownDBException while checking partition for event.", unknownDb);
        } catch (InvalidPartitionException invalidPartition) {
            throw new HCatException(
                    "InvalidPartitionException while checking partition for event.",
                    invalidPartition);
        } catch (UnknownPartitionException unknownPartition) {
            throw new HCatException(
                    "UnknownPartitionException while checking partition for event.",
                    unknownPartition);
        } catch (MetaException metaEx) {
            throw new HCatException(
                    "MetaException while checking partition for event.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while checking partition for event.", thriftEx);
        }
    }

    /**
     * Obtains a delegation token from the metastore.
     *
     * @param owner                        the token owner
     * @param renewerKerberosPrincipalName the Kerberos principal of the renewer
     * @return the token string returned by the metastore client
     * @throws HCatException on metastore failure; a ConnectionFailureException is raised on a
     *         Thrift error
     */
    @Override
    public String getDelegationToken(String owner,
            String renewerKerberosPrincipalName) throws HCatException {
        try {
            return hmsClient.getDelegationToken(owner,
                    renewerKerberosPrincipalName);
        } catch (MetaException metaEx) {
            throw new HCatException(
                    "MetaException while getting delegation token.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while getting delegation token.", thriftEx);
        }
    }

    /**
     * Renews a delegation token.
     *
     * @param tokenStrForm the token in string form
     * @return the value returned by the metastore client's {@code renewDelegationToken}
     * @throws HCatException on metastore failure; a ConnectionFailureException is raised on a
     *         Thrift error
     */
    @Override
    public long renewDelegationToken(String tokenStrForm) throws HCatException {
        try {
            return hmsClient.renewDelegationToken(tokenStrForm);
        } catch (MetaException metaEx) {
            throw new HCatException(
                    "MetaException while renewing delegation token.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while renewing delegation token.", thriftEx);
        }
    }

    /**
     * Cancels a delegation token.
     *
     * @param tokenStrForm the token in string form
     * @throws HCatException on metastore failure; a ConnectionFailureException is raised on a
     *         Thrift error
     */
    @Override
    public void cancelDelegationToken(String tokenStrForm)
            throws HCatException {
        try {
            hmsClient.cancelDelegationToken(tokenStrForm);
        } catch (MetaException metaEx) {
            throw new HCatException(
                    "MetaException while canceling delegation token.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while canceling delegation token.", thriftEx);
        }
    }

    /**
     * Stores the configuration, derives the Hive configuration from it and creates the
     * metastore client.
     *
     * @param conf the configuration to initialize this client from
     * @throws HCatException if deriving the Hive configuration or creating the metastore
     *         client fails with a MetaException or an IOException
     * @see org.apache.hcatalog.api.HCatClient#initialize(org.apache.hadoop.conf.Configuration)
     */
    @Override
    void initialize(Configuration conf) throws HCatException {
        config = conf;
        try {
            hiveConfig = HCatUtil.getHiveConf(conf);
            hmsClient = HCatUtil.getHiveClient(hiveConfig);
        } catch (IOException ioEx) {
            throw new HCatException("IOException while creating HMS client",
                    ioEx);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while creating HMS client",
                    metaEx);
        }
    }

    /**
     * Builds (without creating) a new Hive table definition modelled on an existing table.
     *
     * <p>The new table gets a copy of the existing table's storage descriptor and a private
     * copy of its parameters. The parameters are copied into a fresh map so that toggling
     * the "EXTERNAL" entry never touches the source table's map and never fails when the
     * source table has no parameters at all.
     *
     * @param dbName          the database name; an empty name is resolved by {@code checkDB}
     * @param existingTblName the table to copy from
     * @param newTableName    the name for the new table
     * @param isExternal      when true, the new table is marked external; otherwise any
     *                        "EXTERNAL" parameter copied from the source is removed
     * @param location        the location for the new table, or {@code null} to keep the
     *                        source table's location
     * @return the new table definition, or {@code null} if the metastore client returned
     *         {@code null} for the existing table
     * @throws HCatException on metastore failure; an ObjectNotFoundException is raised when the
     *         existing table does not exist and a ConnectionFailureException on a Thrift error
     */
    private Table getHiveTableLike(String dbName, String existingTblName,
            String newTableName, boolean isExternal, String location)
            throws HCatException {
        final String resolvedDb = checkDB(dbName);
        Table oldtbl;
        try {
            oldtbl = hmsClient.getTable(resolvedDb, existingTblName);
        } catch (NoSuchObjectException e1) {
            throw new ObjectNotFoundException(
                    "NoSuchObjectException while retrieving existing table.",
                    e1);
        } catch (MetaException e1) {
            throw new HCatException(
                    "MetaException while retrieving existing table.", e1);
        } catch (TException e1) {
            throw new ConnectionFailureException(
                    "TException while retrieving existing table.", e1);
        }
        if (oldtbl == null) {
            return null;
        }

        Table newTable = new Table();
        newTable.setTableName(newTableName);
        newTable.setDbName(resolvedDb);
        newTable.setSd(new StorageDescriptor(oldtbl.getSd()));

        // Copy the parameters instead of sharing the source table's map.
        Map<String, String> parameters = new java.util.HashMap<String, String>();
        if (oldtbl.getParameters() != null) {
            parameters.putAll(oldtbl.getParameters());
        }
        newTable.setParameters(parameters);

        if (location == null) {
            newTable.getSd().setLocation(oldtbl.getSd().getLocation());
        } else {
            newTable.getSd().setLocation(location);
        }
        if (isExternal) {
            newTable.putToParameters("EXTERNAL", "TRUE");
            newTable.setTableType(TableType.EXTERNAL_TABLE.toString());
        } else {
            parameters.remove("EXTERNAL");
        }
        // set create time (seconds since the epoch)
        newTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
        newTable.setLastAccessTimeIsSet(false);
        return newTable;
    }

    /**
     * Closes the underlying metastore client.
     *
     * <p>Does nothing if the client was never created, so the method is safe to call
     * when {@code initialize} was not run or failed before assigning the client.
     *
     * @throws HCatException declared by the overridden method; not thrown by this implementation
     * @see org.apache.hcatalog.api.HCatClient#close()
     */
    @Override
    public void close() throws HCatException {
        if (hmsClient != null) {
            hmsClient.close();
        }
    }

    /**
     * Resolves a database name, substituting the metastore's default database name
     * when the given name is empty according to {@code StringUtils.isEmpty}.
     *
     * @param name the database name supplied by the caller
     * @return {@code name}, or {@code MetaStoreUtils.DEFAULT_DATABASE_NAME} if it is empty
     */
    private String checkDB(String name) {
        return StringUtils.isEmpty(name) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : name;
    }

    /**
     * Adds several partitions in one metastore call.
     *
     * <p>The table is looked up from the database and table name of the first descriptor
     * only, and every descriptor is converted against that table.
     *
     * @param partInfoList the partition descriptors; must be non-null and non-empty
     * @return the value returned by the metastore client's {@code add_partitions}
     * @throws HCatException if the list is null or empty, or on metastore failure; an
     *         ObjectNotFoundException is raised when the table does not exist and a
     *         ConnectionFailureException on a Thrift error
     * @see org.apache.hcatalog.api.HCatClient#addPartitions(java.util.List)
     */
    @Override
    public int addPartitions(List<HCatAddPartitionDesc> partInfoList)
            throws HCatException {
        if (partInfoList == null || partInfoList.isEmpty()) {
            throw new HCatException("The partition list is null or empty.");
        }

        final HCatAddPartitionDesc firstDesc = partInfoList.get(0);
        try {
            Table hiveTable = hmsClient.getTable(firstDesc.getDatabaseName(),
                    firstDesc.getTableName());
            List<Partition> hivePartitions = new ArrayList<Partition>();
            for (HCatAddPartitionDesc partInfo : partInfoList) {
                hivePartitions.add(partInfo.toHivePartition(hiveTable));
            }
            return hmsClient.add_partitions(hivePartitions);
        } catch (InvalidObjectException invalid) {
            throw new HCatException(
                    "InvalidObjectException while adding partition.", invalid);
        } catch (AlreadyExistsException alreadyExists) {
            throw new HCatException(
                    "AlreadyExistsException while adding partition.", alreadyExists);
        } catch (NoSuchObjectException notFound) {
            throw new ObjectNotFoundException("The table "
                    + firstDesc.getTableName()
                    + " is could not be found.", notFound);
        } catch (MetaException metaEx) {
            throw new HCatException("MetaException while adding partition.", metaEx);
        } catch (TException thriftEx) {
            throw new ConnectionFailureException(
                    "TException while adding partition.", thriftEx);
        }
    }

    /**
     * Looks up the message-bus topic name stored in a table's parameters under
     * {@code HCatConstants.HCAT_MSGBUS_TOPIC_NAME}.
     *
     * @param dbName    the database name; an empty name is resolved by {@code checkDB},
     *                  as in the other table operations of this class
     * @param tableName the table name
     * @return the topic name, or {@code null} if the table has no parameters or no such entry
     * @throws HCatException if the table cannot be found or on metastore failure; a
     *         ConnectionFailureException is raised on a Thrift error
     */
    @Override
    public String getMessageBusTopicName(String dbName, String tableName) throws HCatException {
        final String resolvedDb = checkDB(dbName);
        try {
            Map<String, String> tableParameters = hmsClient.getTable(resolvedDb, tableName).getParameters();
            // A table without a parameter map simply has no topic configured.
            return (tableParameters == null)
                    ? null
                    : tableParameters.get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
        }
        catch (MetaException e) {
            throw new HCatException("MetaException while retrieving JMS Topic name.", e);
        } catch (NoSuchObjectException e) {
            throw new HCatException("Could not find DB:" + resolvedDb + " or Table:" + tableName, e);
        } catch (TException e) {
            throw new ConnectionFailureException(
                    "TException while retrieving JMS Topic name.", e);
        }
    }

}
