blob: 8899d0ca65c25904e3f4bdfc5a82788ac591034a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.common.rest;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager;
import org.apache.servicecomb.common.rest.definition.RestOperationMeta;
import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
import org.apache.servicecomb.common.rest.filter.HttpServerFilterBeforeSendResponseExecutor;
import org.apache.servicecomb.common.rest.filter.inner.RestServerCodecFilter;
import org.apache.servicecomb.common.rest.locator.OperationLocator;
import org.apache.servicecomb.common.rest.locator.ServicePathManager;
import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.MicroserviceMeta;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.foundation.common.Holder;
import org.apache.servicecomb.foundation.common.utils.JsonUtils;
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractRestInvocation {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRestInvocation.class);
public static final String UNKNOWN_OPERATION_ID = "UNKNOWN_OPERATION";
protected long start;
protected RestOperationMeta restOperationMeta;
protected Invocation invocation;
protected HttpServletRequestEx requestEx;
protected HttpServletResponseEx responseEx;
protected ProduceProcessor produceProcessor;
protected List<HttpServerFilter> httpServerFilters = Collections.emptyList();
public AbstractRestInvocation() {
this.start = System.nanoTime();
}
public void setHttpServerFilters(List<HttpServerFilter> httpServerFilters) {
this.httpServerFilters = httpServerFilters;
}
protected void findRestOperation(MicroserviceMeta microserviceMeta) {
ServicePathManager servicePathManager = ServicePathManager.getServicePathManager(microserviceMeta);
if (servicePathManager == null) {
LOGGER.error("No schema defined for {}:{}.", microserviceMeta.getAppId(), microserviceMeta.getMicroserviceName());
throw new InvocationException(Status.NOT_FOUND, Status.NOT_FOUND.getReasonPhrase());
}
OperationLocator locator = locateOperation(servicePathManager);
requestEx.setAttribute(RestConst.PATH_PARAMETERS, locator.getPathVarMap());
this.restOperationMeta = locator.getOperation();
}
protected void initProduceProcessor() {
produceProcessor = restOperationMeta.ensureFindProduceProcessor(requestEx);
if (produceProcessor == null) {
LOGGER.error("Accept {} is not supported, operation={}.", requestEx.getHeader(HttpHeaders.ACCEPT),
restOperationMeta.getOperationMeta().getMicroserviceQualifiedName());
String msg = String.format("Accept %s is not supported", requestEx.getHeader(HttpHeaders.ACCEPT));
throw new InvocationException(Status.NOT_ACCEPTABLE, msg);
}
}
protected void setContext() throws Exception {
String strCseContext = requestEx.getHeader(Const.CSE_CONTEXT);
if (StringUtils.isEmpty(strCseContext)) {
return;
}
@SuppressWarnings("unchecked")
Map<String, String> cseContext =
JsonUtils.readValue(strCseContext.getBytes(StandardCharsets.UTF_8), Map.class);
invocation.mergeContext(cseContext);
}
public String getContext(String key) {
if (null == invocation || null == invocation.getContext()) {
return null;
}
return invocation.getContext(key);
}
protected void scheduleInvocation() {
try {
createInvocation();
} catch (Throwable e) {
sendFailResponse(e);
return;
}
try {
this.setContext();
} catch (Exception e) {
LOGGER.error("failed to set invocation context", e);
sendFailResponse(e);
return;
}
invocation.onStart(requestEx, start);
invocation.getInvocationStageTrace().startSchedule();
OperationMeta operationMeta = restOperationMeta.getOperationMeta();
Holder<Boolean> qpsFlowControlReject = checkQpsFlowControl(operationMeta);
if (qpsFlowControlReject.value) {
return;
}
try {
operationMeta.getExecutor().execute(() -> {
synchronized (this.requestEx) {
try {
if (isInQueueTimeout()) {
throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "Timeout when processing the request.");
}
if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
// already timeout
// in this time, request maybe recycled and reused by web container, do not use requestEx
LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
operationMeta.getHttpMethod(),
operationMeta.getMicroserviceQualifiedName());
return;
}
runOnExecutor();
} catch (InvocationException e) {
LOGGER.error("Invocation failed, cause={}", e.getMessage());
sendFailResponse(e);
} catch (Throwable e) {
LOGGER.error("Processing rest server request error", e);
sendFailResponse(e);
}
}
});
} catch (Throwable e) {
LOGGER.error("failed to schedule invocation, message={}, executor={}.", e.getMessage(), e.getClass().getName());
sendFailResponse(e);
}
}
private Holder<Boolean> checkQpsFlowControl(OperationMeta operationMeta) {
Holder<Boolean> qpsFlowControlReject = new Holder<>(false);
@SuppressWarnings("deprecation")
Handler providerQpsFlowControlHandler = operationMeta.getProviderQpsFlowControlHandler();
if (null != providerQpsFlowControlHandler) {
try {
providerQpsFlowControlHandler.handle(invocation, response -> {
qpsFlowControlReject.value = true;
produceProcessor = ProduceProcessorManager.INSTANCE.findDefaultJsonProcessor();
sendResponse(response);
});
} catch (Throwable e) {
LOGGER.error("failed to execute ProviderQpsFlowControlHandler", e);
qpsFlowControlReject.value = true;
sendFailResponse(e);
}
}
return qpsFlowControlReject;
}
private boolean isInQueueTimeout() {
return System.nanoTime() - invocation.getInvocationStageTrace().getStart() >
invocation.getOperationMeta().getConfig().getNanoRestRequestWaitInPoolTimeout();
}
protected void runOnExecutor() {
invocation.onExecuteStart();
invoke();
}
protected abstract OperationLocator locateOperation(ServicePathManager servicePathManager);
// create a invocation without args setted
protected abstract void createInvocation();
public void invoke() {
try {
Response response = prepareInvoke();
if (response != null) {
sendResponseQuietly(response);
return;
}
doInvoke();
} catch (InvocationException e) {
LOGGER.error("Invocation failed, cause={}", e.getMessage());
sendFailResponse(e);
} catch (Throwable e) {
LOGGER.error("Processing rest server request error", e);
sendFailResponse(e);
}
}
protected Response prepareInvoke() throws Throwable {
this.initProduceProcessor();
invocation.getHandlerContext().put(RestConst.REST_REQUEST, requestEx);
invocation.getInvocationStageTrace().startServerFiltersRequest();
for (HttpServerFilter filter : httpServerFilters) {
if (filter.enabled()) {
Response response = filter.afterReceiveRequest(invocation, requestEx);
if (response != null) {
return response;
}
}
}
return null;
}
protected void doInvoke() throws Throwable {
invocation.onStartHandlersRequest();
invocation.next(this::sendResponseQuietly);
}
public void sendFailResponse(Throwable throwable) {
if (produceProcessor == null) {
produceProcessor = ProduceProcessorManager.INSTANCE.findDefaultProcessor();
}
Response response = Response.createProducerFail(throwable);
sendResponseQuietly(response);
}
protected void sendResponseQuietly(Response response) {
if (invocation != null) {
invocation.getInvocationStageTrace().finishHandlersResponse();
}
try {
sendResponse(response);
} catch (Throwable e) {
LOGGER.error("Failed to send rest response, operation:{}, request uri:{}",
getMicroserviceQualifiedName(), requestEx.getRequestURI(), e);
}
}
@SuppressWarnings("deprecation")
protected void sendResponse(Response response) {
RestServerCodecFilter.copyHeadersToHttpResponse(response.getHeaders(), responseEx);
responseEx.setStatus(response.getStatusCode(), response.getReasonPhrase());
responseEx.setAttribute(RestConst.INVOCATION_HANDLER_RESPONSE, response);
responseEx.setAttribute(RestConst.INVOCATION_HANDLER_PROCESSOR, produceProcessor);
executeHttpServerFilters(response);
}
protected void executeHttpServerFilters(Response response) {
HttpServerFilterBeforeSendResponseExecutor exec =
new HttpServerFilterBeforeSendResponseExecutor(httpServerFilters, invocation, responseEx);
CompletableFuture<Void> future = exec.run();
future.whenComplete((v, e) -> {
if (invocation != null) {
invocation.getInvocationStageTrace().finishServerFiltersResponse();
}
onExecuteHttpServerFiltersFinish(response, e);
});
}
protected void onExecuteHttpServerFiltersFinish(Response response, Throwable e) {
if (e != null) {
LOGGER.error("Failed to execute HttpServerFilters, operation:{}, request uri:{}",
getMicroserviceQualifiedName(), requestEx.getRequestURI(), e);
}
try {
responseEx.flushBuffer();
} catch (Throwable flushException) {
LOGGER.error("Failed to flush rest response, operation:{}, request uri:{}",
getMicroserviceQualifiedName(), requestEx.getRequestURI(), flushException);
}
try {
requestEx.getAsyncContext().complete();
} catch (Throwable completeException) {
LOGGER.error("Failed to complete async rest response, operation:{}, request uri:{}",
getMicroserviceQualifiedName(), requestEx.getRequestURI(), completeException);
}
// if failed to locate path, then will not create invocation
// TODO: statistics this case
if (invocation != null) {
invocation.onFinish(response);
}
}
private String getMicroserviceQualifiedName() {
return null == invocation ? UNKNOWN_OPERATION_ID : invocation.getMicroserviceQualifiedName();
}
}