blob: c81028570c92ede3ad7ad6dc108fd1fab43affb3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration.impl;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.core.impl.TransactorCache;
import org.apache.fluo.core.impl.TransactorID;
import org.apache.fluo.core.impl.TransactorNode;
import org.apache.fluo.integration.ITBaseImpl;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
/**
* Tests transactor classes
*/
public class TransactorIT extends ITBaseImpl {
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
public static Long id1 = Long.valueOf(2);
public static Long id2 = Long.valueOf(3);
public static long NUM_OPEN_TIMEOUT_MS = 1000;
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void testTransactorAndCache() throws Exception {
TransactorCache cache = new TransactorCache(env);
Assert.assertFalse(cache.checkExists(id1));
Assert.assertFalse(cache.checkExists(id2));
// create 2 transactors
TransactorNode t1 = new TransactorNode(env);
TransactorNode t2 = new TransactorNode(env);
// verify that they were created correctly
assertNumOpen(2);
Assert.assertEquals(id1, t1.getTransactorID().getLongID());
Assert.assertEquals(id2, t2.getTransactorID().getLongID());
Assert.assertTrue(checkExists(t1));
Assert.assertTrue(checkExists(t2));
Assert.assertArrayEquals("2".getBytes(),
env.getSharedResources().getCurator().getData().forPath(t1.getNodePath()));
Assert.assertArrayEquals("3".getBytes(),
env.getSharedResources().getCurator().getData().forPath(t2.getNodePath()));
// verify the cache
Assert.assertTrue(cache.checkExists(id1));
Assert.assertTrue(cache.checkExists(id2));
// close transactor 1
t1.close();
// verify that t1 was closed
assertNumOpen(1);
Assert.assertFalse(checkExists(t1));
Assert.assertTrue(checkExists(t2));
Assert.assertFalse(cache.checkExists(id1));
Assert.assertTrue(cache.checkExists(id2));
// close transactor 2
t2.close();
// verify that t2 closed
assertNumOpen(0);
Assert.assertFalse(checkExists(t2));
// verify the cache
Assert.assertFalse(cache.checkExists(id1));
Assert.assertFalse(cache.checkExists(id2));
cache.close();
}
@Test
public void testFailures() throws Exception {
TransactorNode t1 = new TransactorNode(env);
assertNumOpen(1);
Assert.assertEquals(id1, t1.getTransactorID().getLongID());
Assert.assertTrue(checkExists(t1));
// Test that node will be recreated if removed
env.getSharedResources().getCurator().delete().forPath(t1.getNodePath());
Assert.assertEquals(id1, t1.getTransactorID().getLongID());
assertNumOpen(1);
Assert.assertTrue(checkExists(t1));
t1.close();
assertNumOpen(0);
Assert.assertFalse(checkExists(t1));
// Test for exception to be called
exception.expect(IllegalStateException.class);
t1.getTransactorID().getLongID();
}
@Test(timeout = 30000)
public void testTimeout() throws Exception {
TransactorCache cache = new TransactorCache(env);
cache.addTimedoutTransactor(id1, 4, System.currentTimeMillis() - 3);
Assert.assertTrue(cache.checkTimedout(id1, 3));
Assert.assertTrue(cache.checkTimedout(id1, 4));
Assert.assertFalse(cache.checkTimedout(id1, 5));
Assert.assertFalse(cache.checkTimedout(id1, 6));
cache.addTimedoutTransactor(id1, 7, System.currentTimeMillis() - 3);
cache.addTimedoutTransactor(id2, 4, System.currentTimeMillis() - 3);
Assert.assertTrue(cache.checkTimedout(id1, 4));
Assert.assertTrue(cache.checkTimedout(id1, 5));
Assert.assertTrue(cache.checkTimedout(id1, 6));
Assert.assertTrue(cache.checkTimedout(id1, 7));
Assert.assertFalse(cache.checkTimedout(id1, 8));
Assert.assertFalse(cache.checkTimedout(id1, 9));
Assert.assertTrue(cache.checkTimedout(id2, 3));
Assert.assertTrue(cache.checkTimedout(id2, 4));
Assert.assertFalse(cache.checkTimedout(id2, 5));
Assert.assertFalse(cache.checkTimedout(id2, 6));
// ensure setting a lower lockTs than previously set has no effect
cache.addTimedoutTransactor(id1, 3, System.currentTimeMillis() - 3);
Assert.assertTrue(cache.checkTimedout(id1, 7));
Assert.assertFalse(cache.checkTimedout(id1, 8));
cache.close();
}
@Test
public void testTransactorID() {
TransactorID tid1 = new TransactorID(env);
Assert.assertEquals(id1, tid1.getLongID());
Assert.assertEquals((Long) (id1 - 1), env.getSharedResources().getTransactorID().getLongID());
TransactorID tid3 = new TransactorID(env);
Assert.assertEquals(id2, tid3.getLongID());
}
private boolean checkExists(TransactorNode t) throws Exception {
return env.getSharedResources().getCurator().checkExists().forPath(t.getNodePath()) != null;
}
private int getNumOpen() throws Exception {
return env.getSharedResources().getCurator().getChildren()
.forPath(ZookeeperPath.TRANSACTOR_NODES).size();
}
private void assertNumOpen(int expected) throws Exception {
long startTime = System.currentTimeMillis();
while (getNumOpen() != expected) {
Thread.sleep(50);
if ((System.currentTimeMillis() - startTime) > NUM_OPEN_TIMEOUT_MS) {
Assert.fail("Timed out waiting for correct transactor number in Zookeeper");
}
}
}
}