/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.service;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import org.junit.Assert;
import org.junit.Test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.utils.concurrent.WaitQueue;

import static com.google.common.util.concurrent.Futures.getUnchecked;

public class MigrationCoordinatorTest
{
    private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class);

    private static final InetAddress EP1;
    private static final InetAddress EP2;
    private static final InetAddress EP3;

    private static final UUID LOCAL_VERSION = UUID.randomUUID();
    private static final UUID V1 = UUID.randomUUID();
    private static final UUID V2 = UUID.randomUUID();

    static
    {
        try
        {
            EP1 = InetAddress.getByName("10.0.0.1");
            EP2 = InetAddress.getByName("10.0.0.2");
            EP3 = InetAddress.getByName("10.0.0.3");
        }
        catch (UnknownHostException e)
        {
            throw new AssertionError(e);
        }

        DatabaseDescriptor.forceStaticInitialization();
    }

    private static class InstrumentedCoordinator extends MigrationCoordinator
    {

        Queue<Callback> requests = new LinkedList<>();
        @Override
        protected void sendMigrationMessage(MigrationCoordinator.Callback callback)
        {
            requests.add(callback);
        }

        boolean shouldPullSchema = true;
        @Override
        protected boolean shouldPullSchema(UUID version)
        {
            return shouldPullSchema;
        }

        boolean shouldPullFromEndpoint = true;
        @Override
        protected boolean shouldPullFromEndpoint(InetAddress endpoint)
        {
            return shouldPullFromEndpoint;
        }

        boolean shouldPullImmediately = true;
        @Override
        protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
        {
            return shouldPullImmediately;
        }

        Set<InetAddress> deadNodes = new HashSet<>();
        protected boolean isAlive(InetAddress endpoint)
        {
            return !deadNodes.contains(endpoint);
        }

        UUID localVersion = LOCAL_VERSION;
        @Override
        protected boolean isLocalVersion(UUID version)
        {
            return localVersion.equals(version);
        }

        int maxOutstandingRequests = 3;
        @Override
        protected int getMaxOutstandingVersionRequests()
        {
            return maxOutstandingRequests;
        }

        Set<InetAddress> mergedSchemasFrom = new HashSet<>();
        @Override
        protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations)
        {
            mergedSchemasFrom.add(endpoint);
        }
    }

    @Test
    public void requestResponseCycle() throws InterruptedException
    {
        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();
        coordinator.maxOutstandingRequests = 1;

        Assert.assertTrue(coordinator.requests.isEmpty());

        // first schema report should send a migration request
        getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
        Assert.assertEquals(1, coordinator.requests.size());
        Assert.assertFalse(coordinator.awaitSchemaRequests(1));

        // second should not
        getUnchecked(coordinator.reportEndpointVersion(EP2, V1));
        Assert.assertEquals(1, coordinator.requests.size());
        Assert.assertFalse(coordinator.awaitSchemaRequests(1));

        // until the first request fails, then the second endpoint should be contacted
        MigrationCoordinator.Callback request1 = coordinator.requests.poll();
        Assert.assertEquals(EP1, request1.endpoint);
        getUnchecked(request1.fail());
        Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty());
        Assert.assertFalse(coordinator.awaitSchemaRequests(1));

        // ... then the second endpoint should be contacted
        Assert.assertEquals(1, coordinator.requests.size());
        MigrationCoordinator.Callback request2 = coordinator.requests.poll();
        Assert.assertEquals(EP2, request2.endpoint);
        Assert.assertFalse(coordinator.awaitSchemaRequests(1));
        getUnchecked(request2.response(Collections.emptyList()));
        Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom));
        Assert.assertTrue(coordinator.awaitSchemaRequests(1));

        // and migration tasks should not be sent out for subsequent version reports
        getUnchecked(coordinator.reportEndpointVersion(EP3, V1));
        Assert.assertTrue(coordinator.requests.isEmpty());

    }

    /**
     * If we don't send a request for a version, and endpoints associated with
     * it all change versions, we should signal anyone waiting on that version
     */
    @Test
    public void versionsAreSignaledWhenDeleted()
    {
        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();

        coordinator.reportEndpointVersion(EP1, V1);
        WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
        Assert.assertFalse(signal.isSignalled());

        coordinator.reportEndpointVersion(EP1, V2);
        Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));

        Assert.assertTrue(signal.isSignalled());
    }

	/**
	 * If an endpoint is removed and no other endpoints are reporting its
	 * schema version, the version should be removed and we should signal
	 * anyone waiting on that version
	 */
	@Test
	public void versionsAreSignaledWhenEndpointsRemoved()
	{
		InstrumentedCoordinator coordinator = new InstrumentedCoordinator();

		coordinator.reportEndpointVersion(EP1, V1);
		WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register();
		Assert.assertFalse(signal.isSignalled());

		coordinator.removeAndIgnoreEndpoint(EP1);
		Assert.assertNull(coordinator.getVersionInfoUnsafe(V1));

		Assert.assertTrue(signal.isSignalled());
	}


    private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked)
    {
        Assert.assertTrue(coordinator.requests.isEmpty());
        Future<Void> future = coordinator.reportEndpointVersion(EP1, V1);
        if (future != null)
            getUnchecked(future);
        Assert.assertTrue(coordinator.requests.isEmpty());

        Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1));
    }

    private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked)
    {
        assertNoContact(coordinator, EP1, V1, startupShouldBeUnblocked);
    }

    @Test
    public void dontContactNodesWithSameSchema()
    {
        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();

        coordinator.localVersion = V1;
        assertNoContact(coordinator, true);
    }

    @Test
    public void dontContactIncompatibleNodes()
    {
        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();

        coordinator.shouldPullFromEndpoint = false;
        assertNoContact(coordinator, false);
    }

    @Test
    public void dontContactDeadNodes()
    {
        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();

        coordinator.deadNodes.add(EP1);
        assertNoContact(coordinator, EP1, V1, false);
    }

    /**
     * If a node has become incompativle between when the task was scheduled and when it
     * was run, we should detect that and fail the task
     */
    @Test
    public void testGossipRace()
    {
        InstrumentedCoordinator coordinator = new InstrumentedCoordinator() {
            protected boolean shouldPullImmediately(InetAddress endpoint, UUID version)
            {
                // this is the last thing that gets called before scheduling the pull, so set this flag here
                shouldPullFromEndpoint = false;
                return super.shouldPullImmediately(endpoint, version);
            }
        };

        Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1));
        assertNoContact(coordinator, EP1, V1, false);
    }

    @Test
    public void testWeKeepSendingRequests() throws Exception
    {
        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();

        getUnchecked(coordinator.reportEndpointVersion(EP3, V2));
        coordinator.requests.remove().response(Collections.emptyList());

        getUnchecked(coordinator.reportEndpointVersion(EP1, V1));
        getUnchecked(coordinator.reportEndpointVersion(EP2, V1));

        MigrationCoordinator.Callback prev = null;
        Set<InetAddress> EPs = Sets.newHashSet(EP1, EP2);
        int ep1requests = 0;
        int ep2requests = 0;

        for (int i=0; i<10; i++)
        {
            Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size());

            MigrationCoordinator.Callback next = coordinator.requests.remove();

            // we should be contacting endpoints in a round robin fashion
            Assert.assertTrue(EPs.contains(next.endpoint));
            if (prev != null && prev.endpoint.equals(next.endpoint))
                Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint));

            // should send a new request
            next.fail().get();
            prev = next;
            Assert.assertFalse(coordinator.awaitSchemaRequests(1));

            Assert.assertEquals(2, coordinator.requests.size());
        }
        logger.info("{} -> {}", EP1, ep1requests);
        logger.info("{} -> {}", EP2, ep2requests);

        // a single success should unblock startup though
        coordinator.requests.remove().response(Collections.emptyList());
        Assert.assertTrue(coordinator.awaitSchemaRequests(1));

    }

    /**
     * Pull unreceived schemas should detect and send requests out for any
     * schemas that are marked unreceived and have no outstanding requests
     */
    @Test
    public void pullUnreceived()
    {
        InstrumentedCoordinator coordinator = new InstrumentedCoordinator();

        coordinator.shouldPullFromEndpoint = false;
        assertNoContact(coordinator, false);

        coordinator.shouldPullFromEndpoint = true;
        Assert.assertEquals(0, coordinator.requests.size());
        List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions();
        futures.forEach(Futures::getUnchecked);
        Assert.assertEquals(1, coordinator.requests.size());
    }
}
