Merge pull request #116 from jiayuasu/0.8

Provide granular control over SpatialRDD sampling by allowing a user-specified sample number
diff --git a/core/pom.xml b/core/pom.xml
index 258203d..76ec53b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -3,7 +3,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>org.datasyslab</groupId>
 	<artifactId>geospark</artifactId>
-	<version>0.8.1</version>
+	<version>0.8.2</version>
 	
 	<name>${project.groupId}:${project.artifactId}</name>
 	<description>Geospatial extension for Apache Spark</description>
diff --git a/core/src/main/java/org/datasyslab/geospark/spatialRDD/SpatialRDD.java b/core/src/main/java/org/datasyslab/geospark/spatialRDD/SpatialRDD.java
index f1ab5ae..7f30e8c 100644
--- a/core/src/main/java/org/datasyslab/geospark/spatialRDD/SpatialRDD.java
+++ b/core/src/main/java/org/datasyslab/geospark/spatialRDD/SpatialRDD.java
@@ -82,6 +82,22 @@
 
     public StandardQuadTree partitionTree;
 
+    /** User-specified sample number for spatial partitioning; the default -1 leaves the choice to RDDSampleUtils. */
+    public Long sampleNumber = (long) -1;
+    
+	public Long getSampleNumber() {
+		return sampleNumber;
+	}
+
+	/**
+	 * Sets the sample number.
+	 *
+	 * @param sampleNumber the new sample number
+	 */
+	public void setSampleNumber(Long sampleNumber) {
+		this.sampleNumber = sampleNumber;
+	}
+
 	/** The CR stransformation. */
 	protected boolean CRStransformation=false;;
 
@@ -90,7 +106,6 @@
 
 	/** The target epgsg code. */
 	protected String targetEpgsgCode="";
