/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.streams.kafka.processors.join;

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
import org.openrdf.query.impl.MapBindingSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Joiner;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
 * A {@link KeyValueStore} implementation of {@link JoinStateStore}.
 * </p>
 * This is a key/value store, so we need to store the {@link VisibilityBindingSet}s using keys that allow us to fetch
 * all binding sets that join from a specific side. We use the following pattern to accomplish this:
 * <pre>
 * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]
 * </pre>
 * This will group all binding sets that have been emitted from a specific side and who have the same join variables
 * next to each other within the store. This isn't enough information to fetch that group, though. We must provide a
 * start and end key to bound the range that is fetched back. To accomplish this, we place a start of range marker
 * as the first key for all unique [side]/[join values] groups, and an end of range marker as the last key for each
 * of those groups.
 * </p>
 * The rows follow this pattern:
 * <pre>
 * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]0x00
 * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value],[remainingBindingValues]
 * [side],[joinVar1 value], [joinVar2 value], ..., [joinVarN value]0xFF
 * </pre>
 * </p>
 * When an iterator over the results is returned, it skips over the start and end of range markers.
 */
@DefaultAnnotation(NonNull.class)
public class KeyValueJoinStateStore implements JoinStateStore {

    private static final Logger log = LoggerFactory.getLogger(KeyValueJoinStateStore.class);

    /**
     * This is the minimum value in UTF-8 character.
     */
    private static final String START_RANGE_SUFFIX = new String(new byte[] { 0x00 }, Charsets.UTF_8);

