Rolled back ES version to 1.3.2

Added more debug logging to migrations
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 6da58bb..e07bb00 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -35,6 +35,7 @@
 
 import org.apache.commons.lang.StringUtils;
 
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AbstractEntity;
 import org.apache.usergrid.persistence.Entity;
@@ -535,6 +536,15 @@
         }
     }
 
+
+    @Override
+    public long performEntityCount() {
+        //TODO, this really needs to be a task that writes this data somewhere since this will get
+        //progressively slower as the system expands
+        return AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ).longCount().toBlocking().last();
+    }
+
+
     /**
      * @param managerCache the managerCache to set
      */
@@ -670,6 +680,13 @@
 
 
     @Override
+    public void setMigrationVersion( final int version ) {
+        dataMigrationManager.resetToVersion( version );
+        dataMigrationManager.invalidate();
+    }
+
+
+    @Override
     public void flushEntityManagerCaches() {
         Map<UUID, EntityManager>  entityManagersMap = entityManagers.asMap();
         for ( UUID appUuid : entityManagersMap.keySet() ) {
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
index de0dd2a..826d19d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
@@ -140,6 +140,13 @@
         factory.setApplicationContext(ac);
     }
 
+
+    @Override
+    public long performEntityCount() {
+        return factory.performEntityCount();
+    }
+
+
     @Override
     public void flushEntityManagerCaches() {
         factory.flushEntityManagerCaches();
@@ -180,6 +187,12 @@
 
 
     @Override
+    public void setMigrationVersion( final int version ) {
+        factory.setMigrationVersion( version );
+    }
+
+
+    @Override
     public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po) {
         factory.rebuildCollectionIndex(appId, collection, po);
     }
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index 4acac32..5dffb9f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -141,8 +141,20 @@
      */
     public int getMigrateDataVersion();
 
+    /**
+     * Force the migration version to the specified version
+     * @param version
+     */
+    public void setMigrationVersion(int version);
+
     public void setApplicationContext(ApplicationContext ac);
 
+    /**
+     * Perform a realtime count of every entity in the system.  This can be slow as it traverses the entire system graph
+     * @return
+     */
+    public long performEntityCount();
+
     /** For testing purposes */
     public void flushEntityManagerCaches();
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
index 40182d9..8d9e239 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
@@ -392,6 +392,12 @@
     }
 
 
+    @Override
+    public long performEntityCount() {
+        throw new UnsupportedOperationException("Not supported in v1");
+    }
+
+
     public void setCounterUtils( CounterUtils counterUtils ) {
         this.counterUtils = counterUtils;
     }
@@ -444,13 +450,19 @@
 
     @Override
     public String getMigrateDataStatus() {
-        return null;
+        throw new UnsupportedOperationException("Not supported in v1");
     }
 
 
     @Override
     public int getMigrateDataVersion() {
-        return 0;
+        throw new UnsupportedOperationException("Not supported in v1");
+    }
+
+
+    @Override
+    public void setMigrationVersion( final int version ) {
+        throw new UnsupportedOperationException("Not supported in v1");
     }
 
 
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index 1e6f0d5..c3a7e65 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -60,7 +60,6 @@
     private GraphShardVersionMigration graphShardVersionMigration;
     private Keyspace keyspace;
     private MigrationManager migrationManager;
-    private EntityManagerFactory emf;
     private ManagerCache managerCache;
     private DataMigrationManager dataMigrationManager;
     private MigrationInfoSerialization migrationInfoSerialization;
