blob: 19f1fac4536578212edb3b46703d4dc369602465 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import javax.security.sasl.SaslException;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.Environment;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.Quotas;
import org.apache.zookeeper.StatsTrack;
import org.apache.zookeeper.Version;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZookeeperBanner;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.proto.AuthPacket;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ConnectResponse;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.GetSASLRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.proto.SetSASLResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException;
import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
import org.apache.zookeeper.server.SessionTracker.Session;
import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.apache.zookeeper.server.util.OSMXBean;
import org.apache.zookeeper.server.util.QuotaMetricsUtils;
import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.apache.zookeeper.util.ServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
* following chain of RequestProcessors to process requests:
* PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
*/
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// Class-wide logger; assigned in the static initializer below.
protected static final Logger LOG;
// Rate logger built on top of LOG; assigned in the static initializer below.
private static final RateLogger RATE_LOGGER;
// Names of the system properties read by this class.
public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";
public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
public static final String SKIP_ACL = "zookeeper.skipACL";
public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota";
// When enabled, will check ACL constraints appertained to the requests first,
// before sending the requests to the quorum.
static boolean enableEagerACLCheck;
// True only when the SKIP_ACL property is exactly "yes" (see the static initializer).
static final boolean skipACL;
// Parsed from the ENFORCE_QUOTA property; false when the property is unset.
public static final boolean enforceQuota;
public static final String SASL_SUPER_USER = "zookeeper.superUser";
public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
// Parsed from ZOOKEEPER_DIGEST_ENABLED; true when the property is unset.
private static boolean digestEnabled;
public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled";
// Set through setSerializeLastProcessedZxidEnabled(..) from the static initializer.
private static boolean serializeLastProcessedZxidEnabled;
// Add a enable/disable option for now, we should remove this one when
// this feature is confirmed to be stable
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
// Re-read from CLOSE_SESSION_TXN_ENABLED in the static initializer; true when unset.
private static boolean closeSessionTxnEnabled = true;
// Non-null only while restoreFromSnapshot(..) is swapping in the restored database;
// counted down and reset to null in that method's finally block.
private volatile CountDownLatch restoreLatch;
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
RATE_LOGGER = new RateLogger(LOG);
ZookeeperBanner.printBanner(LOG);
Environment.logEnv("Server environment:", LOG);
enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK);
LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck);
skipACL = System.getProperty(SKIP_ACL, "no").equals("yes");
if (skipACL) {
LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL);
}
enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false"));
if (enforceQuota) {
LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota);
}
digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true"));
LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled);
closeSessionTxnEnabled = Boolean.parseBoolean(
System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean(
System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true")));
}
/**
 * Reports the current value of the eager-ACL-check switch.
 *
 * @return {@code true} when eager ACL checking is enabled
 */
// @VisibleForTesting
public static boolean isEnableEagerACLCheck() {
    return ZooKeeperServer.enableEagerACLCheck;
}
/**
 * Overrides the eager-ACL-check switch and logs the new value.
 *
 * @param enabled new value of the switch
 */
// @VisibleForTesting
public static void setEnableEagerACLCheck(boolean enabled) {
    enableEagerACLCheck = enabled;
    LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled);
}
/**
 * Reports the current value of the close-session-txn switch.
 *
 * @return {@code true} when the close-session transaction feature is enabled
 */
public static boolean isCloseSessionTxnEnabled() {
    return ZooKeeperServer.closeSessionTxnEnabled;
}
/**
 * Overrides the close-session-txn switch and logs the value now stored.
 *
 * @param enabled new value of the switch
 */
public static void setCloseSessionTxnEnabled(boolean enabled) {
    closeSessionTxnEnabled = enabled;
    LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
}
// JMX beans: created in registerJMX() and reset to null in unregisterJMX().
protected ZooKeeperServerBean jmxServerBean;
protected DataTreeBean jmxDataTreeBean;
// Tick time used when a constructor is not given one; logged in milliseconds.
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled
// Initialized from the "zookeeper.throttled_op_wait_time" system property.
protected static volatile int throttledOpWaitTime =
Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME);
/** value of -1 indicates unset, use default */
protected int minSessionTimeout = -1;
/** value of -1 indicates unset, use default */
protected int maxSessionTimeout = -1;
/** Socket listen backlog. Value of -1 indicates unset */
protected int listenBacklog = -1;
// Created by createSessionTracker(); may still be null before startup.
protected SessionTracker sessionTracker;
// Snapshot/transaction-log access; left null by the no-arg constructor.
private FileTxnSnapLog txnLogFactory = null;
// In-memory database; replaceable through setZKDatabase(..).
private ZKDatabase zkDb;
// Response caches; only created by the constructor that takes a FileTxnSnapLog and ZKDatabase.
private ResponseCache readResponseCache;
private ResponseCache getChildrenResponseCache;
// Zxid counter behind getZxid(), getNextZxid() and setZxid(..).
private final AtomicLong hzxid = new AtomicLong(0);
// NOTE(review): shared sentinel exception; its consumers are not visible in this part of the file.
public static final Exception ok = new Exception("No prob");
// Head of the request processor chain; assigned in setupRequestProcessors().
protected RequestProcessor firstProcessor;
// Optional; started in startJvmPauseMonitor() and stopped in shutdown(boolean) when non-null.
protected JvmPauseMonitor jvmPauseMonitor;
// Current lifecycle state; written only through setState(..).
protected volatile State state = State.INITIAL;
private boolean isResponseCachingEnabled = true;
/* contains the configuration file content read at startup */
protected String initialConfig;
protected boolean reconfigEnabled;
// Started in startupWithServerState(..) and shut down in shutdown(boolean).
private final RequestPathMetricsCollector requestPathMetricsCollector;
private static final int DEFAULT_SNAP_COUNT = 100000;
private static final int DEFAULT_GLOBAL_OUTSTANDING_LIMIT = 1000;
// Refreshed from sessionTracker.isLocalSessionsEnabled() during startup.
private boolean localSessionEnabled = false;
/**
 * Lifecycle states of the server, as stored in the {@code state} field.
 */
protected enum State {
// Initial value of the state field; also the state requested by startupWithoutServing().
INITIAL,
// Set by startup() and startServing(); the only state for which isRunning() returns true.
RUNNING,
// Set by shutdown(boolean) once the shutdown actually proceeds.
SHUTDOWN,
// Error state; together with RUNNING, one of the two states in which canShutdown() returns true.
ERROR
}
/**
* This is the secret that we use to generate passwords. For the moment,
* it's more of a checksum that's used in reconnection, which carries no
* security weight, and is treated internally as if it carries no
* security weight.
*/
private static final long superSecret = 0XB3415C00L;
// Counter behind incInProcess(), decInProcess() and getInProcess().
private final AtomicInteger requestsInProcess = new AtomicInteger(0);
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
// this data structure must be accessed under the outstandingChanges lock
final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();
protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;
private final ServerStats serverStats;
private final ZooKeeperServerListener listener;
// Optional handler notified by setState(..) on every state change.
private ZooKeeperServerShutdownHandler zkShutdownHandler;
// Server id handed to the session tracker; see setCreateSessionTrackerServerId(..).
private volatile int createSessionTrackerServerId = 1;
private static final String FLUSH_DELAY = "zookeeper.flushDelay";
private static volatile long flushDelay;
// Note: the constant is named ..._POLL_SIZE but the property it names is a poll *time*.
private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
private static volatile long maxWriteQueuePollTime;
private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
private static volatile int maxBatchSize;
/**
* Starting size of read and write ByteArrayOutputStream buffers. The default is
* {@link #DEFAULT_STARTING_BUFFER_SIZE} (1024 bytes); configured values below
* 32 bytes are rejected when the class is initialized.
* Flag not used for small transfers like connectResponses.
*/
public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes";
public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
public static final int intBufferStartingSizeBytes;
// System properties that size the getData and getChildren response caches.
public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize";
public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize";
static {
long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
setFlushDelay(configuredFlushDelay);
setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3));
setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000));
intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE);
if (intBufferStartingSizeBytes < 32) {
String msg = "Buffer starting size (" + intBufferStartingSizeBytes + ") must be greater than or equal to 32. "
+ "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" ";
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes);
}
// Connection throttling; exposed through connThrottle().
private final BlueThrottle connThrottle = new BlueThrottle();
// Created and started in startRequestThrottler(); null until then.
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification =
"Internally the throttler has a BlockingQueue so "
+ "once the throttler is created and started, it is thread-safe")
private RequestThrottler requestThrottler;
public static final String SNAP_COUNT = "zookeeper.snapCount";
/**
* This setting sets a limit on the total number of large requests that
* can be inflight and is designed to prevent ZooKeeper from accepting
* too many large requests such that the JVM runs out of usable heap and
* ultimately crashes.
*
* The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)}
* method which is called by the connection layer ({@link NIOServerCnxn},
* {@link NettyServerCnxn}) before allocating a byte buffer and pulling
* data off the TCP socket. The limit is then checked again by the
* ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which
* also atomically updates {@link #currentLargeRequestBytes}. The request is
* then marked as a large request, with the request size stored in the Request
* object so that it can later be decremented from {@link #currentLargeRequestBytes}.
*
* When a request is completed or dropped, the relevant code path calls the
* {@link #requestFinished(Request)} method which performs the decrement if
* needed.
*/
private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
/**
* The size threshold after which a request is considered a large request
* and is checked against the large request byte limit.
*/
// NOTE(review): the initial -1 presumably means "not configured"; confirm against
// initLargeRequestThrottlingSettings(), which is not visible in this part of the file.
private volatile int largeRequestThreshold = -1;
// Running total of bytes held by in-flight large requests (see the Javadoc above).
private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
private final AuthenticationHelper authHelper = new AuthenticationHelper();
/**
 * Forwards the removal of a client connection to the database.
 *
 * @param cnxn connection to remove
 */
void removeCnxn(ServerCnxn cnxn) {
    this.zkDb.removeCnxn(cnxn);
}
/**
 * Builds a bare server: only the listener, the statistics holder and the
 * request-path metrics collector are created. Everything else (data
 * directories, tick time, database, ...) has to be supplied through the
 * setters before the server is used.
 */
public ZooKeeperServer() {
    this.listener = new ZooKeeperServerListenerImpl(this);
    this.serverStats = new ServerStats(this);
    requestPathMetricsCollector = new RequestPathMetricsCollector();
}
/**
* Keeping this constructor for backward compatibility.
* Delegates to the eight-argument constructor, taking the reconfig flag from
* {@code QuorumPeerConfig.isReconfigEnabled()}.
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
}
/**
* * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
* actually start listening for clients until run() is invoked.
*
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
serverStats = new ServerStats(this);
this.txnLogFactory = txnLogFactory;
this.txnLogFactory.setServerStats(this.serverStats);
this.zkDb = zkDb;
this.tickTime = tickTime;
setMinSessionTimeout(minSessionTimeout);
setMaxSessionTimeout(maxSessionTimeout);
this.listenBacklog = clientPortListenBacklog;
this.reconfigEnabled = reconfigEnabled;
listener = new ZooKeeperServerListenerImpl(this);
readResponseCache = new ResponseCache(Integer.getInteger(
GET_DATA_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData");
getChildrenResponseCache = new ResponseCache(Integer.getInteger(
GET_CHILDREN_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren");
this.initialConfig = initialConfig;
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
this.initLargeRequestThrottlingSettings();
LOG.info(
"Created server with"
+ " tickTime {} ms"
+ " minSessionTimeout {} ms"
+ " maxSessionTimeout {} ms"
+ " clientPortListenBacklog {}"
+ " dataLogdir {}"
+ " snapdir {}",
tickTime,
getMinSessionTimeout(),
getMaxSessionTimeout(),
getClientPortListenBacklog(),
txnLogFactory.getDataLogDir(),
txnLogFactory.getSnapDir());
}
/**
 * @return the configuration content handed to the constructor, or
 *         {@code null} if none was stored
 */
