// blob: e06e4ea91275e008a9c536bd8ce5362059c0ca2a [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.jmap.method
import java.io.InputStream
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import java.time.{Clock, Duration, LocalDateTime, ZoneId, ZonedDateTime}
import cats.implicits.toTraverseOps
import com.google.common.collect.ImmutableMap
import eu.timepit.refined.auto._
import eu.timepit.refined.refineV
import jakarta.mail.Address
import jakarta.mail.Message.RecipientType
import jakarta.mail.internet.{InternetAddress, MimeMessage}
import javax.annotation.PreDestroy
import javax.inject.Inject
import org.apache.commons.lang3.StringUtils
import org.apache.james.core.{MailAddress, Username}
import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, EMAIL_SUBMISSION, JMAP_CORE}
import org.apache.james.jmap.core.Id.{Id, IdConstraint}
import org.apache.james.jmap.core.Invocation.{Arguments, MethodName}
import org.apache.james.jmap.core.SetError.{SetErrorDescription, SetErrorType}
import org.apache.james.jmap.core.{ClientId, Invocation, JmapRfc8621Configuration, Properties, ServerId, SessionTranslator, SetError, SubmissionCapabilityFactory, UTCDate, UuidState}
import org.apache.james.jmap.json.EmailSubmissionSetSerializer
import org.apache.james.jmap.mail.{EmailSubmissionAddress, EmailSubmissionCreationId, EmailSubmissionCreationRequest, EmailSubmissionCreationResponse, EmailSubmissionId, EmailSubmissionSetRequest, EmailSubmissionSetResponse, Envelope, ParameterName, ParameterValue}
import org.apache.james.jmap.method.EmailSubmissionSetMethod.{CreationFailure, CreationResult, CreationResults, CreationSuccess, LOGGER, MAIL_METADATA_USERNAME_ATTRIBUTE, NO_DELAY, VALID_PARAMETER_NAME_SET, formatter}
import org.apache.james.jmap.routes.{ProcessingContext, SessionSupplier}
import org.apache.james.lifecycle.api.{LifecycleUtil, Startable}
import org.apache.james.mailbox.model.{FetchGroup, MessageId, MessageResult}
import org.apache.james.mailbox.{MailboxSession, MessageIdManager}
import org.apache.james.metrics.api.MetricFactory
import org.apache.james.queue.api.MailQueueFactory.SPOOL
import org.apache.james.queue.api.{MailQueue, MailQueueFactory}
import org.apache.james.rrt.api.CanSendFrom
import org.apache.james.server.core.{MailImpl, MimeMessageSource, MimeMessageWrapper}
import org.apache.james.util.AuditTrail
import org.apache.mailet.{Attribute, AttributeName, AttributeValue, Mail}
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json._
import reactor.core.scala.publisher.{SFlux, SMono}
import reactor.core.scheduler.Schedulers
import reactor.util.concurrent.Queues
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
object EmailSubmissionSetMethod {
// Mail attribute under which the name of the submitting user is attached to spooled mails.
val MAIL_METADATA_USERNAME_ATTRIBUTE: AttributeName = AttributeName.of("org.apache.james.jmap.send.MailMetaData.username")
val LOGGER: Logger = LoggerFactory.getLogger(classOf[EmailSubmissionSetMethod])
// SetError types specific to EmailSubmission/set.
val noRecipients: SetErrorType = "noRecipients"
val forbiddenFrom: SetErrorType = "forbiddenFrom"
val forbiddenMailFrom: SetErrorType = "forbiddenMailFrom"
// Formatter used to parse the holdUntil parameter: ISO instant format, resolved in the "Z" zone.
val formatter: DateTimeFormatter = DateTimeFormatter.ISO_INSTANT.withZone(ZoneId.of("Z"))
// The only parameter names accepted on the envelope mailFrom.
val VALID_PARAMETER_NAME_SET: Set[ParameterName] = Set(ParameterName.holdFor, ParameterName.holdUntil)
// Delay applied when no hold parameter is supplied.
val NO_DELAY: Duration = Duration.ZERO
/** Outcome of one requested creation, identified by the creation id it was requested under. */
sealed trait CreationResult {
// Creation id this outcome relates to.
def emailSubmissionCreationId: EmailSubmissionCreationId
}
/**
 * Successful creation: carries the response returned to the client and the id of the
 * message that was submitted.
 */
case class CreationSuccess(emailSubmissionCreationId: EmailSubmissionCreationId,
emailSubmissionCreationResponse: EmailSubmissionCreationResponse,
messageId: MessageId) extends CreationResult
/**
 * Failed creation: carries the throwable that made the submission fail.
 */
case class CreationFailure(emailSubmissionCreationId: EmailSubmissionCreationId, exception: Throwable) extends CreationResult {
  /**
   * Translates the failure into the SetError reported to the client, logging it on the way.
   *
   * Known failure types map to dedicated errors; anything else is reported as `serverFail`.
   *
   * NOTE(review): RFC 8621 section 7.5 seems to define `forbiddenFrom` for the From header of
   * the message and `forbiddenMailFrom` for the envelope address, which looks swapped compared
   * to the mapping below. Left untouched as clients may rely on it - confirm.
   */
  def asSetError: SetError = exception match {
    case e: EmailSubmissionCreationParseException =>
      LOGGER.info("Failed to parse EMailSubmission/set create", e)
      e.setError
    case _: NoRecipientException =>
      val description = "Attempt to send a mail with no recipients"
      LOGGER.info(description)
      SetError(EmailSubmissionSetMethod.noRecipients, SetErrorDescription(description), None)
    case e: ForbiddenMailFromException =>
      val description = s"Attempt to send a mail whose MimeMessage From and Sender fields not allowed for connected user: ${e.from}"
      LOGGER.warn(description)
      SetError(EmailSubmissionSetMethod.forbiddenMailFrom, SetErrorDescription(description), None)
    case e: ForbiddenFromException =>
      val description = s"Attempt to send a mail whose envelope From not allowed for connected user: ${e.from}"
      LOGGER.warn(description)
      SetError(EmailSubmissionSetMethod.forbiddenFrom,
        SetErrorDescription(description),
        Some(Properties("envelope.mailFrom")))
    case _: MessageNotFoundException =>
      LOGGER.info(" EmailSubmission/set failed as the underlying email could not be found")
      SetError(SetError.invalidArgumentValue,
        SetErrorDescription("The email to be sent cannot be found"),
        Some(Properties("emailId")))
    case e: DateTimeParseException =>
      LOGGER.info("Failed to parse date time", e)
      SetError.invalidArguments(SetErrorDescription(e.getMessage))
    case e: IllegalArgumentException =>
      LOGGER.info("Illegal argument in EmailSubmission/set", e)
      SetError.invalidArguments(SetErrorDescription(e.getMessage))
    case e =>
      // `exception` is a Throwable: matching on Exception only would raise a MatchError
      // for any other throwable instead of reporting a server failure.
      LOGGER.error("Failed to send an email with EmailSubmission/set", e)
      SetError.serverFail(SetErrorDescription(e.getMessage))
  }
}
/**
 * Accumulated outcomes of the creations of a single EmailSubmission/set call.
 */
case class CreationResults(created: Seq[CreationResult]) {
  /** Responses of the successful creations, keyed by creation id. */
  def retrieveCreated: Map[EmailSubmissionCreationId, EmailSubmissionCreationResponse] =
    created.collect {
      case success: CreationSuccess => success.emailSubmissionCreationId -> success.emailSubmissionCreationResponse
    }.toMap

  /** SetErrors of the failed creations, keyed by creation id. */
  def retrieveErrors: Map[EmailSubmissionCreationId, SetError] =
    created.collect {
      case failure: CreationFailure => failure.emailSubmissionCreationId -> failure.asSetError
    }.toMap

  /**
   * Resolves a "#"-prefixed creation id against the outcomes of the current call.
   *
   * @return the message id of the matching successful creation, `None` when the matching
   *         creation failed, or an IllegalArgumentException when the id is not a
   *         back-reference, is not a valid id, or matches no creation of this call
   */
  def resolveMessageId(creationId: EmailSubmissionCreationId): Either[IllegalArgumentException, Option[MessageId]] =
    if (!creationId.id.startsWith("#")) {
      Left(new IllegalArgumentException(s"${creationId.id} cannot be retrieved as storage for EmailSubmission is not yet implemented"))
    } else {
      val referencedId: Either[String, Id] = refineV[IdConstraint](creationId.id.substring(1))
      for {
        id <- referencedId.left.map(message => new IllegalArgumentException(message))
        outcome <- retrieveCreationResult(EmailSubmissionCreationId(id))
          .toRight(new IllegalArgumentException(s"${creationId.id} cannot be referenced in current method call"))
      } yield outcome match {
        case success: CreationSuccess => Some(success.messageId)
        case _: CreationFailure => None
      }
    }

  // First outcome recorded under the given creation id, if any.
  private def retrieveCreationResult(creationId: EmailSubmissionCreationId): Option[CreationResult] =
    created.find(result => result.emailSubmissionCreationId.equals(creationId))
}
}
// Raised when a creation request cannot be parsed; carries the SetError to report.
case class EmailSubmissionCreationParseException(setError: SetError) extends Exception
// Raised when the envelope has no recipient.
case class NoRecipientException() extends Exception
// Raised when the user is not allowed to send from the envelope mailFrom address.
case class ForbiddenFromException(from: String) extends Exception
// Raised when the user is not allowed to send from some of the From / Sender header addresses.
case class ForbiddenMailFromException(from: List[String]) extends Exception
/**
 * MimeMessageSource backed by the full content of a stored message.
 */
