/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.storage.accumulo;

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.openrdf.model.impl.LiteralImpl;
import org.openrdf.model.vocabulary.XMLSchema;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.AggregateOperatorBase;
import org.openrdf.query.algebra.ExtensionElem;
import org.openrdf.query.algebra.TupleExpr;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import org.openrdf.query.parser.sparql.SPARQLParser;

import com.google.common.base.Preconditions;

/**
 * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for
 * creating, deleting, and interacting with tables where PeriodicQuery results are stored.
 */
public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage {

    private final String ryaInstance;
    private final Connector accumuloConn;
    private Authorizations auths;
    private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
    private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
    private static final PcjTables pcjTables = new PcjTables();
    private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory();

    /**
     * Creates a AccumuloPeriodicQueryResultStorage Object.
     * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance
     * @param ryaInstance - Rya Instance name for connecting to Rya
     */
    public AccumuloPeriodicQueryResultStorage(final Connector accumuloConn, final String ryaInstance) {
        this.accumuloConn = Preconditions.checkNotNull(accumuloConn);
        this.ryaInstance = Preconditions.checkNotNull(ryaInstance);
        final String user = accumuloConn.whoami();
        try {
            this.auths = accumuloConn.securityOperations().getUserAuthorizations(user);
        } catch (AccumuloException | AccumuloSecurityException e) {
            throw new RuntimeException("Unable access user: " + user + "authorizations.");
        }
    }

    @Override
    public String createPeriodicQuery(final String sparql) throws PeriodicQueryStorageException {
        Preconditions.checkNotNull(sparql);
        final String queryId = pcjIdFactory.nextId();
        return createPeriodicQuery(queryId, sparql);
    }

    @Override
    public String createPeriodicQuery(final String queryId, final String sparql) throws PeriodicQueryStorageException {
        Set<String> bindingNames;
        try {
            bindingNames = new AggregateVariableRemover().getNonAggregationVariables(sparql);
        } catch (final MalformedQueryException e) {
            throw new PeriodicQueryStorageException(e.getMessage());
        }
        final List<String> varOrderList = new ArrayList<>();
        varOrderList.add(PeriodicQueryResultStorage.PeriodicBinId);
        varOrderList.addAll(bindingNames);
        createPeriodicQuery(queryId, sparql, new VariableOrder(varOrderList));
        return queryId;
    }

    @Override
    public void createPeriodicQuery(final String queryId, final String sparql, final VariableOrder order) throws PeriodicQueryStorageException {
        Preconditions.checkNotNull(sparql);
        Preconditions.checkNotNull(queryId);
        Preconditions.checkNotNull(order);
        Preconditions.checkArgument(PeriodicQueryResultStorage.PeriodicBinId.equals(order.getVariableOrders().get(0)),
                "periodicBinId binding name must occur first in VariableOrder.");
        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
        final Set<VariableOrder> varOrders = new HashSet<>();
        varOrders.add(order);
        try {
            pcjTables.createPcjTable(accumuloConn, tableName, varOrders, sparql);
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage());
        }
    }

    @Override
    public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(final String queryId) throws PeriodicQueryStorageException {
        try {
            return new PeriodicQueryStorageMetadata(
                    pcjTables.getPcjMetadata(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId)));
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage());
        }
    }

    @Override
    public void addPeriodicQueryResults(final String queryId, final Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException {
        results.forEach(x -> Preconditions.checkArgument(x.hasBinding(PeriodicQueryResultStorage.PeriodicBinId),
                "BindingSet must contain periodBinId binding."));
        try {
            pcjTables.addResults(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId), results);
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage());
        }
    }

    @Override
    public void deletePeriodicQueryResults(final String queryId, final long binId) throws PeriodicQueryStorageException {
        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
        BatchDeleter deleter = null;
        try {
            final Text prefix = getRowPrefix(binId);
            deleter = accumuloConn.createBatchDeleter(tableName, auths, 1, new BatchWriterConfig());
            deleter.setRanges(Collections.singleton(Range.prefix(prefix)));
            deleter.delete();
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage());
        } finally {
            try {
                if(deleter != null) {
                    deleter.close();
                }
            } catch (final Exception e) {
                throw new PeriodicQueryStorageException(e.getMessage());
            }
        }
    }

    public void deletePeriodicQueryResults(final String queryId) throws PeriodicQueryStorageException {
        try {
            pcjTables.purgePcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId));
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage());
        }
    }

    @Override
    public void deletePeriodicQuery(final String queryId) throws PeriodicQueryStorageException {
        try {
            pcjTables.dropPcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId));
        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(e.getMessage());
        }
    }

    @Override
    public CloseableIterator<BindingSet> listResults(final String queryId, final Optional<Long> binId)
            throws PeriodicQueryStorageException {
        requireNonNull(queryId);

        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
        // Fetch the Variable Orders for the binding sets and choose one of
        // them. It
        // doesn't matter which one we choose because they all result in the
        // same output.
        final PeriodicQueryStorageMetadata metadata = getPeriodicQueryMetadata(queryId);
        final VariableOrder varOrder = metadata.getVariableOrder();

        try {
            // Fetch only the Binding Sets whose Variable Order matches the
            // selected one.
            final Scanner scanner = accumuloConn.createScanner(tableName, auths);
            scanner.fetchColumnFamily(new Text(varOrder.toString()));
            if (binId.isPresent()) {
                scanner.setRange(Range.prefix(getRowPrefix(binId.get())));
            }
            return new AccumuloValueBindingSetIterator(scanner);

        } catch (final Exception e) {
            throw new PeriodicQueryStorageException(String.format("PCJ Table does not exist for name '%s'.", tableName), e);
        }
    }

    private Text getRowPrefix(final long binId) throws BindingSetConversionException {
        final QueryBindingSet bs = new QueryBindingSet();
        bs.addBinding(PeriodicQueryResultStorage.PeriodicBinId, new LiteralImpl(Long.toString(binId), XMLSchema.LONG));

        return new Text(converter.convert(bs, new VariableOrder(PeriodicQueryResultStorage.PeriodicBinId)));
    }

    @Override
    public List<String> listPeriodicTables() {

        final List<String> periodicTables = new ArrayList<>();
        final String periodicPrefix = ryaInstance + PeriodicQueryTableNameFactory.PeriodicTableSuffix;
        boolean foundInstance = false;

        for (final String tableName : accumuloConn.tableOperations().list()) {
            if (tableName.startsWith(ryaInstance)) {
                // This table is part of the target Rya instance.
                foundInstance = true;

                if (tableName.startsWith(periodicPrefix)) {
                    periodicTables.add(tableName);
                }
            } else if (foundInstance) {
                // We have encountered the first table name that does not start
                // with the rya instance name after those that do. Because the
                // list is sorted, there can't be any more pcj tables for the
                // target instance in the list.
                break;
            }
        }
        return periodicTables;
    }

    /**
     * Class for removing any aggregate variables from the ProjectionElementList
     * of the parsed SPARQL queries. This ensures that only non-aggregation
     * values are contained in the Accumulo row.  The non-aggregation variables
     * are not updated while the aggregation variables are, so they are included in
     * the serialized BindingSet in the Accumulo Value field, which is overwritten
     * if an entry with the same Key and different Value (updated aggregation) is
     * written to the table.
     *
     */
    static class AggregateVariableRemover extends QueryModelVisitorBase<RuntimeException> {

        private Set<String> bindingNames;

        public Set<String> getNonAggregationVariables(final String sparql) throws MalformedQueryException {
            final TupleExpr te = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
            bindingNames = te.getBindingNames();
            te.visit(this);
            return bindingNames;
        }

        @Override
        public void meet(final ExtensionElem node) {
            if(node.getExpr() instanceof AggregateOperatorBase) {
                bindingNames.remove(node.getName());
            }
        }

    }

}
