blob: 7b91276f2c9b22f89f12634ec3bfc577d7d14729 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.jmap.method
import eu.timepit.refined.auto._
import javax.inject.Inject
import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, JMAP_CORE, JMAP_MAIL}
import org.apache.james.jmap.core.Invocation.{Arguments, MethodName}
import org.apache.james.jmap.core.{AccountId, Invocation, UuidState}
import org.apache.james.jmap.json.{ResponseSerializer, ThreadSerializer}
import org.apache.james.jmap.mail.{Thread, ThreadGetRequest, ThreadGetResponse, ThreadNotFound, UnparsedThreadId}
import org.apache.james.jmap.routes.SessionSupplier
import org.apache.james.mailbox.model.{ThreadId => JavaThreadId}
import org.apache.james.mailbox.{MailboxManager, MailboxSession}
import org.apache.james.metrics.api.MetricFactory
import play.api.libs.json.{JsError, JsSuccess}
import reactor.core.scala.publisher.{SFlux, SMono}
import scala.util.Try
object ThreadGetResult {
def empty: ThreadGetResult = ThreadGetResult(Set.empty, ThreadNotFound(Set.empty))
def merge(result1: ThreadGetResult, result2: ThreadGetResult): ThreadGetResult = result1.merge(result2)
def found(thread: Thread): ThreadGetResult =
ThreadGetResult(Set(thread), ThreadNotFound(Set.empty))
def notFound(unparsedThreadId: UnparsedThreadId): ThreadGetResult =
ThreadGetResult(Set.empty, ThreadNotFound(Set(unparsedThreadId)))
}
case class ThreadGetResult(threads: Set[Thread], notFound: ThreadNotFound) {
def merge(other: ThreadGetResult): ThreadGetResult =
ThreadGetResult(this.threads ++ other.threads, this.notFound.merge(other.notFound))
def asResponse(accountId: AccountId): ThreadGetResponse =
ThreadGetResponse(
accountId = accountId,
state = UuidState.INSTANCE,
list = threads.toList,
notFound = notFound)
}
class ThreadGetMethod @Inject()(val metricFactory: MetricFactory,
val sessionSupplier: SessionSupplier,
val threadIdFactory: JavaThreadId.Factory,
val mailboxManager: MailboxManager) extends MethodRequiringAccountId[ThreadGetRequest] {
override val methodName: MethodName = MethodName("Thread/get")
override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE, JMAP_MAIL)
override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: ThreadGetRequest): SMono[InvocationWithContext] = {
getThreadResponse(request, mailboxSession)
.reduce(ThreadGetResult.empty)(ThreadGetResult.merge)
.map(threadGetResult => threadGetResult.asResponse(request.accountId))
.map(threadGetResponse => Invocation(
methodName = methodName,
arguments = Arguments(ThreadSerializer.serialize(threadGetResponse)),
methodCallId = invocation.invocation.methodCallId))
.map(InvocationWithContext(_, invocation.processingContext))
}
override def getRequest(mailboxSession: MailboxSession, invocation: Invocation): Either[IllegalArgumentException, ThreadGetRequest] =
ThreadSerializer.deserialize(invocation.arguments.value) match {
case JsSuccess(threadGetRequest, _) => Right(threadGetRequest)
case errors: JsError => Left(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
}
private def getThreadResponse(threadGetRequest: ThreadGetRequest,
mailboxSession: MailboxSession): SFlux[ThreadGetResult] = {
SFlux.fromIterable(threadGetRequest.ids)
.flatMap(unparsedThreadId => {
Try(threadIdFactory.fromString(unparsedThreadId.id.toString()))
.fold(e => SFlux.just(ThreadGetResult.notFound(unparsedThreadId)),
threadId => SFlux.fromPublisher(mailboxManager.getThread(threadId, mailboxSession))
.collectSeq()
.map(seq => Thread(id = unparsedThreadId.id, emailIds = seq.toList))
.map(ThreadGetResult.found)
.onErrorResume((_ => SMono.just(ThreadGetResult.notFound(unparsedThreadId)))))
})
}
}