public String getInitialConfig() {
    return this.initialConfig;
}
/**
 * Same as
 * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String, boolean)}
 * with the reconfig flag taken from {@code QuorumPeerConfig.isReconfigEnabled()},
 * and additionally remembers the given JvmPauseMonitor.
 *
 * @param jvmPauseMonitor monitor to attach; may be {@code null}
 */
public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
    this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig, QuorumPeerConfig.isReconfigEnabled());
    this.jvmPauseMonitor = jvmPauseMonitor;
    if (jvmPauseMonitor == null) {
        return;
    }
    LOG.info("Added JvmPauseMonitor to server");
}
/**
* Creates a ZooKeeperServer instance backed by a new {@code ZKDatabase} built
* from the given log factory. Session timeouts and the listen backlog are
* passed as -1 (unset), and the reconfig flag comes from
* {@code QuorumPeerConfig.isReconfigEnabled()}.
* @param txnLogFactory the file transaction snapshot logging class
* @param tickTime the ticktime for the server
* @param initialConfig configuration content kept for {@link #getInitialConfig()}
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) {
this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled());
}
/**
 * @return the statistics holder created together with this server
 */
public ServerStats serverStats() {
    return this.serverStats;
}
/**
 * @return the request-path metrics collector owned by this server
 */
public RequestPathMetricsCollector getRequestPathMetricsCollector() {
    return this.requestPathMetricsCollector;
}
/**
 * @return the connection throttle owned by this server
 */
public BlueThrottle connThrottle() {
    return this.connThrottle;
}
/**
 * Writes the server configuration to the given writer as {@code key=value}
 * lines, one setting per line.
 *
 * @param pwriter destination of the dump
 */
public void dumpConf(PrintWriter pwriter) {
    // append(..) writes the key first and returns the writer, so each value
    // getter still runs after its key has been written.
    pwriter.append("clientPort=").println(getClientPort());
    pwriter.append("secureClientPort=").println(getSecureClientPort());
    pwriter.append("dataDir=").println(zkDb.snapLog.getSnapDir().getAbsolutePath());
    pwriter.append("dataDirSize=").println(getDataDirSize());
    pwriter.append("dataLogDir=").println(zkDb.snapLog.getDataLogDir().getAbsolutePath());
    pwriter.append("dataLogSize=").println(getLogDirSize());
    pwriter.append("tickTime=").println(getTickTime());
    pwriter.append("maxClientCnxns=").println(getMaxClientCnxnsPerHost());
    pwriter.append("minSessionTimeout=").println(getMinSessionTimeout());
    pwriter.append("maxSessionTimeout=").println(getMaxSessionTimeout());
    pwriter.append("clientPortListenBacklog=").println(getClientPortListenBacklog());
    pwriter.append("serverId=").println(getServerId());
}
/**
 * Captures the current configuration values in a new
 * {@code ZooKeeperServerConf} object.
 *
 * @return a configuration snapshot built from the current getter values
 */
public ZooKeeperServerConf getConf() {
    final ZooKeeperServerConf snapshot = new ZooKeeperServerConf(
        getClientPort(),
        this.zkDb.snapLog.getSnapDir().getAbsolutePath(),
        this.zkDb.snapLog.getDataLogDir().getAbsolutePath(),
        getTickTime(),
        getMaxClientCnxnsPerHost(),
        getMinSessionTimeout(),
        getMaxSessionTimeout(),
        getServerId(),
        getClientPortListenBacklog());
    return snapshot;
}
/**
* This constructor is for backward compatibility with the existing unit
* test code.
* It wraps the two directories in a new {@code FileTxnSnapLog} and passes an
* empty initial configuration.
*
* @param snapDir snapshot directory
* @param logDir transaction log directory
* @param tickTime the ticktime for the server
* @throws IOException declared because the {@code FileTxnSnapLog} is created here
*/
public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
this(new FileTxnSnapLog(snapDir, logDir), tickTime, "");
}
/**
* Convenience constructor: uses {@link #DEFAULT_TICK_TIME}, leaves the session
* timeouts and the listen backlog unset (-1), creates a new {@code ZKDatabase}
* from the given log factory and passes an empty initial configuration.
*
* @param txnLogFactory the file transaction snapshot logging class
* @throws IOException declared by this constructor; no statement visible here throws it directly
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "", QuorumPeerConfig.isReconfigEnabled());
}
/**
 * Returns the database currently served by this instance.
 *
 * @return the current {@code ZKDatabase}; may be {@code null} if none was set
 */
public ZKDatabase getZKDatabase() {
    return zkDb;
}
/**
* Replaces the database served by this instance. Only the field is
* reassigned; nothing else is updated here.
* @param zkDb the database to serve from now on
*/
public void setZKDatabase(ZKDatabase zkDb) {
this.zkDb = zkDb;
}
/**
 * Restores sessions and data, removes sessions that have no timeout entry
 * and finally writes a fresh snapshot.
 *
 * @throws IOException if loading the database or taking the snapshot fails
 * @throws InterruptedException declared for callers; not thrown by the visible code
 */
public void loadData() throws IOException, InterruptedException {
    /*
     * A new leader calls this method from Leader#lead, but by then the
     * database has normally been initialized already: leader election needs
     * the last logged zxid (QuorumPeer#getLastLoggedZxid) for the initial
     * vote. Loading it a second time would be pure overhead, which matters
     * most for large databases, so an initialized database is reused and
     * only the zxid is picked up from it.
     *
     * This method is also reached through ZooKeeperServer#startdata.
     *
     * See ZOOKEEPER-1642 for more detail.
     */
    setZxid(zkDb.isInitialized() ? zkDb.getDataTreeLastProcessedZxid() : zkDb.loadDataBase());

    // Clean up dead sessions: those known to the database without a timeout entry.
    for (Long session : zkDb.getSessions()) {
        if (zkDb.getSessionWithTimeOuts().get(session) != null) {
            continue;
        }
        killSession(session, zkDb.getDataTreeLastProcessedZxid());
    }

    // Make a clean snapshot
    takeSnapshot();
}
/**
 * Takes a snapshot without requesting an immediate sync.
 *
 * @return the snapshot file as returned by {@link #takeSnapshot(boolean)}
 * @throws IOException propagated from the delegate
 */
public File takeSnapshot() throws IOException {
    final boolean syncSnap = false;
    return takeSnapshot(syncSnap);
}
/**
 * Takes a snapshot in "severe" mode and without fast-forwarding the database.
 *
 * @param syncSnap whether to sync the snapshot immediately after the write
 * @return the snapshot file as returned by the three-argument overload
 * @throws IOException propagated from the delegate
 */
public File takeSnapshot(boolean syncSnap) throws IOException {
    final boolean isSevere = true;
    final boolean fastForwardFromEdits = false;
    return takeSnapshot(syncSnap, isSevere, fastForwardFromEdits);
}
/**
 * Takes a snapshot on the server and records how long it took.
 *
 * @param syncSnap whether to sync the snapshot immediately after the write
 * @param isSevere if true, an IOException is logged and a system exit is
 *                 requested; otherwise the IOException is rethrown
 * @param fastForwardFromEdits whether to fast forward the database to the
 *                 latest recorded transactions before saving
 *
 * @return the snapshot file; {@code null} if saving failed in severe mode
 *         and the exit request returned to the caller
 * @throws IOException if saving fails and {@code isSevere} is false
 */
public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException {
    final long startedAt = Time.currentElapsedTime();
    File snapshot = null;
    try {
        if (fastForwardFromEdits) {
            zkDb.fastForwardDataBase();
        }
        snapshot = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
    } catch (IOException ioe) {
        if (!isSevere) {
            throw ioe;
        }
        // This is a severe error that we cannot recover from,
        // so we need to exit
        LOG.error("Severe unrecoverable error, exiting", ioe);
        ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
    }
    final long tookMs = Time.currentElapsedTime() - startedAt;
    LOG.info("Snapshot taken in {} ms", tookMs);
    ServerMetrics.getMetrics().SNAPSHOT_TIME.add(tookMs);
    return snapshot;
}
/**
* Restores database from a snapshot. It is used by the restore admin server command.
* The snapshot is first deserialized into a brand-new database; only after that
* succeeds is the new database swapped in and the session tracker re-created.
* The input stream is not closed by this method.
*
* @param inputStream input stream of snapshot
* @return last processed zxid
* @throws IllegalArgumentException if {@code inputStream} is null
* @throws IOException
*/
public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException {
if (inputStream == null) {
throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot");
}
long start = Time.currentElapsedTime();
LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}",
getZKDatabase().getDataTreeLastProcessedZxid(),
getZKDatabase().dataTree.getNodeCount(),
getZKDatabase().getSessionCount());
// restore to a new zkDatabase
final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory);
// the Adler32-checked stream is handed to deserializeSnapshot together with the archive
final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32());
final InputArchive ia = BinaryInputArchive.getArchive(cis);
newZKDatabase.deserializeSnapshot(ia, cis);
LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
newZKDatabase.getDataTreeLastProcessedZxid(),
newZKDatabase.dataTree.getNodeCount(),
newZKDatabase.getSessionCount());
// create a CountDownLatch; it is non-null only for the duration of the swap below
restoreLatch = new CountDownLatch(1);
try {
// set to the new zkDatabase
setZKDatabase(newZKDatabase);
// re-create SessionTracker
createSessionTracker();
} finally {
// unblock request submission
restoreLatch.countDown();
restoreLatch = null;
}
LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
getZKDatabase().getDataTreeLastProcessedZxid(),
getZKDatabase().dataTree.getNodeCount(),
getZKDatabase().getSessionCount());
long elapsed = Time.currentElapsedTime() - start;
LOG.info("Restore taken in {} ms", elapsed);
ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed);
return getLastProcessedZxid();
}
/**
 * Asks the transaction log factory whether an initial snapshot must be
 * written after leader election.
 *
 * @return the answer given by the transaction log factory
 */
public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
    final boolean forceWrite = txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
    return forceWrite;
}
/**
 * Computes the total size in bytes of the snapshot directory.
 *
 * @return accumulated file size, or 0 when no database is set
 */
@Override
public long getDataDirSize() {
    return zkDb == null ? 0L : getDirSize(zkDb.snapLog.getSnapDir());
}
/**
 * Computes the total size in bytes of the transaction log directory.
 *
 * @return accumulated file size, or 0 when no database is set
 */
@Override
public long getLogDirSize() {
    return zkDb == null ? 0L : getDirSize(zkDb.snapLog.getDataLogDir());
}
/**
 * Recursively sums file lengths below the given path.
 *
 * @param file a regular file or a directory
 * @return the file's length for a non-directory; for a directory, the sum
 *         over all entries (0 if the directory cannot be listed)
 */
private long getDirSize(File file) {
    if (!file.isDirectory()) {
        return file.length();
    }
    final File[] entries = file.listFiles();
    if (entries == null) {
        // listFiles() returns null when the directory cannot be listed
        return 0L;
    }
    long total = 0L;
    for (int i = 0; i < entries.length; i++) {
        total += getDirSize(entries[i]);
    }
    return total;
}
/**
 * @return the current value of the zxid counter
 */
