blob: 7ec141d437fc7a8b9511dcd9b2c1de1422204f72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.mongodb.geo;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.rya.indexing.GeoConstants;
import org.apache.rya.indexing.GeoIndexer;
import org.apache.rya.indexing.IndexingExpr;
import org.apache.rya.indexing.IteratorFactory;
import org.apache.rya.indexing.SearchFunction;
import org.apache.rya.indexing.StatementConstraints;
import org.apache.rya.indexing.accumulo.geo.GeoTupleSet;
import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.io.ParseException;
import com.vividsolutions.jts.io.WKTReader;
/**
 * An {@link ExternalTupleSet} that evaluates a single geospatial filter expression
 * ({@link IndexingExpr}) by delegating to a {@link GeoIndexer}.
 * <p>
 * The filter's function IRI selects one of the search functions registered in
 * {@link MongoGeoSearchFunctionFactory}; the filter's single argument is the WKT text of
 * the query geometry.
 */
public class MongoGeoTupleSet extends ExternalTupleSet {
    private final Configuration conf;
    private final GeoIndexer geoIndexer;
    private final IndexingExpr filterInfo;

    /**
     * Creates a tuple set for the given filter expression.
     *
     * @param filterInfo the geo filter expression this tuple set evaluates
     * @param geoIndexer the indexer queried during evaluation; its configuration
     *        ({@code geoIndexer.getConf()}) is also used by this tuple set
     */
    public MongoGeoTupleSet(IndexingExpr filterInfo, GeoIndexer geoIndexer) {
        this.filterInfo = filterInfo;
        this.geoIndexer = geoIndexer;
        this.conf = geoIndexer.getConf();
    }

    /**
     * @return the binding names reported by the wrapped filter expression
     */
    @Override
    public Set<String> getBindingNames() {
        return filterInfo.getBindingNames();
    }

    /**
     * Creates a new tuple set of the same type that shares this instance's filter
     * expression and indexer.
     * <p>
     * The copy must be a {@code MongoGeoTupleSet}: {@link #equals(Object)} only accepts
     * instances of this class, so returning any other tuple set type would make a clone
     * unequal to its original.
     *
     * @return a new {@code MongoGeoTupleSet} equal to this one
     */
    @Override
    public MongoGeoTupleSet clone() {
        return new MongoGeoTupleSet(filterInfo, geoIndexer);
    }

    /**
     * @return always {@code 0.0}
     */
    @Override
    public double cardinality() {
        return 0.0; // No idea how to estimate cardinality here.
    }

    /**
     * @return a textual signature made of a fixed prefix and the comma-separated binding
     *         names, with runs of whitespace in the joined names collapsed to one space
     */
    @Override
    public String getSignature() {
        return "(GeoTuple Projection) " + "variables: " + Joiner.on(", ").join(this.getBindingNames()).replaceAll("\\s+", " ");
    }

