Merge pull request #6 from apigee/mutationflushing

Changed all batch mutations to be counting mutations
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
index 1164dbc..3188fd8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java
@@ -61,6 +61,7 @@
 import org.apache.usergrid.persistence.cassandra.CounterUtils;
 import org.apache.usergrid.persistence.cassandra.CounterUtils.AggregateCounterSelection;
 import org.apache.usergrid.persistence.exceptions.TransactionNotFoundException;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 import org.apache.usergrid.utils.UUIDUtils;
 
 import com.fasterxml.uuid.UUIDComparator;
@@ -89,7 +90,7 @@
 
 import static me.prettyprint.hector.api.factory.HFactory.createColumn;
 import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+
 import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
 import static org.apache.usergrid.mq.Queue.QUEUE_CREATED;
 import static org.apache.usergrid.mq.Queue.QUEUE_MODIFIED;
@@ -240,7 +241,8 @@
     @Override
     public Message postToQueue( String queuePath, Message message ) {
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         queuePath = normalizeQueuePath( queuePath );
 
@@ -259,7 +261,7 @@
                 break;
             }
 
-            batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+            batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), be );
             for ( QueueInfo q : subscribers.getQueues() ) {
                 batchPostToQueue( batch, q.getPath(), message, indexUpdate, timestamp );
 
@@ -439,7 +441,8 @@
         UUID timestampUuid = newTimeUUID();
         long timestamp = getTimestampInMicros( timestampUuid );
 
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         batchSubscribeToQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath, subscriberQueueId,
                 timestamp );
@@ -484,7 +487,8 @@
         UUID timestampUuid = newTimeUUID();
         long timestamp = getTimestampInMicros( timestampUuid );
 
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         batchUnsubscribeFromQueue( batch, publisherQueuePath, publisherQueueId, subscriberQueuePath, subscriberQueueId,
                 timestamp );
@@ -578,7 +582,8 @@
         UUID timestampUuid = newTimeUUID();
         long timestamp = getTimestampInMicros( timestampUuid );
 
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         QueueSet queues = new QueueSet();
 
@@ -618,7 +623,8 @@
         UUID timestampUuid = newTimeUUID();
         long timestamp = getTimestampInMicros( timestampUuid );
 
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         QueueSet queues = new QueueSet();
 
@@ -658,7 +664,8 @@
         UUID timestampUuid = newTimeUUID();
         long timestamp = getTimestampInMicros( timestampUuid );
 
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         QueueSet queues = new QueueSet();
 
@@ -698,7 +705,8 @@
         UUID timestampUuid = newTimeUUID();
         long timestamp = getTimestampInMicros( timestampUuid );
 
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         QueueSet queues = new QueueSet();
 
