blob: b1b48c7f39dc9dde27f3f16428595c2110da5040 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.core;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.core.definition.InvocationRuntimeType;
import org.apache.servicecomb.core.definition.MicroserviceMeta;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.definition.SchemaMeta;
import org.apache.servicecomb.core.event.InvocationBusinessFinishEvent;
import org.apache.servicecomb.core.event.InvocationBusinessMethodFinishEvent;
import org.apache.servicecomb.core.event.InvocationBusinessMethodStartEvent;
import org.apache.servicecomb.core.event.InvocationEncodeResponseStartEvent;
import org.apache.servicecomb.core.event.InvocationFinishEvent;
import org.apache.servicecomb.core.event.InvocationHandlersStartEvent;
import org.apache.servicecomb.core.event.InvocationRunInExecutorFinishEvent;
import org.apache.servicecomb.core.event.InvocationRunInExecutorStartEvent;
import org.apache.servicecomb.core.event.InvocationStartEvent;
import org.apache.servicecomb.core.event.InvocationStartSendRequestEvent;
import org.apache.servicecomb.core.event.InvocationTimeoutCheckEvent;
import org.apache.servicecomb.core.invocation.InvocationStageTrace;
import org.apache.servicecomb.core.provider.consumer.InvokerUtils;
import org.apache.servicecomb.core.provider.consumer.ReferenceConfig;
import org.apache.servicecomb.core.tracing.TraceIdGenerator;
import org.apache.servicecomb.core.tracing.TraceIdLogger;
import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.foundation.common.utils.AsyncUtils;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.InvocationType;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.SwaggerInvocation;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import com.fasterxml.jackson.databind.JavaType;
/**
 * Runtime state of a single microservice call, created either for the consumer side or for the
 * producer side (see the two non-empty constructors).
 *
 * <p>An instance holds the schema/operation metadata, the call arguments in both their "swagger"
 * and "invocation" forms, the handler list together with the current position in it, the selected
 * {@link Endpoint} and an {@link InvocationStageTrace}. The {@code onXxx} methods update the stage
 * trace and/or post the matching event through {@link EventManager}.
 */
public class Invocation extends SwaggerInvocation {
  // Trace id generators obtained once, at class initialization, from SPIServiceUtils
  private static final Collection<TraceIdGenerator> TRACE_ID_GENERATORS = loadTraceIdGenerators();

  // Source of the per-invocation id assigned in init()
  protected static final AtomicLong INVOCATION_ID = new AtomicLong();

  /**
   * Loads the trace id generators through
   * {@link SPIServiceUtils#getPriorityHighestServices}, using {@link TraceIdGenerator#getName()}
   * as the key function.
   *
   * @return the generators used by {@link #initTraceId()}
   */
  static Collection<TraceIdGenerator> loadTraceIdGenerators() {
    return SPIServiceUtils.getPriorityHighestServices(TraceIdGenerator::getName, TraceIdGenerator.class);
  }

  // Only assigned by the consumer constructor; stays null otherwise
  protected ReferenceConfig referenceConfig;

  private InvocationRuntimeType invocationRuntimeType;

  // schemaMeta of this invocation
  private SchemaMeta schemaMeta;

  // operationMeta of this invocation
  private OperationMeta operationMeta;

  // address selected by loadbalance, used by the transport client
  // it is not kept in handlerContext because it is core data and does not need that mechanism
  private Endpoint endpoint;

  // only used to pass data between handlers; it is local data (same map as the inherited localContext)
  private final Map<String, Object> handlerContext = localContext;

  // handler chain; it is an ArrayList so it can be accessed efficiently by index
  private List<Handler> handlerList;

  // index of the handler that the next call to next(...) will run
  private int handlerIndex;

  // executor that processes the response
  // sync mode: avoids running business-level logic such as decoding in the network thread
  private Executor responseExecutor;

  private boolean sync = true;

  private final InvocationStageTrace invocationStageTrace = new InvocationStageTrace(this);

  // Set by onStart(HttpServletRequestEx, long); null when the invocation was started without a request
  private HttpServletRequestEx requestEx;