public long getZxid() {
    final long current = hzxid.get();
    return current;
}
/**
 * @return the session tracker, or {@code null} if none has been created yet
 */
public SessionTracker getSessionTracker() {
    return this.sessionTracker;
}
/**
 * Atomically advances the zxid counter by one.
 *
 * @return the counter value after the increment
 */
long getNextZxid() {
    final long next = hzxid.incrementAndGet();
    return next;
}
/**
 * Overwrites the zxid counter.
 *
 * @param zxid the new counter value
 */
public void setZxid(long zxid) {
    this.hzxid.set(zxid);
}
/**
 * Submits a {@code closeSession} request for the given session.
 *
 * @param sessionId id of the session to close
 */
private void close(long sessionId) {
    submitRequest(new Request(null, sessionId, 0, OpCode.closeSession, null, null));
}
/**
 * Logs and immediately submits the close of a session.
 *
 * @param sessionId id of the session to close
 */
public void closeSession(long sessionId) {
    final String hexId = Long.toHexString(sessionId);
    LOG.info("Closing session 0x{}", hexId);
    // No waiting here: the close request goes out as soon as the close is detected.
    close(sessionId);
}
/**
 * Removes a session from the database and, if a session tracker exists,
 * from the tracker as well.
 *
 * @param sessionId id of the session to remove
 * @param zxid zxid passed along to the database
 */
protected void killSession(long sessionId, long zxid) {
    zkDb.killSession(sessionId, zxid);

    if (LOG.isTraceEnabled()) {
        final String traceMessage = "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId);
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, traceMessage);
    }

    if (sessionTracker == null) {
        return;
    }
    sessionTracker.removeSession(sessionId);
}
/**
 * Expires a session whose timeout has been exceeded: logs the event and
 * submits a close request for it.
 *
 * @param session the session to expire
 */
public void expire(Session session) {
    final long expiredId = session.getSessionId();
    final String hexId = Long.toHexString(expiredId);
    LOG.info("Expiring session 0x{}, timeout of {}ms exceeded", hexId, session.getTimeout());
    close(expiredId);
}
/**
 * Forcibly expires a session by id: logs the event and submits a close
 * request for it.
 *
 * @param sessionId id of the session to expire
 */
public void expire(long sessionId) {
    final String hexId = Long.toHexString(sessionId);
    LOG.info("forcibly expiring session 0x{}", hexId);
    close(sessionId);
}
/**
* Thrown by {@code touch(ServerCnxn)} when the session tracker does not
* accept the touch for the connection's session id.
*/
public static class MissingSessionException extends IOException {
private static final long serialVersionUID = 7467414635467261007L;
/**
* @param msg detail message describing the missing session
*/
public MissingSessionException(String msg) {
super(msg);
}
}
/**
 * Touches the session that belongs to the given connection. A null
 * connection is ignored.
 *
 * @param cnxn the connection whose session is touched; may be {@code null}
 * @throws MissingSessionException if the session tracker rejects the touch
 */
void touch(ServerCnxn cnxn) throws MissingSessionException {
    if (cnxn == null) {
        return;
    }
    final long sessionId = cnxn.getSessionId();
    final int timeout = cnxn.getSessionTimeout();
    final boolean touched = sessionTracker.touchSession(sessionId, timeout);
    if (touched) {
        return;
    }
    throw new MissingSessionException(
        "No session with sessionid 0x" + Long.toHexString(sessionId) + " exists, probably expired and removed");
}
/**
 * Registers the server bean and, below it, the data tree bean with JMX.
 * Failures are logged and leave the affected bean field set to
 * {@code null}; they never propagate to the caller.
 */
protected void registerJMX() {
    try {
        this.jmxServerBean = new ZooKeeperServerBean(this);
        MBeanRegistry.getInstance().register(this.jmxServerBean, null);

        // The data tree bean is a child of the server bean; a failure here
        // does not undo the server bean registration.
        try {
            this.jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
            MBeanRegistry.getInstance().register(this.jmxDataTreeBean, this.jmxServerBean);
        } catch (Exception dataTreeFailure) {
            LOG.warn("Failed to register with JMX", dataTreeFailure);
            this.jmxDataTreeBean = null;
        }
    } catch (Exception serverFailure) {
        LOG.warn("Failed to register with JMX", serverFailure);
        this.jmxServerBean = null;
    }
}
/**
 * Makes sure a database exists and has been loaded: creates one from the
 * transaction log factory when none is set, and loads it when it is not
 * yet initialized.
 *
 * @throws IOException propagated from {@link #loadData()}
 * @throws InterruptedException propagated from {@link #loadData()}
 */
public void startdata() throws IOException, InterruptedException {
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    if (zkDb.isInitialized()) {
        return;
    }
    loadData();
}
/**
 * Runs the startup sequence and leaves the server in the RUNNING state.
 */
public synchronized void startup() {
    final State target = State.RUNNING;
    startupWithServerState(target);
}
/**
 * Runs the startup sequence but leaves the server in the INITIAL state;
 * {@link #startServing()} switches it to RUNNING later.
 */
public synchronized void startupWithoutServing() {
    final State target = State.INITIAL;
    startupWithServerState(target);
}
/**
 * Switches the server to the RUNNING state and wakes every thread waiting
 * on this server's monitor.
 */
public synchronized void startServing() {
    setState(State.RUNNING);
    this.notifyAll();
}
/**
* Shared startup sequence behind startup() and startupWithoutServing().
* Both visible callers are synchronized, which the notifyAll() at the end
* relies on.
*
* @param state the state to put the server in once the components are started
*/
private void startupWithServerState(State state) {
// a tracker may already exist (e.g. re-created by restoreFromSnapshot); only create one if missing
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
setupRequestProcessors();
startRequestThrottler();
registerJMX();
startJvmPauseMonitor();
registerMetrics();
setState(state);
requestPathMetricsCollector.start();
localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
// wake up threads waiting on this server's monitor
notifyAll();
}
/**
 * Starts the JVM pause monitor if one was supplied; otherwise does nothing.
 */
protected void startJvmPauseMonitor() {
    if (this.jvmPauseMonitor == null) {
        return;
    }
    this.jvmPauseMonitor.serviceStart();
}
/**
 * Creates a request throttler through {@link #createRequestThrottler()},
 * stores it and starts it.
 */
protected void startRequestThrottler() {
    final RequestThrottler throttler = createRequestThrottler();
    requestThrottler = throttler;
    throttler.start();
}
/**
 * Factory hook for the request throttler.
 *
 * @return a new {@code RequestThrottler} bound to this server
 */
protected RequestThrottler createRequestThrottler() {
    final RequestThrottler throttler = new RequestThrottler(this);
    return throttler;
}
/**
 * Builds and starts the processor chain
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
 * and stores its head in {@code firstProcessor}.
 */
protected void setupRequestProcessors() {
    final RequestProcessor finalStage = new FinalRequestProcessor(this);

    final RequestProcessor syncStage = new SyncRequestProcessor(this, finalStage);
    ((SyncRequestProcessor) syncStage).start();

    // The head of the chain is published before it is started.
    final PrepRequestProcessor prepStage = new PrepRequestProcessor(this, syncStage);
    firstProcessor = prepStage;
    prepStage.start();
}
/**
 * @return the listener created together with this server
 */
public ZooKeeperServerListener getZooKeeperServerListener() {
    return this.listener;
}
/**
 * Sets the server ID that {@link #createSessionTracker()} passes to the
 * session tracker. To take effect it has to be called before
 * {@link #startup()}.
 *
 * @param newId ID to use
 */
public void setCreateSessionTrackerServerId(int newId) {
    this.createSessionTrackerServerId = newId;
}
/**
 * Creates a new {@code SessionTrackerImpl} from the database's session
 * timeouts, the tick time, the configured tracker server id and this
 * server's listener, and stores it in {@code sessionTracker}.
 */
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(
        this,
        zkDb.getSessionWithTimeOuts(),
        tickTime,
        createSessionTrackerServerId,
        getZooKeeperServerListener());
}
/**
 * Starts the session tracker, which must be a {@code SessionTrackerImpl}.
 */
protected void startSessionTracker() {
    final SessionTrackerImpl tracker = (SessionTrackerImpl) sessionTracker;
    tracker.start();
}
/**
 * Sets the state of the ZooKeeper server and then reports the new state to
 * the registered shutdown handler, if there is one.
 * <p>
 * The server state transitions are:
 * <ul>
 * <li>During startup the server is in the INITIAL state.</li>
 * <li>After successfully starting, the server sets the state to RUNNING.</li>
 * <li>The server transitions to the ERROR state if it hits an internal
 * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource
 * error events, e.g., SyncRequestProcessor not being able to write a txn to
 * disk.</li>
 * <li>During shutdown the server sets the state to SHUTDOWN, which
 * corresponds to the server not running.</li>
 * </ul>
 *
 * @param state new server state.
 */
protected void setState(State state) {
    this.state = state;
    if (zkShutdownHandler == null) {
        LOG.debug(
            "ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes");
        return;
    }
    // Notify server state changes to the registered shutdown handler.
    zkShutdownHandler.handle(state);
}
/**
 * Tells whether a shutdown still has work to do, i.e. whether the server
 * has not been shut down already.
 *
 * @return true if the server is in the RUNNING or ERROR state, false
 *         otherwise.
 */
protected boolean canShutdown() {
    if (state == State.RUNNING) {
        return true;
    }
    return state == State.ERROR;
}
/**
 * @return true only while the server state is RUNNING.
 */
public boolean isRunning() {
    return State.RUNNING == state;
}
/**
 * Shuts the server down without the "fully shut down" option, see
 * {@link #shutdown(boolean)}.
 */
public void shutdown() {
    final boolean fullyShutDown = false;
    shutdown(fullyShutDown);
}
/**
* Shut down the server instance. Does nothing except an optional database
* clear when the server is neither RUNNING nor in ERROR state.
* @param fullyShutDown true if another server using the same database will not replace this one in the same process
*/
public synchronized void shutdown(boolean fullyShutDown) {
if (!canShutdown()) {
// not running: nothing to stop, but honor the request to drop the database
if (fullyShutDown && zkDb != null) {
zkDb.clear();
}
LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
return;
}
LOG.info("shutting down");
// new RuntimeException("Calling shutdown").printStackTrace();
setState(State.SHUTDOWN);
// unregister all metrics that are keeping a strong reference to this object
// subclasses will do their specific clean up
unregisterMetrics();
if (requestThrottler != null) {
requestThrottler.shutdown();
}
// Since sessionTracker and syncThreads poll we just have to
// set running to false and they will detect it during the poll
// interval.
if (sessionTracker != null) {
sessionTracker.shutdown();
}
// shuts down the head of the processor chain
if (firstProcessor != null) {
firstProcessor.shutdown();
}
if (jvmPauseMonitor != null) {
jvmPauseMonitor.serviceStop();
}
if (zkDb != null) {
if (fullyShutDown) {
zkDb.clear();
} else {
// else there is no need to clear the database
// * When a new quorum is established we can still apply the diff
// on top of the same zkDb data
// * If we fetch a new snapshot from leader, the zkDb will be
// cleared anyway before loading the snapshot
try {
//This will fast forward the database to the latest recorded transactions
zkDb.fastForwardDataBase();
} catch (IOException e) {
// fast-forward failed: fall back to clearing the database
LOG.error("Error updating DB", e);
zkDb.clear();
}
}
}
requestPathMetricsCollector.shutdown();
unregisterJMX();
}
/**
 * Unregisters the data tree bean and then the server bean from JMX.
 * Failures are logged only; both bean fields are reset to {@code null}
 * in every case.
 */
