Merge pull request #5 from apigee/mutationflushing

Mutationflushing
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 09c0df5..8f7cb44 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -53,6 +53,9 @@
 #Write consistency level for the cassandra cluster
 cassandra.writecl=QUORUM
 
+#The maximum number of pending mutations allowed in RAM before they are flushed to Cassandra
+cassandra.mutation.flushsize=2000
+
 #Keyspace to use for locking
 #Note that if this is deployed in a production cluster, the RF on the keyspace MUST be updated to use an odd number for it's replication Factor.
 #Even numbers for RF can potentially case the locks to fail, via "split brain" when read at QUORUM on lock verification
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
index be840e5..0ada3fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/CassandraService.java
@@ -39,6 +39,7 @@
 import org.apache.usergrid.persistence.IndexBucketLocator.IndexType;
 import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 
 import me.prettyprint.cassandra.connection.HConnectionManager;
 import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
@@ -166,6 +167,12 @@
         systemKeyspace =
                 HFactory.createKeyspace( SYSTEM_KEYSPACE, cluster, consistencyLevelPolicy, ON_FAIL_TRY_ALL_AVAILABLE,
                         accessMap );
+
+
+        final int flushSize = getIntValue( properties, "cassandra.mutation.flushsize", 2000 );
+        CountingMutator.MAX_SIZE = flushSize;
+
+
     }
 
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
index 3b07fd7..73f89ed 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/RelationManagerImpl.java
@@ -61,6 +61,7 @@
 import org.apache.usergrid.persistence.geo.ConnectionGeoSearch;
 import org.apache.usergrid.persistence.geo.EntityLocationRef;
 import org.apache.usergrid.persistence.geo.model.Point;
+import org.apache.usergrid.persistence.hector.CountingMutator;
 import org.apache.usergrid.persistence.query.ir.AllNode;
 import org.apache.usergrid.persistence.query.ir.NameIdentifierNode;
 import org.apache.usergrid.persistence.query.ir.QueryNode;
@@ -1062,7 +1063,7 @@
     public void updateEntityConnection( boolean disconnect, ConnectionRefImpl connection ) throws Exception {
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), be );
 
         // Make or break the connection
 
@@ -1550,7 +1551,7 @@
         }
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator(cass.getApplicationKeyspace( applicationId ), be );
 
         batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
 
@@ -1577,7 +1578,7 @@
         }
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), be );
 
         Schema schema = getDefaultSchema();
         for ( Entry<String, List<UUID>> entry : collectionsByType.entrySet() ) {
@@ -1635,7 +1636,7 @@
 
         if ( itemEntity != null ) {
             UUID timestampUuid = newTimeUUID();
-            Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+            Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), be );
 
             batchAddToCollection( batch, collectionName, itemEntity, timestampUuid );
 
@@ -1678,7 +1679,7 @@
         }
 
         UUID timestampUuid = newTimeUUID();
