JAMES-2096 Limit the number of simultaneously fetched messages during migration
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index ba4802e..1c73063 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -97,7 +97,7 @@
private final PreparedStatement selectHeaders;
private final PreparedStatement selectFields;
private final PreparedStatement selectBody;
- private final PreparedStatement selectAll;
+ private final PreparedStatement selectByBatch;
private CassandraUtils cassandraUtils;
private final CassandraConfiguration cassandraConfiguration;
@@ -113,7 +113,7 @@
this.selectFields = prepareSelect(session, FIELDS);
this.selectBody = prepareSelect(session, BODY);
this.cassandraConfiguration = cassandraConfiguration;
- this.selectAll = prepareSelectAll(session);
+ this.selectByBatch = prepareSelectBatch(session, cassandraConfiguration);
this.cassandraUtils = cassandraUtils;
}
@@ -122,8 +122,9 @@
this(session, typesProvider, CassandraConfiguration.DEFAULT_CONFIGURATION, CassandraUtils.WITH_DEFAULT_CONFIGURATION);
}
- private PreparedStatement prepareSelectAll(Session session) {
- return session.prepare(select().from(TABLE_NAME));
+ private PreparedStatement prepareSelectBatch(Session session, CassandraConfiguration cassandraConfiguration) {
+ return session.prepare(select().from(TABLE_NAME)
+ .limit(cassandraConfiguration.getFetchNextPageInAdvanceRow()));
}
private PreparedStatement prepareSelect(Session session, String[] fields) {
@@ -152,11 +153,13 @@
.where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
}
- public Stream<RawMessage> readAll() {
+ public List<RawMessage> readBatch() {
return cassandraUtils.convertToStream(
- cassandraAsyncExecutor.execute(selectAll.bind().setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
+ cassandraAsyncExecutor.execute(selectByBatch.bind()
+ .setFetchSize(cassandraConfiguration.getV1ReadFetchSize()))
.join())
- .map(this::fromRow);
+ .map(this::fromRow)
+ .collect(Guavate.toImmutableList());
}
public CompletableFuture<Void> save(MailboxMessage message) throws MailboxException {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index 3d50f84..db87065 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -116,7 +116,20 @@
@Override
public MigrationResult run() {
- return messageDAOV1.readAll()
+ boolean allResultFetched = false;
+ MigrationResult result = MigrationResult.COMPLETED;
+
+ while (!allResultFetched) {
+ List<CassandraMessageDAO.RawMessage> batch = messageDAOV1.readBatch();
+ allResultFetched = batch.isEmpty();
+ result = Migration.combine(result, migrateBatch(batch));
+ }
+ return result;
+ }
+
+ private MigrationResult migrateBatch(List<CassandraMessageDAO.RawMessage> batch) {
+ return batch
+ .stream()
.map(this::migrate)
.reduce(MigrationResult.COMPLETED, Migration::combine);
}