protected void unregisterJMX() {
    if (jmxDataTreeBean != null) {
        try {
            MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
        } catch (Exception dataTreeFailure) {
            LOG.warn("Failed to unregister with JMX", dataTreeFailure);
        }
    }
    if (jmxServerBean != null) {
        try {
            MBeanRegistry.getInstance().unregister(jmxServerBean);
        } catch (Exception serverFailure) {
            LOG.warn("Failed to unregister with JMX", serverFailure);
        }
    }
    jmxServerBean = null;
    jmxDataTreeBean = null;
}
/**
 * Adds one to the counter of requests in process.
 */
public void incInProcess() {
    requestsInProcess.addAndGet(1);
}
/**
 * Subtracts one from the counter of requests in process and, when a request
 * throttler is present, calls its {@code throttleWake()}.
 */
public void decInProcess() {
    requestsInProcess.addAndGet(-1);
    if (requestThrottler == null) {
        return;
    }
    requestThrottler.throttleWake();
}
/**
 * @return the current value of the in-process request counter.
 */
public int getInProcess() {
    return requestsInProcess.intValue();
}
/**
 * @return the in-flight count reported by the request throttler, or 0 when
 *         there is no throttler.
 */
public int getInflight() {
    final int inflight = requestThrottleInflight();
    return inflight;
}
/**
 * @return the request throttler's in-flight count, or 0 when no throttler is set.
 */
private int requestThrottleInflight() {
    if (requestThrottler == null) {
        return 0;
    }
    return requestThrottler.getInflight();
}
/**
 * Immutable holder for a pair of digest values: one for a node and one for the tree.
 */
static class PrecalculatedDigest {

    final long nodeDigest;
    final long treeDigest;

    PrecalculatedDigest(long nodeDigest, long treeDigest) {
        this.treeDigest = treeDigest;
        this.nodeDigest = nodeDigest;
    }
}
/**
 * Mutable record describing a node change; it is used to facilitate
 * information sharing between PrepRP and FinalRP.
 */
static class ChangeRecord {

    long zxid;
    String path;
    StatPersisted stat; /* Make sure to create a new object when changing */
    int childCount;
    List<ACL> acl; /* Make sure to create a new object when changing */
    PrecalculatedDigest precalculatedDigest;
    byte[] data;

    ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) {
        this.zxid = zxid;
        this.path = path;
        this.stat = stat;
        this.childCount = childCount;
        this.acl = acl;
    }

    /**
     * Creates a copy of this record carrying the given zxid. The stat and the
     * ACL list are copied into fresh objects (a null ACL list becomes an empty
     * list); {@code precalculatedDigest} and {@code data} are shared by reference.
     *
     * @param zxid the zxid of the new record
     * @return the new record
     */
    ChangeRecord duplicate(long zxid) {
        final StatPersisted statCopy = new StatPersisted();
        if (this.stat != null) {
            DataTree.copyStatPersisted(this.stat, statCopy);
        }
        final List<ACL> aclCopy;
        if (acl == null) {
            aclCopy = new ArrayList<>();
        } else {
            aclCopy = new ArrayList<>(acl);
        }
        final ChangeRecord copy = new ChangeRecord(zxid, path, statCopy, childCount, aclCopy);
        copy.precalculatedDigest = precalculatedDigest;
        copy.data = data;
        return copy;
    }
}
/**
 * Derives the 16-byte password for a session id from a {@link Random}
 * seeded with {@code id ^ superSecret}.
 *
 * @param id the session id
 * @return a newly allocated 16-byte password
 */
byte[] generatePasswd(long id) {
    final byte[] passwd = new byte[16];
    new Random(id ^ superSecret).nextBytes(passwd);
    return passwd;
}
/**
 * Checks a client-supplied password against the one generated for the session.
 *
 * @param sessionId the session id; 0 never matches
 * @param passwd the password presented by the client
 * @return true if the session id is non-zero and the password equals
 *         {@code generatePasswd(sessionId)}
 */
protected boolean checkPasswd(long sessionId, byte[] passwd) {
    if (sessionId == 0) {
        return false;
    }
    final byte[] expected = generatePasswd(sessionId);
    return Arrays.equals(passwd, expected);
}
/**
 * Creates a session through the session tracker, fills the supplied password
 * array with bytes derived from the new session id, binds the id to the
 * connection and submits a {@code createSession} request.
 *
 * @param cnxn the connection that will own the session
 * @param passwd the buffer to fill; may be null
 * @param timeout the session timeout
 * @return the new session id
 */
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
    // A null password is possible since it's just deserialized from a packet on the wire.
    final byte[] passwdBuffer = (passwd == null) ? new byte[0] : passwd;
    final long sessionId = sessionTracker.createSession(timeout);
    new Random(sessionId ^ superSecret).nextBytes(passwdBuffer);
    final CreateSessionTxn createTxn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    final Request createRequest =
        new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(createTxn), null);
    submitRequest(createRequest);
    return sessionId;
}
/**
 * Records {@code owner} as the owner of the given session by delegating to
 * the session tracker.
 *
 * @param id the session id
 * @param owner the owner of the session
 * @throws SessionExpiredException propagated from the session tracker
 */
public void setOwner(long id, Object owner) throws SessionExpiredException {
    sessionTracker.setOwner(id, owner);
}
/**
 * Touches the session in the session tracker and finishes session
 * initialization with the outcome of that touch.
 *
 * @param cnxn the connection being re-attached
 * @param sessionId the session id to revalidate
 * @param sessionTimeout the timeout passed to the session tracker
 * @throws IOException declared for overriding implementations
 */
protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException {
    final boolean stillValid = sessionTracker.touchSession(sessionId, sessionTimeout);
    if (LOG.isTraceEnabled()) {
        final String traceMessage = "Session 0x" + Long.toHexString(sessionId) + " is valid: " + stillValid;
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, traceMessage);
    }
    finishSessionInit(cnxn, stillValid);
}
/**
 * Reopens an existing session: a wrong password finishes session
 * initialization as invalid, a correct one revalidates the session.
 *
 * @param cnxn the connection requesting the session
 * @param sessionId the session id to reopen
 * @param passwd the password presented by the client
 * @param sessionTimeout the negotiated session timeout
 * @throws IOException propagated from {@code revalidateSession}
 */
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
    if (!checkPasswd(sessionId, passwd)) {
        LOG.warn(
            "Incorrect password from {} for session 0x{}",
            cnxn.getRemoteSocketAddress(),
            Long.toHexString(sessionId));
        finishSessionInit(cnxn, false);
        return;
    }
    revalidateSession(cnxn, sessionId, sessionTimeout);
}
/**
* Completes session establishment for a connection by sending the connect response.
* <p>
* For a valid session the connection is first registered with whichever connection
* factory (plain, then secure) contains it; a registration failure is only logged.
* A {@code ConnectResponse} is then serialized behind a 4-byte length prefix and sent.
* A valid session gets receiving re-enabled; an invalid one is answered with zeroed
* timeout, session id and password followed by the close-connection buffer. Any
* exception while responding is logged and the connection is closed.
*
* @param cnxn the connection being initialized
* @param valid whether the session is valid
*/
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
// register with JMX
try {
if (valid) {
if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
serverCnxnFactory.registerConnection(cnxn);
} else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
secureServerCnxnFactory.registerConnection(cnxn);
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
try {
ConnectResponse rsp = new ConnectResponse(
0,
valid ? cnxn.getSessionTimeout() : 0,
valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16],
this instanceof ReadOnlyZooKeeperServer);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
// Placeholder for the length prefix; it is overwritten below once the size is known.
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
// Write the payload length (total minus the 4-byte prefix) at position 0, then rewind for sending.
bb.putInt(bb.remaining() - 4).rewind();
cnxn.sendBuffer(bb);
if (valid) {
LOG.debug(
"Established session 0x{} with negotiated timeout {} for client {}",
Long.toHexString(cnxn.getSessionId()),
cnxn.getSessionTimeout(),
cnxn.getRemoteSocketAddress());
cnxn.enableRecv();
} else {
LOG.info(
"Invalid session 0x{} for client {}, probably expired",
Long.toHexString(cnxn.getSessionId()),
cnxn.getRemoteSocketAddress());
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
}
} catch (Exception e) {
LOG.warn("Exception while establishing session, closing", e);
cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
}
}
/**
 * Closes the session bound to the given connection.
 *
 * @param cnxn the connection whose session id is used
 * @param requestHeader not used by this implementation
 */
public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
    final long sessionId = cnxn.getSessionId();
    closeSession(sessionId);
}
/**
 * @return the server id; this implementation always returns 0.
 */
public long getServerId() {
    return 0L;
}
/**
 * Hook for servers that support local sessions: such a server marks the
 * request as belonging to a local session when that is the case.
 *
 * @param si the request to inspect
 */
protected void setLocalSessionFlag(Request si) {
    // Intentionally empty in this implementation.
}
/**
 * Enqueues a request, first waiting on the restore latch when one is set.
 * An interruption while waiting is logged and the request is enqueued anyway.
 *
 * @param si the request to submit
 */
public void submitRequest(Request si) {
    if (restoreLatch != null) {
        try {
            LOG.info("Blocking request submission while restore is in progress");
            restoreLatch.await();
        } catch (final InterruptedException interrupted) {
            LOG.warn("Unexpected interruption", interrupted);
        }
    }

    enqueueRequest(si);
}
/**
* Hands a request to the request throttler.
* <p>
* If no throttler is set yet, the caller waits on this object's monitor (in
* one-second slices) while the state is {@code State.INITIAL}; an interruption
* ends the wait early and is logged.
*
* @param si the request to enqueue
* @throws RuntimeException with message "Not started" if the throttler is still
*         null after waiting
*/
public void enqueueRequest(Request si) {
if (requestThrottler == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
// Re-check after waiting: the state has left INITIAL or the wait was interrupted.
if (requestThrottler == null) {
throw new RuntimeException("Not started");
}
}
}
requestThrottler.submitRequest(si);
}
/**
* Passes a request directly to the first request processor.
* <p>
* If the processor chain is not set yet, the caller waits on this object's monitor
* (in one-second slices) while the state is {@code State.INITIAL}. A request of a
* valid type is flagged for local sessions and processed, and the in-process counter
* is incremented when the request has a connection. A request of an unknown type is
* accounted as finished and handed to an {@code UnimplementedRequestProcessor}.
* {@code MissingSessionException} and {@code RequestProcessorException} are logged
* and the request is accounted as finished.
*
* @param si the request to process
* @throws RuntimeException with message "Not started" if, after waiting, there is no
*         first processor or the state is not {@code State.RUNNING}
*/
public void submitRequestNow(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
setLocalSessionFlag(si);
firstProcessor.processRequest(si);
// Only requests that came in over a connection are counted as in process.
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type {}", si.type);
// Update request accounting/throttling limits
requestFinished(si);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
LOG.debug("Dropping request.", e);
// Update request accounting/throttling limits
requestFinished(si);
} catch (RequestProcessorException e) {
LOG.error("Unable to process request", e);
// Update request accounting/throttling limits
requestFinished(si);
}
}
/**
 * Reads the snap count from the {@code SNAP_COUNT} system property, falling
 * back to {@code DEFAULT_SNAP_COUNT}. Values below 2 are raised to 2 with a warning.
 *
 * @return the effective snap count, never less than 2
 */
