Merge branch 'collectionClearJob'
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d505a96..bf1f5e7 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -112,7 +112,7 @@
# Read timeout for an individual request (in millseconds)
#
-#cassandra.timeout=5000
+#cassandra.timeout=20000
# Set the credentials used for Cassandra, if any.
@@ -786,6 +786,7 @@
# instead, use character class ([.] instead of backslash-period)
usergrid.org.config.property.regex=usergrid[.]view[.].*
+usergrid.viewable.loginEndpoint=http://localhost:8080
########################### Usergrid Email Templates ########################
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ec6b775..a0748e6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -140,6 +140,7 @@
bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
+ bind( CollectionDeleteService.class ).to( CollectionDeleteServiceImpl.class );
bind( ExportService.class ).to( ExportServiceImpl.class );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index cdb4fc7..68c4ef0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -2471,10 +2471,9 @@
final Entity entity;
- //this is the fall back, why isn't this writt
if ( entityType == null ) {
return null;
-// throw new EntityNotFoundException( String.format( "Counld not find type for uuid {}", uuid ) );
+// throw new EntityNotFoundException( String.format( "Could not find type for uuid {}", uuid ) );
}
entity = get( new SimpleEntityRef( entityType, uuid ) );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index cec7258..bad5b2c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -487,7 +487,7 @@
// evict app Id from cache
applicationIdCache.evictAppId(appName);
- logger.info("Initialized application {}", appName);
+ logger.info("Initialized application {}, uuid {}", appName, appInfo.getUuid().toString());
return appInfo;
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 872ffbb..46c7a1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -39,7 +39,7 @@
int sleep();
@Key( "usergrid.entityManager.enable_deindex_on_update" )
- @Default( "true" )
+ @Default( "false" )
boolean getDeindexOnUpdate();
/**
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
new file mode 100644
index 0000000..4b91e17
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventQueueType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+public enum AsyncEventQueueType {
+ REGULAR ("regular"), UTILITY("utility"), DELETE("delete");
+
+ private String displayName;
+ AsyncEventQueueType(String displayName) {
+ this.displayName = displayName;
+ }
+
+ @Override
+ public String toString() {
+ return displayName;
+ }
+}
+
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index cab4e3e..04eaf4c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteAction;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -33,7 +34,7 @@
/**
* Low level queue service for events in the entity. These events are fire and forget, and will always be asynchronous
*/
-public interface AsyncEventService extends ReIndexAction {
+public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction {
/**
@@ -84,9 +85,9 @@
/**
*
* @param indexOperationMessage
- * @param forUtilityQueue
+ * @param queueType
*/
- void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue);
+ void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType);
/**
* @param applicationScope
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 530cf7d..3d06cae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -59,6 +59,7 @@
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -74,9 +75,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.commons.lang.StringUtils.indexOf;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
/**
* TODO, this whole class is becoming a nightmare.
@@ -104,13 +102,15 @@
public int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
- public static final String DEAD_LETTER_SUFFIX = "_dead";
+ public static final String QUEUE_NAME_DELETE = "delete";
private final LegacyQueueManager indexQueue;
private final LegacyQueueManager utilityQueue;
+ private final LegacyQueueManager deleteQueue;
private final LegacyQueueManager indexQueueDead;
private final LegacyQueueManager utilityQueueDead;
+ private final LegacyQueueManager deleteQueueDead;
private final IndexProcessorFig indexProcessorFig;
private final LegacyQueueFig queueFig;
private final IndexProducer indexProducer;
@@ -132,8 +132,10 @@
private final Counter indexErrorCounter;
private final AtomicLong counter = new AtomicLong();
private final AtomicLong counterUtility = new AtomicLong();
+ private final AtomicLong counterDelete = new AtomicLong();
private final AtomicLong counterIndexDead = new AtomicLong();
private final AtomicLong counterUtilityDead = new AtomicLong();
+ private final AtomicLong counterDeleteDead = new AtomicLong();
private final AtomicLong inFlight = new AtomicLong();
private final Histogram messageCycle;
private final MapManager esMapPersistence;
@@ -174,16 +176,24 @@
LegacyQueueScope utilityQueueScope =
new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL);
+ LegacyQueueScope deleteQueueScope =
+ new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL);
+
LegacyQueueScope indexQueueDeadScope =
new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL, true);
LegacyQueueScope utilityQueueDeadScope =
new LegacyQueueScopeImpl(QUEUE_NAME_UTILITY, LegacyQueueScope.RegionImplementation.ALL, true);
+ LegacyQueueScope deleteQueueDeadScope =
+ new LegacyQueueScopeImpl(QUEUE_NAME_DELETE, LegacyQueueScope.RegionImplementation.ALL, true);
+
this.indexQueue = queueManagerFactory.getQueueManager(indexQueueScope);
this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
+ this.deleteQueue = queueManagerFactory.getQueueManager(deleteQueueScope);
this.indexQueueDead = queueManagerFactory.getQueueManager(indexQueueDeadScope);
this.utilityQueueDead = queueManagerFactory.getQueueManager(utilityQueueDeadScope);
+ this.deleteQueueDead = queueManagerFactory.getQueueManager(deleteQueueDeadScope);
this.indexProcessorFig = indexProcessorFig;
this.queueFig = queueFig;
@@ -206,34 +216,90 @@
start();
}
+ private String getQueueName(AsyncEventQueueType queueType) {
+ switch (queueType) {
+ case REGULAR:
+ return QUEUE_NAME;
+
+ case UTILITY:
+ return QUEUE_NAME_UTILITY;
+
+ case DELETE:
+ return QUEUE_NAME_DELETE;
+
+ default:
+ throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+ }
+ }
+
+ private LegacyQueueManager getQueue(AsyncEventQueueType queueType) {
+ return getQueue(queueType, false);
+ }
+
+ private LegacyQueueManager getQueue(AsyncEventQueueType queueType, boolean isDeadQueue) {
+ switch (queueType) {
+ case REGULAR:
+ return isDeadQueue ? indexQueueDead : indexQueue;
+
+ case UTILITY:
+ return isDeadQueue ? utilityQueueDead : utilityQueue;
+
+ case DELETE:
+ return isDeadQueue ? deleteQueueDead : deleteQueue;
+
+ default:
+ throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+ }
+ }
+
+ private AtomicLong getCounter(AsyncEventQueueType queueType, boolean isDeadQueue) {
+ switch (queueType) {
+ case REGULAR:
+ return isDeadQueue ? counterIndexDead : counter;
+
+ case UTILITY:
+ return isDeadQueue ? counterUtilityDead : counterUtility;
+
+ case DELETE:
+ return isDeadQueue ? counterDeleteDead : counterDelete;
+
+ default:
+ throw new IllegalArgumentException("Invalid queue type: " + queueType.toString());
+ }
+ }
+
+
+
+
/**
* Offer the EntityIdScope to SQS
*/
private void offer(final Serializable operation) {
+ offer(operation, AsyncEventQueueType.REGULAR);
+ }
+
+ private void offer(final Serializable operation, AsyncEventQueueType queueType) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- this.indexQueue.sendMessageToLocalRegion( operation );
+ getQueue(queueType).sendMessageToLocalRegion(operation);
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
timer.stop();
}
+
}
- private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
+ private void offerTopic(final Serializable operation, AsyncEventQueueType queueType) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- if (forUtilityQueue) {
- this.utilityQueue.sendMessageToAllRegions(operation);
- } else {
- this.indexQueue.sendMessageToAllRegions(operation);
- }
+ getQueue(queueType).sendMessageToAllRegions(operation);
}
catch ( IOException e ) {
throw new RuntimeException( "Unable to queue message", e );
@@ -244,15 +310,11 @@
}
- private void offerBatch(final List operations, boolean forUtilityQueue){
+ private void offerBatch(final List operations, AsyncEventQueueType queueType){
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- if( forUtilityQueue ){
- this.utilityQueue.sendMessages(operations);
- }else{
- this.indexQueue.sendMessages(operations);
- }
+ getQueue(queueType).sendMessages(operations);
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -260,25 +322,16 @@
}
}
- private void offerBatchToUtilityQueue(final List operations){
- try {
- //signal to SQS
- this.utilityQueue.sendMessages(operations);
- } catch (IOException e) {
- throw new RuntimeException("Unable to queue message", e);
- }
- }
-
/**
* Take message
*/
- private List<LegacyQueueMessage> take() {
+ private List<LegacyQueueMessage> take(AsyncEventQueueType queueType, boolean isDeadQueue) {
final Timer.Context timer = this.readTimer.time();
try {
- return indexQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+ return getQueue(queueType, isDeadQueue).getMessages(MAX_TAKE, AsyncEvent.class);
}
finally {
//stop our timer
@@ -286,52 +339,8 @@
}
}
- /**
- * Take message from SQS utility queue
- */
- private List<LegacyQueueMessage> takeFromUtilityQueue() {
-
- final Timer.Context timer = this.readTimer.time();
-
- try {
- return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
- }
- finally {
- //stop our timer
- timer.stop();
- }
- }
-
- /**
- * Take message from index dead letter queue
- */
- private List<LegacyQueueMessage> takeFromIndexDeadQueue() {
-
- final Timer.Context timer = this.readTimer.time();
-
- try {
- return indexQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
- }
- finally {
- //stop our timer
- timer.stop();
- }
- }
-
- /**
- * Take message from SQS utility dead letter queue
- */
- private List<LegacyQueueMessage> takeFromUtilityDeadQueue() {
-
- final Timer.Context timer = this.readTimer.time();
-
- try {
- return utilityQueueDead.getMessages(MAX_TAKE, AsyncEvent.class);
- }
- finally {
- //stop our timer
- timer.stop();
- }
+ private List<LegacyQueueMessage> take(AsyncEventQueueType queueType) {
+ return take(queueType, false);
}
@@ -362,38 +371,15 @@
}
}
-
- /**
- * calls the event handlers and returns a result with information on whether
- * it needs to be ack'd and whether it needs to be indexed
- * Ack message in SQS
- */
- public void ackUtilityQueue(final List<LegacyQueueMessage> messages) {
- try{
- utilityQueue.commitMessages( messages );
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
+ public void ack(final List<LegacyQueueMessage> messages, AsyncEventQueueType queueType, boolean isDeadQueue) {
+ if (queueType == AsyncEventQueueType.REGULAR && !isDeadQueue) {
+ // different functionality
+ ack(messages);
}
- }
-
- /**
- * ack messages in index dead letter queue
- */
- public void ackIndexDeadQueue(final List<LegacyQueueMessage> messages) {
- try{
- indexQueueDead.commitMessages( messages );
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
+ try {
+ getQueue(queueType, isDeadQueue).commitMessages( messages );
}
- }
-
- /**
- * ack messages in utility dead letter queue
- */
- public void ackUtilityDeadQueue(final List<LegacyQueueMessage> messages) {
- try{
- utilityQueueDead.commitMessages( messages );
- }catch(Exception e){
+ catch (Exception e) {
throw new RuntimeException("Unable to ack messages", e);
}
}
@@ -532,11 +518,13 @@
applicationScope);
- logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
- applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering InitializeApplicationIndexEvent for {}:{}",
+ applicationScope.getApplication().getUuid(), applicationScope.getApplication().getType());
+ }
offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
- new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
+ new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), AsyncEventQueueType.REGULAR);
}
@@ -545,8 +533,10 @@
final Entity entity, long updatedAfter) {
- logger.trace("Offering EntityIndexEvent for {}:{}",
- entity.getId().getUuid(), entity.getId().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EntityIndexEvent for {}:{}",
+ entity.getId().getUuid(), entity.getId().getType());
+ }
offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),
new EntityIdScope(applicationScope, entity.getId()), updatedAfter));
@@ -587,8 +577,10 @@
final Entity entity,
final Edge newEdge) {
- logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
- newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
+ newEdge.getType(), entity.getId().getUuid(), entity.getId().getType());
+ }
offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));
@@ -622,11 +614,13 @@
public void queueDeleteEdge(final ApplicationScope applicationScope,
final Edge edge) {
- logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
- edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EdgeDeleteEvent for type {} to target {}:{}",
+ edge.getType(), edge.getTargetNode().getUuid(), edge.getTargetNode().getType());
+ }
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
- offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
+ offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ), AsyncEventQueueType.DELETE );
}
private IndexOperationMessage handleEdgeDelete(final LegacyQueueMessage message) {
@@ -650,8 +644,7 @@
}
// default this observable's return to empty index operation message if nothing is emitted
- return eventBuilder.buildDeleteEdge(applicationScope, edge)
- .toBlocking().lastOrDefault(new IndexOperationMessage());
+ return eventBuilder.buildDeleteEdge(applicationScope, edge);
}
@@ -660,9 +653,9 @@
/**
* Queue up an indexOperationMessage for multi region execution
* @param indexOperationMessage
- * @param forUtilityQueue
+ * @param queueType
*/
- public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) {
+ public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, AsyncEventQueueType queueType) {
// don't try to produce something with nothing
if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -686,9 +679,11 @@
//send to the topic so all regions index the batch
- logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId );
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering ElasticsearchIndexEvent for message {}", newMessageId);
+ }
- offerTopic( elasticsearchIndexEvent, forUtilityQueue );
+ offerTopic( elasticsearchIndexEvent, queueType );
}
private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
@@ -760,11 +755,13 @@
// queue the de-index of old versions to the topic so cleanup happens in all regions
- logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
- applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering DeIndexOldVersionsEvent for app {} {}:{}",
+ applicationScope.getApplication().getUuid(), entityId.getUuid(), entityId.getType());
+ }
offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
- new EntityIdScope( applicationScope, entityId), markedVersion), false);
+ new EntityIdScope( applicationScope, entityId), markedVersion), AsyncEventQueueType.DELETE );
}
@@ -821,10 +818,13 @@
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
- logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering EntityDeleteEvent for {}:{}", entityId.getUuid(), entityId.getType());
+ }
// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
- offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
+ offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ),
+ AsyncEventQueueType.DELETE );
}
private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
@@ -840,21 +840,15 @@
final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
+ final boolean isCollectionDelete = entityDeleteEvent.isCollectionDelete();
+ final long updatedBefore = entityDeleteEvent.getUpdatedBefore();
- if (logger.isDebugEnabled())
- logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Deleting entity id from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore {}",
+ applicationScope, entityId, isCollectionDelete, updatedBefore);
+ }
- final EventBuilderImpl.EntityDeleteResults
- entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
-
-
- // Delete the entities and remove from graph separately
- entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
-
- entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
-
- // default this observable's return to empty index operation message if nothing is emitted
- return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(new IndexOperationMessage());
+ return eventBuilder.buildEntityDelete( applicationScope, entityId, isCollectionDelete, updatedBefore );
}
@@ -879,23 +873,40 @@
public void start() {
final int indexCount = indexProcessorFig.getWorkerCount();
final int utilityCount = indexProcessorFig.getWorkerCountUtility();
+ final int deleteCount = indexProcessorFig.getWorkerCountDelete();
final int indexDeadCount = indexProcessorFig.getWorkerCountDeadLetter();
final int utilityDeadCount = indexProcessorFig.getWorkerCountUtilityDeadLetter();
+ final int deleteDeadCount = indexProcessorFig.getWorkerCountDeleteDeadLetter();
+ logger.info("Starting queue workers for indexing: index={} indexDLQ={} utility={} utilityDLQ={} delete={} deleteDLQ={}",
+ indexCount, indexDeadCount, utilityCount, utilityDeadCount, deleteCount, deleteDeadCount);
for (int i = 0; i < indexCount; i++) {
- startWorker(QUEUE_NAME);
+ startWorker(AsyncEventQueueType.REGULAR);
}
for (int i = 0; i < utilityCount; i++) {
- startWorker(QUEUE_NAME_UTILITY);
+ startWorker(AsyncEventQueueType.UTILITY);
}
- for (int i = 0; i < indexDeadCount; i++) {
- startDeadQueueWorker(QUEUE_NAME);
+ for (int i = 0; i < deleteCount; i++) {
+ startWorker(AsyncEventQueueType.DELETE);
}
- for (int i = 0; i < utilityDeadCount; i++) {
- startDeadQueueWorker(QUEUE_NAME_UTILITY);
+ if( indexQueue instanceof SNSQueueManagerImpl) {
+ logger.info("Queue manager implementation supports dead letters, start dead letter queue workers.");
+ for (int i = 0; i < indexDeadCount; i++) {
+ startDeadQueueWorker(AsyncEventQueueType.REGULAR);
+ }
+
+ for (int i = 0; i < utilityDeadCount; i++) {
+ startDeadQueueWorker(AsyncEventQueueType.UTILITY);
+ }
+
+ for (int i = 0; i < deleteDeadCount; i++) {
+ startDeadQueueWorker(AsyncEventQueueType.DELETE);
+ }
+ }else{
+ logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue workers.");
}
}
@@ -914,11 +925,10 @@
}
- private void startWorker(final String type) {
- Preconditions.checkNotNull(type, "Worker type required");
+ private void startWorker(final AsyncEventQueueType queueType) {
synchronized (mutex) {
- boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+ String type = getQueueName(queueType);
Observable<List<LegacyQueueMessage>> consumer =
Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
@@ -926,20 +936,15 @@
public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
//name our thread so it's easy to see
- long threadNum = isUtilityQueue ?
- counterUtility.incrementAndGet() : counter.incrementAndGet();
- Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
+ long threadNum = getCounter(queueType, false).incrementAndGet();
+ Thread.currentThread().setName( "QueueConsumer_" + type + "_" + threadNum );
List<LegacyQueueMessage> drainList = null;
do {
try {
- if ( isUtilityQueue ){
- drainList = takeFromUtilityQueue();
- }else{
- drainList = take();
+ drainList = take(queueType);
- }
//emit our list in it's entity to hand off to a worker pool
subscriber.onNext(drainList);
@@ -995,7 +1000,7 @@
// submit the processed messages to index producer
List<LegacyQueueMessage> messagesToAck =
- submitToIndex( indexEventResults, isUtilityQueue );
+ submitToIndex( indexEventResults, queueType );
if ( messagesToAck.size() < messages.size() ) {
logger.warn(
@@ -1005,12 +1010,7 @@
// ack each message if making it to this point
if( messagesToAck.size() > 0 ){
-
- if ( isUtilityQueue ){
- ackUtilityQueue( messagesToAck );
- }else{
- ack( messagesToAck );
- }
+ ack(messagesToAck, queueType, false);
}
return messagesToAck;
@@ -1034,129 +1034,112 @@
}
- private void startDeadQueueWorker(final String type) {
- Preconditions.checkNotNull(type, "Worker type required");
+ private void startDeadQueueWorker(final AsyncEventQueueType queueType) {
+ String type = getQueueName(queueType);
synchronized (mutex) {
- boolean isUtilityDeadQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
-
Observable<List<LegacyQueueMessage>> consumer =
- Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
- @Override
- public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
+ Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
+ @Override
+ public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
- //name our thread so it's easy to see
- long threadNum = isUtilityDeadQueue ?
- counterUtilityDead.incrementAndGet() : counterIndexDead.incrementAndGet();
- Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type+ "_" + threadNum );
+ //name our thread so it's easy to see
+ long threadNum = getCounter(queueType, true).incrementAndGet();
+ Thread.currentThread().setName( "QueueDeadLetterConsumer_" + type + "_" + threadNum );
- List<LegacyQueueMessage> drainList = null;
+ List<LegacyQueueMessage> drainList = null;
- do {
- try {
- if ( isUtilityDeadQueue ){
- drainList = takeFromUtilityDeadQueue();
- }else{
- drainList = takeFromIndexDeadQueue();
- }
- //emit our list in it's entity to hand off to a worker pool
- subscriber.onNext(drainList);
+ do {
+ try {
+ drainList = take(queueType, true);
- //take since we're in flight
- inFlight.addAndGet( drainList.size() );
+ //emit our list in it's entity to hand off to a worker pool
+ subscriber.onNext(drainList);
- } catch ( Throwable t ) {
+ //take since we're in flight
+ inFlight.addAndGet( drainList.size() );
- final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
+ } catch ( Throwable t ) {
- // there might be an error here during tests, just clean the cache
- indexQueueDead.clearQueueNameCache();
+ final long sleepTime = indexProcessorFig.getDeadLetterFailureRetryTime();
- if ( t instanceof InvalidQueryException ) {
+ // there might be an error here during tests, just clean the cache
+ indexQueueDead.clearQueueNameCache();
- // don't fill up log with exceptions when keyspace and column
- // families are not ready during bootstrap/setup
- logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
- t.getMessage(), sleepTime );
+ if ( t instanceof InvalidQueryException ) {
- } else {
- logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
- }
+ // don't fill up log with exceptions when keyspace and column
+ // families are not ready during bootstrap/setup
+ logger.warn( "Failed to dequeue dead letters due to '{}'. Sleeping for {} ms",
+ t.getMessage(), sleepTime );
- if ( drainList != null ) {
- inFlight.addAndGet( -1 * drainList.size() );
- }
-
- try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
+ } else {
+ logger.error( "Failed to dequeue dead letters. Sleeping for {} ms", sleepTime, t);
}
+
+ if ( drainList != null ) {
+ inFlight.addAndGet( -1 * drainList.size() );
+ }
+
+ try { Thread.sleep( sleepTime ); } catch ( InterruptedException ie ) {}
}
- while ( true );
}
- } ) //this won't block our read loop, just reads and proceeds
- .flatMap( sqsMessages -> {
+ while ( true );
+ }
+ } ) //this won't block our read loop, just reads and proceeds
+ .flatMap( sqsMessages -> {
- //do this on a different schedule, and introduce concurrency
- // with flatmap for faster processing
- return Observable.just( sqsMessages )
+ //do this on a different schedule, and introduce concurrency
+ // with flatmap for faster processing
+ return Observable.just( sqsMessages )
- .map( messages -> {
- if ( messages == null || messages.size() == 0 ) {
- // no messages came from the queue, move on
- return null;
- }
+ .map( messages -> {
+ if ( messages == null || messages.size() == 0 ) {
+ // no messages came from the queue, move on
+ return null;
+ }
- try {
- // put the dead letter messages back in the appropriate queue
- LegacyQueueManager returnQueue = null;
- String queueType;
- if (isUtilityDeadQueue) {
- returnQueue = utilityQueue;
- queueType = "utility";
- } else {
- returnQueue = indexQueue;
- queueType = "index";
- }
- List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
- for (LegacyQueueMessage msg : successMessages) {
- logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType, msg.getType(), msg.getMessageId(), msg.getStringBody());
- }
- int unsuccessfulMessagesSize = messages.size() - successMessages.size();
- if (unsuccessfulMessagesSize > 0) {
- // some messages couldn't be sent to originating queue, log
- Set<String> successMessageIds = new HashSet<>();
- for (LegacyQueueMessage msg : successMessages) {
- String messageId = msg.getMessageId();
- if (successMessageIds.contains(messageId)) {
- logger.warn("Found duplicate messageId in returned messages: {}", messageId);
- } else {
- successMessageIds.add(messageId);
- }
- }
- for (LegacyQueueMessage msg : messages) {
- String messageId = msg.getMessageId();
- if (!successMessageIds.contains(messageId)) {
- logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType, msg.getType(), messageId, msg.getStringBody());
- }
- }
- }
+ try {
+ // put the dead letter messages back in the appropriate queue
+ LegacyQueueManager returnQueue = getQueue(queueType, false);
- if (isUtilityDeadQueue) {
- ackUtilityDeadQueue(successMessages);
- } else {
- ackIndexDeadQueue(successMessages);
- }
+ List<LegacyQueueMessage> successMessages = returnQueue.sendQueueMessages(messages);
+ for (LegacyQueueMessage msg : successMessages) {
+ logger.warn("Returning message to {} queue: type:{}, messageId:{} body: {}", queueType.toString(), msg.getType(), msg.getMessageId(), msg.getStringBody());
+ }
+ int unsuccessfulMessagesSize = messages.size() - successMessages.size();
+ if (unsuccessfulMessagesSize > 0) {
+ // some messages couldn't be sent to originating queue, log
+ Set<String> successMessageIds = new HashSet<>();
+ for (LegacyQueueMessage msg : successMessages) {
+ String messageId = msg.getMessageId();
+ if (successMessageIds.contains(messageId)) {
+ logger.warn("Found duplicate messageId in returned messages: {}", messageId);
+ } else {
+ successMessageIds.add(messageId);
+ }
+ }
+ for (LegacyQueueMessage msg : messages) {
+ String messageId = msg.getMessageId();
+ if (!successMessageIds.contains(messageId)) {
+ logger.warn("Failed to return message to {} queue: type:{} messageId:{} body: {}", queueType.toString(), msg.getType(), messageId, msg.getStringBody());
+ }
+ }
+ }
- return messages;
- }
- catch ( Exception e ) {
- logger.error( "Failed to ack messages", e );
- return null;
- //do not rethrow so we can process all of them
- }
- } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+ ack(successMessages, queueType, true);
- //end flatMap
- }, indexProcessorFig.getEventConcurrencyFactor() );
+ return messages;
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to ack messages", e );
+ return null;
+ //do not rethrow so we can process all of them
+ }
+ } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+
+ //end flatMap
+ }, indexProcessorFig.getEventConcurrencyFactor() );
//start in the background
@@ -1170,7 +1153,7 @@
* Submit results to index and return the queue messages to be ack'd
*
*/
- private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
+ private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, AsyncEventQueueType queueType) {
// if nothing came back then return empty list
if(indexEventResults==null){
@@ -1197,7 +1180,7 @@
// collect into a list of QueueMessages that can be ack'd later
.collect(Collectors.toList());
- queueIndexOperationMessage(combined, forUtilityQueue);
+ queueIndexOperationMessage(combined, queueType);
return queueMessages;
}
@@ -1208,10 +1191,10 @@
new EntityIndexOperation( applicationScope, id, updatedSince);
queueIndexOperationMessage(
- eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
+ eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), AsyncEventQueueType.REGULAR );
}
- public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
+ public void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType) {
final List<EntityIndexEvent> batch = new ArrayList<>();
edges.forEach(e -> {
@@ -1222,9 +1205,25 @@
});
- logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
+ if (logger.isTraceEnabled()) {
+ logger.trace("Offering batch of EntityIndexEvent of size {}", batch.size());
+ }
- offerBatch( batch, forUtilityQueue );
+ offerBatch( batch, queueType );
+ }
+
+ public void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType) {
+
+ final List<EntityDeleteEvent> batch = new ArrayList<>();
+ edges.forEach(e -> {
+
+ //change to id scope to avoid serialization issues
+ batch.add(new EntityDeleteEvent(queueFig.getPrimaryRegion(),
+ new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), true, updatedBefore));
+
+ });
+
+ offerBatch(batch, queueType);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 4db9f4b..4bb6312 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -27,6 +27,7 @@
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -54,7 +55,7 @@
* @param edge
* @return
*/
- Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
+ IndexOperationMessage buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
/**
* Return a bin with 2 observable streams for entity delete.
@@ -62,7 +63,18 @@
* @param entityId
* @return
*/
- EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+ IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId);
+
+ /**
+ * Return a bin with 2 observable streams for entity delete.
+ * @param applicationScope
+ * @param entityId
+ * @param isCollectionDelete
+ * @param updatedBefore
+ * @return
+ */
+ IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId,
+ boolean isCollectionDelete, long updatedBefore);
@@ -94,17 +106,17 @@
- private final Observable<Id> compactedNode;
+ private final Observable<MarkedEdge> deletedEdges;
public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable,
final Observable<List<MvccLogEntry>> entitiesDeleted,
- final Observable<Id> compactedNode) {
+ final Observable<MarkedEdge> deletedEdges) {
this.indexOperationMessageObservable = indexOperationMessageObservable;
this.entitiesDeleted = entitiesDeleted;
- this.compactedNode = compactedNode;
+ this.deletedEdges = deletedEdges;
}
@@ -116,8 +128,8 @@
return entitiesDeleted;
}
- public Observable<Id> getCompactedNode() {
- return compactedNode;
+ public Observable<MarkedEdge> getEdgesDeleted() {
+ return deletedEdges;
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index bbdce5a..7c72b72 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -24,12 +24,13 @@
import java.util.List;
import java.util.UUID;
+import org.antlr.misc.Graph;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -82,7 +83,7 @@
if (logger.isDebugEnabled()) {
logger.debug("Indexing in app scope {} with entity {} and new edge {}",
- applicationScope, entity, newEdge);
+ applicationScope, entity, newEdge);
}
return indexService.indexEdge( applicationScope, entity, newEdge );
@@ -90,15 +91,38 @@
@Override
- public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge
+ public IndexOperationMessage buildDeleteEdge( final ApplicationScope applicationScope, final Edge
edge ) {
if (logger.isDebugEnabled()) {
logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
}
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- return gm.deleteEdge( edge )
- .flatMap( deletedEdge -> indexService.deleteIndexEdge( applicationScope, deletedEdge ));
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+ final IndexOperationMessage combined = new IndexOperationMessage();
+
+ gm.deleteEdge( edge )
+ .doOnNext( deletedEdge -> {
+
+ logger.debug("Processing deleted edge for de-indexing {}", deletedEdge);
+
+ // get ALL versions of the target node as any connection from this source node needs to be removed
+ ecm.getVersionsFromMaxToMin(deletedEdge.getTargetNode(), UUIDUtils.newTimeUUID())
+ .doOnNext(mvccLogEntry -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", deletedEdge.getTargetNode(), mvccLogEntry);
+ }
+ combined.ingest(
+ indexService
+ .deIndexEdge(applicationScope, deletedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
+ .toBlocking().lastOrDefault(new IndexOperationMessage()));
+
+ }).toBlocking().lastOrDefault(null);
+
+ }).toBlocking().lastOrDefault(null);
+
+ return combined;
}
@@ -106,43 +130,135 @@
//it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
@Override
- public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+ public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+ return buildEntityDelete(applicationScope, entityId, false, Long.MAX_VALUE);
+ }
+
+ @Override
+ public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId,
+ final boolean isCollectionDelete, final long updatedBefore) {
+
if (logger.isDebugEnabled()) {
- logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+ logger.debug("Deleting entity id (marked versions) from index in app scope {} with entityId {}, isCollectionDelete {}, updatedBefore={}",
+ applicationScope, entityId, isCollectionDelete, updatedBefore);
}
- final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
- final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
- //TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED)
+ boolean deleteEntity = ecm.load(entityId).
+ map(entity -> {
+ final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
- MvccLogEntry mostRecentlyMarked = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
- .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
+ boolean willDelete = false;
+ if ( modified == null ) {
+ // We don't have a modified field, so we can't check, so delete it
+ willDelete = true;
+ } else if (modified.getValue() <= updatedBefore) {
+ willDelete = true;
+ }
- // De-indexing and entity deletes don't check log entries. We must do that first. If no DELETED logs, then
- // return an empty observable as our no-op.
- Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
- Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
+ if (isCollectionDelete && willDelete) {
+ // need to mark for deletion
+ ecm.mark(entityId, null)
+ .mergeWith(gm.markNode(entityId, CpNamingUtils.createGraphOperationTimestamp()))
+ .toBlocking().last();
+ }
- if(mostRecentlyMarked != null){
+ return willDelete;
+ }).toBlocking().firstOrDefault(true);
- // fetch entity versions to be de-index by looking in cassandra
- deIndexObservable =
- indexService.deIndexEntity(applicationScope, entityId, mostRecentlyMarked.getVersion(),
- getVersionsOlderThanMarked(ecm, entityId, mostRecentlyMarked.getVersion()));
-
- ecmDeleteObservable =
- ecm.getVersionsFromMaxToMin( entityId, mostRecentlyMarked.getVersion() )
- .filter( mvccLogEntry->
- mvccLogEntry.getVersion().timestamp() <= mostRecentlyMarked.getVersion().timestamp() )
- .buffer( serializationFig.getBufferSize() )
- .doOnNext( buffer -> ecm.delete( buffer ) );
+ if (!deleteEntity) {
+ return new IndexOperationMessage();
}
- // Graph compaction checks the versions inside compactNode, just build this up for the caller to subscribe to
- final Observable<Id> graphCompactObservable = gm.compactNode(entityId);
- return new EntityDeleteResults( deIndexObservable, ecmDeleteObservable, graphCompactObservable );
+ MvccLogEntry mostRecentToDelete =
+ ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() )
+ .toBlocking()
+ .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
+
+// logger.info("mostRecent stage={} entityId={} version={} state={}",
+// mostRecentToDelete.getStage().name(), mostRecentToDelete.getEntityId(),
+// mostRecentToDelete.getVersion().toString(), mostRecentToDelete.getState().name());
+
+ if (mostRecentToDelete == null) {
+ logger.info("No entity versions to delete for id {}", entityId.toString());
+ }
+ // if nothing is marked, then abort
+ if(mostRecentToDelete == null){
+ return new IndexOperationMessage();
+ }
+
+ final List<MvccLogEntry> logEntries = new ArrayList<>();
+ Observable<MvccLogEntry> mvccLogEntryListObservable =
+ ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() );
+ mvccLogEntryListObservable
+ .filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
+ .buffer( serializationFig.getBufferSize() )
+ .doOnNext( buffer -> ecm.delete( buffer ) )
+ .doOnNext(mvccLogEntries -> {
+ logEntries.addAll(mvccLogEntries);
+ }).toBlocking().lastOrDefault(null);
+
+ //logger.info("logEntries size={}", logEntries.size());
+
+ IndexOperationMessage combined = new IndexOperationMessage();
+
+ // do the edge deletes and build up de-index messages for each edge deleted
+ // assume we have "server1" and "region1" nodes in the graph with the following relationships (edges/connections):
+ //
+ // region1 -- zzzconnzzz|has --> server1
+ // server1 -- zzzconnzzz|in --> region1
+ //
+ // there will always be a relationship from the appId to each entity based on the entity type (collection):
+ //
+ // application -- zzzcollzzz|servers --> server1
+ // application -- zzzcollzzz|regions --> region1
+ //
+ // When deleting either "server1" or "region1" entity, the connections should get deleted and de-indexed along
+ // with the entry for the entity itself in the collection. The above example should have at minimum 3 things to
+ // be de-indexed. There may be more as either "server1" or "region1" could have multiple versions.
+ //
+ // Further comments using the example of deleting "server1" from the above example.
+ gm.compactNode(entityId).doOnNext(markedEdge -> {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing deleted edge for de-indexing {}", markedEdge);
+ }
+
+ // if the edge was for a connection where the entity to be deleted is the source node, we need to load
+ // the target node's versions so that all versions of connections to that entity can be de-indexed
+ // server1 -- zzzconnzzz|in --> region1
+ if(!markedEdge.getTargetNode().getType().equals(entityId.getType())){
+
+ // get ALL versions of the target node as any connection from this source node needs to be removed
+ ecm.getVersionsFromMaxToMin( markedEdge.getTargetNode(), UUIDUtils.newTimeUUID() )
+ .doOnNext(mvccLogEntry -> {
+ logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, mvccLogEntry);
+ combined.ingest(
+ indexService
+ .deIndexEdge(applicationScope, markedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
+ .toBlocking().lastOrDefault(new IndexOperationMessage()));
+
+ }).toBlocking().lastOrDefault(null);
+
+ }else {
+
+ // for each version of the entity being deleted, de-index the connections where the entity is the target
+ // node ( application -- zzzcollzzz|servers --> server1 ) or (region1 -- zzzconnzzz|has --> server1)
+ logEntries.forEach(logEntry -> {
+ logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, logEntry);
+ combined.ingest(
+ indexService
+ .deIndexEdge(applicationScope, markedEdge, logEntry.getEntityId(), logEntry.getVersion())
+ .toBlocking().lastOrDefault(new IndexOperationMessage()));
+ });
+ }
+
+ }).toBlocking().lastOrDefault(null);
+
+ return combined;
}
@Override
@@ -185,22 +301,22 @@
return indexService.deIndexOldVersions( applicationScope, entityId,
- getVersionsOlderThanMarked(ecm, entityId, markedVersion), markedVersion);
+ getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion));
}
- private List<UUID> getVersionsOlderThanMarked( final EntityCollectionManager ecm,
- final Id entityId, final UUID markedVersion ){
+ private List<UUID> getVersionsOlderThanOrEqualToMarked(final EntityCollectionManager ecm,
+ final Id entityId, final UUID markedVersion ){
final List<UUID> versions = new ArrayList<>();
- // only take last 5 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
+ // only take last 100 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
// clusters that do not have this in-line cleanup
ecm.getVersionsFromMaxToMin( entityId, markedVersion)
- .take(5)
+ .take(100)
.forEach( mvccLogEntry -> {
- if ( mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() ) {
+ if ( mvccLogEntry.getVersion().timestamp() <= markedVersion.timestamp() ) {
versions.add(mvccLogEntry.getVersion());
}
@@ -210,4 +326,17 @@
return versions;
}
+ private List<UUID> getAllVersions( final EntityCollectionManager ecm,
+ final Id entityId ) {
+
+ final List<UUID> versions = new ArrayList<>();
+
+ ecm.getVersionsFromMaxToMin(entityId, UUIDUtils.newTimeUUID())
+ .forEach( mvccLogEntry -> {
+ versions.add(mvccLogEntry.getVersion());
+ });
+
+ return versions;
+ }
+
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index 01d2ba8..1589632 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -24,6 +24,7 @@
/**
* Event that will signal to finish the actual delete (post-mark delete) for an Entity
+ * It will mark if this is for a collection delete
*/
public final class EntityDeleteEvent extends AsyncEvent {
@@ -31,17 +32,41 @@
@JsonProperty
protected EntityIdScope entityIdScope;
+ @JsonProperty
+ private long updatedBefore;
+
+ @JsonProperty
+ private boolean isCollectionDelete;
+
public EntityDeleteEvent() {
super();
}
public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope) {
super(sourceRegion);
- this.entityIdScope = entityIdScope;
+ this.entityIdScope = entityIdScope;
+ this.updatedBefore = Long.MAX_VALUE;
+ this.isCollectionDelete = false;
+ }
+
+ public EntityDeleteEvent(String sourceRegion, EntityIdScope entityIdScope,
+ boolean isCollectionDelete, long updatedBefore) {
+ super(sourceRegion);
+ this.entityIdScope = entityIdScope;
+ this.updatedBefore = updatedBefore;
+ this.isCollectionDelete = isCollectionDelete;
}
public EntityIdScope getEntityIdScope() {
return entityIdScope;
}
+
+ public long getUpdatedBefore() {
+ return updatedBefore;
+ }
+
+ public boolean isCollectionDelete() {
+ return isCollectionDelete;
+ }
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
new file mode 100644
index 0000000..7bad06b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.List;
+
+
+/**
+ * Callback to perform a collection delete operation based on an scope during bulk collection delete operations
+ */
+public interface CollectionDeleteAction {
+
+ /**
+ * Delete a batch list of entities.
+ * @param edges
+ * @param updatedBefore
+ * @param queueType
+ */
+ void deleteBatch(final List<EdgeScope> edges, final long updatedBefore, AsyncEventQueueType queueType);
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
new file mode 100644
index 0000000..4abdfea
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A builder interface to build our collection delete request
+ */
+public interface CollectionDeleteRequestBuilder {
+
+ /**
+ * Set the application id
+ */
+ CollectionDeleteRequestBuilder withApplicationId(final UUID applicationId);
+
+ /**
+ * Set the collection name.
+ * @param collectionName
+ * @return
+ */
+ CollectionDeleteRequestBuilder withCollection(final String collectionName);
+
+ /**
+ * Set our cursor to resume processing
+ * @param cursor
+ * @return
+ */
+ CollectionDeleteRequestBuilder withCursor(final String cursor);
+
+
+ CollectionDeleteRequestBuilder withDelay(int delayTimer, TimeUnit timeUnit);
+
+ /**
+ * Set the timestamp to delete entities updated <= this timestamp
+ * @param timestamp
+ * @return
+ */
+ CollectionDeleteRequestBuilder withEndTimestamp(final Long timestamp);
+
+
+ Optional<Integer> getDelayTimer();
+
+ Optional<TimeUnit> getTimeUnitOptional();
+
+ /**
+ * Get the application scope
+ * @return
+ */
+ Optional<ApplicationScope> getApplicationScope();
+
+ /**
+ * Get the collection name
+ * @return
+ */
+ Optional<String> getCollectionName();
+
+ /**
+ * Get the cursor
+ * @return
+ */
+ Optional<String> getCursor();
+
+ /**
+ * Get the latest timestamp to delete
+ * @return
+ */
+ Optional<Long> getEndTimestamp();
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
new file mode 100644
index 0000000..890b770
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteRequestBuilderImpl.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import com.google.common.base.Optional;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * collection delete service request builder
+ */
+public class CollectionDeleteRequestBuilderImpl implements CollectionDeleteRequestBuilder {
+
+ private Optional<UUID> withApplicationId = Optional.absent();
+ private Optional<String> withCollectionName = Optional.absent();
+ private Optional<String> cursor = Optional.absent();
+ private Optional<Long> endTimestamp = Optional.absent();
+ private Optional<Integer> delayTimer = Optional.absent();
+ private Optional<TimeUnit> timeUnitOptional = Optional.absent();
+
+
+ /***
+ *
+ * @param applicationId The application id
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withApplicationId( final UUID applicationId ) {
+ this.withApplicationId = Optional.fromNullable( applicationId );
+ return this;
+ }
+
+
+ /**
+ * the collection name
+ * @param collectionName
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withCollection( final String collectionName ) {
+ this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName.toLowerCase() ) );
+ return this;
+ }
+
+
+ /**
+ * The cursor
+ * @param cursor
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withCursor( final String cursor ) {
+ this.cursor = Optional.fromNullable( cursor );
+ return this;
+ }
+
+
+ /**
+ * Determines whether we should tack on a delay for collection delete and for how long if we do. Also
+ * allowed to specify how throttled back it should be.
+ * @param delayTimer
+ * @param timeUnit
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withDelay( final int delayTimer, final TimeUnit timeUnit ){
+ this.delayTimer = Optional.fromNullable( delayTimer );
+ this.timeUnitOptional = Optional.fromNullable( timeUnit );
+
+ return this;
+ }
+
+
+ /**
+ * Set end timestamp in epoch time. Only entities created before this time will be processed for deletion
+ * @param timestamp
+ * @return
+ */
+ @Override
+ public CollectionDeleteRequestBuilder withEndTimestamp( final Long timestamp ) {
+ this.endTimestamp = Optional.fromNullable( timestamp );
+ return this;
+ }
+
+
+ @Override
+ public Optional<Integer> getDelayTimer() {
+ return delayTimer;
+ }
+
+ @Override
+ public Optional<TimeUnit> getTimeUnitOptional() {
+ return timeUnitOptional;
+ }
+
+
+ @Override
+ public Optional<ApplicationScope> getApplicationScope() {
+
+ if ( this.withApplicationId.isPresent() ) {
+ return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+ }
+
+ return Optional.absent();
+ }
+
+
+ @Override
+ public Optional<String> getCollectionName() {
+ return withCollectionName;
+ }
+
+
+ @Override
+ public Optional<String> getCursor() {
+ return cursor;
+ }
+
+
+ @Override
+ public Optional<Long> getEndTimestamp() {
+ return endTimestamp;
+ }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
new file mode 100644
index 0000000..c939dd3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteService.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+/**
+ * An interface for re-indexing all entities in an application
+ */
+public interface CollectionDeleteService {
+
+
+ /**
+ * Perform a collection delete via service
+ *
+ * @param collectionDeleteRequestBuilder The builder to build the request
+ */
+ CollectionDeleteStatus deleteCollection(final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder);
+
+
+ /**
+ * Generate a build for the collection delete
+ */
+ CollectionDeleteRequestBuilder getBuilder();
+
+
+ /**
+ * Get the status of a job
+ * @param jobId The jobId returned during the collection delete
+ * @return
+ */
+ CollectionDeleteStatus getStatus(final String jobId);
+
+
+ /**
+ * The response when requesting a collection delete operation
+ */
+ public class CollectionDeleteStatus {
+ final String jobId;
+ final Status status;
+ final long numberProcessed;
+ final long lastUpdated;
+
+
+ public CollectionDeleteStatus(final String jobId, final Status status, final long numberProcessed,
+ final long lastUpdated ) {
+ this.jobId = jobId;
+ this.status = status;
+ this.numberProcessed = numberProcessed;
+ this.lastUpdated = lastUpdated;
+ }
+
+
+ /**
+ * Get the jobId used to resume this operation
+ */
+ public String getJobId() {
+ return jobId;
+ }
+
+
+ /**
+ * Get the last updated time, as a long
+ * @return
+ */
+ public long getLastUpdated() {
+ return lastUpdated;
+ }
+
+
+ /**
+ * Get the number of records processed
+ * @return
+ */
+ public long getNumberProcessed() {
+ return numberProcessed;
+ }
+
+
+ /**
+ * Get the status
+ * @return
+ */
+ public Status getStatus() {
+ return status;
+ }
+ }
+
+ enum Status{
+ STARTED, INPROGRESS, COMPLETE, UNKNOWN;
+ }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
new file mode 100644
index 0000000..7b3e324
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionDeleteServiceImpl.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.utils.InflectionUtils;
+import org.apache.usergrid.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+import static com.google.common.base.Optional.fromNullable;
+
+
+@Singleton
+public class CollectionDeleteServiceImpl implements CollectionDeleteService {
+
+ private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteServiceImpl.class );
+
+ private static final MapScope RESUME_MAP_SCOPE =
+ new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "collectiondeleteresume" );
+
+ //Keep cursors to resume collection delete for 10 days.
+ private static final int CURSOR_TTL = 60 * 60 * 24 * 10;
+
+ private static final String MAP_CURSOR_KEY = "cursor";
+ private static final String MAP_COUNT_KEY = "count";
+ private static final String MAP_STATUS_KEY = "status";
+ private static final String MAP_UPDATED_KEY = "lastUpdated";
+
+
+ private final AllApplicationsObservable allApplicationsObservable;
+ private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final AllEntityIdsObservable allEntityIdsObservable;
+ private final IndexProcessorFig indexProcessorFig;
+ private final MapManager mapManager;
+ private final MapManagerFactory mapManagerFactory;
+ private final AsyncEventService indexService;
+ private final EntityIndexFactory entityIndexFactory;
+ private final CollectionSettingsFactory collectionSettingsFactory;
+
+
+ @Inject
+ public CollectionDeleteServiceImpl(final EntityIndexFactory entityIndexFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final AllEntityIdsObservable allEntityIdsObservable,
+ final MapManagerFactory mapManagerFactory,
+ final AllApplicationsObservable allApplicationsObservable,
+ final IndexProcessorFig indexProcessorFig,
+ final CollectionSettingsFactory collectionSettingsFactory,
+ final AsyncEventService indexService ) {
+ this.entityIndexFactory = entityIndexFactory;
+ this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.allEntityIdsObservable = allEntityIdsObservable;
+ this.allApplicationsObservable = allApplicationsObservable;
+ this.indexProcessorFig = indexProcessorFig;
+ this.indexService = indexService;
+ this.collectionSettingsFactory = collectionSettingsFactory;
+ this.mapManagerFactory = mapManagerFactory;
+ this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
+ }
+
+
+ //TODO: optional delay, param.
+ @Override
+ public CollectionDeleteStatus deleteCollection( final CollectionDeleteRequestBuilder collectionDeleteRequestBuilder) {
+
+ final AtomicInteger count = new AtomicInteger();
+
+ final Optional<EdgeScope> cursor = parseCursor( collectionDeleteRequestBuilder.getCursor() );
+
+ final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
+
+ final Optional<Integer> delayTimer = collectionDeleteRequestBuilder.getDelayTimer();
+
+ final Optional<TimeUnit> timeUnitOptional = collectionDeleteRequestBuilder.getTimeUnitOptional();
+
+ Optional<ApplicationScope> appId = collectionDeleteRequestBuilder.getApplicationScope();
+
+ Preconditions.checkArgument(collectionDeleteRequestBuilder.getCollectionName().isPresent(),
+ "You must specify a collection name");
+ String collectionName = collectionDeleteRequestBuilder.getCollectionName().get();
+
+ Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
+ "You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid." );
+ Preconditions.checkArgument( cursor.isPresent() || appId.isPresent(),
+ "Either application ID or cursor is required.");
+
+ ApplicationScope applicationScope;
+ if (appId.isPresent()) {
+ applicationScope = appId.get();
+ } else { // cursor is present
+ applicationScope = cursor.get().getApplicationScope();
+ }
+
+
+ final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+ // default to current time
+ final long endTimestamp = collectionDeleteRequestBuilder.getEndTimestamp().or( System.currentTimeMillis() );
+
+ String pluralizedCollectionName = InflectionUtils.pluralize(CpNamingUtils.getNameFromEdgeType(collectionName));
+
+ CollectionSettings collectionSettings =
+ collectionSettingsFactory.getInstance(new CollectionSettingsScopeImpl(applicationScope.getApplication(), pluralizedCollectionName));
+
+ Optional<Map<String, Object>> existingSettings =
+ collectionSettings.getCollectionSettings( pluralizedCollectionName );
+
+ if ( existingSettings.isPresent() ) {
+
+ Map jsonMapData = existingSettings.get();
+
+ jsonMapData.put( "lastCollectionClear", Instant.now().toEpochMilli() );
+
+ collectionSettings.putCollectionSettings(
+ pluralizedCollectionName, JsonUtils.mapToJsonString(jsonMapData ) );
+ }
+
+ allEntityIdsObservable.getEdgesToEntities( Observable.just(applicationScope),
+ fromNullable(collectionName), cursorSeek.getSeekValue() )
+ .buffer( indexProcessorFig.getCollectionDeleteBufferSize())
+ .doOnNext( edgeScopes -> {
+ logger.info("Sending batch of {} to be deleted.", edgeScopes.size());
+ indexService.deleteBatch(edgeScopes, endTimestamp, AsyncEventQueueType.DELETE);
+ count.addAndGet(edgeScopes.size() );
+ if( edgeScopes.size() > 0 ) {
+ writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
+ }
+ writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
+ .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+ .subscribeOn( Schedulers.io() ).subscribe();
+
+
+ return new CollectionDeleteStatus( jobId, Status.STARTED, 0, 0 );
+ }
+
+
+ @Override
+ public CollectionDeleteRequestBuilder getBuilder() {
+ return new CollectionDeleteRequestBuilderImpl();
+ }
+
+
+ @Override
+ public CollectionDeleteStatus getStatus( final String jobId ) {
+ Preconditions.checkNotNull( jobId, "jobId must not be null" );
+ return getCollectionDeleteResponse( jobId );
+ }
+
+
+ /**
+ * Get the resume edge scope
+ *
+ * @param edgeScope The optional edge scope from the cursor
+ */
+ private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) {
+
+
+ if ( edgeScope.isPresent() ) {
+ return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+ }
+
+ return new CursorSeek<>( Optional.absent() );
+ }
+
+
+ /**
+ * Swap our cursor for an optional edgescope
+ */
+ private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) {
+
+ if ( !cursor.isPresent() ) {
+ return Optional.absent();
+ }
+
+ //get our cursor
+ final String persistedCursor = mapManager.getString( cursor.get() );
+
+ if ( persistedCursor == null ) {
+ return Optional.absent();
+ }
+
+ final JsonNode node = CursorSerializerUtil.fromString( persistedCursor );
+
+ final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper() );
+
+ return Optional.of( edgeScope );
+ }
+
+
+ /**
+ * Write the cursor state to the map in cassandra
+ */
+ private void writeCursorState( final String jobId, final EdgeScope edge ) {
+
+ final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge );
+
+ final String serializedState = CursorSerializerUtil.asString( node );
+
+ mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, CURSOR_TTL);
+ }
+
+
+ /**
+ * Write our state meta data into cassandra so everyone can see it
+ * @param jobId
+ * @param status
+ * @param processedCount
+ * @param lastUpdated
+ */
+ private void writeStateMeta( final String jobId, final Status status, final long processedCount,
+ final long lastUpdated ) {
+
+ if(logger.isDebugEnabled()) {
+ logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+ jobId, status, processedCount, lastUpdated);
+ }
+
+ mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
+ mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+ mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+ }
+
+
+ /**
+ * Get the index response from the jobId
+ * @param jobId
+ * @return
+ */
+ private CollectionDeleteStatus getCollectionDeleteResponse( final String jobId ) {
+
+ final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
+
+ if(stringStatus == null){
+ return new CollectionDeleteStatus( jobId, Status.UNKNOWN, 0, 0 );
+ }
+
+ final Status status = Status.valueOf( stringStatus );
+
+ final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
+ final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
+
+ return new CollectionDeleteStatus( jobId, status, processedCount, lastUpdated );
+ }
+}
+
+
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
index 921777a..76f0b4b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/CollectionSettingsImpl.java
@@ -20,7 +20,6 @@
import com.google.common.base.Optional;
import com.google.inject.Inject;
-import com.google.inject.Singleton;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.utils.JsonUtils;
import org.slf4j.Logger;
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7eecf04..948e106 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -40,10 +40,14 @@
String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
+ String ELASTICSEARCH_WORKER_COUNT_DELETE = "elasticsearch.worker_count_delete";
+
String ELASTICSEARCH_WORKER_COUNT_DEADLETTER = "elasticsearch.worker_count_deadletter";
String ELASTICSEARCH_WORKER_COUNT_UTILITY_DEADLETTER = "elasticsearch.worker_count_utility_deadletter";
+ String ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER = "elasticsearch.worker_count_delete_deadletter";
+
String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -52,6 +56,8 @@
String REINDEX_BUFFER_SIZE = "elasticsearch.reindex.buffer_size";
+ String COLLECTION_DELETE_BUFFER_SIZE = "elasticsearch.collection_delete.buffer_size";
+
String REINDEX_CONCURRENCY_FACTOR = "elasticsearch.reindex.concurrency.factor";
@@ -105,6 +111,13 @@
int getWorkerCountUtility();
/**
+ * The number of worker threads used to read delete requests from the queue.
+ */
+ @Default("1")
+ @Key(ELASTICSEARCH_WORKER_COUNT_DELETE)
+ int getWorkerCountDelete();
+
+ /**
* The number of worker threads used to read dead messages from the index dead letter queue and reload them into the index queue.
*/
@Default("1")
@@ -119,6 +132,13 @@
int getWorkerCountUtilityDeadLetter();
/**
+ * The number of worker threads used to read dead messages from the delete dead letter queue and reload them into the delete queue.
+ */
+ @Default("1")
+ @Key(ELASTICSEARCH_WORKER_COUNT_DELETE_DEADLETTER)
+ int getWorkerCountDeleteDeadLetter();
+
+ /**
* Set the implementation to use for queuing.
* Valid values: TEST, LOCAL, SQS, SNS
* NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
@@ -139,6 +159,13 @@
int getReindexConcurrencyFactor();
/**
+ * Number of parallel buffers during collection delete
+ */
+ @Default("500")
+ @Key(COLLECTION_DELETE_BUFFER_SIZE)
+ int getCollectionDeleteBufferSize();
+
+ /**
* Flag to resolve the LOCAL queue implementation service synchronously.
*/
@Default("false")
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index b989a9c..58d470a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -69,18 +69,17 @@
*/
Observable<IndexOperationMessage> deleteIndexEdge(final ApplicationScope applicationScope, final Edge edge);
-
/**
- * De-index all documents with the specified entityId and versions provided. This will also remove any documents
- * where the entity is a source/target node ( index docs where this entityId is a part of connections).
- *
+ * Delete an index edge from the specified scope for a specific entity version
* @param applicationScope
+ * @param edge
* @param entityId
- * @param markedVersion
+ * @param entityVersion
* @return
*/
- Observable<IndexOperationMessage> deIndexEntity(final ApplicationScope applicationScope, final Id entityId,
- final UUID markedVersion, final List<UUID> allVersionsBeforeMarked);
+ Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
+ final Id entityId, final UUID entityVersion);
+
/**
@@ -88,10 +87,9 @@
*
* @param applicationScope
* @param entityId
- * @param markedVersion
* @return
*/
Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope, final Id entityId,
- final List<UUID> versions, UUID markedVersion);
+ final List<UUID> versions);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 1b8614f..32470f6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -29,9 +29,7 @@
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.*;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
import org.apache.usergrid.persistence.index.*;
@@ -103,7 +101,7 @@
//we always index in the target scope
- final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
+ final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
//we may have to index we're indexing from source->target here
final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
@@ -216,10 +214,8 @@
return Optional.of(defaultProperties);
}
- //Steps to delete an IndexEdge.
- //1.Take the search edge given and search for all the edges in elasticsearch matching that search edge
- //2. Batch Delete all of those edges returned in the previous search.
- //TODO: optimize loops further.
+ // DO NOT USE THIS AS THE QUERY TO ES CAN CAUSE EXTREME LOAD
+ // TODO REMOVE THIS AND UPDATE THE TESTS TO NOT USE THIS METHOD
@Override
public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope applicationScope,
final Edge edge ) {
@@ -256,49 +252,31 @@
return ObservableTimer.time( batches, addTimer );
}
-
@Override
- public Observable<IndexOperationMessage> deIndexEntity( final ApplicationScope applicationScope, final Id entityId,
- final UUID markedVersion,
- final List<UUID> allVersionsBeforeMarked ) {
+ public Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
+ final Id entityId, final UUID entityVersion){
- final EntityIndex ei = entityIndexFactory.
- createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
-
- // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
- // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
- final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
- CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
- Long.MAX_VALUE ) );
-
-
-
- final EntityIndexBatch batch = ei.createBatch();
-
- // de-index each version of the entity before the marked version
- allVersionsBeforeMarked.forEach(version -> batch.deindex(searchEdgeFromSource, entityId, version));
-
-
- // for now, query the index to remove docs where the entity is source/target node and older than markedVersion
- // TODO: investigate getting this information from graph
- CandidateResults candidateResults = ei.getNodeDocsOlderThanMarked(entityId, markedVersion );
- candidateResults.forEach(candidateResult -> batch.deindex(candidateResult));
-
- return Observable.just(batch.build());
+ if (logger.isTraceEnabled()) {
+ logger.trace("deIndexEdge edge={} entityId={} entityVersion={}", edge.toString(), entityId.toString(), entityVersion.toString());
+ }
+ final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
+ final EntityIndexBatch entityBatch = ei.createBatch();
+ entityBatch.deindex(generateScopeFromSource( edge ), entityId, entityVersion);
+ return Observable.just(entityBatch.build());
}
+
@Override
public Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope,
final Id entityId,
- final List<UUID> versions,
- UUID markedVersion) {
+ final List<UUID> versions) {
final EntityIndex ei = entityIndexFactory.
createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
- // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
- // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
+ // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code for de-indexing
+ // previously .timsetamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
Long.MAX_VALUE ) );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 2b3573e..d6bdd93 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -42,7 +43,7 @@
* Index a batch list of entities. Goes to the utility queue.
* @param edges
* @param updatedSince
- * @param forUtilityQueue
+ * @param queueType
*/
- void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
+ void indexBatch(final List<EdgeScope> edges, final long updatedSince, AsyncEventQueueType queueType);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..05602fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@
.buffer( indexProcessorFig.getReindexBufferSize())
.doOnNext( edgeScopes -> {
logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
- indexService.indexBatch(edgeScopes, modifiedSince, true);
+ indexService.indexBatch(edgeScopes, modifiedSince, AsyncEventQueueType.UTILITY);
count.addAndGet(edgeScopes.size() );
if( edgeScopes.size() > 0 ) {
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
@@ -351,7 +352,7 @@
final Status status = Status.valueOf( stringStatus );
final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
- final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
+ final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 83f4c8b..b1b7f75 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -121,12 +122,8 @@
if (isDeleted) {
logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
- final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-
- indexMessageObservable
- .compose(applyCollector())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
- .subscribe();
+ final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -135,18 +132,8 @@
final Id sourceNodeId = markedEdge.getSourceNode();
logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
- final EventBuilderImpl.EntityDeleteResults
- entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-
- entityDeleteResults.getIndexObservable()
- .compose(applyCollector())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
- .subscribe();
-
- Observable.merge(entityDeleteResults.getEntitiesDeleted(),
- entityDeleteResults.getCompactedNode())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
- subscribe();
+ final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -155,19 +142,8 @@
final Id targetNodeId = markedEdge.getTargetNode();
logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
- final EventBuilderImpl.EntityDeleteResults
- entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-
- entityDeleteResults.getIndexObservable()
- .compose(applyCollector())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
- .subscribe();
-
- Observable.merge(entityDeleteResults.getEntitiesDeleted(),
- entityDeleteResults.getCompactedNode())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
- subscribe();
-
+ final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -252,13 +228,13 @@
}
}
- private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+ private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
return observable -> observable
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
- asyncEventService.queueIndexOperationMessage(indexOperation, false);
+ asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
});
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index 1afb524..c75545e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventQueueType;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
@@ -121,12 +122,8 @@
if (isDeleted) {
logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
- final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-
- indexMessageObservable
- .compose(applyCollector())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
- .subscribe();
+ final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -135,18 +132,8 @@
final Id sourceNodeId = markedEdge.getSourceNode();
logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
- final EventBuilderImpl.EntityDeleteResults
- entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-
- entityDeleteResults.getIndexObservable()
- .compose(applyCollector())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
- .subscribe();
-
- Observable.merge(entityDeleteResults.getEntitiesDeleted(),
- entityDeleteResults.getCompactedNode())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
- subscribe();
+ final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -155,18 +142,8 @@
final Id targetNodeId = markedEdge.getTargetNode();
logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
- final EventBuilderImpl.EntityDeleteResults
- entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-
- entityDeleteResults.getIndexObservable()
- .compose(applyCollector())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
- .subscribe();
-
- Observable.merge(entityDeleteResults.getEntitiesDeleted(),
- entityDeleteResults.getCompactedNode())
- .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
- subscribe();
+ final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+ asyncEventService.queueIndexOperationMessage(indexOperationMessage, AsyncEventQueueType.DELETE);
}
@@ -245,13 +222,13 @@
}
}
- private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector() {
+ private Observable.Transformer<IndexOperationMessage, IndexOperationMessage> applyCollector(AsyncEventQueueType queueType) {
return observable -> observable
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
- asyncEventService.queueIndexOperationMessage(indexOperation, false);
+ asyncEventService.queueIndexOperationMessage(indexOperation, queueType);
});
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index ae4623d..c9752c3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -743,7 +743,7 @@
/**
* Add a new index to the application for scale
- * @param suffix unique indentifier for additional index
+ * @param newIndexName unique identifier for additional index
* @param shards number of shards
* @param replicas number of replicas
* @param writeConsistency only "one, quorum, or all"
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
index 900bda5..4c9d16c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Query.java
@@ -31,6 +31,8 @@
import org.apache.usergrid.persistence.index.utils.ClassUtils;
import org.apache.usergrid.persistence.index.utils.ListUtils;
import org.apache.usergrid.persistence.index.utils.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
@@ -43,6 +45,7 @@
public class Query {
+ private static final Logger logger = LoggerFactory.getLogger(Query.class);
public enum Level {
IDS, REFS, CORE_PROPERTIES, ALL_PROPERTIES, LINKED_PROPERTIES
@@ -319,6 +322,13 @@
public static Query fromIdentifier( Object id ) {
+ if (id == null) {
+ throw new IllegalArgumentException("null identifier passed in");
+ }
+ Identifier objectIdentifier = Identifier.from(id);
+ if (objectIdentifier == null) {
+ throw new IllegalArgumentException("Supplied id results in null Identifier");
+ }
Query q = new Query();
q.addIdentifier( Identifier.from(id) );
return q;
@@ -409,6 +419,10 @@
}
for ( Identifier identifier : identifiers ) {
+ if (identifier == null) {
+ logger.error("containsUuidIdentifiersOnly(): identifier in identifiers list is null");
+ return false;
+ }
if ( !identifier.isUUID() ) {
return false;
}
@@ -635,6 +649,9 @@
if ( identifiers == null ) {
identifiers = new ArrayList<Identifier>();
}
+ if (identifier == null) {
+ throw new IllegalArgumentException("adding null identifier is not allowed");
+ }
identifiers.add( identifier );
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
index 790b4d9..0a4360f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Application.java
@@ -48,6 +48,32 @@
public static final String COLLECTION_ACTIVITIES = "activities";
+ public static final String COLLECTION_EVENTS = "events";
+
+ public static final String COLLECTION_FOLDERS = "folders";
+
+ public static final String COLLECTION_DEVICES = "devices";
+
+ public static final String COLLECTION_NOTIFICATIONS = "notifications";
+
+ public static final String COLLECTION_ROLES = "roles";
+
+ public static boolean isCustomCollectionName(String collectionName) {
+ switch (collectionName.toLowerCase()) {
+ case COLLECTION_USERS:
+ case COLLECTION_GROUPS:
+ case COLLECTION_ASSETS:
+ case COLLECTION_ACTIVITIES:
+ case COLLECTION_EVENTS:
+ case COLLECTION_FOLDERS:
+ case COLLECTION_DEVICES:
+ case COLLECTION_NOTIFICATIONS:
+ case COLLECTION_ROLES:
+ return false;
+ }
+ return true;
+ }
+
@EntityProperty(indexed = true, fulltextIndexed = false, required = true, mutable = false, aliasProperty = true,
basic = true)
protected String name;
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 3bfe460..68834b3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -92,7 +92,7 @@
final GraphManager gm = managerCache.getGraphManager( scope );
- edgesToTargetObservable.edgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+ edgesToTargetObservable.edgesToTarget( gm, target, true).doOnNext(new Action1<Edge>() {
@Override
public void call( final Edge edge ) {
final String edgeType = edge.getType();
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9e84219..55b77c0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -89,7 +89,7 @@
final GraphManager gm = managerCache.getGraphManager( scope );
- edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> {
+ edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId, true).doOnNext(edge -> {
final String edgeType = edge.getType();
final Id target = edge.getTargetNode();
@@ -118,7 +118,7 @@
//test connections
- edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> {
+ edgesFromSourceObservable.edgesFromSourceDescending( gm, source, true).doOnNext(edge -> {
final String edgeType = edge.getType();
final Id target = edge.getTargetNode();
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
new file mode 100644
index 0000000..ddf2c68
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionDeleteTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Injector;
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+
+@NotThreadSafe
+public class CollectionDeleteTest extends AbstractCoreIT {
+ private static final Logger logger = LoggerFactory.getLogger( CollectionDeleteTest.class );
+
+ private static final MetricRegistry registry = new MetricRegistry();
+
+
+ private static final int ENTITIES_TO_DELETE = 1000;
+ private static final int ENTITIES_TO_ADD_AFTER_TIME = 3;
+
+
+ @Before
+ public void startReporting() {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting metrics reporting");
+ }
+ }
+
+
+ @After
+ public void printReport() {
+ logger.debug( "Printing metrics report" );
+ }
+
+
+ @Test( timeout = 240000 )
+ public void clearOneCollection() throws Exception {
+
+ logger.info( "Started clearOneCollection()" );
+
+ String rand = RandomStringUtils.randomAlphanumeric( 5 );
+ final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
+
+ final EntityManager em = setup.getEmf().getEntityManager( appId );
+
+ final CollectionDeleteService collectionDeleteService = setup.getInjector().getInstance( CollectionDeleteService.class );
+
+ // ----------------- create a bunch of entities
+
+ Map<String, Object> entityMap = new HashMap<String, Object>() {{
+ put( "key1", 1000 );
+ put( "key2", 2000 );
+ put( "key3", "Some value" );
+ }};
+
+ String collectionName = "items";
+ String itemType = "item";
+
+
+ List<EntityRef> entityRefs = new ArrayList<EntityRef>();
+ for ( int i = 0; i < ENTITIES_TO_DELETE; i++ ) {
+
+ final Entity entity;
+
+ try {
+ entityMap.put( "key", i );
+ entity = em.create(itemType, entityMap);
+ }
+ catch ( Exception ex ) {
+ throw new RuntimeException( "Error creating entity", ex );
+ }
+
+ entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+ if ( i % 10 == 0 ) {
+ logger.info( "Created {} entities", i );
+ }
+ }
+
+ logger.info("Created {} entities", ENTITIES_TO_DELETE);
+ long timeFirstPutDone = System.currentTimeMillis();
+ logger.info("timeFirstPutDone={}", timeFirstPutDone);
+
+ for (int i = 0; i < ENTITIES_TO_ADD_AFTER_TIME; i++) {
+
+ final Entity entity;
+
+ try {
+ entityMap.put( "key", ENTITIES_TO_DELETE + i );
+ entity = em.create(itemType, entityMap);
+ }
+ catch ( Exception ex ) {
+ throw new RuntimeException( "Error creating entity", ex );
+ }
+
+ entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+ if ( i % 10 == 0 ) {
+ logger.info( "Created {} entities after delete time", i );
+ }
+
+ }
+ logger.info("Created {} entities after delete time", ENTITIES_TO_ADD_AFTER_TIME);
+
+
+ app.waitForQueueDrainAndRefreshIndex(5000);
+
+ final CollectionDeleteRequestBuilder builder =
+ collectionDeleteService.getBuilder()
+ .withApplicationId( em.getApplicationId() )
+ .withCollection(collectionName)
+ .withEndTimestamp(timeFirstPutDone);
+
+ CollectionDeleteService.CollectionDeleteStatus status = collectionDeleteService.deleteCollection(builder);
+
+ assertNotNull( status.getJobId(), "JobId is present" );
+
+ logger.info( "Delete collection" );
+
+
+ waitForDelete( status, collectionDeleteService );
+
+ app.waitForQueueDrainAndRefreshIndex(15000);
+
+ // ----------------- test that we can read the entries after the timestamp
+
+ readData( em, collectionName,ENTITIES_TO_ADD_AFTER_TIME);
+ }
+
+ /**
+ * Wait for the delete to occur
+ */
+ private void waitForDelete( final CollectionDeleteService.CollectionDeleteStatus status, final CollectionDeleteService collectionDeleteService )
+ throws InterruptedException, IllegalArgumentException {
+ if (status != null) {
+ logger.info("waitForDelete: jobID={}", status.getJobId());
+ } else {
+ logger.info("waitForDelete: error, status = null");
+ throw new IllegalArgumentException("collectionDeleteStatus = null");
+ }
+ while ( true ) {
+
+ try {
+ final CollectionDeleteService.CollectionDeleteStatus updatedStatus =
+ collectionDeleteService.getStatus( status.getJobId() );
+
+ if (updatedStatus == null) {
+ logger.info("waitForDelete: updated status is null");
+ } else {
+ logger.info("waitForDelete: status={} numberProcessed={}",
+ updatedStatus.getStatus().toString(), updatedStatus.getNumberProcessed());
+
+ if ( updatedStatus.getStatus() == CollectionDeleteService.Status.COMPLETE ) {
+ break;
+ }
+ }
+ }
+ catch ( IllegalArgumentException iae ) {
+ //swallow. Thrown if our job can't be found. I.E hasn't updated yet
+ }
+
+
+ Thread.sleep( 1000 );
+ }
+ }
+
+
+ private int readData(EntityManager em, String collectionName, int expectedEntities)
+ throws Exception {
+
+ app.waitForQueueDrainAndRefreshIndex();
+
+ Results results = em.getCollection(em.getApplicationRef(), collectionName, null, expectedEntities,
+ Query.Level.ALL_PROPERTIES, false);
+
+ int count = 0;
+ while ( true ) {
+
+ if (results.getEntities().size() == 0) {
+ break;
+ }
+
+ UUID lastEntityUUID = null;
+ for ( Entity e : results.getEntities() ) {
+
+ assertEquals(2000, e.getProperty("key2"));
+
+ if (count % 100 == 0) {
+ logger.info("read {} entities", count);
+ }
+ lastEntityUUID = e.getUuid();
+ count++;
+ }
+
+ results = em.getCollection(em.getApplicationRef(), collectionName, lastEntityUUID, expectedEntities,
+ Query.Level.ALL_PROPERTIES, false);
+
+ }
+ logger.info("read {} total entities", count);
+
+ assertEquals( "Did not get expected entities", expectedEntities, count );
+ return count;
+ }
+
+ private int countEntities( EntityManager em, String collectionName, int expectedEntities)
+ throws Exception {
+
+ app.waitForQueueDrainAndRefreshIndex();
+
+ Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+ Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
+
+ int count = 0;
+ while ( true ) {
+
+ count += results.size();
+
+
+ if ( results.hasCursor() ) {
+ logger.info( "Counted {} : query again with cursor", count );
+ q.setCursor( results.getCursor() );
+ results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+ }
+ else {
+ break;
+ }
+ }
+
+ assertEquals( "Did not get expected entities", expectedEntities, count );
+ return count;
+ }
+
+
+}
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CoreSchemaManager.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CoreSchemaManager.java
index c064751..1d51c25 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CoreSchemaManager.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CoreSchemaManager.java
@@ -18,6 +18,7 @@
import org.apache.usergrid.locking.LockManager;
+import org.apache.usergrid.persistence.core.CassandraConfig;
import org.apache.usergrid.persistence.core.CassandraFig;
import org.apache.usergrid.persistence.core.datastax.CQLUtils;
import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
@@ -37,14 +38,14 @@
private static final Logger logger = LoggerFactory.getLogger( CoreSchemaManager.class );
private final Setup setup;
- private final CassandraFig cassandraFig;
+ private final CassandraConfig cassandraConfig;
private final LockManager lockManager;
private final DataStaxCluster dataStaxCluster;
public CoreSchemaManager( final Setup setup, Injector injector ) {
this.setup = setup;
- this.cassandraFig = injector.getInstance( CassandraFig.class );
+ this.cassandraConfig = injector.getInstance( CassandraConfig.class );
this.lockManager = injector.getInstance( LockManager.class );
this.dataStaxCluster = injector.getInstance( DataStaxCluster.class );
}
@@ -80,16 +81,21 @@
@Override
public void destroy() {
- logger.info( "dropping keyspaces" );
try {
+ logger.info( "dropping application keyspace" );
dataStaxCluster.getClusterSession()
- .execute("DROP KEYSPACE "+ CQLUtils.quote(cassandraFig.getApplicationKeyspace()));
+ .execute("DROP KEYSPACE "+ CQLUtils.quote(cassandraConfig.getApplicationKeyspace()));
+ dataStaxCluster.waitForSchemaAgreement();
+
+ logger.info( "dropping application local keyspace" );
+ dataStaxCluster.getClusterSession()
+ .execute("DROP KEYSPACE "+ CQLUtils.quote(cassandraConfig.getApplicationLocalKeyspace()));
dataStaxCluster.waitForSchemaAgreement();
dataStaxCluster.getClusterSession().close(); // close session so it's meta will get refreshed
}
catch ( Exception e ) {
- logger.error("Error dropping application keyspace: {} error: {}", cassandraFig.getApplicationKeyspace(), e);
+ logger.error("Error dropping application keyspaces: {} error: {}", cassandraConfig.getApplicationKeyspace(), e);
}
logger.info( "keyspaces dropped" );
logger.info( "dropping indices" );
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
index a7759de..a1ff84b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -74,10 +74,10 @@
}
- @Test( timeout = 120000 )
+ @Test( timeout = 240000 )
public void rebuildOneCollectionIndex() throws Exception {
- logger.info( "Started rebuildIndex()" );
+ logger.info( "Started rebuildOneCollectionIndex()" );
String rand = RandomStringUtils.randomAlphanumeric( 5 );
final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
@@ -163,7 +163,7 @@
waitForRebuild( status, reIndexService );
- app.waitForQueueDrainAndRefreshIndex(5000);
+ app.waitForQueueDrainAndRefreshIndex(15000);
// ----------------- test that we can read the catherder collection and not the catshepard
@@ -172,7 +172,7 @@
}
- @Test( timeout = 120000 )
+ @Test( timeout = 240000 )
public void rebuildIndex() throws Exception {
logger.info( "Started rebuildIndex()" );
@@ -234,7 +234,7 @@
}
logger.info( "Created {} entities", ENTITIES_TO_INDEX );
- app.waitForQueueDrainAndRefreshIndex(15000);
+ app.waitForQueueDrainAndRefreshIndex(30000);
// ----------------- test that we can read them, should work fine
@@ -278,7 +278,7 @@
assertNotNull( status.getJobId(), "JobId is present" );
- logger.info( "Rebuilt index" );
+ logger.info( "Rebuilt index, jobID={}", status.getJobId());
waitForRebuild( status, reIndexService );
@@ -301,7 +301,7 @@
@Test( timeout = 120000 )
public void rebuildIndexGeo() throws Exception {
- logger.info( "Started rebuildIndex()" );
+ logger.info( "Started rebuildIndexGeo()" );
String rand = RandomStringUtils.randomAlphanumeric( 5 );
final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
@@ -414,7 +414,7 @@
@Test( timeout = 120000 )
public void rebuildUpdatedSince() throws Exception {
- logger.info( "Started rebuildIndex()" );
+ logger.info( "Started rebuildUpdatedSince()" );
String rand = RandomStringUtils.randomAlphanumeric( 5 );
final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
@@ -436,7 +436,7 @@
final Entity secondEntity = em.create( "thing", entityData);
- app.waitForQueueDrainAndRefreshIndex(5000);
+ app.waitForQueueDrainAndRefreshIndex(15000);
// ----------------- test that we can read them, should work fine
@@ -511,14 +511,26 @@
* Wait for the rebuild to occur
*/
private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
- throws InterruptedException {
+ throws InterruptedException, IllegalArgumentException {
+ if (status != null) {
+ logger.info("waitForRebuild: jobID={}", status.getJobId());
+ } else {
+ logger.info("waitForRebuild: error, status = null");
+ throw new IllegalArgumentException("reindexStatus = null");
+ }
while ( true ) {
try {
final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() );
- if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
- break;
+ if (updatedStatus == null) {
+ logger.info("waitForRebuild: updated status is null");
+ } else {
+ logger.info("waitForRebuild: status={} numberProcessed={}", updatedStatus.getStatus().toString(), updatedStatus.getNumberProcessed());
+
+ if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
+ break;
+ }
}
}
catch ( IllegalArgumentException iae ) {
diff --git a/stack/core/src/test/resources/project.properties b/stack/core/src/test/resources/project.properties
index 1a848bc..77a785a 100644
--- a/stack/core/src/test/resources/project.properties
+++ b/stack/core/src/test/resources/project.properties
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index 8f9058d..24f3046 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -16,6 +16,7 @@
# with ug.heapmax=5000m and ug.heapmin=3000m (set in Maven settings.xml)
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=25000
#Not a good number for real systems. Write shards should be 2x cluster size from our tests
@@ -28,7 +29,7 @@
elasticsearch.managment_index=usergrid_core_management
#cassandra.keyspace.application=core_tests_schema
elasticsearch.queue_impl.resolution=true
-elasticsearch.queue_impl=DISTRIBUTED
+elasticsearch.queue_impl=LOCAL
elasticsearch.buffer_timeout=1
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index bfa3abe..ea9ada8 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -377,6 +377,7 @@
Iterator<Class> messageTypes = routerProducer.getMessageTypes().iterator();
while ( messageTypes.hasNext() ) {
Class messageType = messageTypes.next();
+ logger.info("createClusterSystem: routerProducer {}: message type={}", routerProducer.getRouterPath(), messageType.getName());
routersByMessageType.put( messageType, routerProducer.getRouterPath() );
}
}
@@ -467,7 +468,7 @@
if (started) {
logger.info( "ClientActor [{}] has started", ra.path() );
} else {
- throw new RuntimeException( "ClientActor ["+ra.path()+"] did not start in time" );
+ throw new RuntimeException( "ClientActor ["+ra.path()+"] did not start in time, validate that akka seeds are configured properly" );
}
}
diff --git a/stack/corepersistence/collection/src/test/resources/dynamic-test.properties b/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
index ca1a51c..59b8d31 100644
--- a/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
+++ b/stack/corepersistence/collection/src/test/resources/dynamic-test.properties
@@ -2,6 +2,7 @@
# safe dynamic property defaults for our testing via IDE or Maven
cassandra.connections=30
cassandra.timeout.pool=30000
+cassandra.timeout=30000
cassandra.port=9160
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
index c82bf83..9e95663 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-CHOP.properties
@@ -20,6 +20,7 @@
cassandra.connections=30
cassandra.timeout.pool=30000
+cassandra.timeout=30000
cassandra.port=9160
# a comma delimited private IP address list to your chop cassandra cluster
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
index 82bea14..0253e6e 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
@@ -21,6 +21,7 @@
# purposely setting connections lower to slow down the activity on the cassandra server locally
cassandra.connections=5
cassandra.timeout.pool=60000
+cassandra.timeout=60000
cassandra.port=9160
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid.properties b/stack/corepersistence/collection/src/test/resources/usergrid.properties
index e85b1ac..b7a6c11 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties
@@ -33,6 +33,7 @@
cassandra.connections=15
cassandra.timeout.pool=240000
+cassandra.timeout=240000
queue.num.actors=5
queue.sender.num.actors=5
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 000c633..b746c61 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -93,7 +93,7 @@
* @param node The node to remove. This will apply a timestamp to apply the delete + compact operation. Any edges connected to this node with a timestamp
* <= the specified time on the mark will be removed from the graph
*/
- Observable<Id> compactNode( final Id node );
+ Observable<MarkedEdge> compactNode( final Id node );
/**
* Get all versions of this edge where versions <= max version
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 5fcdcb4..2fe40b1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -262,24 +262,17 @@
@Override
- public Observable<Id> compactNode( final Id inputNode ) {
-
+ public Observable<MarkedEdge> compactNode( final Id inputNode ) {
final UUID startTime = UUIDGenerator.newTimeUUID();
-
- final Observable<Id> nodeObservable =
- Observable.just( inputNode ).map( node -> nodeSerialization.getMaxVersion( scope, node ) ).takeWhile(
- maxTimestamp -> maxTimestamp.isPresent() )
-
+ final Observable<MarkedEdge> nodeObservable =
+ Observable.just( inputNode )
+ .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
+ //.doOnNext(maxTimestamp -> logger.info("compactNode maxTimestamp={}", maxTimestamp.toString()))
+ .takeWhile(maxTimestamp -> maxTimestamp.isPresent() )
//map our delete listener
- .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) )
- //set to 0 if nothing is emitted
- .lastOrDefault( 0 )
- //log for posterity
- .doOnNext( count -> logger.trace( "Removed {} edges from node {}", count, inputNode ) )
- //return our id
- .map( count -> inputNode );
+ .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );
return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
}
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 9392dbc..71d2f1d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -169,6 +169,15 @@
return true;
}
+ @Override
+ public String toString(){
+ return "SimpleSearchByEdgeType{node="+node
+ +", type="+type
+ +", maxTimestamp="+maxTimestamp
+ +", order="+order
+ +", filterMarked="+filterMarked
+ +", last="+last+"}";
+ }
@Override
public int hashCode() {
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
index 68569e5..3bcdc55 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
@@ -22,6 +22,8 @@
import java.util.UUID;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -39,8 +41,8 @@
* @param node The node that was deleted
* @param timestamp The timestamp of the event
*
- * @return An observable that emits the total number of edges that have been removed with this node both as the
+ * @return An observable that emits the marked edges that have been removed with this node both as the
* target and source
*/
- Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp );
+ Observable<MarkedEdge> receive(final ApplicationScope scope, final Id node, final UUID timestamp );
}
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index cd5b1a8..df4e5d5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -95,53 +95,37 @@
* @param node The node that was deleted
* @param timestamp The timestamp of the event
*
- * @return An observable that emits the total number of edges that have been removed with this node both as the
+ * @return An observable that emits the marked edges that have been removed with this node both as the
* target and source
*/
- public Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
+ public Observable<MarkedEdge> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
return Observable.just( node )
//delete source and targets in parallel and merge them into a single observable
- .flatMap( new Func1<Id, Observable<Integer>>() {
- @Override
- public Observable<Integer> call( final Id node ) {
+ .flatMap( id -> {
- final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
+ final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
- if (logger.isTraceEnabled()) {
- logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
- }
-
-
- if ( !maxVersion.isPresent() ) {
- return Observable.empty();
- }
-
-
- //do all the delete, then when done, delete the node
- return doDeletes( node, scope, maxVersion.get(), timestamp ).count()
- //if nothing is ever emitted, emit 0 so that we know no operations took place.
- // Finally remove
- // the
- // target node in the mark
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- try {
- nodeSerialization.delete( scope, node, maxVersion.get()).execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to casandra", e );
- }
- }
- } );
+ if (logger.isTraceEnabled()) {
+ logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
}
- } ).defaultIfEmpty( 0 );
+ if ( !maxVersion.isPresent() ) {
+ return Observable.empty();
+ }
+
+ // do all the edge deletes and then remove the marked node, return all edges just deleted
+ return
+ doDeletes( node, scope, maxVersion.get(), timestamp ).doOnCompleted( () -> {
+ try {
+ nodeSerialization.delete( scope, node, maxVersion.get()).execute();
+ } catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to cassandra", e );
+ }
+ });
+ });
}
-
-
/**
* Do the deletes
*/
@@ -162,7 +146,7 @@
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageSerialization.getEdgesToTarget(scope,
- new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+ new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
}
}));
@@ -174,7 +158,7 @@
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageSerialization.getEdgesFromSource(scope,
- new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+ new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
}
}));
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 78a1d4b..5577bd0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -36,9 +36,10 @@
* Return an observable of all edges from a source
* @param gm
* @param sourceNode
+ * @param filterMarked
* @return
*/
- Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode );
+ Observable<Edge> edgesFromSourceDescending(final GraphManager gm, final Id sourceNode, boolean filterMarked);
/**
@@ -54,9 +55,10 @@
* Return an observable of all edges to a target
* @param gm
* @param targetNode
+ * @param filterMarked
* @return
*/
- Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode);
+ Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode, boolean filterMarked);
/**
* Return an observable of all edges from a source node. Ordered ascending, from the startTimestamp if specified
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
index 1f81864..e9e2b28 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
@@ -400,7 +400,6 @@
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateSearchEdgeType( search );
-
final Id applicationId = scope.getApplication();
final Id searchNode = search.getNode();
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 20efe42..9e0998d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -35,7 +35,6 @@
import com.google.common.base.Optional;
import rx.Observable;
-import rx.functions.Func1;
/**
@@ -55,7 +54,7 @@
* Get all edges from the source
*/
@Override
- public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) {
+ public Observable<Edge> edgesFromSourceDescending(final GraphManager gm, final Id sourceNode, boolean filterMarked) {
final Observable<String> edgeTypes =
gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
@@ -67,7 +66,7 @@
return gm.loadEdgesFromSource(
new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- Optional.<Edge>absent() ) );
+ Optional.<Edge>absent(), filterMarked ) );
} );
}
@@ -119,19 +118,16 @@
* Get all edges from the source
*/
@Override
- public Observable<Edge> edgesToTarget( final GraphManager gm, final Id targetNode ) {
- final Observable<String> edgeTypes =
- gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
+ public Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode, boolean filterMarked) {
- return edgeTypes.flatMap( edgeType -> {
-
- if (logger.isTraceEnabled()) {
- logger.trace("Loading edges of edgeType {} to {}", edgeType, targetNode);
- }
-
+ return gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) )
+ .flatMap( edgeType -> {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Loading edges of edgeType {} to {}", edgeType, targetNode);
+ }
return gm.loadEdgesToTarget(
new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- Optional.<Edge>absent() ) );
+ Optional.<Edge>absent(), filterMarked ) );
} );
}
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 6a08d46..69dd43b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -21,7 +21,6 @@
import com.google.inject.Inject;
-import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
@@ -29,7 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
-import rx.functions.Func1;
/**
* Emits the id of all nodes that are target nodes from the given source node
@@ -55,7 +53,7 @@
public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
//only search edge types that start with collections
- return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( edge -> {
+ return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode, true).map(edge -> {
final Id targetNode = edge.getTargetNode();
if (logger.isDebugEnabled()) {
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 8eccdbd..1d4331c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -89,7 +89,7 @@
final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
//get edges from the source
- return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 1000 )
+ return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode, true).buffer( 1000 )
.doOnNext( edges -> {
final MutationBatch batch = keyspace.prepareMutationBatch();
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 438a978..80198de 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -135,7 +135,7 @@
UUID eventTime = UUIDGenerator.newTimeUUID();
- int count = deleteListener.receive( scope, sourceNode, eventTime ).toBlocking().last();
+ int count = deleteListener.receive( scope, sourceNode, eventTime ).count().toBlocking().last();
assertEquals( "Mark was not set, no delete should be executed", 0, count );
@@ -171,7 +171,7 @@
nodeSerialization.mark( scope, sourceNode, timestamp ).execute();
- int count = deleteListener.receive( scope, sourceNode, deleteEventTimestamp ).toBlocking().last();
+ int count = deleteListener.receive( scope, sourceNode, deleteEventTimestamp ).count().toBlocking().last();
assertEquals( 1, count );
@@ -256,7 +256,7 @@
nodeSerialization.mark( scope, targetNode, deleteBefore ).execute();
- int count = deleteListener.receive( scope, targetNode, UUIDGenerator.newTimeUUID() ).toBlocking().last();
+ int count = deleteListener.receive( scope, targetNode, UUIDGenerator.newTimeUUID() ).count().toBlocking().last();
assertEquals( 1, count );
@@ -366,7 +366,7 @@
nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
- int count = deleteListener.receive( scope, toDelete, UUIDGenerator.newTimeUUID() ).toBlocking().last();
+ int count = deleteListener.receive( scope, toDelete, UUIDGenerator.newTimeUUID() ).count().toBlocking().last();
assertEquals( edgeCount, count );
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties b/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
index 4dd17e2..63da408 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-AWS.properties
@@ -1,6 +1,7 @@
# Keep nothing but overriding test defaults in here
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=20000
cassandra.port=9160
cassandra.hosts=
#cassandra.hosts=localhost
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
index ea4c3c7..9680d88 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-CHOP.properties
@@ -2,6 +2,7 @@
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=20000
cassandra.port=9160
# a comma delimited private IP address list to your chop cassandra cluster
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
index 96834e6..3cb5608 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
@@ -1,6 +1,7 @@
# Keep nothing but overriding test defaults in here
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=20000
cassandra.port=9160
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid.properties b/stack/corepersistence/graph/src/test/resources/usergrid.properties
index d744efe..f8b7a8d 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid.properties
@@ -3,3 +3,4 @@
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=20000
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 14020a9..b444199 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -20,14 +20,12 @@
package org.apache.usergrid.persistence.index;
-import com.google.common.base.Optional;
import org.apache.usergrid.persistence.core.CPManager;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
import java.util.Map;
-import java.util.UUID;
/**
@@ -36,7 +34,7 @@
public interface EntityIndex extends CPManager {
- public static final int MAX_LIMIT = 1000;
+ int MAX_LIMIT = 1000;
/**
* Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
@@ -134,14 +132,6 @@
CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId);
/**
- * Returns all entity docs that match the entityId being the nodeId ( aka connections where entityId = sourceNode)
- *
- * @param entityId The entityId to match when searching
- * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
- * @return
- */
- CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion);
- /**
* delete all application records
*
* @return
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index a35921c..f4fae2b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -34,7 +34,6 @@
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.core.util.StringUtils;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
import org.apache.usergrid.persistence.index.exceptions.IndexException;
@@ -502,7 +501,7 @@
searchResponse = srb.execute().actionGet();
}
catch ( Throwable t ) {
- logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );
+ logger.error( "Unable to communicate with Elasticsearch: {}", t.getMessage() );
failureMonitor.fail( "Unable to execute batch", t );
throw t;
}
@@ -584,68 +583,6 @@
}
- @Override
- public CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion ) {
-
- // TODO: investigate if functionality via iterator so a caller can page the deletion until all is gone
-
- Preconditions.checkNotNull( entityId, "entityId cannot be null" );
- Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null");
- ValidationUtils.verifyVersion(markedVersion);
-
- SearchResponse searchResponse;
- List<CandidateResult> candidates = new ArrayList<>();
-
- final long markedTimestamp = markedVersion.timestamp();
-
- // never let this fetch more than 100 to save memory
- final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());
-
- // this query will find all the documents where this entity is a source/target node
- final QueryBuilder nodeQuery = QueryBuilders
- .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(entityId));
-
- final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder()
- .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC);
-
- try {
-
- long queryTimestamp = 0L;
-
- QueryBuilder timestampQuery = QueryBuilders
- .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
- .gte(queryTimestamp)
- .lt(markedTimestamp);
-
- QueryBuilder finalQuery = QueryBuilders.constantScoreQuery(
- QueryBuilders
- .boolQuery()
- .must(timestampQuery)
- .must(nodeQuery)
- );
-
-
- searchResponse = srb
- .setQuery(finalQuery)
- .setSize(searchLimit)
- .execute()
- .actionGet();
-
-
- candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
-
- }
- catch ( Throwable t ) {
- logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );
- failureMonitor.fail( "Unable to execute batch", t );
- throw t;
- }
- failureMonitor.success();
-
- return new CandidateResults( candidates, Collections.EMPTY_SET);
- }
-
-
/**
* Completely delete an index.
*/
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
index 84a28f0..70d2284 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Identifier.java
@@ -34,6 +34,7 @@
public static final String UUID_REX =
"[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}";
public static final String EMAIL_REX = "[a-zA-Z0-9._%'+\\-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}";
+ public static final String NAME_REX = "[a-zA-Z0-9_\\-./'+ ]*";
public enum Type {
UUID, NAME, EMAIL
@@ -46,7 +47,7 @@
static Pattern emailRegEx = Pattern.compile( EMAIL_REX );
// "Pattern nameRegEx" below used to be [a-zA-Z0-9_\\-./], changed it to contain a 'space' to a
// ddress https://issues.apache.org/jira/browse/USERGRID-94
- static Pattern nameRegEx = Pattern.compile( "[a-zA-Z0-9_\\-./'+ ]*" );
+ static Pattern nameRegEx = Pattern.compile( NAME_REX );
private Identifier( Type type, Object value ) {
diff --git a/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties b/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
index 2589a7f..aaa4653 100644
--- a/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/dynamic-test.properties
@@ -2,6 +2,7 @@
# safe dynamic property defaults for our testing via IDE or Maven
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=20000
cassandra.port=9160
cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
diff --git a/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties b/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
index 677df96..855a8b5 100644
--- a/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/usergrid-CHOP.properties
@@ -1,6 +1,7 @@
# These are for CHOP environment settings
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=20000
cassandra.port=9160
# a comma delimited private IP address list to your chop cassandra cluster
diff --git a/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
index 67a9ab1..c0166e7 100644
--- a/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/queryindex/src/test/resources/usergrid-UNIT.properties
@@ -6,6 +6,7 @@
cassandra.cluster_name=Usergrid
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=20000
collections.keyspace=Usergrid_Collections
collections.keyspace.strategy.options=replication_factor:1
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
index 153ed4b..005ce0c 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -93,6 +93,12 @@
<groupId>org.apache.usergrid</groupId>
<artifactId>common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>cassandra-all</artifactId>
+ <groupId>org.apache.cassandra</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
@@ -152,18 +158,6 @@
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.usergrid</groupId>
- <artifactId>common</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>cassandra-all</artifactId>
- <groupId>org.apache.cassandra</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
<!--
<dependency>
<groupId>com.datastax.cassandra</groupId>
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
index d3a46aa..2ed37e0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
@@ -163,7 +163,7 @@
@Override
public Collection<TableDefinition> getTables() {
return Collections.singletonList(
- new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), "queues", CQL ) );
+ new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_QUEUES, CQL ) );
}
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index a485f55..a6e2451 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -43,7 +43,7 @@
public QueueModule( String queueManagerType ) {
logger.info("QueueManagerType={}", queueManagerType);
- if ( "DISTRIBUTED_SNS".equals( queueManagerType ) ) {
+ if ( "DISTRIBUTED_SNS".equals( queueManagerType ) || "SNS".equals(queueManagerType)) {
this.implementation = LegacyQueueManager.Implementation.DISTRIBUTED_SNS;
}
else if ( "DISTRIBUTED".equals( queueManagerType ) ) {
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
index e220650..b2cebaa 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/KeyspaceDropper.java
@@ -34,7 +34,7 @@
*/
public class KeyspaceDropper {
- private static final Logger logger = LoggerFactory.getLogger( AbstractTest.class );
+ private static final Logger logger = LoggerFactory.getLogger( KeyspaceDropper.class );
static { dropTestKeyspaces(); }
@@ -57,6 +57,10 @@
dropTestKeyspace( keyspaceApp, hosts, port );
dropTestKeyspace( keyspaceQueue, hosts, port );
+
+ // drop local test keyspaces
+ dropTestKeyspace(keyspaceApp + "_", hosts, port);
+ dropTestKeyspace(keyspaceQueue + "_", hosts, port);
}
public static void dropTestKeyspace( String keyspace, String[] hosts, int port ) {
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index d4ed7ef..8da2180 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -340,7 +340,7 @@
if (qmm.getQueueDepth( queueName, available ) == numMessages) {
break;
}
- Thread.sleep( 500 );
+ Thread.sleep( 1000 );
}
Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName, available ) );
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index d77e7e8..2a1a83e 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -67,3 +67,4 @@
#cassandra.connections=30
cassandra.timeout.pool=20000
+cassandra.timeout=20000
diff --git a/stack/pom.xml b/stack/pom.xml
index 513c894..c98c72d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -661,9 +661,9 @@
</dependency>
<dependency>
- <groupId>com.github.stephenc</groupId>
+ <groupId>com.github.jbellis</groupId>
<artifactId>jamm</artifactId>
- <version>0.2.5</version>
+ <version>0.3.1</version>
</dependency>
<!-- Third Party Non-Commercial Dependencies -->
@@ -1311,7 +1311,7 @@
<useSystemClassLoader>false</useSystemClassLoader>
<testFailureIgnore>false</testFailureIgnore>
<argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin}
- -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+ -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
${ug.argline}
</argLine>
<systemPropertyVariables>
@@ -1548,7 +1548,7 @@
<version>${surefire.plugin.version}</version>
<configuration>
<argLine>
- -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+ -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
${ug.argline}
-javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec
</argLine>
diff --git a/stack/query-validator/src/test/resources/usergrid-custom-test.properties b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
index bc1ba56..c8e3eee 100644
--- a/stack/query-validator/src/test/resources/usergrid-custom-test.properties
+++ b/stack/query-validator/src/test/resources/usergrid-custom-test.properties
@@ -30,3 +30,5 @@
# This property is required to be set and cannot be defaulted anywhere
usergrid.cluster_name=usergrid
+
+elasticsearch.queue_impl=LOCAL
diff --git a/stack/rest/pom.xml b/stack/rest/pom.xml
index 9bb83a6..e7aa68a 100644
--- a/stack/rest/pom.xml
+++ b/stack/rest/pom.xml
@@ -93,7 +93,7 @@
<argLine>-Dwebapp.directory=${basedir}/src/main/webapp
-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax}
-Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
- -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+ -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
-Djava.util.logging.config.file=${basedir}/src/test/resources/logging.properties ${ug.argline}
</argLine>
<includes>
@@ -460,7 +460,7 @@
-Dwebapp.directory=${basedir}/src/main/webapp
-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
- -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+ -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
-Djava.util.logging.config.file=${basedir}/src/test/resources/logging.properties
${ug.argline}
</argLine>
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b8c1caa..c9174c1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -18,19 +18,19 @@
package org.apache.usergrid.rest.applications;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
+import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.PathSegment;
import javax.ws.rs.core.UriInfo;
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilder;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteRequestBuilderImpl;
+import org.apache.usergrid.corepersistence.index.CollectionDeleteService;
+import org.apache.usergrid.persistence.index.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@@ -48,6 +48,9 @@
import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* A collection resource that stands before the Service Resource. If it cannot find
@@ -61,6 +64,9 @@
})
public class CollectionResource extends ServiceResource {
+ private static final Logger logger = LoggerFactory.getLogger( CollectionResource.class );
+ private static final String UPDATED_BEFORE_FIELD = "updatedBefore";
+
public CollectionResource() {
}
@@ -190,6 +196,61 @@
}
+ @PUT
+ @Path("{itemName}/_clear")
+ @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
+ @RequireApplicationAccess
+ @JSONP
+ public ApiResponse clearCollectionPut(
+ final Map<String, Object> payload,
+ @PathParam("itemName") final String collectionName,
+ @QueryParam("callback") @DefaultValue("callback") String callback
+ ) throws Exception {
+
+ logger.info("Clearing collection {} for application {}", collectionName, getApplicationId().toString());
+
+ final CollectionDeleteRequestBuilder request = createRequest()
+ .withApplicationId(getApplicationId())
+ .withCollection(collectionName);
+
+ return executeResumeAndCreateResponse(payload, request, callback);
+
+ }
+
+
+ @GET
+ @Path( "{itemName}/_clear/{jobId}")
+ @Produces({MediaType.APPLICATION_JSON,"application/javascript"})
+ @RequireApplicationAccess
+ @JSONP
+ public ApiResponse clearCollectionJobGet(
+ @Context UriInfo ui,
+ @PathParam("itemName") PathSegment itemName,
+ @PathParam("jobId") String jobId,
+ @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+ if(logger.isTraceEnabled()){
+ logger.trace( "CollectionResource.clearCollectionJobGet" );
+ }
+
+ Preconditions
+ .checkNotNull(jobId, "path param jobId must not be null" );
+
+ CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().getStatus(jobId);
+
+ final ApiResponse response = createApiResponse();
+
+ response.setAction( "clear collection" );
+ response.setProperty( "jobId", status.getJobId() );
+ response.setProperty( "status", status.getStatus() );
+ response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+ response.setProperty( "numberCheckedForDeletion", status.getNumberProcessed() );
+ response.setSuccess();
+
+ return response;
+ }
+
+
// TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
// So system access only.
// TODO: use scheduler here to get around people sending a reindex call 30 times.
@@ -210,4 +271,57 @@
services.getApplicationId().toString(),itemName.getPath(),false,callback );
}
+
+ private CollectionDeleteService getCollectionDeleteService() {
+ return injector.getInstance( CollectionDeleteService.class );
+ }
+
+
+ private CollectionDeleteRequestBuilder createRequest() {
+ return new CollectionDeleteRequestBuilderImpl();
+ }
+
+
+ private ApiResponse executeResumeAndCreateResponse( final Map<String, Object> payload,
+ final CollectionDeleteRequestBuilder request,
+ final String callback ) {
+
+ Map<String,Object> newPayload = payload;
+ if(newPayload == null || !payload.containsKey( UPDATED_BEFORE_FIELD )){
+ newPayload = new HashMap<>(1);
+ newPayload.put(UPDATED_BEFORE_FIELD,Long.MAX_VALUE);
+ }
+
+ Preconditions.checkArgument(newPayload.get(UPDATED_BEFORE_FIELD) instanceof Number,
+ "The field \"updatedBefore\" in the payload must be a timestamp" );
+
+ //add our updated timestamp to the request
+ if ( newPayload.containsKey( UPDATED_BEFORE_FIELD ) ) {
+ final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_BEFORE_FIELD));
+ request.withEndTimestamp( timestamp );
+ }
+
+ return executeAndCreateResponse( request, callback );
+ }
+
+ /**
+ * Execute the request and return the response.
+ */
+ private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) {
+
+
+ final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request );
+
+ final ApiResponse response = createApiResponse();
+
+ response.setAction( "clear collection" );
+ response.setProperty( "jobId", status.getJobId() );
+ response.setProperty( "status", status.getStatus() );
+ response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+ response.setProperty( "numberQueued", status.getNumberProcessed() );
+ response.setSuccess();
+
+ return response;
+ }
+
}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
index 5435f7e..3e4542d 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
@@ -17,6 +17,7 @@
package org.apache.usergrid.rest.applications.users;
+import java.util.Collection;
import java.util.Map;
import java.util.UUID;
@@ -465,6 +466,14 @@
if ( ( password1 != null ) || ( password2 != null ) ) {
if ( management.checkPasswordResetTokenForAppUser( getApplicationId(), getUserUuid(), token ) ) {
if ( ( password1 != null ) && password1.equals( password2 ) ) {
+ // validate password
+ Collection<String> violations = management.passwordPolicyCheck(password1, false);
+ if (violations.size() > 0) {
+ // password not valid
+ errorMsg = management.getPasswordDescription(false);
+ return handleViewable("resetpw_set_form", this, getOrganizationName());
+ }
+
management.setAppUserPassword( getApplicationId(), getUser().getUuid(), password1 );
management.revokeAccessTokenForAppUser( token );
return handleViewable( "resetpw_set_success", this, getOrganizationName() );
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
index 4cbe9b2..1f80bc1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
@@ -43,6 +43,7 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriInfo;
+import java.util.Collection;
import java.util.Map;
import java.util.UUID;
@@ -66,6 +67,9 @@
String token = null;
+ String loginEndpoint;
+
+
public UserResource() {
}
@@ -294,8 +298,17 @@
if ( ( password1 != null ) || ( password2 != null ) ) {
if ( management.checkPasswordResetTokenForAdminUser( user.getUuid(), tokenInfo ) ) {
if ( ( password1 != null ) && password1.equals( password2 ) ) {
+ // validate password
+ Collection<String> violations = management.passwordPolicyCheck(password1, true);
+ if (violations.size() > 0) {
+ // password not valid
+ errorMsg = management.getPasswordDescription(true);
+ return handleViewable( "resetpw_set_form", this, organizationId );
+ }
+
management.setAdminUserPassword( user.getUuid(), password1 );
management.revokeAccessTokenForAdminUser( user.getUuid(), token );
+ loginEndpoint = properties.getProperty("usergrid.viewable.loginEndpoint");
return handleViewable( "resetpw_set_success", this, organizationId );
}
else {
@@ -342,6 +355,9 @@
return errorMsg;
}
+ public String getLoginEndpoint() {
+ return loginEndpoint;
+ }
public String getToken() {
return token;
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/error.jsp
@@ -27,7 +27,7 @@
</head>
<body>
- <p>An error occurred <c:out value="${it}"/>.</p>
+ <p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/test.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/test.jsp
index 83a6ad1..68c12f2 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/test.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/TestResource/test.jsp
@@ -1,5 +1,6 @@
<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
pageEncoding="ISO-8859-1"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -24,6 +25,6 @@
<link rel="stylesheet" type="text/css" href="/css/styles.css" />
</head>
<body>
-<h1>${it.foo}</h1>
+<h1>${fn:escapeXml(it.foo)}</h1>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/authorize_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/authorize_form.jsp
index 6b1b8b2..ed934a7 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/authorize_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/authorize_form.jsp
@@ -2,6 +2,7 @@
pageEncoding="ISO-8859-1"%>
<%@ page import="org.apache.usergrid.rest.AbstractContextResource"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -28,13 +29,13 @@
<body>
<div class="dialog-area">
- <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+ <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
<form class="dialog-form" action="" method="post">
- <input type="hidden" name="response_type" value="${it.responseType}">
- <input type="hidden" name="client_id" value="${it.clientId}">
- <input type="hidden" name="redirect_uri" value="${it.redirectUri}">
- <input type="hidden" name="scope" value="${it.scope}">
- <input type="hidden" name="state" value="${it.state}">
+ <input type="hidden" name="response_type" value="${fn:escapeXml(it.responseType)}">
+ <input type="hidden" name="client_id" value="${fn:escapeXml(it.clientId)}">
+ <input type="hidden" name="redirect_uri" value="${fn:escapeXml(it.redirectUri)}">
+ <input type="hidden" name="scope" value="${fn:escapeXml(it.scope)}">
+ <input type="hidden" name="state" value="${fn:escapeXml(it.state)}">
<fieldset>
<p>
<label for="username">Username</label>
@@ -56,4 +57,4 @@
</div>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/ApplicationResource/error.jsp
@@ -27,7 +27,7 @@
</head>
<body>
- <p>An error occurred <c:out value="${it}"/>.</p>
+ <p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/activate.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/activate.jsp
index dfcf3b7..20e69b8 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/activate.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/activate.jsp
@@ -26,7 +26,7 @@
</head>
<body>
- <p>Your account with email address <c:out value="${it.user.email}"/> has been successfully activated.</p>
+ <p>Your account with email address <c:out value="${it.user.email}" escapeXml="true"/> has been successfully activated.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/confirm.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/confirm.jsp
index 02e9ee3..d7f3acc 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/confirm.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/confirm.jsp
@@ -26,8 +26,8 @@
</head>
<body>
- <p>Your account with email address <c:out value="${it.user.email}"/> has been successfully confirmed.
+ <p>Your account with email address <c:out value="${it.user.email}" escapeXml="true"/> has been successfully confirmed.
You will received an email soon to let you know when you account has been activated</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/error.jsp
@@ -27,7 +27,7 @@
</head>
<body>
- <p>An error occurred <c:out value="${it}"/>.</p>
+ <p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_form.jsp
index 0f53bfc..5230ea7 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_form.jsp
@@ -1,6 +1,7 @@
<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
pageEncoding="ISO-8859-1"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -27,13 +28,13 @@
<body>
<div class="dialog-area">
<c:if test="${!empty it.errorMsg}">
- <div class="dialog-form-message">${it.errorMsg}</div>
+ <div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div>
</c:if>
<form class="dialog-form" action="" method="post">
<fieldset>
<p>
Enter the captcha to have your password reset instructions sent to
- <c:out value="${it.user.email}" />
+ <c:out value="${it.user.email}" escapeXml="true" />
</p>
<p id="human-proof"></p>
${it.reCaptchaHtml}
@@ -44,4 +45,4 @@
</form>
</div>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_success.jsp
index 23f8508..41c5176 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_email_success.jsp
@@ -29,7 +29,7 @@
</head>
<body>
- <p>Email with instructions for password reset sent to <c:out value="${it.user.email}"/></p>
+ <p>Email with instructions for password reset sent to <c:out value="${it.user.email}" escapeXml="true"/></p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_form.jsp
index a83d80d..c0203ce 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_form.jsp
@@ -4,6 +4,7 @@
<%@ page import="net.tanesha.recaptcha.ReCaptchaFactory"%>
<%@ page import="org.apache.usergrid.rest.AbstractContextResource"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -30,12 +31,12 @@
<body>
<div class="dialog-area">
- <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+ <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
<form class="dialog-form" action="" method="post">
- <input type="hidden" name="token" value="${it.token}">
+ <input type="hidden" name="token" value="${fn:escapeXml(it.token)}">
<fieldset>
<p>
- <label for="password1">Please enter your new password for <c:out value="${it.user.email}"/>.</label>
+ <label for="password1">Please enter your new password for <c:out value="${it.user.email}" escapeXml="true"/>.</label>
</p>
<p>
<input class="text_field" id="password1" name="password1" type="password" />
@@ -54,4 +55,4 @@
</div>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_success.jsp
index 9de90ba..3915084 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UserResource/resetpw_set_success.jsp
@@ -29,7 +29,7 @@
</head>
<body>
- <p>New password set for <c:out value="${it.user.email}"/></p>
+ <p>New password set for <c:out value="${it.user.email}" escapeXml="true"/></p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/error.jsp
@@ -27,7 +27,7 @@
</head>
<body>
- <p>An error occurred <c:out value="${it}"/>.</p>
+ <p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_form.jsp
index 3211a3a..01dfc57 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_form.jsp
@@ -1,6 +1,7 @@
<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
pageEncoding="ISO-8859-1"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -28,7 +29,7 @@
<div class="dialog-area">
<c:if test="${!empty it.errorMsg}">
- <div class="dialog-form-message">${it.errorMsg}</div>
+ <div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div>
</c:if>
<form class="dialog-form" action="" method="post">
<fieldset>
@@ -50,4 +51,4 @@
</div>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_success.jsp
index 23f8508..41c5176 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/applications/users/UsersResource/resetpw_email_success.jsp
@@ -29,7 +29,7 @@
</head>
<body>
- <p>Email with instructions for password reset sent to <c:out value="${it.user.email}"/></p>
+ <p>Email with instructions for password reset sent to <c:out value="${it.user.email}" escapeXml="true"/></p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/authorize_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/authorize_form.jsp
index 6b1b8b2..ed934a7 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/authorize_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/authorize_form.jsp
@@ -2,6 +2,7 @@
pageEncoding="ISO-8859-1"%>
<%@ page import="org.apache.usergrid.rest.AbstractContextResource"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -28,13 +29,13 @@
<body>
<div class="dialog-area">
- <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+ <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
<form class="dialog-form" action="" method="post">
- <input type="hidden" name="response_type" value="${it.responseType}">
- <input type="hidden" name="client_id" value="${it.clientId}">
- <input type="hidden" name="redirect_uri" value="${it.redirectUri}">
- <input type="hidden" name="scope" value="${it.scope}">
- <input type="hidden" name="state" value="${it.state}">
+ <input type="hidden" name="response_type" value="${fn:escapeXml(it.responseType)}">
+ <input type="hidden" name="client_id" value="${fn:escapeXml(it.clientId)}">
+ <input type="hidden" name="redirect_uri" value="${fn:escapeXml(it.redirectUri)}">
+ <input type="hidden" name="scope" value="${fn:escapeXml(it.scope)}">
+ <input type="hidden" name="state" value="${fn:escapeXml(it.state)}">
<fieldset>
<p>
<label for="username">Username</label>
@@ -56,4 +57,4 @@
</div>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/ManagementResource/error.jsp
@@ -27,7 +27,7 @@
</head>
<body>
- <p>An error occurred <c:out value="${it}"/>.</p>
+ <p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/activate.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/activate.jsp
index 85114cd..f5fa14d 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/activate.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/activate.jsp
@@ -26,7 +26,7 @@
</head>
<body>
- <p>Your organization <c:out value="${it.organization.name}"/> has been successfully activated.</p>
+ <p>Your organization <c:out value="${it.organization.name}" escapeXml="true"/> has been successfully activated.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/confirm.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/confirm.jsp
index f4307b7..5fb41c7 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/confirm.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/confirm.jsp
@@ -26,8 +26,8 @@
</head>
<body>
- <p>Your organization <c:out value="${it.organization.name}"/> has been successfully confirmed.
+ <p>Your organization <c:out value="${it.organization.name}" escapeXml="true"/> has been successfully confirmed.
You will received an email soon to let you know when you organization has been activated</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/organizations/OrganizationResource/error.jsp
@@ -27,7 +27,7 @@
</head>
<body>
- <p>An error occurred <c:out value="${it}"/>.</p>
+ <p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/activate.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/activate.jsp
index dfcf3b7..20e69b8 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/activate.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/activate.jsp
@@ -26,7 +26,7 @@
</head>
<body>
- <p>Your account with email address <c:out value="${it.user.email}"/> has been successfully activated.</p>
+ <p>Your account with email address <c:out value="${it.user.email}" escapeXml="true"/> has been successfully activated.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/confirm.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/confirm.jsp
index 02e9ee3..d7f3acc 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/confirm.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/confirm.jsp
@@ -26,8 +26,8 @@
</head>
<body>
- <p>Your account with email address <c:out value="${it.user.email}"/> has been successfully confirmed.
+ <p>Your account with email address <c:out value="${it.user.email}" escapeXml="true"/> has been successfully confirmed.
You will received an email soon to let you know when you account has been activated</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/error.jsp
@@ -27,7 +27,7 @@
</head>
<body>
- <p>An error occurred <c:out value="${it}"/>.</p>
+ <p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_form.jsp
index 3e56cd1..6341d60 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_form.jsp
@@ -1,6 +1,7 @@
<%@ page language="java" contentType="text/html; charset=ISO-8859-1"
pageEncoding="ISO-8859-1"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
+<%@ taglib uri="http://java.sun.com/jsp/jstl/functions" prefix="fn"%>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -27,13 +28,13 @@
<body>
<div class="dialog-area password-reset-form">
<c:if test="${!empty it.errorMsg}">
- <div class="dialog-form-message">${it.errorMsg}</div>
+ <div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div>
</c:if>
<form class="dialog-form" action="" method="post">
<fieldset>
<p>
Enter the captcha to have your password reset instructions sent to
- <c:out value="${it.user.email}" />
+ <c:out value="${it.user.email}" escapeXml="true" />
</p>
<p id="human-proof"></p>
${it.reCaptchaHtml}
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_success.jsp
index 23f8508..41c5176 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_email_success.jsp
@@ -29,7 +29,7 @@
</head>
<body>
- <p>Email with instructions for password reset sent to <c:out value="${it.user.email}"/></p>
+ <p>Email with instructions for password reset sent to <c:out value="${it.user.email}" escapeXml="true"/></p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_form.jsp
index a83d80d..6334466 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_form.jsp
@@ -30,12 +30,12 @@
<body>
<div class="dialog-area">
- <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+ <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
<form class="dialog-form" action="" method="post">
- <input type="hidden" name="token" value="${it.token}">
+ <input type="hidden" name="token" value="${fn:escapeXml(it.token)}">
<fieldset>
<p>
- <label for="password1">Please enter your new password for <c:out value="${it.user.email}"/>.</label>
+ <label for="password1">Please enter your new password for <c:out value="${it.user.email}" escapeXml="true"/>.</label>
</p>
<p>
<input class="text_field" id="password1" name="password1" type="password" />
@@ -54,4 +54,4 @@
</div>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_success.jsp
index 9de90ba..3915084 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UserResource/resetpw_set_success.jsp
@@ -29,7 +29,7 @@
</head>
<body>
- <p>New password set for <c:out value="${it.user.email}"/></p>
+ <p>New password set for <c:out value="${it.user.email}" escapeXml="true"/></p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/error.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/error.jsp
index be184b1..d02ad40 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/error.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/error.jsp
@@ -27,7 +27,7 @@
</head>
<body>
- <p>An error occurred <c:out value="${it}"/>.</p>
+ <p>An error occurred <c:out value="${it}" escapeXml="true"/>.</p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_form.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_form.jsp
index 8643016..f8cf496 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_form.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_form.jsp
@@ -27,7 +27,7 @@
<body>
<div class="dialog-area">
- <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${it.errorMsg}</div></c:if>
+ <c:if test="${!empty it.errorMsg}"><div class="dialog-form-message">${fn:escapeXml(it.errorMsg)}</div></c:if>
<form class="dialog-form" action="" method="post">
<fieldset>
<p>
@@ -47,4 +47,4 @@
</div>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_success.jsp b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_success.jsp
index 23f8508..41c5176 100644
--- a/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_success.jsp
+++ b/stack/rest/src/main/webapp/WEB-INF/jsp/org/apache/usergrid/rest/management/users/UsersResource/resetpw_email_success.jsp
@@ -29,7 +29,7 @@
</head>
<body>
- <p>Email with instructions for password reset sent to <c:out value="${it.user.email}"/></p>
+ <p>Email with instructions for password reset sent to <c:out value="${it.user.email}" escapeXml="true"/></p>
</body>
-</html>
\ No newline at end of file
+</html>
diff --git a/stack/rest/src/test/resources/project.properties b/stack/rest/src/test/resources/project.properties
index 94ef3bd..20a38f6 100644
--- a/stack/rest/src/test/resources/project.properties
+++ b/stack/rest/src/test/resources/project.properties
@@ -15,4 +15,4 @@
# limitations under the License.
target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
diff --git a/stack/rest/src/test/resources/usergrid-custom-test.properties b/stack/rest/src/test/resources/usergrid-custom-test.properties
index cbec81f..ee7b039 100644
--- a/stack/rest/src/test/resources/usergrid-custom-test.properties
+++ b/stack/rest/src/test/resources/usergrid-custom-test.properties
@@ -18,7 +18,7 @@
cassandra.startup=external
cassandra.connections=30
cassandra.timeout.pool=20000
-cassandra.timeout=20000
+cassandra.timeout=25000
hystrix.threadpool.graph_user.coreSize=1200
@@ -68,7 +68,7 @@
collection.uniquevalues.actors=300
collection.uniquevalues.authoritative.region=us-east
-elasticsearch.queue_impl=DISTRIBUTED
+elasticsearch.queue_impl=LOCAL
# Queueing Test Settings
# Reduce the long polling time for the tests
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index b1df1b4..29fa311 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -102,7 +102,7 @@
<useSystemClassLoader>false</useSystemClassLoader>
<argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
- -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+ -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
${ug.argline} -Dlog4j.configuration=file:${basedir}/src/test/resources/log4j.properties
</argLine>
<includes>
@@ -499,7 +499,7 @@
<configuration>
<argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
- -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+ -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
${ug.argline}
-javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec
</argLine>
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
index 2b88b07..8b840d6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
@@ -17,11 +17,7 @@
package org.apache.usergrid.management;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import org.apache.usergrid.persistence.CredentialsInfo;
import org.apache.usergrid.persistence.Entity;
@@ -372,6 +368,10 @@
Observable<Id> deleteAllEntities(final UUID applicationId,final int limit);
+ Collection<String> passwordPolicyCheck(String password, boolean isAdminUser);
+
+ String getPasswordDescription(boolean isAdminUser);
+
// DO NOT REMOVE BELOW METHODS, THEY ARE HERE TO ALLOW EXTERNAL CLASSES TO OVERRIDE AND HOOK INTO POST PROCESSING
void createOrganizationPostProcessing( final OrganizationInfo orgInfo,
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index 2ba9bde..89375fd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -3412,6 +3412,16 @@
return service.deleteAllEntities(CpNamingUtils.getApplicationScope(applicationId),limit);
}
+ @Override
+ public Collection<String> passwordPolicyCheck(String password, boolean isAdminUser) {
+ return passwordPolicy.policyCheck(password, isAdminUser);
+ }
+
+ @Override
+ public String getPasswordDescription(boolean isAdminUser) {
+ return passwordPolicy.getDescription(isAdminUser);
+ }
+
private String getProperty(String key) {
String obj = properties.getProperty(key);
if(StringUtils.isEmpty(obj))
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ActivitiesServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ActivitiesServiceIT.java
index c4762f2..c8a446a 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ActivitiesServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ActivitiesServiceIT.java
@@ -93,11 +93,14 @@
app.testRequest( ServiceAction.GET, 4, null, "users", userC.getUuid(), "feed" );
- app.testRequest( ServiceAction.GET, 2, null, "users", userC.getUuid(), "feed",
- Query.fromQL( "select * where content contains 'cookie'" ) );
+ // time for indexing
+ Thread.sleep(10000);
app.testRequest( ServiceAction.GET, 1, "users", userC.getUuid(), "feed",
- Query.fromQL( "select * where verb='post' and content contains 'cookie'" ) );
+ Query.fromQL( "select * where verb='post' and content contains 'cookie'" ) );
+
+ app.testRequest( ServiceAction.GET, 2, null, "users", userC.getUuid(), "feed",
+ Query.fromQL( "select * where content contains 'cookie'" ) );
app.put( "username", "finn" );
app.put( "email", "finn@ooo.com" );
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
index 81dced1..434fd80 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/ServiceInvocationIT.java
@@ -102,6 +102,9 @@
app.testRequest( ServiceAction.GET, 2, "users", "edanuff", "likes", "restaurants" );
+ // time for indexing
+ Thread.sleep(10000);
+
app.testRequest( ServiceAction.GET, 1, "users", "edanuff", "likes", "restaurants",
Query.fromQL( "select * where name='Brickhouse'" ) );
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
index c035192..9b3dfcd 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
@@ -49,9 +49,9 @@
protected Notification notificationWaitForComplete(Notification notification)
throws Exception {
- long timeout = System.currentTimeMillis() + 60000;
+ long timeout = System.currentTimeMillis() + 120000;
while (System.currentTimeMillis() < timeout) {
- app.waitForQueueDrainAndRefreshIndex(200);
+ app.waitForQueueDrainAndRefreshIndex(1000);
notification = app.getEntityManager().get(notification.getUuid(), Notification.class);
if (notification.getFinished() != null) {
return notification;
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 8360009..ecc5443 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -632,7 +632,7 @@
// receipts are created and queried, wait a bit longer for this to happen as indexing
- app.waitForQueueDrainAndRefreshIndex(500);
+ app.waitForQueueDrainAndRefreshIndex(5000);
// get the receipts entity IDs
List<EntityRef> receipts = getNotificationReceipts(notification);
diff --git a/stack/services/src/test/resources/project.properties b/stack/services/src/test/resources/project.properties
index d38e878..03736c0 100644
--- a/stack/services/src/test/resources/project.properties
+++ b/stack/services/src/test/resources/project.properties
@@ -16,4 +16,4 @@
target.directory=${project.build.directory}
resources.dir=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
diff --git a/stack/services/src/test/resources/usergrid-custom-test.properties b/stack/services/src/test/resources/usergrid-custom-test.properties
index bcc8b8e..4178a07 100644
--- a/stack/services/src/test/resources/usergrid-custom-test.properties
+++ b/stack/services/src/test/resources/usergrid-custom-test.properties
@@ -35,7 +35,7 @@
elasticsearch.buffer_timeout=1
elasticsearch.queue_impl.resolution=true
-elasticsearch.queue_impl=DISTRIBUTED
+elasticsearch.queue_impl=LOCAL
# Queueing Test Settings
queue.long.polling.time.millis=50
diff --git a/stack/test-utils/pom.xml b/stack/test-utils/pom.xml
index bbcf1ff..f99d43a 100644
--- a/stack/test-utils/pom.xml
+++ b/stack/test-utils/pom.xml
@@ -59,7 +59,7 @@
<threadCount>${usergrid.it.threads}</threadCount>
<threadCountClasses></threadCountClasses>
<reuseForks>true</reuseForks>
- <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+ <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.0/jamm-0.3.0.jar ${ug.argline}</argLine>
<includes>
<include>**/CassandraResourceITSuite.java</include>
</includes>
@@ -290,7 +290,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>${surefire.plugin.version}</version>
<configuration>
- <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+ <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.0/jamm-0.3.0.jar ${ug.argline}</argLine>
</configuration>
</plugin>
</plugins>
diff --git a/stack/test-utils/src/test/resources/project.properties b/stack/test-utils/src/test/resources/project.properties
index cd5b819..0bc9bb7 100644
--- a/stack/test-utils/src/test/resources/project.properties
+++ b/stack/test-utils/src/test/resources/project.properties
@@ -14,4 +14,4 @@
# limitations under the License.
target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index cbd2c1e..b34b068 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -61,7 +61,7 @@
<storage-config>${basedir}/src/test/conf</storage-config>
</systemPropertyVariables>
<forkMode>always</forkMode>
- <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+ <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.0/jamm-0.3.0.jar ${ug.argline}</argLine>
</configuration>
</plugin>
diff --git a/stack/websocket/pom.xml b/stack/websocket/pom.xml
index af5ed56..72a5c9d 100644
--- a/stack/websocket/pom.xml
+++ b/stack/websocket/pom.xml
@@ -70,7 +70,7 @@
<storage-config>${basedir}/src/test/conf</storage-config>
</systemPropertyVariables>
<forkMode>always</forkMode>
- <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+ <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.0/jamm-0.3.0.jar ${ug.argline}</argLine>
</configuration>
</plugin>
</plugins>