Merge commit 'refs/pull/573/head' of github.com:apache/usergrid
diff --git a/stack/config/src/test/resources/usergrid-test.properties b/stack/config/src/test/resources/usergrid-test.properties
index 143d60a..5def24e 100644
--- a/stack/config/src/test/resources/usergrid-test.properties
+++ b/stack/config/src/test/resources/usergrid-test.properties
@@ -104,8 +104,6 @@
 
 usergrid.counter.batch.interval=10
 #usergrid.auth.token_secret_salt=super secret token value
-#usergrid.auth.token_expires_from_last_use=false
-#usergrid.auth.token_refresh_reuses_id=false
 
 # max time to persist tokens for (milliseconds)
 #usergrid.auth.token.persist.expires=0
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 09047bb..658320e 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -428,6 +428,12 @@
 
         <dependency>
             <groupId>org.apache.usergrid</groupId>
+            <artifactId>token</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.usergrid</groupId>
             <artifactId>actorsystem</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index d4cf8cb..ec6b775 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -44,7 +44,7 @@
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
-import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.token.guice.TokenModule;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import java.util.Properties;
@@ -68,6 +68,7 @@
         install( new CommonModule());
         install( new LockModule());
         install( new CacheModule());
+        install( new TokenModule());
         install( new CollectionModule() {
             /**
              * configure our migration data provider for all entities in the system
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
index 5d50ef8..5aa556a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
@@ -33,14 +33,9 @@
 
 import com.google.inject.Injector;
 
-import me.prettyprint.hector.api.ddl.ComparatorType;
-
-import static me.prettyprint.hector.api.factory.HFactory.createColumnFamilyDefinition;
 import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.getCfDefs;
 import static org.apache.usergrid.persistence.cassandra.CassandraService.DEFAULT_ORGANIZATION;
 import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEMENT_APPLICATION;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.PRINCIPAL_TOKEN_CF;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.TOKENS_CF;
 import static org.apache.usergrid.persistence.cassandra.CassandraService.getApplicationKeyspace;
 
 
@@ -116,12 +111,6 @@
 
         logger.info( "Initialize keyspace and legacy column families" );
 
-        cass.createColumnFamily( getApplicationKeyspace(),
-            createColumnFamilyDefinition( getApplicationKeyspace(), TOKENS_CF, ComparatorType.BYTESTYPE ) );
-
-        cass.createColumnFamily( getApplicationKeyspace(),
-            createColumnFamilyDefinition( getApplicationKeyspace(), PRINCIPAL_TOKEN_CF, ComparatorType.UUIDTYPE ) );
-
         cass.createColumnFamilies( getApplicationKeyspace(),
             getCfDefs( ApplicationCF.class, getApplicationKeyspace() ) );
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index f8b1f6c..3e08cf7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -64,9 +64,6 @@
 
     public static final boolean USE_VIRTUAL_KEYSPACES = true;
 
-    public static final String TOKENS_CF = "Tokens";
-    public static final String PRINCIPAL_TOKEN_CF = "PrincipalTokens";
-
     public static final int DEFAULT_COUNT = 1000;
     public static final int ALL_COUNT = 100000;
     public static final int INDEX_ENTRY_LIST_COUNT = 1000;
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 26a1890..dda50c6 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -154,43 +154,44 @@
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( value, "value is required" );
 
+        final BatchStatement batchStatement = new BatchStatement();
+
         Statement mapEntry;
         Statement mapKey;
         if (ttl > 0){
             Using timeToLive = QueryBuilder.ttl(ttl);
 
-            mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+             batchStatement.add(QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
                 .using(timeToLive)
                 .value("key", getMapEntryPartitionKey(scope, key))
                 .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
-                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
+                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)));
 
 
             final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() );
-            mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+            batchStatement.add(QueryBuilder.insertInto(MAP_KEYS_TABLE)
                 .using(timeToLive)
                 .value("key", getMapKeyPartitionKey(scope, bucket))
                 .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
-                .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED));
+                .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)));
         }else{
 
-            mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+            batchStatement.add(QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
                 .value("key", getMapEntryPartitionKey(scope, key))
                 .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
-                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED));
+                .value("value", DataType.text().serialize(value, ProtocolVersion.NEWEST_SUPPORTED)));
 
             // get a bucket number for the map keys table
             final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() );
 
-            mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+            batchStatement.add(QueryBuilder.insertInto(MAP_KEYS_TABLE)
                 .value("key", getMapKeyPartitionKey(scope, bucket))
                 .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
-                .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED));
+                .value("value", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED)));
 
         }
 
-        session.execute(mapEntry);
-        session.execute(mapKey);
+        session.execute(batchStatement);
 
     }
 
@@ -211,23 +212,23 @@
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( putUuid, "value is required" );
 
+        final BatchStatement batchStatement = new BatchStatement();
 
-        Statement mapEntry = QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
+        batchStatement.add(QueryBuilder.insertInto(MAP_ENTRIES_TABLE)
             .value("key", getMapEntryPartitionKey(scope, key))
             .value("column1", DataType.cboolean().serialize(true, ProtocolVersion.NEWEST_SUPPORTED))
-            .value("value", DataType.uuid().serialize(putUuid, ProtocolVersion.NEWEST_SUPPORTED));
+            .value("value", DataType.uuid().serialize(putUuid, ProtocolVersion.NEWEST_SUPPORTED)));
 
-        session.execute(mapEntry);
 
 
         final int bucket = BUCKET_LOCATOR.getCurrentBucket( scope.getName() );
-        Statement mapKey;
-        mapKey = QueryBuilder.insertInto(MAP_KEYS_TABLE)
+        batchStatement.add(QueryBuilder.insertInto(MAP_KEYS_TABLE)
             .value("key", getMapKeyPartitionKey(scope, bucket))
             .value("column1", DataType.text().serialize(key, ProtocolVersion.NEWEST_SUPPORTED))
-            .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED));
+            .value("value", DataType.serializeValue(null, ProtocolVersion.NEWEST_SUPPORTED)));
 
-        session.execute(mapKey);
+        session.execute(batchStatement);
+
     }
 
 
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 8a45323..45aad4b 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -113,6 +113,7 @@
         <module>queue</module>
         <module>cache</module>
         <module>actorsystem</module>
+        <module>token</module>
     </modules>
 
     <build>
diff --git a/stack/corepersistence/token/pom.xml b/stack/corepersistence/token/pom.xml
new file mode 100644
index 0000000..fcbd73c
--- /dev/null
+++ b/stack/corepersistence/token/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>persistence</artifactId>
+        <groupId>org.apache.usergrid</groupId>
+        <version>2.2.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <description>The module for handling access token persistence</description>
+
+    <artifactId>token</artifactId>
+    <name>Usergrid Token</name>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons.lang.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.usergrid</groupId>
+            <artifactId>common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- test deps -->
+
+        <dependency>
+            <groupId>org.apache.usergrid</groupId>
+            <artifactId>common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <!--
+        <dependency>
+            <groupId>org.apache.usergrid</groupId>
+            <artifactId>collection</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        -->
+
+    </dependencies>
+
+</project>
diff --git a/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/TokenSerialization.java b/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/TokenSerialization.java
new file mode 100644
index 0000000..22a93f0
--- /dev/null
+++ b/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/TokenSerialization.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.token;
+
+
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+
+/**
+ * Serialize token information to/from Cassandra. This was ported over to use a newer cassandra client from the old
+ * persistence code @
+ *
+ *     https://github.com/apache/usergrid/tree/3f819dc0679f84edb57c52e69b58622417cfd59f
+ *     org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl
+ *
+ */
+public interface TokenSerialization extends Migration {
+    /** Deletes every listed token together with all token index rows stored under the principal key. */
+    void deleteTokens(List<UUID> tokenUUIDs, ByteBuffer principalKeyBuffer);
+    /** Deletes one token and, when principalKeyBuffer is non-null, that token's index row for the principal. */
+    void revokeToken(UUID tokenUUID, ByteBuffer principalKeyBuffer);
+    /** Rewrites the token's accessed and inactive values, writing both with the given ttl. */
+    void updateTokenAccessTime(UUID tokenUUID, long accessedTime, long inactiveTime, int ttl );
+    /** Reads the stored properties of a token, keyed by property name. */
+    Map<String, Object> getTokenInfo(UUID tokenUUID);
+    /** Stores each tokenInfo entry for the token and, when principalKeyBuffer is non-null, indexes it by principal. */
+    void putTokenInfo(UUID tokenUUID, Map<String, Object> tokenInfo, ByteBuffer principalKeyBuffer, int ttl);
+    /** Lists the UUIDs of the tokens indexed under the given principal key. */
+    List<UUID> getTokensForPrincipal(ByteBuffer principalKeyBuffer);
+
+}
diff --git a/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/guice/TokenModule.java b/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/guice/TokenModule.java
new file mode 100644
index 0000000..a8fdd4c
--- /dev/null
+++ b/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/guice/TokenModule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.token.guice;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Key;
+import com.google.inject.multibindings.Multibinder;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.token.TokenSerialization;
+import org.apache.usergrid.persistence.token.impl.TokenSerializationImpl;
+
+
+/**
+ * Wire up the token serialization implementation and register it as a schema Migration.
+ */
+public class TokenModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        // Inject TokenSerializationImpl wherever a TokenSerialization is requested.
+        bind( TokenSerialization.class ).to( TokenSerializationImpl.class );
+        // Add the same TokenSerialization binding to the multibound set of Migration instances.
+        Multibinder<Migration> migrationBinding = Multibinder.newSetBinder( binder(), Migration.class );
+        migrationBinding.addBinding().to(  Key.get( TokenSerialization.class ) );
+
+    }
+}
+
+
diff --git a/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/impl/TokenSerializationImpl.java b/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/impl/TokenSerializationImpl.java
new file mode 100644
index 0000000..7825da0
--- /dev/null
+++ b/stack/corepersistence/token/src/main/java/org/apache/usergrid/persistence/token/impl/TokenSerializationImpl.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.token.impl;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.*;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.TableDefinition;
+import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionImpl;
+import org.apache.usergrid.persistence.token.TokenSerialization;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+
+/**
+ * Serialize tokens and their details to Cassandra.
+ */
+@Singleton
+public class TokenSerializationImpl implements TokenSerialization {
+
+    public static final Logger logger = LoggerFactory.getLogger(TokenSerializationImpl.class);
+
+    private SmileFactory smile = new SmileFactory();
+
+    private ObjectMapper smileMapper = new ObjectMapper( smile );
+
+    public static final String TOKEN_UUID = "uuid";
+    public static final String TOKEN_TYPE = "type";
+    public static final String TOKEN_CREATED = "created";
+    public static final String TOKEN_ACCESSED = "accessed";
+    public static final String TOKEN_INACTIVE = "inactive";
+    public static final String TOKEN_DURATION = "duration";
+    public static final String TOKEN_PRINCIPAL_TYPE = "principal";
+    public static final String TOKEN_ENTITY = "entity";
+    public static final String TOKEN_APPLICATION = "application";
+    public static final String TOKEN_STATE = "state";
+    public static final String TOKEN_WORKFLOW_ORG_ID = "workflowOrgId";
+    public static final String TOKEN_TYPE_ACCESS = "access";
+
+    private static final Set<String> TOKEN_PROPERTIES;
+
+    static {
+        HashSet<String> set = new HashSet<String>();
+        set.add( TOKEN_UUID );
+        set.add( TOKEN_TYPE );
+        set.add( TOKEN_CREATED );
+        set.add( TOKEN_ACCESSED );
+        set.add( TOKEN_INACTIVE );
+        set.add( TOKEN_PRINCIPAL_TYPE );
+        set.add( TOKEN_ENTITY );
+        set.add( TOKEN_APPLICATION );
+        set.add( TOKEN_STATE );
+        set.add( TOKEN_DURATION );
+        set.add( TOKEN_WORKFLOW_ORG_ID );
+        TOKEN_PROPERTIES = Collections.unmodifiableSet(set);
+    }
+
+    public static final HashSet<String> REQUIRED_TOKEN_PROPERTIES = new HashSet<String>();
+
+    static {
+        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_UUID );
+        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_TYPE );
+        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_CREATED );
+        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_ACCESSED );
+        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_INACTIVE );
+        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_DURATION );
+    }
+
+    private static final String TOKENS_TABLE = CQLUtils.quote("Tokens");
+    private static final Collection<String> TOKENS_PARTITION_KEYS = Collections.singletonList("key");
+    private static final Collection<String> TOKENS_COLUMN_KEYS = Collections.singletonList("column1");
+    private static final Map<String, DataType.Name> TOKENS_COLUMNS =
+        new HashMap<String, DataType.Name>() {{
+            put( "key", DataType.Name.BLOB );
+            put( "column1", DataType.Name.BLOB );
+            put( "value", DataType.Name.BLOB ); }};
+    private static final Map<String, String> TOKENS_CLUSTERING_ORDER =
+        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+    private static final String PRINCIPAL_TOKENS_TABLE = CQLUtils.quote("PrincipalTokens");
+    private static final Collection<String> PRINCIPAL_TOKENS_PARTITION_KEYS = Collections.singletonList("key");
+    private static final Collection<String> PRINCIPAL_TOKENS_COLUMN_KEYS = Collections.singletonList("column1");
+    private static final Map<String, DataType.Name> PRINCIPAL_TOKENS_COLUMNS =
+        new HashMap<String, DataType.Name>() {{
+            put( "key", DataType.Name.BLOB );
+            put( "column1", DataType.Name.UUID );
+            put( "value", DataType.Name.BLOB ); }};
+    private static final Map<String, String> PRINCIPAL_TOKENS_CLUSTERING_ORDER =
+        new HashMap<String, String>(){{ put( "column1", "ASC" ); }};
+
+
+    private final Session session;
+    private final CassandraConfig cassandraConfig;
+
+
+    @Inject
+    public TokenSerializationImpl(final Session session,
+                                  final CassandraConfig cassandraConfig ) {
+        this.session = session;
+        this.cassandraConfig = cassandraConfig;
+        // session executes every statement built here; cassandraConfig supplies the keyspace and read consistency.
+    }
+
+
+    @Override
+    public void deleteTokens(final List<UUID> tokenUUIDs, final ByteBuffer principalKeyBuffer){
+        // Removes every listed token and the principal's entire token index in a single batch.
+        Preconditions.checkNotNull(tokenUUIDs, "token UUID list is required");
+        Preconditions.checkNotNull(principalKeyBuffer, "principalKeyBuffer is required");
+
+        logger.trace("deleteTokens, token UUIDs: {}", tokenUUIDs);
+
+        final BatchStatement batchStatement = new BatchStatement();
+
+        tokenUUIDs.forEach( tokenUUID ->
+            batchStatement.add(
+                QueryBuilder.delete()
+                    .from(TOKENS_TABLE)
+                    .where(QueryBuilder
+                        .eq("key", DataType.uuid().serialize(tokenUUID, ProtocolVersion.NEWEST_SUPPORTED)))
+            )
+        );
+
+        // No column1 restriction here: this drops all PrincipalTokens rows stored under the principal key.
+        batchStatement.add(
+            QueryBuilder.delete()
+                .from(PRINCIPAL_TOKENS_TABLE)
+                .where(QueryBuilder
+                    .eq("key", principalKeyBuffer)));
+
+        session.execute(batchStatement);
+
+    }
+
+
+    @Override
+    public void revokeToken(final UUID tokenUUID, final ByteBuffer principalKeyBuffer){
+        // Deletes one token; the principal index row is removed only when a principal key is supplied.
+        Preconditions.checkNotNull(tokenUUID, "token UUID is required");
+
+        logger.trace("revokeToken, token UUID: {}", tokenUUID);
+
+        // Both deletes are collected in one batch and sent with a single execute call.
+        final BatchStatement batchStatement = new BatchStatement();
+
+        batchStatement.add(
+            QueryBuilder.delete()
+                .from(TOKENS_TABLE)
+                .where(QueryBuilder
+                    .eq("key", DataType.uuid().serialize(tokenUUID, ProtocolVersion.NEWEST_SUPPORTED))));
+        // Restricting on column1 removes just this token from the principal's index.
+        if(principalKeyBuffer != null){
+            batchStatement.add(
+                QueryBuilder.delete()
+                    .from(PRINCIPAL_TOKENS_TABLE)
+                    .where(QueryBuilder
+                        .eq("key", principalKeyBuffer))
+                    .and(QueryBuilder
+                        .eq("column1", DataType.uuid().serialize(tokenUUID, ProtocolVersion.NEWEST_SUPPORTED))));
+        }
+
+        session.execute(batchStatement);
+
+    }
+
+
+    @Override
+    public void updateTokenAccessTime(UUID tokenUUID, long accessedTime, long inactiveTime, int ttl ){
+        // Rewrites the "accessed" and "inactive" columns of the token's row in one batch, each with the given ttl.
+        Preconditions.checkNotNull(tokenUUID, "token UUID is required");
+        Preconditions.checkArgument(accessedTime > -1 , "accessedTime is required to be positive");
+        Preconditions.checkArgument(inactiveTime > -1 , "inactiveTime is required to be positive");
+        Preconditions.checkArgument(ttl > -1 , "ttl is required to be positive");
+        // The three checks above accept 0, although their messages say "positive".
+        logger.trace("updateTokenAccessTime, token UUID: {}, accessedTime: {}, inactiveTime: {}, ttl: {}",
+            tokenUUID, accessedTime, inactiveTime, ttl);
+
+        final BatchStatement batchStatement = new BatchStatement();
+        final Clause inKey =
+            QueryBuilder.eq("key", DataType.uuid().serialize(tokenUUID, ProtocolVersion.NEWEST_SUPPORTED));
+        final Clause whereTokenAccessed =
+            QueryBuilder.eq("column1", DataType.serializeValue(TOKEN_ACCESSED, ProtocolVersion.NEWEST_SUPPORTED));
+        final Clause whereTokenInactive =
+            QueryBuilder.eq("column1", DataType.serializeValue(TOKEN_INACTIVE, ProtocolVersion.NEWEST_SUPPORTED));
+
+        final Assignment setAccessedTime =
+            QueryBuilder.set("value", DataType.serializeValue(accessedTime, ProtocolVersion.NEWEST_SUPPORTED));
+        final Assignment setInactiveTime =
+            QueryBuilder.set("value", DataType.serializeValue(inactiveTime, ProtocolVersion.NEWEST_SUPPORTED));
+
+        final Using usingTTL = QueryBuilder.ttl(ttl);
+        // NOTE(review): always true, since the inactiveTime > -1 check above already rejects Long.MIN_VALUE.
+        if( inactiveTime != Long.MIN_VALUE){
+            batchStatement.add(
+                QueryBuilder
+                    .update(TOKENS_TABLE)
+                    .with(setInactiveTime)
+                    .where(inKey).and(whereTokenInactive)
+                    .using(usingTTL)
+            );
+        }
+
+        batchStatement.add(
+            QueryBuilder
+            .update(TOKENS_TABLE)
+            .with(setAccessedTime)
+            .where(inKey).and(whereTokenAccessed)
+            .using(usingTTL)
+        );
+
+        session.execute(batchStatement);
+
+    }
+
+
+    @Override
+    public Map<String, Object> getTokenInfo(UUID tokenUUID){
+        // Loads the known properties of one token; the returned map is empty when no rows match.
+        Preconditions.checkNotNull(tokenUUID, "token UUID is required");
+        // column1 holds the property name, so only the names listed in TOKEN_PROPERTIES are requested.
+        List<ByteBuffer> tokenProperties = new ArrayList<>();
+        TOKEN_PROPERTIES.forEach( prop ->
+            tokenProperties.add(DataType.serializeValue(prop, ProtocolVersion.NEWEST_SUPPORTED)));
+
+        final ByteBuffer key = DataType.uuid().serialize(tokenUUID, ProtocolVersion.NEWEST_SUPPORTED);
+
+        final Clause inKey = QueryBuilder.eq("key", key);
+        final Clause inColumn = QueryBuilder.in("column1", tokenProperties );
+
+        final Statement statement = QueryBuilder.select().all().from(TOKENS_TABLE)
+            .where(inKey)
+            .and(inColumn)
+            .setConsistencyLevel(cassandraConfig.getDataStaxReadCl());
+
+        final ResultSet resultSet = session.execute(statement);
+        final List<Row> rows = resultSet.all();
+
+        Map<String, Object> tokenInfo = new HashMap<>();
+
+        rows.forEach( row -> {
+            // Each row is one property: column1 is its name, value is decoded according to that name.
+            final String name = (String)DataType.text()
+                .deserialize(row.getBytes("column1"), ProtocolVersion.NEWEST_SUPPORTED);
+            final Object value = deserializeColumnValue(name, row.getBytes("value"));
+            // A null value is treated as fatal, e.g. an unknown property name or an undecodable state value.
+            if (value == null){
+                throw new RuntimeException("error deserializing token info for property: "+name);
+            }
+
+            tokenInfo.put(name, value);
+
+        });
+
+        logger.trace("getTokenInfo, info: {}", tokenInfo);
+
+        return tokenInfo;
+    }
+
+
+    @Override
+    public void putTokenInfo(final UUID tokenUUID, final Map<String, Object> tokenInfo,
+                             final ByteBuffer principalKeyBuffer, final int ttl){
+        // Writes one Tokens row per tokenInfo entry and, if a principal key is given, a PrincipalTokens index row.
+        Preconditions.checkNotNull(tokenUUID, "tokenUUID is required");
+        Preconditions.checkNotNull(tokenInfo, "tokenInfo is required");
+        Preconditions.checkArgument(ttl > -1 , "ttl is required to be positive");
+
+        logger.trace("putTokenInfo, token UUID: {}, tokenInfo: {}, ttl: {}", tokenUUID, tokenInfo, ttl);
+
+        final BatchStatement batchStatement = new BatchStatement();
+        final Using usingTTL = QueryBuilder.ttl(ttl);
+
+        tokenInfo.forEach((key, value) -> {
+            // "state" is stored as SMILE bytes; every other property uses the driver's default serialization.
+            ByteBuffer valueBuffer;
+            if(key.equalsIgnoreCase(TOKEN_STATE)){
+                valueBuffer = toByteBuffer(value);
+            }else{
+                valueBuffer = DataType.serializeValue(value, ProtocolVersion.NEWEST_SUPPORTED);
+            }
+
+            batchStatement.add(
+                QueryBuilder.insertInto(TOKENS_TABLE)
+                    .value("key", DataType.serializeValue(tokenUUID, ProtocolVersion.NEWEST_SUPPORTED))
+                    .value("column1", DataType.serializeValue(key, ProtocolVersion.NEWEST_SUPPORTED))
+                    .value("value", valueBuffer)
+                    .using(usingTTL));
+
+        });
+
+        if(principalKeyBuffer != null){
+            // The value is a one-byte placeholder; getTokensForPrincipal reads the token UUID from column1.
+            batchStatement.add(
+                QueryBuilder.insertInto(PRINCIPAL_TOKENS_TABLE)
+                    .value("key", principalKeyBuffer)
+                    .value("column1", DataType.serializeValue(tokenUUID, ProtocolVersion.NEWEST_SUPPORTED))
+                    .value("value", ByteBuffer.wrap( new byte[] { 0 } ))
+                    .using(usingTTL));
+
+        }
+
+        session.execute(batchStatement);
+
+    }
+
+
+    @Override
+    public List<UUID> getTokensForPrincipal(ByteBuffer principalKeyBuffer){
+        // Returns the token UUIDs held in column1 of the PrincipalTokens rows for this principal key.
+        Preconditions.checkNotNull(principalKeyBuffer, "principal key bytebuffer cannot be null");
+
+        Clause inPrincipal = QueryBuilder.eq("key", principalKeyBuffer);
+        Statement statement = QueryBuilder
+            .select()
+            .column("column1")
+            .from(PRINCIPAL_TOKENS_TABLE)
+            .where(inPrincipal);
+        // NOTE(review): unlike getTokenInfo, no read consistency level is set here — confirm that is intended.
+        final List<Row> rows = session.execute(statement).all();
+        final List<UUID> tokenUUIDs = new ArrayList<>(rows.size());
+
+        rows.forEach(row -> tokenUUIDs.add(row.getUUID("column1")));
+
+        logger.trace("getTokensForPrincipal, token UUIDs: {}", tokenUUIDs);
+
+        return tokenUUIDs;
+    }
+
+
+    private Object deserializeColumnValue(final String name, final ByteBuffer bb){
+        // Decodes a stored column value using the type implied by its property name:
+        // text for type/principal, bigint for the timestamps and duration, uuid for ids, SMILE for state.
+        switch (name) {
+            case TOKEN_TYPE:
+            case TOKEN_PRINCIPAL_TYPE:
+                return DataType.text().deserialize(bb, ProtocolVersion.NEWEST_SUPPORTED);
+            case TOKEN_CREATED:
+            case TOKEN_ACCESSED:
+            case TOKEN_INACTIVE:
+            case TOKEN_DURATION:
+                return DataType.bigint().deserialize(bb, ProtocolVersion.NEWEST_SUPPORTED);
+            case TOKEN_ENTITY:
+            case TOKEN_APPLICATION:
+            case TOKEN_WORKFLOW_ORG_ID:
+            case TOKEN_UUID:
+                return DataType.uuid().deserialize(bb, ProtocolVersion.NEWEST_SUPPORTED);
+            case TOKEN_STATE:
+                return fromByteBuffer(bb, Object.class);
+        }
+        // Reached only when the name matches none of the cases above.
+        return null;
+    }
+
+
+    private ByteBuffer toByteBuffer( Object obj ) {
+        if ( obj == null ) {
+            return null;
+        }
+        // Writes obj as SMILE bytes; a serialization failure is logged and reported to the caller as null.
+        byte[] bytes = null;
+        try {
+            bytes = smileMapper.writeValueAsBytes( obj );
+        }
+        catch ( Exception e ) {
+            logger.error( "Error getting SMILE bytes", e );
+        }
+        if ( bytes != null ) {
+            return ByteBuffer.wrap( bytes );
+        }
+        return null;
+    }
+
+
+    private Object fromByteBuffer( ByteBuffer byteBuffer, Class<?> clazz ) {
+        if ( ( byteBuffer == null ) || !byteBuffer.hasRemaining() ) {
+            return null;
+        }
+        if ( clazz == null ) {
+            clazz = Object.class;
+        }
+        // Parses SMILE bytes; copies through a duplicate because array() throws for direct/read-only buffers.
+        final byte[] bytes = new byte[byteBuffer.remaining()];
+        byteBuffer.duplicate().get( bytes );
+        try {
+            return smileMapper.readValue( bytes, clazz );
+        }
+        catch ( Exception e ) {
+            logger.error( "Error parsing SMILE bytes", e );
+            return null;
+        }
+    }
+
+
+    @Override
+    public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
+        // No column families are declared here; the two token tables are returned by getTables().
+        return Collections.emptyList();
+    }
+
+
+    @Override
+    public Collection<TableDefinition> getTables() {
+        // Describes the Tokens and PrincipalTokens tables, both placed in the application keyspace.
+        final TableDefinition tokens =
+            new TableDefinitionImpl(
+                cassandraConfig.getApplicationKeyspace(),
+                TOKENS_TABLE,
+                TOKENS_PARTITION_KEYS,
+                TOKENS_COLUMN_KEYS,
+                TOKENS_COLUMNS,
+                TableDefinitionImpl.CacheOption.KEYS,
+                TOKENS_CLUSTERING_ORDER);
+
+        final TableDefinition principalTokens =
+            new TableDefinitionImpl(
+                cassandraConfig.getApplicationKeyspace(),
+                PRINCIPAL_TOKENS_TABLE,
+                PRINCIPAL_TOKENS_PARTITION_KEYS,
+                PRINCIPAL_TOKENS_COLUMN_KEYS,
+                PRINCIPAL_TOKENS_COLUMNS,
+                TableDefinitionImpl.CacheOption.KEYS,
+                PRINCIPAL_TOKENS_CLUSTERING_ORDER);
+
+        return Arrays.asList(tokens, principalTokens);
+    }
+
+}
diff --git a/stack/corepersistence/token/src/test/java/org/apache/usergrid/persistence/token/TestTokenModule.java b/stack/corepersistence/token/src/test/java/org/apache/usergrid/persistence/token/TestTokenModule.java
new file mode 100644
index 0000000..bfa0aa0
--- /dev/null
+++ b/stack/corepersistence/token/src/test/java/org/apache/usergrid/persistence/token/TestTokenModule.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.token;
+
+
+import org.apache.usergrid.persistence.token.guice.TokenModule;
+import org.apache.usergrid.persistence.token.impl.TokenSerializationImpl;
+import org.apache.usergrid.persistence.core.guice.CommonModule;
+
+
+/** Guice module used by the token tests; its configure() installs CommonModule and then TokenModule. */
+public class TestTokenModule extends org.apache.usergrid.persistence.core.guice.TestModule {
+
+    @Override
+    protected void configure() {
+
+        install( new CommonModule() );
+        install( new TokenModule() );
+
+    }
+}
diff --git a/stack/corepersistence/token/src/test/java/org/apache/usergrid/persistence/token/TokenSerializationTest.java b/stack/corepersistence/token/src/test/java/org/apache/usergrid/persistence/token/TokenSerializationTest.java
new file mode 100644
index 0000000..7beeb56
--- /dev/null
+++ b/stack/corepersistence/token/src/test/java/org/apache/usergrid/persistence/token/TokenSerializationTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.token;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.token.impl.TokenSerializationImpl;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+
+import com.google.inject.Inject;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+/** Exercises the injected {@link TokenSerialization} using the bindings from {@link TestTokenModule}. */
+@RunWith( ITRunner.class )
+@UseModules( { TestTokenModule.class } )
+@NotThreadSafe
+public class TokenSerializationTest {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+    @Inject
+    public TokenSerialization tokenSerialization;
+
+    @Test
+    public void putTokenInfo() {
+        // Smoke test: the write only needs to complete without throwing; nothing is read back.
+        UUID uuid = UUIDGenerator.newTimeUUID();
+        Map<String, Object> tokenDetails = new HashMap<>();
+        tokenDetails.put(TokenSerializationImpl.TOKEN_TYPE, "test-token");
+
+        ByteBuffer principalKeyBuffer = ByteBuffer.wrap("test-principal".getBytes());
+
+        tokenSerialization.putTokenInfo(uuid, tokenDetails, principalKeyBuffer, 60);
+
+    }
+
+    @Test
+    public void getTokenInfo() {
+        // Round trip: the token type that was written must be the one that is read back.
+        UUID uuid = UUIDGenerator.newTimeUUID();
+        Map<String, Object> tokenDetails = new HashMap<>();
+        tokenDetails.put(TokenSerializationImpl.TOKEN_TYPE, "test-token");
+
+        ByteBuffer principalKeyBuffer = ByteBuffer.wrap("test-principal".getBytes());
+
+        tokenSerialization.putTokenInfo(uuid, tokenDetails, principalKeyBuffer, 60);
+
+        Map<String, Object> returnedDetails = tokenSerialization.getTokenInfo(uuid);
+
+        assertEquals(
+            tokenDetails.get(TokenSerializationImpl.TOKEN_TYPE),
+            returnedDetails.get(TokenSerializationImpl.TOKEN_TYPE)
+        );
+    }
+
+    @Test
+    public void updateAccessTokenTime() {
+        // The accessed timestamp is checked after the initial write and again after updateTokenAccessTime.
+        UUID uuid = UUIDGenerator.newTimeUUID();
+        Map<String, Object> tokenDetails = new HashMap<>();
+        tokenDetails.put(TokenSerializationImpl.TOKEN_TYPE, "test-token");
+
+        long accessedTime = System.currentTimeMillis()-1000000;
+        long inactiveTime = accessedTime+2000000;
+
+        tokenDetails.put(TokenSerializationImpl.TOKEN_ACCESSED, accessedTime);
+        tokenDetails.put(TokenSerializationImpl.TOKEN_INACTIVE, inactiveTime);
+
+        ByteBuffer principalKeyBuffer = ByteBuffer.wrap("test-principal".getBytes());
+
+        tokenSerialization.putTokenInfo(uuid, tokenDetails, principalKeyBuffer, 60);
+
+        Map<String, Object> returnedDetails = tokenSerialization.getTokenInfo(uuid);
+
+        assertEquals(
+            accessedTime,
+            returnedDetails.get(TokenSerializationImpl.TOKEN_ACCESSED)
+        );
+
+        long newAccessedTime = System.currentTimeMillis();
+        long newInactiveTime = newAccessedTime+1000000;
+
+        tokenSerialization.updateTokenAccessTime(uuid, newAccessedTime,newInactiveTime, 1200);
+
+        returnedDetails = tokenSerialization.getTokenInfo(uuid);
+
+        assertEquals(
+            newAccessedTime,
+            returnedDetails.get(TokenSerializationImpl.TOKEN_ACCESSED)
+        );
+    }
+
+    @Test
+    public void deleteTokens() {
+        // After deleteTokens, looking the token up again must yield an empty map.
+        UUID uuid = UUIDGenerator.newTimeUUID();
+        Map<String, Object> tokenDetails = new HashMap<>();
+        tokenDetails.put(TokenSerializationImpl.TOKEN_TYPE, "test-token");
+
+        ByteBuffer principalKeyBuffer = ByteBuffer.wrap("test-principal".getBytes());
+
+        tokenSerialization.putTokenInfo(uuid, tokenDetails, principalKeyBuffer, 60);
+
+        Map<String, Object> returnedDetails = tokenSerialization.getTokenInfo(uuid);
+
+        assertEquals(
+            tokenDetails.get(TokenSerializationImpl.TOKEN_TYPE),
+            returnedDetails.get(TokenSerializationImpl.TOKEN_TYPE)
+        );
+
+        tokenSerialization.deleteTokens(Collections.singletonList(uuid), principalKeyBuffer);
+
+        returnedDetails = tokenSerialization.getTokenInfo(uuid);
+
+        assertEquals(
+            0,
+            returnedDetails.size()
+        );
+    }
+
+    @Test
+    public void revokeToken() {
+        // After revokeToken, looking the token up again must yield an empty map.
+        UUID uuid = UUIDGenerator.newTimeUUID();
+        Map<String, Object> tokenDetails = new HashMap<>();
+        tokenDetails.put(TokenSerializationImpl.TOKEN_TYPE, "test-token");
+
+        ByteBuffer principalKeyBuffer = ByteBuffer.wrap("test-principal".getBytes());
+
+        tokenSerialization.putTokenInfo(uuid, tokenDetails, principalKeyBuffer, 60);
+
+        Map<String, Object> returnedDetails = tokenSerialization.getTokenInfo(uuid);
+
+        assertEquals(
+            tokenDetails.get(TokenSerializationImpl.TOKEN_TYPE),
+            returnedDetails.get(TokenSerializationImpl.TOKEN_TYPE)
+        );
+
+        tokenSerialization.revokeToken(uuid, principalKeyBuffer);
+
+        returnedDetails = tokenSerialization.getTokenInfo(uuid);
+
+        assertEquals(
+            0,
+            returnedDetails.size()
+        );
+    }
+
+    @Test
+    public void getTokensForPrincipal() {
+        // The principal name carries a timestamp, unlike the fixed name used by the other tests.
+        UUID uuid = UUIDGenerator.newTimeUUID();
+        Map<String, Object> tokenDetails = new HashMap<>();
+        tokenDetails.put(TokenSerializationImpl.TOKEN_TYPE, "test-token");
+
+        String principal = "test-principal-"+System.currentTimeMillis();
+
+        ByteBuffer principalKeyBuffer = ByteBuffer.wrap(principal.getBytes());
+
+        tokenSerialization.putTokenInfo(uuid, tokenDetails, principalKeyBuffer, 60);
+        // wrap the principal bytes again instead of reusing the buffer handed to putTokenInfo
+        principalKeyBuffer = ByteBuffer.wrap(principal.getBytes());
+
+        List<UUID> returnTokens = tokenSerialization.getTokensForPrincipal(principalKeyBuffer);
+        // JUnit's assertEquals takes the expected value first, then the actual one
+        assertEquals(
+            uuid,
+            returnTokens.get(0)
+        );
+    }
+
+}
diff --git a/stack/corepersistence/token/src/test/resources/log4j.properties b/stack/corepersistence/token/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d9f6b7f
--- /dev/null
+++ b/stack/corepersistence/token/src/test/resources/log4j.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# suppress inspection "UnusedProperty" for whole file
+log4j.rootLogger=INFO,stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{3}.%M(%L)<%t>- %m%n
+
+#log4j.logger.com.datastax.driver.core=TRACE
+log4j.logger.org.safehaus.chop.plugin=DEBUG
+log4j.logger.org.safehaus.guicyfig=ERROR
+log4j.logger.org.safehaus.chop.api.store.amazon=DEBUG
+log4j.logger.org.apache.http=ERROR
+log4j.logger.com.amazonaws.request=ERROR
+log4j.logger.cassandra.db=ERROR
+
+#log4j.logger.org.apache.usergrid=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.collection=TRACE
+log4j.logger.org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact=TRACE
+
diff --git a/stack/launcher/src/main/resources/usergrid-standalone.properties b/stack/launcher/src/main/resources/usergrid-standalone.properties
index 03d9b80..cd014e6 100644
--- a/stack/launcher/src/main/resources/usergrid-standalone.properties
+++ b/stack/launcher/src/main/resources/usergrid-standalone.properties
@@ -63,8 +63,6 @@
 usergrid.version.build=${version}
 
 #usergrid.auth.token_secret_salt=super secret token value
