/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.client.proxy;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.distributedlog.client.ClientConfig;
import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService;
import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService;
import org.apache.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient;
import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
import org.apache.distributedlog.client.stats.ClientStats;
import org.apache.distributedlog.thrift.service.ServerInfo;
import com.twitter.finagle.stats.NullStatsReceiver;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.Pair;
import org.jboss.netty.util.HashedWheelTimer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/**
 * Test Proxy Client Manager.
 */
public class TestProxyClientManager {

    @Rule
    public TestName runtime = new TestName();

    static class TestHostProvider implements HostProvider {

        Set<SocketAddress> hosts = new HashSet<SocketAddress>();

        synchronized void addHost(SocketAddress host) {
            hosts.add(host);
        }

        @Override
        public synchronized Set<SocketAddress> getHosts() {
            return ImmutableSet.copyOf(hosts);
        }

    }

    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
                                                               long periodicHandshakeIntervalMs) {
        HostProvider provider = new TestHostProvider();
        return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs);
    }

    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
                                                               HostProvider hostProvider,
                                                               long periodicHandshakeIntervalMs) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs);
        clientConfig.setPeriodicOwnershipSyncIntervalMs(-1);
        HashedWheelTimer dlTimer = new HashedWheelTimer(
                new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(),
                clientConfig.getRedirectBackoffStartMs(),
                TimeUnit.MILLISECONDS);
        return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider,
                new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver()));
    }

    private static SocketAddress createSocketAddress(int port) {
        return new InetSocketAddress("127.0.0.1", port);
    }

    private static MockProxyClient createMockProxyClient(SocketAddress address) {
        return new MockProxyClient(address, new MockBasicService());
    }

    private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient(
            SocketAddress address, ServerInfo serverInfo) {
        MockServerInfoService service = new MockServerInfoService();
        MockProxyClient proxyClient = new MockProxyClient(address, service);
        service.updateServerInfo(serverInfo);
        return Pair.of(proxyClient, service);
    }

    @Test(timeout = 60000)
    public void testBasicCreateRemove() throws Exception {
        SocketAddress address = createSocketAddress(1000);
        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        MockProxyClient mockProxyClient = createMockProxyClient(address);
        builder.provideProxyClient(address, mockProxyClient);

        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        ProxyClient proxyClient =  clientManager.createClient(address);
        assertEquals("Create client should build the proxy client",
                1, clientManager.getNumProxies());
        assertTrue("The client returned should be the same client that builder built",
                mockProxyClient == proxyClient);
    }

    @Test(timeout = 60000)
    public void testGetShouldCreateClient() throws Exception {
        SocketAddress address = createSocketAddress(2000);
        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        MockProxyClient mockProxyClient = createMockProxyClient(address);
        builder.provideProxyClient(address, mockProxyClient);

        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        ProxyClient proxyClient =  clientManager.getClient(address);
        assertEquals("Get client should build the proxy client",
                1, clientManager.getNumProxies());
        assertTrue("The client returned should be the same client that builder built",
                mockProxyClient == proxyClient);
    }

    @Test(timeout = 60000)
    public void testConditionalRemoveClient() throws Exception {
        SocketAddress address = createSocketAddress(3000);
        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        MockProxyClient mockProxyClient = createMockProxyClient(address);
        MockProxyClient anotherMockProxyClient = createMockProxyClient(address);
        builder.provideProxyClient(address, mockProxyClient);

        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        clientManager.createClient(address);
        assertEquals("Create client should build the proxy client",
                1, clientManager.getNumProxies());
        clientManager.removeClient(address, anotherMockProxyClient);
        assertEquals("Conditional remove should not remove proxy client",
                1, clientManager.getNumProxies());
        clientManager.removeClient(address, mockProxyClient);
        assertEquals("Conditional remove should remove proxy client",
                0, clientManager.getNumProxies());
    }

    @Test(timeout = 60000)
    public void testRemoveClient() throws Exception {
        SocketAddress address = createSocketAddress(3000);
        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        MockProxyClient mockProxyClient = createMockProxyClient(address);
        builder.provideProxyClient(address, mockProxyClient);

        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        clientManager.createClient(address);
        assertEquals("Create client should build the proxy client",
                1, clientManager.getNumProxies());
        clientManager.removeClient(address);
        assertEquals("Remove should remove proxy client",
                0, clientManager.getNumProxies());
    }

    @Test(timeout = 60000)
    public void testCreateClientShouldHandshake() throws Exception {
        SocketAddress address = createSocketAddress(3000);
        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        ServerInfo serverInfo = new ServerInfo();
        serverInfo.putToOwnerships(runtime.getMethodName() + "_stream",
                runtime.getMethodName() + "_owner");
        Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
                createMockProxyClient(address, serverInfo);
        builder.provideProxyClient(address, mockProxyClient.getLeft());

        final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null);
        final CountDownLatch doneLatch = new CountDownLatch(1);
        ProxyListener listener = new ProxyListener() {
            @Override
            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
                resultHolder.set(serverInfo);
                doneLatch.countDown();
            }
            @Override
            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
            }
        };

        ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
        clientManager.registerProxyListener(listener);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        clientManager.createClient(address);
        assertEquals("Create client should build the proxy client",
                1, clientManager.getNumProxies());

        // When a client is created, it would handshake with that proxy
        doneLatch.await();
        assertEquals("Handshake should return server info",
                serverInfo, resultHolder.get());
    }

    @Test(timeout = 60000)
    public void testHandshake() throws Exception {
        final int numHosts = 3;
        final int numStreamsPerHost = 3;
        final int initialPort = 4000;

        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        Map<SocketAddress, ServerInfo> serverInfoMap =
                new HashMap<SocketAddress, ServerInfo>();
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            ServerInfo serverInfo = new ServerInfo();
            for (int j = 0; j < numStreamsPerHost; j++) {
                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
                        address.toString());
            }
            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
                    createMockProxyClient(address, serverInfo);
            builder.provideProxyClient(address, mockProxyClient.getLeft());
            serverInfoMap.put(address, serverInfo);
        }

        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
        final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts);
        ProxyListener listener = new ProxyListener() {
            @Override
            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
                synchronized (results) {
                    results.put(address, serverInfo);
                }
                doneLatch.countDown();
            }

            @Override
            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
            }
        };

        TestHostProvider rs = new TestHostProvider();
        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L);
        clientManager.registerProxyListener(listener);
        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        for (int i = 0; i < numHosts; i++) {
            rs.addHost(createSocketAddress(initialPort + i));
        }
        // handshake would handshake with 3 hosts again
        clientManager.handshake();
        doneLatch.await();
        assertEquals("Handshake should return server info",
                numHosts, results.size());
        assertTrue("Handshake should get all server infos",
                Maps.difference(serverInfoMap, results).areEqual());
    }

    @Test(timeout = 60000)
    public void testPeriodicHandshake() throws Exception {
        final int numHosts = 3;
        final int numStreamsPerHost = 3;
        final int initialPort = 5000;

        MockProxyClientBuilder builder = new MockProxyClientBuilder();
        Map<SocketAddress, ServerInfo> serverInfoMap =
                new HashMap<SocketAddress, ServerInfo>();
        Map<SocketAddress, MockServerInfoService> mockServiceMap =
                new HashMap<SocketAddress, MockServerInfoService>();
        final Map<SocketAddress, CountDownLatch> hostDoneLatches =
                new HashMap<SocketAddress, CountDownLatch>();
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            ServerInfo serverInfo = new ServerInfo();
            for (int j = 0; j < numStreamsPerHost; j++) {
                serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
                        address.toString());
            }
            Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
                    createMockProxyClient(address, serverInfo);
            builder.provideProxyClient(address, mockProxyClient.getLeft());
            serverInfoMap.put(address, serverInfo);
            mockServiceMap.put(address, mockProxyClient.getRight());
            hostDoneLatches.put(address, new CountDownLatch(2));
        }

        final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
        final CountDownLatch doneLatch = new CountDownLatch(numHosts);
        ProxyListener listener = new ProxyListener() {
            @Override
            public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
                synchronized (results) {
                    results.put(address, serverInfo);
                    CountDownLatch latch = hostDoneLatches.get(address);
                    if (null != latch) {
                        latch.countDown();
                    }
                }
                doneLatch.countDown();
            }

            @Override
            public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
            }
        };

        TestHostProvider rs = new TestHostProvider();
        ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L);
        clientManager.setPeriodicHandshakeEnabled(false);
        clientManager.registerProxyListener(listener);

        assertEquals("There should be no clients in the manager",
                0, clientManager.getNumProxies());
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            rs.addHost(address);
            clientManager.createClient(address);
        }

        // make sure the first 3 handshakes going through
        doneLatch.await();

        assertEquals("Handshake should return server info",
                numHosts, results.size());
        assertTrue("Handshake should get all server infos",
                Maps.difference(serverInfoMap, results).areEqual());

        // update server info
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            ServerInfo serverInfo = new ServerInfo();
            for (int j = 0; j < numStreamsPerHost; j++) {
                serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j,
                        address.toString());
            }
            MockServerInfoService service = mockServiceMap.get(address);
            serverInfoMap.put(address, serverInfo);
            service.updateServerInfo(serverInfo);
        }

        clientManager.setPeriodicHandshakeEnabled(true);
        for (int i = 0; i < numHosts; i++) {
            SocketAddress address = createSocketAddress(initialPort + i);
            CountDownLatch latch = hostDoneLatches.get(address);
            latch.await();
        }

        assertTrue("Periodic handshake should update all server infos",
                Maps.difference(serverInfoMap, results).areEqual());
    }

}
