Use new Fluo package names
diff --git a/bin/diff.sh b/bin/diff.sh
new file mode 100755
index 0000000..929d0a2
--- /dev/null
+++ b/bin/diff.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+. $BIN_DIR/load-env.sh
+
+
+yarn jar $STRESS_JAR io.fluo.stress.trie.Diff $FLUO_PROPS $@
+
diff --git a/pom.xml b/pom.xml
index bdd119a..4fa733c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,7 @@
<properties>
<accumulo.version>1.6.4</accumulo.version>
<hadoop.version>2.6.3</hadoop.version>
- <fluo.version>1.0.0-beta-2</fluo.version>
+ <fluo.version>1.0.0-beta-3-SNAPSHOT</fluo.version>
<slf4j.version>1.7.12</slf4j.version>
</properties>
@@ -110,17 +110,17 @@
<dependencies>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-api</artifactId>
<version>${fluo.version}</version>
</dependency>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-core</artifactId>
<version>${fluo.version}</version>
</dependency>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-mapreduce</artifactId>
<version>${fluo.version}</version>
</dependency>
@@ -174,14 +174,14 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-integration</artifactId>
<version>${fluo.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-mini</artifactId>
<version>${fluo.version}</version>
<scope>test</scope>
diff --git a/src/main/java/io/fluo/stress/trie/CompactLL.java b/src/main/java/io/fluo/stress/trie/CompactLL.java
index 722a547..30cd9c6 100644
--- a/src/main/java/io/fluo/stress/trie/CompactLL.java
+++ b/src/main/java/io/fluo/stress/trie/CompactLL.java
@@ -2,11 +2,11 @@
import java.io.File;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.core.util.AccumuloUtil;
import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.hadoop.io.Text;
/**
diff --git a/src/main/java/io/fluo/stress/trie/Constants.java b/src/main/java/io/fluo/stress/trie/Constants.java
index 8acf8ad..368ffd9 100644
--- a/src/main/java/io/fluo/stress/trie/Constants.java
+++ b/src/main/java/io/fluo/stress/trie/Constants.java
@@ -15,9 +15,9 @@
*/
package io.fluo.stress.trie;
-import io.fluo.api.data.Column;
-import io.fluo.api.types.StringEncoder;
-import io.fluo.api.types.TypeLayer;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.types.StringEncoder;
+import org.apache.fluo.api.types.TypeLayer;
/**
*
diff --git a/src/main/java/io/fluo/stress/trie/Diff.java b/src/main/java/io/fluo/stress/trie/Diff.java
new file mode 100644
index 0000000..2e4eed6
--- /dev/null
+++ b/src/main/java/io/fluo/stress/trie/Diff.java
@@ -0,0 +1,115 @@
+package io.fluo.stress.trie;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
+
+public class Diff {
+ public static Map<String, Long> getRootCount(FluoClient client, Snapshot snap, int level,
+ int stopLevel, int nodeSize) throws Exception {
+ ScannerConfiguration scanConfig = new ScannerConfiguration();
+ scanConfig.setSpan(Span.prefix(String.format("%02d:", level)));
+ scanConfig.fetchColumn(Constants.COUNT_SEEN_COL.getFamily(),
+ Constants.COUNT_SEEN_COL.getQualifier());
+ scanConfig.fetchColumn(Constants.COUNT_WAIT_COL.getFamily(),
+ Constants.COUNT_WAIT_COL.getQualifier());
+
+ RowIterator rowIter = snap.get(scanConfig);
+
+ HashMap<String, Long> counts = new HashMap<>();
+
+ while (rowIter.hasNext()) {
+ Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
+ String row = rowEntry.getKey().toString();
+ Node node = new Node(row);
+
+ while (node.getLevel() > stopLevel) {
+ node = node.getParent();
+ }
+
+ String stopRow = node.getRowId();
+ long count = counts.getOrDefault(stopRow, 0L);
+
+ if (node.getNodeSize() == nodeSize) {
+ ColumnIterator colIter = rowEntry.getValue();
+ while (colIter.hasNext()) {
+ Entry<Column, Bytes> colEntry = colIter.next();
+ count += Long.parseLong(colEntry.getValue().toString());
+ }
+ } else {
+ throw new RuntimeException("TODO");
+ }
+
+ counts.put(stopRow, count);
+ }
+
+ return counts;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ if (args.length != 1) {
+ System.err.println("Usage: " + Print.class.getSimpleName() + " <fluo props>");
+ System.exit(-1);
+ }
+
+ FluoConfiguration config = new FluoConfiguration(new File(args[0]));
+
+ try (FluoClient client = FluoFactory.newClient(config); Snapshot snap = client.newSnapshot()) {
+
+ int stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
+ int nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
+
+ Map<String, Long> rootCounts = getRootCount(client, snap, stopLevel, stopLevel, nodeSize);
+ ArrayList<String> rootRows = new ArrayList<>(rootCounts.keySet());
+ Collections.sort(rootRows);
+
+ // TODO 8
+ for (int level = stopLevel + 1; level <= 8; level++) {
+ System.out.printf("Level %d:\n",level);
+
+ Map<String, Long> counts = getRootCount(client, snap, level, stopLevel, nodeSize);
+
+ long sum = 0;
+
+ for (String row : rootRows) {
+ long c1 = rootCounts.get(row);
+ long c2 = counts.getOrDefault(row, -1L);
+
+ if (c1 != c2) {
+ System.out.printf("\tdiff: %s %d %d\n", row, c1, c2);
+ }
+
+ if(c2 > 0){
+ sum += c2;
+ }
+ }
+
+ HashSet<String> extras = new HashSet<>(counts.keySet());
+ extras.removeAll(rootCounts.keySet());
+
+ for (String row : extras) {
+ long c = counts.get(row);
+ System.out.printf("\textra: %s %d\n", row, c);
+ }
+
+ System.out.println("\tsum "+sum);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/fluo/stress/trie/Generate.java b/src/main/java/io/fluo/stress/trie/Generate.java
index 7181e9b..4388c5c 100644
--- a/src/main/java/io/fluo/stress/trie/Generate.java
+++ b/src/main/java/io/fluo/stress/trie/Generate.java
@@ -22,7 +22,7 @@
import java.util.Random;
import com.google.common.base.Preconditions;
-import io.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
diff --git a/src/main/java/io/fluo/stress/trie/Init.java b/src/main/java/io/fluo/stress/trie/Init.java
index bfa9a4c..581dcde 100644
--- a/src/main/java/io/fluo/stress/trie/Init.java
+++ b/src/main/java/io/fluo/stress/trie/Init.java
@@ -22,18 +22,18 @@
import java.io.OutputStream;
import java.util.Collection;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.core.util.AccumuloUtil;
-import io.fluo.mapreduce.FluoKeyValue;
-import io.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.commons.codec.binary.Base64;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.AccumuloUtil;
+import org.apache.fluo.mapreduce.FluoKeyValue;
+import org.apache.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/src/main/java/io/fluo/stress/trie/Load.java b/src/main/java/io/fluo/stress/trie/Load.java
index 47fde85..8027299 100644
--- a/src/main/java/io/fluo/stress/trie/Load.java
+++ b/src/main/java/io/fluo/stress/trie/Load.java
@@ -19,10 +19,10 @@
import java.io.File;
import java.io.IOException;
-import io.fluo.api.client.Loader;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.mapreduce.FluoOutputFormat;
import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.mapreduce.FluoOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
diff --git a/src/main/java/io/fluo/stress/trie/NodeObserver.java b/src/main/java/io/fluo/stress/trie/NodeObserver.java
index 068b694..3c0f67e 100644
--- a/src/main/java/io/fluo/stress/trie/NodeObserver.java
+++ b/src/main/java/io/fluo/stress/trie/NodeObserver.java
@@ -15,12 +15,12 @@
import java.util.Map;
-import io.fluo.api.client.TransactionBase;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Column;
-import io.fluo.api.observer.AbstractObserver;
-import io.fluo.api.types.TypedSnapshotBase.Value;
-import io.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.types.TypedSnapshotBase.Value;
+import org.apache.fluo.api.types.TypedTransactionBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/io/fluo/stress/trie/NumberLoader.java b/src/main/java/io/fluo/stress/trie/NumberLoader.java
index c34ae74..b5e9ddc 100644
--- a/src/main/java/io/fluo/stress/trie/NumberLoader.java
+++ b/src/main/java/io/fluo/stress/trie/NumberLoader.java
@@ -15,11 +15,11 @@
import java.util.Map;
-import io.fluo.api.client.Loader;
-import io.fluo.api.client.TransactionBase;
-import io.fluo.api.data.Column;
-import io.fluo.api.types.TypedSnapshotBase.Value;
-import io.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.types.TypedSnapshotBase.Value;
+import org.apache.fluo.api.types.TypedTransactionBase;
import static com.google.common.base.Preconditions.checkArgument;
diff --git a/src/main/java/io/fluo/stress/trie/Print.java b/src/main/java/io/fluo/stress/trie/Print.java
index 25a50be..fcc5d1a 100644
--- a/src/main/java/io/fluo/stress/trie/Print.java
+++ b/src/main/java/io/fluo/stress/trie/Print.java
@@ -19,17 +19,17 @@
import java.io.File;
import java.util.Map.Entry;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Column;
-import io.fluo.api.data.Span;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
import org.apache.commons.configuration.Configuration;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.api.iterator.ColumnIterator;
+import org.apache.fluo.api.iterator.RowIterator;
public class Print {
diff --git a/src/main/java/io/fluo/stress/trie/Split.java b/src/main/java/io/fluo/stress/trie/Split.java
index d21124c..ca8de57 100644
--- a/src/main/java/io/fluo/stress/trie/Split.java
+++ b/src/main/java/io/fluo/stress/trie/Split.java
@@ -18,13 +18,13 @@
import java.util.TreeSet;
import com.google.common.base.Strings;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.core.util.AccumuloUtil;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.hadoop.io.Text;
public class Split {
diff --git a/src/test/java/io/fluo/stress/TrieBasicIT.java b/src/test/java/io/fluo/stress/TrieBasicIT.java
index f7ccac2..0f84110 100644
--- a/src/test/java/io/fluo/stress/TrieBasicIT.java
+++ b/src/test/java/io/fluo/stress/TrieBasicIT.java
@@ -21,16 +21,17 @@
import java.util.Random;
import java.util.Set;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.types.TypedSnapshot;
-import io.fluo.core.client.LoaderExecutorImpl;
-import io.fluo.core.impl.Environment;
-import io.fluo.integration.ITBaseMini;
import io.fluo.stress.trie.Constants;
import io.fluo.stress.trie.Node;
import io.fluo.stress.trie.NodeObserver;
import io.fluo.stress.trie.NumberLoader;
import org.apache.commons.configuration.Configuration;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.types.TypedSnapshot;
+import org.apache.fluo.integration.ITBaseMini;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -39,56 +40,56 @@
import static io.fluo.stress.trie.Constants.COUNT_SEEN_COL;
import static io.fluo.stress.trie.Constants.TYPEL;
-/**
+/**
* Tests Trie Stress Test using Basic Loader
*/
public class TrieBasicIT extends ITBaseMini {
-
+
private static final Logger log = LoggerFactory.getLogger(TrieBasicIT.class);
-
+
@Override
protected List<ObserverConfiguration> getObservers() {
return Collections.singletonList(new ObserverConfiguration(NodeObserver.class.getName()));
}
-
+
@Override
protected void setAppConfig(Configuration config){
config.setProperty(Constants.STOP_LEVEL_PROP, 0);
}
-
+
@Test
public void testBit32() throws Exception {
runTrieTest(20, Integer.MAX_VALUE, 32);
}
-
+
@Test
public void testBit8() throws Exception {
runTrieTest(25, Integer.MAX_VALUE, 8);
}
-
+
@Test
public void testBit4() throws Exception {
runTrieTest(10, Integer.MAX_VALUE, 4);
}
-
+
@Test
public void testBit() throws Exception {
runTrieTest(5, Integer.MAX_VALUE, 1);
}
-
+
@Test
public void testDuplicates() throws Exception {
runTrieTest(20, 10, 4);
}
-
+
private void runTrieTest(int ingestNum, int maxValue, int nodeSize) throws Exception {
-
+
log.info("Ingesting "+ingestNum+" unique numbers with a nodeSize of "+nodeSize+" bits");
-
+
config.setLoaderThreads(0);
config.setLoaderQueueSize(0);
-
- try(Environment env = new Environment(config); LoaderExecutorImpl le = new LoaderExecutorImpl(config, env)) {
+
+ try(FluoClient fluoClient = FluoFactory.newClient(config); LoaderExecutor le = client.newLoaderExecutor()) {
Random random = new Random();
Set<Integer> ingested = new HashSet<>();
for (int i = 0; i < ingestNum; i++) {
diff --git a/src/test/java/io/fluo/stress/TrieMapRedIT.java b/src/test/java/io/fluo/stress/TrieMapRedIT.java
index 505c14a..c720dee 100644
--- a/src/test/java/io/fluo/stress/TrieMapRedIT.java
+++ b/src/test/java/io/fluo/stress/TrieMapRedIT.java
@@ -23,8 +23,6 @@
import java.util.Collections;
import java.util.List;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.integration.ITBaseMini;
import io.fluo.stress.trie.Constants;
import io.fluo.stress.trie.Generate;
import io.fluo.stress.trie.Init;
@@ -35,15 +33,17 @@
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.integration.ITBaseMini;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
-/**
+/**
* Tests Trie Stress Test using MapReduce Ingest
*/
public class TrieMapRedIT extends ITBaseMini {
-
+
@Override
protected List<ObserverConfiguration> getObservers() {
return Collections.singletonList(new ObserverConfiguration(NodeObserver.class.getName()));
@@ -54,7 +54,7 @@
config.setProperty(Constants.STOP_LEVEL_PROP, 0);
config.setProperty(Constants.NODE_SIZE_PROP, 8);
}
-
+
static void generate(int numMappers, int numPerMapper, int max, File out1) throws Exception {
int ret = ToolRunner.run(new Generate(), new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///", "" + numMappers,
numPerMapper + "", max + "", out1.toURI().toString()});
@@ -127,7 +127,7 @@
miniFluo.waitForObservers();
Assert.assertEquals(new Print.Stats(0, ucount2, false), Print.getStats(config));
-
+
File out3 = new File(testDir, "nums-3");
generate(2, 100, 500, out3);
load(8, fluoPropsFile, out3);
diff --git a/src/test/java/io/fluo/stress/TrieStopLevelIT.java b/src/test/java/io/fluo/stress/TrieStopLevelIT.java
index 5569e45..610fe53 100644
--- a/src/test/java/io/fluo/stress/TrieStopLevelIT.java
+++ b/src/test/java/io/fluo/stress/TrieStopLevelIT.java
@@ -19,13 +19,13 @@
import java.util.Collections;
import java.util.List;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.data.Bytes;
import io.fluo.stress.trie.Constants;
import io.fluo.stress.trie.Node;
import io.fluo.stress.trie.NodeObserver;
import org.apache.commons.configuration.Configuration;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.data.Bytes;
import org.junit.Assert;
import org.junit.Test;