// blob: e3c3b7efb1f0866d8dff199e00bb1ac417f0f5ea [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.jmap.routes
import java.io.InputStream
import java.nio.charset.StandardCharsets
import java.util.stream
import java.util.stream.Stream
import com.fasterxml.jackson.core.JsonParseException
import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, INTERNAL_SERVER_ERROR, OK, UNAUTHORIZED}
import io.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}
import javax.inject.{Inject, Named}
import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
import org.apache.james.jmap.JMAPUrls.JMAP
import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
import org.apache.james.jmap.core.Invocation.MethodName
import org.apache.james.jmap.core.ProblemDetails.{notJSONProblem, notRequestProblem, unknownCapabilityProblem}
import org.apache.james.jmap.core.{DefaultCapabilities, ErrorCode, Invocation, MissingCapabilityException, ProblemDetails, RequestObject, ResponseObject}
import org.apache.james.jmap.exceptions.UnauthorizedException
import org.apache.james.jmap.http.rfc8621.InjectionKeys
import org.apache.james.jmap.http.{Authenticator, MailboxesProvisioner, UserProvisioning}
import org.apache.james.jmap.json.ResponseSerializer
import org.apache.james.jmap.method.{InvocationWithContext, Method}
import org.apache.james.jmap.routes.DownloadRoutes.LOGGER
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
import org.apache.james.mailbox.MailboxSession
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.{JsError, JsSuccess}
import reactor.core.publisher.{Flux, Mono}
import reactor.core.scala.publisher.{SFlux, SMono}
import reactor.core.scheduler.Schedulers
import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
import scala.jdk.CollectionConverters._
/**
 * Companion of [[JMAPApiRoutes]], exposing the SLF4J logger named after that class.
 */
object JMAPApiRoutes {
  /** Logger bound to the [[JMAPApiRoutes]] class. */
  val LOGGER: Logger = LoggerFactory.getLogger(classOf[JMAPApiRoutes])
}
/**
 * Routes of the JMAP API endpoint.
 *
 * Exposes a POST route on the `JMAP` URL that authenticates the request, provisions the user and
 * its mailboxes, parses the body as a [[RequestObject]] and runs its method calls, and an OPTIONS
 * route on the same URL delegating to `JMAPRoutes.CORS_CONTROL`.
 *
 * @param authenticator        authenticates the incoming HTTP request into a [[MailboxSession]]
 * @param userProvisioner      invoked with the authenticated session before processing the request
 * @param mailboxesProvisioner invoked with the authenticated session before processing the request
 * @param methods              the JMAP methods this endpoint can dispatch to, looked up by method name
 */
