/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hcatalog.common;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hcatalog.data.Pair;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hcatalog.mapreduce.FosterStorageHandler;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.HCatStorageHandler;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
import org.apache.hcatalog.mapreduce.PartInfo;
import org.apache.hcatalog.mapreduce.StorerInfo;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.security.auth.login.LoginException;

public class HCatUtil {

    private static final Logger LOG = LoggerFactory.getLogger(HCatUtil.class);
    private static volatile HiveClientCache hiveClientCache;
    private final static int DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS = 2*60;

    public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
        if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
            return false;
        }
        return true;
    }

    public static String serialize(Serializable obj) throws IOException {
        if (obj == null) {
            return "";
        }
        try {
            ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
            ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
            objStream.writeObject(obj);
            objStream.close();
            return encodeBytes(serialObj.toByteArray());
        } catch (Exception e) {
            throw new IOException("Serialization error: " + e.getMessage(), e);
        }
    }

    public static Object deserialize(String str) throws IOException {
        if (str == null || str.length() == 0) {
            return null;
        }
        try {
            ByteArrayInputStream serialObj = new ByteArrayInputStream(
                    decodeBytes(str));
            ObjectInputStream objStream = new ObjectInputStream(serialObj);
            return objStream.readObject();
        } catch (Exception e) {
            throw new IOException("Deserialization error: " + e.getMessage(), e);
        }
    }

    public static String encodeBytes(byte[] bytes) {
        StringBuffer strBuf = new StringBuffer();

        for (int i = 0; i < bytes.length; i++) {
            strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ('a')));
            strBuf.append((char) (((bytes[i]) & 0xF) + ('a')));
        }

        return strBuf.toString();
    }

    public static byte[] decodeBytes(String str) {
        byte[] bytes = new byte[str.length() / 2];
        for (int i = 0; i < str.length(); i += 2) {
            char c = str.charAt(i);
            bytes[i / 2] = (byte) ((c - 'a') << 4);
            c = str.charAt(i + 1);
            bytes[i / 2] += (c - 'a');
        }
        return bytes;
    }

    public static List<HCatFieldSchema> getHCatFieldSchemaList(
            FieldSchema... fields) throws HCatException {
        List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>(
                fields.length);

        for (FieldSchema f : fields) {
            result.add(HCatSchemaUtils.getHCatFieldSchema(f));
        }

        return result;
    }

    public static List<HCatFieldSchema> getHCatFieldSchemaList(
            List<FieldSchema> fields) throws HCatException {
        if (fields == null) {
            return null;
        } else {
            List<HCatFieldSchema> result = new ArrayList<HCatFieldSchema>();
            for (FieldSchema f : fields) {
                result.add(HCatSchemaUtils.getHCatFieldSchema(f));
            }
            return result;
        }
    }

    public static HCatSchema extractSchema(Table table) throws HCatException {
        return new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols()));
    }

    public static HCatSchema extractSchema(Partition partition) throws HCatException {
        return new HCatSchema(HCatUtil.getHCatFieldSchemaList(partition.getCols()));
    }

    public static List<FieldSchema> getFieldSchemaList(
            List<HCatFieldSchema> hcatFields) {
        if (hcatFields == null) {
            return null;
        } else {
            List<FieldSchema> result = new ArrayList<FieldSchema>();
            for (HCatFieldSchema f : hcatFields) {
                result.add(HCatSchemaUtils.getFieldSchema(f));
            }
            return result;
        }
    }

    public static Table getTable(HiveMetaStoreClient client, String dbName, String tableName)
        throws NoSuchObjectException, TException, MetaException {
      return new Table(client.getTable(dbName, tableName));
    }

    public static HCatSchema getTableSchemaWithPtnCols(Table table) throws IOException {
        HCatSchema tableSchema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getCols()));

        if (table.getPartitionKeys().size() != 0) {

            // add partition keys to table schema
            // NOTE : this assumes that we do not ever have ptn keys as columns
            // inside the table schema as well!
            for (FieldSchema fs : table.getPartitionKeys()) {
                tableSchema.append(HCatSchemaUtils.getHCatFieldSchema(fs));
            }
        }
        return tableSchema;
    }

    /**
     * return the partition columns from a table instance
     *
     * @param table the instance to extract partition columns from
     * @return HCatSchema instance which contains the partition columns
     * @throws IOException
     */
    public static HCatSchema getPartitionColumns(Table table) throws IOException {
        HCatSchema cols = new HCatSchema(new LinkedList<HCatFieldSchema>());
        if (table.getPartitionKeys().size() != 0) {
            for (FieldSchema fs : table.getPartitionKeys()) {
                cols.append(HCatSchemaUtils.getHCatFieldSchema(fs));
            }
        }
        return cols;
    }

    /**
     * Validate partition schema, checks if the column types match between the
     * partition and the existing table schema. Returns the list of columns
     * present in the partition but not in the table.
     *
     * @param table the table
     * @param partitionSchema the partition schema
     * @return the list of newly added fields
     * @throws IOException Signals that an I/O exception has occurred.
     */
    public static List<FieldSchema> validatePartitionSchema(Table table,
            HCatSchema partitionSchema) throws IOException {
        Map<String, FieldSchema> partitionKeyMap = new HashMap<String, FieldSchema>();

        for (FieldSchema field : table.getPartitionKeys()) {
            partitionKeyMap.put(field.getName().toLowerCase(), field);
        }

        List<FieldSchema> tableCols = table.getCols();
        List<FieldSchema> newFields = new ArrayList<FieldSchema>();

        for (int i = 0; i < partitionSchema.getFields().size(); i++) {

            FieldSchema field = HCatSchemaUtils.getFieldSchema(partitionSchema
                    .getFields().get(i));

            FieldSchema tableField;
            if (i < tableCols.size()) {
                tableField = tableCols.get(i);

                if (!tableField.getName().equalsIgnoreCase(field.getName())) {
                    throw new HCatException(
                            ErrorType.ERROR_SCHEMA_COLUMN_MISMATCH,
                            "Expected column <" + tableField.getName()
                                    + "> at position " + (i + 1)
                                    + ", found column <" + field.getName()
                                    + ">");
                }
            } else {
                tableField = partitionKeyMap.get(field.getName().toLowerCase());

                if (tableField != null) {
                    throw new HCatException(
                            ErrorType.ERROR_SCHEMA_PARTITION_KEY, "Key <"
                                    + field.getName() + ">");
                }
            }

            if (tableField == null) {
                // field present in partition but not in table
                newFields.add(field);
            } else {
                // field present in both. validate type has not changed
                TypeInfo partitionType = TypeInfoUtils
                        .getTypeInfoFromTypeString(field.getType());
                TypeInfo tableType = TypeInfoUtils
                        .getTypeInfoFromTypeString(tableField.getType());

                if (!partitionType.equals(tableType)) {
                    throw new HCatException(
                            ErrorType.ERROR_SCHEMA_TYPE_MISMATCH, "Column <"
                                    + field.getName() + ">, expected <"
                                    + tableType.getTypeName() + ">, got <"
                                    + partitionType.getTypeName() + ">");
                }
            }
        }

        return newFields;
    }

    /**
     * Test if the first FsAction is more permissive than the second. This is
     * useful in cases where we want to ensure that a file owner has more
     * permissions than the group they belong to, for eg. More completely(but
     * potentially more cryptically) owner-r >= group-r >= world-r : bitwise
     * and-masked with 0444 => 444 >= 440 >= 400 >= 000 owner-w >= group-w >=
     * world-w : bitwise and-masked with &0222 => 222 >= 220 >= 200 >= 000
     * owner-x >= group-x >= world-x : bitwise and-masked with &0111 => 111 >=
     * 110 >= 100 >= 000
     *
     * @return true if first FsAction is more permissive than the second, false
     *         if not.
     */
    public static boolean validateMorePermissive(FsAction first, FsAction second) {
        if ((first == FsAction.ALL) || (second == FsAction.NONE)
                || (first == second)) {
            return true;
        }
        switch (first) {
            case READ_EXECUTE:
                return ((second == FsAction.READ) || (second == FsAction.EXECUTE));
            case READ_WRITE:
                return ((second == FsAction.READ) || (second == FsAction.WRITE));
            case WRITE_EXECUTE:
                return ((second == FsAction.WRITE) || (second == FsAction.EXECUTE));
        }
        return false;
    }

    /**
     * Ensure that read or write permissions are not granted without also
     * granting execute permissions. Essentially, r-- , rw- and -w- are invalid,
     * r-x, -wx, rwx, ---, --x are valid
     *
     * @param perms The FsAction to verify
     * @return true if the presence of read or write permission is accompanied
     *         by execute permissions
     */
    public static boolean validateExecuteBitPresentIfReadOrWrite(FsAction perms) {
        if ((perms == FsAction.READ) || (perms == FsAction.WRITE)
                || (perms == FsAction.READ_WRITE)) {
            return false;
        }
        return true;
    }

    public static Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> getJobTrackerDelegationToken(
            Configuration conf, String userName) throws Exception {
        // LOG.info("getJobTrackerDelegationToken("+conf+","+userName+")");
        JobClient jcl = new JobClient(new JobConf(conf, HCatOutputFormat.class));
        Token<org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier> t = jcl
                .getDelegationToken(new Text(userName));
        // LOG.info("got "+t);
        return t;

        // return null;
    }

    public static Token<? extends AbstractDelegationTokenIdentifier> extractThriftToken(
            String tokenStrForm, String tokenSignature) throws MetaException,
            TException, IOException {
        // LOG.info("extractThriftToken("+tokenStrForm+","+tokenSignature+")");
        Token<? extends AbstractDelegationTokenIdentifier> t = new Token<DelegationTokenIdentifier>();
        t.decodeFromUrlString(tokenStrForm);
        t.setService(new Text(tokenSignature));
        // LOG.info("returning "+t);
        return t;
    }

    /**
     * Create an instance of a storage handler defined in storerInfo. If one cannot be found
     * then FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
     * @param conf job's configuration will be used to configure the Configurable StorageHandler
     * @param storerInfo StorerInfo to definining the StorageHandler and InputFormat, OutputFormat and SerDe
     * @return storageHandler instance
     * @throws IOException
     */
    public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {
        return getStorageHandler(conf,
                                 storerInfo.getStorageHandlerClass(),
                                 storerInfo.getSerdeClass(),
                                 storerInfo.getIfClass(),
                                 storerInfo.getOfClass());
    }

    public static HCatStorageHandler getStorageHandler(Configuration conf, PartInfo partitionInfo) throws IOException {
      return HCatUtil.getStorageHandler(
          conf,
          partitionInfo.getStorageHandlerClassName(),
          partitionInfo.getSerdeClassName(),
          partitionInfo.getInputFormatClassName(),
          partitionInfo.getOutputFormatClassName());
    }

    /**
     * Create an instance of a storage handler. If storageHandler == null,
     * then surrrogate StorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
     * @param conf job's configuration will be used to configure the Configurable StorageHandler
     * @param storageHandler fully qualified class name of the desired StorageHandle instance
     * @param serDe fully qualified class name of the desired SerDe instance
     * @param inputFormat fully qualified class name of the desired InputFormat instance
     * @param outputFormat fully qualified class name of the desired outputFormat instance
     * @return storageHandler instance
     * @throws IOException
     */
    public static HCatStorageHandler getStorageHandler(Configuration conf,
                                                       String storageHandler,
                                                       String serDe,
                                                       String inputFormat,
                                                       String outputFormat)
    throws IOException {

        if ((storageHandler == null) || (storageHandler.equals(FosterStorageHandler.class.getName()))){
            try {
                FosterStorageHandler fosterStorageHandler =
                    new FosterStorageHandler(inputFormat, outputFormat, serDe);
                fosterStorageHandler.setConf(conf);
                return fosterStorageHandler;
            } catch (ClassNotFoundException e) {
                throw new IOException("Failed to load "
                    + "foster storage handler",e);
            }
        }

        try {
            Class<? extends HCatStorageHandler> handlerClass =
                        (Class<? extends HCatStorageHandler>) Class
                    .forName(storageHandler, true, JavaUtils.getClassLoader());
            return (HCatStorageHandler)ReflectionUtils.newInstance(
                                                            handlerClass, conf);
        } catch (ClassNotFoundException e) {
            throw new IOException("Error in loading storage handler."
                    + e.getMessage(), e);
        }
    }

    public static Pair<String,String> getDbAndTableName(String tableName) throws IOException{
      String[] dbTableNametokens = tableName.split("\\.");
      if(dbTableNametokens.length == 1) {
        return new Pair<String,String>(MetaStoreUtils.DEFAULT_DATABASE_NAME,tableName);
      }else if (dbTableNametokens.length == 2) {
        return new Pair<String, String>(dbTableNametokens[0], dbTableNametokens[1]);
      }else{
        throw new IOException("tableName expected in the form "
            +"<databasename>.<table name> or <table name>. Got " + tableName);
      }
    }

    public static Map<String, String>
      getInputJobProperties(HCatStorageHandler storageHandler,
                            InputJobInfo inputJobInfo) {
        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
                  storageHandler.getInputFormatClass(),
                  storageHandler.getOutputFormatClass(),
                  inputJobInfo.getTableInfo().getStorerInfo().getProperties());
        if(tableDesc.getJobProperties() == null) {
            tableDesc.setJobProperties(new HashMap<String, String>());
        }

        Map<String,String> jobProperties = new HashMap<String,String>();
        try {
            tableDesc.getJobProperties().put(
                HCatConstants.HCAT_KEY_JOB_INFO,
                HCatUtil.serialize(inputJobInfo));

            storageHandler.configureInputJobProperties(tableDesc,
                                                                jobProperties);

        } catch (IOException e) {
            throw new IllegalStateException(
                "Failed to configure StorageHandler",e);
        }

        return jobProperties;
    }


    public static void
      configureOutputStorageHandler(HCatStorageHandler storageHandler,
                                    JobContext context,
                                    OutputJobInfo outputJobInfo) {
        //TODO replace IgnoreKeyTextOutputFormat with a
        //HiveOutputFormatWrapper in StorageHandler
        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
                  storageHandler.getInputFormatClass(),
                  IgnoreKeyTextOutputFormat.class,
                  outputJobInfo.getTableInfo().getStorerInfo().getProperties());
        if(tableDesc.getJobProperties() == null)
            tableDesc.setJobProperties(new HashMap<String, String>());
        for (Map.Entry<String,String> el: context.getConfiguration()) {
           tableDesc.getJobProperties().put(el.getKey(),el.getValue());
        }

        Map<String,String> jobProperties = new HashMap<String,String>();
        try {
            tableDesc.getJobProperties().put(
                HCatConstants.HCAT_KEY_OUTPUT_INFO,
                HCatUtil.serialize(outputJobInfo));

            storageHandler.configureOutputJobProperties(tableDesc,
                                                        jobProperties);

            for(Map.Entry<String,String> el: jobProperties.entrySet()) {
                context.getConfiguration().set(el.getKey(),el.getValue());
            }
        } catch (IOException e) {
            throw new IllegalStateException(
                "Failed to configure StorageHandler",e);
        }
    }

    /**
     * Replace the contents of dest with the contents of src
     * @param src
     * @param dest
     */
    public static void copyConf(Configuration src, Configuration dest) {
        dest.clear();
        for(Map.Entry<String,String> el : src) {
            dest.set(el.getKey(),el.getValue());
        }
    }

    /**
     * Get or create a hive client depending on whether it exits in cache or not
     * @param hiveConf The hive configuration
     * @return the client
     * @throws MetaException When HiveMetaStoreClient couldn't be created
     * @throws IOException
     */
    public static HiveMetaStoreClient getHiveClient(HiveConf hiveConf)
            throws MetaException, IOException {

        // Singleton behaviour: create the cache instance if required. The cache needs to be created lazily and
        // using the expiry time available in hiveConf.

        if(hiveClientCache == null ) {
            synchronized (HiveMetaStoreClient.class) {
                if(hiveClientCache == null) {
                    hiveClientCache = new HiveClientCache(hiveConf.getInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME,
                            DEFAULT_HIVE_CACHE_EXPIRY_TIME_SECONDS));
                }
            }
        }
        try {
            return hiveClientCache.get(hiveConf);
        } catch (LoginException e) {
            throw new IOException("Couldn't create hiveMetaStoreClient, Error getting UGI for user", e);
        }
    }

    public static void closeHiveClientQuietly(HiveMetaStoreClient client) {
        try {
            if (client != null)
                client.close();
        } catch (Exception e) {
            LOG.debug("Error closing metastore client. Ignored the error.", e);
        }
    }

    public static HiveConf getHiveConf(Configuration conf)
      throws IOException {

      HiveConf hiveConf = new HiveConf(conf, HCatUtil.class);

      //copy the hive conf into the job conf and restore it
      //in the backend context
      if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) {
        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
            HCatUtil.serialize(hiveConf.getAllProperties()));
      } else {
        //Copy configuration properties into the hive conf
        Properties properties = (Properties) HCatUtil.deserialize(
            conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));

        for(Map.Entry<Object, Object> prop : properties.entrySet() ) {
          if( prop.getValue() instanceof String ) {
            hiveConf.set((String) prop.getKey(), (String) prop.getValue());
          } else if( prop.getValue() instanceof Integer ) {
            hiveConf.setInt((String) prop.getKey(),
                (Integer) prop.getValue());
          } else if( prop.getValue() instanceof Boolean ) {
            hiveConf.setBoolean((String) prop.getKey(),
                (Boolean) prop.getValue());
          } else if( prop.getValue() instanceof Long ) {
            hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
          } else if( prop.getValue() instanceof Float ) {
            hiveConf.setFloat((String) prop.getKey(),
                (Float) prop.getValue());
          }
        }
      }

      if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
        hiveConf.set("hive.metastore.token.signature",
                     conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
      }

      return hiveConf;
    }


    public static JobConf getJobConfFromContext(JobContext jobContext)
    {
      JobConf jobConf;
      // we need to convert the jobContext into a jobConf
      // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
      // begin conversion..
      jobConf = new JobConf(jobContext.getConfiguration());
      // ..end of conversion


      return jobConf;
    }

    public static void copyJobPropertiesToJobConf(
                    Map<String, String>jobProperties, JobConf jobConf)
    {
      for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
        jobConf.set(entry.getKey(), entry.getValue());
      }
    }
}
