Merge branch 'fluo-430'
diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
index a09ac3b..ec2a83f 100644
--- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
+++ b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/iterators/TimestampSkippingIterator.java
@@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
@@ -50,9 +51,12 @@
private boolean inclusive;
private boolean hasSeeked = false;
+ private boolean removedDeletingIterator = false;
+ private int removalFailures = 0;
+
private static final Logger log = LoggerFactory.getLogger(TimestampSkippingIterator.class);
- TimestampSkippingIterator(SortedKeyValueIterator<Key, Value> source) {
+ public TimestampSkippingIterator(SortedKeyValueIterator<Key, Value> source) {
this.source = source;
}
@@ -78,7 +82,7 @@
while (source.hasTop()
&& curCol.equals(source.getTopKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS)
&& timestamp < source.getTopKey().getTimestamp()) {
- if (count == 10) {
+ if (count == 10 && shouldSeek()) {
// seek to prefix
Key seekKey = new Key(curCol);
seekKey.setTimestamp(timestamp);
@@ -148,20 +152,23 @@
}
}
- private static void setParent(SortedKeyValueIterator<Key, Value> iter,
+ private static boolean setParent(SortedKeyValueIterator<Key, Value> iter,
SortedKeyValueIterator<Key, Value> newParent) {
try {
if (iter instanceof WrappingIterator) {
Field field = WrappingIterator.class.getDeclaredField("source");
field.setAccessible(true);
field.set(iter, newParent);
+ return true;
}
} catch (NoSuchFieldException | IllegalArgumentException | IllegalAccessException e) {
log.debug(e.getMessage(), e);
}
+
+ return false;
}
- private static void removeDeletingIterator(SortedKeyValueIterator<Key, Value> source) {
+ private static boolean removeDeletingIterator(SortedKeyValueIterator<Key, Value> source) {
SortedKeyValueIterator<Key, Value> prev = source;
SortedKeyValueIterator<Key, Value> parent = getParent(source);
@@ -174,9 +181,21 @@
if (parent != null && parent instanceof DeletingIterator) {
SortedKeyValueIterator<Key, Value> delParent = getParent(parent);
if (delParent != null) {
- setParent(prev, delParent);
+ return setParent(prev, delParent);
}
}
+
+ return false;
+ }
+
+  /**
+   * Safety check used to decide whether this iterator may seek its source instead of scanning
+   * forward. Seeking is allowed before the first seek has happened, once the deleting iterator has
+   * been removed, or while fewer than three removal attempts have failed. If the deleting iterator
+   * was not removed for some reason, then the performance of seeking will be O(N^2); in that case
+   * it is better to just scan forward.
+   *
+   * @return true if seeking is permitted, false if the caller should scan forward instead
+   */
+  @VisibleForTesting
+  public final boolean shouldSeek() {
+    if (!hasSeeked || removedDeletingIterator) {
+      return true;
+    }
+
+    // The deleting iterator is still in place: tolerate only a few failed removal attempts.
+    return removalFailures < 3;
+  }
private void seek(Range range) throws IOException {
@@ -185,7 +204,10 @@
// up iterators until the 1st seek. Therefore can only remove the deleting iter after the 1st
// seek. Also, Accumulo may switch data sources and re-setup the deleting iterator, thats why
// this iterator keeps trying to remove it.
- removeDeletingIterator(source);
+ removedDeletingIterator |= removeDeletingIterator(source);
+ if (!removedDeletingIterator) {
+ removalFailures++;
+ }
}
source.seek(range, fams, inclusive);
hasSeeked = true;
diff --git a/modules/api/src/test/java/org/apache/fluo/api/data/ColumnValueTest.java b/modules/api/src/test/java/org/apache/fluo/api/data/ColumnValueTest.java
new file mode 100644
index 0000000..c88d8a3
--- /dev/null
+++ b/modules/api/src/test/java/org/apache/fluo/api/data/ColumnValueTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.api.data;
+
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ColumnValue}: equality and hashing, ordering, and accessors.
+ */
+public class ColumnValueTest {
+
+  /** Asserts that the two values are unequal and report different hash codes. */
+  private static void assertDistinct(ColumnValue candidate, ColumnValue reference) {
+    Assert.assertNotEquals(candidate, reference);
+    Assert.assertNotEquals(candidate.hashCode(), reference.hashCode());
+  }
+
+  @Test
+  public void testEquals() {
+    Column baseCol = new Column("f1", "q1");
+    Column otherFamily = new Column("f2", "q1");
+    Column otherQualifier = new Column("f1", "q2");
+    Column otherBoth = new Column("f2", "q2");
+
+    ColumnValue reference = new ColumnValue(baseCol, "v1");
+
+    // An instance equals itself and any instance built from the same column and value.
+    Assert.assertEquals(reference, reference);
+    Assert.assertEquals(reference.hashCode(), reference.hashCode());
+    Assert.assertEquals(new ColumnValue(baseCol, "v1"), reference);
+    Assert.assertEquals(new ColumnValue(baseCol, "v1").hashCode(), reference.hashCode());
+
+    // Same column, different value.
+    assertDistinct(new ColumnValue(baseCol, "v2"), reference);
+
+    // Different column, with either the same or a different value.
+    for (Column col : Arrays.asList(otherFamily, otherQualifier, otherBoth)) {
+      for (String val : Arrays.asList("v1", "v2")) {
+        assertDistinct(new ColumnValue(col, val), reference);
+      }
+    }
+
+    // Never equal to objects of another type.
+    Assert.assertNotEquals(reference, baseCol);
+    Assert.assertNotEquals(reference, "v1");
+  }
+
+  @Test
+  public void testCompare() {
+    Column firstCol = new Column("f1", "q1");
+    Column secondCol = new Column("f2", "q1");
+
+    ColumnValue base = new ColumnValue(firstCol, "v1");
+    ColumnValue laterColumn = new ColumnValue(secondCol, "v1");
+    ColumnValue laterValue = new ColumnValue(firstCol, "v2");
+    ColumnValue duplicate = new ColumnValue(firstCol, "v1");
+
+    // Equal contents compare as zero.
+    Assert.assertTrue(base.compareTo(base) == 0);
+    Assert.assertTrue(base.compareTo(duplicate) == 0);
+
+    // Ordering by column, checked in both directions.
+    Assert.assertTrue(base.compareTo(laterColumn) < 0);
+    Assert.assertTrue(laterColumn.compareTo(base) > 0);
+
+    // Ordering by value within the same column, checked in both directions.
+    Assert.assertTrue(base.compareTo(laterValue) < 0);
+    Assert.assertTrue(laterValue.compareTo(base) > 0);
+  }
+
+  @Test
+  public void testGet() {
+    Column col = new Column("f1", "q1");
+    ColumnValue colVal = new ColumnValue(col, "v1");
+
+    Assert.assertEquals("v1", colVal.getsValue());
+    Assert.assertEquals("v1", colVal.getValue().toString());
+    // The accessor hands back the very same Column instance that was passed in.
+    Assert.assertSame(col, colVal.getColumn());
+  }
+}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
new file mode 100644
index 0000000..aa8b2ef
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/Skip100StampsIterator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.accumulo;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.fluo.accumulo.iterators.TimestampSkippingIterator;
+
+/**
+ * Test iterator that wraps its source in a {@link TimestampSkippingIterator} and, on every seek,
+ * walks the column {@code r1 f1:q1} from timestamp 99999 downwards in steps of 100 using
+ * {@code skipToTimestamp}. It then exposes a single synthetic entry whose value is
+ * {@code "<goodVal> <goodSeek>"}: the number of steps where the value at the top of the source was
+ * {@code "v" + ts}, and the number of steps where {@code shouldSeek()} returned true.
+ */
+public class Skip100StampsIterator implements SortedKeyValueIterator<Key, Value> {
+
+  private TimestampSkippingIterator source;
+  private boolean hasTop;
+  // Number of steps at which the source's top value matched the expected "v" + ts.
+  private int goodVal;
+  // Number of steps at which the wrapped iterator reported that seeking is still allowed.
+  private int goodSeek;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options,
+      IteratorEnvironment env) throws IOException {
+    this.source = new TimestampSkippingIterator(source);
+  }
+
+  @Override
+  public boolean hasTop() {
+    return hasTop;
+  }
+
+  /** Consumes the single synthetic entry; afterwards this iterator has no top. */
+  @Override
+  public void next() throws IOException {
+    hasTop = false;
+  }
+
+  /**
+   * Seeks the wrapped iterator and then performs the whole skipping walk, recording the counters
+   * that {@link #getTopValue()} reports.
+   */
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
+      throws IOException {
+
+    source.seek(range, columnFamilies, inclusive);
+
+    Key k = new Key("r1", "f1", "q1");
+
+    long ts = 99999;
+    // Reset both counters so that a repeated seek on the same instance reports only its own walk
+    // instead of accumulating counts from earlier seeks.
+    goodVal = 0;
+    goodSeek = 0;
+    hasTop = true;
+
+    /*
+     * If the TimestampSkippingIterator is not able to remove the DeletingIterator, then the
+     * following loop will have O(N^2) performance. This happens because every time the following
+     * loop seeks forward a little bit, the DeletingIterator scans from the start of the column
+     * looking for deletes. I manually commented out the code that removes the DeletingIterator and
+     * the following code took 50 to 100 times longer to run.
+     */
+
+    while (source.hasTop() && ts > 0) {
+      source.skipToTimestamp(k, ts);
+      if (source.hasTop()) {
+        if (source.getTopValue().toString().equals("v" + ts)) {
+          goodVal++;
+        }
+
+        if (source.shouldSeek()) {
+          goodSeek++;
+        }
+      }
+
+      ts -= 100;
+    }
+  }
+
+  /** Returns a fixed key; only the value of the synthetic entry carries information. */
+  @Override
+  public Key getTopKey() {
+    return new Key("r1", "f1", "q1", 42);
+  }
+
+  /** Returns the two counters from the last seek, formatted as {@code "<goodVal> <goodSeek>"}. */
+  @Override
+  public Value getTopValue() {
+    // Encode explicitly as UTF-8 rather than relying on the platform default charset.
+    String summary = goodVal + " " + goodSeek;
+    return new Value(summary.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
+    throw new UnsupportedOperationException();
+  }
+
+}
diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
new file mode 100644
index 0000000..68bb138
--- /dev/null
+++ b/modules/integration/src/test/java/org/apache/fluo/integration/accumulo/TimeskippingIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.integration.accumulo;
+
+import com.google.common.collect.Iterables;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.integration.ITBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test that writes 100000 timestamped versions of a single column and scans them
+ * through {@link Skip100StampsIterator}, once right after writing and once after flushing the
+ * table. A warning is logged when a scan takes longer than expected.
+ */
+public class TimeskippingIT extends ITBase {
+
+  private static final Logger log = LoggerFactory.getLogger(TimeskippingIT.class);
+
+  private static final String TABLE = "ttsi";
+  // Value produced by Skip100StampsIterator: "<goodVal> <goodSeek>".
+  private static final String EXPECTED = "999 1000";
+  // Scans slower than this many milliseconds are reported with a warning, not a failure.
+  private static final long SLOW_SCAN_MILLIS = 3000;
+
+  @Test
+  public void testTimestampSkippingIterPerformance() throws Exception {
+
+    conn.tableOperations().create(TABLE, false);
+
+    Mutation m = new Mutation("r1");
+    for (int i = 0; i < 100000; i++) {
+      m.put("f1", "q1", i, "v" + i);
+    }
+
+    // Close the writer even when adding the mutation fails.
+    BatchWriter bw = conn.createBatchWriter(TABLE, new BatchWriterConfig());
+    try {
+      bw.addMutation(m);
+    } finally {
+      bw.close();
+    }
+
+    Scanner scanner = conn.createScanner(TABLE, Authorizations.EMPTY);
+    try {
+      scanner.addScanIterator(new IteratorSetting(10, Skip100StampsIterator.class));
+
+      // Scan before the flush.
+      assertScanResult(scanner);
+
+      conn.tableOperations().flush(TABLE, null, null, true);
+
+      // Scan again after the flush; the result must be the same.
+      assertScanResult(scanner);
+    } finally {
+      scanner.close();
+    }
+  }
+
+  /**
+   * Runs the scan, asserts that it yields exactly one entry with the expected value, and logs a
+   * warning when the scan exceeded the time threshold.
+   */
+  private static void assertScanResult(Scanner scanner) {
+    long start = System.currentTimeMillis();
+    Assert.assertEquals(EXPECTED, Iterables.getOnlyElement(scanner).getValue().toString());
+    long elapsed = System.currentTimeMillis() - start;
+
+    if (elapsed > SLOW_SCAN_MILLIS) {
+      log.warn("Timestamp skipping iterator took longer than expected {}", elapsed);
+    }
+  }
+}