  // Set by onFinish(...) so the finish event is posted only once; cleared by reset()
  private boolean finished;

  // not extend InvocationType
  // because isEdge() only affect to apm/metrics output, no need to change so many logic
  private boolean edge;

  private long invocationId;

  private TraceIdLogger traceIdLogger;

  private Map<String, Object> invocationArguments = Collections.emptyMap();

  // Cache filled by the first call to toProducerArguments()
  private Object[] producerArguments;

  private Map<String, Object> swaggerArguments = Collections.emptyMap();

  /**
   * Creates an empty invocation, used for mocking or for other scenarios that do not need
   * operation information. No schema/operation metadata or handler list is set.
   */
  public Invocation() {
    // An empty invocation, used to mock or some other scenario do not need operation information.
    traceIdLogger = new TraceIdLogger(this);
  }

  /**
   * Creates a {@link InvocationType#CONSUMER} invocation.
   *
   * @param referenceConfig reference configuration of the target microservice
   * @param operationMeta metadata of the operation to call
   * @param invocationRuntimeType runtime type information used for argument and response mapping
   * @param swaggerArguments swagger arguments of the call, may be {@code null}
   */
  public Invocation(ReferenceConfig referenceConfig, OperationMeta operationMeta,
      InvocationRuntimeType invocationRuntimeType,
      Map<String, Object> swaggerArguments) {
    this.invocationType = InvocationType.CONSUMER;
    this.referenceConfig = referenceConfig;
    this.invocationRuntimeType = invocationRuntimeType;
    init(operationMeta, swaggerArguments);
  }

  /**
   * Creates a {@link InvocationType#PRODUCER} invocation whose runtime type is built from
   * {@code operationMeta.buildBaseProviderRuntimeType()}.
   *
   * @param endpoint endpoint associated with this invocation
   * @param operationMeta metadata of the operation being served
   * @param swaggerArguments swagger arguments of the call, may be {@code null}
   */
  public Invocation(Endpoint endpoint, OperationMeta operationMeta, Map<String, Object> swaggerArguments) {
    this.invocationType = InvocationType.PRODUCER;
    this.invocationRuntimeType = operationMeta.buildBaseProviderRuntimeType();
    this.endpoint = endpoint;
    init(operationMeta, swaggerArguments);
  }

  /**
   * Shared constructor logic: assigns the invocation id, stores the metadata, installs the
   * arguments, resolves the handler chain and creates the trace id logger.
   */
  private void init(OperationMeta operationMeta, Map<String, Object> swaggerArguments) {
    this.invocationId = INVOCATION_ID.getAndIncrement();
    this.schemaMeta = operationMeta.getSchemaMeta();
    this.operationMeta = operationMeta;
    // must run after operationMeta is assigned: setSwaggerArguments reads it to build invocation arguments
    this.setSwaggerArguments(swaggerArguments);
    this.handlerList = getHandlerChain();
    handlerIndex = 0;
    traceIdLogger = new TraceIdLogger(this);
  }

  /**
   * @return the transport of the current endpoint
   * @throws IllegalStateException if no endpoint has been set
   */
  public Transport getTransport() {
    if (endpoint == null) {
      throw new IllegalStateException(
          "Endpoint is empty. Forget to configure \"loadbalance\" in consumer handler chain?");
    }
    return endpoint.getTransport();
  }

  /**
   * @return the handler chain of the microservice that owns this invocation's schema
   */
  public List<Handler> getHandlerChain() {
    return schemaMeta.getMicroserviceMeta().getHandlerChain();
  }

  /**
   * Replaces the handler list used by {@link #next(AsyncResponse)}.
   *
   * @param handlerList the handlers to run
   */
  @VisibleForTesting
  public void setHandlerList(List<Handler> handlerList) {
    this.handlerList = handlerList;
  }

  /**
   * @return the executor that processes the response, or {@code null} if none was set
   */
  public Executor getResponseExecutor() {
    return responseExecutor;
  }

  /**
   * @param responseExecutor the executor that processes the response
   */
  public void setResponseExecutor(Executor responseExecutor) {
    this.responseExecutor = responseExecutor;
  }

