Merge branch 'view-logs-docs'
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
index 2902772..89b111b 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java
@@ -29,6 +29,7 @@
import java.util.Objects;
import com.google.common.base.Preconditions;
+import com.google.common.primitives.UnsignedBytes;
/**
* Represents bytes in Fluo. Bytes is an immutable wrapper around a byte array. Bytes always copies
@@ -155,7 +156,7 @@
}
public void writeTo(OutputStream out) throws IOException {
- // since Bytes is immutable, its important the we do not let the internal byte array escape
+ // since Bytes is immutable, its important that we do not let the internal byte array escape
if (length <= 32) {
int end = offset + length;
for (int i = offset; i < end; i++) {
@@ -177,17 +178,23 @@
*/
@Override
public final int compareTo(Bytes other) {
- int minLen = Math.min(this.length(), other.length());
+ if (this == other) {
+ return 0;
+ } else if (this.length == this.data.length && other.length == other.data.length) {
+ return UnsignedBytes.lexicographicalComparator().compare(this.data, other.data);
+ } else {
+ int minLen = Math.min(this.length(), other.length());
- for (int i = 0; i < minLen; i++) {
- int a = (this.byteAt(i) & 0xff);
- int b = (other.byteAt(i) & 0xff);
+ for (int i = 0; i < minLen; i++) {
+ int a = (this.byteAt(i) & 0xff);
+ int b = (other.byteAt(i) & 0xff);
- if (a != b) {
- return a - b;
+ if (a != b) {
+ return a - b;
+ }
}
+ return this.length() - other.length();
}
- return this.length() - other.length();
}
/**
diff --git a/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java b/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java
index 641c9bf..720a1a3 100644
--- a/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java
+++ b/modules/api/src/test/java/org/apache/fluo/api/data/BytesTest.java
@@ -86,10 +86,37 @@
Assert.assertEquals(-1, b1.compareTo(b2));
Assert.assertEquals(1, b2.compareTo(b1));
Assert.assertEquals(0, b1.compareTo(b3));
+ Assert.assertEquals(0, b1.compareTo(b1));
Assert.assertEquals(1, b1.compareTo(Bytes.EMPTY));
}
@Test
+ public void testCompareSubsequence() {
+ Bytes b1 = Bytes.of("abcd");
+ Bytes b2 = b1.subSequence(0, 3);
+ Bytes b3 = Bytes.of("abc");
+ Bytes b4 = Bytes.of("~abcde");
+ Bytes b5 = b4.subSequence(1, 4);
+ Bytes b6 = b4.subSequence(1, 5);
+
+ for (Bytes ba : Arrays.asList(b2, b3, b5, b1.subSequence(0, 3))) {
+ for (Bytes bb : Arrays.asList(b2, b3, b5)) {
+ Assert.assertEquals(0, ba.compareTo(bb));
+ }
+ }
+
+ Assert.assertEquals(1, b1.compareTo(b2));
+ Assert.assertEquals(-1, b2.compareTo(b1));
+
+ for (Bytes less : Arrays.asList(b2, b3, b5)) {
+ for (Bytes greater : Arrays.asList(b1, b4, b6)) {
+ Assert.assertTrue(less.compareTo(greater) < 0);
+ Assert.assertTrue(greater.compareTo(less) > 0);
+ }
+ }
+ }
+
+ @Test
public void testToByteBuffer() {
Bytes b1 = Bytes.of("fluofluo");
ByteBuffer buffer = b1.toByteBuffer();
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
index 8639fbc..3af4fdc 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/CollisionIT.java
@@ -42,7 +42,9 @@
import org.apache.fluo.integration.ITBaseMini;
import org.apache.fluo.integration.TestUtil;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
/**
* Run end to end test with lots of collisions and verify the following :
@@ -61,6 +63,9 @@
private static final Column STAT_CHANGED = new Column("stat", "changed");
private static final Column STAT_PROCESSED = new Column("stat", "processed");
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(60);
+
private static class NumLoader implements Loader {
int num;
@@ -130,6 +135,8 @@
miniFluo.waitForObservers();
+ long recentTS;
+
try (Snapshot snapshot = client.newSnapshot()) {
for (int i = 0; i < expectedCounts.length; i++) {
@@ -144,16 +151,15 @@
String allTotal = snapshot.gets("all", STAT_TOTAL);
Assert.assertNotNull(allTotal);
Assert.assertEquals(1000, Integer.parseInt(allTotal));
+
+ recentTS = snapshot.getStartTimestamp();
}
long oldestTS = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers());
- while (true) {
+
+ while (oldestTS < recentTS) {
UtilWaitThread.sleep(300);
- long tmp = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers());
- if (oldestTS == tmp) {
- break;
- }
- oldestTS = tmp;
+ oldestTS = ZookeeperUtil.getGcTimestamp(config.getAppZookeepers());
}
conn.tableOperations().compact(getCurTableName(), null, null, true, true);
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
index 91636f7..e543c3c 100644
--- a/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/OracleIT.java
@@ -36,7 +36,9 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.thrift.server.THsHaServer;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -44,6 +46,9 @@
public class OracleIT extends ITBaseImpl {
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(60);
+
@Test
public void testRestart() throws Exception {
OracleClient client = env.getSharedResources().getOracleClient();
@@ -225,10 +230,8 @@
oserver.stop();
sleepWhileConnected(oserver);
- int count = 0;
- while (count < 5 && client.getOracle() != null) {
- Thread.sleep(1000);
- count++;
+ while (client.getOracle() != null) {
+ Thread.sleep(100);
}
assertNull(client.getOracle());