blob: f8ef0cd9e5101fe38cfd3efd5ede8ed789f54f02 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.jmap.model.message.view;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.james.jmap.model.Keywords;
import org.apache.james.jmap.utils.KeywordsCombiner;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageResult;
import org.apache.james.mime4j.codec.DecodeMonitor;
import org.apache.james.mime4j.codec.DecoderUtil;
import org.apache.james.mime4j.dom.Header;
import org.apache.james.mime4j.dom.Message;
import org.apache.james.mime4j.message.DefaultMessageBuilder;
import org.apache.james.mime4j.stream.Field;
import org.apache.james.mime4j.stream.MimeConfig;
import org.apache.james.util.ReactorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface MessageViewFactory<T extends MessageView> {
Logger LOGGER = LoggerFactory.getLogger(MessageViewFactory.class);
KeywordsCombiner KEYWORDS_COMBINER = new KeywordsCombiner();
Keywords.KeywordsFactory KEYWORDS_FACTORY = Keywords.lenientFactory();
String JMAP_MULTIVALUED_FIELD_DELIMITER = "\n";
Flux<T> fromMessageIds(List<MessageId> messageIds, MailboxSession mailboxSession);
class Helpers {
interface FromMessageResult<T extends MessageView> {
Mono<T> fromMessageResults(Collection<MessageResult> messageResults);
}
static void assertOneMessageId(Collection<MessageResult> messageResults) {
Preconditions.checkArgument(!messageResults.isEmpty(), "MessageResults cannot be empty");
Preconditions.checkArgument(hasOnlyOneMessageId(messageResults), "MessageResults need to share the same messageId");
}
private static boolean hasOnlyOneMessageId(Collection<MessageResult> messageResults) {
if (messageResults.size() == 1) {
return true;
}
return messageResults
.stream()
.map(MessageResult::getMessageId)
.distinct()
.count() == 1;
}
static Set<MailboxId> getMailboxIds(Collection<MessageResult> messageResults) {
return messageResults.stream()
.map(MessageResult::getMailboxId)
.collect(ImmutableSet.toImmutableSet());
}
static Keywords getKeywords(Collection<MessageResult> messageResults) {
return messageResults.stream()
.map(MessageResult::getFlags)
.map(KEYWORDS_FACTORY::fromFlags)
.reduce(KEYWORDS_COMBINER)
.get();
}
static String getHeaderValue(org.apache.james.mime4j.dom.Message message, String header) {
Field field = message.getHeader().getField(header);
if (field == null) {
return null;
}
return field.getBody();
}
static ImmutableMap<String, String> toHeaderMap(Header header) {
return header.getFieldsAsMap()
.entrySet()
.stream()
.collect(ImmutableMap.toImmutableMap(entry -> entry.getValue().get(0).getName(),
entry -> entry.getValue().stream()
.map(Field::getBody)
.map(body -> DecoderUtil.decodeEncodedWords(body, DecodeMonitor.SILENT))
.collect(Collectors.joining(JMAP_MULTIVALUED_FIELD_DELIMITER))));
}
static <T extends MessageView> Function<Collection<MessageResult>, Mono<T>> toMessageViews(FromMessageResult<T> converter) {
return messageResults ->
converter.fromMessageResults(messageResults)
.doOnEach(ReactorUtils.logOnError(e -> LOGGER.error("Can not convert MessageResults to Message for {}",
messageResults.iterator().next().getMessageId().serialize(),
e)))
.onErrorResume(e -> Mono.empty());
}
static <T extends MessageView> Flux<T> toMessageViews(Flux<MessageResult> messageResults, FromMessageResult<T> converter) {
return messageResults
.groupBy(MessageResult::getMessageId)
.flatMap(Flux::collectList, DEFAULT_CONCURRENCY)
.filter(Predicate.not(List::isEmpty))
.flatMap(toMessageViews(converter), DEFAULT_CONCURRENCY);
}
static Instant getDateFromHeaderOrInternalDateOtherwise(Message mimeMessage, MessageResult message) {
return Optional.ofNullable(mimeMessage.getDate())
.map(Date::toInstant)
.orElse(message.getInternalDate().toInstant());
}
static Message retrieveMessage(MessageFullViewFactory.MetaDataWithContent metaDataWithContent) {
return metaDataWithContent.getMessage()
.orElseGet(Throwing.supplier(() ->
parse(metaDataWithContent.getContent())).sneakyThrow());
}
static Message parse(InputStream messageContent) throws IOException {
DefaultMessageBuilder defaultMessageBuilder = new DefaultMessageBuilder();
defaultMessageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE);
return defaultMessageBuilder.parseMessage(messageContent);
}
}
}