/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 * http://www.apache.org/licenses/LICENSE-2.0                   *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/
package org.apache.james.jmap.routes

import java.io.InputStream
import java.nio.charset.StandardCharsets
import java.util.stream
import java.util.stream.Stream

import com.fasterxml.jackson.core.JsonParseException
import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, INTERNAL_SERVER_ERROR, OK, UNAUTHORIZED}
import io.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}
import javax.inject.{Inject, Named}
import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
import org.apache.james.jmap.JMAPUrls.JMAP
import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
import org.apache.james.jmap.core.Invocation.MethodName
import org.apache.james.jmap.core.ProblemDetails.{notJSONProblem, notRequestProblem, unknownCapabilityProblem}
import org.apache.james.jmap.core.{DefaultCapabilities, ErrorCode, Invocation, MissingCapabilityException, ProblemDetails, RequestObject, ResponseObject}
import org.apache.james.jmap.exceptions.UnauthorizedException
import org.apache.james.jmap.http.rfc8621.InjectionKeys
import org.apache.james.jmap.http.{Authenticator, MailboxesProvisioner, UserProvisioning}
import org.apache.james.jmap.json.ResponseSerializer
import org.apache.james.jmap.method.{InvocationWithContext, Method}
import org.apache.james.jmap.routes.DownloadRoutes.LOGGER
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
import org.apache.james.mailbox.MailboxSession
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.{JsError, JsSuccess}
import reactor.core.publisher.{Flux, Mono}
import reactor.core.scala.publisher.{SFlux, SMono}
import reactor.core.scheduler.Schedulers
import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}

import scala.jdk.CollectionConverters._

object JMAPApiRoutes {
  val LOGGER: Logger = LoggerFactory.getLogger(classOf[JMAPApiRoutes])
}

