/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.jackrabbit.oak.plugins.document;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.List;

import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.Test;

import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;

/**
 * Test the ClusterInfo class
 */
public class ClusterInfoTest {

    @Test
    public void readWriteMode() throws InterruptedException {

        MemoryDocumentStore mem = new MemoryDocumentStore();
        Clock clock = new Clock.Virtual();
        clock.waitUntil(System.currentTimeMillis());
        ClusterNodeInfo.setClock(clock);

        DocumentNodeStore ns1 = new DocumentMK.Builder().
                setDocumentStore(mem).
                setAsyncDelay(0).
                setLeaseCheckMode(LeaseCheckMode.DISABLED).
                setClusterId(1).
                getNodeStore();
        DocumentNodeStore ns2 = new DocumentMK.Builder().
                setDocumentStore(mem).
                setAsyncDelay(0).
                setLeaseCheckMode(LeaseCheckMode.DISABLED).
                setClusterId(2).
                getNodeStore();
        // Bring the current time forward to after the leaseTime which would have been 
        // updated in the DocumentNodeStore initialization.
        clock.waitUntil(clock.getTime() + ns1.getClusterInfo().getLeaseTime());

        ns1.getClusterInfo().setLeaseTime(0);
        ns1.getClusterInfo().setLeaseUpdateInterval(0);
        ns2.getClusterInfo().setLeaseTime(0);
        ns2.getClusterInfo().setLeaseUpdateInterval(0);

        List<ClusterNodeInfoDocument> list = mem.query(
                Collection.CLUSTER_NODES, "0", "a", Integer.MAX_VALUE);
        assertEquals(2, list.size());

        assertNull(mem.getReadPreference());
        assertNull(mem.getWriteConcern());
        mem.setReadWriteMode("read:primary, write:majority");
        assertEquals(ReadPreference.primary(), mem.getReadPreference());
        assertEquals(WriteConcern.MAJORITY, mem.getWriteConcern());

        UpdateOp op;

        // unknown modes: ignore
        op = new UpdateOp(list.get(0).getId(), false);
        op.set("readWriteMode", "read:xyz, write:abc");
        mem.findAndUpdate(Collection.CLUSTER_NODES, op);
        ns1.renewClusterIdLease();
        assertEquals(ReadPreference.primary(), mem.getReadPreference());
        assertEquals(WriteConcern.MAJORITY, mem.getWriteConcern());

        op = new UpdateOp(list.get(0).getId(), false);
        op.set("readWriteMode", "read:nearest, write:fsynced");
        mem.findAndUpdate(Collection.CLUSTER_NODES, op);
        ns1.renewClusterIdLease();
        assertEquals(ReadPreference.nearest(), mem.getReadPreference());
        assertEquals(WriteConcern.FSYNCED, mem.getWriteConcern());

        ns1.dispose();
        ns2.dispose();
    }

    @Test
    public void renewLease() throws InterruptedException {
        MemoryDocumentStore mem = new MemoryDocumentStore();
        Clock clock = new Clock.Virtual();
        clock.waitUntil(System.currentTimeMillis());
        ClusterNodeInfo.setClock(clock);

        DocumentNodeStore ns = new DocumentMK.Builder().
                setDocumentStore(mem).
                setAsyncDelay(0).
                setLeaseCheckMode(LeaseCheckMode.DISABLED).
                getNodeStore();

        ClusterNodeInfo info = ns.getClusterInfo();
        assertNotNull(info);

        // current lease end
        long leaseEnd = getLeaseEndTime(ns);

        // wait a bit, 1sec less than leaseUpdateTime (10sec-1sec by default)
        clock.waitUntil(clock.getTime() + ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS - 1000);

        // must not renew lease right now
        ns.renewClusterIdLease();
        assertEquals(leaseEnd, getLeaseEndTime(ns));

        // wait some more time
        clock.waitUntil(clock.getTime() + 2000);

        // now the lease must be renewed
        ns.renewClusterIdLease();
        assertTrue(getLeaseEndTime(ns) > leaseEnd);

        ns.dispose();
    }

    private static long getLeaseEndTime(DocumentNodeStore nodeStore) {
        ClusterNodeInfoDocument doc = nodeStore.getDocumentStore().find(
                Collection.CLUSTER_NODES,
                String.valueOf(nodeStore.getClusterId()));
        assertNotNull(doc);
        return doc.getLeaseEndTime();
    }

    @Test
    public void useAbandoned() throws InterruptedException {
        Clock clock = new Clock.Virtual();
        clock.waitUntil(System.currentTimeMillis());
        ClusterNodeInfo.setClock(clock);
        MemoryDocumentStore mem = new MemoryDocumentStore();

        DocumentNodeStore ns1 = new DocumentMK.Builder().
                setDocumentStore(mem).
                clock(clock).
                setAsyncDelay(0).
                setLeaseCheckMode(LeaseCheckMode.DISABLED).
                getNodeStore();

        DocumentStore ds = ns1.getDocumentStore();
        int cid = ns1.getClusterId();

        ClusterNodeInfoDocument cnid = ds.find(Collection.CLUSTER_NODES, "" + cid);
        assertNotNull(cnid);
        assertEquals(ClusterNodeState.ACTIVE.toString(), cnid.get(ClusterNodeInfo.STATE));
        ns1.dispose();

        long waitFor = 2000;
        // modify record to indicate "active" with a lease end in the future
        UpdateOp up = new UpdateOp("" + cid, false);
        up.set(ClusterNodeInfo.STATE, ClusterNodeState.ACTIVE.toString());
        long now = clock.getTime();
        up.set(ClusterNodeInfo.LEASE_END_KEY, now + waitFor);
        ds.findAndUpdate(Collection.CLUSTER_NODES, up);

        // try restart
        ns1 = new DocumentMK.Builder().
                setDocumentStore(mem).
                clock(clock).
                setAsyncDelay(0).
                setLeaseCheckMode(LeaseCheckMode.DISABLED).
                getNodeStore();
 
        assertEquals("should have re-used existing cluster id", cid, ns1.getClusterId());
        ns1.dispose();
    }

    @After
    public void tearDown(){
        ClusterNodeInfo.resetClockToDefault();
    }
}
