Merge pull request #108 from jiayuasu/master

Upload GeoSpark 0.8.1 and Babylon 0.2.2
diff --git a/README.md b/README.md
index 88f3b95..500c5b2 100644
--- a/README.md
+++ b/README.md
@@ -3,8 +3,8 @@
 
 | Status   |      Stable    | Latest | Source code|
 |:----------:|:-------------:|:------:|:------:|
-| GeoSpark |  0.8.0 | [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/geospark/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/geospark) | [![Build Status](https://travis-ci.org/jiayuasu/GeoSpark.svg?branch=master)](https://travis-ci.org/jiayuasu/GeoSpark)[![codecov.io](http://codecov.io/github/jiayuasu/GeoSpark/coverage.svg?branch=master)](http://codecov.io/github/jiayuasu/GeoSpark?branch=master)|
-| [Babylon Viz System](https://github.com/DataSystemsLab/GeoSpark/tree/master/babylon) |   0.2.1  |   [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/babylon/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/babylon) | [![Build Status](https://travis-ci.org/jiayuasu/GeoSpark.svg?branch=master)](https://travis-ci.org/jiayuasu/GeoSpark)[![codecov.io](http://codecov.io/github/jiayuasu/GeoSpark/coverage.svg?branch=master)](http://codecov.io/github/jiayuasu/GeoSpark?branch=master)|
+| GeoSpark |  0.8.1 | [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/geospark/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/geospark) | [![Build Status](https://travis-ci.org/jiayuasu/GeoSpark.svg?branch=master)](https://travis-ci.org/jiayuasu/GeoSpark)[![codecov.io](http://codecov.io/github/jiayuasu/GeoSpark/coverage.svg?branch=master)](http://codecov.io/github/jiayuasu/GeoSpark?branch=master)|
+| [Babylon Viz System](https://github.com/DataSystemsLab/GeoSpark/tree/master/babylon) |   0.2.2  |   [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/babylon/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/babylon) | [![Build Status](https://travis-ci.org/jiayuasu/GeoSpark.svg?branch=master)](https://travis-ci.org/jiayuasu/GeoSpark)[![codecov.io](http://codecov.io/github/jiayuasu/GeoSpark/coverage.svg?branch=master)](http://codecov.io/github/jiayuasu/GeoSpark?branch=master)|
 
 [GeoSpark@Twitter](https://twitter.com/GeoSpark_ASU)||[GeoSpark Discussion Board](https://groups.google.com/forum/#!forum/geospark-discussion-board)||[![Join the chat at https://gitter.im/geospark-datasys/Lobby](https://badges.gitter.im/geospark-datasys/Lobby.svg)](https://gitter.im/geospark-datasys/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
 
@@ -24,7 +24,6 @@
 
 ## News!
 * GeoSpark and Babylon (v0.1.X-0.2.X) Template Project is available here: [Template Project](https://github.com/jiayuasu/GeoSparkTemplateProject)
-* GeoSpark (0.8.0 and later) is able to load and query ESRI ShapeFile (.shp, .shx, .dbf) from local disk and HDFS! ([Scala Example](https://github.com/DataSystemsLab/GeoSpark/tree/master/core/src/main/scala/org/datasyslab/geospark/showcase), [Java Example](https://github.com/DataSystemsLab/GeoSpark/tree/master/core/src/main/java/org/datasyslab/geospark/showcase))
 
 * GeoSpark (0.8.0 and later) provides alternative Spatial RDD constructors to speed up RDD data loading and initialization. See [Advanced GeoSpark Tutorial](https://github.com/DataSystemsLab/GeoSpark/wiki/Advanced-Tutorial-Tune-your-GeoSpark-Application).
 
@@ -76,6 +75,8 @@
 
 GeoSpark Scala and Java template project is available here: [Template Project](https://github.com/jiayuasu/GeoSparkTemplateProject)
 
+GeoSpark Function Use Cases: [Scala Example](https://github.com/DataSystemsLab/GeoSpark/tree/master/core/src/main/scala/org/datasyslab/geospark/showcase), [Java Example](https://github.com/DataSystemsLab/GeoSpark/tree/master/core/src/main/java/org/datasyslab/geospark/showcase)
+
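+For a quick taste of those use cases, a minimal sketch using one of the alternative Spatial RDD constructors mentioned in the News section (assuming the 0.8.x `PointRDD` overload that takes a `StorageLevel`; the input path, offset, and splitter are placeholders):
+
+```java
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
+import org.datasyslab.geospark.enums.FileDataSplitter;
+import org.datasyslab.geospark.spatialRDD.PointRDD;
+
+public class PointRDDSketch {
+    public static void main(String[] args) {
+        SparkConf conf = new SparkConf().setAppName("GeoSparkSketch").setMaster("local[*]");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        // Offset 0: coordinates start at column 0 of each CSV line;
+        // carryInputData=true keeps the raw line attached to each geometry.
+        PointRDD pointRDD = new PointRDD(sc, "/path/to/points.csv", 0,
+                FileDataSplitter.CSV, true, StorageLevel.MEMORY_ONLY());
+        System.out.println("Loaded " + pointRDD.rawSpatialRDD.count() + " points");
+        sc.stop();
+    }
+}
+```
+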
 # Babylon Visualization System on GeoSpark
 Babylon is a large-scale in-memory geospatial visualization system.
 
@@ -107,8 +108,9 @@
 
 ## Questions
 
-* Please join [![Join the chat at https://gitter.im/geospark-datasys/Lobby](https://badges.gitter.im/geospark-datasys/Lobby.svg)](https://gitter.im/geospark-datasys/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
-
+* [GeoSpark@Twitter](https://twitter.com/GeoSpark_ASU)
+* [GeoSpark Discussion Board](https://groups.google.com/forum/#!forum/geospark-discussion-board)
+* [![Join the chat at https://gitter.im/geospark-datasys/Lobby](https://badges.gitter.im/geospark-datasys/Lobby.svg)](https://gitter.im/geospark-datasys/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
 * Email us!
 
 ## Contact
diff --git a/babylon/README.md b/babylon/README.md
index 354c342..cf2cb16 100644
--- a/babylon/README.md
+++ b/babylon/README.md
@@ -2,8 +2,8 @@
 
 | Status   |      Stable    | Latest | Source code|
 |:----------:|:-------------:|:------:|:------:|
-| GeoSpark |  0.8.0 | [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/geospark/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/geospark) | [![Build Status](https://travis-ci.org/jiayuasu/GeoSpark.svg?branch=master)](https://travis-ci.org/jiayuasu/GeoSpark)[![codecov.io](http://codecov.io/github/jiayuasu/GeoSpark/coverage.svg?branch=master)](http://codecov.io/github/jiayuasu/GeoSpark?branch=master)|
-| [Babylon Viz System](https://github.com/DataSystemsLab/GeoSpark/tree/master/babylon) |   0.2.1  |   [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/babylon/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/babylon) | [![Build Status](https://travis-ci.org/jiayuasu/GeoSpark.svg?branch=master)](https://travis-ci.org/jiayuasu/GeoSpark)[![codecov.io](http://codecov.io/github/jiayuasu/GeoSpark/coverage.svg?branch=master)](http://codecov.io/github/jiayuasu/GeoSpark?branch=master)|
+| GeoSpark |  0.8.1 | [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/geospark/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/geospark) | [![Build Status](https://travis-ci.org/jiayuasu/GeoSpark.svg?branch=master)](https://travis-ci.org/jiayuasu/GeoSpark)[![codecov.io](http://codecov.io/github/jiayuasu/GeoSpark/coverage.svg?branch=master)](http://codecov.io/github/jiayuasu/GeoSpark?branch=master)|
+| [Babylon Viz System](https://github.com/DataSystemsLab/GeoSpark/tree/master/babylon) |   0.2.2  |   [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/babylon/badge.svg)](https://maven-badges.herokuapp.com/maven-central/org.datasyslab/babylon) | [![Build Status](https://travis-ci.org/jiayuasu/GeoSpark.svg?branch=master)](https://travis-ci.org/jiayuasu/GeoSpark)[![codecov.io](http://codecov.io/github/jiayuasu/GeoSpark/coverage.svg?branch=master)](http://codecov.io/github/jiayuasu/GeoSpark?branch=master)|
 
 [![Join the chat at https://gitter.im/geospark-datasys/Lobby](https://badges.gitter.im/geospark-datasys/Lobby.svg)](https://gitter.im/geospark-datasys/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
 
@@ -60,7 +60,7 @@
 You can also build your own image filter by easily extending the photo filter!
 
 ### Various Image Type
-* Raster image: PNG, JPG, GIF
+* Raster image: PNG
 * Vector image: SVG (only supports Scatter Plot and Choropleth Map)
 * More!
 
@@ -75,6 +75,12 @@
 * Choropleth Map
 * More!
 
+### Current Output Storage
+
+* Local disk
+* Hadoop Distributed File System (HDFS)
+* Amazon Simple Storage Service (Amazon S3) 
+
 You can also build your own self-designed effects by easily extending the visualization operator!
 
 # Babylon Tutorial ([more](https://github.com/DataSystemsLab/GeoSpark/wiki))
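 
 The "Current Output Storage" list above maps one-to-one onto the `ImageGenerator` methods added in this pull request. A hedged sketch of the three calls (paths, region, credentials, bucket, zoom level, and partition counts are placeholders; `distributedImage` stands for the `JavaPairRDD<Integer, ImageSerializableWrapper>` produced by a Babylon visualization operator):
 
 ```java
 import org.apache.spark.api.java.JavaPairRDD;
 import org.datasyslab.babylon.core.ImageGenerator;
 import org.datasyslab.babylon.core.ImageSerializableWrapper;
 import org.datasyslab.babylon.utils.ImageType;
 
 public class SaveSketch {
     public static void save(JavaPairRDD<Integer, ImageSerializableWrapper> distributedImage,
                             String accessKey, String secretKey) throws Exception {
         ImageGenerator imageGenerator = new ImageGenerator();
         // Local disk: one PNG tile per partition, named via RasterizationUtils.getImageTileName
         imageGenerator.SaveRasterImageAsLocalFile(distributedImage, "/tmp/babylon/heatmap",
                 ImageType.PNG, 0, 4, 4);
         // HDFS: the output path must carry scheme, host, and port
         imageGenerator.SaveRasterImageAsHadoopFile(distributedImage,
                 "hdfs://namenode:9000/babylon/heatmap", ImageType.PNG, 0, 4, 4);
         // Amazon S3: region, credentials, and bucket are passed explicitly
         imageGenerator.SaveRasterImageAsS3File(distributedImage, "us-west-2", accessKey,
                 secretKey, "my-bucket", "babylon/heatmap", ImageType.PNG, 0, 4, 4);
     }
 }
 ```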
diff --git a/babylon/pom.xml b/babylon/pom.xml
index 9ee7384..dd45cc4 100644
--- a/babylon/pom.xml
+++ b/babylon/pom.xml
@@ -3,7 +3,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>org.datasyslab</groupId>
 	<artifactId>babylon</artifactId>
-	<version>0.2.1</version>
+	<version>0.2.2</version>
 	
 	<name>${project.groupId}:${project.artifactId}</name>
 	<description>Geospatial visualization extension for Apache Spark</description>
@@ -70,7 +70,11 @@
             <version>0.1.0</version>
             <scope>provided</scope>
         </dependency>
-
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+            <version>1.11.160</version>
+        </dependency>
         <!-- Test -->
         <dependency>
             <groupId>junit</groupId>
diff --git a/babylon/src/main/java/org/datasyslab/babylon/core/BigBufferedImage.java b/babylon/src/main/java/org/datasyslab/babylon/core/BigBufferedImage.java
new file mode 100644
index 0000000..952f2b6
--- /dev/null
+++ b/babylon/src/main/java/org/datasyslab/babylon/core/BigBufferedImage.java
@@ -0,0 +1,455 @@
+/**
+ * FILE: BigBufferedImage.java
+ * PATH: org.datasyslab.babylon.core.BigBufferedImage.java
+ * Copyright (c) 2015-2017 GeoSpark Development Team
+ * All rights reserved.
+ */
+package org.datasyslab.babylon.core;
+
+/*
+ * This class is part of MCFS (Mission Control - Flight Software) a development
+ * of Team Puli Space, official Google Lunar XPRIZE contestant.
+ * This class is released under Creative Commons CC0.
+ * @author Zsolt Pocze, Dimitry Polivaev
+ * Please like us on facebook, and/or join our Small Step Club.
+ * http://www.pulispace.com
+ * https://www.facebook.com/pulispace
+ * http://nyomdmegteis.hu/en/
+ */
+
+import sun.nio.ch.DirectBuffer;
+
+import javax.imageio.ImageIO;
+import javax.imageio.ImageReadParam;
+import javax.imageio.ImageReader;
+import javax.imageio.stream.ImageInputStream;
+import java.awt.*;
+import java.awt.color.ColorSpace;
+import java.awt.image.*;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class BigBufferedImage.
+ */
+public class BigBufferedImage extends BufferedImage {
+
+    /** The Constant TMP_DIR. */
+    private static final String TMP_DIR = System.getProperty("java.io.tmpdir");
+    
+    /** The Constant MAX_PIXELS_IN_MEMORY. */
+    public static final int MAX_PIXELS_IN_MEMORY =  1024 * 1024;
+
+    /**
+     * Creates a BufferedImage of the given size, falling back to a file-backed image when the pixel count exceeds MAX_PIXELS_IN_MEMORY.
+     *
+     * @param width the width
+     * @param height the height
+     * @param imageType the image type
+     * @return the buffered image
+     */
+    public static BufferedImage create(int width, int height, int imageType) {
+        if ((long) width * height > MAX_PIXELS_IN_MEMORY) { // long math avoids int overflow for very large images
+            try {
+                final File tempDir = new File(TMP_DIR);
+                return createBigBufferedImage(tempDir, width, height, imageType);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            return new BufferedImage(width, height, imageType);
+        }
+    }
+
+    /**
+     * Creates a BufferedImage from the given input file, loading it in parallel horizontal stripes.
+     *
+     * @param inputFile the input file
+     * @param imageType the image type
+     * @return the buffered image
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    public static BufferedImage create(File inputFile, int imageType) throws IOException {
+        try (ImageInputStream stream = ImageIO.createImageInputStream(inputFile);) {
+            Iterator<ImageReader> readers = ImageIO.getImageReaders(stream);
+            if (readers.hasNext()) {
+                try {
+                    ImageReader reader = readers.next();
+                    reader.setInput(stream, true, true);
+                    int width = reader.getWidth(reader.getMinIndex());
+                    int height = reader.getHeight(reader.getMinIndex());
+                    BufferedImage image = create(width, height, imageType);
+                    int cores = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
+                    int block = Math.min(MAX_PIXELS_IN_MEMORY / cores / width, (int) (Math.ceil(height / (double) cores)));
+                    ExecutorService generalExecutor = Executors.newFixedThreadPool(cores);
+                    List<Callable<ImagePartLoader>> partLoaders = new ArrayList<>();
+                    for (int y = 0; y < height; y += block) {
+                        partLoaders.add(new ImagePartLoader(
+                                y, width, Math.min(block, height - y), inputFile, image));
+                    }
+                    generalExecutor.invokeAll(partLoaders);
+                    generalExecutor.shutdown();
+                    return image;
+                } catch (InterruptedException ex) {
+                    Logger.getLogger(BigBufferedImage.class.getName()).log(Level.SEVERE, null, ex);
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Creates the big buffered image.
+     *
+     * @param tempDir the temp dir
+     * @param width the width
+     * @param height the height
+     * @param imageType the image type
+     * @return the buffered image
+     * @throws FileNotFoundException the file not found exception
+     * @throws IOException Signals that an I/O exception has occurred.
+     */
+    private static BufferedImage createBigBufferedImage(File tempDir, int width, int height, int imageType)
+            throws FileNotFoundException, IOException {
+        FileDataBuffer buffer = new FileDataBuffer(tempDir, width * height, 4);
+        ColorModel colorModel = null;
+        BandedSampleModel sampleModel = null;
+        switch (imageType) {
+            case TYPE_INT_RGB:
+                colorModel = new ComponentColorModel(ColorSpace.getInstance(ColorSpace.CS_sRGB),
+                        new int[]{8, 8, 8, 0},
+                        false,
+                        false,
+                        ComponentColorModel.TRANSLUCENT,
+                        DataBuffer.TYPE_BYTE);
+                sampleModel = new BandedSampleModel(DataBuffer.TYPE_BYTE, width, height, 3);
+                break;
+            case TYPE_INT_ARGB:
+                colorModel = new ComponentColorModel(ColorSpace.getInstance(ColorSpace.CS_sRGB),
+                        new int[]{8, 8, 8, 8},
+                        true,
+                        false,
+                        ComponentColorModel.TRANSLUCENT,
+                        DataBuffer.TYPE_BYTE);
+                sampleModel = new BandedSampleModel(DataBuffer.TYPE_BYTE, width, height, 4);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported image type: " + imageType);
+        }
+        SimpleRaster raster = new SimpleRaster(sampleModel, buffer, new Point(0, 0));
+        BigBufferedImage image = new BigBufferedImage(colorModel, raster, colorModel.isAlphaPremultiplied(), null);
+        return image;
+    }
+
+    /**
+     * The Class ImagePartLoader.
+     */
+    private static class ImagePartLoader implements Callable<ImagePartLoader> {
+
+        /** The y. */
+        private final int y;
+        
+        /** The image. */
+        private final BufferedImage image;
+        
+        /** The region. */
+        private final Rectangle region;
+        
+        /** The file. */
+        private final File file;
+
+        /**
+         * Instantiates a new image part loader.
+         *
+         * @param y the y
+         * @param width the width
+         * @param height the height
+         * @param file the file
+         * @param image the image
+         */
+        public ImagePartLoader(int y, int width, int height, File file, BufferedImage image) {
+            this.y = y;
+            this.image = image;
+            this.file = file;
+            region = new Rectangle(0, y, width, height);
+        }
+
+        /* (non-Javadoc)
+         * @see java.util.concurrent.Callable#call()
+         */
+        @Override
+        public ImagePartLoader call() throws Exception {
+            Thread.currentThread().setPriority((Thread.MIN_PRIORITY + Thread.NORM_PRIORITY) / 2);
+            try (ImageInputStream stream = ImageIO.createImageInputStream(file);) {
+                Iterator<ImageReader> readers = ImageIO.getImageReaders(stream);
+                if (readers.hasNext()) {
+                    ImageReader reader = readers.next();
+                    reader.setInput(stream, true, true);
+                    ImageReadParam param = reader.getDefaultReadParam();
+                    param.setSourceRegion(region);
+                    BufferedImage part = reader.read(0, param);
+                    Raster source = part.getRaster();
+                    WritableRaster target = image.getRaster();
+                    target.setRect(0, y, source);
+                }
+            }
+            return ImagePartLoader.this;
+        }
+    }
+
+    /**
+     * Instantiates a new big buffered image.
+     *
+     * @param cm the cm
+     * @param raster the raster
+     * @param isRasterPremultiplied the is raster premultiplied
+     * @param properties the properties
+     */
+    private BigBufferedImage(ColorModel cm, SimpleRaster raster, boolean isRasterPremultiplied, Hashtable<?, ?> properties) {
+        super(cm, raster, isRasterPremultiplied, properties);
+    }
+
+    /**
+     * Dispose.
+     */
+    public void dispose() {
+        ((SimpleRaster) getRaster()).dispose();
+    }
+
+    /**
+     * Dispose.
+     *
+     * @param image the image
+     */
+    public static void dispose(RenderedImage image) {
+        if (image instanceof BigBufferedImage) {
+            ((BigBufferedImage) image).dispose();
+        }
+    }
+
+    /**
+     * The Class SimpleRaster.
+     */
+    private static class SimpleRaster extends WritableRaster {
+
+        /**
+         * Instantiates a new simple raster.
+         *
+         * @param sampleModel the sample model
+         * @param dataBuffer the data buffer
+         * @param origin the origin
+         */
+        public SimpleRaster(SampleModel sampleModel, FileDataBuffer dataBuffer, Point origin) {
+            super(sampleModel, dataBuffer, origin);
+        }
+
+        /**
+         * Dispose.
+         */
+        public void dispose() {
+            ((FileDataBuffer) getDataBuffer()).dispose();
+        }
+
+    }
+
+    /**
+     * The Class FileDataBufferDeleterHook.
+     */
+    private static final class FileDataBufferDeleterHook extends Thread {
+
+        static {
+            Runtime.getRuntime().addShutdownHook(new FileDataBufferDeleterHook());
+        }
+
+        /** The Constant undisposedBuffers. */
+        private static final HashSet<FileDataBuffer> undisposedBuffers = new HashSet<>();
+
+        /* (non-Javadoc)
+         * @see java.lang.Thread#run()
+         */
+        @Override
+        public void run() {
+            final FileDataBuffer[] buffers = undisposedBuffers.toArray(new FileDataBuffer[0]);
+            for (FileDataBuffer b : buffers) {
+                b.disposeNow();
+            }
+        }
+    }
+
+    /**
+     * The Class FileDataBuffer.
+     */
+    private static class FileDataBuffer extends DataBuffer {
+
+        /** The id. */
+        private final String id = "buffer-" + System.currentTimeMillis() + "-" + ((int) (Math.random() * 1000));
+        
+        /** The dir. */
+        private File dir;
+        
+        /** The path. */
+        private String path;
+        
+        /** The files. */
+        private File[] files;
+        
+        /** The access files. */
+        private RandomAccessFile[] accessFiles;
+        
+        /** The buffer. */
+        private MappedByteBuffer[] buffer;
+
+        /**
+         * Instantiates a new file data buffer.
+         *
+         * @param dir the dir
+         * @param size the size
+         * @throws FileNotFoundException the file not found exception
+         * @throws IOException Signals that an I/O exception has occurred.
+         */
+        public FileDataBuffer(File dir, int size) throws FileNotFoundException, IOException {
+            super(TYPE_BYTE, size);
+            this.dir = dir;
+            init();
+        }
+
+        /**
+         * Instantiates a new file data buffer.
+         *
+         * @param dir the dir
+         * @param size the size
+         * @param numBanks the num banks
+         * @throws FileNotFoundException the file not found exception
+         * @throws IOException Signals that an I/O exception has occurred.
+         */
+        public FileDataBuffer(File dir, int size, int numBanks) throws FileNotFoundException, IOException {
+            super(TYPE_BYTE, size, numBanks);
+            this.dir = dir;
+            init();
+        }
+
+        /**
+         * Inits the.
+         *
+         * @throws FileNotFoundException the file not found exception
+         * @throws IOException Signals that an I/O exception has occurred.
+         */
+        private void init() throws FileNotFoundException, IOException {
+            FileDataBufferDeleterHook.undisposedBuffers.add(this);
+            if (dir == null) {
+                dir = new File(".");
+            }
+            if (!dir.exists()) {
+                throw new RuntimeException("FileDataBuffer constructor parameter dir does not exist: " + dir);
+            }
+            if (!dir.isDirectory()) {
+                throw new RuntimeException("FileDataBuffer constructor parameter dir is not a directory: " + dir);
+            }
+            path = dir.getPath() + "/" + id;
+            File subDir = new File(path);
+            subDir.mkdir();
+            buffer = new MappedByteBuffer[banks];
+            accessFiles = new RandomAccessFile[banks];
+            files = new File[banks];
+            for (int i = 0; i < banks; i++) {
+                File file = files[i] = new File(path + "/bank" + i + ".dat");
+                final RandomAccessFile randomAccessFile = accessFiles[i] = new RandomAccessFile(file, "rw");
+                buffer[i] = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, getSize());
+            }
+        }
+
+        /* (non-Javadoc)
+         * @see java.awt.image.DataBuffer#getElem(int, int)
+         */
+        @Override
+        public int getElem(int bank, int i) {
+            return buffer[bank].get(i) & 0xff;
+        }
+
+        /* (non-Javadoc)
+         * @see java.awt.image.DataBuffer#setElem(int, int, int)
+         */
+        @Override
+        public void setElem(int bank, int i, int val) {
+            buffer[bank].put(i, (byte) val);
+        }
+
+        /* (non-Javadoc)
+         * @see java.lang.Object#finalize()
+         */
+        @Override
+        protected void finalize() throws Throwable {
+            dispose();
+        }
+
+        /**
+         * Dispose now.
+         */
+        private void disposeNow() {
+            final MappedByteBuffer[] disposedBuffer = this.buffer;
+            this.buffer = null;
+            disposeNow(disposedBuffer);
+        }
+
+        /**
+         * Dispose.
+         */
+        public void dispose() {
+            final MappedByteBuffer[] disposedBuffer = this.buffer;
+            this.buffer = null;
+            new Thread() {
+                @Override
+                public void run() {
+                    disposeNow(disposedBuffer);
+                }
+            }.start();
+        }
+
+        /**
+         * Dispose now.
+         *
+         * @param disposedBuffer the disposed buffer
+         */
+        private void disposeNow(final MappedByteBuffer[] disposedBuffer) {
+            FileDataBufferDeleterHook.undisposedBuffers.remove(this);
+            if (disposedBuffer != null) {
+                for (MappedByteBuffer b : disposedBuffer) {
+                    ((DirectBuffer) b).cleaner().clean();
+                }
+            }
+            if (accessFiles != null) {
+                for (RandomAccessFile file : accessFiles) {
+                    try {
+                        file.close();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+                accessFiles = null;
+            }
+            if (files != null) {
+                for (File file : files) {
+                    file.delete();
+                }
+                files = null;
+            }
+            if (path != null) {
+                new File(path).delete();
+                path = null;
+            }
+        }
+
+    }
+}
\ No newline at end of file
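A short usage sketch for `BigBufferedImage` (the dimensions are illustrative): canvases whose pixel count exceeds `MAX_PIXELS_IN_MEMORY` are transparently backed by memory-mapped temp files instead of the heap, which is what lets the stitcher below assemble very large images.

```java
import java.awt.image.BufferedImage;
import org.datasyslab.babylon.core.BigBufferedImage;

public class BigImageSketch {
    public static void main(String[] args) {
        // 40000 x 20000 ARGB is ~800M pixels, far above the ~1M in-memory budget,
        // so create() returns a file-backed image rather than a heap-resident one.
        BufferedImage canvas = BigBufferedImage.create(40000, 20000, BufferedImage.TYPE_INT_ARGB);
        canvas.setRGB(0, 0, 0xFF00FF00); // opaque green pixel at the origin
        // Unmap the buffers and delete the temp files when finished.
        BigBufferedImage.dispose(canvas);
    }
}
```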
diff --git a/babylon/src/main/java/org/datasyslab/babylon/core/ImageGenerator.java b/babylon/src/main/java/org/datasyslab/babylon/core/ImageGenerator.java
index 14f6672..cce86ed 100644
--- a/babylon/src/main/java/org/datasyslab/babylon/core/ImageGenerator.java
+++ b/babylon/src/main/java/org/datasyslab/babylon/core/ImageGenerator.java
@@ -8,37 +8,62 @@
 
 import java.awt.image.BufferedImage;
 
-import java.io.Serializable;
+import java.io.*;
+import java.net.URI;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
-import org.datasyslab.babylon.utils.ImageType;
+import org.apache.spark.api.java.function.VoidFunction;
 
+import org.datasyslab.babylon.utils.ImageType;
+import org.datasyslab.babylon.utils.RasterizationUtils;
+import org.datasyslab.babylon.utils.S3Operator;
 import scala.Tuple2;
 
+import javax.imageio.ImageIO;
+
 
 // TODO: Auto-generated Javadoc
 /**
  * The Class ImageGenerator.
  */
-public abstract class ImageGenerator implements Serializable{
+public class ImageGenerator implements Serializable{
 
+	/** The Constant logger. */
+	final static Logger logger = Logger.getLogger(ImageGenerator.class);
 
 	/**
-	 * Save raster image as spark file.
+	 * Save raster image as local file.
 	 *
 	 * @param distributedImage the distributed image
 	 * @param outputPath the output path
 	 * @param imageType the image type
+	 * @param zoomLevel the zoom level
+	 * @param partitionOnX the partition on X
+	 * @param partitionOnY the partition on Y
 	 * @return true, if successful
 	 * @throws Exception the exception
 	 */
-	public boolean SaveRasterImageAsSparkFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, String outputPath, ImageType imageType) throws Exception
+	public boolean SaveRasterImageAsLocalFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception
 	{
-		
-		distributedImage.saveAsObjectFile(outputPath+"."+imageType.getTypeName());
+		logger.info("[Babylon][SaveRasterImageAsLocalFile][Start]");
+		for(int i=0;i<partitionOnX*partitionOnY;i++) {
+			deleteLocalFile(outputPath+"-"+ RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i),imageType);
+		}
+		distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
+			@Override
+			public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
+				SaveRasterImageAsLocalFile(integerImageSerializableWrapperTuple2._2.image, outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
+			}
+		});
+		logger.info("[Babylon][SaveRasterImageAsLocalFile][Stop]");
 		return true;
 	}
 	
@@ -60,7 +85,69 @@
 		}
 		return true;
 	}
-	
+
+	/**
+	 * Save raster image as hadoop file.
+	 *
+	 * @param distributedImage the distributed image
+	 * @param outputPath the output path
+	 * @param imageType the image type
+	 * @param zoomLevel the zoom level
+	 * @param partitionOnX the partition on X
+	 * @param partitionOnY the partition on Y
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean SaveRasterImageAsHadoopFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception
+	{
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile][Start]");
+		for(int i=0;i<partitionOnX*partitionOnY;i++) {
+			deleteHadoopFile(outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+".", imageType);
+		}
+		distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
+			@Override
+			public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
+				SaveRasterImageAsHadoopFile(integerImageSerializableWrapperTuple2._2.image, outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
+			}
+		});
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile][Stop]");
+		return true;
+	}
+
+	/**
+	 * Save raster image as S3 file.
+	 *
+	 * @param distributedImage the distributed image
+	 * @param regionName the region name
+	 * @param accessKey the access key
+	 * @param secretKey the secret key
+	 * @param bucketName the bucket name
+	 * @param path the path
+	 * @param imageType the image type
+	 * @param zoomLevel the zoom level
+	 * @param partitionOnX the partition on X
+	 * @param partitionOnY the partition on Y
+	 * @return true, if successful
+	 */
+	public boolean SaveRasterImageAsS3File(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage,
+										   final String regionName, final String accessKey, final String secretKey,
+										   final String bucketName, final String path, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY)
+	{
+		logger.info("[Babylon][SaveRasterImageAsS3File][Start]");
+		S3Operator s3Operator = new S3Operator(regionName, accessKey, secretKey);
+		for(int i=0;i<partitionOnX*partitionOnY;i++) {
+			s3Operator.deleteImage(bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+"."+imageType.getTypeName());
+		}
+		distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
+			@Override
+			public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
+				SaveRasterImageAsS3File(integerImageSerializableWrapperTuple2._2.image, regionName, accessKey, secretKey, bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
+			}
+		});
+		logger.info("[Babylon][SaveRasterImageAsS3File][Stop]");
+		return true;
+	}
+
 	/**
 	 * Save raster image as local file.
 	 *
@@ -70,27 +157,89 @@
 	 * @return true, if successful
 	 * @throws Exception the exception
 	 */
-	public abstract boolean SaveRasterImageAsLocalFile(BufferedImage rasterImage, String outputPath, ImageType imageType) throws Exception;
-
-	
-	/**
-	 * Save vector image as spark file.
-	 *
-	 * @param distributedImage the distributed image
-	 * @param outputPath the output path
-	 * @param imageType the image type
-	 * @return true, if successful
-	 * @throws Exception the exception
-	 */
-	public boolean SaveVectorImageAsSparkFile(JavaPairRDD<Integer,String> distributedImage, String outputPath, ImageType imageType) throws Exception
+	public boolean SaveRasterImageAsLocalFile(BufferedImage rasterImage, String outputPath, ImageType imageType) throws Exception
 	{
-		
-		distributedImage.saveAsTextFile(outputPath+"."+imageType.getTypeName());
+		logger.info("[Babylon][SaveRasterImageAsLocalFile][Start]");
+		File outputImage = new File(outputPath+"."+imageType.getTypeName());
+		outputImage.getParentFile().mkdirs();
+		try {
+			ImageIO.write(rasterImage,imageType.getTypeName(),outputImage);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		logger.info("[Babylon][SaveRasterImageAsLocalFile][Stop]");
 		return true;
 	}
-	
+
 	/**
-	 * Save vectormage as local file.
+	 * Save raster image as hadoop file.
+	 *
+	 * @param rasterImage the raster image
+	 * @param originalOutputPath the original output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean SaveRasterImageAsHadoopFile(BufferedImage rasterImage, String originalOutputPath, ImageType imageType) throws Exception
+	{
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile][Start]");
+		// Locate HDFS path
+		String outputPath = originalOutputPath+"."+imageType.getTypeName();
+		String[] splitString = outputPath.split(":");
+		String hostName = splitString[0]+":"+splitString[1];
+		String[] portAndPath = splitString[2].split("/");
+		String port = portAndPath[0];
+		String localPath = "";
+		for(int i=1;i<portAndPath.length;i++)
+		{
+			localPath+="/"+portAndPath[i];
+		}
+		localPath+="."+imageType.getTypeName();
+		// Delete existing files
+		Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile] HDFS URI BASE: "+hostName+":"+port);
+		FileSystem hdfs = FileSystem.get(new URI(hostName+":"+port), hadoopConf);
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile] Check the existence of path: "+localPath);
+		if (hdfs.exists(new org.apache.hadoop.fs.Path(localPath))) {
+			logger.info("[Babylon][SaveRasterImageAsHadoopFile] Deleting path: "+localPath);
+			hdfs.delete(new org.apache.hadoop.fs.Path(localPath), true);
+			logger.info("[Babylon][SaveRasterImageAsHadoopFile] Deleted path: "+localPath);
+		}
+		Path path = new Path(outputPath);
+		FSDataOutputStream out = hdfs.create(path);
+		ImageIO.write(rasterImage,imageType.getTypeName(),out);
+		out.close();
+		hdfs.close();
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile][Stop]");
+		return true;
+	}
+
+
+
+	/**
+	 * Save raster image as S3 file.
+	 *
+	 * @param rasterImage the raster image
+	 * @param regionName the region name
+	 * @param accessKey the access key
+	 * @param secretKey the secret key
+	 * @param bucketName the bucket name
+	 * @param path the path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws IOException Signals that an I/O exception has occurred.
+	 */
+	public boolean SaveRasterImageAsS3File(BufferedImage rasterImage, String regionName, String accessKey, String secretKey, String bucketName, String path, ImageType imageType) throws IOException {
+		logger.info("[Babylon][SaveRasterImageAsS3File][Start]");
+		S3Operator s3Operator = new S3Operator(regionName,accessKey, secretKey);
+		s3Operator.putImage(bucketName,path+"."+imageType.getTypeName(),rasterImage);
+		logger.info("[Babylon][SaveRasterImageAsS3File][Stop]");
+		return true;
+	}
+
+
+	/**
+	 * Save vector image as local file.
 	 *
 	 * @param distributedImage the distributed image
 	 * @param outputPath the output path
@@ -98,8 +247,9 @@
 	 * @return true, if successful
 	 * @throws Exception the exception
 	 */
-	public boolean SaveVectormageAsLocalFile(JavaPairRDD<Integer,String> distributedImage, String outputPath, ImageType imageType) throws Exception
+	public boolean SaveVectorImageAsLocalFile(JavaPairRDD<Integer,String> distributedImage, String outputPath, ImageType imageType) throws Exception
 	{
+		logger.info("[Babylon][SaveVectorImageAsLocalFile][Start]");
 		JavaRDD<String> distributedVectorImageNoKey= distributedImage.map(new Function<Tuple2<Integer,String>, String>()
 		{
 
@@ -107,12 +257,13 @@
 			public String call(Tuple2<Integer, String> vectorObject) throws Exception {
 				return vectorObject._2();
 			}
-			
+
 		});
 		this.SaveVectorImageAsLocalFile(distributedVectorImageNoKey.collect(), outputPath, imageType);
+		logger.info("[Babylon][SaveVectorImageAsLocalFile][Stop]");
 		return true;
 	}
-	
+
 	/**
 	 * Save vector image as local file.
 	 *
@@ -122,7 +273,101 @@
 	 * @return true, if successful
 	 * @throws Exception the exception
 	 */
-	public abstract boolean SaveVectorImageAsLocalFile(List<String> vectorImage, String outputPath, ImageType imageType) throws Exception;
+	public boolean SaveVectorImageAsLocalFile(List<String> vectorImage, String outputPath, ImageType imageType) throws Exception
+	{
+		logger.info("[Babylon][SaveVectorImageAsLocalFile][Start]");
+		File outputImage = new File(outputPath+"."+imageType.getTypeName());
+		outputImage.getParentFile().mkdirs();
 
+		BufferedWriter bw = null;
+		FileWriter fw = null;
+		try {
+			fw = new FileWriter(outputImage);
+			bw = new BufferedWriter(fw);
+			for(String svgElement : vectorImage)
+			{
+				bw.write(svgElement);
+			}
 
+		} catch (IOException e) {
+
+			e.printStackTrace();
+
+		} finally {
+
+			try {
+
+				if (bw != null)
+					bw.close();
+
+				if (fw != null)
+					fw.close();
+
+			} catch (IOException ex) {
+
+				ex.printStackTrace();
+
+			}
+		}
+		logger.info("[Babylon][SaveVectorImageAsLocalFile][Stop]");
+		return true;
+	}
+
+	/**
+	 * Delete hadoop file.
+	 *
+	 * @param originalOutputPath the original output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean deleteHadoopFile(String originalOutputPath, ImageType imageType) throws Exception {
+		String outputPath = originalOutputPath+"."+imageType.getTypeName();
+		String[] splitString = outputPath.split(":");
+		String hostName = splitString[0]+":"+splitString[1];
+		String[] portAndPath = splitString[2].split("/");
+		String port = portAndPath[0];
+		String localPath = "";
+		for(int i=1;i<portAndPath.length;i++)
+		{
+			localPath+="/"+portAndPath[i];
+		}
+		localPath+="."+imageType.getTypeName();
+		// Delete existing files
+		Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+		logger.info("[Babylon][deleteHadoopFile] HDFS URI BASE: "+hostName+":"+port);
+		FileSystem hdfs = FileSystem.get(new URI(hostName+":"+port), hadoopConf);
+		logger.info("[Babylon][deleteHadoopFile] Check the existence of path: "+localPath);
+		if (hdfs.exists(new org.apache.hadoop.fs.Path(localPath))) {
+			logger.info("[Babylon][deleteHadoopFile] Deleting path: "+localPath);
+			hdfs.delete(new org.apache.hadoop.fs.Path(localPath), true);
+			logger.info("[Babylon][deleteHadoopFile] Deleted path: "+localPath);
+		}
+		return true;
+	}
+
+	/**
+	 * Delete local file.
+	 *
+	 * @param originalOutputPath the original output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 */
+	public boolean deleteLocalFile(String originalOutputPath, ImageType imageType) {
+		File file = null;
+		try {
+
+			// build a handle to the image file (it may not exist)
+			file = new File(originalOutputPath+"."+imageType.getTypeName());
+
+			// delete it if present; File.delete() is a no-op for missing files
+			file.delete();
+
+		} catch(Exception e) {
+
+			// if any error occurs
+			e.printStackTrace();
+		}
+		return true;
+	}
 }
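The HDFS helpers above locate the filesystem by string-splitting the output path on `:` and `/` rather than parsing a URI directly. A worked example of that decomposition (the namenode address is a placeholder):

```java
public class HdfsPathSketch {
    public static void main(String[] args) {
        String outputPath = "hdfs://namenode:9000/babylon/heatmap.png";
        String[] splitString = outputPath.split(":");
        // splitString = { "hdfs", "//namenode", "9000/babylon/heatmap.png" }
        String hostName = splitString[0] + ":" + splitString[1]; // "hdfs://namenode"
        String[] portAndPath = splitString[2].split("/");
        String port = portAndPath[0];                            // "9000"
        String localPath = "";
        for (int i = 1; i < portAndPath.length; i++) {
            localPath += "/" + portAndPath[i];
        }
        // FileSystem.get(new URI(hostName + ":" + port), conf) resolves
        // "hdfs://namenode:9000"; localPath ("/babylon/heatmap.png")
        // addresses the file inside that filesystem.
        System.out.println(hostName + ":" + port + " -> " + localPath);
    }
}
```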
diff --git a/babylon/src/main/java/org/datasyslab/babylon/core/ImageSerializableWrapper.java b/babylon/src/main/java/org/datasyslab/babylon/core/ImageSerializableWrapper.java
index e1e2d2e..7abf39c 100644
--- a/babylon/src/main/java/org/datasyslab/babylon/core/ImageSerializableWrapper.java
+++ b/babylon/src/main/java/org/datasyslab/babylon/core/ImageSerializableWrapper.java
@@ -58,4 +58,14 @@
     	System.out.println("I got nothing from the stream!");
     }
   }
+
+  /**
+   * Gets the image.
+   *
+   * @return the image
+   */
+  public BufferedImage getImage()
+  {
+    return this.image;
+  }
 }
\ No newline at end of file
diff --git a/babylon/src/main/java/org/datasyslab/babylon/core/ImageStitcher.java b/babylon/src/main/java/org/datasyslab/babylon/core/ImageStitcher.java
new file mode 100644
index 0000000..9df29be
--- /dev/null
+++ b/babylon/src/main/java/org/datasyslab/babylon/core/ImageStitcher.java
@@ -0,0 +1,195 @@
+/**
+ * FILE: ImageStitcher.java
+ * PATH: org.datasyslab.babylon.core.ImageStitcher.java
+ * Copyright (c) 2015-2017 GeoSpark Development Team
+ * All rights reserved.
+ */
+package org.datasyslab.babylon.core;
+
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.datasyslab.babylon.utils.ImageType;
+import org.datasyslab.babylon.utils.RasterizationUtils;
+import org.datasyslab.babylon.utils.S3Operator;
+
+import scala.Tuple2;
+
+import javax.imageio.ImageIO;
+import java.awt.image.BufferedImage;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+// TODO: Auto-generated Javadoc
+/**
+ * The Class ImageStitcher.
+ */
+public class ImageStitcher {
+    
+    /** The Constant logger. */
+    final static Logger logger = Logger.getLogger(ImageStitcher.class);
+
+    /**
+     * Stitch image partitions from local file.
+     *
+     * @param imageTilePath the image tile path
+     * @param resolutionX the resolution X
+     * @param resolutionY the resolution Y
+     * @param zoomLevel the zoom level
+     * @param partitionOnX the partition on X
+     * @param partitionOnY the partition on Y
+     * @return true, if successful
+     * @throws Exception the exception
+     */
+    public static boolean stitchImagePartitionsFromLocalFile(String imageTilePath, int resolutionX, int resolutionY, int zoomLevel, int partitionOnX, int partitionOnY) throws Exception
+    {
+        logger.info("[Babylon][stitchImagePartitions][Start]");
+
+        BufferedImage stitchedImage = BigBufferedImage.create(resolutionX, resolutionY,BufferedImage.TYPE_INT_ARGB);
+        //Stitch all image partitions together
+        for(int i=0;i<partitionOnX*partitionOnY;i++)
+        {
+            BufferedImage imageTile = null;
+            try {
+                imageTile = ImageIO.read(new File(""+imageTilePath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX,partitionOnY,i)+".png"));
+            } catch (IOException e) {
+                continue;
+            }
+            Tuple2<Integer,Integer> partitionCoordinate = RasterizationUtils.Decode1DTo2DId(partitionOnX, partitionOnY, i);
+            int partitionMinX = partitionCoordinate._1*Math.round(resolutionX/partitionOnX);
+            int partitionMinY = partitionCoordinate._2*Math.round(resolutionY/partitionOnY);
+            //if(partitionMinX!=0){partitionMinX--;}
+            //if(partitionMinY!=0){partitionMinY--;}
+            int[] rgbArray = imageTile.getRGB(0, 0, imageTile.getWidth(), imageTile.getHeight(), null, 0, imageTile.getWidth());
+            int partitionMaxX = partitionMinX+imageTile.getWidth();
+            int partitionMaxY = partitionMinY+imageTile.getHeight();
+            logger.debug("[Babylon][stitchImagePartitions] stitching image tile..."+i+" ResolutionX " + resolutionX+" ResolutionY "+resolutionY);
+            logger.debug("[Babylon][stitchImagePartitions] stitching an image tile..."+i+" MinX "+partitionMinX+" MaxX "+partitionMaxX+" MinY "+partitionMinY+" MaxY "+partitionMaxY);
+            stitchedImage.setRGB(partitionMinX, partitionMinY, imageTile.getWidth(), imageTile.getHeight(), rgbArray, 0, imageTile.getWidth());
+        }
+        ImageGenerator imageGenerator = new ImageGenerator();
+        imageGenerator.SaveRasterImageAsLocalFile(stitchedImage,imageTilePath+"-"+zoomLevel+"-stitched", ImageType.PNG);
+        logger.info("[Babylon][stitchImagePartitions][Stop]");
+        return true;
+    }
+
+    /**
+     * Stitch image partitions from S3 file.
+     *
+     * @param regionName the region name
+     * @param accessKey the access key
+     * @param secretKey the secret key
+     * @param bucketName the bucket name
+     * @param imageTilePath the image tile path
+     * @param resolutionX the resolution X
+     * @param resolutionY the resolution Y
+     * @param zoomLevel the zoom level
+     * @param partitionOnX the partition on X
+     * @param partitionOnY the partition on Y
+     * @return true, if successful
+     * @throws Exception the exception
+     */
+    public static boolean stitchImagePartitionsFromS3File(String regionName, String accessKey, String secretKey, String bucketName, String imageTilePath, int resolutionX, int resolutionY, int zoomLevel, int partitionOnX, int partitionOnY) throws Exception
+    {
+        logger.info("[Babylon][stitchImagePartitions][Start]");
+
+        BufferedImage stitchedImage = BigBufferedImage.create(resolutionX, resolutionY,BufferedImage.TYPE_INT_ARGB);
+        S3Operator s3Operator = new S3Operator(regionName, accessKey, secretKey);
+        //Stitch all image partitions together
+        for(int i=0;i<partitionOnX*partitionOnY;i++)
+        {
+            BufferedImage imageTile = null;
+            try {
+                imageTile = s3Operator.getImage(bucketName, imageTilePath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX,partitionOnY,i)+".png");
+            } catch (AmazonS3Exception e) {
+                continue;
+            }
+            Tuple2<Integer,Integer> partitionCoordinate = RasterizationUtils.Decode1DTo2DId(partitionOnX, partitionOnY, i);
+            int partitionMinX = partitionCoordinate._1*Math.round(resolutionX/partitionOnX);
+            int partitionMinY = partitionCoordinate._2*Math.round(resolutionY/partitionOnY);
+            //if(partitionMinX!=0){partitionMinX--;}
+            //if(partitionMinY!=0){partitionMinY--;}
+            int[] rgbArray = imageTile.getRGB(0, 0, imageTile.getWidth(), imageTile.getHeight(), null, 0, imageTile.getWidth());
+            int partitionMaxX = partitionMinX+imageTile.getWidth();
+            int partitionMaxY = partitionMinY+imageTile.getHeight();
+            logger.debug("[Babylon][stitchImagePartitions] stitching image tile..."+i+" ResolutionX " + resolutionX+" ResolutionY "+resolutionY);
+            logger.debug("[Babylon][stitchImagePartitions] stitching an image tile..."+i+" MinX "+partitionMinX+" MaxX "+partitionMaxX+" MinY "+partitionMinY+" MaxY "+partitionMaxY);
+            stitchedImage.setRGB(partitionMinX, partitionMinY, imageTile.getWidth(), imageTile.getHeight(), rgbArray, 0, imageTile.getWidth());
+        }
+        ImageGenerator imageGenerator = new ImageGenerator();
+        imageGenerator.SaveRasterImageAsS3File(stitchedImage,regionName, accessKey, secretKey, bucketName,imageTilePath+"-"+zoomLevel+"-stitched", ImageType.PNG);
+        logger.info("[Babylon][stitchImagePartitions][Stop]");
+        return true;
+    }
+
+    /**
+     * Stitch image partitions from hadoop file.
+     *
+     * @param imageTilePath the image tile path
+     * @param resolutionX the resolution X
+     * @param resolutionY the resolution Y
+     * @param zoomLevel the zoom level
+     * @param partitionOnX the partition on X
+     * @param partitionOnY the partition on Y
+     * @return true, if successful
+     * @throws Exception the exception
+     */
+    public static boolean stitchImagePartitionsFromHadoopFile(String imageTilePath,  int resolutionX, int resolutionY, int zoomLevel, int partitionOnX, int partitionOnY) throws Exception
+    {
+        logger.info("[Babylon][stitchImagePartitions][Start]");
+
+        BufferedImage stitchedImage = BigBufferedImage.create(resolutionX, resolutionY,BufferedImage.TYPE_INT_ARGB);
+
+        String[] splitString = imageTilePath.split(":");
+        String hostName = splitString[0]+":"+splitString[1];
+        String[] portAndPath = splitString[2].split("/");
+        String port = portAndPath[0];
+        String localPath = "";
+        for(int i=1;i<portAndPath.length;i++)
+        {
+            localPath+="/"+portAndPath[i];
+        }
+
+        Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+        FileSystem hdfs = FileSystem.get(new URI(hostName+":"+port), hadoopConf);
+
+        //Stitch all image partitions together
+        for(int i=0;i<partitionOnX*partitionOnY;i++)
+        {
+            BufferedImage imageTile = null;
+            String tilePath = localPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX,partitionOnY,i)+".png";
+            try {
+                if (hdfs.exists(new org.apache.hadoop.fs.Path(tilePath)))
+                {
+                    // Keep the FileSystem open across tiles; closing it inside the
+                    // loop would break every read after the first one.
+                    InputStream inputStream = hdfs.open(new org.apache.hadoop.fs.Path(tilePath));
+                    imageTile = ImageIO.read(inputStream);
+                    inputStream.close();
+                }
+                else
+                {
+                    continue;
+                }
+            } catch (IOException e) {
+                continue;
+            }
+            Tuple2<Integer,Integer> partitionCoordinate = RasterizationUtils.Decode1DTo2DId(partitionOnX, partitionOnY, i);
+            int partitionMinX = partitionCoordinate._1*Math.round(resolutionX/partitionOnX);
+            int partitionMinY = partitionCoordinate._2*Math.round(resolutionY/partitionOnY);
+            //if(partitionMinX!=0){partitionMinX--;}
+            //if(partitionMinY!=0){partitionMinY--;}
+            int[] rgbArray = imageTile.getRGB(0, 0, imageTile.getWidth(), imageTile.getHeight(), null, 0, imageTile.getWidth());
+            int partitionMaxX = partitionMinX+imageTile.getWidth();
+            int partitionMaxY = partitionMinY+imageTile.getHeight();
+            logger.debug("[Babylon][stitchImagePartitions] stitching image tile..."+i+" ResolutionX " + resolutionX+" ResolutionY "+resolutionY);
+            logger.debug("[Babylon][stitchImagePartitions] stitching an image tile..."+i+" MinX "+partitionMinX+" MaxX "+partitionMaxX+" MinY "+partitionMinY+" MaxY "+partitionMaxY);
+            stitchedImage.setRGB(partitionMinX, partitionMinY, imageTile.getWidth(), imageTile.getHeight(), rgbArray, 0, imageTile.getWidth());
+        }
+        hdfs.close();
+        ImageGenerator imageGenerator = new ImageGenerator();
+        imageGenerator.SaveRasterImageAsLocalFile(stitchedImage,imageTilePath+"-"+zoomLevel+"-stitched", ImageType.PNG);
+        logger.info("[Babylon][stitchImagePartitions][Stop]");
+        return true;
+    }
+}
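A usage sketch for the stitcher (resolution, zoom level, and partition counts are placeholders and must match the values used when the tiles were rendered):

```java
import org.datasyslab.babylon.core.ImageStitcher;

public class StitchSketch {
    public static void main(String[] args) throws Exception {
        // Reassemble a 4 x 4 grid of tiles written by SaveRasterImageAsLocalFile
        // into one 4000 x 4000 image; missing tiles (empty partitions) are skipped
        // by the stitcher's catch-and-continue logic.
        ImageStitcher.stitchImagePartitionsFromLocalFile("/tmp/babylon/heatmap",
                4000, 4000, 0, 4, 4);
        // Result: /tmp/babylon/heatmap-0-stitched.png
    }
}
```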
diff --git a/babylon/src/main/java/org/datasyslab/babylon/core/VisualizationOperator.java b/babylon/src/main/java/org/datasyslab/babylon/core/VisualizationOperator.java
index 23ad518..1b8a254 100644
--- a/babylon/src/main/java/org/datasyslab/babylon/core/VisualizationOperator.java
+++ b/babylon/src/main/java/org/datasyslab/babylon/core/VisualizationOperator.java
@@ -750,11 +750,12 @@
 
 
     /**
-     * Stitch image partitions.
+     * Stitch image partitions (superseded by ImageStitcher; the old
+     * implementation is retained below, commented out).
      *
      * @return true, if successful
-     * @throws Exception the exception
      */
+    /*
     public boolean stitchImagePartitions() throws Exception
     {
         logger.info("[Babylon][stitchImagePartitions][Start]");
@@ -778,6 +779,7 @@
         logger.info("[Babylon][stitchImagePartitions][Stop]");
         return true;
     }
+    */
 
     /**
      * Sets the max pixel count.
diff --git a/babylon/src/main/java/org/datasyslab/babylon/core/VisualizationPartitioner.java b/babylon/src/main/java/org/datasyslab/babylon/core/VisualizationPartitioner.java
index 2b8ef39..ec5918e 100644
--- a/babylon/src/main/java/org/datasyslab/babylon/core/VisualizationPartitioner.java
+++ b/babylon/src/main/java/org/datasyslab/babylon/core/VisualizationPartitioner.java
@@ -173,7 +173,7 @@
 		int partitionCoordinateY = coordinateY/partitionIntervalY;
 		int partitionId = -1;
 		try {
-			partitionId = RasterizationUtils.Encode2DTo1DId(partitionX,partitionY,partitionCoordinateX,partitionCoordinateY);
+			partitionId = RasterizationUtils.Encode2DTo1DId(partitionX,partitionY,partitionCoordinateX,partitionY-1-partitionCoordinateY);
 		} catch (Exception e) {
 			// TODO Auto-generated catch block
 			//e.printStackTrace();
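The one-line change above flips the Y component before encoding the partition ID. A worked example on a 4 x 4 grid, assuming `Encode2DTo1DId` is row-major (`id = y * partitionX + x`), which reads as moving tiles computed in a bottom-up coordinate system into the top-down order of the final image:

```java
public class PartitionIdSketch {
    public static void main(String[] args) {
        int partitionX = 4, partitionY = 4;
        int partitionCoordinateX = 1, partitionCoordinateY = 0; // bottom row in render space
        // Before the fix: id = 0 * 4 + 1 = 1   (lands in the top image row)
        // After the fix:  flippedY = 4 - 1 - 0 = 3,
        //                 id = 3 * 4 + 1 = 13  (lands in the bottom image row)
        int flippedY = partitionY - 1 - partitionCoordinateY;
        int partitionId = flippedY * partitionX + partitionCoordinateX;
        System.out.println("partitionId = " + partitionId); // 13
    }
}
```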
diff --git a/babylon/src/main/java/org/datasyslab/babylon/extension/imageGenerator/BabylonImageGenerator.java b/babylon/src/main/java/org/datasyslab/babylon/extension/imageGenerator/BabylonImageGenerator.java
index 2e192c1..f2a99dd 100644
--- a/babylon/src/main/java/org/datasyslab/babylon/extension/imageGenerator/BabylonImageGenerator.java
+++ b/babylon/src/main/java/org/datasyslab/babylon/extension/imageGenerator/BabylonImageGenerator.java
@@ -11,24 +11,135 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 
 import javax.imageio.ImageIO;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
-import org.datasyslab.babylon.core.ImageGenerator;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.datasyslab.babylon.core.ImageSerializableWrapper;
 import org.datasyslab.babylon.utils.ImageType;
+import org.datasyslab.babylon.utils.RasterizationUtils;
+import org.datasyslab.babylon.utils.S3Operator;
+import scala.Tuple2;
 
 // TODO: Auto-generated Javadoc
 /**
  * The Class BabylonImageGenerator.
  */
-public class BabylonImageGenerator extends ImageGenerator{
+public class BabylonImageGenerator {
+
 
 	/** The Constant logger. */
 	final static Logger logger = Logger.getLogger(BabylonImageGenerator.class);
-	/* (non-Javadoc)
-	 * @see org.datasyslab.babylon.core.AbstractImageGenerator#SaveRasterImageAsLocalFile(java.awt.image.BufferedImage, java.lang.String, org.datasyslab.babylon.utils.ImageType)
+
+	/**
+	 * Save raster image as local file.
+	 *
+	 * @param distributedImage the distributed image
+	 * @param outputPath the output path
+	 * @param imageType the image type
+	 * @param zoomLevel the zoom level
+	 * @param partitionOnX the partition on X
+	 * @param partitionOnY the partition on Y
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean SaveRasterImageAsLocalFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception
+	{
+		logger.info("[Babylon][SaveRasterImageAsLocalFile][Start]");
+		for(int i=0;i<partitionOnX*partitionOnY;i++) {
+			deleteLocalFile(outputPath+"-"+ RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i),imageType);
+		}
+		distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
+			@Override
+			public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
+				SaveRasterImageAsLocalFile(integerImageSerializableWrapperTuple2._2.getImage(), outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
+			}
+		});
+		logger.info("[Babylon][SaveRasterImageAsLocalFile][Stop]");
+		return true;
+	}
+
+
+	/**
+	 * Save raster image as hadoop file.
+	 *
+	 * @param distributedImage the distributed image
+	 * @param outputPath the output path
+	 * @param imageType the image type
+	 * @param zoomLevel the zoom level
+	 * @param partitionOnX the partition on X
+	 * @param partitionOnY the partition on Y
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean SaveRasterImageAsHadoopFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, final String outputPath, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY) throws Exception
+	{
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile][Start]");
+		for(int i=0;i<partitionOnX*partitionOnY;i++) {
+			deleteHadoopFile(outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+".", imageType);
+		}
+		distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
+			@Override
+			public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
+				SaveRasterImageAsHadoopFile(integerImageSerializableWrapperTuple2._2.getImage(), outputPath+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
+			}
+		});
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile][Stop]");
+		return true;
+	}
+
+	/**
+	 * Save a distributed raster image as S3 files, one tile per image partition.
+	 *
+	 * @param distributedImage the distributed image
+	 * @param regionName the AWS region name
+	 * @param accessKey the AWS access key
+	 * @param secretKey the AWS secret key
+	 * @param bucketName the S3 bucket name
+	 * @param path the object key prefix
+	 * @param imageType the image type
+	 * @param zoomLevel the zoom level encoded in each tile name
+	 * @param partitionOnX the number of image partitions on the X axis
+	 * @param partitionOnY the number of image partitions on the Y axis
+	 * @return true, if successful
+	 */
+	public boolean SaveRasterImageAsS3File(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage,
+										   final String regionName, final String accessKey, final String secretKey,
+										   final String bucketName, final String path, final ImageType imageType, final int zoomLevel, final int partitionOnX, final int partitionOnY)
+	{
+		logger.info("[Babylon][SaveRasterImageAsS3File][Start]");
+		S3Operator s3Operator = new S3Operator(regionName, accessKey, secretKey);
+		for(int i=0;i<partitionOnX*partitionOnY;i++) {
+			s3Operator.deleteImage(bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,i)+"."+imageType.getTypeName());
+		}
+		distributedImage.foreach(new VoidFunction<Tuple2<Integer, ImageSerializableWrapper>>() {
+			@Override
+			public void call(Tuple2<Integer, ImageSerializableWrapper> integerImageSerializableWrapperTuple2) throws Exception {
+				SaveRasterImageAsS3File(integerImageSerializableWrapperTuple2._2.getImage(), regionName, accessKey, secretKey, bucketName, path+"-"+RasterizationUtils.getImageTileName(zoomLevel,partitionOnX, partitionOnY,integerImageSerializableWrapperTuple2._1), imageType);
+			}
+		});
+		logger.info("[Babylon][SaveRasterImageAsS3File][Stop]");
+		return true;
+	}
+
+	/**
+	 * Save raster image as local file.
+	 *
+	 * @param rasterImage the raster image
+	 * @param outputPath the output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws Exception the exception
 	 */
 	public boolean SaveRasterImageAsLocalFile(BufferedImage rasterImage, String outputPath, ImageType imageType) throws Exception
 	{
@@ -43,16 +154,114 @@
 		logger.info("[Babylon][SaveRasterImageAsLocalFile][Stop]");
 		return true;
 	}
-	
-	/* (non-Javadoc)
-	 * @see org.datasyslab.babylon.core.AbstractImageGenerator#SaveVectorImageAsLocalFile(java.util.List, java.lang.String, org.datasyslab.babylon.utils.ImageType)
+
+	/**
+	 * Save raster image as hadoop file.
+	 *
+	 * @param rasterImage the raster image
+	 * @param originalOutputPath the original output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean SaveRasterImageAsHadoopFile(BufferedImage rasterImage, String originalOutputPath, ImageType imageType) throws Exception
+	{
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile][Start]");
+		// Locate HDFS path
+		String outputPath = originalOutputPath+"."+imageType.getTypeName();
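+		// Assumes an output path of the form scheme://host:port/dir/file, e.g. hdfs://localhost:9000/tiles/0-0-0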
+		String[] splitString = outputPath.split(":");
+		String hostName = splitString[0]+":"+splitString[1];
+		String[] portAndPath = splitString[2].split("/");
+		String port = portAndPath[0];
+		String localPath = "";
+		for(int i=1;i<portAndPath.length;i++)
+		{
+			localPath+="/"+portAndPath[i];
+		}
+		localPath+="."+imageType.getTypeName();
+		// Delete existing files
+		Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+		logger.info("[Babylon][SaveRasterImageAsSparkFile] HDFS URI BASE: "+hostName+":"+port);
+		FileSystem hdfs = FileSystem.get(new URI(hostName+":"+port), hadoopConf);
+		logger.info("[Babylon][SaveRasterImageAsSparkFile] Check the existence of path: "+localPath);
+		if (hdfs.exists(new org.apache.hadoop.fs.Path(localPath))) {
+			logger.info("[Babylon][SaveRasterImageAsSparkFile] Deleting path: "+localPath);
+			hdfs.delete(new org.apache.hadoop.fs.Path(localPath), true);
+			logger.info("[Babylon][SaveRasterImageAsSparkFile] Deleted path: "+localPath);
+		}
+		Path path = new Path(outputPath);
+		FSDataOutputStream out = hdfs.create(path);
+		ImageIO.write(rasterImage, imageType.getTypeName(), out);
+		out.close();
+		hdfs.close();
+		logger.info("[Babylon][SaveRasterImageAsHadoopFile][Stop]");
+		return true;
+	}
+
+
+
+	/**
+	 * Save a raster image as an S3 file.
+	 *
+	 * @param rasterImage the raster image
+	 * @param regionName the AWS region name
+	 * @param accessKey the AWS access key
+	 * @param secretKey the AWS secret key
+	 * @param bucketName the S3 bucket name
+	 * @param path the object key
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws IOException Signals that an I/O exception has occurred.
+	 */
+	public boolean SaveRasterImageAsS3File(BufferedImage rasterImage, String regionName, String accessKey, String secretKey, String bucketName, String path, ImageType imageType) throws IOException {
+		logger.info("[Babylon][SaveRasterImageAsS3File][Start]");
+		S3Operator s3Operator = new S3Operator(regionName,accessKey, secretKey);
+		s3Operator.putImage(bucketName,path+"."+imageType.getTypeName(),rasterImage);
+		logger.info("[Babylon][SaveRasterImageAsS3File][Stop]");
+		return true;
+	}
+
+	/**
+	 * Save a distributed vector image as a single local file.
+	 *
+	 * @param distributedImage the distributed image
+	 * @param outputPath the output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean SaveVectorImageAsLocalFile(JavaPairRDD<Integer,String> distributedImage, String outputPath, ImageType imageType) throws Exception
+	{
+		logger.info("[Babylon][SaveVectormageAsLocalFile][Start]");
+		JavaRDD<String> distributedVectorImageNoKey= distributedImage.map(new Function<Tuple2<Integer,String>, String>()
+		{
+
+			@Override
+			public String call(Tuple2<Integer, String> vectorObject) throws Exception {
+				return vectorObject._2();
+			}
+
+		});
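+		// collect() brings the whole distributed vector image to the driver before writing one file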
+		this.SaveVectorImageAsLocalFile(distributedVectorImageNoKey.collect(), outputPath, imageType);
+		logger.info("[Babylon][SaveVectormageAsLocalFile][Stop]");
+		return true;
+	}
+
+	/**
+	 * Save vector image as local file.
+	 *
+	 * @param vectorImage the vector image
+	 * @param outputPath the output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws Exception the exception
 	 */
 	public boolean SaveVectorImageAsLocalFile(List<String> vectorImage, String outputPath, ImageType imageType) throws Exception
 	{
 		logger.info("[Babylon][SaveVectorImageAsLocalFile][Start]");
 		File outputImage = new File(outputPath+"."+imageType.getTypeName());
 		outputImage.getParentFile().mkdirs();
-		
+
 		BufferedWriter bw = null;
 		FileWriter fw = null;
 		try {
@@ -86,4 +295,82 @@
 		logger.info("[Babylon][SaveVectorImageAsLocalFile][Stop]");
 		return true;
 	}
+
+	/**
+	 * Delete hadoop file.
+	 *
+	 * @param originalOutputPath the original output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean deleteHadoopFile(String originalOutputPath, ImageType imageType) throws Exception {
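+		// Parse the output path the same way as SaveRasterImageAsHadoopFile: scheme://host:port/dir/file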
+		String outputPath = originalOutputPath+"."+imageType.getTypeName();
+		String[] splitString = outputPath.split(":");
+		String hostName = splitString[0]+":"+splitString[1];
+		String[] portAndPath = splitString[2].split("/");
+		String port = portAndPath[0];
+		String localPath = "";
+		for(int i=1;i<portAndPath.length;i++)
+		{
+			localPath+="/"+portAndPath[i];
+		}
+		localPath+="."+imageType.getTypeName();
+		// Delete existing files
+		Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
+		logger.info("[Babylon][SaveRasterImageAsSparkFile] HDFS URI BASE: "+hostName+":"+port);
+		FileSystem hdfs = FileSystem.get(new URI(hostName+":"+port), hadoopConf);
+		logger.info("[Babylon][SaveRasterImageAsSparkFile] Check the existence of path: "+localPath);
+		if (hdfs.exists(new org.apache.hadoop.fs.Path(localPath))) {
+			logger.info("[Babylon][SaveRasterImageAsSparkFile] Deleting path: "+localPath);
+			hdfs.delete(new org.apache.hadoop.fs.Path(localPath), true);
+			logger.info("[Babylon][SaveRasterImageAsSparkFile] Deleted path: "+localPath);
+		}
+		return true;
+	}
+
+	/**
+	 * Delete local file.
+	 *
+	 * @param originalOutputPath the original output path
+	 * @param imageType the image type
+	 * @return true, if successful
+	 */
+	public boolean deleteLocalFile(String originalOutputPath, ImageType imageType) {
+		File file = null;
+		try {
+			// Build a handle to the target tile file
+			file = new File(originalOutputPath+"."+imageType.getTypeName());
+
+			// Delete the file; File.delete is a no-op if it does not exist
+			file.delete();
+		} catch(Exception e) {
+			// Log and continue; a failed delete should not abort the caller
+			e.printStackTrace();
+		}
+		return true;
+	}
+
+	/**
+	 * Save a distributed raster image as local files by collecting all partitions to the driver.
+	 *
+	 * @param distributedImage the distributed image
+	 * @param outputPath the output path prefix
+	 * @param imageType the image type
+	 * @return true, if successful
+	 * @throws Exception the exception
+	 */
+	public boolean SaveRasterImageAsLocalFile(JavaPairRDD<Integer,ImageSerializableWrapper> distributedImage, String outputPath, ImageType imageType) throws Exception
+	{
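+		// Collect every image partition to the driver and write the tiles sequentially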
+		List<Tuple2<Integer,ImageSerializableWrapper>> imagePartitions = distributedImage.collect();
+		for(Tuple2<Integer,ImageSerializableWrapper> imagePartition:imagePartitions)
+		{
+			SaveRasterImageAsLocalFile(imagePartition._2.getImage(), outputPath+"-"+imagePartition._1, imageType);
+		}
+		return true;
+	}
+
 }
diff --git a/babylon/src/main/java/org/datasyslab/babylon/showcase/Example.java b/babylon/src/main/java/org/datasyslab/babylon/showcase/Example.java
index bfd4016..66c6707 100644
--- a/babylon/src/main/java/org/datasyslab/babylon/showcase/Example.java
+++ b/babylon/src/main/java/org/datasyslab/babylon/showcase/Example.java
@@ -17,6 +17,8 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
+import org.datasyslab.babylon.core.ImageGenerator;
+import org.datasyslab.babylon.core.ImageStitcher;
 import org.datasyslab.babylon.core.RasterOverlayOperator;
 import org.datasyslab.babylon.extension.imageGenerator.BabylonImageGenerator;
 import org.datasyslab.babylon.extension.visualizationEffect.ChoroplethMap;
@@ -152,7 +154,7 @@
 			visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 			visualizationOperator.Visualize(sparkContext, spatialRDD);
 			imageGenerator = new BabylonImageGenerator();
-			imageGenerator.SaveVectorImageAsSparkFile(visualizationOperator.distributedVectorImage, "file://"+outputPath+"-distributed",ImageType.SVG);
+			imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.distributedVectorImage, "file://"+outputPath+"-distributed",ImageType.SVG);
 		}
 		catch(Exception e)
 		{
@@ -259,9 +261,9 @@
 			RectangleRDD spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY());
 			HeatMap visualizationOperator = new HeatMap(1000,600,USMainLandBoundary,false,2,4,4,true,true);
 			visualizationOperator.Visualize(sparkContext, spatialRDD);
-			visualizationOperator.stitchImagePartitions();
-			BabylonImageGenerator imageGenerator = new BabylonImageGenerator();
-			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, outputPath,ImageType.PNG);
+			ImageGenerator imageGenerator = new ImageGenerator();
+			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, outputPath,ImageType.PNG, 0, 4, 4);
+			ImageStitcher.stitchImagePartitionsFromLocalFile(outputPath, 1000, 600,0,4,4);
 		}
 		catch(Exception e)
 		{
diff --git a/babylon/src/main/java/org/datasyslab/babylon/utils/RasterizationUtils.java b/babylon/src/main/java/org/datasyslab/babylon/utils/RasterizationUtils.java
index 38b570f..fb70265 100644
--- a/babylon/src/main/java/org/datasyslab/babylon/utils/RasterizationUtils.java
+++ b/babylon/src/main/java/org/datasyslab/babylon/utils/RasterizationUtils.java
@@ -460,6 +460,11 @@
 			result.addAll(FindPixelCoordinates(resolutionX,resolutionY,pixelCoordinate1,pixelCoordinate2,reverseSpatialCoordinate));
 		}
 		return result;
-		
+	}
+
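+	/**
+	 * Builds the tile file name "zoomLevel-tileX-tileY" for a 1-D tile serial id.
+	 *
+	 * @param zoomLevel the zoom level
+	 * @param partitionOnX the number of tile partitions on the X axis
+	 * @param partitionOnY the number of tile partitions on the Y axis
+	 * @param tileSerialId the 1-D tile serial id
+	 * @return the tile name, e.g. "0-1-2"
+	 */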
+	public static String getImageTileName(int zoomLevel, int partitionOnX, int partitionOnY, int tileSerialId)
+	{
+		Tuple2<Integer,Integer> tileCoordinate = RasterizationUtils.Decode1DTo2DId(partitionOnX,partitionOnY,tileSerialId);
+		return zoomLevel+"-"+tileCoordinate._1()+"-"+tileCoordinate._2();
 	}
 }
diff --git a/babylon/src/main/java/org/datasyslab/babylon/utils/S3Operator.java b/babylon/src/main/java/org/datasyslab/babylon/utils/S3Operator.java
new file mode 100644
index 0000000..7fd0334
--- /dev/null
+++ b/babylon/src/main/java/org/datasyslab/babylon/utils/S3Operator.java
@@ -0,0 +1,79 @@
+/**
+ * FILE: S3Operator.java
+ * PATH: org.datasyslab.babylon.utils.S3Operator.java
+ * Copyright (c) 2015-2017 GeoSpark Development Team
+ * All rights reserved.
+ */
+package org.datasyslab.babylon.utils;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.Bucket;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import org.apache.log4j.Logger;
+
+import javax.imageio.ImageIO;
+import java.awt.image.BufferedImage;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class S3Operator {
+
+	private AmazonS3 s3client;
+	
+	public final static Logger logger = Logger.getLogger(S3Operator.class);
+
+	public S3Operator(String regionName, String accessKey, String secretKey)
+	{
+		Regions region = Regions.fromName(regionName);
+	    BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey);
+		s3client = AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(new AWSStaticCredentialsProvider(awsCreds)).build();
+        logger.info("[Babylon][Constructor] Initialized a S3 client");
+    }
+	
+	public boolean createBucket(String bucketName) {
+		Bucket bucket = s3client.createBucket(bucketName);
+        logger.info("[Babylon][createBucket] Created a bucket: " + bucket.toString());
+		return true;
+	}
+	
+	public boolean deleteImage(String bucketName, String path) {
+		s3client.deleteObject(bucketName, path);
+        logger.info("[Babylon][deleteImage] Deleted an image if exist");
+        return true;
+	}
+	
+	public boolean putImage(String bucketName, String path, BufferedImage rasterImage) throws IOException {
+        deleteImage(bucketName,path);
+
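+        // Encode the image (always as PNG here) into a byte array first so the
+        // request can carry an exact Content-Length header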
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ImageIO.write(rasterImage, "png", outputStream);
+        byte[] buffer = outputStream.toByteArray();
+        InputStream inputStream = new ByteArrayInputStream(buffer);
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setContentLength(buffer.length);
+		s3client.putObject(new PutObjectRequest(bucketName, path, inputStream, metadata));
+		inputStream.close();
+		outputStream.close();
+        logger.info("[Babylon][putImage] Put an image");
+        return true;
+	}
+	
+	public BufferedImage getImage(String bucketName, String path) throws Exception {
+		logger.debug("[Babylon][getImage] Start");
+		S3Object s3Object =  s3client.getObject(bucketName, path);
+        InputStream inputStream = s3Object.getObjectContent();
+		BufferedImage rasterImage = ImageIO.read(inputStream);
+		inputStream.close();
+		s3Object.close();
+        logger.info("[Babylon][getImage] Got an image");
+		return rasterImage;
+	}
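+
+	// Minimal usage sketch (bucket name, key, and credentials below are hypothetical):
+	//   S3Operator op = new S3Operator("us-west-1", accessKey, secretKey);
+	//   op.putImage("my-tile-bucket", "heatmap/0-0-0.png", rasterImage);
+	//   BufferedImage img = op.getImage("my-tile-bucket", "heatmap/0-0-0.png");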
+}
diff --git a/babylon/src/main/scala/org/datasyslab/babylon/showcase/ScalaExample.scala b/babylon/src/main/scala/org/datasyslab/babylon/showcase/ScalaExample.scala
index 9534e6a..61e76b4 100644
--- a/babylon/src/main/scala/org/datasyslab/babylon/showcase/ScalaExample.scala
+++ b/babylon/src/main/scala/org/datasyslab/babylon/showcase/ScalaExample.scala
@@ -15,21 +15,16 @@
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
 import org.apache.spark.storage.StorageLevel
-import org.datasyslab.babylon.core.RasterOverlayOperator
-import org.datasyslab.babylon.extension.imageGenerator.BabylonImageGenerator
+import org.datasyslab.babylon.core.{ImageGenerator, RasterOverlayOperator}
 import org.datasyslab.babylon.extension.visualizationEffect.ChoroplethMap
 import org.datasyslab.babylon.extension.visualizationEffect.HeatMap
 import org.datasyslab.babylon.extension.visualizationEffect.ScatterPlot
 import org.datasyslab.babylon.utils.{ColorizeOption, ImageType}
-import org.datasyslab.geospark.enums.FileDataSplitter
-import org.datasyslab.geospark.enums.GridType
-import org.datasyslab.geospark.enums.IndexType
-import org.datasyslab.geospark.spatialOperator.JoinQuery
-import org.datasyslab.geospark.spatialRDD.PointRDD
-import org.datasyslab.geospark.spatialRDD.PolygonRDD
-import org.datasyslab.geospark.spatialRDD.RectangleRDD
 import com.vividsolutions.jts.geom.Envelope
+import org.datasyslab.geospark.enums.{FileDataSplitter, GridType, IndexType}
 import org.datasyslab.geospark.formatMapper.EarthdataHDFPointMapper
+import org.datasyslab.geospark.spatialOperator.JoinQuery
+import org.datasyslab.geospark.spatialRDD.{PointRDD, PolygonRDD, RectangleRDD}
 
 
 /**
@@ -100,18 +95,18 @@
 		var visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false)
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
 		visualizationOperator.Visualize(sparkContext, spatialRDD)
-		var imageGenerator = new BabylonImageGenerator
+		var imageGenerator = new ImageGenerator
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, outputPath, ImageType.PNG)
 		visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false, -1, -1, false, true)
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
 		visualizationOperator.Visualize(sparkContext, spatialRDD)
-		imageGenerator = new BabylonImageGenerator
+		imageGenerator = new ImageGenerator
 		imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.vectorImage, outputPath, ImageType.SVG)
 		visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false, -1, -1, true, true)
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
 		visualizationOperator.Visualize(sparkContext, spatialRDD)
-		imageGenerator = new BabylonImageGenerator
-		imageGenerator.SaveVectorImageAsSparkFile(visualizationOperator.distributedVectorImage, "file://" + outputPath + "-distributed", ImageType.SVG)
+		imageGenerator = new ImageGenerator
+		imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.distributedVectorImage, "file://" + outputPath + "-distributed", ImageType.SVG)
 		true
 	}
 
@@ -125,7 +120,7 @@
 		val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY)
 		val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2)
 		visualizationOperator.Visualize(sparkContext, spatialRDD)
-		val imageGenerator = new BabylonImageGenerator
+		val imageGenerator = new ImageGenerator
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, outputPath, ImageType.PNG)
 		true
 	}
@@ -151,7 +146,7 @@
 		frontImage.Visualize(sparkContext, queryRDD)
 		val overlayOperator = new RasterOverlayOperator(visualizationOperator.rasterImage)
 		overlayOperator.JoinImage(frontImage.rasterImage)
-		val imageGenerator = new BabylonImageGenerator
+		val imageGenerator = new ImageGenerator
 		imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, outputPath, ImageType.PNG)
 		true
 	}
@@ -166,8 +161,8 @@
 		val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY)
 		val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2, 4, 4, true, true)
 		visualizationOperator.Visualize(sparkContext, spatialRDD)
-		val imageGenerator = new BabylonImageGenerator
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, outputPath, ImageType.PNG)
+		val imageGenerator = new ImageGenerator
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, outputPath, ImageType.PNG,0,4,4)
 		true
 	}
 
@@ -181,8 +176,7 @@
 		val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY)
 		val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2, 4, 4, true, true)
 		visualizationOperator.Visualize(sparkContext, spatialRDD)
-		visualizationOperator.stitchImagePartitions
-		val imageGenerator = new BabylonImageGenerator
+		val imageGenerator = new ImageGenerator
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, outputPath, ImageType.PNG)
 		true
 	}
@@ -194,7 +188,7 @@
 		val visualizationOperator = new ScatterPlot(1000, 600, spatialRDD.boundaryEnvelope, ColorizeOption.EARTHOBSERVATION, false, false)
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.BLUE, true)
 		visualizationOperator.Visualize(sparkContext, spatialRDD)
-		val imageGenerator = new BabylonImageGenerator
+		val imageGenerator = new ImageGenerator
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, outputPath, ImageType.PNG)
 		true
 	}
diff --git a/babylon/src/test/java/org/datasyslab/babylon/ChoroplethmapTest.java b/babylon/src/test/java/org/datasyslab/babylon/ChoroplethmapTest.java
index c518567..f6260a6 100644
--- a/babylon/src/test/java/org/datasyslab/babylon/ChoroplethmapTest.java
+++ b/babylon/src/test/java/org/datasyslab/babylon/ChoroplethmapTest.java
@@ -17,9 +17,9 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
+import org.datasyslab.babylon.core.ImageGenerator;
 import org.datasyslab.babylon.core.RasterOverlayOperator;
 import org.datasyslab.babylon.core.VectorOverlayOperator;
-import org.datasyslab.babylon.extension.imageGenerator.BabylonImageGenerator;
 import org.datasyslab.babylon.extension.visualizationEffect.ChoroplethMap;
 import org.datasyslab.babylon.extension.visualizationEffect.ScatterPlot;
 import org.datasyslab.babylon.utils.ImageType;
@@ -184,7 +184,7 @@
 		RasterOverlayOperator overlayOperator = new RasterOverlayOperator(visualizationOperator.rasterImage);
 		overlayOperator.JoinImage(frontImage.rasterImage);
 		
-		BabylonImageGenerator imageGenerator = new BabylonImageGenerator();
+		ImageGenerator imageGenerator = new ImageGenerator();
 		imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage,"./target/choroplethmap/RectangleRDD-combined", ImageType.PNG);
 	}
 	
@@ -214,7 +214,7 @@
 		RasterOverlayOperator rasterOverlayOperator = new RasterOverlayOperator(visualizationOperator.rasterImage);
 		rasterOverlayOperator.JoinImage(frontImage.rasterImage);
 		
-		BabylonImageGenerator imageGenerator = new BabylonImageGenerator();
+		ImageGenerator imageGenerator = new ImageGenerator();
 		imageGenerator.SaveRasterImageAsLocalFile(rasterOverlayOperator.backRasterImage, "./target/choroplethmap/PolygonRDD-combined", ImageType.GIF);
 		
 		visualizationOperator = new ChoroplethMap(1000,600,USMainLandBoundary,false,true);
@@ -232,7 +232,7 @@
 		VectorOverlayOperator vectorOverlayOperator = new VectorOverlayOperator(visualizationOperator.vectorImage);
 		vectorOverlayOperator.JoinImage(frontImage.vectorImage);
 		
-		imageGenerator = new BabylonImageGenerator();
+		imageGenerator = new ImageGenerator();
 		imageGenerator.SaveVectorImageAsLocalFile(vectorOverlayOperator.backVectorImage, "./target/choroplethmap/PolygonRDD-combined", ImageType.SVG);
 	}
 
diff --git a/babylon/src/test/java/org/datasyslab/babylon/HeatmapTest.java b/babylon/src/test/java/org/datasyslab/babylon/HeatmapTest.java
index d93cc22..4c43242 100644
--- a/babylon/src/test/java/org/datasyslab/babylon/HeatmapTest.java
+++ b/babylon/src/test/java/org/datasyslab/babylon/HeatmapTest.java
@@ -14,7 +14,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
-import org.datasyslab.babylon.extension.imageGenerator.BabylonImageGenerator;
+import org.datasyslab.babylon.core.ImageGenerator;
+import org.datasyslab.babylon.core.ImageStitcher;
 import org.datasyslab.babylon.extension.visualizationEffect.HeatMap;
 import org.datasyslab.babylon.utils.ImageType;
 import org.datasyslab.geospark.enums.FileDataSplitter;
@@ -159,7 +160,7 @@
 		PointRDD spatialRDD = new PointRDD(sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions, StorageLevel.MEMORY_ONLY());
 		HeatMap visualizationOperator = new HeatMap(800,500,USMainLandBoundary,false,3);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
+		ImageGenerator imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/heatmap/PointRDD", ImageType.PNG);
 	}
 	
@@ -173,9 +174,10 @@
 		RectangleRDD spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY());
 		HeatMap visualizationOperator = new HeatMap(800,500,USMainLandBoundary,false,2, 4,4,false,true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		visualizationOperator.stitchImagePartitions();
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/heatmap/RectangleRDD", ImageType.PNG);
+		
+		ImageGenerator imageGenerator = new ImageGenerator();
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, "./target/heatmap/RectangleRDD",ImageType.PNG, 0, 4, 4);
+		ImageStitcher.stitchImagePartitionsFromLocalFile("./target/heatmap/RectangleRDD", 800, 500,0,4,4);
 	}
 	
 	/**
@@ -189,9 +191,10 @@
 		PolygonRDD spatialRDD = new PolygonRDD(sparkContext, PolygonInputLocation, PolygonSplitter, false, PolygonNumPartitions, StorageLevel.MEMORY_ONLY());
 		HeatMap visualizationOperator = new HeatMap(800,500,USMainLandBoundary,false,2);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/heatmap/PolygonRDD", ImageType.PNG);
 		
+		ImageGenerator imageGenerator = new ImageGenerator();
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/heatmap/PolygonRDD",ImageType.PNG);
+
 	}
 	
 	/**
@@ -204,7 +207,8 @@
 		LineStringRDD spatialRDD = new LineStringRDD(sparkContext, LineStringInputLocation, LineStringSplitter, false, LineStringNumPartitions, StorageLevel.MEMORY_ONLY());
 		HeatMap visualizationOperator = new HeatMap(800,500,USMainLandBoundary,false,2);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/heatmap/LineStringRDD", ImageType.PNG);
+		
+		ImageGenerator imageGenerator = new ImageGenerator();
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/heatmap/LineStringRDD",ImageType.PNG);
 	}
 }
diff --git a/babylon/src/test/java/org/datasyslab/babylon/NYCTripTest.java b/babylon/src/test/java/org/datasyslab/babylon/NYCTripTest.java
index 7263ad1..e0e037c 100644
--- a/babylon/src/test/java/org/datasyslab/babylon/NYCTripTest.java
+++ b/babylon/src/test/java/org/datasyslab/babylon/NYCTripTest.java
@@ -19,7 +19,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.datasyslab.babylon.extension.imageGenerator.BabylonImageGenerator;
+import org.datasyslab.babylon.core.ImageGenerator;
 import org.datasyslab.babylon.extension.visualizationEffect.HeatMap;
 import org.datasyslab.babylon.utils.ImageType;
 import org.datasyslab.babylon.utils.RasterizationUtils;
@@ -197,7 +197,7 @@
 		HeatMap visualizationOperator = new HeatMap(resolutionX,resolutionY,NYCBoundary,false,5);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
 		
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
+		ImageGenerator imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/heatmap/NYCTrip", ImageType.PNG);	
 	}
 }
diff --git a/babylon/src/test/java/org/datasyslab/babylon/ParallelVisualizationTest.java b/babylon/src/test/java/org/datasyslab/babylon/ParallelVisualizationTest.java
index 2d00221..90656ae 100644
--- a/babylon/src/test/java/org/datasyslab/babylon/ParallelVisualizationTest.java
+++ b/babylon/src/test/java/org/datasyslab/babylon/ParallelVisualizationTest.java
@@ -14,7 +14,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
-import org.datasyslab.babylon.extension.imageGenerator.BabylonImageGenerator;
+import org.datasyslab.babylon.core.ImageGenerator;
+import org.datasyslab.babylon.core.ImageStitcher;
 import org.datasyslab.babylon.extension.visualizationEffect.HeatMap;
 import org.datasyslab.babylon.utils.ImageType;
 import org.datasyslab.geospark.enums.FileDataSplitter;
@@ -179,10 +180,11 @@
 		PointRDD spatialRDD = new PointRDD(sparkContext, PointInputLocation, PointOffset, PointSplitter, false, PointNumPartitions, StorageLevel.MEMORY_ONLY());
 		HeatMap visualizationOperator = new HeatMap(resolutionX,resolutionY,USMainLandBoundary,false,2,partitionX,partitionY,true,true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		visualizationOperator.stitchImagePartitions();
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/parallelvisualization/PointRDD",ImageType.PNG);
-	}
+
+		ImageGenerator imageGenerator = new ImageGenerator();
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, "./target/parallelvisualization/PointRDD",ImageType.PNG,0,partitionX, partitionY);
+		ImageStitcher.stitchImagePartitionsFromLocalFile("./target/parallelvisualization/PointRDD", resolutionX,resolutionY,0,partitionX, partitionY);
+	}
 	
 	/**
 	 * Test rectangle RDD visualization with tiles.
@@ -194,9 +196,10 @@
 		RectangleRDD spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY());
 		HeatMap visualizationOperator = new HeatMap(resolutionX,resolutionY,USMainLandBoundary,false,2,partitionX,partitionY,true,true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		visualizationOperator.stitchImagePartitions();
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/parallelvisualization/RectangleRDDWithTiles",ImageType.PNG);
+
+		ImageGenerator imageGenerator = new  ImageGenerator();
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, "./target/parallelvisualization/RectangleRDDWithTiles",ImageType.PNG,0,partitionX, partitionY);
+        ImageStitcher.stitchImagePartitionsFromLocalFile("./target/parallelvisualization/RectangleRDDWithTiles", resolutionX,resolutionY,0,partitionX, partitionY);
 	}
 
     /**
@@ -207,10 +210,12 @@
     @Test
     public void testRectangleRDDVisualizationNoTiles() throws Exception {
         RectangleRDD spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY());
-        HeatMap visualizationOperator = new HeatMap(resolutionX,resolutionY,USMainLandBoundary,false,5,partitionX,partitionY,true,false);
+        HeatMap visualizationOperator = new HeatMap(resolutionX,resolutionY,USMainLandBoundary,false,5,partitionX,partitionY,true,true);
         visualizationOperator.Visualize(sparkContext, spatialRDD);
-        BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-        imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/parallelvisualization/RectangleRDDNoTiles",ImageType.PNG);
+
+        ImageGenerator imageGenerator = new ImageGenerator();
+        imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, "./target/parallelvisualization/RectangleRDDNoTiles",ImageType.PNG, 0, partitionX, partitionY);
+        ImageStitcher.stitchImagePartitionsFromLocalFile("./target/parallelvisualization/RectangleRDDNoTiles", resolutionX,resolutionY,0,partitionX, partitionY);
     }
 
 	/**
@@ -224,9 +229,11 @@
 		PolygonRDD spatialRDD = new PolygonRDD(sparkContext, PolygonInputLocation, PolygonSplitter, false, PolygonNumPartitions, StorageLevel.MEMORY_ONLY());
 		HeatMap visualizationOperator = new HeatMap(resolutionX,resolutionY,USMainLandBoundary,false,2,partitionX,partitionY,true,true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		visualizationOperator.stitchImagePartitions();
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/parallelvisualization/PolygonRDD",ImageType.PNG);
+		
+		ImageGenerator imageGenerator = new ImageGenerator();
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, "./target/parallelvisualization/PolygonRDD",ImageType.PNG, 0, partitionX, partitionY);
+		ImageStitcher.stitchImagePartitionsFromLocalFile("./target/parallelvisualization/PolygonRDD", resolutionX,resolutionY,0,partitionX, partitionY);
+		
 	}
 	
 	/**
@@ -239,8 +246,9 @@
 		LineStringRDD spatialRDD = new LineStringRDD(sparkContext, LineStringInputLocation, LineStringSplitter, false, LineStringNumPartitions, StorageLevel.MEMORY_ONLY());
 		HeatMap visualizationOperator = new HeatMap(resolutionX,resolutionY,USMainLandBoundary,false,2,partitionX,partitionY,true,true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		visualizationOperator.stitchImagePartitions();
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/parallelvisualization/LineStringRDD",ImageType.PNG);
+		
+		ImageGenerator imageGenerator = new ImageGenerator();
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, "./target/parallelvisualization/LineStringRDD",ImageType.PNG, 0, partitionX, partitionY);
+		ImageStitcher.stitchImagePartitionsFromLocalFile("./target/parallelvisualization/LineStringRDD", resolutionX,resolutionY,0,partitionX, partitionY);
 	}
 }
diff --git a/babylon/src/test/java/org/datasyslab/babylon/ScatterplotTest.java b/babylon/src/test/java/org/datasyslab/babylon/ScatterplotTest.java
index 024c9f4..14dcc82 100644
--- a/babylon/src/test/java/org/datasyslab/babylon/ScatterplotTest.java
+++ b/babylon/src/test/java/org/datasyslab/babylon/ScatterplotTest.java
@@ -15,7 +15,8 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
-import org.datasyslab.babylon.extension.imageGenerator.BabylonImageGenerator;
+import org.datasyslab.babylon.core.ImageGenerator;
+import org.datasyslab.babylon.core.ImageStitcher;
 import org.datasyslab.babylon.extension.visualizationEffect.ScatterPlot;
 import org.datasyslab.babylon.utils.ColorizeOption;
 import org.datasyslab.babylon.utils.ImageType;
@@ -195,13 +196,13 @@
 		ScatterPlot visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
+		ImageGenerator imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/scatterplot/PointRDD",ImageType.PNG);
 		
 		visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false,true);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		imageGenerator = new  BabylonImageGenerator();
+		imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.vectorImage, "./target/scatterplot/PointRDD",ImageType.SVG);
 	}
 
@@ -217,10 +218,10 @@
 				false, 4, 4, true, false);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		visualizationOperator.stitchImagePartitions();
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/scatterplot/PointRDD-parallelrender",ImageType.PNG);
-		//imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, "./target/scatterplot/PointRDD-parallelrender",ImageType.PNG);
+
+		ImageGenerator imageGenerator = new ImageGenerator();
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, "./target/heatmap/PointRDD-parallelrender",ImageType.PNG, 0, 4, 4);
+		ImageStitcher.stitchImagePartitionsFromLocalFile("./target/scatterplot/PointRDD-parallelrender", 1000, 600,0,4,4);
 	}
 	
 	/**
@@ -234,17 +235,17 @@
 		ScatterPlot visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false,2,2,true,false);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
+		ImageGenerator imageGenerator = new  ImageGenerator();
 		
         String scatterPlotOutputPath = System.getProperty("user.dir") + "/target/scatterplot/";
 
-		imageGenerator.SaveRasterImageAsSparkFile(visualizationOperator.distributedRasterImage, scatterPlotOutputPath+"PointRDD-parallel-raster",ImageType.PNG);
+		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, scatterPlotOutputPath+"PointRDD-parallel-raster",ImageType.PNG);
 		
 		visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false,-1,-1,true,true);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		imageGenerator = new  BabylonImageGenerator();
-		imageGenerator.SaveVectorImageAsSparkFile(visualizationOperator.distributedVectorImage, scatterPlotOutputPath+"PointRDD-parallel-vector",ImageType.SVG);
+		imageGenerator = new  ImageGenerator();
+		imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.distributedVectorImage, scatterPlotOutputPath+"PointRDD-parallel-vector",ImageType.SVG);
 	}
 	
 	/**
@@ -258,13 +259,13 @@
 		ScatterPlot visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.RED, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
+		ImageGenerator imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/scatterplot/RectangleRDD",ImageType.GIF);
 		
 		visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false,true);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.RED, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		imageGenerator = new  BabylonImageGenerator();
+		imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.vectorImage, "./target/scatterplot/RectangleRDD",ImageType.SVG);	
 	}
 	
@@ -280,13 +281,13 @@
 		ScatterPlot visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
+		ImageGenerator imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/scatterplot/PolygonRDD",ImageType.GIF);
 		
 		visualizationOperator = new ScatterPlot(1000,600,USMainLandBoundary,false,true);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		imageGenerator = new  BabylonImageGenerator();
+		imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.vectorImage, "./target/scatterplot/PolygonRDD",ImageType.SVG);
 	}
 	
@@ -304,13 +305,13 @@
 		ScatterPlot visualizationOperator = new ScatterPlot(resolutionX,resolutionY,USMainLandBoundary,false);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		BabylonImageGenerator imageGenerator = new  BabylonImageGenerator();
+		ImageGenerator imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, "./target/scatterplot/LineStringRDD",ImageType.GIF);
 		
 		visualizationOperator = new ScatterPlot(resolutionX,resolutionY,USMainLandBoundary,false,true);
 		visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true);
 		visualizationOperator.Visualize(sparkContext, spatialRDD);
-		imageGenerator = new  BabylonImageGenerator();
+		imageGenerator = new  ImageGenerator();
 		imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.vectorImage, "./target/scatterplot/LineStringRDD",ImageType.SVG);
 	}
 }
diff --git a/babylon/src/test/scala/org/datasyslab/geospark/scalaTest.scala b/babylon/src/test/scala/org/datasyslab/babylon/scalaTest.scala
similarity index 90%
rename from babylon/src/test/scala/org/datasyslab/geospark/scalaTest.scala
rename to babylon/src/test/scala/org/datasyslab/babylon/scalaTest.scala
index 311be78..ffb8ce3 100644
--- a/babylon/src/test/scala/org/datasyslab/geospark/scalaTest.scala
+++ b/babylon/src/test/scala/org/datasyslab/babylon/scalaTest.scala
@@ -1,4 +1,4 @@
-package org.datasyslab.geospark
+package org.datasyslab.babylon
 
 import java.awt.Color
 import java.io.FileInputStream
@@ -8,8 +8,7 @@
 import org.apache.log4j.{Level, Logger}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{SparkConf, SparkContext}
-import org.datasyslab.babylon.core.RasterOverlayOperator
-import org.datasyslab.babylon.extension.imageGenerator.BabylonImageGenerator
+import org.datasyslab.babylon.core.{ImageGenerator, RasterOverlayOperator}
 import org.datasyslab.babylon.extension.visualizationEffect.{ChoroplethMap, HeatMap, ScatterPlot}
 import org.datasyslab.babylon.utils.{ColorizeOption, ImageType}
 import org.datasyslab.geospark.enums.{FileDataSplitter, GridType, IndexType}
@@ -74,18 +73,18 @@
 			var visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false)
 			visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
 			visualizationOperator.Visualize(sparkContext, spatialRDD)
-			var imageGenerator = new BabylonImageGenerator
+			var imageGenerator = new ImageGenerator
 			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, scatterPlotOutputPath, ImageType.PNG)
 			visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false, -1, -1, false, true)
 			visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
 			visualizationOperator.Visualize(sparkContext, spatialRDD)
-			imageGenerator = new BabylonImageGenerator
+			imageGenerator = new ImageGenerator
 			imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.vectorImage, scatterPlotOutputPath, ImageType.SVG)
 			visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary, false, -1, -1, true, true)
 			visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
 			visualizationOperator.Visualize(sparkContext, spatialRDD)
-			imageGenerator = new BabylonImageGenerator
-			imageGenerator.SaveVectorImageAsSparkFile(visualizationOperator.distributedVectorImage, "file://" + scatterPlotOutputPath + "-distributed", ImageType.SVG)
+			imageGenerator = new ImageGenerator
+			imageGenerator.SaveVectorImageAsLocalFile(visualizationOperator.distributedVectorImage, scatterPlotOutputPath + "-distributed", ImageType.SVG)
 			true
 		}
 
@@ -93,7 +92,7 @@
 			val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY)
 			val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2)
 			visualizationOperator.Visualize(sparkContext, spatialRDD)
-			val imageGenerator = new BabylonImageGenerator
+			val imageGenerator = new ImageGenerator
 			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, heatMapOutputPath, ImageType.PNG)
 			true
 		}
@@ -113,7 +112,7 @@
 			frontImage.Visualize(sparkContext, queryRDD)
 			val overlayOperator = new RasterOverlayOperator(visualizationOperator.rasterImage)
 			overlayOperator.JoinImage(frontImage.rasterImage)
-			val imageGenerator = new BabylonImageGenerator
+			val imageGenerator = new ImageGenerator
 			imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, choroplethMapOutputPath, ImageType.PNG)
 			true
 		}
@@ -122,8 +121,8 @@
 			val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY)
 			val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2, 4, 4, true, true)
 			visualizationOperator.Visualize(sparkContext, spatialRDD)
-			val imageGenerator = new BabylonImageGenerator
-			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, parallelFilterRenderStitchOutputPath, ImageType.PNG)
+			val imageGenerator = new ImageGenerator
+			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, parallelFilterRenderStitchOutputPath, ImageType.PNG, 0, 4, 4)
 			true
 		}
 
@@ -131,9 +130,8 @@
 			val spatialRDD = new RectangleRDD(sparkContext, RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions, StorageLevel.MEMORY_ONLY)
 			val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary, false, 2, 4, 4, true, true)
 			visualizationOperator.Visualize(sparkContext, spatialRDD)
-			visualizationOperator.stitchImagePartitions
-			val imageGenerator = new BabylonImageGenerator
-			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, parallelFilterRenderStitchOutputPath+"-stitched", ImageType.PNG)
+			val imageGenerator = new ImageGenerator
+			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage, parallelFilterRenderStitchOutputPath, ImageType.PNG)
 			true
 		}
 
@@ -144,7 +142,7 @@
 			val visualizationOperator = new ScatterPlot(1000, 600, spatialRDD.boundaryEnvelope, ColorizeOption.EARTHOBSERVATION, false, false)
 			visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.BLUE, true)
 			visualizationOperator.Visualize(sparkContext, spatialRDD)
-			val imageGenerator = new BabylonImageGenerator
+			val imageGenerator = new ImageGenerator
 			imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage, earthdataScatterPlotOutputPath, ImageType.PNG)
 			true
 		}
diff --git a/core/pom.xml b/core/pom.xml
index 2358f48..258203d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -3,7 +3,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>org.datasyslab</groupId>
 	<artifactId>geospark</artifactId>
-	<version>0.8.0</version>
+	<version>0.8.1</version>
 	
 	<name>${project.groupId}:${project.artifactId}</name>
 	<description>Geospatial extension for Apache Spark</description>
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/ShapefileRDD.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/ShapefileRDD.java
index 0d43ec7..0df1a93 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/ShapefileRDD.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/ShapefileRDD.java
@@ -13,6 +13,8 @@
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.VoidFunction;
+import org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.BoundBox;
+import org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShpFileParser;
 import org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.TypeUnknownException;
 import org.datasyslab.geospark.formatMapper.shapefileParser.shapes.PrimitiveShape;
 import org.datasyslab.geospark.formatMapper.shapefileParser.shapes.ShapeInputFormat;
@@ -38,13 +40,19 @@
     /** The geometry factory. */
     public static GeometryFactory geometryFactory;
 
+    /**  bounding box. */
+    private BoundBox boundBox = null;
+
     /**
-     *  ShapefileRDD
+     *  ShapefileRDD.
+     *
      * @param sparkContext the spark context
      * @param filePath the file path
      */
     public ShapefileRDD(JavaSparkContext sparkContext, String filePath){
         geometryFactory = new GeometryFactory();
+        boundBox = new BoundBox();
+        ShpFileParser.boundBox = boundBox;
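+        // ShpFileParser fills this shared BoundBox while reading the .shp file header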
         JavaPairRDD<ShapeKey, PrimitiveShape> shapePrimitiveRdd = sparkContext.newAPIHadoopFile(
                 filePath,
                 ShapeInputFormat.class,
@@ -108,7 +116,9 @@
                     MultiPoint multiObjects = (MultiPoint)spatialObject;
                     for (int i=0;i<multiObjects.getNumGeometries();i++)
                     {
-                        result.add((Point) multiObjects.getGeometryN(i));
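+                        // Propagate the multi-geometry's user data to each member geometry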
+                        Point oneObject = (Point) multiObjects.getGeometryN(i);
+                        oneObject.setUserData(multiObjects.getUserData());
+                        result.add(oneObject);
                     }
                 }
                 else if(spatialObject instanceof Point)
@@ -141,7 +151,9 @@
                     MultiPolygon multiObjects = (MultiPolygon)spatialObject;
                     for (int i=0;i<multiObjects.getNumGeometries();i++)
                     {
-                        result.add((Polygon) multiObjects.getGeometryN(i));
+                        Polygon oneObject = (Polygon) multiObjects.getGeometryN(i);
+                        oneObject.setUserData(multiObjects.getUserData());
+                        result.add(oneObject);
                     }
                 }
                 else if (spatialObject instanceof Polygon)
@@ -174,7 +186,9 @@
                     MultiLineString multiObjects = (MultiLineString)spatialObject;
                     for (int i=0;i<multiObjects.getNumGeometries();i++)
                     {
-                        result.add((LineString) multiObjects.getGeometryN(i));
+                        LineString oneObject = (LineString) multiObjects.getGeometryN(i);
+                        oneObject.setUserData(multiObjects.getUserData());
+                        result.add(oneObject);
                     }
                 }
                 else if(spatialObject instanceof LineString)
@@ -191,4 +205,21 @@
         });
     }
 
+    /**
+     * Gets the bound box.
+     *
+     * @return the bound box
+     */
+    public BoundBox getBoundBox(){
+        return boundBox;
+    }
+
+    /**
+     * Counts the shapes in this RDD.
+     *
+     * @return the number of shapes
+     */
+    public long count(){
+        return shapeRDD.count();
+    }
 }
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/dbf/DbfParseUtil.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/dbf/DbfParseUtil.java
index fbc1d14..a4b8feb 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/dbf/DbfParseUtil.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/dbf/DbfParseUtil.java
@@ -131,13 +131,38 @@
     {
         byte[] delimiter = {'\t'};
         Text attributes = new Text();
-        for(FieldDescriptor descriptor : fieldDescriptors){
+        for(int i = 0;i < fieldDescriptors.size(); ++i){
+            FieldDescriptor descriptor = fieldDescriptors.get(i);
             byte[] fldBytes = new byte[descriptor.getFieldLength()];
             inputStream.readFully(fldBytes);
-            attributes.append(fldBytes,0,fldBytes.length);
-            attributes.append(delimiter,0,1);
+            byte[] attr = new String(fldBytes).trim().getBytes();
+            if(i > 0) attributes.append(delimiter, 0, 1); // no delimiter before the first attribute
+            attributes.append(attr, 0, attr.length);
         }
         return attributes.toString();
+
+    }
+
+    /**
+     * Copied from org.geotools.data.shapefile.dbf.fastParse
+     * Performs a faster byte[] to String conversion under the assumption the content
+     * is represented with one byte per char
+     * @param bytes the raw record bytes
+     * @param fieldOffset the offset of the field within the record
+     * @param fieldLen the field length in bytes
+     * @return the decoded string
+     */
+    private static String fastParse(final byte[] bytes, final int fieldOffset, final int fieldLen) {
+        // faster reading path, the decoder is for some reason slower,
+        // probably because it has to make extra checks to support multibyte chars
+        final char[] chars = new char[fieldLen];
+        for (int i = 0; i < fieldLen; i++) {
+            // force the byte to a positive integer interpretation before casting to char
+            chars[i] = ((char) (0x00FF & bytes[fieldOffset+i]));
+        }
+        return new String(chars);
     }
 
 
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/dbf/FieldDescriptor.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/dbf/FieldDescriptor.java
index 4fa4353..ad44156 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/dbf/FieldDescriptor.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/dbf/FieldDescriptor.java
@@ -8,48 +8,92 @@
 
 import java.io.Serializable;
 
+// TODO: Auto-generated Javadoc
+/**
+ * The Class FieldDescriptor.
+ */
 public class FieldDescriptor implements Serializable{
 
-    /** field name */
+    /**  field name. */
     private String filedName = null;
 
-    /** field type */
+    /**  field type. */
     private byte fieldType = 0;
 
-    /** field length */
+    /**  field length. */
     private int fieldLength = 0;
 
-    /** decimal count */
+    /**  decimal count. */
     private byte fieldDecimalCount = 0;
 
+    /**
+     * Gets the field name.
+     *
+     * @return the field name
+     */
     public String getFiledName() {
         return filedName;
     }
 
+    /**
+     * Sets the field name.
+     *
+     * @param filedName the new field name
+     */
     public void setFiledName(String filedName) {
         this.filedName = filedName;
     }
 
+    /**
+     * Gets the field type.
+     *
+     * @return the field type
+     */
     public byte getFieldType() {
         return fieldType;
     }
 
+    /**
+     * Sets the field type.
+     *
+     * @param fieldType the new field type
+     */
     public void setFieldType(byte fieldType) {
         this.fieldType = fieldType;
     }
 
+    /**
+     * Gets the field length.
+     *
+     * @return the field length
+     */
     public int getFieldLength() {
         return fieldLength;
     }
 
+    /**
+     * Sets the field length.
+     *
+     * @param fieldLength the new field length
+     */
     public void setFieldLength(int fieldLength) {
         this.fieldLength = fieldLength;
     }
 
+    /**
+     * Gets the field decimal count.
+     *
+     * @return the field decimal count
+     */
     public byte getFieldDecimalCount() {
         return fieldDecimalCount;
     }
 
+    /**
+     * Sets the field decimal count.
+     *
+     * @param fieldDecimalCount the new field decimal count
+     */
     public void setFieldDecimalCount(byte fieldDecimalCount) {
         this.fieldDecimalCount = fieldDecimalCount;
     }
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/BoundBox.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/BoundBox.java
new file mode 100644
index 0000000..cf2c28f
--- /dev/null
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/BoundBox.java
@@ -0,0 +1,117 @@
+/**
+ * FILE: BoundBox.java
+ * PATH: org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.BoundBox.java
+ * Copyright (c) 2015-2017 GeoSpark Development Team
+ * All rights reserved.
+ */
+package org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp;
+
+import java.io.Serializable;
+
+/**
+ * Bounding box read from a .shp file header: Xmin, Ymin, Xmax, Ymax, Zmin, Zmax, Mmin, Mmax.
+ * Created by zongsizhang on 7/6/17.
+ */
+public class BoundBox implements Serializable{
+
+    /** bounds of 8 numbers. Xmin, Ymin, Xmax, Ymax, Zmin, Zmax, Mmin, Mmax */
+    double[] bounds = null;
+
+    /**
+     * construct bounds with an array.
+     *
+     * @param bounds the bounds
+     */
+    public BoundBox(double[] bounds) {
+        this.bounds = bounds;
+    }
+
+    /**
+     * construct an initial BoundBox with all values 0.
+     */
+    public BoundBox() {
+        bounds = new double[8];
+    }
+
+    /**
+     * set the value at index i.
+     *
+     * @param i index into the bounds array (0-7)
+     * @param value the value
+     */
+    public void set(int i, double value){
+        bounds[i] = value;
+    }
+
+    /**
+     * Gets the x min.
+     *
+     * @return the x min
+     */
+    public double getXMin(){
+        return bounds[0];
+    }
+
+    /**
+     * Gets the x max.
+     *
+     * @return the x max
+     */
+    public double getXMax(){
+        return bounds[2];
+    }
+
+    /**
+     * Gets the y min.
+     *
+     * @return the y min
+     */
+    public double getYMin(){
+        return bounds[1];
+    }
+
+    /**
+     * Gets the y max.
+     *
+     * @return the y max
+     */
+    public double getYMax(){
+        return bounds[3];
+    }
+
+    /**
+     * Gets the z min.
+     *
+     * @return the z min
+     */
+    public double getZMin(){
+        return bounds[4];
+    }
+
+    /**
+     * Gets the z max.
+     *
+     * @return the z max
+     */
+    public double getZMax(){
+        return bounds[5];
+    }
+
+    /**
+     * Gets the m min.
+     *
+     * @return the m min
+     */
+    public double getMMin(){
+        return bounds[6];
+    }
+
+    /**
+     * Gets the m max.
+     *
+     * @return the m max
+     */
+    public double getMMax(){
+        return bounds[7];
+    }
+}
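BoundBox stores the eight header doubles in the spec's order (Xmin, Ymin, Xmax, Ymax, Zmin, Zmax, Mmin, Mmax), so the getters above are just fixed offsets into one array. A short usage sketch with hypothetical coordinates, assuming the class above is on the classpath:

```java
import org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.BoundBox;

public class BoundBoxDemo {
    public static void main(String[] args) {
        BoundBox box = new BoundBox();
        box.set(0, -112.3); // Xmin
        box.set(1, 33.2);   // Ymin
        box.set(2, -111.6); // Xmax
        box.set(3, 33.7);   // Ymax
        double width  = box.getXMax() - box.getXMin(); // ~0.7 degrees
        double height = box.getYMax() - box.getYMin(); // ~0.5 degrees
        System.out.println(width + " x " + height);
    }
}
```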
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ByteBufferReader.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ByteBufferReader.java
index 0dfa360..783cfd2 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ByteBufferReader.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ByteBufferReader.java
@@ -11,13 +11,19 @@
 import java.nio.ByteOrder;
 import java.nio.DoubleBuffer;
 
+/**
+ * A ShapeReader backed by an in-memory ByteBuffer with configurable byte order.
+ */
 public class ByteBufferReader extends ShapeReader{
 
+    /** The buffer. */
     private ByteBuffer buffer = null;
 
     /**
-     * construct the reader with byte array
-     * @param bytes
+     * construct the reader with byte array.
+     *
+     * @param bytes the bytes
      * @param endianOrder false = little true = big
      */
     public ByteBufferReader(byte[] bytes, boolean endianOrder){
@@ -26,32 +32,50 @@
         else buffer.order(ByteOrder.LITTLE_ENDIAN);
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#readDouble()
+     */
     @Override
     public double readDouble() throws IOException {
         return buffer.getDouble();
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#readInt()
+     */
     @Override
     public int readInt() throws IOException {
         return buffer.getInt();
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#read(byte[])
+     */
     @Override
     public void read(byte[] bytes) throws IOException {
         buffer.get(bytes);
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#read(byte[], int, int)
+     */
     @Override
     public void read(byte[] bytes, int offset, int len) throws IOException {
         buffer.get(bytes, offset, len);
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#read(double[])
+     */
     @Override
     public void read(double[] doubles) throws IOException {
         DoubleBuffer doubleBuffer = buffer.asDoubleBuffer();
         doubleBuffer.get(doubles);
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#skip(int)
+     */
     @Override
     public void skip(int n) throws IOException {
         buffer.position(buffer.position() + n);
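ByteBufferReader wraps the record bytes once and lets ByteBuffer handle byte order, which matters because the .shp format mixes big-endian integers (header fields, record ids) with little-endian record contents. A minimal sketch of the same endianness handling with plain java.nio:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndianDemo {
    public static void main(String[] args) {
        byte[] bytes = {0x00, 0x00, 0x27, 0x0A}; // 9994, the .shp file code, big-endian
        ByteBuffer big    = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
        ByteBuffer little = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        System.out.println(big.getInt());    // 9994
        System.out.println(little.getInt()); // 170328064 - same bytes, wrong order
    }
}
```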
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/DataInputStreamReader.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/DataInputStreamReader.java
index 0bf0cbb..276dbec 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/DataInputStreamReader.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/DataInputStreamReader.java
@@ -11,35 +11,63 @@
 import java.nio.ByteBuffer;
 import java.nio.DoubleBuffer;
 
+/**
+ * A ShapeReader backed by a DataInputStream.
+ */
 public class DataInputStreamReader extends ShapeReader {
 
+    /** The input stream. */
     private DataInputStream inputStream = null;
 
+    /**
+     * Instantiates a new data input stream reader.
+     *
+     * @param dis the underlying DataInputStream
+     */
     public DataInputStreamReader(DataInputStream dis){
         inputStream = dis;
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#readDouble()
+     */
     @Override
     public double readDouble() throws IOException {
 
         return inputStream.readDouble();
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#readInt()
+     */
     @Override
     public int readInt() throws IOException {
-        return inputStream.readInt();
+        byte[] intbytes = new byte[ShapeFileConst.INT_LENGTH];
+        this.read(intbytes);
+
+        return ByteBuffer.wrap(intbytes).getInt();
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#read(byte[])
+     */
     @Override
     public void read(byte[] bytes) throws IOException {
-        inputStream.read(bytes);
+        inputStream.readFully(bytes);
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#read(byte[], int, int)
+     */
     @Override
     public void read(byte[] bytes, int offset, int len) throws IOException {
-        inputStream.read(bytes, offset, len);
+        inputStream.readFully(bytes, offset, len);
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#read(double[])
+     */
     @Override
     public void read(double[] doubles) throws IOException {
         byte[] bytes = new byte[doubles.length * ShapeFileConst.DOUBLE_LENGTH];
@@ -48,6 +76,9 @@
         doubleBuffer.get(doubles);
     }
 
+    /* (non-Javadoc)
+     * @see org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeReader#skip(int)
+     */
     @Override
     public void skip(int n) throws IOException {
         inputStream.skip(n);
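Two fixes land in this reader: `read(byte[])` now uses `readFully`, which blocks until the whole array is filled (a bare `InputStream.read` may legally return fewer bytes, silently corrupting records read across buffer boundaries), and `readInt` goes through a byte array plus ByteBuffer so it behaves like the buffer-backed reader. A sketch of the partial-read hazard, using a contrived stream that hands out one byte per call:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullyDemo {
    /** A stream that returns at most one byte per read call. */
    static class TricklingStream extends InputStream {
        private int next = 0;
        @Override public int read() { return next < 8 ? next++ : -1; }
        @Override public int read(byte[] b, int off, int len) throws IOException {
            int v = read();
            if (v < 0) return -1;
            b[off] = (byte) v;
            return 1; // legal: fewer bytes than requested
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] buf = new byte[8];
        int n = new TricklingStream().read(buf, 0, 8);
        System.out.println("plain read filled " + n + " of 8 bytes");   // 1
        new DataInputStream(new TricklingStream()).readFully(buf);      // loops until full
        System.out.println("readFully filled all 8 bytes");
    }
}
```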
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/MultiPointParser.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/MultiPointParser.java
index db90011..93e05dd 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/MultiPointParser.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/MultiPointParser.java
@@ -13,21 +13,27 @@
 
 import java.io.IOException;
 
+/**
+ * Parses MultiPoint shape records into JTS geometries.
+ */
 public class MultiPointParser extends ShapeParser {
 
     /**
-     * create a parser that can abstract a MultiPolyline from input source with given GeometryFactory
-     * @param geometryFactory
+     * create a parser that extracts a MultiPoint from the input source with the given GeometryFactory.
+     *
+     * @param geometryFactory the geometry factory
      */
     public MultiPointParser(GeometryFactory geometryFactory) {
         super(geometryFactory);
     }
 
     /**
-     * abstract a MultiPoint shape
-     * @param reader
-     * @return
-     * @throws IOException
+     * extract a MultiPoint shape.
+     *
+     * @param reader the reader
+     * @return the geometry
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     @Override
     public Geometry parserShape(ShapeReader reader) throws IOException {
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PointParser.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PointParser.java
index fdc840b..e2a9603 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PointParser.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PointParser.java
@@ -13,21 +13,27 @@
 
 import java.io.IOException;
 
+/**
+ * Parses Point shape records into JTS geometries.
+ */
 public class PointParser extends ShapeParser {
 
     /**
-     * create a parser that can abstract a Point from input source with given GeometryFactory
-     * @param geometryFactory
+     * create a parser that extracts a Point from the input source with the given GeometryFactory.
+     *
+     * @param geometryFactory the geometry factory
      */
     public PointParser(GeometryFactory geometryFactory) {
         super(geometryFactory);
     }
 
     /**
-     * abstract a Point shape
-     * @param reader
-     * @return
-     * @throws IOException
+     * extract a Point shape.
+     *
+     * @param reader the reader
+     * @return the geometry
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     @Override
     public Geometry parserShape(ShapeReader reader) throws IOException {
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PolyLineParser.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PolyLineParser.java
index 83c19bd..c142f06 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PolyLineParser.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PolyLineParser.java
@@ -13,21 +13,27 @@
 
 import java.io.IOException;
 
+/**
+ * Parses PolyLine shape records into JTS geometries.
+ */
 public class PolyLineParser extends ShapeParser{
 
     /**
-     * create a parser that can abstract a MultiPolyline from input source with given GeometryFactory
-     * @param geometryFactory
+     * create a parser that extracts a PolyLine from the input source with the given GeometryFactory.
+     *
+     * @param geometryFactory the geometry factory
      */
     public PolyLineParser(GeometryFactory geometryFactory) {
         super(geometryFactory);
     }
 
     /**
-     * abstract a Polyline shape
-     * @param reader
-     * @return
-     * @throws IOException
+     * extract a PolyLine shape.
+     *
+     * @param reader the reader
+     * @return the geometry
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     @Override
     public Geometry parserShape(ShapeReader reader) throws IOException {
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PolygonParser.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PolygonParser.java
index 8a8a7f0..56d28ec 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PolygonParser.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/PolygonParser.java
@@ -14,21 +14,27 @@
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Parses Polygon shape records into JTS geometries.
+ */
 public class PolygonParser extends ShapeParser{
 
     /**
-     * create a parser that can abstract a Polygon from input source with given GeometryFactory
-     * @param geometryFactory
+     * create a parser that extracts a Polygon from the input source with the given GeometryFactory.
+     *
+     * @param geometryFactory the geometry factory
      */
     public PolygonParser(GeometryFactory geometryFactory) {
         super(geometryFactory);
     }
 
     /**
-     * abstract abstract a Polygon shape
-     * @param reader
-     * @return
-     * @throws IOException
+     * extract a Polygon shape.
+     *
+     * @param reader the reader
+     * @return the geometry
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     @Override
     public Geometry parserShape(ShapeReader reader) throws IOException {
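Per the ESRI shapefile whitepaper, a Polygon record's content is laid out as a little-endian shape type, a 4-double box, part and point counts, part start offsets, and then the XY pairs; the parsers above walk this layout through a ShapeReader. A sketch of that layout over a raw ByteBuffer (toy values; a real ring needs at least four points):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class PolygonRecordSketch {
    public static void main(String[] args) {
        // Polygon record content, all little-endian:
        // int shapeType(=5), 4 doubles box, int numParts, int numPoints,
        // int[numParts] part offsets, double[2*numPoints] XY coordinates.
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 * 8 + 4 + 4 + 4 + 2 * 2 * 8)
                                   .order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(5);                                  // POLYGON
        for (int i = 0; i < 4; i++) buf.putDouble(0);   // bounding box (dummy)
        buf.putInt(1).putInt(2);                        // 1 part, 2 points (toy values)
        buf.putInt(0);                                  // part 0 starts at point 0
        buf.putDouble(1).putDouble(2).putDouble(3).putDouble(4);
        buf.flip();

        int shapeType = buf.getInt();
        buf.position(buf.position() + 4 * 8);           // skip the box, like reader.skip()
        int numParts  = buf.getInt();
        int numPoints = buf.getInt();
        System.out.println(shapeType + " " + numParts + " " + numPoints); // 5 1 2
    }
}
```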
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeFileConst.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeFileConst.java
index a6aa2ae..3c070c4 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeFileConst.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeFileConst.java
@@ -13,19 +13,33 @@
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Constants shared by the .shp and .dbf parsers.
+ */
 public interface ShapeFileConst {
 
     /**
      * Consts for .shp file
      */
     public static final int EXPECT_FILE_CODE = 9994;
+    
+    /** The Constant EXPECT_FILE_VERSION. */
     public static final int EXPECT_FILE_VERSION = 1000;
+    
+    /** The Constant HEAD_FILE_LENGTH_16BIT. */
     public static final int HEAD_FILE_LENGTH_16BIT = 50;
 
+    /** The Constant HEAD_EMPTY_NUM. */
     public static final int HEAD_EMPTY_NUM = 5;
+    
+    /** The Constant HEAD_BOX_NUM. */
     public static final int HEAD_BOX_NUM = 8;
 
+    /** The Constant INT_LENGTH. */
     public static final int INT_LENGTH = 4;
+    
+    /** The Constant DOUBLE_LENGTH. */
     public static final int DOUBLE_LENGTH = 8;
 
 
@@ -33,8 +47,16 @@
      * Consts for .dbf file
      */
     public static final byte FIELD_DESCRIPTOR_TERMINATOR = 0x0d;
+    
+    /** The Constant FIELD_NAME_LENGTH. */
     public static final byte FIELD_NAME_LENGTH = 11;
+    
+    /** The Constant RECORD_DELETE_FLAG. */
     public static final byte RECORD_DELETE_FLAG = 0x2A;
+    
+    /** The Constant FILE_END_FLAG. */
     public static final byte FILE_END_FLAG = 0x1A;
+    
+    /** The Constant RECORD_EXIST_FLAG. */
     public static final byte RECORD_EXIST_FLAG = 0x20;
 }
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeParser.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeParser.java
index 7a743d4..2baa73d 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeParser.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeParser.java
@@ -12,17 +12,30 @@
 import java.io.IOException;
 import java.io.Serializable;
 
+/**
+ * Base class for parsers that turn shape records into JTS geometries.
+ */
 public abstract class ShapeParser implements Serializable, ShapeFileConst{
 
+    /** The geometry factory. */
     protected GeometryFactory geometryFactory = null;
 
+    /**
+     * Instantiates a new shape parser.
+     *
+     * @param geometryFactory the geometry factory
+     */
     public ShapeParser(GeometryFactory geometryFactory) {
         this.geometryFactory = geometryFactory;
     }
 
     /**
-     * parse the shape to a geometry
-     * @param reader
+     * parse the shape to a geometry.
+     *
+     * @param reader the reader
+     * @return the geometry
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public abstract Geometry parserShape(ShapeReader reader) throws IOException;
 
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeReader.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeReader.java
index 0e6593f..56ac8ad 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeReader.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeReader.java
@@ -9,43 +9,59 @@
 import java.io.IOException;
 import java.io.Serializable;
 
+/**
+ * Abstract reader over a .shp input source.
+ */
 public abstract class ShapeReader implements Serializable, ShapeFileConst{
 
     /**
-     * read a double from source
-     * @return
+     * read a double from source.
+     *
+     * @return the double
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public abstract double readDouble() throws IOException;
 
     /**
-     * read an integer from source
-     * @return
+     * read an integer from source.
+     *
+     * @return the int
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public abstract int readInt() throws IOException;
 
     /**
-     * fully read an array of byte from source
-     * @param bytes
+     * fully read an array of bytes from the source.
+     *
+     * @param bytes the bytes
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public abstract void read(byte[] bytes) throws IOException;
 
     /**
      * read len of bytes from source start at offset.
-     * @param bytes
-     * @param offset
-     * @param len
+     *
+     * @param bytes the bytes
+     * @param offset the offset
+     * @param len the len
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public abstract void read(byte[] bytes, int offset, int len) throws IOException;
 
     /**
-     * read len of bytes from source
-     * @param doubles
+     * read doubles.length doubles from the source.
+     *
+     * @param doubles the doubles
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public abstract void read(double[] doubles) throws IOException;
 
     /**
-     * skip n bytes in source
-     * @param n
+     * skip n bytes in source.
+     *
+     * @param n the number of bytes to skip
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public abstract void skip(int n) throws IOException;
 
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeType.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeType.java
index 22bb188..7a4f0bf 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeType.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShapeType.java
@@ -11,30 +11,47 @@
 
 import java.io.Serializable;
 
+/**
+ * Enumeration-style wrapper around the ESRI shape type ids, and a factory for the matching parsers.
+ */
 public class ShapeType implements Serializable{
 
+    /** The id. */
     protected final int id;
 
+    /** The Constant UNDEFINED. */
     public static final ShapeType UNDEFINED = new ShapeType(0);
 
+    /** The Constant NULL. */
     public static final ShapeType NULL = new ShapeType(0);
 
+    /** The Constant POINT. */
     public static final ShapeType POINT = new ShapeType(1);
 
+    /** The Constant POLYLINE. */
     public static final ShapeType POLYLINE = new ShapeType(3);
 
+    /** The Constant POLYGON. */
     public static final ShapeType POLYGON = new ShapeType(5);
 
+    /** The Constant MULTIPOINT. */
     public static final ShapeType MULTIPOINT = new ShapeType(8);
 
+    /**
+     * Instantiates a new shape type.
+     *
+     * @param i the shape type id
+     */
     protected ShapeType(int i){
         id = i;
     }
 
     /**
-     * return the corresponding ShapeType instance by int id
-     * @param idx
-     * @return
+     * return the corresponding ShapeType instance by int id.
+     *
+     * @param idx the shape type id
+     * @return the type
      */
     public static ShapeType getType(int idx){
         ShapeType type;
@@ -61,9 +78,10 @@
     }
 
     /**
-     * generate a parser according to current shape type
-     * @param geometryFactory
-     * @return
+     * generate a parser according to current shape type.
+     *
+     * @param geometryFactory the geometry factory
+     * @return the parser
      */
     public ShapeParser getParser(GeometryFactory geometryFactory){
         ShapeParser parser = null;
@@ -87,8 +105,9 @@
     }
 
     /**
-     * return the shape type id
-     * @return
+     * return the shape type id.
+     *
+     * @return the id
      */
     public int getId() {
         return id;
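ShapeType doubles as a factory: getType maps the int id read from a record header to a constant, and getParser returns the matching parser. A brief usage sketch, assuming the JTS GeometryFactory and these classes are on the classpath:

```java
import com.vividsolutions.jts.geom.GeometryFactory;
import org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeParser;
import org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeType;

public class ShapeTypeDemo {
    public static void main(String[] args) {
        GeometryFactory factory = new GeometryFactory();
        ShapeType type = ShapeType.getType(5);          // 5 = POLYGON in the .shp spec
        ShapeParser parser = type.getParser(factory);   // yields a PolygonParser
        System.out.println(type.getId() + " -> " + parser.getClass().getSimpleName());
    }
}
```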
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShpFileParser.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShpFileParser.java
index 416e9fd..43bfab4 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShpFileParser.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShpFileParser.java
@@ -12,24 +12,33 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 
+/**
+ * Parses the header and records of a .shp file.
+ */
 public class ShpFileParser implements Serializable, ShapeFileConst{
 
     /** shape type of current .shp file */
     public int currentTokenType = 0;
 
-    /** lenth of file in bytes */
+    /** length of the file in bytes. */
     public long fileLength = 0;
 
-    /** remain length of bytes to parse */
+    /** remaining number of bytes to parse. */
     public long remainLength = 0;
 
-    /** input reader */
+    /** input reader. */
     ShapeReader reader = null;
 
+    /** current bounding box. */
+    public static BoundBox boundBox = null;
+
     /**
-     * create a new shape file parser with a input source that is instance of DataInputStream
-     * @param inputStream
+     * create a new shape file parser with an input source that is an instance of DataInputStream.
+     *
+     * @param inputStream the input stream
      */
     public ShpFileParser(DataInputStream inputStream) {
         reader = new DataInputStreamReader(inputStream);
@@ -37,7 +46,8 @@
 
     /**
      * extract and validate information from .shp file header
-     * @throws IOException
+     *
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public void parseShapeFileHead()
             throws IOException
@@ -48,13 +58,20 @@
         remainLength = fileLength;
         int fileVersion = EndianUtils.swapInteger(reader.readInt());
         currentTokenType = EndianUtils.swapInteger(reader.readInt());
-        reader.skip(HEAD_BOX_NUM * DOUBLE_LENGTH);
+        // if no bounding box was registered, skip the header box
+        if(boundBox == null) reader.skip(HEAD_BOX_NUM * DOUBLE_LENGTH);
+        else{// otherwise copy the eight header doubles into it
+            for(int i = 0;i < HEAD_BOX_NUM; ++i){
+                boundBox.set(i, EndianUtils.swapDouble(reader.readDouble()));
+            }
+        }
     }
 
     /**
      * abstract information from record header and then copy primitive bytes data of record to a primitive record.
-     * @return
-     * @throws IOException
+     *
+     * @return the shp record
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public ShpRecord parseRecordPrimitiveContent() throws IOException{
         // get length of record content
@@ -68,9 +85,10 @@
     }
 
     /**
-     * abstract id number from record header
-     * @return
-     * @throws IOException
+     * extract the id number from the record header.
+     *
+     * @return the int
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public int parseRecordHeadID() throws IOException{
         int id = reader.readInt();
@@ -79,7 +97,8 @@
 
     /**
      * get current progress of parsing records.
-     * @return
+     *
+     * @return the progress
      */
     public float getProgress(){
         return 1 - (float)remainLength / (float) fileLength;
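parseShapeFileHead walks the 100-byte .shp header: a big-endian file code (9994), five unused ints, a big-endian file length in 16-bit words, then little-endian version and shape type, followed by the eight bounding-box doubles, which are now either skipped or copied into the registered BoundBox. A sketch of the same header walk over a raw DataInputStream (variable names are mine):

```java
import java.io.DataInputStream;
import java.io.IOException;

public class ShpHeaderSketch {
    // DataInputStream.readInt() is big-endian, so the little-endian fields
    // must be byte-swapped (the parser above uses commons-io EndianUtils).
    static int swap(int v) { return Integer.reverseBytes(v); }

    static void readHead(DataInputStream in) throws IOException {
        int fileCode = in.readInt();              // big-endian, must be 9994
        in.skipBytes(5 * 4);                      // five unused header ints
        long fileLength = in.readInt() * 2L;      // length in 16-bit words -> bytes
        int version = swap(in.readInt());         // little-endian, must be 1000
        int shapeType = swap(in.readInt());       // little-endian
        in.skipBytes(8 * 8);                      // Xmin..Mmax bounding box
        System.out.println(fileCode + " " + version + " " + shapeType + " " + fileLength);
    }
}
```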
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShpParseUtil.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShpParseUtil.java
index 47b8234..db20529 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShpParseUtil.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/ShpParseUtil.java
@@ -11,15 +11,20 @@
 import com.vividsolutions.jts.geom.GeometryFactory;
 import java.io.IOException;
 
+/**
+ * Shared helpers for parsing .shp geometry records.
+ */
 public class ShpParseUtil implements ShapeFileConst {
 
     /**
      * read numPoints of coordinates from input source.
-     * @param reader
-     * @param numPoints
-     * @param geometryFactory
-     * @return
-     * @throws IOException
+     *
+     * @param reader the reader
+     * @param numPoints the num points
+     * @param geometryFactory the geometry factory
+     * @return the coordinate sequence
+     * @throws IOException Signals that an I/O exception has occurred.
      */
     public static CoordinateSequence readCoordinates(ShapeReader reader, int numPoints, GeometryFactory geometryFactory) throws IOException {
         CoordinateSequence coordinateSequence = geometryFactory.getCoordinateSequenceFactory().create(numPoints,2);
@@ -34,9 +39,10 @@
 
     /**
      * returns true if testPoint is a point in the pointList list.
-     * @param testPoint
-     * @param pointList
-     * @return
+     *
+     * @param testPoint the test point
+     * @param pointList the point list
+     * @return true, if successful
      */
     public static boolean pointInList(Coordinate testPoint, Coordinate[] pointList) {
         Coordinate p;
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/TypeUnknownException.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/TypeUnknownException.java
index c399286..adf5e86 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/TypeUnknownException.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/parseUtils/shp/TypeUnknownException.java
@@ -6,11 +6,16 @@
  */
 package org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp;
 
+/**
+ * Thrown when an unknown shape type id is read from a .shp file.
+ */
 public class TypeUnknownException extends Exception{
 
     /**
     * create an exception indicating that the shape type number read from the .shp file is invalid.
-     * @param typeID
+     *
+     * @param typeID the type ID
      */
     public TypeUnknownException(int typeID) {
         super("Unknown shape type " + typeID);
diff --git a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/shapes/CombineShapeReader.java b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/shapes/CombineShapeReader.java
index 5d3ed4b..b9df53c 100644
--- a/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/shapes/CombineShapeReader.java
+++ b/core/src/main/java/org/datasyslab/geospark/formatMapper/shapefileParser/shapes/CombineShapeReader.java
@@ -13,6 +13,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.datasyslab.geospark.formatMapper.shapefileParser.parseUtils.shp.ShapeType;
 
 import java.io.IOException;
 import java.util.List;
@@ -85,12 +86,16 @@
     }
 
     public boolean nextKeyValue() throws IOException, InterruptedException {
-        //only .shp mode
-        if(!hasDbf) return shapeFileReader.nextKeyValue();
-        // with dbf, check consistency first
+
         boolean hasNextShp = shapeFileReader.nextKeyValue();
-        hasNextDbf = dbfFileReader.nextKeyValue();
-        //some dbf records lost
+        if(hasDbf) hasNextDbf = dbfFileReader.nextKeyValue();
+        // skip NULL / UNDEFINED shape records, keeping the .dbf reader in step
+        while(hasNextShp && (shapeFileReader.getCurrentValue().getTypeID() == ShapeType.NULL.getId()
+                || shapeFileReader.getCurrentValue().getTypeID() == ShapeType.UNDEFINED.getId())){
+            hasNextShp = shapeFileReader.nextKeyValue();
+            if(hasDbf) hasNextDbf = dbfFileReader.nextKeyValue();
+        }
+        // check if records match in .shp and .dbf
         if(hasNextShp && !hasNextDbf){
             Exception e = new Exception("shape record loses attributes in .dbf file at ID=" + shapeFileReader.getCurrentKey().getIndex());
             e.printStackTrace();
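nextKeyValue now advances the .shp and .dbf readers in lockstep and skips NULL/UNDEFINED shape records on the fly, so the geometry and attribute streams stay aligned; a geometry with no matching .dbf row is reported as an error. A stripped-down sketch of that paired-iterator pattern over two plain iterators (the element types here are stand-ins, not the Hadoop record readers):

```java
import java.util.Arrays;
import java.util.Iterator;

public class PairedSkipSketch {
    public static void main(String[] args) {
        Iterator<Integer> shapes = Arrays.asList(1, 0, 5, 0, 3).iterator(); // 0 = NULL shape
        Iterator<String> attrs = Arrays.asList("a", "b", "c", "d", "e").iterator();
        while (shapes.hasNext()) {
            int shapeType = shapes.next();
            String attr = attrs.next();       // advance .dbf in step with .shp
            if (shapeType == 0) continue;     // skip NULL records on both sides
            System.out.println(shapeType + " -> " + attr);
        }
        // prints: 1 -> a, 5 -> c, 3 -> e
    }
}
```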
diff --git a/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java b/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
index ac20a6e..e2a961d 100644
--- a/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
+++ b/core/src/main/java/org/datasyslab/geospark/spatialOperator/JoinQuery.java
@@ -83,7 +83,7 @@
                 return pairGeometry.getPolygonTuple2();
             }
         });
-        return DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinResultWithDuplicates);
+        return joinResultWithDuplicates;
     }
 
     /**
@@ -123,7 +123,7 @@
             }
         });
 
-        return DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinResultWithDuplicates);
+        return joinResultWithDuplicates;
     }
 
 
@@ -167,7 +167,7 @@
             }
         });
 
-        return DuplicatesHandler.removeDuplicatesGeometryByCircle(joinResultWithDuplicates);
+        return joinResultWithDuplicates;
     }
 
     /**
@@ -206,9 +206,9 @@
             }
         });
 
-        return DuplicatesHandler.removeDuplicatesGeometryByCircle(joinResultWithDuplicates);
+        return joinResultWithDuplicates;
     }
-	
+
     /**
      * Spatial join query.
      *
@@ -221,48 +221,36 @@
      */
     public static JavaPairRDD<Polygon, HashSet<Point>> SpatialJoinQuery(PointRDD spatialRDD,RectangleRDD queryRDD,boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
 
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
-            JavaPairRDD<Polygon, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Point>>()
-            {
-				@Override
-				public HashSet<Point> call(HashSet<Geometry> spatialObjects) throws Exception {
-					HashSet<Point> castedSpatialObjects = new HashSet<Point>();
-					Iterator spatialObjectIterator = spatialObjects.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						castedSpatialObjects.add((Point)spatialObjectIterator.next());
-					}
-					return castedSpatialObjects;
-				}
-            	
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
 
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
-            JavaPairRDD<Polygon, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Point>>()
-            {
-				@Override
-				public HashSet<Point> call(HashSet<Geometry> spatialObjects) throws Exception {
-					HashSet<Point> castedSpatialObjects = new HashSet<Point>();
-					Iterator spatialObjectIterator = spatialObjects.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						castedSpatialObjects.add((Point)spatialObjectIterator.next());
-					}
-					return castedSpatialObjects;
-				}
-            	
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
         }
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Point>>()
+        {
+            @Override
+            public HashSet<Point> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<Point> castedSpatialObjects = new HashSet<Point>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((Point)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
+
+        });
+        return castedResult;
     }
-    
- 
+
+
 
     /**
      * Spatial join query.
@@ -276,48 +264,35 @@
      */
     public static JavaPairRDD<Polygon, HashSet<Polygon>> SpatialJoinQuery(RectangleRDD spatialRDD,RectangleRDD queryRDD,boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
 
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
-            JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Polygon>>()
-            {
-				@Override
-				public HashSet<Polygon> call(HashSet<Geometry> spatialObjects) throws Exception {
-					HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
-					Iterator spatialObjectIterator = spatialObjects.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						castedSpatialObjects.add((Polygon)spatialObjectIterator.next());
-					}
-					return castedSpatialObjects;
-				}
-            	
-            });
-                        
-            return castedResult;
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
 
-            JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Polygon>>()
-            {
-				@Override
-				public HashSet<Polygon> call(HashSet<Geometry> spatialObjects) throws Exception {
-					HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
-					Iterator spatialObjectIterator = spatialObjects.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						castedSpatialObjects.add((Polygon)spatialObjectIterator.next());
-					}
-					return castedSpatialObjects;
-				}
-            	
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
         }
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Polygon>>()
+        {
+            @Override
+            public HashSet<Polygon> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((Polygon)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
+
+        });
+        return castedResult;
     }
-   
+
     /**
      * Spatial join query.
      *
@@ -329,48 +304,37 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<Polygon, HashSet<Point>> SpatialJoinQuery(PointRDD spatialRDD,PolygonRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<Polygon, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Polygon,HashSet<Geometry>>,Polygon,HashSet<Point>>()
-            {
-				@Override
-				public Tuple2<Polygon, HashSet<Point>> call(Tuple2<Polygon, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Point> castedSpatialObjects = new HashSet<Point>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Point castedSpatialObject = (Point)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Polygon,HashSet<Point>>(pairObjects._1,castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);             
-            JavaPairRDD<Polygon, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Point>>()
-            {
-				@Override
-				public HashSet<Point> call(HashSet<Geometry> spatialObjects) throws Exception {
-					HashSet<Point> castedSpatialObjects = new HashSet<Point>();
-					Iterator spatialObjectIterator = spatialObjects.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						castedSpatialObjects.add((Point)spatialObjectIterator.next());
-					}
-					return castedSpatialObjects;
-				}
-            });
-            return castedResult;
+
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
         }
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Point>>()
+        {
+            @Override
+            public HashSet<Point> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<Point> castedSpatialObjects = new HashSet<Point>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((Point)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
 
-   }
-   
+        });
+        return castedResult;
+    }
 
-   
+
+
     /**
      * Spatial join query.
      *
@@ -382,46 +346,34 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<Polygon, HashSet<Polygon>> SpatialJoinQuery(PolygonRDD spatialRDD,PolygonRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);             
-            JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Polygon,HashSet<Geometry>>,Polygon,HashSet<Polygon>>()
-            {
-				@Override
-				public Tuple2<Polygon, HashSet<Polygon>> call(Tuple2<Polygon, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Polygon castedSpatialObject = (Polygon)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Polygon,HashSet<Polygon>>(pairObjects._1,castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
 
-            JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Polygon>>()
-            {
-				@Override
-				public HashSet<Polygon> call(HashSet<Geometry> spatialObjects) throws Exception {
-					HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
-					Iterator spatialObjectIterator = spatialObjects.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						castedSpatialObjects.add((Polygon)spatialObjectIterator.next());
-					}
-					return castedSpatialObjects;
-				}
-            });
-            return castedResult;
         }
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Polygon>>()
+        {
+            @Override
+            public HashSet<Polygon> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((Polygon)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
 
-   }
+        });
+        return castedResult;
+
+    }
 
     /**
      * Spatial join query.
@@ -434,45 +386,186 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<Polygon, HashSet<LineString>> SpatialJoinQuery(LineStringRDD spatialRDD,PolygonRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<Polygon, HashSet<LineString>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Polygon,HashSet<Geometry>>,Polygon,HashSet<LineString>>()
-            {
-				@Override
-				public Tuple2<Polygon, HashSet<LineString>> call(Tuple2<Polygon, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						LineString castedSpatialObject = (LineString)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Polygon,HashSet<LineString>>(pairObjects._1,castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);             
-            JavaPairRDD<Polygon, HashSet<LineString>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<LineString>>()
-            {
-				@Override
-				public HashSet<LineString> call(HashSet<Geometry> spatialObjects) throws Exception {
-					HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
-					Iterator spatialObjectIterator = spatialObjects.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						castedSpatialObjects.add((LineString)spatialObjectIterator.next());
-					}
-					return castedSpatialObjects;
-				}
-            });
-            return castedResult;
-        }
 
-   }
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
+        }
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, HashSet<LineString>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<LineString>>()
+        {
+            @Override
+            public HashSet<LineString> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((LineString)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
+
+        });
+        return castedResult;
+
+    }
+
+    public static JavaPairRDD<Polygon, HashSet<Point>> SpatialJoinQueryWithDuplicates(PointRDD spatialRDD,RectangleRDD queryRDD,boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
+        if(useIndex)
+        {
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        else
+        {
+
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
+        }
+        JavaPairRDD<Polygon, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Point>>()
+        {
+            @Override
+            public HashSet<Point> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<Point> castedSpatialObjects = new HashSet<Point>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((Point)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
+
+        });
+        return castedResult;
+    }
+
+
+    public static JavaPairRDD<Polygon, HashSet<Polygon>> SpatialJoinQueryWithDuplicates(RectangleRDD spatialRDD,RectangleRDD queryRDD,boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
+        if(useIndex)
+        {
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        else
+        {
+
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
+        }
+        JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Polygon>>()
+        {
+            @Override
+            public HashSet<Polygon> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((Polygon)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
+
+        });
+        return castedResult;
+    }
+
+    public static JavaPairRDD<Polygon, HashSet<Point>> SpatialJoinQueryWithDuplicates(PointRDD spatialRDD,PolygonRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
+        if(useIndex)
+        {
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        else
+        {
+
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
+        }
+        JavaPairRDD<Polygon, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Point>>()
+        {
+            @Override
+            public HashSet<Point> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<Point> castedSpatialObjects = new HashSet<Point>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((Point)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
+
+        });
+        return castedResult;
+    }
+
+
+    public static JavaPairRDD<Polygon, HashSet<Polygon>> SpatialJoinQueryWithDuplicates(PolygonRDD spatialRDD,PolygonRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
+        if(useIndex)
+        {
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        else
+        {
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
+        }
+        JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<Polygon>>()
+        {
+            @Override
+            public HashSet<Polygon> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((Polygon)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
+
+        });
+        return castedResult;
+
+    }
+
+    public static JavaPairRDD<Polygon, HashSet<LineString>> SpatialJoinQueryWithDuplicates(LineStringRDD spatialRDD,PolygonRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
+        if(useIndex)
+        {
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        else
+        {
+
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+
+        }
+        JavaPairRDD<Polygon, HashSet<LineString>> castedResult = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,HashSet<LineString>>()
+        {
+            @Override
+            public HashSet<LineString> call(HashSet<Geometry> spatialObjects) throws Exception {
+                HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
+                Iterator spatialObjectIterator = spatialObjects.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    castedSpatialObjects.add((LineString)spatialObjectIterator.next());
+                }
+                return castedSpatialObjects;
+            }
+
+        });
+        return castedResult;
+
+    }
+
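The refactor above folds duplicate removal (needed because a geometry straddling grid cells is emitted once per overlapping partition) into SpatialJoinQuery itself, and adds SpatialJoinQueryWithDuplicates variants that skip the DuplicatesHandler pass for callers that can tolerate, or want to count, the raw per-partition matches. A hedged usage sketch (RDD construction elided; variable names are mine, and the spatialRDD package path is assumed from GeoSpark 0.8's layout):

```java
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.geom.Polygon;
import java.util.HashSet;
import org.apache.spark.api.java.JavaPairRDD;
import org.datasyslab.geospark.spatialOperator.JoinQuery;
import org.datasyslab.geospark.spatialRDD.PointRDD;
import org.datasyslab.geospark.spatialRDD.RectangleRDD;

public class JoinQueryDemo {
    // pointRDD / rectangleRDD construction elided; both are assumed
    // to be loaded and spatially partitioned the same way.
    static void run(PointRDD pointRDD, RectangleRDD rectangleRDD) throws Exception {
        // deduplicated result: one entry per matching pair
        JavaPairRDD<Polygon, HashSet<Point>> deduped =
                JoinQuery.SpatialJoinQuery(pointRDD, rectangleRDD, true, true);
        // raw result: keeps per-partition duplicates, skips the DuplicatesHandler pass
        JavaPairRDD<Polygon, HashSet<Point>> withDuplicates =
                JoinQuery.SpatialJoinQueryWithDuplicates(pointRDD, rectangleRDD, true, true);
        System.out.println(deduped.count() + " vs " + withDuplicates.count());
    }
}
```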
 
     /**
      * Spatial join query count by key.
@@ -485,42 +578,29 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<Polygon, Long> SpatialJoinQueryCountByKey(SpatialRDD spatialRDD,PolygonRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);             
-            JavaPairRDD<Polygon, Long> resultCountByKey = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Polygon,HashSet<Geometry>>,Polygon,Long>()
-            {
-				@Override
-				public Tuple2<Polygon, Long> call(Tuple2<Polygon, HashSet<Geometry>> pairObjects) throws Exception {
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					long count=0;
-					while(spatialObjectIterator.hasNext())
-					{
-						Geometry castedSpatialObject = (Geometry)spatialObjectIterator.next();
-						count++;
-					}
-					return new Tuple2<Polygon,Long>(pairObjects._1,count);
-				}
-            });
-            return resultCountByKey;
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<Polygon, Long> resultCountByKey = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,Long>()
-            {
-				@Override
-				public Long call(HashSet<Geometry> spatialObjects) throws Exception {
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
 
-					return (long) spatialObjects.size();
-				}
-            });
-            return resultCountByKey;
         }
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, Long> resultCountByKey = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,Long>()
+        {
+            @Override
+            public Long call(HashSet<Geometry> spatialObjects) throws Exception {
 
-   }
-    
-    
+                return (long) spatialObjects.size();
+            }
+        });
+        return resultCountByKey;
+    }
+
+
     /**
      * Spatial join query count by key.
      *
@@ -533,31 +613,26 @@
      */
     public static JavaPairRDD<Polygon, Long> SpatialJoinQueryCountByKey(SpatialRDD spatialRDD,RectangleRDD queryRDD,boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
 
+        JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);     
-            JavaPairRDD<Polygon, Long> resultCountByKey = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,Long>()
-            {
-				@Override
-				public Long call(HashSet<Geometry> spatialObjects) throws Exception {
-					return (long) spatialObjects.size();
-				}
-            	
-            });
-            return resultCountByKey;
+            joinListResultAfterAggregation = executeSpatialJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Polygon, HashSet<Geometry>> joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);             
-            JavaPairRDD<Polygon, Long> resultCountByKey = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,Long>()
-            {
-				@Override
-				public Long call(HashSet<Geometry> spatialObjects) throws Exception {
-					return (long) spatialObjects.size();
-				}
-            });
-            return resultCountByKey;
+            joinListResultAfterAggregation = executeSpatialJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByPolygon(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, Long> resultCountByKey = joinListResultAfterAggregation.mapValues(new Function<HashSet<Geometry>,Long>()
+        {
+            @Override
+            public Long call(HashSet<Geometry> spatialObjects) throws Exception {
+                return (long) spatialObjects.size();
+            }
+        });
+        return resultCountByKey;
     }
 
     /**
@@ -571,50 +646,34 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<Polygon, HashSet<Polygon>> DistanceJoinQuery(PolygonRDD spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Polygon,HashSet<Polygon>>()
-            {
-				@Override
-				public Tuple2<Polygon, HashSet<Polygon>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Polygon castedSpatialObject = (Polygon)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Polygon,HashSet<Polygon>>((Polygon)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-        	JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Polygon,HashSet<Polygon>>()
-            {
-				@Override
-				public Tuple2<Polygon, HashSet<Polygon>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Polygon castedSpatialObject = (Polygon)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Polygon,HashSet<Polygon>>((Polygon)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
-   }
-    
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByCircle(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Polygon,HashSet<Polygon>>()
+        {
+            @Override
+            public Tuple2<Polygon, HashSet<Polygon>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    Polygon castedSpatialObject = (Polygon)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<Polygon,HashSet<Polygon>>((Polygon)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
+            }
+        });
+        return castedResult;
+    }
 
-    
- 
-    
     /**
      * Distance join query.
      *
@@ -626,46 +685,34 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<Point, HashSet<Point>> DistanceJoinQuery(PointRDD spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<Point, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Point,HashSet<Point>>()
-            {
-				@Override
-				public Tuple2<Point, HashSet<Point>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Point> castedSpatialObjects = new HashSet<Point>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Point castedSpatialObject = (Point)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Point,HashSet<Point>>((Point)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-        	JavaPairRDD<Point, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Point,HashSet<Point>>()
-            {
-				@Override
-				public Tuple2<Point, HashSet<Point>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Point> castedSpatialObjects = new HashSet<Point>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Point castedSpatialObject = (Point)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Point,HashSet<Point>>((Point)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
-   }
-    
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByCircle(joinListResultAfterAggregation);
+        JavaPairRDD<Point, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Point,HashSet<Point>>()
+        {
+            @Override
+            public Tuple2<Point, HashSet<Point>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<Point> castedSpatialObjects = new HashSet<Point>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    Point castedSpatialObject = (Point)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<Point,HashSet<Point>>((Point)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
+            }
+        });
+        return castedResult;
+    }
+
     /**
      * Distance join query.
      *
@@ -677,46 +724,123 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<LineString, HashSet<LineString>> DistanceJoinQuery(LineStringRDD spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<LineString, HashSet<LineString>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,LineString,HashSet<LineString>>()
-            {
-				@Override
-				public Tuple2<LineString, HashSet<LineString>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						LineString castedSpatialObject = (LineString)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<LineString,HashSet<LineString>>((LineString)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-        	JavaPairRDD<LineString, HashSet<LineString>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,LineString,HashSet<LineString>>()
-            {
-				@Override
-				public Tuple2<LineString, HashSet<LineString>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						LineString castedSpatialObject = (LineString)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<LineString,HashSet<LineString>>((LineString)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
-   }
- 
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByCircle(joinListResultAfterAggregation);
+        JavaPairRDD<LineString, HashSet<LineString>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,LineString,HashSet<LineString>>()
+        {
+            @Override
+            public Tuple2<LineString, HashSet<LineString>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    LineString castedSpatialObject = (LineString)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<LineString,HashSet<LineString>>((LineString)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
+            }
+        });
+        return castedResult;
+    }
+
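+    /**
+     * Distance join query with duplicates. Unlike DistanceJoinQuery, this variant skips the
+     * duplicates-removal step, so a polygon may appear under a query circle more than once.
+     *
+     * @param spatialRDD the spatial RDD
+     * @param queryRDD the query RDD
+     * @param useIndex the use index
+     * @param considerBoundaryIntersection the consider boundary intersection
+     * @return the java pair RDD
+     * @throws Exception the exception
+     */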
+    public static JavaPairRDD<Polygon, HashSet<Polygon>> DistanceJoinQueryWithDuplicates(PolygonRDD spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
+        if(useIndex)
+        {
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        else
+        {
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        JavaPairRDD<Polygon, HashSet<Polygon>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Polygon,HashSet<Polygon>>()
+        {
+            @Override
+            public Tuple2<Polygon, HashSet<Polygon>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    Polygon castedSpatialObject = (Polygon)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<Polygon,HashSet<Polygon>>((Polygon)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
+            }
+        });
+        return castedResult;
+    }
+
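+    /**
+     * Distance join query with duplicates. Unlike DistanceJoinQuery, this variant skips the
+     * duplicates-removal step, so a point may appear under a query circle more than once.
+     *
+     * @param spatialRDD the spatial RDD
+     * @param queryRDD the query RDD
+     * @param useIndex the use index
+     * @param considerBoundaryIntersection the consider boundary intersection
+     * @return the java pair RDD
+     * @throws Exception the exception
+     */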
+    public static JavaPairRDD<Point, HashSet<Point>> DistanceJoinQueryWithDuplicates(PointRDD spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
+        if(useIndex)
+        {
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        else
+        {
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        JavaPairRDD<Point, HashSet<Point>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Point,HashSet<Point>>()
+        {
+            @Override
+            public Tuple2<Point, HashSet<Point>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<Point> castedSpatialObjects = new HashSet<Point>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    Point castedSpatialObject = (Point)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<Point,HashSet<Point>>((Point)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
+            }
+        });
+        return castedResult;
+    }
+
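+    /**
+     * Distance join query with duplicates. Unlike DistanceJoinQuery, this variant skips the
+     * duplicates-removal step, so a line string may appear under a query circle more than once.
+     *
+     * @param spatialRDD the spatial RDD
+     * @param queryRDD the query RDD
+     * @param useIndex the use index
+     * @param considerBoundaryIntersection the consider boundary intersection
+     * @return the java pair RDD
+     * @throws Exception the exception
+     */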
+    public static JavaPairRDD<LineString, HashSet<LineString>> DistanceJoinQueryWithDuplicates(LineStringRDD spatialRDD, CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
+        if(useIndex)
+        {
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        else
+        {
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
+        }
+        JavaPairRDD<LineString, HashSet<LineString>> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,LineString,HashSet<LineString>>()
+        {
+            @Override
+            public Tuple2<LineString, HashSet<LineString>> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    LineString castedSpatialObject = (LineString)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<LineString,HashSet<LineString>>((LineString)pairObjects._1.getCenterGeometry(),castedSpatialObjects);
+            }
+        });
+        return castedResult;
+    }
+
+
     /**
      * Distance join query count by key.
      *
@@ -728,50 +852,37 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<Polygon, Long> DistanceJoinQueryCountByKey(PolygonRDD spatialRDD,CircleRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<Polygon, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Polygon,Long>()
-            {
-				@Override
-				public Tuple2<Polygon, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Polygon castedSpatialObject = (Polygon)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Polygon,Long>((Polygon)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-        	JavaPairRDD<Polygon, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Polygon,Long>()
-            {
-				@Override
-				public Tuple2<Polygon, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Polygon castedSpatialObject = (Polygon)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Polygon,Long>((Polygon)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
-   }
-    
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByCircle(joinListResultAfterAggregation);
+        JavaPairRDD<Polygon, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Polygon,Long>()
+        {
+            @Override
+            public Tuple2<Polygon, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<Polygon> castedSpatialObjects = new HashSet<Polygon>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    Polygon castedSpatialObject = (Polygon)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<Polygon,Long>((Polygon)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
+            }
+        });
+        return castedResult;
+    }
 
-    
- 
-    
+
     /**
      * Distance join query count by key.
      *
@@ -783,46 +894,34 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<Point, Long> DistanceJoinQueryCountByKey(PointRDD spatialRDD,PointRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<Point, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Point,Long>()
-            {
-				@Override
-				public Tuple2<Point, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Point> castedSpatialObjects = new HashSet<Point>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Point castedSpatialObject = (Point)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Point,Long>((Point)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-        	JavaPairRDD<Point, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Point,Long>()
-            {
-				@Override
-				public Tuple2<Point, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<Point> castedSpatialObjects = new HashSet<Point>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						Point castedSpatialObject = (Point)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<Point,Long>((Point)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
-   }
-    
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByCircle(joinListResultAfterAggregation);
+        JavaPairRDD<Point, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,Point,Long>()
+        {
+            @Override
+            public Tuple2<Point, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<Point> castedSpatialObjects = new HashSet<Point>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    Point castedSpatialObject = (Point)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<Point,Long>((Point)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
+            }
+        });
+        return castedResult;
+    }
+
     /**
      * Distance join query count by key.
      *
@@ -834,45 +933,33 @@
      * @throws Exception the exception
      */
     public static JavaPairRDD<LineString, Long> DistanceJoinQueryCountByKey(LineStringRDD spatialRDD,LineStringRDD queryRDD, boolean useIndex,boolean considerBoundaryIntersection) throws Exception {
+        JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = null;
         if(useIndex)
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-            JavaPairRDD<LineString, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,LineString,Long>()
-            {
-				@Override
-				public Tuple2<LineString, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						LineString castedSpatialObject = (LineString)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<LineString,Long>((LineString)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinUsingIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
         else
         {
-        	JavaPairRDD<Circle, HashSet<Geometry>> joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection); 
-        	JavaPairRDD<LineString, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,LineString,Long>()
-            {
-				@Override
-				public Tuple2<LineString, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
-					HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
-					Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
-					while(spatialObjectIterator.hasNext())
-					{
-						LineString castedSpatialObject = (LineString)spatialObjectIterator.next();
-						castedSpatialObjects.add(castedSpatialObject);
-					}
-					return new Tuple2<LineString,Long>((LineString)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
-				}
-            });
-            return castedResult;
+            joinListResultAfterAggregation = executeDistanceJoinNoIndex(spatialRDD,queryRDD,considerBoundaryIntersection);
         }
-   }
-    
+        joinListResultAfterAggregation = DuplicatesHandler.removeDuplicatesGeometryByCircle(joinListResultAfterAggregation);
+        JavaPairRDD<LineString, Long> castedResult = joinListResultAfterAggregation.mapToPair(new PairFunction<Tuple2<Circle,HashSet<Geometry>>,LineString,Long>()
+        {
+            @Override
+            public Tuple2<LineString, Long> call(Tuple2<Circle, HashSet<Geometry>> pairObjects) throws Exception {
+                HashSet<LineString> castedSpatialObjects = new HashSet<LineString>();
+                Iterator<Geometry> spatialObjectIterator = pairObjects._2.iterator();
+                while(spatialObjectIterator.hasNext())
+                {
+                    LineString castedSpatialObject = (LineString)spatialObjectIterator.next();
+                    castedSpatialObjects.add(castedSpatialObject);
+                }
+                return new Tuple2<LineString,Long>((LineString)pairObjects._1.getCenterGeometry(),(long) castedSpatialObjects.size());
+            }
+        });
+        return castedResult;
+    }
+
 }
 
diff --git a/core/src/main/java/org/datasyslab/geospark/spatialRDD/SpatialRDD.java b/core/src/main/java/org/datasyslab/geospark/spatialRDD/SpatialRDD.java
index 760d41a..f1ab5ae 100644
--- a/core/src/main/java/org/datasyslab/geospark/spatialRDD/SpatialRDD.java
+++ b/core/src/main/java/org/datasyslab/geospark/spatialRDD/SpatialRDD.java
@@ -98,7 +98,7 @@
 	 * @param targetEpsgCRSCode the target epsg CRS code
 	 * @return true, if successful
 	 */
-	protected boolean CRSTransform(String sourceEpsgCRSCode, String targetEpsgCRSCode)
+	public boolean CRSTransform(String sourceEpsgCRSCode, String targetEpsgCRSCode)
 	{
 		try {
     	CoordinateReferenceSystem sourceCRS = CRS.decode(sourceEpsgCRSCode);
diff --git a/core/src/main/scala/org/datasyslab/geospark/showcase/SpatialJoinShp.scala b/core/src/main/scala/org/datasyslab/geospark/showcase/SpatialJoinShp.scala
new file mode 100644
index 0000000..8e2ca75
--- /dev/null
+++ b/core/src/main/scala/org/datasyslab/geospark/showcase/SpatialJoinShp.scala
@@ -0,0 +1,56 @@
+package org.datasyslab.geospark.showcase
+
+import com.vividsolutions.jts.geom.Polygon
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.storage.StorageLevel
+import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileRDD
+import org.datasyslab.geospark.spatialRDD.PolygonRDD
+
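+/**
+ * Showcase: loads two polygon datasets from ESRI shapefiles via ShapefileRDD and
+ * prints the user data attached to each record. The spatial partitioning and join
+ * steps are kept as commented-out pointers for further experiments.
+ */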
+object SpatialJoinShp extends App {
+
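+  /** Loads a shapefile into a cached PolygonRDD; the commented-out lines show optional repartitioning and analysis. */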
+  def loadShapefile(path: String, numPartitions: Int = 20): PolygonRDD = {
+    val shp = new ShapefileRDD(sc, path)
+    val polygon = new PolygonRDD(shp.getPolygonRDD, StorageLevel.MEMORY_ONLY)
+    //polygon.rawSpatialRDD = polygon.rawSpatialRDD.repartition(numPartitions)
+    //polygon.analyze()
+    polygon
+  }
+
+
+
+  Logger.getLogger("org").setLevel(Level.WARN)
+  Logger.getLogger("akka").setLevel(Level.WARN)
+
+  val conf = new SparkConf().setAppName("SpatialJoinSpeciesPA").setMaster("local[4]")
+  val sc = new SparkContext(conf)
+
+  val shp1 = new ShapefileRDD(sc, "/Users/jiayu/Downloads/spark4geo_subset/wdpa")
+  val wdpa = new PolygonRDD(shp1.getPolygonRDD, StorageLevel.MEMORY_ONLY)
+
+  val shp2 = new ShapefileRDD(sc, "/Users/jiayu/Downloads/spark4geo_subset/amphib")
+  val species = new PolygonRDD(shp2.getPolygonRDD, StorageLevel.MEMORY_ONLY)
+
+  //wdpa.spatialPartitioning(GridType.QUADTREE)
+  //species.spatialPartitioning(wdpa.partitionTree)
+
+
+  val result = shp2.getShapeRDD.collect()
+
+  // print the user data attached to every record
+  for (a <- 0 until result.size()) {
+    println("print..." + result.get(a).getUserData + " END")
+  }
+
+  //val query = JoinQuery.SpatialJoinQuery(wdpa, species, false, false)
+
+  println("polygon is "+shp2.getPolygonRDD.take(100).get(55))
+  println("userdata is "+wdpa.rawSpatialRDD.take(100).get(55).asInstanceOf[Polygon].getUserData)
+  println(species.rawSpatialRDD.count())
+
+
+  //val user_data_sample = JoinQuery.SpatialJoinQuery(wdpa, species, false, false).first()._1.getUserData
+  //if (user_data_sample.toString.isEmpty) println("UserData is empty") else println(user_data_sample)
+
+//  val join_result = query.rdd.map((tuple: (Polygon, util.HashSet[Polygon])) => (tuple._1, tuple._2.asScala.map(tuple._1.intersection(_).getArea)) )
+//  val intersections = join_result.collect()
+}
diff --git a/core/src/test/java/org/datasyslab/geospark/formatMapper/shapefileParser/shapes/ShapefileRDDTest.java b/core/src/test/java/org/datasyslab/geospark/formatMapper/shapefileParser/shapes/ShapefileRDDTest.java
index 890a5bd..7394122 100644
--- a/core/src/test/java/org/datasyslab/geospark/formatMapper/shapefileParser/shapes/ShapefileRDDTest.java
+++ b/core/src/test/java/org/datasyslab/geospark/formatMapper/shapefileParser/shapes/ShapefileRDDTest.java
@@ -6,6 +6,7 @@
  */
 package org.datasyslab.geospark.formatMapper.shapefileParser.shapes;
 
+import com.vividsolutions.jts.geom.*;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
@@ -18,6 +19,8 @@
 import org.geotools.data.DataStore;
 import org.geotools.data.DataStoreFinder;
 import org.geotools.data.FeatureSource;
+import org.geotools.data.shapefile.files.ShpFiles;
+import org.geotools.data.shapefile.shp.ShapefileReader;
 import org.geotools.feature.FeatureCollection;
 import org.geotools.feature.FeatureIterator;
 import org.junit.*;
@@ -25,9 +28,6 @@
 import org.opengis.feature.simple.SimpleFeatureType;
 import org.opengis.filter.Filter;
 
-import com.vividsolutions.jts.geom.Envelope;
-import com.vividsolutions.jts.geom.Geometry;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -254,12 +254,41 @@
 			// TODO Auto-generated catch block
 			e.printStackTrace();
 		}
+		Assert.assertEquals("20\t175\t00485050\t0500000US20175\t20175\tSeward\t06\t1655865960\t2777350",
+				((Polygon) spatialRDD.rawSpatialRDD.take(1).get(0)).getUserData());
         for (Geometry geometry : shapefileRDD.getShapeRDD().collect()) {
             Assert.assertEquals(featureIterator.next(), geometry.toText());
         }
         dataStore.dispose();
     }
 
+    /**
+     * Tests that the shapefile boundary in the header is parsed correctly.
+     *
+     * @throws IOException the IO exception
+     */
+    @Test
+    public void testParseBoundary() throws IOException{
+        InputLocation = ShapefileRDDTest.class.getClassLoader().getResource("shapefiles/dbf").getPath();
+        // load the shapefile with GeoTools' reader
+        ShpFiles shpFile = new ShpFiles(InputLocation + "/map.shp");
+        GeometryFactory geometryFactory = new GeometryFactory();
+        ShapefileReader gtlReader = new ShapefileReader(shpFile, false, true, geometryFactory);
+        String gtlbounds =
+                gtlReader.getHeader().minX() + ":" +
+                gtlReader.getHeader().minY() + ":" +
+                gtlReader.getHeader().maxX() + ":" +
+                gtlReader.getHeader().maxY();
+        // read the same shapefile with GeoSpark's reader
+        ShapefileRDD shapefileRDD = new ShapefileRDD(sc, InputLocation);
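+        // run an action first so the shapefile is actually read before its bounding box is inspected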
+        shapefileRDD.count();
+        String myBounds =
+                shapefileRDD.getBoundBox().getXMin() + ":" +
+                shapefileRDD.getBoundBox().getYMin() + ":" +
+                shapefileRDD.getBoundBox().getXMax() + ":" +
+                shapefileRDD.getBoundBox().getYMax();
+        Assert.assertEquals(gtlbounds, myBounds);
+        gtlReader.close();
+    }
+
     @AfterClass
     public static void tearDown() throws Exception {
         sc.stop();
diff --git a/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala b/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
index 7932f6c..3a23836 100644
--- a/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
+++ b/core/src/test/scala/org/datasyslab/geospark/scalaTest.scala
@@ -1,11 +1,12 @@
 package org.datasyslab.geospark
 
-import com.vividsolutions.jts.geom.{Coordinate, Envelope, GeometryFactory}
+import com.vividsolutions.jts.geom.{Coordinate, Envelope, GeometryFactory, Polygon}
 import org.apache.log4j.{Level, Logger}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{SparkConf, SparkContext}
 import org.datasyslab.geospark.enums.{FileDataSplitter, GridType, IndexType}
 import org.datasyslab.geospark.formatMapper.EarthdataHDFPointMapper
+import org.datasyslab.geospark.formatMapper.shapefileParser.ShapefileRDD
 import org.datasyslab.geospark.spatialOperator.{JoinQuery, KNNQuery, RangeQuery}
 import org.datasyslab.geospark.spatialRDD.{CircleRDD, PointRDD, PolygonRDD}
 import org.scalatest.FunSpec