blob: 9cecbddb36ff34397e14dddd89be5193150dbc77 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.queue.rabbitmq.view.cassandra;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import java.time.Clock;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import javax.inject.Inject;
import org.apache.james.queue.rabbitmq.EnqueueId;
import org.apache.james.queue.rabbitmq.MailQueueName;
import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices;
import org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlices.Slice;
import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext.SlicingContext;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
/**
 * Deletion side of the Cassandra mail queue view.
 *
 * <p>Records which enqueued mails have been deleted and, from time to time, moves the browse start
 * of a queue forward and clears the content stored before the new browse start.
 */
public class CassandraMailQueueMailDelete {

    private final DeletedMailsDAO deletedMailsDao;
    private final BrowseStartDAO browseStartDao;
    private final ContentStartDAO contentStartDAO;
    private final EnqueuedMailsDAO enqueuedMailsDAO;
    private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
    private final CassandraMailQueueViewConfiguration configuration;
    private final Clock clock;

    @Inject
    CassandraMailQueueMailDelete(DeletedMailsDAO deletedMailsDao,
                                 BrowseStartDAO browseStartDao,
                                 ContentStartDAO contentStartDAO,
                                 EnqueuedMailsDAO enqueuedMailsDAO,
                                 CassandraMailQueueBrowser cassandraMailQueueBrowser,
                                 CassandraMailQueueViewConfiguration configuration,
                                 Clock clock) {
        this.deletedMailsDao = deletedMailsDao;
        this.browseStartDao = browseStartDao;
        this.contentStartDAO = contentStartDAO;
        this.enqueuedMailsDAO = enqueuedMailsDAO;
        this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
        this.configuration = configuration;
        this.clock = clock;
    }

    /**
     * Marks the given enqueued item as deleted in the given queue.
     *
     * <p>Once the marking pipeline terminates, a browse start update is randomly triggered in the
     * background (see {@link #shouldUpdateBrowseStart()}).
     *
     * @param enqueueId identifier of the enqueued item to mark as deleted
     * @param mailQueueName queue the item belongs to
     * @return the publisher produced by {@code markAsDeleted}, with the browse start trigger attached
     */
    Mono<Void> considerDeleted(EnqueueId enqueueId, MailQueueName mailQueueName) {
        return deletedMailsDao
            .markAsDeleted(mailQueueName, enqueueId)
            // doFinally fires on every termination signal, not only on success.
            .doFinally(any -> maybeUpdateBrowseStart(mailQueueName));
    }

    /**
     * Tells whether the given enqueued item is marked as deleted in the given queue.
     *
     * @param enqueueId identifier of the enqueued item to look up
     * @param mailQueueName queue the item belongs to
     * @return the answer of the deleted mails DAO
     */
    Mono<Boolean> isDeleted(EnqueueId enqueueId, MailQueueName mailQueueName) {
        return deletedMailsDao.isDeleted(mailQueueName, enqueueId);
    }

    /**
     * Fire-and-forget variant of {@link #updateBrowseStartReactive(MailQueueName)}: the update is
     * subscribed on the parallel scheduler and this method returns without waiting for it.
     *
     * <p>No error callback is registered on the subscription.
     *
     * @param mailQueueName queue whose browse start should be updated
     */
    void updateBrowseStart(MailQueueName mailQueueName) {
        updateBrowseStartReactive(mailQueueName)
            .subscribeOn(Schedulers.parallel())
            .subscribe();
    }

    /**
     * Looks for a new browse start and, when one is found, stores it and then clears the content
     * located before it. Nothing is written when no new browse start is found.
     *
     * @param mailQueueName queue whose browse start should be updated
     * @return a publisher completing once the update, if any, is done
     */
    Mono<Void> updateBrowseStartReactive(MailQueueName mailQueueName) {
        return findNewBrowseStart(mailQueueName)
            .flatMap(newBrowseStart -> updateNewBrowseStart(mailQueueName, newBrowseStart)
                .then(clearContentBeforeBrowse(mailQueueName, newBrowseStart)));
    }

    /** Triggers a background browse start update when the random draw says so. */
    private void maybeUpdateBrowseStart(MailQueueName mailQueueName) {
        if (shouldUpdateBrowseStart()) {
            updateBrowseStart(mailQueueName);
        }
    }