public static int getSnapCount() {
    final int configured = Integer.getInteger(SNAP_COUNT, DEFAULT_SNAP_COUNT);
    // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor
    if (configured >= 2) {
        return configured;
    }
    LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2");
    return 2;
}
/**
 * @return the value of the {@code GLOBAL_OUTSTANDING_LIMIT} system property,
 *         or {@code DEFAULT_GLOBAL_OUTSTANDING_LIMIT} when it is not set.
 */
public int getGlobalOutstandingLimit() {
    final int limit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, DEFAULT_GLOBAL_OUTSTANDING_LIMIT);
    return limit;
}
/**
 * Reads {@code zookeeper.snapSizeLimitInKb} and converts it to bytes. A
 * non-positive value is logged as disabling the feature and is still
 * returned (multiplied by 1024).
 *
 * @return the snapshot size limit in bytes
 */
public static long getSnapSizeInBytes() {
    final long sizeInKb = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default
    if (sizeInKb <= 0) {
        LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", sizeInKb);
    }
    final long sizeInBytes = sizeInKb * 1024; // Convert to bytes
    return sizeInBytes;
}
/**
 * @param factory the connection factory to use for the plain client port
 */
public void setServerCnxnFactory(ServerCnxnFactory factory) {
    this.serverCnxnFactory = factory;
}
/**
 * @return the connection factory set via {@code setServerCnxnFactory}, possibly null
 */
public ServerCnxnFactory getServerCnxnFactory() {
    return this.serverCnxnFactory;
}
/**
 * @return the connection factory set via {@code setSecureServerCnxnFactory}, possibly null
 */
public ServerCnxnFactory getSecureServerCnxnFactory() {
    return this.secureServerCnxnFactory;
}
/**
 * @param factory the connection factory to use for the secure client port
 */
public void setSecureServerCnxnFactory(ServerCnxnFactory factory) {
    this.secureServerCnxnFactory = factory;
}
/**
 * Returns the last processed zxid as reported by the database's data tree.
 *
 * @return the last processed zxid
 */
public long getLastProcessedZxid() {
    final long lastProcessed = zkDb.getDataTreeLastProcessedZxid();
    return lastProcessed;
}
/**
 * Returns the number of outstanding requests, i.e. those that have not been
 * processed yet. The value is the in-process request counter.
 *
 * @return the number of outstanding requests
 */
public long getOutstandingRequests() {
    final long outstanding = getInProcess();
    return outstanding;
}
/**
 * Returns the total number of client connections that are alive to this
 * server, summed over the plain and the secure connection factory. A factory
 * that is not set contributes nothing.
 *
 * @return the number of alive connections
 */
public int getNumAliveConnections() {
    int aliveCount = 0;

    if (serverCnxnFactory != null) {
        aliveCount += serverCnxnFactory.getNumAliveConnections();
    }

    if (secureServerCnxnFactory != null) {
        aliveCount += secureServerCnxnFactory.getNumAliveConnections();
    }

    return aliveCount;
}
/**
 * Truncates the log to get in sync with the others when running in a quorum.
 * Delegates to the database.
 *
 * @param zxid the zxid that it needs to get in sync with others
 * @throws IOException propagated from the database
 */
public void truncateLog(long zxid) throws IOException {
    zkDb.truncateLog(zxid);
}
/**
 * @return the configured tick time (logged in milliseconds by the setter)
 */
public int getTickTime() {
    return this.tickTime;
}
/**
 * Logs and stores a new tick time.
 *
 * @param tickTime the tick time in milliseconds
 */
public void setTickTime(int tickTime) {
    final int newTickTime = tickTime;
    LOG.info("tickTime set to {} ms", newTickTime);
    this.tickTime = newTickTime;
}
/**
 * @return the current throttled-op wait time (logged in milliseconds by the setter)
 */
public static int getThrottledOpWaitTime() {
    final int waitTime = throttledOpWaitTime;
    return waitTime;
}
/**
 * Logs and stores a new throttled-op wait time.
 *
 * @param time the wait time in milliseconds
 */
public static void setThrottledOpWaitTime(int time) {
    final int newWaitTime = time;
    LOG.info("throttledOpWaitTime set to {} ms", newWaitTime);
    throttledOpWaitTime = newWaitTime;
}
/**
 * @return the minimum session timeout in milliseconds
 */
public int getMinSessionTimeout() {
    return this.minSessionTimeout;
}
/**
 * Sets the minimum session timeout. The value -1 selects twice the tick time.
 *
 * @param min the minimum session timeout in milliseconds, or -1 for the default
 */
public void setMinSessionTimeout(int min) {
    if (min == -1) {
        this.minSessionTimeout = tickTime * 2;
    } else {
        this.minSessionTimeout = min;
    }
    LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout);
}
/**
 * @return the maximum session timeout in milliseconds
 */
public int getMaxSessionTimeout() {
    return this.maxSessionTimeout;
}
/**
 * Sets the maximum session timeout. The value -1 selects twenty times the tick time.
 *
 * @param max the maximum session timeout in milliseconds, or -1 for the default
 */
public void setMaxSessionTimeout(int max) {
    if (max == -1) {
        this.maxSessionTimeout = tickTime * 20;
    } else {
        this.maxSessionTimeout = max;
    }
    LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout);
}
/**
 * @return the configured client port listen backlog
 */
public int getClientPortListenBacklog() {
    return this.listenBacklog;
}
/**
 * Stores and logs the client port listen backlog.
 *
 * @param backlog the listen backlog
 */
public void setClientPortListenBacklog(int backlog) {
    final int newBacklog = backlog;
    this.listenBacklog = newBacklog;
    LOG.info("clientPortListenBacklog set to {}", newBacklog);
}
/**
 * @return the local port of the plain connection factory, or -1 when none is set
 */
public int getClientPort() {
    if (serverCnxnFactory == null) {
        return -1;
    }
    return serverCnxnFactory.getLocalPort();
}
/**
 * @return the local port of the secure connection factory, or -1 when none is set
 */
public int getSecureClientPort() {
    if (secureServerCnxnFactory == null) {
        return -1;
    }
    return secureServerCnxnFactory.getLocalPort();
}
/**
 * Maximum number of connections allowed from particular host (ip).
 * The plain connection factory is asked first, then the secure one.
 *
 * @return the limit of the first factory that is set, or -1 when neither is set
 */
public int getMaxClientCnxnsPerHost() {
    return serverCnxnFactory != null
        ? serverCnxnFactory.getMaxClientCnxnsPerHost()
        : (secureServerCnxnFactory != null
            ? secureServerCnxnFactory.getMaxClientCnxnsPerHost()
            : -1);
}
/**
 * @param txnLog the transaction log / snapshot factory to use
 */
public void setTxnLogFactory(FileTxnSnapLog txnLog) {
    final FileTxnSnapLog newFactory = txnLog;
    this.txnLogFactory = newFactory;
}
/**
 * @return the transaction log / snapshot factory currently set
 */
public FileTxnSnapLog getTxnLogFactory() {
    final FileTxnSnapLog factory = this.txnLogFactory;
    return factory;
}
/**
 * Returns the elapsed sync time of the transaction log in milliseconds, as
 * reported by the transaction log factory.
 *
 * @return the elapsed sync time in milliseconds
 */
public long getTxnLogElapsedSyncTime() {
    final long elapsedSyncTime = txnLogFactory.getTxnLogElapsedSyncTime();
    return elapsedSyncTime;
}
/**
 * @return the server state label; this implementation always returns "standalone"
 */
public String getState() {
    final String stateLabel = "standalone";
    return stateLabel;
}
/**
 * Asks the database to dump its ephemerals to the given writer.
 *
 * @param pwriter the writer receiving the dump
 */
public void dumpEphemerals(PrintWriter pwriter) {
    final PrintWriter sink = pwriter;
    zkDb.dumpEphemerals(sink);
}
/**
 * @return the ephemerals map as returned by the database
 */
public Map<Long, Set<String>> getEphemerals() {
    final Map<Long, Set<String>> ephemerals = zkDb.getEphemerals();
    return ephemerals;
}
/**
 * @return the drop chance reported by the connection throttle
 */
public double getConnectionDropChance() {
    final double dropChance = connThrottle.getDropChance();
    return dropChance;
}
/**
* Handles a client's connect request: applies connection throttling, refuses clients
* this server cannot serve, clamps the requested session timeout to the configured
* minimum and maximum, and then either creates a new session (session id 0) or tries
* to reopen the existing one.
*
* @param cnxn the connection the request arrived on
* @param request the connect request
* @throws IOException propagated from session validation or reopening; refusals are
*         signalled with {@code CloseRequestException}
* @throws ClientCnxnLimitException if the connection throttle rejects the request
*/
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
LOG.debug(
"Session establishment request from client {} client's lastZxid is 0x{}",
cnxn.getRemoteSocketAddress(),
Long.toHexString(request.getLastZxidSeen()));
long sessionId = request.getSessionId();
// Token cost: 1 by default; with connection weights enabled it depends on whether this
// is a new local session, a new global session, or a renewal of an existing session.
int tokensNeeded = 1;
if (connThrottle.isConnectionWeightEnabled()) {
if (sessionId == 0) {
if (localSessionEnabled) {
tokensNeeded = connThrottle.getRequiredTokensForLocal();
} else {
tokensNeeded = connThrottle.getRequiredTokensForGlobal();
}
} else {
tokensNeeded = connThrottle.getRequiredTokensForRenew();
}
}
if (!connThrottle.checkLimit(tokensNeeded)) {
throw new ClientCnxnLimitException();
}
ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
if (!cnxn.protocolManager.isReadonlyAvailable()) {
LOG.warn(
"Connection request from old client {}; will be dropped if server is in r-o mode",
cnxn.getRemoteSocketAddress());
}
// A read-only server only accepts clients that asked for read-only mode.
if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
// Refuse clients that have already seen a zxid newer than what this server has processed.
if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session(0x"
+ Long.toHexString(sessionId)
+ ") request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(request.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
int sessionTimeout = request.getTimeOut();
byte[] passwd = request.getPasswd();
// Clamp the requested timeout into [minSessionTimeout, maxSessionTimeout].
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the
// session is setup
cnxn.disableRecv();
if (sessionId == 0) {
long id = createSession(cnxn, passwd, sessionTimeout);
LOG.debug(
"Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(id),
Long.toHexString(request.getLastZxidSeen()),
request.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
validateSession(cnxn, sessionId);
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(sessionId),
Long.toHexString(request.getLastZxidSeen()),
request.getTimeOut(),
cnxn.getRemoteSocketAddress());
// Ask both factories to close the session with reason CLIENT_RECONNECT before
// binding the session id to the new connection.
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
}
}
/**
 * Hook to validate whether a particular session can be reestablished.
 * This implementation accepts every session.
 *
 * @param cnxn the connection asking to reestablish the session
 * @param sessionId the id of the session to reestablish
 * @throws IOException declared for overriding implementations
 */
protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException {
    // Intentionally empty: nothing to validate here.
}
/**
 * Decides whether a connection should be throttled.
 *
 * @param outStandingCount the caller's number of outstanding requests
 * @return true if the global outstanding limit is below the in-flight or the
 *         in-process count and {@code outStandingCount} is positive
 */
public boolean shouldThrottle(long outStandingCount) {
    final int limit = getGlobalOutstandingLimit();
    final boolean limitExceeded = limit < getInflight() || limit < getInProcess();
    return limitExceeded && outStandingCount > 0;
}
/**
 * @return the current flush delay (logged in milliseconds by the setter)
 */