@@ -732,7 +740,8 @@
     @Override
     public void incrementAggregateQueueCounters( String queuePath, String category, String counterName, long value ) {
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         counterUtils.batchIncrementAggregateCounters( m, applicationId, null, null, getQueueId( queuePath ), category,
                 counterName, value, timestamp );
         batchExecute( m, CassandraService.RETRY_COUNT );
@@ -875,7 +884,8 @@
     @Override
     public void incrementQueueCounters( String queuePath, Map<String, Long> counts ) {
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         counterUtils.batchIncrementQueueCounters( m, getQueueId( queuePath ), counts, timestamp, applicationId );
         batchExecute( m, CassandraService.RETRY_COUNT );
     }
@@ -884,7 +894,8 @@
     @Override
     public void incrementQueueCounter( String queuePath, String name, long value ) {
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         counterUtils.batchIncrementQueueCounter( m, getQueueId( queuePath ), name, value, timestamp, applicationId );
         batchExecute( m, CassandraService.RETRY_COUNT );
     }
@@ -954,7 +965,8 @@
         UUID timestampUuid = newTimeUUID();
         long timestamp = getTimestampInMicros( timestampUuid );
 
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         addQueueToMutator( batch, queue, timestamp );
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
index 99db41c..b5d4eca 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java
@@ -33,6 +33,7 @@
 import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.mq.cassandra.io.NoTransactionSearch.SearchParam;
 import org.apache.usergrid.persistence.exceptions.QueueException;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 import org.apache.usergrid.utils.UUIDUtils;
 
 import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
@@ -48,7 +49,7 @@
 
 import static me.prettyprint.hector.api.factory.HFactory.createColumn;
 import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+
 import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
 import static org.apache.usergrid.mq.Queue.QUEUE_NEWEST;
 import static org.apache.usergrid.mq.Queue.QUEUE_OLDEST;
@@ -306,7 +307,7 @@
         // conditions with clock drift.
         long colTimestamp = UUIDUtils.getTimestampInMicros( lastReturnedId );
 
-        Mutator<UUID> mutator = createMutator( ko, ue );
+        Mutator<UUID> mutator = CountingMutator.createFlushingMutator( ko, ue );
 
         if ( logger.isDebugEnabled() )
         {
diff --git a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
index 69860d6..98ee361 100644
--- a/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/ConsumerTransaction.java
@@ -35,6 +35,7 @@
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.exceptions.QueueException;
 import org.apache.usergrid.persistence.exceptions.TransactionNotFoundException;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 import org.apache.usergrid.utils.UUIDUtils;
 
 import me.prettyprint.hector.api.Keyspace;
@@ -43,7 +44,7 @@
 import me.prettyprint.hector.api.query.SliceQuery;
 
 import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+
 import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
 import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getConsumerId;
 import static org.apache.usergrid.mq.cassandra.CassandraMQUtils.getQueueClientTransactionKey;
@@ -128,7 +129,8 @@
 
         logger.debug( "Writing new timeout at '{}' for message '{}'", expirationId, messageId );
 
-        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+
+        Mutator<ByteBuffer> mutator = CountingMutator.createFlushingMutator( ko, be );
 
         mutator.addInsertion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(),
                 createColumn( expirationId, messageId, cass.createTimestamp(), ue, ue ) );
@@ -162,7 +164,7 @@
     private void deleteTransaction( UUID queueId, UUID consumerId, UUID transactionId )
     {
 
-        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+        Mutator<ByteBuffer> mutator = CountingMutator.createFlushingMutator( ko, be );
         ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
 
         mutator.addDeletion( key, CONSUMER_QUEUE_TIMEOUTS.getColumnFamily(), transactionId, ue,
@@ -374,7 +376,7 @@
             return;
         }
 
-        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+        Mutator<ByteBuffer> mutator = CountingMutator.createFlushingMutator( ko, be );
         ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
 
         for ( int i = 0; i < maxIndex && i < pointers.size(); i++ )
@@ -406,7 +408,7 @@
     protected void writeTransactions( List<Message> messages, final long futureTimeout, UUID queueId, UUID consumerId )
     {
 
-        Mutator<ByteBuffer> mutator = createMutator( ko, be );
+        Mutator<ByteBuffer> mutator = CountingMutator.createFlushingMutator( ko, be );
 
         ByteBuffer key = getQueueClientTransactionKey( queueId, consumerId );
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index 0ada3fc..854c0ff 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -76,7 +76,7 @@
 import static me.prettyprint.cassandra.service.FailoverPolicy.ON_FAIL_TRY_ALL_AVAILABLE;
 import static me.prettyprint.hector.api.factory.HFactory.createColumn;
 import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+
 import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
 import static me.prettyprint.hector.api.factory.HFactory.createSliceQuery;
 import static me.prettyprint.hector.api.factory.HFactory.createVirtualKeyspace;
@@ -824,7 +824,7 @@
         if ( ttl != 0 ) {
             col.setTtl( ttl );
         }
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
         m.insert( bytebuffer( key ), columnFamily.toString(), col );
     }
 
@@ -851,7 +851,7 @@
                                                                                                  " ttl=" + ttl : "" ) );
         }
 
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
         long timestamp = createTimestamp();
 
         for ( Object name : map.keySet() ) {
@@ -912,7 +912,7 @@
             db_logger.debug( "deleteColumn cf=" + columnFamily + " key=" + key + " name=" + column );
         }
 
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
         m.delete( bytebuffer( key ), columnFamily.toString(), bytebuffer( column ), be );
     }
 
@@ -1002,7 +1002,7 @@
             db_logger.debug( "deleteRow cf=" + columnFamily + " key=" + key );
         }
 
