Allow index requests to be sent directly to ES
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 3f3794e..1c979d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -32,6 +32,7 @@
 import org.apache.usergrid.corepersistence.index.CollectionSettings;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.corepersistence.service.CollectionService;
 import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.corepersistence.util.CpCollectionUtils;
@@ -523,7 +524,7 @@
 
         String entityType = cpEntity.getId().getType();
         boolean skipIndexingForType = skipIndexingForType(entityType);
-        Boolean asyncIndex = asyncIndexingForType(entityType);
+        IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
 
         try {
 
@@ -551,14 +552,14 @@
         }
 
         if (!skipIndexingForType) {
-            indexEntity(cpEntity, asyncIndex);
+            indexEntity(cpEntity, indexingStrategy);
             deIndexOldVersionsOfEntity(cpEntity);
         }
     }
 
-    private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) {
+    private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, IndexingStrategy indexingStrategy) {
         // queue an event to update the new entity
-        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async);
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , indexingStrategy);
     }
 
     private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) {
@@ -568,12 +569,11 @@
         }
     }
 
-
-    private Boolean asyncIndexingForType( String type ) {
-        return CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, applicationId, type);
-
+    private IndexingStrategy getIndexingStrategyForType(String type ) {
+        return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type);
     }
 
+
     private boolean skipIndexingForType( String type ) {
         return CpCollectionUtils.skipIndexingForType(collectionSettingsFactory, applicationId, type);
     }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 06f06ad..a23d6ac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -23,6 +23,7 @@
 import org.apache.usergrid.corepersistence.index.CollectionSettings;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor;
 import org.apache.usergrid.corepersistence.results.EntityQueryExecutor;
@@ -396,8 +397,8 @@
 
                 String entityType = cpHeadEntity.getId().getType();
                 if ( !skipIndexingForType( entityType) ) {
-                    Boolean async = asyncIndexingForType(entityType);
-                    indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, async);
+                    IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
+                    indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, indexingStrategy);
                 }
 
             } );
@@ -405,8 +406,8 @@
 
             String entityType = memberEntity.getId().getType();
             if ( !skipIndexingForType( entityType ) ) {
-                Boolean async = asyncIndexingForType(entityType);
-                indexService.queueNewEdge(applicationScope, memberEntityId, edge, async);
+                IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
+                indexService.queueNewEdge(applicationScope, memberEntityId, edge, indexingStrategy);
             }
 
 
@@ -714,7 +715,6 @@
 
         ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
 
-
         if ( logger.isTraceEnabled() ) {
             logger.trace( "createConnection(): Indexing connection type '{}'\n   from source {}:{}]\n   to target {}:{}\n   app {}",
                 connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(),
@@ -738,8 +738,8 @@
 
         String entityType = targetEntity.getId().getType();
         if ( !skipIndexingForType( entityType ) ) {
-            Boolean async = asyncIndexingForType(entityType);
-            indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, async);
+            IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType);
+            indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, indexingStrategy);
         }
 
         // remove any duplicate edges (keeps the duplicate edge with same timestamp)
@@ -1100,8 +1100,8 @@
 
     }
 
-    private Boolean asyncIndexingForType( String type ) {
-        return CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, applicationId, type);
+    private IndexingStrategy getIndexingStrategyForType(String type ) {
+        return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type);
 
     }
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1ddbac4..4305aea 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -21,6 +21,7 @@
 
 
 import org.apache.usergrid.corepersistence.index.CollectionDeleteAction;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.corepersistence.index.ReIndexAction;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -52,8 +53,9 @@
      * @param applicationScope
      * @param entity The entity to index.  Should be fired when an entity is updated
      * @param updatedAfter
+     * @param
      */
-    void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, Boolean async);
+    void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, IndexingStrategy strategy);
 
 
     /**
@@ -66,7 +68,7 @@
      * @param entityId
      * @param newEdge
      */
-    void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, Boolean async);
+    void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, IndexingStrategy indexingStrategy);
 
     /**
      * Queue the deletion of an edge
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 3e67110..8257640 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -29,11 +29,9 @@
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.index.*;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
@@ -216,6 +214,10 @@
         start();
     }
 
+    protected Histogram getMessageCycye() {
+        return messageCycle;
+    }
+
     private String getQueueName(AsyncEventQueueType queueType) {
         switch (queueType) {
             case REGULAR:
@@ -275,25 +277,29 @@
     /**
      * Offer the EntityIdScope to SQS
      */