    /**
     * This is the maximum value of a UTF-8 character.
     */
    private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF }, Charsets.UTF_8);

    /**
     * A default empty value that is stored for a start of range or end of range marker.
     */
    private static final VisibilityBindingSet RANGE_MARKER_VALUE = new VisibilityBindingSet(new MapBindingSet(), "");

    private final KeyValueStore<String, VisibilityBindingSet> store;
    private final List<String> joinVars;
    private final List<String> allVars;

    /**
     * Constructs an instance of {@link KeyValueJoinStateStore}.
     *
     * @param store - The state store that will be used. (not null)
     * @param joinVars - The variables that are used to build grouping keys. (not null)
     * @param allVars - The variables that are used to build full value keys. (not null)
     * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}.
     */
    public KeyValueJoinStateStore(
            final KeyValueStore<String, VisibilityBindingSet> store,
            final List<String> joinVars,
            final List<String> allVars) throws IllegalArgumentException {
        this.store = requireNonNull(store);
        this.joinVars = requireNonNull(joinVars);
        this.allVars = requireNonNull(allVars);

        for(int i = 0; i < joinVars.size(); i++) {
            if(!joinVars.get(i).equals(allVars.get(i))) {
                throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
                        "Join Vars: " + joinVars + ", All Vars: " + allVars);
            }
        }
    }

    @Override
    public void store(final BinaryResult result) {
        requireNonNull(result);

        // The join key prefix is an ordered list of values from the binding set that match the join variables.
        // This is a prefix for every row that holds values for a specific set of join variable values.
        final Side side = result.getSide();
        final VisibilityBindingSet bs = result.getResult();
        final String joinKeyPrefix = makeCommaDelimitedValues(side, joinVars, bs);

        final List<KeyValue<String, VisibilityBindingSet>> values = new ArrayList<>();

        // For each join variable set, we need a start key for scanning,
        final String startKey = joinKeyPrefix + START_RANGE_SUFFIX;
        values.add( new KeyValue<>(startKey, RANGE_MARKER_VALUE) );

        // The actual value that was emitted as a result.
        final String valueKey = makeCommaDelimitedValues(side, allVars, bs);
        values.add( new KeyValue<>(valueKey, bs) );

        // And the end key for scanning.
        final String endKey = joinKeyPrefix + END_RANGE_SUFFIX;
        values.add( new KeyValue<>(endKey, RANGE_MARKER_VALUE) );

        // Write the pairs to the store.
        log.debug("\nStoring the following values: {}\n", values);
        store.putAll( values );
    }

    @Override
    public CloseableIterator<VisibilityBindingSet> getJoinedValues(final BinaryResult result) {
        requireNonNull(result);

        // Get an iterator over the values that start with the join variables for the other side.
        final Side otherSide = result.getSide() == Side.LEFT ? Side.RIGHT : Side.LEFT;
        final VisibilityBindingSet bs = result.getResult();
        final String joinKeyPrefix = makeCommaDelimitedValues(otherSide, joinVars, bs);

        final String startKey = joinKeyPrefix + START_RANGE_SUFFIX;
        final String endKey = joinKeyPrefix + END_RANGE_SUFFIX;
        final KeyValueIterator<String, VisibilityBindingSet> rangeIt = store.range(startKey, endKey);

        // Return a CloseableIterator over the range's value fields, skipping the start and end entry.
        return new CloseableIterator<VisibilityBindingSet>() {

            private Optional<VisibilityBindingSet> next = null;

            @Override
            public boolean hasNext() {
                // If the iterator has not been initialized yet, read a value in.
                if(next == null) {
                    next = readNext();
                }

                // Return true if there is a next value, otherwise false.
                return next.isPresent();
            }

            @Override
            public VisibilityBindingSet next() {
                // If the iterator has not been initialized yet, read a value in.
                if(next == null) {
                    next = readNext();
                }

                // It's illegal to call next() when there is no next value.
                if(!next.isPresent()) {
                    throw new IllegalStateException("May not invoke next() when there is nothing left in the Iterator.");
                }

                // Update and return the next value.
                final VisibilityBindingSet ret = next.get();
                log.debug("\nReturning: {}", ret);
                next = readNext();
                return ret;
            }

            private Optional<VisibilityBindingSet> readNext() {
                // Check to see if there's anything left in the iterator.
                if(!rangeIt.hasNext()) {
                    return Optional.empty();
                }

                // Read a candidate key/value pair from the iterator.
                KeyValue<String, VisibilityBindingSet> candidate = rangeIt.next();

                // If we are initializing, then the first thing we must read is a start of range marker.
                if(next == null) {
                    if(!candidate.key.endsWith(START_RANGE_SUFFIX)) {
                        throw new IllegalStateException("The first key encountered must be a start of range key.");
                    }
                    log.debug("Read the start of range markers.\n");

                    // Read a new candidate to skip this one.
                    if(!rangeIt.hasNext()) {
                        throw new IllegalStateException("There must be another entry after the start of range key.");
                    }
                    candidate = rangeIt.next();
                }

                // If that value is an end of range key, then we are finished. Otherwise, return it.
                else if(candidate.key.endsWith(END_RANGE_SUFFIX)) {
                    log.debug("Read the end of range marker.\n");

                    // If there are more messages, that's a problem.
                    if(rangeIt.hasNext()) {
                        throw new IllegalStateException("The end of range marker must be the last key in the iterator.");
                    }

                    return Optional.empty();
                }

                // Otherwise we found a new value.
                return Optional.of( candidate.value );
            }

            @Override
            public void close() throws Exception {
                rangeIt.close();
            }
        };
    }

    /**
     * A utility function that helps construct the keys used by {@link KeyValueJoinStateStore}.
     *
     * @param side - The side value for the key. (not null)
     * @param vars - Which variables within the binding set to use for the key's values. (not null)
     * @param bindingSet - The binding set the key is being constructed from. (not null)
     * @return A comma delimited list of the binding values, leading with the side.
     */
    private static String makeCommaDelimitedValues(final Side side, final List<String> vars, final VisibilityBindingSet bindingSet) {
        requireNonNull(side);
        requireNonNull(vars);
        requireNonNull(bindingSet);

        // Make a an ordered list of the binding set variables.
        final List<String> values = new ArrayList<>();
        values.add(side.toString());
        for(final String var : vars) {
            values.add( bindingSet.hasBinding(var) ? bindingSet.getBinding(var).getValue().toString() : "" );
        }

        // Return a comma delimited list of those values.
        return Joiner.on(",").join(values);
    }
}