Removed unused variables, general code cleanup - closes apache/incubator-pirk#69
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
index b1d2828..6dbf030 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/ResponderProps.java
@@ -48,23 +48,23 @@
public static final String ESQUERY = "pir.esQuery";
public static final String BASEINPUTFORMAT = "pir.baseInputFormat";
public static final String STOPLISTFILE = "pir.stopListFile";
- public static final String NUMREDUCETASKS = "pir.numReduceTasks";
- public static final String USELOCALCACHE = "pir.useLocalCache";
- public static final String LIMITHITSPERSELECTOR = "pir.limitHitsPerSelector";
- public static final String MAXHITSPERSELECTOR = "pir.maxHitsPerSelector";
- public static final String MAPMEMORY = "mapreduce.map.memory.mb";
- public static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
- public static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
- public static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
public static final String QUERYSCHEMAS = "responder.querySchemas";
public static final String DATASCHEMAS = "responder.dataSchemas";
public static final String NUMEXPLOOKUPPARTS = "pir.numExpLookupPartitions";
- public static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
- public static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
+ public static final String USELOCALCACHE = "pir.useLocalCache";
+ public static final String LIMITHITSPERSELECTOR = "pir.limitHitsPerSelector";
+ public static final String MAXHITSPERSELECTOR = "pir.maxHitsPerSelector";
public static final String NUMCOLMULTPARTITIONS = "pir.numColMultPartitions";
public static final String USEMODEXPJOIN = "pir.useModExpJoin";
public static final String COLMULTREDUCEBYKEY = "pir.colMultReduceByKey";
- public static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
+ static final String NUMREDUCETASKS = "pir.numReduceTasks";
+ static final String MAPMEMORY = "mapreduce.map.memory.mb";
+ static final String REDUCEMEMORY = "mapreduce.reduce.memory.mb";
+ static final String MAPJAVAOPTS = "mapreduce.map.java.opts";
+ static final String REDUCEJAVAOPTS = "mapreduce.reduce.java.opts";
+ static final String USEHDFSLOOKUPTABLE = "pir.useHDFSLookupTable";
+ static final String NUMDATAPARTITIONS = "pir.numDataPartitions";
+ static final String ALLOWEMBEDDEDQUERYSCHEMAS = "pir.allowEmbeddedQuerySchemas";
static final List<String> PROPSLIST = Arrays.asList(PLATFORM, QUERYINPUT, DATAINPUTFORMAT, INPUTDATA, BASEQUERY, ESRESOURCE, ESQUERY, OUTPUTFILE,
BASEINPUTFORMAT, STOPLISTFILE, NUMREDUCETASKS, USELOCALCACHE, LIMITHITSPERSELECTOR, MAXHITSPERSELECTOR, MAPMEMORY, REDUCEMEMORY, MAPJAVAOPTS,
@@ -75,7 +75,7 @@
* Validates the responder properties
*
*/
- public static boolean validateResponderProperties()
+ static boolean validateResponderProperties()
{
boolean valid = true;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
index c9ed966..1399fd7 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/mapreduce/HashSelectorsAndPartitionDataMapper.java
@@ -19,7 +19,6 @@
package org.apache.pirk.responder.wideskies.mapreduce;
import java.io.IOException;
-import java.util.HashSet;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
@@ -42,7 +41,6 @@
import org.apache.pirk.utils.SystemConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Tuple2;
/**
@@ -58,9 +56,6 @@
private IntWritable keyOut = null;
- HashSet<String> stopList = null;
-
- private Query query = null;
private QueryInfo queryInfo = null;
private QuerySchema qSchema = null;
private DataSchema dSchema = null;
@@ -79,7 +74,7 @@
// Can make this so that it reads multiple queries at one time...
String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
- query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
+ Query query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
queryInfo = query.getQueryInfo();
try
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
index 1345fe5..11473a0 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/Accumulators.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *******************************************************************************/
+ */
package org.apache.pirk.responder.wideskies.spark;
import java.io.Serializable;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
index 3124a3f..2050643 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ComputeResponse.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.math.BigInteger;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -64,7 +63,7 @@
* <p>
* - Even if rdd.count() calls are embedded in logger.debug statements, they are computed by Spark. Thus, they are commented out in the code below - uncomment
* for rdd.count() debug
- *
+ *
*/
public class ComputeResponse
{
@@ -316,7 +315,7 @@
/**
* Method to perform the query given an input RDD of MapWritables
- *
+ *
*/
public void performQuery(JavaRDD<MapWritable> inputRDD) throws PIRException
{
@@ -332,7 +331,7 @@
// Extract the selectors for each dataElement based upon the query type
// and perform a keyed hash of the selectors
- JavaPairRDD<Integer,List<BigInteger>> selectorHashToDocRDD = inputRDD.mapToPair(new HashSelectorsAndPartitionData(accum, bVars));
+ JavaPairRDD<Integer,List<BigInteger>> selectorHashToDocRDD = inputRDD.mapToPair(new HashSelectorsAndPartitionData(bVars));
// Group by hashed selector (row) -- can combine with the line above, separating for testing and benchmarking...
JavaPairRDD<Integer,Iterable<List<BigInteger>>> selectorGroupRDD = selectorHashToDocRDD.groupByKey();
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
index 04f8cc2..f298ffe 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
@@ -31,7 +31,6 @@
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Tuple2;
/**
@@ -95,16 +94,10 @@
ComputeEncryptedRow.loadCacheFromHDFS(fs, query.getExpFile(rowIndex), query);
}
- // logger.debug("Encrypting row = " + rowIndex);
- // long startTime = System.currentTimeMillis();
-
// Compute the encrypted row elements for a query from extracted data partitions
List<Tuple2<Long,BigInteger>> encRowValues = ComputeEncryptedRow.computeEncRowBI(hashDocTuple._2, query, rowIndex, limitHitsPerSelector,
maxHitsPerSelector, useLocalCache);
- // long endTime = System.currentTimeMillis();
- // logger.debug("Completed encrypting row = " + rowIndex + " duration = " + (endTime-startTime));
-
returnPairs.addAll(encRowValues);
return returnPairs;
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
index 39d23ce..8d19c3f 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
@@ -20,33 +20,28 @@
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.List;
import org.apache.pirk.encryption.ModPowAbstraction;
import org.apache.pirk.query.wideskies.Query;
import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import scala.Tuple2;
/**
* Class to generate the query element modular exponentiations
* <p>
- *
+ *
*/
public class ExpTableGenerator implements PairFlatMapFunction<Integer,Integer,Tuple2<Integer,BigInteger>>
{
private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory.getLogger(ExpTableGenerator.class);
-
Query query = null;
private BigInteger NSquared = null;
private int maxValue = 0;
public ExpTableGenerator(BroadcastVars bbVarsIn)
{
-
query = bbVarsIn.getQuery();
NSquared = query.getNSquared();
@@ -58,7 +53,7 @@
public Iterable<Tuple2<Integer,Tuple2<Integer,BigInteger>>> call(Integer queryHashKey) throws Exception
{
// queryHashKey -> <<power>,<element^power mod N^2>>
- ArrayList<Tuple2<Integer,Tuple2<Integer,BigInteger>>> modExp = new ArrayList<>();
+ List<Tuple2<Integer,Tuple2<Integer,BigInteger>>> modExp = new ArrayList<>();
BigInteger element = query.getQueryElement(queryHashKey);
for (int i = 0; i <= maxValue; ++i)
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java
index f426aad..a696778 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/HashSelectorsAndPartitionData.java
@@ -19,7 +19,6 @@
package org.apache.pirk.responder.wideskies.spark;
import java.math.BigInteger;
-import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.MapWritable;
@@ -30,7 +29,6 @@
import org.apache.spark.api.java.function.PairFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Tuple2;
/**
@@ -48,7 +46,7 @@
private QuerySchema qSchema = null;
private DataSchema dSchema = null;
- public HashSelectorsAndPartitionData(Accumulators accumIn, BroadcastVars bvIn)
+ public HashSelectorsAndPartitionData(BroadcastVars bvIn)
{
queryInfo = bvIn.getQueryInfo();
qSchema = bvIn.getQuerySchema();
diff --git a/src/main/java/org/apache/pirk/response/wideskies/Response.java b/src/main/java/org/apache/pirk/response/wideskies/Response.java
index 667b8a3..e3fdad1 100644
--- a/src/main/java/org/apache/pirk/response/wideskies/Response.java
+++ b/src/main/java/org/apache/pirk/response/wideskies/Response.java
@@ -24,8 +24,6 @@
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.serialization.Storable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Class to hold the encrypted response elements for the PIR query
@@ -37,8 +35,6 @@
{
private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory.getLogger(Response.class);
-
private QueryInfo queryInfo = null; // holds all query info
private TreeMap<Integer,BigInteger> responseElements = null; // encrypted response columns, colNum -> column
@@ -46,7 +42,6 @@
public Response(QueryInfo queryInfoInput)
{
queryInfo = queryInfoInput;
-
responseElements = new TreeMap<>();
}
diff --git a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
index dc07787..a51e7b6 100644
--- a/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/data/DataSchemaLoader.java
@@ -24,7 +24,7 @@
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashSet;
-
+import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
@@ -71,7 +71,7 @@
{
private static final Logger logger = LoggerFactory.getLogger(DataSchemaLoader.class);
- private static HashSet<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT,
+ private static Set<String> allowedPrimitiveJavaTypes = new HashSet<>(Arrays.asList(PrimitiveTypePartitioner.BYTE, PrimitiveTypePartitioner.SHORT,
PrimitiveTypePartitioner.INT, PrimitiveTypePartitioner.LONG, PrimitiveTypePartitioner.FLOAT, PrimitiveTypePartitioner.DOUBLE,
PrimitiveTypePartitioner.CHAR, PrimitiveTypePartitioner.STRING));
@@ -211,8 +211,8 @@
* @param stream
* The input stream.
* @return A {@link Document} representing the XML document.
- * @throws IOException
- * @throws PIRException
+ * @throws IOException - Failed to read schema file
+ * @throws PIRException - Schema description is invalid
*/
private Document parseXMLDocument(InputStream stream) throws IOException, PIRException
{
@@ -238,7 +238,7 @@
* A data schema element node.
* @param schema
* The data schema
- * @throws PIRException
+ * @throws PIRException - Schema description is invalid
*/
private void extractElementNode(Element eElement, DataSchema schema) throws PIRException
{
@@ -293,9 +293,9 @@
*
* @param typeName
* The type name to check.
- * @throws PIRException
+ * @throws PIRException - Type name is not one of the allowed primitive Java types
*/
- void validateIsPrimitiveType(String typeName) throws PIRException
+ private void validateIsPrimitiveType(String typeName) throws PIRException
{
if (!allowedPrimitiveJavaTypes.contains(typeName.toLowerCase()))
{
@@ -311,9 +311,9 @@
* @param partitionerTypeName
* The name of the {@link DataPartitioner} subclass to instantiate.
* @return An instance of the named {@link DataPartitioner} subclass.
- * @throws PIRException
+ * @throws PIRException - Partitioner could not be instantiated
*/
- DataPartitioner instantiatePartitioner(String partitionerTypeName) throws PIRException
+ private DataPartitioner instantiatePartitioner(String partitionerTypeName) throws PIRException
{
Object obj;
try
diff --git a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
index 2d4c6b5..94e6ff2 100644
--- a/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
+++ b/src/main/java/org/apache/pirk/schema/query/QuerySchemaLoader.java
@@ -270,8 +270,8 @@
* @param stream
* The input stream.
* @return A Document representing the XML document.
- * @throws IOException
- * @throws PIRException
+ * @throws IOException - failed to read input
+ * @throws PIRException - file could not be parsed
*/
private Document parseXMLDocument(InputStream stream) throws IOException, PIRException
{
@@ -296,7 +296,7 @@
* @param doc
* An XML document specifying names upon which we will filter the query.
* @return The set of names upon which we will filter the query.
- * @throws PIRException
+ * @throws PIRException - Filter lists not found
*/
private Set<String> extractFilteredElementNames(Document doc) throws PIRException
{
@@ -338,7 +338,7 @@
* @param tagName
* The name of the tag we wish to extract from the {@code doc}
* @return The text content of the tag.
- * @throws PIRException
+ * @throws PIRException - XML Document is Empty
*/
private String extractValue(Document doc, String tagName) throws PIRException
{
@@ -360,8 +360,8 @@
* @param filteredElementNames
* The set of names of elements of the data schema up which the filter will act.
* @return An instantiation of the filter, set up to filter upon the specified names.
- * @throws IOException
- * @throws PIRException
+ * @throws IOException - failed to read input
+ * @throws PIRException - Filter could not be instantiated
*/
private DataFilter instantiateFilter(String filterTypeName, Set<String> filteredElementNames) throws IOException, PIRException
{
diff --git a/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java b/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java
index c68d300..628a7e5 100644
--- a/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java
+++ b/src/main/java/org/apache/pirk/schema/query/filter/StopListFilter.java
@@ -27,8 +27,6 @@
import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.utils.StopListUtils;
import org.elasticsearch.hadoop.mr.WritableArrayWritable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Filter class to filter data elements based upon a stoplist applied to specified field elements
@@ -37,8 +35,6 @@
{
private static final long serialVersionUID = 1L;
- private static final Logger logger = LoggerFactory.getLogger(StopListFilter.class);
-
private Set<String> filterSet = null;
private Set<String> stopList = null;
@@ -68,12 +64,15 @@
elementArray = Arrays.asList(((ArrayWritable) dataElement.get(dSchema.getTextName(filterName))).toStrings());
}
- for (String element : elementArray)
+ if (elementArray != null && elementArray.size() > 0)
{
- passFilter = StopListUtils.checkElement(element, stopList);
- if (!passFilter)
+ for (String element : elementArray)
{
- break;
+ passFilter = StopListUtils.checkElement(element, stopList);
+ if (!passFilter)
+ {
+ break;
+ }
}
}
}
diff --git a/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java b/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java
index 488c51c..d7dd239 100644
--- a/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java
+++ b/src/main/java/org/apache/pirk/schema/response/QueryResponseJSON.java
@@ -20,11 +20,8 @@
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map.Entry;
import java.util.Set;
-import org.apache.hadoop.io.Text;
import org.apache.pirk.query.wideskies.QueryInfo;
import org.apache.pirk.schema.data.DataSchema;
import org.apache.pirk.schema.data.DataSchemaRegistry;
@@ -53,13 +50,10 @@
private QueryInfo queryInfo = null;
public static final String EVENT_TYPE = "event_type"; // notification type the matched the record
- public static final Text EVENT_TYPE_TEXT = new Text(EVENT_TYPE);
public static final String QUERY_ID = "query_id"; // query ID that generated the notification
- public static final Text QUERY_ID_TEXT = new Text(QUERY_ID);
public static final String SELECTOR = "match"; // tag for selector that generated the hit
- public static final Text SELECTOR_TEXT = new Text(SELECTOR);
/**
* Constructor with data schema checking
@@ -117,22 +111,6 @@
return queryInfo;
}
- // Create empty JSON object based on the DataSchema
- @SuppressWarnings("unchecked")
- private void initialize()
- {
- Set<String> schemaStringRep = dSchema.getNonArrayElements();
- for (String key : schemaStringRep)
- {
- jsonObj.put(key, "");
- }
- Set<String> schemaListRep = dSchema.getArrayElements();
- for (String key : schemaListRep)
- {
- jsonObj.put(key, new ArrayList<>());
- }
- }
-
/**
* Add a <key,value> pair to the response object; checks the data schema if this QueryResponseJSON object was instantiated with schema checking (with a
* QueryInfo object)
@@ -187,14 +165,6 @@
jsonObj.put(SELECTOR, val);
}
- public void setAllFields(HashMap<String,String> dataMap)
- {
- for (Entry<String,String> entry : dataMap.entrySet())
- {
- setMapping(entry.getKey(), entry.getValue());
- }
- }
-
/**
* Method to set the common query response fields
*/