  /**
   * @return the schema metadata of this invocation
   */
  public SchemaMeta getSchemaMeta() {
    return schemaMeta;
  }

  /**
   * @return the operation metadata of this invocation
   */
  public OperationMeta getOperationMeta() {
    return operationMeta;
  }

  /**
   * @return the invocation arguments, never {@code null}
   */
  public Map<String, Object> getInvocationArguments() {
    return this.invocationArguments;
  }

  /**
   * @return the swagger arguments, never {@code null}
   */
  public Map<String, Object> getSwaggerArguments() {
    return this.swaggerArguments;
  }

  /**
   * @param name argument name
   * @return the invocation argument stored under {@code name}, or {@code null} if absent
   */
  public Object getInvocationArgument(String name) {
    return this.invocationArguments.get(name);
  }

  /**
   * @param name argument name
   * @return the swagger argument stored under {@code name}, or {@code null} if absent
   */
  public Object getSwaggerArgument(String name) {
    return this.swaggerArguments.get(name);
  }

  /**
   * Sets the invocation arguments and derives the swagger arguments from them.
   *
   * <p>A {@code null} argument map is replaced by a new empty map; in that case the swagger
   * arguments are left as they are.
   *
   * @param invocationArguments the new invocation arguments, may be {@code null}
   */
  public void setInvocationArguments(Map<String, Object> invocationArguments) {
    if (invocationArguments == null) {
      // Empty arguments
      this.invocationArguments = new HashMap<>(0);
      return;
    }
    this.invocationArguments = invocationArguments;
    buildSwaggerArguments();
  }

  /**
   * Recomputes {@link #swaggerArguments} from {@link #invocationArguments}: mapped through the
   * runtime type's arguments mapper, or the same map when the runtime type is a raw consumer.
   */
  private void buildSwaggerArguments() {
    if (!this.invocationRuntimeType.isRawConsumer()) {
      this.swaggerArguments = this.invocationRuntimeType.getArgumentsMapper()
          .invocationArgumentToSwaggerArguments(this,
              this.invocationArguments);
    } else {
      this.swaggerArguments = invocationArguments;
    }
  }

  /**
   * Sets the swagger arguments and derives the invocation arguments from them.
   *
   * <p>A {@code null} argument map is replaced by a new empty map; in that case the invocation
   * arguments are left as they are.
   *
   * @param swaggerArguments the new swagger arguments, may be {@code null}
   */
  public void setSwaggerArguments(Map<String, Object> swaggerArguments) {
    if (swaggerArguments == null) {
      // Empty arguments
      this.swaggerArguments = new HashMap<>(0);
      return;
    }
    this.swaggerArguments = swaggerArguments;
    buildInvocationArguments();
  }

  /**
   * Recomputes {@link #invocationArguments} from {@link #swaggerArguments}: mapped through the
   * producer operation's arguments mapper when a producer operation exists and this is not an
   * edge invocation, otherwise the same map.
   */
  private void buildInvocationArguments() {
    if (operationMeta.getSwaggerProducerOperation() != null && !isEdge()) {
      this.invocationArguments = operationMeta.getSwaggerProducerOperation().getArgumentsMapper()
          .swaggerArgumentToInvocationArguments(this,
              swaggerArguments);
    } else {
      this.invocationArguments = swaggerArguments;
    }
  }

  /**
   * Builds the positional argument array for the producer method by looking up each reflective
   * parameter name in the invocation arguments. Missing names yield {@code null} entries.
   *
   * <p>The array is computed once and cached; later calls return the same array instance.
   *
   * @return the arguments in producer method parameter order
   */
  public Object[] toProducerArguments() {
    if (producerArguments != null) {
      return producerArguments;
    }

    Method method = operationMeta.getSwaggerProducerOperation().getProducerMethod();
    // getParameters() hands back a fresh copy of the array on every call, so fetch it once
    // instead of once per loop iteration
    java.lang.reflect.Parameter[] parameters = method.getParameters();
    Object[] args = new Object[parameters.length];
    for (int i = 0; i < parameters.length; i++) {
      args[i] = this.invocationArguments.get(parameters[i].getName());
    }
    producerArguments = args;
    return args;
  }

