Merge pull request #3 from keith-turner/stresso-update

Update Stresso to Fluo 1.2, Accumulo 2.0, and Hadoop 3.0
diff --git a/stresso/.gitignore b/stresso/.gitignore
index 9233d7a..ac0a72e 100644
--- a/stresso/.gitignore
+++ b/stresso/.gitignore
@@ -7,3 +7,4 @@
 *.iml
 git/
 logs/
+lib/
diff --git a/stresso/README.md b/stresso/README.md
index d3c2577..e06946f 100644
--- a/stresso/README.md
+++ b/stresso/README.md
@@ -3,12 +3,12 @@
 
 [![Build Status](https://travis-ci.org/astralway/stresso.svg?branch=master)](https://travis-ci.org/astralway/stresso)
 
-An example application designed to stress Apache Fluo.  This Fluo application computes the 
+An example application designed to stress Apache Fluo.  This Fluo application computes the
 number of unique integers through the process of building a bitwise trie.  New numbers
-are added to the trie as leaf nodes.  Observers watch all nodes in the trie to create 
+are added to the trie as leaf nodes.  Observers watch all nodes in the trie to create
 parents and percolate counts up to the root nodes such that each node in the trie keeps
-track of the number of leaf nodes below it. The count at the root nodes should equal 
-the total number of leaf nodes.  This makes it easy to verify if the test ran correctly. 
+track of the number of leaf nodes below it. The count at the root nodes should equal
+the total number of leaf nodes.  This makes it easy to verify if the test ran correctly.
 The test stresses Apache Fluo in that multiple transactions can operate on the same data
 as counts are percolated up the trie.
 
@@ -24,8 +24,8 @@
    to percolate.  The lower the stop level, the more root nodes there are.
    Having more root nodes means less collisions, but all roots need to be
    scanned to get the count of unique numbers.  Having ~64k root nodes is a
-   good choice.  
- * **max** : Random numbers are generated modulo the max. 
+   good choice.
+ * **max** : Random numbers are generated modulo the max.
 
 Setting the stop level such that you have ~64k root nodes is dependent on the
 max and nodeSize.  For example assume we choose a max of 10<sup>12</sup> and a
@@ -53,12 +53,12 @@
 level 5.
 
 For small scale test a max of 10<sup>9</sup> and a stop level of 6 is a good
-choice. 
+choice.
 
 ## Building Stresso
 
 ```
-mvn package 
+mvn package
 ```
 
 This will create a jar and shaded jar in target:
@@ -75,7 +75,7 @@
 
 ## Run Stresso on cluster
 
-The [bin directory](/bin) contains a set of scripts to help run this test on a
+The [bin directory](/stresso/bin) contains a set of scripts to help run this test on a
 cluster.  These scripts make the following assumpitions.
 
  * `FLUO_HOME` environment variable is set.  If not set, then set it in `conf/env.sh`.
@@ -83,29 +83,37 @@
  * Hadoop `hadoop` command is on path.
  * Accumulo `accumulo` command is on path.
 
-Before running any of the scipts, copy [conf/env.sh.example](/conf/env.sh.example) 
-to `conf/env.sh`, then inspect and modify the file.
+Copy [conf/env.sh.example](/stresso/conf/env.sh.example) and
+[conf/fluo-app.properties.example](/stresso/conf/fluo-app.properties.example)
+to `conf/env.sh` and `conf/fluo-app.properties`, then inspect and modify these
+files. Then initialize using the following commands.
 
-Next, execute the [run-test.sh](/bin/run-test.sh) script.  This script will create a
-new Apache Fluo app called `stresso` (which can be changed by `FLUO_APP_NAME` in your env.sh). 
-It will modify the application's fluo.properties, copy the stresso jar to the `lib/` 
-directory of the app and set the following in fluo.properties:
+```bash
+# populate the lib dir needed by init
+./bin/build.sh
 
-```
-fluo.observer.0=stresso.trie.NodeObserver
-fluo.app.trie.nodeSize=X
-fluo.app.trie.stopLevel=Y
+# initialize the stresso Fluo application
+fluo init -a stresso -p conf/fluo-app.properties
 ```
 
-The `run-test.sh` script will then initialize and start the Stresso application.  
-It will load a lot of data directly into Accumulo without transactions and then 
-incrementally load smaller amounts of data using transactions.  After incrementally 
+After initialization the Fluo application needs to be started.  There are many possible ways to
+do this.  The following commands will start it locally.
+
+```bash
+mkdir -p logs
+fluo oracle -a stresso &> logs/oracle.log &
+fluo worker -a stresso &> logs/worker.log &
+```
+
+Next, execute the [run-test.sh](/stresso/bin/run-test.sh) script.
+This script loads a lot of data directly into Accumulo without transactions and then
+incrementally loads smaller amounts of data using transactions.  After incrementally
 loading some data, it computes the expected number of unique integers using map reduce.
-It then prints the number of unique integers computed by Apache Fluo. 
+It then prints the number of unique integers computed by Apache Fluo.
 
 ## Additional Scripts
 
-The script [generate.sh](/bin/generate.sh) starts a map reduce job to generate
+The script [generate.sh](/stresso/bin/generate.sh) starts a map reduce job to generate
 random integers.
 
 ```
@@ -119,7 +127,7 @@
 out dir   = Output directory
 ```
 
-The script [split.sh](/bin/split.sh) pre-splits the Accumulo table used by Apache
+The script [split.sh](/stresso/bin/split.sh) pre-splits the Accumulo table used by Apache
 Fluo.  Consider running this command before loading data.
 
 ```
@@ -130,10 +138,10 @@
 num tablets = Num tablets to create for lowest level of tree.  Will create less tablets for higher levels based on the max.
 ```
 After generating random numbers, load them into Apache Fluo with one of the following
-commands.  The script [init.sh](/bin/init.sh) intializes any empty table using
+commands.  The script [bulk_load.sh](/stresso/bin/bulk_load.sh) initializes any empty table using
 map reduce.  This simulates the case where a user has a lot of initial data to
 load into Fluo.  This command should only be run when the table is empty
-because it writes directly to the Fluo table w/o using transactions.  
+because it writes directly to the Fluo table w/o using transactions.
 
 ```
 init.sh <input dir> <tmp dir> <num reducers>
@@ -146,7 +154,7 @@
 num reducers = Number of reduce task map reuduce job should run
 ```
 
-Run the [load.sh](/bin/load.sh) script on a table with existing data. It starts
+Run the [load.sh](/stresso/bin/load.sh) script on a table with existing data. It starts
 a map reduce job that executes load transactions.  Loading the same directory
 multiple times should not result in incorrect counts.
 
@@ -154,7 +162,7 @@
 load.sh <input dir>
 ```
 
-After loading data, run the [print.sh](/bin/print.sh) script to check the
+After loading data, run the [print.sh](/stresso/bin/print.sh) script to check the
 status of the computation of the number of unique integers within Apache Fluo.  This
 command will print two numbers, the sum of the root nodes and number of root
 nodes.  If there are outstanding notification to process, this count may not be
@@ -164,10 +172,10 @@
 print.sh
 ```
 
-In order to know how many unique numbers are expected, run the [unique.sh](/bin/unique.sh)
+In order to know how many unique numbers are expected, run the [unique.sh](/stresso/bin/unique.sh)
 script.  This scrpt runs a map reduce job that calculates the number of
 unique integers.  This script can take a list of directories created by
-multiple runs of [generate.sh](/bin/generate.sh)
+multiple runs of [generate.sh](/stresso/bin/generate.sh)
 
 ```
 unique.sh <num reducers> <input dir>{ <input dir>}
diff --git a/stresso/bin/build.sh b/stresso/bin/build.sh
new file mode 100755
index 0000000..d4dd1fc
--- /dev/null
+++ b/stresso/bin/build.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+
+BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+SKIP_JAR_CHECKS="true"
+
+. $BIN_DIR/load-env.sh
+
+unset SKIP_JAR_CHECKS
+
+cd $BIN_DIR/..
+
+# build Stresso using the versions of Fluo and Accumulo running on the system
+mvn clean package -Dfluo.version=$FLUO_VERSION -Daccumulo.version=$ACCUMULO_VERSION -DskipTests
+
+mkdir -p lib
+
+# populate lib dir used by fluo init
+rm -f lib/*
+cp target/stresso-0.0.1-SNAPSHOT.jar ./lib/
+mvn dependency:copy-dependencies  -DincludeArtifactIds=fluo-recipes-core -DoutputDirectory=./lib
diff --git a/stresso/bin/init.sh b/stresso/bin/bulk_load.sh
similarity index 85%
rename from stresso/bin/init.sh
rename to stresso/bin/bulk_load.sh
index 133ad10..3bddbb1 100755
--- a/stresso/bin/init.sh
+++ b/stresso/bin/bulk_load.sh
@@ -8,4 +8,4 @@
     exit 1
 fi
 
-yarn jar $STRESSO_SHADED_JAR stresso.trie.Init -Dmapreduce.job.reduces=$3 $FLUO_PROPS $1 $2
+yarn jar $STRESSO_SHADED_JAR stresso.trie.Init -Dmapreduce.job.reduces=$3 $FLUO_CONN $FLUO_APP_NAME $1 $2
diff --git a/stresso/bin/compact-ll.sh b/stresso/bin/compact-ll.sh
index 5a98277..ab74fd6 100755
--- a/stresso/bin/compact-ll.sh
+++ b/stresso/bin/compact-ll.sh
@@ -3,4 +3,4 @@
 BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 . $BIN_DIR/load-env.sh
 
-$FLUO_CMD exec $FLUO_APP_NAME stresso.trie.CompactLL $FLUO_PROPS $@
+$FLUO_CMD exec $FLUO_APP_NAME stresso.trie.CompactLL $FLUO_CONN $FLUO_APP_NAME $@
diff --git a/stresso/bin/diff.sh b/stresso/bin/diff.sh
index 5e36d95..d5ce839 100755
--- a/stresso/bin/diff.sh
+++ b/stresso/bin/diff.sh
@@ -3,4 +3,4 @@
 BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 . $BIN_DIR/load-env.sh
 
-$FLUO_CMD exec $FLUO_APP_NAME stresso.trie.Diff $FLUO_PROPS $@
+$FLUO_CMD exec $FLUO_APP_NAME stresso.trie.Diff $FLUO_CONN $FLUO_APP_NAME $@
diff --git a/stresso/bin/generate.sh b/stresso/bin/generate.sh
index 622be8a..16c5f6f 100755
--- a/stresso/bin/generate.sh
+++ b/stresso/bin/generate.sh
@@ -3,4 +3,4 @@
 BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 . $BIN_DIR/load-env.sh
 
-yarn jar $STRESSO_JAR stresso.trie.Generate $@
+yarn jar $STRESSO_SHADED_JAR stresso.trie.Generate $@
diff --git a/stresso/bin/load-env.sh b/stresso/bin/load-env.sh
index 5400fc2..dfba208 100644
--- a/stresso/bin/load-env.sh
+++ b/stresso/bin/load-env.sh
@@ -15,10 +15,9 @@
   echo "FLUO_APP_NAME is not set!" 
   exit 1
 fi
-FLUO_APP_LIB=$FLUO_HOME/apps/$FLUO_APP_NAME/lib
-FLUO_PROPS=$FLUO_HOME/apps/$FLUO_APP_NAME/conf/fluo.properties
-if [ ! -f "$FLUO_PROPS" ] && [ -z "$SKIP_FLUO_PROPS_CHECK" ]; then
-  echo "Fluo properties file not found : $FLUO_PROPS" 
+
+if [ ! -f "$FLUO_CONN" ]; then
+  echo "Fluo conn properties file not found : $FLUO_CONN" 
   exit 1
 fi
 
diff --git a/stresso/bin/load.sh b/stresso/bin/load.sh
index 8cf2ac5..486f115 100755
--- a/stresso/bin/load.sh
+++ b/stresso/bin/load.sh
@@ -3,4 +3,4 @@
 BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 . $BIN_DIR/load-env.sh
 
-yarn jar $STRESSO_SHADED_JAR stresso.trie.Load $FLUO_PROPS $@
+yarn jar $STRESSO_SHADED_JAR stresso.trie.Load $FLUO_CONN $FLUO_APP_NAME $@
diff --git a/stresso/bin/print.sh b/stresso/bin/print.sh
index 2554c4c..02c7bcb 100755
--- a/stresso/bin/print.sh
+++ b/stresso/bin/print.sh
@@ -3,4 +3,4 @@
 BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 . $BIN_DIR/load-env.sh
 
-$FLUO_CMD exec $FLUO_APP_NAME stresso.trie.Print $FLUO_PROPS $@
+$FLUO_CMD exec $FLUO_APP_NAME stresso.trie.Print $FLUO_CONN $FLUO_APP_NAME $@
diff --git a/stresso/bin/run-test.sh b/stresso/bin/run-test.sh
index a58dd6f..685a4c3 100755
--- a/stresso/bin/run-test.sh
+++ b/stresso/bin/run-test.sh
@@ -1,74 +1,18 @@
 #!/bin/bash
 
 BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
-SKIP_JAR_CHECKS="true"
-SKIP_FLUO_PROPS_CHECK="true"
 
 . $BIN_DIR/load-env.sh
 
-unset SKIP_JAR_CHECKS
-unset SKIP_FLUO_PROPS_CHECK
-
 # stop if any command fails
 set -e
 
-if [ ! -d $FLUO_HOME/apps/$FLUO_APP_NAME ]; then
-  $FLUO_CMD new $FLUO_APP_NAME
-else
-  echo "Restarting '$FLUO_APP_NAME' application.  Errors may be printed if it's not running..."
-  $FLUO_CMD stop $FLUO_APP_NAME || true
-  rm -rf $FLUO_HOME/apps/$FLUO_APP_NAME
-  $FLUO_CMD new $FLUO_APP_NAME
-fi
-
-# build stresso
-(cd $BIN_DIR/..;mvn package -Dfluo.version=$FLUO_VERSION -Daccumulo.version=$ACCUMULO_VERSION -DskipTests)
-
-if [[ $(accumulo version) == *1.6* ]]; then
-  # build stress balancer
-  (cd $BIN_DIR/..; mkdir -p git; cd git;git clone https://github.com/keith-turner/stress-balancer.git; cd stress-balancer; ./config-fluo.sh $FLUO_PROPS)
-fi
-
-if [ ! -f "$STRESSO_JAR" ]; then
-  echo "Stresso jar not found : $STRESSO_JAR"
+if [ $($FLUO_CMD status -a $FLUO_APP_NAME) != "RUNNING" ]; then
+  echo "Fluo app $FLUO_APP_NAME is not running"
   exit 1
 fi
-if [ ! -d $FLUO_APP_LIB ]; then
-  echo "Fluo app lib $FLUO_APP_LIB does not exist" 
-  exit 1
-fi
-cp $STRESSO_JAR $FLUO_APP_LIB
-mvn dependency:copy-dependencies  -DincludeArtifactIds=fluo-recipes-core -DoutputDirectory=$FLUO_APP_LIB
 
-# determine a good stop level
-if (("$MAX" <= $((10**9)))); then
-  STOP=6
-elif (("$MAX" <= $((10**12)))); then
-  STOP=5
-else
-  STOP=4
-fi
-
-# delete existing config in fluo.properties if it exist
-$SED '/fluo.observer/d' $FLUO_PROPS
-$SED '/fluo.app.trie/d' $FLUO_PROPS
-
-# append stresso specific config
-echo "fluo.observer.0=stresso.trie.NodeObserver" >> $FLUO_PROPS
-echo "fluo.app.trie.nodeSize=8" >> $FLUO_PROPS
-echo "fluo.app.trie.stopLevel=$STOP" >> $FLUO_PROPS
-
-$FLUO_CMD init $FLUO_APP_NAME -f
-$FLUO_CMD start $FLUO_APP_NAME
-
-echo "Removing any previous logs in $LOG_DIR"
 mkdir -p $LOG_DIR
-rm -f $LOG_DIR/*
-
-# configure balancer for fluo table
-if [[ $(accumulo version) == *1.6* ]]; then
-  (cd $BIN_DIR/../git/stress-balancer; ./config-accumulo.sh $FLUO_PROPS)
-fi # TODO setup RegexGroupBalancer built into Accumulo 1.7.0... may be easier to do from java
 
 hadoop fs -rm -r -f /stresso/
 
@@ -82,7 +26,7 @@
   # generate and load intial data using map reduce writing directly to table
   echo "*****Generating and loading initial data set*****"
   $BIN_DIR/generate.sh $MAPS $((GEN_INIT / MAPS)) $MAX /stresso/init >$LOG_DIR/generate_0.out 2>$LOG_DIR/generate_0.err
-  $BIN_DIR/init.sh /stresso/init /stresso/initTmp $REDUCES >$LOG_DIR/init.out 2>$LOG_DIR/init.err
+  $BIN_DIR/bulk_load.sh /stresso/init /stresso/initTmp $REDUCES >$LOG_DIR/init.out 2>$LOG_DIR/init.err
   hadoop fs -rm -r /stresso/initTmp
 fi
 
@@ -94,7 +38,7 @@
   # TODO could reload the same dataset sometimes, maybe when i%5 == 0 or something
   $BIN_DIR/compact-ll.sh $MAX $COMPACT_CUTOFF >$LOG_DIR/compact-ll_$i.out 2>$LOG_DIR/compact-ll_$i.err
   if ! ((i % WAIT_PERIOD)); then
-    $FLUO_CMD wait $FLUO_APP_NAME >$LOG_DIR/wait_$i.out 2>$LOG_DIR/wait_$i.err
+    $FLUO_CMD wait -a $FLUO_APP_NAME >$LOG_DIR/wait_$i.out 2>$LOG_DIR/wait_$i.err
   else
     sleep $SLEEP
   fi
@@ -106,7 +50,7 @@
 grep UNIQUE $LOG_DIR/unique.err
 
 echo "*****Wait for Fluo to finish processing*****"
-$FLUO_CMD wait $FLUO_APP_NAME
+$FLUO_CMD wait -a $FLUO_APP_NAME
 
 echo "*****Printing # of unique integers calculated by Fluo*****"
 $BIN_DIR/print.sh >$LOG_DIR/print.out 2>$LOG_DIR/print.err
diff --git a/stresso/bin/split.sh b/stresso/bin/split.sh
index 225bef5..e9c9d1f 100755
--- a/stresso/bin/split.sh
+++ b/stresso/bin/split.sh
@@ -3,4 +3,4 @@
 BIN_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 . $BIN_DIR/load-env.sh
 
-$FLUO_CMD exec $FLUO_APP_NAME stresso.trie.Split $FLUO_PROPS "$TABLE_PROPS" $@
+$FLUO_CMD exec $FLUO_APP_NAME stresso.trie.Split $FLUO_CONN $FLUO_APP_NAME "$TABLE_PROPS" $@
diff --git a/stresso/conf/.gitignore b/stresso/conf/.gitignore
index 137e678..8b4171a 100644
--- a/stresso/conf/.gitignore
+++ b/stresso/conf/.gitignore
@@ -1 +1,2 @@
 env.sh
+fluo-app.properties
diff --git a/stresso/conf/env.sh.example b/stresso/conf/env.sh.example
index 77f9171..3e6adbe 100644
--- a/stresso/conf/env.sh.example
+++ b/stresso/conf/env.sh.example
@@ -3,8 +3,14 @@
 ###############################
 # Fluo Home
 test -z "$FLUO_HOME" && FLUO_HOME=/path/to/accumulo
+# Fluo connection properties
+FLUO_CONN=$FLUO_HOME/conf/fluo-conn.properties
 # Fluo application name
 FLUO_APP_NAME=stresso
+# Set this to avoid Hadoop's old version of guava.  This will make Hadoop's
+# yarn command use a classloader when running code.  This classloader isolates
+# stresso runtime code from Hadoop's dependencies.
+export HADOOP_USE_CLIENT_CLASSLOADER=true
 
 ###############################
 # configuration for run-test.sh
diff --git a/stresso/conf/fluo-app.properties b/stresso/conf/fluo-app.properties
new file mode 100644
index 0000000..f07013a
--- /dev/null
+++ b/stresso/conf/fluo-app.properties
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+# agreements.  See the NOTICE file distributed with this work for additional information regarding
+# copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with the License.  You may
+# obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+
+fluo.observer.provider=stresso.trie.StressoObserverProvider
+
+fluo.app.trie.nodeSize=8
+# For a max ~10^9 set to 6.  For a max ~10^12 set to 5.  If more than 10^12 set to 4.
+fluo.app.trie.stopLevel=6
+
+# The following assumes that fluo init is run in the stresso dir
+fluo.observer.init.dir=./lib
+
+fluo.accumulo.table=${fluo.connection.application.name}
+
+# You may need to edit the following to match your Hadoop and Accumulo settings
+fluo.dfs.root=hdfs://localhost:8020/fluo
+fluo.accumulo.instance=uno
+fluo.accumulo.user=root
+fluo.accumulo.password=secret
+fluo.accumulo.zookeepers=localhost
+
+# Performance related properties
+fluo.worker.num.threads=128
+fluo.loader.num.threads=64
+fluo.loader.queue.size=256
+
diff --git a/stresso/conf/fluo-app.properties.example b/stresso/conf/fluo-app.properties.example
new file mode 100644
index 0000000..f07013a
--- /dev/null
+++ b/stresso/conf/fluo-app.properties.example
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+# agreements.  See the NOTICE file distributed with this work for additional information regarding
+# copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with the License.  You may
+# obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+
+fluo.observer.provider=stresso.trie.StressoObserverProvider
+
+fluo.app.trie.nodeSize=8
+# For a max ~10^9 set to 6.  For a max ~10^12 set to 5.  If more than 10^12 set to 4.
+fluo.app.trie.stopLevel=6
+
+# The following assumes that fluo init is run in the stresso dir
+fluo.observer.init.dir=./lib
+
+fluo.accumulo.table=${fluo.connection.application.name}
+
+# You may need to edit the following to match your Hadoop and Accumulo settings
+fluo.dfs.root=hdfs://localhost:8020/fluo
+fluo.accumulo.instance=uno
+fluo.accumulo.user=root
+fluo.accumulo.password=secret
+fluo.accumulo.zookeepers=localhost
+
+# Performance related properties
+fluo.worker.num.threads=128
+fluo.loader.num.threads=64
+fluo.loader.queue.size=256
+
diff --git a/stresso/pom.xml b/stresso/pom.xml
index 9b514da..0d3deba 100644
--- a/stresso/pom.xml
+++ b/stresso/pom.xml
@@ -27,10 +27,11 @@
   <url>https://github.com/astralway/stresso</url>
 
   <properties>
-    <accumulo.version>1.7.2</accumulo.version>
-    <hadoop.version>2.6.3</hadoop.version>
-    <fluo.version>1.0.0-incubating</fluo.version>
-    <fluo-recipes.version>1.0.0-incubating</fluo-recipes.version>
+    <accumulo.version>2.0.0-alpha-1</accumulo.version>
+    <accumulo-plugin.version>2.0.0-SNAPSHOT</accumulo-plugin.version>
+    <hadoop.version>3.1.1</hadoop.version>
+    <fluo.version>1.2.0</fluo.version>
+    <fluo-recipes.version>1.2.0</fluo-recipes.version>
     <slf4j.version>1.7.12</slf4j.version>
   </properties>
 
@@ -47,7 +48,7 @@
           <plugin>
             <groupId>org.apache.accumulo</groupId>
             <artifactId>accumulo-maven-plugin</artifactId>
-            <version>${accumulo.version}</version>
+	    <version>${accumulo-plugin.version}</version>
             <configuration>
               <instanceName>it-instance-maven</instanceName>
               <rootPassword>ITSecret</rootPassword>
@@ -82,7 +83,8 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-failsafe-plugin</artifactId>
+	<artifactId>maven-failsafe-plugin</artifactId>
+	<version>3.0.0-M1</version>
         <configuration>
           <systemPropertyVariables>
             <fluo.it.instance.name>it-instance-maven</fluo.it.instance.name>
@@ -102,7 +104,8 @@
 
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
+	<artifactId>maven-shade-plugin</artifactId>
+	<version>3.2.1</version>
         <executions>
           <execution>
             <goals>
@@ -164,12 +167,29 @@
       <version>${accumulo.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-client-mapreduce</artifactId>
+      <version>${accumulo.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-runtime</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
       <version>${slf4j.version}</version>
@@ -181,25 +201,6 @@
       <version>${slf4j.version}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>13.0.1</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-configuration</groupId>
-      <artifactId>commons-configuration</artifactId>
-      <version>1.10</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <version>1.10</version>
-      <scope>provided</scope>
-    </dependency>
-
     <!-- Test Dependencies -->
     <dependency>
       <groupId>junit</groupId>
diff --git a/stresso/src/main/java/stresso/trie/AccumuloUtil.java b/stresso/src/main/java/stresso/trie/AccumuloUtil.java
new file mode 100644
index 0000000..a20bd01
--- /dev/null
+++ b/stresso/src/main/java/stresso/trie/AccumuloUtil.java
@@ -0,0 +1,39 @@
+package stresso.trie;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.fluo.api.client.FluoAdmin;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+
+public class AccumuloUtil {
+
+  public interface TableOp<T> {
+    T run(TableOperations tableOps, String tableName) throws Exception;
+  }
+
+  public interface VoidTableOp {
+    void run(TableOperations tableOps, String tableName) throws Exception;
+  }
+
+  public static void doTableOp(FluoConfiguration fc, VoidTableOp tableOp) {
+    getTableOp(fc, (to, tn) -> {
+      tableOp.run(to, tn);
+      return null;
+    });
+  }
+
+  public static <T> T getTableOp(FluoConfiguration fc, TableOp<T> tableOp) {
+    try (FluoAdmin fadmin = FluoFactory.newAdmin(fc)) {
+      FluoConfiguration appCfg = new FluoConfiguration(fadmin.getApplicationConfig());
+      appCfg.setApplicationName(fc.getApplicationName());
+      AccumuloClient client = Accumulo.newClient()
+          .forInstance(appCfg.getAccumuloInstance(), appCfg.getAccumuloZookeepers())
+          .usingPassword(appCfg.getAccumuloUser(), appCfg.getAccumuloPassword()).build();
+      return tableOp.run(client.tableOperations(), appCfg.getAccumuloTable());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/stresso/src/main/java/stresso/trie/CompactLL.java b/stresso/src/main/java/stresso/trie/CompactLL.java
index 1e0e421..48b5545 100644
--- a/stresso/src/main/java/stresso/trie/CompactLL.java
+++ b/stresso/src/main/java/stresso/trie/CompactLL.java
@@ -3,10 +3,7 @@
 import java.io.File;
 
 import org.apache.accumulo.core.client.Connector;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -18,30 +15,26 @@
 public class CompactLL {
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 3) {
-      System.err
-          .println("Usage: " + Split.class.getSimpleName() + " <fluo props> <max> <cutoff>");
+    if (args.length != 4) {
+      System.err.println(
+          "Usage: " + Split.class.getSimpleName() + " <fluo conn props> <app name> <max> <cutoff>");
       System.exit(-1);
     }
 
     FluoConfiguration config = new FluoConfiguration(new File(args[0]));
+    config.setApplicationName(args[1]);
 
-    long max = Long.parseLong(args[1]);
+    long max = Long.parseLong(args[2]);
 
-    //compact levels that can contain less nodes than this
-    int cutoff = Integer.parseInt(args[2]);
+    // compact levels that can contain less nodes than this
+    int cutoff = Integer.parseInt(args[3]);
 
-    int nodeSize;
-    int stopLevel;
-    try (FluoClient client = FluoFactory.newClient(config)) {
-      nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
-      stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
-    }
+    StressoConfig sconf = StressoConfig.retrieve(config);
 
-    int level = 64 / nodeSize;
+    int level = 64 / sconf.nodeSize;
 
-    while(level >= stopLevel) {
-      if(max < cutoff) {
+    while (level >= sconf.stopLevel) {
+      if (max < cutoff) {
         break;
       }
 
@@ -49,13 +42,12 @@
       level--;
     }
 
-    String start = String.format("%02d", stopLevel);
+    String start = String.format("%02d", sconf.stopLevel);
     String end = String.format("%02d:~", (level));
 
-    System.out.println("Compacting "+start+" to "+end);
-    Connector conn = AccumuloUtil.getConnector(config);
-    conn.tableOperations().compact(config.getAccumuloTable(), new Text(start), new Text(end), true, false);
-
+    System.out.println("Compacting " + start + " to " + end);
+    AccumuloUtil.doTableOp(config,
+        (tableOps, table) -> tableOps.compact(table, new Text(start), new Text(end), true, false));
     System.exit(0);
   }
 }
diff --git a/stresso/src/main/java/stresso/trie/Diff.java b/stresso/src/main/java/stresso/trie/Diff.java
index f74521d..7f9012f 100644
--- a/stresso/src/main/java/stresso/trie/Diff.java
+++ b/stresso/src/main/java/stresso/trie/Diff.java
@@ -52,27 +52,27 @@
 
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 1) {
-      System.err.println("Usage: " + Diff.class.getSimpleName() + " <fluo props>");
+    if (args.length != 2) {
+      System.err.println("Usage: " + Diff.class.getSimpleName() + " <fluo conn props> <app name>");
       System.exit(-1);
     }
 
     FluoConfiguration config = new FluoConfiguration(new File(args[0]));
+    config.setApplicationName(args[1]);
 
     try (FluoClient client = FluoFactory.newClient(config); Snapshot snap = client.newSnapshot()) {
 
-      int stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
-      int nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
-
-      Map<String, Long> rootCounts = getRootCount(client, snap, stopLevel, stopLevel, nodeSize);
+      StressoConfig sconf = StressoConfig.retrieve(client);
+      
+      Map<String, Long> rootCounts = getRootCount(client, snap, sconf.stopLevel, sconf.stopLevel, sconf.nodeSize);
       ArrayList<String> rootRows = new ArrayList<>(rootCounts.keySet());
       Collections.sort(rootRows);
 
       // TODO 8
-      for (int level = stopLevel + 1; level <= 8; level++) {
+      for (int level = sconf.stopLevel + 1; level <= 8; level++) {
         System.out.printf("Level %d:\n", level);
 
-        Map<String, Long> counts = getRootCount(client, snap, level, stopLevel, nodeSize);
+        Map<String, Long> counts = getRootCount(client, snap, level, sconf.stopLevel, sconf.nodeSize);
 
         long sum = 0;
 
diff --git a/stresso/src/main/java/stresso/trie/Init.java b/stresso/src/main/java/stresso/trie/Init.java
index d0f847f..d346871 100644
--- a/stresso/src/main/java/stresso/trie/Init.java
+++ b/stresso/src/main/java/stresso/trie/Init.java
@@ -1,17 +1,15 @@
 /*
  * Copyright 2014 Stresso authors (see AUTHORS)
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  */
 
 package stresso.trie;
@@ -22,17 +20,13 @@
 import java.io.OutputStream;
 import java.util.Collection;
 
-import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
 import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.commons.codec.binary.Base64;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.mapreduce.FluoKeyValue;
 import org.apache.fluo.mapreduce.FluoKeyValueGenerator;
 import org.apache.hadoop.conf.Configured;
@@ -51,17 +45,21 @@
 
 public class Init extends Configured implements Tool {
 
-  public static final String TRIE_STOP_LEVEL_PROP = FluoConfiguration.FLUO_PREFIX + ".stress.trie.stopLevel";
-  public static final String TRIE_NODE_SIZE_PROP = FluoConfiguration.FLUO_PREFIX + ".stress.trie.node.size";
+  public static final String TRIE_STOP_LEVEL_PROP =
+      FluoConfiguration.FLUO_PREFIX + ".stress.trie.stopLevel";
+  public static final String TRIE_NODE_SIZE_PROP =
+      FluoConfiguration.FLUO_PREFIX + ".stress.trie.node.size";
 
-  public static class UniqueReducer extends Reducer<LongWritable,NullWritable,LongWritable,NullWritable> {
+  public static class UniqueReducer
+      extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
     @Override
-    protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
+    protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context)
+        throws IOException, InterruptedException {
       context.write(key, NullWritable.get());
     }
   }
 
-  public static class InitMapper extends Mapper<LongWritable,NullWritable,Text,LongWritable> {
+  public static class InitMapper extends Mapper<LongWritable, NullWritable, Text, LongWritable> {
 
     private int stopLevel;
     private int nodeSize;
@@ -76,7 +74,8 @@
     }
 
     @Override
-    protected void map(LongWritable key, NullWritable val, Context context) throws IOException, InterruptedException {
+    protected void map(LongWritable key, NullWritable val, Context context)
+        throws IOException, InterruptedException {
       Node node = new Node(key.get(), 64 / nodeSize, nodeSize);
       while (node != null) {
         outputKey.set(node.getRowId());
@@ -89,12 +88,13 @@
     }
   }
 
-  public static class InitCombiner extends Reducer<Text,LongWritable,Text,LongWritable> {
+  public static class InitCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
 
     private LongWritable outputVal = new LongWritable();
 
     @Override
-    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
       long sum = 0;
       for (LongWritable l : values) {
         sum += l.get();
@@ -105,11 +105,12 @@
     }
   }
 
-  public static class InitReducer extends Reducer<Text,LongWritable,Key,Value> {
+  public static class InitReducer extends Reducer<Text, LongWritable, Key, Value> {
     private FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
 
     @Override
-    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
       long sum = 0;
       for (LongWritable l : values) {
         sum += l.get();
@@ -126,27 +127,23 @@
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 3) {
-      System.err.println("Usage: " + this.getClass().getSimpleName() + " <fluoProps> <input dir> <tmp dir>");
+    if (args.length != 4) {
+      System.err.println("Usage: " + this.getClass().getSimpleName()
+          + " <fluo conn props> <app name> <input dir> <tmp dir>");
       System.exit(-1);
     }
 
     FluoConfiguration props = new FluoConfiguration(new File(args[0]));
-    Path input = new Path(args[1]);
-    Path tmp = new Path(args[2]);
+    props.setApplicationName(args[1]);
 
-    int stopLevel;
-    int nodeSize;
-    try (FluoClient client = FluoFactory.newClient(props)) {
-      nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
-      stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
-    }
+    Path input = new Path(args[2]);
+    Path tmp = new Path(args[3]);
 
     int ret = unique(input, new Path(tmp, "nums"));
     if (ret != 0)
       return ret;
 
-    return buildTree(nodeSize, props, tmp, stopLevel);
+    return buildTree(props, tmp);
   }
 
   private int unique(Path input, Path tmp) throws Exception {
@@ -166,12 +163,16 @@
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     SequenceFileOutputFormat.setOutputPath(job, tmp);
 
+    job.getConfiguration().set("mapreduce.job.classloader", "true");
+
     boolean success = job.waitForCompletion(true);
     return success ? 0 : 1;
 
   }
 
-  private int buildTree(int nodeSize, FluoConfiguration props, Path tmp, int stopLevel) throws Exception {
+  private int buildTree(FluoConfiguration props, Path tmp) throws Exception {
+    StressoConfig sconf = StressoConfig.retrieve(props);
+
     Job job = Job.getInstance(getConf());
 
     job.setJarByClass(Init.class);
@@ -181,8 +182,8 @@
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(LongWritable.class);
 
-    job.getConfiguration().setInt(TRIE_NODE_SIZE_PROP, nodeSize);
-    job.getConfiguration().setInt(TRIE_STOP_LEVEL_PROP, stopLevel);
+    job.getConfiguration().setInt(TRIE_NODE_SIZE_PROP, sconf.nodeSize);
+    job.getConfiguration().setInt(TRIE_STOP_LEVEL_PROP, sconf.stopLevel);
 
     job.setInputFormatClass(SequenceFileInputFormat.class);
     SequenceFileInputFormat.addInputPath(job, new Path(tmp, "nums"));
@@ -196,11 +197,10 @@
     job.setPartitionerClass(RangePartitioner.class);
 
     FileSystem fs = FileSystem.get(job.getConfiguration());
-    Connector conn = AccumuloUtil.getConnector(props);
 
     Path splitsPath = new Path(tmp, "splits.txt");
 
-    Collection<Text> splits1 = writeSplits(props, fs, conn, splitsPath);
+    Collection<Text> splits1 = writeSplits(props, fs, splitsPath);
 
     RangePartitioner.setSplitFile(job, splitsPath.toString());
     job.setNumReduceTasks(splits1.size() + 1);
@@ -208,29 +208,43 @@
     Path outPath = new Path(tmp, "out");
     AccumuloFileOutputFormat.setOutputPath(job, outPath);
 
+    job.getConfiguration().set("mapreduce.job.classloader", "true");
+
     boolean success = job.waitForCompletion(true);
 
     if (success) {
       Path failPath = new Path(tmp, "failures");
       fs.mkdirs(failPath);
-      conn.tableOperations().importDirectory(props.getAccumuloTable(), outPath.toString(), failPath.toString(), false);
 
-      //Compacting files makes them local to each tablet and generates files using the tables settings.
-      conn.tableOperations().compact(props.getAccumuloTable(), new CompactionConfig().setWait(true));
+      AccumuloUtil.doTableOp(props, (tableOps, table) -> {
+        tableOps.importDirectory(table, outPath.toString(), failPath.toString(), false);
+
+        // Compacting files makes them local to each tablet and generates files using the tables
+        // settings.
+        tableOps.compact(table, new CompactionConfig().setWait(true));
+      });
+
+
     }
     return success ? 0 : 1;
   }
 
-  private Collection<Text> writeSplits(FluoConfiguration props, FileSystem fs, Connector conn, Path splitsPath) throws Exception {
-    Collection<Text> splits1 = conn.tableOperations().listSplits(props.getAccumuloTable());
-    OutputStream out = new BufferedOutputStream(fs.create(splitsPath));
-    for (Text split : splits1) {
-      out.write(Base64.encodeBase64(split.copyBytes()));
-      out.write('\n');
-    }
+  private Collection<Text> writeSplits(FluoConfiguration props, FileSystem fs, Path splitsPath)
+      throws Exception {
 
-    out.close();
-    return splits1;
+    return AccumuloUtil.getTableOp(props, (tableOps, table) -> {
+      Collection<Text> splits1 = tableOps.listSplits(table);
+      OutputStream out = new BufferedOutputStream(fs.create(splitsPath));
+      for (Text split : splits1) {
+        out.write(Base64.encodeBase64(split.copyBytes()));
+        out.write('\n');
+      }
+
+      out.close();
+      return splits1;
+    });
+
+
   }
 
   public static void main(String[] args) throws Exception {
diff --git a/stresso/src/main/java/stresso/trie/Load.java b/stresso/src/main/java/stresso/trie/Load.java
index 8e1ebfb..af0fa26 100644
--- a/stresso/src/main/java/stresso/trie/Load.java
+++ b/stresso/src/main/java/stresso/trie/Load.java
@@ -48,13 +48,14 @@
   @Override
   public int run(String[] args) throws Exception {
 
-    if (args.length != 2) {
-      log.error("Usage: " + this.getClass().getSimpleName() + "<fluoProps> <input dir>");
+    if (args.length != 3) {
+      log.error("Usage: " + this.getClass().getSimpleName() + " <fluo conn props> <app name> <input dir>");
       System.exit(-1);
     }
 
     FluoConfiguration props = new FluoConfiguration(new File(args[0]));
-    Path input = new Path(args[1]);
+    props.setApplicationName(args[1]);
+    Path input = new Path(args[2]);
 
     Job job = Job.getInstance(getConf());
 
@@ -73,6 +74,7 @@
     FluoOutputFormat.configure(job, props);
 
     job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
+    job.getConfiguration().set("mapreduce.job.classloader", "true");
 
     boolean success = job.waitForCompletion(true);
     return success ? 0 : 1;
diff --git a/stresso/src/main/java/stresso/trie/NodeObserver.java b/stresso/src/main/java/stresso/trie/NodeObserver.java
index d7356c5..f92ec23 100644
--- a/stresso/src/main/java/stresso/trie/NodeObserver.java
+++ b/stresso/src/main/java/stresso/trie/NodeObserver.java
@@ -18,21 +18,21 @@
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.fluo.api.observer.Observer;
 import org.apache.fluo.recipes.core.types.TypedSnapshotBase.Value;
 import org.apache.fluo.recipes.core.types.TypedTransactionBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Observer that looks for count:wait for nodes. If found, it increments count:seen and increments
  * count:wait of parent node in trie
  */
-public class NodeObserver extends AbstractObserver {
+public class NodeObserver implements Observer {
 
-  private static final Logger log = LoggerFactory.getLogger(NodeObserver.class);
+  private final int stopLevel;
 
-  private int stopLevel = 0;
+  public NodeObserver(int stopLevel) {
+    this.stopLevel = stopLevel;
+  }
 
   @Override
   public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
@@ -50,29 +50,15 @@
       ttx.mutate().row(row).col(Constants.COUNT_SEEN_COL).set(childSeen + childWait);
       ttx.mutate().row(row).col(Constants.COUNT_WAIT_COL).delete();
 
-      try {
-        Node node = new Node(row.toString());
-        if (node.getLevel() > stopLevel) {
-          Node parent = node.getParent();
-          Integer parentWait =
-              ttx.get().row(parent.getRowId()).col(Constants.COUNT_WAIT_COL).toInteger(0);
-          ttx.mutate().row(parent.getRowId()).col(Constants.COUNT_WAIT_COL)
-              .set(parentWait + childWait);
-        }
-      } catch (IllegalArgumentException e) {
-        log.error(e.getMessage());
-        e.printStackTrace();
+      Node node = new Node(row.toString());
+      if (node.getLevel() > stopLevel) {
+        Node parent = node.getParent();
+        Integer parentWait =
+            ttx.get().row(parent.getRowId()).col(Constants.COUNT_WAIT_COL).toInteger(0);
+        ttx.mutate().row(parent.getRowId()).col(Constants.COUNT_WAIT_COL)
+            .set(parentWait + childWait);
       }
+
     }
   }
-
-  @Override
-  public void init(Context context) throws Exception {
-    stopLevel = context.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
-  }
-
-  @Override
-  public ObservedColumn getObservedColumn() {
-    return new ObservedColumn(Constants.COUNT_WAIT_COL, NotificationType.STRONG);
-  }
 }
diff --git a/stresso/src/main/java/stresso/trie/Print.java b/stresso/src/main/java/stresso/trie/Print.java
index 62c39a8..77c4374 100644
--- a/stresso/src/main/java/stresso/trie/Print.java
+++ b/stresso/src/main/java/stresso/trie/Print.java
@@ -68,13 +68,11 @@
 
     try (FluoClient client = FluoFactory.newClient(config); Snapshot snap = client.newSnapshot()) {
 
-      int level = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
-      int nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
-
-      RowScanner rows = snap.scanner().over(Span.prefix(String.format("%02d:", level)))
+      StressoConfig sconf = StressoConfig.retrieve(client);
+
+      RowScanner rows = snap.scanner().over(Span.prefix(String.format("%02d:", sconf.stopLevel)))
           .fetch(Constants.COUNT_SEEN_COL, Constants.COUNT_WAIT_COL).byRow().build();
 
-
       long totalSeen = 0;
       long totalWait = 0;
 
@@ -86,7 +84,7 @@
         String row = columns.getsRow();
         Node node = new Node(row);
 
-        if (node.getNodeSize() == nodeSize) {
+        if (node.getNodeSize() == sconf.nodeSize) {
           for (ColumnValue cv : columns) {
             if (cv.getColumn().equals(Constants.COUNT_SEEN_COL)) {
               totalSeen += Long.parseLong(cv.getsValue());
@@ -108,12 +106,14 @@
 
   public static void main(String[] args) throws Exception {
 
-    if (args.length != 1) {
-      System.err.println("Usage: " + Print.class.getSimpleName() + " <fluo props>");
+    if (args.length != 2) {
+      System.err.println("Usage: " + Print.class.getSimpleName() + " <fluo conn props> <app name>");
       System.exit(-1);
     }
 
-    Stats stats = getStats(new FluoConfiguration(new File(args[0])));
+    FluoConfiguration fconf = new FluoConfiguration(new File(args[0])).setApplicationName(args[1]);
+
+    Stats stats = getStats(fconf);
 
     System.out.println("Total at root : " + (stats.totalSeen + stats.totalWait));
     System.out.println("Nodes Scanned : " + stats.nodes);
diff --git a/stresso/src/main/java/stresso/trie/Split.java b/stresso/src/main/java/stresso/trie/Split.java
index df8e28f..5f4f00f 100644
--- a/stresso/src/main/java/stresso/trie/Split.java
+++ b/stresso/src/main/java/stresso/trie/Split.java
@@ -25,11 +25,8 @@
 import com.google.common.base.Strings;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.FluoFactory;
+import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.hadoop.io.Text;
 
 public class Split {
@@ -41,77 +38,66 @@
   private static final String TABLE_BALANCER_PROP = "table.balancer";
 
   public static void main(String[] args) throws Exception {
-    if (args.length != 3) {
+    if (args.length != 4) {
       System.err.println("Usage: " + Split.class.getSimpleName()
-          + " <fluo props> <table props> <tablets per level>");
+          + " <fluo conn props> <app name> <table props> <tablets per level>");
       System.exit(-1);
     }
 
     FluoConfiguration config = new FluoConfiguration(new File(args[0]));
+    config.setApplicationName(args[1]);
 
-    int maxTablets = Integer.parseInt(args[2]);
+    int maxTablets = Integer.parseInt(args[3]);
 
-    int nodeSize;
-    int stopLevel;
-    try (FluoClient client = FluoFactory.newClient(config)) {
-      nodeSize = client.getAppConfiguration().getInt(Constants.NODE_SIZE_PROP);
-      stopLevel = client.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
-    }
+    StressoConfig sconf = StressoConfig.retrieve(config);
 
-    setupBalancer(config);
+    AccumuloUtil.doTableOp(config, (tableOps, table) -> {
+      setupBalancer(tableOps, table);
 
-    int level = 64 / nodeSize;
+      int level = 64 / sconf.nodeSize;
 
-    while (level >= stopLevel) {
-      int numTablets = maxTablets;
-      if (numTablets == 0)
-        break;
+      while (level >= sconf.stopLevel) {
+        int numTablets = maxTablets;
+        if (numTablets == 0)
+          break;
 
-      TreeSet<Text> splits = genSplits(level, numTablets);
-      addSplits(config, splits);
-      System.out.printf("Added %d tablets for level %d\n", numTablets, level);
+        TreeSet<Text> splits = genSplits(level, numTablets);
+        tableOps.addSplits(table, splits);
+        System.out.printf("Added %d tablets for level %d\n", numTablets, level);
 
-      level--;
-    }
+        level--;
+      }
 
-    optimizeAccumulo(config, args[1]);
+      optimizeAccumulo(tableOps, table, args[2]);
+    });
   }
 
-  private static void optimizeAccumulo(FluoConfiguration config, String tableProps)
+  private static void optimizeAccumulo(TableOperations tableOps, String table, String tableProps)
       throws Exception {
-    Connector conn = AccumuloUtil.getConnector(config);
 
     Properties tprops = new Properties();
     tprops.load(new ByteArrayInputStream(tableProps.getBytes(StandardCharsets.UTF_8)));
 
     Set<Entry<Object, Object>> es = tprops.entrySet();
     for (Entry<Object, Object> e : es) {
-      conn.tableOperations().setProperty(config.getAccumuloTable(), e.getKey().toString(),
-          e.getValue().toString());
+      tableOps.setProperty(table, e.getKey().toString(), e.getValue().toString());
     }
     try {
-      conn.instanceOperations().setProperty("table.durability", "flush");
-      conn.tableOperations().removeProperty("accumulo.metadata", "table.durability");
-      conn.tableOperations().removeProperty("accumulo.root", "table.durability");
+      tableOps.setProperty(table, "table.durability", "flush");
+      tableOps.removeProperty("accumulo.metadata", "table.durability");
+      tableOps.removeProperty("accumulo.root", "table.durability");
     } catch (AccumuloException e) {
       System.err.println(
           "Unable to set durability settings (error expected in Accumulo 1.6) : " + e.getMessage());
     }
   }
 
-  private static void setupBalancer(FluoConfiguration config) throws AccumuloSecurityException {
-    Connector conn = AccumuloUtil.getConnector(config);
-
-    try {
-      // setting this prop first intentionally because it should fail in 1.6
-      conn.tableOperations().setProperty(config.getAccumuloTable(), RGB_PATTERN_PROP, "(\\d\\d).*");
-      conn.tableOperations().setProperty(config.getAccumuloTable(), RGB_DEFAULT_PROP, "none");
-      conn.tableOperations().setProperty(config.getAccumuloTable(), TABLE_BALANCER_PROP, RGB_CLASS);
-      System.out.println("Setup tablet group balancer");
-    } catch (AccumuloException e) {
-      System.err.println(
-          "Unable to setup tablet balancer (error expected in Accumulo 1.6) : " + e.getMessage());
-    }
+  private static void setupBalancer(TableOperations tableOps, String table)
+      throws AccumuloSecurityException, AccumuloException {
+    tableOps.setProperty(table, RGB_PATTERN_PROP, "(\\d\\d).*");
+    tableOps.setProperty(table, RGB_DEFAULT_PROP, "none");
+    tableOps.setProperty(table, TABLE_BALANCER_PROP, RGB_CLASS);
+    System.out.println("Setup tablet group balancer");
   }
 
   private static TreeSet<Text> genSplits(int level, int numTablets) {
@@ -133,9 +119,4 @@
 
     return splits;
   }
-
-  private static void addSplits(FluoConfiguration config, TreeSet<Text> splits) throws Exception {
-    Connector conn = AccumuloUtil.getConnector(config);
-    conn.tableOperations().addSplits(config.getAccumuloTable(), splits);
-  }
 }
diff --git a/stresso/src/main/java/stresso/trie/StressoConfig.java b/stresso/src/main/java/stresso/trie/StressoConfig.java
new file mode 100644
index 0000000..5bb92c3
--- /dev/null
+++ b/stresso/src/main/java/stresso/trie/StressoConfig.java
@@ -0,0 +1,27 @@
+package stresso.trie;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.config.SimpleConfiguration;
+
+public class StressoConfig {
+  public final int nodeSize;
+  public final int stopLevel;
+
+  public StressoConfig(int nodeSize, int stopLevel) {
+    this.nodeSize = nodeSize;
+    this.stopLevel = stopLevel;
+  }
+
+  public static StressoConfig retrieve(FluoConfiguration fc) {
+    try (FluoClient client = FluoFactory.newClient(fc)) {
+      return retrieve(client);
+    }
+  }
+
+  public static StressoConfig retrieve(FluoClient client) {
+    SimpleConfiguration ac = client.getAppConfiguration();
+    return new StressoConfig(ac.getInt(Constants.NODE_SIZE_PROP), ac.getInt(Constants.STOP_LEVEL_PROP));
+  }
+}
diff --git a/stresso/src/main/java/stresso/trie/StressoObserverProvider.java b/stresso/src/main/java/stresso/trie/StressoObserverProvider.java
new file mode 100644
index 0000000..2dd3ae2
--- /dev/null
+++ b/stresso/src/main/java/stresso/trie/StressoObserverProvider.java
@@ -0,0 +1,14 @@
+package stresso.trie;
+
+import static org.apache.fluo.api.observer.Observer.NotificationType.STRONG;
+import static stresso.trie.Constants.COUNT_WAIT_COL;
+
+import org.apache.fluo.api.observer.ObserverProvider;
+
+public class StressoObserverProvider implements ObserverProvider {
+  @Override
+  public void provide(Registry registry, Context ctx) {
+    int stopLevel = ctx.getAppConfiguration().getInt(Constants.STOP_LEVEL_PROP);
+    registry.forColumn(COUNT_WAIT_COL, STRONG).useObserver(new NodeObserver(stopLevel));
+  }
+}
diff --git a/stresso/src/main/java/stresso/trie/Unique.java b/stresso/src/main/java/stresso/trie/Unique.java
index ef0d1cc..6cd9423 100644
--- a/stresso/src/main/java/stresso/trie/Unique.java
+++ b/stresso/src/main/java/stresso/trie/Unique.java
@@ -86,6 +86,8 @@
 
     job.setOutputFormat(NullOutputFormat.class);
 
+    job.set("mapreduce.job.classloader", "true");
+
     RunningJob runningJob = JobClient.runJob(job);
     runningJob.waitForCompletion();
     numUnique = (int) runningJob.getCounters().getCounter(Stats.UNIQUE);