-        createMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString() ).execute();
+        CountingMutator.createFlushingMutator( ko, be ).addDeletion( bytebuffer( key ), columnFamily.toString() ).execute();
     }
 
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
index e378013..1aa6f67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
@@ -32,6 +32,7 @@
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 import org.apache.usergrid.utils.UUIDUtils;
 
 import org.apache.commons.lang.StringUtils;
@@ -59,7 +60,7 @@
 
 import static java.lang.String.CASE_INSENSITIVE_ORDER;
 
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+
 import static me.prettyprint.hector.api.factory.HFactory.createRangeSlicesQuery;
 import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
 import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
@@ -252,7 +253,7 @@
 
 
         Keyspace ko = cass.getSystemKeyspace();
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
 
         long timestamp = cass.createTimestamp();
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java
index fdff3ca..132b2f4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java
@@ -79,6 +79,7 @@
 import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 import org.apache.usergrid.persistence.schema.CollectionInfo;
 import org.apache.usergrid.utils.ClassUtils;
 import org.apache.usergrid.utils.CompositeUtils;
@@ -112,7 +113,7 @@
 import static java.util.Arrays.asList;
 
 import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.usergrid.locking.LockHelper.getUniqueUpdateLock;
@@ -725,7 +726,7 @@
         UUID timestampUuid = newTimeUUID();
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
         A entity = batchCreate( m, entityType, entityClass, properties, importId, timestampUuid );
 
         batchExecute( m, CassandraService.RETRY_COUNT );
@@ -956,7 +957,7 @@
     public void insertEntity( String type, UUID entityId ) throws Exception {
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
 
         Object itemKey = key( entityId );
 
@@ -1386,7 +1387,7 @@
         }
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
 
         UUID timestampUuid = newTimeUUID();
         properties.put( PROPERTY_MODIFIED, getTimestampInMillis( timestampUuid ) );
@@ -1410,7 +1411,7 @@
         logger.info( "deleteEntity: {} is of type {}", entityId, entity.getType() );
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
 
         UUID timestampUuid = newTimeUUID();
         long timestamp = getTimestampInMicros( timestampUuid );
@@ -1725,7 +1726,7 @@
     public void createApplicationCollection( String entityType ) throws Exception {
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, be );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, be );
 
         long timestamp = cass.createTimestamp();
 
@@ -2017,7 +2018,8 @@
         DynamicEntity entity = loadPartialEntity( entityRef.getUuid(), propertyName );
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         propertyValue = getDefaultSchema().validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
 
@@ -2052,7 +2054,8 @@
         EntityRef entity = getRef( entityRef.getUuid() );
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, elementCoValue, false,
                 timestampUuid );
@@ -2072,7 +2075,8 @@
         EntityRef entity = getRef( entityRef.getUuid() );
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         for ( Object elementValue : elementValues ) {
             batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, null, false, timestampUuid );
@@ -2093,7 +2097,8 @@
         EntityRef entity = getRef( entityRef.getUuid() );
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         for ( Map.Entry<?, ?> elementValue : elementValues.entrySet() ) {
             batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue.getKey(),
@@ -2115,7 +2120,8 @@
         EntityRef entity = getRef( entityRef.getUuid() );
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
 
         batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, true, timestampUuid );
 
@@ -2289,7 +2295,8 @@
     @Override
     public Entity createRole( String roleName, String roleTitle, long inactivity ) throws Exception {
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         batchCreateRole( batch, null, roleName, roleTitle, inactivity, null, timestampUuid );
         batchExecute( batch, CassandraService.RETRY_COUNT );
         return get( roleRef( roleName ) );
@@ -2301,7 +2308,8 @@
         roleName = roleName.toLowerCase();
         permission = permission.toLowerCase();
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ), permission,
                 ByteBuffer.allocate( 0 ), timestamp );
         batchExecute( batch, CassandraService.RETRY_COUNT );
@@ -2312,7 +2320,8 @@
     public void grantRolePermissions( String roleName, Collection<String> permissions ) throws Exception {
         roleName = roleName.toLowerCase();
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         for ( String permission : permissions ) {
             permission = permission.toLowerCase();
             addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ), permission,
@@ -2327,7 +2336,8 @@
         roleName = roleName.toLowerCase();
         permission = permission.toLowerCase();
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         CassandraPersistenceUtils
                 .addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ),
                         permission, timestamp );