case class MessageMimeMessageSource(id: String, message: MessageResult) extends MimeMessageSource {
  override def getSourceId: String = id

  // The full content is fetched from the message on every call.
  override def getInputStream: InputStream = {
    val fullContent = message.getFullContent
    fullContent.getInputStream
  }

  override def getMessageSize: Long = {
    val fullContent = message.getFullContent
    fullContent.size()
  }
}
/**
 * JMAP "EmailSubmission/set" method.
 *
 * For each requested creation, the referenced stored email is loaded, validated and enqueued on
 * the spool mail queue, optionally with a delay. An implicit Email/set call is then issued when
 * the request asks for one.
 */
class EmailSubmissionSetMethod @Inject()(serializer: EmailSubmissionSetSerializer,
configuration: JmapRfc8621Configuration,
messageIdManager: MessageIdManager,
mailQueueFactory: MailQueueFactory[_ <: MailQueue],
canSendFrom: CanSendFrom,
emailSetMethod: EmailSetMethod,
clock: Clock,
val metricFactory: MetricFactory,
val sessionSupplier: SessionSupplier,
val sessionTranslator: SessionTranslator) extends MethodRequiringAccountId[EmailSubmissionSetRequest] with Startable {
// Name under which this method is exposed.
override val methodName: MethodName = MethodName("EmailSubmission/set")
// Capabilities declared as required by this method.
override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE, EMAIL_SUBMISSION)
// Queue the mails are enqueued on; stays null until init is called.
var queue: MailQueue = _
// Obtains the SPOOL queue from the factory; must run before any submission is processed.
def init: Unit = queue = mailQueueFactory.createQueue(SPOOL)
/** Closes the queue; a failure to close is only logged at debug level. */
@PreDestroy def dispose: Unit =
  Try(queue.close())
    .recover {
      case closeFailure => LOGGER.debug("error closing queue", closeFailure)
    }
