JAMES-2096 Limit the number of simultaneously fetched messages during migration
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index ba4802e..1c73063 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -97,7 +97,7 @@
     private final PreparedStatement selectHeaders;
     private final PreparedStatement selectFields;
     private final PreparedStatement selectBody;
-    private final PreparedStatement selectAll;
+    private final PreparedStatement selectByBatch;
     private CassandraUtils cassandraUtils;
     private final CassandraConfiguration cassandraConfiguration;
 
@@ -113,7 +113,7 @@
         this.selectFields = prepareSelect(session, FIELDS);
         this.selectBody = prepareSelect(session, BODY);
         this.cassandraConfiguration = cassandraConfiguration;
-        this.selectAll = prepareSelectAll(session);
+        this.selectByBatch = prepareSelectBatch(session, cassandraConfiguration);
         this.cassandraUtils = cassandraUtils;
     }
 
@@ -122,8 +122,9 @@
         this(session, typesProvider, CassandraConfiguration.DEFAULT_CONFIGURATION, CassandraUtils.WITH_DEFAULT_CONFIGURATION);
     }
 
-    private PreparedStatement prepareSelectAll(Session session) {
-        return session.prepare(select().from(TABLE_NAME));
+    private PreparedStatement prepareSelectBatch(Session session, CassandraConfiguration cassandraConfiguration) {
+        return session.prepare(select().from(TABLE_NAME)
+            .limit(cassandraConfiguration.getFetchNextPageInAdvanceRow()));
     }
 
     private PreparedStatement prepareSelect(Session session, String[] fields) {
@@ -152,11 +153,13 @@
                 .where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
     }
 
-    public Stream<RawMessage> readAll() {
+    public List<RawMessage> readBatch() {
         return cassandraUtils.convertToStream(
-            cassandraAsyncExecutor.execute(selectAll.bind().setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
+            cassandraAsyncExecutor.execute(selectByBatch.bind()
+                .setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
                 .join())
-            .map(this::fromRow);
+            .map(this::fromRow)
+            .collect(Guavate.toImmutableList());
     }
 
     public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index 3d50f84..db87065 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -116,7 +116,20 @@
 
     @Override
     public MigrationResult run() {
-        return messageDAOV1.readAll()
+        boolean allResultFetched = false;
+        MigrationResult result = MigrationResult.COMPLETED;
+
+        while (!allResultFetched) {
+            List<CassandraMessageDAO.RawMessage> batch = messageDAOV1.readBatch();
+            allResultFetched = batch.isEmpty();
+            result = Migration.combine(result, migrateBatch(batch));
+        }
+        return result;
+    }
+
+    private MigrationResult migrateBatch(List<CassandraMessageDAO.RawMessage> batch) {
+        return batch
+            .stream()
             .map(this::migrate)
             .reduce(MigrationResult.COMPLETED, Migration::combine);
     }