blob: 44a1792ca283fac13f991bf3207b979e7df33914 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.impl.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.remoting.config.RemotingClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.internal.RemotingUtil.extractRemoteAddress;
public class ClientChannelManager {
protected static final Logger LOG = LoggerFactory.getLogger(ClientChannelManager.class);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
final ConcurrentHashMap<String, RemotingChannelFuture> channelTables = new ConcurrentHashMap<>();
private final Lock lockChannelTables = new ReentrantLock();
private final Bootstrap clientBootstrap;
private final RemotingClientConfig clientConfig;
ClientChannelManager(final Bootstrap bootstrap,
final RemotingClientConfig config) {
clientBootstrap = bootstrap;
clientConfig = config;
}
void clear() {
for (RemotingChannelFuture cw : this.channelTables.values()) {
this.closeChannel(null, cw.getChannel());
}
this.channelTables.clear();
}
Channel createIfAbsent(final String addr) {
RemotingChannelFuture cw = this.channelTables.get(addr);
if (cw != null && cw.isActive()) {
return cw.getChannel();
}
return this.createChannel(addr);
}
private Channel createChannel(final String addr) {
RemotingChannelFuture cw = null;
try {
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isActive()) {
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}
if (createNewConnection) {
String[] s = addr.split(":");
SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1]));
ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new RemotingChannelFuture(channelFuture);
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
LOG.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException ignore) {
}
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.clientConfig.getConnectTimeoutMillis())) {
if (cw.isActive()) {
LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
this.closeChannel(addr, cw.getChannel());
}
} else {
LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
this.closeChannel(addr, cw.getChannel());
}
}
return null;
}
void closeChannel(final String addr, final Channel channel) {
final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr;
try {
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean removeItemFromTable = true;
RemotingChannelFuture prevCW = this.channelTables.get(addrRemote);
//Workaround for null
if (null == prevCW) {
return;
}
LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW);
if (prevCW.getChannel() != channel) {
LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel);
removeItemFromTable = false;
}
if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
LOG.info("Channel {} has been removed !", addrRemote);
}
channel.close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
LOG.warn("Close channel {} {}", channel, future.isSuccess());
}
});
} catch (Exception e) {
LOG.error("Close channel error !", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
LOG.error("Close channel error !", e);
}
}
void closeChannel(final Channel channel) {
try {
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean removeItemFromTable = true;
RemotingChannelFuture prevCW = null;
String addrRemote = null;
for (Map.Entry<String, RemotingChannelFuture> entry : channelTables.entrySet()) {
RemotingChannelFuture prev = entry.getValue();
if (prev.getChannel() != null) {
if (prev.getChannel() == channel) {
prevCW = prev;
addrRemote = entry.getKey();
break;
}
}
}
if (null == prevCW) {
LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
removeItemFromTable = false;
}
if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
//RemotingHelper.closeChannel(channel);
}
} catch (Exception e) {
LOG.error("closeChannel: close the channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
LOG.error("closeChannel exception", e);
}
}
private class RemotingChannelFuture {
private final ChannelFuture channelFuture;
RemotingChannelFuture(ChannelFuture channelFuture) {
this.channelFuture = channelFuture;
}
boolean isActive() {
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
}
boolean isWriteable() {
return this.channelFuture.channel().isWritable();
}
private Channel getChannel() {
return this.channelFuture.channel();
}
ChannelFuture getChannelFuture() {
return channelFuture;
}
}
}