  /**
   * @return the current endpoint, or {@code null} if none has been set
   */
  public Endpoint getEndpoint() {
    return endpoint;
  }

  /**
   * @param endpoint the endpoint to use for this invocation
   */
  public void setEndpoint(Endpoint endpoint) {
    this.endpoint = endpoint;
  }

  /**
   * @return the map used to pass data between handlers
   */
  public Map<String, Object> getHandlerContext() {
    return handlerContext;
  }

  /**
   * @return index of the handler that the next call to {@link #next(AsyncResponse)} will run
   */
  public int getHandlerIndex() {
    return handlerIndex;
  }

  /**
   * @param handlerIndex index of the handler that the next call to {@link #next(AsyncResponse)}
   *     will run
   */
  public void setHandlerIndex(int handlerIndex) {
    this.handlerIndex = handlerIndex;
  }

  /**
   * Runs the handler at the current index. The index is advanced before the handler is invoked,
   * so a handler that calls {@code next} again continues with the following handler.
   *
   * @param asyncResp callback handed to the handler
   * @throws Exception whatever the handler throws
   */
  public void next(AsyncResponse asyncResp) throws Exception {
    // no need to validate the index, because the whole flow is controlled internally
    int runIndex = handlerIndex;
    handlerIndex++;
    handlerList.get(runIndex).handle(this, asyncResp);
  }

  /**
   * @return the schema id taken from the schema metadata
   */
  public String getSchemaId() {
    return schemaMeta.getSchemaId();
  }

  /**
   * @return the operation id taken from the operation metadata
   */
  public String getOperationName() {
    return operationMeta.getOperationId();
  }

  /**
   * @return the transport name configured in the reference config
   */
  public String getConfigTransportName() {
    return referenceConfig.getTransport();
  }

  /**
   * @return the name of the endpoint's transport when an endpoint is set, otherwise the
   *     configured transport name
   */
  public String getRealTransportName() {
    return (endpoint != null) ? endpoint.getTransport().getName() : getConfigTransportName();
  }

  /**
   * @return the microservice name taken from the schema metadata
   */
  public String getMicroserviceName() {
    return schemaMeta.getMicroserviceName();
  }

  /**
   * @return the application id of the microservice that owns the schema
   */
  public String getAppId() {
    return schemaMeta.getMicroserviceMeta().getAppId();
  }

  /**
   * @return the metadata of the microservice that owns the schema
   */
  public MicroserviceMeta getMicroserviceMeta() {
    return schemaMeta.getMicroserviceMeta();
  }

  /**
   * @return the version rule taken from the reference config
   */
  public String getMicroserviceVersionRule() {
    return referenceConfig.getVersionRule();
  }

  /**
   * @return the runtime type information of this invocation
   */
  public InvocationRuntimeType getInvocationRuntimeType() {
    return this.invocationRuntimeType;
  }

  /**
   * @param statusCode response status code
   * @return the response type the runtime type reports for {@code statusCode}
   */
  public JavaType findResponseType(int statusCode) {
    return this.invocationRuntimeType.findResponseType(statusCode);
  }

  /**
   * @param javaType the success response type to store in the runtime type
   */
  public void setSuccessResponseType(JavaType javaType) {
    this.invocationRuntimeType.setSuccessResponseType(javaType);
  }

  /**
   * @return invocation type name, real transport name and the operation's microservice qualified
   *     name, separated by single spaces
   */
  @Override
  public String getInvocationQualifiedName() {
    return invocationType.name() + " " + getRealTransportName() + " "
        + getOperationMeta().getMicroserviceQualifiedName();
  }

  /**
   * @return the microservice qualified name of the operation
   */
  public String getMicroserviceQualifiedName() {
    return operationMeta.getMicroserviceQualifiedName();
  }

  /**
   * Ensures a trace id exists in the invocation context for every loaded generator.
   */
  protected void initTraceId() {
    for (TraceIdGenerator traceIdGenerator : TRACE_ID_GENERATORS) {
      initTraceId(traceIdGenerator);
    }
  }

