[REFACTORING] Use more SMono.handle in JMAP RFC-8621 implementation
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index c5a71b9..8f11639 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -106,9 +106,7 @@
for {
updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session))
.collectMultimap(metaData => metaData.getComposedMessageId.getMessageId)
- .flatMap(metaData => {
- doUpdate(validUpdates, metaData, session)
- })
+ .flatMap(doUpdate(validUpdates, _, session))
} yield {
EmailUpdateResults(updates ++ failures)
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index dedc483..0deff91 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -169,12 +169,16 @@
case Success((messageId, partId)) =>
Applicable(SMono.fromPublisher(
messageIdManager.getMessagesReactive(List(messageId).asJava, FetchGroup.FULL_CONTENT, mailboxSession))
- .flatMap(message => SMono.fromTry(EmailBodyPart.of(messageId, message)))
- .flatMap(bodyStructure => SMono.fromTry(bodyStructure.flatten
- .filter(_.blobId.contains(blobId))
- .map(Success(_))
- .headOption
- .getOrElse(Failure(BlobNotFoundException(blobId)))))
+ .handle[EmailBodyPart] {
+ case (message, sink) => EmailBodyPart.of(messageId, message)
+ .fold(sink.error, sink.next)
+ }
+ .handle[EmailBodyPart] {
+ case (bodyStructure, sink) =>
+ bodyStructure.flatten
+ .find(_.blobId.contains(blobId))
+ .fold(sink.error(BlobNotFoundException(blobId)))(part => sink.next(part))
+ }
.map[Blob](EmailBodyPartBlob(blobId, _))
.switchIfEmpty(SMono.raiseError(BlobNotFoundException(blobId))))
}
@@ -239,8 +243,9 @@
.`then`
private def get(request: HttpServerRequest, response: HttpServerResponse, mailboxSession: MailboxSession): SMono[Unit] = {
- SMono.fromTry(BlobId.of(request.param(blobIdParam)))
- .flatMap(blobResolvers.resolve(_, mailboxSession))
+ BlobId.of(request.param(blobIdParam))
+ .fold(e => SMono.raiseError(e),
+ blobResolvers.resolve(_, mailboxSession))
.flatMap(blob => downloadBlob(
optionalName = queryParam(request, nameParam),
response = response,
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
index c9b5618..e3c3b7e 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
@@ -93,18 +93,20 @@
.asJava()
.`then`()
- private def requestAsJsonStream(httpServerRequest: HttpServerRequest): SMono[RequestObject] = {
+ private def requestAsJsonStream(httpServerRequest: HttpServerRequest): SMono[RequestObject] =
SMono.fromPublisher(httpServerRequest
.receive()
.aggregate()
.asInputStream())
- .flatMap(this.parseRequestObject)
- }
+ .handle[RequestObject] {
+ case (input, sink) => parseRequestObject(input)
+ .fold(sink.error, sink.next)
+ }
- private def parseRequestObject(inputStream: InputStream): SMono[RequestObject] =
+ private def parseRequestObject(inputStream: InputStream): Either[IllegalArgumentException, RequestObject] =
ResponseSerializer.deserializeRequestObject(inputStream) match {
- case JsSuccess(requestObject, _) => SMono.just(requestObject)
- case errors: JsError => SMono.raiseError(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString()))
+ case JsSuccess(requestObject, _) => Right(requestObject)
+ case errors: JsError => Left(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString()))
}
private def process(requestObject: RequestObject,
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
index 81f5cc2..e794665 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
@@ -37,7 +37,7 @@
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
import org.slf4j.LoggerFactory
import play.api.libs.json.Json
-import reactor.core.publisher.Mono
+import reactor.core.publisher.{Mono, SynchronousSink}
import reactor.core.scala.publisher.SMono
import reactor.core.scheduler.Schedulers
import reactor.netty.http.server.HttpServerResponse
@@ -54,7 +54,10 @@
private val generateSession: JMAPRoute.Action =
(request, response) => SMono.fromPublisher(authenticator.authenticate(request))
.map(_.getUser)
- .flatMap(username => sessionSupplier.generate(username).fold(SMono.raiseError[Session], SMono.just[Session]))
+ .handle[Session] {
+ case (username, sink) => sessionSupplier.generate(username)
+ .fold(sink.error, session => sink.next(session))
+ }
.flatMap(session => sendRespond(session, response))
.onErrorResume(throwable => SMono.fromPublisher(errorHandling(throwable, response)))
.subscribeOn(Schedulers.elastic())