/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.streams.kafka.interactor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.api.entity.QueryResultStream;
import org.apache.rya.streams.api.interactor.GetQueryResultStream;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.impl.MapBindingSet;

/**
 * Integration tests the methods of {@link KafkaGetQueryResultStream}.
 */
public class KafkaGetQueryResultStreamIT {

    @Rule
    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);

    /**
     * Polls a {@link QueryResultStream} until it has either polled too many times without hitting
     * the target number of results, or it hits the target number of results.
     *
     * @param pollMs - How long each poll could take.
     * @param pollIterations - The maximum number of polls that will be attempted.
     * @param targetSize - The number of results to read before stopping.
     * @param stream - The stream that will be polled.
     * @return The results that were read from the stream.
     * @throws Exception If the poll failed.
     */
    private <T> List<T> pollForResults(
            final int pollMs,
            final int pollIterations,
            final int targetSize,
            final QueryResultStream<T> stream)  throws Exception{
        final List<T> read = new ArrayList<>();

        int i = 0;
        while(read.size() < targetSize && i < pollIterations) {
            for(final T result : stream.poll(pollMs)) {
                read.add( result );
            }
            i++;
        }

        return read;
    }

    @Test
    public void fromStart() throws Exception {
        // Create an ID for the query.
        final UUID queryId = UUID.randomUUID();

        // Create a list of test VisibilityBindingSets.
        final List<VisibilityBindingSet> original = new ArrayList<>();

        final ValueFactory vf = new ValueFactoryImpl();
        MapBindingSet bs = new MapBindingSet();
        bs.addBinding("urn:name", vf.createLiteral("Alice"));
        original.add(new VisibilityBindingSet(bs, "a|b|c"));

        bs = new MapBindingSet();
        bs.addBinding("urn:name", vf.createLiteral("Bob"));
        original.add(new VisibilityBindingSet(bs, "a"));

        bs = new MapBindingSet();
        bs.addBinding("urn:name", vf.createLiteral("Charlie"));
        original.add(new VisibilityBindingSet(bs, "b|c"));

        // Write some entries to the query result topic in Kafka.
        try(final Producer<?, VisibilityBindingSet> producer =
                KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) {
            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
            for(final VisibilityBindingSet visBs : original) {
                producer.send(new ProducerRecord<>(resultTopic, visBs));
            }
        }

        // Use the interactor that is being tested to read all of the visibility binding sets.
        final GetQueryResultStream<VisibilityBindingSet> interactor =
                new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
        final List<VisibilityBindingSet> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));

        // Show the fetched binding sets match the original, as well as their order.
        assertEquals(original, read);
    }

    @Test
    public void fromNow() throws Exception {
        // Create an ID for the query.
        final UUID queryId = UUID.randomUUID();

        try(final Producer<?, VisibilityBindingSet> producer =
                KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) {
            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);

            // Write a single visibility binding set to the query's result topic. This will not appear in the expected results.
            final ValueFactory vf = new ValueFactoryImpl();
            MapBindingSet bs = new MapBindingSet();
            bs.addBinding("urn:name", vf.createLiteral("Alice"));
            producer.send(new ProducerRecord<>(resultTopic, new VisibilityBindingSet(bs, "a|b|c")));
            producer.flush();

            // Use the interactor that is being tested to read all of the visibility binding sets that appear after this point.
            final GetQueryResultStream<VisibilityBindingSet> interactor =
                    new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
            try(QueryResultStream<VisibilityBindingSet> results = interactor.fromNow(queryId)) {
                // Read results from the stream.
                List<VisibilityBindingSet> read = new ArrayList<>();
                for(final VisibilityBindingSet visBs : results.poll(500)) {
                    read.add(visBs);
                }

                // Show nothing has been read.
                assertTrue(read.isEmpty());

                // Write two more entries to the result topic. These will be seen by the result stream.
                final List<VisibilityBindingSet> original = new ArrayList<>();

                bs = new MapBindingSet();
                bs.addBinding("urn:name", vf.createLiteral("Bob"));
                original.add(new VisibilityBindingSet(bs, "a"));

                bs = new MapBindingSet();
                bs.addBinding("urn:name", vf.createLiteral("Charlie"));
                original.add(new VisibilityBindingSet(bs, "b|c"));

                for(final VisibilityBindingSet visBs : original) {
                    producer.send(new ProducerRecord<>(resultTopic, visBs));
                }
                producer.flush();

                // Read the results from the result stream.
                read = pollForResults(500, 3, 2, results);

                // Show the new entries were read.
                assertEquals(original, read);
            }
        }
    }

    @Test(expected = IllegalStateException.class)
    public void pollClosedStream() throws Exception {
        // Create an ID for the query.
        final UUID queryId = UUID.randomUUID();

        // Use the interactor that is being tested to create a result stream and immediately close it.
        final GetQueryResultStream<VisibilityBindingSet> interactor =
                new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityBindingSetDeserializer.class);
        final QueryResultStream<VisibilityBindingSet> results = interactor.fromStart(queryId);
        results.close();

        // Try to poll the closed stream.
        results.poll(1);
    }

    @Test
    public void fromStart_visibilityStatements() throws Exception {
        // Create an ID for the query.
        final UUID queryId = UUID.randomUUID();

        // Create some statements that will be written to the result topic.
        final List<VisibilityStatement> original = new ArrayList<>();
        final ValueFactory vf = new ValueFactoryImpl();
        original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
        original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(63)), "b") );
        original.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral("urn:34")), "") );

        // Write the entries to the query result topic in Kafka.
        try(final Producer<?, VisibilityStatement> producer =
                KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityStatementSerializer.class)) {
            final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
            for(final VisibilityStatement visStmt : original) {
                producer.send(new ProducerRecord<>(resultTopic, visStmt));
            }
        }

        // Use the interactor that is being tested to read all of the visibility binding sets.
        final GetQueryResultStream<VisibilityStatement> interactor =
                new KafkaGetQueryResultStream<>(kafka.getKafkaHostname(), kafka.getKafkaPort(), VisibilityStatementDeserializer.class);
        final List<VisibilityStatement> read = pollForResults(500, 3, 3, interactor.fromStart(queryId));

        // Show the fetched binding sets match the original, as well as their order.
        assertEquals(original, read);
    }
}