-    private void offer(final Serializable operation) {
-        offer(operation, AsyncEventQueueType.REGULAR, null);
+    protected void offer(final Serializable operation) {
+        offer(operation, AsyncEventQueueType.REGULAR, IndexingStrategy.DIRECT);
     }
 
     /**
      * Offer the EntityIdScope to SQS
      */
-    private void offer(final Serializable operation, Boolean async) {
-        offer(operation, AsyncEventQueueType.REGULAR, async);
+    protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) {
+        offer(operation, AsyncEventQueueType.REGULAR, indexingStrategy);
     }
 
      /**
       * Offer the EntityIdScope to SQS
       */
-    private void offer(final Serializable operation, AsyncEventQueueType queueType, Boolean async) {
+    private void offer(final Serializable operation, AsyncEventQueueType queueType, IndexingStrategy indexingStrategy) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
+            Boolean async = null;
+            if (indexingStrategy != IndexingStrategy.DEFAULT) {
+                async = (indexingStrategy == IndexingStrategy.ASYNC);
+            }
             getQueue(queueType).sendMessageToLocalRegion(operation, async);
 
         } catch (IOException e) {
@@ -402,7 +408,7 @@
      * @param messages
      * @return
      */
-    private List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {
+    protected List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {
 
         if (logger.isDebugEnabled()) {
             logger.debug("callEventHandlers with {} message(s)", messages.size());
@@ -542,7 +548,7 @@
 
     @Override
     public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
-                                       final Entity entity, long updatedAfter, Boolean async) {
+                                       final Entity entity, long updatedAfter, IndexingStrategy indexingStrategy) {
 
 
         if (logger.isTraceEnabled()) {
@@ -555,7 +561,7 @@
             new EntityIdScope(applicationScope, entity.getId()),
             updatedAfter);
 
-        offer(event, async);
+        offer(event, indexingStrategy);
 
     }
 
@@ -593,14 +599,14 @@
     public void queueNewEdge(final ApplicationScope applicationScope,
                              final Id entityId,
                              final Edge newEdge,
-                             Boolean async) {
+                             IndexingStrategy indexingStrategy) {
 
         if (logger.isTraceEnabled()) {
             logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}",
                 newEdge.getType(), entityId.getUuid(), entityId.getType());
         }
 
-        offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), async);
+        offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), indexingStrategy);
 
     }
 
@@ -704,7 +710,24 @@
         offerTopic( elasticsearchIndexEvent, queueType );
     }
 
