/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.mongodb.iter;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.security.Authorizations;
import org.apache.log4j.Logger;
import org.apache.rya.api.RdfCloudTripleStoreUtils;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil;
import org.bson.Document;
import org.openrdf.query.BindingSet;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
import com.mongodb.DBObject;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.util.JSON;

import info.aduna.iteration.CloseableIteration;

public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> {
    private static final Logger log = Logger.getLogger(RyaStatementBindingSetCursorIterator.class);

    private static final int QUERY_BATCH_SIZE = 50;

    private final MongoCollection<Document> coll;
    private final Multimap<RyaStatement, BindingSet> rangeMap;
    private final Multimap<RyaStatement, BindingSet> executedRangeMap = HashMultimap.create();
    private final Iterator<RyaStatement> queryIterator;
    private Iterator<Document> batchQueryResultsIterator;
    private RyaStatement currentResultStatement;
    private Iterator<BindingSet> currentBindingSetIterator;
    private final MongoDBStorageStrategy<RyaStatement> strategy;
    private final Authorizations auths;

    public RyaStatementBindingSetCursorIterator(final MongoCollection<Document> coll,
            final Multimap<RyaStatement, BindingSet> rangeMap, final MongoDBStorageStrategy<RyaStatement> strategy,
            final Authorizations auths) {
        this.coll = coll;
        this.rangeMap = rangeMap;
        queryIterator = rangeMap.keySet().iterator();
        this.strategy = strategy;
        this.auths = auths;
    }

    @Override
    public boolean hasNext() {
        if (!currentBindingSetIteratorIsValid()) {
            findNextResult();
        }
        return currentBindingSetIteratorIsValid();
    }

    @Override
    public Entry<RyaStatement, BindingSet> next() {
        if (!currentBindingSetIteratorIsValid()) {
            findNextResult();
        }
        if (currentBindingSetIteratorIsValid()) {
            final BindingSet currentBindingSet = currentBindingSetIterator.next();
            return new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(currentResultStatement, currentBindingSet);
        }
        return null;
    }

    private boolean currentBindingSetIteratorIsValid() {
        return (currentBindingSetIterator != null) && currentBindingSetIterator.hasNext();
    }

    private void findNextResult() {
        if (!currentBatchQueryResultCursorIsValid()) {
            submitBatchQuery();
        }

        if (currentBatchQueryResultCursorIsValid()) {
            // convert to Rya Statement
            final Document queryResult = batchQueryResultsIterator.next();
            final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson());
            currentResultStatement = strategy.deserializeDBObject(dbo);

            // Find all of the queries in the executed RangeMap that this result matches
            // and collect all of those binding sets
            final Set<BindingSet> bsList = new HashSet<>();
            for (final RyaStatement executedQuery : executedRangeMap.keys()) {
                if (isResultForQuery(executedQuery, currentResultStatement)) {
                    bsList.addAll(executedRangeMap.get(executedQuery));
                }
            }
            currentBindingSetIterator = bsList.iterator();
        }

        // Handle case of invalid currentResultStatement or no binding sets returned
        if ((currentBindingSetIterator == null || !currentBindingSetIterator.hasNext()) && (currentBatchQueryResultCursorIsValid() || queryIterator.hasNext())) {
            findNextResult();
        }
    }

    private static boolean isResultForQuery(final RyaStatement query, final RyaStatement result) {
        return isResult(query.getSubject(), result.getSubject()) &&
                isResult(query.getPredicate(), result.getPredicate()) &&
                isResult(query.getObject(), result.getObject()) &&
                isResult(query.getContext(), result.getContext());
    }

    private static boolean isResult(final RyaType query, final RyaType result) {
        return (query == null) || query.equals(result);
    }

    private void submitBatchQuery() {
        int count = 0;
        executedRangeMap.clear();
        final List<Document> pipeline = new ArrayList<>();
        final List<DBObject> match = new ArrayList<>();

        while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE){
            count++;
            final RyaStatement query = queryIterator.next();
            executedRangeMap.putAll(query, rangeMap.get(query));
            final DBObject currentQuery = strategy.getQuery(query);
            match.add(currentQuery);
        }

        if (match.size() > 1) {
            pipeline.add(new Document("$match", new Document("$or", match)));
        } else if (match.size() == 1) {
            pipeline.add(new Document("$match", match.get(0)));
        } else {
            batchQueryResultsIterator = Iterators.emptyIterator();
            return;
        }

        // Executing redact aggregation to only return documents the user has access to.
        pipeline.addAll(AggregationUtil.createRedactPipeline(auths));
        log.trace(pipeline);

        final AggregateIterable<Document> aggIter = coll.aggregate(pipeline);
        aggIter.batchSize(1000);
        batchQueryResultsIterator = aggIter.iterator();
    }

    private boolean currentBatchQueryResultCursorIsValid() {
        return (batchQueryResultsIterator != null) && batchQueryResultsIterator.hasNext();
    }


    @Override
    public void close() throws RyaDAOException {
        // TODO don't know what to do here
    }

    @Override
    public void remove() throws RyaDAOException {
        next();
    }

}
