blob: fba0c43aec3914271a5425fec6b283a382fe9ee9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.integration.impl;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.accumulo.util.LongUtil;
import org.apache.fluo.accumulo.util.ZookeeperPath;
import org.apache.fluo.core.impl.TimestampTracker;
import org.apache.fluo.core.impl.TransactorID;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* Tests TimestampTracker class
*/
public class TimestampTrackerIT extends ITBaseImpl {
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
@Test(expected = NoSuchElementException.class)
public void testTsNoElement() {
TimestampTracker tracker = env.getSharedResources().getTimestampTracker();
Assert.assertTrue(tracker.isEmpty());
tracker.getOldestActiveTimestamp();
}
@Test
public void testTrackingWithNoUpdate() throws Exception {
TimestampTracker tracker = new TimestampTracker(env, new TransactorID(env));
Assert.assertTrue(tracker.isEmpty());
Assert.assertFalse(zkNodeExists(tracker));
final long ts1 = tracker.allocateTimestamp().getTxTimestamp();
Assert.assertFalse(tracker.isEmpty());
Assert.assertTrue(zkNodeExists(tracker));
Assert.assertTrue(ts1 > zkNodeValue(tracker));
Assert.assertEquals(tracker.getZookeeperTimestamp(), zkNodeValue(tracker));
Assert.assertEquals(ts1, tracker.getOldestActiveTimestamp());
final long ts2 = tracker.allocateTimestamp().getTxTimestamp();
Assert.assertEquals(ts1, tracker.getOldestActiveTimestamp());
tracker.removeTimestamp(ts1);
Assert.assertFalse(tracker.isEmpty());
Assert.assertEquals(ts2, tracker.getOldestActiveTimestamp());
Assert.assertFalse(tracker.isEmpty());
Assert.assertTrue(ts1 > zkNodeValue(tracker));
Assert.assertEquals(tracker.getZookeeperTimestamp(), zkNodeValue(tracker));
tracker.removeTimestamp(ts2);
Assert.assertTrue(tracker.isEmpty());
Assert.assertTrue(zkNodeExists(tracker));
tracker.close();
}
@Test
public void testTrackingWithZkUpdate() throws Exception {
TimestampTracker tracker = new TimestampTracker(env, new TransactorID(env), 5);
final long ts1 = tracker.allocateTimestamp().getTxTimestamp();
while (!zkNodeExists(tracker) || tracker.getZookeeperTimestamp() != zkNodeValue(tracker)) {
Thread.sleep(5);
}
Assert.assertNotNull(ts1);
Assert.assertTrue(zkNodeExists(tracker));
Assert.assertNotNull(zkNodeValue(tracker));
Assert.assertEquals(tracker.getZookeeperTimestamp(), zkNodeValue(tracker));
Assert.assertEquals(ts1, tracker.getOldestActiveTimestamp());
final long ts2 = tracker.allocateTimestamp().getTxTimestamp();
Assert.assertEquals(ts1, tracker.getOldestActiveTimestamp());
Thread.sleep(15);
tracker.removeTimestamp(ts1);
Thread.sleep(15);
while (ts2 != zkNodeValue(tracker)) {
Thread.sleep(5);
}
Assert.assertEquals(ts2, tracker.getOldestActiveTimestamp());
Assert.assertEquals(ts2, zkNodeValue(tracker));
tracker.removeTimestamp(ts2);
while (zkNodeExists(tracker)) {
Thread.sleep(5);
}
Assert.assertTrue(tracker.isEmpty());
Assert.assertFalse(zkNodeExists(tracker));
tracker.close();
}
@Test
public void testTimestampUtilGetOldestTs() throws Exception {
Assert.assertEquals(0, getOldestTs());
TimestampTracker tr1 = new TimestampTracker(env, new TransactorID(env), 5);
final long ts1 = tr1.allocateTimestamp().getTxTimestamp();
Thread.sleep(15);
Assert.assertEquals(tr1.getZookeeperTimestamp(), getOldestTs());
TimestampTracker tr2 = new TimestampTracker(env, new TransactorID(env), 5);
final long ts2 = tr2.allocateTimestamp().getTxTimestamp();
TimestampTracker tr3 = new TimestampTracker(env, new TransactorID(env), 5);
final long ts3 = tr3.allocateTimestamp().getTxTimestamp();
while (ts1 != getOldestTs()) {
Thread.sleep(5);
}
Assert.assertEquals(ts1, getOldestTs());
tr1.removeTimestamp(ts1);
while (ts2 != getOldestTs()) {
Thread.sleep(5);
}
Assert.assertEquals(ts2, getOldestTs());
tr2.removeTimestamp(ts2);
while (ts3 != getOldestTs()) {
Thread.sleep(5);
}
Assert.assertEquals(ts3, getOldestTs());
tr3.removeTimestamp(ts3);
tr1.close();
tr2.close();
tr3.close();
}
private long getOldestTs() throws Exception {
CuratorFramework curator = env.getSharedResources().getCurator();
List<String> children;
try {
children = curator.getChildren().forPath(ZookeeperPath.TRANSACTOR_TIMESTAMPS);
} catch (NoNodeException nne) {
children = Collections.emptyList();
}
long oldestTs = Long.MAX_VALUE;
for (String child : children) {
try {
Long ts = LongUtil.fromByteArray(
curator.getData().forPath(ZookeeperPath.TRANSACTOR_TIMESTAMPS + "/" + child));
if (ts < oldestTs) {
oldestTs = ts;
}
} catch (NoNodeException nne) {
continue;
}
}
return oldestTs == Long.MAX_VALUE ? 0 : oldestTs;
}
private boolean zkNodeExists(TimestampTracker tracker) throws Exception {
return env.getSharedResources().getCurator().checkExists()
.forPath(tracker.getNodePath()) != null;
}
private long zkNodeValue(TimestampTracker tracker) throws Exception {
if (zkNodeExists(tracker) == false) {
throw new IllegalStateException("node does not exist");
}
return LongUtil.fromByteArray(
env.getSharedResources().getCurator().getData().forPath(tracker.getNodePath()));
}
}