-    private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
+    protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) {
+
+        final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
+
+        final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+        final int expirationTimeInSeconds =
+            ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
+
+        //write to the map in ES
+        esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
+
+        return new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
+
+    }
+
+
+    protected void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent)
         throws IndexDocNotFoundException {
 
         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 2ba6c0b..e5e981b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -22,6 +22,7 @@
 
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.asyncevents.direct.DirectFirstEventServiceImpl;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
@@ -105,7 +106,40 @@
 
         final LegacyQueueManager.Implementation impl = LegacyQueueManager.Implementation.valueOf(value);
 
-        final AsyncEventServiceImpl asyncEventService = new AsyncEventServiceImpl(
+        final AsyncEventServiceImpl asyncEventService = getAsyncEventService();
+
+        if ( impl.equals( LOCAL )) {
+            asyncEventService.MAX_TAKE = 1000;
+        }
+
+        if ( impl.equals( DISTRIBUTED )) {
+            asyncEventService.MAX_TAKE = 500;
+        }
+
+        return asyncEventService;
+    }
+
+
+    private AsyncEventServiceImpl getAsyncEventService() {
+
+
+        AsyncEventServiceImpl asyncEventService;
+/*
+       asyncEventService =  new AsyncEventServiceImpl(
+                queueManagerFactory,
+                indexProcessorFig,
+                indexProducer,
+                metricsFactory,
+                entityCollectionManagerFactory,
+                indexLocationStrategyFactory,
+                entityIndexFactory,
+                eventBuilder,
+                mapManagerFactory,
+                queueFig,
+                rxTaskScheduler);
+        */
+
+        asyncEventService = new DirectFirstEventServiceImpl(
             queueManagerFactory,
             indexProcessorFig,
             indexProducer,
@@ -118,14 +152,6 @@
             queueFig,
             rxTaskScheduler );
 
-        if ( impl.equals( LOCAL )) {
-            asyncEventService.MAX_TAKE = 1000;
-        }
-
-        if ( impl.equals( DISTRIBUTED )) {
-            asyncEventService.MAX_TAKE = 500;
-        }
-
         return asyncEventService;
     }
 }
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java
new file mode 100644
index 0000000..ab5f0b9
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Created by peterajohnson on 9/8/17.
+ */
+public interface BufferedQueue<T> {
+
+    /**
+     * Set the consumer of these events
+     * @param consumer
+     */
+    void setConsumer(Consumer<List<T>> consumer);
+
+    /**
+     * Offer an entity. May block
+     *
+     * @param t
+     * @return
+     */
+    boolean offer(T t);
+
+    /**
+     * @return the current size of the queue
+     */
+    int size();
+
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
new file mode 100644
index 0000000..9123138
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+/**
+ * Bufferes events and dispatched then in batches.
+ * Ensures that the callback will be called at a min interval.
+ */
+public class BufferedQueueImpl<T> implements BufferedQueue<T> {
+
+    private String fileName = "my_file_name.txt";
+    private Consumer<List<T>> consumer;
+
+    ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
+
+    private final LinkedBlockingQueue<PendingDispatch> queue;
+    private final long intervalNanos;
+    private long timeOfLastDispatch = 0L;
+
+    public BufferedQueueImpl(int size, long interval , TimeUnit intervalTimeUnit) {
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new DispatchTask()));
+
+        this.intervalNanos = intervalTimeUnit.toNanos(interval);
+        threadPool.scheduleAtFixedRate(new DispatchTask(), intervalNanos,intervalNanos, TimeUnit.NANOSECONDS);
+        readBatchFile();
+        queue = new LinkedBlockingQueue<>(size);
+    }
+
+    public boolean offer(T t) {
+        PendingDispatch pd = new PendingDispatch(t);
+        if (timeOfLastDispatch + intervalNanos < System.nanoTime()) {
+            dispatchOne(pd);
+            return true;
+        }
+        try {
+            return queue.offer(pd, intervalNanos, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException e) {
+            return false;
+        }
+    }
+
+    public void setConsumer(Consumer<List<T>> consumer) {
+        this.consumer = consumer;
+    }
+
+
+    private void dispatchOne(PendingDispatch pd) {
+        List<PendingDispatch> messages = new ArrayList<>();
+        messages.add(pd);
+        dispatchMessages(messages);
+    }
+
+    protected void dispatchAll() {
+        if (!queue.isEmpty()) {
+            List<PendingDispatch> messages = new ArrayList<>();
+            queue.drainTo(messages);
+            dispatchMessages(messages);
+        }
+    }
+
+    private void dispatchMessages(List<PendingDispatch> messages) {
+        List<T> m = new ArrayList<>();
+        for (PendingDispatch pd : messages) {
+            if (!pd.isCancelled()) {
+                m.add(pd.getWrapped());
+            }
+        }
+        timeOfLastDispatch = System.nanoTime();
+        Boolean sent = Boolean.TRUE;
+        try {
+            consumer.accept(m);
+        } catch (Exception e) {
+            sent = Boolean.FALSE;
+        }
+        for (PendingDispatch pd : messages) {
+            pd.setResult(sent);
+        }
+    }
+
+
+    public int size() {
+        return queue.size();
+    }
+
+    private void readBatchFile() {
+
+    }
+
+
+    //
+    // Internal Helper classes
+    //
+
+
+
+    private class PendingDispatch implements Future<Boolean> {
+        T wrapped;
+        boolean canceled;
+        boolean done;
+        Boolean result = null;
+
+        PendingDispatch(T wrapped) {
+            this.wrapped = wrapped;
+            canceled = false;
+            done = false;
+        }
+
+        T getWrapped() {
+            return wrapped;
+        }
+
+        void setResult(Boolean b) {
+            result = b;
+            done = true;
+            synchronized (this) {
+                notify();
+            }
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            canceled = true;
+            return canceled;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return canceled;
+        }
+
+        @Override
+        public boolean isDone() {
+            return done;
+        }
+
+        @Override
+        public Boolean get() throws InterruptedException, ExecutionException {
+            while (!done) {
+                synchronized (this) {
+                    wait(100);
+                }
+            }
+            return result;
+        }
+
+        @Override
+        public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            if (!done) {
+                synchronized (this) {
+                    wait(unit.toMillis(timeout));
+                }
+            }
+            return result;
+        }
+    }
+
+
+    private class DispatchTask implements Runnable  {
+        @Override
+        public void run() {
+            try {
+                dispatchAll();
+            } catch (Throwable t) {
+            }
+        }
+    }
+
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
new file mode 100644
index 0000000..f842cea
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+import java.util.function.Consumer;
+
+/**
+ * Created by peterajohnson on 10/27/17.
+ */
+public class BufferedQueueNOP<T> implements BufferedQueue<T> {
+
+    private Consumer consumer;
+
+    @Override
+    public void setConsumer(Consumer consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
+    public boolean offer(T o) {
+        consumer.accept(o);
+        return true;
+    }
+
+    @Override
+    public int size() {
+        return 0;
+    }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
new file mode 100644
index 0000000..4dfce37
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.asyncevents.direct;
+
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the AsyncEventService that writes first directly to ES
+ * and then submits to ASW as a backup.
+ *
+ * Created by peterajohnson on 8/29/17.
+ */
+public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl {
+
+
+    private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class);
+
+    private IndexingStrategy configIndexingStrategy = IndexingStrategy.ASYNC;
+
+    private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>();
+
+    public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) {
+        super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler);
+
+        //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS);
+        bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); });
+
+        configIndexingStrategy = IndexingStrategy.get(queueFig.getQueueStrategy());
+
+    }
+
+    protected void dispatchToES(final List<Serializable> bodies) {
+
+        List<LegacyQueueMessage> messages = new ArrayList<>();
+        for (Serializable body : bodies) {
+            String uuid = UUID.randomUUID().toString();
+            LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
+            messages.add(message);
+        }
+
+        List<IndexEventResult> result = callEventHandlers(messages);
+
+        // failed to dispatch send to SQS
+        try {
+            List<LegacyQueueMessage> indexedMessages = submitToIndex(result, false);
+        } catch (Exception e) {
+            e.printStackTrace();
+            for (Serializable body : bodies) {
+                super.offer(body);
+            }
+        }
+
+
+    }
+
+    /**
+     * Offer the EntityIdScope to SQS
+     *
+     * The body will be an implementation of one of the following:
+     *    EntityIndexEvent
+     *    EntityDeleteEvent
+     *    EdgeIndexEvent
+     *    EdgeDeleteEvent
+     */
+    protected void offer(final Serializable body) {
+        List<LegacyQueueMessage> messages = getMessageArray(body);
+        List<IndexEventResult> result = callEventHandlers(messages);
+        submitToIndex( result, false );
+        super.offer(body);
+    }
+
+    private List<LegacyQueueMessage> getMessageArray(final Serializable body) {
+        String uuid = UUID.randomUUID().toString();
+
+        LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here");
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Sync Handler called for body of class {} ", body.getClass().getSimpleName());
+        }
+
+        List<LegacyQueueMessage> messages = new ArrayList<>();
+        messages.add(message);
+        return messages;
+    }
+
+
+    protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) {
+        if  (shouldSendToDirectToES(indexingStrategy)) {
+            List<LegacyQueueMessage> messages = getMessageArray(operation);
+            List<IndexEventResult> result = callEventHandlers(messages);
+            submitToIndex( result, false );
+        }
+
+        // only if single region.
+        if (shouldSendToAWS(indexingStrategy)) {
+            super.offer(operation, indexingStrategy);
+        }
+    }
+
+
+    protected List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) {
+
+        // if nothing came back then return empty list
+        if(indexEventResults==null){
+            return new ArrayList<>(0);
+        }
+
+        IndexOperationMessage combined = new IndexOperationMessage();
+        List<LegacyQueueMessage> queueMessages = indexEventResults.stream()
+
+            // filter out messages that are not present, they were not processed and put into the results
+            .filter( result -> result.getQueueMessage().isPresent() )
+            .map(indexEventResult -> {
+
+                //record the cycle time
+                getMessageCycye().update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+
+                // ingest each index op into our combined, single index op for the index producer
+                if(indexEventResult.getIndexOperationMessage().isPresent()){
+                    combined.ingest(indexEventResult.getIndexOperationMessage().get());
+                }
+
+                return indexEventResult.getQueueMessage().get();
+            })
+            // collect into a list of QueueMessages that can be ack'd later
+            .collect(Collectors.toList());
+
+
+        // dispatch to ES
+        ElasticsearchIndexEvent elasticsearchIndexEvent = getIndexOperationMessage(combined);
+        handleIndexOperation(elasticsearchIndexEvent);
+        return queueMessages;
+    }
+
+    private boolean shouldSendToDirectToES(IndexingStrategy indexingStrategy) {
+        if (indexingStrategy == IndexingStrategy.DEFAULT) {
+            indexingStrategy = configIndexingStrategy;
+        }
+        return  (indexingStrategy == IndexingStrategy.DIRECT || indexingStrategy == IndexingStrategy.DIRECTONLY);
+    }
+
+    private boolean shouldSendToAWS(IndexingStrategy indexingStrategy) {
+        if (indexingStrategy == IndexingStrategy.DEFAULT) {
+            indexingStrategy = configIndexingStrategy;
+        }
+        // and is in same region.
+        return  (indexingStrategy != IndexingStrategy.DIRECTONLY);
+    }
+}
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
new file mode 100644
index 0000000..69c5445
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.index;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class describes the paths an index request can take
+ * between tomcat and ES.
+ *
+ * Created by peterajohnson on 10/30/17.
+ */
+public enum IndexingStrategy {
+
+    DIRECTONLY("directonly"),   // Index request is sent directly to ES and not to AWS
+    DIRECT("direct"),   // Index request is sent directly to ES before sync ASW
+    SYNC("sync"),     // Index request is sent via a sync AWS to ES
+    ASYNC("async"),     // Index request is sent via an async AWS to ES
+    DEFAULT("default");    // Follow the default setting
+
+    private String name;
+
+    private static final Map<String,IndexingStrategy> NAME_MAP;
+
+    static {
+        Map<String,IndexingStrategy> map = new HashMap<String,IndexingStrategy>();
+        for (IndexingStrategy instance : IndexingStrategy.values()) {
+            map.put(instance.getName(),instance);
+        }
+        NAME_MAP = Collections.unmodifiableMap(map);
+    }
+
+    IndexingStrategy(String name) {
+        this.name = name;
+    }
+
+    public static IndexingStrategy get(String name) {
+        IndexingStrategy indexingStrategy =  NAME_MAP.get(name);
+        if (indexingStrategy == null) {
+            return DEFAULT;
+        }
+        return indexingStrategy;
+    }
+
+
+    public String getName() {
+        return this.name;
+    }
+
+}
+
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
index cef6d12..f38cefa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpCollectionUtils.java
@@ -21,10 +21,12 @@
 import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory;
 import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl;
 
