Collection clear using version changes
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 6a93af5..ba1a924 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -41,7 +41,9 @@
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
+import org.apache.usergrid.persistence.graph.serialization.impl.GraphManagerFactoryImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
import org.apache.usergrid.persistence.index.guice.IndexModule;
import org.apache.usergrid.persistence.token.guice.TokenModule;
@@ -183,6 +185,8 @@
bind( ApplicationService.class ).to( ApplicationServiceImpl.class );
bind( StatusService.class ).to( StatusServiceImpl.class );
+
+ bind( GraphManagerFactory.class ).to(GraphManagerFactoryImpl.class);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index e192939..9435ed6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -47,6 +47,7 @@
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.apache.usergrid.persistence.entities.*;
import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
@@ -741,9 +742,9 @@
Set<String> existingCollections = new HashSet<>();
for (String existingCollection : getRelationManager( getApplication() ).getCollections()) {
- if (Application.isCustomCollectionName(existingCollection)) {
+ if (CollectionUtils.isCustomCollectionOrEntityName(existingCollection)) {
// check for correct version
- VersionedCollectionName v = CollectionVersionUtil.parseVersionedName(existingCollection);
+ VersionedCollectionName v = CollectionVersionUtils.parseVersionedName(existingCollection);
CollectionVersionManager cvm = collectionVersionManagerFactory.getInstance(
new CollectionScopeImpl(getApplication().asId(), v.getCollectionName())
);
@@ -784,7 +785,7 @@
if ( !Schema.isAssociatedEntityType( collectionName ) ) {
Long count = counts.get( APPLICATION_COLLECTION + collectionName );
- String unversionedCollectionName = CollectionVersionUtil.getBaseCollectionName(collectionName);
+ String unversionedCollectionName = CollectionVersionUtils.getBaseCollectionName(collectionName);
Map<String, Object> entry = new HashMap<String, Object>();
entry.put( "count", count != null ? count : 0 );
entry.put( "type", singularize( unversionedCollectionName ) );
@@ -825,7 +826,7 @@
StringField uniqueLookupRepairField = new StringField( propertyName, aliasType.toString());
Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields(
- Inflector.getInstance().singularize( collectionType ), Arrays.<Field>asList( uniqueLookupRepairField ), uniqueIndexRepair);
+ singularize( collectionType ), Arrays.<Field>asList( uniqueLookupRepairField ), uniqueIndexRepair);
if(fieldSetObservable == null){
@@ -849,7 +850,7 @@
}
fieldSet = ecm.getEntitiesFromFields(
- Inflector.getInstance().singularize( collectionType ),
+ singularize( collectionType ),
Collections.singletonList(uniqueLookupRepairField), uniqueIndexRepair).toBlocking().last();
}
@@ -870,7 +871,7 @@
StringField uniqueLookupRepairField = new StringField( propertyName, aliasType);
Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields(
- Inflector.getInstance().singularize( collectionType ),
+ singularize( collectionType ),
Collections.singletonList(uniqueLookupRepairField), uniqueIndexRepair);
if(fieldSetObservable == null){
@@ -895,7 +896,7 @@
}
fieldSet = ecm.getEntitiesFromFields(
- Inflector.getInstance().singularize( collectionType ),
+ singularize( collectionType ),
Collections.singletonList(uniqueLookupRepairField), uniqueIndexRepair).toBlocking().last();
}
@@ -2491,7 +2492,7 @@
final Object propertyValue ) {
//convert to a string, that's what we store
- final Id results = ecm.getIdField( Inflector.getInstance().singularize( collectionName ), new StringField(
+ final Id results = ecm.getIdField( singularize( collectionName ), new StringField(
propertyName, propertyValue.toString() ) ).toBlocking() .lastOrDefault( null );
return results;
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index c02ca7d..a8b309c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -491,7 +491,7 @@
return;
}
}
- // handles normal app collection deletes
+ // handles normal app collection delete
em.delete( itemRef );
return;
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 7ce208f..917ad5b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -101,7 +101,7 @@
* @param collectionScope
* @param collectionVersion
*/
- void queueCollectionDelete(final CollectionScope collectionScope, final String collectionVersion);
+ void queueCollectionClear(final CollectionScope collectionScope, final String collectionVersion);
/**
* current queue depth
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index e33865e..883b784 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -41,6 +41,11 @@
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
@@ -58,6 +63,7 @@
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
+import org.apache.usergrid.corepersistence.index.CollectionVersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -74,8 +80,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
/**
* TODO, this whole class is becoming a nightmare.
@@ -120,9 +124,11 @@
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
+ private final CollectionVersionManagerFactory collectionVersionManagerFactory;
private final EventBuilder eventBuilder;
private final RxTaskScheduler rxTaskScheduler;
private final AllEntityIdsObservable allEntityIdsObservable;
+ private final GraphManagerFactory graphManagerFactory;
private final Timer readTimer;
private final Timer writeTimer;
@@ -160,6 +166,8 @@
final MapManagerFactory mapManagerFactory,
final LegacyQueueFig queueFig,
final CollectionVersionFig collectionVersionFig,
+ final CollectionVersionManagerFactory collectionVersionManagerFactory,
+ final GraphManagerFactory graphManagerFactory,
final AllEntityIdsObservable allEntityIdsObservable,
@EventExecutionScheduler
final RxTaskScheduler rxTaskScheduler ) {
@@ -204,6 +212,8 @@
this.indexProcessorFig = indexProcessorFig;
this.queueFig = queueFig;
this.collectionVersionFig = collectionVersionFig;
+ this.collectionVersionManagerFactory = collectionVersionManagerFactory;
+ this.graphManagerFactory = graphManagerFactory;
this.allEntityIdsObservable = allEntityIdsObservable;
this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write");
@@ -471,9 +481,9 @@
single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
- } else if (event instanceof CollectionDeleteEvent) {
+ } else if (event instanceof CollectionClearEvent) {
- handleCollectionDelete(message);
+ handleCollectionClear(message);
} else {
@@ -483,7 +493,7 @@
if( !(event instanceof ElasticsearchIndexEvent)
&& !(event instanceof InitializeApplicationIndexEvent)
- && !(event instanceof CollectionDeleteEvent)
+ && !(event instanceof CollectionClearEvent)
&& single.isEmpty() ){
logger.warn("No index operation messages came back from event processing for eventType: {}, msgId: {}, msgBody: {}",
event.getClass().getSimpleName(), message.getMessageId(), message.getStringBody());
@@ -542,8 +552,10 @@
final Entity entity, long updatedAfter) {
- logger.trace("Offering EntityIndexEvent for {}:{}",
- entity.getId().getUuid(), entity.getId().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EntityIndexEvent for {}:{}",
+ entity.getId().getUuid(), entity.getId().getType());
+ }
offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
new EntityIdScope(applicationScope, entity.getId()), updatedAfter));
@@ -569,6 +581,9 @@
final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
final Id entityId = entityIdScope.getId();
final long updatedAfter = entityIndexEvent.getUpdatedAfter();
+ if (logger.isTraceEnabled()) {
+ logger.trace("handleEntityIndexUpdate entityId={}, type={}", entityId, entityId.getType());
+ }
final EntityIndexOperation entityIndexOperation =
new EntityIndexOperation( applicationScope, entityId, updatedAfter);
@@ -584,8 +599,10 @@
final Entity entity,
final Edge newEdge) {
- logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
- newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
+ newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+ }
offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));
@@ -602,6 +619,9 @@
String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
+ if (logger.isTraceEnabled()) {
+ logger.trace("handleEdgeIndex entityId={} targetNode={}", edgeIndexEvent.getEntityId(), edgeIndexEvent.getEdge().getTargetNode());
+ }
final EntityCollectionManager ecm =
entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() );
@@ -682,7 +702,9 @@
//send to the topic so all regions index the batch
- logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId);
+ }
offerTopic( elasticsearchIndexEvent, queueType );
}
@@ -839,88 +861,95 @@
final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
- final boolean markedOnly = entityDeleteEvent.markedOnly();
if (logger.isDebugEnabled()) {
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
}
- final IndexOperationMessage indexOperationMessage = markedOnly ?
- eventBuilder.buildEntityDelete( applicationScope, entityId ) :
- eventBuilder.buildEntityDeleteAllVersions( applicationScope, entityId );
+ final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete( applicationScope, entityId );
return indexOperationMessage;
}
@Override
- public void queueCollectionDelete(final CollectionScope collectionScope, final String collectionVersion) {
+ public void queueCollectionClear(final CollectionScope collectionScope, final String collectionVersion) {
if (logger.isDebugEnabled()) {
- logger.debug("Offering CollectionDeleteEvent for application={}, collectionName={}, collectionVersion={}",
+ logger.debug("Offering CollectionClearEvent for application={}, collectionName={}, collectionVersion={}",
collectionScope.getApplication().getUuid(), collectionScope.getCollectionName(), collectionVersion);
}
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
- offer(new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
+ offer(new CollectionClearEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
AsyncEventQueueType.DELETE);
}
- private void handleCollectionDelete(final LegacyQueueMessage message) {
+ private void handleCollectionClear(final LegacyQueueMessage message) {
- Preconditions.checkNotNull(message, "Queue Message cannot be null for handleCollectionDelete");
+ Preconditions.checkNotNull(message, "Queue Message cannot be null for handleCollectionClear");
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(event, "QueueMessage body cannot be null for handleCollectionDelete" );
- Preconditions.checkArgument( event instanceof CollectionDeleteEvent,
- String.format( "Event Type for handleCollectionDelete must be COLLECTION_DELETE, got %s", event.getClass() ) );
+ Preconditions.checkNotNull(event, "QueueMessage body cannot be null for handleCollectionClear" );
+ Preconditions.checkArgument( event instanceof CollectionClearEvent,
+ String.format( "Event Type for handleCollectionClear must be COLLECTION_CLEAR, got %s", event.getClass() ) );
- final CollectionDeleteEvent collectionDeleteEvent = ( CollectionDeleteEvent ) event;
- final CollectionScope collectionScope = collectionDeleteEvent.getCollectionScope();
+ final CollectionClearEvent collectionClearEvent = (CollectionClearEvent) event;
+ final CollectionScope collectionScope = collectionClearEvent.getCollectionScope();
if (collectionScope == null) {
- logger.error("CollectionDeleteEvent received with null collectionScope");
+ logger.error("CollectionClearEvent received with null collectionScope");
// ack message, nothing more to do
return;
}
final UUID applicationID = collectionScope.getApplication().getUuid();
if (applicationID == null) {
- logger.error("CollectionDeleteEvent collectionScope has null application");
+ logger.error("CollectionClearEvent collectionScope has null application");
// ack message, nothing more to do
return;
}
- String collectionVersion = collectionDeleteEvent.getCollectionVersion();
+ String collectionVersion = collectionClearEvent.getCollectionVersion();
if (collectionVersion == null) {
collectionVersion = "";
}
final ApplicationScope applicationScope = CpNamingUtils.getApplicationScope(applicationID);
- final String versionedCollectionName =
- CollectionVersionUtil.buildVersionedNameString(collectionScope.getCollectionName(),
- collectionVersion, false);
+ final String versionedCollectionName =
+ CollectionVersionUtils.buildVersionedNameString(collectionScope.getCollectionName(),
+ collectionVersion, false, false);
+ logger.info("collectionClear: versionedCollectionName:{}", versionedCollectionName);
+
+ final EntityCollectionManager ecm =
+ entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+ final GraphManager gm =
+ graphManagerFactory.createEdgeManager(applicationScope);
final AtomicInteger count = new AtomicInteger();
int maxDeletes = collectionVersionFig.getDeletesPerEvent();
if (logger.isDebugEnabled()) {
- logger.debug("handleCollectionDelete: applicationScope={} collectionName={} maxDeletes={}", applicationScope.toString(), versionedCollectionName, maxDeletes);
+ logger.debug("handleCollectionClear: applicationScope={} collectionName={} maxDeletes={}", applicationScope.toString(), versionedCollectionName, maxDeletes);
}
allEntityIdsObservable.getEdgesToEntities(Observable.just(applicationScope),
Optional.fromNullable(CpNamingUtils.getEdgeTypeFromCollectionName(versionedCollectionName.toLowerCase())), Optional.absent())
//.takeWhile(edgeScope-> count.intValue() < maxDeletes)
.take(maxDeletes)
+ .doOnNext(edgeScope -> {
+ // mark the entity for deletion
+ ecm.mark( edgeScope.getEdge().getTargetNode(), null ).mergeWith( gm.markNode( edgeScope.getEdge().getTargetNode(), CpNamingUtils.createGraphOperationTimestamp() ) ).toBlocking().last();
+ })
.doOnNext(edgeScope-> {
+ //logger.info("edgeScope sourceNode:{} targetNode:{} type:{}", edgeScope.getEdge().getSourceNode().toString(), edgeScope.getEdge().getTargetNode().toString(), edgeScope.getEdge().getType());
- offer(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
- new EntityIdScope(applicationScope, edgeScope.getEdge().getTargetNode()),false),
- AsyncEventQueueType.DELETE);
+ queueEntityDelete(applicationScope, edgeScope.getEdge().getTargetNode());
count.incrementAndGet();
}).toBlocking().lastOrDefault(null);
- logger.info("handleCollectionDelete: queued {} entity deletes for deleted collection", count.intValue());
+ logger.info("handleCollectionClear: queued {} entity deletes for cleared collection", count.intValue());
if (count.intValue() >= maxDeletes) {
- // requeue collection delete for next chunk of deletes
- offer (new CollectionDeleteEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
+ // requeue collection clear for next chunk
+ offer (new CollectionClearEvent(queueFig.getPrimaryRegion(), collectionScope, collectionVersion),
AsyncEventQueueType.DELETE);
}
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 31fcd6d..ccfb574 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -21,12 +21,14 @@
import org.apache.usergrid.corepersistence.index.CollectionVersionFig;
+import org.apache.usergrid.corepersistence.index.CollectionVersionManagerFactory;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.queue.LegacyQueueManager;
@@ -61,6 +63,8 @@
private final MapManagerFactory mapManagerFactory;
private final LegacyQueueFig queueFig;
private final CollectionVersionFig collectionVersionFig;
+ private final CollectionVersionManagerFactory collectionVersionManagerFactory;
+ private final GraphManagerFactory graphManagerFactory;
private final AllEntityIdsObservable allEntityIdsObservable;
private AsyncEventService asyncEventService;
@@ -79,6 +83,8 @@
final MapManagerFactory mapManagerFactory,
final LegacyQueueFig queueFig,
final CollectionVersionFig collectionVersionFig,
+ final CollectionVersionManagerFactory collectionVersionManagerFactory,
+ final GraphManagerFactory graphManagerFactory,
final AllEntityIdsObservable allEntityIdsObservable) {
this.indexProcessorFig = indexProcessorFig;
@@ -93,6 +99,8 @@
this.mapManagerFactory = mapManagerFactory;
this.queueFig = queueFig;
this.collectionVersionFig = collectionVersionFig;
+ this.collectionVersionManagerFactory = collectionVersionManagerFactory;
+ this.graphManagerFactory = graphManagerFactory;
this.allEntityIdsObservable = allEntityIdsObservable;
}
@@ -125,6 +133,8 @@
mapManagerFactory,
queueFig,
collectionVersionFig,
+ collectionVersionManagerFactory,
+ graphManagerFactory,
allEntityIdsObservable,
rxTaskScheduler );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionClearTooSoonException.java
similarity index 87%
rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java
rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionClearTooSoonException.java
index bd37d46..314dcfc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionDeleteTooSoonException.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/CollectionClearTooSoonException.java
@@ -19,12 +19,12 @@
package org.apache.usergrid.corepersistence.asyncevents;
-public class CollectionDeleteTooSoonException extends RuntimeException {
+public class CollectionClearTooSoonException extends RuntimeException {
private final long timeLastDeleted;
private final long timeRequiredBeforeDeleteMsec;
- public CollectionDeleteTooSoonException(final long timeLastDeleted, final long timeRequiredBeforeDeleteMsec) {
+ public CollectionClearTooSoonException(final long timeLastDeleted, final long timeRequiredBeforeDeleteMsec) {
this.timeLastDeleted = timeLastDeleted;
this.timeRequiredBeforeDeleteMsec = timeRequiredBeforeDeleteMsec;
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 081b3bc..7c68521 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -28,6 +28,7 @@
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -65,15 +66,6 @@
*/
IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId );
- /**
- * Return a bin with 2 observable streams for entity delete. This deletes all versions -- used only for an old
- * collection version. Does not require versions to be marked for deletion.
- * @param applicationScope
- * @param entityId
- * @return
- */
- IndexOperationMessage buildEntityDeleteAllVersions(ApplicationScope applicationScope, Id entityId );
-
/**
@@ -95,6 +87,30 @@
Id entityId, UUID markedVersion );
/**
+ * Get id that includes collection version.
+ */
+ Id getCollectionVersionedId(ApplicationScope applicationScope, Id id, boolean forceVersion);
+
+
+ /**
+ * Get entity that includes collection version.
+ */
+ Entity getCollectionVersionedEntity(ApplicationScope applicationScope, Entity entity, boolean forceVersion);
+
+
+ /**
+ * Get edge that includes collection version.
+ */
+ Edge getCollectionVersionedEdge(ApplicationScope applicationScope, Edge edge, boolean forceVersion);
+
+
+ /**
+ * Get search edge that includes collection version.
+ */
+ SearchEdge getCollectionVersionedSearchEdge(ApplicationScope applicationScope, SearchEdge searchEdge, boolean forceVersion);
+
+
+ /**
* A bean to hold both our observables so the caller can choose the subscription mechanism. Note that
* indexOperationMessages should be subscribed and completed BEFORE the getEntitiesDeleted is subscribed
*/
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index ade6818..049cd4e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -24,12 +24,22 @@
import java.util.List;
import java.util.UUID;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.index.IndexEdge;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.impl.IndexEdgeImpl;
+import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.corepersistence.index.CollectionVersionUtils;
+import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -62,19 +72,93 @@
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final GraphManagerFactory graphManagerFactory;
private final SerializationFig serializationFig;
+ private final CollectionVersionManagerFactory collectionVersionManagerFactory;
@Inject
public EventBuilderImpl( final IndexService indexService,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig ) {
+ final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig,
+ final CollectionVersionManagerFactory collectionVersionManagerFactory) {
this.indexService = indexService;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.graphManagerFactory = graphManagerFactory;
this.serializationFig = serializationFig;
+ this.collectionVersionManagerFactory = collectionVersionManagerFactory;
}
+    /**
+     * Returns an id whose type has been run through
+     * {@code CollectionVersionUtils.buildVersionedNameString} using the collection's current version.
+     *
+     * The incoming id is returned unchanged when the pluralized type is not reported as a custom
+     * collection/entity name by {@code CollectionUtils}, or is already reported as a versioned name by
+     * {@code CollectionVersionUtils}.
+     *
+     * @param applicationScope scope whose application owns the collection
+     * @param id id to convert; its UUID is carried over to the result
+     * @param forceVersion passed straight through to {@code buildVersionedNameString}
+     * @return the original id, or a new {@code SimpleId} with the same UUID and the rebuilt type
+     */
+    @Override
+    public Id getCollectionVersionedId(ApplicationScope applicationScope, Id id, boolean forceVersion) {
+
+        final String pluralName = InflectionUtils.pluralize(id.getType());
+
+        // only custom collections go through version handling
+        if (!CollectionUtils.isCustomCollectionOrEntityName(pluralName)) {
+            return id;
+        }
+
+        // the name already carries a version, nothing to add
+        if (CollectionVersionUtils.isVersionedName(pluralName)) {
+            return id;
+        }
+
+        final CollectionVersionManager versionManager = collectionVersionManagerFactory.getInstance(
+            new CollectionScopeImpl(applicationScope.getApplication(), pluralName));
+        final String version = versionManager.getCollectionVersion(true);
+
+        // the versioned type is built from the original (non-pluralized) entity type
+        final String versionedType =
+            CollectionVersionUtils.buildVersionedNameString(id.getType(), version, false, forceVersion);
+
+        return new SimpleId(id.getUuid(), versionedType);
+    }
+
+    /**
+     * Creates a new {@code Entity} from the collection-versioned form of the given entity's id and the
+     * given entity's version.
+     *
+     * NOTE(review): only the id and version are passed to the new Entity; nothing else from the
+     * source entity is handed over here -- confirm callers do not rely on its fields.
+     *
+     * @param applicationScope scope used to resolve the collection version
+     * @param entity entity whose id and version are used
+     * @param forceVersion passed through to {@link #getCollectionVersionedId}
+     * @return a new entity built from the versioned id and the original version
+     */
+    @Override
+    public Entity getCollectionVersionedEntity(final ApplicationScope applicationScope, final Entity entity, boolean forceVersion) {
+        final Id versionedId = getCollectionVersionedId(applicationScope, entity.getId(), forceVersion);
+        return new Entity(versionedId, entity.getVersion());
+    }
+
+    /**
+     * Rebuilds an edge so that its source and target ids are the collection-versioned ids produced by
+     * {@link #getCollectionVersionedId}. Type and timestamp are copied as-is.
+     *
+     * @param applicationScope scope used to resolve collection versions
+     * @param edge edge to convert
+     * @param forceVersion passed through to {@link #getCollectionVersionedId}
+     * @return a {@code SimpleMarkedEdge} (keeping the deleted / node-deleted flags) when the input is a
+     *         {@code MarkedEdge}, otherwise a {@code SimpleEdge}
+     */
+    @Override
+    public Edge getCollectionVersionedEdge(final ApplicationScope applicationScope, final Edge edge, boolean forceVersion) {
+        final Id sourceNode = getCollectionVersionedId(applicationScope, edge.getSourceNode(), forceVersion);
+        final Id targetNode = getCollectionVersionedId(applicationScope, edge.getTargetNode(), forceVersion);
+
+        // Test against the MarkedEdge interface (the type we cast to), not the SimpleMarkedEdge
+        // implementation, so any marked edge keeps its flags instead of degrading to a SimpleEdge.
+        if (edge instanceof MarkedEdge) {
+            final MarkedEdge markedEdge = (MarkedEdge) edge;
+            return new SimpleMarkedEdge(
+                sourceNode,
+                markedEdge.getType(),
+                targetNode,
+                markedEdge.getTimestamp(),
+                markedEdge.isDeleted(),
+                markedEdge.isSourceNodeDelete(),
+                markedEdge.isTargetNodeDeleted()
+            );
+        }
+
+        return new SimpleEdge(sourceNode, edge.getType(), targetNode, edge.getTimestamp());
+    }
+
+    /**
+     * Rebuilds a search edge so that its node id is the collection-versioned id produced by
+     * {@link #getCollectionVersionedId}. Edge name and node type are copied as-is.
+     *
+     * @param applicationScope scope used to resolve the collection version
+     * @param searchEdge search edge to convert
+     * @param forceVersion passed through to {@link #getCollectionVersionedId}
+     * @return an {@code IndexEdgeImpl} (keeping the timestamp) when the input is an {@code IndexEdge},
+     *         otherwise a {@code SearchEdgeImpl}
+     */
+    @Override
+    public SearchEdge getCollectionVersionedSearchEdge(final ApplicationScope applicationScope, final SearchEdge searchEdge, boolean forceVersion) {
+        final Id nodeId = getCollectionVersionedId(applicationScope, searchEdge.getNodeId(), forceVersion);
+
+        // Test against the IndexEdge interface (the type we cast to), not IndexEdgeImpl, so any
+        // index edge keeps its timestamp instead of being reduced to a plain SearchEdgeImpl.
+        if (searchEdge instanceof IndexEdge) {
+            final IndexEdge indexEdge = (IndexEdge) searchEdge;
+            return new IndexEdgeImpl(
+                nodeId,
+                indexEdge.getEdgeName(),
+                indexEdge.getNodeType(),
+                indexEdge.getTimestamp()
+            );
+        }
+
+        return new SearchEdgeImpl(nodeId, searchEdge.getEdgeName(), searchEdge.getNodeType());
+    }
+
@Override
public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
@@ -126,45 +210,46 @@
//Does the queue entityDelete mark the entity then immediately does to the deleteEntityIndex. seems like
//it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
- private IndexOperationMessage buildEntityDeleteCommon(final ApplicationScope applicationScope, final Id entityId,
- boolean markedOnly) {
+ @Override
+ public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
if (logger.isDebugEnabled()) {
- logger.debug("Deleting entity id ({} versions) from index in app scope {} with entityId {}",
- markedOnly ? "marked" : "all", applicationScope, entityId);
+ logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}",
+ applicationScope, entityId);
}
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- //TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED)
+ MvccLogEntry mostRecentToDelete =
+ ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() )
+ .toBlocking()
+ .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
- MvccLogEntry mostRecentToDelete = markedOnly ?
- ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
- .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ) :
- ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
- .firstOrDefault( null );
-
- // if only marked entities should be deleted and nothing is marked, then abort
- if(markedOnly && mostRecentToDelete == null){
+ // no version of this entity is marked DELETED: log it once and abort with an empty message
+ if (mostRecentToDelete == null) {
+ logger.info("No entity versions to delete for id {}", entityId.toString());
return new IndexOperationMessage();
}
final List<MvccLogEntry> logEntries = new ArrayList<>();
Observable<MvccLogEntry> mvccLogEntryListObservable =
ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() );
- if(markedOnly){
- mvccLogEntryListObservable
- .filter(mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED);
- }
- mvccLogEntryListObservable
- .filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
- .buffer( serializationFig.getBufferSize() )
- .doOnNext( buffer -> ecm.delete( buffer ) )
- .doOnNext(mvccLogEntries -> {
- logEntries.addAll(mvccLogEntries);
- }).toBlocking().lastOrDefault(null);
+ mvccLogEntryListObservable
+ .filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
+ .buffer( serializationFig.getBufferSize() )
+ .doOnNext( buffer -> ecm.delete( buffer ) )
+ .doOnNext(mvccLogEntries -> {
+ logEntries.addAll(mvccLogEntries);
+ }).toBlocking().lastOrDefault(null);
+
+ //logger.info("logEntries size={}", logEntries.size());
IndexOperationMessage combined = new IndexOperationMessage();
@@ -186,7 +271,9 @@
// Further comments using the example of deleting "server1" from the above example.
gm.compactNode(entityId).doOnNext(markedEdge -> {
- logger.debug("Processing deleted edge for de-indexing {}", markedEdge);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing deleted edge for de-indexing {}", markedEdge);
+ }
// if the edge was for a connection where the entity to be deleted is the source node, we need to load
// the target node's versions so that all versions of connections to that entity can be de-indexed
@@ -223,17 +310,6 @@
}
@Override
- public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
- return buildEntityDeleteCommon(applicationScope, entityId, true);
- }
-
- // this deletes all versions of an entity, only used for collection delete
- @Override
- public IndexOperationMessage buildEntityDeleteAllVersions(final ApplicationScope applicationScope, final Id entityId ) {
- return buildEntityDeleteCommon(applicationScope, entityId, false);
- }
-
- @Override
public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 0ea0fdc..9e444f3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -45,7 +45,7 @@
@JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
@JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" ),
@JsonSubTypes.Type( value = DeIndexOldVersionsEvent.class, name = "deIndexOldVersionsEvent" ),
- @JsonSubTypes.Type( value = CollectionDeleteEvent.class, name = "collectionDeleteEvent" )
+ @JsonSubTypes.Type( value = CollectionClearEvent.class, name = "collectionDeleteEvent" )
} )
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionClearEvent.java
similarity index 87%
rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java
rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionClearEvent.java
index 9fc978c..8dd7d97 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/CollectionClearEvent.java
@@ -23,9 +23,9 @@
import org.apache.usergrid.corepersistence.index.CollectionScope;
/**
- * Event that will signal to queue up entity deletes for a collection delete.
+ * Event that will signal to queue up entity deletes for a collection clear.
*/
-public final class CollectionDeleteEvent extends AsyncEvent {
+public final class CollectionClearEvent extends AsyncEvent {
@JsonProperty
@@ -38,11 +38,11 @@
* Do not delete! Needed for Jackson
*/
@SuppressWarnings( "unused" )
- public CollectionDeleteEvent() {
+ public CollectionClearEvent() {
super();
}
- public CollectionDeleteEvent(String sourceRegion, CollectionScope collectionScope, String collectionVersion) {
+ public CollectionClearEvent(String sourceRegion, CollectionScope collectionScope, String collectionVersion) {
super(sourceRegion);
this.collectionScope = collectionScope;
this.collectionVersion = collectionVersion;
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index aa6a15b..db13290 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -27,33 +27,19 @@
*/
public final class EntityDeleteEvent extends AsyncEvent {
-
@JsonProperty
protected EntityIdScope entityIdScope;
- @JsonProperty
- protected boolean markedOnly;
-
public EntityDeleteEvent() {
super();
}
public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) {
- this(sourceRegion, entityIdScope, true);
- }
-
- public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope, boolean markedOnly) {
super(sourceRegion);
this.entityIdScope = entityIdScope;
- this.markedOnly = markedOnly;
}
-
public EntityIdScope getEntityIdScope() {
return entityIdScope;
}
-
- public boolean markedOnly() {
- return markedOnly;
- }
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
index ff64d6a..2717d85 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionClearServiceImpl.java
@@ -53,7 +53,7 @@
applicationID.toString(), baseCollectionName, oldVersion, collectionVersionManager.getCollectionVersion(false));
// queue up delete of old version entities
- asyncEventService.queueCollectionDelete(scope, oldVersion);
+ asyncEventService.queueCollectionClear(scope, oldVersion);
}
@Override
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
index 74acd09..5e71cec 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
@@ -51,7 +51,7 @@
public Optional<Map<String, Object>> getCollectionSettings(final String collectionName ) {
// collectionName may be a versioned collection name -- get the base name
- String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName();
+ String baseCollectionName = CollectionVersionUtils.parseVersionedName(collectionName).getCollectionName();
String settings;
@@ -80,7 +80,7 @@
public void putCollectionSettings(final String collectionName, final String collectionSchema ){
// collectionName may be a versioned collection name -- get the base name
- String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName();
+ String baseCollectionName = CollectionVersionUtils.parseVersionedName(collectionName).getCollectionName();
mapManager.putString( baseCollectionName, collectionSchema );
cache.put(scope, collectionSchema);
@@ -91,7 +91,7 @@
public void deleteCollectionSettings(final String collectionName){
// collectionName may be a versioned collection name -- get the base name
- String baseCollectionName = CollectionVersionUtil.parseVersionedName(collectionName).getCollectionName();
+ String baseCollectionName = CollectionVersionUtils.parseVersionedName(collectionName).getCollectionName();
mapManager.delete( baseCollectionName );
cache.invalidate( scope );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java
index 3bb75c7..43ad3bb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionFig.java
@@ -43,7 +43,7 @@
int getCacheTimeout();
@Key(TIME_BETWEEN_DELETES_MS)
- @Default("60000")
+ @Default("0")
long getTimeBetweenDeletes();
@Key(DELETES_PER_EVENT)
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java
index 9768a55..c3cc1ca 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManager.java
@@ -18,7 +18,7 @@
package org.apache.usergrid.corepersistence.index;
-import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException;
+import org.apache.usergrid.corepersistence.asyncevents.CollectionClearTooSoonException;
public interface CollectionVersionManager {
@@ -29,7 +29,7 @@
String getVersionedCollectionName(final boolean bypassCache);
- String updateCollectionVersion() throws CollectionDeleteTooSoonException;
+ String updateCollectionVersion() throws CollectionClearTooSoonException;
Long getTimeLastChanged();
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
index c5bb417..b467242 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionManagerImpl.java
@@ -18,7 +18,7 @@
package org.apache.usergrid.corepersistence.index;
import com.google.inject.Inject;
-import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException;
+import org.apache.usergrid.corepersistence.asyncevents.CollectionClearTooSoonException;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.slf4j.Logger;
@@ -78,19 +78,19 @@
@Override
public String getVersionedCollectionName(final boolean bypassCache) {
String collectionVersion = getCollectionVersion(bypassCache);
- return CollectionVersionUtil.buildVersionedNameString(collectionName, collectionVersion, false);
+ return CollectionVersionUtils.buildVersionedNameString(collectionName, collectionVersion, false);
}
// returns old collection version
@Override
- public String updateCollectionVersion() throws CollectionDeleteTooSoonException {
+ public String updateCollectionVersion() throws CollectionClearTooSoonException {
// check for time last changed
Long timeLastChanged = getTimeLastChanged();
long timeBetweenDeletes = collectionVersionFig.getTimeBetweenDeletes();
if (timeLastChanged != null) {
if (System.currentTimeMillis() - timeLastChanged < timeBetweenDeletes) {
// too soon
- throw new CollectionDeleteTooSoonException(timeLastChanged, timeBetweenDeletes);
+ throw new CollectionClearTooSoonException(timeLastChanged, timeBetweenDeletes);
}
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtils.java
similarity index 74%
rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java
rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtils.java
index 46e4e09..241a8ad 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtil.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionVersionUtils.java
@@ -18,13 +18,14 @@
package org.apache.usergrid.corepersistence.index;
-import com.amazonaws.util.StringUtils;
import com.google.common.base.Preconditions;
+import org.apache.usergrid.utils.StringUtils;
import java.util.regex.Pattern;
-public class CollectionVersionUtil {
- private static final String VERSIONED_NAME_SEPARATOR = "%~!~%";
+import static org.apache.usergrid.persistence.model.util.CollectionUtils.VERSIONED_NAME_SEPARATOR;
+
+public class CollectionVersionUtils {
public static VersionedCollectionName parseVersionedName(String versionedCollectionNameString) throws IllegalArgumentException {
Preconditions.checkNotNull(versionedCollectionNameString, "collection name string is required");
@@ -51,6 +52,10 @@
return parseVersionedName(versionedCollectionNameString).getCollectionName();
}
+ /**
+  * Parses a versioned collection name string and returns only its version part.
+  *
+  * @param versionedCollectionNameString the name string handed to {@code parseVersionedName}
+  * @return the collection version reported by the parsed name
+  * @throws IllegalArgumentException as declared; propagated from {@code parseVersionedName}
+  */
+ public static String getCollectionVersion(String versionedCollectionNameString) throws IllegalArgumentException {
+     final VersionedCollectionName parsed = parseVersionedName(versionedCollectionNameString);
+     return parsed.getCollectionVersion();
+ }
+
public static boolean collectionNameHasVersion(String collectionNameString) {
try {
VersionedCollectionName parsedName = parseVersionedName(collectionNameString);
@@ -61,19 +66,34 @@
}
}
+ /**
+  * Tells whether a name contains the versioned-name separator.
+  *
+  * @param name candidate collection name; may be {@code null}
+  * @return {@code true} if the separator occurs in {@code name}; {@code false} otherwise,
+  *         including when {@code name} is {@code null}
+  */
+ public static boolean isVersionedName(String name) {
+     // explicit null guard instead of catching the NullPointerException contains() would raise
+     return name != null && name.contains(VERSIONED_NAME_SEPARATOR);
+ }
+
public static String buildVersionedNameString(final String baseName, final String collectionVersion,
final boolean validateBaseName) throws IllegalArgumentException {
+ return buildVersionedNameString(baseName, collectionVersion, validateBaseName, false);
+ }
+
+ /**
+  * Builds the versioned name string {@code baseName + separator + collectionVersion}.
+  *
+  * @param baseName          collection base name; must not be {@code null}
+  * @param collectionVersion version to append; may be {@code null} or empty
+  * @param validateBaseName  when {@code true}, reject a base name that already contains the separator
+  * @param forceVersion      when {@code true}, always append the separator, even if the version is
+  *                          {@code null} or empty (the result then ends with the bare separator);
+  *                          when {@code false}, a missing version returns {@code baseName} unchanged
+  * @return the versioned name string, or {@code baseName} alone as described above
+  * @throws IllegalArgumentException if {@code validateBaseName} is set and the base name contains the separator
+  */
+ public static String buildVersionedNameString(final String baseName, final String collectionVersion,
+ final boolean validateBaseName, final boolean forceVersion) {
Preconditions.checkNotNull(baseName, "base name is required");
if (validateBaseName && baseName.contains(VERSIONED_NAME_SEPARATOR)) {
throw new IllegalArgumentException("Cannot build versioned name using a base name that already includes the version separator");
}
- if (collectionVersion == null || collectionVersion == "") {
+ // compare by content (not reference) and apply forceVersion to both the null and the empty case
+ final boolean hasVersion = collectionVersion != null && !collectionVersion.isEmpty();
+ if (!hasVersion && !forceVersion) {
return baseName;
}
- return baseName + VERSIONED_NAME_SEPARATOR + collectionVersion;
+ // a forced but missing version must not be rendered as the literal "null"
+ return baseName + VERSIONED_NAME_SEPARATOR + (hasVersion ? collectionVersion : "");
+
}
- public static VersionedCollectionName createVersionedName(String baseName, String collectionVersion) {
+ public static VersionedCollectionName createVersionedName(final String baseName, final String collectionVersion) {
return new VersionedCollectionNameImpl(baseName, collectionVersion);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index c8dfc31..cc99a98 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -254,6 +254,9 @@
public Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
final Id entityId, final UUID entityVersion){
+ if (logger.isTraceEnabled()) {
+ // hand the objects to SLF4J directly: formatting is null-safe, explicit toString() calls are not
+ logger.trace("deIndexEdge edge={} entityId={} entityVersion={}", edge, entityId, entityVersion);
+ }
final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
final EntityIndexBatch entityBatch = ei.createBatch();
entityBatch.deindex(generateScopeFromSource( edge ), entityId, entityVersion);
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 7770436..2da67f7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -25,6 +25,7 @@
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.DistanceField;
import org.apache.usergrid.persistence.model.field.DoubleField;
import org.apache.usergrid.persistence.model.field.EntityObjectField;
@@ -112,7 +113,7 @@
candidates.flatMap(candidatesList -> {
Collection<SelectFieldMapping> mappings = candidatesList.get(0).getFields();
Observable<EntitySet> entitySets = Observable.from(candidatesList)
- .map(candidateEntry -> candidateEntry.getCandidateResult().getId()).toList()
+ .map(candidateEntry -> (Id)new SimpleId(candidateEntry.getCandidateResult().getId(), false)).toList()
.flatMap(idList -> entityCollectionManager.load(idList));
//now we have a collection, validate our canidate set is correct.
return entitySets.map(
@@ -273,7 +274,7 @@
final CandidateResult candidateResult = candidate.getCandidateResult();
final boolean isGeo = candidateResult instanceof GeoCandidateResult;
final SearchEdge searchEdge = candidate.getSearchEdge();
- final Id candidateId = candidateResult.getId();
+ final Id candidateId = new SimpleId(candidateResult.getId(), false);
final UUID candidateVersion = candidateResult.getVersion();
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
index 0a4360f..2bea0cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
@@ -40,40 +40,6 @@
public static final String ENTITY_TYPE = "application";
- public static final String COLLECTION_USERS = "users";
-
- public static final String COLLECTION_GROUPS = "groups";
-
- public static final String COLLECTION_ASSETS = "assets";
-
- public static final String COLLECTION_ACTIVITIES = "activities";
-
- public static final String COLLECTION_EVENTS = "events";
-
- public static final String COLLECTION_FOLDERS = "folders";
-
- public static final String COLLECTION_DEVICES = "devices";
-
- public static final String COLLECTION_NOTIFICATIONS = "notifications";
-
- public static final String COLLECTION_ROLES = "roles";
-
- public static boolean isCustomCollectionName(String collectionName) {
- switch (collectionName.toLowerCase()) {
- case COLLECTION_USERS:
- case COLLECTION_GROUPS:
- case COLLECTION_ASSETS:
- case COLLECTION_ACTIVITIES:
- case COLLECTION_EVENTS:
- case COLLECTION_FOLDERS:
- case COLLECTION_DEVICES:
- case COLLECTION_NOTIFICATIONS:
- case COLLECTION_ROLES:
- return false;
- }
- return true;
- }
-
@EntityProperty(indexed = true, fulltextIndexed = false, required = true, mutable = false, aliasProperty = true,
basic = true)
protected String name;
diff --git a/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java b/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java
index dde2f4f..6929e8d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/utils/InflectionUtils.java
@@ -17,7 +17,7 @@
package org.apache.usergrid.utils;
-import org.apache.usergrid.corepersistence.index.CollectionVersionUtil;
+import org.apache.usergrid.corepersistence.index.CollectionVersionUtils;
import org.apache.usergrid.corepersistence.index.VersionedCollectionName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,17 +29,17 @@
private static VersionedCollectionName parseName(Object word) {
String name = word.toString().trim();
try {
- return CollectionVersionUtil.parseVersionedName(name);
+ return CollectionVersionUtils.parseVersionedName(name);
}
catch (Exception e) {
logger.error("parseName(): failed to parse the versioned name: {}", name);
- return CollectionVersionUtil.createVersionedName(name, "");
+ return CollectionVersionUtils.createVersionedName(name, "");
}
}
private static String getVersionedName(String name, String version) {
try {
- return CollectionVersionUtil.buildVersionedNameString(name, version, true);
+ return CollectionVersionUtils.buildVersionedNameString(name, version, true);
}
catch (Exception e) {
// if versioned invalid, return name
diff --git a/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java b/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java
index 6bb44d8..4eff293 100644
--- a/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/utils/StringUtils.java
@@ -160,6 +160,10 @@
return obj instanceof String;
}
+ /**
+  * Reports whether a string is absent or has zero length.
+  *
+  * @param s the string to inspect; may be {@code null}
+  * @return {@code true} when {@code s} is {@code null} or {@code ""}, otherwise {@code false}
+  */
+ public static Boolean isNullOrEmpty(String s) {
+     if (s == null) {
+         return true;
+     }
+     return s.isEmpty();
+ }
+
public static String readClasspathFileAsString( String filePath ) {
try {
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
index c9bba02..9d4aca6 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
@@ -31,6 +31,7 @@
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.EsRunner;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
@@ -89,11 +90,17 @@
public CollectionVersionFig collectionVersionFig;
@Inject
+ public CollectionVersionManagerFactory collectionVersionManagerFactory;
+
+ @Inject
+ public GraphManagerFactory graphManagerFactory;
+
+ @Inject
public AllEntityIdsObservable allEntityIdsObservable;
@Override
protected AsyncEventService getAsyncEventService() {
- return new AsyncEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, collectionVersionFig, allEntityIdsObservable, rxTaskScheduler );
+ return new AsyncEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, collectionVersionFig, collectionVersionManagerFactory, graphManagerFactory, allEntityIdsObservable, rxTaskScheduler );
}
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index f47afe6..d9ee87e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -45,6 +45,7 @@
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.junit.Before;
import org.junit.Test;
@@ -205,7 +206,9 @@
assertEquals( 1, collectionResults.size() );
- assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
+ // with collection versioning, empty versions are included
+ assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(collectionResults.get(0).getId().getType()));
+ assertEquals(testEntity.getId().getUuid(), collectionResults.get(0).getId().getUuid());
//query until the connection edge is available
@@ -216,7 +219,9 @@
assertEquals( 1, connectionResults.size() );
- assertEquals( testEntity.getId(), connectionResults.get( 0 ).getId() );
+ // with collection versioning, empty versions are included; verify the CONNECTION query result
+ assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(connectionResults.get(0).getId().getType()));
+ assertEquals(testEntity.getId().getUuid(), connectionResults.get(0).getId().getUuid());
}
/**
@@ -294,8 +299,9 @@
assertEquals( 1, collectionResults.size() );
- assertEquals( testEntity.getId(), collectionResults.get( 0 ).getId() );
-
+ // with collection versioning, empty versions are included
+ assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(collectionResults.get(0).getId().getType()));
+ assertEquals(testEntity.getId().getUuid(), collectionResults.get(0).getId().getUuid());
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
@@ -306,7 +312,8 @@
assertEquals( 1, connectionResults.size() );
- assertEquals( testEntity.getId(), connectionResults.get( 0 ).getId() );
+ // verify the CONNECTION query result (not the earlier collection result)
+ assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(connectionResults.get(0).getId().getType()));
+ assertEquals(testEntity.getId().getUuid(), connectionResults.get(0).getId().getUuid());
final SearchEdge lastConnectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( lastSearch );
@@ -318,7 +325,8 @@
assertEquals( 1, lastConnectionResults.size() );
- assertEquals( testEntity.getId(), lastConnectionResults.get( 0 ).getId() );
+ assertEquals(testEntity.getId().getType(), CollectionUtils.stripEmptyVersion(lastConnectionResults.get(0).getId().getType()));
+ assertEquals(testEntity.getId().getUuid(), lastConnectionResults.get(0).getId().getUuid());
}
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index 281f2af..e9d9e62 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -163,7 +163,7 @@
waitForRebuild( status, reIndexService );
- app.waitForQueueDrainAndRefreshIndex(5000);
+ app.waitForQueueDrainAndRefreshIndex(10000);
// ----------------- test that we can read the catherder collection and not the catshepard
@@ -172,7 +172,7 @@
}
- @Test( timeout = 120000 )
+ @Test( timeout = 240000 )
public void rebuildIndex() throws Exception {
logger.info( "Started rebuildIndex()" );
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index d22ac65..9b7566a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -147,7 +147,7 @@
public Observable<MarkedEdge> writeEdge( final Edge edge ) {
GraphValidation.validateEdge( edge );
- final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false );
+ final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false, false, false );
final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> {
@@ -178,7 +178,7 @@
public Observable<MarkedEdge> markEdge( final Edge edge ) {
GraphValidation.validateEdge( edge );
- final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
+ final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true, false, false );
final Observable<MarkedEdge> observable = Observable.just( markedEdge ).map( edge1 -> {
@@ -269,6 +269,7 @@
final Observable<MarkedEdge> nodeObservable =
Observable.just( inputNode )
.map( node -> nodeSerialization.getMaxVersion( scope, node ) )
+ //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
.takeWhile(maxTimestamp -> maxTimestamp.isPresent() )
//map our delete listener
.flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java
index a89cd96..23ab074 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleEdge.java
@@ -23,6 +23,7 @@
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
/**
@@ -54,6 +55,15 @@
GraphValidation.validateEdge( this );
}
+ public SimpleEdge( final Edge another, boolean includeEmptyVersion ) { // copy constructor: rebuilds another's edge with re-typed node ids
+ this(
+ new SimpleId(another.getSourceNode(), includeEmptyVersion), // source node id, type passed through getType(includeEmptyVersion)
+ another.getType(),
+ new SimpleId(another.getTargetNode(), includeEmptyVersion), // target node id, same handling as the source
+ another.getTimestamp()
+ );
+ }
+
@Override
public Id getSourceNode() {
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index 9c35e2e..c244ad4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -25,6 +25,7 @@
import org.apache.usergrid.persistence.model.entity.Id;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
/**
@@ -63,8 +64,22 @@
}
- public SimpleMarkedEdge( final Edge edge, final boolean isDeleted ) {
- this( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), isDeleted );
+ public SimpleMarkedEdge( final Edge edge, final boolean isDeleted, final boolean isSourceNodeDeleted, final boolean isTargetNodeDeleted ) {
+ this(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), // nodes, type and timestamp are taken from the given edge
+ isDeleted, isSourceNodeDeleted, isTargetNodeDeleted); // the three flags are forwarded unchanged to the delegate constructor
+ }
+
+
+ public SimpleMarkedEdge( final MarkedEdge another, final boolean includeEmptyVersion ) { // copy constructor: same edge data, node ids re-typed
+ this(
+ new SimpleId(another.getSourceNode(), includeEmptyVersion), // source node id, type passed through getType(includeEmptyVersion)
+ another.getType(),
+ new SimpleId(another.getTargetNode(), includeEmptyVersion), // target node id, same handling as the source
+ another.getTimestamp(),
+ another.isDeleted(), // the three marker flags are copied from another as-is
+ another.isSourceNodeDelete(),
+ another.isTargetNodeDeleted()
+ );
}
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java
index b915e4f..707b45f 100644
--- a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Entity.java
@@ -76,6 +76,12 @@
this.version = version;
}
+ public Entity(final Entity another, boolean includeEmptyVersion) { // copy constructor applying empty-version handling to the id type
+ this(new SimpleId(another.getId(), includeEmptyVersion), another.getVersion()); // id is rebuilt via SimpleId; version is taken from another
+ this.setFieldMap(another.getFieldMap()); // NOTE(review): whether the field map is copied or shared depends on setFieldMap (not visible here) — confirm
+ this.setSize(another.getSize());
+ }
+
/**
* Generate a new entity with the given type and a new id
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java
index 39014ab..6e048ac 100644
--- a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/Id.java
@@ -44,6 +44,12 @@
*/
String getType();
+ /**
+ * Get the type for this id with empty-version handling applied; SimpleId adds the empty-version suffix when includeEmptyVersion is true and strips it when false
+ * @return the type string after the requested empty-version handling
+ */
+ String getType(boolean includeEmptyVersion);
+
//Application -> Class "Application"
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java
index 6a45558..11bf0e4 100644
--- a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/entity/SimpleId.java
@@ -29,6 +29,8 @@
import com.fasterxml.uuid.UUIDComparator;
import com.google.common.base.Preconditions;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
+
/** @author tnine */
public class SimpleId implements Id, Serializable {
@@ -56,6 +58,11 @@
this.type = type;
}
+ public SimpleId(final Id another, boolean includeEmptyVersion) { // copy constructor with empty-version handling on the type
+ this.uuid = another.getUuid(); // uuid is taken over unchanged
+ this.type = another.getType(includeEmptyVersion); // type is whatever another reports for the requested handling
+ }
+
/**
* Create a new ID. Should only be used for new entities
@@ -78,6 +85,18 @@
}
+ @Override
+ public String getType(boolean includeEmptyVersion) { // returns this id's type with the empty-version suffix added or stripped
+ String retType;
+ if (includeEmptyVersion) {
+ retType = CollectionUtils.addEmptyVersion(type); // may return type unchanged: addEmptyVersion skips non-custom or already-versioned names
+ } else {
+ retType = CollectionUtils.stripEmptyVersion(type); // drops a trailing version separator, if present
+ }
+ return retType;
+ }
+
+
/**
* Do not delete! Needed for Jackson
diff --git a/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/CollectionUtils.java b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/CollectionUtils.java
new file mode 100644
index 0000000..bff9509
--- /dev/null
+++ b/stack/corepersistence/model/src/main/java/org/apache/usergrid/persistence/model/util/CollectionUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.model.util;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class CollectionUtils { // static helpers for adding/stripping the "empty version" suffix on collection and entity type names
+ public static final String VERSIONED_NAME_SEPARATOR = "~-_~_-~"; // a name that ends with this separator carries an empty version
+
+ public static final String COLLECTION_USERS = "users"; // collection names (plural) that are treated as non-custom
+ public static final String COLLECTION_GROUPS = "groups";
+ public static final String COLLECTION_ASSETS = "assets";
+ public static final String COLLECTION_ACTIVITIES = "activities";
+ public static final String COLLECTION_EVENTS = "events";
+ public static final String COLLECTION_FOLDERS = "folders";
+ public static final String COLLECTION_DEVICES = "devices";
+ public static final String COLLECTION_NOTIFICATIONS = "notifications";
+ public static final String COLLECTION_ROLES = "roles";
+
+ public static final String COLLECTION_ENTITY_USER = "user"; // entity type names (singular) that are treated as non-custom
+ public static final String COLLECTION_ENTITY_GROUP = "group";
+ public static final String COLLECTION_ENTITY_ASSET = "asset";
+ public static final String COLLECTION_ENTITY_ACTIVITY = "activity";
+ public static final String COLLECTION_ENTITY_EVENT = "event";
+ public static final String COLLECTION_ENTITY_FOLDER = "folder";
+ public static final String COLLECTION_ENTITY_DEVICE = "device";
+ public static final String COLLECTION_ENTITY_NOTIFICATION = "notification";
+ public static final String COLLECTION_ENTITY_ROLE = "role";
+
+ private static final Set<String> customNames; // NOTE: despite its name, this set holds the names treated as NOT custom
+
+ static { // fill the set with every collection and entity name constant declared above
+ customNames = new HashSet<>();
+ customNames.add(COLLECTION_USERS);
+ customNames.add(COLLECTION_GROUPS);
+ customNames.add(COLLECTION_ASSETS);
+ customNames.add(COLLECTION_ACTIVITIES);
+ customNames.add(COLLECTION_EVENTS);
+ customNames.add(COLLECTION_FOLDERS);
+ customNames.add(COLLECTION_DEVICES);
+ customNames.add(COLLECTION_NOTIFICATIONS);
+ customNames.add(COLLECTION_ROLES);
+
+ customNames.add(COLLECTION_ENTITY_USER);
+ customNames.add(COLLECTION_ENTITY_GROUP);
+ customNames.add(COLLECTION_ENTITY_ASSET);
+ customNames.add(COLLECTION_ENTITY_ACTIVITY);
+ customNames.add(COLLECTION_ENTITY_EVENT);
+ customNames.add(COLLECTION_ENTITY_FOLDER);
+ customNames.add(COLLECTION_ENTITY_DEVICE);
+ customNames.add(COLLECTION_ENTITY_NOTIFICATION);
+ customNames.add(COLLECTION_ENTITY_ROLE);
+ }
+
+ public static boolean isCustomCollectionOrEntityName(String collectionName) { // true when the name is absent from the set above
+ return !customNames.contains(collectionName); // exact, case-sensitive match on the raw name
+ }
+
+ public static String stripEmptyVersion(final String name) { // returns name without a trailing separator; other names are returned as-is
+ // a versioned name with an empty version is the name followed by the separator
+ if (name.endsWith(VERSIONED_NAME_SEPARATOR)) {
+ return name.substring(0, name.length() - VERSIONED_NAME_SEPARATOR.length());
+ }
+ return name;
+ }
+
+ public static String addEmptyVersion(final String name) { // appends the separator only to custom names that do not already contain it
+ if (!isCustomCollectionOrEntityName(name) ||
+ name.contains(VERSIONED_NAME_SEPARATOR)) {
+ // not a custom collection, or the name already contains the separator (already versioned)
+ return name;
+ }
+ return name + VERSIONED_NAME_SEPARATOR;
+ }
+
+ public static String handleEmptyVersion(final String name, boolean addEmptyVersion) { // dispatches to addEmptyVersion (true) or stripEmptyVersion (false)
+ String ret;
+ if (addEmptyVersion) {
+ ret = addEmptyVersion(name);
+ } else {
+ ret = stripEmptyVersion(name);
+ }
+ return ret;
+ }
+}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index aefceda..b5477f8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -33,6 +33,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
@@ -43,6 +45,8 @@
@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
public class DeIndexOperation implements BatchOperation {
+ private static final Logger logger = LoggerFactory.getLogger( DeIndexOperation.class );
+
@JsonProperty
public String[] indexes;
@@ -59,11 +63,13 @@
UUID version ) {
this.indexes = indexes;
this.documentId = createIndexDocId( applicationScope, id, version, searchEdge );
+ //logger.info("documentId={}", this.documentId);
}
public DeIndexOperation( String[] indexes, String docId) {
this.indexes = indexes;
this.documentId = docId;
+ //logger.info("documentId={}", this.documentId);
}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 1bb97b3..ac2f50a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +89,9 @@
}
//add app id for indexing
- container.addIndexRequest(new IndexOperation(writeAlias, applicationScope, indexEdge, entity,fieldsToIndex));
+ container.addIndexRequest(new IndexOperation(writeAlias, applicationScope,
+ new IndexEdgeImpl(indexEdge, false),
+ new Entity(entity, false),fieldsToIndex));
return this;
}
@@ -111,7 +114,9 @@
}
- container.addDeIndexRequest(new DeIndexOperation(indexes, applicationScope, searchEdge, id, version));
+ container.addDeIndexRequest(new DeIndexOperation(indexes, applicationScope,
+ new SearchEdgeImpl(searchEdge, false),
+ new SimpleId(id, false), version));
return this;
}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexEdgeImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexEdgeImpl.java
index 5b525cb..409639d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexEdgeImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexEdgeImpl.java
@@ -26,6 +26,7 @@
import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
/**
@@ -41,6 +42,15 @@
this.timestamp = timestamp;
}
+ public IndexEdgeImpl( final IndexEdge another, final boolean includeEmptyVersion) { // copy constructor: same edge data, node id re-typed
+ this(
+ new SimpleId(another.getNodeId(), includeEmptyVersion), // node id, type passed through getType(includeEmptyVersion)
+ another.getEdgeName(),
+ another.getNodeType(),
+ another.getTimestamp()
+ );
+ }
+
@Override
public long getTimestamp() {
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
index f8560ba..f29e7bc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
@@ -33,6 +33,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -40,6 +42,8 @@
*/
public class IndexOperation implements BatchOperation {
+ private static final Logger logger = LoggerFactory.getLogger( IndexOperation.class );
+
@JsonProperty
public String writeAlias;
@JsonProperty
@@ -61,6 +65,7 @@
this.writeAlias = writeAlias;
this.data = data;
this.documentId = documentId;
+ //logger.info("documentId={}", documentId);
}
/**
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 5fe5b39..c878ea2 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -1,4 +1,4 @@
-package org.apache.usergrid.persistence.index.impl;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,12 +17,14 @@
* under the License.
*/
+package org.apache.usergrid.persistence.index.impl;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.apache.usergrid.persistence.index.CandidateResult;
import org.apache.usergrid.persistence.index.GeoCandidateResult;
import org.apache.usergrid.persistence.index.IndexEdge;
@@ -69,9 +71,9 @@
private static final Pattern DOCUMENT_PATTERN = Pattern.compile( DOCUMENT_ID_REGEX );
// These are not allowed in document type names: _ . , | #
- public static final String FIELD_SEPERATOR = ".";
+ public static final String FIELD_SEPARATOR = ".";
- public static final String ID_SEPERATOR = ",";
+ public static final String ID_SEPARATOR = ",";
/**
@@ -137,13 +139,17 @@
*
* TODO make this format more readable and parsable
*/
- public static String createContextName( final ApplicationScope applicationScope, final SearchEdge scope ) {
+ public static String createContextName( final ApplicationScope applicationScope, final SearchEdge searchEdge ) {
+ SearchEdge strippedSearchEdge = new SearchEdgeImpl(
+ new SimpleId(searchEdge.getNodeId().getUuid(), CollectionUtils.stripEmptyVersion(searchEdge.getNodeId().getType())),
+ searchEdge.getEdgeName(), searchEdge.getNodeType()
+ );
StringBuilder sb = new StringBuilder();
idString( sb, APPID_NAME, applicationScope.getApplication() );
- sb.append( FIELD_SEPERATOR );
- idString( sb, NODEID_NAME, scope.getNodeId() );
- sb.append( FIELD_SEPERATOR );
- appendField( sb, EDGE_NAME, scope.getEdgeName() );
+ sb.append(FIELD_SEPARATOR);
+ idString( sb, NODEID_NAME, strippedSearchEdge.getNodeId() );
+ sb.append(FIELD_SEPARATOR);
+ appendField( sb, EDGE_NAME, strippedSearchEdge.getEdgeName() );
return sb.toString();
}
@@ -163,34 +169,41 @@
public static String createIndexDocId( final ApplicationScope applicationScope, final Id entityId,
final UUID version, final SearchEdge searchEdge ) {
+ // strip empty collection versions to maintain backward compatibility
+ Id strippedEntityId = new SimpleId(entityId.getUuid(), CollectionUtils.stripEmptyVersion(entityId.getType()));
+ SearchEdge strippedSearchEdge = new SearchEdgeImpl(
+ new SimpleId(searchEdge.getNodeId().getUuid(), CollectionUtils.stripEmptyVersion(searchEdge.getNodeId().getType())),
+ searchEdge.getEdgeName(), searchEdge.getNodeType()
+ );
+
StringBuilder sb = new StringBuilder();
idString( sb, APPID_NAME, applicationScope.getApplication() );
- sb.append( FIELD_SEPERATOR );
- idString( sb, ENTITY_ID_FIELDNAME, entityId );
- sb.append( FIELD_SEPERATOR );
+ sb.append(FIELD_SEPARATOR);
+ idString( sb, ENTITY_ID_FIELDNAME, strippedEntityId );
+ sb.append(FIELD_SEPARATOR);
appendField( sb, VERSION_NAME, version.toString() );
- sb.append( FIELD_SEPERATOR );
- idString( sb, NODEID_NAME, searchEdge.getNodeId() );
- sb.append( FIELD_SEPERATOR );
- appendField( sb, EDGE_NAME, searchEdge.getEdgeName() );
- sb.append( FIELD_SEPERATOR );
- appendField( sb, NODE_TYPE_NAME, searchEdge.getNodeType().name() );
+ sb.append(FIELD_SEPARATOR);
+ idString( sb, NODEID_NAME, strippedSearchEdge.getNodeId() );
+ sb.append(FIELD_SEPARATOR);
+ appendField( sb, EDGE_NAME, strippedSearchEdge.getEdgeName() );
+ sb.append(FIELD_SEPARATOR);
+ appendField( sb, NODE_TYPE_NAME, strippedSearchEdge.getNodeType().name() );
return sb.toString();
}
- public static final String entityId( final Id id ) {
+ public static String entityId( final Id id ) {
return idString( ENTITY_NAME, id );
}
- public static final String applicationId( final Id id ) {
+ public static String applicationId( final Id id ) {
return idString( APPID_NAME, id );
}
- public static final String nodeId( final Id id ) {
+ public static String nodeId( final Id id ) {
return idString( NODEID_NAME, id );
}
@@ -198,7 +211,7 @@
/**
* Construct and Id string with the specified type for the id provided.
*/
- private static final String idString( final String type, final Id id ) {
+ private static String idString( final String type, final Id id ) {
final StringBuilder stringBuilder = new StringBuilder();
idString( stringBuilder, type, id );
@@ -211,7 +224,7 @@
* Append the id to the string
*/
private static final void idString( final StringBuilder builder, final String type, final Id id ) {
- builder.append( type ).append( "(" ).append( id.getUuid() ).append( ID_SEPERATOR )
+ builder.append( type ).append( "(" ).append( id.getUuid() ).append(ID_SEPARATOR)
.append( id.getType().toLowerCase() ).append( ")" );
}
@@ -254,7 +267,7 @@
//Other fields can be parsed using groups. The groups start at value 1, group 0 is the entire match
final String entityUUID = matcher.group(3);
- final String entityType = matcher.group(4);
+ final String entityType = CollectionUtils.addEmptyVersion(matcher.group(4));
final String versionUUID = matcher.group(5);
@@ -297,8 +310,8 @@
StringBuilder sb = new StringBuilder();
idString( sb, APPID_NAME, applicationScope.getApplication() );
- sb.append( FIELD_SEPERATOR );
- sb.append( ENTITY_TYPE_NAME).append("(" ).append( type ).append( ")" );
+ sb.append(FIELD_SEPARATOR);
+ sb.append( ENTITY_TYPE_NAME).append("(" ).append( CollectionUtils.stripEmptyVersion(type) ).append( ")" );
return sb.toString();
}
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchEdgeImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchEdgeImpl.java
index 3b62691..7bb2db5 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchEdgeImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchEdgeImpl.java
@@ -20,6 +20,7 @@
import org.apache.usergrid.persistence.index.SearchEdge;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
/**
@@ -60,6 +61,14 @@
this.nodeType = nodeType;
}
+ public SearchEdgeImpl( final SearchEdge another, boolean includeEmptyVersion) { // copy constructor: same edge name and node type, node id re-typed
+ this(
+ new SimpleId(another.getNodeId(), includeEmptyVersion), // node id, type passed through getType(includeEmptyVersion)
+ another.getEdgeName(),
+ another.getNodeType()
+ );
+ }
@Override
public Id getNodeId() {
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index ac7d10d..1e75cea 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -32,6 +32,7 @@
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.model.field.*;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -170,24 +171,29 @@
final CandidateResult candidate1 = candidateResults.get(0);
//check the id and version
- assertEquals( entity1.getId(), candidate1.getId() );
+ // with collection versioning, empty versions are included in results
+ assertEquals(entity1.getId().getType(), CollectionUtils.stripEmptyVersion(candidate1.getId().getType()));
+ assertEquals(entity1.getId().getUuid(), candidate1.getId().getUuid());
assertEquals(entity1.getVersion(), candidate1.getVersion());
final CandidateResult candidate2 = candidateResults.get(1);
//check the id and version
- assertEquals( entity2.getId(), candidate2.getId() );
+ assertEquals(entity2.getId().getType(), CollectionUtils.stripEmptyVersion(candidate2.getId().getType()));
+ assertEquals(entity2.getId().getUuid(), candidate2.getId().getUuid());
assertEquals( entity2.getVersion(), candidate2.getVersion() );
//make sure we can query uuids out as strings and not wrapped
candidateResults =
entityIndex.search( indexEdge, searchTypes, "select * where testuuid = '"+uuid+"'", 100, 0, false );
- assertEquals(entity1.getId(),candidateResults.get(0).getId());
+ assertEquals(entity1.getId().getType(), CollectionUtils.stripEmptyVersion(candidateResults.get(0).getId().getType()));
+ assertEquals(entity1.getId().getUuid(), candidateResults.get(0).getId().getUuid());
candidateResults =
entityIndex.search( indexEdge, searchTypes, "select * where testuuid = "+uuid, 100, 0, false);
- assertEquals(entity1.getId(),candidateResults.get(0).getId());
+ assertEquals(entity1.getId().getType(), CollectionUtils.stripEmptyVersion(candidateResults.get(0).getId().getType()));
+ assertEquals(entity1.getId().getUuid(), candidateResults.get(0).getId().getUuid());
}
@@ -519,7 +525,8 @@
final String query = "where username = 'edanuff'";
CandidateResults r = entityIndex.search( indexSCope, SearchTypes.fromTypes( "edanuff" ), query, 10, 0, false);
- assertEquals( user.getId(), r.get( 0 ).getId());
+ assertEquals(user.getId().getType(), CollectionUtils.stripEmptyVersion(r.get(0).getId().getType()));
+ assertEquals(user.getId().getUuid(), r.get(0).getId().getUuid());
batch.deindex( indexSCope, user.getId(), user.getVersion() );
indexProducer.put(batch.build()).subscribe();;
@@ -734,7 +741,10 @@
final CandidateResults r =
entityIndex.search( indexSCope, SearchTypes.fromTypes(entityId.getType()), query, 10, 0, false);
- assertEquals(user.getId(), r.get(0).getId());
+
+ // with collection versioning, empty versions are included in results
+ assertEquals(user.getId().getType(), CollectionUtils.stripEmptyVersion(r.get(0).getId().getType()));
+ assertEquals(user.getId().getUuid(), r.get(0).getId().getUuid());
}
@@ -774,7 +784,9 @@
final CandidateResults r =
entityIndex.search( indexSCope, SearchTypes.fromTypes( entityId.getType() ), query, 10, 0, false);
- assertEquals(user.getId(), r.get(0).getId());
+ // with collection versioning, empty versions are included in results
+ assertEquals(user.getId().getType(), CollectionUtils.stripEmptyVersion(r.get(0).getId().getType()));
+ assertEquals(user.getId().getUuid(), r.get(0).getId().getUuid());
//shouldn't match
final String queryNoWildCard = "where string = 'I am'";
@@ -833,8 +845,11 @@
entityIndex.search(indexSCope, SearchTypes.fromTypes( first.getId().getType() ), ascQuery, 10 , 0, false);
- assertEquals( first.getId(), ascResults.get( 0).getId() );
- assertEquals( second.getId(), ascResults.get( 1 ).getId() );
+ // with collection versioning, empty versions are included in results
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(ascResults.get(0).getId().getType()));
+ assertEquals(first.getId().getUuid(), ascResults.get(0).getId().getUuid());
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(ascResults.get(1).getId().getType()));
+ assertEquals(second.getId().getUuid(), ascResults.get(1).getId().getUuid());
//search in reversed
@@ -844,8 +859,11 @@
entityIndex.search(indexSCope, SearchTypes.fromTypes( first.getId().getType() ), descQuery, 10 , 0, false);
- assertEquals( second.getId(), descResults.get( 0).getId() );
- assertEquals( first.getId(), descResults.get( 1 ).getId() );
+ // with collection versioning, empty versions are included when parsing doc IDs
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(descResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), descResults.get(0).getId().getUuid());
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(descResults.get(1).getId().getType()));
+ assertEquals(first.getId().getUuid(), descResults.get(1).getId().getUuid());
}
@@ -899,7 +917,8 @@
assertEquals(1, singleResults.size());
- assertEquals(first.getId(), singleResults.get(0).getId());
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(singleResults.get(0).getId().getType()));
+ assertEquals(first.getId().getUuid(), singleResults.get(0).getId().getUuid());
//search in reversed
@@ -910,8 +929,11 @@
assertEquals( 2, singleKeywordUnion.size() );
- assertEquals( second.getId(), singleKeywordUnion.get( 0).getId() );
- assertEquals( first.getId(), singleKeywordUnion.get( 1 ).getId() );
+ // with collection versioning, empty versions are included when parsing doc IDs
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(singleKeywordUnion.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), singleKeywordUnion.get(0).getId().getUuid());
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(singleKeywordUnion.get(1).getId().getType()));
+ assertEquals(first.getId().getUuid(), singleKeywordUnion.get(1).getId().getUuid());
final String twoKeywordMatches = "string contains 'alpha' OR string contains 'long'";
@@ -921,8 +943,10 @@
assertEquals( 2, towMatchResults.size() );
- assertEquals(second.getId(), towMatchResults.get( 0).getId() );
- assertEquals(first.getId(), towMatchResults.get( 1 ).getId() );
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(towMatchResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), towMatchResults.get(0).getId().getUuid());
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(towMatchResults.get(1).getId().getType()));
+ assertEquals(first.getId().getUuid(), towMatchResults.get(1).getId().getUuid());
}
@@ -980,7 +1004,8 @@
assertEquals( 1, notFirstResults.size() );
- assertEquals(second.getId(), notFirstResults.get( 0 ).getId() );
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(notFirstResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), notFirstResults.get(0).getId().getUuid());
//search in reversed
@@ -991,7 +1016,8 @@
assertEquals( 1, notSecondUnion.size() );
- assertEquals( first.getId(), notSecondUnion.get( 0 ).getId() );
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(notSecondUnion.get(0).getId().getType()));
+ assertEquals(first.getId().getUuid(), notSecondUnion.get(0).getId().getUuid());
final String notBothReturn = "NOT int = 3";
@@ -1001,8 +1027,10 @@
assertEquals( 2, notBothReturnResults.size() );
- assertEquals( second.getId(), notBothReturnResults.get( 0).getId() );
- assertEquals( first.getId(), notBothReturnResults.get( 1 ).getId() );
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(notBothReturnResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), notBothReturnResults.get(0).getId().getUuid());
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(notBothReturnResults.get(1).getId().getType()));
+ assertEquals(first.getId().getUuid(), notBothReturnResults.get(1).getId().getUuid());
final String notFilterBoth = "(NOT int = 1) AND (NOT int = 2) ";
@@ -1020,8 +1048,10 @@
assertEquals( 2, noMatchesAndResults.size() );
- assertEquals( second.getId(), noMatchesAndResults.get( 0).getId() );
- assertEquals( first.getId(), noMatchesAndResults.get( 1 ).getId() );
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(noMatchesAndResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), noMatchesAndResults.get(0).getId().getUuid());
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(noMatchesAndResults.get(1).getId().getType()));
+ assertEquals(first.getId().getUuid(), noMatchesAndResults.get(1).getId().getUuid());
final String noMatchesOr = "(NOT int = 3) AND (NOT int = 4)";
@@ -1031,8 +1061,10 @@
assertEquals( 2, noMatchesOrResults.size() );
- assertEquals( second.getId(), noMatchesOrResults.get( 0).getId() );
- assertEquals( first.getId(), noMatchesOrResults.get( 1 ).getId() );
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(noMatchesOrResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), noMatchesOrResults.get(0).getId().getUuid());
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(noMatchesOrResults.get(1).getId().getType()));
+ assertEquals(first.getId().getUuid(), noMatchesOrResults.get(1).getId().getUuid());
}
@@ -1090,7 +1122,9 @@
assertEquals( 1, notFirstResults.size() );
- assertEquals(second.getId(), notFirstResults.get( 0 ).getId() );
+ // with collection versioning, empty versions are included
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(notFirstResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), notFirstResults.get(0).getId().getUuid());
final String notFirstWildCard = "NOT string = 'I ate*'";
@@ -1100,7 +1134,8 @@
assertEquals( 1, notFirstWildCardResults.size() );
- assertEquals(second.getId(), notFirstWildCardResults.get( 0 ).getId() );
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(notFirstWildCardResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), notFirstWildCardResults.get(0).getId().getUuid());
final String notFirstContains = "NOT string contains 'sammich'";
@@ -1110,7 +1145,8 @@
assertEquals( 1, notFirstContainsResults.size() );
- assertEquals(second.getId(), notFirstContainsResults.get( 0 ).getId() );
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(notFirstContainsResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), notFirstContainsResults.get(0).getId().getUuid());
//search in reversed
@@ -1121,7 +1157,8 @@
assertEquals( 1, notSecondUnion.size() );
- assertEquals( first.getId(), notSecondUnion.get( 0 ).getId() );
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(notSecondUnion.get(0).getId().getType()));
+ assertEquals(first.getId().getUuid(), notSecondUnion.get(0).getId().getUuid());
final String notSecondWildcard = "NOT string = 'I drank*'";
@@ -1131,7 +1168,8 @@
assertEquals( 1, notSecondWildcardUnion.size() );
- assertEquals( first.getId(), notSecondWildcardUnion.get( 0 ).getId() );
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(notSecondWildcardUnion.get(0).getId().getType()));
+ assertEquals(first.getId().getUuid(), notSecondWildcardUnion.get(0).getId().getUuid());
final String notSecondContains = "NOT string contains 'beer'";
@@ -1141,7 +1179,8 @@
assertEquals( 1, notSecondContainsUnion.size() );
- assertEquals( first.getId(), notSecondContainsUnion.get( 0 ).getId() );
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(notSecondContainsUnion.get(0).getId().getType()));
+ assertEquals(first.getId().getUuid(), notSecondContainsUnion.get(0).getId().getUuid());
final String notBothReturn = "NOT string = 'I'm a foodie'";
@@ -1151,8 +1190,10 @@
assertEquals( 2, notBothReturnResults.size() );
- assertEquals( second.getId(), notBothReturnResults.get( 0).getId() );
- assertEquals( first.getId(), notBothReturnResults.get( 1 ).getId() );
+ assertEquals(second.getId().getType(), CollectionUtils.stripEmptyVersion(notBothReturnResults.get(0).getId().getType()));
+ assertEquals(second.getId().getUuid(), notBothReturnResults.get(0).getId().getUuid());
+ assertEquals(first.getId().getType(), CollectionUtils.stripEmptyVersion(notBothReturnResults.get(1).getId().getType()));
+ assertEquals(first.getId().getUuid(), notBothReturnResults.get(1).getId().getUuid());
final String notFilterBoth = "(NOT string = 'I ate a sammich') AND (NOT string = 'I drank a beer') ";
@@ -1299,7 +1340,8 @@
final CandidateResult candidate1 = candidateResults.get(0);
//check the id and version
- assertEquals( entity1.getId(), candidate1.getId() );
+ assertEquals(entity1.getId().getType(), CollectionUtils.stripEmptyVersion(candidate1.getId().getType()));
+ assertEquals(entity1.getId().getUuid(), candidate1.getId().getUuid());
assertEquals(entity1.getVersion(), candidate1.getVersion());
}
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
index ba33030..91f41b6 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/GeoPagingTest.java
@@ -28,6 +28,7 @@
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -101,7 +102,7 @@
/**
* Test that geo-query returns co-located entities in expected order.
- */
+ */
@Test
public void groupQueriesWithDistanceOrderedResults() throws Exception {
@@ -150,7 +151,9 @@
final Entity expected = cats[consistent];
- assertEquals(expected.getId(), candidate.getId());
+ // with collection versioning, empty versions are included
+ assertEquals(expected.getId().getType(), CollectionUtils.stripEmptyVersion(candidate.getId().getType()));
+ assertEquals(expected.getId().getUuid(), candidate.getId().getUuid());
}
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
index d93f8a3..af94864 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
@@ -22,6 +22,7 @@
import java.util.UUID;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.junit.Test;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
@@ -87,7 +88,9 @@
final CandidateResult parsedId = parseIndexDocId( output );
assertEquals(version, parsedId.getVersion());
- assertEquals(id, parsedId.getId());
+ // with collection versioning, empty versions are included when parsing doc IDs
+ assertEquals(id.getType(), CollectionUtils.stripEmptyVersion(parsedId.getId().getType()));
+ assertEquals(id.getUuid(), parsedId.getId().getUuid());
}
@@ -119,7 +122,9 @@
final CandidateResult parsedId = parseIndexDocId( output );
assertEquals(version, parsedId.getVersion());
- assertEquals(id, parsedId.getId());
+ // with collection versioning, empty versions are included when parsing doc IDs
+ assertEquals(id.getType(), CollectionUtils.stripEmptyVersion(parsedId.getId().getType()));
+ assertEquals(id.getUuid(), parsedId.getId().getUuid());
final UUID appId = parseAppIdFromIndexDocId(output);
assertEquals(appId,applicationScope.getApplication().getUuid());
@@ -154,7 +159,9 @@
final CandidateResult parsedId = parseIndexDocId( output );
assertEquals(version, parsedId.getVersion());
- assertEquals(id, parsedId.getId());
+ // with collection versioning, empty versions are included when parsing doc IDs
+ assertEquals(id.getType(), CollectionUtils.stripEmptyVersion(parsedId.getId().getType()));
+ assertEquals(id.getUuid(), parsedId.getId().getUuid());
}
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index d77e7e8..f78be41 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -18,7 +18,7 @@
# Properties for JUnit tests
-queue.standalone=true
+queue.standalone=false
usergrid.cluster_name=Test Cluster
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index 86b3216..f2e428d 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -31,7 +31,7 @@
import javax.ws.rs.core.PathSegment;
import javax.ws.rs.core.UriInfo;
-import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.model.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
@@ -89,7 +89,7 @@
logger.trace( "CollectionResource.executeClearCollection" );
}
- if (!Application.isCustomCollectionName(itemName.getPath())) {
+ if (!CollectionUtils.isCustomCollectionOrEntityName(itemName.getPath())) {
throw new IllegalArgumentException(
"Cannot clear built-in collections (" + itemName + ")."
);
@@ -139,7 +139,7 @@
logger.trace( "CollectionResource.executeGetCollectionVersion" );
}
- if (!Application.isCustomCollectionName(itemName.getPath())) {
+ if (!CollectionUtils.isCustomCollectionOrEntityName(itemName.getPath())) {
throw new IllegalArgumentException(
"Built-in collections are not versioned."
);
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java
index 14ed54f..306231f 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
import com.google.cloud.storage.StorageException;
import org.apache.commons.lang.StringUtils;
-import org.apache.usergrid.corepersistence.index.CollectionVersionUtil;
import org.apache.usergrid.corepersistence.index.VersionedCollectionName;
import org.apache.usergrid.management.OrganizationConfig;
import org.apache.usergrid.management.OrganizationConfigProps;
@@ -40,6 +39,7 @@
import org.apache.usergrid.services.assets.BinaryStoreFactory;
import org.apache.usergrid.services.assets.data.*;
import org.apache.usergrid.services.exceptions.AwsPropertiesNotFoundException;
+import org.apache.usergrid.corepersistence.index.CollectionVersionUtils;
import org.apache.usergrid.utils.JsonUtils;
import org.glassfish.jersey.media.multipart.BodyPart;
import org.glassfish.jersey.media.multipart.BodyPartEntity;
@@ -47,7 +47,6 @@
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeanInfoFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@@ -431,7 +430,7 @@
for (int i = 0; i < r.getEntities().size(); i++) {
Entity e = r.getEntity(i);
String oldType = e.getType();
- VersionedCollectionName v = CollectionVersionUtil.parseVersionedName(oldType);
+ VersionedCollectionName v = CollectionVersionUtils.parseVersionedName(oldType);
if (v.hasVersion()) {
e.setType(v.getCollectionName());
r.setEntity(i, e);
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/CollectionDeleteTooSoonExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/CollectionClearTooSoonExceptionMapper.java
similarity index 77%
rename from stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/CollectionDeleteTooSoonExceptionMapper.java
rename to stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/CollectionClearTooSoonExceptionMapper.java
index 3b247df..bc49d2a 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/CollectionDeleteTooSoonExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/CollectionClearTooSoonExceptionMapper.java
@@ -17,7 +17,7 @@
package org.apache.usergrid.rest.exceptions;
-import org.apache.usergrid.corepersistence.asyncevents.CollectionDeleteTooSoonException;
+import org.apache.usergrid.corepersistence.asyncevents.CollectionClearTooSoonException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,15 +28,15 @@
@Provider
-public class CollectionDeleteTooSoonExceptionMapper extends AbstractExceptionMapper<CollectionDeleteTooSoonException> {
+public class CollectionClearTooSoonExceptionMapper extends AbstractExceptionMapper<CollectionClearTooSoonException> {
- private static final Logger logger = LoggerFactory.getLogger(CollectionDeleteTooSoonExceptionMapper.class);
+ private static final Logger logger = LoggerFactory.getLogger(CollectionClearTooSoonExceptionMapper.class);
@Override
- public Response toResponse( CollectionDeleteTooSoonException e ) {
+ public Response toResponse( CollectionClearTooSoonException e ) {
if(logger.isTraceEnabled()) {
- logger.trace("Tried to delete collection too soon after previous deletion", e.getMessage());
+ logger.trace("Tried to clear collection too soon after previous clear: {}", e.getMessage()); // placeholder required, otherwise SLF4J drops the argument
}
return toResponse( BAD_REQUEST, e );
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
index be60177..3752b08 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
@@ -354,6 +354,9 @@
response.setProperty( "status", status.getStatus() );
response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
response.setProperty( "numberQueued", status.getNumberProcessed() );
+ if (request.getUpdateTimestamp().isPresent() && request.getUpdateTimestamp().get() > 0L) {
+ response.setProperty("updatedSince", request.getUpdateTimestamp().get()); // report the unwrapped timestamp, not the Optional wrapper
+ }
response.setSuccess();
return response;
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
index e40c193..b25a717 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionClearTest.java
@@ -44,7 +44,7 @@
* @throws Exception
*/
@Test
- public void collectionDelete() throws Exception {
+ public void collectionClear() throws Exception {
String collectionName = "children";
int numEntities = 10;
@@ -54,47 +54,98 @@
String namePrefixAfterClear = "abc";
// verify collection version is empty
- ApiResponse tempResponse = this.app().collection(collectionName).collection("_version").get().getResponse();
- LinkedHashMap dataMap = (LinkedHashMap)tempResponse.getData();
- assertEquals("", dataMap.get("version"));
- assertEquals(collectionName, dataMap.get("collectionName"));
+ String collectionVersion = getCollectionVersion(collectionName);
+ assertEquals("", collectionVersion);
- createEntities( collectionName, namePrefix, numEntities );
+ createEntities( collectionName, namePrefix, 1, numEntities );
// retrieve entities, provide 1 more than num entities
- QueryParameters parms = new QueryParameters().setLimit( numEntities + 1 ).setQuery("order by created asc");
- List<Entity> entities = retrieveEntities(collectionName, namePrefix, parms, numEntities, false);
+ QueryParameters parms = new QueryParameters().setLimit( numEntities + 1 );
+ List<Entity> entities = retrieveEntities(collectionName, namePrefix, parms, 1, numEntities, true);
assertEquals(numEntities, entities.size());
// clear the collection
- Map<String, Object> payload = new HashMap<String, Object>();
+ Map<String, Object> payload = new HashMap<>();
parms = new QueryParameters().setKeyValue("confirm_collection_name", collectionName);
- tempResponse = this.app().collection(collectionName).collection("_clear").post(true, payload, parms);
+ ApiResponse clearResponse = this.app().collection(collectionName).collection("_clear").post(true, payload, parms);
// verify collection version has changed
- tempResponse = this.app().collection(collectionName).collection("_version").get().getResponse();
- dataMap = (LinkedHashMap)tempResponse.getData();
- String newVersion = (String)dataMap.get("version");
+ String newVersion = getCollectionVersion(collectionName);
assertNotEquals("", newVersion);
- assertEquals(collectionName, dataMap.get("collectionName"));
// validate that 0 entities left
- List<Entity> entitiesAfterClear = retrieveEntities(collectionName, namePrefix, parms, 0, true);
+ List<Entity> entitiesAfterClear = retrieveEntities(collectionName, namePrefix, parms, 1, 0, true);
assertEquals(0, entitiesAfterClear.size());
// insert more entities using same collectionName
- createEntities( collectionName, namePrefixAfterClear, numEntitiesAfterClear );
+ createEntities( collectionName, namePrefixAfterClear, 1, numEntitiesAfterClear );
// validate correct number of entities
- parms = new QueryParameters().setLimit( numEntitiesAfterClear + 1 ).setQuery("order by created asc");
- List<Entity> newEntities = retrieveEntities(collectionName, namePrefixAfterClear, parms, numEntitiesAfterClear, false);
+ parms = new QueryParameters().setLimit( numEntitiesAfterClear + 1 );
+ List<Entity> newEntities = retrieveEntities(collectionName, namePrefixAfterClear, parms, 1, numEntitiesAfterClear, true);
assertEquals(numEntitiesAfterClear, newEntities.size());
// verify collection version has not changed
- tempResponse = this.app().collection(collectionName).collection("_version").get().getResponse();
- dataMap = (LinkedHashMap)tempResponse.getData();
- assertEquals(newVersion, dataMap.get("version"));
+ String lastVersion = getCollectionVersion(collectionName);
+ assertEquals(newVersion, lastVersion);
+ }
+
+
+ /**
+ * Tests repeated clear cycles: after each clear the collection version must change and no entities may be returned.
+ * @throws Exception
+ */
+ @Test
+ public void collectionMultipleClear() throws Exception {
+ String collectionName = "dogs";
+ int numEntities = 2000;
+ String namePrefix = "dog";
+ int numDeleteCycles = 3;
+ int startingEntityNum = 1;
+
+ // should start out as unversioned
+ String currentVersion = getCollectionVersion(collectionName);
+ assertEquals("", currentVersion);
+
+ for (int cycle = 1; cycle <= numDeleteCycles; cycle++) {
+ logger.info("Creating entities {} - {} for cycle {}", startingEntityNum, lastEntityNum(startingEntityNum, numEntities), cycle);
+ createEntities( collectionName, namePrefix, startingEntityNum, numEntities );
+
+ // retrieve entities, provide 1 more than num entities
+ logger.info("Retrieving entities {} - {} for cycle {}", startingEntityNum, lastEntityNum(startingEntityNum, numEntities), cycle);
+ QueryParameters parms = new QueryParameters().setLimit( numEntities + 1 );
+ List<Entity> entities = retrieveEntities(collectionName, namePrefix, parms, startingEntityNum, numEntities, true);
+ assertEquals(numEntities, entities.size());
+
+ // clear collection
+ logger.info("Clearing collection for cycle {}", cycle);
+ String newVersion = clearCollection(collectionName);
+ logger.info("Collection version is {} for cycle {}", newVersion, cycle);
+ assertNotEquals(currentVersion, newVersion);
+
+ // validate that 0 entities left
+ List<Entity> entitiesAfterClear = retrieveEntities(collectionName, namePrefix, parms, 1, 0, true);
+ assertEquals(0, entitiesAfterClear.size());
+
+ currentVersion = newVersion;
+ startingEntityNum = startingEntityNum + numEntities;
+ }
+
+ }
+
+ private int lastEntityNum(int startingEntityNum, int numEntities) {
+ return startingEntityNum + numEntities - 1;
+ }
+
+
+ /**
+ * Get collection version
+ */
+ private String getCollectionVersion(String collectionName) {
+ ApiResponse tempResponse = this.app().collection(collectionName).collection("_version").get().getResponse();
+ LinkedHashMap dataMap = (LinkedHashMap)tempResponse.getData();
assertEquals(collectionName, dataMap.get("collectionName"));
+ return (String)dataMap.get("version");
}
@@ -104,10 +155,10 @@
* @param collectionName
* @param numOfEntities
*/
- public List<Entity> createEntities(String collectionName, String namePrefix, int numOfEntities ){
+ public List<Entity> createEntities(String collectionName, String namePrefix, int firstEntity, int numOfEntities ){
List<Entity> entities = new LinkedList<>( );
- for ( int i = 1; i <= numOfEntities; i++ ) {
+ for ( int i = firstEntity; i <= lastEntityNum(firstEntity, numOfEntities); i++ ) {
Map<String, Object> entityPayload = new HashMap<String, Object>();
entityPayload.put( "name", namePrefix + String.valueOf( i ) );
entityPayload.put( "num", i );
@@ -117,12 +168,8 @@
entities.add( entity );
this.app().collection( collectionName ).post( entity );
-
- if ( i % 100 == 0){
- logger.info("created {} entities", i);
- }
}
- logger.info("created {} total entities", numOfEntities);
+ logger.info("created {} entities", numOfEntities);
this.waitForQueueDrainAndRefreshIndex();
@@ -135,15 +182,15 @@
* @param parms
* @param numOfEntities
*/
- public List<Entity> retrieveEntities(String collectionName, String namePrefix, QueryParameters parms, int numOfEntities, boolean reverseOrder){
+ public List<Entity> retrieveEntities(String collectionName, String namePrefix, QueryParameters parms, int firstEntity, int numOfEntities, boolean reverseOrder){
List<Entity> entities = new LinkedList<>( );
Collection testCollection = this.app().collection( collectionName ).get(parms, true);
int entityNum;
if (reverseOrder) {
- entityNum = numOfEntities;
+ entityNum = lastEntityNum(firstEntity, numOfEntities);
} else {
- entityNum = 1;
+ entityNum = firstEntity;
}
while (testCollection.getCursor() != null) {
while (testCollection.hasNext()) {
@@ -176,4 +223,16 @@
return entities;
}
+ private String clearCollection(String collectionName) {
+ // clear the collection, passing its name as the confirm_collection_name query parameter
+ Map<String, Object> payload = new HashMap<>();
+ QueryParameters parms = new QueryParameters().setKeyValue("confirm_collection_name", collectionName);
+ this.app().collection(collectionName).collection("_clear").post(true, payload, parms);
+
+ // fetch the collection version after the clear; comparing it to the old one is left to the caller
+ String newVersion = getCollectionVersion(collectionName);
+
+ return newVersion;
+ }
+
}
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
index dee78f9..10ac63a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractConnectionsService.java
@@ -17,6 +17,7 @@
package org.apache.usergrid.services;
+import org.apache.usergrid.corepersistence.index.CollectionVersionUtils;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.Query.Level;
import org.apache.usergrid.persistence.index.query.Identifier;
@@ -32,13 +33,16 @@
import rx.schedulers.Schedulers;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import static org.apache.usergrid.services.ServiceManager.MAP_VERSIONED_COLLECTION_NAME_KEY;
import static org.apache.usergrid.services.ServiceParameter.filter;
import static org.apache.usergrid.services.ServiceParameter.firstParameterIsName;
import static org.apache.usergrid.utils.ClassUtils.cast;
import static org.apache.usergrid.utils.InflectionUtils.pluralize;
+import static org.apache.usergrid.utils.InflectionUtils.singularize;
import static org.apache.usergrid.utils.ListUtils.dequeue;
import static org.apache.usergrid.utils.ListUtils.initCopy;
@@ -90,6 +94,9 @@
cType = dequeue( parameters ).getName();
}
if ( cType != null ) {
+ // this is not a versionable collection. If there is a version here, it is because the connection name matches
+ // a versioned collection. Remove the version.
+ cType = CollectionVersionUtils.getBaseCollectionName(cType);
collectionName = cType;
}
@@ -121,6 +128,9 @@
}
else {
eType = Schema.normalizeEntityType( s );
+ Map<String,String> collectionInfo = sm.getVersionedCollectionInfo(s, eType);
+ eType = collectionInfo.get(MAP_VERSIONED_COLLECTION_NAME_KEY);
+ //logger.info("connection service collection eType:{}", eType);
first_parameter = dequeue( parameters );
if ( first_parameter instanceof QueryParameter ) {
query = first_parameter.getQuery();
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java
index 711a86c..1f5aeb4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java
@@ -25,7 +25,7 @@
import org.apache.usergrid.corepersistence.index.CollectionScopeImpl;
import org.apache.usergrid.corepersistence.index.CollectionVersionManager;
import org.apache.usergrid.corepersistence.index.CollectionVersionManagerFactory;
-import org.apache.usergrid.corepersistence.index.CollectionVersionUtil;
+import org.apache.usergrid.corepersistence.index.CollectionVersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@@ -49,6 +49,7 @@
import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
import static org.apache.usergrid.utils.InflectionUtils.pluralize;
+import static org.apache.usergrid.utils.InflectionUtils.singularize;
public class ServiceManager {
@@ -225,6 +226,57 @@
}
+ public static final String MAP_VERSIONED_COLLECTION_NAME_KEY = "versionedName"; // keys of the Map returned by getVersionedCollectionInfo()
+ public static final String MAP_UNVERSIONED_COLLECTION_NAME_KEY = "unversionedName";
+ public static final String MAP_VERSIONED_ITEM_TYPE_KEY = "versionedType";
+ public static final String MAP_UNVERSIONED_ITEM_TYPE_KEY = "unversionedType";
+ public static final String MAP_COLLECTION_VERSION_KEY = "version";
+
+ /**
+ * Returns versioned collection info in a Map keyed by the MAP_* constants of this class.
+ * @param collectionName collection name, either unversioned or already carrying a version
+ * @param itemType item type belonging to the collection
+ * @return map with the versioned and unversioned collection name and item type, plus the collection version
+ */
+ public Map<String,String> getVersionedCollectionInfo(String collectionName, String itemType) {
+ Map<String,String> collectionInfo = new HashMap<>();
+ String versionedCollectionName = collectionName;
+ String unversionedCollectionName = CollectionVersionUtils.getBaseCollectionName(collectionName);
+ String versionedItemType = itemType;
+ // this works for item type too
+ String unversionedItemType = CollectionVersionUtils.getBaseCollectionName(itemType);
+ String collectionVersion = "";
+ if (collectionName.equals(unversionedCollectionName)) {
+ // no version passed in, so look up the current version of the collection
+ CollectionVersionManager collectionVersionManager = cvmf.getInstance(new CollectionScopeImpl(applicationId, unversionedCollectionName));
+ // always bypass collection version cache for now
+ collectionVersion = collectionVersionManager.getCollectionVersion(true);
+
+ if (!"".equals(collectionVersion)) { // compare by value; != on Strings only tests object identity
+ if (logger.isTraceEnabled()) {
+ logger.trace("getVersionedCollectionName: currentCollectionVersion={}", collectionVersion);
+ }
+ versionedCollectionName = CollectionVersionUtils.buildVersionedNameString(unversionedCollectionName, collectionVersion, false);
+ versionedItemType = CollectionVersionUtils.buildVersionedNameString(itemType, collectionVersion, false);
+ if (logger.isTraceEnabled()) {
+ logger.trace("getVersionedCollectionName() - using versioned collection name: collectionName={} versionedCollectionName={} versionedItemType={}",
+ unversionedCollectionName, versionedCollectionName, versionedItemType);
+ }
+ }
+ } else {
+ // version was passed in, keep the names as given and only extract the version
+ collectionVersion = CollectionVersionUtils.getCollectionVersion(versionedCollectionName);
+ }
+ collectionInfo.put(MAP_VERSIONED_COLLECTION_NAME_KEY, versionedCollectionName);
+ collectionInfo.put(MAP_UNVERSIONED_COLLECTION_NAME_KEY, unversionedCollectionName);
+ collectionInfo.put(MAP_VERSIONED_ITEM_TYPE_KEY, versionedItemType);
+ collectionInfo.put(MAP_UNVERSIONED_ITEM_TYPE_KEY, unversionedItemType);
+ collectionInfo.put(MAP_COLLECTION_VERSION_KEY, collectionVersion);
+
+ return collectionInfo;
+ }
+
+
public Service getService( String serviceType ) {
return getService( serviceType, true );
}
@@ -246,32 +298,15 @@
return null;
}
- // use versionedCollectionName if appropriate
- String versionedCollectionName = info.getCollectionName();
- String unversionedCollectionName = CollectionVersionUtil.getBaseCollectionName(versionedCollectionName);
+ Map<String,String> collectionInfo = getVersionedCollectionInfo(info.getCollectionName(), info.getItemType());
+ String versionedCollectionName = collectionInfo.get(MAP_VERSIONED_COLLECTION_NAME_KEY);
+ String versionedItemType = collectionInfo.get(MAP_VERSIONED_ITEM_TYPE_KEY);
if (logger.isTraceEnabled()) {
logger.trace("getService: serviceType={} incoming collectionName={}", serviceType, versionedCollectionName);
}
- // if versioned collection name was passed in, use it, because it may be for an old version
- if (versionedCollectionName.equals(unversionedCollectionName)) {
- // no version passed in
- CollectionVersionManager collectionVersionManager = cvmf.getInstance(new CollectionScopeImpl(applicationId, unversionedCollectionName));
- // always bypass collection version cache for now
- String currentCollectionVersion = collectionVersionManager.getCollectionVersion(true);
-
- if (currentCollectionVersion != "") {
- if (logger.isTraceEnabled()) {
- logger.trace("getService: currentCollectionVersion={}", currentCollectionVersion);
- }
- versionedCollectionName = CollectionVersionUtil.buildVersionedNameString(unversionedCollectionName, currentCollectionVersion, false);
- String versionedItemType = CollectionVersionUtil.buildVersionedNameString(info.getItemType(), currentCollectionVersion, false);
- if (logger.isTraceEnabled()) {
- logger.trace("getService() - using versioned collection name: collectionName={} versionedCollectionName={} versionedItemType={}",
- unversionedCollectionName, versionedCollectionName, versionedItemType);
- }
- info = ServiceInfo.getVersionedServiceInfo(info, versionedCollectionName, versionedItemType);
- }
+ if (!versionedCollectionName.equals(info.getCollectionName())) {
+ info = ServiceInfo.getVersionedServiceInfo(info, versionedCollectionName, versionedItemType);
}
Service service = getServiceInstance( info );