| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.rpc; |
| |
| import org.apache.dubbo.common.logger.Logger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.threadpool.ThreadlessExecutor; |
| import org.apache.dubbo.rpc.model.ConsumerMethodModel; |
| |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.function.BiConsumer; |
| import java.util.function.Function; |
| |
| import static org.apache.dubbo.common.utils.ReflectUtils.defaultReturn; |
| |
| /** |
| * This class represents an unfinished RPC call, it will hold some context information for this call, for example RpcContext and Invocation, |
| * so that when the call finishes and the result returns, it can guarantee all the contexts being recovered as the same as when the call was made |
| * before any callback is invoked. |
| * <p> |
| * TODO if it's reasonable or even right to keep a reference to Invocation? |
| * <p> |
| * As {@link Result} implements CompletionStage, {@link AsyncRpcResult} allows you to easily build a async filter chain whose status will be |
| * driven entirely by the state of the underlying RPC call. |
| * <p> |
| * AsyncRpcResult does not contain any concrete value (except the underlying value bring by CompletableFuture), consider it as a status transfer node. |
| * {@link #getValue()} and {@link #getException()} are all inherited from {@link Result} interface, implementing them are mainly |
| * for compatibility consideration. Because many legacy {@link Filter} implementation are most possibly to call getValue directly. |
| */ |
| public class AsyncRpcResult implements Result { |
| private static final Logger logger = LoggerFactory.getLogger(AsyncRpcResult.class); |
| |
| /** |
| * RpcContext may already have been changed when callback happens, it happens when the same thread is used to execute another RPC call. |
| * So we should keep the reference of current RpcContext instance and restore it before callback being executed. |
| */ |
| private RpcContext storedContext; |
| private RpcContext storedServerContext; |
| private Executor executor; |
| |
| private Invocation invocation; |
| |
| private CompletableFuture<AppResponse> responseFuture; |
| |
| public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) { |
| this.responseFuture = future; |
| this.invocation = invocation; |
| this.storedContext = RpcContext.getContext(); |
| this.storedServerContext = RpcContext.getServerContext(); |
| } |
| |
| /** |
| * Notice the return type of {@link #getValue} is the actual type of the RPC method, not {@link AppResponse} |
| * |
| * @return |
| */ |
| @Override |
| public Object getValue() { |
| return getAppResponse().getValue(); |
| } |
| |
| /** |
| * CompletableFuture can only be completed once, so try to update the result of one completed CompletableFuture will |
| * has no effect. To avoid this problem, we check the complete status of this future before update it's value. |
| * |
| * But notice that trying to give an uncompleted CompletableFuture a new specified value may face a race condition, |
| * because the background thread watching the real result will also change the status of this CompletableFuture. |
| * The result is you may lose the value you expected to set. |
| * |
| * @param value |
| */ |
| @Override |
| public void setValue(Object value) { |
| try { |
| if (responseFuture.isDone()) { |
| responseFuture.get().setValue(value); |
| } else { |
| AppResponse appResponse = new AppResponse(); |
| appResponse.setValue(value); |
| responseFuture.complete(appResponse); |
| } |
| } catch (Exception e) { |
| // This should not happen in normal request process; |
| logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult."); |
| throw new RpcException(e); |
| } |
| } |
| |
| @Override |
| public Throwable getException() { |
| return getAppResponse().getException(); |
| } |
| |
| @Override |
| public void setException(Throwable t) { |
| try { |
| if (responseFuture.isDone()) { |
| responseFuture.get().setException(t); |
| } else { |
| AppResponse appResponse = new AppResponse(); |
| appResponse.setException(t); |
| responseFuture.complete(appResponse); |
| } |
| } catch (Exception e) { |
| // This should not happen in normal request process; |
| logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult."); |
| throw new RpcException(e); |
| } |
| } |
| |
| @Override |
| public boolean hasException() { |
| return getAppResponse().hasException(); |
| } |
| |
| public CompletableFuture<AppResponse> getResponseFuture() { |
| return responseFuture; |
| } |
| |
| public void setResponseFuture(CompletableFuture<AppResponse> responseFuture) { |
| this.responseFuture = responseFuture; |
| } |
| |
| public Result getAppResponse() { |
| try { |
| if (responseFuture.isDone()) { |
| return responseFuture.get(); |
| } |
| } catch (Exception e) { |
| // This should not happen in normal request process; |
| logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult."); |
| throw new RpcException(e); |
| } |
| |
| return createDefaultValue(invocation); |
| } |
| |
| /** |
| * This method will always return after a maximum 'timeout' waiting: |
| * 1. if value returns before timeout, return normally. |
| * 2. if no value returns after timeout, throw TimeoutException. |
| * |
| * @return |
| * @throws InterruptedException |
| * @throws ExecutionException |
| */ |
| @Override |
| public Result get() throws InterruptedException, ExecutionException { |
| if (executor != null && executor instanceof ThreadlessExecutor) { |
| ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; |
| threadlessExecutor.waitAndDrain(); |
| } |
| return responseFuture.get(); |
| } |
| |
| @Override |
| public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
| if (executor != null && executor instanceof ThreadlessExecutor) { |
| ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; |
| threadlessExecutor.waitAndDrain(); |
| } |
| return responseFuture.get(timeout, unit); |
| } |
| |
| @Override |
| public Object recreate() throws Throwable { |
| RpcInvocation rpcInvocation = (RpcInvocation) invocation; |
| if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) { |
| return RpcContext.getContext().getFuture(); |
| } |
| |
| return getAppResponse().recreate(); |
| } |
| |
| public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) { |
| this.responseFuture = this.responseFuture.whenComplete((v, t) -> { |
| beforeContext.accept(v, t); |
| fn.accept(v, t); |
| afterContext.accept(v, t); |
| }); |
| return this; |
| } |
| |
| @Override |
| public <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn) { |
| return this.responseFuture.thenApply(fn); |
| } |
| |
| @Override |
| @Deprecated |
| public Map<String, String> getAttachments() { |
| return getAppResponse().getAttachments(); |
| } |
| |
| @Override |
| public Map<String, Object> getObjectAttachments() { |
| return getAppResponse().getObjectAttachments(); |
| } |
| |
| @Override |
| public void setAttachments(Map<String, String> map) { |
| getAppResponse().setAttachments(map); |
| } |
| |
| @Override |
| public void setObjectAttachments(Map<String, Object> map) { |
| getAppResponse().setObjectAttachments(map); |
| } |
| |
| @Deprecated |
| @Override |
| public void addAttachments(Map<String, String> map) { |
| getAppResponse().addAttachments(map); |
| } |
| |
| @Override |
| public void addObjectAttachments(Map<String, Object> map) { |
| getAppResponse().addObjectAttachments(map); |
| } |
| |
| @Override |
| public String getAttachment(String key) { |
| return getAppResponse().getAttachment(key); |
| } |
| |
| @Override |
| public Object getObjectAttachment(String key) { |
| return getAppResponse().getObjectAttachment(key); |
| } |
| |
| @Override |
| public String getAttachment(String key, String defaultValue) { |
| return getAppResponse().getAttachment(key, defaultValue); |
| } |
| |
| @Override |
| public Object getObjectAttachment(String key, Object defaultValue) { |
| return getAppResponse().getObjectAttachment(key, defaultValue); |
| } |
| |
| @Override |
| public void setAttachment(String key, String value) { |
| setObjectAttachment(key, value); |
| } |
| |
| @Override |
| public void setAttachment(String key, Object value) { |
| setObjectAttachment(key, value); |
| } |
| |
| @Override |
| public void setObjectAttachment(String key, Object value) { |
| getAppResponse().setAttachment(key, value); |
| } |
| |
| public Executor getExecutor() { |
| return executor; |
| } |
| |
| public void setExecutor(Executor executor) { |
| this.executor = executor; |
| } |
| |
| /** |
| * tmp context to use when the thread switch to Dubbo thread. |
| */ |
| private RpcContext tmpContext; |
| |
| private RpcContext tmpServerContext; |
| private BiConsumer<Result, Throwable> beforeContext = (appResponse, t) -> { |
| tmpContext = RpcContext.getContext(); |
| tmpServerContext = RpcContext.getServerContext(); |
| RpcContext.restoreContext(storedContext); |
| RpcContext.restoreServerContext(storedServerContext); |
| }; |
| |
| private BiConsumer<Result, Throwable> afterContext = (appResponse, t) -> { |
| RpcContext.restoreContext(tmpContext); |
| RpcContext.restoreServerContext(tmpServerContext); |
| }; |
| |
| /** |
| * Some utility methods used to quickly generate default AsyncRpcResult instance. |
| */ |
| public static AsyncRpcResult newDefaultAsyncResult(AppResponse appResponse, Invocation invocation) { |
| return new AsyncRpcResult(CompletableFuture.completedFuture(appResponse), invocation); |
| } |
| |
| public static AsyncRpcResult newDefaultAsyncResult(Invocation invocation) { |
| return newDefaultAsyncResult(null, null, invocation); |
| } |
| |
| public static AsyncRpcResult newDefaultAsyncResult(Object value, Invocation invocation) { |
| return newDefaultAsyncResult(value, null, invocation); |
| } |
| |
| public static AsyncRpcResult newDefaultAsyncResult(Throwable t, Invocation invocation) { |
| return newDefaultAsyncResult(null, t, invocation); |
| } |
| |
| public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) { |
| CompletableFuture<AppResponse> future = new CompletableFuture<>(); |
| AppResponse result = new AppResponse(); |
| if (t != null) { |
| result.setException(t); |
| } else { |
| result.setValue(value); |
| } |
| future.complete(result); |
| return new AsyncRpcResult(future, invocation); |
| } |
| |
| private static Result createDefaultValue(Invocation invocation) { |
| ConsumerMethodModel method = (ConsumerMethodModel) invocation.get(Constants.METHOD_MODEL); |
| return method != null ? new AppResponse(defaultReturn(method.getReturnClass())) : new AppResponse(); |
| } |
| } |
| |