+import org.apache.usergrid.corepersistence.index.IndexingStrategy;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+
 import java.util.*;
 
 import static org.apache.usergrid.persistence.Schema.*;
@@ -52,6 +54,16 @@
         return VALID_SETTING_NAMES;
     }
 
+    public static IndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
+
+        IndexingStrategy indexingStrategy = IndexingStrategy.DEFAULT;
+        String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
+        if (indexing != null) {
+            indexingStrategy = IndexingStrategy.get(indexing);
+        }
+        return indexingStrategy;
+    }
+
     public static Boolean asyncIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) {
 
         String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX);
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
index f19bede..4a12d14 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -114,4 +114,9 @@
     @Default("true")
     boolean isAsyncQueue();
 
+
+    @Key("usergrid.queue.strategy")
+    @Default("async")
+    String getQueueStrategy();
+
 }
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
new file mode 100644
index 0000000..f562475
--- /dev/null
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.rest.interceptors;
+
+import org.glassfish.jersey.server.ContainerRequest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import javax.inject.*;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.ext.*;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * If the request had an ACCEPT_ENCODING header containing 'gzip' then
+ * gzip the response and add CONTENT_ENCODING gzip header
+ *
+ * * If the request had an CONTENT_ENCODING header containing 'gzip' then
+ *  unzip the request and remove the CONTENT_ENCODING gzip header
+ *  Created by peterajohnson on 11/1/17.
+ */
+@Provider
+public class GZIPWriterInterceptor implements ReaderInterceptor, WriterInterceptor {
+
+    final private static String GZIP = "gzip";
+    @Inject
+    private javax.inject.Provider<ContainerRequest> requestProvider;
+
+    @Override
+    public void aroundWriteTo(WriterInterceptorContext context) throws IOException,WebApplicationException {
+        ContainerRequest request = requestProvider.get();
+
+        if (request != null) {
+            List<String> aeHeaders = request.getRequestHeader(HttpHeaders.ACCEPT_ENCODING);
+            if (aeHeaders != null && aeHeaders.size() > 0) {
+                String acceptEncodingHeader = aeHeaders.get(0);
+                if (acceptEncodingHeader.contains(GZIP)) {
+                    OutputStream outputStream = context.getOutputStream();
+                    context.setOutputStream(new GZIPOutputStream(outputStream));
+                    context.getHeaders().putSingle(HttpHeaders.CONTENT_ENCODING, GZIP);
+                }
+            }
+        }
+        context.proceed();
+    }
+
+    @Override
+    public Object aroundReadFrom(ReaderInterceptorContext context) throws IOException, WebApplicationException {
+        String encoding = context.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING);
+        if (GZIP.equalsIgnoreCase(encoding)) {
+            GZIPInputStream is = new GZIPInputStream(context.getInputStream());
+            context.getHeaders().remove(HttpHeaders.CONTENT_ENCODING);
+            context.setInputStream(is);
+        }
+
+        return context.proceed();
+    }
+}