long getFlushDelay() {
    final long delay = flushDelay;
    return delay;
}
/**
 * Logs and stores a new flush delay.
 *
 * @param delay the flush delay in milliseconds
 */
static void setFlushDelay(long delay) {
    final long newDelay = delay;
    LOG.info("{} = {} ms", FLUSH_DELAY, newDelay);
    flushDelay = newDelay;
}
/**
 * @return the current maximum write queue poll time (logged in milliseconds by the setter)
 */
long getMaxWriteQueuePollTime() {
    final long pollTime = maxWriteQueuePollTime;
    return pollTime;
}
/**
 * Logs and stores a new maximum write queue poll time.
 *
 * @param maxTime the poll time in milliseconds
 */
static void setMaxWriteQueuePollTime(long maxTime) {
    final long newPollTime = maxTime;
    LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, newPollTime);
    maxWriteQueuePollTime = newPollTime;
}
/**
 * @return the current maximum batch size
 */
int getMaxBatchSize() {
    final int batchSize = maxBatchSize;
    return batchSize;
}
/**
 * Logs and stores a new maximum batch size.
 *
 * @param size the maximum batch size
 */
static void setMaxBatchSize(int size) {
    final int newSize = size;
    LOG.info("{}={}", MAX_BATCH_SIZE, newSize);
    maxBatchSize = newSize;
}
/**
 * Initializes the large-request settings from system properties:
 * {@code zookeeper.largeRequestMaxBytes} (default: the current max bytes) and
 * {@code zookeeper.largeRequestThreshold} (default: -1).
 */
private void initLargeRequestThrottlingSettings() {
    final int maxBytes = Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes);
    setLargeRequestMaxBytes(maxBytes);

    final int threshold = Integer.getInteger("zookeeper.largeRequestThreshold", -1);
    setLargeRequestThreshold(threshold);
}
/**
 * @return the maximum number of bytes allowed for all large requests together
 */
public int getLargeRequestMaxBytes() {
    final int maxBytes = largeRequestMaxBytes;
    return maxBytes;
}
/**
 * Sets the max bytes for all large requests. A non-positive value is
 * rejected with a warning and the current setting is kept.
 *
 * @param bytes the new limit; must be positive to take effect
 */
public void setLargeRequestMaxBytes(int bytes) {
    if (bytes > 0) {
        largeRequestMaxBytes = bytes;
        LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes);
        return;
    }
    LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes);
    LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes);
}
/**
 * @return the large request threshold; -1 means the large request limit is disabled
 */
public int getLargeRequestThreshold() {
    final int threshold = largeRequestThreshold;
    return threshold;
}
/**
 * Sets the large request threshold. Only -1 or a positive value is
 * accepted; anything else is logged and replaced by -1.
 *
 * @param threshold the new threshold
 */
public void setLargeRequestThreshold(int threshold) {
    final boolean accepted = threshold == -1 || threshold > 0;
    if (accepted) {
        largeRequestThreshold = threshold;
        LOG.info("The large request threshold is set to {}", largeRequestThreshold);
    } else {
        LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold);
        largeRequestThreshold = -1;
    }
}
/**
 * @return the current value of the large-request byte counter
 */
public int getLargeRequestBytes() {
    return currentLargeRequestBytes.intValue();
}
/**
 * @param length the request length
 * @return true if the threshold is not -1 and {@code length} is greater than it
 */
private boolean isLargeRequest(int length) {
    // The large request limit is disabled when threshold is -1
    return largeRequestThreshold != -1 && length > largeRequestThreshold;
}
/**
 * Checks, without reserving anything, whether a request of the given length
 * would fit under the limit for all large requests.
 *
 * @param length the length of the incoming request
 * @return true if the request is not a large request or still fits under the limit
 * @throws IOException if accepting the request would exceed the limit; the
 *         rejection is also counted in {@code LARGE_REQUESTS_REJECTED}
 */
public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException {
    if (!isLargeRequest(length)) {
        return true;
    }
    // Sum in long arithmetic: an int sum could overflow, wrap to a negative
    // value and wrongly pass the limit check.
    final long projectedBytes = (long) currentLargeRequestBytes.get() + length;
    if (projectedBytes <= largeRequestMaxBytes) {
        return true;
    }
    ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
    throw new IOException("Rejecting large request");
}
/**
 * Reserves {@code length} bytes in the large-request counter for a large
 * request. If the reservation pushes the counter over the limit it is
 * rolled back and the request is rejected.
 *
 * @param length the length of the received request
 * @return always true when no exception is thrown
 * @throws IOException if the request is rejected; the rejection is also
 *         counted in {@code LARGE_REQUESTS_REJECTED}
 */
private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException {
    if (isLargeRequest(length)) {
        final int reservedBytes = currentLargeRequestBytes.addAndGet(length);
        if (reservedBytes > largeRequestMaxBytes) {
            // Undo the reservation made above before rejecting.
            currentLargeRequestBytes.addAndGet(-length);
            ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1);
            throw new IOException("Rejecting large request");
        }
    }
    return true;
}
/**
 * Releases the bytes a request holds in the large-request counter. A
 * request whose large request size is -1 holds nothing.
 *
 * @param request the finished request
 */
public void requestFinished(Request request) {
    final int heldBytes = request.getLargeRequestSize();
    if (heldBytes == -1) {
        return;
    }
    currentLargeRequestBytes.addAndGet(-heldBytes);
}
/**
* Dispatches one incoming packet by op code.
* <ul>
* <li>{@code OpCode.auth}: authenticates through the provider registered for the
* packet's scheme and replies OK, or replies AUTHFAILED and sends the
* close-connection buffer.</li>
* <li>{@code OpCode.sasl}: delegates to {@code processSasl}.</li>
* <li>anything else: enforces authentication, applies large-request accounting and
* submits the request.</li>
* </ul>
*
* @param cnxn the connection the packet arrived on
* @param h the request header
* @param request the request payload
* @throws IOException if a large request is rejected, or propagated from the
*         connection or record handling
*/
public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
// Need to increase the outstanding request count first, otherwise
// there might be a race condition that it enabled recv after
// processing request and then disabled when check throttling.
//
// Be aware that we're actually checking the global outstanding
// request before this request.
//
// It's fine if the IOException thrown before we decrease the count
// in cnxn, since it will close the cnxn anyway.
cnxn.incrOutstandingAndCheckThrottle(h);
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
AuthPacket authPacket = request.readRecord(AuthPacket::new);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
// Default to failure; only a provider returning OK changes the outcome.
Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
// handleAuthentication may close the connection, to allow the client to choose
// a different server to connect to.
authReturn = ap.handleAuthentication(
new ServerAuthenticationProvider.ServerObjs(this, cnxn),
authPacket.getAuth());
} catch (RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
LOG.info("Session 0x{}: auth success for scheme {} and address {}",
Long.toHexString(cnxn.getSessionId()), scheme,
cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn(
"No authentication provider for scheme: {} has {}",
scheme,
ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: {}", scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
}
return;
} else if (h.getType() == OpCode.sasl) {
processSasl(request, cnxn, h);
} else {
if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
// Authentication enforcement is failed
// Already sent response to user about failure and closed the session, lets return
return;
} else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
int length = request.limit();
if (isLargeRequest(length)) {
// checkRequestSize will throw IOException if request is rejected
checkRequestSizeWhenMessageReceived(length);
// Remember the reserved size so requestFinished can release it later.
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
submitRequest(si);
}
}
}
/**
 * Tells whether the given id is configured as a SASL super user. A system
 * property counts when its key is exactly {@code SASL_SUPER_USER}, or starts
 * with it followed by a '.', and its value equals the id.
 *
 * @param id the authorization id to look up
 * @return true if a matching property exists; false for a null or empty id
 */
private static boolean isSaslSuperUser(String id) {
    if (id == null || id.isEmpty()) {
        return false;
    }
    final Properties systemProperties = System.getProperties();
    final int prefixLength = SASL_SUPER_USER.length();
    return systemProperties.stringPropertyNames().stream()
        .filter(key -> key.startsWith(SASL_SUPER_USER))
        .filter(key -> key.length() == prefixLength || key.charAt(prefixLength) == '.')
        .anyMatch(key -> id.equals(systemProperties.getProperty(key)));
}
/**
 * @return the boolean value of the {@code ALLOW_SASL_FAILED_CLIENTS} system property
 */
private static boolean shouldAllowSaslFailedClientsConnect() {
    final boolean allowed = Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
    return allowed;
}
/**
 * Handles one step of the SASL negotiation for a connection and sends the reply.
 * <p>
 * The client token is evaluated by the connection's SASL server. Once the
 * negotiation is complete, the authorization id is added to the connection's auth
 * info (plus the "super" id when it is a configured SASL super user). On a
 * {@link SaslException} the connection is either kept (when failed clients are
 * allowed and SASL is not required) or an error reply is sent and the session is
 * closed. In all other cases an OK reply carrying the server's response token is sent.
 *
 * @param request the request payload holding the client's SASL token
 * @param cnxn the connection being authenticated
 * @param requestHeader the request header; its xid is echoed in the reply
 * @throws IOException propagated from reading the record or sending the response
 */
private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
    LOG.debug("Responding to client SASL token.");
    GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new);
    byte[] clientToken = clientTokenRecord.getToken();
    LOG.debug("Size of client SASL token: {}", clientToken.length);
    byte[] responseToken = null;
    try {
        ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer;
        try {
            // note that clientToken might be empty (clientToken.length == 0):
            // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
            // SASL negotiation process.
            responseToken = saslServer.evaluateResponse(clientToken);
            if (saslServer.isComplete()) {
                String authorizationID = saslServer.getAuthorizationID();
                LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}",
                    Long.toHexString(cnxn.getSessionId()), authorizationID);
                cnxn.addAuthInfo(new Id("sasl", authorizationID));
                if (isSaslSuperUser(authorizationID)) {
                    cnxn.addAuthInfo(new Id("super", ""));
                    LOG.info(
                        "Session 0x{}: Authenticated Id '{}' as super user",
                        Long.toHexString(cnxn.getSessionId()),
                        authorizationID);
                }
            }
        } catch (SaslException e) {
            LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e);
            if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) {
                // Falls through to the OK reply below; responseToken is still null here.
                LOG.warn("Maintaining client connection despite SASL authentication failure.");
            } else {
                int error;
                if (authHelper.isSaslAuthRequired()) {
                    LOG.warn(
                        "Closing client connection due to server requires client SASL authentication, "
                            + "but client SASL authentication has failed, or client is not configured with SASL "
                            + "authentication.");
                    error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
                } else {
                    LOG.warn("Closing client connection due to SASL authentication failure.");
                    error = Code.AUTHFAILED.intValue();
                }
                ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
                cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
                cnxn.sendCloseSession();
                cnxn.disableRecv();
                return;
            }
        }
    } catch (NullPointerException e) {
        // Treated as "the connection has no SASL server"; execution continues with the OK reply below.
        LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
    }
    if (responseToken != null) {
        LOG.debug("Size of server SASL response: {}", responseToken.length);
    }
    ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
    Record record = new SetSASLResponse(responseToken);
    cnxn.sendResponse(replyHeader, record, "response");
}
/**
 * Entry point for quorum/Learner.java: handles session events for the
 * transaction and then applies it to the database without a request or a
 * digest.
 *
 * @param hdr the transaction header
 * @param txn the transaction record
 * @return the result of applying the transaction
 */
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    processTxnForSessionEvents(null, hdr, txn);
    final ProcessTxnResult result = processTxnInDB(hdr, txn, null);
    return result;
}
/**
* Applies a request's transaction. A request without a header that is not a quorum
* request returns an empty result without taking the lock. Otherwise, under the
* {@code outstandingChanges} lock, the transaction is applied to the database, the
* outstanding change records up to the request's zxid are dropped, and a quorum
* request is added to the committed proposals.
*
* @param request the request to process
* @return the result of applying the transaction
*/
// entry point for FinalRequestProcessor.java
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn());
final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum();
// return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
// Drop every queued change record whose zxid is not newer than this transaction.
while (!outstandingChanges.isEmpty()
&& outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
// Only remove the per-path entry if it still refers to this exact record.
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}
/**
 * Updates the session tracker for session-related transactions: a
 * {@code createSession} with a header and a {@code CreateSessionTxn} commits the
 * session, and a {@code closeSession} removes it.
 *
 * @param request the originating request, or null when only a header is available
 * @param hdr the transaction header; must be non-null when {@code request} is null
 * @param txn the transaction record; may be null
 */