-
 	/**
 	 * CRS transform.
 	 *
@@ -141,7 +156,7 @@
         	throw new Exception("[AbstractSpatialRDD][spatialPartitioning] SpatialRDD total count is unkown. Please call analyze() first.");
         }
 		//Calculate the number of samples we need to take.
-        int sampleNumberOfRecords = RDDSampleUtils.getSampleNumbers(numPartitions, this.approximateTotalCount);
+        int sampleNumberOfRecords = RDDSampleUtils.getSampleNumbers(numPartitions, this.approximateTotalCount, this.sampleNumber);
         //Take Sample
         ArrayList objectSampleList = new ArrayList(this.rawSpatialRDD.takeSample(false, sampleNumberOfRecords));
         //Sort
diff --git a/core/src/main/java/org/datasyslab/geospark/utils/RDDSampleUtils.java b/core/src/main/java/org/datasyslab/geospark/utils/RDDSampleUtils.java
index b347e70..256ce9d 100644
--- a/core/src/main/java/org/datasyslab/geospark/utils/RDDSampleUtils.java
+++ b/core/src/main/java/org/datasyslab/geospark/utils/RDDSampleUtils.java
@@ -12,40 +12,50 @@
  */
 
 public class RDDSampleUtils {
+   
     
     /**
      * Gets the sample numbers.
      *
      * @param numPartitions the num partitions
      * @param totalNumberOfRecords the total number of records
+     * @param givenSampleNumbers the user-specified sample number; if not positive, the built-in sampling rule is used
      * @return the sample numbers
      * @throws Exception the exception
      */
-    public static int getSampleNumbers(Integer numPartitions, long totalNumberOfRecords) throws Exception {
-    	long sampleNumbers;
-    	/*
-    	 * If the input RDD is too small, Geospark will use the entire RDD instead of taking samples.
-    	 */
-    	if(totalNumberOfRecords>=1000)
+    public static int getSampleNumbers(Integer numPartitions, long totalNumberOfRecords, long givenSampleNumbers) throws Exception{
+    	Long sampleNumber = new Long(0);
+
+    	if(givenSampleNumbers>0)
     	{
-    		sampleNumbers = totalNumberOfRecords / 100;
+    		// This means that the user manually specifies the sample number
+    		sampleNumber = givenSampleNumbers;
+    		return sampleNumber.intValue();
     	}
     	else
     	{
-    		sampleNumbers = totalNumberOfRecords;
+    		// Follow GeoSpark internal sampling rule
+        	/*
+        	 * If the input RDD is too small, Geospark will use the entire RDD instead of taking samples.
+        	 */
+        	if(totalNumberOfRecords>=1000)
+        	{
+        		sampleNumber = totalNumberOfRecords / 100;
+        	}
+        	else
+        	{
+        		sampleNumber = totalNumberOfRecords;
+        	}
+        	
+    		if(sampleNumber > Integer.MAX_VALUE) {
+    			sampleNumber = new Long(Integer.MAX_VALUE);
+    		}
+            if(sampleNumber < 2*numPartitions ) {
+                // Too many partitions for this sample: fewer than two samples per partition.
+                throw new Exception("[RDDSampleUtils][getSampleNumbers] Too many RDD partitions. Call SpatialRDD.setSampleNumber() to manually increase sample or make partitionNum less than "+sampleNumber/2);
+            }
+            return sampleNumber.intValue();
     	}
-    	
-		if(sampleNumbers > Integer.MAX_VALUE) {
-			sampleNumbers = Integer.MAX_VALUE;
-		}
-        int result=(int)sampleNumbers;
-        // Partition size is too big. Should throw exception for this.
-        
-        if(sampleNumbers < 2*numPartitions ) {
-            throw new Exception("[RDDSampleUtils][getSampleNumbers] Too many RDD partitions. Please make this RDD's partitions less than "+sampleNumbers/2);
-        }
-        
-        return result;
 
 	}
 }
diff --git a/core/src/test/java/org/datasyslab/geospark/utils/RDDSampleUtilsTest.java b/core/src/test/java/org/datasyslab/geospark/utils/RDDSampleUtilsTest.java
index 8953625..15d3722 100644
--- a/core/src/test/java/org/datasyslab/geospark/utils/RDDSampleUtilsTest.java
+++ b/core/src/test/java/org/datasyslab/geospark/utils/RDDSampleUtilsTest.java
@@ -29,13 +29,13 @@
      */
     @Test
     public void testGetSampleNumbers() throws Exception {
-        assertEquals(10, RDDSampleUtils.getSampleNumbers(2, 10));
-        assertEquals(100, RDDSampleUtils.getSampleNumbers(2, 100));
-        assertEquals(10, RDDSampleUtils.getSampleNumbers(5, 1000));
-        assertEquals(100, RDDSampleUtils.getSampleNumbers(5, 10000));
-        assertEquals(100, RDDSampleUtils.getSampleNumbers(5, 10001));
-        assertEquals(1000, RDDSampleUtils.getSampleNumbers(5, 100011));
-        assertEquals(1000, RDDSampleUtils.getSampleNumbers(6, 100011));
+        assertEquals(10, RDDSampleUtils.getSampleNumbers(2, 10,-1));
+        assertEquals(100, RDDSampleUtils.getSampleNumbers(2, 100,-1));
+        assertEquals(10, RDDSampleUtils.getSampleNumbers(5, 1000,-1));
+        assertEquals(100, RDDSampleUtils.getSampleNumbers(5, 10000,-1));
+        assertEquals(100, RDDSampleUtils.getSampleNumbers(5, 10001,-1));
+        assertEquals(1000, RDDSampleUtils.getSampleNumbers(5, 100011,-1));
+        assertEquals(99, RDDSampleUtils.getSampleNumbers(6, 100011,99));
     }
     
     /**
@@ -46,8 +46,8 @@
     @Test(expected=Exception.class)
     public void testTooManyPartitions() throws Exception
     {
-        assertEquals(10, RDDSampleUtils.getSampleNumbers(6, 1010));
-        assertEquals(11, RDDSampleUtils.getSampleNumbers(6, 1110));
-        assertEquals(100, RDDSampleUtils.getSampleNumbers(100, 10000));
+        assertEquals(10, RDDSampleUtils.getSampleNumbers(6, 1010,-1));
+        assertEquals(11, RDDSampleUtils.getSampleNumbers(6, 1110,-1));
+        assertEquals(100, RDDSampleUtils.getSampleNumbers(100, 10000,-1));
     }
 }
\ No newline at end of file