/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.streams.client.command;

import static org.junit.Assert.assertEquals;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.api.model.VisibilityStatement;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
import org.apache.rya.test.kafka.KafkaTestInstanceRule;
import org.apache.rya.test.kafka.KafkaTestUtil;
import org.junit.Rule;
import org.junit.Test;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;

/**
 * Integration tests the methods of {@link LoadStatementsCommand}.
 */
public class LoadStatementsCommandIT {

    private static final Path TURTLE_FILE = Paths.get("src/test/resources/statements.ttl");

    private final String ryaInstance = UUID.randomUUID().toString();

    @Rule
    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);

    @Test
    public void shortParams() throws Exception {
        // Load a file of statements into Kafka.
        final String visibilities = "a|b|c";
        final String[] args = new String[] {
                "-r", "" + ryaInstance,
                "-i", kafka.getKafkaHostname(),
                "-p", kafka.getKafkaPort(),
                "-f", TURTLE_FILE.toString(),
                "-v", visibilities
        };

        // Load the file of statements into the Statements topic.
        new LoadStatementsCommand().execute(args);

        // Show that the statements were loaded into the topic.
        final List<VisibilityStatement> read = new ArrayList<>();

        try(final Consumer<String, VisibilityStatement> consumer =
                KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) {
            // Subscribe for messages.
            consumer.subscribe( Arrays.asList(KafkaTopics.statementsTopic(ryaInstance)) );

            // Read the messages and extract their values.
            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = consumer.poll(3000).iterator();
            while(iter.hasNext()) {
                read.add( iter.next().value() );
            }
        }

        final ValueFactory VF = ValueFactoryImpl.getInstance();
        final List<VisibilityStatement> expected = new ArrayList<>();
        expected.add(new VisibilityStatement(
                VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")),
                visibilities));
        expected.add(new VisibilityStatement(
                VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")),
                visibilities));
        expected.add(new VisibilityStatement(
                VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")),
                visibilities));

        // Show the written statements matches the read ones.
        assertEquals(expected, read);
    }

    @Test
    public void longParams() throws Exception {
        // Load a file of statements into Kafka.
        final String visibilities = "a|b|c";
        final String[] args = new String[] {
                "--ryaInstance", "" + ryaInstance,
                "--kafkaHostname", kafka.getKafkaHostname(),
                "--kafkaPort", kafka.getKafkaPort(),
                "--statementsFile", TURTLE_FILE.toString(),
                "--visibilities", visibilities
        };

        // Load the file of statements into the Statements topic.
        new LoadStatementsCommand().execute(args);

        // Show that the statements were loaded into the topic.
        final List<VisibilityStatement> read = new ArrayList<>();

        try(final Consumer<String, VisibilityStatement> consumer =
                KafkaTestUtil.fromStartConsumer(kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) {
            // Subscribe for messages.
            consumer.subscribe( Arrays.asList(KafkaTopics.statementsTopic(ryaInstance)) );

            // Read the messages and extract their values.
            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = consumer.poll(3000).iterator();
            while(iter.hasNext()) {
                read.add( iter.next().value() );
            }
        }

        final ValueFactory VF = ValueFactoryImpl.getInstance();
        final List<VisibilityStatement> expected = new ArrayList<>();
        expected.add(new VisibilityStatement(
                VF.createStatement(VF.createURI("http://example#alice"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#bob")),
                visibilities));
        expected.add(new VisibilityStatement(
                VF.createStatement(VF.createURI("http://example#bob"), VF.createURI("http://example#talksTo"), VF.createURI("http://example#charlie")),
                visibilities));
        expected.add(new VisibilityStatement(
                VF.createStatement(VF.createURI("http://example#charlie"), VF.createURI("http://example#likes"), VF.createURI("http://example#icecream")),
                visibilities));

        // Show the written statements matches the read ones.
        assertEquals(expected, read);
    }
}