  /**
   * Ensures the invocation context holds a trace id under the generator's key name. An existing
   * context value wins; otherwise the request header of the same name is used when a request is
   * present and the header is non-empty; otherwise a new id is generated.
   *
   * @param traceIdGenerator generator that supplies the key name and, if needed, a new id
   */
  protected void initTraceId(TraceIdGenerator traceIdGenerator) {
    if (!StringUtils.isEmpty(getTraceId(traceIdGenerator.getTraceIdKeyName()))) {
      // if invocation context contains traceId, nothing needed to do
      return;
    }

    if (requestEx == null) {
      // it's a new consumer invocation, must generate a traceId
      addContext(traceIdGenerator.getTraceIdKeyName(), traceIdGenerator.generate());
      return;
    }

    String traceId = requestEx.getHeader(traceIdGenerator.getTraceIdKeyName());
    if (!StringUtils.isEmpty(traceId)) {
      // if request header contains traceId, save traceId into invocation context
      addContext(traceIdGenerator.getTraceIdKeyName(), traceId);
      return;
    }

    // if traceId not found, generate a traceId
    addContext(traceIdGenerator.getTraceIdKeyName(), traceIdGenerator.generate());
  }

  /**
   * Starts the stage trace, initializes the trace ids and posts an {@link InvocationStartEvent}.
   *
   * @param start start value passed to the stage trace
   */
  public void onStart(long start) {
    invocationStageTrace.start(start);
    initTraceId();
    EventManager.post(new InvocationStartEvent(this));
  }

  /**
   * Same as {@link #onStart(long)}, but first stores the request so that trace ids can be read
   * from its headers.
   *
   * @param requestEx the incoming request
   * @param start start value passed to the stage trace
   */
  public void onStart(HttpServletRequestEx requestEx, long start) {
    this.requestEx = requestEx;
    onStart(start);
  }

  /**
   * Marks the start of execution in the stage trace and posts an
   * {@link InvocationRunInExecutorStartEvent}.
   */
  public void onExecuteStart() {
    invocationStageTrace.startExecution();
    EventManager.post(new InvocationRunInExecutorStartEvent(this));
  }

  /**
   * Posts an {@link InvocationRunInExecutorFinishEvent}.
   */
  public void onExecuteFinish() {
    EventManager.post(new InvocationRunInExecutorFinishEvent(this));
  }

  /**
   * Marks the start of the handlers' request phase in the stage trace and posts an
   * {@link InvocationHandlersStartEvent}.
   */
  public void onStartHandlersRequest() {
    invocationStageTrace.startHandlersRequest();
    EventManager.post(new InvocationHandlersStartEvent(this));
  }

  /**
   * Marks the start of sending in the stage trace and posts an
   * {@link InvocationStartSendRequestEvent}.
   */
  public void onStartSendRequest() {
    invocationStageTrace.startSend();
    EventManager.post(new InvocationStartSendRequestEvent(this));
  }

  /**
   * Marks the start of the business method in the stage trace and posts an
   * {@link InvocationBusinessMethodStartEvent}.
   */
  @Override
  public void onBusinessMethodStart() {
    invocationStageTrace.startBusinessMethod();
    EventManager.post(new InvocationBusinessMethodStartEvent(this));
  }

  /**
   * Posts an {@link InvocationBusinessMethodFinishEvent}.
   */
  @Override
  public void onBusinessMethodFinish() {
    EventManager.post(new InvocationBusinessMethodFinishEvent(this));
  }

  /**
   * Posts an {@link InvocationEncodeResponseStartEvent} carrying the response.
   *
   * @param response the response about to be encoded
   */
  public void onEncodeResponseStart(Response response) {
    EventManager.post(new InvocationEncodeResponseStartEvent(this, response));
  }

  /**
   * Marks the end of business processing in the stage trace and posts an
   * {@link InvocationBusinessFinishEvent}.
   */
  @Override
  public void onBusinessFinish() {
    invocationStageTrace.finishBusiness();
    EventManager.post(new InvocationBusinessFinishEvent(this));
  }