-#usergrid.auth.token_expires_from_last_use=false
-#usergrid.auth.token_refresh_reuses_id=false
 
 # max time to persist tokens for (milliseconds)
 #usergrid.auth.token.persist.expires=0
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/ManagementResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/ManagementResource.java
index 12e1270..48aec78 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/ManagementResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/ManagementResource.java
@@ -41,7 +41,7 @@
 import org.apache.usergrid.security.sso.ApigeeSSO2Provider;
 import org.apache.usergrid.security.sso.ExternalSSOProvider;
 import org.apache.usergrid.security.sso.SSOProviderFactory;
-import org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl;
+import org.apache.usergrid.security.tokens.impl.TokenServiceImpl;
 import org.apache.usergrid.security.tokens.exceptions.BadTokenException;
 import org.apache.usergrid.utils.JsonUtils;
 import org.glassfish.jersey.server.mvc.Viewable;
@@ -62,7 +62,7 @@
 import static javax.servlet.http.HttpServletResponse.*;
 import static javax.ws.rs.core.MediaType.*;
 import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl.USERGRID_EXTERNAL_SSO_ENABLED;
+import static org.apache.usergrid.security.tokens.impl.TokenServiceImpl.USERGRID_EXTERNAL_SSO_ENABLED;
 import static org.apache.usergrid.utils.JsonUtils.mapToJsonString;
 import static org.apache.usergrid.utils.StringUtils.stringOrSubstringAfterFirst;
 import static org.apache.usergrid.utils.StringUtils.stringOrSubstringBeforeFirst;
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
index 066f734..4cbe9b2 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UserResource.java
@@ -30,7 +30,7 @@
 import org.apache.usergrid.rest.security.annotations.RequireAdminUserAccess;
 import org.apache.usergrid.security.shiro.principals.PrincipalIdentifier;
 import org.apache.usergrid.security.tokens.TokenInfo;