@@ -69,7 +68,6 @@
     @Before
     public void setup() {
         injector = CpSetup.getInjector();
-        emf = setup.getEmf();
         graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
         keyspace = injector.getInstance( Keyspace.class );
         migrationManager = injector.getInstance( MigrationManager.class );
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
index e36c40b..c633a08 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
@@ -48,6 +48,12 @@
     public int getCurrentVersion();
 
     /**
+     * Reset the system version to the version specified
+     * @param version
+     */
+    public void resetToVersion(final int version);
+
+    /**
      * Invalidate the cache for versions
      */
     public void invalidate();
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index 9ab388f..38c24e4 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -138,7 +138,7 @@
             catch ( Throwable throwable ) {
                 observer.failed( migrationVersion, "Exception thrown during migration", throwable );
 
-                LOG.error( "Unable to migration version {}.", migrationVersion, throwable );
+                LOG.error( "Unable to migrate to version {}.", migrationVersion, throwable );
 
                 return;
             }
@@ -185,6 +185,17 @@
 
 
     @Override
+    public void resetToVersion( final int version ) {
+        final int highestAllowed = migrationTreeMap.lastKey();
+
+        Preconditions.checkArgument( version <= highestAllowed, "You cannot set a version higher than the max of " + highestAllowed);
+        Preconditions.checkArgument( version >= 0, "You must specify a version of 0 or greater" );
+
+        migrationInfoSerialization.setVersion( version );
+    }
+
+
+    @Override
     public String getLastStatus() {
         return migrationInfoSerialization.getStatusMessage();
     }
@@ -248,9 +259,12 @@
 
         @Override
         public void update( final int migrationVersion, final String message ) {
-            final String error = String.format( "Migration version %d.  %s", migrationVersion, message );
+            final String formattedOutput = String.format( "Migration version %d.  %s", migrationVersion, message );
 
-            migrationInfoSerialization.setStatusMessage( error );
+            //Print this to the info log
+            LOG.info( formattedOutput );
+
+            migrationInfoSerialization.setStatusMessage( formattedOutput );
         }
 
 
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index b44fca7..9bd53fa 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -59,7 +59,7 @@
         <commons.collections.version>3.2.1</commons.collections.version>
         <commons.io.version>2.4</commons.io.version>
         <commons.lang.version>3.1</commons.lang.version>
-        <elasticsearch.version>1.4.0</elasticsearch.version>
+        <elasticsearch.version>1.3.2</elasticsearch.version>
         <fasterxml-uuid.version>3.1.3</fasterxml-uuid.version>
         <guava.version>15.0</guava.version>
         <guice.version>4.0-beta5</guice.version>
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 51a4aee..9d6baa5 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -35,6 +35,7 @@
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.exceptions.QueryParseException;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
new file mode 100644
index 0000000..84b1085
--- /dev/null
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.rest;
+
+
+import java.util.Map;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.json.JSONWithPadding;
+
+
+@Component
+@Scope( "singleton" )
+@Produces( {
+        MediaType.APPLICATION_JSON, "application/javascript", "application/x-javascript", "text/ecmascript",
+        "application/ecmascript", "text/jscript"
+} )
+public class MigrateResource extends AbstractContextResource {
+
+    private static final Logger logger = LoggerFactory.getLogger( MigrateResource.class );
+
+
+    public MigrateResource() {
+        logger.info( "SystemResource initialized" );
+    }
+
+
+
+    @RequireSystemAccess
+    @PUT
+    @Path( "run" )
+    public JSONWithPadding migrateData( @Context UriInfo ui,
+                                        @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+            throws Exception {
+
+        ApiResponse response = createApiResponse();
+        response.setAction( "Migrate Data" );
+        //TODO make this use the task scheduler
+
+
+        final Thread migrate = new Thread() {
+
+            @Override
+            public void run() {
+                logger.info( "Rebuilding all indexes" );
+
+                try {
+                    emf.migrateData();
+                }
+                catch ( Exception e ) {
+                    logger.error( "Unable to rebuild indexes", e );
+                }
+            }
+        };
+
+        migrate.setName( "Index migrate data formats" );
+        migrate.setDaemon( true );
+        migrate.start();
+
+
+        response.setSuccess();
+
+        return new JSONWithPadding( response, callback );
+    }
+
+
+    @RequireSystemAccess
+    @PUT
+    @Path( "set" )
+    public JSONWithPadding setMigrationVersion( @Context UriInfo ui, Map<String, Object> json,
+                                                @QueryParam( "callback" ) @DefaultValue( "" ) String callback )
+            throws Exception {
+
+        logger.debug( "newOrganization" );
+
+        Preconditions.checkNotNull( json, "You must provide a json body" );
+
+        String version = ( String ) json.get( "version" );
+
+        Preconditions.checkArgument( version != null && version.length() > 0,
+                "You must specify a version field in your json" );
+
+
+        int intVersion = Integer.parseInt( version );
+
+        emf.setMigrationVersion( intVersion );
+
+
+        return migrateStatus( ui, callback );
+    }
+
+
+    @RequireSystemAccess
+    @GET
+    @Path( "status" )
+    public JSONWithPadding migrateStatus( @Context UriInfo ui,
+                                          @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+            throws Exception {
+
+        ApiResponse response = createApiResponse();
+        response.setAction( "Migrate Schema indexes" );
+
+        ObjectNode node = JsonNodeFactory.instance.objectNode();
+        node.put( "currentVersion", emf.getMigrateDataVersion() );
+        node.put( "lastMessage", emf.getMigrateDataStatus() );
+        response.setProperty( "status", node );
+
+        response.setSuccess();
+
+        return new JSONWithPadding( response, callback );
+    }
+
+    @RequireSystemAccess
+       @GET
+       @Path( "count" )
+       public JSONWithPadding migrateCount( @Context UriInfo ui,
+                                             @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+               throws Exception {
+
+           ApiResponse response = createApiResponse();
+           response.setAction( "Current entity count in system" );
+
+           response.setProperty( "count", emf.performEntityCount()  );
+
+           response.setSuccess();
+
+           return new JSONWithPadding( response, callback );
+       }
+
+}
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
index bdf647b..1961aa1 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
@@ -17,6 +17,7 @@
 package org.apache.usergrid.rest;
 
 
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
@@ -41,8 +42,10 @@
 import org.apache.usergrid.persistence.EntityManagerFactory.ProgressObserver;
 import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+import org.apache.usergrid.rest.management.organizations.OrganizationsResource;
 import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
 
+import com.clearspring.analytics.util.Preconditions;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.sun.jersey.api.json.JSONWithPadding;
@@ -113,65 +116,11 @@
         return new JSONWithPadding( response, callback );
     }
 
-
-    @RequireSystemAccess
-    @PUT
-    @Path( "migrate/run" )
-    public JSONWithPadding migrateData( @Context UriInfo ui,
-                                        @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
-            throws Exception {
-
-        ApiResponse response = createApiResponse();
-        response.setAction( "Migrate Data" );
-
-
-        final Thread migrate = new Thread() {
-
-            @Override
-            public void run() {
-                logger.info( "Rebuilding all indexes" );
-
-                try {
-                    emf.migrateData();
-                }
-                catch ( Exception e ) {
-                    logger.error( "Unable to rebuild indexes", e );
-                }
-            }
-        };
-
-        migrate.setName( "Index migrate data formats" );
-        migrate.setDaemon( true );
-        migrate.start();
-
-
-        response.setSuccess();
-
-        return new JSONWithPadding( response, callback );
+    @Path( "migrate" )
+    public MigrateResource doSomething(){
+        return getSubResource( MigrateResource.class );
     }
 
-
-    @RequireSystemAccess
-    @GET
-    @Path( "migrate/status" )
-    public JSONWithPadding migrateStatus( @Context UriInfo ui,
-                                          @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
-            throws Exception {
-
-        ApiResponse response = createApiResponse();
-        response.setAction( "Migrate Schema indexes" );
-
-        ObjectNode node = JsonNodeFactory.instance.objectNode();
-        node.put( "currentVersion", emf.getMigrateDataVersion() );
-        node.put( "lastMessage", emf.getMigrateDataStatus() );
-        response.setProperty( "status", node );
-
-        response.setSuccess();
-
-        return new JSONWithPadding( response, callback );
-    }
-
-
     @RequireSystemAccess
     @PUT
     @Path( "index/rebuild" )