/**
 * Runs the requested creations, then emits the EmailSubmission/set response followed by the
 * result of the implicit Email/set call when the request defines one.
 *
 * NOTE(review): the implicit Email/set call receives the original `invocation`, hence the
 * processing context from before the creations were recorded - confirm this is intended.
 */
override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: EmailSubmissionSetRequest): SFlux[InvocationWithContext] =
  create(request, mailboxSession, invocation.processingContext)
    .flatMapMany(creationOutcome => {
      val (creationResults, updatedContext) = creationOutcome
      val response = EmailSubmissionSetResponse(
        accountId = request.accountId,
        newState = UuidState.INSTANCE,
        created = Some(creationResults.retrieveCreated).filter(_.nonEmpty),
        notCreated = Some(creationResults.retrieveErrors).filter(_.nonEmpty))
      val explicitInvocation: InvocationWithContext = InvocationWithContext(
        invocation = Invocation(
          methodName = invocation.invocation.methodName,
          arguments = Arguments(serializer.serializeEmailSubmissionSetResponse(response)
            .as[JsObject]),
          methodCallId = invocation.invocation.methodCallId),
        processingContext = updatedContext)
      // Back-references to creations of this call are resolved through the creation results.
      val emailSetCall: SMono[InvocationWithContext] = request.implicitEmailSetRequest(creationResults.resolveMessageId)
        .fold(e => SMono.error(e),
          maybeEmailSetRequest => maybeEmailSetRequest.map(emailSetRequest =>
            emailSetMethod.doProcess(
              capabilities = capabilities,
              invocation = invocation,
              mailboxSession = mailboxSession,
              request = emailSetRequest))
            .getOrElse(SMono.empty))
      SFlux.concat(SMono.just(explicitInvocation), emailSetCall)
    })
