blob: 4630829c4e805efb225c1b6e0e1d29be4f2ff559 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.ZKConfig;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A ClientCnxnSocket does the lower level communication with a socket
 * implementation.
 *
 * This code has been moved out of ClientCnxn so that a Netty implementation can
 * be provided as an alternative to the NIO socket code.
 *
 * The class keeps the bookkeeping shared by all transports: the length-prefix
 * read buffer, send/receive counters, the "last heard"/"last send" timestamps
 * and the maximum accepted packet length. The transport-specific work is left
 * to the abstract methods.
 */
abstract class ClientCnxnSocket {
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);

    /**
     * Not read or written by this class; presumably maintained by the concrete
     * transport implementations -- confirm against the subclasses.
     */
    protected boolean initialized;

    /**
     * This buffer is only used to read the length of the incoming message.
     */
    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);

    /**
     * After the length is read, a new incomingBuffer is allocated in
     * readLength() to receive the full message.
     */
    protected ByteBuffer incomingBuffer = lenBuffer;

    protected long sentCount = 0;
    protected long recvCount = 0;

    /** Value of {@link #now} captured by the last updateLastHeard() call. */
    protected long lastHeard;

    /** Value of {@link #now} captured by the last updateLastSend() call. */
    protected long lastSend;

    /** Cached clock reading, refreshed only by {@link #updateNow()}. */
    protected long now;

    protected ClientCnxn.SendThread sendThread;
    protected LinkedBlockingDeque<Packet> outgoingQueue;

    /**
     * Client configuration consulted by {@link #initProperties()}. It is never
     * assigned in this class, so it must be set (presumably by the subclass)
     * before initProperties() is called.
     */
    protected ZKClientConfig clientConfig;

    /**
     * Exclusive upper bound for the length of an incoming packet, enforced in
     * {@link #readLength()}.
     */
    private int packetLen = ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT;

    /**
     * The sessionId is only available here for Log and Exception messages.
     * Otherwise the socket doesn't need to know it.
     */
    protected long sessionId;

    /**
     * Hands this socket the collaborators it works with.
     *
     * @param sendThread the send thread that is notified by readConnectResult()
     * @param sessionId the session id, kept for log and exception messages
     * @param outgoingQueue the queue of packets waiting to be written
     */
    void introduce(ClientCnxn.SendThread sendThread, long sessionId,
            LinkedBlockingDeque<Packet> outgoingQueue) {
        this.sendThread = sendThread;
        this.sessionId = sessionId;
        this.outgoingQueue = outgoingQueue;
    }

    /** Refreshes the cached clock reading from Time.currentElapsedTime(). */
    void updateNow() {
        now = Time.currentElapsedTime();
    }

    /**
     * @return the difference between the cached {@code now} and the last
     *         "heard" timestamp, narrowed to an int
     */
    int getIdleRecv() {
        return (int) (now - lastHeard);
    }

    /**
     * @return the difference between the cached {@code now} and the last
     *         "send" timestamp, narrowed to an int
     */
    int getIdleSend() {
        return (int) (now - lastSend);
    }

    long getSentCount() {
        return sentCount;
    }

    long getRecvCount() {
        return recvCount;
    }

    /** Stamps the "last heard" time with the cached {@code now}. */
    void updateLastHeard() {
        this.lastHeard = now;
    }

    /** Stamps the "last send" time with the cached {@code now}. */
    void updateLastSend() {
        this.lastSend = now;
    }

    /** Stamps both the "last send" and "last heard" times with the cached {@code now}. */
    void updateLastSendAndHeard() {
        this.lastSend = now;
        this.lastHeard = now;
    }

    /**
     * Reads the 4-byte length prefix from the current incomingBuffer and
     * replaces incomingBuffer with a fresh heap buffer of exactly that size.
     *
     * @throws IOException if the length is negative or not smaller than the
     *         configured maximum packet length
     */
    protected void readLength() throws IOException {
        int len = incomingBuffer.getInt();
        if (len < 0 || len >= packetLen) {
            throw new IOException("Packet len " + len + " is out of range!");
        }
        incomingBuffer = ByteBuffer.allocate(len);
    }

    /**
     * Deserializes the connect response held in incomingBuffer, records the
     * session id it carries and reports the result to the send thread.
     *
     * @throws IOException if the connect response cannot be deserialized
     */
    void readConnectResult() throws IOException {
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                // Mask to the low 8 bits so that negative bytes are not
                // sign-extended into "ffffffxx" in the dump.
                buf.append(Integer.toHexString(b & 0xff)).append(',');
            }
            buf.append(']');
            LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf);
        }

        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }

        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

    /**
     * Connection state query; implemented by the concrete transport.
     */
    abstract boolean isConnected();

    /**
     * Connect to the given address; implemented by the concrete transport.
     *
     * @param addr the address to connect to
     * @throws IOException declared for implementations to report connect failures
     */
    abstract void connect(InetSocketAddress addr) throws IOException;

    /**
     * Returns the address to which the socket is connected.
     */
    abstract SocketAddress getRemoteSocketAddress();

    /**
     * Returns the address to which the socket is bound.
     */
    abstract SocketAddress getLocalSocketAddress();

    /**
     * Clean up resources for a fresh new socket.
     * It's called before reconnect or close.
     */
    abstract void cleanup();

    /**
     * Called when new packets have been added to outgoingQueue.
     */
    abstract void packetAdded();

    /**
     * Called when connState is marked CLOSED, so that the ClientCnxnSocket
     * can react.
     */
    abstract void onClosing();

    /**
     * Sasl completes. Allows non-priming packets to be sent.
     * Note that this method will only be called if Sasl starts and completes.
     */
    abstract void saslCompleted();

    /**
     * Called after ClientCnxn finishes PrimeConnection.
     */
    abstract void connectionPrimed();

    /**
     * Do transportation work:
     * - read packets into incomingBuffer.
     * - write outgoing queue packets.
     * - update relevant timestamp.
     *
     * @param waitTimeOut timeout in blocking wait. Unit in MilliSecond.
     * @param pendingQueue These are the packets that have been sent and
     *                     are waiting for a response.
     * @param cnxn the client connection on whose behalf the transport work is
     *             done (how it is used is implementation-specific)
     * @throws IOException declared for implementations to report I/O failures
     * @throws InterruptedException declared for implementations that block
     */
    abstract void doTransport(int waitTimeOut, List<Packet> pendingQueue,
            ClientCnxn cnxn)
            throws IOException, InterruptedException;

    /**
     * Close the socket.
     */
    abstract void testableCloseSocket() throws IOException;

    /**
     * Close this client.
     */
    abstract void close();

    /**
     * Send Sasl packets directly.
     * The Sasl process will send the first (requestHeader == null) packet,
     * and then block the doTransport write,
     * finally unblock it when finished.
     */
    abstract void sendPacket(Packet p) throws IOException;

    /**
     * Initializes the maximum accepted packet length from the client
     * configuration property {@code ZKConfig.JUTE_MAXBUFFER}.
     *
     * The configured string is parsed directly. It must not be passed to
     * {@code Integer.getInteger}, which would treat it as the <em>name</em> of
     * a system property and therefore ignore the configured limit. When the
     * property is absent or is not a valid integer, the default
     * {@code ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT} is used.
     */
    protected void initProperties() {
        int maxLen = ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT;
        String configured = clientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER);
        if (configured != null) {
            try {
                maxLen = Integer.parseInt(configured.trim());
            } catch (NumberFormatException e) {
                // Keep the best-effort behaviour: a malformed value falls
                // back to the default instead of failing the client.
                LOG.warn("Invalid value '{}' for {}; using default {}",
                        configured, ZKConfig.JUTE_MAXBUFFER, maxLen);
            }
        }
        packetLen = maxLen;
        LOG.debug("{} is {}", ZKConfig.JUTE_MAXBUFFER, packetLen);
    }
}