/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.omid.tso.client;

import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;

import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tso.PausableTimestampOracle;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.TimestampOracle;
import org.apache.omid.tso.util.DummyCellIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class TestTSOClientRequestAndResponseBehaviours {

    // Logger used for the banners around the TSO server start-up and for progress messages in the tests
    private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);

    // Host and port the test TSO server is configured with and the clients connect to
    private static final String TSO_SERVER_HOST = "localhost";
    private static final int TSO_SERVER_PORT = 1234;

    // Two fixed dummy cells; together they form the write set sent with the commit requests
    private final static CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
    private final static CellId c2 = new DummyCellIdImpl(0xfeedcafeL);

    private final static Set<CellId> testWriteSet = Sets.newHashSet(c1, c2);

    // Client configuration pointing at TSO_SERVER_HOST:TSO_SERVER_PORT; re-created in beforeMethod()
    private OmidClientConfiguration tsoClientConf;

    // Required infrastructure for TSOClient test
    private TSOServer tsoServer;
    private PausableTimestampOracle pausableTSOracle;
    private CommitTable commitTable;

    /**
     * Builds a TSO server from a {@link TSOMockModule}, starts it and waits until its socket is listening,
     * then grabs the pausable timestamp oracle and the commit table from the same injector and prepares
     * the default client configuration used by the tests.
     */
    @BeforeMethod
    public void beforeMethod() throws Exception {

        final String separatorLine =
                "==================================================================================================";

        TSOServerConfig serverConfig = new TSOServerConfig();
        serverConfig.setConflictMapSize(1000);
        serverConfig.setPort(TSO_SERVER_PORT);
        serverConfig.setNumConcurrentCTWriters(2);
        Injector injector = Guice.createInjector(new TSOMockModule(serverConfig));

        LOG.info(separatorLine);
        LOG.info("======================================= Init TSO Server ==========================================");
        LOG.info(separatorLine);

        // Start the server and block until both the service and its listening socket are up
        tsoServer = injector.getInstance(TSOServer.class);
        tsoServer.startAsync();
        tsoServer.awaitRunning();
        TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);

        LOG.info(separatorLine);
        LOG.info("===================================== TSO Server Initialized =====================================");
        LOG.info(separatorLine);

        // The cast relies on the mock module binding TimestampOracle to a PausableTimestampOracle
        pausableTSOracle = (PausableTimestampOracle) injector.getInstance(TimestampOracle.class);
        commitTable = injector.getInstance(CommitTable.class);

        OmidClientConfiguration clientConfig = new OmidClientConfiguration();
        clientConfig.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
        this.tsoClientConf = clientConfig;

    }

    /**
     * Stops the TSO server started for the test, waits for its socket to stop listening and finally
     * resumes the timestamp oracle.
     */
    @AfterMethod
    public void afterMethod() throws Exception {
        // Stop the server and block until it has terminated
        tsoServer.stopAsync();
        tsoServer.awaitTerminated();
        tsoServer = null;

        // Wait until nothing is listening on the test port any more
        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);

        // Some tests pause the oracle and never resume it themselves
        pausableTSOracle.resume();
    }

    /**
     * Test to ensure TSOClient timeouts are cancelled.
     * At some point a bug was detected because the TSOClient timeouts were not cancelled, and as timestamp requests
     * had no way to be correlated to timestamp responses, random requests were just timed out after a certain time.
     * We send a lot of timestamp requests, and wait for them to complete.
     * Ensure that the next request doesn't get hit by the timeouts of the previous
     * requests. (i.e. make sure we cancel timeouts)
     *
     * The client is created with an explicit request timeout and retry limit, because the sleeps below are
     * expressed as fractions of that timeout and only make sense if the client really uses it.
     */
    @Test(timeOut = 30_000)
    public void testTimeoutsAreCancelled() throws Exception {

        int requestTimeoutInMs = 500;
        int requestMaxRetries = 5;

        // Apply the timeout/retry values to the client under test instead of relying on the defaults
        OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
        testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
        testTSOClientConf.setRequestTimeoutInMs(requestTimeoutInMs);
        testTSOClientConf.setRequestMaxRetries(requestMaxRetries);
        TSOClient client = TSOClient.newInstance(testTSOClientConf);
        LOG.info("Request timeout {} ms; Max retries {}", requestTimeoutInMs, requestMaxRetries);

        // Fire a burst of timestamp requests and wait for the last one to complete
        Future<Long> f = null;
        for (int i = 0; i < (requestMaxRetries * 10); i++) {
            f = client.getNewStartTimestamp();
        }
        if (f != null) {
            f.get();
        }

        // Pause the oracle and let most of a timeout period elapse since the burst
        pausableTSOracle.pause();
        long msToSleep = ((long) (requestTimeoutInMs * 0.75));
        LOG.info("Sleeping for {} ms", msToSleep);
        TimeUnit.MILLISECONDS.sleep(msToSleep);

        // This request is issued while the oracle is paused; the timers of the burst above would expire
        // during the following sleep if they had not been cancelled
        f = client.getNewStartTimestamp();
        msToSleep = ((long) (requestTimeoutInMs * 0.9));
        LOG.info("Sleeping for {} ms", msToSleep);
        TimeUnit.MILLISECONDS.sleep(msToSleep);
        LOG.info("Resuming");
        pausableTSOracle.resume();
        f.get();

    }

    /**
     * With retries disabled, commits that are in flight when the client channel is closed are expected to
     * fail with a {@link ServiceUnavailableException} as the cause.
     */
    @Test(timeOut = 30_000)
    public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {

        OmidClientConfiguration noRetriesConf = new OmidClientConfiguration();
        noRetriesConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
        noRetriesConf.setRequestMaxRetries(0);
        TSOClient client = TSOClient.newInstance(noRetriesConf);

        // Obtain ten start timestamps while the oracle is still running
        List<Long> startTimestamps = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Future<Long> timestampFuture = client.getNewStartTimestamp();
            startTimestamps.add(timestampFuture.get());
        }

        pausableTSOracle.pause();

        // Issue one commit per timestamp with the oracle paused, then drop the channel
        List<Future<Long>> pendingCommits = new ArrayList<>();
        for (long startTimestamp : startTimestamps) {
            Future<Long> pendingCommit = client.commit(startTimestamp, Sets.<CellId>newHashSet());
            pendingCommits.add(pendingCommit);
        }
        TSOClientAccessor.closeChannel(client);

        for (Future<Long> pendingCommit : pendingCommits) {
            try {
                pendingCommit.get();
                fail("Shouldn't be able to complete");
            } catch (ExecutionException ee) {
                Throwable cause = ee.getCause();
                assertTrue(cause instanceof ServiceUnavailableException,
                           "Should be a service unavailable exception");
            }
        }
    }

    /**
     * Test that if a client tries to make a request without handshaking, it will be disconnected.
     *
     * A raw client sends a timestamp request straight away; waiting for the response is expected to fail
     * with a {@link ConnectionException} as the cause.
     */
    @Test(timeOut = 30_000)
    public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {

        TSOClientRaw raw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
        // Close the raw client even when one of the assertions below throws
        try {
            TSOProto.Request request = TSOProto.Request.newBuilder()
                    .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
                    .build();
            raw.write(request);
            try {
                raw.getResponse().get();
                fail("Channel should be closed");
            } catch (ExecutionException ee) {
                assertEquals(ee.getCause().getClass(), ConnectionException.class,
                             "Should be channel closed exception");
            }
        } finally {
            raw.close();
        }

    }

    // ----------------------------------------------------------------------------------------------------------------
    // Test duplicate commits
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * This tests the case where messages arrive at the TSO out of order. This can happen when the channel
     * gets dropped and the retry is sent over a new channel. In that case the TSO responds with an abort
     * to the original message, because the retry was already committed and it would be prohibitively
     * expensive to check all non-retry requests to see if they are already committed. For this reason,
     * a client that sends a retry due to a socket error must ensure that the previous channel is
     * entirely closed, so that it never actually receives the abort response. TCP guarantees
     * that this doesn't happen in non-socket error cases.
     */
    @Test(timeOut = 30_000)
    public void testOutOfOrderMessages() throws Exception {

        TSOClient client = TSOClient.newInstance(tsoClientConf);
        TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

        long startTimestamp = client.getNewStartTimestamp().get();

        // The retry is delivered first, the original (non-retry) request second
        TSOProto.Request retryRequest = createCommitRequest(startTimestamp, true, testWriteSet);
        TSOProto.Request originalRequest = createCommitRequest(startTimestamp, false, testWriteSet);

        TSOProto.Response retryResponse = clientOneShot.makeRequest(retryRequest);
        TSOProto.Response originalResponse = clientOneShot.makeRequest(originalRequest);

        assertFalse(retryResponse.getCommitResponse().getAborted(), "Retry Transaction should commit");
        assertTrue(originalResponse.getCommitResponse().getAborted(), "Transaction should abort");
    }

    /**
     * A second transaction, started later, commits the shared write set first. Both the original commit
     * request of the first transaction and its retry are then expected to come back as aborted.
     */
    @Test(timeOut = 30_000)
    public void testDuplicateCommitAborting() throws Exception {

        TSOClient client = TSOClient.newInstance(tsoClientConf);
        TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

        long firstStartTimestamp = client.getNewStartTimestamp().get();
        long secondStartTimestamp = client.getNewStartTimestamp().get();
        // Commit the later transaction before the earlier one tries to
        client.commit(secondStartTimestamp, testWriteSet).get();

        TSOProto.Request originalRequest = createCommitRequest(firstStartTimestamp, false, testWriteSet);
        TSOProto.Request retryRequest = createCommitRequest(firstStartTimestamp, true, testWriteSet);

        TSOProto.Response originalResponse = clientOneShot.makeRequest(originalRequest);
        TSOProto.Response retryResponse = clientOneShot.makeRequest(retryRequest);

        assertTrue(originalResponse.getCommitResponse().getAborted(), "Transaction should abort");
        assertTrue(retryResponse.getCommitResponse().getAborted(), "Retry commit should abort");
    }

    /**
     * Sends a commit request followed by a retry of the same request. When the client is not in low
     * latency mode both responses must carry the same commit timestamp; in low latency mode the first
     * response must contain a commit response and the retry must be aborted.
     */
    @Test(timeOut = 30_000)
    public void testDuplicateCommit() throws Exception {

        TSOClient client = TSOClient.newInstance(tsoClientConf);
        TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

        long startTimestamp = client.getNewStartTimestamp().get();

        TSOProto.Response originalResponse =
                clientOneShot.makeRequest(createCommitRequest(startTimestamp, false, testWriteSet));
        TSOProto.Response retryResponse =
                clientOneShot.makeRequest(createCommitRequest(startTimestamp, true, testWriteSet));

        boolean lowLatency = client.isLowLatency();
        if (!lowLatency) {
            assertEquals(retryResponse.getCommitResponse().getCommitTimestamp(),
                    originalResponse.getCommitResponse().getCommitTimestamp(),
                    "Commit timestamp should be the same");
        } else {
            assertTrue(originalResponse.hasCommitResponse());
            assertTrue(retryResponse.getCommitResponse().getAborted());
        }
    }

    // ----------------------------------------------------------------------------------------------------------------
    // Test TSOClient retry behaviour
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Issues a commit while the oracle is paused, closes the client channel, resumes the oracle and
     * expects the commit future to complete without an exception. Nothing is checked when the client
     * reports low latency mode.
     */
    @Test(timeOut = 30_000)
    public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {

        TSOClient client = TSOClient.newInstance(tsoClientConf);
        long startTimestamp = client.getNewStartTimestamp().get();

        if (!client.isLowLatency()) {
            pausableTSOracle.pause();
            TSOFuture<Long> commitFuture = client.commit(startTimestamp, testWriteSet);
            // Drop the channel while the commit is still pending
            TSOClientAccessor.closeChannel(client);
            pausableTSOracle.resume();
            commitFuture.get();
        }

    }

    /**
     * Uses a 100 ms request timeout with 10000 allowed retries and keeps the oracle paused for one
     * second after issuing a commit; the commit is expected to complete once the oracle is resumed.
     */
    @Test(timeOut = 30_000)
    public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {

        OmidClientConfiguration shortTimeoutConf = new OmidClientConfiguration();
        shortTimeoutConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
        shortTimeoutConf.setRequestTimeoutInMs(100);
        shortTimeoutConf.setRequestMaxRetries(10000);
        TSOClient client = TSOClient.newInstance(shortTimeoutConf);

        long startTimestamp = client.getNewStartTimestamp().get();

        pausableTSOracle.pause();
        TSOFuture<Long> commitFuture = client.commit(startTimestamp, testWriteSet);
        // Stay paused for several timeout periods before letting the oracle continue
        TimeUnit.SECONDS.sleep(1);
        pausableTSOracle.resume();

        commitFuture.get();
    }

    /**
     * Uses a 100 ms request timeout with 10 allowed retries and pauses the oracle for good before
     * committing. The commit must fail with a {@link ServiceUnavailableException} as the cause; a commit
     * that completes normally is reported as a test failure.
     */
    @Test(timeOut = 30_000)
    public void testCommitFailWhenTSOIsDown() throws Exception {

        OmidClientConfiguration testTSOClientConf = new OmidClientConfiguration();
        testTSOClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
        testTSOClientConf.setRequestTimeoutInMs(100);
        testTSOClientConf.setRequestMaxRetries(10);
        TSOClient client = TSOClient.newInstance(testTSOClientConf);

        long ts1 = client.getNewStartTimestamp().get();
        // The oracle is not resumed by this test; afterMethod() takes care of that
        pausableTSOracle.pause();
        TSOFuture<Long> future = client.commit(ts1, testWriteSet);
        try {
            future.get();
            // Without this the test would pass silently if the commit unexpectedly succeeded
            fail("Commit should not complete while the timestamp oracle is paused");
        } catch (ExecutionException e) {
            assertEquals(e.getCause().getClass(), ServiceUnavailableException.class,
                         "Should be a ServiceUnavailableExeption");
        }

    }

    /**
     * Uses a 100 ms request timeout with 10000 allowed retries and requests a start timestamp while the
     * oracle is paused; after one second the oracle is resumed and the request is expected to complete.
     */
    @Test(timeOut = 30_000)
    public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {

        OmidClientConfiguration shortTimeoutConf = new OmidClientConfiguration();
        shortTimeoutConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
        shortTimeoutConf.setRequestTimeoutInMs(100);
        shortTimeoutConf.setRequestMaxRetries(10000);
        TSOClient client = TSOClient.newInstance(shortTimeoutConf);

        pausableTSOracle.pause();
        Future<Long> timestampFuture = client.getNewStartTimestamp();
        // Stay paused for several timeout periods before letting the oracle continue
        TimeUnit.SECONDS.sleep(1);
        pausableTSOracle.resume();

        timestampFuture.get();

    }

    // ----------------------------------------------------------------------------------------------------------------
    // The next 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
    // (They exercise the communication protocol) TODO Remove???
    // ----------------------------------------------------------------------------------------------------------------
    /**
     * Sends the same retry commit request twice. Outside low latency mode the second response must be a
     * commit whose timestamp equals the start timestamp plus {@code CommitTable.MAX_CHECKPOINTS_PER_TXN};
     * in low latency mode it must be an abort.
     */
    @Test(timeOut = 30_000)
    public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {

        TSOClient client = TSOClient.newInstance(tsoClientConf);
        TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

        long startTimestamp = client.getNewStartTimestamp().get();

        // The response to the first request is not inspected
        clientOneShot.makeRequest(createRetryCommitRequest(startTimestamp));
        TSOProto.Response secondResponse = clientOneShot.makeRequest(createRetryCommitRequest(startTimestamp));

        if (client.isLowLatency()) {
            assertTrue(secondResponse.getCommitResponse().getAborted(), "Transaction should be aborted");
        } else {
            assertFalse(secondResponse.getCommitResponse().getAborted(), "Transaction should be committed");
            assertEquals(secondResponse.getCommitResponse().getCommitTimestamp(),
                    startTimestamp + CommitTable.MAX_CHECKPOINTS_PER_TXN);
        }
    }

    /**
     * Invalidates the transaction in the commit table before sending the retry commit request twice; the
     * second response must be an abort with a commit timestamp of 0.
     */
    @Test(timeOut = 30_000)
    public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {

        TSOClient client = TSOClient.newInstance(tsoClientConf);
        TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

        long startTimestamp = client.getNewStartTimestamp().get();

        // Invalidate the transaction
        commitTable.getClient().tryInvalidateTransaction(startTimestamp);

        // The response to the first request is not inspected
        clientOneShot.makeRequest(createRetryCommitRequest(startTimestamp));
        TSOProto.Response secondResponse = clientOneShot.makeRequest(createRetryCommitRequest(startTimestamp));

        assertTrue(secondResponse.getCommitResponse().getAborted(), "Transaction should be aborted");
        assertEquals(secondResponse.getCommitResponse().getCommitTimestamp(), 0);
    }

    /**
     * Sends a retry commit request, deletes the transaction's entry from the commit table and sends the
     * retry again; the second response must be an abort with a commit timestamp of 0.
     */
    @Test(timeOut = 30_000)
    public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {

        TSOClient client = TSOClient.newInstance(tsoClientConf);
        TSOClientOneShot clientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);

        long startTimestamp = client.getNewStartTimestamp().get();

        // The response to the first request is not inspected
        clientOneShot.makeRequest(createRetryCommitRequest(startTimestamp));

        // Simulate remove entry from the commit table before exercise retry
        commitTable.getClient().deleteCommitEntry(startTimestamp);

        TSOProto.Response secondResponse = clientOneShot.makeRequest(createRetryCommitRequest(startTimestamp));

        assertTrue(secondResponse.getCommitResponse().getAborted(), "Transaction should abort");
        assertEquals(secondResponse.getCommitResponse().getCommitTimestamp(), 0);
    }
    // ----------------------------------------------------------------------------------------------------------------
    // The previous 3 tests are similar to the ones in TestRetryProcessor but checking the result on the TSOClient side
    // (They exercise the communication protocol) TODO Remove???
    // ----------------------------------------------------------------------------------------------------------------

    // ----------------------------------------------------------------------------------------------------------------
    // Helper methods
    // ----------------------------------------------------------------------------------------------------------------

    /**
     * Builds a commit request for the given start timestamp that is flagged as a retry and carries the
     * shared test write set.
     *
     * @param ts start timestamp of the transaction
     * @return the retry commit request
     */
    private TSOProto.Request createRetryCommitRequest(long ts) {
        final boolean isRetry = true;
        return createCommitRequest(ts, isRetry, testWriteSet);
    }

    /**
     * Builds a protocol request wrapping a commit request.
     *
     * @param ts       start timestamp of the transaction
     * @param retry    value of the request's retry flag
     * @param writeSet cells whose ids are added to the commit request
     * @return the request containing the commit request
     */
    private TSOProto.Request createCommitRequest(long ts, boolean retry, Set<CellId> writeSet) {
        TSOProto.CommitRequest.Builder commit = TSOProto.CommitRequest.newBuilder();
        commit.setStartTimestamp(ts);
        commit.setIsRetry(retry);
        for (CellId writtenCell : writeSet) {
            commit.addCellId(writtenCell.getCellId());
        }
        TSOProto.CommitRequest commitRequest = commit.build();

        TSOProto.Request.Builder request = TSOProto.Request.newBuilder();
        request.setCommitRequest(commitRequest);
        return request.build();
    }

}