/** Deserializes the invocation arguments, then validates the request against the configuration. */
override def getRequest(mailboxSession: MailboxSession, invocation: Invocation): Either[Exception, EmailSubmissionSetRequest] = {
  val deserialized = serializer.deserializeEmailSubmissionSetRequest(invocation.arguments.value)
    .asEitherRequest
  deserialized.flatMap(setRequest => setRequest.validate(configuration))
}
/**
 * Processes the requested creations one after the other, threading the accumulated results
 * and the processing context from each creation to the next.
 */
private def create(request: EmailSubmissionSetRequest,
                   session: MailboxSession,
                   processingContext: ProcessingContext): SMono[(CreationResults, ProcessingContext)] = {
  val initial: SMono[(CreationResults, ProcessingContext)] = SMono.just((CreationResults(Nil), processingContext))
  request.create
    .getOrElse(Map.empty)
    .foldLeft(initial) {
      case (previous, (emailSubmissionCreationId, jsObject)) =>
        previous.flatMap {
          case (resultsSoFar, currentContext) =>
            createSubmission(session, emailSubmissionCreationId, jsObject, currentContext)
              .map {
                case (outcome, nextContext) => CreationResults(resultsSoFar.created :+ outcome) -> nextContext
              }
              .switchIfEmpty(SMono.error(new RuntimeException("I should not be empty")))
        }.cache()
    }
}
/**
 * Parses and sends a single creation request.
 *
 * Never fails: any error is turned into a CreationFailure paired with the unchanged processing
 * context, while a success records the creation id in the returned context.
 */
private def createSubmission(mailboxSession: MailboxSession,
                             emailSubmissionCreationId: EmailSubmissionCreationId,
                             jsObject: JsObject,
                             processingContext: ProcessingContext): SMono[(CreationResult, ProcessingContext)] = {
  val submission: SMono[(EmailSubmissionCreationResponse, MessageId)] = parseCreate(jsObject) match {
    case Left(parseFailure) => SMono.error(parseFailure)
    case Right(creationRequest) => sendEmail(mailboxSession, creationRequest)
  }
  submission
    .map {
      case (creationResponse, messageId) =>
        CreationSuccess(emailSubmissionCreationId, creationResponse, messageId) ->
          recordCreationIdInProcessingContext(emailSubmissionCreationId, processingContext, creationResponse.id)
    }
    .onErrorResume(e => SMono.just((CreationFailure(emailSubmissionCreationId, e), processingContext)))
}
/** Checks the properties of the creation object, then deserializes it into a creation request. */
private def parseCreate(jsObject: JsObject): Either[EmailSubmissionCreationParseException, EmailSubmissionCreationRequest] = {
  def deserialize(validJsObject: JsObject): Either[EmailSubmissionCreationParseException, EmailSubmissionCreationRequest] =
    Json.fromJson(validJsObject)(serializer.emailSubmissionCreationRequestReads) match {
      case JsError(errors) => Left(EmailSubmissionCreationParseException(emailSubmissionSetError(errors)))
      case JsSuccess(creationRequest, _) => Right(creationRequest)
    }

  EmailSubmissionCreationRequest.validateProperties(jsObject)
    .flatMap(deserialize)
}
/** Converts JSON validation errors into a SetError by delegating to `standardError`. */
private def emailSubmissionSetError(errors: collection.Seq[(JsPath, collection.Seq[JsonValidationError])]): SetError = {
  standardError(errors)
}
/**
 * Loads the stored email referenced by the request, validates it and enqueues it.
 *
 * Fails with MessageNotFoundException when the email cannot be retrieved. When any step after
 * the MimeMessage was built fails, the MimeMessage is disposed before the error is propagated.
 *
 * @return the creation response together with the id of the submitted email
 */
