Merge pull request #40 from keith-turner/use-apache-fluo
Use apache fluo
diff --git a/bin/diff.sh b/bin/diff.sh
new file mode 100755
index 0000000..929d0a2
--- /dev/null
+++ b/bin/diff.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+# Runs io.fluo.stress.trie.Diff through "yarn jar", forwarding any extra arguments.
+
+BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+. "$BIN_DIR/load-env.sh"
+
+# Expansions are quoted so paths and arguments containing whitespace are passed through unsplit.
+yarn jar "$STRESS_JAR" io.fluo.stress.trie.Diff "$FLUO_PROPS" "$@"
diff --git a/pom.xml b/pom.xml
index bdd119a..f64905b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,8 @@
<properties>
<accumulo.version>1.6.4</accumulo.version>
<hadoop.version>2.6.3</hadoop.version>
- <fluo.version>1.0.0-beta-2</fluo.version>
+ <fluo.version>1.0.0-incubating-SNAPSHOT</fluo.version>
+ <fluo-recipes.version>1.0.0-beta-3-SNAPSHOT</fluo-recipes.version>
<slf4j.version>1.7.12</slf4j.version>
</properties>
@@ -110,21 +111,26 @@
<dependencies>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-api</artifactId>
<version>${fluo.version}</version>
</dependency>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-core</artifactId>
<version>${fluo.version}</version>
</dependency>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-mapreduce</artifactId>
<version>${fluo.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-recipes-core</artifactId>
+ <version>${fluo-recipes.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
@@ -174,14 +180,14 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-integration</artifactId>
<version>${fluo.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.fluo</groupId>
+ <groupId>org.apache.fluo</groupId>
<artifactId>fluo-mini</artifactId>
<version>${fluo.version}</version>
<scope>test</scope>
diff --git a/src/main/java/io/fluo/stress/trie/CompactLL.java b/src/main/java/io/fluo/stress/trie/CompactLL.java
index 722a547..30cd9c6 100644
--- a/src/main/java/io/fluo/stress/trie/CompactLL.java
+++ b/src/main/java/io/fluo/stress/trie/CompactLL.java
@@ -2,11 +2,11 @@
import java.io.File;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.core.util.AccumuloUtil;
import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.hadoop.io.Text;
/**
diff --git a/src/main/java/io/fluo/stress/trie/Constants.java b/src/main/java/io/fluo/stress/trie/Constants.java
index 8acf8ad..4e6ccb5 100644
--- a/src/main/java/io/fluo/stress/trie/Constants.java
+++ b/src/main/java/io/fluo/stress/trie/Constants.java
@@ -1,34 +1,32 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress.trie;
-import io.fluo.api.data.Column;
-import io.fluo.api.types.StringEncoder;
-import io.fluo.api.types.TypeLayer;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.types.StringEncoder;
+import org.apache.fluo.recipes.core.types.TypeLayer;
/**
*
*/
public class Constants {
-
+
public static final TypeLayer TYPEL = new TypeLayer(new StringEncoder());
- public static final Column COUNT_SEEN_COL = TYPEL.bc().fam("count").qual("seen").vis();
- public static final Column COUNT_WAIT_COL = TYPEL.bc().fam("count").qual("wait").vis();
-
+ public static final Column COUNT_SEEN_COL = new Column("count", "seen");
+ public static final Column COUNT_WAIT_COL = new Column("count", "wait");
+
public static final String NODE_SIZE_PROP = "trie.nodeSize";
public static final String STOP_LEVEL_PROP = "trie.stopLevel";
}
diff --git a/src/main/java/io/fluo/stress/trie/Diff.java b/src/main/java/io/fluo/stress/trie/Diff.java
new file mode 100644
index 0000000..aacf7dd
--- /dev/null
+++ b/src/main/java/io/fluo/stress/trie/Diff.java
@@ -0,0 +1,132 @@
+package io.fluo.stress.trie;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+
+/**
+ * Command line tool that compares the counts stored in stop-level rows of the trie with the counts
+ * aggregated from the rows of each later level, and prints the differences.
+ */
+public class Diff {
+
+  /**
+   * Totals the seen and wait counts of every row at {@code level}, grouped by the row id of the
+   * row's ancestor at {@code stopLevel}.
+   *
+   * @param client not used by this method; retained so existing callers keep compiling
+   * @param snap snapshot to scan
+   * @param level level whose rows are scanned; rows are selected by a two-digit level prefix
+   * @param stopLevel level of the ancestor node whose row id is used as the map key
+   * @param nodeSize node size every scanned row is expected to have
+   * @return map from stop-level row id to the total of the counts found under it
+   * @throws IllegalStateException if a row's stop-level ancestor reports a different node size
+   */
+  public static Map<String, Long> getRootCount(FluoClient client, Snapshot snap, int level,
+      int stopLevel, int nodeSize) throws Exception {
+
+    Map<String, Long> counts = new HashMap<>();
+
+    RowScanner rows = snap.scanner().over(Span.prefix(String.format("%02d:", level)))
+        .fetch(Constants.COUNT_SEEN_COL, Constants.COUNT_WAIT_COL).byRow().build();
+
+    for (ColumnScanner columns : rows) {
+      String row = columns.getsRow();
+      Node node = new Node(row);
+
+      // Climb to the ancestor at the stop level; its row id is the grouping key.
+      while (node.getLevel() > stopLevel) {
+        node = node.getParent();
+      }
+
+      // Rows with other node sizes are not handled yet, so fail with enough context to debug.
+      if (node.getNodeSize() != nodeSize) {
+        throw new IllegalStateException("Unsupported node size " + node.getNodeSize() + " for row "
+            + row + ", expected " + nodeSize);
+      }
+
+      long rowTotal = 0;
+      for (ColumnValue colVal : columns) {
+        rowTotal += Long.parseLong(colVal.getsValue());
+      }
+
+      counts.merge(node.getRowId(), rowTotal, Long::sum);
+    }
+
+    return counts;
+  }
+
+  /**
+   * Reads the stop level and node size from the application configuration, then for each level
+   * from {@code stopLevel + 1} through 8 prints rows whose aggregated count differs from the
+   * stop-level count, rows present only at that level, and the sum of the positive counts.
+   *
+   * @param args exactly one element, the path to the Fluo properties file
+   */
+  public static void main(String[] args) throws Exception {
+
+    if (args.length != 1) {
+      System.err.println("Usage: " + Diff.class.getSimpleName() + " <fluo props>");
+      System.exit(-1);
+    }
+
+    FluoConfiguration config = new FluoConfiguration(new File(args[0]));
+
+    try (FluoClient client = FluoFactory.newClient(config); Snapshot snap = client.newSnapshot()) {
+
+      int stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
+      int nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
+
+      Map<String, Long> rootCounts = getRootCount(client, snap, stopLevel, stopLevel, nodeSize);
+      ArrayList<String> rootRows = new ArrayList<>(rootCounts.keySet());
+      Collections.sort(rootRows);
+
+      // TODO 8 is hard-coded. NOTE(review): presumably the last level for the configured node
+      // size; confirm and derive it instead.
+      for (int level = stopLevel + 1; level <= 8; level++) {
+        System.out.printf("Level %d:\n", level);
+
+        Map<String, Long> counts = getRootCount(client, snap, level, stopLevel, nodeSize);
+
+        long sum = 0;
+
+        for (String row : rootRows) {
+          long c1 = rootCounts.get(row);
+          // -1 stands in for a stop-level row that has no entry at this level.
+          long c2 = counts.getOrDefault(row, -1L);
+
+          if (c1 != c2) {
+            System.out.printf("\tdiff: %s %d %d\n", row, c1, c2);
+          }
+
+          if (c2 > 0) {
+            sum += c2;
+          }
+        }
+
+        // Keys found at this level that have no matching key in rootCounts.
+        HashSet<String> extras = new HashSet<>(counts.keySet());
+        extras.removeAll(rootCounts.keySet());
+
+        for (String row : extras) {
+          long c = counts.get(row);
+          System.out.printf("\textra: %s %d\n", row, c);
+        }
+
+        System.out.println("\tsum " + sum);
+      }
+    }
+  }
+}
diff --git a/src/main/java/io/fluo/stress/trie/Generate.java b/src/main/java/io/fluo/stress/trie/Generate.java
index 7181e9b..4388c5c 100644
--- a/src/main/java/io/fluo/stress/trie/Generate.java
+++ b/src/main/java/io/fluo/stress/trie/Generate.java
@@ -22,7 +22,7 @@
import java.util.Random;
import com.google.common.base.Preconditions;
-import io.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
diff --git a/src/main/java/io/fluo/stress/trie/Init.java b/src/main/java/io/fluo/stress/trie/Init.java
index bfa9a4c..581dcde 100644
--- a/src/main/java/io/fluo/stress/trie/Init.java
+++ b/src/main/java/io/fluo/stress/trie/Init.java
@@ -22,18 +22,18 @@
import java.io.OutputStream;
import java.util.Collection;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.core.util.AccumuloUtil;
-import io.fluo.mapreduce.FluoKeyValue;
-import io.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.commons.codec.binary.Base64;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.AccumuloUtil;
+import org.apache.fluo.mapreduce.FluoKeyValue;
+import org.apache.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/src/main/java/io/fluo/stress/trie/Load.java b/src/main/java/io/fluo/stress/trie/Load.java
index 47fde85..b6cb7a8 100644
--- a/src/main/java/io/fluo/stress/trie/Load.java
+++ b/src/main/java/io/fluo/stress/trie/Load.java
@@ -1,17 +1,15 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress.trie;
@@ -19,10 +17,9 @@
import java.io.File;
import java.io.IOException;
-import io.fluo.api.client.Loader;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.mapreduce.FluoOutputFormat;
-import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.mapreduce.FluoOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -39,10 +36,11 @@
private static final Logger log = LoggerFactory.getLogger(Load.class);
- public static class LoadMapper extends Mapper<LongWritable,NullWritable,Loader,NullWritable> {
+ public static class LoadMapper extends Mapper<LongWritable, NullWritable, Loader, NullWritable> {
@Override
- protected void map(LongWritable key, NullWritable val, Context context) throws IOException, InterruptedException {
+ protected void map(LongWritable key, NullWritable val, Context context)
+ throws IOException, InterruptedException {
context.write(new NumberLoader(key.get()), val);
}
}
@@ -61,7 +59,7 @@
Job job = Job.getInstance(getConf());
job.setJobName(Load.class.getName());
-
+
job.setJarByClass(Load.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -72,7 +70,7 @@
job.setNumReduceTasks(0);
job.setOutputFormatClass(FluoOutputFormat.class);
- FluoOutputFormat.configure(job, ConfigurationConverter.getProperties(props));
+ FluoOutputFormat.configure(job, props);
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
diff --git a/src/main/java/io/fluo/stress/trie/NodeObserver.java b/src/main/java/io/fluo/stress/trie/NodeObserver.java
index 068b694..f84249b 100644
--- a/src/main/java/io/fluo/stress/trie/NodeObserver.java
+++ b/src/main/java/io/fluo/stress/trie/NodeObserver.java
@@ -15,12 +15,12 @@
import java.util.Map;
-import io.fluo.api.client.TransactionBase;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Column;
-import io.fluo.api.observer.AbstractObserver;
-import io.fluo.api.types.TypedSnapshotBase.Value;
-import io.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.recipes.core.types.TypedSnapshotBase.Value;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/io/fluo/stress/trie/NumberLoader.java b/src/main/java/io/fluo/stress/trie/NumberLoader.java
index c34ae74..492556b 100644
--- a/src/main/java/io/fluo/stress/trie/NumberLoader.java
+++ b/src/main/java/io/fluo/stress/trie/NumberLoader.java
@@ -15,11 +15,11 @@
import java.util.Map;
-import io.fluo.api.client.Loader;
-import io.fluo.api.client.TransactionBase;
-import io.fluo.api.data.Column;
-import io.fluo.api.types.TypedSnapshotBase.Value;
-import io.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.core.types.TypedSnapshotBase.Value;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
import static com.google.common.base.Preconditions.checkArgument;
diff --git a/src/main/java/io/fluo/stress/trie/Print.java b/src/main/java/io/fluo/stress/trie/Print.java
index 25a50be..b835450 100644
--- a/src/main/java/io/fluo/stress/trie/Print.java
+++ b/src/main/java/io/fluo/stress/trie/Print.java
@@ -1,35 +1,30 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress.trie;
import java.io.File;
-import java.util.Map.Entry;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.api.config.ScannerConfiguration;
-import io.fluo.api.data.Bytes;
-import io.fluo.api.data.Column;
-import io.fluo.api.data.Span;
-import io.fluo.api.iterator.ColumnIterator;
-import io.fluo.api.iterator.RowIterator;
-import org.apache.commons.configuration.Configuration;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
public class Print {
@@ -55,62 +50,57 @@
this.nodes = nodes;
this.sawOtherNodes = son;
}
-
+
@Override
public boolean equals(Object o) {
if (o instanceof Stats) {
Stats os = (Stats) o;
- return totalWait == os.totalWait && totalSeen == os.totalSeen && sawOtherNodes == os.sawOtherNodes;
+ return totalWait == os.totalWait && totalSeen == os.totalSeen
+ && sawOtherNodes == os.sawOtherNodes;
}
return false;
}
}
- public static Stats getStats(Configuration config) throws Exception {
+ public static Stats getStats(SimpleConfiguration config) throws Exception {
try (FluoClient client = FluoFactory.newClient(config); Snapshot snap = client.newSnapshot()) {
int level = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
int nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
-
- ScannerConfiguration scanConfig = new ScannerConfiguration();
- scanConfig.setSpan(Span.prefix(String.format("%02d:", level)));
- scanConfig.fetchColumn(Constants.COUNT_SEEN_COL.getFamily(), Constants.COUNT_SEEN_COL.getQualifier());
- scanConfig.fetchColumn(Constants.COUNT_WAIT_COL.getFamily(), Constants.COUNT_WAIT_COL.getQualifier());
- RowIterator rowIter = snap.get(scanConfig);
+ RowScanner rows = snap.scanner().over(Span.prefix(String.format("%02d:", level)))
+ .fetch(Constants.COUNT_SEEN_COL, Constants.COUNT_WAIT_COL).byRow().build();
+
long totalSeen = 0;
long totalWait = 0;
int otherNodeSizes = 0;
-
+
long nodes = 0;
-
- while (rowIter.hasNext()) {
- Entry<Bytes,ColumnIterator> rowEntry = rowIter.next();
- String row = rowEntry.getKey().toString();
+
+ for (ColumnScanner columns : rows) {
+ String row = columns.getsRow();
Node node = new Node(row);
if (node.getNodeSize() == nodeSize) {
- ColumnIterator colIter = rowEntry.getValue();
- while(colIter.hasNext()){
- Entry<Column,Bytes> colEntry = colIter.next();
- if(colEntry.getKey().equals(Constants.COUNT_SEEN_COL)){
- totalSeen += Long.parseLong(colEntry.getValue().toString());
- }else{
- totalWait += Long.parseLong(colEntry.getValue().toString());
+ for (ColumnValue cv : columns) {
+ if (cv.getColumn().equals(Constants.COUNT_SEEN_COL)) {
+ totalSeen += Long.parseLong(cv.getsValue());
+ } else {
+ totalWait += Long.parseLong(cv.getsValue());
}
}
-
+
nodes++;
} else {
otherNodeSizes++;
}
}
-
+
return new Stats(totalWait, totalSeen, nodes, otherNodeSizes != 0);
}
@@ -122,12 +112,12 @@
System.err.println("Usage: " + Print.class.getSimpleName() + " <fluo props>");
System.exit(-1);
}
-
+
Stats stats = getStats(new FluoConfiguration(new File(args[0])));
System.out.println("Total at root : " + (stats.totalSeen + stats.totalWait));
System.out.println("Nodes Scanned : " + stats.nodes);
-
+
if (stats.sawOtherNodes) {
System.err.println("WARN : Other node sizes were seen and ignored.");
}
diff --git a/src/main/java/io/fluo/stress/trie/Split.java b/src/main/java/io/fluo/stress/trie/Split.java
index d21124c..ca8de57 100644
--- a/src/main/java/io/fluo/stress/trie/Split.java
+++ b/src/main/java/io/fluo/stress/trie/Split.java
@@ -18,13 +18,13 @@
import java.util.TreeSet;
import com.google.common.base.Strings;
-import io.fluo.api.client.FluoClient;
-import io.fluo.api.client.FluoFactory;
-import io.fluo.api.config.FluoConfiguration;
-import io.fluo.core.util.AccumuloUtil;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.hadoop.io.Text;
public class Split {
diff --git a/src/test/java/io/fluo/stress/TrieBasicIT.java b/src/test/java/io/fluo/stress/TrieBasicIT.java
index f7ccac2..2155882 100644
--- a/src/test/java/io/fluo/stress/TrieBasicIT.java
+++ b/src/test/java/io/fluo/stress/TrieBasicIT.java
@@ -1,17 +1,15 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress;
@@ -21,16 +19,17 @@
import java.util.Random;
import java.util.Set;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.types.TypedSnapshot;
-import io.fluo.core.client.LoaderExecutorImpl;
-import io.fluo.core.impl.Environment;
-import io.fluo.integration.ITBaseMini;
import io.fluo.stress.trie.Constants;
import io.fluo.stress.trie.Node;
import io.fluo.stress.trie.NodeObserver;
import io.fluo.stress.trie.NumberLoader;
-import org.apache.commons.configuration.Configuration;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.client.LoaderExecutor;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.integration.ITBaseMini;
+import org.apache.fluo.recipes.core.types.TypedSnapshot;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -39,77 +38,85 @@
import static io.fluo.stress.trie.Constants.COUNT_SEEN_COL;
import static io.fluo.stress.trie.Constants.TYPEL;
-/**
+/**
* Tests Trie Stress Test using Basic Loader
*/
public class TrieBasicIT extends ITBaseMini {
-
+
private static final Logger log = LoggerFactory.getLogger(TrieBasicIT.class);
-
+
@Override
protected List<ObserverConfiguration> getObservers() {
return Collections.singletonList(new ObserverConfiguration(NodeObserver.class.getName()));
}
-
+
@Override
- protected void setAppConfig(Configuration config){
+ protected void setAppConfig(SimpleConfiguration config) {
config.setProperty(Constants.STOP_LEVEL_PROP, 0);
}
-
+
@Test
public void testBit32() throws Exception {
runTrieTest(20, Integer.MAX_VALUE, 32);
}
-
+
@Test
public void testBit8() throws Exception {
runTrieTest(25, Integer.MAX_VALUE, 8);
}
-
+
@Test
public void testBit4() throws Exception {
runTrieTest(10, Integer.MAX_VALUE, 4);
}
-
+
@Test
public void testBit() throws Exception {
runTrieTest(5, Integer.MAX_VALUE, 1);
}
-
+
@Test
public void testDuplicates() throws Exception {
runTrieTest(20, 10, 4);
}
-
+
private void runTrieTest(int ingestNum, int maxValue, int nodeSize) throws Exception {
-
- log.info("Ingesting "+ingestNum+" unique numbers with a nodeSize of "+nodeSize+" bits");
-
+
+ log.info("Ingesting " + ingestNum + " unique numbers with a nodeSize of " + nodeSize + " bits");
+
config.setLoaderThreads(0);
config.setLoaderQueueSize(0);
-
- try(Environment env = new Environment(config); LoaderExecutorImpl le = new LoaderExecutorImpl(config, env)) {
- Random random = new Random();
- Set<Integer> ingested = new HashSet<>();
- for (int i = 0; i < ingestNum; i++) {
- int num = Math.abs(random.nextInt(maxValue));
- le.execute(new NumberLoader(num, nodeSize));
- ingested.add(num);
- }
- int uniqueNum = ingested.size();
- log.info("Ingested "+uniqueNum+" unique numbers with a nodeSize of "+nodeSize+" bits");
+ try (FluoClient fluoClient = FluoFactory.newClient(config)) {
+
+ int uniqueNum;
+
+ try (LoaderExecutor le = client.newLoaderExecutor()) {
+ Random random = new Random();
+ Set<Integer> ingested = new HashSet<>();
+ for (int i = 0; i < ingestNum; i++) {
+ int num = Math.abs(random.nextInt(maxValue));
+ le.execute(new NumberLoader(num, nodeSize));
+ ingested.add(num);
+ }
+
+ uniqueNum = ingested.size();
+ log.info(
+ "Ingested " + uniqueNum + " unique numbers with a nodeSize of " + nodeSize + " bits");
+ }
miniFluo.waitForObservers();
try (TypedSnapshot tsnap = TYPEL.wrap(client.newSnapshot())) {
- Integer result = tsnap.get().row(Node.generateRootId(nodeSize)).col(COUNT_SEEN_COL).toInteger();
+ Integer result =
+ tsnap.get().row(Node.generateRootId(nodeSize)).col(COUNT_SEEN_COL).toInteger();
if (result == null) {
log.error("Could not find root node");
printSnapshot();
}
if (!result.equals(uniqueNum)) {
- log.error("Count (" + result + ") at root node does not match expected (" + uniqueNum + "):");
+ log.error(
+ "Count (" + result + ") at root node does not match expected (" + uniqueNum + "):");
printSnapshot();
}
Assert.assertEquals(uniqueNum, result.intValue());
diff --git a/src/test/java/io/fluo/stress/TrieMapRedIT.java b/src/test/java/io/fluo/stress/TrieMapRedIT.java
index 505c14a..35dceee 100644
--- a/src/test/java/io/fluo/stress/TrieMapRedIT.java
+++ b/src/test/java/io/fluo/stress/TrieMapRedIT.java
@@ -1,30 +1,24 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.integration.ITBaseMini;
import io.fluo.stress.trie.Constants;
import io.fluo.stress.trie.Generate;
import io.fluo.stress.trie.Init;
@@ -32,52 +26,57 @@
import io.fluo.stress.trie.NodeObserver;
import io.fluo.stress.trie.Print;
import io.fluo.stress.trie.Unique;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.io.FileUtils;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.integration.ITBaseMini;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
-/**
+/**
* Tests Trie Stress Test using MapReduce Ingest
*/
public class TrieMapRedIT extends ITBaseMini {
-
+
@Override
protected List<ObserverConfiguration> getObservers() {
return Collections.singletonList(new ObserverConfiguration(NodeObserver.class.getName()));
}
@Override
- protected void setAppConfig(Configuration config){
+ protected void setAppConfig(SimpleConfiguration config) {
config.setProperty(Constants.STOP_LEVEL_PROP, 0);
config.setProperty(Constants.NODE_SIZE_PROP, 8);
}
-
+
static void generate(int numMappers, int numPerMapper, int max, File out1) throws Exception {
- int ret = ToolRunner.run(new Generate(), new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///", "" + numMappers,
- numPerMapper + "", max + "", out1.toURI().toString()});
+ int ret = ToolRunner.run(new Generate(),
+ new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///",
+ "" + numMappers, numPerMapper + "", max + "", out1.toURI().toString()});
Assert.assertEquals(0, ret);
}
static void load(int nodeSize, File fluoPropsFile, File input) throws Exception {
- int ret = ToolRunner
- .run(new Load(), new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///", fluoPropsFile.getAbsolutePath(),
- input.toURI().toString()});
+ int ret = ToolRunner.run(new Load(), new String[] {"-D", "mapred.job.tracker=local", "-D",
+ "fs.defaultFS=file:///", fluoPropsFile.getAbsolutePath(), input.toURI().toString()});
Assert.assertEquals(0, ret);
}
static void init(int nodeSize, File fluoPropsFile, File input, File tmp) throws Exception {
- int ret = ToolRunner
- .run(new Init(), new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///", fluoPropsFile.getAbsolutePath(),
- input.toURI().toString(), tmp.toURI().toString()});
+ int ret =
+ ToolRunner
+ .run(new Init(),
+ new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///",
+ fluoPropsFile.getAbsolutePath(), input.toURI().toString(),
+ tmp.toURI().toString()});
Assert.assertEquals(0, ret);
}
static int unique(File... dirs) throws Exception {
- ArrayList<String> args = new ArrayList<>(Arrays.asList("-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///"));
+ ArrayList<String> args = new ArrayList<>(
+ Arrays.asList("-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///"));
for (File dir : dirs) {
args.add(dir.toURI().toString());
}
@@ -94,9 +93,7 @@
testDir.mkdirs();
File fluoPropsFile = new File(testDir, "fluo.props");
- BufferedWriter propOut = new BufferedWriter(new FileWriter(fluoPropsFile));
- ConfigurationConverter.getProperties(config).store(propOut, "");
- propOut.close();
+ config.save(fluoPropsFile);
File out1 = new File(testDir, "nums-1");
@@ -122,17 +119,19 @@
generate(2, 100, 500, out2);
load(8, fluoPropsFile, out2);
int ucount2 = unique(out1, out2);
- Assert.assertTrue(ucount2 > ucount); // used > because the probability that no new numbers are chosen is exceedingly small
+ Assert.assertTrue(ucount2 > ucount); // used > because the probability that no new numbers are
+ // chosen is exceedingly small
miniFluo.waitForObservers();
Assert.assertEquals(new Print.Stats(0, ucount2, false), Print.getStats(config));
-
+
File out3 = new File(testDir, "nums-3");
generate(2, 100, 500, out3);
load(8, fluoPropsFile, out3);
int ucount3 = unique(out1, out2, out3);
- Assert.assertTrue(ucount3 > ucount2); // used > because the probability that no new numbers are chosen is exceedingly small
+ Assert.assertTrue(ucount3 > ucount2); // used > because the probability that no new numbers are
+ // chosen is exceedingly small
miniFluo.waitForObservers();
diff --git a/src/test/java/io/fluo/stress/TrieStopLevelIT.java b/src/test/java/io/fluo/stress/TrieStopLevelIT.java
index 5569e45..b9b10ca 100644
--- a/src/test/java/io/fluo/stress/TrieStopLevelIT.java
+++ b/src/test/java/io/fluo/stress/TrieStopLevelIT.java
@@ -1,17 +1,15 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress;
@@ -19,33 +17,33 @@
import java.util.Collections;
import java.util.List;
-import io.fluo.api.client.Snapshot;
-import io.fluo.api.config.ObserverConfiguration;
-import io.fluo.api.data.Bytes;
import io.fluo.stress.trie.Constants;
import io.fluo.stress.trie.Node;
import io.fluo.stress.trie.NodeObserver;
-import org.apache.commons.configuration.Configuration;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.Bytes;
import org.junit.Assert;
import org.junit.Test;
public class TrieStopLevelIT extends TrieMapRedIT {
-
+
@Override
protected List<ObserverConfiguration> getObservers() {
return Collections.singletonList(new ObserverConfiguration(NodeObserver.class.getName()));
}
@Override
- protected void setAppConfig(Configuration config){
+ protected void setAppConfig(SimpleConfiguration config) {
config.setProperty(Constants.STOP_LEVEL_PROP, 7);
config.setProperty(Constants.NODE_SIZE_PROP, 8);
}
-
+
@Test
public void testEndToEnd() throws Exception {
super.testEndToEnd();
- try(Snapshot snap = client.newSnapshot()){
+ try (Snapshot snap = client.newSnapshot()) {
Bytes row = Bytes.of(Node.generateRootId(8));
Assert.assertNull(snap.get(row, Constants.COUNT_SEEN_COL));
Assert.assertNull(snap.get(row, Constants.COUNT_WAIT_COL));