blob: 7f30e8c1c6f5275aaba6f92381f4c70a16a0269e [file] [log] [blame]
/**
* FILE: SpatialRDD.java
* PATH: org.datasyslab.geospark.spatialRDD.SpatialRDD.java
* Copyright (c) 2015-2017 GeoSpark Development Team
* All rights reserved.
*/
package org.datasyslab.geospark.spatialRDD;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.storage.StorageLevel;
import org.datasyslab.geospark.enums.GridType;
import org.datasyslab.geospark.enums.IndexType;
import org.datasyslab.geospark.spatialPartitioning.*;
import org.datasyslab.geospark.spatialPartitioning.quadtree.StandardQuadTree;
import org.datasyslab.geospark.utils.RDDSampleUtils;
import org.datasyslab.geospark.utils.XMaxComparator;
import org.datasyslab.geospark.utils.XMinComparator;
import org.datasyslab.geospark.utils.YMaxComparator;
import org.datasyslab.geospark.utils.YMinComparator;
import org.geotools.geometry.jts.JTS;
import org.geotools.referencing.CRS;
import org.opengis.referencing.FactoryException;
import org.opengis.referencing.crs.CoordinateReferenceSystem;
import org.opengis.referencing.operation.MathTransform;
import org.wololo.geojson.Feature;
import org.wololo.jts2geojson.GeoJSONWriter;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Envelope;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import com.vividsolutions.jts.geom.LinearRing;
import com.vividsolutions.jts.geom.Polygon;
import com.vividsolutions.jts.index.SpatialIndex;
import com.vividsolutions.jts.index.quadtree.Quadtree;
import com.vividsolutions.jts.index.strtree.STRtree;
import scala.Tuple2;
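/**
 * Base class for spatial RDDs. It wraps a raw RDD of geometry objects and provides
 * CRS transformation, dataset analysis (boundary and record count), spatial
 * partitioning, per-partition index building and GeoJSON export on top of it.
 */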
public abstract class SpatialRDD implements Serializable{
/** Class-wide log4j logger. */
final static Logger logger = Logger.getLogger(SpatialRDD.class);
/** Record count of {@link #rawSpatialRDD} as stored by the analyze methods; stays -1 until one of them has run. */
public long approximateTotalCount=-1;
/** Bounding envelope of the dataset; null until boundary() or an analyze method sets it. */
public Envelope boundaryEnvelope = null;
/** The raw RDD redistributed by one of the spatialPartitioning methods; not set before such a call. */
public JavaRDD<Object> spatialPartitionedRDD;
/** One spatial index per partition of {@link #spatialPartitionedRDD}; produced by buildIndex(indexType, true). */
public JavaRDD<SpatialIndex> indexedRDD;
/** One spatial index per partition of {@link #rawSpatialRDD}; produced by buildIndex(indexType, false). */
public JavaRDD<Object> indexedRawRDD;
/** The source RDD. Its elements are cast to JTS Geometry by the methods of this class. */
public JavaRDD<Object> rawSpatialRDD;
/** Grid cells used by the grid-based spatial partitioning methods; set by those methods. */
public List<Envelope> grids;
/** Quad-tree used for QUADTREE partitioning; set by the quad-tree based spatial partitioning methods. */
public StandardQuadTree partitionTree;
/**
 * Sample size handed to RDDSampleUtils.getSampleNumbers during spatial partitioning. Defaults to -1;
 * presumably that lets the utility choose a size itself - confirm against RDDSampleUtils.
 */
public Long sampleNumber = (long) -1;
/**
 * Gets the sample number.
 *
 * @return the sample size currently stored in the sampleNumber field
 */
public Long getSampleNumber() {
    return this.sampleNumber;
}
/**
 * Stores the sample size that later spatial partitioning calls pass to the sampling utility.
 *
 * @param sampleNumber the new sample number
 */
public void setSampleNumber(Long sampleNumber) {
    this.sampleNumber = sampleNumber;
}
/** Whether a CRS transformation has been applied; set to true by a successful CRSTransform call. */
protected boolean CRStransformation=false;;
/** The source EPSG code given to the last successful CRSTransform call; empty string before that. */
protected String sourceEpsgCode="";
/** The target EPSG code given to the last successful CRSTransform call; empty string before that. */
protected String targetEpgsgCode="";
/**
 * Re-projects every geometry of the raw spatial RDD from one coordinate reference system to another.
 * <p>
 * The raw RDD is replaced by a mapped RDD that applies the transform to each element, so the
 * actual coordinate conversion happens when an action is later run on it. This method does not
 * touch boundaryEnvelope or approximateTotalCount; a boundary computed before the call still
 * describes the untransformed data.
 *
 * @param sourceEpsgCRSCode the source epsg CRS code
 * @param targetEpsgCRSCode the target epsg CRS code
 * @return true if the transform was set up; false if a CRS code could not be decoded or no
 *         transform between the two systems could be found
 */
public boolean CRSTransform(String sourceEpsgCRSCode, String targetEpsgCRSCode)
{
    try {
        CoordinateReferenceSystem sourceCRS = CRS.decode(sourceEpsgCRSCode);
        CoordinateReferenceSystem targetCRS = CRS.decode(targetEpsgCRSCode);
        final MathTransform transform = CRS.findMathTransform(sourceCRS, targetCRS, false);
        // Only record the transformation once both codes resolved and a transform exists.
        this.CRStransformation=true;
        this.sourceEpsgCode=sourceEpsgCRSCode;
        this.targetEpgsgCode=targetEpsgCRSCode;
        this.rawSpatialRDD = this.rawSpatialRDD.map(new Function<Object,Object>()
        {
            @Override
            public Object call(Object originalObject) throws Exception {
                return JTS.transform((Geometry)originalObject,transform);
            }
        });
        return true;
    } catch (FactoryException e) {
        // Report through the class logger (with the offending codes) instead of dumping to stderr.
        logger.error("[AbstractSpatialRDD][CRSTransform] Cannot set up CRS transformation from "
                + sourceEpsgCRSCode + " to " + targetEpsgCRSCode, e);
        return false;
    }
}
/**
 * Spatially partitions the raw spatial RDD with the requested partitioning scheme.
 * <p>
 * A sample of the raw RDD is taken and handed to the chosen partitioner to derive either a list
 * of grid cells (stored in grids) or a quad-tree (stored in partitionTree). Every object is then
 * tagged with the partition id(s) reported by PartitionJudgement, shuffled with a
 * SpatialPartitioner, and stripped of its id again; the result is stored in spatialPartitionedRDD.
 *
 * @param gridType the grid type
 * @return true, if successful
 * @throws Exception if the boundary or total count has not been computed yet (call analyze() first),
 *         or if the grid type is not supported
 */
public boolean spatialPartitioning(GridType gridType) throws Exception
{
    int numPartitions = this.rawSpatialRDD.rdd().partitions().length;
    if(this.boundaryEnvelope==null)
    {
        throw new Exception("[AbstractSpatialRDD][spatialPartitioning] SpatialRDD boundary is null. Please call analyze() first.");
    }
    if(this.approximateTotalCount==-1)
    {
        throw new Exception("[AbstractSpatialRDD][spatialPartitioning] SpatialRDD total count is unkown. Please call analyze() first.");
    }
    //Calculate the number of samples we need to take.
    int sampleNumberOfRecords = RDDSampleUtils.getSampleNumbers(numPartitions, this.approximateTotalCount, this.sampleNumber);
    //Take Sample. The raw ArrayList type is kept on purpose: the partitioner constructors are
    //declared outside this file and their exact parameter types are not visible here.
    ArrayList objectSampleList = new ArrayList(this.rawSpatialRDD.takeSample(false, sampleNumberOfRecords));
    //Derive the partition layout (grid cells or quad-tree) for the requested scheme.
    if(gridType == GridType.EQUALGRID)
    {
        EqualPartitioning EqualPartitioning=new EqualPartitioning(this.boundaryEnvelope,numPartitions);
        grids=EqualPartitioning.getGrids();
    }
    else if(gridType == GridType.HILBERT)
    {
        HilbertPartitioning hilbertPartitioning=new HilbertPartitioning(objectSampleList,this.boundaryEnvelope,numPartitions);
        grids=hilbertPartitioning.getGrids();
    }
    else if(gridType == GridType.RTREE)
    {
        RtreePartitioning rtreePartitioning=new RtreePartitioning(objectSampleList,this.boundaryEnvelope,numPartitions);
        grids=rtreePartitioning.getGrids();
    }
    else if(gridType == GridType.VORONOI)
    {
        VoronoiPartitioning voronoiPartitioning=new VoronoiPartitioning(objectSampleList,this.boundaryEnvelope,numPartitions);
        grids=voronoiPartitioning.getGrids();
    }
    else if (gridType == GridType.QUADTREE) {
        QuadtreePartitioning quadtreePartitioning = new QuadtreePartitioning(objectSampleList, this.boundaryEnvelope, numPartitions);
        partitionTree = quadtreePartitioning.getPartitionTree();
    }
    else
    {
        throw new Exception("[AbstractSpatialRDD][spatialPartitioning] Unsupported spatial partitioning method.");
    }
    //Pick the id-assignment function and the matching partitioner. Both are built from the same
    //local snapshot so the closure cannot drift from the partitioner if the fields are reassigned later.
    final PairFlatMapFunction<Object, Integer, Object> assignPartitionIds;
    final SpatialPartitioner partitioner;
    if(gridType == GridType.QUADTREE)
    {
        final StandardQuadTree tree = this.partitionTree;
        assignPartitionIds = new PairFlatMapFunction<Object, Integer, Object>() {
            @Override
            public Iterator<Tuple2<Integer, Object>> call(Object spatialObject) throws Exception {
                return PartitionJudgement.getPartitionID(tree,spatialObject);
            }
        };
        partitioner = new SpatialPartitioner(tree.getTotalNumLeafNode());
    }
    else
    {
        final List<Envelope> gridCells = this.grids;
        assignPartitionIds = new PairFlatMapFunction<Object, Integer, Object>() {
            @Override
            public Iterator<Tuple2<Integer, Object>> call(Object spatialObject) throws Exception {
                return PartitionJudgement.getPartitionID(gridCells,spatialObject);
            }
        };
        partitioner = new SpatialPartitioner(gridCells.size());
    }
    JavaPairRDD<Integer, Object> spatialNumberingRDD = this.rawSpatialRDD.flatMapToPair(assignPartitionIds);
    //Shuffle by partition id, then drop the id and keep only the spatial objects.
    this.spatialPartitionedRDD = spatialNumberingRDD.partitionBy(partitioner).mapPartitions(new FlatMapFunction<Iterator<Tuple2<Integer,Object>>, Object>() {
        @Override
        public Iterator<Object> call(Iterator<Tuple2<Integer, Object>> tuple2Iterator) throws Exception {
            List<Object> result = new ArrayList<Object>();
            while (tuple2Iterator.hasNext())
            {
                result.add(tuple2Iterator.next()._2());
            }
            return result.iterator();
        }
    },true);
    //The former per-partition counting RDD was never acted upon, so it has been removed.
    return true;
}
/**
 * Spatially partitions the raw spatial RDD with a grid supplied by the caller, for example the
 * grid of another SpatialRDD so that both datasets end up partitioned the same way.
 * <p>
 * Every object is tagged with the partition id(s) reported by PartitionJudgement for the given
 * grid, shuffled with a SpatialPartitioner sized to the grid, and stripped of its id again. The
 * grid is stored in grids and the result in spatialPartitionedRDD.
 *
 * @param otherGrids the other grids
 * @return true, if successful
 * @throws Exception the exception
 */
public boolean spatialPartitioning(final List<Envelope> otherGrids) throws Exception
{
    JavaPairRDD<Integer, Object> spatialNumberingRDD = this.rawSpatialRDD.flatMapToPair(
            new PairFlatMapFunction<Object, Integer, Object>() {
                @Override
                public Iterator<Tuple2<Integer, Object>> call(Object spatialObject) throws Exception {
                    return PartitionJudgement.getPartitionID(otherGrids,spatialObject);
                }
            }
    );
    this.grids = otherGrids;
    // Shuffle by partition id, then drop the id and keep only the spatial objects.
    this.spatialPartitionedRDD = spatialNumberingRDD.partitionBy(new SpatialPartitioner(grids.size())).mapPartitions(new FlatMapFunction<Iterator<Tuple2<Integer,Object>>, Object>() {
        @Override
        public Iterator<Object> call(Iterator<Tuple2<Integer, Object>> tuple2Iterator) throws Exception {
            List<Object> result = new ArrayList<Object>();
            while (tuple2Iterator.hasNext())
            {
                result.add(tuple2Iterator.next()._2());
            }
            return result.iterator();
        }
    },true);
    // The former per-partition counting RDD was never acted upon, so it has been removed.
    return true;
}
/**
 * Spatially partitions the raw spatial RDD with a quad-tree supplied by the caller.
 * <p>
 * Objects are tagged with the partition id(s) reported by PartitionJudgement for the tree,
 * shuffled with a SpatialPartitioner sized to the tree's leaf count, and stripped of the id.
 * The result is stored in spatialPartitionedRDD and the tree in the partitionTree field.
 *
 * @param partitionTree the quad-tree that defines the partitions
 * @return true, if successful
 * @throws Exception the exception
 */
public boolean spatialPartitioning(final StandardQuadTree partitionTree) throws Exception {
    // Step 1: pair each object with the id(s) of the leaf node(s) it belongs to.
    JavaPairRDD<Integer, Object> numberedObjects = this.rawSpatialRDD.flatMapToPair(
            new PairFlatMapFunction<Object, Integer, Object>() {
                @Override
                public Iterator<Tuple2<Integer, Object>> call(Object spatialObject) throws Exception {
                    return PartitionJudgement.getPartitionID(partitionTree, spatialObject);
                }
            });
    // Step 2: shuffle so that each leaf node becomes one partition.
    JavaPairRDD<Integer, Object> shuffledObjects =
            numberedObjects.partitionBy(new SpatialPartitioner(partitionTree.getTotalNumLeafNode()));
    // Step 3: drop the partition ids again.
    this.spatialPartitionedRDD = shuffledObjects.mapPartitions(
            new FlatMapFunction<Iterator<Tuple2<Integer,Object>>, Object>() {
                @Override
                public Iterator<Object> call(Iterator<Tuple2<Integer, Object>> numberedIterator) throws Exception {
                    List<Object> objectsOnly = new ArrayList<Object>();
                    for (; numberedIterator.hasNext(); ) {
                        Tuple2<Integer, Object> numbered = numberedIterator.next();
                        objectsOnly.add(numbered._2());
                    }
                    return objectsOnly.iterator();
                }
            }, true);
    this.partitionTree = partitionTree;
    return true;
}
/**
 * Counts the distinct objects of the raw spatial RDD.
 * <p>
 * The whole RDD is collected and de-duplicated in a HashSet in the calling JVM, so the
 * objects' equals/hashCode decide what counts as a duplicate.
 *
 * @return the number of distinct objects
 */
public long countWithoutDuplicates()
{
    List<Object> collectedObjects = this.rawSpatialRDD.collect();
    HashSet<Object> distinctObjects = new HashSet<Object>(collectedObjects);
    return distinctObjects.size();
}
/**
 * Counts the distinct objects of the spatially partitioned RDD.
 * <p>
 * The whole RDD is collected and de-duplicated in a HashSet in the calling JVM, so the
 * objects' equals/hashCode decide what counts as a duplicate.
 *
 * @return the number of distinct objects
 */
public long countWithoutDuplicatesSPRDD()
{
    List<Object> collectedObjects = this.spatialPartitionedRDD.collect();
    HashSet<Object> distinctObjects = new HashSet<Object>(collectedObjects);
    return distinctObjects.size();
}
/**
 * Builds one spatial index per partition.
 * <p>
 * With {@code buildIndexOnSpatialPartitionedRDD == false} the index is built over the raw spatial
 * RDD and stored in indexedRawRDD; otherwise it is built over spatialPartitionedRDD and stored
 * in indexedRDD. IndexType.RTREE yields an STRtree, any other index type yields a Quadtree.
 *
 * @param indexType the index type
 * @param buildIndexOnSpatialPartitionedRDD the build index on spatial partitioned RDD
 * @throws Exception if the spatially partitioned RDD is requested but has not been created yet
 */
public void buildIndex(final IndexType indexType,boolean buildIndexOnSpatialPartitionedRDD) throws Exception {
    if (buildIndexOnSpatialPartitionedRDD) {
        if (this.spatialPartitionedRDD == null) {
            throw new Exception("[AbstractSpatialRDD][buildIndex] spatialPartitionedRDD is null. Please do spatial partitioning before build index.");
        }
        this.indexedRDD = this.spatialPartitionedRDD.mapPartitions(new FlatMapFunction<Iterator<Object>, SpatialIndex>() {
            @Override
            public Iterator<SpatialIndex> call(Iterator<Object> objectIterator) throws Exception {
                List<SpatialIndex> singleIndex = new ArrayList<SpatialIndex>(1);
                singleIndex.add(buildPartitionIndex(indexType, objectIterator));
                return singleIndex.iterator();
            }
        });
        return;
    }
    // This index is built on top of the unpartitioned raw RDD.
    this.indexedRawRDD = this.rawSpatialRDD.mapPartitions(new FlatMapFunction<Iterator<Object>, Object>() {
        @Override
        public Iterator<Object> call(Iterator<Object> spatialObjects) throws Exception {
            List<Object> singleIndex = new ArrayList<Object>(1);
            singleIndex.add(buildPartitionIndex(indexType, spatialObjects));
            return singleIndex.iterator();
        }
    });
}

/** Inserts every geometry of one partition into a fresh STRtree (RTREE) or Quadtree (anything else). */
private static SpatialIndex buildPartitionIndex(IndexType indexType, Iterator<Object> partitionObjects) {
    SpatialIndex partitionIndex;
    if (indexType == IndexType.RTREE) {
        partitionIndex = new STRtree();
    } else {
        partitionIndex = new Quadtree();
    }
    while (partitionObjects.hasNext()) {
        Geometry geometry = (Geometry) partitionObjects.next();
        partitionIndex.insert(geometry.getEnvelopeInternal(), geometry);
    }
    // Issue one throw-away query, as the original code did, before the index is handed back.
    partitionIndex.query(new Envelope(0.0, 0.0, 0.0, 0.0));
    return partitionIndex;
}
/**
 * Computes the bounding envelope of the raw spatial RDD and stores it in boundaryEnvelope.
 * <p>
 * The four extremes are found with four separate min/max passes over the RDD, one per comparator.
 *
 * @return the envelope spanning all objects of the raw spatial RDD
 */
public Envelope boundary() {
    // Run all four passes first; each returns the object that is extreme in one direction.
    Object westmost = this.rawSpatialRDD.min(new XMinComparator());
    Object southmost = this.rawSpatialRDD.min(new YMinComparator());
    Object eastmost = this.rawSpatialRDD.max(new XMaxComparator());
    Object northmost = this.rawSpatialRDD.max(new YMaxComparator());
    double minX = ((Geometry) westmost).getEnvelopeInternal().getMinX();
    double minY = ((Geometry) southmost).getEnvelopeInternal().getMinY();
    double maxX = ((Geometry) eastmost).getEnvelopeInternal().getMaxX();
    double maxY = ((Geometry) northmost).getEnvelopeInternal().getMaxY();
    // Envelope takes its arguments as (x1, x2, y1, y2).
    this.boundaryEnvelope = new Envelope(minX, maxX, minY, maxY);
    return this.boundaryEnvelope;
}
/**
 * Returns the RDD currently held in the rawSpatialRDD field.
 *
 * @return the raw spatial RDD
 */
public JavaRDD<Object> getRawSpatialRDD() {
    return this.rawSpatialRDD;
}
/**
 * Replaces the RDD held in the rawSpatialRDD field. No other field is updated by this call.
 *
 * @param rawSpatialRDD the new raw spatial RDD
 */
public void setRawSpatialRDD(JavaRDD<Object> rawSpatialRDD) {
    this.rawSpatialRDD = rawSpatialRDD;
}
/**
 * Persists the raw spatial RDD at the given storage level, then computes its boundary
 * envelope and record count and stores them in boundaryEnvelope and approximateTotalCount.
 *
 * @param newLevel the storage level to persist the raw spatial RDD with
 * @return true, if successful
 */
public boolean analyze(StorageLevel newLevel)
{
    // Persist first so the boundary and count passes below can reuse the stored data.
    this.rawSpatialRDD.persist(newLevel);
    this.boundary();
    long recordCount = this.rawSpatialRDD.count();
    this.approximateTotalCount = recordCount;
    return true;
}
/**
 * Computes the boundary envelope and the record count of the raw spatial RDD and stores
 * them in boundaryEnvelope and approximateTotalCount.
 *
 * @return true, if successful
 */
public boolean analyze()
{
    this.boundary();
    long recordCount = this.rawSpatialRDD.count();
    this.approximateTotalCount = recordCount;
    return true;
}
/**
 * Records dataset statistics supplied by the caller instead of computing them from the RDD.
 * <p>
 * Previously the supplied count was ignored and a full count() job was run anyway; the supplied
 * value is now stored as-is. Only when no count is given (null) does the method fall back to
 * counting the raw spatial RDD.
 *
 * @param datasetBoundary the boundary envelope to store in boundaryEnvelope
 * @param approximateTotalCount the record count to store in approximateTotalCount, or null to count the RDD
 * @return true, if successful
 */
public boolean analyze(Envelope datasetBoundary, Integer approximateTotalCount)
{
    this.boundaryEnvelope = datasetBoundary;
    if (approximateTotalCount != null)
    {
        this.approximateTotalCount = approximateTotalCount.longValue();
    }
    else
    {
        // No count supplied: keep the old behaviour of counting the RDD rather than failing on unboxing.
        this.approximateTotalCount = this.rawSpatialRDD.count();
    }
    return true;
}
/**
 * Writes the raw spatial RDD to the given location as text, one GeoJSON feature string per object.
 * <p>
 * If a geometry carries user data, it is attached to the feature as a property named "UserData";
 * otherwise the feature is created with null properties.
 *
 * @param outputLocation the output location
 */
public void saveAsGeoJSON(String outputLocation) {
    JavaRDD<String> featureLines = this.rawSpatialRDD.mapPartitions(new FlatMapFunction<Iterator<Object>, String>() {
        @Override
        public Iterator<String> call(Iterator<Object> iterator) throws Exception {
            GeoJSONWriter geoJsonWriter = new GeoJSONWriter();
            ArrayList<String> featureStrings = new ArrayList<String>();
            while (iterator.hasNext()) {
                Geometry geometry = (Geometry) iterator.next();
                // Properties stay null unless the geometry carries user data.
                Map<String,Object> properties = null;
                if (geometry.getUserData() != null) {
                    properties = new HashMap<String,Object>();
                    properties.put("UserData", geometry.getUserData());
                }
                Feature feature = new Feature(geoJsonWriter.write(geometry), properties);
                featureStrings.add(feature.toString());
            }
            return featureStrings.iterator();
        }
    });
    featureLines.saveAsTextFile(outputLocation);
}
/**
 * Maps every object of the raw spatial RDD to a polygon tracing its bounding envelope and
 * wraps the result in a RectangleRDD.
 *
 * @return the rectangle RDD
 */
@Deprecated
public RectangleRDD MinimumBoundingRectangle() {
    JavaRDD<Polygon> rectangleRDD = this.rawSpatialRDD.map(new Function<Object, Polygon>() {
        @Override
        public Polygon call(Object spatialObject) {
            Envelope mbr = ((Geometry) spatialObject).getEnvelopeInternal();
            double minX = mbr.getMinX();
            double maxX = mbr.getMaxX();
            double minY = mbr.getMinY();
            double maxY = mbr.getMaxY();
            GeometryFactory factory = new GeometryFactory();
            // Closed ring: the first coordinate is reused as the last one.
            Coordinate start = new Coordinate(minX, minY);
            Coordinate[] corners = {
                    start,
                    new Coordinate(minX, maxY),
                    new Coordinate(maxX, maxY),
                    new Coordinate(maxX, minY),
                    start
            };
            LinearRing shell = factory.createLinearRing(corners);
            return new Polygon(shell, null, factory);
        }
    });
    return new RectangleRDD(rectangleRDD);
}
/**
 * Tells whether a CRS transformation has been recorded on this RDD.
 *
 * @return the value of the CRStransformation flag
 */
public boolean getCRStransformation() {
    return this.CRStransformation;
}
/**
 * Returns the source EPSG code recorded by CRSTransform.
 *
 * @return the source epsg code
 */
public String getSourceEpsgCode() {
    return this.sourceEpsgCode;
}
/**
 * Returns the target EPSG code recorded by CRSTransform.
 *
 * @return the target epsg code
 */
public String getTargetEpgsgCode() {
    return this.targetEpgsgCode;
}
}