blob: bc1a28bfc96dc6ca1a5e04a768ae954fd98be31b [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol.dubbo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.dubbo.support.ProtocolUtils;
public class ExplicitCallbackTest {
protected Exporter<IDemoService> exporter = null;
protected Exporter<IHelloService> hello_exporter = null;
protected Invoker<IDemoService> reference = null;
@After
public void tearDown(){
destroyService();
}
public void exportService(){
//先export一个service,测试共享连接的问题
serviceURL=serviceURL.addParameter("connections", 1);
URL hellourl = serviceURL.setPath(IHelloService.class.getName());
hello_exporter = ProtocolUtils.export(new HelloServiceImpl(), IHelloService.class, hellourl);
exporter = ProtocolUtils.export(new DemoServiceImpl(), IDemoService.class, serviceURL);
}
void referService() {
demoProxy = (IDemoService)ProtocolUtils.refer(IDemoService.class, consumerUrl);
}
protected URL serviceURL = null ;
protected URL consumerUrl = null ;
@Before
public void setUp(){
}
public void initOrResetUrl(int callbacks, int timeout) throws Exception {
int port = NetUtils.getAvailablePort() ;
consumerUrl = serviceURL = URL.valueOf("dubbo://127.0.0.1:"+port+"/"+IDemoService.class.getName()+"?group=test"
+"&xxx.0.callback=true"
+"&xxx2.0.callback=true"
+"&unxxx2.0.callback=false"
+"&timeout="+timeout
+"&retries=0"
+"&"+Constants.CALLBACK_INSTANCES_LIMIT_KEY+"="+callbacks
);
// uncomment is unblock invoking
// serviceURL = serviceURL.addParameter("yyy."+Constants.ASYNC_KEY,String.valueOf(true));
// consumerUrl = consumerUrl.addParameter("yyy."+Constants.ASYNC_KEY,String.valueOf(true));
}
public void initOrResetBadUrl() throws Exception{
initOrResetUrl(1, 1000);
consumerUrl = serviceURL = serviceURL
.addParameter(Constants.DOWNSTREAM_CODEC_KEY, "dubbo1compatible")
;
}
public void initOrResetService(){
destroyService();
exportService();
referService();
}
public void destroyService(){
demoProxy = null ;
try {
if (exporter!=null) exporter.unexport();
if (hello_exporter!=null) hello_exporter.unexport();
if (reference!=null) reference.destroy();
}catch (Exception e) {
}
}
// ============================华丽的分割线================================================
interface IDemoCallback{
String yyy(String msg);
}
interface IHelloService{
public String sayHello();
}
interface IDemoService{
public String get();
public int getCallbackCount();
public void xxx(IDemoCallback callback,String arg1,int runs,int sleep);
public void xxx2(IDemoCallback callback);
public void unxxx2(IDemoCallback callback);
}
class HelloServiceImpl implements IHelloService{
public String sayHello() {
return "hello";
}
}
class DemoServiceImpl implements IDemoService {
public String get(){
return "ok" ;
}
public void xxx(final IDemoCallback callback ,String arg1, final int runs ,final int sleep) {
callback.yyy("Sync callback msg .This is callback data. arg1:"+arg1);
Thread t = new Thread(new Runnable() {
public void run() {
for(int i = 0 ;i< runs ;i++){
String ret = callback.yyy("server invoke callback : arg:"+System.currentTimeMillis());
System.out.println("callback result is :"+ret);
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
t.setDaemon(true);
t.start();
System.out.println("xxx invoke complete");
}
private List<IDemoCallback> callbacks = new ArrayList<IDemoCallback>();
public int getCallbackCount(){
return callbacks.size();
}
public void xxx2(IDemoCallback callback){
if (!callbacks.contains(callback)){
callbacks.add(callback);
}
startThread();
}
private volatile Thread t = null;
private volatile Lock lock = new ReentrantLock();
private void startThread(){
if (t == null || callbacks.size() == 0){
try{
lock.lock();
t = new Thread(new Runnable() {
public void run() {
while(callbacks.size()>0){
try {
List<IDemoCallback> callbacksCopy = new ArrayList<IDemoCallback>(callbacks);
for(IDemoCallback callback : callbacksCopy){
try{
callback.yyy("this is callback msg,current time is :"+ System.currentTimeMillis());
}catch (Exception e) {
e.printStackTrace();
callbacks.remove(callback);
}
}
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
t.setDaemon(true);
t.start();
}finally{
lock.unlock();
}
}
}
public void unxxx2(IDemoCallback callback) {
if (!callbacks.contains(callback)){
throw new IllegalStateException("callback instance not found");
}
callbacks.remove(callback);
}
}
// ============================华丽的分割线================================================
IDemoService demoProxy = null;
@Test
public void TestCallbackNormal() throws Exception {
initOrResetUrl(1, 10000000); initOrResetService() ;
final AtomicInteger count = new AtomicInteger(0);
demoProxy.xxx(new IDemoCallback() {
public String yyy(String msg) {
System.out.println("Recived callback: " + msg);
count.incrementAndGet();
return "ok";
}
},"other custom args" , 10 , 100);
System.out.println("Async...");
// Thread.sleep(10000000);
assertCallbackCount(10,100,count);
destroyService();
}
@Test
public void TestCallbackMultiInstans() throws Exception {
initOrResetUrl(2, 1000);
initOrResetService() ;
IDemoCallback callback = new IDemoCallback(){
public String yyy(String msg) {
System.out.println("callback1:"+msg);
return "callback1 onChanged ,"+msg;
}
};
IDemoCallback callback2 = new IDemoCallback(){
public String yyy(String msg) {
System.out.println("callback2:"+msg);
return "callback2 onChanged ,"+msg;
}
};
{
demoProxy.xxx2(callback);
Assert.assertEquals(1, demoProxy.getCallbackCount());
Thread.sleep(500);
demoProxy.unxxx2(callback);
Assert.assertEquals(0, demoProxy.getCallbackCount());
System.out.println("");
demoProxy.xxx2(callback2);
Assert.assertEquals(1, demoProxy.getCallbackCount());
Thread.sleep(500);
demoProxy.unxxx2(callback2);
Assert.assertEquals(0, demoProxy.getCallbackCount());
System.out.println("");
demoProxy.xxx2(callback);
Thread.sleep(500);
Assert.assertEquals(1, demoProxy.getCallbackCount());
demoProxy.unxxx2(callback);
Assert.assertEquals(0, demoProxy.getCallbackCount());
}
{
demoProxy.xxx2(callback);
Assert.assertEquals(1, demoProxy.getCallbackCount());
demoProxy.xxx2(callback);
Assert.assertEquals(1, demoProxy.getCallbackCount());
demoProxy.xxx2(callback2);
Assert.assertEquals(2, demoProxy.getCallbackCount());
}
destroyService();
}
// @Ignore
@Test(expected=RpcException.class)
public void TestCallbackDownStreamCodec() throws Exception {
initOrResetBadUrl(); initOrResetService() ;
final AtomicInteger count = new AtomicInteger(0);
demoProxy.xxx(new IDemoCallback() {
public String yyy(String msg) {
System.out.println("Recived callback: " + msg);
count.incrementAndGet();
return "ok";
}
},"other custom args" , 10 , 100);
System.out.println("Async...");
assertCallbackCount(10,100,count);
destroyService();
}
@Test(expected = RpcException.class)
public void TestCallbackConsumerLimit() throws Exception {
initOrResetUrl(1, 1000);
//api的方式 url 无法自动从服务端传递到客户端,需要手动制定
initOrResetService() ;
final AtomicInteger count = new AtomicInteger(0);
demoProxy.xxx(new IDemoCallback() {
public String yyy(String msg) {
System.out.println("Recived callback: " + msg);
count.incrementAndGet();
return "ok";
}
},"other custom args" , 10 , 100);
demoProxy.xxx(new IDemoCallback() {
public String yyy(String msg) {
System.out.println("Recived callback: " + msg);
count.incrementAndGet();
return "ok";
}
},"other custom args" , 10 , 100);
destroyService();
}
@Test(expected = RpcException.class)
public void TestCallbackProviderLimit() throws Exception {
initOrResetUrl(1, 1000);
//api的方式 url 无法自动从服务端传递到客户端,需要手动制定
serviceURL = serviceURL.addParameter(Constants.CALLBACK_INSTANCES_LIMIT_KEY, 1+"");
initOrResetService() ;
final AtomicInteger count = new AtomicInteger(0);
demoProxy.xxx(new IDemoCallback() {
public String yyy(String msg) {
System.out.println("Recived callback: " + msg);
count.incrementAndGet();
return "ok";
}
},"other custom args" , 10 , 100);
demoProxy.xxx(new IDemoCallback() {
public String yyy(String msg) {
System.out.println("Recived callback: " + msg);
count.incrementAndGet();
return "ok";
}
},"other custom args" , 10 , 100);
destroyService();
}
private void assertCallbackCount(int runs, int sleep, AtomicInteger count) throws InterruptedException{
int last = count.get();
for(int i=0;i< runs ;i++){
if (last > runs) break;
Thread.sleep(sleep * 2);
System.out.println(count.get() + " " + last);
Assert.assertTrue(count.get() > last);
last = count.get();
}
//有一次同步调用callback
Assert.assertEquals(runs+1, count.get());
}
@Ignore //使用不同进程启动
@Test
public void startProvider() throws Exception {
exportService();
synchronized (ExplicitCallbackTest.class) {
ExplicitCallbackTest.class.wait();
}
}
}