Merge pull request #162 from jiayuasu/master

Urgent patch: Add an adapter function to convert Join result to DataFrame
diff --git a/README.md b/README.md
index 5c0bcb7..268bed9 100644
--- a/README.md
+++ b/README.md
@@ -24,13 +24,14 @@
 ##  Version release notes: [click here](https://github.com/DataSystemsLab/GeoSpark/wiki/GeoSpark-Full-Version-Release-notes)
 
 ## News!
+* GeoSpark 0.9.1 is released: This is an urgent patch for 0.9.0 which provides an adapter to convert JoinQuery result to DataFrame. [GeoSpark SQL Maven Central coordinate](https://github.com/DataSystemsLab/GeoSpark/wiki/GeoSparkSQL-Maven-Central-Coordinates)
 * GeoSpark 0.9.0 is released (more details in [Release notes](https://github.com/DataSystemsLab/GeoSpark/wiki/GeoSpark-Full-Version-Release-notes))
 	* much less memory consumption powered by GeoSpark customized serializer
 	* much faster spatial/distance join speed
 	* SpatialRDD that supports heterogenous geometries
 	* range, join, knn queries on heterogenous geometries
 	* Add KDB-Tree spatial partitioning
-	* Create SpatialRDD from DataFrame / Create DataFrame from SpatialRDD (requires GeoSparkSQL) ([Scala example](https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala)[Java example](https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java))
+	* Create SpatialRDD from DataFrame / Create DataFrame from SpatialRDD (requires GeoSparkSQL) ([Scala example](https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala), [Java example](https://github.com/DataSystemsLab/GeoSpark/blob/master/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java))
 * Welcome GeoSpark new contributor, Masha Basmanova (@mbasmanova) from Facebook. Masha has contributed more than 10 PRs to GeoSpark on refactoring GeoSpark architecture and improving GeoSpark join performance!
 * Welcome GeoSpark new contributor, Zongsi Zhang (@zongsizhang) from Arizona State University. Zongsi participated the design of GeoSpark Shapefile parser and he has done a great job!
 
diff --git a/core/pom.xml b/core/pom.xml
index a10b3c9..48590fe 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -3,7 +3,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>org.datasyslab</groupId>
 	<artifactId>geospark</artifactId>
-	<version>0.9.0</version>
+	<version>0.9.1</version>
 
 	<name>${project.groupId}:${project.artifactId}</name>
 	<description>Geospatial extension for Apache Spark</description>
diff --git a/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java b/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
index 0df97d8..a1d7a0f 100644
--- a/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
+++ b/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
@@ -262,7 +262,6 @@
      * 
      * Duplicates present in the input RDDs will be reflected in the join results.
      *
-     * @param <U> Type of the geometries in queryRDD/circleRDD set
      * @param <T> Type of the geometries in spatialRDD set
      * @param spatialRDD Set of geometries
      * @param queryRDD Set of geometries
@@ -271,12 +270,12 @@
      * @return RDD of pairs of matching geometries
      * @throws Exception the exception
      */
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+    public static <T extends Geometry> JavaPairRDD<Geometry, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
         final JoinParams joinParams = new JoinParams(useIndex, considerBoundaryIntersection, false);
         return distanceJoin(spatialRDD, queryRDD, joinParams);
     }
 
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+    public static <T extends Geometry> JavaPairRDD<Geometry, T> DistanceJoinQueryFlat(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
         return distanceJoin(spatialRDD, queryRDD, joinParams);
     }
 
@@ -296,7 +295,6 @@
      * Because the results are reported as a HashSet, any duplicates in the original spatialRDD will
      * be eliminated.
      *
-     * @param <U> Type of the geometries in queryRDD/circleRDD set
      * @param <T> Type of the geometries in spatialRDD set
      * @param spatialRDD Set of geometries
      * @param queryRDD Set of geometries
@@ -305,21 +303,20 @@
      * @return RDD of pairs where each pair contains a geometry and a set of matching geometries
      * @throws Exception the exception
      */
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, HashSet<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+    public static <T extends Geometry> JavaPairRDD<Geometry, HashSet<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
         final JoinParams joinParams = new JoinParams(useIndex, considerBoundaryIntersection, false);
-        JavaPairRDD<U,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+        JavaPairRDD<Geometry,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
         return collectGeometriesByKey(joinResults);
     }
 
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, HashSet<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
-        JavaPairRDD<U,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+    public static <T extends Geometry> JavaPairRDD<Geometry, HashSet<T>> DistanceJoinQuery(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+        JavaPairRDD<Geometry,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
         return collectGeometriesByKey(joinResults);
     }
 
     /**
      * A faster version of {@link #DistanceJoinQuery(SpatialRDD, CircleRDD, boolean, boolean)} which may produce duplicate results.
      *
-     * @param <U> Type of the geometries in queryRDD/circleRDD set
      * @param <T> Type of the geometries in spatialRDD set
      * @param spatialRDD Set of geometries
      * @param queryRDD Set of geometries
@@ -328,14 +325,14 @@
      * @return RDD of pairs where each pair contains a geometry and a set of matching geometries
      * @throws Exception the exception
      */
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, HashSet<T>> DistanceJoinQueryWithDuplicates(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+    public static <T extends Geometry> JavaPairRDD<Geometry, HashSet<T>> DistanceJoinQueryWithDuplicates(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
         final JoinParams joinParams = new JoinParams(useIndex, considerBoundaryIntersection, true);
-        JavaPairRDD<U,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+        JavaPairRDD<Geometry,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
         return collectGeometriesByKey(joinResults);
     }
 
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, HashSet<T>> DistanceJoinQueryWithDuplicates(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
-        JavaPairRDD<U,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+    public static <T extends Geometry> JavaPairRDD<Geometry, HashSet<T>> DistanceJoinQueryWithDuplicates(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+        JavaPairRDD<Geometry,T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
         return collectGeometriesByKey(joinResults);
     }
 
@@ -344,7 +341,6 @@
      * 
      * Duplicates present in the input RDDs will be reflected in the join results.
      *
-     * @param <U> Type of the geometries in queryRDD/circleRDD set
      * @param <T> Type of the geometries in spatialRDD set
      * @param spatialRDD Set of geometries
      * @param queryRDD Set of geometries
@@ -353,14 +349,14 @@
      * @return the result of {@link #DistanceJoinQueryFlat(SpatialRDD, CircleRDD, boolean, boolean)}, but in this pair RDD, each pair contains a geometry and the count of matching geometries
      * @throws Exception the exception
      */
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+    public static <T extends Geometry> JavaPairRDD<Geometry, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
         final JoinParams joinParams = new JoinParams(useIndex, considerBoundaryIntersection, false);
-        final JavaPairRDD<U, T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+        final JavaPairRDD<Geometry, T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
         return countGeometriesByKey(joinResults);
     }
 
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD,CircleRDD queryRDD, JoinParams joinParams) throws Exception {
-        final JavaPairRDD<U, T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
+    public static <T extends Geometry> JavaPairRDD<Geometry, Long> DistanceJoinQueryCountByKey(SpatialRDD<T> spatialRDD,CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+        final JavaPairRDD<Geometry, T> joinResults = distanceJoin(spatialRDD, queryRDD, joinParams);
         return countGeometriesByKey(joinResults);
     }
 
@@ -369,12 +365,12 @@
      *     Note: INTERNAL FUNCTION. API COMPATIBILITY IS NOT GUARANTEED. DO NOT USE IF YOU DON'T KNOW WHAT IT IS.
      * </p>
      */
-    public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> distanceJoin(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
+    public static <T extends Geometry> JavaPairRDD<Geometry, T> distanceJoin(SpatialRDD<T> spatialRDD, CircleRDD queryRDD, JoinParams joinParams) throws Exception {
         JavaPairRDD<Circle,T> joinResults = spatialJoin(queryRDD, spatialRDD, joinParams);
-        return joinResults.mapToPair(new PairFunction<Tuple2<Circle, T>, U, T>() {
+        return joinResults.mapToPair(new PairFunction<Tuple2<Circle, T>, Geometry, T>() {
             @Override
-            public Tuple2<U, T> call(Tuple2<Circle, T> circleTTuple2) throws Exception {
-                return new Tuple2<U,T>((U) circleTTuple2._1().getCenterGeometry(),circleTTuple2._2());
+            public Tuple2<Geometry, T> call(Tuple2<Circle, T> circleTTuple2) throws Exception {
+                return new Tuple2<Geometry,T>((Geometry) circleTTuple2._1().getCenterGeometry(),circleTTuple2._2());
             }
         });
     }
diff --git a/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala b/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
index d7179ba..4923724 100644
--- a/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
+++ b/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
@@ -53,7 +53,7 @@
 		val geometryFactory=new GeometryFactory()
 		val kNNQueryPoint=geometryFactory.createPoint(new Coordinate(-84.01, 34.01))
 		val rangeQueryWindow=new Envelope (-90.01,-80.01,30.01,40.01)
-		val joinQueryPartitioningType = GridType.RTREE
+		val joinQueryPartitioningType = GridType.QUADTREE
 		val eachQueryLoopTimes=1
 
 		it("should pass spatial range query") {
@@ -164,7 +164,7 @@
 			val objectRDD = new PointRDD(sc, PointRDDInputLocation, PointRDDOffset, PointRDDSplitter, true, StorageLevel.MEMORY_ONLY)
 			val queryWindowRDD = new CircleRDD(objectRDD,0.1)
 
-			objectRDD.spatialPartitioning(GridType.RTREE)
+			objectRDD.spatialPartitioning(GridType.QUADTREE)
 			queryWindowRDD.spatialPartitioning(objectRDD.getPartitioner)
 
 			for(i <- 1 to eachQueryLoopTimes)
@@ -177,7 +177,7 @@
 			val objectRDD = new PointRDD(sc, PointRDDInputLocation, PointRDDOffset, PointRDDSplitter, true, StorageLevel.MEMORY_ONLY)
 			val queryWindowRDD = new CircleRDD(objectRDD,0.1)
 
-			objectRDD.spatialPartitioning(GridType.RTREE)
+			objectRDD.spatialPartitioning(GridType.QUADTREE)
 			queryWindowRDD.spatialPartitioning(objectRDD.getPartitioner)
 
 			objectRDD.buildIndex(IndexType.RTREE,true)
diff --git a/sql/pom.xml b/sql/pom.xml
index 8fe9ddc..23d86f0 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -3,7 +3,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>org.datasyslab</groupId>
 	<artifactId>geospark-sql</artifactId>
-	<version>0.9.0</version>
+	<version>0.9.1</version>
 
 	<name>${project.groupId}:${project.artifactId}</name>
 	<description>SQL Extension of GeoSpark</description>
@@ -51,7 +51,7 @@
             <artifactId>geospark</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
-        </dependency>        
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.11</artifactId>
diff --git a/sql/src/main/scala/org/apache/spark/sql/geosparksql/GeometryWrapper.scala b/sql/src/main/scala/org/apache/spark/sql/geosparksql/GeometryWrapper.scala
index b63c130..bc51232 100644
--- a/sql/src/main/scala/org/apache/spark/sql/geosparksql/GeometryWrapper.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/geosparksql/GeometryWrapper.scala
@@ -14,7 +14,7 @@
   def this(geometryString: String, fileDataSplitter: FileDataSplitter, geometryType: GeometryType)
   {
     this()
-    var formatMapper = new FormatMapper(fileDataSplitter, true, geometryType)
+    var formatMapper = new FormatMapper(fileDataSplitter, false, geometryType)
     this.geometry = formatMapper.readGeometry(geometryString)
   }
 
diff --git a/sql/src/main/scala/org/datasyslab/geosparksql/utils/Adapter.scala b/sql/src/main/scala/org/datasyslab/geosparksql/utils/Adapter.scala
index 87621dc..4b38593 100644
--- a/sql/src/main/scala/org/datasyslab/geosparksql/utils/Adapter.scala
+++ b/sql/src/main/scala/org/datasyslab/geosparksql/utils/Adapter.scala
@@ -27,7 +27,7 @@
 
 
 import com.vividsolutions.jts.geom.Geometry
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.geosparksql.GeometryWrapper
 import org.apache.spark.sql.types._
@@ -56,6 +56,22 @@
     return sparkSession.createDataFrame(rowRdd, schema)
   }
 
+  def toDf(spatialPairRDD: JavaPairRDD[Geometry, Geometry], sparkSession: SparkSession): DataFrame =
+  {
+    val rowRdd = spatialPairRDD.rdd.map[Row](f =>
+    {
+      val seq1 = f._1.toString.split("\t").toSeq
+      val seq2 = f._2.toString.split("\t").toSeq
+      val result = seq1++seq2
+      Row.fromSeq(result)
+    })
+    var fieldArray = new Array[StructField](rowRdd.take(1)(0).size)
+    fieldArray(0) = StructField("windowrddshape", StringType)
+    for (i <- 1 to fieldArray.length-1) fieldArray(i) = StructField("_c"+i, StringType)
+    val schema = StructType(fieldArray)
+    return sparkSession.createDataFrame(rowRdd, schema)
+  }
+
   /*
    * Since UserDefinedType is hidden from users. We cannot directly return spatialRDD to spatialDf.
    * Let's wait for Spark side's change
diff --git a/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java b/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java
index db163ad..1257d38 100644
--- a/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java
+++ b/sql/src/test/java/org/datasyslab/geosparksql/readTestJava.java
@@ -4,12 +4,17 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.datasyslab.geospark.enums.GridType;
+import org.datasyslab.geospark.enums.IndexType;
 import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader;
 import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator;
+import org.datasyslab.geospark.spatialOperator.JoinQuery;
+import org.datasyslab.geospark.spatialRDD.CircleRDD;
 import org.datasyslab.geospark.spatialRDD.SpatialRDD;
 import org.datasyslab.geosparksql.UDF.UdfRegistrator;
 import org.datasyslab.geosparksql.utils.Adapter;
@@ -132,6 +137,68 @@
         Adapter.toDf(spatialRDD,sparkSession).show();
     }
 
+    @Test
+    public void testSpatialJoinToDataFrame() throws Exception {
+        UdfRegistrator.registerAll(sparkSession);
+
+        Dataset<Row> pointCsvDf = sparkSession.read().format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation);
+        pointCsvDf.createOrReplaceTempView("pointtable");
+        Dataset<Row> pointDf = sparkSession.sql("select ST_PointWithId(pointtable._c0,pointtable._c1,\"mypointid\") as arealandmark from pointtable");
+        SpatialRDD pointRDD = new SpatialRDD<Geometry>();
+        pointRDD.rawSpatialRDD = Adapter.toJavaRdd(pointDf);
+        pointRDD.analyze();
+
+        Dataset<Row> polygonWktDf = sparkSession.read().format("csv").option("delimiter","\t").option("header","false").load(mixedWktGeometryInputLocation);
+        polygonWktDf.createOrReplaceTempView("polygontable");
+        Dataset<Row> polygonDf = sparkSession.sql("select ST_GeomFromTextWithId(polygontable._c0,\"wkt\", concat(polygontable._c3,'\t',polygontable._c5)) as usacounty from polygontable");
+        SpatialRDD polygonRDD = new SpatialRDD<Geometry>();
+        polygonRDD.rawSpatialRDD = Adapter.toJavaRdd(polygonDf);
+        polygonRDD.analyze();
+
+        pointRDD.spatialPartitioning(GridType.QUADTREE);
+        polygonRDD.spatialPartitioning(pointRDD.getPartitioner());
+
+        pointRDD.buildIndex(IndexType.QUADTREE, true);
+
+        JavaPairRDD joinResultPairRDD = JoinQuery.SpatialJoinQueryFlat(pointRDD, polygonRDD, true, true);
+
+        Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession);
+
+        joinResultDf.show();
+    }
+
+    @Test
+    public void testDistanceJoinToDataFrame() throws Exception {
+        UdfRegistrator.registerAll(sparkSession);
+
+        Dataset<Row> pointCsvDf = sparkSession.read().format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation);
+        pointCsvDf.createOrReplaceTempView("pointtable");
+        Dataset<Row> pointDf = sparkSession.sql("select ST_PointWithId(pointtable._c0,pointtable._c1,\"mypointid\") as arealandmark from pointtable");
+        SpatialRDD pointRDD = new SpatialRDD<Geometry>();
+        pointRDD.rawSpatialRDD = Adapter.toJavaRdd(pointDf);
+        pointRDD.analyze();
+
+        Dataset<Row> polygonWktDf = sparkSession.read().format("csv").option("delimiter","\t").option("header","false").load(mixedWktGeometryInputLocation);
+        polygonWktDf.createOrReplaceTempView("polygontable");
+        Dataset<Row> polygonDf = sparkSession.sql("select ST_GeomFromTextWithId(polygontable._c0,\"wkt\", concat(polygontable._c3,'\t',polygontable._c5)) as usacounty from polygontable");
+        SpatialRDD polygonRDD = new SpatialRDD<Geometry>();
+        polygonRDD.rawSpatialRDD = Adapter.toJavaRdd(polygonDf);
+        polygonRDD.analyze();
+
+        CircleRDD circleRDD = new CircleRDD(polygonRDD, 0.2);
+
+        pointRDD.spatialPartitioning(GridType.QUADTREE);
+        circleRDD.spatialPartitioning(pointRDD.getPartitioner());
+
+        pointRDD.buildIndex(IndexType.QUADTREE, true);
+
+        JavaPairRDD joinResultPairRDD = JoinQuery.DistanceJoinQueryFlat(pointRDD, circleRDD, true, true);
+
+        Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession);
+
+        joinResultDf.show();
+    }
+
     /**
      * Tear down.
      */
diff --git a/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala b/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala
index bca1312..3b2e5cc 100644
--- a/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala
+++ b/sql/src/test/scala/org/datasyslab/geosparksql/readTestScala.scala
@@ -4,10 +4,12 @@
 import org.apache.log4j.{Level, Logger}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.SparkSession
+import org.datasyslab.geospark.enums.{GridType, IndexType}
 import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileReader
 import org.datasyslab.geospark.monitoring.GeoSparkListener
 import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
-import org.datasyslab.geospark.spatialRDD.SpatialRDD
+import org.datasyslab.geospark.spatialOperator.JoinQuery
+import org.datasyslab.geospark.spatialRDD.{CircleRDD, SpatialRDD}
 import org.datasyslab.geosparksql.UDF.UdfRegistrator
 import org.datasyslab.geosparksql.utils.Adapter
 import org.scalatest.{BeforeAndAfterAll, FunSpec}
@@ -120,5 +122,62 @@
       Adapter.toDf(spatialRDD,sparkSession).show()
     }
 
+    it("Convert spatial join result to DataFrame")
+    {
+      UdfRegistrator.registerAll(sparkSession)
+      var polygonWktDf = sparkSession.read.format("csv").option("delimiter","\t").option("header","false").load(mixedWktGeometryInputLocation)
+      polygonWktDf.createOrReplaceTempView("polygontable")
+      var polygonDf = sparkSession.sql("select ST_GeomFromTextWithId(polygontable._c0,\"wkt\", concat(polygontable._c3,'\t',polygontable._c5)) as usacounty from polygontable")
+      var polygonRDD = new SpatialRDD[Geometry]
+      polygonRDD.rawSpatialRDD = Adapter.toRdd(polygonDf)
+      polygonRDD.analyze()
+
+      var pointCsvDF = sparkSession.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+      pointCsvDF.createOrReplaceTempView("pointtable")
+      var pointDf = sparkSession.sql("select ST_PointWithId(pointtable._c0,pointtable._c1,\"mypointid\") as arealandmark from pointtable")
+      var pointRDD = new SpatialRDD[Geometry]
+      pointRDD.rawSpatialRDD = Adapter.toRdd(pointDf)
+      pointRDD.analyze()
+
+      pointRDD.spatialPartitioning(GridType.QUADTREE)
+      polygonRDD.spatialPartitioning(pointRDD.getPartitioner)
+
+      pointRDD.buildIndex(IndexType.QUADTREE,true)
+
+      var joinResultPairRDD = JoinQuery.SpatialJoinQueryFlat(pointRDD, polygonRDD, true, true)
+
+      var joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession)
+      joinResultDf.show()
+    }
+
+    it("Convert distance join result to DataFrame")
+    {
+      UdfRegistrator.registerAll(sparkSession)
+
+      var pointCsvDF = sparkSession.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+      pointCsvDF.createOrReplaceTempView("pointtable")
+      var pointDf = sparkSession.sql("select ST_PointWithId(pointtable._c0,pointtable._c1,\"mypointid\") as arealandmark from pointtable")
+      var pointRDD = new SpatialRDD[Geometry]
+      pointRDD.rawSpatialRDD = Adapter.toRdd(pointDf)
+      pointRDD.analyze()
+
+      var polygonWktDf = sparkSession.read.format("csv").option("delimiter","\t").option("header","false").load(mixedWktGeometryInputLocation)
+      polygonWktDf.createOrReplaceTempView("polygontable")
+      var polygonDf = sparkSession.sql("select ST_GeomFromTextWithId(polygontable._c0,\"wkt\", concat(polygontable._c3,'\t',polygontable._c5)) as usacounty from polygontable")
+      var polygonRDD = new SpatialRDD[Geometry]
+      polygonRDD.rawSpatialRDD = Adapter.toRdd(polygonDf)
+      polygonRDD.analyze()
+      var circleRDD = new CircleRDD(polygonRDD, 0.2)
+
+      pointRDD.spatialPartitioning(GridType.QUADTREE)
+      circleRDD.spatialPartitioning(pointRDD.getPartitioner)
+
+      pointRDD.buildIndex(IndexType.QUADTREE,true)
+
+      var joinResultPairRDD = JoinQuery.DistanceJoinQueryFlat(pointRDD, circleRDD, true, true)
+
+      var joinResultDf = Adapter.toDf(joinResultPairRDD, sparkSession)
+      joinResultDf.show()
+    }
 	}
 }