| /**************************************************************** |
| * Licensed to the Apache Software Foundation (ASF) under one * |
| * or more contributor license agreements. See the NOTICE file * |
| * distributed with this work for additional information * |
| * regarding copyright ownership. The ASF licenses this file * |
| * to you under the Apache License, Version 2.0 (the * |
| * "License"); you may not use this file except in compliance * |
| * with the License. You may obtain a copy of the License at * |
| * * |
| * http://www.apache.org/licenses/LICENSE-2.0 * |
| * * |
| * Unless required by applicable law or agreed to in writing, * |
| * software distributed under the License is distributed on an * |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * |
| * KIND, either express or implied. See the License for the * |
| * specific language governing permissions and limitations * |
| * under the License. * |
| ****************************************************************/ |
| package org.apache.james.jmap.routes |
| |
| import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream} |
| import java.nio.charset.StandardCharsets |
| import java.util.stream |
| import java.util.stream.Stream |
| |
| import com.google.common.base.CharMatcher |
| import eu.timepit.refined.numeric.NonNegative |
| import eu.timepit.refined.refineV |
| import io.netty.buffer.Unpooled |
| import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE |
| import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, FORBIDDEN, INTERNAL_SERVER_ERROR, NOT_FOUND, OK, UNAUTHORIZED} |
| import io.netty.handler.codec.http.{HttpMethod, HttpResponseStatus, QueryStringDecoder} |
| import javax.inject.{Inject, Named} |
| import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE |
| import org.apache.james.jmap.core.Id.Id |
| import org.apache.james.jmap.core.{AccountId, Id, ProblemDetails} |
| import org.apache.james.jmap.exceptions.UnauthorizedException |
| import org.apache.james.jmap.http.Authenticator |
| import org.apache.james.jmap.http.rfc8621.InjectionKeys |
| import org.apache.james.jmap.json.ResponseSerializer |
| import org.apache.james.jmap.mail.Email.Size |
| import org.apache.james.jmap.mail.{BlobId, EmailBodyPart, PartId} |
| import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER} |
| import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes} |
| import org.apache.james.mailbox.model.{AttachmentId, AttachmentMetadata, ContentType, FetchGroup, MessageId, MessageResult} |
| import org.apache.james.mailbox.{AttachmentManager, MailboxSession, MessageIdManager} |
| import org.apache.james.mime4j.codec.EncoderUtil |
| import org.apache.james.mime4j.codec.EncoderUtil.Usage |
| import org.apache.james.mime4j.message.DefaultMessageWriter |
| import org.apache.james.util.ReactorUtils |
| import org.slf4j.{Logger, LoggerFactory} |
| import reactor.core.publisher.Mono |
| import reactor.core.scala.publisher.SMono |
| import reactor.core.scheduler.Schedulers |
| import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse} |
| |
| import scala.compat.java8.FunctionConverters._ |
| import scala.jdk.CollectionConverters._ |
| import scala.util.{Failure, Success, Try} |
| |
| object DownloadRoutes { |
| val LOGGER: Logger = LoggerFactory.getLogger(classOf[DownloadRoutes]) |
| |
| val BUFFER_SIZE: Int = 16 * 1024 |
| } |
| |
| sealed trait BlobResolutionResult { |
| def asOption: Option[SMono[Blob]] |
| } |
| case class NonApplicable() extends BlobResolutionResult { |
| override def asOption: Option[SMono[Blob]] = None |
| } |
| case class Applicable(blob: SMono[Blob]) extends BlobResolutionResult { |
| override def asOption: Option[SMono[Blob]] = Some(blob) |
| } |
| |
| trait BlobResolver { |
| def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult |
| } |
| |
| trait Blob { |
| def blobId: BlobId |
| def contentType: ContentType |
| def size: Try[Size] |
| def content: InputStream |
| } |
| |
| case class BlobNotFoundException(blobId: BlobId) extends RuntimeException |
| case class ForbiddenException() extends RuntimeException |
| |
| case class MessageBlob(blobId: BlobId, message: MessageResult) extends Blob { |
| override def contentType: ContentType = new ContentType("message/rfc822") |
| |
| override def size: Try[Size] = refineV[NonNegative](message.getSize) match { |
| case Left(e) => Failure(new IllegalArgumentException(e)) |
| case Right(size) => Success(size) |
| } |
| |
| override def content: InputStream = message.getFullContent.getInputStream |
| } |
| |
| case class AttachmentBlob(attachmentMetadata: AttachmentMetadata, fileContent: InputStream) extends Blob { |
| override def size: Try[Size] = Success(UploadRoutes.sanitizeSize(attachmentMetadata.getSize)) |
| |
| override def contentType: ContentType = attachmentMetadata.getType |
| |
| override def content: InputStream = fileContent |
| |
| override def blobId: BlobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get |
| } |
| |
| case class EmailBodyPartBlob(blobId: BlobId, part: EmailBodyPart) extends Blob { |
| override def size: Try[Size] = Success(part.size) |
| |
| override def contentType: ContentType = new ContentType(part.`type`.value) |
| |
| override def content: InputStream = { |
| val writer = new DefaultMessageWriter |
| val outputStream = new ByteArrayOutputStream() |
| writer.writeBody(part.entity.getBody, outputStream) |
| new ByteArrayInputStream(outputStream.toByteArray) |
| } |
| } |
| |
| class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, |
| val messageIdManager: MessageIdManager) extends BlobResolver { |
| override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = { |
| Try(messageIdFactory.fromString(blobId.value.value)) match { |
| case Failure(_) => NonApplicable() |
| case Success(messageId) => Applicable(SMono.fromPublisher( |
| messageIdManager.getMessagesReactive(List(messageId).asJava, FetchGroup.FULL_CONTENT, mailboxSession)) |
| .map[Blob](MessageBlob(blobId, _)) |
| .switchIfEmpty(SMono.raiseError(BlobNotFoundException(blobId)))) |
| } |
| } |
| } |
| |
| class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager) extends BlobResolver { |
| override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = |
| AttachmentId.from(org.apache.james.mailbox.model.BlobId.fromString(blobId.value.value)) match { |
| case attachmentId: AttachmentId => |
| Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) match { |
| case Success(attachmentMetadata) => Applicable( |
| SMono.fromCallable(() => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession)))) |
| case Failure(_) => Applicable(SMono.raiseError(BlobNotFoundException(blobId))) |
| } |
| |
| case _ => NonApplicable() |
| } |
| } |
| |
| class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, |
| val messageIdManager: MessageIdManager) extends BlobResolver { |
| private def asMessageAndPartId(blobId: BlobId): Try[(MessageId, PartId)] = { |
| blobId.value.value.split("_").toList match { |
| case List(messageIdString, partIdString) => for { |
| messageId <- Try(messageIdFactory.fromString(messageIdString)) |
| partId <- PartId.parse(partIdString) |
| } yield { |
| (messageId, partId) |
| } |
| case _ => Failure(BlobNotFoundException(blobId)) |
| } |
| } |
| |
| override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = { |
| asMessageAndPartId(blobId) match { |
| case Failure(_) => NonApplicable() |
| case Success((messageId, partId)) => |
| Applicable(SMono.fromPublisher( |
| messageIdManager.getMessagesReactive(List(messageId).asJava, FetchGroup.FULL_CONTENT, mailboxSession)) |
| .handle[EmailBodyPart] { |
| case (message, sink) => EmailBodyPart.of(messageId, message) |
| .fold(sink.error, sink.next) |
| } |
| .handle[EmailBodyPart] { |
| case (bodyStructure, sink) => |
| bodyStructure.flatten |
| .find(_.blobId.contains(blobId)) |
| .fold(sink.error(BlobNotFoundException(blobId)))(part => sink.next(part)) |
| } |
| .map[Blob](EmailBodyPartBlob(blobId, _)) |
| .switchIfEmpty(SMono.raiseError(BlobNotFoundException(blobId)))) |
| } |
| } |
| } |
| |
| class BlobResolvers @Inject()(val messageBlobResolver: MessageBlobResolver, |
| val messagePartBlobResolver: MessagePartBlobResolver, |
| val attachmentBlobResolver: AttachmentBlobResolver) { |
| def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[Blob] = |
| messageBlobResolver |
| .resolve(blobId, mailboxSession).asOption |
| .orElse(messagePartBlobResolver.resolve(blobId, mailboxSession).asOption) |
| .orElse(attachmentBlobResolver.resolve(blobId, mailboxSession).asOption) |
| .getOrElse(SMono.raiseError(BlobNotFoundException(blobId))) |
| } |
| |
| class DownloadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator, |
| val blobResolvers: BlobResolvers) extends JMAPRoutes { |
| |
| private val accountIdParam: String = "accountId" |
| private val blobIdParam: String = "blobId" |
| private val nameParam: String = "name" |
| private val contentTypeParam: String = "contentType" |
| private val downloadUri = s"/download/{$accountIdParam}/{$blobIdParam}" |
| |
| override def routes(): stream.Stream[JMAPRoute] = Stream.of( |
| JMAPRoute.builder |
| .endpoint(new Endpoint(HttpMethod.GET, downloadUri)) |
| .action(this.get) |
| .corsHeaders, |
| JMAPRoute.builder |
| .endpoint(new Endpoint(HttpMethod.OPTIONS, downloadUri)) |
| .action(JMAPRoutes.CORS_CONTROL) |
| .noCorsHeaders) |
| |
| private def get(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] = |
| SMono(authenticator.authenticate(request)) |
| .flatMap(mailboxSession => getIfOwner(request, response, mailboxSession)) |
| .onErrorResume { |
| case e: ForbiddenException => |
| respondDetails(response, |
| ProblemDetails(status = FORBIDDEN, detail = "You cannot upload in others accounts"), |
| FORBIDDEN) |
| case e: UnauthorizedException => |
| LOGGER.warn("Unauthorized", e) |
| respondDetails(response, |
| ProblemDetails(status = UNAUTHORIZED, detail = e.getMessage), |
| UNAUTHORIZED) |
| case _: BlobNotFoundException => |
| respondDetails(response, |
| ProblemDetails(status = NOT_FOUND, detail = "The resource could not be found"), |
| NOT_FOUND) |
| case e => |
| LOGGER.error("Unexpected error upon downloads", e) |
| respondDetails(response, |
| ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage), |
| INTERNAL_SERVER_ERROR) |
| } |
| .subscribeOn(Schedulers.elastic) |
| .asJava() |
| .`then` |
| |
| private def get(request: HttpServerRequest, response: HttpServerResponse, mailboxSession: MailboxSession): SMono[Unit] = { |
| BlobId.of(request.param(blobIdParam)) |
| .fold(e => SMono.raiseError(e), |
| blobResolvers.resolve(_, mailboxSession)) |
| .flatMap(blob => downloadBlob( |
| optionalName = queryParam(request, nameParam), |
| response = response, |
| blobContentType = queryParam(request, contentTypeParam) |
| .map(ContentType.of) |
| .getOrElse(blob.contentType), |
| blob = blob) |
| .`then`()) |
| } |
| |
| private def getIfOwner(request: HttpServerRequest, response: HttpServerResponse, mailboxSession: MailboxSession): SMono[Unit] = { |
| Id.validate(request.param(accountIdParam)) match { |
| case Right(id: Id) => { |
| val targetAccountId: AccountId = AccountId(id) |
| AccountId.from(mailboxSession.getUser).map(accountId => accountId.equals(targetAccountId)) |
| .fold[SMono[Unit]]( |
| e => SMono.raiseError(e), |
| value => if (value) { |
| get(request, response, mailboxSession) |
| } else { |
| SMono.raiseError(ForbiddenException()) |
| }) |
| } |
| |
| case Left(throwable: Throwable) => SMono.raiseError(throwable) |
| } |
| } |
| |
| private def downloadBlob(optionalName: Option[String], |
| response: HttpServerResponse, |
| blobContentType: ContentType, |
| blob: Blob): SMono[Unit] = |
| SMono.fromPublisher(Mono.using( |
| () => blob.content, |
| (stream: InputStream) => addContentDispositionHeader(optionalName) |
| .compose(addContentLengthHeader(blob.size)) |
| .apply(response) |
| .header(CONTENT_TYPE, blobContentType.asString) |
| .status(OK) |
| .send(ReactorUtils.toChunks(stream, BUFFER_SIZE) |
| .map(Unpooled.wrappedBuffer(_)) |
| .subscribeOn(Schedulers.elastic)) |
| .`then`, |
| asJavaConsumer[InputStream]((stream: InputStream) => stream.close()))) |
| .`then` |
| |
| private def addContentDispositionHeader(optionalName: Option[String]): HttpServerResponse => HttpServerResponse = |
| resp => optionalName.map(addContentDispositionHeaderRegardingEncoding(_, resp)) |
| .getOrElse(resp) |
| |
| private def addContentLengthHeader(sizeTry: Try[Size]): HttpServerResponse => HttpServerResponse = |
| resp => sizeTry |
| .map(size => resp.header("Content-Length", size.value.toString)) |
| .getOrElse(resp) |
| |
| private def addContentDispositionHeaderRegardingEncoding(name: String, resp: HttpServerResponse): HttpServerResponse = |
| if (CharMatcher.ascii.matchesAllOf(name)) { |
| resp.header("Content-Disposition", "attachment; filename=\"" + name + "\"") |
| } else { |
| resp.header("Content-Disposition", "attachment; filename*=\"" + EncoderUtil.encodeEncodedWord(name, Usage.TEXT_TOKEN) + "\"") |
| } |
| |
| private def queryParam(httpRequest: HttpServerRequest, parameterName: String): Option[String] = |
| queryParam(parameterName, httpRequest.uri) |
| |
| private def queryParam(parameterName: String, uri: String): Option[String] = |
| Option(new QueryStringDecoder(uri).parameters.get(parameterName)) |
| .toList |
| .flatMap(_.asScala) |
| .headOption |
| |
| private def respondDetails(httpServerResponse: HttpServerResponse, details: ProblemDetails, statusCode: HttpResponseStatus = BAD_REQUEST): SMono[Unit] = |
| SMono.fromPublisher(httpServerResponse.status(statusCode) |
| .header(CONTENT_TYPE, JSON_CONTENT_TYPE) |
| .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString), StandardCharsets.UTF_8) |
| .`then`).`then` |
| } |