private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) {
    int opCode = (request == null) ? hdr.getType() : request.type;
    long sessionId = (request == null) ? hdr.getClientId() : request.sessionId;
    if (opCode == OpCode.createSession) {
        if (hdr != null && txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn) txn;
            sessionTracker.commitSession(sessionId, cst.getTimeOut());
        } else if (request == null || !request.isLocalSession()) {
            // This branch is reached when txn is not a CreateSessionTxn, which includes
            // txn == null; the diagnostic must not itself throw a NullPointerException.
            LOG.warn("*****>>>>> Got {} {}", txn == null ? null : txn.getClass(), String.valueOf(txn));
        }
    } else if (opCode == OpCode.closeSession) {
        sessionTracker.removeSession(sessionId);
    }
}
/**
 * Applies a transaction to the database.
 *
 * @param hdr the transaction header, or null when there is nothing to apply
 * @param txn the transaction record
 * @param digest the transaction digest, may be null
 * @return an empty result when {@code hdr} is null, otherwise the database's result
 */
private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) {
    if (hdr != null) {
        return getZKDatabase().processTxn(hdr, txn, digest);
    }
    return new ProcessTxnResult();
}
/**
 * @return the session expiry map as returned by the session tracker
 */
public Map<Long, Set<Long>> getSessionExpiryMap() {
    final Map<Long, Set<Long>> expiryMap = sessionTracker.getSessionExpiryMap();
    return expiryMap;
}
/**
 * Registers the ZooKeeperServerShutdownHandler that receives the server's
 * error or shutdown state change notifications.
 * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for
 * every server state change made through {@link #setState(State)}.
 *
 * @param zkShutdownHandler shutdown handler
 */
void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) {
    final ZooKeeperServerShutdownHandler handler = zkShutdownHandler;
    this.zkShutdownHandler = handler;
}
/**
 * @return true if response caching is enabled
 */
public boolean isResponseCachingEnabled() {
    final boolean enabled = isResponseCachingEnabled;
    return enabled;
}
/**
 * @param isEnabled true to enable response caching, false to disable it
 */
public void setResponseCachingEnabled(boolean isEnabled) {
    final boolean enabled = isEnabled;
    isResponseCachingEnabled = enabled;
}
/**
 * @return the read response cache, or null when response caching is disabled
 */
public ResponseCache getReadResponseCache() {
    if (!isResponseCachingEnabled) {
        return null;
    }
    return readResponseCache;
}
/**
 * @return the get-children response cache, or null when response caching is disabled
 */
public ResponseCache getGetChildrenResponseCache() {
    if (!isResponseCachingEnabled) {
        return null;
    }
    return getChildrenResponseCache;
}
/**
* Registers this server's gauges and gauge sets with the root metrics context.
* <p>
* Every name registered here is expected to be removed again by
* {@code unregisterMetrics()}; keep the two lists in sync.
*/
protected void registerMetrics() {
MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
final ZKDatabase zkdb = this.getZKDatabase();
final ServerStats stats = this.serverStats();
rootContext.registerGauge("avg_latency", stats::getAvgLatency);
rootContext.registerGauge("max_latency", stats::getMaxLatency);
rootContext.registerGauge("min_latency", stats::getMinLatency);
rootContext.registerGauge("packets_received", stats::getPacketsReceived);
rootContext.registerGauge("packets_sent", stats::getPacketsSent);
rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections);
rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests);
rootContext.registerGauge("uptime", stats::getUptime);
rootContext.registerGauge("znode_count", zkdb::getNodeCount);
// These method references bind to the object returned by getDataTree() at registration time.
rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount);
rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount);
rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize);
rootContext.registerGauge("global_sessions", zkdb::getSessionCount);
rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount);
OSMXBean osMbean = new OSMXBean();
rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount);
rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount);
rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance);
rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize);
rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize);
rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize);
rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum);
rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount);
rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount);
rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount);
// The quota gauge sets read the zkDb field each time they are evaluated.
rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE,
() -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree()));
rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE,
() -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree()));
rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE,
() -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree()));
rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE,
() -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree()));
}
/**
 * Unregisters every gauge and gauge set that {@code registerMetrics()} registers
 * with the root metrics context, so that the metrics provider no longer holds
 * references to this server through them.
 */
protected void unregisterMetrics() {
    MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();

    rootContext.unregisterGauge("avg_latency");
    rootContext.unregisterGauge("max_latency");
    rootContext.unregisterGauge("min_latency");

    rootContext.unregisterGauge("packets_received");
    rootContext.unregisterGauge("packets_sent");
    rootContext.unregisterGauge("num_alive_connections");

    rootContext.unregisterGauge("outstanding_requests");
    rootContext.unregisterGauge("uptime");

    rootContext.unregisterGauge("znode_count");
    rootContext.unregisterGauge("watch_count");
    rootContext.unregisterGauge("ephemerals_count");
    rootContext.unregisterGauge("approximate_data_size");

    rootContext.unregisterGauge("global_sessions");
    rootContext.unregisterGauge("local_sessions");

    rootContext.unregisterGauge("open_file_descriptor_count");
    rootContext.unregisterGauge("max_file_descriptor_count");
    rootContext.unregisterGauge("connection_drop_probability");

    rootContext.unregisterGauge("last_client_response_size");
    rootContext.unregisterGauge("max_client_response_size");
    rootContext.unregisterGauge("min_client_response_size");

    // Registered in registerMetrics() with a reference to this server; it was
    // previously never unregistered.
    rootContext.unregisterGauge("outstanding_tls_handshake");

    rootContext.unregisterGauge("auth_failed_count");
    rootContext.unregisterGauge("non_mtls_remote_conn_count");
    rootContext.unregisterGauge("non_mtls_local_conn_count");

    rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE);
    rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE);
    rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE);
    rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE);
}
/**
 * Hook into the admin server, useful to expose additional data that do not
 * represent metrics. This implementation reports the full version under
 * "version" and the server state under "server_state".
 *
 * @param response a sink which collects the data.
 */
public void dumpMonitorValues(BiConsumer<String, Object> response) {
    final ServerStats serverStats = serverStats();

    response.accept("version", Version.getFullVersion());
    response.accept("server_state", serverStats.getServerState());
}
/**
 * Decides whether an operation on a node is authorized: returns normally when
 * it is, throws when it is not.
 * <p>
 * The check passes immediately when {@code skipACL} is set, when the node has
 * no ACL entries, or when any supplied credential uses the {@code super}
 * scheme. Otherwise every ACL entry that grants at least one of the requested
 * permission bits is examined: a {@code world:anyone} entry passes, and so does
 * an entry for which the authentication provider registered for its scheme
 * matches one of the supplied credentials of that same scheme.
 *
 * @param cnxn the server connection, or null for admin server commands
 * @param acl the ACL entries of the node
 * @param perm the permission the client is requesting
 * @param ids the credentials supplied by the client
 * @param path the ZNode path
 * @param setAcls for set ACL operations, the list of ACLs being set; otherwise null
 * @throws KeeperException.NoAuthException if no ACL entry authorizes the operation
 */
public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException {
    if (skipACL) {
        return;
    }

    LOG.debug("Permission requested: {} ", perm);
    LOG.debug("ACLs for node: {}", acl);
    LOG.debug("Client credentials: {}", ids);

    // A node without ACL entries is open to everybody.
    if (acl == null || acl.isEmpty()) {
        return;
    }

    // Any "super" credential bypasses the per-entry checks.
    for (Id credential : ids) {
        if (credential.getScheme().equals("super")) {
            return;
        }
    }

    for (ACL entry : acl) {
        final Id entryId = entry.getId();
        if ((entry.getPerms() & perm) == 0) {
            // This entry grants none of the requested permission bits.
            continue;
        }
        if (entryId.getScheme().equals("world") && entryId.getId().equals("anyone")) {
            return;
        }

        final ServerAuthenticationProvider provider = ProviderRegistry.getServerProvider(entryId.getScheme());
        if (provider == null) {
            continue;
        }
        for (Id credential : ids) {
            if (!credential.getScheme().equals(entryId.getScheme())) {
                continue;
            }
            final ServerAuthenticationProvider.ServerObjs serverObjs =
                new ServerAuthenticationProvider.ServerObjs(this, cnxn);
            final ServerAuthenticationProvider.MatchValues matchValues =
                new ServerAuthenticationProvider.MatchValues(path, credential.getId(), entryId.getId(), perm, setAcls);
            if (provider.matches(serverObjs, matchValues)) {
                return;
            }
        }
    }

    throw new KeeperException.NoAuthException();
}
/**
 * Checks whether a create or setData on {@code path} would exceed a quota.
 * <p>
 * Nothing is checked when quota enforcement is disabled or when the data tree
 * reports no quota prefix for the path. For a create, one node and the length
 * of {@code data} are charged; for a setData, only the difference between the
 * lengths of {@code data} and {@code lastData} is charged.
 *
 * @param path
 *            the path of the node, used for the quota prefix check
 * @param lastData
 *            the current node data, {@code null} for none
 * @param data
 *            the data to be set, or {@code null} for none
 * @param type
 *            the operation code; only create and setData are supported
 * @throws KeeperException.QuotaExceededException
 *             propagated from the prefix-level quota check
 * @throws IllegalArgumentException
 *             if {@code type} is any other operation code and a quota prefix
 *             applies to the path
 */
public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException {
    if (!enforceQuota) {
        return;
    }

    final long newLength = (data == null) ? 0 : data.length;
    final String quotaPrefix = getZKDatabase().getDataTree().getMaxPrefixWithQuota(path);
    if (StringUtils.isEmpty(quotaPrefix)) {
        return;
    }

    final String namespace = PathUtils.getTopNamespace(path);
    if (type == OpCode.create) {
        checkQuota(quotaPrefix, newLength, 1, namespace);
    } else if (type == OpCode.setData) {
        final long oldLength = (lastData == null) ? 0 : lastData.length;
        checkQuota(quotaPrefix, newLength - oldLength, 0, namespace);
    } else {
        throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type);
    }
}
/**
 * Checks proposed count and byte changes against the quota stored for
 * {@code lastPrefix}.
 * <p>
 * A limit is treated as configured when its value is greater than -1; when a
 * hard limit is configured it is used in preference to the soft limit. If the
 * current usage plus the diff exceeds the chosen limit, a rate-limited message
 * is logged; for a hard limit the quota-exceeded metric is also updated and an
 * exception is thrown. A missing limit or stat node is logged as an error and
 * the check is skipped.
 *
 * @param lastPrefix
 *            the path of the node which has a quota.
 * @param bytesDiff
 *            the diff to be added to number of bytes
 * @param countDiff
 *            the diff to be added to the count
 * @param namespace
 *            the namespace for collecting quota exceeded errors
 * @throws KeeperException.QuotaExceededException
 *             if a hard count or byte limit would be exceeded
 */
