Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-pirk
diff --git a/docs/org/apache/pirk/schema/data/LoadDataSchemas.html b/docs/org/apache/pirk/schema/data/LoadDataSchemas.html
deleted file mode 100644
index 99649fe..0000000
--- a/docs/org/apache/pirk/schema/data/LoadDataSchemas.html
+++ /dev/null
@@ -1,335 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<!-- NewPage -->
-<html lang="en">
-<head>
-<!-- Generated by javadoc (version 1.7.0_80) on Sun Jul 24 11:37:26 EDT 2016 -->
-<title>LoadDataSchemas</title>
-<meta name="date" content="2016-07-24">
-<link rel="stylesheet" type="text/css" href="../../../../../stylesheet.css" title="Style">
-</head>
-<body>
-<script type="text/javascript"><!--
-    if (location.href.indexOf('is-external=true') == -1) {
-        parent.document.title="LoadDataSchemas";
-    }
-//-->
-</script>
-<noscript>
-<div>JavaScript is disabled on your browser.</div>
-</noscript>
-<!-- ========= START OF TOP NAVBAR ======= -->
-<div class="topNav"><a name="navbar_top">
-<!--   -->
-</a><a href="#skip-navbar_top" title="Skip navigation links"></a><a name="navbar_top_firstrow">
-<!--   -->
-</a>
-<ul class="navList" title="Navigation">
-<li><a href="../../../../../overview-summary.html">Overview</a></li>
-<li><a href="package-summary.html">Package</a></li>
-<li class="navBarCell1Rev">Class</li>
-<li><a href="class-use/LoadDataSchemas.html">Use</a></li>
-<li><a href="package-tree.html">Tree</a></li>
-<li><a href="../../../../../deprecated-list.html">Deprecated</a></li>
-<li><a href="../../../../../index-files/index-1.html">Index</a></li>
-<li><a href="../../../../../help-doc.html">Help</a></li>
-</ul>
-</div>
-<div class="subNav">
-<ul class="navList">
-<li><a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data"><span class="strong">Prev Class</span></a></li>
-<li>Next Class</li>
-</ul>
-<ul class="navList">
-<li><a href="../../../../../index.html?org/apache/pirk/schema/data/LoadDataSchemas.html" target="_top">Frames</a></li>
-<li><a href="LoadDataSchemas.html" target="_top">No Frames</a></li>
-</ul>
-<ul class="navList" id="allclasses_navbar_top">
-<li><a href="../../../../../allclasses-noframe.html">All Classes</a></li>
-</ul>
-<div>
-<script type="text/javascript"><!--
-  allClassesLink = document.getElementById("allclasses_navbar_top");
-  if(window==top) {
-    allClassesLink.style.display = "block";
-  }
-  else {
-    allClassesLink.style.display = "none";
-  }
-  //-->
-</script>
-</div>
-<div>
-<ul class="subNavList">
-<li>Summary:&nbsp;</li>
-<li>Nested&nbsp;|&nbsp;</li>
-<li>Field&nbsp;|&nbsp;</li>
-<li><a href="#constructor_summary">Constr</a>&nbsp;|&nbsp;</li>
-<li><a href="#method_summary">Method</a></li>
-</ul>
-<ul class="subNavList">
-<li>Detail:&nbsp;</li>
-<li>Field&nbsp;|&nbsp;</li>
-<li><a href="#constructor_detail">Constr</a>&nbsp;|&nbsp;</li>
-<li><a href="#method_detail">Method</a></li>
-</ul>
-</div>
-<a name="skip-navbar_top">
-<!--   -->
-</a></div>
-<!-- ========= END OF TOP NAVBAR ========= -->
-<!-- ======== START OF CLASS DATA ======== -->
-<div class="header">
-<div class="subTitle">org.apache.pirk.schema.data</div>
-<h2 title="Class LoadDataSchemas" class="title">Class LoadDataSchemas</h2>
-</div>
-<div class="contentContainer">
-<ul class="inheritance">
-<li>java.lang.Object</li>
-<li>
-<ul class="inheritance">
-<li>org.apache.pirk.schema.data.LoadDataSchemas</li>
-</ul>
-</li>
-</ul>
-<div class="description">
-<ul class="blockList">
-<li class="blockList">
-<hr>
-<br>
-<pre>public class <span class="strong">LoadDataSchemas</span>
-extends java.lang.Object</pre>
-<div class="block">Class to load any data schemas specified in the properties file, 'data.schemas'
- <p>
- Schemas should be specified as follows; all items are treated in a case insensitive manner:
- 
- <pre>
- <code>&lt;schema&gt;
-  &lt;schemaName&gt; name of the schema &lt;/schemaName&gt;
-  &lt;element&gt;
-      &lt;name&gt; element name /name&gt;
-      &lt;type&gt; class name or type name (if Java primitive type) of the element &lt;/type&gt;
-      &lt;isArray&gt; true or false -- whether or not the schema element is an array within the data &lt;/isArray&gt;
-      &lt;partitioner&gt; optional - Partitioner class for the element; defaults to primitive java type partitioner &lt;/partitioner&gt; 
-  &lt;/element&gt;
- &lt;/schema&gt;
- </code>
- </pre>
- 
- Primitive types must be one of the following: "byte", "short", "int", "long", "float", "double", "char", "string"
- <p></div>
-</li>
-</ul>
-</div>
-<div class="summary">
-<ul class="blockList">
-<li class="blockList">
-<!-- ======== CONSTRUCTOR SUMMARY ======== -->
-<ul class="blockList">
-<li class="blockList"><a name="constructor_summary">
-<!--   -->
-</a>
-<h3>Constructor Summary</h3>
-<table class="overviewSummary" border="0" cellpadding="3" cellspacing="0" summary="Constructor Summary table, listing constructors, and an explanation">
-<caption><span>Constructors</span><span class="tabEnd">&nbsp;</span></caption>
-<tr>
-<th class="colOne" scope="col">Constructor and Description</th>
-</tr>
-<tr class="altColor">
-<td class="colOne"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#LoadDataSchemas()">LoadDataSchemas</a></strong>()</code>&nbsp;</td>
-</tr>
-</table>
-</li>
-</ul>
-<!-- ========== METHOD SUMMARY =========== -->
-<ul class="blockList">
-<li class="blockList"><a name="method_summary">
-<!--   -->
-</a>
-<h3>Method Summary</h3>
-<table class="overviewSummary" border="0" cellpadding="3" cellspacing="0" summary="Method Summary table, listing methods, and an explanation">
-<caption><span>Methods</span><span class="tabEnd">&nbsp;</span></caption>
-<tr>
-<th class="colFirst" scope="col">Modifier and Type</th>
-<th class="colLast" scope="col">Method and Description</th>
-</tr>
-<tr class="altColor">
-<td class="colFirst"><code>static <a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data">DataSchema</a></code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#getSchema(java.lang.String)">getSchema</a></strong>(java.lang.String&nbsp;schemaName)</code>&nbsp;</td>
-</tr>
-<tr class="rowColor">
-<td class="colFirst"><code>static java.util.HashMap&lt;java.lang.String,<a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data">DataSchema</a>&gt;</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#getSchemaMap()">getSchemaMap</a></strong>()</code>&nbsp;</td>
-</tr>
-<tr class="altColor">
-<td class="colFirst"><code>static java.util.Set&lt;java.lang.String&gt;</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#getSchemaNames()">getSchemaNames</a></strong>()</code>&nbsp;</td>
-</tr>
-<tr class="rowColor">
-<td class="colFirst"><code>static void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#initialize()">initialize</a></strong>()</code>&nbsp;</td>
-</tr>
-<tr class="altColor">
-<td class="colFirst"><code>static void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#initialize(boolean,%20org.apache.hadoop.fs.FileSystem)">initialize</a></strong>(boolean&nbsp;hdfs,
-          org.apache.hadoop.fs.FileSystem&nbsp;fs)</code>&nbsp;</td>
-</tr>
-</table>
-<ul class="blockList">
-<li class="blockList"><a name="methods_inherited_from_class_java.lang.Object">
-<!--   -->
-</a>
-<h3>Methods inherited from class&nbsp;java.lang.Object</h3>
-<code>equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait</code></li>
-</ul>
-</li>
-</ul>
-</li>
-</ul>
-</div>
-<div class="details">
-<ul class="blockList">
-<li class="blockList">
-<!-- ========= CONSTRUCTOR DETAIL ======== -->
-<ul class="blockList">
-<li class="blockList"><a name="constructor_detail">
-<!--   -->
-</a>
-<h3>Constructor Detail</h3>
-<a name="LoadDataSchemas()">
-<!--   -->
-</a>
-<ul class="blockListLast">
-<li class="blockList">
-<h4>LoadDataSchemas</h4>
-<pre>public&nbsp;LoadDataSchemas()</pre>
-</li>
-</ul>
-</li>
-</ul>
-<!-- ============ METHOD DETAIL ========== -->
-<ul class="blockList">
-<li class="blockList"><a name="method_detail">
-<!--   -->
-</a>
-<h3>Method Detail</h3>
-<a name="initialize()">
-<!--   -->
-</a>
-<ul class="blockList">
-<li class="blockList">
-<h4>initialize</h4>
-<pre>public static&nbsp;void&nbsp;initialize()
-                       throws java.lang.Exception</pre>
-<dl><dt><span class="strong">Throws:</span></dt>
-<dd><code>java.lang.Exception</code></dd></dl>
-</li>
-</ul>
-<a name="initialize(boolean, org.apache.hadoop.fs.FileSystem)">
-<!--   -->
-</a>
-<ul class="blockList">
-<li class="blockList">
-<h4>initialize</h4>
-<pre>public static&nbsp;void&nbsp;initialize(boolean&nbsp;hdfs,
-              org.apache.hadoop.fs.FileSystem&nbsp;fs)
-                       throws java.lang.Exception</pre>
-<dl><dt><span class="strong">Throws:</span></dt>
-<dd><code>java.lang.Exception</code></dd></dl>
-</li>
-</ul>
-<a name="getSchemaMap()">
-<!--   -->
-</a>
-<ul class="blockList">
-<li class="blockList">
-<h4>getSchemaMap</h4>
-<pre>public static&nbsp;java.util.HashMap&lt;java.lang.String,<a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data">DataSchema</a>&gt;&nbsp;getSchemaMap()</pre>
-</li>
-</ul>
-<a name="getSchemaNames()">
-<!--   -->
-</a>
-<ul class="blockList">
-<li class="blockList">
-<h4>getSchemaNames</h4>
-<pre>public static&nbsp;java.util.Set&lt;java.lang.String&gt;&nbsp;getSchemaNames()</pre>
-</li>
-</ul>
-<a name="getSchema(java.lang.String)">
-<!--   -->
-</a>
-<ul class="blockListLast">
-<li class="blockList">
-<h4>getSchema</h4>
-<pre>public static&nbsp;<a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data">DataSchema</a>&nbsp;getSchema(java.lang.String&nbsp;schemaName)</pre>
-</li>
-</ul>
-</li>
-</ul>
-</li>
-</ul>
-</div>
-</div>
-<!-- ========= END OF CLASS DATA ========= -->
-<!-- ======= START OF BOTTOM NAVBAR ====== -->
-<div class="bottomNav"><a name="navbar_bottom">
-<!--   -->
-</a><a href="#skip-navbar_bottom" title="Skip navigation links"></a><a name="navbar_bottom_firstrow">
-<!--   -->
-</a>
-<ul class="navList" title="Navigation">
-<li><a href="../../../../../overview-summary.html">Overview</a></li>
-<li><a href="package-summary.html">Package</a></li>
-<li class="navBarCell1Rev">Class</li>
-<li><a href="class-use/LoadDataSchemas.html">Use</a></li>
-<li><a href="package-tree.html">Tree</a></li>
-<li><a href="../../../../../deprecated-list.html">Deprecated</a></li>
-<li><a href="../../../../../index-files/index-1.html">Index</a></li>
-<li><a href="../../../../../help-doc.html">Help</a></li>
-</ul>
-</div>
-<div class="subNav">
-<ul class="navList">
-<li><a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data"><span class="strong">Prev Class</span></a></li>
-<li>Next Class</li>
-</ul>
-<ul class="navList">
-<li><a href="../../../../../index.html?org/apache/pirk/schema/data/LoadDataSchemas.html" target="_top">Frames</a></li>
-<li><a href="LoadDataSchemas.html" target="_top">No Frames</a></li>
-</ul>
-<ul class="navList" id="allclasses_navbar_bottom">
-<li><a href="../../../../../allclasses-noframe.html">All Classes</a></li>
-</ul>
-<div>
-<script type="text/javascript"><!--
-  allClassesLink = document.getElementById("allclasses_navbar_bottom");
-  if(window==top) {
-    allClassesLink.style.display = "block";
-  }
-  else {
-    allClassesLink.style.display = "none";
-  }
-  //-->
-</script>
-</div>
-<div>
-<ul class="subNavList">
-<li>Summary:&nbsp;</li>
-<li>Nested&nbsp;|&nbsp;</li>
-<li>Field&nbsp;|&nbsp;</li>
-<li><a href="#constructor_summary">Constr</a>&nbsp;|&nbsp;</li>
-<li><a href="#method_summary">Method</a></li>
-</ul>
-<ul class="subNavList">
-<li>Detail:&nbsp;</li>
-<li>Field&nbsp;|&nbsp;</li>
-<li><a href="#constructor_detail">Constr</a>&nbsp;|&nbsp;</li>
-<li><a href="#method_detail">Method</a></li>
-</ul>
-</div>
-<a name="skip-navbar_bottom">
-<!--   -->
-</a></div>
-<!-- ======== END OF BOTTOM NAVBAR ======= -->
-</body>
-</html>
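For reference, the class description in the deleted javadoc above documents the XML layout that LoadDataSchemas expects for files named by the 'data.schemas' property. A minimal sketch of one such schema file, using illustrative (hypothetical) schema and element names, could look like this:

<schema>
  <schemaName>contactRecord</schemaName>
  <element>
    <name>ipAddress</name>
    <type>string</type>
    <isArray>false</isArray>
  </element>
  <element>
    <name>visitTimestamps</name>
    <type>long</type>
    <isArray>true</isArray>
    <!-- <partitioner> is optional; the primitive Java type partitioner is used when it is omitted -->
  </element>
</schema>

Per the javadoc, all items are treated in a case insensitive manner, and primitive types are limited to byte, short, int, long, float, double, char, and string.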
diff --git a/pom-with-benchmarks.xml b/pom-with-benchmarks.xml
index 9c85639..ee98c43 100644
--- a/pom-with-benchmarks.xml
+++ b/pom-with-benchmarks.xml
@@ -17,17 +17,17 @@
   -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
 
- 
+
 	<groupId>org.apache.pirk</groupId>
 	<artifactId>pirk</artifactId>
 	<version>0.0.1-SNAPSHOT</version>
 	<packaging>jar</packaging>
-	
+
 	<name>Apache Pirk (incubating) Project</name>
-	<description>Apache Pirk (incubating) is a framework for scalable 
+	<description>Apache Pirk (incubating) is a framework for scalable
 		Private Information Retrieval (PIR). </description>
 	<url>http://pirk.incubator.apache.org/</url>
 
@@ -70,12 +70,12 @@
 				<updatePolicy>always</updatePolicy>
 			</snapshots>
 		</repository>
-		
+
 		<repository>
 			<id>conjars.org</id>
 			<url>http://conjars.org/repo</url>
 		</repository>
-             
+
 	</repositories>
 
 	<properties>
@@ -123,7 +123,7 @@
 			<artifactId>spark-core_2.11</artifactId>
 			<version>1.6.1</version>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>com.googlecode.json-simple</groupId>
 			<artifactId>json-simple</artifactId>
@@ -135,7 +135,7 @@
 			<artifactId>commons-net</artifactId>
 			<version>3.3</version>
 		</dependency>
-            
+
 		<dependency>
 			<groupId>org.elasticsearch</groupId>
 			<artifactId>elasticsearch-hadoop</artifactId>
@@ -155,61 +155,61 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
-		
+
 		<!-- Square's JNA GMP module-->
 		<dependency>
-		    <groupId>com.squareup.jnagmp</groupId>
-		    <artifactId>jnagmp</artifactId>
-		    <version>1.1.0</version>
+			<groupId>com.squareup.jnagmp</groupId>
+			<artifactId>jnagmp</artifactId>
+			<version>1.1.0</version>
 		</dependency>
 		<!-- JMH for benchmarking the Paillier functions -->
 		<dependency>
-		    <groupId>org.openjdk.jmh</groupId>
-		    <artifactId>jmh-core</artifactId>
-		    <version>${jmh.version}</version>
+			<groupId>org.openjdk.jmh</groupId>
+			<artifactId>jmh-core</artifactId>
+			<version>${jmh.version}</version>
 		</dependency>
 		<dependency>
-		    <groupId>org.openjdk.jmh</groupId>
-		    <artifactId>jmh-generator-annprocess</artifactId>
-		    <version>${jmh.version}</version>
+			<groupId>org.openjdk.jmh</groupId>
+			<artifactId>jmh-generator-annprocess</artifactId>
+			<version>${jmh.version}</version>
 		</dependency>
-		
+
 	</dependencies>
 
 	<build>
 		<plugins>
-		<plugin>
-			<groupId>org.apache.maven.plugins</groupId>
-			<artifactId>maven-surefire-plugin</artifactId>
-			<version>2.18</version>
-			<configuration>
-			<redirectTestOutputToFile>true</redirectTestOutputToFile>
-			<argLine combine.children="append">-Xmx1G
-				-Djava.net.preferIPv4Stack=true</argLine>
-			</configuration>
-			<dependencies>
-				<dependency>
-				<!-- Force surefire to use JUnit -->
-				<groupId>org.apache.maven.surefire</groupId>
-				<artifactId>surefire-junit4</artifactId>
-				<version>2.18</version>
-				</dependency>
-			</dependencies>
-                </plugin>
 			<plugin>
-			    <groupId>org.apache.maven.plugins</groupId>
-			    <artifactId>maven-compiler-plugin</artifactId>
-			    <version>3.1</version>
-			    <configuration>
-			        <compilerVersion>${javac.target}</compilerVersion>
-			        <source>${javac.target}</source>
-			        <target>${javac.target}</target>
-			    </configuration>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.18</version>
+				<configuration>
+					<redirectTestOutputToFile>true</redirectTestOutputToFile>
+					<argLine combine.children="append">-Xmx1G
+						-Djava.net.preferIPv4Stack=true</argLine>
+				</configuration>
+				<dependencies>
+					<dependency>
+						<!-- Force surefire to use JUnit -->
+						<groupId>org.apache.maven.surefire</groupId>
+						<artifactId>surefire-junit4</artifactId>
+						<version>2.18</version>
+					</dependency>
+				</dependencies>
 			</plugin>
 			<plugin>
-			    <groupId>org.apache.maven.plugins</groupId>
-			    <artifactId>maven-shade-plugin</artifactId>
-			    <version>2.4.3</version> <!-- older versions of maven-shade-plugin make JMH painful -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<version>3.1</version>
+				<configuration>
+					<compilerVersion>${javac.target}</compilerVersion>
+					<source>${javac.target}</source>
+					<target>${javac.target}</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>2.4.3</version> <!-- older versions of maven-shade-plugin make JMH painful -->
 
 				<executions>
 					<execution>
@@ -223,10 +223,10 @@
 							<shadedClassifierName>exe</shadedClassifierName>
 							<transformers>
 								<transformer
-									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+										implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
 								</transformer>
 								<transformer
-									implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
+										implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
 								</transformer>
 							</transformers>
 							<filters>
@@ -242,37 +242,37 @@
 						</configuration>
 					</execution>
 					<execution>
-					    <phase>package</phase>
-					    <id>benchmark</id>
-					    <goals>
-					        <goal>shade</goal>
-					    </goals>
-					    <configuration>
-					        <!-- The jar is very corrupted if it isn't minimized -->
-					        <minimizeJar>true</minimizeJar>
-					        <finalName>${benchmarkjar.name}</finalName>
-					        <transformers>
-					            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-					                <mainClass>org.openjdk.jmh.Main</mainClass>
-					            </transformer>
-					        </transformers>
-					        <filters>
-					            <filter>
-					                <!--
-					                    Shading signed JARs will fail without this.
-					                    http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
-					                -->
-					                <artifact>*:*</artifact>
-					                <excludes>
-					                    <exclude>META-INF/*.SF</exclude>
-					                    <exclude>META-INF/*.DSA</exclude>
-					                    <exclude>META-INF/*.RSA</exclude>
-					                </excludes>
-					            </filter>
-					        </filters>
-					    </configuration>
+						<phase>package</phase>
+						<id>benchmark</id>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<!-- The jar is very corrupted if it isn't minimized -->
+							<minimizeJar>true</minimizeJar>
+							<finalName>${benchmarkjar.name}</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.openjdk.jmh.Main</mainClass>
+								</transformer>
+							</transformers>
+							<filters>
+								<filter>
+									<!--
+                                        Shading signed JARs will fail without this.
+                                        http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
+                                    -->
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+						</configuration>
 					</execution>
-					
+
 				</executions>
 			</plugin>
 		</plugins>
diff --git a/pom.xml b/pom.xml
index b46325e..c14a928 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,653 +11,747 @@
 	License for the specific language governing permissions and ~ limitations 
 	under the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
 
-	<parent>
-		<groupId>org.apache</groupId>
-		<artifactId>apache</artifactId>
-		<version>18</version>
-	</parent>
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>18</version>
+    </parent>
 
-	<groupId>org.apache.pirk</groupId>
-	<artifactId>apache-pirk</artifactId>
-	<version>0.0.1-SNAPSHOT</version>
-	<packaging>jar</packaging>
+    <groupId>org.apache.pirk</groupId>
+    <artifactId>apache-pirk</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
 
-	<name>Apache Pirk (incubating) Project</name>
-	<description>Apache Pirk (incubating) is a framework for scalable Private Information Retrieval (PIR). </description>
-	<url>http://pirk.incubator.apache.org/</url>
+    <name>Apache Pirk (incubating) Project</name>
+    <description>Apache Pirk (incubating) is a framework for scalable Private Information Retrieval (PIR).</description>
+    <url>http://pirk.incubator.apache.org/</url>
 
-	<!-- this is the year of inception at ASF -->
-	<inceptionYear>2016</inceptionYear>
+    <!-- this is the year of inception at ASF -->
+    <inceptionYear>2016</inceptionYear>
 
-	<organization>
-		<name>The Apache Software Foundation</name>
-		<url>https://www.apache.org</url>
-	</organization>
+    <organization>
+        <name>The Apache Software Foundation</name>
+        <url>https://www.apache.org</url>
+    </organization>
 
-	<licenses>
-		<license>
-			<name>Apache License, Version 2.0</name>
-			<url>https://www.apache.org/licenses/LICENSE-2.0</url>
-		</license>
-	</licenses>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>https://www.apache.org/licenses/LICENSE-2.0</url>
+        </license>
+    </licenses>
 
-	<mailingLists>
-		<mailingList>
-			<name>Dev</name>
-			<subscribe>dev-subscribe@pirk.incubator.apache.org</subscribe>
-			<unsubscribe>dev-unsubscribe@pirk.incubator.apache.org</unsubscribe>
-			<post>dev@pirk.incubator.apache.org</post>
-			<archive>http://mail-archives.apache.org/mod_mbox/incubator-pirk-dev/</archive>
-		</mailingList>
-		<mailingList>
-			<name>Commits</name>
-			<subscribe>commits-subscribe@pirk.incubator.apache.org</subscribe>
-			<unsubscribe>commits-unsubscribe@pirk.incubator.apache.org</unsubscribe>
-			<archive>http://mail-archives.apache.org/mod_mbox/incubator-pirk-commits</archive>
-		</mailingList>
-	</mailingLists>
+    <mailingLists>
+        <mailingList>
+            <name>Dev</name>
+            <subscribe>dev-subscribe@pirk.incubator.apache.org</subscribe>
+            <unsubscribe>dev-unsubscribe@pirk.incubator.apache.org</unsubscribe>
+            <post>dev@pirk.incubator.apache.org</post>
+            <archive>http://mail-archives.apache.org/mod_mbox/incubator-pirk-dev/</archive>
+        </mailingList>
+        <mailingList>
+            <name>Commits</name>
+            <subscribe>commits-subscribe@pirk.incubator.apache.org</subscribe>
+            <unsubscribe>commits-unsubscribe@pirk.incubator.apache.org</unsubscribe>
+            <archive>http://mail-archives.apache.org/mod_mbox/incubator-pirk-commits</archive>
+        </mailingList>
+    </mailingLists>
 
-	<scm>
-		<connection>scm:git:git://git.apache.org/incubator-pirk.git</connection>
-		<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-pirk.git</developerConnection>
-		<url>https://git-wip-us.apache.org/repos/asf?p=incubator-pirk.git</url>
-		<tag>HEAD</tag>
-	</scm>
+    <scm>
+        <connection>scm:git:git://git.apache.org/incubator-pirk.git</connection>
+        <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-pirk.git</developerConnection>
+        <url>https://git-wip-us.apache.org/repos/asf?p=incubator-pirk.git</url>
+        <tag>HEAD</tag>
+    </scm>
 
-	<issueManagement>
-		<system>JIRA</system>
-		<url>https://issues.apache.org/jira/browse/PIRK</url>
-	</issueManagement>
+    <issueManagement>
+        <system>JIRA</system>
+        <url>https://issues.apache.org/jira/browse/PIRK</url>
+    </issueManagement>
 
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<maven.compiler.source>1.8</maven.compiler.source>
-		<maven.compiler.target>1.8</maven.compiler.target>
-		<scala.version>2.10.4</scala.version>
-		<jmh.version>1.11.3</jmh.version>
-		<benchmarkjar.name>benchmarks</benchmarkjar.name>
-		<javac.target>1.8</javac.target>
-		<slf4j.version>1.7.21</slf4j.version>
-		<log4j2.version>2.6.2</log4j2.version>
-		<junit.version>4.12</junit.version>
-		<log4j.configuration>log4j2.xml</log4j.configuration>
-		<hadoop.version>2.7.2</hadoop.version>
-		<apache-commons.version>3.3</apache-commons.version>
-		<elasticsearch.version>2.3.3</elasticsearch.version>
-		<spark-streaming.version>2.0.0</spark-streaming.version>
-		<pirk.forkCount>1C</pirk.forkCount>
-		<pirk.reuseForks>true</pirk.reuseForks>
-	</properties>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <scala.version>2.10.4</scala.version>
+        <jmh.version>1.11.3</jmh.version>
+        <benchmarkjar.name>benchmarks</benchmarkjar.name>
+        <javac.target>1.8</javac.target>
+        <slf4j.version>1.7.21</slf4j.version>
+        <log4j2.version>2.1</log4j2.version>
+        <junit.version>4.12</junit.version>
+        <log4j.configuration>log4j2.xml</log4j.configuration>
+        <hadoop.version>2.7.2</hadoop.version>
+        <spark.version>1.6.1</spark.version>
+        <apache-commons.version>3.3</apache-commons.version>
+        <elasticsearch.version>2.3.3</elasticsearch.version>
+        <storm.version>1.0.1</storm.version>
+        <kafka.version>0.9.0.1</kafka.version>
+        <spark-streaming.version>2.0.0</spark-streaming.version>
+        <pirk.forkCount>1C</pirk.forkCount>
+        <pirk.reuseForks>true</pirk.reuseForks>
+    </properties>
 
-	<dependencies>
-		<dependency>
-			<groupId>junit</groupId>
-			<artifactId>junit</artifactId>
-			<version>4.12</version>
-		</dependency>
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-math3</artifactId>
-			<version>${apache-commons.version}</version>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>${apache-commons.version}</version>
+        </dependency>
 
-		<dependency>
-			<groupId>com.googlecode.json-simple</groupId>
-			<artifactId>json-simple</artifactId>
-			<version>1.1</version>
-		</dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1</version>
+        </dependency>
 
-		<dependency>
-			<groupId>commons-net</groupId>
-			<artifactId>commons-net</artifactId>
-			<version>${apache-commons.version}</version>
-		</dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+            <version>${apache-commons.version}</version>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-			<version>${hadoop.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-mapreduce-client-core</artifactId>
-			<version>${hadoop.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-client</artifactId>
-			<version>${hadoop.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>xerces</groupId>
+                    <artifactId>xercesImpl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-core_2.11</artifactId>
-			<version>1.6.1</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scalap</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-reflect</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
-		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch-hadoop</artifactId>
-			<version>${elasticsearch.version}</version>
-			<exclusions>
-				<exclusion>
-					<artifactId>commons-net</artifactId>
-					<groupId>commons-net</groupId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.hive</groupId>
-					<artifactId>hive-service</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.storm</groupId>
-					<artifactId>storm-core</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>cascading</groupId>
-					<artifactId>cascading-local</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>cascading</groupId>
-					<artifactId>cascading-core</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>cascading</groupId>
-					<artifactId>cascading-hadoop</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+            <version>${spark-streaming.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-streaming_2.10</artifactId>
-			<version>${spark-streaming.version}</version>
-		</dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch-hadoop</artifactId>
+            <version>${elasticsearch.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-net</artifactId>
+                    <groupId>commons-net</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-service</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.storm</groupId>
+                    <artifactId>storm-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>cascading</groupId>
+                    <artifactId>cascading-local</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>cascading</groupId>
+                    <artifactId>cascading-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>cascading</groupId>
+                    <artifactId>cascading-hadoop</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
-		<!-- Square's JNA GMP module -->
-		<dependency>
-			<groupId>com.squareup.jnagmp</groupId>
-			<artifactId>jnagmp</artifactId>
-			<version>1.1.0</version>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${storm.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
-		<!-- JMH for benchmarking the Paillier functions -->
-		<dependency>
-			<groupId>org.openjdk.jmh</groupId>
-			<artifactId>jmh-core</artifactId>
-			<version>${jmh.version}</version>
-			<scope>provided</scope>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${storm.version}</version>
+        </dependency>
 
-		<dependency>
-			<groupId>org.openjdk.jmh</groupId>
-			<artifactId>jmh-generator-annprocess</artifactId>
-			<version>${jmh.version}</version>
-			<scope>provided</scope>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>${kafka.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
-		<!-- Sl4j modules -->
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-api</artifactId>
-			<version>${slf4j.version}</version>
-		</dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>2.10.0</version>
+            <scope>test</scope>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-slf4j-impl</artifactId>
-			<version>2.6.2</version>
-		</dependency>
+        <!-- Square's JNA GMP module -->
+        <dependency>
+            <groupId>com.squareup.jnagmp</groupId>
+            <artifactId>jnagmp</artifactId>
+            <version>1.1.0</version>
+        </dependency>
 
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-core</artifactId>
-			<version>2.6.2</version>
-		</dependency>
+        <!-- JMH for benchmarking the Paillier functions -->
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <version>${jmh.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
-	</dependencies>
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <version>${jmh.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-surefire-plugin</artifactId>
-			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-			</plugin>
+        <!-- Sl4j modules -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
 
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-			</plugin>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j2.version}</version>
+        </dependency>
 
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-source-plugin</artifactId>
-			</plugin>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j2.version}</version>
+        </dependency>
 
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-			</plugin>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
 
-			<plugin>
-				<groupId>org.apache.rat</groupId>
-				<artifactId>apache-rat-plugin</artifactId>
-			</plugin>
+    </dependencies>
 
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-release-plugin</artifactId>
-			</plugin>
-		</plugins>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
 
-		<pluginManagement>
-			<plugins>
-				<plugin>
-					<groupId>org.apache.rat</groupId>
-					<artifactId>apache-rat-plugin</artifactId>
-					<version>0.12</version>
-					<configuration>
-						<excludes>
-							<exclude>.travis.yml</exclude> <!-- Travis CI Build Descriptor File -->
-							<exclude>findbugs-exclude.xml</exclude> <!-- False positives for FindBugs analysis -->
-							<exclude>KEYS</exclude> <!-- GPG keys of Release Managers -->
-							<exclude>eclipse*.xml</exclude> <!-- Exclude eclipse* xml -->
-							<exclude>docs/*</exclude> <!-- Exclude docs -->
-							<exclude>logs/*</exclude> <!-- Exclude logs -->
-							<exclude>**/m2.conf</exclude> <!-- Exclude Maven conf which gets installed on travis and fails RAT check -->
-							<exclude>src/main/resources/META-INF/**</exclude>  
-						</excludes>
-					</configuration>
-					<dependencies>
-						<!-- workaround for RAT-158 -->
-						<dependency>
-							<groupId>org.apache.maven.doxia</groupId>
-							<artifactId>doxia-core</artifactId>
-							<version>1.6</version>
-							<exclusions>
-								<exclusion>
-									<groupId>xerces</groupId>
-									<artifactId>xercesImpl</artifactId>
-								</exclusion>
-							</exclusions>
-						</dependency>
-					</dependencies>
-					<executions>
-						<execution>
-							<phase>validate</phase>
-							<goals>
-								<goal>check</goal>
-							</goals>
-						</execution>
-					</executions>
-				</plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+            </plugin>
 
-				<!-- Coverage analysis for tests -->
-				<plugin>
-					<groupId>org.jacoco</groupId>
-					<artifactId>jacoco-maven-plugin</artifactId>
-					<version>0.7.5.201505241946</version>
-					<executions>
-						<execution>
-							<goals>
-								<goal>prepare-agent</goal>
-							</goals>
-							<configuration>
-								<output>file</output>
-								<dumpOnExit>true</dumpOnExit>
-							</configuration>
-						</execution>
-						<execution>
-							<id>report</id>
-							<phase>prepare-package</phase>
-							<goals>
-								<goal>report</goal>
-							</goals>
-						</execution>
-					</executions>
-				</plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+            </plugin>
 
-				<!-- Report jacoco coverage to coveralls.io -->
-				<plugin>
-					<groupId>org.eluder.coveralls</groupId>
-					<artifactId>coveralls-maven-plugin</artifactId>
-					<version>4.1.0</version>
-				</plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-release-plugin</artifactId>
+            </plugin>
+        </plugins>
+
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.rat</groupId>
+                    <artifactId>apache-rat-plugin</artifactId>
+                    <version>0.12</version>
+                    <configuration>
+                        <excludes>
+                            <exclude>.travis.yml</exclude> <!-- Travis CI Build Descriptor File -->
+                            <exclude>findbugs-exclude.xml</exclude> <!-- False positives for FindBugs analysis -->
+                            <exclude>KEYS</exclude> <!-- GPG keys of Release Managers -->
+                            <exclude>eclipse*.xml</exclude> <!-- Exclude eclipse* xml -->
+                            <exclude>docs/*</exclude> <!-- Exclude docs -->
+                            <exclude>logs/*</exclude> <!-- Exclude logs -->
+                            <exclude>**/m2.conf</exclude> <!-- Exclude Maven conf which gets installed on travis and fails RAT check -->
+                            <exclude>src/main/resources/META-INF/**</exclude>
+                        </excludes>
+                    </configuration>
+                    <dependencies>
+                        <!-- workaround for RAT-158 -->
+                        <dependency>
+                            <groupId>org.apache.maven.doxia</groupId>
+                            <artifactId>doxia-core</artifactId>
+                            <version>1.6</version>
+                            <exclusions>
+                                <exclusion>
+                                    <groupId>xerces</groupId>
+                                    <artifactId>xercesImpl</artifactId>
+                                </exclusion>
+                            </exclusions>
+                        </dependency>
+                    </dependencies>
+                    <executions>
+                        <execution>
+                            <phase>validate</phase>
+                            <goals>
+                                <goal>check</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+
+                <!-- Coverage analysis for tests -->
+                <plugin>
+                    <groupId>org.jacoco</groupId>
+                    <artifactId>jacoco-maven-plugin</artifactId>
+                    <version>0.7.5.201505241946</version>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>prepare-agent</goal>
+                            </goals>
+                            <configuration>
+                                <output>file</output>
+                                <dumpOnExit>true</dumpOnExit>
+                            </configuration>
+                        </execution>
+                        <execution>
+                            <id>report</id>
+                            <phase>prepare-package</phase>
+                            <goals>
+                                <goal>report</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+
+                <!-- Report jacoco coverage to coveralls.io -->
+                <plugin>
+                    <groupId>org.eluder.coveralls</groupId>
+                    <artifactId>coveralls-maven-plugin</artifactId>
+                    <version>4.1.0</version>
+                </plugin>
 
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-surefire-plugin</artifactId>
-					<version>2.18</version>
-					<configuration>
-						<redirectTestOutputToFile>true</redirectTestOutputToFile>
-						<argLine combine.children="append">-Xmx1G
-							-Djava.net.preferIPv4Stack=true
-						</argLine>
-						<systemPropertyVariables>
-							<log4j.configuration>${log4j.configuration}</log4j.configuration>
-						</systemPropertyVariables>
-						<forkCount>${pirk.forkCount}</forkCount>
-						<reuseForks>${pirk.reuseForks}</reuseForks>
-					</configuration>
-					<dependencies>
-						<dependency>
-							<!-- Force surefire to use JUnit -->
-							<groupId>org.apache.maven.surefire</groupId>
-							<artifactId>surefire-junit4</artifactId>
-							<version>2.18</version>
-						</dependency>
-					</dependencies>
-				</plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>2.18</version>
+                    <configuration>
+                        <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                        <argLine combine.children="append">-Xmx1G
+                            -Djava.net.preferIPv4Stack=true
+                        </argLine>
+                        <systemPropertyVariables>
+                            <log4j.configuration>${log4j.configuration}</log4j.configuration>
+                        </systemPropertyVariables>
+                        <forkCount>${pirk.forkCount}</forkCount>
+                        <reuseForks>${pirk.reuseForks}</reuseForks>
+                    </configuration>
+                    <dependencies>
+                        <dependency>
+                            <!-- Force surefire to use JUnit -->
+                            <groupId>org.apache.maven.surefire</groupId>
+                            <artifactId>surefire-junit4</artifactId>
+                            <version>2.18</version>
+                        </dependency>
+                    </dependencies>
+                </plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-compiler-plugin</artifactId>
-					<version>3.5.1</version>
-					<configuration>
-						<compilerVersion>${javac.target}</compilerVersion>
-						<source>${javac.target}</source>
-						<target>${javac.target}</target>
-					</configuration>
-				</plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>3.5.1</version>
+                    <configuration>
+                        <compilerVersion>${javac.target}</compilerVersion>
+                        <source>${javac.target}</source>
+                        <target>${javac.target}</target>
+                    </configuration>
+                </plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-jar-plugin</artifactId>
-					<version>3.0.1</version>
-					<configuration>
-						<excludes>
-							<exclude>org/apache/pirk/benchmark/**</exclude>
-							<exclude>*/openjdk/**</exclude>
-							<exclude>generated-sources/**</exclude>
-						</excludes>
-					</configuration>
-				</plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-jar-plugin</artifactId>
+                    <version>3.0.1</version>
+                    <configuration>
+                        <excludes>
+                            <exclude>org/apache/pirk/benchmark/**</exclude>
+                            <exclude>*/openjdk/**</exclude>
+                            <exclude>generated-sources/**</exclude>
+                        </excludes>
+                    </configuration>
+                </plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-source-plugin</artifactId>
-					<version>3.0.1</version>
-					<configuration>
-						<excludes>
-							<exclude>org/apache/pirk/benchmark/**</exclude>
-							<exclude>*/openjdk/**</exclude>
-							<exclude>generated-sources/**</exclude>
-						</excludes>
-					</configuration>
-				</plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-source-plugin</artifactId>
+                    <version>3.0.1</version>
+                    <configuration>
+                        <excludes>
+                            <exclude>org/apache/pirk/benchmark/**</exclude>
+                            <exclude>*/openjdk/**</exclude>
+                            <exclude>generated-sources/**</exclude>
+                        </excludes>
+                    </configuration>
+                </plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-shade-plugin</artifactId>
-					<version>2.4.3</version> <!-- older versions of maven-shade-plugin make JMH painful -->
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-shade-plugin</artifactId>
+                    <version>2.4.3</version> <!-- older versions of maven-shade-plugin make JMH painful -->
 
-					<executions>
-						<execution>
-							<phase>package</phase>
-							<id>main</id>
-							<goals>
-								<goal>shade</goal>
-							</goals>
-							<configuration>
-								<shadedArtifactAttached>true</shadedArtifactAttached>
-								<shadedClassifierName>exe</shadedClassifierName>
-								<transformers>
-									<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									</transformer>
-									<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
-									</transformer>
-								</transformers>
-								<filters>
-									<filter>
-										<artifact>*:*</artifact>
-										<excludes>
-											<exclude>META-INF/*.SF</exclude>
-											<exclude>META-INF/*.DSA</exclude>
-											<exclude>META-INF/*.RSA</exclude>
-										</excludes>
-									</filter>
-								</filters>
-							</configuration>
-						</execution>
+                    <executions>
+                        <execution>
+                            <phase>package</phase>
+                            <id>main</id>
+                            <goals>
+                                <goal>shade</goal>
+                            </goals>
+                            <configuration>
+                                <shadedArtifactAttached>true</shadedArtifactAttached>
+                                <shadedClassifierName>exe</shadedClassifierName>
+                                <transformers>
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    </transformer>
+                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
+                                    </transformer>
+                                </transformers>
+                                <filters>
+                                    <filter>
+                                        <artifact>*:*</artifact>
+                                        <excludes>
+                                            <exclude>META-INF/*.SF</exclude>
+                                            <exclude>META-INF/*.DSA</exclude>
+                                            <exclude>META-INF/*.RSA</exclude>
+                                        </excludes>
+                                    </filter>
+                                </filters>
+                            </configuration>
+                        </execution>
 
-						<!-- in the version with benchmarks (pom-with-benchmarks.xml), this 
-							is where that <execution></execution> lives -->
+                        <!-- in the version with benchmarks (pom-with-benchmarks.xml), this
+                            is where that <execution></execution> lives -->
 
-					</executions>
-				</plugin>
+                    </executions>
+                </plugin>
 
-				<!--This plugin's configuration is used to store Eclipse m2e settings 
-					only. It has no influence on the Maven build itself. -->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>
-											org.scala-tools
-										</groupId>
-										<artifactId>
-											maven-scala-plugin
-										</artifactId>
-										<versionRange>
-											[2.15.2,)
-										</versionRange>
-										<goals>
-											<goal>testCompile</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore />
-									</action>
-								</pluginExecution>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>
-											org.apache.rat
-										</groupId>
-										<artifactId>
-											apache-rat-plugin
-										</artifactId>
-										<versionRange>
-											[0.11,)
-										</versionRange>
-										<goals>
-											<goal>check</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore />
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
+                <!--This plugin's configuration is used to store Eclipse m2e settings
+                    only. It has no influence on the Maven build itself. -->
+                <plugin>
+                    <groupId>org.eclipse.m2e</groupId>
+                    <artifactId>lifecycle-mapping</artifactId>
+                    <version>1.0.0</version>
+                    <configuration>
+                        <lifecycleMappingMetadata>
+                            <pluginExecutions>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>
+                                            org.scala-tools
+                                        </groupId>
+                                        <artifactId>
+                                            maven-scala-plugin
+                                        </artifactId>
+                                        <versionRange>
+                                            [2.15.2,)
+                                        </versionRange>
+                                        <goals>
+                                            <goal>testCompile</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                                <pluginExecution>
+                                    <pluginExecutionFilter>
+                                        <groupId>
+                                            org.apache.rat
+                                        </groupId>
+                                        <artifactId>
+                                            apache-rat-plugin
+                                        </artifactId>
+                                        <versionRange>
+                                            [0.11,)
+                                        </versionRange>
+                                        <goals>
+                                            <goal>check</goal>
+                                        </goals>
+                                    </pluginExecutionFilter>
+                                    <action>
+                                        <ignore/>
+                                    </action>
+                                </pluginExecution>
+                            </pluginExecutions>
+                        </lifecycleMappingMetadata>
+                    </configuration>
+                </plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-release-plugin</artifactId>
-					<version>2.5.1</version>
-					<configuration>
-						<useReleaseProfile>true</useReleaseProfile>
-						<releaseProfiles>signed_release</releaseProfiles>
-						<autoVersionSubmodules>true</autoVersionSubmodules>
-						<goals>deploy</goals>
-						<tagNameFormat>@{project.artifactId}-@{project.version}</tagNameFormat>
-						<pushChanges>false</pushChanges>
-						<localCheckout>true</localCheckout>
-					</configuration>
-					<executions>
-						<execution>
-							<id>default</id>
-							<goals>
-								<goal>perform</goal>
-							</goals>
-							<configuration>
-								<pomFileName>pom.xml</pomFileName>
-							</configuration>
-						</execution>
-					</executions>
-				</plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-release-plugin</artifactId>
+                    <version>2.5.1</version>
+                    <configuration>
+                        <useReleaseProfile>true</useReleaseProfile>
+                        <releaseProfiles>signed_release</releaseProfiles>
+                        <autoVersionSubmodules>true</autoVersionSubmodules>
+                        <goals>deploy</goals>
+                        <tagNameFormat>@{project.artifactId}-@{project.version}</tagNameFormat>
+                        <pushChanges>false</pushChanges>
+                        <localCheckout>true</localCheckout>
+                    </configuration>
+                    <executions>
+                        <execution>
+                            <id>default</id>
+                            <goals>
+                                <goal>perform</goal>
+                            </goals>
+                            <configuration>
+                                <pomFileName>pom.xml</pomFileName>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
 
-				<plugin>
-					<groupId>org.apache.maven.plugins</groupId>
-					<artifactId>maven-javadoc-plugin</artifactId>
-					<version>2.10.4</version>
-					<configuration>
-						<javadocDirectory>docs</javadocDirectory>
-						<testJavadocDirectory>docs/test</testJavadocDirectory>
-						<javadocVersion>1.8</javadocVersion>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-javadoc-plugin</artifactId>
+                    <version>2.10.4</version>
+                    <configuration>
+                        <javadocDirectory>docs</javadocDirectory>
+                        <testJavadocDirectory>docs/test</testJavadocDirectory>
+                        <javadocVersion>1.8</javadocVersion>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
 
-	</build>
+    </build>
 
-	<profiles>
-		<profile>
-			<!-- Performs execution of Integration Tests using the Maven FailSafe 
-				Plugin. The view of integration tests in this context are those tests interfacing 
-				with external sources and services requiring additional resources or credentials 
-				that cannot be explicitly provided. -->
-			<id>integration-tests</id>
-			<build>
-				<plugins>
-					<plugin>
-						<groupId>org.apache.maven.plugins</groupId>
-						<artifactId>maven-failsafe-plugin</artifactId>
-						<executions>
-							<execution>
-								<goals>
-									<goal>integration-test</goal>
-									<goal>verify</goal>
-								</goals>
-							</execution>
-						</executions>
-					</plugin>
-				</plugins>
-			</build>
-		</profile>
-		<profile>
-			<!-- Checks style and licensing requirements. This is a good idea to run 
-				for contributions and for the release process. While it would be nice to 
-				run always these plugins can considerably slow the build and have proven 
-				to create unstable builds in our multi-module project and when building using 
-				multiple threads. The stability issues seen with Checkstyle in multi-module 
-				builds include false-positives and false negatives. -->
-			<id>contrib-check</id>
-			<build>
-				<plugins>
-					<plugin>
-						<groupId>org.apache.rat</groupId>
-						<artifactId>apache-rat-plugin</artifactId>
-						<executions>
-							<execution>
-								<goals>
-									<goal>check</goal>
-								</goals>
-								<phase>verify</phase>
-							</execution>
-						</executions>
-					</plugin>
-					<plugin>
-						<groupId>org.apache.maven.plugins</groupId>
-						<artifactId>maven-checkstyle-plugin</artifactId>
-						<executions>
-							<execution>
-								<id>check-style</id>
-								<goals>
-									<goal>check</goal>
-								</goals>
-							</execution>
-						</executions>
-					</plugin>
-				</plugins>
-			</build>
-		</profile>
-		<profile>
-			<!-- This profile will disable DocLint which performs strict JavaDoc processing 
-				which was introduced in JDK 8. These are technically errors in the JavaDoc 
-				which we need to eventually address. However, if a release is performed using 
-				JDK 8, the JavaDoc generation would fail. By activating this profile when 
-				running on JDK 8 we can ensure the JavaDocs continue to generate successfully -->
-			<id>disable-doclint</id>
-			<activation>
-				<jdk>1.8</jdk>
-			</activation>
-			<build>
-				<pluginManagement>
-					<plugins>
-						<plugin>
-							<groupId>org.apache.maven.plugins</groupId>
-							<artifactId>maven-javadoc-plugin</artifactId>
-							<configuration>
-								<additionalparam>-Xdoclint:none</additionalparam>
-							</configuration>
-						</plugin>
-					</plugins>
-				</pluginManagement>
-			</build>
-		</profile>
-	</profiles>
+    <profiles>
+        <profile>
+            <!-- Performs execution of Integration Tests using the Maven FailSafe
+                Plugin. In this context, integration tests are those tests that interface
+                with external sources and services requiring additional resources or credentials
+                that cannot be explicitly provided. -->
+            <id>integration-tests</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <!-- Checks style and licensing requirements. This is a good idea to run
+                for contributions and for the release process. While it would be nice to
+                run them always, these plugins can considerably slow the build and have proven
+                to create unstable builds in our multi-module project and when building using
+                multiple threads. The stability issues seen with Checkstyle in multi-module
+                builds include false-positives and false negatives. -->
+            <id>contrib-check</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>check</goal>
+                                </goals>
+                                <phase>verify</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>check-style</id>
+                                <goals>
+                                    <goal>check</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <!-- This profile will disable DocLint which performs strict JavaDoc processing
+                which was introduced in JDK 8. These are technically errors in the JavaDoc
+                which we need to eventually address. However, if a release is performed using
+                JDK 8, the JavaDoc generation would fail. By activating this profile when
+                running on JDK 8 we can ensure the JavaDocs continue to generate successfully -->
+            <id>disable-doclint</id>
+            <activation>
+                <jdk>1.8</jdk>
+            </activation>
+            <build>
+                <pluginManagement>
+                    <plugins>
+                        <plugin>
+                            <groupId>org.apache.maven.plugins</groupId>
+                            <artifactId>maven-javadoc-plugin</artifactId>
+                            <configuration>
+                                <additionalparam>-Xdoclint:none</additionalparam>
+                            </configuration>
+                        </plugin>
+                    </plugins>
+                </pluginManagement>
+            </build>
+        </profile>
+    </profiles>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/src/main/java/org/apache/pirk/query/wideskies/Query.java b/src/main/java/org/apache/pirk/query/wideskies/Query.java
index f43fe47..b0322d0 100644
--- a/src/main/java/org/apache/pirk/query/wideskies/Query.java
+++ b/src/main/java/org/apache/pirk/query/wideskies/Query.java
@@ -25,8 +25,8 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
-
 import java.util.function.Consumer;
+
 import org.apache.pirk.encryption.ModPowAbstraction;
 import org.apache.pirk.serialization.Storable;
 import org.slf4j.Logger;
diff --git a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
index 362d26f..76cd755 100644
--- a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
+++ b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pirk.query.wideskies;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.pirk.schema.query.QuerySchema;
@@ -44,7 +47,7 @@
   private String queryType = null; // QueryType string const
 
   private int hashBitSize = 0; // Bit size of the keyed hash function
-  private String hashKey = null; // Key for the keyed hash function
+  private String hashKey; // Key for the keyed hash function
 
   private int numBitsPerDataElement = 0; // total num bits per returned data value - defined relative to query type
   private int dataPartitionBitSize = 0; // num of bits for each partition of an incoming data element, must be < 32 right now
@@ -96,6 +99,33 @@
     printQueryInfo();
   }
 
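+  /**
+   * Constructs a QueryInfo object from a map of its primitive parameters, such as the map produced by {@code toMap()}
+   */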
+  public QueryInfo(Map queryInfoMap)
+  {
+    // The Storm Config serializes the map as JSON and reads it back in with numeric values as Longs,
+    // so numeric values must be cast to Long and converted via intValue(). However, in PirkHashScheme the map contains Integers,
+    // hence the ClassCastException fallback below.
+    identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+    queryType = (String) queryInfoMap.get("queryType");
+    hashKey = (String) queryInfoMap.get("hashKey");
+    useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+    useHDFSExpLookupTable = (boolean) queryInfoMap.get("useHDFSExpLookupTable");
+    embedSelector = (boolean) queryInfoMap.get("embedSelector");
+    try
+    {
+      numSelectors = ((Long) queryInfoMap.get("numSelectors")).intValue();
+      hashBitSize = ((Long) queryInfoMap.get("hashBitSize")).intValue();
+      numBitsPerDataElement = ((Long) queryInfoMap.get("numBitsPerDataElement")).intValue();
+      numPartitionsPerDataElement = ((Long) queryInfoMap.get("numPartitionsPerDataElement")).intValue();
+      dataPartitionBitSize = ((Long) queryInfoMap.get("dataPartitionsBitSize")).intValue();
+    } catch (ClassCastException e)
+    {
+      numSelectors = (int) queryInfoMap.get("numSelectors");
+      hashBitSize = (int) queryInfoMap.get("hashBitSize");
+      numBitsPerDataElement = (int) queryInfoMap.get("numBitsPerDataElement");
+      numPartitionsPerDataElement = (int) queryInfoMap.get("numPartitionsPerDataElement");
+      dataPartitionBitSize = (int) queryInfoMap.get("dataPartitionsBitSize");
+    }
+  }
+
   public UUID getIdentifier()
   {
     return identifier;
@@ -151,6 +181,24 @@
     return embedSelector;
   }
 
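+  /**
+   * Returns a map of the primitive parameters of this QueryInfo; the {@code QueryInfo(Map)} constructor rebuilds a QueryInfo from such a map
+   * (used, for example, to pass the QueryInfo through a Storm Config).
+   */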
+  public Map toMap()
+  {
+    Map<String,Object> queryInfo = new HashMap<String,Object>();
+    queryInfo.put("uuid", identifier.toString());
+    queryInfo.put("queryType", queryType);
+    queryInfo.put("numSelectors", numSelectors);
+    queryInfo.put("hashBitSize", hashBitSize);
+    queryInfo.put("hashKey", hashKey);
+    queryInfo.put("numBitsPerDataElement", numBitsPerDataElement);
+    queryInfo.put("numPartitionsPerDataElement", numPartitionsPerDataElement);
+    queryInfo.put("dataPartitionsBitSize", dataPartitionBitSize);
+    queryInfo.put("useExpLookupTable", useExpLookupTable);
+    queryInfo.put("useHDFSExpLookupTable", useHDFSExpLookupTable);
+    queryInfo.put("embedSelector", embedSelector);
+
+    return queryInfo;
+  }
+
   public void addQuerySchema(QuerySchema qSchemaIn)
   {
     qSchema = qSchemaIn;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index 6b32418..f8e396b 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -28,6 +28,7 @@
 import org.apache.pirk.responder.wideskies.spark.ComputeResponse;
 import org.apache.pirk.responder.wideskies.spark.streaming.ComputeStreamingResponse;
 import org.apache.pirk.responder.wideskies.standalone.Responder;
+import org.apache.pirk.responder.wideskies.storm.PirkTopology;
 import org.apache.pirk.serialization.LocalFileSystemStore;
 import org.apache.pirk.utils.SystemConfiguration;
 import org.slf4j.Logger;
@@ -49,6 +50,11 @@
 {
   private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class);
 
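+  // Responder compute platforms; NONE is used when the configured platform string is not recognized.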
+  enum Platform
+  {
+    MAPREDUCE, SPARK, SPARKSTREAMING, STORM, STANDALONE, NONE;
+  }
+
   public static void main(String[] args) throws Exception
   {
     ResponderCLI responderCLI = new ResponderCLI(args);
@@ -56,49 +62,64 @@
     // For handling System.exit calls from Spark Streaming
     System.setSecurityManager(new SystemExitManager());
 
-    if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("mapreduce"))
+    Platform platform = Platform.NONE;
+    String platformString = SystemConfiguration.getProperty(ResponderProps.PLATFORM);
+    try
     {
-      logger.info("Launching MapReduce ResponderTool:");
-
-      ComputeResponseTool pirWLTool = new ComputeResponseTool();
-      ToolRunner.run(pirWLTool, new String[] {});
+      platform = Platform.valueOf(platformString.toUpperCase());
+    } catch (IllegalArgumentException e)
+    {
+      logger.error("platform {} not found.", platformString);
     }
-    else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("spark"))
+
+    switch (platform)
     {
-      logger.info("Launching Spark ComputeResponse:");
+      case MAPREDUCE:
+        logger.info("Launching MapReduce ResponderTool:");
 
-      FileSystem fs = FileSystem.get(new Configuration());
-      ComputeResponse computeResponse = new ComputeResponse(fs);
-      computeResponse.performQuery();
-    }
-    else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("sparkstreaming"))
-    {
-      logger.info("Launching Spark ComputeStreamingResponse:");
+        ComputeResponseTool pirWLTool = new ComputeResponseTool();
+        ToolRunner.run(pirWLTool, new String[] {});
+        break;
 
-      FileSystem fs = FileSystem.get(new Configuration());
-      ComputeStreamingResponse computeSR = new ComputeStreamingResponse(fs);
-      try
-      {
-        computeSR.performQuery();
-      } catch (SystemExitException e)
-      {
-        // If System.exit(0) is not caught from Spark Streaming,
-        // the application will complete with a 'failed' status
-        logger.info("Exited with System.exit(0) from Spark Streaming");
-      }
+      case SPARK:
+        logger.info("Launching Spark ComputeResponse:");
 
-      // Teardown the context
-      computeSR.teardown();
-    }
-    else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("standalone"))
-    {
-      logger.info("Launching Standalone Responder:");
+        ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration()));
+        computeResponse.performQuery();
+        break;
 
-      String queryInput = SystemConfiguration.getProperty("pir.queryInput");
-      Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
+      case SPARKSTREAMING:
+        logger.info("Launching Spark ComputeStreamingResponse:");
 
-      Responder pirResponder = new Responder(query);
-      pirResponder.computeStandaloneResponse();
+        ComputeStreamingResponse computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
+        try
+        {
+          computeSR.performQuery();
+        } catch (SystemExitException e)
+        {
+          // If System.exit(0) is not caught from Spark Streaming,
+          // the application will complete with a 'failed' status
+          logger.info("Exited with System.exit(0) from Spark Streaming");
+        }
+
+        // Teardown the context
+        computeSR.teardown();
+        break;
+
+      case STORM:
+        logger.info("Launching Storm PirkTopology:");
+        PirkTopology.runPirkTopology();
+        break;
+
+      case STANDALONE:
+        logger.info("Launching Standalone Responder:");
+
+        String queryInput = SystemConfiguration.getProperty("pir.queryInput");
+        Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
+
+        Responder pirResponder = new Responder(query);
+        pirResponder.computeStandaloneResponse();
+        break;
     }
   }
 
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index 4fee85a..f5d24d7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -21,6 +21,8 @@
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.cli.Option;
 import org.apache.pirk.inputformat.hadoop.InputFormatConst;
 import org.apache.pirk.schema.data.DataSchemaLoader;
@@ -76,10 +78,45 @@
   public static final String MAXBATCHES = "pir.sparkstreaming.maxBatches";
   public static final String STOPGRACEFULLY = "spark.streaming.stopGracefullyOnShutdown";
 
-  static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, ESPORT,
-      OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
+  // Storm parameters
+  // hdfs
+  static final String HDFSURI = "hdfs.uri";
+  static final String USEHDFS = "hdfs.use";
+  // kafka
+  static final String KAFKATOPIC = "kafka.topic";
+  static final String KAFKACLIENTID = "kafka.clientId";
+  static final String KAFKAZK = "kafka.zk";
+  static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";
+  // pirk topo
+  static final String STORMTOPONAME = "storm.topoName";
+  static final String STORMWORKERS = "storm.workers";
+  static final String STORMNUMACKERS = "storm.numAckers";
+  static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
+  static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
+  static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize";
+  static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
+  static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
+  static final String STORMCHILDOPTS = "storm.worker.childOpts";
+  static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
+  static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
+  static final String STORMSPOUTPAR = "storm.spout.parallelism";
+  static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism";
+  static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism";
+  static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism";
+  static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
+  static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
+  static final String STORMSALTCOLUMNS = "storm.saltColumns";
+  static final String STORMNUMROWDIVS = "storm.rowDivs";
+
+  static final String[] STORMPROPS = new String[]{HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
+      STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, STORMMAXWORKERHEAP,
+      STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY, STORMSPLITPARTITIONS,
+      STORMSALTCOLUMNS, STORMNUMROWDIVS};
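+  // For illustration only -- the Storm property keys above are read from the system configuration; hypothetical example entries:
+  //   kafka.topic=pirk-queries, kafka.zk=zkhost:2181, storm.topoName=pirkTopology, storm.workers=2, hdfs.use=true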
+
+  static final List<String> PROPSLIST = Arrays.asList((String[]) ArrayUtils.addAll(new String[]{PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, OUTPUTFILE,
+      BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
       REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN,
-      COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY);
+      COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY}, STORMPROPS));
 
   /**
    * Validates the responder properties
@@ -98,7 +135,7 @@
     }
 
     String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
-    if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("standalone"))
+    if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("storm") && !platform.equals("standalone"))
     {
       logger.info("Unsupported platform: " + platform);
       valid = false;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
index 0745bea..0050e29 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
@@ -44,7 +44,6 @@
 
 /**
  * Class to compute the encrypted row elements for a query from extracted data partitions
- * 
  */
 public class ComputeEncryptedRow
 {
@@ -98,7 +97,6 @@
    * Optionally uses a static LRU cache for the modular exponentiation
    * <p>
    * Emits {@code Tuple2<<colNum, colVal>>}
-   * 
    */
   public static List<Tuple2<Long,BigInteger>> computeEncRow(Iterable<BytesArrayWritable> dataPartitionsIter, Query query, int rowIndex,
       boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) throws IOException
@@ -112,7 +110,7 @@
     int elementCounter = 0;
     for (BytesArrayWritable dataPartitions : dataPartitionsIter)
     {
-      logger.debug("rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
+      logger.debug("rowIndex = {} elementCounter = {}", rowIndex, elementCounter);
 
       if (limitHitsPerSelector)
       {
@@ -121,7 +119,7 @@
           break;
         }
       }
-      logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+      logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
 
       // Update the associated column values
       for (int i = 0; i < dataPartitions.size(); ++i)
@@ -142,8 +140,8 @@
         {
           e.printStackTrace();
         }
-        logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = "
-            + exp + " i = " + i + " partition = " + dataPartitions.getBigInteger(i) + " = " + dataPartitions.getBigInteger(i).toString(2));
+        logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}",
+            rowIndex, colCounter, part.toString(), part.toString(2), exp, i, dataPartitions.getBigInteger(i), dataPartitions.getBigInteger(i).toString(2));
 
         returnPairs.add(new Tuple2<>(colCounter, exp));
 
@@ -162,7 +160,6 @@
    * Optionally uses a static LRU cache for the modular exponentiation
    * <p>
    * Emits {@code Tuple2<<colNum, colVal>>}
-   * 
    */
   public static List<Tuple2<Long,BigInteger>> computeEncRowBI(Iterable<List<BigInteger>> dataPartitionsIter, Query query, int rowIndex,
       boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) throws IOException
@@ -178,7 +175,7 @@
     {
       // long startTime = System.currentTimeMillis();
 
-      logger.debug("rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
+      logger.debug("rowIndex = {} elementCounter = {}", rowIndex, elementCounter);
 
       if (limitHitsPerSelector)
       {
@@ -188,8 +185,7 @@
           break;
         }
       }
-      logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
-
+      logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
       // Update the associated column values
       for (int i = 0; i < dataPartitions.size(); ++i)
       {
@@ -209,8 +205,9 @@
         {
           e.printStackTrace();
         }
-        logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = "
-            + exp + " i = " + i);
+
+        logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {}",
+            rowIndex, colCounter, part.toString(), part.toString(2), exp, i);
 
         returnPairs.add(new Tuple2<>(colCounter, exp));
 
@@ -234,7 +231,6 @@
    * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
    * <p>
    * Emits {@code Tuple2<<colNum, colVal>>}
-   * 
    */
   public static List<Tuple2<Long,BigInteger>> computeEncRowCacheInput(Iterable<List<BigInteger>> dataPartitionsIter, HashMap<Integer,BigInteger> cache,
       int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector) throws IOException
@@ -245,7 +241,7 @@
     int elementCounter = 0;
     for (List<BigInteger> dataPartitions : dataPartitionsIter)
     {
-      logger.debug("elementCounter = " + elementCounter);
+      logger.debug("elementCounter = {}", elementCounter);
 
       if (limitHitsPerSelector)
       {
@@ -254,7 +250,7 @@
           break;
         }
       }
-      logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+      logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
 
       // Update the associated column values
       for (int i = 0; i < dataPartitions.size(); ++i)
@@ -262,7 +258,7 @@
         BigInteger part = dataPartitions.get(i);
         BigInteger exp = cache.get(part.intValue());
 
-        logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " exp = " + exp + " i = " + i);
+        logger.debug("rowIndex = {} colCounter = {} part = {} exp = {} i = {}", rowIndex, colCounter, part.toString(), exp, i);
 
         returnPairs.add(new Tuple2<>(colCounter, exp));
 
@@ -283,7 +279,6 @@
   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
    * <p>
    * Emits {@code Tuple2<<colNum, colVal>>}
-   * 
    */
   public static List<Tuple2<Long,BigInteger>> computeEncRow(BytesArrayWritable dataPartitions, Query query, int rowIndex, int colIndex) throws IOException
   {
@@ -295,7 +290,7 @@
     // Initialize the column counter
     long colCounter = colIndex;
 
-    logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+    logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
 
     // Update the associated column values
     for (int i = 0; i < dataPartitions.size(); ++i)
@@ -311,8 +306,8 @@
         e.printStackTrace();
       }
 
-      logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = "
-          + exp + " i = " + i + " partition = " + dataPartitions.getBigInteger(i) + " = " + dataPartitions.getBigInteger(i).toString(2));
+      logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}",
+          rowIndex, colCounter, part.toString(), part.toString(2), exp, i, dataPartitions.getBigInteger(i), dataPartitions.getBigInteger(i).toString(2));
 
       returnPairs.add(new Tuple2<>(colCounter, exp));
 
@@ -321,4 +316,81 @@
 
     return returnPairs;
   }
+
+  /**
+   * Method to compute the encrypted row elements for a query from extracted data partitions in the form of {@code List<BigInteger>}
+   * <p>
+   * For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
+   * <p>
+   * Uses a static LRU cache for the modular exponentiation
+   * <p>
+   * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+   * <p>
+   * Emits {@code Tuple2<<colNum, colVal>>}
+   */
+  public static List<Tuple2<Long,BigInteger>> computeEncRow(List<BigInteger> dataPartitions, Query query, int rowIndex, int colIndex)
+      throws IOException
+  {
+    List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
+
+    // Pull the corresponding encrypted row query
+    BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+    // Initialize the column counter
+    long colCounter = colIndex;
+
+    logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
+
+    // Update the associated column values
+    for (int i = 0; i < dataPartitions.size(); ++i)
+    {
+      BigInteger part = dataPartitions.get(i);
+
+      BigInteger exp = null;
+      try
+      {
+        exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+      } catch (ExecutionException e)
+      {
+        e.printStackTrace();
+        break;
+      }
+
+      logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}",
+          rowIndex, colCounter, part.toString(), part.toString(2), exp, i, dataPartitions.get(i), dataPartitions.get(i).toString(2));
+
+      returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+
+      ++colCounter;
+    }
+
+    return returnPairs;
+  }
+
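+  /**
+   * Method to compute the encrypted row element for a single extracted data partition value
+   * <p>
+   * Uses the static LRU cache for the modular exponentiation
+   * <p>
+   * Caller is responsible for keeping track of the colIndex
+   * <p>
+   * Emits a single {@code Tuple2<<colNum, colVal>>}
+   */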
+  public static List<Tuple2<Long,BigInteger>> computeEncRow(BigInteger part, Query query, int rowIndex, int colIndex) throws IOException
+  {
+    List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
+
+    // Pull the corresponding encrypted row query
+    BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+    // Initialize the column counter
+    long colCounter = colIndex;
+
+    // Update the associated column values
+    BigInteger exp = null;
+    try
+    {
+      exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+    } catch (ExecutionException e)
+    {
+      e.printStackTrace();
+    }
+
+    returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+
+    ++colCounter;
+
+    return returnPairs;
+  }
 }
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
index 61169f2..e605d94 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
@@ -35,8 +35,8 @@
 import scala.Tuple2;
 
 /**
- * Given a MapWritable dataElement, this class gives the common functionality to extract the selector by queryType from each dataElement, perform a keyed hash
- * of the selector, extract the partitions of the dataElement, and outputs {@code <hash(selector), dataPartitions>}
+ * Given a MapWritable or JSON formatted dataElement, this class gives the common functionality to extract the selector by queryType from each dataElement,
+ * perform a keyed hash of the selector, extract the partitions of the dataElement, and outputs {@code <hash(selector), dataPartitions>}
  */
 public class HashSelectorAndPartitionData
 {
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index f34acf8..6014435 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -90,8 +90,9 @@
   private BroadcastVars bVars = null;
 
   private QueryInfo queryInfo = null;
-  Query query = null;
-  QuerySchema qSchema = null;
+
+  private Query query = null;
+  private QuerySchema qSchema = null;
 
   private int numDataPartitions = 0;
   private int numColMultPartitions = 0;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
new file mode 100644
index 0000000..08c9917
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bolt class to perform encrypted column multiplication
+ * <p>
+ * Takes {@code <columnIndex, columnValue>} tuples as input and aggregates (multiplies) the columnValues for a given columnIndex as they are received.
+ * <p>
+ * EncRowCalcBolts send flush signals to the EncColMultBolts indicating that they have finished sending all tuples for a session. Whenever a flush signal is
+ * received from an EncRowCalcBolt, the number of received flush signals is tallied until each EncRowCalcBolt has emitted a flush signal.
+ * <p>
+ * Once a flush signal has been received from each EncRowCalcBolt, all {@code <columnIndex, aggregate colVal product>} tuples are sent to the OutputBolt and a session_end
+ * signal is sent back to each EncRowCalcBolt.
+ * <p>
+ * The EncRowCalcBolts buffer their output from the time that they send a flush signal to the EncColMultBolts until the time that they receive a session_end
+ * signal from all of the EncColMultBolts.
+ * 
+ */
+public class EncColMultBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EncColMultBolt.class);
+
+  private OutputCollector outputCollector;
+
+  private BigInteger nSquared;
+  private long numFlushSignals;
+  private Long totalFlushSignals;
+
+  // This is the main object here. It holds column Id -> aggregated product
+  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
+  private BigInteger colVal1;
+  private BigInteger colMult;
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
+  {
+    outputCollector = collector;
+    String nSquare = (String) map.get(StormConstants.N_SQUARED_KEY);
+    nSquared = new BigInteger(nSquare);
+    totalFlushSignals = (Long) map.get(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY);
+
+    logger.info("Initialized EncColMultBolt. ");
+  }
+
+  @Override
+  public void execute(Tuple tuple)
+  {
+    if (tuple.getSourceStreamId().equals(StormConstants.ENCROWCALCBOLT_FLUSH_SIG))
+    {
+      numFlushSignals += 1;
+      logger.debug("Received  {} flush signals out of {}", numFlushSignals, totalFlushSignals);
+
+      // Need to receive notice from all EncRowCalcBolts in order to flush.
+      if (numFlushSignals == totalFlushSignals)
+      {
+        logger.debug("Received signal to flush in EncColMultBolt. Outputting {} results.", resultsMap.keySet().size());
+        for (Long key : resultsMap.keySet())
+          // key = column Id, value = aggregated product
+          outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(key, resultsMap.get(key)));
+        resultsMap.clear();
+
+        // Send signal to OutputBolt to write output and notify EncRowCalcBolt that results have been flushed.
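+        // (OutputBolt recognizes the <-1, 0> tuple below as the flush signal.)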
+        outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(new Long(-1), BigInteger.valueOf(0)));
+        outputCollector.emit(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Values(1));
+        numFlushSignals = 0;
+      }
+    }
+    else
+    {
+      // Data tuple received. Do column multiplication.
+
+      long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ERC_FIELD);
+      colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);
+
+      logger.debug("Received tuple in ECM, multiplying {} to col {}", colVal1, colIndex);
+
+      if (resultsMap.containsKey(colIndex))
+      {
+        colMult = colVal1.multiply(resultsMap.get(colIndex));
+        resultsMap.put(colIndex, colMult.mod(nSquared));
+      }
+      else
+      {
+        resultsMap.put(colIndex, colVal1);
+      }
+    }
+    outputCollector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {
+    outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_ID,
+        new Fields(StormConstants.COLUMN_INDEX_ECM_FIELD, StormConstants.COLUMN_PRODUCT_FIELD));
+    outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Fields("finished"));
+  }
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
new file mode 100644
index 0000000..639a52b
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Bolt class to perform the encrypted row calculation
+ * <p>
+ * Receives a {@code <hash(selector), dataPartitions>} tuple as input.
+ * <p>
+ * Encrypts the row data and emits a (column index, encrypted row-value) tuple for each encrypted block.
+ * <p>
+ * Every FLUSH_FREQUENCY seconds, it sends a signal to EncColMultBolt to flush its output and resets all counters. At that point, all outgoing (column index,
+ * encrypted row-value) tuples are buffered until a SESSION_END signal is received back from each EncColMultBolt.
+ */
+public class EncRowCalcBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EncRowCalcBolt.class);
+
+  private OutputCollector outputCollector;
+  private static Query query;
+  private static boolean querySet = false;
+
+  private Boolean limitHitsPerSelector;
+  private Long maxHitsPerSelector;
+  private Long totalEndSigs;
+  private int rowDivisions;
+  private Boolean saltColumns;
+  private Boolean splitPartitions;
+
+  private Random rand;
+
+  // These are the main data structures used here.
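+  // hitsByRow: row hash -> number of records seen for that row; colIndexByRow: row hash -> next column index for that row.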
+  private Map<Integer,Integer> hitsByRow = new HashMap<Integer,Integer>();
+  private Map<Integer,Integer> colIndexByRow = new HashMap<Integer,Integer>();
+  private List<Tuple2<Long,BigInteger>> matrixElements = new ArrayList<>();
+  private List<BigInteger> dataArray = new ArrayList<>();
+
+  private int numEndSigs = 0;
+
+  // These buffered values are used in the case when a flush signal has been emitted, but the SESSION_END signal has not yet been
+  // received back from all of the EncColMultBolts.
+  private boolean buffering = false;
+  private List<Tuple2<Long,BigInteger>> bufferedValues = new ArrayList<>();
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, OutputCollector coll)
+  {
+    outputCollector = coll;
+    setQuery(map);
+    logger.info("EncRowCalcBolt hdfs = {}", map.get(StormConstants.USE_HDFS));
+
+    maxHitsPerSelector = (Long) map.get(StormConstants.MAX_HITS_PER_SEL_KEY);
+    limitHitsPerSelector = (Boolean) map.get(StormConstants.LIMIT_HITS_PER_SEL_KEY);
+    totalEndSigs = (Long) map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY);
+    splitPartitions = (Boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY);
+    saltColumns = (Boolean) map.get(StormConstants.SALT_COLUMNS_KEY);
+    rowDivisions = ((Long) map.get(StormConstants.ROW_DIVISIONS_KEY)).intValue();
+
+    // If splitPartitions==true, the data is incoming partition by partition, rather than record by record.
+    // The numRecords counter below then increments for every partition element and would exceed the maxHitsPerSelector param far too
+    // soon unless the latter is scaled accordingly.
+    if (splitPartitions)
+      maxHitsPerSelector *= query.getQueryInfo().getNumPartitionsPerDataElement();
+
+    rand = new Random();
+
+    logger.info("Initialized EncRowCalcBolt.");
+  }
+
+  @Override
+  public void execute(Tuple tuple)
+  {
+    if (tuple.getSourceStreamId().equals(StormConstants.DEFAULT))
+    {
+      matrixElements = processTupleFromPartitionDataBolt(tuple); // tuple: <hash,partitions>
+
+      if (buffering)
+      {
+        logger.debug("Buffering tuple.");
+        bufferedValues.addAll(matrixElements);
+      }
+      else
+      {
+        emitTuples(matrixElements);
+      }
+    }
+    else if (StormUtils.isTickTuple(tuple) && !buffering)
+    {
+      logger.debug("Sending flush signal to EncColMultBolt.");
+      outputCollector.emit(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Values(1));
+
+      colIndexByRow.clear();
+      hitsByRow.clear();
+
+      buffering = true;
+    }
+    else if (tuple.getSourceStreamId().equals(StormConstants.ENCCOLMULTBOLT_SESSION_END))
+    {
+      numEndSigs += 1;
+      logger.debug("SessionEnd signal {} of {} received", numEndSigs, totalEndSigs);
+
+      // Need to receive signal from all EncColMultBolt instances before stopping buffering.
+      if (numEndSigs == totalEndSigs)
+      {
+        logger.debug("Buffering completed, emitting {} tuples.", bufferedValues.size());
+        emitTuples(bufferedValues);
+        bufferedValues.clear();
+        buffering = false;
+
+        numEndSigs = 0;
+      }
+    }
+    outputCollector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {
+    outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
+        new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT));
+    outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Fields(StormConstants.FLUSH));
+  }
+
+  /**
+   * Extracts (hash, data partitions) from tuple. Encrypts the data partitions. Returns all of the pairs of (col index, col value). Also advances the
+   * colIndexByRow and hitsByRow appropriately.
+   *
+   * @param tuple {@code <hash(selector), dataPartitions>} tuple emitted by the partition data bolt
+   * @return list of {@code <colIndex, colVal>} pairs computed from the tuple's data partitions
+   */
+  private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple)
+  {
+    matrixElements.clear();
+    int rowIndex = tuple.getIntegerByField(StormConstants.HASH_FIELD);
+
+    if (!colIndexByRow.containsKey(rowIndex))
+    {
+      colIndexByRow.put(rowIndex, 0);
+      hitsByRow.put(rowIndex, 0);
+    }
+
+    if (splitPartitions)
+    {
+      dataArray.add((BigInteger) tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD));
+    }
+    else
+    {
+      dataArray = (ArrayList<BigInteger>) tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD);
+    }
+    logger.debug("Retrieving {} elements in EncRowCalcBolt.", dataArray.size());
+
+    try
+    {
+      int colIndex = colIndexByRow.get(rowIndex);
+      int numRecords = hitsByRow.get(rowIndex);
+
+      if (limitHitsPerSelector && numRecords < maxHitsPerSelector)
+      {
+        logger.debug("computing matrix elements.");
+        matrixElements = ComputeEncryptedRow.computeEncRow(dataArray, query, rowIndex, colIndex);
+        colIndexByRow.put(rowIndex, colIndex + matrixElements.size());
+        hitsByRow.put(rowIndex, numRecords + 1);
+      }
+      else if (limitHitsPerSelector)
+      {
+        logger.info("maxHits: rowIndex = {} numRecords = {}", rowIndex, numRecords);
+      }
+    } catch (IOException e)
+    {
+      logger.warn("Caught IOException while encrypting row. ", e);
+    }
+
+    dataArray.clear();
+    return matrixElements;
+  }
+
+  private void emitTuples(List<Tuple2<Long,BigInteger>> matrixElements)
+  {
+    // saltColumns distributes the column multiplication done in the next bolt EncColMultBolt to avoid hotspotting.
+    if (saltColumns)
+    {
+      for (Tuple2<Long,BigInteger> sTuple : matrixElements)
+      {
+        outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Values(sTuple._1(), sTuple._2(), rand.nextInt(rowDivisions)));
+      }
+    }
+    else
+    {
+      for (Tuple2<Long,BigInteger> sTuple : matrixElements)
+      {
+        outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Values(sTuple._1(), sTuple._2(), 0));
+      }
+    }
+  }
+
+  private synchronized static void setQuery(Map map)
+  {
+    if (!querySet)
+    {
+      query = StormUtils.prepareQuery(map);
+      querySet = true;
+    }
+  }
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
new file mode 100644
index 0000000..68b02f3
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Bolt to compute and output the final Response object for a query
+ * <p>
+ * Receives {@code <colIndex, colProduct>} tuples, computes the final column product for each colIndex, records the results in the final Response object, and
+ * outputs the final Response object for the query.
+ * <p>
+ * Flush signals are sent to the OutputBolt from the EncColMultBolts via a tuple of the form {@code <-1, 0>}. Once a flush signal has been received from each
+ * EncColMultBolt (or a timeout is reached), the final column product is computed and the final Response is formed and emitted.
+ * <p>
+ * Currently, the Responses are written to HDFS at the location specified by outputFile, with a timestamp appended.
+ * <p>
+ * TODO: -- Enable other Response output locations
+ * 
+ */
+public class OutputBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(OutputBolt.class);
+
+  private OutputCollector outputCollector;
+  private QueryInfo queryInfo;
+  private Response response;
+  private String outputFile;
+  private boolean hdfs;
+  private String hdfsUri;
+  // Primitive ints so that the flushCounter == totalFlushSigs check compares values, not boxed references
+  private int flushCounter = 0;
+  private List<Tuple> tuplesToAck = new ArrayList<>();
+  private int totalFlushSigs;
+
+  private LocalFileSystemStore localStore;
+  private HadoopFileSystemStore hadoopStore;
+
+  // This latch just serves as a hook for testing.
+  public static CountDownLatch latch = new CountDownLatch(4);
+
+  // This is the main object here. It holds column Id -> product
+  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
+
+  private BigInteger colVal;
+  private BigInteger colMult;
+
+  private BigInteger nSquared;
+
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
+  {
+    outputCollector = collector;
+
+    totalFlushSigs = ((Long) map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY)).intValue();
+    outputFile = (String) map.get(StormConstants.OUTPUT_FILE_KEY);
+
+    hdfs = (boolean) map.get(StormConstants.USE_HDFS);
+
+    if (hdfs)
+    {
+      hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+      try
+      {
+        FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
+        hadoopStore = new HadoopFileSystemStore(fs);
+      } catch (IOException e)
+      {
+        logger.error("Failed to initialize Hadoop file system for output.");
+        throw new RuntimeException(e);
+      }
+    }
+    else
+    {
+      localStore = new LocalFileSystemStore();
+    }
+    nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY));
+    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    response = new Response(queryInfo);
+
+    logger.info("Intitialized OutputBolt.");
+  }
+
+  @Override
+  public void execute(Tuple tuple)
+  {
+    long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ECM_FIELD);
+    colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
+
+    // colIndex == -1 is just the signal sent by an EncColMultBolt to notify that it has flushed its values.
+    // Could have created a new stream for such signals, but that seemed like overkill.
+    if (colIndex == -1)
+    {
+      flushCounter++;
+
+      logger.debug("Received " + flushCounter + " output flush signals out of " + totalFlushSigs);
+
+      // Wait till all EncColMultBolts have been flushed
+      if (flushCounter == totalFlushSigs)
+      {
+        logger.info("TimeToFlush reached - outputting response to " + outputFile + " with columns.size = " + resultsMap.keySet().size());
+        try
+        {
+          String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date());
+          for (long cv : resultsMap.keySet())
+          {
+            response.addElement((int) cv, resultsMap.get(cv));
+          }
+
+          if (hdfs)
+          {
+            hadoopStore.store(new Path(outputFile + "_" + timestamp), response);
+          }
+          else
+          { // To accommodate testing, the local output file does not currently include the timestamp.
+            // Should probably be fixed, but this path is unlikely to be used outside of testing.
+            localStore.store(new File(outputFile), response);
+            for (long cv : resultsMap.keySet())
+            {
+              logger.debug("column = " + cv + ", value = " + resultsMap.get(cv).toString());
+            }
+          }
+        } catch (IOException e)
+        {
+          logger.warn("Unable to write output file.");
+        }
+
+        // Reset
+        resultsMap.clear();
+        flushCounter = 0;
+        for (Tuple t : tuplesToAck)
+          outputCollector.ack(t);
+        // Used for integration test
+        latch.countDown();
+      }
+    }
+    else
+    {
+      // Process data values: add them to map. The column multiplication is only done in the case where saltColumns==true,
+      // in which case a small number of multiplications still need to be done per column.
+      if (resultsMap.containsKey(colIndex))
+      {
+        colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
+        resultsMap.put(colIndex, colMult);
+      }
+      else
+      {
+        resultsMap.put(colIndex, colVal);
+      }
+      logger.debug("column = " + colIndex + ", value = " + resultsMap.get(colIndex).toString());
+    }
+    outputCollector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {}
+}
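
Illustrative sketch (not part of the patch): a minimal model of the resultsMap update rule in execute() above, where partial column products arriving from different EncColMultBolt tasks are combined by multiplication mod N^2 (Paillier ciphertext accumulation). The toy modulus and partial values are assumptions; the real N^2 comes from the query.

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

public class ColumnProductSketch
{
  public static void main(String[] args)
  {
    BigInteger nSquared = BigInteger.valueOf(3233L * 3233L); // toy modulus, stand-in for Paillier N^2
    Map<Long,BigInteger> resultsMap = new HashMap<>();

    // Partial products for the same column arriving from different EncColMultBolt tasks
    long colIndex = 7L;
    BigInteger[] partials = {BigInteger.valueOf(12345), BigInteger.valueOf(67890), BigInteger.valueOf(424242)};

    for (BigInteger colVal : partials)
    {
      // Same update rule as OutputBolt.execute(): multiply partial products mod N^2, or store the first value
      resultsMap.merge(colIndex, colVal, (prev, cur) -> prev.multiply(cur).mod(nSquared));
    }
    System.out.println("column " + colIndex + " -> " + resultsMap.get(colIndex));
  }
}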
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
new file mode 100644
index 0000000..9d24620
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.query.wideskies.QueryUtils;
+
+import org.apache.pirk.schema.data.DataSchema;
+import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bolt to extract the partitions of the data record and output {@code <hash(selector), dataPartitions>}
+ * <p>
+ * Currently receives a {@code <hash(selector), JSON data record>} as input.
+ * <p>
+ *
+ */
+public class PartitionDataBolt extends BaseBasicBolt
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PartitionDataBolt.class);
+
+  private static final long serialVersionUID = 1L;
+
+  private QueryInfo queryInfo;
+  private String queryType;
+  private QuerySchema qSchema = null;
+
+  private boolean embedSelector;
+
+  private boolean splitPartitions;
+
+  private JSONObject json;
+  private List<BigInteger> partitions;
+
+  @Override
+  public void prepare(Map map, TopologyContext context)
+  {
+    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    queryType = queryInfo.getQueryType();
+    embedSelector = queryInfo.getEmbedSelector();
+    logger.info("partition databolt hdfs = " + map.get(StormConstants.USE_HDFS));
+    StormUtils.initializeSchemas(map, "partition");
+    try
+    {
+      if ((boolean) map.get(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY))
+      {
+        qSchema = queryInfo.getQuerySchema();
+      }
+      if (qSchema == null)
+      {
+        qSchema = QuerySchemaRegistry.get(queryType);
+      }
+    } catch (Exception e)
+    {
+      logger.error("Unable to initialize schemas in PartitionDataBolt. ", e);
+    }
+
+    json = new JSONObject();
+    splitPartitions = (boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY);
+
+    logger.info("Initialized ExtractAndPartitionDataBolt.");
+  }
+
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector outputCollector)
+  {
+    int hash = tuple.getIntegerByField(StormConstants.HASH_FIELD);
+    json = (JSONObject) tuple.getValueByField(StormConstants.JSON_DATA_FIELD);
+
+    try
+    {
+      partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector);
+
+      logger.debug("HashSelectorsAndPartitionDataBolt processing {} outputting results - {}", json.toString(), partitions.size());
+
+      // splitPartitions determines whether each partition piece is sent individually or the full list is sent in a single tuple.
+      // Since processing in the follow-on bolt (EncRowCalcBolt) is computationally expensive, the current working theory is
+      // that splitting the partitions up allows for better throughput. With better knowledge/tuning of Storm internals
+      // and parameters (e.g. certain buffer sizes), it may make no difference.
+      if (splitPartitions)
+      {
+        for (BigInteger partition : partitions)
+        {
+          outputCollector.emit(new Values(hash, partition));
+        }
+      }
+      else
+      {
+        outputCollector.emit(new Values(hash, partitions));
+      }
+
+    } catch (Exception e)
+    {
+      logger.warn("Failed to partition data for record -- " + json + "\n", e);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {
+    outputFieldsDeclarer.declare(new Fields(StormConstants.HASH_FIELD, StormConstants.PARTIONED_DATA_FIELD));
+  }
+}
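
Illustrative sketch (not part of the patch): the splitPartitions trade-off in miniature. With splitPartitions = true the bolt emits one tuple per partition element; with false it emits the whole partition list in a single tuple. The println calls stand in for outputCollector.emit(); the hash and partition values are made up.

import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;

public class SplitPartitionsSketch
{
  public static void main(String[] args)
  {
    List<BigInteger> partitions = Arrays.asList(BigInteger.valueOf(11), BigInteger.valueOf(22), BigInteger.valueOf(33));
    int hash = 5;                   // stand-in for hash(selector)
    boolean splitPartitions = true; // corresponds to storm.splitPartitions

    if (splitPartitions)
    {
      // One "tuple" per partition: more tuples, but the expensive downstream encryption is spread more finely.
      for (BigInteger partition : partitions)
      {
        System.out.println("emit <" + hash + ", " + partition + ">");
      }
    }
    else
    {
      // One "tuple" per record: fewer tuples, each carrying the whole partition list.
      System.out.println("emit <" + hash + ", " + partitions + ">");
    }
  }
}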
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
new file mode 100644
index 0000000..76bb80c
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.query.wideskies.QueryUtils;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.utils.KeyedHash;
+
+import org.apache.storm.Config;
+import org.apache.storm.kafka.StringScheme;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scheme used by the KafkaSpout to parse JSON records from Kafka, extract the selector, and emit {@code <hash(selector), JSON data record>}.
+ */
+public class PirkHashScheme extends StringScheme implements Scheme
+{
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PirkHashScheme.class);
+
+  private QueryInfo queryInfo;
+
+  transient private JSONParser parser;
+  transient private JSONObject json;
+  private boolean initialized = false;
+  private QuerySchema qSchema;
+  private Config conf;
+
+  public PirkHashScheme(Config conf)
+  {
+    this.conf = conf;
+  }
+
+  public List<Object> deserialize(ByteBuffer bytes)
+  {
+    if (!initialized)
+    {
+      parser = new JSONParser();
+      queryInfo = new QueryInfo((Map) conf.get(StormConstants.QUERY_INFO_KEY));
+
+      StormUtils.initializeSchemas(conf, "hashScheme");
+
+      if ((boolean) conf.get(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY))
+      {
+        qSchema = queryInfo.getQuerySchema();
+      }
+      if (qSchema == null)
+      {
+        qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType());
+      }
+
+      initialized = true;
+    }
+    String str = super.deserializeString(bytes);
+
+    try
+    {
+      json = (JSONObject) parser.parse(str);
+    } catch (ParseException e)
+    {
+      logger.warn("ParseException parsing " + str, e);
+      return null; // skip records that fail to parse rather than NPE on the selector lookup below
+    }
+    String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, json);
+    int hash = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector);
+
+    return new Values(hash, json);
+  }
+
+  public Fields getOutputFields()
+  {
+    return new Fields(StormConstants.HASH_FIELD, StormConstants.JSON_DATA_FIELD);
+  }
+
+}
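
Illustrative sketch (not part of the patch), assuming Pirk and json-simple are on the classpath: it shows the selector-extraction-and-hash step that deserialize() performs for each Kafka record. The field name "qname", the hash key, and the hash bit size are assumptions; the production code resolves the selector through QueryUtils.getSelectorByQueryTypeJSON() and the QuerySchema rather than a hard-coded field.

import org.apache.pirk.utils.KeyedHash;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

public class HashSchemeSketch
{
  public static void main(String[] args) throws Exception
  {
    // A toy record standing in for one Kafka message
    JSONObject json = (JSONObject) new JSONParser().parse("{\"qname\":\"a.b.c.com\"}");

    String selector = (String) json.get("qname"); // stand-in for the schema-driven selector lookup
    int hashBitSize = 12;                         // assumed queryInfo.getHashBitSize()

    // Same keyed hash call used in PirkHashScheme.deserialize()
    int hash = KeyedHash.hash("someKey", hashBitSize, selector);
    System.out.println("selector " + selector + " -> hash " + hash);
  }
}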
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
new file mode 100644
index 0000000..ddfca8b
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.*;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Storm topology class for wideskies Pirk implementation
+ * <p>
+ * 
+ */
+public class PirkTopology
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PirkTopology.class);
+
+  private static final String kafkaClientId = SystemConfiguration.getProperty("kafka.clientId", "KafkaSpout");
+  private static final String brokerZk = SystemConfiguration.getProperty("kafka.zk", "localhost:2181");
+  private static final String kafkaTopic = SystemConfiguration.getProperty("kafka.topic", "pirkTopic");
+  private static final Boolean forceFromStart = Boolean.parseBoolean(SystemConfiguration.getProperty("kafka.forceFromStart", "false"));
+
+  private static final Boolean useHdfs = Boolean.parseBoolean(SystemConfiguration.getProperty("hdfs.use", "true"));
+  private static final String hdfsUri = SystemConfiguration.getProperty("hdfs.uri", "localhost");
+
+  private static final String topologyName = SystemConfiguration.getProperty("storm.topoName", "PirkTopology");
+  private static final Integer numWorkers = Integer.parseInt(SystemConfiguration.getProperty("storm.workers", "1"));
+
+  private static final Integer spoutParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.spout.parallelism", "1"));
+  private static final Integer partitionDataBoltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.partitiondata.parallelism", "1"));
+  private static final Integer encrowcalcboltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.parallelism", "1"));
+  private static final Integer enccolmultboltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.enccolmultbolt.parallelism", "1"));
+
+  private static final Boolean saltColumns = Boolean.parseBoolean(SystemConfiguration.getProperty("storm.saltColumns", "false"));
+  private static final Boolean splitPartitions = Boolean.parseBoolean(SystemConfiguration.getProperty("storm.splitPartitions", "false"));
+
+  private static final String queryFile = SystemConfiguration.getProperty("pir.queryInput");
+  private static final String outputPath = SystemConfiguration.getProperty("pir.outputFile");
+
+  public static void runPirkTopology() throws Exception
+  {
+    // Set up Kafka parameters
+    logger.info("Configuring Kafka.");
+    String zkRoot = "/" + kafkaTopic + "_pirk_storm";
+    BrokerHosts zkHosts = new ZkHosts(brokerZk);
+    SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, kafkaTopic, zkRoot, kafkaClientId);
+    kafkaConfig.ignoreZkOffsets = forceFromStart;
+
+    // Create conf
+    logger.info("Retrieving Query and generating Storm conf.");
+    Config conf = createStormConf();
+    Query query = StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
+    conf.put(StormConstants.N_SQUARED_KEY, query.getNSquared().toString());
+    conf.put(StormConstants.QUERY_INFO_KEY, query.getQueryInfo().toMap());
+
+    // Configure this for different types of input data on Kafka.
+    kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf));
+
+    // Create topology
+    StormTopology topology = getPirkTopology(kafkaConfig);
+
+    // Run topology
+    logger.info("Submitting Pirk topology to Storm...");
+    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+
+  } // runPirkTopology
+
+  /**
+   * Creates the Pirk topology: KafkaSpout -> PartitionDataBolt -> EncRowCalcBolt -> EncColMultBolt -> OutputBolt.
+   * Requires a SpoutConfig to initialize the KafkaSpout.
+   *
+   * @param kafkaConfig SpoutConfig used to construct the KafkaSpout
+   * @return the assembled StormTopology
+   */
+  public static StormTopology getPirkTopology(SpoutConfig kafkaConfig)
+  {
+    // Create spout and bolts
+    KafkaSpout spout = new KafkaSpout(kafkaConfig);
+    PartitionDataBolt partitionDataBolt = new PartitionDataBolt();
+    EncRowCalcBolt ercbolt = new EncRowCalcBolt();
+    EncColMultBolt ecmbolt = new EncColMultBolt();
+    OutputBolt outputBolt = new OutputBolt();
+
+    // Build Storm topology
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout(StormConstants.SPOUT_ID, spout, spoutParallelism);
+
+    builder.setBolt(StormConstants.PARTITION_DATA_BOLT_ID, partitionDataBolt, partitionDataBoltParallelism).fieldsGrouping(StormConstants.SPOUT_ID,
+        new Fields(StormConstants.HASH_FIELD));
+
+    // TODO: Decide whether to use Resource Aware Scheduler. (If not, get rid of b2 and b3).
+    BoltDeclarer b2 = builder.setBolt(StormConstants.ENCROWCALCBOLT_ID, ercbolt, encrowcalcboltParallelism)
+        .fieldsGrouping(StormConstants.PARTITION_DATA_BOLT_ID, new Fields(StormConstants.HASH_FIELD))
+        .allGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_SESSION_END)
+        .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.ticktuple")));
+
+    // b2.setMemoryLoad(5000);
+    // b2.setCPULoad(150.0);
+
+    BoltDeclarer b3 = builder.setBolt(StormConstants.ENCCOLMULTBOLT_ID, ecmbolt, enccolmultboltParallelism)
+        .fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
+            new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.SALT))
+        .allGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_FLUSH_SIG);
+    // b3.setMemoryLoad(5000);
+    // b3.setCPULoad(500.0);
+
+    builder.setBolt(StormConstants.OUTPUTBOLT_ID, outputBolt, 1).globalGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_ID);
+
+    return builder.createTopology();
+  }
+
+  public static Config createStormConf()
+  {
+
+    Boolean limitHitsPerSelector = Boolean.parseBoolean(SystemConfiguration.getProperty("pir.limitHitsPerSelector"));
+    Integer maxHitsPerSelector = Integer.parseInt(SystemConfiguration.getProperty("pir.maxHitsPerSelector"));
+    Integer rowDivisions = Integer.parseInt(SystemConfiguration.getProperty("storm.rowDivs", "1"));
+
+    Config conf = new Config();
+    conf.setNumAckers(Integer.parseInt(SystemConfiguration.getProperty("storm.numAckers", numWorkers.toString())));
+    conf.setMaxSpoutPending(Integer.parseInt(SystemConfiguration.getProperty("storm.maxSpoutPending", "300")));
+    conf.setNumWorkers(numWorkers);
+    conf.setDebug(false);
+    // conf.setNumEventLoggers(2);
+
+    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.receiveBufferSize", "1024")));
+    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.sendBufferSize", "1024")));
+    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.transferBufferSize", "32")));
+    conf.put(Config.WORKER_HEAP_MEMORY_MB, Integer.parseInt(SystemConfiguration.getProperty("storm.worker.heapMemory", "750")));
+    conf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));
+
+    // Pirk parameters to send to bolts
+    conf.put(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY, SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true"));
+    conf.put(StormConstants.QSCHEMA_KEY, SystemConfiguration.getProperty("query.schemas"));
+    conf.put(StormConstants.DSCHEMA_KEY, SystemConfiguration.getProperty("data.schemas"));
+    conf.put(StormConstants.HDFS_URI_KEY, hdfsUri);
+    conf.put(StormConstants.QUERY_FILE_KEY, queryFile);
+    conf.put(StormConstants.USE_HDFS, useHdfs);
+    conf.put(StormConstants.OUTPUT_FILE_KEY, outputPath);
+    conf.put(StormConstants.LIMIT_HITS_PER_SEL_KEY, limitHitsPerSelector);
+    conf.put(StormConstants.MAX_HITS_PER_SEL_KEY, maxHitsPerSelector);
+    conf.put(StormConstants.SPLIT_PARTITIONS_KEY, splitPartitions);
+    conf.put(StormConstants.SALT_COLUMNS_KEY, saltColumns);
+    conf.put(StormConstants.ROW_DIVISIONS_KEY, rowDivisions);
+    conf.put(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY, encrowcalcboltParallelism);
+    conf.put(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY, enccolmultboltParallelism);
+
+    return conf;
+  }
+}
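
Illustrative sketch (not part of the patch): a minimal driver showing the properties PirkTopology reads before runPirkTopology() is called. It assumes the jar is packaged and submitted with `storm jar` against a running cluster and that the Kafka/HDFS endpoints exist; every path and host name below is a placeholder.

import org.apache.pirk.responder.wideskies.storm.PirkTopology;
import org.apache.pirk.utils.SystemConfiguration;

public class RunPirkTopologySketch
{
  public static void main(String[] args) throws Exception
  {
    // Kafka and HDFS locations (placeholders)
    SystemConfiguration.setProperty("kafka.topic", "pirkTopic");
    SystemConfiguration.setProperty("kafka.zk", "zkhost:2181");
    SystemConfiguration.setProperty("hdfs.use", "true");
    SystemConfiguration.setProperty("hdfs.uri", "hdfs://namenode:8020");

    // Query input and response output (placeholders)
    SystemConfiguration.setProperty("pir.queryInput", "/pirk/query/encryptedQuery");
    SystemConfiguration.setProperty("pir.outputFile", "/pirk/output/response");

    // Properties read by createStormConf()/getPirkTopology()
    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
    SystemConfiguration.setProperty("pir.maxHitsPerSelector", "100");
    SystemConfiguration.setProperty("storm.encrowcalcbolt.ticktuple", "60");

    // Builds the KafkaSpout config, loads the Query, and calls StormSubmitter under the hood.
    PirkTopology.runPirkTopology();
  }
}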
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
new file mode 100644
index 0000000..7f1e59d
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+public class StormConstants
+{
+  // Topology Components
+  public static final String SPOUT_ID = "kafkaspout";
+  public static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
+  public static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
+  public static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
+  public static final String OUTPUTBOLT_ID = "outputbolt";
+
+  // Extra Streams
+  public static final String DEFAULT = "default";
+  public static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
+  public static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
+  public static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";
+
+  // Tuple Fields
+  // From HashBolt (and variants)
+  public static final String HASH_FIELD = "hash";
+  public static final String PARTIONED_DATA_FIELD = "parData";
+  public static final String JSON_DATA_FIELD = "data";
+  // From EncRowCalcBolt
+  public static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
+  public static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
+  // From EncColMultBolt
+  public static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
+  public static final String COLUMN_PRODUCT_FIELD = "colProduct";
+
+  // Configuration Keys
+  public static final String USE_HDFS = "useHdfs";
+  public static final String HDFS_URI_KEY = "hdfsUri";
+  public static final String QUERY_FILE_KEY = "queryFile";
+  public static final String QUERY_INFO_KEY = "queryInfo";
+  public static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
+  public static final String QSCHEMA_KEY = "qSchema";
+  public static final String DSCHEMA_KEY = "dschema";
+  public static final String OUTPUT_FILE_KEY = "output";
+  public static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
+  public static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
+  public static final String SALT_COLUMNS_KEY = "saltColumns";
+  public static final String ROW_DIVISIONS_KEY = "rowDivisions";
+  public static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
+  public static final String N_SQUARED_KEY = "nSquared";
+  public static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
+  public static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";
+
+  public static final String SALT = "salt";
+  public static final String FLUSH = "flush";
+
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
new file mode 100644
index 0000000..7fbca66
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Utils class for the Storm implementation of Wideskies
+ */
+public class StormUtils
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(StormUtils.class);
+
+  /**
+   * Reads in the serialized Query object from the given queryFile.
+   *
+   * @param useHdfs whether the query file resides on HDFS
+   * @param hdfsUri URI of the HDFS filesystem; ignored if useHdfs is false
+   * @param queryFile path to the serialized Query
+   * @return the deserialized Query
+   */
+  public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile)
+  {
+    Query query = null;
+
+    try
+    {
+      if (useHdfs)
+      {
+        FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
+        logger.info("reading query file from hdfs: " + queryFile);
+        query = (new HadoopFileSystemStore(fs)).recall(queryFile, Query.class);
+      }
+      else
+      {
+        logger.info("reading local query file from " + queryFile);
+        query = (new LocalFileSystemStore()).recall(queryFile, Query.class);
+      }
+    } catch (Exception e)
+    {
+      logger.error("Unable to initalize query info.", e);
+      throw new RuntimeException(e);
+    }
+    return query;
+  }
+
+  /**
+   * Reads in and returns the serialized Query object specified by the conf map (requires values for USE_HDFS, HDFS_URI_KEY, and QUERY_FILE_KEY).
+   *
+   * @param map Storm conf map containing the query location settings
+   * @return the deserialized Query, or null if it could not be read
+   */
+  public static Query prepareQuery(Map map)
+  {
+    Query query = null;
+
+    boolean useHdfs = (boolean) map.get(StormConstants.USE_HDFS);
+    String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+    String queryFile = (String) map.get(StormConstants.QUERY_FILE_KEY);
+    try
+    {
+      query = StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
+
+    } catch (Exception e)
+    {
+      logger.warn("Unable to initialize query info.", e);
+    }
+
+    return query;
+  }
+
+  /**
+   * Initializes the data and query schemas. The conf map requires values for USE_HDFS, HDFS_URI_KEY, DSCHEMA_KEY, and QSCHEMA_KEY.
+   *
+   * @param conf Storm conf map containing the schema locations
+   * @param id identifier of the calling component
+   */
+  public static void initializeSchemas(Map conf, String id)
+  {
+    SystemConfiguration.setProperty("data.schemas", (String) conf.get(StormConstants.DSCHEMA_KEY));
+    SystemConfiguration.setProperty("query.schemas", (String) conf.get(StormConstants.QSCHEMA_KEY));
+
+    try
+    {
+      boolean hdfs = (boolean) conf.get(StormConstants.USE_HDFS);
+      if (hdfs)
+      {
+        String hdfsUri = (String) conf.get(StormConstants.HDFS_URI_KEY);
+        FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
+        DataSchemaLoader.initialize(true, fs);
+        QuerySchemaLoader.initialize(true, fs);
+      }
+      else
+      {
+        DataSchemaLoader.initialize();
+        QuerySchemaLoader.initialize();
+      }
+    } catch (Exception e)
+    {
+      logger.error("Failed to initialize schema files.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected static boolean isTickTuple(Tuple tuple)
+  {
+    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+  }
+
+}
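
Illustrative sketch (not part of the patch): a hypothetical bolt in the same package showing how StormUtils.isTickTuple() would typically be used to separate periodic tick tuples (enabled via Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, as done for EncRowCalcBolt in PirkTopology) from ordinary data tuples.

package org.apache.pirk.responder.wideskies.storm;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class TickAwareBoltSketch extends BaseRichBolt
{
  private OutputCollector collector;

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector)
  {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple tuple)
  {
    if (StormUtils.isTickTuple(tuple))
    {
      // Periodic housekeeping, e.g. flushing buffered partial results downstream
    }
    else
    {
      // Normal data-tuple processing
    }
    collector.ack(tuple);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer)
  {}
}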
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index 28de96e..c651eaa 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -31,7 +31,9 @@
 import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.pirk.schema.data.partitioner.DataPartitioner;
 import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
 import org.apache.pirk.utils.PIRException;
@@ -131,8 +133,8 @@
       InputStream is = null;
       if (hdfs)
       {
-        is = fs.open(new Path(schemaFile));
         logger.info("hdfs: filePath = " + schemaFile);
+        is = fs.open(fs.makeQualified(new Path(schemaFile)));
       }
       else
       {
diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
index 962e467..b0c9326 100644
--- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java
+++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
@@ -77,8 +77,6 @@
   {
     logger.info("Running testDNSHostnameQuery(): ");
 
-    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
-
     int numExpectedResults = 6;
     List<QueryResponseJSON> results;
     if (isDistributed)
@@ -93,6 +91,14 @@
         numExpectedResults = 7; // all 7 for non distributed case; if testFalsePositive==true, then 6
       }
     }
+    checkDNSHostnameQueryResults(results, isDistributed, numExpectedResults, testFalsePositive, dataElements);
+    logger.info("Completed testDNSHostnameQuery(): ");
+  }
+
+  public static void checkDNSHostnameQueryResults(List<QueryResponseJSON> results, boolean isDistributed, int numExpectedResults,
+      boolean testFalsePositive, List<JSONObject> dataElements)
+  {
+    QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
     logger.info("results:");
     printResultList(results);
 
@@ -188,7 +194,6 @@
         }
       }
     }
-    logger.info("Completed testDNSHostnameQuery(): ");
   }
 
   public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
index 8f82f1c..7501aaa 100644
--- a/src/main/resources/log4j2.xml
+++ b/src/main/resources/log4j2.xml
@@ -46,4 +46,4 @@
 		</Root>
 	</Loggers>
 
-</Configuration>
\ No newline at end of file
+</Configuration>
diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties
index 963fa34..a88c846 100755
--- a/src/main/resources/pirk.properties
+++ b/src/main/resources/pirk.properties
@@ -228,9 +228,3 @@
 		
 #Parallelism for expLookupTable creation in hdfs 
 pir.expCreationSplits = 600
-
-
-
-
-
-
diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties
index 3ae92c7..ac6cb35 100644
--- a/src/main/resources/responder.properties
+++ b/src/main/resources/responder.properties
@@ -154,4 +154,47 @@
 #default is false
 #spark.streaming.stopGracefullyOnShutdown=
 
- 
\ No newline at end of file
+ ##Properties for Kafka
+ #kafka.topic = topicName
+ #kafka.clientId = pirk_spout
+
+ # Kafka Zookeepers
+ #kafka.zk = localhost:2181
+ # Read from beginning of Kafka topic on startup
+ #kafka.forceFromStart = false
+
+
+ ##Properties for Storm
+ #storm.topoName = pir
+ #storm.workers = 1
+ #storm.numAckers = 1
+ #storm.maxSpoutPending=10
+ #storm.worker.heapMemory=6000
+ #storm.componentOnheapMem= 600.0
+
+ # This should be set to the number of Kafka partitions
+ #storm.spout.parallelism = 1
+
+ #storm.hashbolt.parallelism = 1
+ #storm.encrowcalcbolt.parallelism = 1
+ # This bolt is most computationally expensive and should have the highest value
+ #storm.enccolmultbolt.parallelism = 2
+
+ # These may be useful for tuning
+ #storm.executor.receiveBufferSize = 1024
+ #storm.executor.sendBufferSize = 1024
+ #storm.transferBufferSize = 8
+
+ # Frequency with which PIR matrix elements are flushed out
+ #storm.encrowcalcbolt.ticktuple = 60
+
+ # Design configurations:
+ # The partition data bolt emits an individual tuple for each data partition when splitPartitions = true,
+ # and emits the batch of data partitions for a record in a single tuple when = false
+ #storm.splitPartitions = true
+ # A task running EncColMultBolt will only be responsible for multiplying a subset of the row
+ # for any individual column when saltColumns = true
+ # All multiplication for a single column is done on a single EncColMultBolt instance when = false
+ #storm.saltColumns = true
+ # Only makes sense to tune if saltColumns=true
+ #storm.rowDivs = 1
diff --git a/src/test/java/org/apache/pirk/general/PaillierTest.java b/src/test/java/org/apache/pirk/general/PaillierTest.java
index 14347fa..1cc35d1 100644
--- a/src/test/java/org/apache/pirk/general/PaillierTest.java
+++ b/src/test/java/org/apache/pirk/general/PaillierTest.java
@@ -257,7 +257,7 @@
   {
     Random r = new Random();
     int lowBitLength = 3073; // inclusive
-    int highBitLength = 7001; // exclusive
+    int highBitLength = 4000; // exclusive
 
     int loopVal = 1; // int loopVal = 1000; //change this and re-test for high loop testing
     for (int i = 0; i < loopVal; ++i)
@@ -267,7 +267,7 @@
       basicTestPaillierWithKeyGeneration(bitLength, certainty, ensureBitSet);
       basicTestPaillierWithKeyGeneration(3072, certainty, ensureBitSet);
 
-      // Test with random bit length between 3073 and 7000
+      // Test with random bit length between 3073 and 4000
       int randomLargeBitLength = r.nextInt(highBitLength - lowBitLength) + lowBitLength;
       basicTestPaillierWithKeyGeneration(randomLargeBitLength, certainty, ensureBitSet);
     }
diff --git a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
new file mode 100644
index 0000000..906c337
--- /dev/null
+++ b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.QueryResultsWriter;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Properties;
+import java.util.HashMap;
+import java.util.Arrays;
+import java.util.ArrayList;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+  private static final LocalFileSystemStore localStore = new LocalFileSystemStore();
+
+  private static TestingServer zookeeperLocalCluster;
+  private static KafkaServer kafkaLocalBroker;
+  private static ZkClient zkClient;
+
+  private static final String topic = "pirk_test_topic";
+  private static final String kafkaTmpDir = "/tmp/kafka";
+
+  private static File fileQuery;
+  private static File fileQuerier;
+
+  private QueryInfo queryInfo;
+  private BigInteger nSquared;
+
+  private static int testCountDown = 4;
+
+  @Test
+  public void testKafkaStormIntegration() throws Exception
+  {
+    SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+    SystemConfiguration.getProperty("pir.maxHitsPerSelector", "10");
+    SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+    SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+    SystemConfiguration.setProperty("storm.encrowcalcbolt.parallelism", "2");
+    SystemConfiguration.setProperty("storm.enccolmultbolt.parallelism", "2");
+    SystemConfiguration.setProperty("storm.encrowcalcbolt.ticktuple", "8");
+    SystemConfiguration.setProperty("storm.rowDivs", "2");
+    SystemConfiguration.setProperty("hdfs.use", "false");
+
+    startZookeeper();
+    startKafka();
+
+    SystemConfiguration.setProperty("kafka.topic", topic);
+    SystemConfiguration.setProperty("storm.topoName", "pirTest");
+
+    // Create encrypted file
+    SystemConfiguration.setProperty("pir.stopListFile", "none");
+    Inputs.createSchemaFiles(StopListFilter.class.getName());
+
+    // Perform encryption. Set queryInfo, nSquared, fileQuery, and fileQuerier
+    performEncryption();
+    SystemConfiguration.setProperty("pir.queryInput", fileQuery.getAbsolutePath());
+
+    KafkaProducer<String,String> producer = new KafkaProducer<>(createKafkaProducerConfig());
+    loadTestData(producer);
+
+    logger.info("Test (splitPartitions,saltColumns) = (true,true)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "true");
+    SystemConfiguration.setProperty("storm.saltColumns", "true");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (true,false)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "true");
+    SystemConfiguration.setProperty("storm.saltColumns", "false");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (false,true)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "false");
+    SystemConfiguration.setProperty("storm.saltColumns", "true");
+    runTest();
+
+    logger.info("Test (splitPartitions,saltColumns) = (false,false)");
+    SystemConfiguration.setProperty("storm.splitPartitions", "false");
+    SystemConfiguration.setProperty("storm.saltColumns", "false");
+    runTest();
+  }
+
+  private void runTest() throws Exception
+  {
+    File responderFile = File.createTempFile("responderFile", ".txt");
+    logger.info("Starting topology.");
+    runTopology(responderFile);
+
+    // decrypt results
+    logger.info("Decrypting results. " + responderFile.length());
+    File fileFinalResults = performDecryption(responderFile);
+
+    // check results
+    List<QueryResponseJSON> results = TestUtils.readResultsFile(fileFinalResults);
+    BaseTests.checkDNSHostnameQueryResults(results, false, 7, false, Inputs.createJSONDataElements());
+
+    responderFile.deleteOnExit();
+    fileFinalResults.deleteOnExit();
+  }
+
+  private void runTopology(File responderFile) throws Exception
+  {
+    MkClusterParam mkClusterParam = new MkClusterParam();
+    // The test sometimes fails because of timing issues when more than 1 supervisor set.
+    mkClusterParam.setSupervisors(1);
+
+    // Maybe using "withSimulatedTimeLocalCluster" would be better to avoid worrying about timing.
+    Config conf = PirkTopology.createStormConf();
+    conf.put(StormConstants.OUTPUT_FILE_KEY, responderFile.getAbsolutePath());
+    conf.put(StormConstants.N_SQUARED_KEY, nSquared.toString());
+    conf.put(StormConstants.QUERY_INFO_KEY, queryInfo.toMap());
+    // conf.setDebug(true);
+    mkClusterParam.setDaemonConf(conf);
+
+    TestJob testJob = createPirkTestJob(conf);
+    Testing.withLocalCluster(mkClusterParam, testJob);
+    // Testing.withSimulatedTimeLocalCluster(mkClusterParam, testJob);
+  }
+
+  private TestJob createPirkTestJob(final Config config)
+  {
+    final SpoutConfig kafkaConfig = setUpTestKafkaSpout(config);
+    return new TestJob()
+    {
+      StormTopology topology = PirkTopology.getPirkTopology(kafkaConfig);
+
+      @Override
+      public void run(ILocalCluster iLocalCluster) throws Exception
+      {
+        iLocalCluster.submitTopology("pirk_integration_test", config, topology);
+        logger.info("Pausing for setup.");
+        //Thread.sleep(4000);
+        //KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+        //loadTestData(producer);
+        //Thread.sleep(10000);
+        while (OutputBolt.latch.getCount() == testCountDown)
+        {
+          Thread.sleep(1000);
+        }
+        testCountDown -= 1;
+
+        logger.info("Finished...");
+      }
+    };
+  }
+
+  private SpoutConfig setUpTestKafkaSpout(Config conf)
+  {
+    ZkHosts zkHost = new ZkHosts(zookeeperLocalCluster.getConnectString());
+
+    SpoutConfig kafkaConfig = new SpoutConfig(zkHost, topic, "/pirk_test_root", "pirk_integr_test_spout");
+    kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf));
+    logger.info("KafkaConfig initialized...");
+
+    return kafkaConfig;
+  }
+
+  private void startZookeeper() throws Exception
+  {
+    logger.info("Starting zookeeper.");
+    zookeeperLocalCluster = new TestingServer();
+    zookeeperLocalCluster.start();
+    logger.info("Zookeeper initialized.");
+
+  }
+
+  private void startKafka() throws Exception
+  {
+    FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+    Properties props = new Properties();
+    props.setProperty("zookeeper.session.timeout.ms", "100000");
+    props.put("advertised.host.name", "localhost");
+    props.put("port", 11111);
+    // props.put("broker.id", "0");
+    props.put("log.dir", kafkaTmpDir);
+    props.put("enable.zookeeper", "true");
+    props.put("zookeeper.connect", zookeeperLocalCluster.getConnectString());
+    KafkaConfig kafkaConfig = KafkaConfig.fromProps(props);
+    kafkaLocalBroker = new KafkaServer(kafkaConfig, new SystemTime(), scala.Option.apply("kafkaThread"));
+    kafkaLocalBroker.startup();
+
+    zkClient = new ZkClient(zookeeperLocalCluster.getConnectString(), 60000, 60000, ZKStringSerializer$.MODULE$);
+    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperLocalCluster.getConnectString()), false);
+    //ZkUtils zkUtils = ZkUtils.apply(zookeeperLocalCluster.getConnectString(), 60000, 60000, false);
+    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception
+  {
+    zkClient.close();
+    kafkaLocalBroker.shutdown();
+    zookeeperLocalCluster.stop();
+
+    FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+    fileQuery.delete();
+    fileQuerier.delete();
+
+  }
+
+  private HashMap<String,Object> createKafkaProducerConfig()
+  {
+    String kafkaHostName = "localhost";
+    Integer kafkaPorts = 11111;
+    HashMap<String,Object> config = new HashMap<String,Object>();
+    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + kafkaPorts);
+    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+    return config;
+  }
+
+  private void loadTestData(KafkaProducer<String,String> producer)
+  {
+    for (JSONObject dataRecord : Inputs.createJSONDataElements())
+    {
+      logger.info("Sending record to Kafka " + dataRecord.toString());
+      producer.send(new ProducerRecord<String,String>(topic, dataRecord.toString()));
+    }
+  }
+
+  private void performEncryption() throws Exception
+  {
+    // ArrayList<String> selectors = BaseTests.selectorsDomain;
+    List<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
+    String queryType = Inputs.DNS_HOSTNAME_QUERY;
+
+    Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty);
+
+    nSquared = paillier.getNSquared();
+
+    queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, queryType,
+        false, true, false);
+
+    // Perform the encryption
+    logger.info("Performing encryption of the selectors - forming encrypted query vectors:");
+    EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, paillier);
+    Querier querier = encryptQuery.encrypt(1);
+    logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:");
+
+    // Write out files.
+    fileQuerier = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG, ".txt");
+    fileQuery = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG, ".txt");
+
+    localStore.store(fileQuerier.getAbsolutePath(), querier);
+    localStore.store(fileQuery, querier.getQuery());
+  }
+
+  private File performDecryption(File responseFile) throws Exception
+  {
+    File finalResults = File.createTempFile("finalFileResults", ".txt");
+    String querierFilePath = fileQuerier.getAbsolutePath();
+    String responseFilePath = responseFile.getAbsolutePath();
+    String outputFile = finalResults.getAbsolutePath();
+    int numThreads = 1;
+
+    Response response = localStore.recall(responseFilePath, Response.class);
+    Querier querier = localStore.recall(querierFilePath, Querier.class);
+
+    // Perform decryption and output the result file
+    DecryptResponse decryptResponse = new DecryptResponse(response, querier);
+    QueryResultsWriter.writeResultFile(outputFile, decryptResponse.decrypt(numThreads));
+    return finalResults;
+  }
+
+}
diff --git a/src/test/java/org/apache/pirk/storm/SystemTime.java b/src/test/java/org/apache/pirk/storm/SystemTime.java
new file mode 100644
index 0000000..e8dcba2
--- /dev/null
+++ b/src/test/java/org/apache/pirk/storm/SystemTime.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.storm;
+
+import kafka.utils.Time;
+
+public class SystemTime implements Time
+{
+
+  @Override
+  public long milliseconds()
+  {
+    return System.currentTimeMillis();
+  }
+
+  @Override
+  public long nanoseconds()
+  {
+    return System.nanoTime();
+  }
+
+  @Override
+  public void sleep(long arg0)
+  {
+    try
+    {
+      Thread.sleep(arg0);
+    } catch (InterruptedException e)
+    {
+      // Restore the interrupt status rather than swallowing it silently
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}