    /**
     * Two tuple sets are equal when both are {@code MongoGeoTupleSet}s with equal filter
     * expressions; the indexer and configuration are not compared.
     */
    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        }
        if (!(other instanceof MongoGeoTupleSet)) {
            return false;
        }
        MongoGeoTupleSet arg = (MongoGeoTupleSet) other;
        return this.filterInfo.equals(arg.filterInfo);
    }

    /**
     * Hash code derived solely from the filter expression, consistent with
     * {@link #equals(Object)}.
     */
    @Override
    public int hashCode() {
        int result = 17;
        result = 31 * result + filterInfo.hashCode();
        return result;
    }

    /**
     * Returns an iterator over the result set of the contained IndexingExpr.
     * <p>
     * Should be thread-safe: concurrent invocations of this method can be expected with
     * some query evaluators.
     *
     * @param bindings the bindings passed on to {@link IteratorFactory}
     * @return an iteration over the binding sets produced for the filter expression
     * @throws IllegalArgumentException if the filter expression does not carry exactly
     *         one argument
     * @throws QueryEvaluationException if no search function is registered for the
     *         filter's function IRI
     */
    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings)
            throws QueryEvaluationException {
        IRI funcURI = filterInfo.getFunction();
        SearchFunction searchFunction = (new MongoGeoSearchFunctionFactory(conf)).getSearchFunction(funcURI);

        Object[] arguments = filterInfo.getArguments();
        if (arguments.length > 1) {
            throw new IllegalArgumentException("Index functions do not support more than two arguments.");
        }
        if (arguments.length == 0) {
            // Fail with a descriptive error instead of an ArrayIndexOutOfBoundsException below.
            throw new IllegalArgumentException("Index functions require a query text argument.");
        }
        if (searchFunction == null) {
            // getSearchFunction() reports an unknown function by returning null; surface that
            // here rather than handing a null search function to the iterator.
            throw new QueryEvaluationException("Unknown Search Function: " + funcURI.stringValue());
        }

        String queryText = ((Value) arguments[0]).stringValue();
        return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction);
    }

    /**
     * Returns the appropriate search function for a given function IRI. Each search
     * function parses its query text as WKT and runs the matching spatial query on the
     * enclosing tuple set's {@link GeoIndexer}.
     */
    public class MongoGeoSearchFunctionFactory {
        Configuration conf;
        private final Map<IRI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();

        public MongoGeoSearchFunctionFactory(Configuration conf) {
            this.conf = conf;
        }

        /**
         * Get a {@link SearchFunction} for a given URI.
         *
         * @param searchFunction the IRI identifying the geo function
         * @return the registered search function, or {@code null} if none is registered
         *         for the IRI (in which case the lookup failure is printed to stderr)
         */
        public SearchFunction getSearchFunction(final IRI searchFunction) {
            SearchFunction geoFunc = null;
            try {
                geoFunc = getSearchFunctionInternal(searchFunction);
            } catch (QueryEvaluationException e) {
                // Existing contract: report the failure and return null; callers must null-check.
                e.printStackTrace();
            }
            return geoFunc;
        }

        /**
         * Looks up the search function registered for the IRI.
         *
         * @throws QueryEvaluationException if no function is registered for the IRI
         */
        private SearchFunction getSearchFunctionInternal(final IRI searchFunction) throws QueryEvaluationException {
            SearchFunction sf = SEARCH_FUNCTION_MAP.get(searchFunction);
            if (sf != null) {
                return sf;
            } else {
                throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue());
            }
        }

        /**
         * Parses WKT query text into a geometry.
         *
         * @throws QueryEvaluationException wrapping the {@link ParseException} if the text
         *         is not valid WKT
         */
        private Geometry parseGeometry(String queryText) throws QueryEvaluationException {
            try {
                return new WKTReader().read(queryText);
            } catch (ParseException e) {
                throw new QueryEvaluationException(e);
            }
        }

        // Each function below dispatches to the GeoIndexer query of the same spatial relation.
        // NOTE(review): whether a given relation is actually supported depends on the
        // GeoIndexer implementation in use - confirm against the Mongo indexer.

        private final SearchFunction GEO_EQUALS = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
                    StatementConstraints constraints) throws QueryEvaluationException {
                return geoIndexer.queryEquals(parseGeometry(queryText), constraints);
            }

            @Override
            public String toString() {
                return "GEO_EQUALS";
            }
        };

        private final SearchFunction GEO_DISJOINT = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
                    StatementConstraints constraints) throws QueryEvaluationException {
                return geoIndexer.queryDisjoint(parseGeometry(queryText), constraints);
            }

            @Override
            public String toString() {
                return "GEO_DISJOINT";
            }
        };

        private final SearchFunction GEO_INTERSECTS = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
                    StatementConstraints constraints) throws QueryEvaluationException {
                return geoIndexer.queryIntersects(parseGeometry(queryText), constraints);
            }

            @Override
            public String toString() {
                return "GEO_INTERSECTS";
            }
        };

        private final SearchFunction GEO_TOUCHES = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
                    StatementConstraints constraints) throws QueryEvaluationException {
                return geoIndexer.queryTouches(parseGeometry(queryText), constraints);
            }

            @Override
            public String toString() {
                return "GEO_TOUCHES";
            }
        };

        private final SearchFunction GEO_CONTAINS = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
                    StatementConstraints constraints) throws QueryEvaluationException {
                return geoIndexer.queryContains(parseGeometry(queryText), constraints);
            }

            @Override
            public String toString() {
                return "GEO_CONTAINS";
            }
        };

        private final SearchFunction GEO_OVERLAPS = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
                    StatementConstraints constraints) throws QueryEvaluationException {
                return geoIndexer.queryOverlaps(parseGeometry(queryText), constraints);
            }

            @Override
            public String toString() {
                return "GEO_OVERLAPS";
            }
        };

        private final SearchFunction GEO_CROSSES = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
                    StatementConstraints constraints) throws QueryEvaluationException {
                return geoIndexer.queryCrosses(parseGeometry(queryText), constraints);
            }

            @Override
            public String toString() {
                return "GEO_CROSSES";
            }
        };

        private final SearchFunction GEO_WITHIN = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(String queryText,
                    StatementConstraints constraints) throws QueryEvaluationException {
                return geoIndexer.queryWithin(parseGeometry(queryText), constraints);
            }

            @Override
            public String toString() {
                return "GEO_WITHIN";
            }
        };

        // Instance initializer: runs after the field initializers above, so every
        // GEO_* function is already constructed when it is registered.
        {
            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_EQUALS, GEO_EQUALS);
            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_DISJOINT, GEO_DISJOINT);
            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_INTERSECTS, GEO_INTERSECTS);
            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_TOUCHES, GEO_TOUCHES);
            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CONTAINS, GEO_CONTAINS);
            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_OVERLAPS, GEO_OVERLAPS);
            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_CROSSES, GEO_CROSSES);
            SEARCH_FUNCTION_MAP.put(GeoConstants.GEO_SF_WITHIN, GEO_WITHIN);
        }
    }
}