/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hcatalog.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Class HbaseSnapshotRecordReader implements logic for filtering records
 * based on snapshot.
 */
class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {

    static final Logger LOG = LoggerFactory.getLogger(HbaseSnapshotRecordReader.class);
    private final InputJobInfo inpJobInfo;
    private final Configuration conf;
    private final int maxRevisions = 1;
    private ResultScanner scanner;
    private Scan  scan;
    private HTable  htable;
    private TableSnapshot snapshot;
    private Iterator<Result> resultItr;
    private Set<Long> allAbortedTransactions;
    private DataOutputBuffer valueOut = new DataOutputBuffer();
    private DataInputBuffer valueIn = new DataInputBuffer();

    HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
        this.inpJobInfo = inputJobInfo;
        this.conf = conf;
        String snapshotString = conf.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
        HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
                .deserialize(snapshotString);
        this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(hcatSnapshot,
                inpJobInfo.getTableInfo());
    }

    public void init() throws IOException {
        restart(scan.getStartRow());
    }

    public void restart(byte[] firstRow) throws IOException {
        allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
        long maxValidRevision = getMaximumRevision(scan, snapshot);
        while (allAbortedTransactions.contains(maxValidRevision)) {
            maxValidRevision--;
        }
        Scan newScan = new Scan(scan);
        newScan.setStartRow(firstRow);
        //TODO: See if filters in 0.92 can be used to optimize the scan
        //TODO: Consider create a custom snapshot filter
        //TODO: Make min revision a constant in RM
        newScan.setTimeRange(0, maxValidRevision + 1);
        newScan.setMaxVersions();
        this.scanner = this.htable.getScanner(newScan);
        resultItr = this.scanner.iterator();
    }

    private Set<Long> getAbortedTransactions(String tableName, Scan scan) throws IOException {
        Set<Long> abortedTransactions = new HashSet<Long>();
        RevisionManager rm = null;
        try {
            rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
            byte[][] families = scan.getFamilies();
            for (byte[] familyKey : families) {
                String family = Bytes.toString(familyKey);
                List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
                        tableName, family);
                if (abortedWriteTransactions != null) {
                    for (FamilyRevision revision : abortedWriteTransactions) {
                        abortedTransactions.add(revision.getRevision());
                    }
                }
            }
            return abortedTransactions;
        } finally {
            HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
        }
    }

    private long getMaximumRevision(Scan scan, TableSnapshot snapshot) {
        long maxRevision = 0;
        byte[][] families = scan.getFamilies();
        for (byte[] familyKey : families) {
            String family = Bytes.toString(familyKey);
            long revision = snapshot.getRevision(family);
            if (revision > maxRevision)
                maxRevision = revision;
        }
        return maxRevision;
    }

    /*
     * @param htable The HTable ( of HBase) to use for the record reader.
     *
     */
    public void setHTable(HTable htable) {
        this.htable = htable;
    }

    /*
     * @param scan The scan to be used for reading records.
     *
     */
    public void setScan(Scan scan) {
        this.scan = scan;
    }

    @Override
    public ImmutableBytesWritable createKey() {
        return new ImmutableBytesWritable();
    }

    @Override
    public Result createValue() {
        return new Result();
    }

    @Override
    public long getPos() {
        // This should be the ordinal tuple in the range;
        // not clear how to calculate...
        return 0;
    }

    @Override
    public float getProgress() throws IOException {
        // Depends on the total number of tuples
        return 0;
    }

    @Override
    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
        if (this.resultItr == null) {
            LOG.warn("The HBase result iterator is found null. It is possible"
                    + " that the record reader has already been closed.");
        } else {
            while (resultItr.hasNext()) {
                Result temp = resultItr.next();
                Result hbaseRow = prepareResult(temp.list());
                if (hbaseRow != null) {
                    // Update key and value. Currently no way to avoid serialization/de-serialization
                    // as no setters are available.
                    key.set(hbaseRow.getRow());
                    valueOut.reset();
                    hbaseRow.write(valueOut);
                    valueIn.reset(valueOut.getData(), valueOut.getLength());
                    value.readFields(valueIn);
                    return true;
                }

            }
        }
        return false;
    }

    private Result prepareResult(List<KeyValue> keyvalues) {

        List<KeyValue> finalKeyVals = new ArrayList<KeyValue>();
        Map<String, List<KeyValue>> qualValMap = new HashMap<String, List<KeyValue>>();
        for (KeyValue kv : keyvalues) {
            byte[] cf = kv.getFamily();
            byte[] qualifier = kv.getQualifier();
            String key = Bytes.toString(cf) + ":" + Bytes.toString(qualifier);
            List<KeyValue> kvs;
            if (qualValMap.containsKey(key)) {
                kvs = qualValMap.get(key);
            } else {
                kvs = new ArrayList<KeyValue>();
            }

            String family = Bytes.toString(kv.getFamily());
            //Ignore aborted transactions
            if (allAbortedTransactions.contains(kv.getTimestamp())) {
                continue;
            }

            long desiredTS = snapshot.getRevision(family);
            if (kv.getTimestamp() <= desiredTS) {
                kvs.add(kv);
            }
            qualValMap.put(key, kvs);
        }

        Set<String> keys = qualValMap.keySet();
        for (String cf : keys) {
            List<KeyValue> kvs = qualValMap.get(cf);
            if (maxRevisions <= kvs.size()) {
                for (int i = 0; i < maxRevisions; i++) {
                    finalKeyVals.add(kvs.get(i));
                }
            } else {
                finalKeyVals.addAll(kvs);
            }
        }

        if(finalKeyVals.size() == 0){
            return null;
        } else {
            KeyValue[] kvArray = new KeyValue[finalKeyVals.size()];
            finalKeyVals.toArray(kvArray);
            return new Result(kvArray);
        }
    }

    /*
     * @see org.apache.hadoop.hbase.mapred.TableRecordReader#close()
     */
    @Override
    public void close() {
        this.resultItr = null;
        this.scanner.close();
    }

}