-import org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl;
+import org.apache.usergrid.security.tokens.impl.TokenServiceImpl;
 import org.apache.usergrid.security.tokens.exceptions.TokenException;
 import org.apache.usergrid.services.ServiceResults;
 import org.glassfish.jersey.server.mvc.Viewable;
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UsersResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UsersResource.java
index 6999841..634e5da 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UsersResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/management/users/UsersResource.java
@@ -29,7 +29,7 @@
 import org.apache.usergrid.rest.exceptions.AuthErrorInfo;
 import org.apache.usergrid.rest.exceptions.RedirectionException;
 import org.apache.usergrid.security.shiro.utils.SubjectUtils;
-import org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl;
+import org.apache.usergrid.security.tokens.impl.TokenServiceImpl;
 import org.glassfish.jersey.server.mvc.Viewable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +46,6 @@
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static org.apache.usergrid.rest.exceptions.SecurityException.mappableSecurityException;
 import static org.apache.usergrid.security.shiro.utils.SubjectUtils.isServiceAdmin;
-import static org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl.USERGRID_EXTERNAL_SSO_PROVIDER_URL;
 
 
 @Component( "org.apache.usergrid.rest.management.users.UsersResource" )
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
index 0a80d73..8d18521f 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ManagementResourceIT.java
@@ -25,7 +25,7 @@
 import org.apache.usergrid.rest.test.resource.AbstractRestIT;
 import org.apache.usergrid.rest.test.resource.model.*;
 import org.apache.usergrid.rest.test.resource.model.Collection;