private def sendEmail(mailboxSession: MailboxSession,
                      request: EmailSubmissionCreationRequest): SMono[(EmailSubmissionCreationResponse, MessageId)] =
  for {
    messageResult <- SFlux(messageIdManager.getMessagesReactive(List(request.emailId).asJava, FetchGroup.FULL_CONTENT, mailboxSession))
      .next
      .switchIfEmpty(SMono.error(MessageNotFoundException(request.emailId)))
    submissionId = EmailSubmissionId.generate
    mimeMessage <- SMono.fromTry(toMimeMessage(submissionId.value, messageResult))
    // Going through flatMap routes exceptions thrown while assembling the pipeline to the error
    // handler below too, so the message is released on every failure path.
    sendAt <- SMono.just(mimeMessage)
      .flatMap(validateAndEnqueue(mailboxSession, request, submissionId, _))
      .onErrorResume(e => releaseQuietly(mimeMessage).`then`(SMono.error[UTCDate](e)))
  } yield {
    EmailSubmissionCreationResponse(submissionId, sendAt) -> request.emailId
  }

// Validates the message and its envelope, then enqueues the mail; yields the date it is sent at.
private def validateAndEnqueue(mailboxSession: MailboxSession,
                               request: EmailSubmissionCreationRequest,
                               submissionId: EmailSubmissionId,
                               message: MimeMessageWrapper): SMono[UTCDate] =
  for {
    _ <- validateMimeMessages(message)
    envelope <- SMono.fromTry(resolveEnvelope(message, request.envelope))
    _ <- validate(mailboxSession)(message, envelope)
    _ <- SMono.fromTry(validateFromParameters(envelope.mailFrom.parameters))
    delay <- SMono.fromTry(retrieveDelay(envelope.mailFrom.parameters))
    _ <- SMono.fromTry(validateDelay(delay))
    _ <- validateRcptTo(envelope.rcptTo)
    mail = {
      val mailImpl = MailImpl.builder()
        .name(submissionId.value)
        .addRecipients(envelope.rcptTo.map(_.email).asJava)
        .sender(envelope.mailFrom.email)
        .addAttribute(new Attribute(MAIL_METADATA_USERNAME_ATTRIBUTE, AttributeValue.of(mailboxSession.getUser.asString())))
        .build()
      mailImpl.setMessageNoCopy(message)
      mailImpl
    }
    _ <- enqueue(mail, delay, mailboxSession)
      .`then`(SMono.just(submissionId))
    sendAt = UTCDate(ZonedDateTime.now(clock).plus(delay))
  } yield sendAt

// Disposes the message of a failed submission; a disposal failure is logged and swallowed so
// that it never masks the error which caused the submission to fail.
private def releaseQuietly(message: MimeMessageWrapper): SMono[Unit] =
  SMono.fromCallable(() => LifecycleUtil.dispose(message))
    .subscribeOn(Schedulers.boundedElastic())
    .onErrorResume(e => {
      LOGGER.debug("error disposing the message of a failed EmailSubmission/set", e)
      SMono.empty[Unit]
    })
/**
 * Enqueues the mail, immediately when the delay is zero or negative and with the delay
 * otherwise, records an audit trail entry on success, then disposes the mail.
 */
