blob: d8b0676b91c16a056736b46736de767f8f83a47f [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.queue.file;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.security.SecureRandom;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.mail.MessagingException;
import javax.mail.util.SharedFileInputStream;
import org.apache.commons.io.FileUtils;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
import org.apache.james.queue.api.MailQueueName;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.server.core.MimeMessageSource;
import org.apache.james.server.core.MimeMessageWrapper;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Attribute;
import org.apache.mailet.AttributeName;
import org.apache.mailet.AttributeUtils;
import org.apache.mailet.AttributeValue;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* {@link ManageableMailQueue} implementation which use the fs to store {@link Mail}'s
* <p/>
* On create of the {@link FileCacheableMailQueue} the {@link #init()} will get called. This takes care of
* loading the needed meta-data into memory for fast access.
*
* @deprecated FileMailQueue implementation is unmaintained, incomplete and not thread safe
* We recommend using embedded ActiveMQMailQueue implementation instead
*/
@Deprecated
public class FileCacheableMailQueue implements ManageableMailQueue {
private static final Logger LOGGER = LoggerFactory.getLogger(FileCacheableMailQueue.class);
private final Map<String, FileItem> keyMappings = Collections.synchronizedMap(new LinkedHashMap<>());
private final BlockingQueue<String> inmemoryQueue = new LinkedBlockingQueue<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.withClassName(getClass()));
private static final AtomicLong COUNTER = new AtomicLong();
private final String queueDirName;
private final File queueDir;
private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
private final boolean sync;
private static final String MSG_EXTENSION = ".msg";
private static final String OBJECT_EXTENSION = ".obj";
private static final AttributeName NEXT_DELIVERY = AttributeName.of("FileQueueNextDelivery");
private static final int SPLITCOUNT = 10;
private static final SecureRandom RANDOM = new SecureRandom();
private final MailQueueName queueName;
private final Flux<MailQueueItem> flux;
public FileCacheableMailQueue(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, File parentDir, MailQueueName queuename, boolean sync) throws IOException {
this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
this.sync = sync;
this.queueName = queuename;
this.queueDir = new File(parentDir, queueName.asString());
this.queueDirName = queueDir.getAbsolutePath();
init();
this.flux = Mono.defer(this::deQueueOneItem)
.repeat()
.limitRate(1);
}
@Override
public void close() {
//There's no resource to free
}
@Override
public MailQueueName getName() {
return queueName;
}
private void init() throws IOException {
for (int i = 1; i <= SPLITCOUNT; i++) {
File qDir = new File(queueDir, Integer.toString(i));
FileUtils.forceMkdir(qDir);
String[] files = qDir.list((dir, name) -> name.endsWith(OBJECT_EXTENSION));
for (String name : files) {
ObjectInputStream oin = null;
try {
final String msgFileName = name.substring(0, name.length() - OBJECT_EXTENSION.length()) + MSG_EXTENSION;
FileItem item = new FileItem(qDir.getAbsolutePath() + File.separator + name, qDir.getAbsolutePath() + File.separator + msgFileName);
oin = new ObjectInputStream(new FileInputStream(item.getObjectFile()));
Mail mail = (Mail) oin.readObject();
Optional<ZonedDateTime> next = getNextDelivery(mail);
final String key = mail.getName();
keyMappings.put(key, item);
if (!next.isPresent() || next.get().isBefore(ZonedDateTime.now())) {
try {
inmemoryQueue.put(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to init", e);
}
} else {
// Schedule a task which will put the mail in the queue
// for processing after a given delay
long nextDeliveryDelay = ZonedDateTime.now().until(next.get(), ChronoUnit.MILLIS);
scheduler.schedule(() -> {
try {
inmemoryQueue.put(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to init", e);
}
}, nextDeliveryDelay, TimeUnit.MILLISECONDS);
}
} catch (ClassNotFoundException | IOException e) {
LOGGER.error("Unable to load Mail", e);
} finally {
if (oin != null) {
try {
oin.close();
} catch (Exception e) {
// ignore on close
}
}
}
}
}
}
private Optional<ZonedDateTime> getNextDelivery(Mail mail) {
return AttributeUtils
.getValueAndCastFromMail(mail, NEXT_DELIVERY, Long.class)
.map(next -> Instant.ofEpochMilli(next).atZone(ZoneId.systemDefault()));
}
@Override
public void enQueue(Mail mail, Duration delay) throws MailQueueException {
final String key = mail.getName() + "-" + COUNTER.incrementAndGet();
try {
int i = RANDOM.nextInt(SPLITCOUNT) + 1;
String name = queueDirName + "/" + i + "/" + key;
final FileItem item = new FileItem(name + OBJECT_EXTENSION, name + MSG_EXTENSION);
if (!delay.isNegative()) {
mail.setAttribute(new Attribute(NEXT_DELIVERY, AttributeValue.of(computeNextDelivery(delay))));
}
try (FileOutputStream foout = new FileOutputStream(item.getObjectFile());
ObjectOutputStream oout = new ObjectOutputStream(foout)) {
oout.writeObject(mail);
oout.flush();
if (sync) {
foout.getFD().sync();
}
}
try (FileOutputStream out = new FileOutputStream(item.getMessageFile())) {
mail.getMessage().writeTo(out);
out.flush();
if (sync) {
out.getFD().sync();
}
}
keyMappings.put(key, item);
if (!delay.isNegative()) {
// The message should get delayed so schedule it for later
scheduler.schedule(() -> {
try {
inmemoryQueue.put(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to init", e);
}
}, delay.getSeconds(), TimeUnit.SECONDS);
} else {
inmemoryQueue.put(key);
}
//TODO: Think about exception handling in detail
} catch (IOException | MessagingException | InterruptedException e) {
throw new MailQueueException("Unable to enqueue mail", e);
}
}
private long computeNextDelivery(Duration delay) {
try {
return Instant.now().plus(delay).getEpochSecond();
} catch (ArithmeticException e) {
return Long.MAX_VALUE;
}
}
@Override
public void enQueue(Mail mail) throws MailQueueException {
enQueue(mail, 0, TimeUnit.MILLISECONDS);
}
@Override
public Flux<MailQueueItem> deQueue() {
return flux;
}
private Mono<MailQueueItem> deQueueOneItem() {
try {
FileItem item = null;
String k = null;
while (item == null) {
k = inmemoryQueue.take();
item = keyMappings.get(k);
}
final String key = k;
final FileItem fitem = item;
try {
final File objectFile = new File(fitem.getObjectFile());
final File msgFile = new File(fitem.getMessageFile());
try (ObjectInputStream oin = new ObjectInputStream(new FileInputStream(objectFile))) {
final Mail mail = (Mail) oin.readObject();
mail.setMessage(new MimeMessageWrapper(new FileMimeMessageSource(msgFile)));
MailQueueItem fileMailQueueItem = new MailQueueItem() {
@Override
public Mail getMail() {
return mail;
}
@Override
public void done(boolean success) throws MailQueueException {
if (!success) {
try {
inmemoryQueue.put(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MailQueueException("Unable to rollback", e);
}
} else {
fitem.delete();
keyMappings.remove(key);
}
LifecycleUtil.dispose(mail);
}
};
return Mono.just(mailQueueItemDecoratorFactory.decorate(fileMailQueueItem, queueName));
}
// TODO: Think about exception handling in detail
} catch (IOException | ClassNotFoundException | MessagingException e) {
return Mono.error(new MailQueueException("Unable to dequeue", e));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Mono.error(new MailQueueException("Unable to dequeue", e));
}
}
private static final class FileMimeMessageSource extends MimeMessageSource implements Disposable {
private File file;
private final SharedFileInputStream in;
public FileMimeMessageSource(File file) throws IOException {
this.file = file;
this.in = new SharedFileInputStream(file);
}
@Override
public String getSourceId() {
return file.getAbsolutePath();
}
/**
* Get an input stream to retrieve the data stored in the temporary file
*
* @return a <code>BufferedInputStream</code> containing the data
*/
@Override
public InputStream getInputStream() throws IOException {
return in.newStream(0, -1);
}
@Override
public long getMessageSize() throws IOException {
return file.length();
}
@Override
public void dispose() {
try {
in.close();
} catch (IOException e) {
//ignore exception during close
}
file = null;
}
}
/**
* Helper class which is used to reference the path to the object and msg file
*/
private static final class FileItem {
private final String objectfile;
private final String messagefile;
public FileItem(String objectfile, String messagefile) {
this.objectfile = objectfile;
this.messagefile = messagefile;
}
public String getObjectFile() {
return objectfile;
}
public String getMessageFile() {
return messagefile;
}
public void delete() throws MailQueueException {
try {
FileUtils.forceDelete(new File(getObjectFile()));
} catch (IOException e) {
throw new MailQueueException("Unable to delete mail");
}
try {
FileUtils.forceDelete(new File(getMessageFile()));
} catch (IOException e) {
LOGGER.debug("Remove of msg file for mail failed");
}
}
}
@Override
public long getSize() throws MailQueueException {
return keyMappings.size();
}
@Override
public long flush() throws MailQueueException {
Iterator<String> keys = keyMappings.keySet().iterator();
long i = 0;
while (keys.hasNext()) {
String key = keys.next();
if (!inmemoryQueue.contains(key)) {
inmemoryQueue.add(key);
i++;
}
}
return i;
}
@Override
public long clear() throws MailQueueException {
long count = getSize();
keyMappings.values().forEach(Throwing.consumer(FileItem::delete));
keyMappings.clear();
inmemoryQueue.clear();
return count;
}
/**
* TODO: implement me
*/
@Override
public long remove(Type type, String value) throws MailQueueException {
switch (type) {
case Name:
FileItem item = keyMappings.remove(value);
if (item != null) {
item.delete();
return 1;
} else {
return 0;
}
default:
break;
}
throw new MailQueueException("Not supported yet");
}
@Override
public MailQueueIterator browse() throws MailQueueException {
final Iterator<FileItem> items = keyMappings.values().iterator();
return new MailQueueIterator() {
private MailQueueItemView item;
@Override
public void remove() {
throw new UnsupportedOperationException("Read-only");
}
@Override
public MailQueueItemView next() {
if (hasNext()) {
MailQueueItemView itemView = item;
item = null;
return itemView;
}
throw new NoSuchElementException();
}
@Override
public boolean hasNext() {
if (item != null) {
return true;
}
while (items.hasNext()) {
try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(items.next().getObjectFile()))) {
final Mail mail = (Mail) in.readObject();
item = new DefaultMailQueueItemView(mail, getNextDelivery(mail));
return true;
} catch (IOException | ClassNotFoundException e) {
LOGGER.info("Unable to load mail", e);
}
}
return false;
}
@Override
public void close() {
// do nothing
}
};
}
}