| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.alibaba.dubbo.remoting.exchange.support; |
| |
| import com.alibaba.dubbo.common.Constants; |
| import com.alibaba.dubbo.common.logger.Logger; |
| import com.alibaba.dubbo.common.logger.LoggerFactory; |
| import com.alibaba.dubbo.remoting.Channel; |
| import com.alibaba.dubbo.remoting.RemotingException; |
| import com.alibaba.dubbo.remoting.TimeoutException; |
| import com.alibaba.dubbo.remoting.exchange.Request; |
| import com.alibaba.dubbo.remoting.exchange.Response; |
| import com.alibaba.dubbo.remoting.exchange.ResponseCallback; |
| import com.alibaba.dubbo.remoting.exchange.ResponseFuture; |
| |
| import java.text.SimpleDateFormat; |
| import java.util.Date; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| /** |
| * DefaultFuture. |
| */ |
| public class DefaultFuture implements ResponseFuture { |
| |
| private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class); |
| |
| private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); |
| |
| private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); |
| |
| static { |
| Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); |
| th.setDaemon(true); |
| th.start(); |
| } |
| |
| // invoke id. |
| private final long id; |
| private final Channel channel; |
| private final Request request; |
| private final int timeout; |
| private final Lock lock = new ReentrantLock(); |
| private final Condition done = lock.newCondition(); |
| private final long start = System.currentTimeMillis(); |
| private volatile long sent; |
| private volatile Response response; |
| private volatile ResponseCallback callback; |
| |
| public DefaultFuture(Channel channel, Request request, int timeout) { |
| this.channel = channel; |
| this.request = request; |
| this.id = request.getId(); |
| this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); |
| // put into waiting map. |
| FUTURES.put(id, this); |
| CHANNELS.put(id, channel); |
| } |
| |
| public static DefaultFuture getFuture(long id) { |
| return FUTURES.get(id); |
| } |
| |
| public static boolean hasFuture(Channel channel) { |
| return CHANNELS.containsValue(channel); |
| } |
| |
| public static void sent(Channel channel, Request request) { |
| DefaultFuture future = FUTURES.get(request.getId()); |
| if (future != null) { |
| future.doSent(); |
| } |
| } |
| |
| public static void received(Channel channel, Response response) { |
| try { |
| DefaultFuture future = FUTURES.remove(response.getId()); |
| if (future != null) { |
| future.doReceived(response); |
| } else { |
| logger.warn("The timeout response finally returned at " |
| + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) |
| + ", response " + response |
| + (channel == null ? "" : ", channel: " + channel.getLocalAddress() |
| + " -> " + channel.getRemoteAddress())); |
| } |
| } finally { |
| CHANNELS.remove(response.getId()); |
| } |
| } |
| |
| @Override |
| public Object get() throws RemotingException { |
| return get(timeout); |
| } |
| |
| @Override |
| public Object get(int timeout) throws RemotingException { |
| if (timeout <= 0) { |
| timeout = Constants.DEFAULT_TIMEOUT; |
| } |
| if (!isDone()) { |
| long start = System.currentTimeMillis(); |
| lock.lock(); |
| try { |
| while (!isDone()) { |
| done.await(timeout, TimeUnit.MILLISECONDS); |
| if (isDone() || System.currentTimeMillis() - start > timeout) { |
| break; |
| } |
| } |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } finally { |
| lock.unlock(); |
| } |
| if (!isDone()) { |
| throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); |
| } |
| } |
| return returnFromResponse(); |
| } |
| |
| public void cancel() { |
| Response errorResult = new Response(id); |
| errorResult.setErrorMessage("request future has been canceled."); |
| response = errorResult; |
| FUTURES.remove(id); |
| CHANNELS.remove(id); |
| } |
| |
| @Override |
| public boolean isDone() { |
| return response != null; |
| } |
| |
| @Override |
| public void setCallback(ResponseCallback callback) { |
| if (isDone()) { |
| invokeCallback(callback); |
| } else { |
| boolean isdone = false; |
| lock.lock(); |
| try { |
| if (!isDone()) { |
| this.callback = callback; |
| } else { |
| isdone = true; |
| } |
| } finally { |
| lock.unlock(); |
| } |
| if (isdone) { |
| invokeCallback(callback); |
| } |
| } |
| } |
| |
| private void invokeCallback(ResponseCallback c) { |
| ResponseCallback callbackCopy = c; |
| if (callbackCopy == null) { |
| throw new NullPointerException("callback cannot be null."); |
| } |
| c = null; |
| Response res = response; |
| if (res == null) { |
| throw new IllegalStateException("response cannot be null. url:" + channel.getUrl()); |
| } |
| |
| if (res.getStatus() == Response.OK) { |
| try { |
| callbackCopy.done(res.getResult()); |
| } catch (Exception e) { |
| logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e); |
| } |
| } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { |
| try { |
| TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); |
| callbackCopy.caught(te); |
| } catch (Exception e) { |
| logger.error("callback invoke error ,url:" + channel.getUrl(), e); |
| } |
| } else { |
| try { |
| RuntimeException re = new RuntimeException(res.getErrorMessage()); |
| callbackCopy.caught(re); |
| } catch (Exception e) { |
| logger.error("callback invoke error ,url:" + channel.getUrl(), e); |
| } |
| } |
| } |
| |
| private Object returnFromResponse() throws RemotingException { |
| Response res = response; |
| if (res == null) { |
| throw new IllegalStateException("response cannot be null"); |
| } |
| if (res.getStatus() == Response.OK) { |
| return res.getResult(); |
| } |
| if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { |
| throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); |
| } |
| throw new RemotingException(channel, res.getErrorMessage()); |
| } |
| |
| private long getId() { |
| return id; |
| } |
| |
| private Channel getChannel() { |
| return channel; |
| } |
| |
| private boolean isSent() { |
| return sent > 0; |
| } |
| |
| public Request getRequest() { |
| return request; |
| } |
| |
| private int getTimeout() { |
| return timeout; |
| } |
| |
| private long getStartTimestamp() { |
| return start; |
| } |
| |
| private void doSent() { |
| sent = System.currentTimeMillis(); |
| } |
| |
| private void doReceived(Response res) { |
| lock.lock(); |
| try { |
| response = res; |
| if (done != null) { |
| done.signal(); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| if (callback != null) { |
| invokeCallback(callback); |
| } |
| } |
| |
| private String getTimeoutMessage(boolean scan) { |
| long nowTimestamp = System.currentTimeMillis(); |
| return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side") |
| + (scan ? " by scan timer" : "") + ". start time: " |
| + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(start))) + ", end time: " |
| + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + "," |
| + (sent > 0 ? " client elapsed: " + (sent - start) |
| + " ms, server elapsed: " + (nowTimestamp - sent) |
| : " elapsed: " + (nowTimestamp - start)) + " ms, timeout: " |
| + timeout + " ms, request: " + request + ", channel: " + channel.getLocalAddress() |
| + " -> " + channel.getRemoteAddress(); |
| } |
| |
| private static class RemotingInvocationTimeoutScan implements Runnable { |
| |
| @Override |
| public void run() { |
| while (true) { |
| try { |
| for (DefaultFuture future : FUTURES.values()) { |
| if (future == null || future.isDone()) { |
| continue; |
| } |
| if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { |
| // create exception response. |
| Response timeoutResponse = new Response(future.getId()); |
| // set timeout status. |
| timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); |
| timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); |
| // handle response. |
| DefaultFuture.received(future.getChannel(), timeoutResponse); |
| } |
| } |
| Thread.sleep(30); |
| } catch (Throwable e) { |
| logger.error("Exception when scan the timeout invocation of remoting.", e); |
| } |
| } |
| } |
| } |
| |
| } |