blob: c1fdcf19dc4f063a36456d7d7e954ed8c435305a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.protocol.thrift;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.utils.ConfigUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Transporter;
import com.alibaba.dubbo.remoting.exchange.ExchangeChannel;
import com.alibaba.dubbo.remoting.exchange.ExchangeClient;
import com.alibaba.dubbo.remoting.exchange.ExchangeHandler;
import com.alibaba.dubbo.remoting.exchange.ExchangeServer;
import com.alibaba.dubbo.remoting.exchange.Exchangers;
import com.alibaba.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProtocol;
import com.alibaba.dubbo.rpc.protocol.dubbo.DubboExporter;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ThriftProtocol extends AbstractProtocol {
public static final int DEFAULT_PORT = 40880;
public static final String NAME = "thrift";
// ip:port -> ExchangeServer
private final ConcurrentMap<String, ExchangeServer> serverMap =
new ConcurrentHashMap<String, ExchangeServer>();
private ExchangeHandler handler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object msg) throws RemotingException {
if (msg instanceof Invocation) {
Invocation inv = (Invocation) msg;
String serviceName = inv.getAttachments().get(Constants.INTERFACE_KEY);
String serviceKey = serviceKey(channel.getLocalAddress().getPort(),
serviceName, null, null);
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel,
"Not found exported service: "
+ serviceKey
+ " in "
+ exporterMap.keySet()
+ ", may be version or group mismatch "
+ ", channel: consumer: "
+ channel.getRemoteAddress()
+ " --> provider: "
+ channel.getLocalAddress()
+ ", message:" + msg);
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return exporter.getInvoker().invoke(inv);
}
throw new RemotingException(channel,
"Unsupported request: "
+ (msg.getClass().getName() + ": " + msg)
+ ", channel: consumer: "
+ channel.getRemoteAddress()
+ " --> provider: "
+ channel.getLocalAddress());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
};
@Override
public int getDefaultPort() {
return DEFAULT_PORT;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// can use thrift codec only
URL url = invoker.getUrl().addParameter(Constants.CODEC_KEY, ThriftCodec.NAME);
// find server.
String key = url.getAddress();
// client can expose a service for server to invoke only.
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer && !serverMap.containsKey(key)) {
serverMap.put(key, getServer(url));
}
// export service.
key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
return exporter;
}
@Override
public void destroy() {
super.destroy();
for (String key : new ArrayList<String>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo server: " + server.getLocalAddress());
}
server.close(ConfigUtils.getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
} // ~ end of if ( server != null )
} // ~ end of loop serverMap
} // ~ end of method destroy
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
ThriftInvoker<T> invoker = new ThriftInvoker<T>(type, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 1);
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
clients[i] = initClient(url);
}
return clients;
}
private ExchangeClient initClient(URL url) {
ExchangeClient client;
url = url.addParameter(Constants.CODEC_KEY, ThriftCodec.NAME);
try {
client = Exchangers.connect(url);
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url
+ "): " + e.getMessage(), e);
}
return client;
}
private ExchangeServer getServer(URL url) {
// enable sending readonly event when server closes by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
ExchangeServer server;
try {
server = Exchangers.bind(url, handler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
}