blob: 05f8a812e8919c7a914b705dc3f14876fcec71d9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper;
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.X509Util;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.zookeeper.common.X509Exception.SSLContextException;
/**
* ClientCnxnSocketNetty implements ClientCnxnSocket abstract methods.
* It's responsible for connecting to server, reading/writing network traffic and
* being a layer between network data and higher level packets.
*/
public class ClientCnxnSocketNetty extends ClientCnxnSocket {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
ChannelFactory channelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
Channel channel;
CountDownLatch firstConnect;
ChannelFuture connectFuture;
Lock connectLock = new ReentrantLock();
AtomicBoolean disconnected = new AtomicBoolean();
AtomicBoolean needSasl = new AtomicBoolean();
Semaphore waitSasl = new Semaphore(0);
ClientCnxnSocketNetty(ZKClientConfig clientConfig) {
this.clientConfig = clientConfig;
initProperties();
}
/**
* lifecycles diagram:
* <p/>
* loop:
* - try:
* - - !isConnected()
* - - - connect()
* - - doTransport()
* - catch:
* - - cleanup()
* close()
* <p/>
* Other non-lifecycle methods are in jeopardy getting a null channel
* when calling in concurrency. We must handle it.
*/
@Override
boolean isConnected() {
// Assuming that isConnected() is only used to initiate connection,
// not used by some other connection status judgement.
return channel != null;
}
@Override
void connect(InetSocketAddress addr) throws IOException {
firstConnect = new CountDownLatch(1);
ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setPipelineFactory(new ZKClientPipelineFactory());
bootstrap.setOption("soLinger", -1);
bootstrap.setOption("tcpNoDelay", true);
connectFuture = bootstrap.connect(addr);
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// this lock guarantees that channel won't be assgined after cleanup().
connectLock.lock();
try {
if (!channelFuture.isSuccess() || connectFuture == null) {
LOG.info("future isn't success, cause: {}", channelFuture.getCause());
return;
}
// setup channel, variables, connection, etc.
channel = channelFuture.getChannel();
disconnected.set(false);
initialized = false;
lenBuffer.clear();
incomingBuffer = lenBuffer;
sendThread.primeConnection();
updateNow();
updateLastSendAndHeard();
if (sendThread.tunnelAuthInProgress()) {
waitSasl.drainPermits();
needSasl.set(true);
sendPrimePacket();
} else {
needSasl.set(false);
}
// we need to wake up on first connect to avoid timeout.
wakeupCnxn();
firstConnect.countDown();
LOG.info("channel is connected: {}", channelFuture.getChannel());
} finally {
connectLock.unlock();
}
}
});
}
@Override
void cleanup() {
connectLock.lock();
try {
if (connectFuture != null) {
connectFuture.cancel();
connectFuture = null;
}
if (channel != null) {
channel.close().awaitUninterruptibly();
channel = null;
}
} finally {
connectLock.unlock();
}
Iterator<Packet> iter = outgoingQueue.iterator();
while (iter.hasNext()) {
Packet p = iter.next();
if (p == WakeupPacket.getInstance()) {
iter.remove();
}
}
}
@Override
void close() {
channelFactory.releaseExternalResources();
}
@Override
void saslCompleted() {
needSasl.set(false);
waitSasl.release();
}
@Override
void connectionPrimed() {
}
@Override
void packetAdded() {
}
@Override
void onClosing() {
firstConnect.countDown();
wakeupCnxn();
LOG.info("channel is told closing");
}
private void wakeupCnxn() {
if (needSasl.get()) {
waitSasl.release();
}
outgoingQueue.add(WakeupPacket.getInstance());
}
@Override
void doTransport(int waitTimeOut,
List<Packet> pendingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
try {
if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
Packet head = null;
if (needSasl.get()) {
if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
} else {
if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
return;
}
}
// check if being waken up on closing.
if (!sendThread.getZkState().isAlive()) {
// adding back the patck to notify of failure in conLossPacket().
addBack(head);
return;
}
// channel disconnection happened
if (disconnected.get()) {
addBack(head);
throw new EndOfStreamException("channel for sessionid 0x"
+ Long.toHexString(sessionId)
+ " is lost");
}
if (head != null) {
doWrite(pendingQueue, head, cnxn);
}
} finally {
updateNow();
}
}
private void addBack(Packet head) {
if (head != null && head != WakeupPacket.getInstance()) {
outgoingQueue.addFirst(head);
}
}
private void sendPkt(Packet p) {
// Assuming the packet will be sent out successfully. Because if it fails,
// the channel will close and clean up queues.
p.createBB();
updateLastSend();
sentCount++;
channel.write(ChannelBuffers.wrappedBuffer(p.bb));
}
private void sendPrimePacket() {
// assuming the first packet is the priming packet.
sendPkt(outgoingQueue.remove());
}
/**
* doWrite handles writing the packets from outgoingQueue via network to server.
*/
private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
updateNow();
while (true) {
if (p != WakeupPacket.getInstance()) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
(p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
sendPkt(p);
}
if (outgoingQueue.isEmpty()) {
break;
}
p = outgoingQueue.remove();
}
}
@Override
void sendPacket(ClientCnxn.Packet p) throws IOException {
if (channel == null) {
throw new IOException("channel has been closed");
}
sendPkt(p);
}
@Override
SocketAddress getRemoteSocketAddress() {
Channel copiedChanRef = channel;
return (copiedChanRef == null) ? null : copiedChanRef.getRemoteAddress();
}
@Override
SocketAddress getLocalSocketAddress() {
Channel copiedChanRef = channel;
return (copiedChanRef == null) ? null : copiedChanRef.getLocalAddress();
}
@Override
void testableCloseSocket() throws IOException {
Channel copiedChanRef = channel;
if (copiedChanRef != null) {
copiedChanRef.disconnect().awaitUninterruptibly();
}
}
// *************** <END> CientCnxnSocketNetty </END> ******************
private static class WakeupPacket {
private static final Packet instance = new Packet(null, null, null, null, null);
protected WakeupPacket() {
// Exists only to defeat instantiation.
}
public static Packet getInstance() {
return instance;
}
}
/**
* ZKClientPipelineFactory is the netty pipeline factory for this netty
* connection implementation.
*/
private class ZKClientPipelineFactory implements ChannelPipelineFactory {
private SSLContext sslContext = null;
private SSLEngine sslEngine = null;
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) {
initSSL(pipeline);
}
pipeline.addLast("handler", new ZKClientHandler());
return pipeline;
}
// The synchronized is to prevent the race on shared variable "sslEngine".
// Basically we only need to create it once.
private synchronized void initSSL(ChannelPipeline pipeline) throws SSLContextException {
if (sslContext == null || sslEngine == null) {
sslContext = X509Util.createSSLContext(clientConfig);
sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(true);
}
pipeline.addLast("ssl", new SslHandler(sslEngine));
LOG.info("SSL handler added for channel: {}", pipeline.getChannel());
}
}
/**
* ZKClientHandler is the netty handler that sits in netty upstream last
* place. It mainly handles read traffic and helps synchronize connection state.
*/
private class ZKClientHandler extends SimpleChannelUpstreamHandler {
AtomicBoolean channelClosed = new AtomicBoolean(false);
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
LOG.info("channel is disconnected: {}", ctx.getChannel());
cleanup();
}
/**
* netty handler has encountered problems. We are cleaning it up and tell outside to close
* the channel/connection.
*/
private void cleanup() {
if (!channelClosed.compareAndSet(false, true)) {
return;
}
disconnected.set(true);
onClosing();
}
@Override
public void messageReceived(ChannelHandlerContext ctx,
MessageEvent e) throws Exception {
updateNow();
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
while (buf.readable()) {
if (incomingBuffer.remaining() > buf.readableBytes()) {
int newLimit = incomingBuffer.position()
+ buf.readableBytes();
incomingBuffer.limit(newLimit);
}
buf.readBytes(incomingBuffer);
incomingBuffer.limit(incomingBuffer.capacity());
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
readConnectResult();
lenBuffer.clear();
incomingBuffer = lenBuffer;
initialized = true;
updateLastHeard();
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
wakeupCnxn();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
ExceptionEvent e) throws Exception {
LOG.warn("Exception caught: {}", e, e.getCause());
cleanup();
}
}
}