Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-pirk
diff --git a/docs/org/apache/pirk/schema/data/LoadDataSchemas.html b/docs/org/apache/pirk/schema/data/LoadDataSchemas.html
deleted file mode 100644
index 99649fe..0000000
--- a/docs/org/apache/pirk/schema/data/LoadDataSchemas.html
+++ /dev/null
@@ -1,335 +0,0 @@
-<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<!-- NewPage -->
-<html lang="en">
-<head>
-<!-- Generated by javadoc (version 1.7.0_80) on Sun Jul 24 11:37:26 EDT 2016 -->
-<title>LoadDataSchemas</title>
-<meta name="date" content="2016-07-24">
-<link rel="stylesheet" type="text/css" href="../../../../../stylesheet.css" title="Style">
-</head>
-<body>
-<script type="text/javascript"><!--
- if (location.href.indexOf('is-external=true') == -1) {
- parent.document.title="LoadDataSchemas";
- }
-//-->
-</script>
-<noscript>
-<div>JavaScript is disabled on your browser.</div>
-</noscript>
-<!-- ========= START OF TOP NAVBAR ======= -->
-<div class="topNav"><a name="navbar_top">
-<!-- -->
-</a><a href="#skip-navbar_top" title="Skip navigation links"></a><a name="navbar_top_firstrow">
-<!-- -->
-</a>
-<ul class="navList" title="Navigation">
-<li><a href="../../../../../overview-summary.html">Overview</a></li>
-<li><a href="package-summary.html">Package</a></li>
-<li class="navBarCell1Rev">Class</li>
-<li><a href="class-use/LoadDataSchemas.html">Use</a></li>
-<li><a href="package-tree.html">Tree</a></li>
-<li><a href="../../../../../deprecated-list.html">Deprecated</a></li>
-<li><a href="../../../../../index-files/index-1.html">Index</a></li>
-<li><a href="../../../../../help-doc.html">Help</a></li>
-</ul>
-</div>
-<div class="subNav">
-<ul class="navList">
-<li><a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data"><span class="strong">Prev Class</span></a></li>
-<li>Next Class</li>
-</ul>
-<ul class="navList">
-<li><a href="../../../../../index.html?org/apache/pirk/schema/data/LoadDataSchemas.html" target="_top">Frames</a></li>
-<li><a href="LoadDataSchemas.html" target="_top">No Frames</a></li>
-</ul>
-<ul class="navList" id="allclasses_navbar_top">
-<li><a href="../../../../../allclasses-noframe.html">All Classes</a></li>
-</ul>
-<div>
-<script type="text/javascript"><!--
- allClassesLink = document.getElementById("allclasses_navbar_top");
- if(window==top) {
- allClassesLink.style.display = "block";
- }
- else {
- allClassesLink.style.display = "none";
- }
- //-->
-</script>
-</div>
-<div>
-<ul class="subNavList">
-<li>Summary: </li>
-<li>Nested | </li>
-<li>Field | </li>
-<li><a href="#constructor_summary">Constr</a> | </li>
-<li><a href="#method_summary">Method</a></li>
-</ul>
-<ul class="subNavList">
-<li>Detail: </li>
-<li>Field | </li>
-<li><a href="#constructor_detail">Constr</a> | </li>
-<li><a href="#method_detail">Method</a></li>
-</ul>
-</div>
-<a name="skip-navbar_top">
-<!-- -->
-</a></div>
-<!-- ========= END OF TOP NAVBAR ========= -->
-<!-- ======== START OF CLASS DATA ======== -->
-<div class="header">
-<div class="subTitle">org.apache.pirk.schema.data</div>
-<h2 title="Class LoadDataSchemas" class="title">Class LoadDataSchemas</h2>
-</div>
-<div class="contentContainer">
-<ul class="inheritance">
-<li>java.lang.Object</li>
-<li>
-<ul class="inheritance">
-<li>org.apache.pirk.schema.data.LoadDataSchemas</li>
-</ul>
-</li>
-</ul>
-<div class="description">
-<ul class="blockList">
-<li class="blockList">
-<hr>
-<br>
-<pre>public class <span class="strong">LoadDataSchemas</span>
-extends java.lang.Object</pre>
-<div class="block">Class to load any data schemas specified in the properties file, 'data.schemas'
- <p>
- Schemas should be specified as follows; all items are treated in a case insensitive manner:
-
- <pre>
- <code><schema>
- <schemaName> name of the schema </schemaName>
- <element>
- <name> element name /name>
- <type> class name or type name (if Java primitive type) of the element </type>
- <isArray> true or false -- whether or not the schema element is an array within the data </isArray>
- <partitioner> optional - Partitioner class for the element; defaults to primitive java type partitioner </partitioner>
- </element>
- </schema>
- </code>
- </pre>
-
- Primitive types must be one of the following: "byte", "short", "int", "long", "float", "double", "char", "string"
- <p></div>
-</li>
-</ul>
-</div>
-<div class="summary">
-<ul class="blockList">
-<li class="blockList">
-<!-- ======== CONSTRUCTOR SUMMARY ======== -->
-<ul class="blockList">
-<li class="blockList"><a name="constructor_summary">
-<!-- -->
-</a>
-<h3>Constructor Summary</h3>
-<table class="overviewSummary" border="0" cellpadding="3" cellspacing="0" summary="Constructor Summary table, listing constructors, and an explanation">
-<caption><span>Constructors</span><span class="tabEnd"> </span></caption>
-<tr>
-<th class="colOne" scope="col">Constructor and Description</th>
-</tr>
-<tr class="altColor">
-<td class="colOne"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#LoadDataSchemas()">LoadDataSchemas</a></strong>()</code> </td>
-</tr>
-</table>
-</li>
-</ul>
-<!-- ========== METHOD SUMMARY =========== -->
-<ul class="blockList">
-<li class="blockList"><a name="method_summary">
-<!-- -->
-</a>
-<h3>Method Summary</h3>
-<table class="overviewSummary" border="0" cellpadding="3" cellspacing="0" summary="Method Summary table, listing methods, and an explanation">
-<caption><span>Methods</span><span class="tabEnd"> </span></caption>
-<tr>
-<th class="colFirst" scope="col">Modifier and Type</th>
-<th class="colLast" scope="col">Method and Description</th>
-</tr>
-<tr class="altColor">
-<td class="colFirst"><code>static <a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data">DataSchema</a></code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#getSchema(java.lang.String)">getSchema</a></strong>(java.lang.String schemaName)</code> </td>
-</tr>
-<tr class="rowColor">
-<td class="colFirst"><code>static java.util.HashMap<java.lang.String,<a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data">DataSchema</a>></code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#getSchemaMap()">getSchemaMap</a></strong>()</code> </td>
-</tr>
-<tr class="altColor">
-<td class="colFirst"><code>static java.util.Set<java.lang.String></code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#getSchemaNames()">getSchemaNames</a></strong>()</code> </td>
-</tr>
-<tr class="rowColor">
-<td class="colFirst"><code>static void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#initialize()">initialize</a></strong>()</code> </td>
-</tr>
-<tr class="altColor">
-<td class="colFirst"><code>static void</code></td>
-<td class="colLast"><code><strong><a href="../../../../../org/apache/pirk/schema/data/LoadDataSchemas.html#initialize(boolean,%20org.apache.hadoop.fs.FileSystem)">initialize</a></strong>(boolean hdfs,
- org.apache.hadoop.fs.FileSystem fs)</code> </td>
-</tr>
-</table>
-<ul class="blockList">
-<li class="blockList"><a name="methods_inherited_from_class_java.lang.Object">
-<!-- -->
-</a>
-<h3>Methods inherited from class java.lang.Object</h3>
-<code>equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait</code></li>
-</ul>
-</li>
-</ul>
-</li>
-</ul>
-</div>
-<div class="details">
-<ul class="blockList">
-<li class="blockList">
-<!-- ========= CONSTRUCTOR DETAIL ======== -->
-<ul class="blockList">
-<li class="blockList"><a name="constructor_detail">
-<!-- -->
-</a>
-<h3>Constructor Detail</h3>
-<a name="LoadDataSchemas()">
-<!-- -->
-</a>
-<ul class="blockListLast">
-<li class="blockList">
-<h4>LoadDataSchemas</h4>
-<pre>public LoadDataSchemas()</pre>
-</li>
-</ul>
-</li>
-</ul>
-<!-- ============ METHOD DETAIL ========== -->
-<ul class="blockList">
-<li class="blockList"><a name="method_detail">
-<!-- -->
-</a>
-<h3>Method Detail</h3>
-<a name="initialize()">
-<!-- -->
-</a>
-<ul class="blockList">
-<li class="blockList">
-<h4>initialize</h4>
-<pre>public static void initialize()
- throws java.lang.Exception</pre>
-<dl><dt><span class="strong">Throws:</span></dt>
-<dd><code>java.lang.Exception</code></dd></dl>
-</li>
-</ul>
-<a name="initialize(boolean, org.apache.hadoop.fs.FileSystem)">
-<!-- -->
-</a>
-<ul class="blockList">
-<li class="blockList">
-<h4>initialize</h4>
-<pre>public static void initialize(boolean hdfs,
- org.apache.hadoop.fs.FileSystem fs)
- throws java.lang.Exception</pre>
-<dl><dt><span class="strong">Throws:</span></dt>
-<dd><code>java.lang.Exception</code></dd></dl>
-</li>
-</ul>
-<a name="getSchemaMap()">
-<!-- -->
-</a>
-<ul class="blockList">
-<li class="blockList">
-<h4>getSchemaMap</h4>
-<pre>public static java.util.HashMap<java.lang.String,<a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data">DataSchema</a>> getSchemaMap()</pre>
-</li>
-</ul>
-<a name="getSchemaNames()">
-<!-- -->
-</a>
-<ul class="blockList">
-<li class="blockList">
-<h4>getSchemaNames</h4>
-<pre>public static java.util.Set<java.lang.String> getSchemaNames()</pre>
-</li>
-</ul>
-<a name="getSchema(java.lang.String)">
-<!-- -->
-</a>
-<ul class="blockListLast">
-<li class="blockList">
-<h4>getSchema</h4>
-<pre>public static <a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data">DataSchema</a> getSchema(java.lang.String schemaName)</pre>
-</li>
-</ul>
-</li>
-</ul>
-</li>
-</ul>
-</div>
-</div>
-<!-- ========= END OF CLASS DATA ========= -->
-<!-- ======= START OF BOTTOM NAVBAR ====== -->
-<div class="bottomNav"><a name="navbar_bottom">
-<!-- -->
-</a><a href="#skip-navbar_bottom" title="Skip navigation links"></a><a name="navbar_bottom_firstrow">
-<!-- -->
-</a>
-<ul class="navList" title="Navigation">
-<li><a href="../../../../../overview-summary.html">Overview</a></li>
-<li><a href="package-summary.html">Package</a></li>
-<li class="navBarCell1Rev">Class</li>
-<li><a href="class-use/LoadDataSchemas.html">Use</a></li>
-<li><a href="package-tree.html">Tree</a></li>
-<li><a href="../../../../../deprecated-list.html">Deprecated</a></li>
-<li><a href="../../../../../index-files/index-1.html">Index</a></li>
-<li><a href="../../../../../help-doc.html">Help</a></li>
-</ul>
-</div>
-<div class="subNav">
-<ul class="navList">
-<li><a href="../../../../../org/apache/pirk/schema/data/DataSchema.html" title="class in org.apache.pirk.schema.data"><span class="strong">Prev Class</span></a></li>
-<li>Next Class</li>
-</ul>
-<ul class="navList">
-<li><a href="../../../../../index.html?org/apache/pirk/schema/data/LoadDataSchemas.html" target="_top">Frames</a></li>
-<li><a href="LoadDataSchemas.html" target="_top">No Frames</a></li>
-</ul>
-<ul class="navList" id="allclasses_navbar_bottom">
-<li><a href="../../../../../allclasses-noframe.html">All Classes</a></li>
-</ul>
-<div>
-<script type="text/javascript"><!--
- allClassesLink = document.getElementById("allclasses_navbar_bottom");
- if(window==top) {
- allClassesLink.style.display = "block";
- }
- else {
- allClassesLink.style.display = "none";
- }
- //-->
-</script>
-</div>
-<div>
-<ul class="subNavList">
-<li>Summary: </li>
-<li>Nested | </li>
-<li>Field | </li>
-<li><a href="#constructor_summary">Constr</a> | </li>
-<li><a href="#method_summary">Method</a></li>
-</ul>
-<ul class="subNavList">
-<li>Detail: </li>
-<li>Field | </li>
-<li><a href="#constructor_detail">Constr</a> | </li>
-<li><a href="#method_detail">Method</a></li>
-</ul>
-</div>
-<a name="skip-navbar_bottom">
-<!-- -->
-</a></div>
-<!-- ======== END OF BOTTOM NAVBAR ======= -->
-</body>
-</html>
diff --git a/pom-with-benchmarks.xml b/pom-with-benchmarks.xml
index 9c85639..ee98c43 100644
--- a/pom-with-benchmarks.xml
+++ b/pom-with-benchmarks.xml
@@ -17,17 +17,17 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
-
+
<groupId>org.apache.pirk</groupId>
<artifactId>pirk</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
-
+
<name>Apache Pirk (incubating) Project</name>
- <description>Apache Pirk (incubating) is a framework for scalable
+ <description>Apache Pirk (incubating) is a framework for scalable
Private Information Retrieval (PIR). </description>
<url>http://pirk.incubator.apache.org/</url>
@@ -70,12 +70,12 @@
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
-
+
<repository>
<id>conjars.org</id>
<url>http://conjars.org/repo</url>
</repository>
-
+
</repositories>
<properties>
@@ -123,7 +123,7 @@
<artifactId>spark-core_2.11</artifactId>
<version>1.6.1</version>
</dependency>
-
+
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
@@ -135,7 +135,7 @@
<artifactId>commons-net</artifactId>
<version>3.3</version>
</dependency>
-
+
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
@@ -155,61 +155,61 @@
</exclusion>
</exclusions>
</dependency>
-
+
<!-- Square's JNA GMP module-->
<dependency>
- <groupId>com.squareup.jnagmp</groupId>
- <artifactId>jnagmp</artifactId>
- <version>1.1.0</version>
+ <groupId>com.squareup.jnagmp</groupId>
+ <artifactId>jnagmp</artifactId>
+ <version>1.1.0</version>
</dependency>
<!-- JMH for benchmarking the Paillier functions -->
<dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-core</artifactId>
- <version>${jmh.version}</version>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>${jmh.version}</version>
</dependency>
<dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-generator-annprocess</artifactId>
- <version>${jmh.version}</version>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.version}</version>
</dependency>
-
+
</dependencies>
<build>
<plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.18</version>
- <configuration>
- <redirectTestOutputToFile>true</redirectTestOutputToFile>
- <argLine combine.children="append">-Xmx1G
- -Djava.net.preferIPv4Stack=true</argLine>
- </configuration>
- <dependencies>
- <dependency>
- <!-- Force surefire to use JUnit -->
- <groupId>org.apache.maven.surefire</groupId>
- <artifactId>surefire-junit4</artifactId>
- <version>2.18</version>
- </dependency>
- </dependencies>
- </plugin>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.1</version>
- <configuration>
- <compilerVersion>${javac.target}</compilerVersion>
- <source>${javac.target}</source>
- <target>${javac.target}</target>
- </configuration>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <configuration>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <argLine combine.children="append">-Xmx1G
+ -Djava.net.preferIPv4Stack=true</argLine>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <!-- Force surefire to use JUnit -->
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit4</artifactId>
+ <version>2.18</version>
+ </dependency>
+ </dependencies>
</plugin>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.4.3</version> <!-- older versions of maven-shade-plugin make JMH painful -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <compilerVersion>${javac.target}</compilerVersion>
+ <source>${javac.target}</source>
+ <target>${javac.target}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.3</version> <!-- older versions of maven-shade-plugin make JMH painful -->
<executions>
<execution>
@@ -223,10 +223,10 @@
<shadedClassifierName>exe</shadedClassifierName>
<transformers>
<transformer
- implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
<transformer
- implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
<filters>
@@ -242,37 +242,37 @@
</configuration>
</execution>
<execution>
- <phase>package</phase>
- <id>benchmark</id>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <!-- The jar is very corrupted if it isn't minimized -->
- <minimizeJar>true</minimizeJar>
- <finalName>${benchmarkjar.name}</finalName>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.openjdk.jmh.Main</mainClass>
- </transformer>
- </transformers>
- <filters>
- <filter>
- <!--
- Shading signed JARs will fail without this.
- http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
- -->
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
+ <phase>package</phase>
+ <id>benchmark</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <!-- The jar is very corrupted if it isn't minimized -->
+ <minimizeJar>true</minimizeJar>
+ <finalName>${benchmarkjar.name}</finalName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.openjdk.jmh.Main</mainClass>
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <!--
+ Shading signed JARs will fail without this.
+ http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
+ -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
</execution>
-
+
</executions>
</plugin>
</plugins>
diff --git a/pom.xml b/pom.xml
index b46325e..c14a928 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,653 +11,747 @@
License for the specific language governing permissions and ~ limitations
under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache</groupId>
- <artifactId>apache</artifactId>
- <version>18</version>
- </parent>
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>18</version>
+ </parent>
- <groupId>org.apache.pirk</groupId>
- <artifactId>apache-pirk</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
+ <groupId>org.apache.pirk</groupId>
+ <artifactId>apache-pirk</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
- <name>Apache Pirk (incubating) Project</name>
- <description>Apache Pirk (incubating) is a framework for scalable Private Information Retrieval (PIR). </description>
- <url>http://pirk.incubator.apache.org/</url>
+ <name>Apache Pirk (incubating) Project</name>
+ <description>Apache Pirk (incubating) is a framework for scalable Private Information Retrieval (PIR).</description>
+ <url>http://pirk.incubator.apache.org/</url>
- <!-- this is the year of inception at ASF -->
- <inceptionYear>2016</inceptionYear>
+ <!-- this is the year of inception at ASF -->
+ <inceptionYear>2016</inceptionYear>
- <organization>
- <name>The Apache Software Foundation</name>
- <url>https://www.apache.org</url>
- </organization>
+ <organization>
+ <name>The Apache Software Foundation</name>
+ <url>https://www.apache.org</url>
+ </organization>
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>https://www.apache.org/licenses/LICENSE-2.0</url>
- </license>
- </licenses>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>https://www.apache.org/licenses/LICENSE-2.0</url>
+ </license>
+ </licenses>
- <mailingLists>
- <mailingList>
- <name>Dev</name>
- <subscribe>dev-subscribe@pirk.incubator.apache.org</subscribe>
- <unsubscribe>dev-unsubscribe@pirk.incubator.apache.org</unsubscribe>
- <post>dev@pirk.incubator.apache.org</post>
- <archive>http://mail-archives.apache.org/mod_mbox/incubator-pirk-dev/</archive>
- </mailingList>
- <mailingList>
- <name>Commits</name>
- <subscribe>commits-subscribe@pirk.incubator.apache.org</subscribe>
- <unsubscribe>commits-unsubscribe@pirk.incubator.apache.org</unsubscribe>
- <archive>http://mail-archives.apache.org/mod_mbox/incubator-pirk-commits</archive>
- </mailingList>
- </mailingLists>
+ <mailingLists>
+ <mailingList>
+ <name>Dev</name>
+ <subscribe>dev-subscribe@pirk.incubator.apache.org</subscribe>
+ <unsubscribe>dev-unsubscribe@pirk.incubator.apache.org</unsubscribe>
+ <post>dev@pirk.incubator.apache.org</post>
+ <archive>http://mail-archives.apache.org/mod_mbox/incubator-pirk-dev/</archive>
+ </mailingList>
+ <mailingList>
+ <name>Commits</name>
+ <subscribe>commits-subscribe@pirk.incubator.apache.org</subscribe>
+ <unsubscribe>commits-unsubscribe@pirk.incubator.apache.org</unsubscribe>
+ <archive>http://mail-archives.apache.org/mod_mbox/incubator-pirk-commits</archive>
+ </mailingList>
+ </mailingLists>
- <scm>
- <connection>scm:git:git://git.apache.org/incubator-pirk.git</connection>
- <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-pirk.git</developerConnection>
- <url>https://git-wip-us.apache.org/repos/asf?p=incubator-pirk.git</url>
- <tag>HEAD</tag>
- </scm>
+ <scm>
+ <connection>scm:git:git://git.apache.org/incubator-pirk.git</connection>
+ <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-pirk.git</developerConnection>
+ <url>https://git-wip-us.apache.org/repos/asf?p=incubator-pirk.git</url>
+ <tag>HEAD</tag>
+ </scm>
- <issueManagement>
- <system>JIRA</system>
- <url>https://issues.apache.org/jira/browse/PIRK</url>
- </issueManagement>
+ <issueManagement>
+ <system>JIRA</system>
+ <url>https://issues.apache.org/jira/browse/PIRK</url>
+ </issueManagement>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- <scala.version>2.10.4</scala.version>
- <jmh.version>1.11.3</jmh.version>
- <benchmarkjar.name>benchmarks</benchmarkjar.name>
- <javac.target>1.8</javac.target>
- <slf4j.version>1.7.21</slf4j.version>
- <log4j2.version>2.6.2</log4j2.version>
- <junit.version>4.12</junit.version>
- <log4j.configuration>log4j2.xml</log4j.configuration>
- <hadoop.version>2.7.2</hadoop.version>
- <apache-commons.version>3.3</apache-commons.version>
- <elasticsearch.version>2.3.3</elasticsearch.version>
- <spark-streaming.version>2.0.0</spark-streaming.version>
- <pirk.forkCount>1C</pirk.forkCount>
- <pirk.reuseForks>true</pirk.reuseForks>
- </properties>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <scala.version>2.10.4</scala.version>
+ <jmh.version>1.11.3</jmh.version>
+ <benchmarkjar.name>benchmarks</benchmarkjar.name>
+ <javac.target>1.8</javac.target>
+ <slf4j.version>1.7.21</slf4j.version>
+ <log4j2.version>2.1</log4j2.version>
+ <junit.version>4.12</junit.version>
+ <log4j.configuration>log4j2.xml</log4j.configuration>
+ <hadoop.version>2.7.2</hadoop.version>
+ <spark.version>1.6.1</spark.version>
+ <apache-commons.version>3.3</apache-commons.version>
+ <elasticsearch.version>2.3.3</elasticsearch.version>
+ <storm.version>1.0.1</storm.version>
+ <kafka.version>0.9.0.1</kafka.version>
+ <spark-streaming.version>2.0.0</spark-streaming.version>
+ <pirk.forkCount>1C</pirk.forkCount>
+ <pirk.reuseForks>true</pirk.reuseForks>
+ </properties>
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- </dependency>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math3</artifactId>
- <version>${apache-commons.version}</version>
- </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>${apache-commons.version}</version>
+ </dependency>
- <dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>1.1</version>
- </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1</version>
+ </dependency>
- <dependency>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- <version>${apache-commons.version}</version>
- </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>${apache-commons.version}</version>
+ </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>1.6.1</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scalap</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch-hadoop</artifactId>
- <version>${elasticsearch.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>commons-net</artifactId>
- <groupId>commons-net</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-service</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>cascading</groupId>
- <artifactId>cascading-local</artifactId>
- </exclusion>
- <exclusion>
- <groupId>cascading</groupId>
- <artifactId>cascading-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>cascading</groupId>
- <artifactId>cascading-hadoop</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark-streaming.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <version>${spark-streaming.version}</version>
- </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch-hadoop</artifactId>
+ <version>${elasticsearch.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-service</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>cascading</groupId>
+ <artifactId>cascading-local</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>cascading</groupId>
+ <artifactId>cascading-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>cascading</groupId>
+ <artifactId>cascading-hadoop</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <!-- Square's JNA GMP module -->
- <dependency>
- <groupId>com.squareup.jnagmp</groupId>
- <artifactId>jnagmp</artifactId>
- <version>1.1.0</version>
- </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <!-- JMH for benchmarking the Paillier functions -->
- <dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-core</artifactId>
- <version>${jmh.version}</version>
- <scope>provided</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
- <dependency>
- <groupId>org.openjdk.jmh</groupId>
- <artifactId>jmh-generator-annprocess</artifactId>
- <version>${jmh.version}</version>
- <scope>provided</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <!-- Sl4j modules -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.10.0</version>
+ <scope>test</scope>
+ </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <version>2.6.2</version>
- </dependency>
+ <!-- Square's JNA GMP module -->
+ <dependency>
+ <groupId>com.squareup.jnagmp</groupId>
+ <artifactId>jnagmp</artifactId>
+ <version>1.1.0</version>
+ </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>2.6.2</version>
- </dependency>
+ <!-- JMH for benchmarking the Paillier functions -->
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>${jmh.version}</version>
+ <scope>provided</scope>
+ </dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.version}</version>
+ <scope>provided</scope>
+ </dependency>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
+ <!-- SLF4J modules -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- </plugin>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- </plugin>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- </plugin>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- </plugin>
+ </dependencies>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-release-plugin</artifactId>
- </plugin>
- </plugins>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <version>0.12</version>
- <configuration>
- <excludes>
- <exclude>.travis.yml</exclude> <!-- Travis CI Build Descriptor File -->
- <exclude>findbugs-exclude.xml</exclude> <!-- False positives for FindBugs analysis -->
- <exclude>KEYS</exclude> <!-- GPG keys of Release Managers -->
- <exclude>eclipse*.xml</exclude> <!-- Exclude eclipse* xml -->
- <exclude>docs/*</exclude> <!-- Exclude docs -->
- <exclude>logs/*</exclude> <!-- Exclude logs -->
- <exclude>**/m2.conf</exclude> <!-- Exclude Maven conf which gets installed on travis and fails RAT check -->
- <exclude>src/main/resources/META-INF/**</exclude>
- </excludes>
- </configuration>
- <dependencies>
- <!-- workaround for RAT-158 -->
- <dependency>
- <groupId>org.apache.maven.doxia</groupId>
- <artifactId>doxia-core</artifactId>
- <version>1.6</version>
- <exclusions>
- <exclusion>
- <groupId>xerces</groupId>
- <artifactId>xercesImpl</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
- <executions>
- <execution>
- <phase>validate</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
- <!-- Coverage analysis for tests -->
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- <version>0.7.5.201505241946</version>
- <executions>
- <execution>
- <goals>
- <goal>prepare-agent</goal>
- </goals>
- <configuration>
- <output>file</output>
- <dumpOnExit>true</dumpOnExit>
- </configuration>
- </execution>
- <execution>
- <id>report</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>report</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
- <!-- Report jacoco coverage to coveralls.io -->
- <plugin>
- <groupId>org.eluder.coveralls</groupId>
- <artifactId>coveralls-maven-plugin</artifactId>
- <version>4.1.0</version>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-release-plugin</artifactId>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.12</version>
+ <configuration>
+ <excludes>
+ <exclude>.travis.yml</exclude> <!-- Travis CI Build Descriptor File -->
+ <exclude>findbugs-exclude.xml</exclude> <!-- False positives for FindBugs analysis -->
+ <exclude>KEYS</exclude> <!-- GPG keys of Release Managers -->
+ <exclude>eclipse*.xml</exclude> <!-- Exclude eclipse* xml -->
+ <exclude>docs/*</exclude> <!-- Exclude docs -->
+ <exclude>logs/*</exclude> <!-- Exclude logs -->
+ <exclude>**/m2.conf</exclude> <!-- Exclude Maven conf which gets installed on travis and fails RAT check -->
+ <exclude>src/main/resources/META-INF/**</exclude>
+ </excludes>
+ </configuration>
+ <dependencies>
+ <!-- workaround for RAT-158 -->
+ <dependency>
+ <groupId>org.apache.maven.doxia</groupId>
+ <artifactId>doxia-core</artifactId>
+ <version>1.6</version>
+ <exclusions>
+ <exclusion>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Coverage analysis for tests -->
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>0.7.5.201505241946</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ <configuration>
+ <output>file</output>
+ <dumpOnExit>true</dumpOnExit>
+ </configuration>
+ </execution>
+ <execution>
+ <id>report</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Report jacoco coverage to coveralls.io -->
+ <plugin>
+ <groupId>org.eluder.coveralls</groupId>
+ <artifactId>coveralls-maven-plugin</artifactId>
+ <version>4.1.0</version>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.18</version>
- <configuration>
- <redirectTestOutputToFile>true</redirectTestOutputToFile>
- <argLine combine.children="append">-Xmx1G
- -Djava.net.preferIPv4Stack=true
- </argLine>
- <systemPropertyVariables>
- <log4j.configuration>${log4j.configuration}</log4j.configuration>
- </systemPropertyVariables>
- <forkCount>${pirk.forkCount}</forkCount>
- <reuseForks>${pirk.reuseForks}</reuseForks>
- </configuration>
- <dependencies>
- <dependency>
- <!-- Force surefire to use JUnit -->
- <groupId>org.apache.maven.surefire</groupId>
- <artifactId>surefire-junit4</artifactId>
- <version>2.18</version>
- </dependency>
- </dependencies>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <configuration>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <argLine combine.children="append">-Xmx1G
+ -Djava.net.preferIPv4Stack=true
+ </argLine>
+ <systemPropertyVariables>
+ <log4j.configuration>${log4j.configuration}</log4j.configuration>
+ </systemPropertyVariables>
+ <forkCount>${pirk.forkCount}</forkCount>
+ <reuseForks>${pirk.reuseForks}</reuseForks>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <!-- Force surefire to use JUnit -->
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit4</artifactId>
+ <version>2.18</version>
+ </dependency>
+ </dependencies>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <compilerVersion>${javac.target}</compilerVersion>
- <source>${javac.target}</source>
- <target>${javac.target}</target>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.5.1</version>
+ <configuration>
+ <compilerVersion>${javac.target}</compilerVersion>
+ <source>${javac.target}</source>
+ <target>${javac.target}</target>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>3.0.1</version>
- <configuration>
- <excludes>
- <exclude>org/apache/pirk/benchmark/**</exclude>
- <exclude>*/openjdk/**</exclude>
- <exclude>generated-sources/**</exclude>
- </excludes>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.0.1</version>
+ <configuration>
+ <excludes>
+ <exclude>org/apache/pirk/benchmark/**</exclude>
+ <exclude>*/openjdk/**</exclude>
+ <exclude>generated-sources/**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>3.0.1</version>
- <configuration>
- <excludes>
- <exclude>org/apache/pirk/benchmark/**</exclude>
- <exclude>*/openjdk/**</exclude>
- <exclude>generated-sources/**</exclude>
- </excludes>
- </configuration>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <configuration>
+ <excludes>
+ <exclude>org/apache/pirk/benchmark/**</exclude>
+ <exclude>*/openjdk/**</exclude>
+ <exclude>generated-sources/**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.4.3</version> <!-- older versions of maven-shade-plugin make JMH painful -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.3</version> <!-- older versions of maven-shade-plugin make JMH painful -->
- <executions>
- <execution>
- <phase>package</phase>
- <id>main</id>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <shadedClassifierName>exe</shadedClassifierName>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- </transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
- </transformer>
- </transformers>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </execution>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <id>main</id>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>exe</shadedClassifierName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ </transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
- <!-- in the version with benchmarks (pom-with-benchmarks.xml), this
- is where that <execution></execution> lives -->
+ <!-- in the version with benchmarks (pom-with-benchmarks.xml), this
+ is where that <execution></execution> lives -->
- </executions>
- </plugin>
+ </executions>
+ </plugin>
- <!--This plugin's configuration is used to store Eclipse m2e settings
- only. It has no influence on the Maven build itself. -->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>
- org.scala-tools
- </groupId>
- <artifactId>
- maven-scala-plugin
- </artifactId>
- <versionRange>
- [2.15.2,)
- </versionRange>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore />
- </action>
- </pluginExecution>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>
- org.apache.rat
- </groupId>
- <artifactId>
- apache-rat-plugin
- </artifactId>
- <versionRange>
- [0.11,)
- </versionRange>
- <goals>
- <goal>check</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore />
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
+ <!--This plugin's configuration is used to store Eclipse m2e settings
+ only. It has no influence on the Maven build itself. -->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.scala-tools
+ </groupId>
+ <artifactId>
+ maven-scala-plugin
+ </artifactId>
+ <versionRange>
+ [2.15.2,)
+ </versionRange>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.apache.rat
+ </groupId>
+ <artifactId>
+ apache-rat-plugin
+ </artifactId>
+ <versionRange>
+ [0.11,)
+ </versionRange>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-release-plugin</artifactId>
- <version>2.5.1</version>
- <configuration>
- <useReleaseProfile>true</useReleaseProfile>
- <releaseProfiles>signed_release</releaseProfiles>
- <autoVersionSubmodules>true</autoVersionSubmodules>
- <goals>deploy</goals>
- <tagNameFormat>@{project.artifactId}-@{project.version}</tagNameFormat>
- <pushChanges>false</pushChanges>
- <localCheckout>true</localCheckout>
- </configuration>
- <executions>
- <execution>
- <id>default</id>
- <goals>
- <goal>perform</goal>
- </goals>
- <configuration>
- <pomFileName>pom.xml</pomFileName>
- </configuration>
- </execution>
- </executions>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-release-plugin</artifactId>
+ <version>2.5.1</version>
+ <configuration>
+ <useReleaseProfile>true</useReleaseProfile>
+ <releaseProfiles>signed_release</releaseProfiles>
+ <autoVersionSubmodules>true</autoVersionSubmodules>
+ <goals>deploy</goals>
+ <tagNameFormat>@{project.artifactId}-@{project.version}</tagNameFormat>
+ <pushChanges>false</pushChanges>
+ <localCheckout>true</localCheckout>
+ </configuration>
+ <executions>
+ <execution>
+ <id>default</id>
+ <goals>
+ <goal>perform</goal>
+ </goals>
+ <configuration>
+ <pomFileName>pom.xml</pomFileName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>2.10.4</version>
- <configuration>
- <javadocDirectory>docs</javadocDirectory>
- <testJavadocDirectory>docs/test</testJavadocDirectory>
- <javadocVersion>1.8</javadocVersion>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <javadocDirectory>docs</javadocDirectory>
+ <testJavadocDirectory>docs/test</testJavadocDirectory>
+ <javadocVersion>1.8</javadocVersion>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
- </build>
+ </build>
- <profiles>
- <profile>
- <!-- Performs execution of Integration Tests using the Maven FailSafe
- Plugin. The view of integration tests in this context are those tests interfacing
- with external sources and services requiring additional resources or credentials
- that cannot be explicitly provided. -->
- <id>integration-tests</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>integration-test</goal>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <!-- Checks style and licensing requirements. This is a good idea to run
- for contributions and for the release process. While it would be nice to
- run always these plugins can considerably slow the build and have proven
- to create unstable builds in our multi-module project and when building using
- multiple threads. The stability issues seen with Checkstyle in multi-module
- builds include false-positives and false negatives. -->
- <id>contrib-check</id>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>check</goal>
- </goals>
- <phase>verify</phase>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <executions>
- <execution>
- <id>check-style</id>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <!-- This profile will disable DocLint which performs strict JavaDoc processing
- which was introduced in JDK 8. These are technically errors in the JavaDoc
- which we need to eventually address. However, if a release is performed using
- JDK 8, the JavaDoc generation would fail. By activating this profile when
- running on JDK 8 we can ensure the JavaDocs continue to generate successfully -->
- <id>disable-doclint</id>
- <activation>
- <jdk>1.8</jdk>
- </activation>
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <additionalparam>-Xdoclint:none</additionalparam>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
- </profile>
- </profiles>
+ <profiles>
+ <profile>
+ <!-- Executes the integration tests using the Maven Failsafe Plugin.
+ In this context, integration tests are those tests that interface with
+ external sources and services and that require additional resources or
+ credentials that cannot be explicitly provided. -->
+ <id>integration-tests</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <!-- Checks style and licensing requirements. This is a good idea to run
+ for contributions and for the release process. While it would be nice to
+ always run them, these plugins can considerably slow the build and have proven
+ to create unstable builds in our multi-module project and when building using
+ multiple threads. The stability issues seen with Checkstyle in multi-module
+ builds include false positives and false negatives. -->
+ <id>contrib-check</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>verify</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>check-style</id>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <!-- This profile will disable DocLint which performs strict JavaDoc processing
+ which was introduced in JDK 8. These are technically errors in the JavaDoc
+ which we need to eventually address. However, if a release is performed using
+ JDK 8, the JavaDoc generation would fail. By activating this profile when
+ running on JDK 8 we can ensure the JavaDocs continue to generate successfully -->
+ <id>disable-doclint</id>
+ <activation>
+ <jdk>1.8</jdk>
+ </activation>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <additionalparam>-Xdoclint:none</additionalparam>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+ </profile>
+ </profiles>
-</project>
\ No newline at end of file
+</project>
diff --git a/src/main/java/org/apache/pirk/query/wideskies/Query.java b/src/main/java/org/apache/pirk/query/wideskies/Query.java
index f43fe47..b0322d0 100644
--- a/src/main/java/org/apache/pirk/query/wideskies/Query.java
+++ b/src/main/java/org/apache/pirk/query/wideskies/Query.java
@@ -25,8 +25,8 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-
import java.util.function.Consumer;
+
import org.apache.pirk.encryption.ModPowAbstraction;
import org.apache.pirk.serialization.Storable;
import org.slf4j.Logger;
diff --git a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
index 362d26f..76cd755 100644
--- a/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
+++ b/src/main/java/org/apache/pirk/query/wideskies/QueryInfo.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pirk.query.wideskies;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import org.apache.pirk.schema.query.QuerySchema;
@@ -44,7 +47,7 @@
private String queryType = null; // QueryType string const
private int hashBitSize = 0; // Bit size of the keyed hash function
- private String hashKey = null; // Key for the keyed hash function
+ private String hashKey; // Key for the keyed hash function
private int numBitsPerDataElement = 0; // total num bits per returned data value - defined relative to query type
private int dataPartitionBitSize = 0; // num of bits for each partition of an incoming data element, must be < 32 right now
@@ -96,6 +99,33 @@
printQueryInfo();
}
+ /**
+  * Rebuilds a QueryInfo from a map holding the keys written by {@link #toMap()}.
+  * <p>
+  * The Storm Config serializes the map as JSON and reads numeric values back as longs, whereas in PirkHashScheme the map contains ints. Each numeric entry
+  * is therefore read as a {@link Number} and narrowed with {@code intValue()}, so a map that mixes both boxed types is accepted as well.
+  * <p>
+  * This constructor does not set a query schema and does not call printQueryInfo().
+  *
+  * @param queryInfoMap
+  *          map form of the query info; a missing uuid, numeric or boolean entry results in a NullPointerException
+  */
+ public QueryInfo(Map<?,?> queryInfoMap)
+ {
+   identifier = UUID.fromString((String) queryInfoMap.get("uuid"));
+   queryType = (String) queryInfoMap.get("queryType");
+   hashKey = (String) queryInfoMap.get("hashKey");
+   useExpLookupTable = (boolean) queryInfoMap.get("useExpLookupTable");
+   useHDFSExpLookupTable = (boolean) queryInfoMap.get("useHDFSExpLookupTable");
+   embedSelector = (boolean) queryInfoMap.get("embedSelector");
+
+   numSelectors = intEntry(queryInfoMap, "numSelectors");
+   hashBitSize = intEntry(queryInfoMap, "hashBitSize");
+   numBitsPerDataElement = intEntry(queryInfoMap, "numBitsPerDataElement");
+   numPartitionsPerDataElement = intEntry(queryInfoMap, "numPartitionsPerDataElement");
+   // The key is spelled "dataPartitionsBitSize" (with an 's'), matching what toMap() writes.
+   dataPartitionBitSize = intEntry(queryInfoMap, "dataPartitionsBitSize");
+ }
+
+ /**
+  * Reads a numeric map entry as an int, whichever boxed numeric type (Long, Integer, ...) it was stored as.
+  */
+ private static int intEntry(Map<?,?> map, String key)
+ {
+   return ((Number) map.get(key)).intValue();
+ }
+
public UUID getIdentifier()
{
return identifier;
@@ -151,6 +181,24 @@
return embedSelector;
}
+ /**
+  * Flattens the scalar query parameters into a map of plain String, int and boolean values.
+  * <p>
+  * The keys used here are the ones the map-based constructor reads back; the query schema is not included.
+  *
+  * @return a new mutable map holding the identifier (as a string), query type, hash key, the integer sizing parameters and the boolean flags
+  */
+ public Map toMap()
+ {
+   Map<String,Object> fields = new HashMap<>();
+
+   // String-valued entries
+   fields.put("uuid", identifier.toString());
+   fields.put("queryType", queryType);
+   fields.put("hashKey", hashKey);
+
+   // Integer-valued entries
+   fields.put("numSelectors", numSelectors);
+   fields.put("hashBitSize", hashBitSize);
+   fields.put("numBitsPerDataElement", numBitsPerDataElement);
+   fields.put("numPartitionsPerDataElement", numPartitionsPerDataElement);
+   // Key is spelled with an extra 's' relative to the field name; the map-based constructor reads the same spelling.
+   fields.put("dataPartitionsBitSize", dataPartitionBitSize);
+
+   // Boolean flags
+   fields.put("useExpLookupTable", useExpLookupTable);
+   fields.put("useHDFSExpLookupTable", useHDFSExpLookupTable);
+   fields.put("embedSelector", embedSelector);
+
+   return fields;
+ }
+
public void addQuerySchema(QuerySchema qSchemaIn)
{
qSchema = qSchemaIn;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index 6b32418..f8e396b 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -28,6 +28,7 @@
import org.apache.pirk.responder.wideskies.spark.ComputeResponse;
import org.apache.pirk.responder.wideskies.spark.streaming.ComputeStreamingResponse;
import org.apache.pirk.responder.wideskies.standalone.Responder;
+import org.apache.pirk.responder.wideskies.storm.PirkTopology;
import org.apache.pirk.serialization.LocalFileSystemStore;
import org.apache.pirk.utils.SystemConfiguration;
import org.slf4j.Logger;
@@ -49,6 +50,11 @@
{
private static final Logger logger = LoggerFactory.getLogger(ResponderDriver.class);
+ /**
+  * Platforms the responder can be launched on. The constant names are matched against the upper-cased platform property in main(); NONE is the value main()
+  * starts from before the property has been parsed.
+  */
+ enum Platform
+ {
+   MAPREDUCE,
+   SPARK,
+   SPARKSTREAMING,
+   STORM,
+   STANDALONE,
+   NONE
+ }
+
public static void main(String[] args) throws Exception
{
ResponderCLI responderCLI = new ResponderCLI(args);
@@ -56,49 +62,64 @@
// For handling System.exit calls from Spark Streaming
System.setSecurityManager(new SystemExitManager());
- if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("mapreduce"))
+ Platform platform = Platform.NONE;
+ String platformString = SystemConfiguration.getProperty(ResponderProps.PLATFORM);
+ try
{
- logger.info("Launching MapReduce ResponderTool:");
-
- ComputeResponseTool pirWLTool = new ComputeResponseTool();
- ToolRunner.run(pirWLTool, new String[] {});
+ platform = Platform.valueOf(platformString.toUpperCase());
+ } catch (IllegalArgumentException e)
+ {
+ logger.error("platform " + platformString + " not found.");
}
- else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("spark"))
+
+ switch (platform)
{
- logger.info("Launching Spark ComputeResponse:");
+ case MAPREDUCE:
+ logger.info("Launching MapReduce ResponderTool:");
- FileSystem fs = FileSystem.get(new Configuration());
- ComputeResponse computeResponse = new ComputeResponse(fs);
- computeResponse.performQuery();
- }
- else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("sparkstreaming"))
- {
- logger.info("Launching Spark ComputeStreamingResponse:");
+ ComputeResponseTool pirWLTool = new ComputeResponseTool();
+ ToolRunner.run(pirWLTool, new String[] {});
+ break;
- FileSystem fs = FileSystem.get(new Configuration());
- ComputeStreamingResponse computeSR = new ComputeStreamingResponse(fs);
- try
- {
- computeSR.performQuery();
- } catch (SystemExitException e)
- {
- // If System.exit(0) is not caught from Spark Streaming,
- // the application will complete with a 'failed' status
- logger.info("Exited with System.exit(0) from Spark Streaming");
- }
+ case SPARK:
+ logger.info("Launching Spark ComputeResponse:");
- // Teardown the context
- computeSR.teardown();
- }
- else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("standalone"))
- {
- logger.info("Launching Standalone Responder:");
+ ComputeResponse computeResponse = new ComputeResponse(FileSystem.get(new Configuration()));
+ computeResponse.performQuery();
+ break;
- String queryInput = SystemConfiguration.getProperty("pir.queryInput");
- Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
+ case SPARKSTREAMING:
+ logger.info("Launching Spark ComputeStreamingResponse:");
- Responder pirResponder = new Responder(query);
- pirResponder.computeStandaloneResponse();
+ ComputeStreamingResponse computeSR = new ComputeStreamingResponse(FileSystem.get(new Configuration()));
+ try
+ {
+ computeSR.performQuery();
+ } catch (SystemExitException e)
+ {
+ // If System.exit(0) is not caught from Spark Streaming,
+ // the application will complete with a 'failed' status
+ logger.info("Exited with System.exit(0) from Spark Streaming");
+ }
+
+ // Teardown the context
+ computeSR.teardown();
+ break;
+
+ case STORM:
+ logger.info("Launching Storm PirkTopology:");
+ PirkTopology.runPirkTopology();
+ break;
+
+ case STANDALONE:
+ logger.info("Launching Standalone Responder:");
+
+ String queryInput = SystemConfiguration.getProperty("pir.queryInput");
+ Query query = new LocalFileSystemStore().recall(queryInput, Query.class);
+
+ Responder pirResponder = new Responder(query);
+ pirResponder.computeStandaloneResponse();
+ break;
}
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index 4fee85a..f5d24d7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -21,6 +21,8 @@
import java.util.Arrays;
import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.cli.Option;
import org.apache.pirk.inputformat.hadoop.InputFormatConst;
import org.apache.pirk.schema.data.DataSchemaLoader;
@@ -76,10 +78,45 @@
public static final String MAXBATCHES = "pir.sparkstreaming.maxBatches";
public static final String STOPGRACEFULLY = "spark.streaming.stopGracefullyOnShutdown";
- static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, ESPORT,
- OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
+ // Storm parameters
+ // hdfs
+ static final String HDFSURI = "hdfs.uri";
+ static final String USEHDFS = "hdfs.use";
+ // kafka
+ static final String KAFKATOPIC = "kafka.topic";
+ static final String KAFKACLIENTID = "kafka.clientId";
+ static final String KAFKAZK = "kafka.zk";
+ static final String KAFKAFORCEFROMSTART = "kafka.forceFromStart";
+ // pirk topo
+ static final String STORMTOPONAME = "storm.topoName";
+ static final String STORMWORKERS = "storm.workers";
+ static final String STORMNUMACKERS = "storm.numAckers";
+ static final String STORMRECEIVEBUFFERS = "storm.executor.receiveBufferSize";
+ static final String STORMSENDBUFFERS = "storm.executor.sendBufferSize";
+ static final String STORMTRANSFERBUFFERS = "storm.executor.transferBufferSize";
+ static final String STORMMAXSPOUTPENDING = "storm.maxSpoutPending";
+ static final String STORMHEAPMEMORY = "storm.worker.heapMemory";
+ static final String STORMCHILDOPTS = "storm.worker.childOpts";
+ static final String STORMMAXWORKERHEAP = "storm.maxWorkerHeapMemory";
+ static final String STORMCOMPONENTONHEAP = "storm.componentOnheapMem";
+ static final String STORMSPOUTPAR = "storm.spout.parallelism";
+ static final String STORMPARTITIONDATABOLTPAR = "storm.partitiondata.parallelism";
+ static final String STORMENCROWCALCBOLTPAR = "storm.encrowcalcbolt.parallelism";
+ static final String STORMENCCOLMULTBOLTPAR = "storm.enccolmultbolt.parallelism";
+ static final String STORMFLUSHFREQUENCY = "storm.encrowcalcbolt.ticktuple";
+ static final String STORMSPLITPARTITIONS = "storm.splitPartitions";
+ static final String STORMSALTCOLUMNS = "storm.saltColumns";
+ static final String STORMNUMROWDIVS = "storm.rowDivs";
+
+ static final String[] STORMPROPS = new String[]{HDFSURI, USEHDFS, KAFKATOPIC, KAFKACLIENTID, KAFKAZK, KAFKAFORCEFROMSTART, STORMTOPONAME, STORMWORKERS,
+ STORMNUMACKERS, STORMRECEIVEBUFFERS, STORMSENDBUFFERS, STORMTRANSFERBUFFERS, STORMMAXSPOUTPENDING, STORMHEAPMEMORY, STORMCHILDOPTS, STORMMAXWORKERHEAP,
+ STORMCOMPONENTONHEAP, STORMSPOUTPAR, STORMPARTITIONDATABOLTPAR, STORMENCROWCALCBOLTPAR, STORMENCCOLMULTBOLTPAR, STORMFLUSHFREQUENCY, STORMSPLITPARTITIONS,
+ STORMSALTCOLUMNS, STORMNUMROWDIVS};
+
+ static final List<String> PROPSLIST = Arrays.asList((String[]) ArrayUtils.addAll(new String[]{PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, OUTPUTFILE,
+ BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN,
- COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY);
+ COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY}, STORMPROPS));
/**
* Validates the responder properties
@@ -98,7 +135,7 @@
}
String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
- if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("standalone"))
+ if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("storm") && !platform.equals("standalone"))
{
logger.info("Unsupported platform: " + platform);
valid = false;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
index 0745bea..0050e29 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
@@ -44,7 +44,6 @@
/**
* Class to compute the encrypted row elements for a query from extracted data partitions
- *
*/
public class ComputeEncryptedRow
{
@@ -98,7 +97,6 @@
* Optionally uses a static LRU cache for the modular exponentiation
* <p>
* Emits {@code Tuple2<<colNum, colVal>>}
- *
*/
public static List<Tuple2<Long,BigInteger>> computeEncRow(Iterable<BytesArrayWritable> dataPartitionsIter, Query query, int rowIndex,
boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) throws IOException
@@ -112,7 +110,7 @@
int elementCounter = 0;
for (BytesArrayWritable dataPartitions : dataPartitionsIter)
{
- logger.debug("rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
+ logger.debug("rowIndex = {} elementCounter = {}", rowIndex, elementCounter);
if (limitHitsPerSelector)
{
@@ -121,7 +119,7 @@
break;
}
}
- logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+ logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
// Update the associated column values
for (int i = 0; i < dataPartitions.size(); ++i)
@@ -142,8 +140,8 @@
{
e.printStackTrace();
}
- logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = "
- + exp + " i = " + i + " partition = " + dataPartitions.getBigInteger(i) + " = " + dataPartitions.getBigInteger(i).toString(2));
+ logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}",
+ rowIndex, colCounter, part.toString(), part.toString(2), exp, i, dataPartitions.getBigInteger(i), dataPartitions.getBigInteger(i).toString(2));
returnPairs.add(new Tuple2<>(colCounter, exp));
@@ -162,7 +160,6 @@
* Optionally uses a static LRU cache for the modular exponentiation
* <p>
* Emits {@code Tuple2<<colNum, colVal>>}
- *
*/
public static List<Tuple2<Long,BigInteger>> computeEncRowBI(Iterable<List<BigInteger>> dataPartitionsIter, Query query, int rowIndex,
boolean limitHitsPerSelector, int maxHitsPerSelector, boolean useCache) throws IOException
@@ -178,7 +175,7 @@
{
// long startTime = System.currentTimeMillis();
- logger.debug("rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
+ logger.debug("rowIndex = {} elementCounter = {}", rowIndex, elementCounter);
if (limitHitsPerSelector)
{
@@ -188,8 +185,7 @@
break;
}
}
- logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
-
+ logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
// Update the associated column values
for (int i = 0; i < dataPartitions.size(); ++i)
{
@@ -209,8 +205,9 @@
{
e.printStackTrace();
}
- logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = "
- + exp + " i = " + i);
+
+ logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {}",
+ rowIndex, colCounter, part.toString(), part.toString(2), exp, i);
returnPairs.add(new Tuple2<>(colCounter, exp));
@@ -234,7 +231,6 @@
* For each row (as indicated by key = hash(selector)), iterates over the dataPartitions and calculates the column values.
* <p>
* Emits {@code Tuple2<<colNum, colVal>>}
- *
*/
public static List<Tuple2<Long,BigInteger>> computeEncRowCacheInput(Iterable<List<BigInteger>> dataPartitionsIter, HashMap<Integer,BigInteger> cache,
int rowIndex, boolean limitHitsPerSelector, int maxHitsPerSelector) throws IOException
@@ -245,7 +241,7 @@
int elementCounter = 0;
for (List<BigInteger> dataPartitions : dataPartitionsIter)
{
- logger.debug("elementCounter = " + elementCounter);
+ logger.debug("elementCounter = {}", elementCounter);
if (limitHitsPerSelector)
{
@@ -254,7 +250,7 @@
break;
}
}
- logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+ logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
// Update the associated column values
for (int i = 0; i < dataPartitions.size(); ++i)
@@ -262,7 +258,7 @@
BigInteger part = dataPartitions.get(i);
BigInteger exp = cache.get(part.intValue());
- logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " exp = " + exp + " i = " + i);
+ logger.debug("rowIndex = {} colCounter = {} part = {} exp = {} i = {}", rowIndex, colCounter, part.toString(), exp, i);
returnPairs.add(new Tuple2<>(colCounter, exp));
@@ -283,7 +279,6 @@
* Caller is responsible for keeping track of the colIndex and the the maxHitsPerSelector values
* <p>
* Emits {@code Tuple2<<colNum, colVal>>}
- *
*/
public static List<Tuple2<Long,BigInteger>> computeEncRow(BytesArrayWritable dataPartitions, Query query, int rowIndex, int colIndex) throws IOException
{
@@ -295,7 +290,7 @@
// Initialize the column counter
long colCounter = colIndex;
- logger.debug("dataPartitions.size() = " + dataPartitions.size() + " rowIndex = " + rowIndex + " colCounter = " + colCounter);
+ logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
// Update the associated column values
for (int i = 0; i < dataPartitions.size(); ++i)
@@ -311,8 +306,8 @@
e.printStackTrace();
}
- logger.debug("rowIndex = " + rowIndex + " colCounter = " + colCounter + " part = " + part.toString() + " part binary = " + part.toString(2) + " exp = "
- + exp + " i = " + i + " partition = " + dataPartitions.getBigInteger(i) + " = " + dataPartitions.getBigInteger(i).toString(2));
+ logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}",
+ rowIndex, colCounter, part.toString(), part.toString(2), exp, i, dataPartitions.getBigInteger(i), dataPartitions.getBigInteger(i).toString(2));
returnPairs.add(new Tuple2<>(colCounter, exp));
@@ -321,4 +316,81 @@
return returnPairs;
}
+
+ /**
+ * Method to compute the encrypted row elements for a query from extracted data partitions in the form of {@code List<BigInteger>}
+ * <p>
+ * For the row given by rowIndex (key = hash(selector)), iterates over the dataPartitions and calculates the column values, starting at column colIndex.
+ * <p>
+ * Uses a static LRU cache for the modular exponentiation; if a cache lookup fails, the failure is logged and the elements computed so far are returned.
+ * <p>
+ * Caller is responsible for keeping track of the colIndex and the maxHitsPerSelector values
+ * <p>
+ * Emits {@code Tuple2<<colNum, colVal>>}
+ */
+ public static List<Tuple2<Long,BigInteger>> computeEncRow(List<BigInteger> dataPartitions, Query query, int rowIndex, int colIndex)
+ throws IOException
+ {
+ List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
+
+ // Pull the corresponding encrypted row query
+ BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+ // Initialize the column counter
+ long colCounter = colIndex;
+
+ logger.debug("dataPartitions.size() = {} rowIndex = {} colCounter = {}", dataPartitions.size(), rowIndex, colCounter);
+
+ // Update the associated column values
+ for (int i = 0; i < dataPartitions.size(); ++i)
+ {
+ BigInteger part = dataPartitions.get(i);
+
+ BigInteger exp = null;
+ try
+ {
+ exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+ } catch (ExecutionException e)
+ {
+ logger.warn("Modular exponentiation cache lookup failed for rowIndex = {} i = {}", rowIndex, i, e);
+ break;
+ }
+
+ logger.debug("rowIndex = {} colCounter = {} part = {} part binary = {} exp = {} i = {} partition = {} = {}",
+ rowIndex, colCounter, part.toString(), part.toString(2), exp, i, part, part.toString(2));
+
+ returnPairs.add(new Tuple2<Long,BigInteger>(colCounter, exp));
+
+ ++colCounter;
+ }
+
+ return returnPairs;
+ }
+
+ /**
+ * Method to compute the encrypted row element for a single data partition of the row given by rowIndex
+ * <p>
+ * Emits a one-element list holding {@code Tuple2<<colIndex, colVal>>}; colVal is null if the modular exponentiation cache lookup fails
+ */
+ public static List<Tuple2<Long,BigInteger>> computeEncRow(BigInteger part, Query query, int rowIndex, int colIndex) throws IOException
+ {
+ List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
+
+ // Pull the corresponding encrypted row query
+ BigInteger rowQuery = query.getQueryElement(rowIndex);
+
+ // Compute the column value for the single partition
+ BigInteger exp = null;
+ try
+ {
+ exp = expCache.get(new Tuple3<BigInteger,BigInteger,BigInteger>(rowQuery, part, query.getNSquared()));
+ } catch (ExecutionException e)
+ {
+ logger.warn("Modular exponentiation cache lookup failed for rowIndex = {} colIndex = {}", rowIndex, colIndex, e);
+ }
+
+ returnPairs.add(new Tuple2<Long,BigInteger>((long) colIndex, exp));
+
+ return returnPairs;
+ }
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
index 61169f2..e605d94 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/HashSelectorAndPartitionData.java
@@ -35,8 +35,8 @@
import scala.Tuple2;
/**
- * Given a MapWritable dataElement, this class gives the common functionality to extract the selector by queryType from each dataElement, perform a keyed hash
- * of the selector, extract the partitions of the dataElement, and outputs {@code <hash(selector), dataPartitions>}
+ * Given a MapWritable or JSON formatted dataElement, this class gives the common functionality to extract the selector by queryType from each dataElement,
+ * perform a keyed hash of the selector, extract the partitions of the dataElement, and outputs {@code <hash(selector), dataPartitions>}
*/
public class HashSelectorAndPartitionData
{
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index f34acf8..6014435 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -90,8 +90,9 @@
private BroadcastVars bVars = null;
private QueryInfo queryInfo = null;
- Query query = null;
- QuerySchema qSchema = null;
+
+ private Query query = null;
+ private QuerySchema qSchema = null;
private int numDataPartitions = 0;
private int numColMultPartitions = 0;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
new file mode 100644
index 0000000..08c9917
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncColMultBolt.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bolt class to perform encrypted column multiplication
+ * <p>
+ * Takes {@code <columnIndex, columnValue>} tuples as input and aggregates (multiplies) the columnValues for a given columnIndex as they are received.
+ * <p>
+ * EncRowCalcBolts send flush signals to the EncColMultBolts indicating that they have finished sending all tuples for a session. Whenever a flush signal is
+ * received from a EncRowCalcBolt, the num of received flush signals is tallied until each EncRowCalcBolt has emitted a flush signal.
+ * <p>
+ * Once a flush signal has been received from each EncRowCalcBolt, all {@code <columnIndex, aggregate colVal product>} tuples are sent to the OutputBolt and a session_end
+ * signal is sent back to each EncRowCalcBolt.
+ * <p>
+ * The EncRowCalcBolts buffer their output from the time that they send a flush signal to the EncColMultBolts until the time that they receive a session_end
+ * signal from all of the EncColMultBolts.
+ *
+ */
+public class EncColMultBolt extends BaseRichBolt
+{
+ private static final long serialVersionUID = 1L;
+
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EncColMultBolt.class);
+
+ private OutputCollector outputCollector;
+
+ private BigInteger nSquared;
+ private long numFlushSignals;
+ private Long totalFlushSignals;
+
+ // This is the main object here. It holds column Id -> aggregated product
+ private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
+ private BigInteger colVal1;
+ private BigInteger colMult;
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
+ {
+ // Keep the collector for emitting/acking, then read N^2 and the expected number of flush signals from the config map
+ this.outputCollector = collector;
+ this.nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY));
+ this.totalFlushSignals = (Long) map.get(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY);
+
+ logger.info("Initialized EncColMultBolt. ");
+ }
+
+ /** Aggregates data tuples per column; on the last expected flush signal, emits all products plus the end-of-session markers. */
+ @Override
+ public void execute(Tuple tuple)
+ {
+ if (tuple.getSourceStreamId().equals(StormConstants.ENCROWCALCBOLT_FLUSH_SIG))
+ {
+ numFlushSignals += 1;
+ logger.debug("Received {} flush signals out of {}", numFlushSignals, totalFlushSignals);
+ // Need to receive notice from all EncRowCalcBolts in order to flush.
+ if (numFlushSignals == totalFlushSignals)
+ {
+ logger.debug("Received signal to flush in EncColMultBolt. Outputting {} results.", resultsMap.size());
+ for (Map.Entry<Long,BigInteger> entry : resultsMap.entrySet())
+ {
+ // key = column Id, value = aggregated product
+ outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(entry.getKey(), entry.getValue()));
+ }
+ resultsMap.clear();
+ // Send signal to OutputBolt to write output and notify EncRowCalcBolt that results have been flushed.
+ outputCollector.emit(StormConstants.ENCCOLMULTBOLT_ID, new Values(Long.valueOf(-1L), BigInteger.ZERO));
+ outputCollector.emit(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Values(1));
+ numFlushSignals = 0;
+ }
+ }
+ else
+ {
+ // Data tuple received. Do column multiplication.
+ long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ERC_FIELD);
+ colVal1 = (BigInteger) tuple.getValueByField(StormConstants.ENCRYPTED_VALUE_FIELD);
+
+ logger.debug("Received tuple in ECM, multiplying {} to col {}", colVal1, colIndex);
+
+ if (resultsMap.containsKey(colIndex))
+ {
+ colMult = colVal1.multiply(resultsMap.get(colIndex));
+ resultsMap.put(colIndex, colMult.mod(nSquared));
+ }
+ else
+ {
+ resultsMap.put(colIndex, colVal1);
+ }
+ }
+ outputCollector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+ {
+ Fields productFields = new Fields(StormConstants.COLUMN_INDEX_ECM_FIELD, StormConstants.COLUMN_PRODUCT_FIELD); // (column index, product)
+ outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_ID, productFields);
+ outputFieldsDeclarer.declareStream(StormConstants.ENCCOLMULTBOLT_SESSION_END, new Fields("finished")); // session-end signal stream
+ }
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
new file mode 100644
index 0000000..639a52b
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/EncRowCalcBolt.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.responder.wideskies.common.ComputeEncryptedRow;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Bolt class to perform the encrypted row calculation
+ * <p>
+ * Receives a {@code <hash(selector), dataPartitions>} tuple as input.
+ * <p>
+ * Encrypts the row data and emits a (column index, encrypted row-value) tuple for each encrypted block.
+ * <p>
+ * Every FLUSH_FREQUENCY seconds, it sends a signal to EncColMultBolt to flush its output and resets all counters. At that point, all outgoing (column index,
+ * encrypted row-value) tuples are buffered until a SESSION_END signal is received back from each EncColMultBolt.
+ */
+public class EncRowCalcBolt extends BaseRichBolt
+{
+ private static final long serialVersionUID = 1L;
+
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(EncRowCalcBolt.class);
+
+ private OutputCollector outputCollector;
+ private static Query query;
+ private static boolean querySet = false;
+
+ private Boolean limitHitsPerSelector;
+ private Long maxHitsPerSelector;
+ private Long totalEndSigs;
+ private int rowDivisions;
+ private Boolean saltColumns;
+ private Boolean splitPartitions;
+
+ private Random rand;
+
+ // These are the main data structures used here.
+ private Map<Integer,Integer> hitsByRow = new HashMap<Integer,Integer>();
+ private Map<Integer,Integer> colIndexByRow = new HashMap<Integer,Integer>();
+ private List<Tuple2<Long,BigInteger>> matrixElements = new ArrayList<>();
+ private List<BigInteger> dataArray = new ArrayList<>();
+
+ private int numEndSigs = 0;
+
+ // These buffered values are used in the case when a session has been ejected, but the SESSION_END signal has not been received
+ // yet from the next bolt.
+ private boolean buffering = false;
+ private List<Tuple2<Long,BigInteger>> bufferedValues = new ArrayList<>();
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext, OutputCollector coll)
+ {
+ outputCollector = coll;
+ setQuery(map);
+ logger.info("partition databolt hdfs = " + map.get(StormConstants.USE_HDFS));
+
+ maxHitsPerSelector = (Long) map.get(StormConstants.MAX_HITS_PER_SEL_KEY);
+ limitHitsPerSelector = (Boolean) map.get(StormConstants.LIMIT_HITS_PER_SEL_KEY);
+ totalEndSigs = (Long) map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY);
+ splitPartitions = (Boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY);
+ saltColumns = (Boolean) map.get(StormConstants.SALT_COLUMNS_KEY);
+ rowDivisions = ((Long) map.get(StormConstants.ROW_DIVISIONS_KEY)).intValue();
+
+ // If splitPartitions==true, the data is incoming partition by partition, rather than record by record.
+ // The per-row hit count then increments once per partition element and would exceed the maxHitsPerSelector
+ // param far too soon, so the limit is scaled by the number of partitions per data element.
+ if (splitPartitions)
+ maxHitsPerSelector *= query.getQueryInfo().getNumPartitionsPerDataElement();
+
+ rand = new Random();
+
+ logger.info("Initialized EncRowCalcBolt.");
+ }
+
+ /** Routes each incoming tuple by origin (data, tick, or session-end signal) and acks it once handled. */
+ @Override
+ public void execute(Tuple tuple)
+ {
+ String streamId = tuple.getSourceStreamId();
+ if (streamId.equals(StormConstants.DEFAULT))
+ {
+ // tuple: <hash,partitions>
+ matrixElements = processTupleFromPartitionDataBolt(tuple);
+ if (!buffering)
+ {
+ emitTuples(matrixElements);
+ }
+ else
+ {
+ logger.debug("Buffering tuple.");
+ bufferedValues.addAll(matrixElements);
+ }
+ }
+ else if (StormUtils.isTickTuple(tuple) && !buffering)
+ {
+ // Tick while not already buffering: ask EncColMultBolt to flush, reset the per-row state, and start buffering
+ logger.debug("Sending flush signal to EncColMultBolt.");
+ outputCollector.emit(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Values(1));
+ colIndexByRow.clear();
+ hitsByRow.clear();
+ buffering = true;
+ }
+ else if (streamId.equals(StormConstants.ENCCOLMULTBOLT_SESSION_END))
+ {
+ ++numEndSigs;
+ logger.debug("SessionEnd signal {} of {} received", numEndSigs, totalEndSigs);
+
+ // Need to receive signal from all EncColMultBolt instances before stopping buffering.
+ if (numEndSigs == totalEndSigs)
+ {
+ logger.debug("Buffering completed, emitting {} tuples.", bufferedValues.size());
+ emitTuples(bufferedValues);
+ bufferedValues.clear();
+ numEndSigs = 0;
+ buffering = false;
+ }
+ }
+ outputCollector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+ {
+ Fields dataFields = new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.ENCRYPTED_VALUE_FIELD, StormConstants.SALT);
+ outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, dataFields); // (column index, encrypted value, salt)
+ outputFieldsDeclarer.declareStream(StormConstants.ENCROWCALCBOLT_FLUSH_SIG, new Fields(StormConstants.FLUSH)); // flush signal stream
+ }
+
+ /**
+ * Extracts (hash, data partitions) from tuple. Encrypts the data partitions. Returns all of the pairs of (col index, col value). Also advances the
+ * colIndexByRow and hitsByRow appropriately. Rows are only capped at maxHitsPerSelector when limitHitsPerSelector is set.
+ *
+ * @param tuple tuple on the default stream carrying the row hash and its partitioned data
+ * @return the (col index, col value) pairs computed for this tuple; empty if the row is over its hit limit or encryption failed
+ */
+ private List<Tuple2<Long,BigInteger>> processTupleFromPartitionDataBolt(Tuple tuple)
+ {
+ matrixElements.clear();
+ int rowIndex = tuple.getIntegerByField(StormConstants.HASH_FIELD);
+
+ if (!colIndexByRow.containsKey(rowIndex))
+ {
+ colIndexByRow.put(rowIndex, 0);
+ hitsByRow.put(rowIndex, 0);
+ }
+
+ if (splitPartitions)
+ {
+ dataArray.add((BigInteger) tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD));
+ }
+ else
+ {
+ dataArray = (List<BigInteger>) tuple.getValueByField(StormConstants.PARTIONED_DATA_FIELD);
+ }
+ logger.debug("Retrieving {} elements in EncRowCalcBolt.", dataArray.size());
+
+ try
+ {
+ int colIndex = colIndexByRow.get(rowIndex);
+ int numRecords = hitsByRow.get(rowIndex);
+
+ if (!limitHitsPerSelector || numRecords < maxHitsPerSelector)
+ {
+ logger.debug("computing matrix elements.");
+ matrixElements = ComputeEncryptedRow.computeEncRow(dataArray, query, rowIndex, colIndex);
+ colIndexByRow.put(rowIndex, colIndex + matrixElements.size());
+ hitsByRow.put(rowIndex, numRecords + 1);
+ }
+ else
+ {
+ logger.info("maxHits: rowIndex = {} elementCounter = {}", rowIndex, numRecords);
+ }
+ } catch (IOException e)
+ {
+ logger.warn("Caught IOException while encrypting row. ", e);
+ }
+
+ dataArray.clear();
+ return matrixElements;
+ }
+
+ /**
+ * Emits every (col index, col value) pair on the data stream together with a salt value.
+ * <p>
+ * saltColumns distributes the column multiplication done in the next bolt EncColMultBolt to avoid hotspotting;
+ * when it is off the salt is always 0.
+ * @param matrixElements the (col index, col value) pairs to emit
+ */
+ private void emitTuples(List<Tuple2<Long,BigInteger>> matrixElements)
+ {
+ boolean salted = saltColumns;
+
+ for (Tuple2<Long,BigInteger> element : matrixElements)
+ {
+ // Draw a random salt below rowDivisions only when salting is enabled
+ int salt = salted ? rand.nextInt(rowDivisions) : 0;
+ outputCollector.emit(StormConstants.ENCROWCALCBOLT_DATASTREAM_ID, new Values(element._1(), element._2(), salt));
+ }
+ }
+
+ // Loads the shared static query once; later calls find querySet already true and return without reloading.
+ private synchronized static void setQuery(Map map)
+ {
+ if (querySet)
+ return;
+ query = StormUtils.prepareQuery(map);
+ querySet = true;
+ }
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
new file mode 100644
index 0000000..68b02f3
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/OutputBolt.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Bolt to compute and output the final Response object for a query
+ * <p>
+ * Receives {@code <colIndex, colProduct>} tuples, computes the final column product for each colIndex, records the results in the final Response object, and
+ * outputs the final Response object for the query.
+ * <p>
+ * Flush signals are sent to the OutputBolt from the EncColMultBolts via a tuple of the form {@code <-1, 0>}. Once a flush signal has been received from each
+ * EncColMultBolt (or a timeout is reached), the final column product is computed and the final Response is formed and emitted.
+ * <p>
+ * Currently, the Responses are written to HDFS at the location specified by the outputFile, with the timestamp appended.
+ * <p>
+ * TODO: -- Enable other Response output locations
+ *
+ */
+public class OutputBolt extends BaseRichBolt
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(OutputBolt.class);
+
+  private OutputCollector outputCollector;
+  private QueryInfo queryInfo;
+  private Response response;
+  private String outputFile;
+  private boolean hdfs;
+  private String hdfsUri;
+  private Integer flushCounter = 0;
+  private List<Tuple> tuplesToAck = new ArrayList<>();
+  private Integer totalFlushSigs;
+
+  private LocalFileSystemStore localStore;
+  private HadoopFileSystemStore hadoopStore;
+
+  // This latch just serves as a hook for testing.
+  public static CountDownLatch latch = new CountDownLatch(4);
+
+  // This is the main object here. It holds column Id -> product
+  private Map<Long,BigInteger> resultsMap = new HashMap<Long,BigInteger>();
+
+  private BigInteger colVal;
+  private BigInteger colMult;
+
+  private BigInteger nSquared;
+
+  /**
+   * Reads the flush-signal count, output location, N^2 and query info from the Storm conf and creates the output store.
+   */
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector)
+  {
+    outputCollector = collector;
+    totalFlushSigs = ((Long) map.get(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY)).intValue();
+    outputFile = (String) map.get(StormConstants.OUTPUT_FILE_KEY);
+    hdfs = (boolean) map.get(StormConstants.USE_HDFS);
+
+    if (hdfs)
+    {
+      hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+      try
+      {
+        hadoopStore = new HadoopFileSystemStore(FileSystem.get(URI.create(hdfsUri), new Configuration()));
+      } catch (IOException e)
+      {
+        logger.error("Failed to initialize Hadoop file system for output.");
+        throw new RuntimeException(e);
+      }
+    }
+    else
+    {
+      localStore = new LocalFileSystemStore();
+    }
+    nSquared = new BigInteger((String) map.get(StormConstants.N_SQUARED_KEY));
+    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    response = new Response(queryInfo);
+
+    logger.info("Intitialized OutputBolt.");
+  }
+
+  /**
+   * Accumulates column products and, once every EncColMultBolt has sent its flush signal, writes the Response out.
+   */
+  @Override
+  public void execute(Tuple tuple)
+  {
+    long colIndex = tuple.getLongByField(StormConstants.COLUMN_INDEX_ECM_FIELD);
+    colVal = (BigInteger) tuple.getValueByField(StormConstants.COLUMN_PRODUCT_FIELD);
+
+    // colIndex == -1 is just the signal sent by EncColMultBolt to notify that it flushed its values.
+    // Could have created a new stream for such signals, but that seemed like overkill.
+    if (colIndex == -1)
+    {
+      flushCounter++;
+      logger.debug("Received " + flushCounter + " output flush signals out of " + totalFlushSigs);
+      // Wait till all EncColMultBolts have been flushed. Both counters are boxed Integers, so compare by value:
+      // == would compare references, which only agrees with the values inside the Integer cache range.
+      if (flushCounter.equals(totalFlushSigs))
+      {
+        logger.info("TimeToFlush reached - outputting response to " + outputFile + " with columns.size = " + resultsMap.size());
+        try
+        {
+          for (Map.Entry<Long,BigInteger> column : resultsMap.entrySet())
+          {
+            response.addElement(column.getKey().intValue(), column.getValue());
+          }
+          if (hdfs)
+          {
+            String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date());
+            hadoopStore.store(new Path(outputFile + "_" + timestamp), response);
+          }
+          else
+          { // In order to accommodate testing, this does not currently include timestamp.
+            // Should probably be fixed, but this will not likely be used outside of testing.
+            localStore.store(new File(outputFile), response);
+            for (Map.Entry<Long,BigInteger> column : resultsMap.entrySet())
+            {
+              logger.debug("column = " + column.getKey() + ", value = " + column.getValue().toString());
+            }
+          }
+        } catch (IOException e)
+        {
+          logger.warn("Unable to write output file.", e);
+        }
+
+        // Reset
+        resultsMap.clear();
+        flushCounter = 0;
+        for (Tuple t : tuplesToAck)
+          outputCollector.ack(t);
+        // Used for integration test
+        latch.countDown();
+      }
+    }
+    else
+    {
+      // Process data values: add them to map. The column multiplication is only done in the case where saltColumns==true,
+      // in which case a small number of multiplications still need to be done per column.
+      if (resultsMap.containsKey(colIndex))
+      {
+        colMult = colVal.multiply(resultsMap.get(colIndex)).mod(nSquared);
+        resultsMap.put(colIndex, colMult);
+      }
+      else
+      {
+        resultsMap.put(colIndex, colVal);
+      }
+      logger.debug("column = " + colIndex + ", value = " + resultsMap.get(colIndex).toString());
+    }
+    outputCollector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {}
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
new file mode 100644
index 0000000..9d24620
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PartitionDataBolt.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pirk.responder.wideskies.storm;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.query.wideskies.QueryUtils;
+
+import org.apache.pirk.schema.data.DataSchema;
+import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseBasicBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bolt to extract the partitions of the data record and output {@code <hash(selector), dataPartitions>}
+ * <p>
+ * Currently receives a {@code <hash(selector), JSON data record>} as input.
+ * <p>
+ *
+ */
+public class PartitionDataBolt extends BaseBasicBolt
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PartitionDataBolt.class);
+
+  private static final long serialVersionUID = 1L;
+
+  // Query settings, assigned in prepare().
+  private QueryInfo queryInfo;
+  private String queryType;
+  private QuerySchema qSchema = null;
+  private boolean embedSelector;
+  private boolean splitPartitions;
+
+  // The record currently being processed and the partitions extracted from it.
+  private JSONObject json;
+  private List<BigInteger> partitions;
+
+  /**
+   * Reads the query info and partitioning flags from the Storm conf, loads the schemas and resolves the query schema.
+   */
+  @Override
+  public void prepare(Map map, TopologyContext context)
+  {
+    queryInfo = new QueryInfo((Map) map.get(StormConstants.QUERY_INFO_KEY));
+    queryType = queryInfo.getQueryType();
+    embedSelector = queryInfo.getEmbedSelector();
+    logger.info("partition databolt hdfs = " + map.get(StormConstants.USE_HDFS));
+    StormUtils.initializeSchemas(map, "partition");
+    try
+    {
+      // Take the query's own schema when ad hoc schemas are allowed; fall back to the registry lookup by query type.
+      boolean allowAdHoc = (boolean) map.get(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY);
+      qSchema = allowAdHoc ? queryInfo.getQuerySchema() : qSchema;
+      qSchema = (qSchema != null) ? qSchema : QuerySchemaRegistry.get(queryType);
+    } catch (Exception e)
+    {
+      logger.error("Unable to initialize schemas in PartitionDataBolt. ", e);
+    }
+
+    json = new JSONObject();
+    splitPartitions = (boolean) map.get(StormConstants.SPLIT_PARTITIONS_KEY);
+
+    logger.info("Initialized ExtractAndPartitionDataBolt.");
+  }
+
+  /**
+   * Partitions the JSON record of the incoming tuple and emits the partitions together with the tuple's hash.
+   */
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector outputCollector)
+  {
+    int hash = tuple.getIntegerByField(StormConstants.HASH_FIELD);
+    json = (JSONObject) tuple.getValueByField(StormConstants.JSON_DATA_FIELD);
+
+    try
+    {
+      partitions = QueryUtils.partitionDataElement(qSchema, json, embedSelector);
+
+      logger.debug("HashSelectorsAndPartitionDataBolt processing {} outputting results - {}", json.toString(), partitions.size());
+
+      // splitPartitions determines whether each partition piece is sent individually or the full Array is sent together.
+      // Since processing in the follow-on bolt (EncRowCalcBolt) is computationally expensive, current working theory is
+      // that splitting them up allows for better throughput. Though maybe with better knowledge/tuning of Storm internals
+      // and parameters (e.g. certain buffer sizes), it may make no difference.
+      if (!splitPartitions)
+      {
+        outputCollector.emit(new Values(hash, partitions));
+        return;
+      }
+      for (BigInteger piece : partitions)
+      {
+        outputCollector.emit(new Values(hash, piece));
+      }
+    } catch (Exception e)
+    {
+      logger.warn("Failed to partition data for record -- " + json + "\n", e);
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
+  {
+    Fields outputFields = new Fields(StormConstants.HASH_FIELD, StormConstants.PARTIONED_DATA_FIELD);
+    outputFieldsDeclarer.declare(outputFields);
+  }
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
new file mode 100644
index 0000000..76bb80c
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkHashScheme.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.query.wideskies.QueryUtils;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.utils.KeyedHash;
+
+import org.apache.storm.Config;
+import org.apache.storm.kafka.StringScheme;
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scheme used by spout to retrieve and hash selector from JSON data on Kafka.
+ */
+public class PirkHashScheme extends StringScheme implements Scheme
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PirkHashScheme.class);
+
+  private QueryInfo queryInfo;
+  transient private JSONParser parser;
+  transient private JSONObject json;
+  private boolean initialized = false;
+  private QuerySchema qSchema;
+  private Config conf;
+
+  public PirkHashScheme(Config conf)
+  {
+    this.conf = conf;
+  }
+
+  /**
+   * Parses a message as JSON and returns {@code <hash(selector), JSON record>}, or null when the message cannot be parsed.
+   */
+  public List<Object> deserialize(ByteBuffer bytes)
+  {
+    // parser is transient while the flag is not, so also (re)initialize when the flag is set but the parser is missing.
+    if (!initialized || parser == null)
+    {
+      parser = new JSONParser();
+      queryInfo = new QueryInfo((Map) conf.get(StormConstants.QUERY_INFO_KEY));
+      StormUtils.initializeSchemas(conf, "hashScheme");
+
+      if ((boolean) conf.get(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY))
+      {
+        qSchema = queryInfo.getQuerySchema();
+      }
+      if (qSchema == null)
+      {
+        qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType());
+      }
+      initialized = true;
+    }
+    String str = super.deserializeString(bytes);
+    try
+    {
+      json = (JSONObject) parser.parse(str);
+    } catch (ParseException e)
+    {
+      json = null;
+      logger.warn("ParseException parsing " + str, e);
+      // There is no record to take a selector from: return null rather than hand a null record to the hashing below.
+      return null;
+    }
+    String selector = QueryUtils.getSelectorByQueryTypeJSON(qSchema, json);
+    int hash = KeyedHash.hash(queryInfo.getHashKey(), queryInfo.getHashBitSize(), selector);
+
+    return new Values(hash, json);
+  }
+
+  public Fields getOutputFields()
+  {
+    return new Fields(StormConstants.HASH_FIELD, StormConstants.JSON_DATA_FIELD);
+  }
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
new file mode 100644
index 0000000..ddfca8b
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/PirkTopology.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.*;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Storm topology class for wideskies Pirk implementation
+ * <p>
+ *
+ */
+public class PirkTopology
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(PirkTopology.class);
+
+  // Kafka source
+  private static final String kafkaClientId = SystemConfiguration.getProperty("kafka.clientId", "KafkaSpout");
+  private static final String brokerZk = SystemConfiguration.getProperty("kafka.zk", "localhost:2181");
+  private static final String kafkaTopic = SystemConfiguration.getProperty("kafka.topic", "pirkTopic");
+  private static final Boolean forceFromStart = Boolean.parseBoolean(SystemConfiguration.getProperty("kafka.forceFromStart", "false"));
+
+  // Query input and Response output locations
+  private static final Boolean useHdfs = Boolean.parseBoolean(SystemConfiguration.getProperty("hdfs.use", "true"));
+  private static final String hdfsUri = SystemConfiguration.getProperty("hdfs.uri", "localhost");
+  private static final String queryFile = SystemConfiguration.getProperty("pir.queryInput");
+  private static final String outputPath = SystemConfiguration.getProperty("pir.outputFile");
+
+  // Topology name and sizing
+  private static final String topologyName = SystemConfiguration.getProperty("storm.topoName", "PirkTopology");
+  private static final Integer numWorkers = Integer.parseInt(SystemConfiguration.getProperty("storm.workers", "1"));
+  private static final Integer spoutParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.spout.parallelism", "1"));
+  private static final Integer partitionDataBoltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.partitiondata.parallelism", "1"));
+  private static final Integer encrowcalcboltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.parallelism", "1"));
+  private static final Integer enccolmultboltParallelism = Integer.parseInt(SystemConfiguration.getProperty("storm.enccolmultbolt.parallelism", "1"));
+
+  // Data-flow options handed to the bolts through the Storm conf
+  private static final Boolean saltColumns = Boolean.parseBoolean(SystemConfiguration.getProperty("storm.saltColumns", "false"));
+  private static final Boolean splitPartitions = Boolean.parseBoolean(SystemConfiguration.getProperty("storm.splitPartitions", "false"));
+
+  /**
+   * Builds the Kafka spout config and the Storm conf, assembles the topology and submits it to Storm.
+   */
+  public static void runPirkTopology() throws Exception
+  {
+    logger.info("Configuring Kafka.");
+    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(brokerZk), kafkaTopic, "/" + kafkaTopic + "_pirk_storm", kafkaClientId);
+    spoutConfig.ignoreZkOffsets = forceFromStart;
+
+    // The Storm conf carries N^2 and the query info in addition to the settings from createStormConf().
+    logger.info("Retrieving Query and generating Storm conf.");
+    Config stormConf = createStormConf();
+    Query loadedQuery = StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
+    stormConf.put(StormConstants.N_SQUARED_KEY, loadedQuery.getNSquared().toString());
+    stormConf.put(StormConstants.QUERY_INFO_KEY, loadedQuery.getQueryInfo().toMap());
+
+    // Configure this for different types of input data on Kafka.
+    spoutConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(stormConf));
+
+    StormTopology pirkTopology = getPirkTopology(spoutConfig);
+    logger.info("Submitting Pirk topology to Storm...");
+    StormSubmitter.submitTopologyWithProgressBar(topologyName, stormConf, pirkTopology);
+  }
+
+  /**
+   * Creates the Pirk topology: KafkaSpout -> PartitionDataBolt -> EncRowCalcBolt -> EncColMultBolt -> OutputBolt.
+   *
+   * @param kafkaConfig
+   *          spout config used to initialize the KafkaSpout
+   * @return the assembled topology
+   */
+  public static StormTopology getPirkTopology(SpoutConfig kafkaConfig)
+  {
+    KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
+    PartitionDataBolt partitionBolt = new PartitionDataBolt();
+    EncRowCalcBolt rowCalcBolt = new EncRowCalcBolt();
+    EncColMultBolt colMultBolt = new EncColMultBolt();
+    OutputBolt outBolt = new OutputBolt();
+
+    TopologyBuilder topo = new TopologyBuilder();
+    topo.setSpout(StormConstants.SPOUT_ID, kafkaSpout, spoutParallelism);
+
+    topo.setBolt(StormConstants.PARTITION_DATA_BOLT_ID, partitionBolt, partitionDataBoltParallelism)
+        .fieldsGrouping(StormConstants.SPOUT_ID, new Fields(StormConstants.HASH_FIELD));
+
+    // TODO: Decide whether to use Resource Aware Scheduler. (If not, get rid of b2 and b3).
+    BoltDeclarer b2 = topo.setBolt(StormConstants.ENCROWCALCBOLT_ID, rowCalcBolt, encrowcalcboltParallelism)
+        .fieldsGrouping(StormConstants.PARTITION_DATA_BOLT_ID, new Fields(StormConstants.HASH_FIELD))
+        .allGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_SESSION_END)
+        .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, Integer.parseInt(SystemConfiguration.getProperty("storm.encrowcalcbolt.ticktuple")));
+    // b2.setMemoryLoad(5000);
+    // b2.setCPULoad(150.0);
+
+    BoltDeclarer b3 = topo.setBolt(StormConstants.ENCCOLMULTBOLT_ID, colMultBolt, enccolmultboltParallelism)
+        .fieldsGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_DATASTREAM_ID,
+            new Fields(StormConstants.COLUMN_INDEX_ERC_FIELD, StormConstants.SALT))
+        .allGrouping(StormConstants.ENCROWCALCBOLT_ID, StormConstants.ENCROWCALCBOLT_FLUSH_SIG);
+    // b3.setMemoryLoad(5000);
+    // b3.setCPULoad(500.0);
+
+    // The OutputBolt runs with parallelism 1 and a global grouping on the EncColMultBolt stream.
+    topo.setBolt(StormConstants.OUTPUTBOLT_ID, outBolt, 1).globalGrouping(StormConstants.ENCCOLMULTBOLT_ID, StormConstants.ENCCOLMULTBOLT_ID);
+
+    return topo.createTopology();
+  }
+
+  /**
+   * Creates the Storm conf from SystemConfiguration: Storm settings plus the Pirk parameters sent to the bolts.
+   */
+  public static Config createStormConf()
+  {
+    boolean limitHits = Boolean.parseBoolean(SystemConfiguration.getProperty("pir.limitHitsPerSelector"));
+    int maxHits = Integer.parseInt(SystemConfiguration.getProperty("pir.maxHitsPerSelector"));
+    int rowDivs = Integer.parseInt(SystemConfiguration.getProperty("storm.rowDivs", "1"));
+
+    Config stormConf = new Config();
+    stormConf.setNumAckers(Integer.parseInt(SystemConfiguration.getProperty("storm.numAckers", numWorkers.toString())));
+    stormConf.setMaxSpoutPending(Integer.parseInt(SystemConfiguration.getProperty("storm.maxSpoutPending", "300")));
+    stormConf.setNumWorkers(numWorkers);
+    stormConf.setDebug(false);
+    // conf.setNumEventLoggers(2);
+
+    // Buffer and memory settings; the keys are static constants, so they are referenced through the Config class.
+    stormConf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.receiveBufferSize", "1024")));
+    stormConf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.executor.sendBufferSize", "1024")));
+    stormConf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Integer.parseInt(SystemConfiguration.getProperty("storm.transferBufferSize", "32")));
+    stormConf.put(Config.WORKER_HEAP_MEMORY_MB, Integer.parseInt(SystemConfiguration.getProperty("storm.worker.heapMemory", "750")));
+    stormConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, Double.parseDouble(SystemConfiguration.getProperty("storm.componentOnheapMem", "128")));
+
+    // Pirk parameters to send to bolts
+    stormConf.put(StormConstants.ALLOW_ADHOC_QSCHEMAS_KEY, SystemConfiguration.getProperty("pir.allowAdHocQuerySchemas", "false").equals("true"));
+    stormConf.put(StormConstants.QSCHEMA_KEY, SystemConfiguration.getProperty("query.schemas"));
+    stormConf.put(StormConstants.DSCHEMA_KEY, SystemConfiguration.getProperty("data.schemas"));
+    stormConf.put(StormConstants.HDFS_URI_KEY, hdfsUri);
+    stormConf.put(StormConstants.QUERY_FILE_KEY, queryFile);
+    stormConf.put(StormConstants.USE_HDFS, useHdfs);
+    stormConf.put(StormConstants.OUTPUT_FILE_KEY, outputPath);
+    stormConf.put(StormConstants.LIMIT_HITS_PER_SEL_KEY, limitHits);
+    stormConf.put(StormConstants.MAX_HITS_PER_SEL_KEY, maxHits);
+    stormConf.put(StormConstants.SPLIT_PARTITIONS_KEY, splitPartitions);
+    stormConf.put(StormConstants.SALT_COLUMNS_KEY, saltColumns);
+    stormConf.put(StormConstants.ROW_DIVISIONS_KEY, rowDivs);
+    stormConf.put(StormConstants.ENCROWCALCBOLT_PARALLELISM_KEY, encrowcalcboltParallelism);
+    stormConf.put(StormConstants.ENCCOLMULTBOLT_PARALLELISM_KEY, enccolmultboltParallelism);
+
+    return stormConf;
+  }
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
new file mode 100644
index 0000000..7f1e59d
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormConstants.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+public class StormConstants
+{
+  // Ids of the topology components
+  public static final String SPOUT_ID = "kafkaspout";
+  public static final String PARTITION_DATA_BOLT_ID = "partitiondataBolt";
+  public static final String ENCROWCALCBOLT_ID = "encrowcalcbolt";
+  public static final String ENCCOLMULTBOLT_ID = "enccolmultbolt";
+  public static final String OUTPUTBOLT_ID = "outputbolt";
+
+  // Ids of the streams
+  public static final String DEFAULT = "default";
+  public static final String ENCROWCALCBOLT_DATASTREAM_ID = "encrowcalcbolt_datastream_id";
+  public static final String ENCROWCALCBOLT_FLUSH_SIG = "encrowcalcbolt_flush";
+  public static final String ENCCOLMULTBOLT_SESSION_END = "enccolmultbolt_sess_end";
+
+  // Tuple fields from HashBolt (and variants)
+  public static final String HASH_FIELD = "hash";
+  public static final String PARTIONED_DATA_FIELD = "parData";
+  public static final String JSON_DATA_FIELD = "data";
+  // Tuple fields from EncRowCalcBolt
+  public static final String COLUMN_INDEX_ERC_FIELD = "colIndexErc";
+  public static final String ENCRYPTED_VALUE_FIELD = "encRowValue";
+  // Tuple fields from EncColMultBolt
+  public static final String COLUMN_INDEX_ECM_FIELD = "colIndex";
+  public static final String COLUMN_PRODUCT_FIELD = "colProduct";
+
+  // Configuration keys: input and output locations
+  public static final String USE_HDFS = "useHdfs";
+  public static final String HDFS_URI_KEY = "hdfsUri";
+  public static final String QUERY_FILE_KEY = "queryFile";
+  public static final String OUTPUT_FILE_KEY = "output";
+  // Configuration keys: query and schemas
+  public static final String QUERY_INFO_KEY = "queryInfo";
+  public static final String N_SQUARED_KEY = "nSquared";
+  public static final String ALLOW_ADHOC_QSCHEMAS_KEY = "allowAdHocQuerySchemas";
+  public static final String QSCHEMA_KEY = "qSchema";
+  public static final String DSCHEMA_KEY = "dschema";
+  // Configuration keys: processing options and bolt parallelism
+  public static final String LIMIT_HITS_PER_SEL_KEY = "limitHitsPerSelector";
+  public static final String MAX_HITS_PER_SEL_KEY = "maxHitsPerSelector";
+  public static final String SALT_COLUMNS_KEY = "saltColumns";
+  public static final String ROW_DIVISIONS_KEY = "rowDivisions";
+  public static final String SPLIT_PARTITIONS_KEY = "splitPartitions";
+  public static final String ENCROWCALCBOLT_PARALLELISM_KEY = "encrowcalcboltPar";
+  public static final String ENCCOLMULTBOLT_PARALLELISM_KEY = "enccolmultboltPar";
+
+  public static final String SALT = "salt";
+  public static final String FLUSH = "flush";
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
new file mode 100644
index 0000000..7fbca66
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/storm/StormUtils.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.storm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Constants;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Utils class for the Storm implementation of Wideskies
+ */
+public class StormUtils
+{
+  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(StormUtils.class);
+
+  /**
+   * Reads the serialized Query object stored in the given queryFile.
+   *
+   * @param useHdfs
+   *          true to read queryFile from the file system at hdfsUri, false to read it from the local file system
+   * @param hdfsUri
+   *          URI of the Hadoop file system; only used when useHdfs is true
+   * @param queryFile
+   *          location of the serialized Query
+   * @return the Query read from queryFile
+   * @throws RuntimeException
+   *           wrapping any exception raised while reading the query
+   */
+  public static Query getQuery(boolean useHdfs, String hdfsUri, String queryFile)
+  {
+    try
+    {
+      if (!useHdfs)
+      {
+        logger.info("reading local query file from " + queryFile);
+        return (new LocalFileSystemStore()).recall(queryFile, Query.class);
+      }
+      FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
+      logger.info("reading query file from hdfs: " + queryFile);
+      return (new HadoopFileSystemStore(fs)).recall(queryFile, Query.class);
+    } catch (Exception e)
+    {
+      logger.error("Unable to initalize query info.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads the serialized Query object from the location named in the given conf map.
+   *
+   * @param map
+   *          conf holding values for USE_HDFS, HDFS_URI_KEY and QUERY_FILE_KEY
+   * @return the Query, or null when reading it failed (the failure is logged as a warning)
+   */
+  public static Query prepareQuery(Map map)
+  {
+    boolean useHdfs = (boolean) map.get(StormConstants.USE_HDFS);
+    String hdfsUri = (String) map.get(StormConstants.HDFS_URI_KEY);
+    String queryFile = (String) map.get(StormConstants.QUERY_FILE_KEY);
+    try
+    {
+      return StormUtils.getQuery(useHdfs, hdfsUri, queryFile);
+    } catch (Exception e)
+    {
+      logger.warn("Unable to initialize query info.", e);
+      return null;
+    }
+  }
+
+  /**
+   * Initializes the data and query schemas named in the conf.
+   *
+   * @param conf
+   *          requires values for USE_HDFS, DSCHEMA_KEY and QSCHEMA_KEY, plus HDFS_URI_KEY when USE_HDFS is true
+   * @param id
+   *          not used by this method
+   * @throws RuntimeException
+   *           wrapping any exception raised while loading the schemas
+   */
+  public static void initializeSchemas(Map conf, String id)
+  {
+    SystemConfiguration.setProperty("data.schemas", (String) conf.get(StormConstants.DSCHEMA_KEY));
+    SystemConfiguration.setProperty("query.schemas", (String) conf.get(StormConstants.QSCHEMA_KEY));
+    try
+    {
+      boolean onHdfs = (boolean) conf.get(StormConstants.USE_HDFS);
+      if (!onHdfs)
+      {
+        DataSchemaLoader.initialize();
+        QuerySchemaLoader.initialize();
+        return;
+      }
+      String hdfsUri = (String) conf.get(StormConstants.HDFS_URI_KEY);
+      FileSystem fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
+      DataSchemaLoader.initialize(true, fs);
+      QuerySchemaLoader.initialize(true, fs);
+    } catch (Exception e)
+    {
+      logger.error("Failed to initialize schema files.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  // True when the tuple's source component and source stream are Storm's system component and tick stream ids.
+  protected static boolean isTickTuple(Tuple tuple)
+  {
+    if (!tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID))
+      return false;
+    return tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+  }
+}
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index 28de96e..c651eaa 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -31,7 +31,9 @@
import javax.xml.parsers.ParserConfigurationException;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.pirk.schema.data.partitioner.DataPartitioner;
import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
import org.apache.pirk.utils.PIRException;
@@ -131,8 +133,8 @@
InputStream is = null;
if (hdfs)
{
- is = fs.open(new Path(schemaFile));
logger.info("hdfs: filePath = " + schemaFile);
+ is = fs.open(fs.makeQualified(new Path(schemaFile)));
}
else
{
diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
index 962e467..b0c9326 100644
--- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java
+++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
@@ -77,8 +77,6 @@
{
logger.info("Running testDNSHostnameQuery(): ");
- QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
-
int numExpectedResults = 6;
List<QueryResponseJSON> results;
if (isDistributed)
@@ -93,6 +91,14 @@
numExpectedResults = 7; // all 7 for non distributed case; if testFalsePositive==true, then 6
}
}
+ checkDNSHostnameQueryResults(results, isDistributed, numExpectedResults, testFalsePositive, dataElements);
+ logger.info("Completed testDNSHostnameQuery(): ");
+ }
+
+ public static void checkDNSHostnameQueryResults(List<QueryResponseJSON> results, boolean isDistributed, int numExpectedResults,
+ boolean testFalsePositive, List<JSONObject> dataElements)
+ {
+ QuerySchema qSchema = QuerySchemaRegistry.get(Inputs.DNS_HOSTNAME_QUERY);
logger.info("results:");
printResultList(results);
@@ -188,7 +194,6 @@
}
}
}
- logger.info("Completed testDNSHostnameQuery(): ");
}
public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
index 8f82f1c..7501aaa 100644
--- a/src/main/resources/log4j2.xml
+++ b/src/main/resources/log4j2.xml
@@ -46,4 +46,4 @@
</Root>
</Loggers>
-</Configuration>
\ No newline at end of file
+</Configuration>
diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties
index 963fa34..a88c846 100755
--- a/src/main/resources/pirk.properties
+++ b/src/main/resources/pirk.properties
@@ -228,9 +228,3 @@
#Parallelism for expLookupTable creation in hdfs
pir.expCreationSplits = 600
-
-
-
-
-
-
diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties
index 3ae92c7..ac6cb35 100644
--- a/src/main/resources/responder.properties
+++ b/src/main/resources/responder.properties
@@ -154,4 +154,47 @@
#default is false
#spark.streaming.stopGracefullyOnShutdown=
-
\ No newline at end of file
+ ##Properties for Kafka
+ #kafka.topic = topicName
+ #kafka.clientId = pirk_spout
+
+ # Kafka Zookeepers
+ #kafka.zk = localhost:2181
+ # Read from beginning of Kafka topic on startup
+ #kafka.forceFromStart = false
+
+
+ ##Properties for Storm
+ #storm.topoName = pir
+ #storm.workers = 1
+ #storm.numAckers = 1
+ #storm.maxSpoutPending=10
+ #storm.worker.heapMemory=6000
+ #storm.componentOnheapMem= 600.0
+
+ # This should be set to the number of Kafka partitions
+ #storm.spout.parallelism = 1
+
+ #storm.hashbolt.parallelism = 1
+ #storm.encrowcalcbolt.parallelism = 1
+ # This bolt is the most computationally expensive and should have the highest parallelism value
+ #storm.enccolmultbolt.parallelism = 2
+
+ # These may be useful for tuning
+ #storm.executor.receiveBufferSize = 1024
+ #storm.executor.sendBufferSize = 1024
+ #storm.transferBufferSize = 8
+
+ # Frequency with which PIR matrix elements are flushed out
+ #storm.encrowcalcbolt.ticktuple = 60
+
+ # Design configurations:
+ # When splitPartitions = true, Hashbolt emits an individual tuple for each data partition;
+ # when splitPartitions = false, it emits the batch of data partitions for a record in a single tuple
+ #storm.splitPartitions = true
+ # A task running EncColMultBolt will only be responsible for multiplying a subset of the row
+ # for any individual column when saltColumns = true
+ # All multiplication for a single column is done on a single EncColMultBolt instance when = false
+ #storm.saltColumns = true
+ # Only makes sense to tune if saltColumns=true
+ #storm.rowDivs = 1
diff --git a/src/test/java/org/apache/pirk/general/PaillierTest.java b/src/test/java/org/apache/pirk/general/PaillierTest.java
index 14347fa..1cc35d1 100644
--- a/src/test/java/org/apache/pirk/general/PaillierTest.java
+++ b/src/test/java/org/apache/pirk/general/PaillierTest.java
@@ -257,7 +257,7 @@
{
Random r = new Random();
int lowBitLength = 3073; // inclusive
- int highBitLength = 7001; // exclusive
+ int highBitLength = 4000; // exclusive
int loopVal = 1; // int loopVal = 1000; //change this and re-test for high loop testing
for (int i = 0; i < loopVal; ++i)
@@ -267,7 +267,7 @@
basicTestPaillierWithKeyGeneration(bitLength, certainty, ensureBitSet);
basicTestPaillierWithKeyGeneration(3072, certainty, ensureBitSet);
- // Test with random bit length between 3073 and 7000
+ // Test with random bit length between 3073 and 4000
int randomLargeBitLength = r.nextInt(highBitLength - lowBitLength) + lowBitLength;
basicTestPaillierWithKeyGeneration(randomLargeBitLength, certainty, ensureBitSet);
}
diff --git a/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
new file mode 100644
index 0000000..906c337
--- /dev/null
+++ b/src/test/java/org/apache/pirk/storm/KafkaStormIntegrationTest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.storm;
+
+import kafka.admin.AdminUtils;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pirk.encryption.Paillier;
+import org.apache.pirk.querier.wideskies.Querier;
+import org.apache.pirk.querier.wideskies.QuerierConst;
+import org.apache.pirk.querier.wideskies.decrypt.DecryptResponse;
+import org.apache.pirk.querier.wideskies.encrypt.EncryptQuery;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.storm.*;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.schema.query.filter.StopListFilter;
+import org.apache.pirk.schema.response.QueryResponseJSON;
+import org.apache.pirk.serialization.LocalFileSystemStore;
+import org.apache.pirk.test.utils.BaseTests;
+import org.apache.pirk.test.utils.Inputs;
+import org.apache.pirk.test.utils.TestUtils;
+import org.apache.pirk.utils.QueryResultsWriter;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.storm.Config;
+import org.apache.storm.ILocalCluster;
+import org.apache.storm.Testing;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.SpoutConfig;
+import org.apache.storm.kafka.ZkHosts;
+import org.apache.storm.spout.SchemeAsMultiScheme;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.testing.MkClusterParam;
+import org.apache.storm.testing.TestJob;
+import org.json.simple.JSONObject;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Properties;
+import java.util.HashMap;
+import java.util.Arrays;
+import java.util.ArrayList;
+
+@Category(IntegrationTest.class)
+public class KafkaStormIntegrationTest
+{
+ private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaStormIntegrationTest.class);
+
+ private static final LocalFileSystemStore localStore = new LocalFileSystemStore();
+
+ private static TestingServer zookeeperLocalCluster;
+ private static KafkaServer kafkaLocalBroker;
+ private static ZkClient zkClient;
+
+ private static final String topic = "pirk_test_topic";
+ private static final String kafkaTmpDir = "/tmp/kafka";
+
+ private static File fileQuery;
+ private static File fileQuerier;
+
+ private QueryInfo queryInfo;
+ private BigInteger nSquared;
+
+ private static int testCountDown = 4;
+
+ @Test
+ public void testKafkaStormIntegration() throws Exception
+ {
+ SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
+ SystemConfiguration.getProperty("pir.maxHitsPerSelector", "10");
+ SystemConfiguration.setProperty("storm.spout.parallelism", "1");
+ SystemConfiguration.setProperty("storm.hashbolt.parallelism", "1");
+ SystemConfiguration.setProperty("storm.encrowcalcbolt.parallelism", "2");
+ SystemConfiguration.setProperty("storm.enccolmultbolt.parallelism", "2");
+ SystemConfiguration.setProperty("storm.encrowcalcbolt.ticktuple", "8");
+ SystemConfiguration.setProperty("storm.rowDivs", "2");
+ SystemConfiguration.setProperty("hdfs.use", "false");
+
+ startZookeeper();
+ startKafka();
+
+ SystemConfiguration.setProperty("kafka.topic", topic);
+ SystemConfiguration.setProperty("storm.topoName", "pirTest");
+
+ // Create encrypted file
+ SystemConfiguration.setProperty("pir.stopListFile", "none");
+ Inputs.createSchemaFiles(StopListFilter.class.getName());
+
+ // Perform encryption. Set queryInfo, nSquared, fileQuery, and fileQuerier
+ performEncryption();
+ SystemConfiguration.setProperty("pir.queryInput", fileQuery.getAbsolutePath());
+
+ KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+ loadTestData(producer);
+
+
+ logger.info("Test (splitPartitions,saltColumns) = (true,true)");
+ SystemConfiguration.setProperty("storm.splitPartitions", "true");
+ SystemConfiguration.setProperty("storm.saltColumns", "true");
+ runTest();
+
+ logger.info("Test (splitPartitions,saltColumns) = (true,false)");
+ SystemConfiguration.setProperty("storm.splitPartitions", "true");
+ SystemConfiguration.setProperty("storm.saltColumns", "false");
+ runTest();
+
+ logger.info("Test (splitPartitions,saltColumns) = (false,true)");
+ SystemConfiguration.setProperty("storm.splitPartitions", "false");
+ SystemConfiguration.setProperty("storm.saltColumns", "true");
+ runTest();
+
+ logger.info("Test (splitPartitions,saltColumns) = (false,false)");
+ SystemConfiguration.setProperty("storm.splitPartitions", "false");
+ SystemConfiguration.setProperty("storm.saltColumns", "false");
+ runTest();
+ }
+
+ private void runTest() throws Exception
+ {
+ File responderFile = File.createTempFile("responderFile", ".txt");
+ logger.info("Starting topology.");
+ runTopology(responderFile);
+
+ // decrypt results
+ logger.info("Decrypting results. " + responderFile.length());
+ File fileFinalResults = performDecryption(responderFile);
+
+ // check results
+ List<QueryResponseJSON> results = TestUtils.readResultsFile(fileFinalResults);
+ BaseTests.checkDNSHostnameQueryResults(results, false, 7, false, Inputs.createJSONDataElements());
+
+ responderFile.deleteOnExit();
+ fileFinalResults.deleteOnExit();
+ }
+
+ private void runTopology(File responderFile) throws Exception
+ {
+ MkClusterParam mkClusterParam = new MkClusterParam();
+ // The test sometimes fails because of timing issues when more than 1 supervisor set.
+ mkClusterParam.setSupervisors(1);
+
+ // Maybe using "withSimulatedTimeLocalCluster" would be better to avoid worrying about timing.
+ Config conf = PirkTopology.createStormConf();
+ conf.put(StormConstants.OUTPUT_FILE_KEY, responderFile.getAbsolutePath());
+ conf.put(StormConstants.N_SQUARED_KEY, nSquared.toString());
+ conf.put(StormConstants.QUERY_INFO_KEY, queryInfo.toMap());
+ // conf.setDebug(true);
+ mkClusterParam.setDaemonConf(conf);
+
+ TestJob testJob = createPirkTestJob(conf);
+ Testing.withLocalCluster(mkClusterParam, testJob);
+ // Testing.withSimulatedTimeLocalCluster(mkClusterParam, testJob);
+ }
+
+ private TestJob createPirkTestJob(final Config config)
+ {
+ final SpoutConfig kafkaConfig = setUpTestKafkaSpout(config);
+ return new TestJob()
+ {
+ StormTopology topology = PirkTopology.getPirkTopology(kafkaConfig);
+
+ @Override
+ public void run(ILocalCluster iLocalCluster) throws Exception
+ {
+ iLocalCluster.submitTopology("pirk_integration_test", config, topology);
+ logger.info("Pausing for setup.");
+ //Thread.sleep(4000);
+ //KafkaProducer producer = new KafkaProducer<String,String>(createKafkaProducerConfig());
+ //loadTestData(producer);
+ //Thread.sleep(10000);
+ while(OutputBolt.latch.getCount() == testCountDown){
+ Thread.sleep(1000);
+ }
+ testCountDown -=1;
+
+ logger.info("Finished...");
+ }
+ };
+ }
+
+ private SpoutConfig setUpTestKafkaSpout(Config conf)
+ {
+ ZkHosts zkHost = new ZkHosts(zookeeperLocalCluster.getConnectString());
+
+ SpoutConfig kafkaConfig = new SpoutConfig(zkHost, topic, "/pirk_test_root", "pirk_integr_test_spout");
+ kafkaConfig.scheme = new SchemeAsMultiScheme(new PirkHashScheme(conf));
+ logger.info("KafkaConfig initialized...");
+
+ return kafkaConfig;
+ }
+
+ private void startZookeeper() throws Exception
+ {
+ logger.info("Starting zookeeper.");
+ zookeeperLocalCluster = new TestingServer();
+ zookeeperLocalCluster.start();
+ logger.info("Zookeeper initialized.");
+
+ }
+
+ private void startKafka() throws Exception
+ {
+ FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+ Properties props = new Properties();
+ props.setProperty("zookeeper.session.timeout.ms", "100000");
+ props.put("advertised.host.name", "localhost");
+ props.put("port", 11111);
+ // props.put("broker.id", "0");
+ props.put("log.dir", kafkaTmpDir);
+ props.put("enable.zookeeper", "true");
+ props.put("zookeeper.connect", zookeeperLocalCluster.getConnectString());
+ KafkaConfig kafkaConfig = KafkaConfig.fromProps(props);
+ kafkaLocalBroker = new KafkaServer(kafkaConfig, new SystemTime(), scala.Option.apply("kafkaThread"));
+ kafkaLocalBroker.startup();
+
+ zkClient = new ZkClient(zookeeperLocalCluster.getConnectString(), 60000, 60000, ZKStringSerializer$.MODULE$);
+ ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperLocalCluster.getConnectString()), false);
+ //ZkUtils zkUtils = ZkUtils.apply(zookeeperLocalCluster.getConnectString(), 60000, 60000, false);
+ AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception
+ {
+ zkClient.close();
+ kafkaLocalBroker.shutdown();
+ zookeeperLocalCluster.stop();
+
+ FileUtils.deleteDirectory(new File(kafkaTmpDir));
+
+ fileQuery.delete();
+ fileQuerier.delete();
+
+ }
+
+ private HashMap<String,Object> createKafkaProducerConfig()
+ {
+ String kafkaHostName = "localhost";
+ Integer kafkaPorts = 11111;
+ HashMap<String,Object> config = new HashMap<String,Object>();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostName + ":" + kafkaPorts);
+ config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+ return config;
+ }
+
+ private void loadTestData(KafkaProducer producer)
+ {
+ for (JSONObject dataRecord : Inputs.createJSONDataElements())
+ {
+ logger.info("Sending record to Kafka " + dataRecord.toString());
+ producer.send(new ProducerRecord<String,String>(topic, dataRecord.toString()));
+ }
+ }
+
+ private void performEncryption() throws Exception
+ {
+ // ArrayList<String> selectors = BaseTests.selectorsDomain;
+ List<String> selectors = new ArrayList<>(Arrays.asList("s.t.u.net", "d.e.com", "r.r.r.r", "a.b.c.com", "something.else", "x.y.net"));
+ String queryType = Inputs.DNS_HOSTNAME_QUERY;
+
+ Paillier paillier = new Paillier(BaseTests.paillierBitSize, BaseTests.certainty);
+
+ nSquared = paillier.getNSquared();
+
+ queryInfo = new QueryInfo(BaseTests.queryIdentifier, selectors.size(), BaseTests.hashBitSize, BaseTests.hashKey, BaseTests.dataPartitionBitSize, queryType,
+ false, true, false);
+
+ // Perform the encryption
+ logger.info("Performing encryption of the selectors - forming encrypted query vectors:");
+ EncryptQuery encryptQuery = new EncryptQuery(queryInfo, selectors, paillier);
+ Querier querier = encryptQuery.encrypt(1);
+ logger.info("Completed encryption of the selectors - completed formation of the encrypted query vectors:");
+
+ // Write out files.
+ fileQuerier = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERIER_FILETAG, ".txt");
+ fileQuery = File.createTempFile("pir_integrationTest-" + QuerierConst.QUERY_FILETAG, ".txt");
+
+ localStore.store(fileQuerier.getAbsolutePath(), querier);
+ localStore.store(fileQuery, querier.getQuery());
+ }
+
+ private File performDecryption(File responseFile) throws Exception
+ {
+ File finalResults = File.createTempFile("finalFileResults", ".txt");
+ String querierFilePath = fileQuerier.getAbsolutePath();
+ String responseFilePath = responseFile.getAbsolutePath();
+ String outputFile = finalResults.getAbsolutePath();
+ int numThreads = 1;
+
+ Response response = localStore.recall(responseFilePath, Response.class);
+ Querier querier = localStore.recall(querierFilePath, Querier.class);
+
+ // Perform decryption and output the result file
+ DecryptResponse decryptResponse = new DecryptResponse(response, querier);
+ decryptResponse.decrypt(numThreads);
+ QueryResultsWriter.writeResultFile(outputFile, decryptResponse.decrypt(numThreads));
+ return finalResults;
+ }
+
+}
diff --git a/src/test/java/org/apache/pirk/storm/SystemTime.java b/src/test/java/org/apache/pirk/storm/SystemTime.java
new file mode 100644
index 0000000..e8dcba2
--- /dev/null
+++ b/src/test/java/org/apache/pirk/storm/SystemTime.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.storm;
+
+import kafka.utils.Time;
+
+ /**
+  * {@link Time} implementation that delegates to the JVM clock and to {@link Thread#sleep(long)}.
+  */
+ public class SystemTime implements Time
+ {
+
+   /**
+    * @return the current time in milliseconds, as reported by {@link System#currentTimeMillis()}
+    */
+   @Override
+   public long milliseconds()
+   {
+     return System.currentTimeMillis();
+   }
+
+   /**
+    * @return the current value of {@link System#nanoTime()}
+    */
+   @Override
+   public long nanoseconds()
+   {
+     return System.nanoTime();
+   }
+
+   /**
+    * Sleeps the calling thread for the given number of milliseconds. If the thread is interrupted while sleeping, the method returns early with the thread's
+    * interrupt flag set.
+    *
+    * @param arg0
+    *          time to sleep, in milliseconds
+    */
+   @Override
+   public void sleep(long arg0)
+   {
+     try
+     {
+       Thread.sleep(arg0);
+     } catch (InterruptedException e)
+     {
+       // This method declares no checked exception, so restore the interrupt flag instead of silently dropping the interruption.
+       Thread.currentThread().interrupt();
+     }
+   }
+
+ }