class JMAPApiRoutes (val authenticator: Authenticator,
                     userProvisioner: UserProvisioning,
                     mailboxesProvisioner: MailboxesProvisioner,
                     methods: Set[Method]) extends JMAPRoutes {

  /** Index of the available methods, keyed by their JMAP method name. */
  private val methodsByName: Map[MethodName, Method] = methods.map(method => method.methodName -> method).toMap

  /**
   * Injection constructor: accepts the method set as a Java collection and converts it to a Scala `Set`.
   */
  @Inject
  def this(@Named(InjectionKeys.RFC_8621) authenticator: Authenticator,
           userProvisioner: UserProvisioning,
           mailboxesProvisioner: MailboxesProvisioner,
           javaMethods: java.util.Set[Method]) =
    this(authenticator, userProvisioner, mailboxesProvisioner, javaMethods.asScala.toSet)

  /**
   * @return the POST route (handled by `post`) and the OPTIONS route (handled by
   *         `JMAPRoutes.CORS_CONTROL`), both on the `JMAP` URL and both with CORS headers enabled
   */
  override def routes(): stream.Stream[JMAPRoute] = Stream.of(
    JMAPRoute.builder
      .endpoint(new Endpoint(HttpMethod.POST, JMAP))
      .action(this.post)
      .corsHeaders(),
    JMAPRoute.builder
      .endpoint(new Endpoint(HttpMethod.OPTIONS, JMAP))
      .action(JMAPRoutes.CORS_CONTROL)
      .corsHeaders())

  /**
   * Handles a POST on the API endpoint.
   *
   * Authenticates the request, runs user and mailbox provisioning (merged, both must complete),
   * then parses and processes the request body. Any error raised along the way is converted into
   * an HTTP error response by `handleError`.
   */
  private def post(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] =
    SMono(authenticator.authenticate(httpServerRequest))
      .flatMap((mailboxSession: MailboxSession) => SFlux.merge(Seq(
          userProvisioner.provisionUser(mailboxSession),
          mailboxesProvisioner.createMailboxesIfNeeded(mailboxSession)))
        .`then`
        .`then`(this.requestAsJsonStream(httpServerRequest)
          .flatMap(requestObject => this.process(requestObject, httpServerResponse, mailboxSession))))
      .onErrorResume(throwable => handleError(throwable, httpServerResponse))
      // NOTE(review): Schedulers.elastic is deprecated in recent Reactor releases in favour of
      // boundedElastic — confirm against the Reactor version in use before switching.
      .subscribeOn(Schedulers.elastic)
      .asJava()
      .`then`()

  /**
   * Aggregates the request body into a single input stream and parses it as a [[RequestObject]].
   * A parsing failure reported by `parseRequestObject` is emitted as an error signal.
   */
  private def requestAsJsonStream(httpServerRequest: HttpServerRequest): SMono[RequestObject] =
    SMono.fromPublisher(httpServerRequest
      .receive()
      .aggregate()
      .asInputStream())
      .handle[RequestObject] {
        case (input, sink) => parseRequestObject(input)
          .fold(sink.error, sink.next)
      }

  /**
   * Deserializes the stream into a [[RequestObject]].
   *
   * @return `Right` with the request on success, or `Left` with an `IllegalArgumentException`
   *         whose message is the serialized validation errors
   */
  private def parseRequestObject(inputStream: InputStream): Either[IllegalArgumentException, RequestObject] =
    ResponseSerializer.deserializeRequestObject(inputStream) match {
      case JsSuccess(requestObject, _) => Right(requestObject)
      case errors: JsError => Left(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString()))
    }

  /**
   * Processes a parsed request and writes the JSON response.
   *
   * Fails with [[UnsupportedCapabilitiesException]] when the request's `using` lists a capability
   * outside `DefaultCapabilities.SUPPORTED_CAPABILITY_IDENTIFIERS`. Otherwise runs the method calls
   * and sends a 200 response carrying the resulting invocations.
   */
  private def process(requestObject: RequestObject,
                      httpServerResponse: HttpServerResponse,
                      mailboxSession: MailboxSession): SMono[Void] = {
    val processingContext: ProcessingContext = ProcessingContext(Map.empty, Map.empty)
    // Computed once and reused for both the support check and method dispatch.
    val capabilities: Set[CapabilityIdentifier] = requestObject.using.toSet
    val unsupportedCapabilities = capabilities -- DefaultCapabilities.SUPPORTED_CAPABILITY_IDENTIFIERS

    if (unsupportedCapabilities.nonEmpty) {
      SMono.raiseError(UnsupportedCapabilitiesException(unsupportedCapabilities))
    } else {
      processSequentiallyAndUpdateContext(requestObject, mailboxSession, processingContext, capabilities)
        .flatMap((invocations : Seq[InvocationWithContext]) =>
          SMono.fromPublisher(httpServerResponse.status(OK)
            .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
            .sendString(
              SMono.fromCallable(() =>
                ResponseSerializer.serialize(ResponseObject(ResponseObject.SESSION_STATE, invocations.map(_.invocation))).toString),
              StandardCharsets.UTF_8)
            .`then`())
        )
    }
  }

  /**
   * Runs the method calls of the request in order, threading the processing context from one
   * call to the next, and collects every resulting invocation.
   *
   * The fold builds one flux per method call (most recent first, hence the `reverse`). Each flux
   * starts from the last processing context emitted by the previous call's flux, or from the
   * initial context for the first call. Fluxes are cached because each one is subscribed to both
   * by the following call (to read its context) and by the final concatenation.
   */
  private def processSequentiallyAndUpdateContext(requestObject: RequestObject, mailboxSession: MailboxSession, processingContext: ProcessingContext, capabilities: Set[CapabilityIdentifier]): SMono[Seq[InvocationWithContext]] = {
    SFlux.fromIterable(requestObject.methodCalls)
      .foldLeft(List[SFlux[InvocationWithContext]]())((acc, elem) => {
        // NOTE(review): last() signals an error on an empty flux — presumably every method emits
        // at least one invocation; confirm against the Method implementations.
        val lastProcessingContext: SMono[ProcessingContext] = acc.headOption
          .map(last => SMono.fromPublisher(Flux.from(last.map(_.processingContext)).last()))
          .getOrElse(SMono.just(processingContext))
        val invocation: SFlux[InvocationWithContext] = lastProcessingContext.flatMapMany(context => process(capabilities, mailboxSession, InvocationWithContext(elem, context)))
        invocation.cache() :: acc
      })
      .map(_.reverse)
      .flatMap(list => SFlux.fromIterable(list)
        .concatMap(e => e)
        .collectSeq())
  }

  /**
   * Processes a single method call.
   *
   * Back-references are resolved against the invocation's processing context first: on failure an
   * `invalidResultReference` error invocation is emitted, otherwise the resolved invocation is
   * dispatched and each result is recorded into the context via `recordInvocation`.
   */
  private def process(capabilities: Set[CapabilityIdentifier], mailboxSession: MailboxSession, invocation: InvocationWithContext) : SFlux[InvocationWithContext] =
    SFlux.fromPublisher(
      invocation.processingContext.resolveBackReferences(invocation.invocation) match {
        case Left(e) => SFlux.just[InvocationWithContext](InvocationWithContext(Invocation.error(
          errorCode = ErrorCode.InvalidResultReference,
          description = s"Failed resolving back-reference: ${e.message}",
          methodCallId = invocation.invocation.methodCallId), invocation.processingContext))
        case Right(resolvedInvocation) => processMethodWithMatchName(capabilities, InvocationWithContext(resolvedInvocation, invocation.processingContext), mailboxSession)
          .map(_.recordInvocation)
      })

  /**
   * Dispatches the invocation to the method registered under its name.
   *
   * Emits an `UnknownMethod` error invocation when no method matches or when the method requires
   * capabilities the request did not declare, and a `ServerFail` error invocation when the method
   * processing signals an error.
   */
  private def processMethodWithMatchName(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession): SFlux[InvocationWithContext] =
    methodsByName.get(invocation.invocation.methodName)
      .map(method => validateCapabilities(capabilities, method.requiredCapabilities)
        .fold(e => SFlux.just(InvocationWithContext(Invocation.error(ErrorCode.UnknownMethod, e.description, invocation.invocation.methodCallId), invocation.processingContext)),
          _ => SFlux.fromPublisher(method.process(capabilities, invocation, mailboxSession))))
      .getOrElse(SFlux.just(InvocationWithContext(Invocation.error(ErrorCode.UnknownMethod, invocation.invocation.methodCallId), invocation.processingContext)))
      .onErrorResume(throwable => SMono.just(InvocationWithContext(Invocation.error(ErrorCode.ServerFail, throwable.getMessage, invocation.invocation.methodCallId), invocation.processingContext)))

  /**
   * Checks that every required capability was declared by the request.
   *
   * @return `Right(())` when nothing is missing, otherwise `Left` listing the missing capabilities
   */
  private def validateCapabilities(capabilities: Set[CapabilityIdentifier], requiredCapabilities: Set[CapabilityIdentifier]): Either[MissingCapabilityException, Unit] = {
    val missingCapabilities = requiredCapabilities -- capabilities
    Either.cond(
      missingCapabilities.isEmpty,
      (),
      MissingCapabilityException(s"Missing capability(ies): ${missingCapabilities.mkString(", ")}"))
  }

  /**
   * Converts a failure of the request pipeline into an HTTP problem-details response.
   *
   * Request-shape, JSON and capability failures use the default status of `respondDetails`;
   * authentication failures answer 401; anything else is logged and answers 500.
   */
  private def handleError(throwable: Throwable, response: HttpServerResponse): SMono[Void] = throwable match {
    case exception: IllegalArgumentException => respondDetails(response,
      notRequestProblem(
        s"The request was successfully parsed as JSON but did not match the type signature of the Request object: ${exception.getMessage}"))
    case e: UnauthorizedException =>
      // Qualified on purpose: the bare LOGGER name in this file resolves to DownloadRoutes' logger.
      JMAPApiRoutes.LOGGER.warn("Unauthorized", e)
      respondDetails(response,
        ProblemDetails(status = UNAUTHORIZED, detail = e.getMessage),
        UNAUTHORIZED)
    case exception: JsonParseException => respondDetails(response,
      notJSONProblem(
        s"The content type of the request was not application/json or the request did not parse as I-JSON: ${exception.getMessage}"))
    case exception: UnsupportedCapabilitiesException => respondDetails(response,
      unknownCapabilityProblem(s"The request used unsupported capabilities: ${exception.capabilities}"))
    case e =>
      JMAPApiRoutes.LOGGER.error("Unexpected error upon API request", e)
      respondDetails(response,
        ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage),
        INTERNAL_SERVER_ERROR)
  }

  /**
   * Sends the given problem details as a JSON body.
   *
   * @param statusCode HTTP status of the response, `BAD_REQUEST` unless specified
   */
  private def respondDetails(httpServerResponse: HttpServerResponse, details: ProblemDetails, statusCode: HttpResponseStatus = BAD_REQUEST): SMono[Void] =
    SMono.fromPublisher(httpServerResponse.status(statusCode)
      .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
      .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString),
        StandardCharsets.UTF_8)
      .`then`())
}
/**
 * Raised when a request declares capabilities that the server does not support.
 *
 * @param capabilities the unsupported capability identifiers found in the request
 */
case class UnsupportedCapabilitiesException(capabilities: Set[CapabilityIdentifier])
  // A message is supplied so that generic handlers and logs do not see a null getMessage.
  extends RuntimeException(s"Unsupported capabilities: ${capabilities.mkString(", ")}")