blob: d61d48fdecaf99cedeeb109ea15b991f0ba6964d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
import org.apache.dubbo.rpc.model.ConsumerMethodModel;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import static org.apache.dubbo.common.utils.ReflectUtils.defaultReturn;
/**
* This class represents an unfinished RPC call, it will hold some context information for this call, for example RpcContext and Invocation,
* so that when the call finishes and the result returns, it can guarantee all the contexts being recovered as the same as when the call was made
* before any callback is invoked.
* <p>
* TODO if it's reasonable or even right to keep a reference to Invocation?
* <p>
* As {@link Result} implements CompletionStage, {@link AsyncRpcResult} allows you to easily build a async filter chain whose status will be
* driven entirely by the state of the underlying RPC call.
* <p>
* AsyncRpcResult does not contain any concrete value (except the underlying value bring by CompletableFuture), consider it as a status transfer node.
* {@link #getValue()} and {@link #getException()} are all inherited from {@link Result} interface, implementing them are mainly
* for compatibility consideration. Because many legacy {@link Filter} implementation are most possibly to call getValue directly.
*/
public class AsyncRpcResult implements Result {
private static final Logger logger = LoggerFactory.getLogger(AsyncRpcResult.class);
/**
* RpcContext may already have been changed when callback happens, it happens when the same thread is used to execute another RPC call.
* So we should keep the reference of current RpcContext instance and restore it before callback being executed.
*/
private RpcContext storedContext;
private RpcContext storedServerContext;
private Executor executor;
private Invocation invocation;
private CompletableFuture<AppResponse> responseFuture;
public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
this.responseFuture = future;
this.invocation = invocation;
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}
/**
* Notice the return type of {@link #getValue} is the actual type of the RPC method, not {@link AppResponse}
*
* @return
*/
@Override
public Object getValue() {
return getAppResponse().getValue();
}
/**
* CompletableFuture can only be completed once, so try to update the result of one completed CompletableFuture will
* has no effect. To avoid this problem, we check the complete status of this future before update it's value.
*
* But notice that trying to give an uncompleted CompletableFuture a new specified value may face a race condition,
* because the background thread watching the real result will also change the status of this CompletableFuture.
* The result is you may lose the value you expected to set.
*
* @param value
*/
@Override
public void setValue(Object value) {
try {
if (responseFuture.isDone()) {
responseFuture.get().setValue(value);
} else {
AppResponse appResponse = new AppResponse();
appResponse.setValue(value);
responseFuture.complete(appResponse);
}
} catch (Exception e) {
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
}
@Override
public Throwable getException() {
return getAppResponse().getException();
}
@Override
public void setException(Throwable t) {
try {
if (responseFuture.isDone()) {
responseFuture.get().setException(t);
} else {
AppResponse appResponse = new AppResponse();
appResponse.setException(t);
responseFuture.complete(appResponse);
}
} catch (Exception e) {
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
}
@Override
public boolean hasException() {
return getAppResponse().hasException();
}
public CompletableFuture<AppResponse> getResponseFuture() {
return responseFuture;
}
public void setResponseFuture(CompletableFuture<AppResponse> responseFuture) {
this.responseFuture = responseFuture;
}
public Result getAppResponse() {
try {
if (responseFuture.isDone()) {
return responseFuture.get();
}
} catch (Exception e) {
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
return createDefaultValue(invocation);
}
/**
* This method will always return after a maximum 'timeout' waiting:
* 1. if value returns before timeout, return normally.
* 2. if no value returns after timeout, throw TimeoutException.
*
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
@Override
public Result get() throws InterruptedException, ExecutionException {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
return responseFuture.get();
}
@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
return responseFuture.get(timeout, unit);
}
@Override
public Object recreate() throws Throwable {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
return RpcContext.getContext().getFuture();
}
return getAppResponse().recreate();
}
public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
beforeContext.accept(v, t);
fn.accept(v, t);
afterContext.accept(v, t);
});
return this;
}
@Override
public <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn) {
return this.responseFuture.thenApply(fn);
}
@Override
@Deprecated
public Map<String, String> getAttachments() {
return getAppResponse().getAttachments();
}
@Override
public Map<String, Object> getObjectAttachments() {
return getAppResponse().getObjectAttachments();
}
@Override
public void setAttachments(Map<String, String> map) {
getAppResponse().setAttachments(map);
}
@Override
public void setObjectAttachments(Map<String, Object> map) {
getAppResponse().setObjectAttachments(map);
}
@Deprecated
@Override
public void addAttachments(Map<String, String> map) {
getAppResponse().addAttachments(map);
}
@Override
public void addObjectAttachments(Map<String, Object> map) {
getAppResponse().addObjectAttachments(map);
}
@Override
public String getAttachment(String key) {
return getAppResponse().getAttachment(key);
}
@Override
public Object getObjectAttachment(String key) {
return getAppResponse().getObjectAttachment(key);
}
@Override
public String getAttachment(String key, String defaultValue) {
return getAppResponse().getAttachment(key, defaultValue);
}
@Override
public Object getObjectAttachment(String key, Object defaultValue) {
return getAppResponse().getObjectAttachment(key, defaultValue);
}
@Override
public void setAttachment(String key, String value) {
setObjectAttachment(key, value);
}
@Override
public void setAttachment(String key, Object value) {
setObjectAttachment(key, value);
}
@Override
public void setObjectAttachment(String key, Object value) {
getAppResponse().setAttachment(key, value);
}
public Executor getExecutor() {
return executor;
}
public void setExecutor(Executor executor) {
this.executor = executor;
}
/**
* tmp context to use when the thread switch to Dubbo thread.
*/
private RpcContext tmpContext;
private RpcContext tmpServerContext;
private BiConsumer<Result, Throwable> beforeContext = (appResponse, t) -> {
tmpContext = RpcContext.getContext();
tmpServerContext = RpcContext.getServerContext();
RpcContext.restoreContext(storedContext);
RpcContext.restoreServerContext(storedServerContext);
};
private BiConsumer<Result, Throwable> afterContext = (appResponse, t) -> {
RpcContext.restoreContext(tmpContext);
RpcContext.restoreServerContext(tmpServerContext);
};
/**
* Some utility methods used to quickly generate default AsyncRpcResult instance.
*/
public static AsyncRpcResult newDefaultAsyncResult(AppResponse appResponse, Invocation invocation) {
return new AsyncRpcResult(CompletableFuture.completedFuture(appResponse), invocation);
}
public static AsyncRpcResult newDefaultAsyncResult(Invocation invocation) {
return newDefaultAsyncResult(null, null, invocation);
}
public static AsyncRpcResult newDefaultAsyncResult(Object value, Invocation invocation) {
return newDefaultAsyncResult(value, null, invocation);
}
public static AsyncRpcResult newDefaultAsyncResult(Throwable t, Invocation invocation) {
return newDefaultAsyncResult(null, t, invocation);
}
public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
CompletableFuture<AppResponse> future = new CompletableFuture<>();
AppResponse result = new AppResponse();
if (t != null) {
result.setException(t);
} else {
result.setValue(value);
}
future.complete(result);
return new AsyncRpcResult(future, invocation);
}
private static Result createDefaultValue(Invocation invocation) {
ConsumerMethodModel method = (ConsumerMethodModel) invocation.get(Constants.METHOD_MODEL);
return method != null ? new AppResponse(defaultReturn(method.getReturnClass())) : new AppResponse();
}
}