/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 * 
 * http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.fluo.integration.impl;

import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.core.impl.TransactorCache;
import org.apache.fluo.core.impl.TransactorID;
import org.apache.fluo.core.impl.TransactorNode;
import org.apache.fluo.integration.ITBaseImpl;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;

/**
 * Tests transactor classes
 */
public class TransactorIT extends ITBaseImpl {
  @Rule
  public Timeout globalTimeout = Timeout.seconds(getTestTimeout());

  public static final Long id1 = Long.valueOf(2);
  public static final Long id2 = Long.valueOf(3);
  public static final long NUM_OPEN_TIMEOUT_MS = 1000;

  @Rule
  public ExpectedException exception = ExpectedException.none();

  @Test
  public void testTransactorAndCache() throws Exception {

    TransactorCache cache = new TransactorCache(env);
    Assert.assertFalse(cache.checkExists(id1));
    Assert.assertFalse(cache.checkExists(id2));

    // create 2 transactors
    TransactorNode t1 = new TransactorNode(env);
    TransactorNode t2 = new TransactorNode(env);

    // verify that they were created correctly
    assertNumOpen(2);
    Assert.assertEquals(id1, t1.getTransactorID().getLongID());
    Assert.assertEquals(id2, t2.getTransactorID().getLongID());
    Assert.assertTrue(checkExists(t1));
    Assert.assertTrue(checkExists(t2));
    Assert.assertArrayEquals("2".getBytes(),
        env.getSharedResources().getCurator().getData().forPath(t1.getNodePath()));
    Assert.assertArrayEquals("3".getBytes(),
        env.getSharedResources().getCurator().getData().forPath(t2.getNodePath()));

    // verify the cache
    Assert.assertTrue(cache.checkExists(id1));
    Assert.assertTrue(cache.checkExists(id2));

    // close transactor 1
    t1.close();

    // verify that t1 was closed
    assertNumOpen(1);
    Assert.assertFalse(checkExists(t1));
    Assert.assertTrue(checkExists(t2));
    Assert.assertFalse(cache.checkExists(id1));
    Assert.assertTrue(cache.checkExists(id2));

    // close transactor 2
    t2.close();

    // verify that t2 closed
    assertNumOpen(0);
    Assert.assertFalse(checkExists(t2));

    // verify the cache
    Assert.assertFalse(cache.checkExists(id1));
    Assert.assertFalse(cache.checkExists(id2));

    cache.close();
  }

  @Test
  public void testFailures() throws Exception {

    TransactorNode t1 = new TransactorNode(env);

    assertNumOpen(1);
    Assert.assertEquals(id1, t1.getTransactorID().getLongID());
    Assert.assertTrue(checkExists(t1));

    // Test that node will be recreated if removed
    env.getSharedResources().getCurator().delete().forPath(t1.getNodePath());

    Assert.assertEquals(id1, t1.getTransactorID().getLongID());
    assertNumOpen(1);
    Assert.assertTrue(checkExists(t1));

    t1.close();

    assertNumOpen(0);
    Assert.assertFalse(checkExists(t1));

    // Test for exception to be called
    exception.expect(IllegalStateException.class);
    t1.getTransactorID().getLongID();
  }

  @Test(timeout = 30000)
  public void testTimeout() throws Exception {
    TransactorCache cache = new TransactorCache(env);

    cache.addTimedoutTransactor(id1, 4, System.currentTimeMillis() - 3);

    Assert.assertTrue(cache.checkTimedout(id1, 3));
    Assert.assertTrue(cache.checkTimedout(id1, 4));
    Assert.assertFalse(cache.checkTimedout(id1, 5));
    Assert.assertFalse(cache.checkTimedout(id1, 6));

    cache.addTimedoutTransactor(id1, 7, System.currentTimeMillis() - 3);
    cache.addTimedoutTransactor(id2, 4, System.currentTimeMillis() - 3);

    Assert.assertTrue(cache.checkTimedout(id1, 4));
    Assert.assertTrue(cache.checkTimedout(id1, 5));
    Assert.assertTrue(cache.checkTimedout(id1, 6));
    Assert.assertTrue(cache.checkTimedout(id1, 7));
    Assert.assertFalse(cache.checkTimedout(id1, 8));
    Assert.assertFalse(cache.checkTimedout(id1, 9));

    Assert.assertTrue(cache.checkTimedout(id2, 3));
    Assert.assertTrue(cache.checkTimedout(id2, 4));
    Assert.assertFalse(cache.checkTimedout(id2, 5));
    Assert.assertFalse(cache.checkTimedout(id2, 6));

    // ensure setting a lower lockTs than previously set has no effect
    cache.addTimedoutTransactor(id1, 3, System.currentTimeMillis() - 3);

    Assert.assertTrue(cache.checkTimedout(id1, 7));
    Assert.assertFalse(cache.checkTimedout(id1, 8));

    cache.close();
  }

  @Test
  public void testTransactorID() {
    TransactorID tid1 = new TransactorID(env);
    Assert.assertEquals(id1, tid1.getLongID());
    Assert.assertEquals((Long) (id1 - 1), env.getSharedResources().getTransactorID().getLongID());
    TransactorID tid3 = new TransactorID(env);
    Assert.assertEquals(id2, tid3.getLongID());
  }

  private boolean checkExists(TransactorNode t) throws Exception {
    return env.getSharedResources().getCurator().checkExists().forPath(t.getNodePath()) != null;
  }

  private int getNumOpen() throws Exception {
    return env.getSharedResources().getCurator().getChildren()
        .forPath(ZookeeperPath.TRANSACTOR_NODES).size();
  }

  private void assertNumOpen(int expected) throws Exception {
    long startTime = System.currentTimeMillis();
    while (getNumOpen() != expected) {
      Thread.sleep(50);
      if ((System.currentTimeMillis() - startTime) > NUM_OPEN_TIMEOUT_MS) {
        Assert.fail("Timed out waiting for correct transactor number in Zookeeper");
      }
    }
  }
}