-import org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl;
+import org.apache.usergrid.security.tokens.impl.TokenServiceImpl;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -38,9 +38,9 @@
 import java.io.IOException;
 import java.util.*;
 
-import static org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl.USERGRID_EXTERNAL_SSO_PROVIDER;
-import static org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl.USERGRID_EXTERNAL_SSO_PROVIDER_URL;
-import static org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl.USERGRID_EXTERNAL_SSO_ENABLED;
+import static org.apache.usergrid.security.tokens.impl.TokenServiceImpl.USERGRID_EXTERNAL_SSO_PROVIDER;
+import static org.apache.usergrid.security.tokens.impl.TokenServiceImpl.USERGRID_EXTERNAL_SSO_PROVIDER_URL;
+import static org.apache.usergrid.security.tokens.impl.TokenServiceImpl.USERGRID_EXTERNAL_SSO_ENABLED;
 import static org.apache.usergrid.utils.MapUtils.hashMap;
 import static org.junit.Assert.*;
 
diff --git a/stack/services/src/main/java/org/apache/usergrid/security/sso/SSOProviderFactory.java b/stack/services/src/main/java/org/apache/usergrid/security/sso/SSOProviderFactory.java
index 31e085e..b723d9e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/security/sso/SSOProviderFactory.java
+++ b/stack/services/src/main/java/org/apache/usergrid/security/sso/SSOProviderFactory.java
@@ -18,7 +18,7 @@
 
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
 import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl;
