| /*
|
| * Copyright 1999-2011 Alibaba Group.
|
| *
|
| * Licensed under the Apache License, Version 2.0 (the "License");
|
| * you may not use this file except in compliance with the License.
|
| * You may obtain a copy of the License at
|
| *
|
| * http://www.apache.org/licenses/LICENSE-2.0
|
| *
|
| * Unless required by applicable law or agreed to in writing, software
|
| * distributed under the License is distributed on an "AS IS" BASIS,
|
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| * See the License for the specific language governing permissions and
|
| * limitations under the License.
|
| */
|
| package com.alibaba.dubbo.rpc.protocol.dubbo; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import com.alibaba.dubbo.common.Constants; |
| import com.alibaba.dubbo.common.URL; |
| import com.alibaba.dubbo.common.utils.NetUtils; |
| import com.alibaba.dubbo.rpc.Exporter; |
| import com.alibaba.dubbo.rpc.Invoker; |
| import com.alibaba.dubbo.rpc.RpcConstants; |
| import com.alibaba.dubbo.rpc.RpcException; |
| import com.alibaba.dubbo.rpc.protocol.dubbo.support.ProtocolUtils;
|
| |
| public class ExplicitCallbackTest { |
| |
| protected Exporter<IDemoService> exporter = null; |
| protected Invoker<IDemoService> reference = null; |
| |
| @After |
| public void tearDown(){ |
| destroyService(); |
| } |
| |
| public void exportService(){ |
| exporter = ProtocolUtils.export(new DemoServiceImpl(), IDemoService.class, serviceURL); |
| } |
| void referService() { |
| demoProxy = (IDemoService)ProtocolUtils.refer(IDemoService.class, consumerUrl); |
| } |
| |
| protected URL serviceURL = null ; |
| protected URL consumerUrl = null ; |
| |
| @Before |
| public void setUp(){ |
| } |
| public void initOrResetUrl(int callbacks, int timeout) throws Exception { |
| int port = NetUtils.getAvailablePort() ; |
| consumerUrl = serviceURL = URL.valueOf("dubbo://127.0.0.1:"+port+"/"+IDemoService.class.getName()+"?group=test" |
| +"&xxx.0.callback=true" |
| +"&xxx2.0.callback=true" |
| +"&unxxx2.0.callback=false" |
| +"&timeout="+timeout |
| +"&"+RpcConstants.CALLBACK_INSTANCES_LIMIT_KEY+"="+callbacks |
| ); |
| // uncomment is unblock invoking |
| // serviceURL = serviceURL.addParameter("yyy."+Constants.ASYNC_KEY,String.valueOf(true)); |
| // consumerUrl = consumerUrl.addParameter("yyy."+Constants.ASYNC_KEY,String.valueOf(true)); |
| } |
| public void initOrResetBadUrl() throws Exception{ |
| initOrResetUrl(1, 1000); |
| consumerUrl = serviceURL = serviceURL |
| .addParameter(Constants.DOWNSTREAM_CODEC_KEY, "dubbo1compatible") |
| ; |
| } |
| public void initOrResetService(){ |
| destroyService(); |
| exportService(); |
| referService(); |
| } |
| public void destroyService(){ |
| demoProxy = null ; |
| try { |
| if (exporter!=null) exporter.unexport(); |
| if (reference!=null) reference.destroy(); |
| }catch (Exception e) { |
| } |
| } |
| // ============================华丽的分割线================================================ |
| interface IDemoCallback{ |
| String yyy(String msg); |
| } |
| interface IDemoService{ |
| public String get(); |
| public int getCallbackCount(); |
| public void xxx(IDemoCallback callback,String arg1,int runs,int sleep); |
| public void xxx2(IDemoCallback callback); |
| public void unxxx2(IDemoCallback callback); |
| } |
| |
| class DemoServiceImpl implements IDemoService { |
| public String get(){ |
| return "ok" ; |
| } |
| public void xxx(final IDemoCallback callback ,String arg1, final int runs ,final int sleep) { |
| callback.yyy("Sync callback msg .This is callback data. arg1:"+arg1); |
| Thread t = new Thread(new Runnable() { |
| public void run() { |
| for(int i = 0 ;i< runs ;i++){ |
| String ret = callback.yyy("server invoke callback : arg:"+System.currentTimeMillis()); |
| System.out.println("callback result is :"+ret); |
| try { |
| Thread.sleep(sleep); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| }); |
| t.setDaemon(true); |
| t.start(); |
| System.out.println("xxx invoke complete"); |
| } |
| |
| private List<IDemoCallback> callbacks = new ArrayList<IDemoCallback>(); |
| public int getCallbackCount(){ |
| return callbacks.size(); |
| } |
| public void xxx2(IDemoCallback callback){ |
| if (!callbacks.contains(callback)){ |
| callbacks.add(callback); |
| } |
| startThread(); |
| } |
| private volatile Thread t = null; |
| private volatile Lock lock = new ReentrantLock(); |
| private void startThread(){ |
| if (t == null || callbacks.size() == 0){ |
| try{ |
| lock.lock(); |
| t = new Thread(new Runnable() { |
| public void run() { |
| while(callbacks.size()>0){ |
| try { |
| List<IDemoCallback> callbacksCopy = new ArrayList<IDemoCallback>(callbacks); |
| for(IDemoCallback callback : callbacksCopy){ |
| try{ |
| callback.yyy("this is callback msg,current time is :"+ System.currentTimeMillis()); |
| }catch (Exception e) { |
| e.printStackTrace(); |
| callbacks.remove(callback); |
| } |
| } |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| }); |
| t.setDaemon(true); |
| t.start(); |
| }finally{ |
| lock.unlock(); |
| } |
| } |
| } |
| |
| public void unxxx2(IDemoCallback callback) { |
| if (!callbacks.contains(callback)){ |
| throw new IllegalStateException("callback instance not found"); |
| } |
| callbacks.remove(callback); |
| } |
| } |
| |
| |
| // ============================华丽的分割线================================================ |
| IDemoService demoProxy = null; |
| @Test |
| public void TestCallbackNormal() throws Exception { |
| initOrResetUrl(1, 1000); initOrResetService() ; |
| final AtomicInteger count = new AtomicInteger(0); |
| |
| demoProxy.xxx(new IDemoCallback() { |
| public String yyy(String msg) { |
| System.out.println("Recived callback: " + msg); |
| count.incrementAndGet(); |
| return "ok"; |
| } |
| },"other custom args" , 10 , 100); |
| System.out.println("Async..."); |
| assertCallbackCount(10,100,count); |
| destroyService(); |
| } |
| |
| @Test |
| public void TestCallbackMultiInstans() throws Exception { |
| initOrResetUrl(2, 1000); |
| initOrResetService() ; |
| IDemoCallback callback = new IDemoCallback(){ |
| public String yyy(String msg) { |
| System.out.println("callback1:"+msg); |
| return "callback1 onChanged ,"+msg; |
| } |
| }; |
| |
| IDemoCallback callback2 = new IDemoCallback(){ |
| public String yyy(String msg) { |
| System.out.println("callback2:"+msg); |
| return "callback2 onChanged ,"+msg; |
| } |
| }; |
| { |
| demoProxy.xxx2(callback); |
| Assert.assertEquals(1, demoProxy.getCallbackCount()); |
| Thread.sleep(500); |
| demoProxy.unxxx2(callback); |
| Assert.assertEquals(0, demoProxy.getCallbackCount()); |
| System.out.println(""); |
| |
| demoProxy.xxx2(callback2); |
| Assert.assertEquals(1, demoProxy.getCallbackCount()); |
| Thread.sleep(500); |
| demoProxy.unxxx2(callback2); |
| Assert.assertEquals(0, demoProxy.getCallbackCount()); |
| System.out.println(""); |
| demoProxy.xxx2(callback); |
| Thread.sleep(500); |
| Assert.assertEquals(1, demoProxy.getCallbackCount()); |
| demoProxy.unxxx2(callback); |
| Assert.assertEquals(0, demoProxy.getCallbackCount()); |
| } |
| { |
| demoProxy.xxx2(callback); |
| Assert.assertEquals(1, demoProxy.getCallbackCount()); |
| |
| demoProxy.xxx2(callback); |
| Assert.assertEquals(1, demoProxy.getCallbackCount()); |
| |
| demoProxy.xxx2(callback2); |
| Assert.assertEquals(2, demoProxy.getCallbackCount()); |
| } |
| destroyService(); |
| } |
| |
| @Ignore |
| @Test(expected=RpcException.class) |
| public void TestCallbackDownStreamCodec() throws Exception { |
| initOrResetBadUrl(); initOrResetService() ; |
| final AtomicInteger count = new AtomicInteger(0); |
| demoProxy.xxx(new IDemoCallback() { |
| public String yyy(String msg) { |
| System.out.println("Recived callback: " + msg); |
| count.incrementAndGet(); |
| return "ok"; |
| } |
| },"other custom args" , 10 , 100); |
| System.out.println("Async..."); |
| assertCallbackCount(10,100,count); |
| destroyService(); |
| } |
| |
| @Test(expected = RpcException.class) |
| public void TestCallbackConsumerLimit() throws Exception { |
| initOrResetUrl(1, 1000); |
| //api的方式 url 无法自动从服务端传递到客户端,需要手动制定 |
| initOrResetService() ; |
| final AtomicInteger count = new AtomicInteger(0); |
| demoProxy.xxx(new IDemoCallback() { |
| public String yyy(String msg) { |
| System.out.println("Recived callback: " + msg); |
| count.incrementAndGet(); |
| return "ok"; |
| } |
| },"other custom args" , 10 , 100); |
| |
| demoProxy.xxx(new IDemoCallback() { |
| public String yyy(String msg) { |
| System.out.println("Recived callback: " + msg); |
| count.incrementAndGet(); |
| return "ok"; |
| } |
| },"other custom args" , 10 , 100); |
| destroyService(); |
| } |
| |
| @Test(expected = RpcException.class) |
| public void TestCallbackProviderLimit() throws Exception { |
| initOrResetUrl(1, 1000); |
| //api的方式 url 无法自动从服务端传递到客户端,需要手动制定 |
| serviceURL = serviceURL.addParameter(RpcConstants.CALLBACK_INSTANCES_LIMIT_KEY, 1+""); |
| initOrResetService() ; |
| final AtomicInteger count = new AtomicInteger(0); |
| demoProxy.xxx(new IDemoCallback() { |
| public String yyy(String msg) { |
| System.out.println("Recived callback: " + msg); |
| count.incrementAndGet(); |
| return "ok"; |
| } |
| },"other custom args" , 10 , 100); |
| |
| demoProxy.xxx(new IDemoCallback() { |
| public String yyy(String msg) { |
| System.out.println("Recived callback: " + msg); |
| count.incrementAndGet(); |
| return "ok"; |
| } |
| },"other custom args" , 10 , 100); |
| destroyService(); |
| } |
| |
| private void assertCallbackCount(int runs, int sleep, AtomicInteger count) throws InterruptedException{ |
| int last = count.get(); |
| for(int i=0;i< runs ;i++){ |
| if (last > runs) break; |
| Thread.sleep(sleep * 2); |
| System.out.println(count.get() + " " + last); |
| Assert.assertTrue(count.get() > last); |
| last = count.get(); |
| } |
| //有一次同步调用callback |
| Assert.assertEquals(runs+1, count.get()); |
| } |
| |
| @Ignore //使用不同进程启动 |
| @Test |
| public void startProvider() throws Exception { |
| exportService(); |
| synchronized (ExplicitCallbackTest.class) { |
| ExplicitCallbackTest.class.wait(); |
| } |
| } |
| } |