@@ -2366,7 +2376,8 @@
     @Override
     public Entity createGroupRole( UUID groupId, String roleName, long inactivity ) throws Exception {
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         batchCreateRole( batch, groupId, roleName, null, inactivity, null, timestampUuid );
         batchExecute( batch, CassandraService.RETRY_COUNT );
         return get( roleRef( groupId, roleName ) );
@@ -2378,7 +2389,8 @@
         roleName = roleName.toLowerCase();
         permission = permission.toLowerCase();
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( groupId, roleName ),
                 permission, ByteBuffer.allocate( 0 ), timestamp );
         batchExecute( batch, CassandraService.RETRY_COUNT );
@@ -2390,7 +2402,8 @@
         roleName = roleName.toLowerCase();
         permission = permission.toLowerCase();
         long timestamp = cass.createTimestamp();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                be );
         CassandraPersistenceUtils.addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
                 getRolePermissionsKey( groupId, roleName ), permission, timestamp );
         batchExecute( batch, CassandraService.RETRY_COUNT );
@@ -2516,7 +2529,8 @@
                                              long cassandraTimestamp ) {
         // TODO short circuit
         if ( !skipAggregateCounters ) {
-            Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+            Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                    be );
 
             counterUtils
                     .batchIncrementAggregateCounters( m, applicationId, userId, groupId, null, category, counterName,
@@ -2532,7 +2546,8 @@
         // TODO shortcircuit
         if ( !skipAggregateCounters ) {
             long timestamp = cass.createTimestamp();
-            Mutator<ByteBuffer> m = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+            Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ),
+                    be );
             counterUtils.batchIncrementAggregateCounters( m, applicationId, userId, groupId, null, category, counters,
                     timestamp );
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
index f9b1262..2f3beb1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/GeoIndexManager.java
@@ -29,6 +29,7 @@
 import org.apache.usergrid.persistence.geo.EntityLocationRef;
 import org.apache.usergrid.persistence.geo.GeocellManager;
 import org.apache.usergrid.persistence.geo.model.Point;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 
 import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
 import me.prettyprint.hector.api.Keyspace;
@@ -37,7 +38,7 @@
 import me.prettyprint.hector.api.mutation.Mutator;
 
 import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_GEOCELL;
 import static org.apache.usergrid.persistence.Schema.INDEX_CONNECTIONS;
 import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_INDEX;
@@ -276,7 +277,7 @@
                                                 String propertyName, EntityLocationRef location ) {
 
         Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
-        Mutator<ByteBuffer> m = createMutator( ko, ByteBufferSerializer.get() );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() );
 
         batchStoreLocationInCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
                 key( owner.getUuid(), collectionName, propertyName ), owner.getUuid(), location );
@@ -314,7 +315,7 @@
                                                    EntityLocationRef location ) {
 
         Keyspace ko = cass.getApplicationKeyspace( em.getApplicationId() );
-        Mutator<ByteBuffer> m = createMutator( ko, ByteBufferSerializer.get() );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, ByteBufferSerializer.get() );
 
         batchRemoveLocationFromCollectionIndex( m, em.getIndexBucketLocator(), em.getApplicationId(),
                 key( owner.getUuid(), collectionName, propertyName ), location );
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 73f89ed..908ce9d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -94,7 +94,7 @@
 import static java.lang.String.CASE_INSENSITIVE_ORDER;
 import static java.util.Arrays.asList;
 
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_COLLECTIONS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
index 9448543..cbde58a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/IndexIT.java
@@ -32,6 +32,7 @@
 import org.apache.usergrid.persistence.cassandra.IndexUpdate;
 import org.apache.usergrid.persistence.cassandra.IndexUpdate.IndexEntry;
 import org.apache.usergrid.persistence.cassandra.RelationManagerImpl;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 import org.apache.usergrid.utils.JsonUtils;
 import org.apache.usergrid.utils.UUIDUtils;
 
@@ -39,7 +40,6 @@
 import me.prettyprint.hector.api.Keyspace;
 import me.prettyprint.hector.api.mutation.Mutator;
 
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -432,7 +432,7 @@
         ByteBufferSerializer buf = ByteBufferSerializer.get();
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
-        Mutator<ByteBuffer> m = createMutator( ko, buf );
+        Mutator<ByteBuffer> m = CountingMutator.createFlushingMutator( ko, buf );
 
 
         IndexUpdate update =