+import org.apache.usergrid.security.tokens.impl.TokenServiceImpl;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.util.List;
diff --git a/stack/services/src/main/java/org/apache/usergrid/security/tokens/cassandra/TokenServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/security/tokens/impl/TokenServiceImpl.java
similarity index 73%
rename from stack/services/src/main/java/org/apache/usergrid/security/tokens/cassandra/TokenServiceImpl.java
rename to stack/services/src/main/java/org/apache/usergrid/security/tokens/impl/TokenServiceImpl.java
index 6ea6de0..05859ad 100644
--- a/stack/services/src/main/java/org/apache/usergrid/security/tokens/cassandra/TokenServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/security/tokens/impl/TokenServiceImpl.java
@@ -14,22 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.usergrid.security.tokens.cassandra;
+package org.apache.usergrid.security.tokens.impl;
 
 
 import com.google.inject.Injector;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.mutation.Mutator;
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.management.ApplicationCreator;
 import org.apache.usergrid.management.ManagementService;
 import org.apache.usergrid.management.UserInfo;
 import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.token.TokenSerialization;
 import org.apache.usergrid.security.AuthPrincipalInfo;
 import org.apache.usergrid.security.AuthPrincipalType;
 import org.apache.usergrid.security.sso.SSOProviderFactory;