private def enqueue(mail: Mail, delay: Duration, mailboxSession: MailboxSession): SMono[Unit] = {
  val sendNow: Boolean = delay.isNegative || delay.isZero
  val spooling = if (sendNow) SMono(queue.enqueueReactive(mail)) else SMono(queue.enqueueReactive(mail, delay))
  // Only delayed submissions carry a "holdFor" audit parameter.
  val auditedDelay: Option[Duration] = if (sendNow) None else Some(delay)
  spooling
    .doOnSuccess(_ => AuditTrail.entry
      .username(() => mailboxSession.getUser.asString())
      .protocol("JMAP")
      .action("EmailSubmission")
      .parameters(() => auditParameters(mail, mailboxSession, auditedDelay))
      .log("JMAP mail spooled."))
    .`then`(SMono.fromCallable(() => LifecycleUtil.dispose(mail)).subscribeOn(Schedulers.boundedElastic()))
}

// Audit trail parameters describing the spooled mail; "holdFor" is present only when supplied.
private def auditParameters(mail: Mail, mailboxSession: MailboxSession, holdFor: Option[Duration]): ImmutableMap[String, String] = {
  val parameters = ImmutableMap.builder[String, String]()
    .put("mailId", mail.getName)
    .put("mimeMessageId", Option(mail.getMessage)
      .flatMap(message => Option(message.getMessageID))
      .getOrElse(""))
    .put("sender", mail.getMaybeSender.asString)
    .put("recipients", StringUtils.join(mail.getRecipients))
  holdFor.foreach(heldFor => parameters.put("holdFor", heldFor.toString))
  parameters
    .put("loggedInUser", mailboxSession.getLoggedInUser.toScala
      .map(_.asString())
      .getOrElse(""))
    .build()
}
/**
 * Computes the delay requested through the mailFrom parameters.
 *
 * `holdFor` is read as a number of seconds. `holdUntil` is parsed with `formatter` and turned
 * into the duration between now (as given by the clock) and that instant. When `holdFor` is
 * present it is used and `holdUntil` is ignored. A missing parameter, or a parameter without
 * value, means no delay.
 *
 * @return the delay, or a Failure when the parameter value cannot be parsed
 */
private def retrieveDelay(mailParameters: Option[Map[ParameterName, Option[ParameterValue]]]): Try[Duration] =
  mailParameters match {
    case Some(parameters) if parameters.contains(ParameterName.holdFor) =>
      parameters(ParameterName.holdFor)
        .map(holdFor => Try(Duration.ofSeconds(holdFor.value.toLong)))
        .getOrElse(Success(NO_DELAY))
    case Some(parameters) if parameters.contains(ParameterName.holdUntil) =>
      // Compare points on the time-line: comparing local date-times is only correct when the
      // zone of the clock happens to be the zone the formatter resolves the parsed date in.
      parameters(ParameterName.holdUntil)
        .map(holdUntil => Try(Duration.between(ZonedDateTime.now(clock), ZonedDateTime.parse(holdUntil.value, formatter))))
        .getOrElse(Success(NO_DELAY))
    case _ => Success(NO_DELAY)
  }
/**
 * Accepts a delay whose whole seconds are between zero and the maximum delay of the
 * submission capability, both included; anything else is an IllegalArgumentException.
 */
def validateDelay(delay: Duration): Try[Duration] = {
  val seconds = delay.getSeconds
  val withinBounds = seconds >= 0 && seconds <= SubmissionCapabilityFactory.maximumDelays.getSeconds
  if (withinBounds) Success(delay)
  else Failure(new IllegalArgumentException("Invalid delayed time!"))
}
/** Validates the mail addresses carried by the address headers of the message. */
def validateMimeMessages(mimeMessage: MimeMessage) : SMono[MimeMessage] = validateMailAddressHeaderMimeMessage(mimeMessage)

/**
 * Checks that every address of the to, cc, bcc, from, sender and replyTo headers is a valid
 * mail address. Emits the message when they all are, and the IllegalArgumentException of the
 * first invalid address encountered otherwise.
 */
