[REFACTORING] Use more SMono.handle in JMAP RFC-8621 implementation
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index c5a71b9..8f11639 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -106,9 +106,7 @@
     for {
       updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session))
         .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId)
-        .flatMap(metaData => {
-          doUpdate(validUpdates, metaData, session)
-        })
+        .flatMap(doUpdate(validUpdates, _, session))
     } yield {
       EmailUpdateResults(updates ++ failures)
     }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index dedc483..0deff91 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -169,12 +169,16 @@
       case Success((messageId, partId)) =>
         Applicable(SMono.fromPublisher(
           messageIdManager.getMessagesReactive(List(messageId).asJava, FetchGroup.FULL_CONTENT, mailboxSession))
-          .flatMap(message => SMono.fromTry(EmailBodyPart.of(messageId, message)))
-          .flatMap(bodyStructure => SMono.fromTry(bodyStructure.flatten
-              .filter(_.blobId.contains(blobId))
-            .map(Success(_))
-            .headOption
-            .getOrElse(Failure(BlobNotFoundException(blobId)))))
+          .handle[EmailBodyPart] {
+            case (message, sink) => EmailBodyPart.of(messageId, message)
+              .fold(sink.error, sink.next)
+          }
+          .handle[EmailBodyPart] {
+            case (bodyStructure, sink) =>
+              bodyStructure.flatten
+                .find(_.blobId.contains(blobId))
+                .fold(sink.error(BlobNotFoundException(blobId)))(part => sink.next(part))
+          }
           .map[Blob](EmailBodyPartBlob(blobId, _))
           .switchIfEmpty(SMono.raiseError(BlobNotFoundException(blobId))))
     }
@@ -239,8 +243,9 @@
       .`then`
 
   private def get(request: HttpServerRequest, response: HttpServerResponse, mailboxSession: MailboxSession): SMono[Unit] = {
-    SMono.fromTry(BlobId.of(request.param(blobIdParam)))
-      .flatMap(blobResolvers.resolve(_, mailboxSession))
+    BlobId.of(request.param(blobIdParam))
+      .fold(e => SMono.raiseError(e),
+        blobResolvers.resolve(_, mailboxSession))
       .flatMap(blob => downloadBlob(
         optionalName = queryParam(request, nameParam),
         response = response,
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
index c9b5618..e3c3b7e 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
@@ -93,18 +93,20 @@
       .asJava()
       .`then`()
 
-  private def requestAsJsonStream(httpServerRequest: HttpServerRequest): SMono[RequestObject] = {
+  private def requestAsJsonStream(httpServerRequest: HttpServerRequest): SMono[RequestObject] =
     SMono.fromPublisher(httpServerRequest
       .receive()
       .aggregate()
       .asInputStream())
-      .flatMap(this.parseRequestObject)
-  }
+      .handle[RequestObject] {
+        case (input, sink) => parseRequestObject(input)
+          .fold(sink.error, sink.next)
+      }
 
-  private def parseRequestObject(inputStream: InputStream): SMono[RequestObject] =
+  private def parseRequestObject(inputStream: InputStream): Either[IllegalArgumentException, RequestObject] =
     ResponseSerializer.deserializeRequestObject(inputStream) match {
-      case JsSuccess(requestObject, _) => SMono.just(requestObject)
-      case errors: JsError => SMono.raiseError(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString()))
+      case JsSuccess(requestObject, _) => Right(requestObject)
+      case errors: JsError => Left(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString()))
     }
 
   private def process(requestObject: RequestObject,
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
index 81f5cc2..e794665 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
@@ -37,7 +37,7 @@
 import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
 import org.slf4j.LoggerFactory
 import play.api.libs.json.Json
-import reactor.core.publisher.Mono
+import reactor.core.publisher.{Mono, SynchronousSink}
 import reactor.core.scala.publisher.SMono
 import reactor.core.scheduler.Schedulers
 import reactor.netty.http.server.HttpServerResponse
@@ -54,7 +54,10 @@
   private val generateSession: JMAPRoute.Action =
     (request, response) => SMono.fromPublisher(authenticator.authenticate(request))
       .map(_.getUser)
-      .flatMap(username => sessionSupplier.generate(username).fold(SMono.raiseError[Session], SMono.just[Session]))
+      .handle[Session] {
+        case (username, sink) =>  sessionSupplier.generate(username)
+          .fold(sink.error, session => sink.next(session))
+      }
       .flatMap(session => sendRespond(session, response))
       .onErrorResume(throwable => SMono.fromPublisher(errorHandling(throwable, response)))
       .subscribeOn(Schedulers.elastic())