/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hcatalog.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;

/**
 * Base {@link InputFormat} that hands out {@link HCatRecord} values.
 *
 * <p>Split computation is delegated to the {@code org.apache.hadoop.mapred}
 * input format supplied by each partition's storage handler; every
 * underlying split is wrapped in an {@code HCatSplit}. The job description
 * ({@code InputJobInfo}) and the optional output schema are read from the
 * job configuration under {@code HCatConstants.HCAT_KEY_JOB_INFO} and
 * {@code HCatConstants.HCAT_KEY_OUTPUT_SCHEMA} respectively.
 */
public abstract class HCatBaseInputFormat
    extends InputFormat<WritableComparable, HCatRecord> {

    // NOTE(review): this field is not referenced anywhere in this class.
    // It is kept untouched; confirm nothing relies on it reflectively
    // before removing it.
    private Class<? extends InputFormat> inputFileFormatClass;

    /**
     * Gets the schema for the HCatRecord data returned by HCatInputFormat.
     * When no output schema has been stored in the configuration (see
     * {@link #setOutputSchema(Job, HCatSchema)}), the table schema returned
     * by {@link #getTableSchema(JobContext)} is used instead.
     *
     * @param context the job context
     * @return the configured output schema, or the table schema if none
     *         has been configured
     * @throws IOException if the table schema is needed but the job
     *                     information is missing from the configuration
     */
    // TODO needs to go in InitializeInput? as part of InputJobInfo
    public static HCatSchema getOutputSchema(JobContext context)
        throws IOException {
        String os = context.getConfiguration().get(
            HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
        if (os == null) {
            return getTableSchema(context);
        }
        return (HCatSchema) HCatUtil.deserialize(os);
    }

    /**
     * Set the schema for the HCatRecord data returned by HCatInputFormat.
     * The schema is serialized into the job configuration under
     * {@code HCatConstants.HCAT_KEY_OUTPUT_SCHEMA}.
     *
     * @param job the job object
     * @param hcatSchema the schema to use as the consolidated schema
     * @throws IOException declared for serialization failures
     */
    public static void setOutputSchema(Job job, HCatSchema hcatSchema)
        throws IOException {
        job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
            HCatUtil.serialize(hcatSchema));
    }

    /**
     * Instantiates the given {@code mapred} input format class reflectively,
     * passing the job configuration to {@link ReflectionUtils#newInstance}.
     *
     * @param job the job configuration handed to the new instance
     * @param inputFormatClass the input format class to instantiate
     * @return the new input format instance
     * @throws IOException declared for callers; not thrown by the code
     *                     visible here
     */
    protected static org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>
    getMapRedInputFormat(JobConf job, Class inputFormatClass) throws IOException {
        return (
            org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>)
            ReflectionUtils.newInstance(inputFormatClass, job);
    }

    /**
     * Logically split the set of input files for the job. Returns the
     * underlying InputFormat's splits, one HCatSplit per underlying split.
     *
     * @param jobContext the job context object
     * @return the splits, an HCatSplit wrapper over the storage
     *         handler InputSplits; empty if no partitions are present
     * @throws IOException if the job information cannot be read or an
     *                     underlying call fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public List<InputSplit> getSplits(JobContext jobContext)
        throws IOException, InterruptedException {

        //Get the job info from the configuration,
        //throws exception if not initialized
        InputJobInfo inputJobInfo;
        try {
            inputJobInfo = getJobInfo(jobContext);
        } catch (Exception e) {
            throw new IOException(e);
        }

        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
        if (partitionInfoList == null || partitionInfoList.isEmpty()) {
            //No partitions match the specified partition filter
            return splits;
        }

        //The combined table schema does not depend on the partition, so it
        //is built once and shared by every split.
        HCatSchema allCols = buildTableSchema(inputJobInfo);

        //For each matching partition, call getSplits on the underlying InputFormat
        for (PartInfo partitionInfo : partitionInfoList) {
            //A fresh JobConf per partition: the input path and the job
            //properties set below are partition specific.
            JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
            setInputPath(jobConf, partitionInfo.getLocation());

            Map<String, String> jobProperties = partitionInfo.getJobProperties();
            HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

            HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
                jobConf, partitionInfo);

            //Get the input format
            Class inputFormatClass = storageHandler.getInputFormatClass();
            org.apache.hadoop.mapred.InputFormat inputFormat =
                getMapRedInputFormat(jobConf, inputFormatClass);

            //Call getSplit on the InputFormat, create an
            //HCatSplit for each underlying split
            //NumSplits is 0 for our purposes
            org.apache.hadoop.mapred.InputSplit[] baseSplits =
                inputFormat.getSplits(jobConf, 0);

            for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
                splits.add(new HCatSplit(partitionInfo, split, allCols));
            }
        }

        return splits;
    }

    /**
     * Create the RecordReader for the given InputSplit. The reader is an
     * HCatRecordReader built from the partition's storage handler and the
     * values of the output-schema fields that are not stored in the data.
     *
     * @param split the split; must be castable to an HCatSplit
     * @param taskContext the task attempt context
     * @return the record reader instance
     * @throws IOException if the output schema or storage handler cannot
     *                     be obtained
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordReader<WritableComparable, HCatRecord>
    createRecordReader(InputSplit split,
                       TaskAttemptContext taskContext) throws IOException, InterruptedException {

        HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
        PartInfo partitionInfo = hcatSplit.getPartitionInfo();
        JobContext jobContext = taskContext;

        HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
            jobContext.getConfiguration(), partitionInfo);

        // NOTE(review): jobConf is not read again in this method. Whether
        // the calls below have an effect beyond this local depends on
        // whether getJobConfFromContext returns a copy — confirm before
        // removing them.
        JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
        Map<String, String> jobProperties = partitionInfo.getJobProperties();
        HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

        Map<String, String> valuesNotInDataCols = getColValsNotInDataColumns(
            getOutputSchema(jobContext), partitionInfo
        );

        return new HCatRecordReader(storageHandler, valuesNotInDataCols);
    }


    /**
     * Gets values for fields requested by the output schema which will not
     * be in the data.
     *
     * @param outputSchema the schema requested for the returned records
     * @param partInfo the partition being read
     * @return a map from each output field that has no position in the
     *         partition's schema to its partition value, or to
     *         {@code null} when the field is not a partition key either
     */
    private static Map<String, String> getColValsNotInDataColumns(HCatSchema outputSchema,
                                                                  PartInfo partInfo) {
        HCatSchema dataSchema = partInfo.getPartitionSchema();
        Map<String, String> vals = new HashMap<String, String>();
        for (String fieldName : outputSchema.getFieldNames()) {
            if (dataSchema.getPosition(fieldName) != null) {
                // present in the data itself, nothing to supply
                continue;
            }
            // This output field is not present in the data schema, so it is
            // looked up among the partition values. Map.get yields null for
            // a missing key, which is the value recorded for fields that
            // are not partition columns.
            vals.put(fieldName, partInfo.getPartitionValues().get(fieldName));
        }
        return vals;
    }

    /**
     * Gets the HCatTable schema for the table specified in the HCatInputFormat.setInput call
     * on the specified job context. This information is available only after HCatInputFormat.setInput
     * has been called for a JobContext.
     *
     * @param context the context
     * @return the table schema: data columns followed by partition columns
     * @throws IOException if HCatInputFormat.setInput has not been called
     *                     for the current context
     */
    public static HCatSchema getTableSchema(JobContext context)
        throws IOException {
        return buildTableSchema(getJobInfo(context));
    }

    /**
     * Builds a new schema holding the table's data columns followed by its
     * partition columns, as described by the given job info.
     *
     * @param inputJobInfo the job info whose table info is read
     * @return a newly created schema containing all columns
     */
    private static HCatSchema buildTableSchema(InputJobInfo inputJobInfo) {
        HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
        for (HCatFieldSchema field :
            inputJobInfo.getTableInfo().getDataColumns().getFields()) {
            allCols.append(field);
        }
        for (HCatFieldSchema field :
            inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
            allCols.append(field);
        }
        return allCols;
    }

    /**
     * Gets the InputJobInfo object by reading the Configuration and deserializing
     * the string. If InputJobInfo is not present in the configuration, throws an
     * exception since that means HCatInputFormat.setInput has not been called.
     *
     * @param jobContext the job context
     * @return the InputJobInfo object
     * @throws IOException if the job information is not in the configuration
     */
    private static InputJobInfo getJobInfo(JobContext jobContext)
        throws IOException {
        String jobString = jobContext.getConfiguration().get(
            HCatConstants.HCAT_KEY_JOB_INFO);
        if (jobString == null) {
            throw new IOException("job information not found in JobContext."
                + " HCatInputFormat.setInput() not called?");
        }

        return (InputJobInfo) HCatUtil.deserialize(jobString);
    }

    /**
     * Stores the given location in the job configuration as the
     * {@code mapred.input.dir} property. The location may hold several
     * comma-separated paths; each one is qualified against the file system
     * obtained from the job configuration, escaped, and the results are
     * joined with commas again.
     *
     * @param jobConf the job configuration to update
     * @param location a comma-separated list of paths; commas inside a
     *                 {@code {...}} glob group do not separate paths
     * @throws IOException if the file system cannot be obtained
     */
    private void setInputPath(JobConf jobConf, String location)
        throws IOException {

        // ideally we should just call FileInputFormat.setInputPaths() here - but
        // that won't work since FileInputFormat.setInputPaths() needs
        // a Job object instead of a JobContext which we are handed here

        List<String> pathStrings = splitCommaSeparatedPaths(location);
        Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));

        // NOTE(review): every path is qualified against this one file
        // system, even if a path names a different one — confirm this is
        // intended.
        FileSystem fs = FileSystem.get(jobConf);

        StringBuilder str = new StringBuilder();
        for (int i = 0; i < paths.length; i++) {
            if (i > 0) {
                str.append(StringUtils.COMMA_STR);
            }
            Path qualified = paths[i].makeQualified(fs);
            str.append(StringUtils.escapeString(qualified.toString()));
        }

        jobConf.set("mapred.input.dir", str.toString());
    }

    /**
     * Splits a comma-separated list of paths, treating commas that appear
     * while a {@code {...}} group is open as part of the path.
     *
     * @param location the comma-separated paths
     * @return the individual path strings, in order; always at least one
     *         element (the whole input when it contains no separator)
     */
    private static List<String> splitCommaSeparatedPaths(String location) {
        int length = location.length();
        int curlyOpen = 0;
        int pathStart = 0;
        boolean globPattern = false;
        List<String> pathStrings = new ArrayList<String>();

        for (int i = 0; i < length; i++) {
            switch (location.charAt(i)) {
            case '{':
                curlyOpen++;
                globPattern = true;
                break;
            case '}':
                curlyOpen--;
                // the group only ends once every opened brace is closed
                if (curlyOpen == 0) {
                    globPattern = false;
                }
                break;
            case ',':
                if (!globPattern) {
                    pathStrings.add(location.substring(pathStart, i));
                    pathStart = i + 1;
                }
                break;
            default:
                // ordinary path character
                break;
            }
        }
        pathStrings.add(location.substring(pathStart, length));
        return pathStrings;
    }

}
