blob: 6f0ab9fe4892b39330c09b2f88006ecc4c51cc5a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.transport.mina;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.transport.AbstractClient;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoFuture;
import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.ThreadModel;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Mina client.
*/
public class MinaClient extends AbstractClient {
private static final Logger logger = LoggerFactory.getLogger(MinaClient.class);
private static final Map<String, SocketConnector> connectors = new ConcurrentHashMap<String, SocketConnector>();
private String connectorKey;
private SocketConnector connector;
private volatile IoSession session; // volatile, please copy reference to use
public MinaClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
@Override
protected void doOpen() throws Throwable {
connectorKey = getUrl().toFullString();
SocketConnector c = connectors.get(connectorKey);
if (c != null) {
connector = c;
} else {
// set thread pool.
connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,
Executors.newCachedThreadPool(new NamedThreadFactory("MinaClientWorker", true)));
// config
SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
cfg.getSessionConfig().setTcpNoDelay(true);
cfg.getSessionConfig().setKeepAlive(true);
int timeout = getTimeout();
cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
// set codec.
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
connectors.put(connectorKey, connector);
}
}
@Override
protected void doConnect() throws Throwable {
ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this));
long start = System.currentTimeMillis();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
future.addListener(new IoFutureListener() {
@Override
public void operationComplete(IoFuture future) {
try {
if (future.isReady()) {
IoSession newSession = future.getSession();
try {
// Close old channel
IoSession oldSession = MinaClient.this.session; // copy reference
if (oldSession != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old mina channel " + oldSession + " on create new mina channel " + newSession);
}
oldSession.close();
} finally {
MinaChannel.removeChannelIfDisconnected(oldSession);
}
}
} finally {
if (MinaClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new mina channel " + newSession + ", because the client closed.");
}
newSession.close();
} finally {
MinaClient.this.session = null;
MinaChannel.removeChannelIfDisconnected(newSession);
}
} else {
MinaClient.this.session = newSession;
}
}
}
} catch (Exception e) {
exception.set(e);
} finally {
finish.countDown();
}
}
});
try {
finish.await(getTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout "
+ getTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start)
+ "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version "
+ Version.getVersion() + ", cause: " + e.getMessage(), e);
}
Throwable e = exception.get();
if (e != null) {
throw e;
}
}
@Override
protected void doDisConnect() throws Throwable {
try {
MinaChannel.removeChannelIfDisconnected(session);
} catch (Throwable t) {
logger.warn(t.getMessage());
}
}
@Override
protected void doClose() throws Throwable {
//release mina resouces.
}
@Override
protected Channel getChannel() {
IoSession s = session;
if (s == null || !s.isConnected())
return null;
return MinaChannel.getOrAddChannel(s, getUrl(), this);
}
}