private def validateMailAddressHeaderMimeMessage(mimeMessage: MimeMessage): SMono[MimeMessage] = {
  // Headers are read right away; the addresses are only checked upon subscription.
  val addressesByHeader: Map[String, List[Address]] = Map("to" -> Option(mimeMessage.getRecipients(RecipientType.TO)).toList.flatten,
    "cc" -> Option(mimeMessage.getRecipients(RecipientType.CC)).toList.flatten,
    "bcc" -> Option(mimeMessage.getRecipients(RecipientType.BCC)).toList.flatten,
    "from" -> Option(mimeMessage.getFrom).toList.flatten,
    "sender" -> Option(mimeMessage.getSender).toList,
    "replyTo" -> Option(mimeMessage.getReplyTo).toList.flatten)
  SMono.fromCallable(() => {
    for {
      (headerName, addresses) <- addressesByHeader
      address <- addresses
    } validateMailAddress(headerName, address)
    mimeMessage
  })
}
/**
 * Builds a MailAddress out of a header address.
 *
 * Throws an IllegalArgumentException naming the address and the header when it is invalid.
 */
private def validateMailAddress(headName: String, address: Address): MailAddress =
  Try(new MailAddress(asString(address)))
    .getOrElse(throw new IllegalArgumentException(s"Invalid mail address: $address in $headName header"))
/** Bare address of an InternetAddress; the string form of any other kind of address. */
private def asString(address: Address): String =
  Some(address)
    .collect { case internetAddress: InternetAddress => internetAddress.getAddress }
    .getOrElse(address.toString)
/**
 * Emits the recipients unchanged when none of them carries a hold parameter, and an
 * IllegalArgumentException otherwise.
 */
def validateRcptTo(recipients: List[EmailSubmissionAddress]): SMono[List[EmailSubmissionAddress]] = {
  val outcome: Try[List[EmailSubmissionAddress]] =
    if (recipients.forall(validateRecipient)) {
      Success(recipients)
    } else {
      Failure(new IllegalArgumentException("Some recipients have invalid delay parameters"))
    }
  SMono.fromTry(outcome)
}
/** A recipient is valid when it declares neither a holdFor nor a holdUntil parameter. */
private def validateRecipient(recipient: EmailSubmissionAddress): Boolean =
  recipient.parameters.forall(parameters =>
    !parameters.contains(ParameterName.holdFor) && !parameters.contains(ParameterName.holdUntil))
/**
 * Validates the parameter names of the envelope mailFrom.
 *
 * Fails when a name outside of the supported set is present, or when holdFor and holdUntil
 * are both present; returns the parameters unchanged otherwise.
 */
def validateFromParameters(mailParameters: Option[Map[ParameterName, Option[ParameterValue]]]): Try[Option[Map[ParameterName, Option[ParameterValue]]]] = {
  val names: Set[ParameterName] = mailParameters.getOrElse(Map()).keySet
  val hasUnsupportedName = names.exists(name => !VALID_PARAMETER_NAME_SET.contains(name))
  if (hasUnsupportedName) {
    Failure(new IllegalArgumentException("Unsupported parameterName"))
  } else if (invalidFutureReleaseParameter(names)) {
    Failure(new IllegalArgumentException("Can't specify holdFor and holdUntil simultaneously"))
  } else {
    Success(mailParameters)
  }
}
/** True when both holdFor and holdUntil are present among the parameter names. */
private def invalidFutureReleaseParameter(keySet: Set[ParameterName]): Boolean =
  Set[ParameterName](ParameterName.holdFor, ParameterName.holdUntil).subsetOf(keySet)
/**
 * Wraps a stored message into a MimeMessageWrapper reading from its full content.
 */
private def toMimeMessage(name: String, message: MessageResult): Try[MimeMessageWrapper] = {
  val messageSource = MessageMimeMessageSource(name, message)
  // When the MimeMessageWrapper constructor throws, nothing owns the source yet:
  // it has to be disposed here before the failure is handed back.
  Try(new MimeMessageWrapper(messageSource))
    .recoverWith {
      case constructionFailure =>
        LifecycleUtil.dispose(messageSource)
        Failure(constructionFailure)
    }
}
/**
 * Checks that the user may send this message with this envelope.
 *
 * Fails with ForbiddenMailFromException when the user cannot send from some of the Sender / From
 * header addresses, with NoRecipientException when the envelope has no recipient, and with
 * ForbiddenFromException when the user cannot send from the envelope mailFrom address.
 * Emits the message otherwise.
 */
