PIRK-80 Upgrade to Spark 2.0.0 - closes apache/incubator-pirk#113
diff --git a/pom.xml b/pom.xml
index 80e9a2b..88dff10 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,7 @@
<junit.version>4.12</junit.version>
<log4j.configuration>log4j2.xml</log4j.configuration>
<hadoop.version>2.7.3</hadoop.version>
- <spark.version>1.6.1</spark.version>
+ <spark.version>2.0.0</spark.version>
<elasticsearch.version>2.3.4</elasticsearch.version>
<storm.version>1.0.1</storm.version>
<kafka.version>0.9.0.1</kafka.version>
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
index f5a591e..e0b3b25 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalc.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -71,7 +72,7 @@
}
@Override
- public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Iterable<List<BigInteger>>> hashDocTuple) throws Exception
+ public Iterator<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Iterable<List<BigInteger>>> hashDocTuple) throws Exception
{
List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
@@ -98,6 +99,6 @@
returnPairs.addAll(encRowValues);
- return returnPairs;
+ return returnPairs.iterator();
}
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
index 8147ff6..5c1de76 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/EncRowCalcPrecomputedCache.java
@@ -21,6 +21,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import org.apache.pirk.query.wideskies.Query;
@@ -65,7 +66,7 @@
}
@Override
- public Iterable<Tuple2<Long,BigInteger>> call(Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>> hashDocTuple)
+ public Iterator<Tuple2<Long, BigInteger>> call(Tuple2<Integer,Tuple2<Iterable<Tuple2<Integer,BigInteger>>,Iterable<List<BigInteger>>>> hashDocTuple)
throws Exception
{
List<Tuple2<Long,BigInteger>> returnPairs = new ArrayList<>();
@@ -95,6 +96,6 @@
returnPairs.addAll(encRowValues);
- return returnPairs;
+ return returnPairs.iterator();
}
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
index 09b1e52..03c80f4 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpKeyFilenameMap.java
@@ -52,7 +52,7 @@
}
@Override
- public Iterable<Tuple2<Integer,String>> call(Iterator<Tuple2<Integer,Iterable<Tuple2<Integer,BigInteger>>>> iter) throws Exception
+ public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<Integer,Iterable<Tuple2<Integer,BigInteger>>>> iter) throws Exception
{
List<Tuple2<Integer,String>> keyFileList = new ArrayList<>();
@@ -87,6 +87,6 @@
}
bw.close();
- return keyFileList;
+ return keyFileList.iterator();
}
}
diff --git a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
index 4c54935..d14e36a 100644
--- a/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
+++ b/src/main/java/org/apache/pirk/responder/wideskies/spark/ExpTableGenerator.java
@@ -20,6 +20,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.apache.pirk.encryption.ModPowAbstraction;
@@ -51,7 +52,7 @@
}
@Override
- public Iterable<Tuple2<Integer,Tuple2<Integer,BigInteger>>> call(Integer queryHashKey) throws Exception
+ public Iterator<Tuple2<Integer, Tuple2<Integer, BigInteger>>> call(Integer queryHashKey) throws Exception
{
// queryHashKey -> <<power>,<element^power mod N^2>>
List<Tuple2<Integer,Tuple2<Integer,BigInteger>>> modExp = new ArrayList<>();
@@ -64,6 +65,6 @@
modExp.add(new Tuple2<>(queryHashKey, modPowTuple));
}
- return modExp;
+ return modExp.iterator();
}
}