RYA-236 Changes to other indexers
The GeoTemporal indexer is very closely related to the Entity
Indexer. Abstracted out some common areas.
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
index 1c5b72c..808afdf 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/TemporalTupleSet.java
@@ -4,6 +4,7 @@
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.joda.time.DateTime;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
@@ -35,7 +36,6 @@
*/
import info.aduna.iteration.CloseableIteration;
-import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
//Indexing Node for temporal expressions to be inserted into execution plan
//to delegate temporal portion of query to temporal index
@@ -111,7 +111,7 @@
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings)
throws QueryEvaluationException {
final URI funcURI = filterInfo.getFunction();
- final SearchFunction searchFunction = (new TemporalSearchFunctionFactory(conf)).getSearchFunction(funcURI);
+ final SearchFunction searchFunction = new TemporalSearchFunctionFactory(conf, temporalIndexer).getSearchFunction(funcURI);
if(filterInfo.getArguments().length > 1) {
throw new IllegalArgumentException("Index functions do not support more than two arguments.");
@@ -123,12 +123,14 @@
//returns appropriate search function for a given URI
//search functions used by TemporalIndexer to query Temporal Index
- private class TemporalSearchFunctionFactory {
+ public static class TemporalSearchFunctionFactory {
private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
+ private final TemporalIndexer temporalIndexer;
Configuration conf;
- public TemporalSearchFunctionFactory(final Configuration conf) {
+ public TemporalSearchFunctionFactory(final Configuration conf, final TemporalIndexer temporalIndexer) {
this.conf = conf;
+ this.temporalIndexer = temporalIndexer;
}
/**
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
index 41ae9ad..5cc1c44 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -124,6 +124,7 @@
public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater";
public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName";
+ public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo";
public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType";
public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType";
@@ -427,6 +428,7 @@
return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false);
}
+
/**
* @return The name of the Fluo Application this instance of RYA is using to
* incrementally update PCJs.
@@ -436,10 +438,12 @@
return Optional.fromNullable(conf.get(FLUO_APP_NAME));
}
+
public static boolean getUseMongo(final Configuration conf) {
return conf.getBoolean(USE_MONGO, false);
}
+
public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
final List<String> indexList = Lists.newArrayList();
@@ -452,6 +456,7 @@
indexList.add(MongoFreeTextIndexer.class.getName());
useFilterIndex = true;
}
+
if (getUseEntity(conf)) {
indexList.add(MongoEntityIndexer.class.getName());
optimizers.add(EntityIndexOptimizer.class.getName());
@@ -462,9 +467,9 @@
useFilterIndex = true;
}
} else {
- if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
- conf.setPcjOptimizer(PCJOptimizer.class);
- }
+ if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) {
+ conf.setPcjOptimizer(PCJOptimizer.class);
+ }
if (getUsePcjUpdaterIndex(conf)) {
indexList.add(PrecomputedJoinIndexer.class.getName());
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
index e9d6c30..fcc1c58 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
@@ -378,7 +378,6 @@
}
}
-
/**
* statements where the datetime is exactly the same as the queryInstant.
*/
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java
index 61efc91..1e6abdb 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/EntityIndexException.java
@@ -19,11 +19,12 @@
package org.apache.rya.indexing.entity;
import org.apache.rya.indexing.entity.model.TypedEntity;
+import org.apache.rya.indexing.mongodb.IndexingException;
/**
* An operation over the {@link TypedEntity} index failed to complete.
*/
-public class EntityIndexException extends Exception {
+public class EntityIndexException extends IndexingException {
private static final long serialVersionUID = 1L;
/**
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java
index 34dbf15..6f0b9ae 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/EntityStorage.java
@@ -22,12 +22,12 @@
import java.util.Set;
import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.indexing.entity.EntityIndexException;
import org.apache.rya.indexing.entity.model.Entity;
import org.apache.rya.indexing.entity.model.Property;
import org.apache.rya.indexing.entity.model.Type;
import org.apache.rya.indexing.entity.model.TypedEntity;
import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage;
import org.calrissian.mango.collect.CloseableIterator;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -37,36 +37,7 @@
* Stores and provides access to {@link Entity}s.
*/
@DefaultAnnotation(NonNull.class)
-public interface EntityStorage {
-
- /**
- * Creates a new {@link Entity} within the storage. The new Entity's subject must be unique.
- *
- * @param entity - The {@link Entity} to create. (not null)
- * @throws EntityAlreadyExistsException An {@link Entity} could not be created because one already exists for the Subject.
- * @throws EntityStorageException A problem occurred while creating the Entity.
- */
- public void create(Entity entity) throws EntityAlreadyExistsException, EntityStorageException;
-
- /**
- * Get an {@link Entity} from the storage by its subject.
- *
- * @param subject - Identifies which {@link Entity} to get. (not null)
- * @return The {@link Entity} if one exists for the subject.
- * @throws EntityStorageException A problem occurred while fetching the Entity from the storage.
- */
- public Optional<Entity> get(RyaURI subject) throws EntityStorageException;
-
- /**
- * Update the state of an {@link Entity}.
- *
- * @param old - The Entity the changes were applied to. (not null)
- * @param updated - The updated Entity to store. (not null)
- * @throws StaleUpdateException The {@code old} Entity does not match any Entities that are stored.
- * @throws EntityStorageException A problem occurred while updating the Entity within the storage.
- */
- public void update(Entity old, Entity updated) throws StaleUpdateException, EntityStorageException;
-
+public interface EntityStorage extends RyaObjectStorage<Entity> {
/**
* Search the stored {@link Entity}s that have a specific {@link Type} as
* well as the provided {@link Property} values.
@@ -80,18 +51,9 @@
public ConvertingCursor<TypedEntity> search(final Optional<RyaURI> subject, Type type, Set<Property> properties) throws EntityStorageException;
/**
- * Deletes an {@link Entity} from the storage.
- *
- * @param subject -Identifies which {@link Entity} to delete. (not null)
- * @return {@code true} if something was deleted; otherwise {@code false}.
- * @throws EntityStorageException A problem occurred while deleting from the storage.
- */
- public boolean delete(RyaURI subject) throws EntityStorageException;
-
- /**
* Indicates a problem while interacting with an {@link EntityStorage}.
*/
- public static class EntityStorageException extends EntityIndexException {
+ public static class EntityStorageException extends ObjectStorageException {
private static final long serialVersionUID = 1L;
/**
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
index 1b4681d..a71d673 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorage.java
@@ -55,19 +55,19 @@
@DefaultAnnotation(NonNull.class)
public class MongoEntityStorage implements EntityStorage {
- private static final String COLLECTION_NAME = "entity-entities";
+ protected static final String COLLECTION_NAME = "entity-entities";
private static final EntityDocumentConverter ENTITY_CONVERTER = new EntityDocumentConverter();
/**
* A client connected to the Mongo instance that hosts the Rya instance.
*/
- private final MongoClient mongo;
+ protected final MongoClient mongo;
/**
* The name of the Rya instance the {@link TypedEntity}s are for.
*/
- private final String ryaInstanceName;
+ protected final String ryaInstanceName;
/**
* Constructs an instance of {@link MongoEntityStorage}.
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
index 7da9918..84b0bdc 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/BaseEntityIndexer.java
@@ -40,10 +40,10 @@
import org.apache.rya.indexing.entity.model.Property;
import org.apache.rya.indexing.entity.model.Type;
import org.apache.rya.indexing.entity.storage.EntityStorage;
-import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
import org.apache.rya.indexing.entity.storage.TypeStorage;
import org.apache.rya.indexing.entity.storage.TypeStorage.TypeStorageException;
import org.apache.rya.indexing.entity.storage.mongo.ConvertingCursor;
+import org.apache.rya.indexing.mongodb.IndexingException;
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.mongodb.MongoSecondaryIndex;
import org.openrdf.model.URI;
@@ -98,7 +98,7 @@
for(final Entry<RyaURI, List<RyaStatement>> entry : groupedBySubject.entrySet()) {
try {
updateEntity(entry.getKey(), entry.getValue());
- } catch (final EntityStorageException e) {
+ } catch (final IndexingException e) {
throw new IOException("Failed to update the Entity index.", e);
}
}
@@ -109,8 +109,9 @@
*
* @param subject - The Subject of the {@link Entity} the statements are for. (not null)
* @param statements - Statements that the {@link Entity} will be updated with. (not null)
+ * @throws IndexingException
*/
- private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws EntityStorageException {
+ private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws IndexingException {
requireNonNull(subject);
requireNonNull(statements);
@@ -216,7 +217,7 @@
return Optional.of( updated.build() );
});
- } catch (final EntityStorageException e) {
+ } catch (final IndexingException e) {
throw new IOException("Failed to update the Entity index.", e);
}
}
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
index fb5e957..2edbe37 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/entity/update/EntityUpdater.java
@@ -21,14 +21,13 @@
import static java.util.Objects.requireNonNull;
import java.util.Optional;
-import java.util.function.Function;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.indexing.entity.model.Entity;
import org.apache.rya.indexing.entity.storage.EntityStorage;
-import org.apache.rya.indexing.entity.storage.EntityStorage.EntityAlreadyExistsException;
import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
-import org.apache.rya.indexing.entity.storage.EntityStorage.StaleUpdateException;
+import org.apache.rya.indexing.mongodb.update.DocumentUpdater;
+import org.apache.rya.indexing.mongodb.update.RyaObjectStorage.ObjectStorageException;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -37,7 +36,7 @@
* Performs update operations over an {@link EntityStorage}.
*/
@DefaultAnnotation(NonNull.class)
-public class EntityUpdater {
+public class EntityUpdater implements DocumentUpdater<RyaURI, Entity>{
private final EntityStorage storage;
@@ -50,73 +49,30 @@
this.storage = requireNonNull(storage);
}
- /**
- * Tries to updates the state of an {@link Entity} until the update succeeds
- * or a non-recoverable exception is thrown.
- *
- * @param subject - The Subject of the {@link Entity} that will be updated. (not null)
- * @param mutator - Performs the mutation on the old state of the Entity and returns
- * the new state of the Entity. (not null)
- * @throws EntityStorageException A non-recoverable error has caused the update to fail.
- */
- public void update(final RyaURI subject, final EntityMutator mutator) throws EntityStorageException {
- requireNonNull(subject);
- requireNonNull(mutator);
-
- // Fetch the current state of the Entity.
- boolean completed = false;
- while(!completed) {
- try {
- final Optional<Entity> old = storage.get(subject);
- final Optional<Entity> updated = mutator.apply(old);
-
- final boolean doWork = updated.isPresent();
- if(doWork) {
- if(!old.isPresent()) {
- storage.create(updated.get());
- } else {
- storage.update(old.get(), updated.get());
- }
- }
- completed = true;
- } catch(final EntityAlreadyExistsException | StaleUpdateException e) {
- // These are recoverable exceptions. Try again.
- } catch(final RuntimeException e) {
- throw new EntityStorageException("Failed to update Entity with Subject '" + subject.getData() + "'.", e);
- }
+ @Override
+ public void create(final Entity newObj) throws EntityStorageException {
+ try {
+ storage.create(newObj);
+ } catch (final ObjectStorageException e) {
+ throw new EntityStorageException(e.getMessage(), e);
}
}
- /**
- * Implementations of this interface are used to update the state of an
- * {@link Entity} in unison with a {@link EntityUpdater}.
- * </p>
- * This table describes what the updater will do depending on if an Entity
- * exists and if an updated Entity is returned.
- * </p>
- * <table border="1px">
- * <tr><th>Entity Provided</th><th>Update Returned</th><th>Effect</th></tr>
- * <tr>
- * <td>true</td>
- * <td>true</td>
- * <td>The old Entity will be updated using the returned state.</td>
- * </tr>
- * <tr>
- * <td>true</td>
- * <td>false</td>
- * <td>No work is performed.</td>
- * </tr>
- * <tr>
- * <td>false</td>
- * <td>true</td>
- * <td>A new Entity will be created using the returned state.</td>
- * </tr>
- * <tr>
- * <td>false</td>
- * <td>false</td>
- * <td>No work is performed.</td>
- * </tr>
- * </table>
- */
- public interface EntityMutator extends Function<Optional<Entity>, Optional<Entity>> { }
+ @Override
+ public void update(final Entity old, final Entity updated) throws EntityStorageException {
+ try {
+ storage.update(old, updated);
+ } catch (final ObjectStorageException e) {
+ throw new EntityStorageException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Optional<Entity> getOld(final RyaURI key) throws EntityStorageException {
+ try {
+ return storage.get(key);
+ } catch (final ObjectStorageException e) {
+ throw new EntityStorageException(e.getMessage(), e);
+ }
+ }
}
\ No newline at end of file
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
index 56070b7..2428e28 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
@@ -25,20 +25,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
-import org.openrdf.model.Literal;
-import org.openrdf.model.Statement;
-import org.openrdf.model.URI;
-import org.openrdf.query.QueryEvaluationException;
-
-import com.mongodb.DB;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-import com.mongodb.MongoClient;
-import com.mongodb.QueryBuilder;
-import com.mongodb.ServerAddress;
-
-import info.aduna.iteration.CloseableIteration;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaURI;
import org.apache.rya.api.resolver.RyaToRdfConversions;
@@ -47,6 +33,22 @@
import org.apache.rya.mongodb.MongoDBRdfConfiguration;
import org.apache.rya.mongodb.MongoDBRyaDAO;
import org.apache.rya.mongodb.MongoSecondaryIndex;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.QueryEvaluationException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.QueryBuilder;
+import com.mongodb.ServerAddress;
+import com.mongodb.WriteConcern;
+
+import info.aduna.iteration.CloseableIteration;
/**
* Secondary Indexer using MondoDB
@@ -71,15 +73,16 @@
db = this.mongoClient.getDB(dbName);
collection = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName());
}
-
+
@Override
- public void setClient(MongoClient client){
+ public void setClient(final MongoClient client){
this.mongoClient = client;
}
- // TODO this method is only intended to be used in testing
+ @VisibleForTesting
public void initIndexer(final Configuration conf, final MongoClient client) {
- ServerAddress address = client.getAddress();
+ setClient(client);
+ final ServerAddress address = client.getAddress();
conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE, address.getHost());
conf.set(MongoDBRdfConfiguration.MONGO_INSTANCE_PORT, Integer.toString(address.getPort()));
setConf(conf);
@@ -144,8 +147,7 @@
if (isValidPredicate && (statement.getObject() instanceof Literal)) {
final DBObject obj = storageStrategy.serialize(ryaStatement);
if (obj != null) {
- final DBObject query = storageStrategy.serialize(ryaStatement);
- collection.update(query, obj, true, false);
+ collection.insert(obj, WriteConcern.ACKNOWLEDGED);
}
}
} catch (final IllegalArgumentException e) {
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java
new file mode 100644
index 0000000..7029b45
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/IndexingException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb;
+
+/**
+ * An indexing operation over mongoDB failed to complete.
+ */
+public class IndexingException extends Exception {
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public IndexingException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public IndexingException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java
index eefcfb1..6beb6f1 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/temporal/TemporalMongoDBStorageStrategy.java
@@ -21,20 +21,21 @@
import java.util.regex.Matcher;
-import com.mongodb.BasicDBObject;
-import com.mongodb.DBCollection;
-import com.mongodb.DBObject;
-
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.indexing.TemporalInstantRfc3339;
import org.apache.rya.indexing.TemporalInterval;
import org.apache.rya.indexing.mongodb.IndexingMongoDBStorageStrategy;
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+
/**
* Defines how time based intervals/instants are stored in MongoDB.
* <p>
* Time can be stored as the following:
- * <p>
+ * <p>l
* <li><b>instant</b> {[statement], instant: TIME}</li>
* <li><b>interval</b> {[statement], start: TIME, end: TIME}</li>
* @see {@link TemporalInstantRfc3339} for how the dates are formatted.
@@ -53,16 +54,24 @@
@Override
public DBObject serialize(final RyaStatement ryaStatement) {
- final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement);
- final String objString = ryaStatement.getObject().getData();
- final Matcher match = TemporalInstantRfc3339.PATTERN.matcher(objString);
- if(match.find()) {
- final TemporalInterval date = TemporalInstantRfc3339.parseInterval(ryaStatement.getObject().getData());
- base.append(INTERVAL_START, date.getHasBeginning().getAsDateTime().toDate());
- base.append(INTERVAL_END, date.getHasEnd().getAsDateTime().toDate());
- } else {
- base.append(INSTANT, TemporalInstantRfc3339.FORMATTER.parseDateTime(objString).toDate());
- }
- return base;
+ final BasicDBObject base = (BasicDBObject) super.serialize(ryaStatement);
+ final DBObject time = getTimeValue(ryaStatement.getObject().getData());
+ base.putAll(time.toMap());
+ return base;
}
+
+ public DBObject getTimeValue(final String timeData) {
+ final Matcher match = TemporalInstantRfc3339.PATTERN.matcher(timeData);
+ final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start();
+ if(match.find()) {
+ final TemporalInterval date = TemporalInstantRfc3339.parseInterval(timeData);
+ builder.add(INTERVAL_START, date.getHasBeginning().getAsDateTime().toDate());
+ builder.add(INTERVAL_END, date.getHasEnd().getAsDateTime().toDate());
+ } else {
+ builder.add(INSTANT, TemporalInstantRfc3339.FORMATTER.parseDateTime(timeData).toDate());
+ }
+ return builder.get();
+ }
+
+
}
\ No newline at end of file
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
new file mode 100644
index 0000000..0b9db13
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/DocumentUpdater.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.update;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.apache.rya.indexing.mongodb.IndexingException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Performs an update operation on a Document in mongodb.
+ * @param <T> - The key to find the object.
+ * @param <V> - The type of object to get updated.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface DocumentUpdater<T, V> {
+ public default void update(final T key, final DocumentMutator<V> mutator) throws IndexingException {
+ requireNonNull(mutator);
+
+ // Fetch the current state of the Entity.
+ boolean completed = false;
+ while(!completed) {
+ //this cast is safe since the mutator interface is defined below to use Optional<V>
+ final Optional<V> old = getOld(key);
+ final Optional<V> updated = mutator.apply(old);
+
+ final boolean doWork = updated.isPresent();
+ if(doWork) {
+ if(!old.isPresent()) {
+ create(updated.get());
+ } else {
+ update(old.get(), updated.get());
+ }
+ }
+ completed = true;
+ }
+ }
+
+ Optional<V> getOld(T key) throws IndexingException;
+
+ void create(final V newObj) throws IndexingException;
+
+ void update(final V old, final V updated) throws IndexingException;
+
+ /**
+ * Implementations of this interface are used to update the state of a
+ * {@link DocumentUpdater#V} in unison with a {@link DocumentUpdater}.
+ * </p>
+ * This table describes what the updater will do depending on if the object
+ * exists and if an updated object is returned.
+ * </p>
+ * <table border="1px">
+ * <tr><th>Object Provided</th><th>Update Returned</th><th>Effect</th></tr>
+ * <tr>
+ * <td>true</td>
+ * <td>true</td>
+ * <td>The old Object will be updated using the returned state.</td>
+ * </tr>
+ * <tr>
+ * <td>true</td>
+ * <td>false</td>
+ * <td>No work is performed.</td>
+ * </tr>
+ * <tr>
+ * <td>false</td>
+ * <td>true</td>
+ * <td>A new Object will be created using the returned state.</td>
+ * </tr>
+ * <tr>
+ * <td>false</td>
+ * <td>false</td>
+ * <td>No work is performed.</td>
+ * </tr>
+ * </table>
+ */
+ public interface DocumentMutator<V> extends Function<Optional<V>, Optional<V>> { }
+}
\ No newline at end of file
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
new file mode 100644
index 0000000..10feb0d
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/update/RyaObjectStorage.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.update;
+
+import java.util.Optional;
+
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.mongodb.IndexingException;
+
+/**
+ * Stores and provides access to objects of type T.
+ * @param <T> - The type of object to store/access.
+ */
+public interface RyaObjectStorage<T> {
+
+ /**
+ * Creates a new {@link RyaObjectStorage#T} within the storage. The new object's subject must be unique.
+ *
+ * @param obj - The {@link RyaObjectStorage#T} to create. (not null)
+ * @throws ObjectAlreadyExistsException An Object could not be created because one already exists for the Subject.
+ * @throws ObjectStorageException A problem occurred while creating the Object.
+ */
+ public void create(T doc) throws ObjectAlreadyExistsException, ObjectStorageException;
+
+ /**
+ * Get an Object from the storage by its subject.
+ *
+ * @param subject - Identifies which Object to get. (not null)
+ * @return The Object if one exists for the subject.
+ * @throws ObjectStorageException A problem occurred while fetching the Object from the storage.
+ */
+ public Optional<T> get(RyaURI subject) throws ObjectStorageException;
+
+ /**
+ * Update the state of an {@link RyaObjectStorage#T}.
+ *
+ * @param old - The Object the changes were applied to. (not null)
+ * @param updated - The updated Object to store. (not null)
+ * @throws StaleUpdateException The {@code old} Object does not match any that are stored.
+ * @throws ObjectStorageException A problem occurred while updating the Object within the storage.
+ */
+ public void update(T old, T updated) throws StaleUpdateException, ObjectStorageException;
+
+ /**
+ * Deletes an {@link RyaObjectStorage#T} from the storage.
+ *
+ * @param subject -Identifies which {@link RyaObjectStorage#T} to delete. (not null)
+ * @return {@code true} if something was deleted; otherwise {@code false}.
+ * @throws ObjectStorageException A problem occurred while deleting from the storage.
+ */
+ public boolean delete(RyaURI subject) throws ObjectStorageException;
+
+ /**
+ * Indicates a problem while interacting with an {@link RyaObjectStorage}.
+ */
+ public static class ObjectStorageException extends IndexingException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new exception with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public ObjectStorageException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new exception with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * {@code cause} is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public ObjectStorageException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * An {@link RyaObjectStorage#T} could not be created because one already exists for the Subject.
+ */
+ public static class ObjectAlreadyExistsException extends ObjectStorageException {
+ private static final long serialVersionUID = 1L;
+
+ public ObjectAlreadyExistsException(final String message) {
+ super(message);
+ }
+
+ public ObjectAlreadyExistsException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+
+ /**
+ * An object could not be updated because the old state does not
+ * match the current state.
+ */
+ public static class StaleUpdateException extends ObjectStorageException {
+ private static final long serialVersionUID = 1L;
+
+ public StaleUpdateException(final String message) {
+ super(message);
+ }
+
+ public StaleUpdateException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java
index d271ba0..5d26bc0 100644
--- a/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/indexing/entity/storage/mongo/MongoEntityStorageIT.java
@@ -50,7 +50,7 @@
private static final String RYA_INSTANCE_NAME = "testInstance";
@Test
- public void create_and_get() throws EntityStorageException {
+ public void create_and_get() throws Exception {
// An Entity that will be stored.
final Entity entity = Entity.builder()
.setSubject(new RyaURI("urn:GTIN-14/00012345600012"))
@@ -71,7 +71,7 @@
}
@Test
- public void can_not_create_with_same_subject() throws EntityStorageException {
+ public void can_not_create_with_same_subject() throws Exception {
// A Type that will be stored.
final Entity entity = Entity.builder()
.setSubject(new RyaURI("urn:GTIN-14/00012345600012"))
@@ -95,7 +95,7 @@
}
@Test
- public void get_noneExisting() throws EntityStorageException {
+ public void get_noneExisting() throws Exception {
// Get a Type that hasn't been created.
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
final Optional<Entity> storedEntity = storage.get(new RyaURI("urn:GTIN-14/00012345600012"));
@@ -105,7 +105,7 @@
}
@Test
- public void delete() throws EntityStorageException {
+ public void delete() throws Exception {
// An Entity that will be stored.
final Entity entity = Entity.builder()
.setSubject(new RyaURI("urn:GTIN-14/00012345600012"))
@@ -126,7 +126,7 @@
}
@Test
- public void delete_nonExisting() throws EntityStorageException {
+ public void delete_nonExisting() throws Exception {
// Delete an Entity that has not been created.
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
final boolean deleted = storage.delete( new RyaURI("urn:GTIN-14/00012345600012") );
@@ -305,7 +305,7 @@
}
@Test
- public void update() throws EntityStorageException {
+ public void update() throws Exception {
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
// Store Alice in the repository.
@@ -338,7 +338,7 @@
}
@Test(expected = StaleUpdateException.class)
- public void update_stale() throws EntityStorageException {
+ public void update_stale() throws Exception {
final EntityStorage storage = new MongoEntityStorage(super.getMongoClient(), RYA_INSTANCE_NAME);
// Store Alice in the repository.
@@ -370,7 +370,7 @@
}
@Test(expected = EntityStorageException.class)
- public void update_differentSubjects() throws StaleUpdateException, EntityStorageException {
+ public void update_differentSubjects() throws Exception {
// Two objects that do not have the same Subjects.
final Entity old = Entity.builder()
.setSubject( new RyaURI("urn:SSN/111-11-1111") )
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java
index dd6ea40..8d4486f 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/OptionalConfigUtils.java
@@ -28,6 +28,8 @@
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.indexing.accumulo.ConfigUtils;
import org.apache.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import org.apache.rya.indexing.geotemporal.GeoTemporalOptimizer;
+import org.apache.rya.indexing.geotemporal.mongo.MongoGeoTemporalIndexer;
import org.apache.rya.indexing.mongodb.geo.MongoGeoIndexer;
import org.openrdf.model.URI;
@@ -46,6 +48,7 @@
public static final String GEO_NUM_PARTITIONS = "sc.geo.numPartitions";
public static final String USE_GEO = "sc.use_geo";
+ public static final String USE_GEOTEMPORAL = "sc.use_geotemporal";
public static final String USE_FREETEXT = "sc.use_freetext";
public static final String USE_TEMPORAL = "sc.use_temporal";
public static final String USE_ENTITY = "sc.use_entity";
@@ -67,6 +70,10 @@
return conf.getBoolean(USE_GEO, false);
}
+ public static boolean getUseGeoTemporal(final Configuration conf) {
+ return conf.getBoolean(USE_GEOTEMPORAL, false);
+ }
+
/**
* Retrieves the value for the geo indexer type from the config.
* @param conf the {@link Configuration}.
@@ -83,11 +90,14 @@
boolean useFilterIndex = false;
ConfigUtils.setIndexers(conf);
- for (final String index : conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS)){
- indexList.add(index);
- }
- for (final String optimizer : conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS)){
- optimizers.add(optimizer);
+ final String[] existingIndexers = conf.getStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS);
+ if(existingIndexers != null ) {
+ for (final String index : existingIndexers) {
+ indexList.add(index);
+ }
+ for (final String optimizer : conf.getStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS)){
+ optimizers.add(optimizer);
+ }
}
final GeoIndexerType geoIndexerType = getGeoIndexerType(conf);
@@ -102,6 +112,11 @@
}
useFilterIndex = true;
}
+
+ if (getUseGeoTemporal(conf)) {
+ indexList.add(MongoGeoTemporalIndexer.class.getName());
+ optimizers.add(GeoTemporalOptimizer.class.getName());
+ }
} else {
if (getUseGeo(conf)) {
if (geoIndexerType == null) {
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
index f77e726..d00b849 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/accumulo/geo/GeoTupleSet.java
@@ -4,6 +4,13 @@
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.indexing.GeoConstants;
+import org.apache.rya.indexing.GeoIndexer;
+import org.apache.rya.indexing.IndexingExpr;
+import org.apache.rya.indexing.IteratorFactory;
+import org.apache.rya.indexing.SearchFunction;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.query.BindingSet;
@@ -36,13 +43,6 @@
import info.aduna.iteration.CloseableIteration;
-import org.apache.rya.indexing.GeoConstants;
-import org.apache.rya.indexing.GeoIndexer;
-import org.apache.rya.indexing.IndexingExpr;
-import org.apache.rya.indexing.IteratorFactory;
-import org.apache.rya.indexing.SearchFunction;
-import org.apache.rya.indexing.StatementConstraints;
-import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
//Indexing Node for geo expressions to be inserted into execution plan
//to delegate geo portion of query to geo index
@@ -116,7 +116,7 @@
final URI funcURI = filterInfo.getFunction();
- final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf).getSearchFunction(funcURI);
+ final SearchFunction searchFunction = new GeoSearchFunctionFactory(conf, geoIndexer).getSearchFunction(funcURI);
if(filterInfo.getArguments().length > 1) {
throw new IllegalArgumentException("Index functions do not support more than two arguments.");
}
@@ -130,14 +130,17 @@
//returns appropriate search function for a given URI
//search functions used in GeoMesaGeoIndexer to access index
- public class GeoSearchFunctionFactory {
+ public static class GeoSearchFunctionFactory {
Configuration conf;
private final Map<URI, SearchFunction> SEARCH_FUNCTION_MAP = Maps.newHashMap();
- public GeoSearchFunctionFactory(final Configuration conf) {
+ private final GeoIndexer geoIndexer;
+
+ public GeoSearchFunctionFactory(final Configuration conf, final GeoIndexer geoIndexer) {
this.conf = conf;
+ this.geoIndexer = geoIndexer;
}
diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
index 7069d73..8b2ebc3 100644
--- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
+++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/mongodb/geo/GeoMongoDBStorageStrategy.java
@@ -65,7 +65,7 @@
public abstract String getKeyword();
}
- static class GeoQuery {
+ public static class GeoQuery {
private final GeoQueryType queryType;
private final Geometry geo;
@@ -140,7 +140,7 @@
}
}
- private List<double[]> getCorrespondingPoints(final Geometry geo){
+ public List<double[]> getCorrespondingPoints(final Geometry geo){
final List<double[]> points = new ArrayList<double[]>();
for (final Coordinate coord : geo.getCoordinates()){
points.add(new double[] {