| /**************************************************************** |
| * Licensed to the Apache Software Foundation (ASF) under one * |
| * or more contributor license agreements. See the NOTICE file * |
| * distributed with this work for additional information * |
| * regarding copyright ownership. The ASF licenses this file * |
| * to you under the Apache License, Version 2.0 (the * |
| * "License"); you may not use this file except in compliance * |
| * with the License. You may obtain a copy of the License at * |
| * * |
| * http://www.apache.org/licenses/LICENSE-2.0 * |
| * * |
| * Unless required by applicable law or agreed to in writing, * |
| * software distributed under the License is distributed on an * |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * |
| * KIND, either express or implied. See the License for the * |
| * specific language governing permissions and limitations * |
| * under the License. * |
| ****************************************************************/ |
| |
| package org.apache.james.jmap.method |
| |
| import java.util.function.Consumer |
| |
| import com.google.common.collect.ImmutableList |
| import javax.inject.Inject |
| import javax.mail.Flags |
| import org.apache.james.jmap.core.SetError |
| import org.apache.james.jmap.core.SetError.SetErrorDescription |
| import org.apache.james.jmap.json.EmailSetSerializer |
| import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId |
| import org.apache.james.jmap.mail.KeywordsFactory.LENIENT_KEYWORDS_FACTORY |
| import org.apache.james.jmap.mail.{EmailSet, EmailSetRequest, MailboxIds, ValidatedEmailSetUpdate} |
| import org.apache.james.jmap.method.EmailSetUpdatePerformer.{EmailUpdateFailure, EmailUpdateResult, EmailUpdateResults, EmailUpdateSuccess} |
| import org.apache.james.mailbox.MessageManager.FlagsUpdateMode |
| import org.apache.james.mailbox.exception.MailboxNotFoundException |
| import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, MailboxId, MessageId, MessageRange} |
| import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager} |
| import play.api.libs.json.JsObject |
| import reactor.core.scala.publisher.{SFlux, SMono} |
| import reactor.core.scheduler.Schedulers |
| |
| import scala.jdk.CollectionConverters._ |
| |
| object EmailSetUpdatePerformer { |
| trait EmailUpdateResult |
| case class EmailUpdateSuccess(messageId: MessageId) extends EmailUpdateResult |
| case class EmailUpdateFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends EmailUpdateResult { |
| def asMessageSetError: SetError = e match { |
| case e: IllegalArgumentException => SetError.invalidPatch(SetErrorDescription(s"Message update is invalid: ${e.getMessage}")) |
| case _: MailboxNotFoundException => SetError.notFound(SetErrorDescription(s"Mailbox not found")) |
| case e: MessageNotFoundException => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}")) |
| case _ => SetError.serverFail(SetErrorDescription(e.getMessage)) |
| } |
| } |
| case class EmailUpdateResults(results: Seq[EmailUpdateResult]) { |
| def updated: Option[Map[MessageId, Unit]] = |
| Option(results.flatMap{ |
| case result: EmailUpdateSuccess => Some(result.messageId, ()) |
| case _ => None |
| }.toMap) |
| .filter(_.nonEmpty) |
| |
| def notUpdated: Option[Map[UnparsedMessageId, SetError]] = |
| Option(results.flatMap{ |
| case failure: EmailUpdateFailure => Some((failure.unparsedMessageId, failure.asMessageSetError)) |
| case _ => None |
| }.toMap) |
| .filter(_.nonEmpty) |
| } |
| } |
| |
| class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer, |
| messageIdManager: MessageIdManager, |
| mailboxManager: MailboxManager, |
| messageIdFactory: MessageId.Factory) { |
| |
| def update(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[EmailUpdateResults] = { |
| emailSetRequest.update |
| .filter(_.nonEmpty) |
| .map(update(_, mailboxSession)) |
| .getOrElse(SMono.just(EmailUpdateResults(Seq()))) |
| } |
| |
| private def update(updates: Map[UnparsedMessageId, JsObject], session: MailboxSession): SMono[EmailUpdateResults] = { |
| val validatedUpdates: List[Either[EmailUpdateFailure, (MessageId, ValidatedEmailSetUpdate)]] = updates |
| .map({ |
| case (unparsedMessageId, json) => EmailSet.parse(messageIdFactory)(unparsedMessageId) |
| .toEither |
| .left.map(e => EmailUpdateFailure(unparsedMessageId, e)) |
| .flatMap(id => serializer.deserializeEmailSetUpdate(json) |
| .asEither.left.map(e => new IllegalArgumentException(e.toString)) |
| .flatMap(_.validate) |
| .fold(e => Left(EmailUpdateFailure(unparsedMessageId, e)), |
| emailSetUpdate => Right((id, emailSetUpdate)))) |
| }) |
| .toList |
| val failures: List[EmailUpdateFailure] = validatedUpdates.flatMap({ |
| case Left(e) => Some(e) |
| case _ => None |
| }) |
| val validUpdates: List[(MessageId, ValidatedEmailSetUpdate)] = validatedUpdates.flatMap({ |
| case Right(pair) => Some(pair) |
| case _ => None |
| }) |
| |
| for { |
| updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session)) |
| .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId) |
| .flatMap(doUpdate(validUpdates, _, session)) |
| } yield { |
| EmailUpdateResults(updates ++ failures) |
| } |
| } |
| |
| private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)], |
| metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], |
| session: MailboxSession): SMono[Seq[EmailUpdateResult]] = { |
| val sameUpdate: Boolean = validUpdates.map(_._2).distinctBy(_.update).size == 1 |
| val singleMailbox: Boolean = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toSet.size == 1 |
| |
| if (sameUpdate && singleMailbox && validUpdates.size > 3) { |
| val update: ValidatedEmailSetUpdate = validUpdates.map(_._2).headOption.get |
| val ranges: List[MessageRange] = asRanges(metaData) |
| val mailboxId: MailboxId = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).headOption.get |
| |
| if (update.update.isOnlyFlagAddition) { |
| updateFlagsByRange(mailboxId, update.update.keywordsToAdd.get.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session) |
| } else if (update.update.isOnlyFlagRemoval) { |
| updateFlagsByRange(mailboxId, update.update.keywordsToRemove.get.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session) |
| } else if (update.update.isOnlyMove) { |
| moveByRange(mailboxId, update, ranges, metaData, session) |
| } else { |
| updateEachMessage(validUpdates, metaData, session) |
| } |
| } else { |
| updateEachMessage(validUpdates, metaData, session) |
| } |
| } |
| |
| private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]) = |
| MessageRange.toRanges(metaData.values |
| .flatten.map(_.getComposedMessageId.getUid) |
| .toList.asJava) |
| .asScala.toList |
| |
| private def updateFlagsByRange(mailboxId: MailboxId, |
| flags: Flags, |
| ranges: List[MessageRange], |
| metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], |
| updateMode: FlagsUpdateMode, |
| session: MailboxSession): SMono[Seq[EmailUpdateResult]] = { |
| val mailboxMono: SMono[MessageManager] = SMono.fromCallable(() => mailboxManager.getMailbox(mailboxId, session)) |
| |
| mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData, |
| range => mailbox.setFlags(flags, updateMode, range, session))) |
| .subscribeOn(Schedulers.elastic()) |
| } |
| |
| private def moveByRange(mailboxId: MailboxId, |
| update: ValidatedEmailSetUpdate, |
| ranges: List[MessageRange], |
| metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], |
| session: MailboxSession): SMono[Seq[EmailUpdateResult]] = { |
| val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get |
| |
| updateByRange(ranges, metaData, |
| range => mailboxManager.moveMessages(range, mailboxId, targetId, session)) |
| } |
| |
| private def updateByRange(ranges: List[MessageRange], |
| metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], |
| operation: Consumer[MessageRange]): SMono[Seq[EmailUpdateResult]] = { |
| |
| SFlux.fromIterable(ranges) |
| .concatMap(range => { |
| val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid))) |
| .keys |
| .toSeq |
| SMono.fromCallable[Seq[EmailUpdateResult]](() => { |
| operation.accept(range) |
| messageIds.map(EmailUpdateSuccess) |
| }) |
| .onErrorResume(e => SMono.just(messageIds.map(id => EmailUpdateFailure(EmailSet.asUnparsed(id), e)))) |
| .subscribeOn(Schedulers.elastic()) |
| }) |
| .reduce(Seq(), _ ++ _) |
| } |
| |
| private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)], |
| metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], |
| session: MailboxSession): SMono[Seq[EmailUpdateResult]] = |
| SFlux.fromIterable(validUpdates) |
| .concatMap[EmailUpdateResult]({ |
| case (messageId, updatePatch) => |
| updateSingleMessage(messageId, updatePatch, metaData.get(messageId).toList.flatten, session) |
| }) |
| .collectSeq() |
| |
| private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[EmailUpdateResult] = { |
| val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId)) |
| val originFlags: Flags = storedMetaData |
| .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => { |
| flags.add(m.getFlags) |
| flags |
| }) |
| |
| if (mailboxIds.value.isEmpty) { |
| SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), MessageNotFoundException(messageId))) |
| } else { |
| updateFlags(messageId, update, mailboxIds, originFlags, session) |
| .flatMap { |
| case failure: EmailUpdateFailure => SMono.just[EmailUpdateResult](failure) |
| case _: EmailUpdateSuccess => updateMailboxIds(messageId, update, mailboxIds, session) |
| } |
| .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e))) |
| .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))) |
| } |
| } |
| |
| private def updateMailboxIds(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, session: MailboxSession): SMono[EmailUpdateResult] = { |
| val targetIds = update.mailboxIdsTransformation.apply(mailboxIds) |
| if (targetIds.equals(mailboxIds)) { |
| SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)) |
| } else { |
| SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session)) |
| .subscribeOn(Schedulers.elastic()) |
| .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))) |
| .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e))) |
| .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))) |
| } |
| } |
| |
| private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[EmailUpdateResult] = { |
| val newFlags = update.keywordsTransformation |
| .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get) |
| .asFlagsWithRecentAndDeletedFrom(originalFlags) |
| |
| if (newFlags.equals(originalFlags)) { |
| SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)) |
| } else { |
| SMono.fromCallable(() => |
| messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session)) |
| .subscribeOn(Schedulers.elastic()) |
| .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))) |
| } |
| } |
| } |