blob: c29a2141d5832a0d83957b5ed465bb5f20618f24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import javax.security.auth.login.LoginException;
import javax.security.sasl.SaslException;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.AsyncCallback.ACLCallback;
import org.apache.zookeeper.AsyncCallback.AllChildrenNumberCallback;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.Create2Callback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.EphemeralsCallback;
import org.apache.zookeeper.AsyncCallback.MultiCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.OpResult.ErrorResult;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.ZooKeeper.WatchRegistration;
import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.proto.AuthPacket;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.Create2Response;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetACLResponse;
import org.apache.zookeeper.proto.GetAllChildrenNumberResponse;
import org.apache.zookeeper.proto.GetChildren2Response;
import org.apache.zookeeper.proto.GetChildrenResponse;
import org.apache.zookeeper.proto.GetDataResponse;
import org.apache.zookeeper.proto.GetEphemeralsResponse;
import org.apache.zookeeper.proto.GetSASLRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.proto.SetWatches;
import org.apache.zookeeper.proto.SetWatches2;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.ZooTrace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
/**
* This class manages the socket i/o for the client. ClientCnxn maintains a list
* of available servers to connect to and "transparently" switches servers it is
* connected to as needed.
*
*/
@SuppressFBWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
public class ClientCnxn {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);
/* ZOOKEEPER-706: If a session has a large number of watches set then
* attempting to re-establish those watches after a connection loss may
* fail due to the SetWatches request exceeding the server's configured
* jute.maxBuffer value. To avoid this we instead split the watch
* re-establishement across multiple SetWatches calls. This constant
* controls the size of each call. It is set to 128kB to be conservative
* with respect to the server's 1MB default for jute.maxBuffer.
*/
private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024;
/* predefined xid's values recognized as special by the server */
// -1 means notification(WATCHER_EVENT)
public static final int NOTIFICATION_XID = -1;
// -2 is the xid for pings
public static final int PING_XID = -2;
// -4 is the xid for AuthPacket
public static final int AUTHPACKET_XID = -4;
// -8 is the xid for setWatch
public static final int SET_WATCHES_XID = -8;
static class AuthData {
AuthData(String scheme, byte[] data) {
this.scheme = scheme;
this.data = data;
}
String scheme;
byte[] data;
}
private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<>();
/**
* These are the packets that have been sent and are waiting for a response.
*/
private final Queue<Packet> pendingQueue = new ArrayDeque<>();
/**
* These are the packets that need to be sent.
*/
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<>();
private int connectTimeout;
/**
* The timeout in ms the client negotiated with the server. This is the
* "real" timeout, not the timeout request by the client (which may have
* been increased/decreased by the server which applies bounds to this
* value.
*/
private volatile int negotiatedSessionTimeout;
private int readTimeout;
private final int sessionTimeout;
private final ZKWatchManager watchManager;
private long sessionId;
private byte[] sessionPasswd;
/**
* If true, the connection is allowed to go to r-o mode. This field's value
* is sent, besides other data, during session creation handshake. If the
* server on the other side of the wire is partitioned it'll accept
* read-only clients only.
*/
private boolean readOnly;
final String chrootPath;
final SendThread sendThread;
final EventThread eventThread;
/**
* Set to true when close is called. Latches the connection such that we
* don't attempt to re-connect to the server if in the middle of closing the
* connection (client sends session disconnect to server as part of close
* operation)
*/
private volatile boolean closing = false;
/**
* A set of ZooKeeper hosts this client could connect to.
*/
private final HostProvider hostProvider;
/**
* Is set to true when a connection to a r/w server is established for the
* first time; never changed afterwards.
* <p>
* Is used to handle situations when client without sessionId connects to a
* read-only server. Such client receives "fake" sessionId from read-only
* server, but this sessionId is invalid for other servers. So when such
* client finds a r/w server, it sends 0 instead of fake sessionId during
* connection handshake and establishes new, valid session.
* <p>
* If this field is false (which implies we haven't seen r/w server before)
* then non-zero sessionId is fake, otherwise it is valid.
*/
volatile boolean seenRwServerBefore = false;
private final ZKClientConfig clientConfig;
/**
* If any request's response in not received in configured requestTimeout
* then it is assumed that the response packet is lost.
*/
private long requestTimeout;
ZKWatchManager getWatcherManager() {
return watchManager;
}
public long getSessionId() {
return sessionId;
}
public byte[] getSessionPasswd() {
return sessionPasswd;
}
public int getSessionTimeout() {
return negotiatedSessionTimeout;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
sb.append("sessionid:0x").append(Long.toHexString(getSessionId()))
.append(" local:").append(local)
.append(" remoteserver:").append(remote)
.append(" lastZxid:").append(lastZxid)
.append(" xid:").append(xid)
.append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
.append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
.append(" queuedpkts:").append(outgoingQueue.size())
.append(" pendingresp:").append(pendingQueue.size())
.append(" queuedevents:").append(eventThread.waitingEvents.size());
return sb.toString();
}
/**
* This class allows us to pass the headers and the relevant records around.
*/
static class Packet {
RequestHeader requestHeader;
ReplyHeader replyHeader;
Record request;
Record response;
ByteBuffer bb;
/** Client's view of the path (may differ due to chroot) **/
String clientPath;
/** Servers's view of the path (may differ due to chroot) **/
String serverPath;
boolean finished;
AsyncCallback cb;
Object ctx;
WatchRegistration watchRegistration;
WatchDeregistration watchDeregistration;
/** Convenience ctor */
Packet(
RequestHeader requestHeader,
ReplyHeader replyHeader,
Record request,
Record response,
WatchRegistration watchRegistration
) {
this.requestHeader = requestHeader;
this.replyHeader = replyHeader;
this.request = request;
this.response = response;
this.watchRegistration = watchRegistration;
}
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("clientPath:" + clientPath);
sb.append(" serverPath:" + serverPath);
sb.append(" finished:" + finished);
sb.append(" header:: " + requestHeader);
sb.append(" replyHeader:: " + replyHeader);
sb.append(" request:: " + request);
sb.append(" response:: " + response);
// jute toString is horrible, remove unnecessary newlines
return sb.toString().replaceAll("\r*\n+", " ");
}
}
/**
* Creates a connection object. The actual network connect doesn't get
* established until needed. The start() instance method must be called
* subsequent to construction.
*
* @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider the list of ZooKeeper servers to connect to
* @param sessionTimeout the timeout for connections.
* @param clientConfig the client configuration.
* @param defaultWatcher default watcher for this connection
* @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
* @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
*/
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly
) throws IOException {
this(
chrootPath,
hostProvider,
sessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
0,
new byte[16],
canBeReadOnly);
}
/**
* Creates a connection object. The actual network connect doesn't get
* established until needed. The start() instance method must be called
* subsequent to construction.
*
* @param chrootPath the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider the list of ZooKeeper servers to connect to
* @param sessionTimeout the timeout for connections.
* @param clientConfig the client configuration.
* @param defaultWatcher default watcher for this connection
* @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
* @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
* @throws IOException in cases of broken network
*/
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly
) throws IOException {
this.chrootPath = chrootPath;
this.hostProvider = hostProvider;
this.sessionTimeout = sessionTimeout;
this.clientConfig = clientConfig;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.readOnly = canBeReadOnly;
this.watchManager = new ZKWatchManager(
clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
defaultWatcher);
this.connectTimeout = sessionTimeout / hostProvider.size();
this.readTimeout = sessionTimeout * 2 / 3;
this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
initRequestTimeout();
}
public void start() {
sendThread.start();
eventThread.start();
}
private Object eventOfDeath = new Object();
private static class WatcherSetEventPair {
private final Set<Watcher> watchers;
private final WatchedEvent event;
public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
this.watchers = watchers;
this.event = event;
}
}
/**
* Guard against creating "-EventThread-EventThread-EventThread-..." thread
* names when ZooKeeper object is being created from within a watcher.
* See ZOOKEEPER-795 for details.
*/
private static String makeThreadName(String suffix) {
String name = Thread.currentThread().getName().replaceAll("-EventThread", "");
return name + suffix;
}
/**
* Tests that current thread is the main event loop.
* This method is useful only for tests inside ZooKeeper project
* it is not a public API intended for use by external applications.
* @return true if Thread.currentThread() is an EventThread.
*/
public static boolean isInEventThread() {
return Thread.currentThread() instanceof EventThread;
}
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
/** This is really the queued session state until the event
* thread actually processes the event and hands it to the watcher.
* But for all intents and purposes this is the state.
*/
private volatile KeeperState sessionState = KeeperState.Disconnected;
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true);
}
public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {
// materialize the watchers based on the event
watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<>(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
public void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
waitingEvents.add(new LocalCallback(cb, rc, path, ctx));
}
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) {
waitingEvents.add(packet);
} else {
processEvent(packet);
}
}
} else {
waitingEvents.add(packet);
}
}
public void queueEventOfDeath() {
waitingEvents.add(eventOfDeath);
}
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled) {
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher.", t);
}
}
} else if (event instanceof LocalCallback) {
LocalCallback lcb = (LocalCallback) event;
if (lcb.cb instanceof StatCallback) {
((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof DataCallback) {
((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof ACLCallback) {
((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof ChildrenCallback) {
((ChildrenCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof Children2Callback) {
((Children2Callback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof StringCallback) {
((StringCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof AsyncCallback.EphemeralsCallback) {
((AsyncCallback.EphemeralsCallback) lcb.cb).processResult(lcb.rc, lcb.ctx, null);
} else if (lcb.cb instanceof AsyncCallback.AllChildrenNumberCallback) {
((AsyncCallback.AllChildrenNumberCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, -1);
} else if (lcb.cb instanceof AsyncCallback.MultiCallback) {
((AsyncCallback.MultiCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, Collections.emptyList());
} else {
((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx);
}
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
if (rc == Code.OK.intValue()) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response).getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response).getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response).getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == Code.OK.intValue()) {
cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == Code.OK.intValue()) {
cb.processResult(rc, clientPath, p.ctx, rsp.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == Code.OK.intValue()) {
cb.processResult(rc, clientPath, p.ctx, rsp.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetAllChildrenNumberResponse) {
AllChildrenNumberCallback cb = (AllChildrenNumberCallback) p.cb;
GetAllChildrenNumberResponse rsp = (GetAllChildrenNumberResponse) p.response;
if (rc == Code.OK.intValue()) {
cb.processResult(rc, clientPath, p.ctx, rsp.getTotalNumber());
} else {
cb.processResult(rc, clientPath, p.ctx, -1);
}
} else if (p.response instanceof GetChildren2Response) {
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == Code.OK.intValue()) {
cb.processResult(rc, clientPath, p.ctx, rsp.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == Code.OK.intValue()) {
cb.processResult(
rc,
clientPath,
p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath().substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof Create2Response) {
Create2Callback cb = (Create2Callback) p.cb;
Create2Response rsp = (Create2Response) p.response;
if (rc == Code.OK.intValue()) {
cb.processResult(
rc,
clientPath,
p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath().substring(chrootPath.length())),
rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
if (rc == Code.OK.intValue()) {
List<OpResult> results = rsp.getResultList();
int newRc = rc;
for (OpResult result : results) {
if (result instanceof ErrorResult
&& KeeperException.Code.OK.intValue()
!= (newRc = ((ErrorResult) result).getErr())) {
break;
}
}
cb.processResult(newRc, clientPath, p.ctx, results);
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetEphemeralsResponse) {
EphemeralsCallback cb = (EphemeralsCallback) p.cb;
GetEphemeralsResponse rsp = (GetEphemeralsResponse) p.response;
if (rc == Code.OK.intValue()) {
cb.processResult(rc, p.ctx, rsp.getEphemerals());
} else {
cb.processResult(rc, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Unexpected throwable", t);
}
}
}
// @VisibleForTesting
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
// Add all the removed watch events to the event queue, so that the
// clients will be notified with 'Data/Child WatchRemoved' event type.
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
queueEvent(p.watchDeregistration.getClientPath(), err, watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
}
}
} catch (KeeperException.NoWatcherException nwe) {
p.replyHeader.setErr(nwe.code().intValue());
} catch (KeeperException ke) {
p.replyHeader.setErr(ke.code().intValue());
}
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
void queueEvent(String clientPath, int err, Set<Watcher> materializedWatchers, EventType eventType) {
KeeperState sessionState = KeeperState.SyncConnected;
if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
|| KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
sessionState = Event.KeeperState.Disconnected;
}
WatchedEvent event = new WatchedEvent(eventType, sessionState, clientPath);
eventThread.queueEvent(event, materializedWatchers);
}
void queueCallback(AsyncCallback cb, int rc, String path, Object ctx) {
eventThread.queueCallback(cb, rc, path, ctx);
}
// for test only
protected void onConnecting(InetSocketAddress addr) {
}
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
}
switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
}
finishPacket(p);
}
private volatile long lastZxid;
public long getLastZxid() {
return lastZxid;
}
static class EndOfStreamException extends IOException {
private static final long serialVersionUID = -5438877188796231422L;
public EndOfStreamException(String msg) {
super(msg);
}
@Override
public String toString() {
return "EndOfStreamException: " + getMessage();
}
}
private static class SessionTimeoutException extends IOException {
private static final long serialVersionUID = 824482094072071178L;
public SessionTimeoutException(String msg) {
super(msg);
}
}
private static class SessionExpiredException extends IOException {
private static final long serialVersionUID = -1388816932076193249L;
public SessionExpiredException(String msg) {
super(msg);
}
}
private static class RWServerFoundException extends IOException {
private static final long serialVersionUID = 90431199887158758L;
public RWServerFoundException(String msg) {
super(msg);
}
}
/**
* This class services the outgoing request queue and generates the heart
* beats. It also spawns the ReadThread.
*/
class SendThread extends ZooKeeperThread {
private long lastPingSentNs;
private final ClientCnxnSocket clientCnxnSocket;
private boolean isFirstConnect = true;
private volatile ZooKeeperSaslClient zooKeeperSaslClient;
private String stripChroot(String serverPath) {
if (serverPath.startsWith(chrootPath)) {
if (serverPath.length() == chrootPath.length()) {
return "/";
}
return serverPath.substring(chrootPath.length());
} else if (serverPath.startsWith(ZooDefs.ZOOKEEPER_NODE_SUBTREE)) {
return serverPath;
}
LOG.warn("Got server path {} which is not descendant of chroot path {}.", serverPath, chrootPath);
return serverPath;
}
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
case PING_XID:
LOG.debug("Got ping response for session id: 0x{} after {}ms.",
Long.toHexString(sessionId),
((System.nanoTime() - lastPingSentNs) / 1000000));
return;
case AUTHPACKET_XID:
LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
changeZkState(States.AUTH_FAILED);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
}
return;
case NOTIFICATION_XID:
LOG.debug("Got notification session id: 0x{}",
Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
String clientPath = stripChroot(serverPath);
event.setPath(clientPath);
}
WatchedEvent we = new WatchedEvent(event);
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
default:
break;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
+ " with err " + replyHdr.getErr()
+ " expected Xid " + packet.requestHeader.getXid()
+ " for a packet with details: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
} finally {
finishPacket(packet);
}
}
SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
super(makeThreadName("-SendThread()"));
changeZkState(States.CONNECTING);
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
// TODO: can not name this method getState since Thread.getState()
// already exists
// It would be cleaner to make class SendThread an implementation of
// Runnable
/**
* Used by ClientCnxnSocket
*
* @return
*/
synchronized ZooKeeper.States getZkState() {
return state;
}
synchronized void changeZkState(ZooKeeper.States newState) throws IOException {
if (!state.isAlive() && newState == States.CONNECTING) {
throw new IOException(
"Connection has already been closed and reconnection is not allowed");
}
// It's safer to place state modification at the end.
state = newState;
}
ClientCnxnSocket getClientCnxnSocket() {
return clientCnxnSocket;
}
/**
* Setup session, previous watches, authentication.
*/
void primeConnection() throws IOException {
LOG.info(
"Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd, readOnly);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
List<String> dataWatches = watchManager.getDataWatchList();
List<String> existWatches = watchManager.getExistWatchList();
List<String> childWatches = watchManager.getChildWatchList();
List<String> persistentWatches = watchManager.getPersistentWatchList();
List<String> persistentRecursiveWatches = watchManager.getPersistentRecursiveWatchList();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
|| !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
|| persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<>();
List<String> existWatchesBatch = new ArrayList<>();
List<String> childWatchesBatch = new ArrayList<>();
List<String> persistentWatchesBatch = new ArrayList<>();
List<String> persistentRecursiveWatchesBatch = new ArrayList<>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else if (persistentWatchesIter.hasNext()) {
watch = persistentWatchesIter.next();
persistentWatchesBatch.add(watch);
} else if (persistentRecursiveWatchesIter.hasNext()) {
watch = persistentRecursiveWatchesIter.next();
persistentRecursiveWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
Record record;
int opcode;
if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
// maintain compatibility with older servers - if no persistent/recursive watchers
// are used, use the old version of SetWatches
record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
opcode = OpCode.setWatches;
} else {
record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
opcode = OpCode.setWatches2;
}
RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
outgoingQueue.addFirst(packet);
}
}
}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(
new Packet(
new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
null,
new AuthPacket(0, id.scheme, id.data),
null,
null));
}
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null));
clientCnxnSocket.connectionPrimed();
LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}
private List<String> prependChroot(List<String> paths) {
if (chrootPath != null && !paths.isEmpty()) {
for (int i = 0; i < paths.size(); ++i) {
String clientPath = paths.get(i);
String serverPath;
// handle clientPath = "/"
if (clientPath.length() == 1) {
serverPath = chrootPath;
} else {
serverPath = chrootPath + clientPath;
}
paths.set(i, serverPath);
}
}
return paths;
}
private void sendPing() {
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
queuePacket(h, null, null, null, null, null, null, null, null);
}
private InetSocketAddress rwServerAddress = null;
private static final int minPingRwTimeout = 100;
private static final int maxPingRwTimeout = 60000;
private int pingRwTimeout = minPingRwTimeout;
// Set to true if and only if constructor of ZooKeeperSaslClient
// throws a LoginException: see startConnect() below.
private boolean saslLoginFailed = false;
private void startConnect(InetSocketAddress addr) throws IOException {
// initializing it for new connection
saslLoginFailed = false;
if (!isFirstConnect) {
try {
Thread.sleep(ThreadLocalRandom.current().nextLong(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
changeZkState(States.CONNECTING);
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
if (clientConfig.isSaslClientEnabled()) {
try {
if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
}
zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig);
} catch (LoginException e) {
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn(
"SASL configuration failed. "
+ "Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.", e);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
clientCnxnSocket.connect(addr);
}
private void logStartConnect(InetSocketAddress addr) {
LOG.info("Opening socket connection to server {}.", addr);
if (zooKeeperSaslClient != null) {
LOG.info("SASL config status: {}", zooKeeperSaslClient.getConfigStatus());
}
}
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
onConnecting(serverAddress);
startConnect(serverAddress);
// Update now to start the connection timer right after we make a connection attempt
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend()
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
// closing so this is expected
if (LOG.isDebugEnabled()) {
LOG.debug(
"An exception was thrown while closing send thread for session 0x{}.",
Long.toHexString(getSessionId()), e);
}
break;
} else {
LOG.warn(
"Session 0x{} for server {}, Closing socket connection. "
+ "Attempting reconnect except it is a SessionExpiredException.",
Long.toHexString(getSessionId()),
serverAddress,
e);
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
synchronized (outgoingQueue) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
}
ZooTrace.logTraceMessage(
LOG,
ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
private void cleanAndNotifyState() {
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
private void pingRwServer() throws RWServerFoundException {
String result = null;
InetSocketAddress addr = hostProvider.next(0);
LOG.info("Checking server {} for being r/w. Timeout {}", addr, pingRwTimeout);
Socket sock = null;
BufferedReader br = null;
try {
sock = new Socket(addr.getHostString(), addr.getPort());
sock.setSoLinger(false, -1);
sock.setSoTimeout(1000);
sock.setTcpNoDelay(true);
sock.getOutputStream().write("isro".getBytes());
sock.getOutputStream().flush();
sock.shutdownOutput();
br = new BufferedReader(new InputStreamReader(sock.getInputStream()));
result = br.readLine();
} catch (ConnectException e) {
// ignore, this just means server is not up
} catch (IOException e) {
// some unexpected error, warn about it
LOG.warn("Exception while seeking for r/w server.", e);
} finally {
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
if (br != null) {
try {
br.close();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
}
if ("rw".equals(result)) {
pingRwTimeout = minPingRwTimeout;
// save the found address so that it's used during the next
// connection attempt
rwServerAddress = addr;
throw new RWServerFoundException("Majority server found at "
+ addr.getHostString() + ":" + addr.getPort());
}
}
private void cleanup() {
clientCnxnSocket.cleanup();
synchronized (pendingQueue) {
for (Packet p : pendingQueue) {
conLossPacket(p);
}
pendingQueue.clear();
}
// We can't call outgoingQueue.clear() here because
// between iterating and clear up there might be new
// packets added in queuePacket().
Iterator<Packet> iter = outgoingQueue.iterator();
while (iter.hasNext()) {
Packet p = iter.next();
conLossPacket(p);
iter.remove();
}
}
/**
* Callback invoked by the ClientCnxnSocket once a connection has been
* established.
*/
void onConnected(
int _negotiatedSessionTimeout,
long _sessionId,
byte[] _sessionPasswd,
boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
changeZkState(States.CLOSED);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo = String.format(
"Unable to reconnect to ZooKeeper service, session 0x%s has expired",
Long.toHexString(sessionId));
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
seenRwServerBefore |= !isRO;
LOG.info(
"Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
clientCnxnSocket.getRemoteSocketAddress(),
Long.toHexString(sessionId),
negotiatedSessionTimeout,
(isRO ? " (READ-ONLY mode)" : ""));
KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}
void close() {
try {
changeZkState(States.CLOSED);
} catch (IOException e) {
LOG.warn("Connection close fails when migrates state from {} to CLOSED",
getZkState());
}
clientCnxnSocket.onClosing();
}
void testableCloseSocket() throws IOException {
clientCnxnSocket.testableCloseSocket();
}
public boolean tunnelAuthInProgress() {
// 1. SASL client is disabled.
if (!clientConfig.isSaslClientEnabled()) {
return false;
}
// 2. SASL login failed.
if (saslLoginFailed) {
return false;
}
// 3. SendThread has not created the authenticating object yet,
// therefore authentication is (at the earliest stage of being) in progress.
if (zooKeeperSaslClient == null) {
return true;
}
// 4. authenticating object exists, so ask it for its progress.
return zooKeeperSaslClient.clientTunneledAuthenticationInProgress();
}
public void sendPacket(Packet p) throws IOException {
clientCnxnSocket.sendPacket(p);
}
public ZooKeeperSaslClient getZooKeeperSaslClient() {
return zooKeeperSaslClient;
}
}
/**
* Shutdown the send/event threads. This method should not be called
* directly - rather it should be called as part of close operation. This
* method is primarily here to allow the tests to verify disconnection
* behavior.
*/
public void disconnect() {
LOG.debug("Disconnecting client for session: 0x{}", Long.toHexString(getSessionId()));
sendThread.close();
try {
sendThread.join();
} catch (InterruptedException ex) {
LOG.warn("Got interrupted while waiting for the sender thread to close", ex);
}
eventThread.queueEventOfDeath();
}
/**
* Close the connection, which includes; send session disconnect to the
* server, shutdown the send/event threads.
*
* @throws IOException
*/
public void close() throws IOException {
LOG.debug("Closing client for session: 0x{}", Long.toHexString(getSessionId()));
try {
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.closeSession);
submitRequest(h, null, null, null);
} catch (InterruptedException e) {
// ignore, close the send/event threads
} finally {
disconnect();
}
}
// @VisibleForTesting
protected int xid = 1;
// @VisibleForTesting
volatile States state = States.NOT_CONNECTED;
/*
* getXid() is called externally by ClientCnxnNIO::doIO() when packets are sent from the outgoingQueue to
* the server. Thus, getXid() must be public.
*/
public synchronized int getXid() {
// Avoid negative cxid values. In particular, cxid values of -4, -2, and -1 are special and
// must not be used for requests -- see SendThread.readResponse.
// Skip from MAX to 1.
if (xid == Integer.MAX_VALUE) {
xid = 1;
}
return xid++;
}
public ReplyHeader submitRequest(
RequestHeader h,
Record request,
Record response,
WatchRegistration watchRegistration) throws InterruptedException {
return submitRequest(h, request, response, watchRegistration, null);
}
public ReplyHeader submitRequest(
RequestHeader h,
Record request,
Record response,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(
h,
r,
request,
response,
null,
null,
null,
null,
watchRegistration,
watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
/**
* Wait for request completion with timeout.
*/
private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException {
long waitStartTime = Time.currentElapsedTime();
while (!packet.finished) {
packet.wait(requestTimeout);
if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) {
LOG.error("Timeout error occurred for the packet '{}'.", packet);
r.setErr(Code.REQUESTTIMEOUT.intValue());
break;
}
}
}
public void saslCompleted() {
sendThread.getClientCnxnSocket().saslCompleted();
}
public void sendPacket(Record request, Record response, AsyncCallback cb, int opCode) throws IOException {
// Generate Xid now because it will be sent immediately,
// by call to sendThread.sendPacket() below.
int xid = getXid();
RequestHeader h = new RequestHeader();
h.setXid(xid);
h.setType(opCode);
ReplyHeader r = new ReplyHeader();
r.setXid(xid);
Packet p = new Packet(h, r, request, response, null);
p.cb = cb;
sendThread.sendPacket(p);
}
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Record request,
Record response,
AsyncCallback cb,
String clientPath,
String serverPath,
Object ctx,
WatchRegistration watchRegistration) {
return queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, watchRegistration, null);
}
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Record request,
Record response,
AsyncCallback cb,
String clientPath,
String serverPath,
Object ctx,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (outgoingQueue) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
public void addAuthInfo(String scheme, byte[] auth) {
if (!state.isAlive()) {
return;
}
authInfo.add(new AuthData(scheme, auth));
queuePacket(
new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
null,
new AuthPacket(0, scheme, auth),
null,
null,
null,
null,
null,
null);
}
States getState() {
return state;
}
private static class LocalCallback {
private final AsyncCallback cb;
private final int rc;
private final String path;
private final Object ctx;
public LocalCallback(AsyncCallback cb, int rc, String path, Object ctx) {
this.cb = cb;
this.rc = rc;
this.path = path;
this.ctx = ctx;
}
}
private void initRequestTimeout() {
try {
requestTimeout = clientConfig.getLong(
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT_DEFAULT);
LOG.info(
"{} value is {}. feature enabled={}",
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT,
requestTimeout,
requestTimeout > 0);
} catch (NumberFormatException e) {
LOG.error(
"Configured value {} for property {} can not be parsed to long.",
clientConfig.getProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT),
ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
throw e;
}
}
public ZooKeeperSaslClient getZooKeeperSaslClient() {
return sendThread.getZooKeeperSaslClient();
}
}