updates for changes in Fluo and Fluo Recipes
diff --git a/pom.xml b/pom.xml
index 4fa733c..f64905b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -35,7 +35,8 @@
<properties>
<accumulo.version>1.6.4</accumulo.version>
<hadoop.version>2.6.3</hadoop.version>
- <fluo.version>1.0.0-beta-3-SNAPSHOT</fluo.version>
+ <fluo.version>1.0.0-incubating-SNAPSHOT</fluo.version>
+ <fluo-recipes.version>1.0.0-beta-3-SNAPSHOT</fluo-recipes.version>
<slf4j.version>1.7.12</slf4j.version>
</properties>
@@ -125,6 +126,11 @@
<version>${fluo.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.fluo</groupId>
+ <artifactId>fluo-recipes-core</artifactId>
+ <version>${fluo-recipes.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
diff --git a/src/main/java/io/fluo/stress/trie/Constants.java b/src/main/java/io/fluo/stress/trie/Constants.java
index 368ffd9..4e6ccb5 100644
--- a/src/main/java/io/fluo/stress/trie/Constants.java
+++ b/src/main/java/io/fluo/stress/trie/Constants.java
@@ -1,34 +1,32 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress.trie;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.StringEncoder;
-import org.apache.fluo.api.types.TypeLayer;
+import org.apache.fluo.recipes.core.types.StringEncoder;
+import org.apache.fluo.recipes.core.types.TypeLayer;
/**
*
*/
public class Constants {
-
+
public static final TypeLayer TYPEL = new TypeLayer(new StringEncoder());
- public static final Column COUNT_SEEN_COL = TYPEL.bc().fam("count").qual("seen").vis();
- public static final Column COUNT_WAIT_COL = TYPEL.bc().fam("count").qual("wait").vis();
-
+ public static final Column COUNT_SEEN_COL = new Column("count", "seen");
+ public static final Column COUNT_WAIT_COL = new Column("count", "wait");
+
public static final String NODE_SIZE_PROP = "trie.nodeSize";
public static final String STOP_LEVEL_PROP = "trie.stopLevel";
}
diff --git a/src/main/java/io/fluo/stress/trie/Diff.java b/src/main/java/io/fluo/stress/trie/Diff.java
index 2e4eed6..525a442 100644
--- a/src/main/java/io/fluo/stress/trie/Diff.java
+++ b/src/main/java/io/fluo/stress/trie/Diff.java
@@ -6,36 +6,27 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
public class Diff {
public static Map<String, Long> getRootCount(FluoClient client, Snapshot snap, int level,
int stopLevel, int nodeSize) throws Exception {
- ScannerConfiguration scanConfig = new ScannerConfiguration();
- scanConfig.setSpan(Span.prefix(String.format("%02d:", level)));
- scanConfig.fetchColumn(Constants.COUNT_SEEN_COL.getFamily(),
- Constants.COUNT_SEEN_COL.getQualifier());
- scanConfig.fetchColumn(Constants.COUNT_WAIT_COL.getFamily(),
- Constants.COUNT_WAIT_COL.getQualifier());
-
- RowIterator rowIter = snap.get(scanConfig);
HashMap<String, Long> counts = new HashMap<>();
- while (rowIter.hasNext()) {
- Entry<Bytes, ColumnIterator> rowEntry = rowIter.next();
- String row = rowEntry.getKey().toString();
+ RowScanner rows = snap.scanner().over(Span.prefix(String.format("%02d:", level)))
+ .fetch(Constants.COUNT_SEEN_COL, Constants.COUNT_WAIT_COL).byRow().build();
+
+ for (ColumnScanner columns : rows) {
+ String row = columns.getsRow();
Node node = new Node(row);
while (node.getLevel() > stopLevel) {
@@ -46,10 +37,8 @@
long count = counts.getOrDefault(stopRow, 0L);
if (node.getNodeSize() == nodeSize) {
- ColumnIterator colIter = rowEntry.getValue();
- while (colIter.hasNext()) {
- Entry<Column, Bytes> colEntry = colIter.next();
- count += Long.parseLong(colEntry.getValue().toString());
+ for (ColumnValue colVal : columns) {
+ count += Long.parseLong(colVal.getsValue());
}
} else {
throw new RuntimeException("TODO");
@@ -81,7 +70,7 @@
// TODO 8
for (int level = stopLevel + 1; level <= 8; level++) {
- System.out.printf("Level %d:\n",level);
+ System.out.printf("Level %d:\n", level);
Map<String, Long> counts = getRootCount(client, snap, level, stopLevel, nodeSize);
@@ -95,7 +84,7 @@
System.out.printf("\tdiff: %s %d %d\n", row, c1, c2);
}
- if(c2 > 0){
+ if (c2 > 0) {
sum += c2;
}
}
@@ -108,7 +97,7 @@
System.out.printf("\textra: %s %d\n", row, c);
}
- System.out.println("\tsum "+sum);
+ System.out.println("\tsum " + sum);
}
}
}
diff --git a/src/main/java/io/fluo/stress/trie/Load.java b/src/main/java/io/fluo/stress/trie/Load.java
index 8027299..b6cb7a8 100644
--- a/src/main/java/io/fluo/stress/trie/Load.java
+++ b/src/main/java/io/fluo/stress/trie/Load.java
@@ -1,17 +1,15 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress.trie;
@@ -19,7 +17,6 @@
import java.io.File;
import java.io.IOException;
-import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.mapreduce.FluoOutputFormat;
@@ -39,10 +36,11 @@
private static final Logger log = LoggerFactory.getLogger(Load.class);
- public static class LoadMapper extends Mapper<LongWritable,NullWritable,Loader,NullWritable> {
+ public static class LoadMapper extends Mapper<LongWritable, NullWritable, Loader, NullWritable> {
@Override
- protected void map(LongWritable key, NullWritable val, Context context) throws IOException, InterruptedException {
+ protected void map(LongWritable key, NullWritable val, Context context)
+ throws IOException, InterruptedException {
context.write(new NumberLoader(key.get()), val);
}
}
@@ -61,7 +59,7 @@
Job job = Job.getInstance(getConf());
job.setJobName(Load.class.getName());
-
+
job.setJarByClass(Load.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
@@ -72,7 +70,7 @@
job.setNumReduceTasks(0);
job.setOutputFormatClass(FluoOutputFormat.class);
- FluoOutputFormat.configure(job, ConfigurationConverter.getProperties(props));
+ FluoOutputFormat.configure(job, props);
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
diff --git a/src/main/java/io/fluo/stress/trie/NodeObserver.java b/src/main/java/io/fluo/stress/trie/NodeObserver.java
index 3c0f67e..f84249b 100644
--- a/src/main/java/io/fluo/stress/trie/NodeObserver.java
+++ b/src/main/java/io/fluo/stress/trie/NodeObserver.java
@@ -19,8 +19,8 @@
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.fluo.api.types.TypedSnapshotBase.Value;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.recipes.core.types.TypedSnapshotBase.Value;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/io/fluo/stress/trie/NumberLoader.java b/src/main/java/io/fluo/stress/trie/NumberLoader.java
index b5e9ddc..492556b 100644
--- a/src/main/java/io/fluo/stress/trie/NumberLoader.java
+++ b/src/main/java/io/fluo/stress/trie/NumberLoader.java
@@ -18,8 +18,8 @@
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.types.TypedSnapshotBase.Value;
-import org.apache.fluo.api.types.TypedTransactionBase;
+import org.apache.fluo.recipes.core.types.TypedSnapshotBase.Value;
+import org.apache.fluo.recipes.core.types.TypedTransactionBase;
import static com.google.common.base.Preconditions.checkArgument;
diff --git a/src/main/java/io/fluo/stress/trie/Print.java b/src/main/java/io/fluo/stress/trie/Print.java
index fcc5d1a..b835450 100644
--- a/src/main/java/io/fluo/stress/trie/Print.java
+++ b/src/main/java/io/fluo/stress/trie/Print.java
@@ -1,35 +1,30 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress.trie;
import java.io.File;
-import java.util.Map.Entry;
-import org.apache.commons.configuration.Configuration;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.config.ScannerConfiguration;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.config.SimpleConfiguration;
+import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
-import org.apache.fluo.api.iterator.ColumnIterator;
-import org.apache.fluo.api.iterator.RowIterator;
public class Print {
@@ -55,62 +50,57 @@
this.nodes = nodes;
this.sawOtherNodes = son;
}
-
+
@Override
public boolean equals(Object o) {
if (o instanceof Stats) {
Stats os = (Stats) o;
- return totalWait == os.totalWait && totalSeen == os.totalSeen && sawOtherNodes == os.sawOtherNodes;
+ return totalWait == os.totalWait && totalSeen == os.totalSeen
+ && sawOtherNodes == os.sawOtherNodes;
}
return false;
}
}
- public static Stats getStats(Configuration config) throws Exception {
+ public static Stats getStats(SimpleConfiguration config) throws Exception {
try (FluoClient client = FluoFactory.newClient(config); Snapshot snap = client.newSnapshot()) {
int level = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
int nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
-
- ScannerConfiguration scanConfig = new ScannerConfiguration();
- scanConfig.setSpan(Span.prefix(String.format("%02d:", level)));
- scanConfig.fetchColumn(Constants.COUNT_SEEN_COL.getFamily(), Constants.COUNT_SEEN_COL.getQualifier());
- scanConfig.fetchColumn(Constants.COUNT_WAIT_COL.getFamily(), Constants.COUNT_WAIT_COL.getQualifier());
- RowIterator rowIter = snap.get(scanConfig);
+ RowScanner rows = snap.scanner().over(Span.prefix(String.format("%02d:", level)))
+ .fetch(Constants.COUNT_SEEN_COL, Constants.COUNT_WAIT_COL).byRow().build();
+
long totalSeen = 0;
long totalWait = 0;
int otherNodeSizes = 0;
-
+
long nodes = 0;
-
- while (rowIter.hasNext()) {
- Entry<Bytes,ColumnIterator> rowEntry = rowIter.next();
- String row = rowEntry.getKey().toString();
+
+ for (ColumnScanner columns : rows) {
+ String row = columns.getsRow();
Node node = new Node(row);
if (node.getNodeSize() == nodeSize) {
- ColumnIterator colIter = rowEntry.getValue();
- while(colIter.hasNext()){
- Entry<Column,Bytes> colEntry = colIter.next();
- if(colEntry.getKey().equals(Constants.COUNT_SEEN_COL)){
- totalSeen += Long.parseLong(colEntry.getValue().toString());
- }else{
- totalWait += Long.parseLong(colEntry.getValue().toString());
+ for (ColumnValue cv : columns) {
+ if (cv.getColumn().equals(Constants.COUNT_SEEN_COL)) {
+ totalSeen += Long.parseLong(cv.getsValue());
+ } else {
+ totalWait += Long.parseLong(cv.getsValue());
}
}
-
+
nodes++;
} else {
otherNodeSizes++;
}
}
-
+
return new Stats(totalWait, totalSeen, nodes, otherNodeSizes != 0);
}
@@ -122,12 +112,12 @@
System.err.println("Usage: " + Print.class.getSimpleName() + " <fluo props>");
System.exit(-1);
}
-
+
Stats stats = getStats(new FluoConfiguration(new File(args[0])));
System.out.println("Total at root : " + (stats.totalSeen + stats.totalWait));
System.out.println("Nodes Scanned : " + stats.nodes);
-
+
if (stats.sawOtherNodes) {
System.err.println("WARN : Other node sizes were seen and ignored.");
}
diff --git a/src/test/java/io/fluo/stress/TrieBasicIT.java b/src/test/java/io/fluo/stress/TrieBasicIT.java
index 0f84110..2155882 100644
--- a/src/test/java/io/fluo/stress/TrieBasicIT.java
+++ b/src/test/java/io/fluo/stress/TrieBasicIT.java
@@ -1,17 +1,15 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress;
@@ -25,13 +23,13 @@
import io.fluo.stress.trie.Node;
import io.fluo.stress.trie.NodeObserver;
import io.fluo.stress.trie.NumberLoader;
-import org.apache.commons.configuration.Configuration;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.config.ObserverConfiguration;
-import org.apache.fluo.api.types.TypedSnapshot;
+import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.integration.ITBaseMini;
+import org.apache.fluo.recipes.core.types.TypedSnapshot;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -53,7 +51,7 @@
}
@Override
- protected void setAppConfig(Configuration config){
+ protected void setAppConfig(SimpleConfiguration config) {
config.setProperty(Constants.STOP_LEVEL_PROP, 0);
}
@@ -84,33 +82,41 @@
private void runTrieTest(int ingestNum, int maxValue, int nodeSize) throws Exception {
- log.info("Ingesting "+ingestNum+" unique numbers with a nodeSize of "+nodeSize+" bits");
+ log.info("Ingesting " + ingestNum + " unique numbers with a nodeSize of " + nodeSize + " bits");
config.setLoaderThreads(0);
config.setLoaderQueueSize(0);
- try(FluoClient fluoClient = FluoFactory.newClient(config); LoaderExecutor le = client.newLoaderExecutor()) {
- Random random = new Random();
- Set<Integer> ingested = new HashSet<>();
- for (int i = 0; i < ingestNum; i++) {
- int num = Math.abs(random.nextInt(maxValue));
- le.execute(new NumberLoader(num, nodeSize));
- ingested.add(num);
- }
- int uniqueNum = ingested.size();
+ try (FluoClient fluoClient = FluoFactory.newClient(config)) {
- log.info("Ingested "+uniqueNum+" unique numbers with a nodeSize of "+nodeSize+" bits");
+ int uniqueNum;
+
+ try (LoaderExecutor le = client.newLoaderExecutor()) {
+ Random random = new Random();
+ Set<Integer> ingested = new HashSet<>();
+ for (int i = 0; i < ingestNum; i++) {
+ int num = Math.abs(random.nextInt(maxValue));
+ le.execute(new NumberLoader(num, nodeSize));
+ ingested.add(num);
+ }
+
+ uniqueNum = ingested.size();
+ log.info(
+ "Ingested " + uniqueNum + " unique numbers with a nodeSize of " + nodeSize + " bits");
+ }
miniFluo.waitForObservers();
try (TypedSnapshot tsnap = TYPEL.wrap(client.newSnapshot())) {
- Integer result = tsnap.get().row(Node.generateRootId(nodeSize)).col(COUNT_SEEN_COL).toInteger();
+ Integer result =
+ tsnap.get().row(Node.generateRootId(nodeSize)).col(COUNT_SEEN_COL).toInteger();
if (result == null) {
log.error("Could not find root node");
printSnapshot();
}
if (!result.equals(uniqueNum)) {
- log.error("Count (" + result + ") at root node does not match expected (" + uniqueNum + "):");
+ log.error(
+ "Count (" + result + ") at root node does not match expected (" + uniqueNum + "):");
printSnapshot();
}
Assert.assertEquals(uniqueNum, result.intValue());
diff --git a/src/test/java/io/fluo/stress/TrieMapRedIT.java b/src/test/java/io/fluo/stress/TrieMapRedIT.java
index c720dee..35dceee 100644
--- a/src/test/java/io/fluo/stress/TrieMapRedIT.java
+++ b/src/test/java/io/fluo/stress/TrieMapRedIT.java
@@ -1,23 +1,19 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -30,10 +26,9 @@
import io.fluo.stress.trie.NodeObserver;
import io.fluo.stress.trie.Print;
import io.fluo.stress.trie.Unique;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.io.FileUtils;
import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.integration.ITBaseMini;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
@@ -50,34 +45,38 @@
}
@Override
- protected void setAppConfig(Configuration config){
+ protected void setAppConfig(SimpleConfiguration config) {
config.setProperty(Constants.STOP_LEVEL_PROP, 0);
config.setProperty(Constants.NODE_SIZE_PROP, 8);
}
static void generate(int numMappers, int numPerMapper, int max, File out1) throws Exception {
- int ret = ToolRunner.run(new Generate(), new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///", "" + numMappers,
- numPerMapper + "", max + "", out1.toURI().toString()});
+ int ret = ToolRunner.run(new Generate(),
+ new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///",
+ "" + numMappers, numPerMapper + "", max + "", out1.toURI().toString()});
Assert.assertEquals(0, ret);
}
static void load(int nodeSize, File fluoPropsFile, File input) throws Exception {
- int ret = ToolRunner
- .run(new Load(), new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///", fluoPropsFile.getAbsolutePath(),
- input.toURI().toString()});
+ int ret = ToolRunner.run(new Load(), new String[] {"-D", "mapred.job.tracker=local", "-D",
+ "fs.defaultFS=file:///", fluoPropsFile.getAbsolutePath(), input.toURI().toString()});
Assert.assertEquals(0, ret);
}
static void init(int nodeSize, File fluoPropsFile, File input, File tmp) throws Exception {
- int ret = ToolRunner
- .run(new Init(), new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///", fluoPropsFile.getAbsolutePath(),
- input.toURI().toString(), tmp.toURI().toString()});
+ int ret =
+ ToolRunner
+ .run(new Init(),
+ new String[] {"-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///",
+ fluoPropsFile.getAbsolutePath(), input.toURI().toString(),
+ tmp.toURI().toString()});
Assert.assertEquals(0, ret);
}
static int unique(File... dirs) throws Exception {
- ArrayList<String> args = new ArrayList<>(Arrays.asList("-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///"));
+ ArrayList<String> args = new ArrayList<>(
+ Arrays.asList("-D", "mapred.job.tracker=local", "-D", "fs.defaultFS=file:///"));
for (File dir : dirs) {
args.add(dir.toURI().toString());
}
@@ -94,9 +93,7 @@
testDir.mkdirs();
File fluoPropsFile = new File(testDir, "fluo.props");
- BufferedWriter propOut = new BufferedWriter(new FileWriter(fluoPropsFile));
- ConfigurationConverter.getProperties(config).store(propOut, "");
- propOut.close();
+ config.save(fluoPropsFile);
File out1 = new File(testDir, "nums-1");
@@ -122,7 +119,8 @@
generate(2, 100, 500, out2);
load(8, fluoPropsFile, out2);
int ucount2 = unique(out1, out2);
- Assert.assertTrue(ucount2 > ucount); // used > because the probability that no new numbers are chosen is exceedingly small
+ Assert.assertTrue(ucount2 > ucount); // used > because the probability that no new numbers are
+ // chosen is exceedingly small
miniFluo.waitForObservers();
@@ -132,7 +130,8 @@
generate(2, 100, 500, out3);
load(8, fluoPropsFile, out3);
int ucount3 = unique(out1, out2, out3);
- Assert.assertTrue(ucount3 > ucount2); // used > because the probability that no new numbers are chosen is exceedingly small
+ Assert.assertTrue(ucount3 > ucount2); // used > because the probability that no new numbers are
+ // chosen is exceedingly small
miniFluo.waitForObservers();
diff --git a/src/test/java/io/fluo/stress/TrieStopLevelIT.java b/src/test/java/io/fluo/stress/TrieStopLevelIT.java
index 610fe53..b9b10ca 100644
--- a/src/test/java/io/fluo/stress/TrieStopLevelIT.java
+++ b/src/test/java/io/fluo/stress/TrieStopLevelIT.java
@@ -1,17 +1,15 @@
/*
* Copyright 2014 Fluo authors (see AUTHORS)
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
*/
package io.fluo.stress;
@@ -22,30 +20,30 @@
import io.fluo.stress.trie.Constants;
import io.fluo.stress.trie.Node;
import io.fluo.stress.trie.NodeObserver;
-import org.apache.commons.configuration.Configuration;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.config.ObserverConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.junit.Assert;
import org.junit.Test;
public class TrieStopLevelIT extends TrieMapRedIT {
-
+
@Override
protected List<ObserverConfiguration> getObservers() {
return Collections.singletonList(new ObserverConfiguration(NodeObserver.class.getName()));
}
@Override
- protected void setAppConfig(Configuration config){
+ protected void setAppConfig(SimpleConfiguration config) {
config.setProperty(Constants.STOP_LEVEL_PROP, 7);
config.setProperty(Constants.NODE_SIZE_PROP, 8);
}
-
+
@Test
public void testEndToEnd() throws Exception {
super.testEndToEnd();
- try(Snapshot snap = client.newSnapshot()){
+ try (Snapshot snap = client.newSnapshot()) {
Bytes row = Bytes.of(Node.generateRootId(8));
Assert.assertNull(snap.get(row, Constants.COUNT_SEEN_COL));
Assert.assertNull(snap.get(row, Constants.COUNT_WAIT_COL));