    /**
     * Computes the candidate for the new browse start of the queue.
     *
     * <p>The stored browse start is only considered when it is older than one slice window. The
     * candidate is the time range start of the first reference browsed from the stored browse
     * start, and it is only emitted when it is strictly after the stored browse start.
     *
     * @param mailQueueName queue to inspect
     * @return the new browse start, or an empty publisher when no update is needed
     */
    private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
        // Deferred so that the clock is read on subscription rather than when the pipeline is
        // assembled: a late or repeated subscription must not compare against a stale instant.
        return Mono.defer(() -> {
            Instant now = clock.instant();
            Instant latestEligibleBrowseStart = now.minus(configuration.getSliceWindow());
            return browseStartDao.findBrowseStart(mailQueueName)
                .filter(browseStart -> browseStart.isBefore(latestEligibleBrowseStart))
                .flatMap(browseStart -> cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart)
                    .map(enqueuedItem -> enqueuedItem.getSlicingContext().getTimeRangeStart())
                    .next()
                    .filter(newBrowseStart -> newBrowseStart.isAfter(browseStart)));
        });
    }

    /** Stores the new browse start of the queue. */
    private Mono<Void> updateNewBrowseStart(MailQueueName mailQueueName, Instant newBrowseStartInstant) {
        return browseStartDao.updateBrowseStart(mailQueueName, newBrowseStartInstant);
    }

    /**
     * Clears everything stored between the content start and the new browse start, then moves the
     * content start to the new browse start.
     *
     * <p>Every slice starting strictly before the new browse start is combined with every bucket in
     * {@code [0, bucketCount)}. Each resulting slicing context is processed one after the other:
     * the deleted marks of its mails are removed first, then the bucket itself is deleted.
     *
     * @param mailQueueName queue to clean
     * @param newBrowseStartInstant new browse start; content before it is cleared
     * @return a publisher completing once the content start has been updated
     */
    private Mono<Void> clearContentBeforeBrowse(MailQueueName mailQueueName, Instant newBrowseStartInstant) {
        return contentStartDAO.findContentStart(mailQueueName)
            .flatMapIterable(contentStart ->
                Slice.of(contentStart).allSlicesTill(newBrowseStartInstant, configuration.getSliceWindow())
                    .filter(slice -> slice.getStartSliceInstant().isBefore(newBrowseStartInstant))
                    .flatMap(slice -> IntStream.range(0, configuration.getBucketCount()).boxed()
                        .map(bucket -> SlicingContext.of(BucketedSlices.BucketId.of(bucket), slice.getStartSliceInstant())))
                    .collect(ImmutableList.toImmutableList()))
            .concatMap(slice -> deleteEmailsFromBrowseProjection(mailQueueName, slice))
            .concatMap(slice -> enqueuedMailsDAO.deleteBucket(mailQueueName, Slice.of(slice.getTimeRangeStart()), slice.getBucketId()))
            .then(contentStartDAO.updateContentStart(mailQueueName, newBrowseStartInstant));
    }

    /**
     * For every mail enqueued in the given slice and bucket, removes its deleted mark and then
     * disposes the item on the bounded elastic scheduler.
     *
     * @param mailQueueName queue to clean
     * @param slicingContext slice and bucket to process
     * @return the same slicing context, emitted once all the mails have been processed
     */
    private Mono<SlicingContext> deleteEmailsFromBrowseProjection(MailQueueName mailQueueName, SlicingContext slicingContext) {
        return enqueuedMailsDAO.selectEnqueuedMails(mailQueueName, Slice.of(slicingContext.getTimeRangeStart()), slicingContext.getBucketId())
            .flatMap(item -> deletedMailsDao.removeDeletedMark(mailQueueName, item.getEnqueuedItem().getEnqueueId())
                .then(Mono.fromRunnable(item::dispose).subscribeOn(Schedulers.boundedElastic())), DEFAULT_CONCURRENCY)
            .then()
            .thenReturn(slicingContext);
    }

    /**
     * Randomly decides whether the browse start should be updated: one draw out of
     * {@code updateBrowseStartPace} answers {@code true}.
     *
     * @return {@code true} when an update should be triggered
     */
    private boolean shouldUpdateBrowseStart() {
        int threshold = configuration.getUpdateBrowseStartPace();
        // nextInt(bound) is already within [0, bound), no modulo is needed.
        // It rejects a non-positive bound with an IllegalArgumentException.
        return ThreadLocalRandom.current().nextInt(threshold) == 0;
    }
}