Merge pull request #162 from jiayuasu/master
Urgent patch: Add an adapter function to convert Join result to DataFrame
diff --git a/README.md b/README.md
index 5c0bcb7..268bed9 100644
--- a/README.md
+++ b/README.md
@@ -24,13 +24,14 @@
## Version release notes: [click here](https://github.com/DataSystemsLab/GeoSpark/wiki/GeoSpark-Full-Version-Release-notes)
## News!
+* GeoSpark 0.9.1 is released: This is an urgent patch for 0.9.0 which provides an adapter to convert JoinQuery result to DataFrame. [GeoSpark SQL Maven Central coordinate](https://github.com/DataSystemsLab/GeoSpark/wiki/GeoSparkSQL-Maven-Central-Coordinates)
* GeoSpark 0.9.0 is released (more details in [Release notes](https://github.com/DataSystemsLab/GeoSpark/wiki/GeoSpark-Full-Version-Release-notes))
* much less memory consumption powered by GeoSpark customized serializer
* much faster spatial/distance join speed
* SpatialRDD that supports heterogenous geometries
* range, join, knn queries on heterogenous geometries
* Add KDB-Tree spatial partitioning
- * Create SpatialRDD from DataFrame / Create DataFrame from SpatialRDD (requires GeoSparkSQL) ([Scala example](https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala)[Java example](https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java))
+ * Create SpatialRDD from DataFrame / Create DataFrame from SpatialRDD (requires GeoSparkSQL) ([Scala example](https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala), [Java example](https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java))
* Welcome GeoSpark new contributor, Masha Basmanova (@mbasmanova) from Facebook. Masha has contributed more than 10 PRs to GeoSpark on refactoring GeoSpark architecture and improving GeoSpark join performance!
* Welcome GeoSpark new contributor, Zongsi Zhang (@zongsizhang) from Arizona State University. Zongsi participated the design of GeoSpark Shapefile parser and he has done a great job!
diff --git a/core/pom.xml b/core/pom.xml
index a10b3c9..48590fe 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.datasyslab</groupId>
<artifactId>geospark</artifactId>
- <version>0.9.0</version>
+ <version>0.9.1</version>
<name>${project.groupId}:${project.artifactId}</name>
<description>Geospatial extension for Apache Spark</description>
diff --git a/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java b/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
index 0df97d8..a1d7a0f 100644
--- a/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
+++ b/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
@@ -262,7 +262,6 @@
*
* Duplicates present in the input RDDs will be reflected in the join results.
*
- * @param <U> Type of the geometries in queryRDD/circleRDD set
* @param <T> Type of the geometries in spatialRDD set
* @param spatialRDD Set of geometries
* @param queryRDD Set of geometries
@@ -271,12 +270,12 @@
* @return RDD of pairs of matching geometries
* @throws Exception the exception
*/
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+ public static <T extends Geometry> JavaPairRDD<Geometry, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
final JoinParams joinParams = new JoinParams(useIndex, considerBoundaryIntersection, false);
return distanceJoin(spatialRDD, queryRDD, joinParams);
}
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+ public static <T extends Geometry> JavaPairRDD<Geometry, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
return distanceJoin(spatialRDD, queryRDD, joinParams);
}
@@ -296,7 +295,6 @@
* Because the results are reported as a HashSet, any duplicates in the original spatialRDD will
* be eliminated.
*
- * @param <U> Type of the geometries in queryRDD/circleRDD set
* @param <T> Type of the geometries in spatialRDD set
* @param spatialRDD Set of geometries
* @param queryRDD Set of geometries
@@ -305,21 +303,20 @@
* @return RDD of pairs where each pair contains a geometry and a set of matching geometries
* @throws Exception the exception
*/
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, HashSet<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+ public static <T extends Geometry> JavaPairRDD<Geometry, HashSet<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
final JoinParams joinParams = new JoinParams(useIndex, considerBoundaryIntersection, false);
- JavaPairRDD<U,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+ JavaPairRDD<Geometry,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
return collectGeometriesByKey(joinResults);
}
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, HashSet<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
- JavaPairRDD<U,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+ public static <T extends Geometry> JavaPairRDD<Geometry, HashSet<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+ JavaPairRDD<Geometry,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
return collectGeometriesByKey(joinResults);
}
/**
* A faster version of {@link #DistanceJoinQuery(SpatialRDD, CircleRDD, boolean, boolean)} which may produce duplicate results.
*
- * @param <U> Type of the geometries in queryRDD/circleRDD set
* @param <T> Type of the geometries in spatialRDD set
* @param spatialRDD Set of geometries
* @param queryRDD Set of geometries
@@ -328,14 +325,14 @@
* @return RDD of pairs where each pair contains a geometry and a set of matching geometries
* @throws Exception the exception
*/
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, HashSet<T>> DistanceJoinQueryWithDuplicates(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+ public static <T extends Geometry> JavaPairRDD<Geometry, HashSet<T>> DistanceJoinQueryWithDuplicates(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
final JoinParams joinParams = new JoinParams(useIndex, considerBoundaryIntersection, true);
- JavaPairRDD<U,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+ JavaPairRDD<Geometry,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
return collectGeometriesByKey(joinResults);
}
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, HashSet<T>> DistanceJoinQueryWithDuplicates(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
- JavaPairRDD<U,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+ public static <T extends Geometry> JavaPairRDD<Geometry, HashSet<T>> DistanceJoinQueryWithDuplicates(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+ JavaPairRDD<Geometry,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
return collectGeometriesByKey(joinResults);
}
@@ -344,7 +341,6 @@
*
* Duplicates present in the input RDDs will be reflected in the join results.
*
- * @param <U> Type of the geometries in queryRDD/circleRDD set
* @param <T> Type of the geometries in spatialRDD set
* @param spatialRDD Set of geometries
* @param queryRDD Set of geometries
@@ -353,14 +349,14 @@
* @return the result of {@link #DistanceJoinQueryFlat(SpatialRDD, CircleRDD, boolean, boolean)}, but in this pair RDD, each pair contains a geometry and the count of matching geometries
* @throws Exception the exception
*/
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+ public static <T extends Geometry> JavaPairRDD<Geometry, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
final JoinParams joinParams = new JoinParams(useIndex, considerBoundaryIntersection, false);
- final JavaPairRDD<U, T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+ final JavaPairRDD<Geometry, T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
return countGeometriesByKey(joinResults);
}
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD,CircleRDD queryRDD, JoinParams joinParams) throws Exception {
- final JavaPairRDD<U, T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+ public static <T extends Geometry> JavaPairRDD<Geometry, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD,CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+ final JavaPairRDD<Geometry, T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
return countGeometriesByKey(joinResults);
}
@@ -369,12 +365,12 @@
* Note: INTERNAL FUNCTION. API COMPATIBILITY IS NOT GUARANTEED. DO NOT USE IF YOU DON'T KNOW WHAT IT IS.
* </p>
*/
- public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> distanceJoin(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+ public static <T extends Geometry> JavaPairRDD<Geometry, T> distanceJoin(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
JavaPairRDD<Circle,T> joinResults = spatialJoin(queryRDD, spatialRDD, joinParams);
- return joinResults.mapToPair(new PairFunction<Tuple2<Circle, T>, U, T>() {
+ return joinResults.mapToPair(new PairFunction<Tuple2<Circle, T>, Geometry, T>() {
@Override
- public Tuple2<U, T> call(Tuple2<Circle, T> circleTTuple2) throws Exception {
- return new Tuple2<U,T>((U) circleTTuple2._1().getCenterGeometry(),circleTTuple2._2());
+ public Tuple2<Geometry, T> call(Tuple2<Circle, T> circleTTuple2) throws Exception {
+ return new Tuple2<Geometry,T>((Geometry) circleTTuple2._1().getCenterGeometry(),circleTTuple2._2());
}
});
}
diff --git a/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala b/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
index d7179ba..4923724 100644
--- a/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
+++ b/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
@@ -53,7 +53,7 @@
val geometryFactory=new GeometryFactory()
val kNNQueryPoint=geometryFactory.createPoint(new Coordinate(-84.01, 34.01))
val rangeQueryWindow=new Envelope (-90.01,-80.01,30.01,40.01)
- val joinQueryPartitioningType = GridType.RTREE
+ val joinQueryPartitioningType = GridType.QUADTREE
val eachQueryLoopTimes=1
it("should pass spatial range query") {
@@ -164,7 +164,7 @@
val objectRDD = new PointRDD(sc, PointRDDInputLocation, PointRDDOffset, PointRDDSplitter, true, StorageLevel.MEMORY_ONLY)
val queryWindowRDD = new CircleRDD(objectRDD,0.1)
- objectRDD.spatialPartitioning(GridType.RTREE)
+ objectRDD.spatialPartitioning(GridType.QUADTREE)
queryWindowRDD.spatialPartitioning(objectRDD.getPartitioner)
for(i <- 1 to eachQueryLoopTimes)
@@ -177,7 +177,7 @@
val objectRDD = new PointRDD(sc, PointRDDInputLocation, PointRDDOffset, PointRDDSplitter, true, StorageLevel.MEMORY_ONLY)
val queryWindowRDD = new CircleRDD(objectRDD,0.1)
- objectRDD.spatialPartitioning(GridType.RTREE)
+ objectRDD.spatialPartitioning(GridType.QUADTREE)
queryWindowRDD.spatialPartitioning(objectRDD.getPartitioner)
objectRDD.buildIndex(IndexType.RTREE,true)
diff --git a/sql/pom.xml b/sql/pom.xml
index 8fe9ddc..23d86f0 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.datasyslab</groupId>
<artifactId>geospark-sql</artifactId>
- <version>0.9.0</version>
+ <version>0.9.1</version>
<name>${project.groupId}:${project.artifactId}</name>
<description>SQL Extension of GeoSpark</description>
@@ -51,7 +51,7 @@
<artifactId>geospark</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
diff --git a/sql/src/main/scala/org/apache/spark/sql/geosparksql/GeometryWrapper.scala b/sql/src/main/scala/org/apache/spark/sql/geosparksql/GeometryWrapper.scala
index b63c130..bc51232 100644
--- a/sql/src/main/scala/org/apache/spark/sql/geosparksql/GeometryWrapper.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/geosparksql/GeometryWrapper.scala
@@ -14,7 +14,7 @@
def this(geometryString: String, fileDataSplitter: FileDataSplitter, geometryType: GeometryType)
{
this()
- var formatMapper = new FormatMapper(fileDataSplitter, true, geometryType)
+ var formatMapper = new FormatMapper(fileDataSplitter, false, geometryType)
this.geometry = formatMapper.readGeometry(geometryString)
}
diff --git a/sql/src/main/scala/org/datasyslab/geosparksql/utils/Adapter.scala b/sql/src/main/scala/org/datasyslab/geosparksql/utils/Adapter.scala
index 87621dc..4b38593 100644
--- a/sql/src/main/scala/org/datasyslab/geosparksql/utils/Adapter.scala
+++ b/sql/src/main/scala/org/datasyslab/geosparksql/utils/Adapter.scala
@@ -27,7 +27,7 @@
import com.vividsolutions.jts.geom.Geometry
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.geosparksql.GeometryWrapper
import org.apache.spark.sql.types._
@@ -56,6 +56,22 @@
return sparkSession.createDataFrame(rowRdd, schema)
}
+ def toDf(spatialPairRDD: JavaPairRDD[Geometry, Geometry], sparkSession: SparkSession): DataFrame =
+ {
+ val rowRdd = spatialPairRDD.rdd.map[Row](f =>
+ {
+ val seq1 = f._1.toString.split("\t").toSeq
+ val seq2 = f._2.toString.split("\t").toSeq
+ val result = seq1++seq2
+ Row.fromSeq(result)
+ })
+ var fieldArray = new Array[StructField](rowRdd.take(1)(0).size)
+ fieldArray(0) = StructField("windowrddshape", StringType)
+ for (i <- 1 to fieldArray.length-1) fieldArray(i) = StructField("_c"+i, StringType)
+ val schema = StructType(fieldArray)
+ return sparkSession.createDataFrame(rowRdd, schema)
+ }
+
/*
* Since UserDefinedType is hidden from users. We cannot directly return spatialRDD to spatialDf.
* Let's wait for Spark side's change
diff --git a/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java b/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java
index db163ad..1257d38 100644
--- a/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java
+++ b/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java
@@ -4,12 +4,17 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.datasyslab.geospark.enums.GridType;
+import org.datasyslab.geospark.enums.IndexType;
import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader;
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator;
+import org.datasyslab.geospark.spatialOperator.JoinQuery;
+import org.datasyslab.geospark.spatialRDD.CircleRDD;
import org.datasyslab.geospark.spatialRDD.SpatialRDD;
import org.datasyslab.geosparksql.UDF.UdfRegistrator;
import org.datasyslab.geosparksql.utils.Adapter;
@@ -132,6 +137,68 @@
Adapter.toDf(spatialRDD,sparkSession).show();
}
+ @Test
+ public void testSpatialJoinToDataFrame() throws Exception {
+ UdfRegistrator.registerAll(sparkSession);
+
+ Dataset<Row> pointCsvDf = sparkSession.read().format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation);
+ pointCsvDf.createOrReplaceTempView("pointtable");
+ Dataset<Row> pointDf = sparkSession.sql("select ST_PointWithId(pointtable._c0,pointtable._c1,\"mypointid\") as arealandmark from pointtable");
+ SpatialRDD pointRDD = new SpatialRDD<Geometry>();
+ pointRDD.rawSpatialRDD = Adapter.toJavaRdd(pointDf);
+ pointRDD.analyze();
+
+ Dataset<Row> polygonWktDf = sparkSession.read().format("csv").option("delimiter","\t").option("header","false").load(mixedWktGeometryInputLocation);
+ polygonWktDf.createOrReplaceTempView("polygontable");
+ Dataset<Row> polygonDf = sparkSession.sql("select ST_GeomFromTextWithId(polygontable._c0,\"wkt\", concat(polygontable._c3,'\t',polygontable._c5)) as usacounty from polygontable");
+ SpatialRDD polygonRDD = new SpatialRDD<Geometry>();
+ polygonRDD.rawSpatialRDD = Adapter.toJavaRdd(polygonDf);
+ polygonRDD.analyze();
+
+ pointRDD.spatialPartitioning(GridType.QUADTREE);
+ polygonRDD.spatialPartitioning(pointRDD.getPartitioner());
+
+ pointRDD.buildIndex(IndexType.QUADTREE, true);
+
+ JavaPairRDD joinResultPairRDD = JoinQuery.SpatialJoinQueryFlat(pointRDD, polygonRDD, true, true);
+
+ Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession);
+
+ joinResultDf.show();
+ }
+
+ @Test
+ public void testDistanceJoinToDataFrame() throws Exception {
+ UdfRegistrator.registerAll(sparkSession);
+
+ Dataset<Row> pointCsvDf = sparkSession.read().format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation);
+ pointCsvDf.createOrReplaceTempView("pointtable");
+ Dataset<Row> pointDf = sparkSession.sql("select ST_PointWithId(pointtable._c0,pointtable._c1,\"mypointid\") as arealandmark from pointtable");
+ SpatialRDD pointRDD = new SpatialRDD<Geometry>();
+ pointRDD.rawSpatialRDD = Adapter.toJavaRdd(pointDf);
+ pointRDD.analyze();
+
+ Dataset<Row> polygonWktDf = sparkSession.read().format("csv").option("delimiter","\t").option("header","false").load(mixedWktGeometryInputLocation);
+ polygonWktDf.createOrReplaceTempView("polygontable");
+ Dataset<Row> polygonDf = sparkSession.sql("select ST_GeomFromTextWithId(polygontable._c0,\"wkt\", concat(polygontable._c3,'\t',polygontable._c5)) as usacounty from polygontable");
+ SpatialRDD polygonRDD = new SpatialRDD<Geometry>();
+ polygonRDD.rawSpatialRDD = Adapter.toJavaRdd(polygonDf);
+ polygonRDD.analyze();
+
+ CircleRDD circleRDD = new CircleRDD(polygonRDD, 0.2);
+
+ pointRDD.spatialPartitioning(GridType.QUADTREE);
+ circleRDD.spatialPartitioning(pointRDD.getPartitioner());
+
+ pointRDD.buildIndex(IndexType.QUADTREE, true);
+
+ JavaPairRDD joinResultPairRDD = JoinQuery.DistanceJoinQueryFlat(pointRDD, circleRDD, true, true);
+
+ Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession);
+
+ joinResultDf.show();
+ }
+
/**
* Tear down.
*/
diff --git a/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala b/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala
index bca1312..3b2e5cc 100644
--- a/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala
+++ b/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala
@@ -4,10 +4,12 @@
import org.apache.log4j.{Level, Logger}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
+import org.datasyslab.geospark.enums.{GridType, IndexType}
import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader
import org.datasyslab.geospark.monitoring.GeoSparkListener
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
-import org.datasyslab.geospark.spatialRDD.SpatialRDD
+import org.datasyslab.geospark.spatialOperator.JoinQuery
+import org.datasyslab.geospark.spatialRDD.{CircleRDD, SpatialRDD}
import org.datasyslab.geosparksql.UDF.UdfRegistrator
import org.datasyslab.geosparksql.utils.Adapter
import org.scalatest.{BeforeAndAfterAll, FunSpec}
@@ -120,5 +122,62 @@
Adapter.toDf(spatialRDD,sparkSession).show()
}
+ it("Convert spatial join result to DataFrame")
+ {
+ UdfRegistrator.registerAll(sparkSession)
+ var polygonWktDf = sparkSession.read.format("csv").option("delimiter","\t").option("header","false").load(mixedWktGeometryInputLocation)
+ polygonWktDf.createOrReplaceTempView("polygontable")
+ var polygonDf = sparkSession.sql("select ST_GeomFromTextWithId(polygontable._c0,\"wkt\", concat(polygontable._c3,'\t',polygontable._c5)) as usacounty from polygontable")
+ var polygonRDD = new SpatialRDD[Geometry]
+ polygonRDD.rawSpatialRDD = Adapter.toRdd(polygonDf)
+ polygonRDD.analyze()
+
+ var pointCsvDF = sparkSession.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ pointCsvDF.createOrReplaceTempView("pointtable")
+ var pointDf = sparkSession.sql("select ST_PointWithId(pointtable._c0,pointtable._c1,\"mypointid\") as arealandmark from pointtable")
+ var pointRDD = new SpatialRDD[Geometry]
+ pointRDD.rawSpatialRDD = Adapter.toRdd(pointDf)
+ pointRDD.analyze()
+
+ pointRDD.spatialPartitioning(GridType.QUADTREE)
+ polygonRDD.spatialPartitioning(pointRDD.getPartitioner)
+
+ pointRDD.buildIndex(IndexType.QUADTREE,true)
+
+ var joinResultPairRDD = JoinQuery.SpatialJoinQueryFlat(pointRDD, polygonRDD, true, true)
+
+ var joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession)
+ joinResultDf.show()
+ }
+
+ it("Convert distance join result to DataFrame")
+ {
+ UdfRegistrator.registerAll(sparkSession)
+
+ var pointCsvDF = sparkSession.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ pointCsvDF.createOrReplaceTempView("pointtable")
+ var pointDf = sparkSession.sql("select ST_PointWithId(pointtable._c0,pointtable._c1,\"mypointid\") as arealandmark from pointtable")
+ var pointRDD = new SpatialRDD[Geometry]
+ pointRDD.rawSpatialRDD = Adapter.toRdd(pointDf)
+ pointRDD.analyze()
+
+ var polygonWktDf = sparkSession.read.format("csv").option("delimiter","\t").option("header","false").load(mixedWktGeometryInputLocation)
+ polygonWktDf.createOrReplaceTempView("polygontable")
+ var polygonDf = sparkSession.sql("select ST_GeomFromTextWithId(polygontable._c0,\"wkt\", concat(polygontable._c3,'\t',polygontable._c5)) as usacounty from polygontable")
+ var polygonRDD = new SpatialRDD[Geometry]
+ polygonRDD.rawSpatialRDD = Adapter.toRdd(polygonDf)
+ polygonRDD.analyze()
+ var circleRDD = new CircleRDD(polygonRDD, 0.2)
+
+ pointRDD.spatialPartitioning(GridType.QUADTREE)
+ circleRDD.spatialPartitioning(pointRDD.getPartitioner)
+
+ pointRDD.buildIndex(IndexType.QUADTREE,true)
+
+ var joinResultPairRDD = JoinQuery.DistanceJoinQueryFlat(pointRDD, circleRDD, true, true)
+
+ var joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession)
+ joinResultDf.show()
+ }
}
}