blob: b71287745fcef6b26f4ecac977bc5e13784f3c30 [file] [log] [blame]
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
package org.apache.james.jmap.http;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE;
import static org.apache.james.jmap.JMAPUrls.JMAP;
import static org.apache.james.jmap.http.LoggingHelper.jmapAuthContext;
import static org.apache.james.jmap.http.LoggingHelper.jmapContext;
import static org.apache.james.util.ReactorUtils.logOnError;
import java.io.IOException;
import java.util.stream.Stream;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.james.jmap.Endpoint;
import org.apache.james.jmap.JMAPRoute;
import org.apache.james.jmap.JMAPRoutes;
import org.apache.james.jmap.draft.exceptions.BadRequestException;
import org.apache.james.jmap.draft.exceptions.InternalErrorException;
import org.apache.james.jmap.draft.methods.RequestHandler;
import org.apache.james.jmap.draft.model.AuthenticatedRequest;
import org.apache.james.jmap.draft.model.InvocationRequest;
import org.apache.james.jmap.exceptions.UnauthorizedException;
import org.apache.james.jmap.model.InvocationResponse;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.metrics.api.MetricFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
public class JMAPApiRoutes implements JMAPRoutes {
public static final Logger LOGGER = LoggerFactory.getLogger(JMAPApiRoutes.class);
private final ObjectMapper objectMapper;
private final RequestHandler requestHandler;
private final MetricFactory metricFactory;
private final Authenticator authenticator;
private final UserProvisioner userProvisioner;
@Inject
public JMAPApiRoutes(RequestHandler requestHandler, MetricFactory metricFactory, @Named(InjectionKeys.DRAFT) Authenticator authenticator, UserProvisioner userProvisioner) {
this.requestHandler = requestHandler;
this.metricFactory = metricFactory;
this.authenticator = authenticator;
this.userProvisioner = userProvisioner;
this.objectMapper = new ObjectMapper();
objectMapper.configure(Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true);
}
@Override
public Stream<JMAPRoute> routes() {
return Stream.of(
JMAPRoute.builder()
.endpoint(Endpoint.ofFixedPath(HttpMethod.POST, JMAP))
.action(this::post)
.corsHeaders(),
JMAPRoute.builder()
.endpoint(Endpoint.ofFixedPath(HttpMethod.OPTIONS, JMAP))
.action(CORS_CONTROL)
.noCorsHeaders()
);
}
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response) {
return authenticator.authenticate(request)
.flatMap(session -> userProvisioner.provisionUser(session)
.then(Mono.from(metricFactory.decoratePublisherWithTimerMetric("JMAP-request",
post(request, response, session))))
.contextWrite(jmapAuthContext(session)))
.onErrorResume(BadRequestException.class, e -> handleBadRequest(response, LOGGER, e))
.onErrorResume(UnauthorizedException.class, e -> handleAuthenticationFailure(response, LOGGER, e))
.doOnEach(logOnError(e -> LOGGER.error("Unexpected error", e)))
.onErrorResume(e -> response.status(INTERNAL_SERVER_ERROR).send())
.contextWrite(jmapContext(request));
}
private Mono<Void> post(HttpServerRequest request, HttpServerResponse response, MailboxSession session) {
Flux<Object[]> responses =
requestAsJsonStream(request)
.map(InvocationRequest::deserialize)
.map(invocationRequest -> AuthenticatedRequest.decorate(invocationRequest, session))
.concatMap(requestHandler::handle)
.map(InvocationResponse::asProtocolSpecification);
return sendResponses(response, responses);
}
private Mono<Void> sendResponses(HttpServerResponse response, Flux<Object[]> responses) {
return responses.collectList()
.map(objects -> {
try {
return objectMapper.writeValueAsBytes(objects);
} catch (JsonProcessingException e) {
throw new InternalErrorException("error serialising JMAP API response json");
}
})
.flatMap(json -> response.status(OK)
.header(CONTENT_TYPE, JSON_CONTENT_TYPE)
.header(CONTENT_LENGTH, Integer.toString(json.length))
.sendByteArray(Mono.just(json))
.then());
}
private Flux<JsonNode[]> requestAsJsonStream(HttpServerRequest req) {
return req.receive().aggregate().asInputStream()
.map(inputStream -> {
try {
return objectMapper.readValue(inputStream, JsonNode[][].class);
} catch (IOException e) {
throw new BadRequestException("Error deserializing JSON", e);
}
})
.flatMapIterable(ImmutableList::copyOf);
}
}