@@ -40,13 +37,10 @@
 import org.apache.usergrid.security.tokens.exceptions.ExpiredTokenException;
 import org.apache.usergrid.security.tokens.exceptions.InvalidTokenException;
 import org.apache.usergrid.security.sso.ExternalSSOProvider;
-import org.apache.usergrid.utils.ConversionUtils;
-import org.apache.usergrid.utils.JsonUtils;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.util.Assert;
 
 import javax.ws.rs.client.Client;
@@ -54,15 +48,10 @@
 import java.util.*;
 
 import static java.lang.System.currentTimeMillis;
-import static me.prettyprint.hector.api.factory.HFactory.createColumn;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.apache.commons.codec.binary.Base64.decodeBase64;
 import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;
 import static org.apache.commons.codec.digest.DigestUtils.sha;
-import static org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils.getColumnMap;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.PRINCIPAL_TOKEN_CF;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.TOKENS_CF;
-import static org.apache.usergrid.persistence.cassandra.Serializers.*;
+import static org.apache.usergrid.persistence.token.impl.TokenSerializationImpl.*;
 import static org.apache.usergrid.security.AuthPrincipalType.ADMIN_USER;
 import static org.apache.usergrid.security.tokens.TokenCategory.*;
 import static org.apache.usergrid.utils.ConversionUtils.*;
@@ -76,58 +65,6 @@
     private static final Logger logger = LoggerFactory.getLogger( TokenServiceImpl.class );
 
     public static final String PROPERTIES_AUTH_TOKEN_SECRET_SALT = "usergrid.auth.token_secret_salt";
-    public static final String PROPERTIES_AUTH_TOKEN_EXPIRES_FROM_LAST_USE =
-            "usergrid.auth.token_expires_from_last_use";
-    public static final String PROPERTIES_AUTH_TOKEN_REFRESH_REUSES_ID = "usergrid.auth.token_refresh_reuses_id";
-
-    private static final String TOKEN_UUID = "uuid";
-    private static final String TOKEN_TYPE = "type";
-    private static final String TOKEN_CREATED = "created";
-    private static final String TOKEN_ACCESSED = "accessed";
-    private static final String TOKEN_INACTIVE = "inactive";
-    private static final String TOKEN_DURATION = "duration";
-    private static final String TOKEN_PRINCIPAL_TYPE = "principal";
-    private static final String TOKEN_ENTITY = "entity";
-    private static final String TOKEN_APPLICATION = "application";
-    private static final String TOKEN_STATE = "state";
-    private static final String TOKEN_WORKFLOW_ORG_ID = "workflowOrgId";
-
-
-    private static final String TOKEN_TYPE_ACCESS = "access";
-
-
-    private static final Set<String> TOKEN_PROPERTIES;
-
-
-    static {
-        HashSet<String> set = new HashSet<String>();
-        set.add( TOKEN_UUID );
-        set.add( TOKEN_TYPE );
-        set.add( TOKEN_CREATED );
-        set.add( TOKEN_ACCESSED );
-        set.add( TOKEN_INACTIVE );
-        set.add( TOKEN_PRINCIPAL_TYPE );
-        set.add( TOKEN_ENTITY );
-        set.add( TOKEN_APPLICATION );
-        set.add( TOKEN_STATE );
-        set.add( TOKEN_DURATION );
-        set.add( TOKEN_WORKFLOW_ORG_ID );
-        TOKEN_PROPERTIES = Collections.unmodifiableSet(set);
-    }
-
-
-    private static final HashSet<String> REQUIRED_TOKEN_PROPERTIES = new HashSet<String>();
-
-
-    static {
-        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_UUID );
-        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_TYPE );
-        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_CREATED );
-        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_ACCESSED );
-        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_INACTIVE );
-        REQUIRED_TOKEN_PROPERTIES.add( TOKEN_DURATION );
-    }
-
 
     public static final String TOKEN_SECRET_SALT = "super secret token value";
 
@@ -150,27 +87,27 @@
     long maxEmailTokenAge = LONG_TOKEN_AGE;
     long maxOfflineTokenAge = LONG_TOKEN_AGE;
 
-    protected CassandraService cassandra;
-
     protected Properties properties;
 
     protected EntityManagerFactory emf;
 
-    protected MetricsFactory metricsFactory;
+    private MetricsFactory metricsFactory;
+
+    private TokenSerialization tokenSerialization;
 
 
     public TokenServiceImpl() {
     }
 
 