  /**
   * Finishes the stage trace and posts an {@link InvocationFinishEvent}. Does nothing if the
   * invocation is already marked as finished.
   *
   * @param response the final response, forwarded in the event
   */
  public void onFinish(Response response) {
    if (finished) {
      // avoid to post repeated event
      return;
    }

    invocationStageTrace.finish();
    EventManager.post(new InvocationFinishEvent(this, response));
    finished = true;
  }

  /**
   * Prepares the invocation for a retry: clears the finished flag and rewinds the handler index
   * to the first handler.
   */
  // for retry, reset invocation and try it again
  public void reset() {
    finished = false;
    handlerIndex = 0;
  }

  /**
   * @return {@code true} once {@link #onFinish(Response)} has completed and {@link #reset()} has
   *     not been called since
   */
  public boolean isFinished() {
    return finished;
  }

  /**
   * @return whether this invocation is flagged as synchronous (the default)
   */
  public boolean isSync() {
    return sync;
  }

  /**
   * @param sync whether this invocation is synchronous
   */
  public void setSync(boolean sync) {
    this.sync = sync;
  }

  /**
   * @return {@code true} if the invocation type is {@link InvocationType#CONSUMER}
   */
  public boolean isConsumer() {
    return InvocationType.CONSUMER.equals(invocationType);
  }

  /**
   * @return {@code true} if the invocation type is {@link InvocationType#PRODUCER}
   */
  public boolean isProducer() {
    return InvocationType.PRODUCER.equals(invocationType);
  }

  /**
   * @return whether this invocation is flagged as an edge invocation
   */
  public boolean isEdge() {
    return edge;
  }

  /**
   * @param edge whether this invocation is an edge invocation
   */
  public void setEdge(boolean edge) {
    this.edge = edge;
  }

  /**
   * @return whether the reference config describes a 3rd party service
   */
  public boolean isThirdPartyInvocation() {
    return referenceConfig.is3rdPartyService();
  }

  /**
   * @return the id assigned to this invocation from {@link #INVOCATION_ID}
   */
  public long getInvocationId() {
    return invocationId;
  }

  /**
   * @return the trace id logger bound to this invocation
   */
  public TraceIdLogger getTraceIdLogger() {
    return this.traceIdLogger;
  }

  /**
   * @return the request stored by {@link #onStart(HttpServletRequestEx, long)}, or {@code null}
   */
  public HttpServletRequestEx getRequestEx() {
    return requestEx;
  }

  /**
   * @return the stage trace of this invocation
   */
  public InvocationStageTrace getInvocationStageTrace() {
    return invocationStageTrace;
  }

  /**
   * @return the context value stored under {@code Const.TRACE_ID_NAME}
   */
  public String getTraceId() {
    return getContext(Const.TRACE_ID_NAME);
  }

  /**
   * @param traceIdName context key of the trace id
   * @return the context value stored under {@code traceIdName}
   */
  public String getTraceId(String traceIdName) {
    return getContext(traceIdName);
  }

  /**
   * For a sync invocation called outside the event loop, waits on the future through
   * {@link AsyncUtils#waitQuietly} before returning it; otherwise returns it untouched.
   *
   * @param future the future of the consumer call
   * @param <T> result type of the future
   * @return the same {@code future} instance
   */
  // ensure sync consumer invocation response flow not run in eventLoop
  public <T> CompletableFuture<T> optimizeSyncConsumerThread(CompletableFuture<T> future) {
    if (sync && !InvokerUtils.isInEventLoop()) {
      AsyncUtils.waitQuietly(future);
    }

    return future;
  }

  /**
   * Check if invocation is timeout.
   *
   * NOTICE: this method only trigger event to ask the target checker to do the real check. So this method
   * will only take effect when timeout checker is enabled.
   *
   * e.g. InvocationTimeoutBootListener.ENABLE_TIMEOUT_CHECK is enabled.
   *
   * @throws InvocationException if timeout, throw an exception. Will not throw exception twice if this method called
   *  after timeout.
   */
  public void ensureInvocationNotTimeout() throws InvocationException {
    EventManager.post(new InvocationTimeoutCheckEvent(this));
  }
}