class JMAPApiRoutes (val authenticator: Authenticator,
                     userProvisioner: UserProvisioning,
                     mailboxesProvisioner: MailboxesProvisioner,
                     methods: Set[Method]) extends JMAPRoutes {

  private val methodsByName: Map[MethodName, Method] = methods.map(method => method.methodName -> method).toMap

  @Inject
  def this(@Named(InjectionKeys.RFC_8621) authenticator: Authenticator,
           userProvisioner: UserProvisioning,
           mailboxesProvisioner: MailboxesProvisioner,
           javaMethods: java.util.Set[Method]) {
    this(authenticator, userProvisioner, mailboxesProvisioner, javaMethods.asScala.toSet)
  }

  override def routes(): stream.Stream[JMAPRoute] = Stream.of(
    JMAPRoute.builder
      .endpoint(new Endpoint(HttpMethod.POST, JMAP))
      .action(this.post)
      .corsHeaders,
    JMAPRoute.builder
      .endpoint(new Endpoint(HttpMethod.OPTIONS, JMAP))
      .action(JMAPRoutes.CORS_CONTROL)
      .corsHeaders())

  private def post(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] =
    SMono(authenticator.authenticate(httpServerRequest))
      .flatMap((mailboxSession: MailboxSession) => SFlux.merge(Seq(
          userProvisioner.provisionUser(mailboxSession),
          mailboxesProvisioner.createMailboxesIfNeeded(mailboxSession)))
        .`then`
        .`then`(this.requestAsJsonStream(httpServerRequest)
          .flatMap(requestObject => this.process(requestObject, httpServerResponse, mailboxSession))))
      .onErrorResume(throwable => handleError(throwable, httpServerResponse))
      .subscribeOn(Schedulers.elastic)
      .asJava()
      .`then`()

  private def requestAsJsonStream(httpServerRequest: HttpServerRequest): SMono[RequestObject] =
    SMono.fromPublisher(httpServerRequest
      .receive()
      .aggregate()
      .asInputStream())
      .handle[RequestObject] {
        case (input, sink) => parseRequestObject(input)
          .fold(sink.error, sink.next)
      }

  private def parseRequestObject(inputStream: InputStream): Either[IllegalArgumentException, RequestObject] =
    ResponseSerializer.deserializeRequestObject(inputStream) match {
      case JsSuccess(requestObject, _) => Right(requestObject)
      case errors: JsError => Left(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString()))
    }

  private def process(requestObject: RequestObject,
                      httpServerResponse: HttpServerResponse,
                      mailboxSession: MailboxSession): SMono[Void] = {
    val processingContext: ProcessingContext = ProcessingContext(Map.empty, Map.empty)
    val unsupportedCapabilities = requestObject.using.toSet -- DefaultCapabilities.SUPPORTED_CAPABILITY_IDENTIFIERS
    val capabilities: Set[CapabilityIdentifier] = requestObject.using.toSet

    if (unsupportedCapabilities.nonEmpty) {
      SMono.raiseError(UnsupportedCapabilitiesException(unsupportedCapabilities))
    } else {
      processSequentiallyAndUpdateContext(requestObject, mailboxSession, processingContext, capabilities)
        .flatMap((invocations : Seq[InvocationWithContext]) =>
          SMono.fromPublisher(httpServerResponse.status(OK)
            .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
            .sendString(
              SMono.fromCallable(() =>
                ResponseSerializer.serialize(ResponseObject(ResponseObject.SESSION_STATE, invocations.map(_.invocation))).toString),
              StandardCharsets.UTF_8)
            .`then`())
        )
    }
  }

  private def processSequentiallyAndUpdateContext(requestObject: RequestObject, mailboxSession: MailboxSession, processingContext: ProcessingContext, capabilities: Set[CapabilityIdentifier]): SMono[Seq[(InvocationWithContext)]] = {
    SFlux.fromIterable(requestObject.methodCalls)
      .foldLeft(List[SFlux[InvocationWithContext]]())((acc, elem) => {
        val lastProcessingContext: SMono[ProcessingContext] = acc.headOption
          .map(last => SMono.fromPublisher(Flux.from(last.map(_.processingContext)).last()))
          .getOrElse(SMono.just(processingContext))
        val invocation: SFlux[InvocationWithContext] = lastProcessingContext.flatMapMany(context => process(capabilities, mailboxSession, InvocationWithContext(elem, context)))
        invocation.cache() :: acc
      })
      .map(_.reverse)
      .flatMap(list => SFlux.fromIterable(list)
        .concatMap(e => e)
        .collectSeq())
  }

  private def process(capabilities: Set[CapabilityIdentifier], mailboxSession: MailboxSession, invocation: InvocationWithContext) : SFlux[InvocationWithContext] =
    SFlux.fromPublisher(
      invocation.processingContext.resolveBackReferences(invocation.invocation) match {
        case Left(e) => SFlux.just[InvocationWithContext](InvocationWithContext(Invocation.error(
          errorCode = ErrorCode.InvalidResultReference,
          description = s"Failed resolving back-reference: ${e.message}",
          methodCallId = invocation.invocation.methodCallId), invocation.processingContext))
        case Right(resolvedInvocation) => processMethodWithMatchName(capabilities, InvocationWithContext(resolvedInvocation, invocation.processingContext), mailboxSession)
          .map(_.recordInvocation)
      })

  private def processMethodWithMatchName(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession): SFlux[InvocationWithContext] =
    methodsByName.get(invocation.invocation.methodName)
      .map(method => validateCapabilities(capabilities, method.requiredCapabilities)
        .fold(e => SFlux.just(InvocationWithContext(Invocation.error(ErrorCode.UnknownMethod, e.description, invocation.invocation.methodCallId), invocation.processingContext)),
          _ => SFlux.fromPublisher(method.process(capabilities, invocation, mailboxSession))))
      .getOrElse(SFlux.just(InvocationWithContext(Invocation.error(ErrorCode.UnknownMethod, invocation.invocation.methodCallId), invocation.processingContext)))
      .onErrorResume(throwable => SMono.just(InvocationWithContext(Invocation.error(ErrorCode.ServerFail, throwable.getMessage, invocation.invocation.methodCallId), invocation.processingContext)))

  private def validateCapabilities(capabilities: Set[CapabilityIdentifier], requiredCapabilities: Set[CapabilityIdentifier]): Either[MissingCapabilityException, Unit] = {
    val missingCapabilities = requiredCapabilities -- capabilities
    if (missingCapabilities.nonEmpty) {
      Left(MissingCapabilityException(s"Missing capability(ies): ${missingCapabilities.mkString(", ")}"))
    } else {
      Right()
    }
  }

  private def handleError(throwable: Throwable, response: HttpServerResponse): SMono[Void] = throwable match {
    case exception: IllegalArgumentException => respondDetails(response,
      notRequestProblem(
        s"The request was successfully parsed as JSON but did not match the type signature of the Request object: ${exception.getMessage}"))

    case e: UnauthorizedException =>
      LOGGER.warn("Unauthorized", e)
      respondDetails(response,
        ProblemDetails(status = UNAUTHORIZED, detail = e.getMessage),
        UNAUTHORIZED)
    case exception: JsonParseException => respondDetails(response,
      notJSONProblem(
        s"The content type of the request was not application/json or the request did not parse as I-JSON: ${exception.getMessage}"))
    case exception: UnsupportedCapabilitiesException => respondDetails(response,
      unknownCapabilityProblem(s"The request used unsupported capabilities: ${exception.capabilities}"))
    case e =>
      LOGGER.error("Unexpected error upon API request", e)
      respondDetails(response,
        ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage),
        INTERNAL_SERVER_ERROR)
  }

  private def respondDetails(httpServerResponse: HttpServerResponse, details: ProblemDetails, statusCode: HttpResponseStatus = BAD_REQUEST): SMono[Void] =
    SMono.fromPublisher(httpServerResponse.status(statusCode)
      .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
      .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString),
        StandardCharsets.UTF_8)
      .`then`)
}

case class UnsupportedCapabilitiesException(capabilities: Set[CapabilityIdentifier]) extends RuntimeException