private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace)
        throws KeeperException.QuotaExceededException {
    LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff);

    // Load the configured limits for this quota prefix.
    final String limitNodePath = Quotas.limitPath(lastPrefix);
    final DataNode limitNode = getZKDatabase().getNode(limitNodePath);
    if (limitNode == null) {
        // should not happen
        LOG.error("Missing limit node for quota {}", limitNodePath);
        return;
    }
    final StatsTrack limits;
    synchronized (limitNode) {
        limits = new StatsTrack(limitNode.data);
    }

    // A dimension is only checked when it changes and has a soft or hard limit.
    final boolean mustCheckCount = countDiff != 0 && (limits.getCount() > -1 || limits.getCountHardLimit() > -1);
    final boolean mustCheckBytes = bytesDiff != 0 && (limits.getBytes() > -1 || limits.getByteHardLimit() > -1);
    if (!(mustCheckCount || mustCheckBytes)) {
        return;
    }

    // Load the current usage recorded for this quota prefix.
    final String statNodePath = Quotas.statPath(lastPrefix);
    final DataNode statNode = getZKDatabase().getNode(statNodePath);
    if (statNode == null) {
        // should not happen
        LOG.error("Missing node for stat {}", statNodePath);
        return;
    }
    final StatsTrack usage;
    synchronized (statNode) {
        usage = new StatsTrack(statNode.data);
    }

    if (mustCheckCount) {
        final long projectedCount = usage.getCount() + countDiff;
        final boolean hardCount = limits.getCountHardLimit() > -1;
        final long countLimit;
        if (hardCount) {
            countLimit = limits.getCountHardLimit();
        } else {
            countLimit = limits.getCount();
        }
        if (projectedCount > countLimit) {
            final String limitKind = hardCount ? "hard" : "soft";
            RATE_LOGGER.rateLimitLog(
                "Quota exceeded: " + lastPrefix + " [current count=" + projectedCount + ", " + limitKind + "CountLimit=" + countLimit + "]");
            // Soft limits only warn; hard limits reject the operation.
            if (hardCount) {
                updateQuotaExceededMetrics(namespace);
                throw new KeeperException.QuotaExceededException(lastPrefix);
            }
        }
    }

    if (mustCheckBytes) {
        final long projectedBytes = usage.getBytes() + bytesDiff;
        final boolean hardBytes = limits.getByteHardLimit() > -1;
        final long byteLimit;
        if (hardBytes) {
            byteLimit = limits.getByteHardLimit();
        } else {
            byteLimit = limits.getBytes();
        }
        if (projectedBytes > byteLimit) {
            final String limitKind = hardBytes ? "hard" : "soft";
            RATE_LOGGER.rateLimitLog(
                "Quota exceeded: " + lastPrefix + " [current bytes=" + projectedBytes + ", " + limitKind + "ByteLimit=" + byteLimit + "]");
            // Soft limits only warn; hard limits reject the operation.
            if (hardBytes) {
                updateQuotaExceededMetrics(namespace);
                throw new KeeperException.QuotaExceededException(lastPrefix);
            }
        }
    }
}
/**
 * @return the current value of the static digest-enabled flag.
 */
public static boolean isDigestEnabled() {
    return ZooKeeperServer.digestEnabled;
}
/**
 * Logs the new value at INFO level and then stores it in the static
 * digest-enabled flag.
 *
 * @param digestEnabled the value to store
 */
public static void setDigestEnabled(boolean digestEnabled) {
    final boolean requested = digestEnabled;
    LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, requested);
    ZooKeeperServer.digestEnabled = requested;
}
/**
 * @return the current value of the static serialize-last-processed-zxid flag.
 */
public static boolean isSerializeLastProcessedZxidEnabled() {
    return ZooKeeperServer.serializeLastProcessedZxidEnabled;
}
/**
 * Stores the new value in the static serialize-last-processed-zxid flag and
 * then logs it at INFO level.
 *
 * @param serializeLastZxidEnabled the value to store
 */
public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) {
    ZooKeeperServer.serializeLastProcessedZxidEnabled = serializeLastZxidEnabled;
    LOG.info(
        "{} = {}",
        ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED,
        serializeLastZxidEnabled);
}
/**
 * Returns the parent of {@code path}: everything before its last {@code '/'},
 * or {@code "/"} when that slash is the first character.
 *
 * @param path the path whose parent is wanted
 * @return the parent path
 * @throws KeeperException.BadArgumentsException if the path contains no
 *         {@code '/'}, contains a NUL character, or is reported as a special
 *         path by the database
 */
private String parentPath(String path) throws KeeperException.BadArgumentsException {
    final int slashIndex = path.lastIndexOf('/');
    final boolean rejected = slashIndex == -1
        || path.indexOf('\0') != -1
        || getZKDatabase().isSpecialPath(path);
    if (rejected) {
        throw new KeeperException.BadArgumentsException(path);
    }
    if (slashIndex == 0) {
        return "/";
    }
    return path.substring(0, slashIndex);
}
/**
 * Determines which node's ACL governs the given write request.
 * <p>
 * For create/create2 and delete this is the parent of the requested path; for
 * setData and setACL it is the requested path itself. For create/create2 and
 * setACL the ACL carried by the request is additionally passed through
 * {@code PrepRequestProcessor.fixupACL} purely to validate it.
 *
 * @param request the request being examined
 * @return the path to check, or {@code null} for any other request type or
 *         when the request record reader yields {@code null}
 * @throws KeeperException.BadArgumentsException if the parent path cannot be derived
 * @throws KeeperException.InvalidACLException if the supplied ACL is rejected
 */
private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException {
    String targetPath = null;
    List<ACL> suppliedAcl = null;
    boolean aclSupplied = false;

    switch (request.type) {
    case OpCode.create:
    case OpCode.create2: {
        final CreateRequest create = request.readRequestRecordNoException(CreateRequest::new);
        if (create != null) {
            aclSupplied = true;
            suppliedAcl = create.getAcl();
            targetPath = parentPath(create.getPath());
        }
        break;
    }
    case OpCode.delete: {
        final DeleteRequest delete = request.readRequestRecordNoException(DeleteRequest::new);
        if (delete != null) {
            targetPath = parentPath(delete.getPath());
        }
        break;
    }
    case OpCode.setData: {
        final SetDataRequest setData = request.readRequestRecordNoException(SetDataRequest::new);
        if (setData != null) {
            targetPath = setData.getPath();
        }
        break;
    }
    case OpCode.setACL: {
        final SetACLRequest setAcl = request.readRequestRecordNoException(SetACLRequest::new);
        if (setAcl != null) {
            aclSupplied = true;
            suppliedAcl = setAcl.getAcl();
            targetPath = setAcl.getPath();
        }
        break;
    }
    default:
        break;
    }

    if (!aclSupplied) {
        return targetPath;
    }

    /* The extrapolated ACL returned by fixupACL is deliberately discarded:
     * the call is made only so that a malformed ACL raises an exception.
     */
    PrepRequestProcessor.fixupACL(targetPath, request.authInfo, suppliedAcl);
    return targetPath;
}
/**
 * Maps a request type to the permission it requires: CREATE for
 * create/create2, DELETE for delete, WRITE for setData, ADMIN for setACL,
 * and ALL for every other type.
 *
 * @param request the request being examined
 * @return the {@code ZooDefs.Perms} value for the request type
 */
private int effectiveACLPerms(Request request) {
    final int requiredPerm;
    switch (request.type) {
    case OpCode.create:
    case OpCode.create2:
        requiredPerm = ZooDefs.Perms.CREATE;
        break;
    case OpCode.delete:
        requiredPerm = ZooDefs.Perms.DELETE;
        break;
    case OpCode.setData:
        requiredPerm = ZooDefs.Perms.WRITE;
        break;
    case OpCode.setACL:
        requiredPerm = ZooDefs.Perms.ADMIN;
        break;
    default:
        requiredPerm = ZooDefs.Perms.ALL;
        break;
    }
    return requiredPerm;
}
/**
 * Check Write Requests for Potential Access Restrictions
 * <p>
 * Before a request is being proposed to the quorum, lets check it
 * against local ACLs. Non-write requests (read, session, etc.)
 * are passed along. Requests that fail the ACL check, or that carry an
 * invalid ACL, are sent an error response and rejected.
 * <p>
 * While we are at it, if the request will set an ACL: make sure it's
 * a valid one.
 * <p>
 * A check against a non-existent node or an illegal node path is only
 * logged at debug level; such a request is still permitted here. When eager
 * ACL checking is disabled the request is always permitted.
 *
 * @param request the request to check
 * @return true if request is permitted, false if not.
 */
public boolean authWriteRequest(Request request) {
    int err;
    String pathToCheck;
    if (!enableEagerACLCheck) {
        return true;
    }
    err = KeeperException.Code.OK.intValue();
    try {
        pathToCheck = effectiveACLPath(request);
        if (pathToCheck != null) {
            checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null);
        }
    } catch (KeeperException.NoAuthException e) {
        LOG.debug("Request failed ACL check", e);
        err = e.code().intValue();
    } catch (KeeperException.InvalidACLException e) {
        LOG.debug("Request has an invalid ACL check", e);
        err = e.code().intValue();
    } catch (KeeperException.NoNodeException e) {
        // Not treated as a failure: err stays OK and the request is permitted.
        LOG.debug("ACL check against non-existent node: {}", e.getMessage());
    } catch (KeeperException.BadArgumentsException e) {
        // Not treated as a failure: err stays OK and the request is permitted.
        LOG.debug("ACL check against illegal node path: {}", e.getMessage());
    } catch (Throwable t) {
        LOG.error("Uncaught exception in authWriteRequest with: ", t);
        throw t;
    } finally {
        if (err != KeeperException.Code.OK.intValue()) {
            /* This request has a bad ACL, so we are dismissing it early. */
            decInProcess();
            ReplyHeader rh = new ReplyHeader(request.cxid, 0, err);
            try {
                request.cnxn.sendResponse(rh, null, null);
            } catch (IOException e) {
                // The throwable goes in as the trailing argument without a
                // placeholder: a "{}" here would be printed literally.
                LOG.error("IOException while sending error response for rejected write request", e);
            }
        }
    }
    return err == KeeperException.Code.OK.intValue();
}
/**
 * @return the outstanding handshake count reported by the connection factory
 *         when it is a {@code NettyServerCnxnFactory}, otherwise 0.
 */
public int getOutstandingHandshakeNum() {
    if (!(serverCnxnFactory instanceof NettyServerCnxnFactory)) {
        return 0;
    }
    final NettyServerCnxnFactory nettyFactory = (NettyServerCnxnFactory) serverCnxnFactory;
    return nettyFactory.getOutstandingHandshakeNum();
}
/**
 * @return the value of this server's {@code reconfigEnabled} field.
 */
public boolean isReconfigEnabled() {
    return reconfigEnabled;
}
/**
 * @return the shutdown handler currently held by this server.
 */
public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
    return this.zkShutdownHandler;
}
/**
 * Adds 1 to the per-namespace quota-exceeded error metric for the given
 * namespace. Does nothing when the namespace is {@code null}.
 *
 * @param namespace the namespace to record the error against, may be null
 */
static void updateQuotaExceededMetrics(final String namespace) {
    if (namespace != null) {
        ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1);
    }
}
}