[PIRK-21] - Initial Spark Streaming Responder Implementation -- closes apache/incubator-pirk#76
diff --git a/pom.xml b/pom.xml
index 09c1faa..b46325e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,9 +87,9 @@
<hadoop.version>2.7.2</hadoop.version>
<apache-commons.version>3.3</apache-commons.version>
<elasticsearch.version>2.3.3</elasticsearch.version>
+ <spark-streaming.version>2.0.0</spark-streaming.version>
<pirk.forkCount>1C</pirk.forkCount>
<pirk.reuseForks>true</pirk.reuseForks>
- <jackson.version>2.8.1</jackson.version>
</properties>
<dependencies>
@@ -217,6 +217,12 @@
</exclusions>
</dependency>
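+ <!-- Spark Streaming dependency for the Spark Streaming responder -->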
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.10</artifactId>
+ <version>${spark-streaming.version}</version>
+ </dependency>
+
<!-- Square's JNA GMP module -->
<dependency>
<groupId>com.squareup.jnagmp</groupId>
@@ -258,13 +264,6 @@
<version>2.6.2</version>
</dependency>
- <!-- Jackson dependency -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>${jackson.version}</version>
- </dependency>
-
</dependencies>
<build>
diff --git a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
index d91fa2d..4f26a71 100644
--- a/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
+++ b/src/main/java/org/apache/pirk/querier/wideskies/QuerierDriver.java
@@ -113,7 +113,7 @@
{
queryType = SystemConfiguration.getProperty(QuerierProps.QUERYTYPE);
hashBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.HASHBITSIZE));
- hashKey = SystemConfiguration.getProperty(QuerierProps.HASHBITSIZE);
+ hashKey = SystemConfiguration.getProperty(QuerierProps.HASHKEY);
dataPartitionBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.DATAPARTITIONSIZE));
paillierBitSize = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.PAILLIERBITSIZE));
certainty = Integer.parseInt(SystemConfiguration.getProperty(QuerierProps.CERTAINTY));
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
index ff43be6..0c031ec 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderCLI.java
@@ -377,6 +377,46 @@
optionAllowEmbeddedQS.setType(String.class);
options.addOption(optionAllowEmbeddedQS);
+ // batchSeconds - spark streaming
+ Option optionBatchSeconds = new Option("batchSeconds", ResponderProps.BATCHSECONDS, true,
+ "optional -- Number of seconds per batch in Spark Streaming; defaults to 30");
+ optionBatchSeconds.setRequired(false);
+ optionBatchSeconds.setArgName(ResponderProps.BATCHSECONDS);
+ optionBatchSeconds.setType(String.class);
+ options.addOption(optionBatchSeconds);
+
+ // windowLength - spark streaming
+ Option optionWindowLength = new Option("windowLength", ResponderProps.WINDOWLENGTH, true,
+ "optional -- Number of seconds per window in Spark Streaming; defaults to 60");
+ optionWindowLength.setRequired(false);
+ optionWindowLength.setArgName(ResponderProps.WINDOWLENGTH);
+ optionWindowLength.setType(String.class);
+ options.addOption(optionWindowLength);
+
+ // maxBatches - spark streaming
+ Option optionMaxBatches = new Option("maxBatches", ResponderProps.MAXBATCHES, true,
+ "optional -- Max batches to process in Spark Streaming; defaults to -1 - unlimited");
+ optionMaxBatches.setRequired(false);
+ optionMaxBatches.setArgName(ResponderProps.MAXBATCHES);
+ optionMaxBatches.setType(String.class);
+ options.addOption(optionMaxBatches);
+
+ // stopGracefully - spark streaming
+ Option optionStopGracefully = new Option("stopGracefully", ResponderProps.STOPGRACEFULLY, true,
+ "optional -- Whether or not to stop gracefully in Spark Streaming; defaults to false");
+ optionStopGracefully.setRequired(false);
+ optionStopGracefully.setArgName(ResponderProps.STOPGRACEFULLY);
+ optionStopGracefully.setType(String.class);
+ options.addOption(optionStopGracefully);
+
+ // useQueueStream - spark streaming
+ Option optionUseQueueStream = new Option("queueStream", ResponderProps.USEQUEUESTREAM, true,
+ "optional -- Whether or not to use a queue stream in Spark Streaming; defaults to false");
+ optionUseQueueStream.setRequired(false);
+ optionUseQueueStream.setArgName(ResponderProps.USEQUEUESTREAM);
+ optionUseQueueStream.setType(String.class);
+ options.addOption(optionUseQueueStream);
+
return options;
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
index da24ae4..6b32418 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderDriver.java
@@ -18,12 +18,15 @@
*/
package org.apache.pirk.responder.wideskies;
+import java.security.Permission;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.ToolRunner;
import org.apache.pirk.query.wideskies.Query;
import org.apache.pirk.responder.wideskies.mapreduce.ComputeResponseTool;
import org.apache.pirk.responder.wideskies.spark.ComputeResponse;
+import org.apache.pirk.responder.wideskies.spark.streaming.ComputeStreamingResponse;
import org.apache.pirk.responder.wideskies.standalone.Responder;
import org.apache.pirk.serialization.LocalFileSystemStore;
import org.apache.pirk.utils.SystemConfiguration;
@@ -50,6 +53,9 @@
{
ResponderCLI responderCLI = new ResponderCLI(args);
+ // For handling System.exit calls from Spark Streaming
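+ // Spark Streaming calls System.exit(0) on a clean shutdown; intercepting it via a
+ // custom SecurityManager lets the driver tear down the streaming context normally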
+ System.setSecurityManager(new SystemExitManager());
+
if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("mapreduce"))
{
logger.info("Launching MapReduce ResponderTool:");
@@ -65,6 +71,25 @@
ComputeResponse computeResponse = new ComputeResponse(fs);
computeResponse.performQuery();
}
+ else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("sparkstreaming"))
+ {
+ logger.info("Launching Spark ComputeStreamingResponse:");
+
+ FileSystem fs = FileSystem.get(new Configuration());
+ ComputeStreamingResponse computeSR = new ComputeStreamingResponse(fs);
+ try
+ {
+ computeSR.performQuery();
+ } catch (SystemExitException e)
+ {
+ // If System.exit(0) from Spark Streaming is not caught,
+ // the application will complete with a 'failed' status
+ logger.info("Exited with System.exit(0) from Spark Streaming");
+ }
+
+ // Teardown the context
+ computeSR.teardown();
+ }
else if (SystemConfiguration.getProperty(ResponderProps.PLATFORM).equals("standalone"))
{
logger.info("Launching Standalone Responder:");
@@ -76,4 +101,30 @@
pirResponder.computeStandaloneResponse();
}
}
+
+ // Exception and Security Manager classes used to catch System.exit from Spark Streaming
+ private static class SystemExitException extends SecurityException
+ {
+ private static final long serialVersionUID = 1L;
+ }
+
+ private static class SystemExitManager extends SecurityManager
+ {
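+ // Allow all actions by default; only exit attempts are intercepted via checkExit below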
+ @Override
+ public void checkPermission(Permission perm)
+ {}
+
+ @Override
+ public void checkExit(int status)
+ {
+ super.checkExit(status);
+ if (status == 0) // If we exited cleanly, throw SystemExitException
+ {
+ throw new SystemExitException();
+ }
+ else
+ {
+ throw new SecurityException("Intercepted System.exit(" + status + ")");
+ }
+
+ }
+ }
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index a9f8fae..4fee85a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;
+import org.apache.commons.cli.Option;
import org.apache.pirk.inputformat.hadoop.InputFormatConst;
import org.apache.pirk.schema.data.DataSchemaLoader;
import org.apache.pirk.schema.query.QuerySchemaLoader;
@@ -59,19 +60,26 @@
public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions";
public static final String USEMODEXPJOIN = "pir.useModExpJoin";
public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey";
- static final String NUMREDUCETASKS = "pir.numReduceTasks";
- static final String MAPMEMORY = "mapreduce.map.memory.mb";
- static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
- static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
- static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
- static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
- static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
- static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+ public static final String NUMREDUCETASKS = "pir.numReduceTasks";
+ public static final String MAPMEMORY = "mapreduce.map.memory.mb";
+ public static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
+ public static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
+ public static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
+ public static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
+ public static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
+ public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+
+ // For Spark Streaming - optional
+ public static final String BATCHSECONDS = "pir.sparkstreaming.batchSeconds";
+ public static final String WINDOWLENGTH = "pir.sparkstreaming.windowLength";
+ public static final String USEQUEUESTREAM = "pir.sparkstreaming.useQueueStream";
+ public static final String MAXBATCHES = "pir.sparkstreaming.maxBatches";
+ public static final String STOPGRACEFULLY = "spark.streaming.stopGracefullyOnShutdown";
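+ // Note: STOPGRACEFULLY uses Spark's native property key; its value is passed directly through to the SparkConf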
static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, ESNODES, ESPORT,
OUTPUTFILE, BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
REDUCEJAVAOPTS, QUERYSCHEMAS, DATASCHEMAS, NUMEXPLOOKUPPARTS, USEHDFSLOOKUPTABLE, NUMDATAPARTITIONS, NUMCOLMULTPARTITIONS, USEMODEXPJOIN,
- COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS);
+ COLMULTREDUCEBYKEY, ALLOWEMBEDDEDQUERYSCHEMAS, BATCHSECONDS, WINDOWLENGTH, USEQUEUESTREAM, MAXBATCHES, STOPGRACEFULLY);
/**
* Validates the responder properties
@@ -90,7 +98,7 @@
}
String platform = SystemConfiguration.getProperty(PLATFORM).toLowerCase();
- if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("standalone"))
+ if (!platform.equals("mapreduce") && !platform.equals("spark") && !platform.equals("sparkstreaming") && !platform.equals("standalone"))
{
logger.info("Unsupported platform: " + platform);
valid = false;
@@ -176,7 +184,7 @@
valid = false;
}
- // Parse optional properties with defaults
+ // Parse optional properties
if (SystemConfiguration.hasProperty(QUERYSCHEMAS))
{
@@ -188,6 +196,8 @@
SystemConfiguration.appendProperty("data.schemas", SystemConfiguration.getProperty(DATASCHEMAS));
}
+ // Parse optional properties with defaults
+
if (!SystemConfiguration.hasProperty(USEHDFSLOOKUPTABLE))
{
SystemConfiguration.setProperty(USEHDFSLOOKUPTABLE, "false");
@@ -223,6 +233,31 @@
SystemConfiguration.setProperty(USELOCALCACHE, "true");
}
+ if (!SystemConfiguration.hasProperty(BATCHSECONDS))
+ {
+ SystemConfiguration.setProperty(BATCHSECONDS, "30");
+ }
+
+ if (!SystemConfiguration.hasProperty(WINDOWLENGTH))
+ {
+ SystemConfiguration.setProperty(WINDOWLENGTH, "60");
+ }
+
+ if (!SystemConfiguration.hasProperty(USEQUEUESTREAM))
+ {
+ SystemConfiguration.setProperty(USEQUEUESTREAM, "false");
+ }
+
+ if (!SystemConfiguration.hasProperty(MAXBATCHES))
+ {
+ SystemConfiguration.setProperty(MAXBATCHES, "-1");
+ }
+
+ if (!SystemConfiguration.hasProperty(STOPGRACEFULLY))
+ {
+ SystemConfiguration.setProperty(STOPGRACEFULLY, "false");
+ }
+
// Load the new local query and data schemas
if (valid)
{
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
index 6ef1bdc..0745bea 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/common/ComputeEncryptedRow.java
@@ -184,7 +184,7 @@
{
if (elementCounter >= maxHitsPerSelector)
{
- logger.info("maxHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
+ logger.debug("maxHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
break;
}
}
@@ -222,7 +222,7 @@
++elementCounter;
}
- logger.info("totalHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
+ logger.debug("totalHits: rowIndex = " + rowIndex + " elementCounter = " + elementCounter);
return returnPairs;
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
index 11473a0..fb5fb91 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
@@ -40,6 +40,7 @@
private Accumulator<Integer> numRecordsAfterFilter = null;
private Accumulator<Integer> numHashes = null;
private Accumulator<Integer> numColumns = null;
+ private Accumulator<Integer> numBatches = null;
public Accumulators(JavaSparkContext sc)
{
@@ -48,6 +49,7 @@
numRecordsAfterFilter = sc.accumulator(0);
numHashes = sc.accumulator(0);
numColumns = sc.accumulator(0);
+ numBatches = sc.accumulator(0);
}
public Integer numRecordsReceivedGetValue()
@@ -100,6 +102,16 @@
numColumns.add(val);
}
+ public Integer numBatchesGetValue()
+ {
+ return numBatches.value();
+ }
+
+ public void incNumBatches(int val)
+ {
+ numBatches.add(val);
+ }
+
public void resetAll()
{
numRecordsReceived.setValue(0);
@@ -107,11 +119,12 @@
numRecordsAfterFilter.setValue(0);
numHashes.setValue(0);
numColumns.setValue(0);
+ numBatches.setValue(0);
}
public void printAll()
{
logger.info("numRecordsReceived = " + numRecordsReceived.value() + " \n numRecordsFiltered = " + numRecordsFiltered + " \n numRecordsAfterFilter = "
- + numRecordsAfterFilter + " \n numHashes = " + numHashes + " \n numColumns = " + numColumns);
+ + numRecordsAfterFilter.value() + " \n numHashes = " + numHashes.value() + " \n numColumns = " + numColumns.value() + " \n numBatches = " + numBatches.value());
}
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
index bab4ae9..d0215fc 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/BroadcastVars.java
@@ -45,7 +45,7 @@
private Broadcast<QuerySchema> querySchema = null;
- private Broadcast<String> useLocalCache = null;
+ private Broadcast<Boolean> useLocalCache = null;
private Broadcast<Boolean> limitHitsPerSelector = null;
@@ -53,6 +53,10 @@
private Broadcast<String> expDir = null;
+ private Broadcast<String> output = null;
+
+ private Broadcast<Integer> maxBatches = null;
+
public BroadcastVars(JavaSparkContext sc)
{
jsc = sc;
@@ -73,6 +77,16 @@
return queryInfo.getValue();
}
+ public void setOutput(String outputIn)
+ {
+ output = jsc.broadcast(outputIn);
+ }
+
+ public String getOutput()
+ {
+ return output.getValue();
+ }
+
public void setQueryInfo(QueryInfo queryInfoIn)
{
queryInfo = jsc.broadcast(queryInfoIn);
@@ -98,12 +112,12 @@
return dataSchema.getValue();
}
- public void setUseLocalCache(String useLocalCacheInput)
+ public void setUseLocalCache(Boolean useLocalCacheInput)
{
useLocalCache = jsc.broadcast(useLocalCacheInput);
}
- public String getUseLocalCache()
+ public Boolean getUseLocalCache()
{
return useLocalCache.getValue();
}
@@ -137,4 +151,14 @@
{
return expDir.getValue();
}
+
+ public Integer getMaxBatches()
+ {
+ return maxBatches.getValue();
+ }
+
+ public void setMaxBatches(Integer maxBatchesIn)
+ {
+ maxBatches = jsc.broadcast(maxBatchesIn);
+ }
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index 2050643..f34acf8 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -91,6 +91,7 @@
private QueryInfo queryInfo = null;
Query query = null;
+ QuerySchema qSchema = null;
private int numDataPartitions = 0;
private int numColMultPartitions = 0;
@@ -175,7 +176,6 @@
bVars.setQuery(query);
bVars.setQueryInfo(queryInfo);
- QuerySchema qSchema = null;
if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false))
{
qSchema = queryInfo.getQuerySchema();
@@ -190,7 +190,7 @@
bVars.setDataSchema(dSchema);
// Set the local cache flag
- bVars.setUseLocalCache(SystemConfiguration.getProperty("pir.useLocalCache", "true"));
+ bVars.setUseLocalCache(SystemConfiguration.getBooleanProperty("pir.useLocalCache", true));
useHDFSLookupTable = SystemConfiguration.isSetTrue("pir.useHDFSLookupTable");
@@ -246,7 +246,7 @@
{
logger.info("Reading data ");
- JavaRDD<MapWritable> dataRDD;
+ JavaRDD<MapWritable> jsonRDD;
Job job = new Job();
String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
@@ -262,7 +262,6 @@
logger.debug("schemaName = " + name);
}
- QuerySchema qSchema = QuerySchemaRegistry.get(bVars.getQueryInfo().getQueryType());
job.getConfiguration().set("dataSchemaName", qSchema.getDataSchemaName());
job.getConfiguration().set("data.schemas", SystemConfiguration.getProperty("data.schemas"));
@@ -278,12 +277,19 @@
FileInputFormat.setInputPaths(job, inputData);
// Read data from hdfs
- JavaRDD<MapWritable> jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values().coalesce(numDataPartitions);
+ jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values().coalesce(numDataPartitions);
// Filter out by the provided stopListFile entries
- dataRDD = jsonRDD.filter(new FilterData(accum, bVars));
-
- return dataRDD;
+ if (qSchema.getFilter() != null)
+ {
+ JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
+ return filteredRDD;
+ }
+ else
+ {
+ logger.info("qSchema.getFilter() is null");
+ return jsonRDD;
+ }
}
/**
@@ -294,7 +300,7 @@
{
logger.info("Reading data ");
- JavaRDD<MapWritable> dataRDD;
+ JavaRDD<MapWritable> jsonRDD;
Job job = new Job();
String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
@@ -304,13 +310,19 @@
job.getConfiguration().set("es.resource", esResource);
job.getConfiguration().set("es.query", esQuery);
- JavaRDD<MapWritable> jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values()
- .coalesce(numDataPartitions);
+ jsonRDD = sc.newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values().coalesce(numDataPartitions);
// Filter out by the provided stopListFile entries
- dataRDD = jsonRDD.filter(new FilterData(accum, bVars));
-
- return dataRDD;
+ if (qSchema.getFilter() != null)
+ {
+ JavaRDD<MapWritable> filteredRDD = jsonRDD.filter(new FilterData(accum, bVars));
+ return filteredRDD;
+ }
+ else
+ {
+ logger.info("qSchema.getFilter() is null");
+ return jsonRDD;
+ }
}
/**
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
index 0f82b6d..56c917c 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultGroupedMapper.java
@@ -39,7 +39,7 @@
Query query = null;
- EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn)
+ public EncColMultGroupedMapper(Accumulators accumIn, BroadcastVars bbVarsIn)
{
query = bbVarsIn.getQuery();
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
index 44bce8d..9242df7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncColMultReducer.java
@@ -37,7 +37,7 @@
Query query = null;
- EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn)
+ public EncColMultReducer(Accumulators accumIn, BroadcastVars bbVarsIn)
{
query = bbVarsIn.getQuery();
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
index dc25439..ef279e2 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
@@ -63,10 +63,7 @@
query = bvIn.getQuery();
queryInfo = bvIn.getQueryInfo();
- if (bvIn.getUseLocalCache().equals("true"))
- {
- useLocalCache = true;
- }
+ useLocalCache = bvIn.getUseLocalCache();
limitHitsPerSelector = bvIn.getLimitHitsPerSelector();
maxHitsPerSelector = bvIn.getMaxHitsPerSelector();
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
index fb87b06..0a8959a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/FilterData.java
@@ -61,11 +61,7 @@
accum.incNumRecordsReceived(1);
// Perform the filter
- boolean passFilter = true;
- if (filter != null)
- {
- passFilter = ((DataFilter) filter).filterDataElement(dataElement, dSchema);
- }
+ boolean passFilter = ((DataFilter) filter).filterDataElement(dataElement, dSchema);
if (passFilter)
{
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
new file mode 100644
index 0000000..eaf7384
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/ComputeStreamingResponse.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark.streaming;
+
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pirk.inputformat.hadoop.BaseInputFormat;
+import org.apache.pirk.inputformat.hadoop.InputFormatConst;
+import org.apache.pirk.query.wideskies.Query;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.spark.Accumulators;
+import org.apache.pirk.responder.wideskies.spark.BroadcastVars;
+import org.apache.pirk.responder.wideskies.spark.EncColMultGroupedMapper;
+import org.apache.pirk.responder.wideskies.spark.EncColMultReducer;
+import org.apache.pirk.responder.wideskies.spark.EncRowCalc;
+import org.apache.pirk.responder.wideskies.spark.FilterData;
+import org.apache.pirk.responder.wideskies.spark.HashSelectorsAndPartitionData;
+import org.apache.pirk.schema.data.DataSchema;
+import org.apache.pirk.schema.data.DataSchemaLoader;
+import org.apache.pirk.schema.data.DataSchemaRegistry;
+import org.apache.pirk.schema.query.QuerySchema;
+import org.apache.pirk.schema.query.QuerySchemaLoader;
+import org.apache.pirk.schema.query.QuerySchemaRegistry;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.pirk.utils.PIRException;
+import org.apache.pirk.utils.SystemConfiguration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.elasticsearch.hadoop.mr.EsInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Master class for the PIR query spark streaming application
+ * <p>
+ * NOTE:
+ * <p>
+ * - NOT using Elasticsearch in practice - there proved to be speed issues with ES and Spark that appear to be ES-Spark specific; the ES code is left in place
+ * in anticipation that the ES-Spark issues are resolved...
+ * <p>
+ * - Even when rdd.count() calls are embedded in logger.debug statements, they are still computed by Spark. Thus, they are commented out in the code below -
+ * uncomment them for rdd.count() debugging
+ *
+ */
+public class ComputeStreamingResponse
+{
+ private static final Logger logger = LoggerFactory.getLogger(ComputeStreamingResponse.class);
+
+ private String dataInputFormat = null;
+ private String inputData = null;
+ private String outputFile = null;
+ private String outputDirExp = null;
+
+ private String queryInput = null;
+ QuerySchema qSchema = null;
+
+ private String esQuery = "none";
+ private String esResource = "none";
+
+ private FileSystem fs = null;
+ private HadoopFileSystemStore storage = null;
+ private JavaStreamingContext jssc = null;
+
+ boolean useQueueStream = false;
+
+ private long batchSeconds = 0;
+ private long windowLength = 0;
+
+ private Accumulators accum = null;
+ private BroadcastVars bVars = null;
+
+ private QueryInfo queryInfo = null;
+ Query query = null;
+
+ private int numDataPartitions = 0;
+ private int numColMultPartitions = 0;
+
+ private boolean colMultReduceByKey = false;
+
+ public ComputeStreamingResponse(FileSystem fileSys) throws Exception
+ {
+ fs = fileSys;
+ storage = new HadoopFileSystemStore(fs);
+
+ dataInputFormat = SystemConfiguration.getProperty("pir.dataInputFormat");
+ if (!InputFormatConst.ALLOWED_FORMATS.contains(dataInputFormat))
+ {
+ throw new IllegalArgumentException("inputFormat = " + dataInputFormat + " is of an unknown form");
+ }
+ logger.info("inputFormat = " + dataInputFormat);
+ if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
+ {
+ inputData = SystemConfiguration.getProperty("pir.inputData", "none");
+ if (inputData.equals("none"))
+ {
+ throw new IllegalArgumentException("For inputFormat = " + dataInputFormat + " an inputFile must be specified");
+ }
+ logger.info("inputFile = " + inputData);
+ }
+ else if (dataInputFormat.equals(InputFormatConst.ES))
+ {
+ esQuery = SystemConfiguration.getProperty("pir.esQuery", "none");
+ esResource = SystemConfiguration.getProperty("pir.esResource", "none");
+ if (esQuery.equals("none"))
+ {
+ throw new IllegalArgumentException("esQuery must be specified");
+ }
+ if (esResource.equals("none"))
+ {
+ throw new IllegalArgumentException("esResource must be specified");
+ }
+ logger.info("esQuery = " + esQuery + " esResource = " + esResource);
+ }
+ outputFile = SystemConfiguration.getProperty("pir.outputFile");
+ outputDirExp = outputFile + "_exp";
+
+ queryInput = SystemConfiguration.getProperty("pir.queryInput");
+ String stopListFile = SystemConfiguration.getProperty("pir.stopListFile");
+
+ logger.info("outputFile = " + outputFile + " queryInputDir = " + queryInput + " stopListFile = " + stopListFile + " esQuery = " + esQuery
+ + " esResource = " + esResource);
+
+ // Pull the batchSeconds and windowLength parameters
+ batchSeconds = SystemConfiguration.getLongProperty("pir.sparkstreaming.batchSeconds", 30);
+ windowLength = SystemConfiguration.getLongProperty("pir.sparkstreaming.windowLength", 60);
+ if (windowLength % batchSeconds != 0)
+ {
+ throw new IllegalArgumentException("batchSeconds = " + batchSeconds + " must divide windowLength = " + windowLength);
+ }
+ useQueueStream = SystemConfiguration.getBooleanProperty("pir.sparkstreaming.useQueueStream", false);
+ logger.info("useQueueStream = " + useQueueStream);
+
+ // Set the necessary configurations
+ SparkConf conf = new SparkConf().setAppName("SparkPIR").setMaster("yarn-cluster");
+ conf.set("es.nodes", SystemConfiguration.getProperty("es.nodes", "none"));
+ conf.set("es.port", SystemConfiguration.getProperty("es.port", "none"));
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.streaming.stopGracefullyOnShutdown", SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false"));
+
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ jssc = new JavaStreamingContext(sc, Durations.seconds(batchSeconds));
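+ // The streaming context groups incoming data into micro-batches of duration batchSeconds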
+
+ // Setup, run query, teardown
+ logger.info("Setting up for query run");
+ setup();
+ logger.info("Setup complete");
+ }
+
+ // Setup for the accumulators and broadcast variables
+ private void setup() throws Exception
+ {
+ // Load the schemas
+ DataSchemaLoader.initialize(true, fs);
+ QuerySchemaLoader.initialize(true, fs);
+
+ // Create the accumulators and broadcast variables
+ accum = new Accumulators(jssc.sparkContext());
+ bVars = new BroadcastVars(jssc.sparkContext());
+
+ // Set the Query and QueryInfo broadcast variables
+ query = storage.recall(queryInput, Query.class);
+ queryInfo = query.getQueryInfo();
+ bVars.setQuery(query);
+ bVars.setQueryInfo(queryInfo);
+
+ if (SystemConfiguration.getBooleanProperty("pir.allowAdHocQuerySchemas", false))
+ {
+ qSchema = queryInfo.getQuerySchema();
+ }
+ if (qSchema == null)
+ {
+ qSchema = QuerySchemaRegistry.get(queryInfo.getQueryType());
+ }
+
+ DataSchema dSchema = DataSchemaRegistry.get(qSchema.getDataSchemaName());
+ bVars.setQuerySchema(qSchema);
+ bVars.setDataSchema(dSchema);
+
+ // Set the local cache flag
+ bVars.setUseLocalCache(SystemConfiguration.getBooleanProperty("pir.useLocalCache", true));
+
+ // Set the hit limit variables
+ bVars.setLimitHitsPerSelector(SystemConfiguration.getBooleanProperty("pir.limitHitsPerSelector", false));
+ bVars.setMaxHitsPerSelector(SystemConfiguration.getIntProperty("pir.maxHitsPerSelector", 100));
+
+ // Set the number of data and column multiplication partitions
+ numDataPartitions = SystemConfiguration.getIntProperty("pir.numDataPartitions", 1000);
+ numColMultPartitions = SystemConfiguration.getIntProperty("pir.numColMultPartitions", numDataPartitions);
+
+ // Whether or not we are performing a reduceByKey or a groupByKey->reduce for column multiplication
+ colMultReduceByKey = SystemConfiguration.getBooleanProperty("pir.colMultReduceByKey", false);
+
+ // Set the expDir
+ bVars.setExpDir(outputDirExp);
+
+ // Set the maxBatches
+ int maxBatches = SystemConfiguration.getIntProperty("pir.sparkstreaming.maxBatches", -1);
+ logger.info("maxBatches = " + maxBatches);
+ bVars.setMaxBatches(maxBatches);
+ }
+
+ /**
+ * Method to start the computation
+ *
+ * @throws InterruptedException
+ */
+ public void start() throws InterruptedException
+ {
+ logger.info("Starting computation...");
+
+ jssc.start();
+ jssc.awaitTermination();
+ }
+
+ /**
+ * Method to tear down necessary elements when app is complete
+ */
+ public void teardown()
+ {
+ logger.info("Tearing down...");
+ jssc.stop();
+ logger.info("Tear down complete");
+ }
+
+ /**
+ * Method to read in data from an allowed input source/format and perform the query
+ */
+ public void performQuery() throws Exception
+ {
+ logger.info("Performing query: ");
+
+ JavaDStream<MapWritable> inputDStream = null;
+ if (dataInputFormat.equals(InputFormatConst.BASE_FORMAT))
+ {
+ inputDStream = readData();
+ }
+ else if (dataInputFormat.equals(InputFormatConst.ES))
+ {
+ inputDStream = readDataES();
+ }
+
+ performQuery(inputDStream);
+ }
+
+ /**
+ * Method to read in the data from an allowed input format, filter, and return a JavaDStream of MapWritable data elements
+ */
+ @SuppressWarnings("unchecked")
+ public JavaDStream<MapWritable> readData() throws Exception
+ {
+ logger.info("Reading data ");
+
+ Job job = new Job();
+ String baseQuery = SystemConfiguration.getProperty("pir.baseQuery");
+ String jobName = "pirSpark_base_" + baseQuery + "_" + System.currentTimeMillis();
+ job.setJobName(jobName);
+ job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
+ job.getConfiguration().set("query", baseQuery);
+
+ job.getConfiguration().set("dataSchemaName", qSchema.getDataSchemaName());
+ job.getConfiguration().set("data.schemas", SystemConfiguration.getProperty("data.schemas"));
+
+ // Set the inputFormatClass based upon the baseInputFormat property
+ String classString = SystemConfiguration.getProperty("pir.baseInputFormat");
+ Class<BaseInputFormat> inputClass = (Class<BaseInputFormat>) Class.forName(classString);
+ if (!Class.forName("org.apache.pirk.inputformat.hadoop.BaseInputFormat").isAssignableFrom(inputClass))
+ {
+ throw new Exception("baseInputFormat class = " + classString + " does not extend BaseInputFormat");
+ }
+ job.setInputFormatClass(inputClass);
+
+ FileInputFormat.setInputPaths(job, inputData);
+
+ // Read data from hdfs
+ logger.info("useQueueStream = " + useQueueStream);
+ JavaDStream<MapWritable> mwStream = null;
+ if (useQueueStream)
+ {
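+ // A queue stream replays pre-loaded RDDs as batches - useful for testing without a live file stream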
+ Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+ JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), inputClass, Text.class, MapWritable.class).values()
+ .coalesce(numDataPartitions);
+
+ rddQueue.add(rddIn);
+ mwStream = jssc.queueStream(rddQueue);
+ }
+ else
+ {
+ JavaPairInputDStream<Text,MapWritable> inputDStream = jssc.fileStream(inputData, Text.class, MapWritable.class, inputClass);
+ mwStream = inputDStream.transform(new Function<JavaPairRDD<Text,MapWritable>,JavaRDD<MapWritable>>()
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public JavaRDD<MapWritable> call(JavaPairRDD<Text,MapWritable> pair) throws Exception
+ {
+ return pair.values();
+ }
+ }).repartition(numDataPartitions);
+ }
+
+ // Filter out by the provided stopListFile entries
+ if (qSchema.getFilter() != null)
+ {
+ JavaDStream<MapWritable> filteredRDD = mwStream.filter(new FilterData(accum, bVars));
+ return filteredRDD;
+ }
+
+ return mwStream;
+ }
+
+ /**
+ * Method to read in the data from Elasticsearch, filter, and return a JavaDStream of MapWritable data elements
+ */
+ @SuppressWarnings("unchecked")
+ public JavaDStream<MapWritable> readDataES() throws Exception
+ {
+ logger.info("Reading data ");
+
+ Job job = new Job();
+ String jobName = "pirSpark_ES_" + esQuery + "_" + System.currentTimeMillis();
+ job.setJobName(jobName);
+ job.getConfiguration().set("es.nodes", SystemConfiguration.getProperty("es.nodes"));
+ job.getConfiguration().set("es.port", SystemConfiguration.getProperty("es.port"));
+ job.getConfiguration().set("es.resource", esResource);
+ job.getConfiguration().set("es.query", esQuery);
+
+ // Read data from Elasticsearch
+ JavaDStream<MapWritable> mwStream = null;
+ if (useQueueStream)
+ {
+ Queue<JavaRDD<MapWritable>> rddQueue = new LinkedList<JavaRDD<MapWritable>>();
+ JavaRDD<MapWritable> rddIn = jssc.sparkContext().newAPIHadoopRDD(job.getConfiguration(), EsInputFormat.class, Text.class, MapWritable.class).values()
+ .coalesce(numDataPartitions);
+ rddQueue.add(rddIn);
+
+ mwStream = jssc.queueStream(rddQueue);
+ }
+ else
+ {
+ JavaPairInputDStream<Text,MapWritable> inputDStream = jssc.fileStream(inputData, Text.class, MapWritable.class, EsInputFormat.class);
+ mwStream = inputDStream.transform(new Function<JavaPairRDD<Text,MapWritable>,JavaRDD<MapWritable>>()
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public JavaRDD<MapWritable> call(JavaPairRDD<Text,MapWritable> pair) throws Exception
+ {
+ return pair.values();
+ }
+ }).repartition(numDataPartitions);
+ }
+
+ // Filter out by the provided stopListFile entries
+ if (qSchema.getFilter() != null)
+ {
+ JavaDStream<MapWritable> filteredRDD = mwStream.filter(new FilterData(accum, bVars));
+ return filteredRDD;
+ }
+ else
+ {
+ return mwStream;
+ }
+ }
+
+ /**
+ * Method to perform the query given an input JavaDStream of JSON
+ *
+ * @throws InterruptedException
+ *
+ */
+ public void performQuery(JavaDStream<MapWritable> input) throws PIRException, InterruptedException
+ {
+ logger.info("Performing query: ");
+
+ // Process non-overlapping windows of data of duration windowLength seconds
+ // If we are using queue streams, there is no need to window
+ if (!useQueueStream)
+ {
+ input = input.window(Durations.seconds(windowLength), Durations.seconds(windowLength));
+ }
+
+ // Extract the selectors for each dataElement based upon the query type
+ // and perform a keyed hash of the selectors
+ JavaPairDStream<Integer,List<BigInteger>> selectorHashToDocRDD = input.mapToPair(new HashSelectorsAndPartitionData(bVars));
+
+ // Group by hashed selector (row) -- can combine with the line above, separating for testing and benchmarking...
+ JavaPairDStream<Integer,Iterable<List<BigInteger>>> selectorGroupRDD = selectorHashToDocRDD.groupByKey();
+
+ // Calculate the encrypted row values for each row, emit <colNum, colVal> for each row
+ JavaPairDStream<Long,BigInteger> encRowRDD = selectorGroupRDD.flatMapToPair(new EncRowCalc(accum, bVars));
+
+ // Multiply the column values by colNum: emit <colNum, finalColVal> and write the final result object
+ encryptedColumnCalc(encRowRDD);
+
+ // Start the streaming computation
+ start();
+ }
+
+ // Method to compute the final encrypted columns
+ private void encryptedColumnCalc(JavaPairDStream<Long,BigInteger> encRowRDD) throws PIRException
+ {
+ // Multiply the column values by colNum: emit <colNum, finalColVal>
+ JavaPairDStream<Long,BigInteger> encColRDD;
+ if (colMultReduceByKey)
+ {
+ encColRDD = encRowRDD.reduceByKey(new EncColMultReducer(accum, bVars), numColMultPartitions);
+ }
+ else
+ {
+ encColRDD = encRowRDD.groupByKey(numColMultPartitions).mapToPair(new EncColMultGroupedMapper(accum, bVars));
+ }
+
+ // Set the output name by batch number; note that this is evaluated once at graph
+ // construction, so on a fresh run the batch suffix is 0
+ bVars.setOutput(outputFile + "_" + accum.numBatchesGetValue());
+
+ // Form and write the response object
+ encColRDD.repartition(1).foreachRDD(new VoidFunction<JavaPairRDD<Long,BigInteger>>()
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void call(JavaPairRDD<Long,BigInteger> rdd)
+ {
+ rdd.foreachPartition(new FinalResponseFunction(accum, bVars));
+
+ int maxBatchesVar = bVars.getMaxBatches();
+ if (maxBatchesVar != -1 && accum.numBatchesGetValue() >= maxBatchesVar)
+ {
+ logger.info("num batches = maxBatches = " + maxBatchesVar + "; shutting down");
+ System.exit(0);
+ }
+
+ }
+ });
+ }
+}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java
new file mode 100644
index 0000000..a9f07bd
--- /dev/null
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/streaming/FinalResponseFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pirk.responder.wideskies.spark.streaming;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.pirk.query.wideskies.QueryInfo;
+import org.apache.pirk.responder.wideskies.spark.Accumulators;
+import org.apache.pirk.responder.wideskies.spark.BroadcastVars;
+import org.apache.pirk.response.wideskies.Response;
+import org.apache.pirk.serialization.HadoopFileSystemStore;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Tuple2;
+
+public class FinalResponseFunction implements VoidFunction<Iterator<Tuple2<Long,BigInteger>>>
+{
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger logger = LoggerFactory.getLogger(FinalResponseFunction.class);
+
+ private BroadcastVars bVars = null;
+ private Accumulators accum = null;
+
+ public FinalResponseFunction(Accumulators accumIn, BroadcastVars bbVarsIn)
+ {
+ bVars = bbVarsIn;
+ accum = accumIn;
+ }
+
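+ // Invoked once per partition on an executor; collects the <colNum, colVal> pairs into a Response and writes it to HDFS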
+ @Override
+ public void call(Iterator<Tuple2<Long,BigInteger>> iter) throws Exception
+ {
+ // Form the response object
+ QueryInfo queryInfo = bVars.getQueryInfo();
+ Response response = new Response(queryInfo);
+ while (iter.hasNext())
+ {
+ Tuple2<Long,BigInteger> input = iter.next();
+ response.addElement(input._1().intValue(), input._2());
+ logger.debug("colNum = " + input._1().intValue() + " column = " + input._2().toString());
+ }
+
+ // Write out the response
+ FileSystem fs = FileSystem.get(new Configuration());
+ HadoopFileSystemStore storage = new HadoopFileSystemStore(fs);
+ String outputFile = bVars.getOutput();
+ logger.debug("outputFile = " + outputFile);
+ try
+ {
+ storage.store(outputFile, response);
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ accum.incNumBatches(1);
+ }
+}
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchema.java b/src/main/java/org/apache/pirk/schema/data/DataSchema.java
index 9557ce6..8728b96 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchema.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchema.java
@@ -29,6 +29,8 @@
import org.apache.pirk.schema.data.partitioner.DataPartitioner;
import org.apache.pirk.schema.data.partitioner.PrimitiveTypePartitioner;
import org.apache.pirk.utils.PIRException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A data schema describes the target data being referenced by a <code>Querier</code> and a <code>Responder</code>.
@@ -43,6 +45,8 @@
{
private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(DataSchema.class);
+
// This schema's name.
private final String schemaName;
@@ -212,6 +216,7 @@
text = new Text(elementName);
textRep.put(elementName, text);
}
+
return text;
}
diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java
index 1535e1f..7930464 100644
--- a/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java
+++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestCLI.java
@@ -166,6 +166,9 @@
tests += "J - JSON/HDFS MapReduce\n";
tests += "ES - Elasticsearch Spark \n";
tests += "JS - JSON/HDFS Spark \n";
+ tests += "SS - Spark Streaming Tests \n";
+ tests += "JSS - JSON/HDFS Spark Streaming \n";
+ tests += "ESS - Elasticsearch Spark Streaming \n";
Option optionTestSelection = new Option("t", "tests", true, "optional -- Select which tests to execute: \n" + tests);
optionTestSelection.setRequired(false);
diff --git a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java
index a8cec45..f312932 100755
--- a/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java
+++ b/src/main/java/org/apache/pirk/test/distributed/DistributedTestDriver.java
@@ -88,6 +88,7 @@
List<JSONObject> dataElements = Inputs.createJSONInput(fs);
String localStopListFile = Inputs.createStopList(fs, true);
+
SystemConfiguration.setProperty("pir.stopListFile", localStopListFile);
Inputs.createSchemaFiles(fs, true, StopListFilter.class.getName());
@@ -100,11 +101,30 @@
*/
public static void test(FileSystem fs, DistributedTestCLI cli, List<JSONObject> pirDataElements) throws Exception
{
+ // MapReduce JSON input
if (cli.run("1:J"))
{
DistTestSuite.testJSONInputMR(fs, pirDataElements);
}
- if (cli.run("1:E") || cli.run("1:ES"))
+
+ // Spark with JSON input
+ if (cli.run("1:JS"))
+ {
+ DistTestSuite.testJSONInputSpark(fs, pirDataElements);
+ }
+
+ // Spark Streaming
+ if (cli.run("1:SS"))
+ {
+ DistTestSuite.testSparkStreaming(fs, pirDataElements);
+ }
+ if (cli.run("1:JSS"))
+ {
+ DistTestSuite.testJSONInputSparkStreaming(fs, pirDataElements);
+ }
+
+ // Elasticsearch input
+ if (cli.run("1:E") || cli.run("1:ES") || cli.run("1:ESS"))
{
Inputs.createESInput();
if (cli.run("1:E"))
@@ -115,10 +135,10 @@
{
DistTestSuite.testESInputSpark(fs, pirDataElements);
}
- }
- if (cli.run("1:JS"))
- {
- DistTestSuite.testJSONInputSpark(fs, pirDataElements);
+ if (cli.run("1:ESS"))
+ {
+ DistTestSuite.testESInputSparkStreaming(fs, pirDataElements);
+ }
}
}
diff --git a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
index bc59619..f44815a 100644
--- a/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
+++ b/src/main/java/org/apache/pirk/test/distributed/testsuite/DistTestSuite.java
@@ -80,13 +80,13 @@
// Run tests
SystemConfiguration.setProperty("pirTest.embedSelector", "true");
BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1);
- BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1);
+ BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1, false);
SystemConfiguration.setProperty("pirTest.embedSelector", "false");
BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 2);
- BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
- BaseTests.testSRCIPQueryNoFilter(dataElements, fs, false, true, 2);
+ BaseTests.testSRCIPQueryNoFilter(dataElements, fs, false, true, 2, false);
// Test hit limits per selector
SystemConfiguration.setProperty("pir.limitHitsPerSelector", "true");
@@ -97,8 +97,8 @@
// Test the local cache for modular exponentiation
SystemConfiguration.setProperty("pir.useLocalCache", "true");
- BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
- BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
+ BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2, false);
SystemConfiguration.setProperty("pir.useLocalCache", "false");
// Change query for NXDOMAIN
@@ -114,7 +114,7 @@
// In memory table
SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
SystemConfiguration.setProperty("pirTest.useExpLookupTable", "true");
- BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
// Create exp table in hdfs
SystemConfiguration.setProperty("mapreduce.map.memory.mb", "10000");
@@ -126,7 +126,7 @@
SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
SystemConfiguration.setProperty("pir.expCreationSplits", "50");
SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150");
- BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
// Reset exp properties
SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
@@ -172,12 +172,12 @@
// Run tests
SystemConfiguration.setProperty("pirTest.embedSelector", "true");
BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 1);
- BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2);
- BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1);
+ BaseTests.testSRCIPQuery(dataElements, fs, false, true, 2, false);
+ BaseTests.testDNSIPQuery(dataElements, fs, false, true, 1, false);
SystemConfiguration.setProperty("pirTest.embedSelector", "false");
BaseTests.testDNSHostnameQuery(dataElements, fs, false, true, 2);
- BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, false, true, 2, false);
// Change query for NXDOMAIN
SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3");
@@ -215,14 +215,14 @@
// Run tests
SystemConfiguration.setProperty("pirTest.embedSelector", "true");
BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1);
- BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1);
+ BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1, false);
SystemConfiguration.setProperty("pirTest.embedSelector", "false");
BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 2);
- BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2);
- BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false);
+ BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2, false);
- BaseTests.testSRCIPQueryNoFilter(dataElements, fs, true, true, 2);
+ BaseTests.testSRCIPQueryNoFilter(dataElements, fs, true, true, 2, false);
// Test embedded QuerySchema
SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "true");
@@ -253,18 +253,18 @@
// Test the local cache for modular exponentiation
SystemConfiguration.setProperty("pirTest.embedSelector", "true");
SystemConfiguration.setProperty("pir.useLocalCache", "true");
- BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3);
+ BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3, false);
// Test the join functionality for the modular exponentiation table
SystemConfiguration.setProperty("pir.useModExpJoin", "true");
- BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3);
+ BaseTests.testDNSIPQuery(dataElements, fs, true, true, 3, false);
SystemConfiguration.setProperty("pir.useModExpJoin", "false");
// Test file based exp lookup table for modular exponentiation
SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "true");
SystemConfiguration.setProperty("pir.expCreationSplits", "500");
SystemConfiguration.setProperty("pir.numExpLookupPartitions", "150");
- BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false);
SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
// Change query for NXDOMAIN
@@ -303,12 +303,12 @@
// Run tests
SystemConfiguration.setProperty("pirTest.embedSelector", "true");
BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 1);
- BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1);
- BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, true, true, 1, false);
+ BaseTests.testSRCIPQuery(dataElements, fs, true, true, 2, false);
SystemConfiguration.setProperty("pirTest.embedSelector", "false");
BaseTests.testDNSHostnameQuery(dataElements, fs, true, true, 2);
- BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2);
+ BaseTests.testDNSIPQuery(dataElements, fs, true, true, 2, false);
// Change query for NXDOMAIN
SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:3");
@@ -321,9 +321,89 @@
logger.info("Completed testESInputSpark");
}
+ public static void testSparkStreaming(FileSystem fs, List<JSONObject> pirDataElements) throws Exception
+ {
+ testJSONInputSparkStreaming(fs, pirDataElements);
+ testESInputSparkStreaming(fs, pirDataElements);
+ }
+
+ public static void testJSONInputSparkStreaming(FileSystem fs, List<JSONObject> pirDataElements) throws Exception
+ {
+ logger.info("Starting testJSONInputSparkStreaming");
+
+ SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
+ SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000");
+
+ SystemConfiguration.setProperty("pir.numColMultPartitions", "20");
+ SystemConfiguration.setProperty("pir.colMultReduceByKey", "false");
+
+ SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
+ SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
+
+ SystemConfiguration.setProperty("pirTest.embedSelector", "true");
+
+ SystemConfiguration.setProperty("pir.numDataPartitions", "3");
+
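+ // Spark Streaming settings: 30s batches, 60s windows, queue stream input, stop after one batch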
+ SystemConfiguration.setProperty("pir.sparkstreaming.batchSeconds", "30");
+ SystemConfiguration.setProperty("pir.sparkstreaming.windowLength", "60");
+ SystemConfiguration.setProperty("pir.sparkstreaming.useQueueStream", "true");
+ SystemConfiguration.setProperty("pir.sparkstreaming.maxBatches", "1");
+
+ SystemConfiguration.setProperty("spark.streaming.stopGracefullyOnShutdown", "false");
+
+ // Set up JSON configs
+ SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.BASE_FORMAT);
+ SystemConfiguration.setProperty("pir.inputData", SystemConfiguration.getProperty(DistributedTestDriver.JSON_PIR_INPUT_FILE_PROPERTY));
+ SystemConfiguration.setProperty("pir.baseQuery", "?q=rcode:0");
+
+ // Run tests
+ BaseTests.testDNSHostnameQuery(pirDataElements, fs, true, true, 1, false, true);
+ BaseTests.testDNSIPQuery(pirDataElements, fs, true, true, 1, true);
+ BaseTests.testSRCIPQuery(pirDataElements, fs, true, true, 3, true);
+ BaseTests.testSRCIPQueryNoFilter(pirDataElements, fs, true, true, 2, true);
+
+ logger.info("Completed testJSONInputSparkStreaming");
+ }
+
+ public static void testESInputSparkStreaming(FileSystem fs, List<JSONObject> pirDataElements) throws Exception
+ {
+ logger.info("Starting testESInputSparkStreaming");
+
+ SystemConfiguration.setProperty("pirTest.useHDFSExpLookupTable", "false");
+ SystemConfiguration.setProperty("pirTest.useExpLookupTable", "false");
+
+ SystemConfiguration.setProperty("pir.limitHitsPerSelector", "false");
+ SystemConfiguration.setProperty("pir.maxHitsPerSelector", "1000");
+
+ SystemConfiguration.setProperty("pir.allowAdHocQuerySchemas", "false");
+ SystemConfiguration.setProperty("pir.embedQuerySchema", "false");
+
+ SystemConfiguration.setProperty("pir.sparkstreaming.batchSeconds", "30");
+ SystemConfiguration.setProperty("pir.sparkstreaming.windowLength", "60");
+ SystemConfiguration.setProperty("pir.sparkstreaming.useQueueStream", "true");
+ SystemConfiguration.setProperty("pir.sparkstreaming.maxBatches", "1");
+
+ SystemConfiguration.setProperty("spark.streaming.stopGracefullyOnShutdown", "false");
+
+ // Set up ES configs
+ SystemConfiguration.setProperty("pir.dataInputFormat", InputFormatConst.ES);
+ SystemConfiguration.setProperty("pir.esQuery", "?q=rcode:0");
+ SystemConfiguration.setProperty("pir.esResource", SystemConfiguration.getProperty(DistributedTestDriver.ES_INPUT_RESOURCE_PROPERTY));
+
+ // Run tests
+ SystemConfiguration.setProperty("pirTest.embedSelector", "true");
+ BaseTests.testDNSHostnameQuery(pirDataElements, fs, true, true, 1, false, true);
+ BaseTests.testDNSIPQuery(pirDataElements, fs, true, true, 1, true);
+ BaseTests.testSRCIPQuery(pirDataElements, fs, true, true, 3, true);
+
+ logger.info("Completed testESInputSparkStreaming");
+ }
+
// Base method to perform query
- public static List<QueryResponseJSON> performQuery(String queryType, ArrayList<String> selectors, FileSystem fs, boolean isSpark, int numThreads)
- throws Exception
+ // TODO: This could be changed to pass in the platform instead of isSpark and isStreaming...
+ @SuppressWarnings("unused")
+ public static List<QueryResponseJSON> performQuery(String queryType, ArrayList<String> selectors, FileSystem fs, boolean isSpark, int numThreads,
+ boolean isStreaming) throws Exception
{
logger.info("performQuery: ");
@@ -379,11 +459,28 @@
logger.info("Performing encrypted query:");
if (isSpark)
{
+ logger.info("spark.home = " + SystemConfiguration.getProperty("spark.home"));
+
// Build args
String inputFormat = SystemConfiguration.getProperty("pir.dataInputFormat");
logger.info("inputFormat = " + inputFormat);
ArrayList<String> args = new ArrayList<>();
- args.add("-" + ResponderProps.PLATFORM + "=spark");
+ if (isStreaming)
+ {
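+ // Forward the Spark Streaming settings from the test properties, falling back to the documented defaults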
+ logger.info("platform = sparkstreaming");
+ args.add("-" + ResponderProps.PLATFORM + "=sparkstreaming");
+ args.add("-" + ResponderProps.BATCHSECONDS + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.batchSeconds", "30"));
+ args.add("-" + ResponderProps.WINDOWLENGTH + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.windowLength", "60"));
+ args.add("-" + ResponderProps.MAXBATCHES + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.maxBatches", "-1"));
+ args.add("-" + ResponderProps.STOPGRACEFULLY + "=" + SystemConfiguration.getProperty("spark.streaming.stopGracefullyOnShutdown", "false"));
+ args.add("-" + ResponderProps.NUMDATAPARTITIONS + "=" + SystemConfiguration.getProperty("pir.numDataPartitions", "3"));
+ args.add("-" + ResponderProps.USEQUEUESTREAM + "=" + SystemConfiguration.getProperty("pir.sparkstreaming.useQueueStream", "false"));
+ }
+ else
+ {
+ logger.info("platform = spark");
+ args.add("-" + ResponderProps.PLATFORM + "=spark");
+ }
args.add("-" + ResponderProps.DATAINPUTFORMAT + "=" + inputFormat);
args.add("-" + ResponderProps.QUERYINPUT + "=" + SystemConfiguration.getProperty("pir.queryInput"));
args.add("-" + ResponderProps.OUTPUTFILE + "=" + SystemConfiguration.getProperty("pir.outputFile"));
@@ -436,6 +533,11 @@
// Perform decryption
// Reconstruct the necessary objects from the files
logger.info("Performing decryption; writing final results file");
+ if (isStreaming)
+ {
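+ // Streaming output is written per batch; with maxBatches=1 only the batch-0 file exists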
+ outputFile = outputFile + "_0"; // currently only processing one batch for testing
+ }
+ logger.info("Pulling results from outputFile = " + outputFile);
Response response = new HadoopFileSystemStore(fs).recall(outputFile, Response.class);
// Perform decryption and output the result file
diff --git a/src/main/java/org/apache/pirk/test/utils/BaseTests.java b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
index c1fa1e9..962e467 100644
--- a/src/main/java/org/apache/pirk/test/utils/BaseTests.java
+++ b/src/main/java/org/apache/pirk/test/utils/BaseTests.java
@@ -62,18 +62,18 @@
public static void testDNSHostnameQuery(ArrayList<JSONObject> dataElements, int numThreads, boolean testFalsePositive) throws Exception
{
- testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive);
+ testDNSHostnameQuery(dataElements, null, false, false, numThreads, testFalsePositive, false);
}
public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
throws Exception
{
- testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false);
+ testDNSHostnameQuery(dataElements, fs, isSpark, isDistributed, numThreads, false, false);
}
// Query for whether the watched hostname occurred; watched value type: hostname (String)
public static void testDNSHostnameQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads,
- boolean testFalsePositive) throws Exception
+ boolean testFalsePositive, boolean isStreaming) throws Exception
{
logger.info("Running testDNSHostnameQuery(): ");
@@ -83,7 +83,7 @@
List<QueryResponseJSON> results;
if (isDistributed)
{
- results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads);
+ results = DistTestSuite.performQuery(Inputs.DNS_HOSTNAME_QUERY, selectorsDomain, fs, isSpark, numThreads, isStreaming);
}
else
{
@@ -193,11 +193,12 @@
public static void testDNSIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
{
- testDNSIPQuery(dataElements, null, false, false, numThreads);
+ testDNSIPQuery(dataElements, null, false, false, numThreads, false);
}
// The watched IP address was detected in the response to a query; watched value type: IP address (String)
- public static void testDNSIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
+ public static void testDNSIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, boolean isStreaming)
+ throws Exception
{
logger.info("Running testDNSIPQuery(): ");
@@ -206,7 +207,7 @@
if (isDistributed)
{
- results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads);
+ results = DistTestSuite.performQuery(Inputs.DNS_IP_QUERY, selectorsIP, fs, isSpark, numThreads, isStreaming);
if (results.size() != 5)
{
@@ -280,7 +281,7 @@
if (isDistributed)
{
- results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads);
+ results = DistTestSuite.performQuery(Inputs.DNS_NXDOMAIN_QUERY, selectorsDomain, fs, isSpark, numThreads, false);
}
else
{
@@ -330,11 +331,12 @@
public static void testSRCIPQuery(ArrayList<JSONObject> dataElements, int numThreads) throws Exception
{
- testSRCIPQuery(dataElements, null, false, false, numThreads);
+ testSRCIPQuery(dataElements, null, false, false, numThreads, false);
}
// Query for responses from watched srcIPs
- public static void testSRCIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads) throws Exception
+ public static void testSRCIPQuery(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads, boolean isStreaming)
+ throws Exception
{
logger.info("Running testSRCIPQuery(): ");
@@ -345,7 +347,7 @@
int numExpectedResults = 1;
if (isDistributed)
{
- results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads);
+ results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY, selectorsIP, fs, isSpark, numThreads, isStreaming);
removeTailElements = 2; // The last two elements are on the distributed stoplist
}
else
@@ -406,8 +408,8 @@
}
// Query for responses from watched srcIPs
- public static void testSRCIPQueryNoFilter(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads)
- throws Exception
+ public static void testSRCIPQueryNoFilter(List<JSONObject> dataElements, FileSystem fs, boolean isSpark, boolean isDistributed, int numThreads,
+ boolean isStreaming) throws Exception
{
logger.info("Running testSRCIPQueryNoFilter(): ");
@@ -417,7 +419,7 @@
int numExpectedResults = 3;
if (isDistributed)
{
- results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads);
+ results = DistTestSuite.performQuery(Inputs.DNS_SRCIP_QUERY_NO_FILTER, selectorsIP, fs, isSpark, numThreads, isStreaming);
}
else
{
diff --git a/src/main/java/org/apache/pirk/test/utils/Inputs.java b/src/main/java/org/apache/pirk/test/utils/Inputs.java
index 587ac99..b6e7251 100644
--- a/src/main/java/org/apache/pirk/test/utils/Inputs.java
+++ b/src/main/java/org/apache/pirk/test/utils/Inputs.java
@@ -207,8 +207,7 @@
dataElementsJSON.add(jsonObj7);
- // This should never be returned - doesn't hit on any domain selectors
- // resolution ip on stoplist
+ // Doesn't hit on any domain selectors; resolution ip on stoplist
JSONObject jsonObj8 = new JSONObject();
jsonObj8.put(DATE, "2016-02-20T23:29:12.000Z");
jsonObj8.put(QNAME, "something.else2");
@@ -220,7 +219,7 @@
dataElementsJSON.add(jsonObj8);
- // This should never be returned in distributed case -- domain and resolution ip on stoplist
+ // Domain and resolution ip on stoplist
JSONObject jsonObj9 = new JSONObject();
jsonObj9.put(DATE, "2016-02-20T23:29:13.000Z");
jsonObj9.put(QNAME, "something.else.on.stoplist");
@@ -340,14 +339,14 @@
"{\"qname\":\"something.else\",\"date\":\"2016-02-20T23:29:11.000Z\",\"qtype\":[\"1\"]"
+ ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.1\",\"dest_ip\":\"2.2.2.2\"" + ",\"ip\":[\"3.3.3.3\"]}");
- // Never should be returned - doesn't hit on any selectors
+ // Doesn't hit on any domain selectors; resolution ip on stoplist
String indexTypeNum8 = esTestIndex + "/" + esType + "/8";
logger.info("indexTypeNum8 = " + indexTypeNum8);
ProcessBuilder pAdd8 = new ProcessBuilder("curl", "-XPUT", indexTypeNum8, "-d",
"{\"qname\":\"something.else2\",\"date\":\"2016-02-20T23:29:12.000Z\",\"qtype\":[\"1\"]"
- + ",\"rcode\":\"0\",\"src_ip\":\"1.1.1.12\",\"dest_ip\":\"2.2.2.22\"" + ",\"ip\":[\"3.3.3.32\"]}");
+ + ",\"rcode\":\"0\",\"src_ip\":\"5.6.7.8\",\"dest_ip\":\"2.2.2.22\"" + ",\"ip\":[\"3.3.3.132\"]}");
- // This should never be returned -- domain on stoplist
+ // Domain on stoplist
String indexTypeNum9 = esTestIndex + "/" + esType + "/9";
logger.info("indexTypeNum9 = " + indexTypeNum9);
ProcessBuilder pAdd9 = new ProcessBuilder("curl", "-XPUT", indexTypeNum9, "-d",
diff --git a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
index 4146e5b..8cb5d17 100755
--- a/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
+++ b/src/main/java/org/apache/pirk/utils/SystemConfiguration.java
@@ -117,6 +117,23 @@
}
/**
+ * Gets the specified property as a <code>long</code>, or the default value if the property isn't defined.
+ *
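+ * Example: {@code getLongProperty("pir.sparkstreaming.maxBatches", -1)} returns {@code -1} when the property is unset.
+ *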
+ * @param propertyName
+ * The name of the requested long property value.
+ * @param defaultValue
+ * The value to return if the property is undefined.
+ * @return The value of the requested property, or the default value if the property is undefined.
+ * @throws NumberFormatException
+ * If the property does not contain a parsable <code>long</code> value.
+ */
+ public static long getLongProperty(String propertyName, long defaultValue)
+ {
+ String value = props.getProperty(propertyName);
+ return (value == null) ? defaultValue : Long.parseLong(value);
+ }
+
+ /**
* Gets the specified property as a <code>boolean</code>, or the default value if the property isn't defined.
*
* @param propertyName
@@ -207,6 +224,7 @@
*/
public static void loadPropsFromDir(String dirName)
{
+ logger.info("Loading properties from dirName = " + dirName);
File[] directoryListing = new File(dirName).listFiles(new FilenameFilter()
{
@Override
diff --git a/src/main/resources/pirk.properties b/src/main/resources/pirk.properties
index f8efea7..963fa34 100755
--- a/src/main/resources/pirk.properties
+++ b/src/main/resources/pirk.properties
@@ -81,7 +81,7 @@
##
#ES host address - One Elasticsearch node in the cluster - may include port specification
-es.nodes= none
+es.nodes=none
#Default HTTP/REST port used for connecting to Elasticsearch
es.port=9200
@@ -106,7 +106,7 @@
#Elasticsearch resource - Elasticsearch resource location where data is read and written to.
#Requires the format <index>/<type>
-test.es.resource= none
+test.es.resource=/testindex/pkt
#Pathname in hdfs to place input JSON file testing
test.inputJSONFile = /tmp/testJSONInput
diff --git a/src/main/resources/responder.properties b/src/main/resources/responder.properties
index 11ad7f6..3ae92c7 100644
--- a/src/main/resources/responder.properties
+++ b/src/main/resources/responder.properties
@@ -27,7 +27,7 @@
#outputFile -- required -- Fully qualified name of output file in hdfs
pir.outputFile=
-#platform -- required -- 'mapreduce', 'spark', or 'standalone'
+#platform -- required -- 'mapreduce', 'spark', 'sparkstreaming', or 'standalone'
#Processing platform technology for the responder
platform=
@@ -134,4 +134,24 @@
#numExpLookupPartitions -- optional -- Number of partitions for the exp lookup table
#pir.numExpLookupPartitions=
+
+##Props for Spark Streaming
+
+#batchSeconds - optional - Batch duration (in seconds) for Spark Streaming - defaults to 30
+#pir.sparkstreaming.batchSeconds=
+
+#windowLength - optional - Window length (in seconds) for Spark Streaming - defaults to 60
+#pir.sparkstreaming.windowLength=
+
+#useQueueStream - optional - Use a queue stream for Spark Streaming - defaults to false
+#pir.sparkstreaming.useQueueStream=
+
+#maxBatches - optional - Max number of batches to process in Spark Streaming
+#defaults to -1 (no maximum)
+#pir.sparkstreaming.maxBatches=
+
+#spark.streaming.stopGracefullyOnShutdown - optional - Whether to stop the Spark Streaming context gracefully on shutdown
+#defaults to false
+#spark.streaming.stopGracefullyOnShutdown=
+
\ No newline at end of file
diff --git a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
index 9ac2522..bb70153 100644
--- a/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
+++ b/src/test/java/org/apache/pirk/general/QueryParserUtilsTest.java
@@ -66,11 +66,6 @@
Inputs.createSchemaFiles(null, false, null);
dSchema = DataSchemaRegistry.get(Inputs.TEST_DATA_SCHEMA_NAME);
-
- // ProcessBuilder pAdd1 = new ProcessBuilder("curl", "-XPUT", indexTypeNum1, "-d",
- // "{\"qname\":\"a.b.c.com\",\"date\":\"2016-02-20T23:29:05.000Z\",\"qtype\":[\"1\"]"
- // + ",\"rcode\":\"0\",\"src_ip\":\"55.55.55.55\",\"dest_ip\":\"1.2.3.6\"" + ",\"ip\":[\"10.20.30.40\",\"10.20.30.60\"]}");
- //
doc = StringUtils.jsonStringToMapWritableWithArrayWritable(dataElementsJSON.get(0).toJSONString(), dSchema);
docWAW = StringUtils.jsonStringToMapWritableWithWritableArrayWritable(dataElementsJSON.get(0).toJSONString(), dSchema);
docMap = StringUtils.jsonStringToMap(dataElementsJSON.get(0).toJSONString(), dSchema);
diff --git a/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java b/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java
index a801b01..0b09fa9 100644
--- a/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java
+++ b/src/test/java/org/apache/pirk/schema/query/LoadQuerySchemaTest.java
@@ -305,8 +305,8 @@
// Create the stoplist file and alter the properties accordingly
private void createStopListFile() throws IOException, PIRException
{
- SystemConfiguration.setProperty("pir.stopListFile", "testStopListFile");
String newSLFile = Inputs.createStopList(null, false);
+
SystemConfiguration.setProperty("pir.stopListFile", newSLFile);
}
diff --git a/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java b/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java
index 2144ee1..cb65a60 100644
--- a/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java
+++ b/src/test/java/org/apache/pirk/wideskies/standalone/StandaloneTest.java
@@ -62,9 +62,10 @@
// Create the stoplist file
stopListFileProp = SystemConfiguration.getProperty("pir.stopListFile");
- SystemConfiguration.setProperty("pir.stopListFile", STOPLIST_FILE);
+
String newSLFile = Inputs.createStopList(null, false);
SystemConfiguration.setProperty("pir.stopListFile", newSLFile);
+
logger.info("stopListFileProp = " + stopListFileProp + " new prop = " + SystemConfiguration.getProperty("pir.stopListFile"));
// Create data and query schemas