/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.omid.transaction;

import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.omid.tools.hbase.HBaseLogin;
import org.apache.omid.tso.client.CellId;
import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
import org.apache.omid.tso.client.TSOClient;
import org.apache.omid.tso.client.TSOProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class HBaseTransactionManager extends AbstractTransactionManager implements HBaseTransactionClient {

    private static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionManager.class);

    private static class HBaseTransactionFactory implements TransactionFactory<HBaseCellId> {

        @Override
        public HBaseTransaction createTransaction(long transactionId, long epoch, AbstractTransactionManager tm) {

            return new HBaseTransaction(transactionId, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(),
                    tm, tm.isLowLatency());

        }

    }

    // ----------------------------------------------------------------------------------------------------------------
    // Construction
    // ----------------------------------------------------------------------------------------------------------------

    public static TransactionManager newInstance() throws IOException, InterruptedException {
        return newInstance(new HBaseOmidClientConfiguration());
    }

    public static TransactionManager newInstance(HBaseOmidClientConfiguration configuration)
            throws IOException, InterruptedException {
        //Logging in to Secure HBase if required
        HBaseLogin.loginIfNeeded(configuration);
        return builder(configuration).build();
    }

    public static class Builder {

        // Required parameters
        private final HBaseOmidClientConfiguration hbaseOmidClientConf;

        // Optional parameters - initialized to default values
        private Optional<TSOProtocol> tsoClient = Optional.absent();
        private Optional<CommitTable.Client> commitTableClient = Optional.absent();
        private Optional<CommitTable.Writer> commitTableWriter = Optional.absent();
        private Optional<PostCommitActions> postCommitter = Optional.absent();

        public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
            this.hbaseOmidClientConf = hbaseOmidClientConf;
        }

        public Builder tsoClient(TSOProtocol tsoClient) {
            this.tsoClient = Optional.of(tsoClient);
            return this;
        }

        public Builder commitTableClient(CommitTable.Client client) {
            this.commitTableClient = Optional.of(client);
            return this;
        }

        public Builder commitTableWriter(CommitTable.Writer writer) {
            this.commitTableWriter = Optional.of(writer);
            return this;
        }

        Builder postCommitter(PostCommitActions postCommitter) {
            this.postCommitter = Optional.of(postCommitter);
            return this;
        }

        public HBaseTransactionManager build() throws IOException, InterruptedException {

            CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
            CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get();
            PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
            TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();

            return new HBaseTransactionManager(hbaseOmidClientConf,
                                               postCommitter,
                                               tsoClient,
                                               commitTableClient,
                                               commitTableWriter,
                                               new HBaseTransactionFactory());
        }

        private Optional<TSOProtocol> buildTSOClient() throws IOException, InterruptedException {
            return Optional.of((TSOProtocol) TSOClient.newInstance(hbaseOmidClientConf.getOmidClientConfiguration()));
        }


        private Optional<CommitTable.Client> buildCommitTableClient() throws IOException {
            HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
            commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
            CommitTable commitTable = new HBaseCommitTable(ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration()), commitTableConf);
            return Optional.of(commitTable.getClient());
        }

        private Optional<CommitTable.Writer> buildCommitTableWriter() throws IOException {
            HBaseCommitTableConfig commitTableConf = new HBaseCommitTableConfig();
            commitTableConf.setTableName(hbaseOmidClientConf.getCommitTableName());
            CommitTable commitTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), commitTableConf);
            return Optional.of(commitTable.getWriter());
        }

        private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {

            PostCommitActions postCommitter;
            PostCommitActions syncPostCommitter = new HBaseSyncPostCommitter(hbaseOmidClientConf.getMetrics(),
                                                                             commitTableClient);
            switch(hbaseOmidClientConf.getPostCommitMode()) {
                case ASYNC:
                    ListeningExecutorService postCommitExecutor =
                            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
                                    new ThreadFactoryBuilder().setNameFormat("postCommit-%d").build()));
                    postCommitter = new HBaseAsyncPostCommitter(syncPostCommitter, postCommitExecutor);
                    break;
                case SYNC:
                default:
                    postCommitter = syncPostCommitter;
                    break;
            }

            return Optional.of(postCommitter);
        }

    }

    public static Builder builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
        return new Builder(hbaseOmidClientConf);
    }

    private HBaseTransactionManager(HBaseOmidClientConfiguration hBaseOmidClientConfiguration,
                                    PostCommitActions postCommitter,
                                    TSOProtocol tsoClient,
                                    CommitTable.Client commitTableClient,
                                    CommitTable.Writer commitTableWriter,
                                    HBaseTransactionFactory hBaseTransactionFactory) {

        super(hBaseOmidClientConfiguration.getMetrics(),
                postCommitter,
                tsoClient,
                commitTableClient,
                commitTableWriter,
                hBaseTransactionFactory);
    }

    // ----------------------------------------------------------------------------------------------------------------
    // AbstractTransactionManager overwritten methods
    // ----------------------------------------------------------------------------------------------------------------

    @Override
    public void preCommit(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
        try {
            // Flush all pending writes
            HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
            hBaseTx.flushTables();
        } catch (IOException e) {
            throw new TransactionManagerException("Exception while flushing writes", e);
        }
    }

    @Override
    public void preRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {
        try {
            // Flush all pending writes
            HBaseTransaction hBaseTx = enforceHBaseTransactionAsParam(transaction);
            hBaseTx.flushTables();
        } catch (IOException e) {
            throw new TransactionManagerException("Exception while flushing writes", e);
        }
    }

    @Override
    public long getHashForTable(byte[] tableName) {
        return HBaseCellId.getHasher().putBytes(tableName).hash().asLong();
    }

    @Override
    public long getLowWatermark() throws TransactionException {
        try {
            return commitTableClient.readLowWatermark().get();
        } catch (ExecutionException ee) {
            throw new TransactionException("Error reading low watermark", ee.getCause());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new TransactionException("Interrupted reading low watermark", ie);
        }
    }

    // ----------------------------------------------------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------------------------------------------------

    static HBaseTransaction enforceHBaseTransactionAsParam(AbstractTransaction<? extends CellId> tx) {

        if (tx instanceof HBaseTransaction) {
            return (HBaseTransaction) tx;
        } else {
            throw new IllegalArgumentException(
                    "The transaction object passed is not an instance of HBaseTransaction");
        }

    }

    public void setConflictDetectionLevel(ConflictDetectionLevel conflictDetectionLevel) {
        tsoClient.setConflictDetectionLevel(conflictDetectionLevel);
    }

    public ConflictDetectionLevel getConflictDetectionLevel() {
        return tsoClient.getConflictDetectionLevel();
    }

    static class CommitTimestampLocatorImpl implements CommitTimestampLocator {

        private HBaseCellId hBaseCellId;
        private final Map<Long, Long> commitCache;
        private TableAccessWrapper tableAccessWrapper;

        CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache, TableAccessWrapper tableAccessWrapper) {
            this.hBaseCellId = hBaseCellId;
            this.commitCache = commitCache;
            this.tableAccessWrapper = tableAccessWrapper;
        }

        CommitTimestampLocatorImpl(HBaseCellId hBaseCellId, Map<Long, Long> commitCache) {
            this.hBaseCellId = hBaseCellId;
            this.commitCache = commitCache;
            this.tableAccessWrapper = null;
            this.tableAccessWrapper = new HTableAccessWrapper(hBaseCellId.getTable().getHTable(), hBaseCellId.getTable().getHTable());
        }

        @Override
        public Optional<Long> readCommitTimestampFromCache(long startTimestamp) {
            if (commitCache.containsKey(startTimestamp)) {
                return Optional.of(commitCache.get(startTimestamp));
            }
            return Optional.absent();
        }

        @Override
        public Optional<Long> readCommitTimestampFromShadowCell(long startTimestamp) throws IOException {

            Get get = new Get(hBaseCellId.getRow());
            byte[] family = hBaseCellId.getFamily();
            byte[] shadowCellQualifier = CellUtils.addShadowCellSuffixPrefix(hBaseCellId.getQualifier());
            get.addColumn(family, shadowCellQualifier);
            get.setMaxVersions(1);
            get.setTimeStamp(startTimestamp);
            Result result = tableAccessWrapper.get(get);
            if (result.containsColumn(family, shadowCellQualifier)) {
                return Optional.of(Bytes.toLong(result.getValue(family, shadowCellQualifier)));
            }
            return Optional.absent();
        }

    }

}