-        Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+        Mutator<ByteBuffer> batch = CountingMutator.createFlushingMutator( cass.getApplicationKeyspace( applicationId ), be );
 
         batchRemoveFromCollection( batch, collectionName, itemEntity, timestampUuid );
 
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/hector/CountingMutator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/hector/CountingMutator.java
new file mode 100644
index 0000000..8f8107a
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/hector/CountingMutator.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.hector;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.Serializer;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HCounterColumn;
+import me.prettyprint.hector.api.beans.HCounterSuperColumn;
+import me.prettyprint.hector.api.beans.HSuperColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.MutationResult;
+import me.prettyprint.hector.api.mutation.Mutator;
+
+
+/**
+ * Mutator proxy that automatically flushes pending mutations once they reach the configured maximum size
+ */
+public class CountingMutator<K> implements Mutator<K> {
+
+    private static final Logger logger = LoggerFactory.getLogger( CountingMutator.class );
+
+    /**
+     * Maximum number of pending mutations we'll hold before flushing.  Defaults to 2000; may be
+     * overridden (e.g. lowered for ease of testing)
+     */
+    public static int MAX_SIZE = 2000;
+
+    /**
+     * The maximum size a mutation can have
+     */
+    private final int maxSize;
+
+    private final Mutator<K> target;
+
+
+    /**
+     * Create a mutator that will flush if our max size is reached
+     */
+    public CountingMutator( final Mutator<K> target, int maxSize ) {
+        this.target = target;
+        this.maxSize = maxSize;
+    }
+
+
+    @Override
+    public <N, V> MutationResult insert( final K key, final String cf, final HColumn<N, V> c ) {
+        return target.insert( key, cf, c );
+    }
+
+
+    @Override
+    public <SN, N, V> MutationResult insert( final K key, final String cf, final HSuperColumn<SN, N, V> superColumn ) {
+        return target.insert( key, cf, superColumn );
+    }
+
+
+    @Override
+    public <N> MutationResult delete( final K key, final String cf, final N columnName,
+                                      final Serializer<N> nameSerializer ) {
+        return target.delete( key, cf, columnName, nameSerializer );
+    }
+
+
+    @Override
+    public <N> MutationResult delete( final K key, final String cf, final N columnName,
+                                      final Serializer<N> nameSerializer, final long clock ) {
+        return target.delete( key, cf, columnName, nameSerializer, clock );
+    }
+
+
+    @Override
+    public <SN, N> MutationResult subDelete( final K key, final String cf, final SN supercolumnName, final N columnName,
+                                             final Serializer<SN> sNameSerializer,
+                                             final Serializer<N> nameSerializer ) {
+        return target.subDelete( key, cf, supercolumnName, columnName, sNameSerializer, nameSerializer );
+    }
+
+
+    @Override
+    public <SN> MutationResult superDelete( final K key, final String cf, final SN supercolumnName,
+                                            final Serializer<SN> sNameSerializer ) {
+        return target.superDelete( key, cf, supercolumnName, sNameSerializer );
+    }
+
+
+    @Override
+    public <SN> Mutator<K> addSuperDelete( final K key, final String cf, final SN sColumnName,
+                                           final Serializer<SN> sNameSerializer ) {
+        target.addSuperDelete( key, cf, sColumnName, sNameSerializer );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N, V> Mutator<K> addInsertion( final K key, final String cf, final HColumn<N, V> c ) {
+        target.addInsertion( key, cf, c );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <SN, N, V> Mutator<K> addInsertion( final K key, final String cf, final HSuperColumn<SN, N, V> sc ) {
+        target.addInsertion( key, cf, sc );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N> Mutator<K> addDeletion( final K key, final String cf, final N columnName,
+                                       final Serializer<N> nameSerializer ) {
+        target.addDeletion( key, cf, columnName, nameSerializer );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N> Mutator<K> addDeletion( final K key, final String cf ) {
+        target.addDeletion( key, cf );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N> Mutator<K> addDeletion( final Iterable<K> keys, final String cf ) {
+        target.addDeletion( keys, cf );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N> Mutator<K> addDeletion( final Iterable<K> keys, final String cf, final long clock ) {
+        target.addDeletion( keys, cf, clock );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N> Mutator<K> addDeletion( final K key, final String cf, final long clock ) {
+        target.addDeletion( key, cf, clock );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N> Mutator<K> addDeletion( final K key, final String cf, final N columnName,
+                                       final Serializer<N> nameSerializer, final long clock ) {
+        target.addDeletion( key, cf, columnName, nameSerializer, clock );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <SN, N, V> Mutator<K> addSubDelete( final K key, final String cf, final HSuperColumn<SN, N, V> sc ) {
+        target.addSubDelete( key, cf, sc );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <SN, N, V> Mutator<K> addSubDelete( final K key, final String cf, final HSuperColumn<SN, N, V> sc,
+                                               final long clock ) {
+        target.addSubDelete( key, cf, sc, clock );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <SN, N> Mutator<K> addSubDelete( final K key, final String cf, final SN sColumnName, final N columnName,
+                                            final Serializer<SN> sNameSerializer, final Serializer<N> nameSerialer ) {
+        target.addSubDelete( key, cf, sColumnName, columnName, sNameSerializer, nameSerialer );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <SN, N> Mutator<K> addSubDelete( final K key, final String cf, final SN sColumnName, final N columnName,
+                                            final Serializer<SN> sNameSerializer, final Serializer<N> nameSerialer,
+                                            final long clock ) {
+        target.addSubDelete( key, cf, sColumnName, columnName, sNameSerializer, nameSerialer, clock );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public MutationResult execute() {
+        return target.execute();
+    }
+
+
+    @Override
+    public Mutator<K> discardPendingMutations() {
+        return target.discardPendingMutations();
+    }
+
+
+    @Override
+    public <N> MutationResult insertCounter( final K key, final String cf, final HCounterColumn<N> c ) {
+        return target.insertCounter( key, cf, c );
+    }
+
+
+    @Override
+    public <SN, N> MutationResult insertCounter( final K key, final String cf,
+                                                 final HCounterSuperColumn<SN, N> superColumn ) {
+        return target.insertCounter( key, cf, superColumn );
+    }
+
+
+    @Override
+    public <N> MutationResult incrementCounter( final K key, final String cf, final N columnName,
+                                                final long increment ) {
+        return target.incrementCounter( key, cf, columnName, increment );
+    }
+
+
+    @Override
+    public <N> MutationResult decrementCounter( final K key, final String cf, final N columnName,
+                                                final long increment ) {
+        return target.decrementCounter( key, cf, columnName, increment );
+    }
+
+
+    @Override
+    public <N> MutationResult deleteCounter( final K key, final String cf, final N columnName,
+                                             final Serializer<N> nameSerializer ) {
+        return target.deleteCounter( key, cf, columnName, nameSerializer );
+    }
+
+
+    @Override
+    public <SN, N> MutationResult subDeleteCounter( final K key, final String cf, final SN supercolumnName,
+                                                    final N columnName, final Serializer<SN> sNameSerializer,
+                                                    final Serializer<N> nameSerializer ) {
+        return target.subDeleteCounter( key, cf, supercolumnName, columnName, sNameSerializer, nameSerializer );
+    }
+
+
+    @Override
+    public <N> Mutator<K> addCounter( final K key, final String cf, final HCounterColumn<N> c ) {
+        target.addCounter( key, cf, c );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <SN, N> Mutator<K> addCounter( final K key, final String cf, final HCounterSuperColumn<SN, N> sc ) {
+        target.addCounter( key, cf, sc );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N> Mutator<K> addCounterDeletion( final K key, final String cf, final N counterColumnName,
+                                              final Serializer<N> nameSerializer ) {
+        target.addCounterDeletion( key, cf, counterColumnName, nameSerializer );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <N> Mutator<K> addCounterDeletion( final K key, final String cf ) {
+        target.addCounterDeletion( key, cf );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public <SN, N> Mutator<K> addCounterSubDeletion( final K key, final String cf,
+                                                     final HCounterSuperColumn<SN, N> sc ) {
+        target.addCounterSubDeletion( key, cf, sc );
+        checkAndFlush();
+        return this;
+    }
+
+
+    @Override
+    public int getPendingMutationCount() {
+        return target.getPendingMutationCount();
+    }
+
+
+    /**
+     * Flush the pending mutations to Cassandra if their count has reached the maximum size
+     */
+    public void checkAndFlush() {
+
+        if ( target.getPendingMutationCount() >= maxSize ) {
+            logger.info( "Max mutation size of {} reached.  Flushing", maxSize);
+            target.execute();
+        }
+    }
+
+
+    /**
+     * Create a mutator that will flush when the maximum size is reached
+     * @param keyspace the keyspace the mutations are written to
+     * @param keySerializer the serializer for row keys
+     * @param <K> the row key type
+     * @return a CountingMutator wrapping a newly created Hector mutator
+     */
+    public static <K> CountingMutator<K> createFlushingMutator( Keyspace keyspace, Serializer<K> keySerializer ) {
+        Mutator<K> target = HFactory.createMutator( keyspace, keySerializer );
+
+        return new CountingMutator<K>( target, MAX_SIZE );
+    }
+}
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
new file mode 100644
index 0000000..14ac43e
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CountingMutatorIT.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.Concurrent;
+import org.apache.usergrid.persistence.Results.Level;
+import org.apache.usergrid.persistence.entities.Group;
+import org.apache.usergrid.persistence.entities.User;
+import org.apache.usergrid.persistence.hector.CountingMutator;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEMENT_APPLICATION_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+@Concurrent()
+public class CountingMutatorIT extends AbstractCoreIT {
+    private static final Logger LOG = LoggerFactory.getLogger( CountingMutatorIT.class );
+
+    private int originalAmount;
+
+
+    public CountingMutatorIT() {
+        super();
+    }
+
+
+    @Before
+    public void storeAmount(){
+        originalAmount = CountingMutator.MAX_SIZE;
+    }
+
+    @After
+    public void setSize(){
+        CountingMutator.MAX_SIZE = originalAmount;
+    }
+
+
+    @Test
+    public void testFlushingMutatorOnConnections() throws Exception {
+
+        //temporarily set our max size to 10 for testing
+        CountingMutator.MAX_SIZE = 10;
+
+        UUID applicationId = setup.createApplication( "testOrganization", "testFlushingMutatorOnConnections" );
+
+        EntityManager em = setup.getEmf().getEntityManager( applicationId );
+
+
+        Map<String, Object> properties = new LinkedHashMap<String, Object>();
+        properties.put( "name", "testuser" );
+        properties.put( "username", "testuser" );
+        properties.put( "email", "test@foo.bar" );
+        Entity created = em.create( "user", properties );
+
+        Entity returned = em.get( created.getUuid() );
+
+
+
+        int writeSize = ( int ) ( CountingMutator.MAX_SIZE*2.5);
+
+        for(int i = 0; i < writeSize; i ++){
+            Map<String, Object> connectedProps = new LinkedHashMap<String, Object>();
+            final UUID uuid = UUIDUtils.newTimeUUID();
+            connectedProps.put( "name", "testuser"+uuid);
+            connectedProps.put( "username", "testuser"+uuid );
+            connectedProps.put( "email", "test"+uuid+"@foo.bar" );
+
+
+            Entity connectedEntity = em.create( "user", connectedProps );
+
+            /* Connect from our new entity to our root one so it's updated
+             * when paging
+             */
+
+                 em.createConnection( connectedEntity, "following", returned );
+        }
+
+        //now verify our connections were created properly
+
+        PagingResultsIterator itr = new PagingResultsIterator(em.getConnectingEntities( returned.getUuid(), "following",
+                "user", Level.ALL_PROPERTIES, 1000 ));
+
+        int count = 0;
+
+        while(itr.hasNext()){
+            itr.next();
+            count++;
+        }
+
+        assertEquals("Correct number of connections created", writeSize, count);
+
+        //now update the props on the entity to update the connections
+
+        properties.put( "email", "test2@foo.bar" );
+        em.updateProperties( returned, properties );
+
+    }
+}
diff --git a/stack/core/src/test/resources/log4j.properties b/stack/core/src/test/resources/log4j.properties
index fab8d7b..147451a 100644
--- a/stack/core/src/test/resources/log4j.properties
+++ b/stack/core/src/test/resources/log4j.properties
@@ -39,6 +39,7 @@
 log4j.logger.me.prettyprint.hector.api.beans.AbstractComposite=ERROR, stdout
 #log4j.logger.org.apache.usergrid.locking.singlenode.SingleNodeLockManagerImpl=DEBUG, stdout
 
+log4j.logger.org.apache.usergrid.persistence.hector.CountingMutator=INFO, stdout
 
 #log4j.logger.org.apache.cassandra.service.StorageProxy=DEBUG, stdout
 
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/AppNameFix.java b/stack/tools/src/main/java/org/apache/usergrid/tools/AppNameFix.java
index 73d8adb..319956f 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/AppNameFix.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/AppNameFix.java
@@ -17,14 +17,20 @@
 package org.apache.usergrid.tools;
 
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.UUID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.cassandra.CassandraService;
 import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.utils.UUIDUtils;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -32,12 +38,24 @@
 import org.apache.commons.cli.Options;
 
 
-/** @author tnine */
+/**
+ * Fixes org/app names when they are corrupted.
+ *
+ * @author tnine
+ *
+ */
 public class AppNameFix extends ToolBase {
 
     private static final Logger logger = LoggerFactory.getLogger( AppNameFix.class );
 
 
+    /**
+     * Command line argument name used to specify the target organization (by UUID or name)
+     */
+    private static final String ORGANIZATION_ARG = "org";
+
+
+
     @Override
     @SuppressWarnings("static-access")
     public Options createOptions() {
@@ -46,9 +64,13 @@
                 OptionBuilder.withArgName( "host" ).hasArg().isRequired( true ).withDescription( "Cassandra host" )
                              .create( "host" );
 
+        Option orgOption = OptionBuilder.withArgName( ORGANIZATION_ARG ).hasArg().isRequired( false )
+                                               .withDescription( "organization id or org name" ).create( ORGANIZATION_ARG );
+
 
         Options options = new Options();
         options.addOption( hostOption );
+        options.addOption( orgOption );
 
         return options;
     }
@@ -67,7 +89,10 @@
 
         EntityManager rootEm = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
 
-        for ( Entry<UUID, String> org : managementService.getOrganizations().entrySet() ) {
+        final Map<UUID, String> orgs = getOrgs( line, rootEm );
+
+
+        for ( Entry<UUID, String> org : orgs.entrySet() ) {
 
             for ( Entry<UUID, String> app : managementService.getApplicationsForOrganization( org.getKey() )
                                                              .entrySet() ) {
@@ -107,4 +132,34 @@
             }
         }
     }
+
+    private Map<UUID, String> getOrgs(CommandLine line, EntityManager rootEm) throws Exception {
+
+        String optionValue = line.getOptionValue( ORGANIZATION_ARG ) ;
+
+        if(optionValue == null){
+            return  managementService.getOrganizations();
+        }
+
+
+        UUID id = UUIDUtils.tryExtractUUID(optionValue );
+        OrganizationInfo org;
+
+        if(id != null){
+            org = managementService.getOrganizationByUuid( id );
+        }
+        else{
+            org = managementService.getOrganizationByName( optionValue );
+        }
+
+        if(org == null){
+            throw new NullPointerException( String.format("Org with identifier %s does not exist", optionValue) );
+        }
+
+        Map<UUID, String> entries = new HashMap<UUID, String>();
+        entries.put( org.getUuid(), org.getName() );
+
+        return entries;
+
+    }
 }