private def validate(session: MailboxSession)(mimeMessage: MimeMessage, envelope: Envelope): SMono[MimeMessage] = {
  val headerSenders = Option(mimeMessage.getSender).toList ++ Option(mimeMessage.getFrom).toList.flatten
  SFlux.fromIterable(headerSenders)
    .map(_.asInstanceOf[InternetAddress].getAddress)
    // Keeps the addresses the user is NOT allowed to send from.
    .filterWhen(addressAsString => SMono.fromPublisher(canSendFrom.userCanSendFromReactive(session.getUser, Username.fromMailAddress(new MailAddress(addressAsString))))
      .map(allowed => !Boolean.unbox(allowed)), Queues.SMALL_BUFFER_SIZE)
    .collectSeq()
    .flatMap(forbiddenAddresses => {
      val outcome: SMono[MimeMessage] =
        if (forbiddenAddresses.nonEmpty) {
          SMono.error(ForbiddenMailFromException(forbiddenAddresses.toList))
        } else if (envelope.rcptTo.isEmpty) {
          SMono.error(NoRecipientException())
        } else {
          SMono.fromPublisher(canSendFrom.userCanSendFromReactive(session.getUser, Username.fromMailAddress(envelope.mailFrom.email)))
            .map(Boolean.unbox(_))
            .flatMap(allowed => {
              val checked: SMono[MimeMessage] =
                if (allowed) SMono.just(mimeMessage)
                else SMono.error(ForbiddenFromException(envelope.mailFrom.email.asString))
              checked
            })
            // No answer at all is treated like an authorization.
            .switchIfEmpty(SMono.just(mimeMessage))
        }
      outcome
    })
}
/** Uses the envelope of the request when there is one, and derives it from the message otherwise. */
private def resolveEnvelope(mimeMessage: MimeMessage, maybeEnvelope: Option[Envelope]): Try[Envelope] =
  maybeEnvelope match {
    case Some(envelope) => Success(envelope)
    case None => extractEnvelope(mimeMessage)
  }
/**
 * Derives an envelope from the headers of the message: the first From address becomes the
 * mailFrom, and the To, Cc and Bcc addresses become the recipients.
 *
 * Fails when the message has no From address or when an address is not a valid mail address.
 */
private def extractEnvelope(mimeMessage: MimeMessage): Try[Envelope] = {
  def recipientsOf(recipientType: RecipientType): List[Address] =
    Option(mimeMessage.getRecipients(recipientType)).toList.flatten

  val recipients: List[Address] = recipientsOf(RecipientType.TO) ++ recipientsOf(RecipientType.CC) ++ recipientsOf(RecipientType.BCC)
  val mailFrom: Try[EmailSubmissionAddress] = Option(mimeMessage.getFrom).toList.flatten match {
    case firstFrom :: _ =>
      val fromAddress = firstFrom.asInstanceOf[InternetAddress].getAddress
      Try(new MailAddress(fromAddress)).map(EmailSubmissionAddress(_))
    case Nil =>
      Failure(new IllegalArgumentException("Implicit envelope detection requires a from field"))
  }
  // Recipients are only looked at once the mailFrom was successfully resolved.
  mailFrom.flatMap(from =>
    recipients
      .map(_.asInstanceOf[InternetAddress].getAddress)
      .map(address => Try(new MailAddress(address)))
      .sequence
      .map(rcptTo => Envelope(from, rcptTo.map(EmailSubmissionAddress(_)))))
}
/** Records, in the processing context, the server id that was assigned to a creation id. */
private def recordCreationIdInProcessingContext(emailSubmissionCreationId: EmailSubmissionCreationId,
                                                processingContext: ProcessingContext,
                                                emailSubmissionId: EmailSubmissionId): ProcessingContext = {
  val clientId = ClientId(emailSubmissionCreationId.id)
  val serverId = ServerId(emailSubmissionId.value)
  processingContext.recordCreatedId(clientId, serverId)
}
}