Allow index requests to be sent directly to ES
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 3f3794e..1c979d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -32,6 +32,7 @@
import org.apache.usergrid.corepersistence.index.CollectionSettings;
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
@@ -523,7 +524,7 @@
String entityType = cpEntity.getId().getType();
boolean skipIndexingForType = skipIndexingForType(entityType);
- Boolean asyncIndex = asyncIndexingForType(entityType);
+ IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
try {
@@ -551,14 +552,14 @@
}
if (!skipIndexingForType) {
- indexEntity(cpEntity, asyncIndex);
+ indexEntity(cpEntity, indexingStrategy);
deIndexOldVersionsOfEntity(cpEntity);
}
}
- private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) {
+ private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, IndexingStrategy indexingStrategy) {
// queue an event to update the new entity
- indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async);
+ indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , indexingStrategy);
}
private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
@@ -568,12 +569,11 @@
}
}
-
- private Boolean asyncIndexingForType( String type ) {
- return CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, applicationId, type);
-
+ private IndexingStrategy getIndexingStrategyForType(String type ) {
+ return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type);
}
+
private boolean skipIndexingForType( String type ) {
return CpCollectionUtils.skipIndexingForType(collectionSettingsFactory, applicationId, type);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 06f06ad..a23d6ac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -23,6 +23,7 @@
import org.apache.usergrid.corepersistence.index.CollectionSettings;
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
@@ -396,8 +397,8 @@
String entityType = cpHeadEntity.getId().getType();
if ( !skipIndexingForType( entityType) ) {
- Boolean async = asyncIndexingForType(entityType);
- indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, async);
+ IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
+ indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, indexingStrategy);
}
} );
@@ -405,8 +406,8 @@
String entityType = memberEntity.getId().getType();
if ( !skipIndexingForType( entityType ) ) {
- Boolean async = asyncIndexingForType(entityType);
- indexService.queueNewEdge(applicationScope, memberEntityId, edge, async);
+ IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
+ indexService.queueNewEdge(applicationScope, memberEntityId, edge, indexingStrategy);
}
@@ -714,7 +715,6 @@
ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
-
if ( logger.isTraceEnabled() ) {
logger.trace( "createConnection(): Indexing connection type '{}'\n from source {}:{}]\n to target {}:{}\n app {}",
connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(),
@@ -738,8 +738,8 @@
String entityType = targetEntity.getId().getType();
if ( !skipIndexingForType( entityType ) ) {
- Boolean async = asyncIndexingForType(entityType);
- indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, async);
+ IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
+ indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, indexingStrategy);
}
// remove any duplicate edges (keeps the duplicate edge with same timestamp)
@@ -1100,8 +1100,8 @@
}
- private Boolean asyncIndexingForType( String type ) {
- return CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, applicationId, type);
+ private IndexingStrategy getIndexingStrategyForType(String type ) {
+ return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type);
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1ddbac4..4305aea 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -21,6 +21,7 @@
import org.apache.usergrid.corepersistence.index.CollectionDeleteAction;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -52,8 +53,9 @@
* @param applicationScope
* @param entity The entity to index. Should be fired when an entity is updated
* @param updatedAfter
+ * @param
*/
- void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, Boolean async);
+ void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, IndexingStrategy strategy);
/**
@@ -66,7 +68,7 @@
* @param entityId
* @param newEdge
*/
- void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, Boolean async);
+ void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, IndexingStrategy indexingStrategy);
/**
* Queue the deletion of an edge
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 3e67110..8257640 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -29,11 +29,9 @@
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
@@ -216,6 +214,10 @@
start();
}
+ protected Histogram getMessageCycye() {
+ return messageCycle;
+ }
+
private String getQueueName(AsyncEventQueueType queueType) {
switch (queueType) {
case REGULAR:
@@ -275,25 +277,29 @@
/**
* Offer the EntityIdScope to SQS
*/
- private void offer(final Serializable operation) {
- offer(operation, AsyncEventQueueType.REGULAR, null);
+ protected void offer(final Serializable operation) {
+ offer(operation, AsyncEventQueueType.REGULAR, IndexingStrategy.DIRECT);
}
/**
* Offer the EntityIdScope to SQS
*/
- private void offer(final Serializable operation, Boolean async) {
- offer(operation, AsyncEventQueueType.REGULAR, async);
+ protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) {
+ offer(operation, AsyncEventQueueType.REGULAR, indexingStrategy);
}
/**
* Offer the EntityIdScope to SQS
*/
- private void offer(final Serializable operation, AsyncEventQueueType queueType, Boolean async) {
+ private void offer(final Serializable operation, AsyncEventQueueType queueType, IndexingStrategy indexingStrategy) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
+ Boolean async = null;
+ if (indexingStrategy != IndexingStrategy.DEFAULT) {
+ async = (indexingStrategy == IndexingStrategy.ASYNC);
+ }
getQueue(queueType).sendMessageToLocalRegion(operation, async);
} catch (IOException e) {
@@ -402,7 +408,7 @@
* @param messages
* @return
*/
- private List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {
+ protected List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {
if (logger.isDebugEnabled()) {
logger.debug("callEventHandlers with {} message(s)", messages.size());
@@ -542,7 +548,7 @@
@Override
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
- final Entity entity, long updatedAfter, Boolean async) {
+ final Entity entity, long updatedAfter, IndexingStrategy indexingStrategy) {
if (logger.isTraceEnabled()) {
@@ -555,7 +561,7 @@
new EntityIdScope(applicationScope, entity.getId()),
updatedAfter);
- offer(event, async);
+ offer(event, indexingStrategy);
}
@@ -593,14 +599,14 @@
public void queueNewEdge(final ApplicationScope applicationScope,
final Id entityId,
final Edge newEdge,
- Boolean async) {
+ IndexingStrategy indexingStrategy) {
if (logger.isTraceEnabled()) {
logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
newEdge.getType(), entityId.getUuid(), entityId.getType());
}
- offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), async);
+ offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), indexingStrategy);
}
@@ -704,7 +710,24 @@
offerTopic( elasticsearchIndexEvent, queueType );
}
- private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
+ protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) {
+
+ final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
+
+ final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+ final int expirationTimeInSeconds =
+ ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
+
+ //write to the map in ES
+ esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
+
+ return new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
+
+ }
+
+
+ protected void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
throws IndexDocNotFoundException {
Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 2ba6c0b..e5e981b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -22,6 +22,7 @@
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.asyncevents.direct.DirectFirstEventServiceImpl;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
@@ -105,7 +106,40 @@
final LegacyQueueManager.Implementation impl = LegacyQueueManager.Implementation.valueOf(value);
- final AsyncEventServiceImpl asyncEventService = new AsyncEventServiceImpl(
+ final AsyncEventServiceImpl asyncEventService = getAsyncEventService();
+
+ if ( impl.equals( LOCAL )) {
+ asyncEventService.MAX_TAKE = 1000;
+ }
+
+ if ( impl.equals( DISTRIBUTED )) {
+ asyncEventService.MAX_TAKE = 500;
+ }
+
+ return asyncEventService;
+ }
+
+
+ private AsyncEventServiceImpl getAsyncEventService() {
+
+
+ AsyncEventServiceImpl asyncEventService;
+/*
+ asyncEventService = new AsyncEventServiceImpl(
+ queueManagerFactory,
+ indexProcessorFig,
+ indexProducer,
+ metricsFactory,
+ entityCollectionManagerFactory,
+ indexLocationStrategyFactory,
+ entityIndexFactory,
+ eventBuilder,
+ mapManagerFactory,
+ queueFig,
+ rxTaskScheduler);
+ */
+
+ asyncEventService = new DirectFirstEventServiceImpl(
queueManagerFactory,
indexProcessorFig,
indexProducer,
@@ -118,14 +152,6 @@
queueFig,
rxTaskScheduler );
- if ( impl.equals( LOCAL )) {
- asyncEventService.MAX_TAKE = 1000;
- }
-
- if ( impl.equals( DISTRIBUTED )) {
- asyncEventService.MAX_TAKE = 500;
- }
-
return asyncEventService;
}
}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java
new file mode 100644
index 0000000..ab5f0b9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Created by peterajohnson on 9/8/17.
+ */
+public interface BufferedQueue<T> {
+
+ /**
+ * Set the consumer of these events
+ * @param consumer
+ */
+ void setConsumer(Consumer<List<T>> consumer);
+
+ /**
+ * Offer an entity. May block
+ *
+ * @param t
+ * @return
+ */
+ boolean offer(T t);
+
+ /**
+ * @return the current size of the queue
+ */
+ int size();
+
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
new file mode 100644
index 0000000..9123138
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+/**
+ * Bufferes events and dispatched then in batches.
+ * Ensures that the callback will be called at a min interval.
+ */
+public class BufferedQueueImpl<T> implements BufferedQueue<T> {
+
+ private String fileName = "my_file_name.txt";
+ private Consumer<List<T>> consumer;
+
+ ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
+
+ private final LinkedBlockingQueue<PendingDispatch> queue;
+ private final long intervalNanos;
+ private long timeOfLastDispatch = 0L;
+
+ public BufferedQueueImpl(int size, long interval , TimeUnit intervalTimeUnit) {
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new DispatchTask()));
+
+ this.intervalNanos = intervalTimeUnit.toNanos(interval);
+ threadPool.scheduleAtFixedRate(new DispatchTask(), intervalNanos,intervalNanos, TimeUnit.NANOSECONDS);
+ readBatchFile();
+ queue = new LinkedBlockingQueue<>(size);
+ }
+
+ public boolean offer(T t) {
+ PendingDispatch pd = new PendingDispatch(t);
+ if (timeOfLastDispatch + intervalNanos < System.nanoTime()) {
+ dispatchOne(pd);
+ return true;
+ }
+ try {
+ return queue.offer(pd, intervalNanos, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ public void setConsumer(Consumer<List<T>> consumer) {
+ this.consumer = consumer;
+ }
+
+
+ private void dispatchOne(PendingDispatch pd) {
+ List<PendingDispatch> messages = new ArrayList<>();
+ messages.add(pd);
+ dispatchMessages(messages);
+ }
+
+ protected void dispatchAll() {
+ if (!queue.isEmpty()) {
+ List<PendingDispatch> messages = new ArrayList<>();
+ queue.drainTo(messages);
+ dispatchMessages(messages);
+ }
+ }
+
+ private void dispatchMessages(List<PendingDispatch> messages) {
+ List<T> m = new ArrayList<>();
+ for (PendingDispatch pd : messages) {
+ if (!pd.isCancelled()) {
+ m.add(pd.getWrapped());
+ }
+ }
+ timeOfLastDispatch = System.nanoTime();
+ Boolean sent = Boolean.TRUE;
+ try {
+ consumer.accept(m);
+ } catch (Exception e) {
+ sent = Boolean.FALSE;
+ }
+ for (PendingDispatch pd : messages) {
+ pd.setResult(sent);
+ }
+ }
+
+
+ public int size() {
+ return queue.size();
+ }
+
+ private void readBatchFile() {
+
+ }
+
+
+ //
+ // Internal Helper classes
+ //
+
+
+
+ private class PendingDispatch implements Future<Boolean> {
+ T wrapped;
+ boolean canceled;
+ boolean done;
+ Boolean result = null;
+
+ PendingDispatch(T wrapped) {
+ this.wrapped = wrapped;
+ canceled = false;
+ done = false;
+ }
+
+ T getWrapped() {
+ return wrapped;
+ }
+
+ void setResult(Boolean b) {
+ result = b;
+ done = true;
+ synchronized (this) {
+ notify();
+ }
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ canceled = true;
+ return canceled;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return canceled;
+ }
+
+ @Override
+ public boolean isDone() {
+ return done;
+ }
+
+ @Override
+ public Boolean get() throws InterruptedException, ExecutionException {
+ while (!done) {
+ synchronized (this) {
+ wait(100);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ if (!done) {
+ synchronized (this) {
+ wait(unit.toMillis(timeout));
+ }
+ }
+ return result;
+ }
+ }
+
+
+ private class DispatchTask implements Runnable {
+ @Override
+ public void run() {
+ try {
+ dispatchAll();
+ } catch (Throwable t) {
+ }
+ }
+ }
+
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
new file mode 100644
index 0000000..f842cea
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+import java.util.function.Consumer;
+
+/**
+ * Created by peterajohnson on 10/27/17.
+ */
+public class BufferedQueueNOP<T> implements BufferedQueue<T> {
+
+ private Consumer consumer;
+
+ @Override
+ public void setConsumer(Consumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public boolean offer(T o) {
+ consumer.accept(o);
+ return true;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
new file mode 100644
index 0000000..4dfce37
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the AsyncEventService that writes first directly to ES
+ * and then submits to ASW as a backup.
+ *
+ * Created by peterajohnson on 8/29/17.
+ */
+public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
+
+
+ private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
+
+ private IndexingStrategy configIndexingStrategy = IndexingStrategy.ASYNC;
+
+ private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
+
+ public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
+ super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
+
+ //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
+ bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
+
+ configIndexingStrategy = IndexingStrategy.get(queueFig.getQueueStrategy());
+
+ }
+
+ protected void dispatchToES(final List<Serializable> bodies) {
+
+ List<LegacyQueueMessage> messages = new ArrayList<>();
+ for (Serializable body : bodies) {
+ String uuid = UUID.randomUUID().toString();
+ LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
+ messages.add(message);
+ }
+
+ List<IndexEventResult> result = callEventHandlers(messages);
+
+ // failed to dispatch send to SQS
+ try {
+ List<LegacyQueueMessage> indexedMessages = submitToIndex(result, false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ for (Serializable body : bodies) {
+ super.offer(body);
+ }
+ }
+
+
+ }
+
+ /**
+ * Offer the EntityIdScope to SQS
+ *
+ * The body will be an implementation of one of the following:
+ * EntityIndexEvent
+ * EntityDeleteEvent
+ * EdgeIndexEvent
+ * EdgeDeleteEvent
+ */
+ protected void offer(final Serializable body) {
+ List<LegacyQueueMessage> messages = getMessageArray(body);
+ List<IndexEventResult> result = callEventHandlers(messages);
+ submitToIndex( result, false );
+ super.offer(body);
+ }
+
+ private List<LegacyQueueMessage> getMessageArray(final Serializable body) {
+ String uuid = UUID.randomUUID().toString();
+
+ LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Sync Handler called for body of class {} ", body.getClass().getSimpleName());
+ }
+
+ List<LegacyQueueMessage> messages = new ArrayList<>();
+ messages.add(message);
+ return messages;
+ }
+
+
+ protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) {
+ if (shouldSendToDirectToES(indexingStrategy)) {
+ List<LegacyQueueMessage> messages = getMessageArray(operation);
+ List<IndexEventResult> result = callEventHandlers(messages);
+ submitToIndex( result, false );
+ }
+
+ // only if single region.
+ if (shouldSendToAWS(indexingStrategy)) {
+ super.offer(operation, indexingStrategy);
+ }
+ }
+
+
+ protected List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
+
+ // if nothing came back then return empty list
+ if(indexEventResults==null){
+ return new ArrayList<>(0);
+ }
+
+ IndexOperationMessage combined = new IndexOperationMessage();
+ List<LegacyQueueMessage> queueMessages = indexEventResults.stream()
+
+ // filter out messages that are not present, they were not processed and put into the results
+ .filter( result -> result.getQueueMessage().isPresent() )
+ .map(indexEventResult -> {
+
+ //record the cycle time
+ getMessageCycye().update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+
+ // ingest each index op into our combined, single index op for the index producer
+ if(indexEventResult.getIndexOperationMessage().isPresent()){
+ combined.ingest(indexEventResult.getIndexOperationMessage().get());
+ }
+
+ return indexEventResult.getQueueMessage().get();
+ })
+ // collect into a list of QueueMessages that can be ack'd later
+ .collect(Collectors.toList());
+
+
+ // dispatch to ES
+ ElasticsearchIndexEvent elasticsearchIndexEvent = getIndexOperationMessage(combined);
+ handleIndexOperation(elasticsearchIndexEvent);
+ return queueMessages;
+ }
+
+ private boolean shouldSendToDirectToES(IndexingStrategy indexingStrategy) {
+ if (indexingStrategy == IndexingStrategy.DEFAULT) {
+ indexingStrategy = configIndexingStrategy;
+ }
+ return (indexingStrategy == IndexingStrategy.DIRECT || indexingStrategy == IndexingStrategy.DIRECTONLY);
+ }
+
+ private boolean shouldSendToAWS(IndexingStrategy indexingStrategy) {
+ if (indexingStrategy == IndexingStrategy.DEFAULT) {
+ indexingStrategy = configIndexingStrategy;
+ }
+ // and is in same region.
+ return (indexingStrategy != IndexingStrategy.DIRECTONLY);
+ }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
new file mode 100644
index 0000000..69c5445
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.index;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the paths an index request can take
+ * between tomcat and ES.
+ *
+ * Created by peterajohnson on 10/30/17.
+ */
+public enum IndexingStrategy {
+
+ DIRECTONLY("directonly"), // Index request is sent directly to ES and not to AWS
+ DIRECT("direct"), // Index request is sent directly to ES before sync ASW
+ SYNC("sync"), // Index request is sent via a sync AWS to ES
+ ASYNC("async"), // Index request is sent via an async AWS to ES
+ DEFAULT("default"); // Follow the default setting
+
+ private String name;
+
+ private static final Map<String,IndexingStrategy> NAME_MAP;
+
+ static {
+ Map<String,IndexingStrategy> map = new HashMap<String,IndexingStrategy>();
+ for (IndexingStrategy instance : IndexingStrategy.values()) {
+ map.put(instance.getName(),instance);
+ }
+ NAME_MAP = Collections.unmodifiableMap(map);
+ }
+
+ IndexingStrategy(String name) {
+ this.name = name;
+ }
+
+ public static IndexingStrategy get(String name) {
+ IndexingStrategy indexingStrategy = NAME_MAP.get(name);
+ if (indexingStrategy == null) {
+ return DEFAULT;
+ }
+ return indexingStrategy;
+ }
+
+
+ public String getName() {
+ return this.name;
+ }
+
+}
+
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
index cef6d12..f38cefa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
@@ -21,10 +21,12 @@
import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+
import java.util.*;
import static org.apache.usergrid.persistence.Schema.*;
@@ -52,6 +54,16 @@
return VALID_SETTING_NAMES;
}
+ public static IndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
+
+ IndexingStrategy indexingStrategy = IndexingStrategy.DEFAULT;
+ String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
+ if (indexing != null) {
+ indexingStrategy = IndexingStrategy.get(indexing);
+ }
+ return indexingStrategy;
+ }
+
public static Boolean asyncIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index f19bede..4a12d14 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -114,4 +114,9 @@
@Default("true")
boolean isAsyncQueue();
+
+ @Key("usergrid.queue.strategy")
+ @Default("async")
+ String getQueueStrategy();
+
}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
new file mode 100644
index 0000000..f562475
--- /dev/null
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.rest.interceptors;
+
+import org.glassfish.jersey.server.ContainerRequest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import javax.inject.*;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.ext.*;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * If the request had an ACCEPT_ENCODING header containing 'gzip' then
+ * gzip the response and add CONTENT_ENCODING gzip header
+ *
+ * * If the request had an CONTENT_ENCODING header containing 'gzip' then
+ * unzip the request and remove the CONTENT_ENCODING gzip header
+ * Created by peterajohnson on 11/1/17.
+ */
+@Provider
+public class GZIPWriterInterceptor implements ReaderInterceptor, WriterInterceptor {
+
+ final private static String GZIP = "gzip";
+ @Inject
+ private javax.inject.Provider<ContainerRequest> requestProvider;
+
+ @Override
+ public void aroundWriteTo(WriterInterceptorContext context) throws IOException,WebApplicationException {
+ ContainerRequest request = requestProvider.get();
+
+ if (request != null) {
+ List<String> aeHeaders = request.getRequestHeader(HttpHeaders.ACCEPT_ENCODING);
+ if (aeHeaders != null && aeHeaders.size() > 0) {
+ String acceptEncodingHeader = aeHeaders.get(0);
+ if (acceptEncodingHeader.contains(GZIP)) {
+ OutputStream outputStream = context.getOutputStream();
+ context.setOutputStream(new GZIPOutputStream(outputStream));
+ context.getHeaders().putSingle(HttpHeaders.CONTENT_ENCODING, GZIP);
+ }
+ }
+ }
+ context.proceed();
+ }
+
+ @Override
+ public Object aroundReadFrom(ReaderInterceptorContext context) throws IOException, WebApplicationException {
+ String encoding = context.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING);
+ if (GZIP.equalsIgnoreCase(encoding)) {
+ GZIPInputStream is = new GZIPInputStream(context.getInputStream());
+ context.getHeaders().remove(HttpHeaders.CONTENT_ENCODING);
+ context.setInputStream(is);
+ }
+
+ return context.proceed();
+ }
+}