| /**************************************************************** |
| * Licensed to the Apache Software Foundation (ASF) under one * |
| * or more contributor license agreements. See the NOTICE file * |
| * distributed with this work for additional information * |
| * regarding copyright ownership. The ASF licenses this file * |
| * to you under the Apache License, Version 2.0 (the * |
| * "License"); you may not use this file except in compliance * |
| * with the License. You may obtain a copy of the License at * |
| * * |
| * http://www.apache.org/licenses/LICENSE-2.0 * |
| * * |
| * Unless required by applicable law or agreed to in writing, * |
| * software distributed under the License is distributed on an * |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * |
| * KIND, either express or implied. See the License for the * |
| * specific language governing permissions and limitations * |
| * under the License. * |
| ****************************************************************/ |
| |
| package org.apache.james.jmap.method |
| |
| import java.util.Date |
| |
| import eu.timepit.refined.auto._ |
| import javax.inject.Inject |
| import org.apache.james.jmap.api.change.EmailChangeRepository |
| import org.apache.james.jmap.api.model.Size.sanitizeSize |
| import org.apache.james.jmap.api.model.{AccountId => JavaAccountId} |
| import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, JAMES_SHARES, JMAP_CORE, JMAP_MAIL} |
| import org.apache.james.jmap.core.Invocation.{Arguments, MethodName} |
| import org.apache.james.jmap.core.SetError.SetErrorDescription |
| import org.apache.james.jmap.core.{ClientId, Id, Invocation, ServerId, SessionTranslator, SetError, UuidState} |
| import org.apache.james.jmap.json.EmailSetSerializer |
| import org.apache.james.jmap.mail.{BlobId, EmailCreationId, EmailCreationResponse, EmailImport, EmailImportRequest, EmailImportResponse, ThreadId, ValidatedEmailImport} |
| import org.apache.james.jmap.method.EmailImportMethod.{ImportFailure, ImportResult, ImportResults, ImportSuccess, ImportWithBlob} |
| import org.apache.james.jmap.routes.{Blob, BlobNotFoundException, BlobResolvers, ProcessingContext, SessionSupplier} |
| import org.apache.james.mailbox.MessageManager.AppendCommand |
| import org.apache.james.mailbox.exception.{MailboxNotFoundException, OverQuotaException} |
| import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageManager} |
| import org.apache.james.metrics.api.MetricFactory |
| import org.apache.james.mime4j.codec.DecodeMonitor |
| import org.apache.james.mime4j.dom.Message |
| import org.apache.james.mime4j.message.DefaultMessageBuilder |
| import org.apache.james.mime4j.stream.MimeConfig |
| import org.apache.james.util.ReactorUtils |
| import org.reactivestreams.Publisher |
| import reactor.core.scala.publisher.{SFlux, SMono} |
| |
| import scala.util.{Try, Using} |
| |
| object EmailImportMethod { |
| case class ImportWithBlob(id: EmailCreationId, request: EmailImport, blob: Blob) |
| case class ImportResults(results: Seq[ImportResult]) { |
| def created: Option[Map[EmailCreationId, EmailCreationResponse]] = |
| Option(results.flatMap{ |
| case result: ImportSuccess => Some((result.clientId, result.response)) |
| case _ => None |
| }.toMap) |
| .filter(_.nonEmpty) |
| |
| def notCreated: Option[Map[EmailCreationId, SetError]] = { |
| Option(results.flatMap{ |
| case failure: ImportFailure => Some((failure.clientId, failure.asMessageSetError)) |
| case _ => None |
| } |
| .toMap) |
| .filter(_.nonEmpty) |
| } |
| } |
| trait ImportResult |
| case class ImportSuccess(clientId: EmailCreationId, response: EmailCreationResponse) extends ImportResult |
| case class ImportFailure(clientId: EmailCreationId, e: Throwable) extends ImportResult { |
| def asMessageSetError: SetError = e match { |
| case e: BlobNotFoundException => SetError.notFound(SetErrorDescription(s"Blob ${e.blobId} could not be found")) |
| case e: MailboxNotFoundException => SetError.notFound(SetErrorDescription("Mailbox " + e.getMessage)) |
| case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(e.getMessage)) |
| case e: OverQuotaException => SetError.overQuota(SetErrorDescription(e.getMessage)) |
| case _ => SetError.serverFail(SetErrorDescription(e.getMessage)) |
| } |
| } |
| } |
| |
| class EmailImportMethod @Inject() (val metricFactory: MetricFactory, |
| val sessionSupplier: SessionSupplier, |
| val sessionTranslator: SessionTranslator, |
| val blobResolvers: BlobResolvers, |
| val serializer: EmailSetSerializer, |
| val mailboxManager: MailboxManager, |
| val emailChangeRepository: EmailChangeRepository) extends MethodRequiringAccountId[EmailImportRequest] { |
| override val methodName: MethodName = MethodName("Email/import") |
| override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE, JMAP_MAIL) |
| |
| override def getRequest(mailboxSession: MailboxSession, invocation: Invocation): Either[Exception, EmailImportRequest] = |
| serializer.deserializeEmailImportRequest(invocation.arguments.value).asEitherRequest |
| |
| override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: EmailImportRequest): Publisher[InvocationWithContext] = |
| for { |
| oldState <- retrieveState(capabilities, mailboxSession) |
| importResults <- importEmails(request, mailboxSession) |
| .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) |
| newState <- retrieveState(capabilities, mailboxSession) |
| } yield { |
| val updatedContext = updateProcessingContext(importResults, invocation.processingContext) |
| val importResponse = EmailImportResponse( |
| accountId = request.accountId, |
| oldState = oldState, |
| newState = newState, |
| created = importResults.created, |
| notCreated = importResults.notCreated) |
| InvocationWithContext( |
| Invocation( |
| methodName = methodName, |
| arguments = Arguments(serializer.serializeEmailImportResponse(importResponse)), |
| methodCallId = invocation.invocation.methodCallId), |
| updatedContext) |
| } |
| |
| def updateProcessingContext(importResults: ImportResults, processingContext: ProcessingContext): ProcessingContext = |
| importResults.created.getOrElse(Map()) |
| .foldLeft(processingContext) { |
| case (context, entry) => |
| Id.validate(entry._2.id.serialize) |
| .fold(_ => context, |
| serverId => context.recordCreatedId(ClientId(entry._1.id), ServerId(serverId))) |
| } |
| |
| private def importEmails(request: EmailImportRequest, mailboxSession: MailboxSession): SMono[ImportResults] = |
| SFlux.fromIterable(request.emails.toList) |
| .flatMap { |
| case creationId -> emailImport => resolveBlob(mailboxSession, creationId, emailImport) |
| } |
| .flatMap { |
| case Right(emailImport) => importEmail(mailboxSession, emailImport) |
| case Left(e) => SMono.just(e) |
| }.collectSeq() |
| .map(ImportResults) |
| |
| private def importEmail(mailboxSession: MailboxSession, emailImport: ImportWithBlob): SMono[ImportResult] = { |
| val either = for { |
| validatedRequest <- emailImport.request.validate |
| message <- SMono.fromTry(asMessage(emailImport.blob)) |
| response <- append(validatedRequest, message, mailboxSession) |
| } yield response |
| |
| either.map(r => ImportSuccess(emailImport.id, r)) |
| .onErrorResume(e => SMono.just(ImportFailure(emailImport.id, e))) |
| } |
| |
| private def resolveBlob(mailboxSession: MailboxSession, creationId: EmailCreationId, emailImport: EmailImport): SMono[Either[ImportFailure, ImportWithBlob]] = |
| blobResolvers.resolve(emailImport.blobId, mailboxSession) |
| .map(blob => Right[ImportFailure, ImportWithBlob](ImportWithBlob(creationId, emailImport, blob))) |
| .onErrorResume(e => SMono.just(Left[ImportFailure, ImportWithBlob](ImportFailure(creationId, e)))) |
| |
| private def asMessage(blob: Blob): Try[Message] = { |
| val defaultMessageBuilder = new DefaultMessageBuilder |
| defaultMessageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE) |
| defaultMessageBuilder.setDecodeMonitor(DecodeMonitor.SILENT) |
| |
| Using(blob.content) {content => defaultMessageBuilder.parseMessage(content)} |
| } |
| |
| private def append(emailImport: ValidatedEmailImport, message: Message, mailboxSession: MailboxSession): SMono[EmailCreationResponse] = |
| SMono(mailboxManager.getMailboxReactive(emailImport.mailboxId, mailboxSession)) |
| .flatMap(mailbox => SMono(mailbox.appendMessageReactive(AppendCommand.builder() |
| .recent() |
| .withFlags(emailImport.keywords.asFlags) |
| .withInternalDate(Date.from(emailImport.receivedAt.asUTC.toInstant)) |
| .build(message), |
| mailboxSession))) |
| .map(asEmailCreationResponse) |
| |
| private def asEmailCreationResponse(appendResult: MessageManager.AppendResult): EmailCreationResponse = { |
| val blobId: Option[BlobId] = BlobId.of(appendResult.getId.getMessageId).toOption |
| val threadId: ThreadId = ThreadId.fromJava(appendResult.getThreadId) |
| EmailCreationResponse(appendResult.getId.getMessageId, blobId, threadId, sanitizeSize(appendResult.getSize)) |
| } |
| |
| private def retrieveState(capabilities: Set[CapabilityIdentifier], mailboxSession: MailboxSession): SMono[UuidState] = |
| if (capabilities.contains(JAMES_SHARES)) { |
| SMono(emailChangeRepository.getLatestStateWithDelegation(JavaAccountId.fromUsername(mailboxSession.getUser))) |
| .map(UuidState.fromJava) |
| } else { |
| SMono(emailChangeRepository.getLatestState(JavaAccountId.fromUsername(mailboxSession.getUser))) |
| .map(UuidState.fromJava) |
| } |
| } |