Add option to include old version in result
Add debug options
Add gzip support
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1c979d6..4d44112 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -32,7 +32,6 @@
import org.apache.usergrid.corepersistence.index.CollectionSettings;
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
@@ -70,6 +69,7 @@
import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.mq.Message;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import org.apache.usergrid.utils.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -524,7 +524,7 @@
String entityType = cpEntity.getId().getType();
boolean skipIndexingForType = skipIndexingForType(entityType);
- IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
+ QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
try {
@@ -552,14 +552,14 @@
}
if (!skipIndexingForType) {
- indexEntity(cpEntity, indexingStrategy);
+ indexEntity(cpEntity, queueIndexingStrategy);
deIndexOldVersionsOfEntity(cpEntity);
}
}
- private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, IndexingStrategy indexingStrategy) {
+ private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, QueueIndexingStrategy queueIndexingStrategy) {
// queue an event to update the new entity
- indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , indexingStrategy);
+ indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , queueIndexingStrategy);
}
private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
@@ -569,7 +569,7 @@
}
}
- private IndexingStrategy getIndexingStrategyForType(String type ) {
+ private QueueIndexingStrategy getIndexingStrategyForType(String type ) {
return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type);
}
@@ -1813,7 +1813,8 @@
for (String validName : CpCollectionUtils.getValidSettings()) {
if (newSettings.containsKey(validName)) {
- updatedSettings.put(validName, newSettings.get(validName));
+ String value = CpCollectionUtils.validateValue(validName, newSettings.get(validName));
+ updatedSettings.put(validName, value);
}
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index a23d6ac..e329c29 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -20,10 +20,7 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
-import org.apache.usergrid.corepersistence.index.CollectionSettings;
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
-import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
@@ -50,6 +47,8 @@
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.settings.IndexConsistency;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import org.apache.usergrid.persistence.schema.CollectionInfo;
import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.MapUtils;
@@ -62,7 +61,6 @@
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
import static org.apache.usergrid.persistence.Schema.*;
-import static org.apache.usergrid.utils.ClassUtils.cast;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
import static org.apache.usergrid.utils.MapUtils.addMapSet;
@@ -397,8 +395,8 @@
String entityType = cpHeadEntity.getId().getType();
if ( !skipIndexingForType( entityType) ) {
- IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
- indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, indexingStrategy);
+ QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
+ indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, queueIndexingStrategy);
}
} );
@@ -406,8 +404,8 @@
String entityType = memberEntity.getId().getType();
if ( !skipIndexingForType( entityType ) ) {
- IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
- indexService.queueNewEdge(applicationScope, memberEntityId, edge, indexingStrategy);
+ QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
+ indexService.queueNewEdge(applicationScope, memberEntityId, edge, queueIndexingStrategy);
}
@@ -668,6 +666,8 @@
queryString, cursor );
search.setAnalyzeOnly(analyzeOnly);
+ IndexConsistency indexConsistency = getIndexConsistencyForType(collectionName);
+ search.setKeepStaleEntries(indexConsistency == IndexConsistency.LATEST);
return collectionService.searchCollection( search );
}
@@ -738,8 +738,8 @@
String entityType = targetEntity.getId().getType();
if ( !skipIndexingForType( entityType ) ) {
- IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
- indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, indexingStrategy);
+ QueueIndexingStrategy queueIndexingStrategy = getIndexingStrategyForType(entityType);
+ indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, queueIndexingStrategy);
}
// remove any duplicate edges (keeps the duplicate edge with same timestamp)
@@ -1100,7 +1100,11 @@
}
- private IndexingStrategy getIndexingStrategyForType(String type ) {
+ private IndexConsistency getIndexConsistencyForType(String type ) {
+ return CpCollectionUtils.getIndexConsistencyForType(collectionSettingsFactory, applicationId, type);
+ }
+
+ private QueueIndexingStrategy getIndexingStrategyForType(String type ) {
return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 4305aea..b8e8117 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -21,13 +21,13 @@
import org.apache.usergrid.corepersistence.index.CollectionDeleteAction;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import java.util.UUID;
@@ -55,7 +55,7 @@
* @param updatedAfter
* @param
*/
- void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, IndexingStrategy strategy);
+ void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, QueueIndexingStrategy strategy);
/**
@@ -68,7 +68,7 @@
* @param entityId
* @param newEdge
*/
- void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, IndexingStrategy indexingStrategy);
+ void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, QueueIndexingStrategy queueIndexingStrategy);
/**
* Queue the deletion of an edge
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 8257640..ec08dfe 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -29,7 +29,6 @@
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
@@ -58,6 +57,7 @@
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -214,7 +214,7 @@
start();
}
- protected Histogram getMessageCycye() {
+ protected Histogram getMessageCycle() {
return messageCycle;
}
@@ -278,28 +278,25 @@
* Offer the EntityIdScope to SQS
*/
protected void offer(final Serializable operation) {
- offer(operation, AsyncEventQueueType.REGULAR, IndexingStrategy.DIRECT);
+ offer(operation, AsyncEventQueueType.REGULAR, QueueIndexingStrategy.DIRECT);
}
/**
* Offer the EntityIdScope to SQS
*/
- protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) {
- offer(operation, AsyncEventQueueType.REGULAR, indexingStrategy);
+ protected void offer(final Serializable operation, QueueIndexingStrategy queueIndexingStrategy) {
+ offer(operation, AsyncEventQueueType.REGULAR, queueIndexingStrategy);
}
/**
* Offer the EntityIdScope to SQS
*/
- private void offer(final Serializable operation, AsyncEventQueueType queueType, IndexingStrategy indexingStrategy) {
+ private void offer(final Serializable operation, AsyncEventQueueType queueType, QueueIndexingStrategy queueIndexingStrategy) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- Boolean async = null;
- if (indexingStrategy != IndexingStrategy.DEFAULT) {
- async = (indexingStrategy == IndexingStrategy.ASYNC);
- }
+ Boolean async = (queueIndexingStrategy == QueueIndexingStrategy.ASYNC);
getQueue(queueType).sendMessageToLocalRegion(operation, async);
} catch (IOException e) {
@@ -548,7 +545,7 @@
@Override
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
- final Entity entity, long updatedAfter, IndexingStrategy indexingStrategy) {
+ final Entity entity, long updatedAfter, QueueIndexingStrategy queueIndexingStrategy) {
if (logger.isTraceEnabled()) {
@@ -561,7 +558,7 @@
new EntityIdScope(applicationScope, entity.getId()),
updatedAfter);
- offer(event, indexingStrategy);
+ offer(event, queueIndexingStrategy);
}
@@ -599,14 +596,14 @@
public void queueNewEdge(final ApplicationScope applicationScope,
final Id entityId,
final Edge newEdge,
- IndexingStrategy indexingStrategy) {
+ QueueIndexingStrategy queueIndexingStrategy) {
if (logger.isTraceEnabled()) {
logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
newEdge.getType(), entityId.getUuid(), entityId.getType());
}
- offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), indexingStrategy);
+ offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), queueIndexingStrategy);
}
@@ -710,7 +707,7 @@
offerTopic( elasticsearchIndexEvent, queueType );
}
- protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) {
+ protected ElasticsearchIndexEvent getESIndexEvent(final IndexOperationMessage indexOperationMessage) {
final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
deleted file mode 100644
index 9123138..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.asyncevents.direct;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.function.Consumer;
-
-/**
- * Bufferes events and dispatched then in batches.
- * Ensures that the callback will be called at a min interval.
- */
-public class BufferedQueueImpl<T> implements BufferedQueue<T> {
-
- private String fileName = "my_file_name.txt";
- private Consumer<List<T>> consumer;
-
- ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
-
- private final LinkedBlockingQueue<PendingDispatch> queue;
- private final long intervalNanos;
- private long timeOfLastDispatch = 0L;
-
- public BufferedQueueImpl(int size, long interval , TimeUnit intervalTimeUnit) {
-
- Runtime.getRuntime().addShutdownHook(new Thread(new DispatchTask()));
-
- this.intervalNanos = intervalTimeUnit.toNanos(interval);
- threadPool.scheduleAtFixedRate(new DispatchTask(), intervalNanos,intervalNanos, TimeUnit.NANOSECONDS);
- readBatchFile();
- queue = new LinkedBlockingQueue<>(size);
- }
-
- public boolean offer(T t) {
- PendingDispatch pd = new PendingDispatch(t);
- if (timeOfLastDispatch + intervalNanos < System.nanoTime()) {
- dispatchOne(pd);
- return true;
- }
- try {
- return queue.offer(pd, intervalNanos, TimeUnit.NANOSECONDS);
- } catch (InterruptedException e) {
- return false;
- }
- }
-
- public void setConsumer(Consumer<List<T>> consumer) {
- this.consumer = consumer;
- }
-
-
- private void dispatchOne(PendingDispatch pd) {
- List<PendingDispatch> messages = new ArrayList<>();
- messages.add(pd);
- dispatchMessages(messages);
- }
-
- protected void dispatchAll() {
- if (!queue.isEmpty()) {
- List<PendingDispatch> messages = new ArrayList<>();
- queue.drainTo(messages);
- dispatchMessages(messages);
- }
- }
-
- private void dispatchMessages(List<PendingDispatch> messages) {
- List<T> m = new ArrayList<>();
- for (PendingDispatch pd : messages) {
- if (!pd.isCancelled()) {
- m.add(pd.getWrapped());
- }
- }
- timeOfLastDispatch = System.nanoTime();
- Boolean sent = Boolean.TRUE;
- try {
- consumer.accept(m);
- } catch (Exception e) {
- sent = Boolean.FALSE;
- }
- for (PendingDispatch pd : messages) {
- pd.setResult(sent);
- }
- }
-
-
- public int size() {
- return queue.size();
- }
-
- private void readBatchFile() {
-
- }
-
-
- //
- // Internal Helper classes
- //
-
-
-
- private class PendingDispatch implements Future<Boolean> {
- T wrapped;
- boolean canceled;
- boolean done;
- Boolean result = null;
-
- PendingDispatch(T wrapped) {
- this.wrapped = wrapped;
- canceled = false;
- done = false;
- }
-
- T getWrapped() {
- return wrapped;
- }
-
- void setResult(Boolean b) {
- result = b;
- done = true;
- synchronized (this) {
- notify();
- }
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- canceled = true;
- return canceled;
- }
-
- @Override
- public boolean isCancelled() {
- return canceled;
- }
-
- @Override
- public boolean isDone() {
- return done;
- }
-
- @Override
- public Boolean get() throws InterruptedException, ExecutionException {
- while (!done) {
- synchronized (this) {
- wait(100);
- }
- }
- return result;
- }
-
- @Override
- public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- if (!done) {
- synchronized (this) {
- wait(unit.toMillis(timeout));
- }
- }
- return result;
- }
- }
-
-
- private class DispatchTask implements Runnable {
- @Override
- public void run() {
- try {
- dispatchAll();
- } catch (Throwable t) {
- }
- }
- }
-
-}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
index f842cea..c4d28b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
@@ -21,6 +21,9 @@
import java.util.function.Consumer;
/**
+ * This is NOP buffer. An alternate implementation of this interface might buffer the
+ * events to smooth out 'bursts'
+ *
* Created by peterajohnson on 10/27/17.
*/
public class BufferedQueueNOP<T> implements BufferedQueue<T> {
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
index 4dfce37..2ca0bc1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
@@ -23,7 +23,6 @@
import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
@@ -34,6 +33,7 @@
import org.apache.usergrid.persistence.queue.LegacyQueueFig;
import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,17 +54,19 @@
private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
- private IndexingStrategy configIndexingStrategy = IndexingStrategy.ASYNC;
+ private boolean indexDebugMode = false;
+ private QueueIndexingStrategy configQueueIndexingStrategy = QueueIndexingStrategy.ASYNC;
private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
- //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
- configIndexingStrategy = IndexingStrategy.get(queueFig.getQueueStrategy());
+ configQueueIndexingStrategy = QueueIndexingStrategy.get(queueFig.getQueueStrategy());
+
+ indexDebugMode = Boolean.valueOf(queueFig.getQueueDebugMode());
}
@@ -82,8 +84,10 @@
// failed to dispatch send to SQS
try {
List<LegacyQueueMessage> indexedMessages = submitToIndex(result, false);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sent {} messages to ES ", indexedMessages.size());
+ }
} catch (Exception e) {
- e.printStackTrace();
for (Serializable body : bodies) {
super.offer(body);
}
@@ -123,16 +127,16 @@
}
- protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) {
- if (shouldSendToDirectToES(indexingStrategy)) {
+ protected void offer(final Serializable operation, QueueIndexingStrategy queueIndexingStrategy) {
+ queueIndexingStrategy = resolveIndexingStrategy(queueIndexingStrategy);
+ if (queueIndexingStrategy.shouldSendDirectToES()) {
List<LegacyQueueMessage> messages = getMessageArray(operation);
List<IndexEventResult> result = callEventHandlers(messages);
submitToIndex( result, false );
}
- // only if single region.
- if (shouldSendToAWS(indexingStrategy)) {
- super.offer(operation, indexingStrategy);
+ if (queueIndexingStrategy.shouldSendToAWS()) {
+ super.offer(operation, queueIndexingStrategy);
}
}
@@ -152,7 +156,7 @@
.map(indexEventResult -> {
//record the cycle time
- getMessageCycye().update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+ getMessageCycle().update(System.currentTimeMillis() - indexEventResult.getCreationTime());
// ingest each index op into our combined, single index op for the index producer
if(indexEventResult.getIndexOperationMessage().isPresent()){
@@ -166,23 +170,24 @@
// dispatch to ES
- ElasticsearchIndexEvent elasticsearchIndexEvent = getIndexOperationMessage(combined);
+ ElasticsearchIndexEvent elasticsearchIndexEvent = getESIndexEvent(combined);
handleIndexOperation(elasticsearchIndexEvent);
return queueMessages;
}
- private boolean shouldSendToDirectToES(IndexingStrategy indexingStrategy) {
- if (indexingStrategy == IndexingStrategy.DEFAULT) {
- indexingStrategy = configIndexingStrategy;
+ // If the collection has not defined an indexing strategy then use the default from the fig.
+ // only allow NOINDEX or DIRECTONLY when in debug mode
+ private QueueIndexingStrategy resolveIndexingStrategy(QueueIndexingStrategy queueIndexingStrategy) {
+ switch (queueIndexingStrategy) {
+ case CONFIG:
+ return configQueueIndexingStrategy;
+ case NOINDEX:
+ case DIRECTONLY:
+ if (!indexDebugMode) {
+ return configQueueIndexingStrategy;
+ }
+ default:
+ return queueIndexingStrategy;
}
- return (indexingStrategy == IndexingStrategy.DIRECT || indexingStrategy == IndexingStrategy.DIRECTONLY);
- }
-
- private boolean shouldSendToAWS(IndexingStrategy indexingStrategy) {
- if (indexingStrategy == IndexingStrategy.DEFAULT) {
- indexingStrategy = configIndexingStrategy;
- }
- // and is in same region.
- return (indexingStrategy != IndexingStrategy.DIRECTONLY);
}
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
deleted file mode 100644
index 69c5445..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.index;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class describes the paths an index request can take
- * between tomcat and ES.
- *
- * Created by peterajohnson on 10/30/17.
- */
-public enum IndexingStrategy {
-
- DIRECTONLY("directonly"), // Index request is sent directly to ES and not to AWS
- DIRECT("direct"), // Index request is sent directly to ES before sync ASW
- SYNC("sync"), // Index request is sent via a sync AWS to ES
- ASYNC("async"), // Index request is sent via an async AWS to ES
- DEFAULT("default"); // Follow the default setting
-
- private String name;
-
- private static final Map<String,IndexingStrategy> NAME_MAP;
-
- static {
- Map<String,IndexingStrategy> map = new HashMap<String,IndexingStrategy>();
- for (IndexingStrategy instance : IndexingStrategy.values()) {
- map.put(instance.getName(),instance);
- }
- NAME_MAP = Collections.unmodifiableMap(map);
- }
-
- IndexingStrategy(String name) {
- this.name = name;
- }
-
- public static IndexingStrategy get(String name) {
- IndexingStrategy indexingStrategy = NAME_MAP.get(name);
- if (indexingStrategy == null) {
- return DEFAULT;
- }
- return indexingStrategy;
- }
-
-
- public String getName() {
- return this.name;
- }
-
-}
-
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
index 13edb2c..34799bb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/Pipeline.java
@@ -50,6 +50,8 @@
private final RequestCursor requestCursor;
private int limit;
+ private boolean keepStaleEntries;
+ private String query;
//Generics hell, intentionally without a generic, we check at the filter level
private Observable currentObservable;
@@ -58,7 +60,7 @@
/**
* Create our filter pipeline
*/
- public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit ) {
+ public Pipeline( final ApplicationScope applicationScope, final Optional<String> cursor, final int limit, boolean keepStaleEntries, String query) {
ValidationUtils.validateApplicationScope( applicationScope );
@@ -78,6 +80,9 @@
final FilterResult<Id> filter = new FilterResult<>( applicationScope.getApplication(), Optional.absent() );
this.currentObservable = Observable.just( filter );
+
+ this.keepStaleEntries = keepStaleEntries;
+ this.query = query;
}
@@ -86,7 +91,7 @@
- final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount );
+ final PipelineContext context = new PipelineContext( applicationScope, requestCursor, limit, idCount, keepStaleEntries, query );
filter.setContext( context );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
index 018abb7..88b5001 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/PipelineContext.java
@@ -39,14 +39,20 @@
private final ApplicationScope applicationScope;
private final RequestCursor requestCursor;
private final int limit;
+ // An entry is stale if the ES version number is less than the Cassandra version number
+ // it can happen if ES was not updated or has yet to be updated.
+ private final boolean keepStaleEntries;
+ private String query;
- public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id ) {
+ public PipelineContext( final ApplicationScope applicationScope, final RequestCursor requestCursor, final int limit, final int id, boolean keepStaleEntries, String query ) {
this.applicationScope = applicationScope;
this.requestCursor = requestCursor;
this.limit = limit;
this.id = id;
+ this.keepStaleEntries = keepStaleEntries;
+ this.query = query;
}
@@ -78,5 +84,18 @@
return limit;
}
+ /**
+ * return true if stales entries are not to be filtered out.
+ */
+ public boolean getKeepStaleEntries() {
+ return keepStaleEntries;
+ }
+
+ /**
+ * return the query string if any
+ */
+ public String getQuery() {
+ return query;
+ }
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
index 9354127..a3b6fd9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/CandidateBuilder.java
@@ -24,6 +24,7 @@
import org.apache.usergrid.corepersistence.pipeline.Pipeline;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.service.CollectionSearch;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -33,12 +34,20 @@
private final Pipeline<FilterResult<Candidate>> pipeline;
private final FilterFactory filterFactory;
+ private CollectionSearch search;
+
+ public CandidateBuilder(final Pipeline<FilterResult<Candidate>> pipeline,
+ final FilterFactory filterFactory) {
+ this(pipeline,filterFactory,null);
+ }
- public CandidateBuilder( final Pipeline<FilterResult<Candidate>> pipeline,
- final FilterFactory filterFactory ) {
+ public CandidateBuilder(final Pipeline<FilterResult<Candidate>> pipeline,
+ final FilterFactory filterFactory,
+ CollectionSearch search) {
this.pipeline = pipeline;
this.filterFactory = filterFactory;
+ this.search = search;
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index a7f9ad9..4f44ac4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -31,6 +31,7 @@
import org.apache.usergrid.corepersistence.pipeline.read.collect.IdResumeFilter;
import org.apache.usergrid.corepersistence.pipeline.read.collect.ResultsPageCollector;
import org.apache.usergrid.corepersistence.pipeline.read.search.Candidate;
+import org.apache.usergrid.corepersistence.service.CollectionSearch;
import org.apache.usergrid.persistence.ConnectionRef;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -125,9 +126,16 @@
final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter(
ql, collectionName, entityType, analyzeOnly ) );
- return new CandidateBuilder( newFilter, filterFactory );
+ return new CandidateBuilder( newFilter, filterFactory , null);
}
+ public CandidateBuilder searchCollection(final String collectionName, final String ql, final CollectionSearch search ) {
+
+ final Pipeline<FilterResult<Candidate>> newFilter = pipeline.withFilter( filterFactory.searchCollectionFilter(
+ ql, collectionName, search.getEntityType(), search.getAnalyzeOnly() ) );
+
+ return new CandidateBuilder( newFilter, filterFactory, search );
+ }
/**
* Search all connections from our input Id and search their connections
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
index f1a44ea..624f9dc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/PipelineBuilder.java
@@ -44,6 +44,8 @@
private Optional<String> cursor = Optional.absent();
private int limit = 10;
private final FilterFactory filterFactory;
+ private boolean keepStaleEntries = false;
+ private String query = "";
/**
@@ -81,6 +83,21 @@
return this;
}
+ /**
+ */
+ public PipelineBuilder keepStaleEntries(final boolean keepStaleEntries){
+ this.keepStaleEntries = keepStaleEntries;
+ return this;
+ }
+
+ /**
+ */
+ public PipelineBuilder query(final Optional<String> query){
+ if (query.isPresent()) {
+ this.query = query.get();
+ }
+ return this;
+ }
/**
* Set our start point in our graph traversal to the specified entity id. A 1.0 compatibility API. eventually this should be replaced with
@@ -91,7 +108,7 @@
*/
@Deprecated
public IdBuilder fromId(final Id entityId){
- Pipeline<FilterResult<Id>> pipeline = new Pipeline( applicationScope, this.cursor,limit ).withFilter( filterFactory.getEntityIdFilter( entityId ) );
+ Pipeline<FilterResult<Id>> pipeline = new Pipeline( applicationScope, this.cursor,limit,keepStaleEntries,query ).withFilter( filterFactory.getEntityIdFilter( entityId ) );
return new IdBuilder( pipeline, filterFactory );
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index 7770436..20bcfe9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -26,10 +26,10 @@
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.model.field.DistanceField;
-import org.apache.usergrid.persistence.model.field.DoubleField;
import org.apache.usergrid.persistence.model.field.EntityObjectField;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
+import org.apache.usergrid.utils.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +74,7 @@
this.entityIndexFactory = entityIndexFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.indexProducer = indexProducer;
+
}
@@ -96,6 +97,9 @@
final EntityIndex applicationIndex = entityIndexFactory
.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
+ boolean keepStaleEntries = pipelineContext.getKeepStaleEntries();
+ String query = pipelineContext.getQuery();
+
//buffer them to get a page size we can make 1 network hop
final Observable<FilterResult<Entity>> searchIdSetObservable =
candidateResultsObservable.buffer( pipelineContext.getLimit() )
@@ -119,7 +123,7 @@
entitySet -> new EntityVerifier(
applicationIndex.createBatch(), entitySet, candidateResults,indexProducer)
)
- .doOnNext(entityCollector -> entityCollector.merge())
+ .doOnNext(entityCollector -> entityCollector.merge(keepStaleEntries, query))
.flatMap(entityCollector -> Observable.from(entityCollector.getResults()))
.map(entityFilterResult -> {
final Entity entity = entityFilterResult.getValue();
@@ -246,10 +250,10 @@
/**
* Merge our candidates and our entity set into results
*/
- public void merge() {
+ public void merge(boolean keepStaleEntries, String query) {
for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
- validate( candidateResult );
+ validate( candidateResult , keepStaleEntries, query);
}
indexProducer.put(batch.build()).toBlocking().lastOrDefault(null); // want to rethrow if batch fails
@@ -267,7 +271,23 @@
}
- private void validate( final FilterResult<Candidate> filterResult ) {
+ // Helper function to convert a UUID time stamp into a unix date
+ private Date UUIDTimeStampToDate(UUID uuid) {
+ long timeStamp = 0L;
+ // The UUID is supposed to be time based so this should always be '1'
+ // but this is just used for logging so we don't want to throw an error i it is misused.
+ if (uuid.version() == 1) {
+ // this is the difference between midnight October 15, 1582 UTC and midnight January 1, 1970 UTC as 100 nanosecond units
+ long epochDiff = 122192928000000000L;
+ // the UUID timestamp is in 100 nanosecond units.
+ // convert that to milliseconds
+ timeStamp = ((uuid.timestamp()-epochDiff)/10000);
+ }
+ return new Date(timeStamp);
+ }
+
+
+ private void validate( final FilterResult<Candidate> filterResult, boolean keepStaleEntries, String query ) {
final Candidate candidate = filterResult.getValue();
final CandidateResult candidateResult = candidate.getCandidateResult();
@@ -297,18 +317,40 @@
final UUID entityVersion = entity.getVersion();
final Id entityId = entity.getId();
+ //entity is newer than ES version, could be a missed or slow index event
+ if ( UUIDComparator.staticCompare(entityVersion, candidateVersion) > 0 ) {
+
+ Date candidateTimeStamp = UUIDTimeStampToDate(candidateVersion);
+ Date entityTimeStamp = UUIDTimeStampToDate(entityVersion);
+
+ Map<String,String> fields = new HashMap<>();
+ for (Field field : entity.getEntity().get().getFields()) {
+ fields.put(field.getName(),String.valueOf(field.getValue()));
+ }
+
+ logger.warn( "Found stale entity on edge {} for entityId {} Entity version date = {}. Candidate version date = {}. Will be returned in result set = {} Query = [{}] Entity fields = {}",
+ searchEdge,
+ entityId.getUuid(),
+ DateUtils.instance.formatIso8601Date(entityTimeStamp),
+ DateUtils.instance.formatIso8601Date(candidateTimeStamp),
+ keepStaleEntries,
+ query,
+ fields
+ );
+
+ if (!keepStaleEntries) {
+ batch.deindex(searchEdge, entityId, candidateVersion);
+ return;
+ }
+ }
-
-
- //entity is newer than ES version, could be an update or the entity is marked as deleted
- if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ||
- !entity.getEntity().isPresent() ||
- entity.getStatus() == MvccEntity.Status.DELETED ) {
+ // The entity is marked as deleted
+ if (!entity.getEntity().isPresent() || entity.getStatus() == MvccEntity.Status.DELETED ) {
// when updating entities, we don't delete previous versions from ES so this action is expected
if(logger.isDebugEnabled()){
- logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}",
+ logger.debug( "Deindexing deleted entity on edge {} for entityId {} and version {}",
searchEdge, entityId, entityVersion);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
index 6240028..6b6edfc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionSearch.java
@@ -42,6 +42,7 @@
private final Optional<String> cursor;
private Level level = Level.ALL;
private boolean analyzeOnly;
+ private boolean keepStaleEntries;
public CollectionSearch( final ApplicationScope applicationScope, final Id collectionOwnerId, final String
@@ -103,4 +104,12 @@
public void setAnalyzeOnly(final boolean analyzeOnly){
this.analyzeOnly = analyzeOnly;
}
+
+ public boolean getKeepStaleEntries() {
+ return keepStaleEntries;
+ }
+
+ public void setKeepStaleEntries(final boolean keepStaleEntries){
+ this.keepStaleEntries = keepStaleEntries;
+ }
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
index 7684050..e052e2e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/CollectionServiceImpl.java
@@ -22,6 +22,7 @@
import org.apache.usergrid.corepersistence.pipeline.builder.IdBuilder;
import org.apache.usergrid.corepersistence.pipeline.builder.PipelineBuilderFactory;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
+import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -58,8 +59,12 @@
final Optional<String> query = search.getQuery();
final IdBuilder pipelineBuilder =
- pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
- .withLimit( search.getLimit() ).fromId( search.getCollectionOwnerId() );
+ pipelineBuilderFactory.create( applicationScope )
+ .withCursor( search.getCursor() )
+ .withLimit( search.getLimit() )
+ .keepStaleEntries(search.getKeepStaleEntries())
+ .query(query)
+ .fromId( search.getCollectionOwnerId() );
final EntityBuilder results;
@@ -68,7 +73,7 @@
results = pipelineBuilder.traverseCollection( collectionName ).loadEntities();
}
else {
- results = pipelineBuilder.searchCollection( collectionName, query.get(),search.getEntityType(), search.getAnalyzeOnly()).loadEntities();
+ results = pipelineBuilder.searchCollection( collectionName, query.get(),search).loadEntities();
}
@@ -81,7 +86,6 @@
final ApplicationScope applicationScope = search.getApplicationScope();
final String collectionName = search.getCollectionName();
- final Optional<String> query = search.getQuery();
final IdBuilder pipelineBuilder =
pipelineBuilderFactory.create( applicationScope ).withCursor( search.getCursor() )
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
index f38cefa..66a4cfb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
@@ -21,10 +21,11 @@
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
-import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.settings.IndexConsistency;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import java.util.*;
@@ -42,38 +43,49 @@
public static final String SETTING_FIELDS = "fields";
public static final String SETTING_QUEUE_INDEX = "queueIndex";
+ public static final String SETTING_INDEX_CONSISTENCY = "indexConsistency";
private static Set<String> VALID_SETTING_NAMES = new HashSet<>();
static {
VALID_SETTING_NAMES.add(SETTING_FIELDS);
VALID_SETTING_NAMES.add(SETTING_QUEUE_INDEX);
+ VALID_SETTING_NAMES.add(SETTING_INDEX_CONSISTENCY);
}
public static Set<String> getValidSettings() {
return VALID_SETTING_NAMES;
}
- public static IndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
-
- IndexingStrategy indexingStrategy = IndexingStrategy.DEFAULT;
- String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
- if (indexing != null) {
- indexingStrategy = IndexingStrategy.get(indexing);
+ public static String validateValue(String name, Object value) {
+ if (SETTING_QUEUE_INDEX.equals(name)) {
+ return QueueIndexingStrategy.get(value.toString()).getName();
}
- return indexingStrategy;
+ if (SETTING_INDEX_CONSISTENCY.equals(name)) {
+ return IndexConsistency.get(value.toString()).getName();
+ }
+ return "";
}
- public static Boolean asyncIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
+ public static QueueIndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
+ QueueIndexingStrategy queueIndexingStrategy = QueueIndexingStrategy.CONFIG;
String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
- if ("async".equals(indexing)) {
- return Boolean.TRUE;
+ if (indexing != null) {
+ queueIndexingStrategy = QueueIndexingStrategy.get(indexing);
}
- if ("sync".equals(indexing)) {
- return Boolean.FALSE;
+ return queueIndexingStrategy;
+ }
+
+
+ public static IndexConsistency getIndexConsistencyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
+
+ IndexConsistency indexConsistency = IndexConsistency.STRICT;
+ String indexConsistencyString = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_INDEX_CONSISTENCY);
+ if ( indexConsistencyString != null) {
+ indexConsistency = IndexConsistency.get(indexConsistencyString);
}
- return null;
+ return indexConsistency;
}
public static boolean skipIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index b444199..437f9bf 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -153,7 +153,6 @@
*/
String[] getIndexes();
-
/**
* type of alias
*/
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 3d2f576..211cf70 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -773,6 +773,7 @@
return Health.valueOf( chr.getStatus().name() );
}
catch ( Exception ex ) {
+ ex.printStackTrace();
logger.error( "Error connecting to ElasticSearch", ex.getMessage() );
}
@@ -859,7 +860,6 @@
}
-
/**
* Interface for operations.
*/
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index ac7d10d..902c5d3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -484,6 +484,7 @@
}
+
@Test
public void deleteVerification() throws Throwable {
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index 4a12d14..4cb6f37 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -110,13 +110,12 @@
@Default("900000") // 15 minutes
int getMapMessageTimeout();
- @Key("usergrid.queue.is.async")
- @Default("true")
- boolean isAsyncQueue();
-
-
@Key("usergrid.queue.strategy")
@Default("async")
String getQueueStrategy();
+ @Key("usergrid.queue.test")
+ @Default("false")
+ String getQueueDebugMode();
+
}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index bc9be57..b18411d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -26,6 +26,7 @@
import com.amazonaws.ClientConfiguration;
import com.amazonaws.services.sqs.model.*;
+import org.apache.usergrid.persistence.queue.settings.QueueIndexingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -541,10 +542,9 @@
}
}
-
@Override
public <T extends Serializable> void sendMessageToAllRegions(final T body, Boolean async) throws IOException {
- boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
+ boolean sendAsync = async == null || async.booleanValue();
if (sendAsync) {
sendMessageToAllRegionsAsync(body);
} else {
@@ -552,7 +552,6 @@
}
}
-
private <T extends Serializable> void sendMessageToAllRegionsSync(final T body) throws IOException {
if ( sns == null ) {
logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
@@ -634,8 +633,9 @@
@Override
public void sendMessages( final List bodies ) throws IOException {
+ QueueIndexingStrategy queueIndexingStrategy = QueueIndexingStrategy.get(fig.getQueueStrategy());
for ( Object body : bodies ) {
- if (fig.isAsyncQueue()) {
+ if (queueIndexingStrategy == QueueIndexingStrategy.ASYNC) {
sendMessageToLocalRegionAsync((Serializable) body);
} else {
sendMessageToLocalRegionSync((Serializable) body);
@@ -682,7 +682,7 @@
@Override
public <T extends Serializable> void sendMessageToLocalRegion(final T body, Boolean async) throws IOException {
- boolean sendAsync = async == null ? fig.isAsyncQueue() : async.booleanValue();
+ boolean sendAsync = async.booleanValue();
if (sendAsync) {
sendMessageToLocalRegionAsync(body);
} else {
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
new file mode 100644
index 0000000..531716a
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/IndexConsistency.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.queue.settings;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the consistency rules when returning results set between C* and ES
+ *
+ * Created by peterajohnson on 10/30/17.
+ */
+public enum IndexConsistency {
+
+ STRICT("strict"), // Result canidate must be exact match to be returned in result set
+ LATEST("latest"); // Result canidate must be exact match OR most recent version to be returned in result set
+
+ private String name;
+
+ private static final Map<String,IndexConsistency> NAME_MAP;
+
+ static {
+ Map<String,IndexConsistency> map = new HashMap<>();
+ for (IndexConsistency instance : IndexConsistency.values()) {
+ map.put(instance.getName(),instance);
+ }
+ NAME_MAP = Collections.unmodifiableMap(map);
+ }
+
+ IndexConsistency(String name) {
+ this.name = name;
+ }
+
+ public static IndexConsistency get(String name) {
+ IndexConsistency queueIndexingStrategy = NAME_MAP.get(name);
+ if (queueIndexingStrategy == null) {
+ return LATEST;
+ }
+ return queueIndexingStrategy;
+ }
+
+
+ public String getName() {
+ return this.name;
+ }
+
+}
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java
new file mode 100644
index 0000000..375de71
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/settings/QueueIndexingStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.queue.settings;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the paths an index request can take
+ * between tomcat and ES.
+ *
+ * Created by peterajohnson on 10/30/17.
+ */
+public enum QueueIndexingStrategy {
+
+ NOINDEX("debug_noindex"), // Do not Index the entity (DEBUG only use for testing)
+ DIRECTONLY("debug_directonly"), // Index request is sent directly to ES and not to AWS
+
+ DIRECT("direct"), // Index request is sent directly to ES before sync ASW
+ SYNC("sync"), // Index request is sent via a sync AWS to ES
+ ASYNC("async"), // Index request is sent via an async AWS to ES
+ CONFIG("config"); // Follow the default setting of the fig
+
+ private String name;
+
+ private static final Map<String,QueueIndexingStrategy> NAME_MAP;
+
+ static {
+ Map<String,QueueIndexingStrategy> map = new HashMap<String,QueueIndexingStrategy>();
+ for (QueueIndexingStrategy instance : QueueIndexingStrategy.values()) {
+ map.put(instance.getName(),instance);
+ }
+ NAME_MAP = Collections.unmodifiableMap(map);
+ }
+
+ QueueIndexingStrategy(String name) {
+ this.name = name;
+ }
+
+ public static QueueIndexingStrategy get(String name) {
+ QueueIndexingStrategy queueIndexingStrategy = NAME_MAP.get(name);
+ if (queueIndexingStrategy == null) {
+ return CONFIG;
+ }
+ return queueIndexingStrategy;
+ }
+
+
+ public String getName() {
+ return this.name;
+ }
+
+ public boolean shouldSendDirectToES() {
+ return (this == QueueIndexingStrategy.DIRECT || this == QueueIndexingStrategy.DIRECTONLY);
+ }
+
+ public boolean shouldSendToAWS() {
+ // and is in same region.
+ return (this != QueueIndexingStrategy.DIRECTONLY && this != QueueIndexingStrategy.NOINDEX);
+ }
+
+}
+
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java
similarity index 94%
rename from stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
rename to stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java
index f562475..77e06ed 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPInterceptor.java
@@ -35,12 +35,13 @@
* If the request had an ACCEPT_ENCODING header containing 'gzip' then
* gzip the response and add CONTENT_ENCODING gzip header
*
- * * If the request had an CONTENT_ENCODING header containing 'gzip' then
+ * If the request had an CONTENT_ENCODING header containing 'gzip' then
* unzip the request and remove the CONTENT_ENCODING gzip header
+ *
* Created by peterajohnson on 11/1/17.
*/
@Provider
-public class GZIPWriterInterceptor implements ReaderInterceptor, WriterInterceptor {
+public class GZIPInterceptor implements ReaderInterceptor, WriterInterceptor {
final private static String GZIP = "gzip";
@Inject