-    long getExpirationProperty( String name, long default_expiration ) {
+    private long getExpirationProperty( String name, long default_expiration ) {
         long expires = Long.parseLong(
                 properties.getProperty( "usergrid.auth.token." + name + ".expires", "" + default_expiration ) );
         return expires > 0 ? expires : default_expiration;
     }
 
 
-    long getExpirationForTokenType( TokenCategory tokenCategory ) {
+    private long getExpirationForTokenType( TokenCategory tokenCategory ) {
         Long l = tokenExpirations.get( tokenCategory );
         if ( l != null ) {
             return l;
@@ -179,7 +116,7 @@
     }
 
 
-    void setExpirationFromProperties( String name ) {
+    private void setExpirationFromProperties( String name ) {
         TokenCategory tokenCategory = TokenCategory.valueOf( name.toUpperCase() );
         long expires = Long.parseLong( properties.getProperty( "usergrid.auth.token." + name + ".expires",
                 "" + getExpirationForTokenType( tokenCategory ) ) );
@@ -372,22 +309,16 @@
 
             long maxTokenTtl = getMaxTtl(TokenCategory.getFromBase64String(token), tokenInfo.getPrincipal());
 
-            Mutator<UUID> batch = createMutator(cassandra.getUsergridApplicationKeyspace(), ue);
-
-            HColumn<String, Long> col =
-                    createColumn(TOKEN_ACCESSED, now, calcTokenTime(tokenInfo.getExpiration(maxTokenTtl)),
-                            se, le);
-            batch.addInsertion(uuid, TOKENS_CF, col);
-
             long inactive = now - tokenInfo.getAccessed();
             if (inactive > tokenInfo.getInactive()) {
-                col = createColumn(TOKEN_INACTIVE, inactive, calcTokenTime(tokenInfo.getExpiration(maxTokenTtl)),
-                        se, le);
-                batch.addInsertion(uuid, TOKENS_CF, col);
                 tokenInfo.setInactive(inactive);
+            }else{
+                // Long.MIN_VALUE indicates that nothing needs to be updated for token inactive property in
+                // tokenSerialization.updateTokenAccessTime()
+                inactive = Long.MIN_VALUE;
             }
 
-            batch.execute();
+            tokenSerialization.updateTokenAccessTime(uuid, now, inactive, calcTokenTime(tokenInfo.getExpiration(maxTokenTtl)));
         }
 
         return tokenInfo;
@@ -436,17 +367,10 @@
      */
     @Override
     public void removeTokens( AuthPrincipalInfo principal ) throws Exception {
-        List<UUID> tokenIds = getTokenUUIDS( principal );
 
-        Mutator<ByteBuffer> batch = createMutator( cassandra.getUsergridApplicationKeyspace(), be );
+        final List<UUID> tokenIds = getTokenUUIDS( principal );
+        tokenSerialization.deleteTokens(tokenIds, principalKey( principal ));
 
-        for ( UUID tokenId : tokenIds ) {
-            batch.addDeletion( bytebuffer( tokenId ), TOKENS_CF );
-        }
-
-        batch.addDeletion( principalKey( principal ), PRINCIPAL_TOKEN_CF );
-
-        batch.execute();
     }
 
 
@@ -469,21 +393,16 @@
             return;
         }
 
-        UUID tokenId = info.getUuid();
-
-        Mutator<ByteBuffer> batch = createMutator( cassandra.getUsergridApplicationKeyspace(), be );
+        final UUID tokenId = info.getUuid();
 
         // clean up the link in the principal -> token index if the principal is
         // on the token
         if ( info.getPrincipal() != null ) {
-            batch.addDeletion( principalKey( info.getPrincipal() ), PRINCIPAL_TOKEN_CF, bytebuffer( tokenId ),
-                    be );
+            tokenSerialization.revokeToken(tokenId, principalKey( info.getPrincipal()));
+        }else{
+            tokenSerialization.revokeToken(tokenId, null);
         }
 
-        // remove the token from the tokens cf
-        batch.addDeletion( bytebuffer( tokenId ), TOKENS_CF );
-
-        batch.execute();
     }
 
 
@@ -491,38 +410,57 @@
         if ( uuid == null ) {
             throw new InvalidTokenException( "No token specified" );
         }
-        Map<String, ByteBuffer> columns = getColumnMap( cassandra
-                .getColumns( cassandra.getUsergridApplicationKeyspace(), TOKENS_CF, uuid, TOKEN_PROPERTIES, se,
-                        be ) );
-        if ( !hasKeys( columns, REQUIRED_TOKEN_PROPERTIES ) ) {
+
+        Map<String, Object> tokenDetails = tokenSerialization.getTokenInfo(uuid);
+
+        if ( !hasKeys( tokenDetails, REQUIRED_TOKEN_PROPERTIES ) ) {
             throw new InvalidTokenException( "Token not found in database" );
         }
-        String type = string( columns.get( TOKEN_TYPE ) );
-        long created = getLong( columns.get( TOKEN_CREATED ) );
-        long accessed = getLong( columns.get( TOKEN_ACCESSED ) );
-        long inactive = getLong( columns.get( TOKEN_INACTIVE ) );
-        long duration = getLong( columns.get( TOKEN_DURATION ) );
-        String principalTypeStr = string( columns.get( TOKEN_PRINCIPAL_TYPE ) );
+        // Read the required properties; a mistyped or null value is logged and rethrown with its cause.
+        String type;
+        long created, accessed, inactive, duration;
+        try {
+            type = (String) tokenDetails.get(TOKEN_TYPE);
+            created = (long) tokenDetails.get(TOKEN_CREATED);
+            accessed = (long) tokenDetails.get(TOKEN_ACCESSED);
+            inactive = (long) tokenDetails.get(TOKEN_INACTIVE);
+            duration = (long) tokenDetails.get(TOKEN_DURATION);
+        } catch (ClassCastException cce){
+            logger.error("Unable to cast token info to primitive type", cce);
+            throw new RuntimeException("Unable to cast token info to primitive type", cce);
+        } catch (NullPointerException npe){
+            logger.error("Unable to obtain token info from serialization layer, one or more missing properties", npe);
+            throw new RuntimeException("Unable to obtain token info from serialization layer, one or more missing properties", npe);
+        }
+
+        String principalTypeStr = (String) tokenDetails.get(TOKEN_PRINCIPAL_TYPE);
+
         AuthPrincipalType principalType = null;
         if ( principalTypeStr != null ) {
             try {
                 principalType = AuthPrincipalType.valueOf( principalTypeStr.toUpperCase() );
             }
             catch ( IllegalArgumentException e ) {
+                logger.warn("Unable to convert authPrincipal Type from string to enum");
             }
         }
         AuthPrincipalInfo principal = null;
         if ( principalType != null ) {
-            UUID entityId = uuid( columns.get( TOKEN_ENTITY ) );
-            UUID appId = uuid( columns.get( TOKEN_APPLICATION ) );
+            UUID entityId = (UUID) tokenDetails.get(TOKEN_ENTITY);
+            UUID appId = (UUID) tokenDetails.get(TOKEN_APPLICATION);
             principal = new AuthPrincipalInfo( principalType, entityId, appId );
         }
-        @SuppressWarnings("unchecked") Map<String, Object> state =
-                ( Map<String, Object> ) JsonUtils.fromByteBuffer( columns.get( TOKEN_STATE ) );
+
+        @SuppressWarnings("unchecked")
+        Map<String, Object> state = ( Map<String, Object> ) tokenDetails.get( TOKEN_STATE );
 
         UUID workflowOrgId = null;
-        if (columns.containsKey(TOKEN_WORKFLOW_ORG_ID)) {
-            workflowOrgId = ConversionUtils.uuid(columns.get(TOKEN_WORKFLOW_ORG_ID));
+        if (tokenDetails.containsKey(TOKEN_WORKFLOW_ORG_ID)) {
+            try {
+                workflowOrgId = (UUID) tokenDetails.get(TOKEN_WORKFLOW_ORG_ID);
+            } catch (ClassCastException cce){
+                logger.error("Unable to cast {} to primitive UUID type", TOKEN_WORKFLOW_ORG_ID);
+            }
         }
 
         return new TokenInfo( uuid, type, created, accessed, inactive, duration, principal, state, workflowOrgId );
@@ -531,81 +469,50 @@
 
     private void putTokenInfo( TokenInfo tokenInfo ) throws Exception {
 
-        ByteBuffer tokenUUID = bytebuffer( tokenInfo.getUuid() );
-
-        Keyspace ko = cassandra.getUsergridApplicationKeyspace();
-
-        Mutator<ByteBuffer> m = createMutator( ko, be );
-
         int ttl = calcTokenTime( tokenInfo.getDuration() );
+        final Map<String, Object> tokenDetails = new HashMap<>();
 
-        m.addInsertion( tokenUUID, TOKENS_CF,
-                createColumn( TOKEN_UUID, bytebuffer( tokenInfo.getUuid() ), ttl, se, be ) );
-        m.addInsertion( tokenUUID, TOKENS_CF,
-                createColumn( TOKEN_TYPE, bytebuffer( tokenInfo.getType() ), ttl, se, be ) );
-        m.addInsertion( tokenUUID, TOKENS_CF,
-                createColumn( TOKEN_CREATED, bytebuffer( tokenInfo.getCreated() ), ttl, se, be ) );
-        m.addInsertion( tokenUUID, TOKENS_CF,
-                createColumn( TOKEN_ACCESSED, bytebuffer( tokenInfo.getAccessed() ), ttl, se, be ) );
-        m.addInsertion( tokenUUID, TOKENS_CF,
-                createColumn( TOKEN_INACTIVE, bytebuffer( tokenInfo.getInactive() ), ttl, se, be ) );
-        m.addInsertion( tokenUUID, TOKENS_CF,
-                createColumn( TOKEN_DURATION, bytebuffer( tokenInfo.getDuration() ), ttl, se, be ) );
+        tokenDetails.put(TOKEN_UUID, tokenInfo.getUuid());
+        tokenDetails.put(TOKEN_TYPE, tokenInfo.getType());
+        tokenDetails.put(TOKEN_CREATED, tokenInfo.getCreated());
+        tokenDetails.put(TOKEN_ACCESSED, tokenInfo.getAccessed());
+        tokenDetails.put(TOKEN_INACTIVE, tokenInfo.getInactive());
+        tokenDetails.put(TOKEN_DURATION, tokenInfo.getDuration());
 
+        ByteBuffer principalKeyBuffer = null;
         if ( tokenInfo.getPrincipal() != null ) {
 
             AuthPrincipalInfo principalInfo = tokenInfo.getPrincipal();
 
-            m.addInsertion( tokenUUID, TOKENS_CF,
-                    createColumn( TOKEN_PRINCIPAL_TYPE, bytebuffer( principalInfo.getType().toString().toLowerCase() ),
-                            ttl, se, be ) );
-            m.addInsertion( tokenUUID, TOKENS_CF,
-                    createColumn( TOKEN_ENTITY, bytebuffer( principalInfo.getUuid() ), ttl, se, be ) );
-            m.addInsertion( tokenUUID, TOKENS_CF,
-                    createColumn( TOKEN_APPLICATION, bytebuffer( principalInfo.getApplicationId() ), ttl, se,
-                            be ) );
+            tokenDetails.put(TOKEN_PRINCIPAL_TYPE, principalInfo.getType().toString().toLowerCase());
+            tokenDetails.put(TOKEN_ENTITY, principalInfo.getUuid());
+            tokenDetails.put(TOKEN_APPLICATION, principalInfo.getApplicationId());
 
-      /*
-       * write to the PRINCIPAL+TOKEN The format is as follow
-       *
-       * appid+principalId+principalType :{ tokenuuid: 0x00}
-       */
+          /*
+           * write to the PRINCIPAL+TOKEN. The format is as follows:
+           *
+           * appid+principalId+principalType :{ tokenuuid: 0x00}
+           */
+            principalKeyBuffer = principalKey( principalInfo );
 
-            ByteBuffer rowKey = principalKey( principalInfo );
-            m.addInsertion( rowKey, PRINCIPAL_TOKEN_CF, createColumn( tokenUUID, HOLDER, ttl, be, be ) );
         }
 
         if ( tokenInfo.getState() != null ) {
-            m.addInsertion( tokenUUID, TOKENS_CF,
-                    createColumn( TOKEN_STATE, JsonUtils.toByteBuffer( tokenInfo.getState() ), ttl, se,
-                            be ) );
+            tokenDetails.put(TOKEN_STATE, tokenInfo.getState());
         }
 
         if ( tokenInfo.getWorkflowOrgId() != null ) {
-            m.addInsertion( tokenUUID, TOKENS_CF,
-                    createColumn( TOKEN_WORKFLOW_ORG_ID, bytebuffer( tokenInfo.getWorkflowOrgId() ), ttl, se, be ) );
+            tokenDetails.put(TOKEN_WORKFLOW_ORG_ID, tokenInfo.getWorkflowOrgId());
         }
 
-        m.execute();
+        tokenSerialization.putTokenInfo(tokenInfo.getUuid(), tokenDetails, principalKeyBuffer, ttl);
     }
 
 
     /** Load all the token uuids for a principal info */
     private List<UUID> getTokenUUIDS( AuthPrincipalInfo principal ) throws Exception {
 
-        ByteBuffer rowKey = principalKey( principal );
-
-        List<HColumn<ByteBuffer, ByteBuffer>> cols = cassandra
-                .getColumns( cassandra.getUsergridApplicationKeyspace(), PRINCIPAL_TOKEN_CF, rowKey, null, null, Integer.MAX_VALUE,
-                        false );
-
-        List<UUID> results = new ArrayList<UUID>( cols.size() );
-
-        for ( HColumn<ByteBuffer, ByteBuffer> col : cols ) {
-            results.add( uuid( col.getName() ) );
-        }
-
-        return results;
+        return tokenSerialization.getTokensForPrincipal(principalKey( principal ));
     }
 
 
@@ -688,19 +595,12 @@
         return maxPersistenceTokenAge;
     }
 
-
-    @Autowired
-    @Qualifier("cassandraService")
-    public void setCassandraService( CassandraService cassandra ) {
-        this.cassandra = cassandra;
-    }
-
-
     @Autowired
     public void setEntityManagerFactory( EntityManagerFactory emf ) {
         this.emf = emf;
         final Injector injector = ((CpEntityManagerFactory)emf).getApplicationContext().getBean( Injector.class );
-        metricsFactory = injector.getInstance(MetricsFactory.class);
+        this.metricsFactory = injector.getInstance(MetricsFactory.class);
+        this.tokenSerialization = injector.getInstance(TokenSerialization.class);
     }
 
 
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
index 37650c5..6693939 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -64,12 +64,6 @@
                     for (Entity e : results.getEntities()) {
                         entityManager.updateProperties(e, clearPushtokenMap);
                     }
-                    // uuid
-                    query = Query.fromQL( notifier.getName() + notfierPostFix  + " = " + entry.getKey() + "");
-                    results = entityManager.searchCollection(entityManager.getApplication(),  "devices", query);
-                    for (Entity e : results.getEntities()) {
-                        entityManager.updateProperties(e, clearPushtokenMap);
-                    }
                 }catch (Exception e){
                     logger.error("failed to remove token",e);
                 }
diff --git a/stack/services/src/main/resources/usergrid-services-context.xml b/stack/services/src/main/resources/usergrid-services-context.xml
index 268b5ec..45f4c48 100644
--- a/stack/services/src/main/resources/usergrid-services-context.xml
+++ b/stack/services/src/main/resources/usergrid-services-context.xml
@@ -63,8 +63,7 @@
 
     <bean id="taskExecutor" class="org.springframework.core.task.SyncTaskExecutor"/>
 
-    <bean id="tokenService" class="org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl">
-        <property name="cassandraService" ref="cassandraService"/>
+    <bean id="tokenService" class="org.apache.usergrid.security.tokens.impl.TokenServiceImpl">
         <property name="entityManagerFactory" ref="entityManagerFactory"/>
     </bean>
 
diff --git a/stack/services/src/test/java/org/apache/usergrid/security/tokens/TokenServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/security/tokens/TokenServiceIT.java
index c4cc10c..452ee61 100644
--- a/stack/services/src/test/java/org/apache/usergrid/security/tokens/TokenServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/security/tokens/TokenServiceIT.java
@@ -38,7 +38,7 @@
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.security.AuthPrincipalInfo;
 import org.apache.usergrid.security.AuthPrincipalType;
-import org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl;
+import org.apache.usergrid.security.tokens.impl.TokenServiceImpl;
 import org.apache.usergrid.security.tokens.exceptions.ExpiredTokenException;
 import org.apache.usergrid.security.tokens.exceptions.InvalidTokenException;
 import org.apache.usergrid.utils.UUIDUtils;
diff --git a/stack/websocket/src/test/resources/testApplicationContext.xml b/stack/websocket/src/test/resources/testApplicationContext.xml
index d92f4fb..bbf7b93 100644
--- a/stack/websocket/src/test/resources/testApplicationContext.xml
+++ b/stack/websocket/src/test/resources/testApplicationContext.xml
@@ -130,7 +130,7 @@
 		</constructor-arg>
 	</bean>
 
-	<bean id="tokenService" class="org.apache.usergrid.security.tokens.cassandra.TokenServiceImpl"/>
+	<bean id="tokenService" class="org.apache.usergrid.security.tokens.impl.TokenServiceImpl"/>
 
 	<bean id="managementService" class="org.apache.usergrid.management.cassandra.ManagementServiceImpl" />