| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.zookeeper.server; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| import java.io.BufferedInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.PrintWriter; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.BiConsumer; |
| import java.util.zip.Adler32; |
| import java.util.zip.CheckedInputStream; |
| import javax.security.sasl.SaslException; |
| import org.apache.jute.BinaryInputArchive; |
| import org.apache.jute.BinaryOutputArchive; |
| import org.apache.jute.InputArchive; |
| import org.apache.jute.Record; |
| import org.apache.zookeeper.Environment; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.Code; |
| import org.apache.zookeeper.KeeperException.SessionExpiredException; |
| import org.apache.zookeeper.Quotas; |
| import org.apache.zookeeper.StatsTrack; |
| import org.apache.zookeeper.Version; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooDefs.OpCode; |
| import org.apache.zookeeper.ZookeeperBanner; |
| import org.apache.zookeeper.common.PathUtils; |
| import org.apache.zookeeper.common.StringUtils; |
| import org.apache.zookeeper.common.Time; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Id; |
| import org.apache.zookeeper.data.StatPersisted; |
| import org.apache.zookeeper.jmx.MBeanRegistry; |
| import org.apache.zookeeper.metrics.MetricsContext; |
| import org.apache.zookeeper.proto.AuthPacket; |
| import org.apache.zookeeper.proto.ConnectRequest; |
| import org.apache.zookeeper.proto.ConnectResponse; |
| import org.apache.zookeeper.proto.CreateRequest; |
| import org.apache.zookeeper.proto.DeleteRequest; |
| import org.apache.zookeeper.proto.GetSASLRequest; |
| import org.apache.zookeeper.proto.ReplyHeader; |
| import org.apache.zookeeper.proto.RequestHeader; |
| import org.apache.zookeeper.proto.SetACLRequest; |
| import org.apache.zookeeper.proto.SetDataRequest; |
| import org.apache.zookeeper.proto.SetSASLResponse; |
| import org.apache.zookeeper.server.DataTree.ProcessTxnResult; |
| import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException; |
| import org.apache.zookeeper.server.ServerCnxn.CloseRequestException; |
| import org.apache.zookeeper.server.SessionTracker.Session; |
| import org.apache.zookeeper.server.SessionTracker.SessionExpirer; |
| import org.apache.zookeeper.server.auth.ProviderRegistry; |
| import org.apache.zookeeper.server.auth.ServerAuthenticationProvider; |
| import org.apache.zookeeper.server.persistence.FileTxnSnapLog; |
| import org.apache.zookeeper.server.quorum.QuorumPeerConfig; |
| import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; |
| import org.apache.zookeeper.server.util.JvmPauseMonitor; |
| import org.apache.zookeeper.server.util.OSMXBean; |
| import org.apache.zookeeper.server.util.QuotaMetricsUtils; |
| import org.apache.zookeeper.server.util.RequestPathMetricsCollector; |
| import org.apache.zookeeper.txn.CreateSessionTxn; |
| import org.apache.zookeeper.txn.TxnDigest; |
| import org.apache.zookeeper.txn.TxnHeader; |
| import org.apache.zookeeper.util.ServiceUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class implements a simple standalone ZooKeeperServer. It sets up the |
| * following chain of RequestProcessors to process requests: |
| * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor |
| */ |
| public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { |
| |
| protected static final Logger LOG; |
| private static final RateLogger RATE_LOGGER; |
| |
| public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit"; |
| |
| public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck"; |
| public static final String SKIP_ACL = "zookeeper.skipACL"; |
| public static final String ENFORCE_QUOTA = "zookeeper.enforceQuota"; |
| |
| // When enabled, will check ACL constraints appertained to the requests first, |
| // before sending the requests to the quorum. |
| static boolean enableEagerACLCheck; |
| |
| static final boolean skipACL; |
| |
| public static final boolean enforceQuota; |
| |
| public static final String SASL_SUPER_USER = "zookeeper.superUser"; |
| |
| public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients"; |
| public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled"; |
| private static boolean digestEnabled; |
| |
| public static final String ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED = "zookeeper.serializeLastProcessedZxid.enabled"; |
| private static boolean serializeLastProcessedZxidEnabled; |
| |
| // Add a enable/disable option for now, we should remove this one when |
| // this feature is confirmed to be stable |
| public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled"; |
| private static boolean closeSessionTxnEnabled = true; |
| private volatile CountDownLatch restoreLatch; |
| |
| static { |
| LOG = LoggerFactory.getLogger(ZooKeeperServer.class); |
| |
| RATE_LOGGER = new RateLogger(LOG); |
| |
| ZookeeperBanner.printBanner(LOG); |
| |
| Environment.logEnv("Server environment:", LOG); |
| |
| enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK); |
| LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck); |
| |
| skipACL = System.getProperty(SKIP_ACL, "no").equals("yes"); |
| if (skipACL) { |
| LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL); |
| } |
| |
| enforceQuota = Boolean.parseBoolean(System.getProperty(ENFORCE_QUOTA, "false")); |
| if (enforceQuota) { |
| LOG.info("{} = {}, Quota Enforce enables", ENFORCE_QUOTA, enforceQuota); |
| } |
| |
| digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); |
| LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); |
| |
| closeSessionTxnEnabled = Boolean.parseBoolean( |
| System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); |
| LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); |
| |
| setSerializeLastProcessedZxidEnabled(Boolean.parseBoolean( |
| System.getProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true"))); |
| } |
| |
| // @VisibleForTesting |
| public static boolean isEnableEagerACLCheck() { |
| return enableEagerACLCheck; |
| } |
| |
| // @VisibleForTesting |
| public static void setEnableEagerACLCheck(boolean enabled) { |
| ZooKeeperServer.enableEagerACLCheck = enabled; |
| LOG.info("Update {} to {}", ENABLE_EAGER_ACL_CHECK, enabled); |
| } |
| |
| public static boolean isCloseSessionTxnEnabled() { |
| return closeSessionTxnEnabled; |
| } |
| |
| public static void setCloseSessionTxnEnabled(boolean enabled) { |
| ZooKeeperServer.closeSessionTxnEnabled = enabled; |
| LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, |
| ZooKeeperServer.closeSessionTxnEnabled); |
| } |
| |
| protected ZooKeeperServerBean jmxServerBean; |
| protected DataTreeBean jmxDataTreeBean; |
| |
| public static final int DEFAULT_TICK_TIME = 3000; |
| protected int tickTime = DEFAULT_TICK_TIME; |
| public static final int DEFAULT_THROTTLED_OP_WAIT_TIME = 0; // disabled |
| protected static volatile int throttledOpWaitTime = |
| Integer.getInteger("zookeeper.throttled_op_wait_time", DEFAULT_THROTTLED_OP_WAIT_TIME); |
| /** value of -1 indicates unset, use default */ |
| protected int minSessionTimeout = -1; |
| /** value of -1 indicates unset, use default */ |
| protected int maxSessionTimeout = -1; |
| /** Socket listen backlog. Value of -1 indicates unset */ |
| protected int listenBacklog = -1; |
| protected SessionTracker sessionTracker; |
| private FileTxnSnapLog txnLogFactory = null; |
| private ZKDatabase zkDb; |
| private ResponseCache readResponseCache; |
| private ResponseCache getChildrenResponseCache; |
| private final AtomicLong hzxid = new AtomicLong(0); |
| public static final Exception ok = new Exception("No prob"); |
| protected RequestProcessor firstProcessor; |
| protected JvmPauseMonitor jvmPauseMonitor; |
| protected volatile State state = State.INITIAL; |
| private boolean isResponseCachingEnabled = true; |
| /* contains the configuration file content read at startup */ |
| protected String initialConfig; |
| protected boolean reconfigEnabled; |
| private final RequestPathMetricsCollector requestPathMetricsCollector; |
| private static final int DEFAULT_SNAP_COUNT = 100000; |
| private static final int DEFAULT_GLOBAL_OUTSTANDING_LIMIT = 1000; |
| |
| private boolean localSessionEnabled = false; |
| protected enum State { |
| INITIAL, |
| RUNNING, |
| SHUTDOWN, |
| ERROR |
| } |
| |
| /** |
| * This is the secret that we use to generate passwords. For the moment, |
| * it's more of a checksum that's used in reconnection, which carries no |
| * security weight, and is treated internally as if it carries no |
| * security weight. |
| */ |
| private static final long superSecret = 0XB3415C00L; |
| |
| private final AtomicInteger requestsInProcess = new AtomicInteger(0); |
| final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>(); |
| // this data structure must be accessed under the outstandingChanges lock |
| final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>(); |
| |
| protected ServerCnxnFactory serverCnxnFactory; |
| protected ServerCnxnFactory secureServerCnxnFactory; |
| |
| private final ServerStats serverStats; |
| private final ZooKeeperServerListener listener; |
| private ZooKeeperServerShutdownHandler zkShutdownHandler; |
| private volatile int createSessionTrackerServerId = 1; |
| |
| private static final String FLUSH_DELAY = "zookeeper.flushDelay"; |
| private static volatile long flushDelay; |
| private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime"; |
| private static volatile long maxWriteQueuePollTime; |
| private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize"; |
| private static volatile int maxBatchSize; |
| |
| /** |
| * Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes. |
| * Flag not used for small transfers like connectResponses. |
| */ |
| public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes"; |
| public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024; |
| public static final int intBufferStartingSizeBytes; |
| |
| public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize"; |
| public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize"; |
| |
| static { |
| long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0); |
| setFlushDelay(configuredFlushDelay); |
| setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3)); |
| setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000)); |
| |
| intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE); |
| |
| if (intBufferStartingSizeBytes < 32) { |
| String msg = "Buffer starting size (" + intBufferStartingSizeBytes + ") must be greater than or equal to 32. " |
| + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" "; |
| LOG.error(msg); |
| throw new IllegalArgumentException(msg); |
| } |
| |
| LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes); |
| } |
| |
| // Connection throttling |
| private final BlueThrottle connThrottle = new BlueThrottle(); |
| |
| @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = |
| "Internally the throttler has a BlockingQueue so " |
| + "once the throttler is created and started, it is thread-safe") |
| private RequestThrottler requestThrottler; |
| public static final String SNAP_COUNT = "zookeeper.snapCount"; |
| |
| /** |
| * This setting sets a limit on the total number of large requests that |
| * can be inflight and is designed to prevent ZooKeeper from accepting |
| * too many large requests such that the JVM runs out of usable heap and |
| * ultimately crashes. |
| * |
| * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)} |
| * method which is called by the connection layer ({@link NIOServerCnxn}, |
| * {@link NettyServerCnxn}) before allocating a byte buffer and pulling |
| * data off the TCP socket. The limit is then checked again by the |
| * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which |
| * also atomically updates {@link #currentLargeRequestBytes}. The request is |
| * then marked as a large request, with the request size stored in the Request |
| * object so that it can later be decremented from {@link #currentLargeRequestBytes}. |
| * |
| * When a request is completed or dropped, the relevant code path calls the |
| * {@link #requestFinished(Request)} method which performs the decrement if |
| * needed. |
| */ |
| private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; |
| |
| /** |
| * The size threshold after which a request is considered a large request |
| * and is checked against the large request byte limit. |
| */ |
| private volatile int largeRequestThreshold = -1; |
| |
| private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); |
| |
| private final AuthenticationHelper authHelper = new AuthenticationHelper(); |
| |
| void removeCnxn(ServerCnxn cnxn) { |
| zkDb.removeCnxn(cnxn); |
| } |
| |
/**
 * Creates a bare ZooKeeperServer instance. Only the listener, the server
 * stats and the request-path metrics collector are created here; use the
 * setX methods to prepare everything else (e.g. datadir, datalogdir,
 * ticktime, builder, etc.).
 */
public ZooKeeperServer() {
    this.listener = new ZooKeeperServerListenerImpl(this);
    this.serverStats = new ServerStats(this);
    this.requestPathMetricsCollector = new RequestPathMetricsCollector();
}
| |
/**
 * Kept for backward compatibility: delegates to the constructor that takes
 * an explicit {@code reconfigEnabled} flag, passing
 * {@code QuorumPeerConfig.isReconfigEnabled()}.
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
    this(
        txnLogFactory,
        tickTime,
        minSessionTimeout,
        maxSessionTimeout,
        clientPortListenBacklog,
        zkDb,
        initialConfig,
        QuorumPeerConfig.isReconfigEnabled());
}
| |
| /** |
| * * Creates a ZooKeeperServer instance. It sets everything up, but doesn't |
| * actually start listening for clients until run() is invoked. |
| * |
| */ |
| public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) { |
| serverStats = new ServerStats(this); |
| this.txnLogFactory = txnLogFactory; |
| this.txnLogFactory.setServerStats(this.serverStats); |
| this.zkDb = zkDb; |
| this.tickTime = tickTime; |
| setMinSessionTimeout(minSessionTimeout); |
| setMaxSessionTimeout(maxSessionTimeout); |
| this.listenBacklog = clientPortListenBacklog; |
| this.reconfigEnabled = reconfigEnabled; |
| |
| listener = new ZooKeeperServerListenerImpl(this); |
| |
| readResponseCache = new ResponseCache(Integer.getInteger( |
| GET_DATA_RESPONSE_CACHE_SIZE, |
| ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData"); |
| |
| getChildrenResponseCache = new ResponseCache(Integer.getInteger( |
| GET_CHILDREN_RESPONSE_CACHE_SIZE, |
| ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren"); |
| |
| this.initialConfig = initialConfig; |
| |
| this.requestPathMetricsCollector = new RequestPathMetricsCollector(); |
| |
| this.initLargeRequestThrottlingSettings(); |
| |
| LOG.info( |
| "Created server with" |
| + " tickTime {} ms" |
| + " minSessionTimeout {} ms" |
| + " maxSessionTimeout {} ms" |
| + " clientPortListenBacklog {}" |
| + " dataLogdir {}" |
| + " snapdir {}", |
| tickTime, |
| getMinSessionTimeout(), |
| getMaxSessionTimeout(), |
| getClientPortListenBacklog(), |
| txnLogFactory.getDataLogDir(), |
| txnLogFactory.getSnapDir()); |
| } |
| |
| public String getInitialConfig() { |
| return initialConfig; |
| } |
| |
/**
 * Adds a JvmPauseMonitor on top of
 * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String, boolean)},
 * which is invoked with {@code QuorumPeerConfig.isReconfigEnabled()}.
 *
 * @param jvmPauseMonitor monitor to attach; may be null, in which case nothing is logged
 */
public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
    this(
        txnLogFactory,
        tickTime,
        minSessionTimeout,
        maxSessionTimeout,
        clientPortListenBacklog,
        zkDb,
        initialConfig,
        QuorumPeerConfig.isReconfigEnabled());
    this.jvmPauseMonitor = jvmPauseMonitor;
    if (this.jvmPauseMonitor != null) {
        LOG.info("Added JvmPauseMonitor to server");
    }
}
| |
/**
 * Creates a ZooKeeperServer backed by a new ZKDatabase built on
 * {@code txnLogFactory}, with session timeouts and listen backlog left unset (-1).
 *
 * @param txnLogFactory the file transaction snapshot logging class
 * @param tickTime the ticktime for the server
 * @param initialConfig the initial configuration content
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) {
    this(
        txnLogFactory,
        tickTime,
        -1,
        -1,
        -1,
        new ZKDatabase(txnLogFactory),
        initialConfig,
        QuorumPeerConfig.isReconfigEnabled());
}
| |
| public ServerStats serverStats() { |
| return serverStats; |
| } |
| |
| public RequestPathMetricsCollector getRequestPathMetricsCollector() { |
| return requestPathMetricsCollector; |
| } |
| |
| public BlueThrottle connThrottle() { |
| return connThrottle; |
| } |
| |
| public void dumpConf(PrintWriter pwriter) { |
| pwriter.print("clientPort="); |
| pwriter.println(getClientPort()); |
| pwriter.print("secureClientPort="); |
| pwriter.println(getSecureClientPort()); |
| pwriter.print("dataDir="); |
| pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath()); |
| pwriter.print("dataDirSize="); |
| pwriter.println(getDataDirSize()); |
| pwriter.print("dataLogDir="); |
| pwriter.println(zkDb.snapLog.getDataLogDir().getAbsolutePath()); |
| pwriter.print("dataLogSize="); |
| pwriter.println(getLogDirSize()); |
| pwriter.print("tickTime="); |
| pwriter.println(getTickTime()); |
| pwriter.print("maxClientCnxns="); |
| pwriter.println(getMaxClientCnxnsPerHost()); |
| pwriter.print("minSessionTimeout="); |
| pwriter.println(getMinSessionTimeout()); |
| pwriter.print("maxSessionTimeout="); |
| pwriter.println(getMaxSessionTimeout()); |
| pwriter.print("clientPortListenBacklog="); |
| pwriter.println(getClientPortListenBacklog()); |
| |
| pwriter.print("serverId="); |
| pwriter.println(getServerId()); |
| } |
| |
| public ZooKeeperServerConf getConf() { |
| return new ZooKeeperServerConf( |
| getClientPort(), |
| zkDb.snapLog.getSnapDir().getAbsolutePath(), |
| zkDb.snapLog.getDataLogDir().getAbsolutePath(), |
| getTickTime(), |
| getMaxClientCnxnsPerHost(), |
| getMinSessionTimeout(), |
| getMaxSessionTimeout(), |
| getServerId(), |
| getClientPortListenBacklog()); |
| } |
| |
/**
 * Kept for backward compatibility with the existing unit test code.
 * Wraps the two directories in a new FileTxnSnapLog and uses an empty
 * initial configuration.
 *
 * @param snapDir snapshot directory
 * @param logDir transaction log directory
 * @param tickTime the ticktime for the server
 * @throws IOException declared by the FileTxnSnapLog construction
 */
public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
    this(new FileTxnSnapLog(snapDir, logDir), tickTime, "");
}
| |
/**
 * Default constructor; relies on the config for its argument values. Uses
 * {@link #DEFAULT_TICK_TIME}, unset (-1) session timeouts and backlog, a new
 * ZKDatabase over {@code txnLogFactory}, and an empty initial configuration.
 *
 * @param txnLogFactory the file transaction snapshot logging class
 * @throws IOException
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
    this(
        txnLogFactory,
        DEFAULT_TICK_TIME,
        -1,
        -1,
        -1,
        new ZKDatabase(txnLogFactory),
        "",
        QuorumPeerConfig.isReconfigEnabled());
}
| |
| /** |
| * get the zookeeper database for this server |
| * @return the zookeeper database for this server |
| */ |
| public ZKDatabase getZKDatabase() { |
| return this.zkDb; |
| } |
| |
| /** |
| * set the zkdatabase for this zookeeper server |
| * @param zkDb |
| */ |
| public void setZKDatabase(ZKDatabase zkDb) { |
| this.zkDb = zkDb; |
| } |
| |
| /** |
| * Restore sessions and data |
| */ |
| public void loadData() throws IOException, InterruptedException { |
| /* |
| * When a new leader starts executing Leader#lead, it |
| * invokes this method. The database, however, has been |
| * initialized before running leader election so that |
| * the server could pick its zxid for its initial vote. |
| * It does it by invoking QuorumPeer#getLastLoggedZxid. |
| * Consequently, we don't need to initialize it once more |
| * and avoid the penalty of loading it a second time. Not |
| * reloading it is particularly important for applications |
| * that host a large database. |
| * |
| * The following if block checks whether the database has |
| * been initialized or not. Note that this method is |
| * invoked by at least one other method: |
| * ZooKeeperServer#startdata. |
| * |
| * See ZOOKEEPER-1642 for more detail. |
| */ |
| if (zkDb.isInitialized()) { |
| setZxid(zkDb.getDataTreeLastProcessedZxid()); |
| } else { |
| setZxid(zkDb.loadDataBase()); |
| } |
| |
| // Clean up dead sessions |
| zkDb.getSessions().stream() |
| .filter(session -> zkDb.getSessionWithTimeOuts().get(session) == null) |
| .forEach(session -> killSession(session, zkDb.getDataTreeLastProcessedZxid())); |
| |
| // Make a clean snapshot |
| takeSnapshot(); |
| } |
| |
| public File takeSnapshot() throws IOException { |
| return takeSnapshot(false); |
| } |
| |
| public File takeSnapshot(boolean syncSnap) throws IOException { |
| return takeSnapshot(syncSnap, true, false); |
| } |
| |
| /** |
| * Takes a snapshot on the server. |
| * |
| * @param syncSnap syncSnap sync the snapshot immediately after write |
| * @param isSevere if true system exist, otherwise throw IOException |
| * @param fastForwardFromEdits whether fast forward database to the latest recorded transactions |
| * |
| * @return file snapshot file object |
| * @throws IOException |
| */ |
| public synchronized File takeSnapshot(boolean syncSnap, boolean isSevere, boolean fastForwardFromEdits) throws IOException { |
| long start = Time.currentElapsedTime(); |
| File snapFile = null; |
| try { |
| if (fastForwardFromEdits) { |
| zkDb.fastForwardDataBase(); |
| } |
| snapFile = txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); |
| } catch (IOException e) { |
| if (isSevere) { |
| LOG.error("Severe unrecoverable error, exiting", e); |
| // This is a severe error that we cannot recover from, |
| // so we need to exit |
| ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); |
| } else { |
| throw e; |
| } |
| } |
| long elapsed = Time.currentElapsedTime() - start; |
| LOG.info("Snapshot taken in {} ms", elapsed); |
| ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); |
| return snapFile; |
| } |
| |
| /** |
| * Restores database from a snapshot. It is used by the restore admin server command. |
| * |
| * @param inputStream input stream of snapshot |
| * @Return last processed zxid |
| * @throws IOException |
| */ |
| public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException { |
| if (inputStream == null) { |
| throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot"); |
| } |
| |
| long start = Time.currentElapsedTime(); |
| LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}", |
| getZKDatabase().getDataTreeLastProcessedZxid(), |
| getZKDatabase().dataTree.getNodeCount(), |
| getZKDatabase().getSessionCount()); |
| |
| // restore to a new zkDatabase |
| final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory); |
| final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32()); |
| final InputArchive ia = BinaryInputArchive.getArchive(cis); |
| newZKDatabase.deserializeSnapshot(ia, cis); |
| LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", |
| newZKDatabase.getDataTreeLastProcessedZxid(), |
| newZKDatabase.dataTree.getNodeCount(), |
| newZKDatabase.getSessionCount()); |
| |
| // create a CountDownLatch |
| restoreLatch = new CountDownLatch(1); |
| |
| try { |
| // set to the new zkDatabase |
| setZKDatabase(newZKDatabase); |
| |
| // re-create SessionTrack |
| createSessionTracker(); |
| } finally { |
| // unblock request submission |
| restoreLatch.countDown(); |
| restoreLatch = null; |
| } |
| |
| LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}", |
| getZKDatabase().getDataTreeLastProcessedZxid(), |
| getZKDatabase().dataTree.getNodeCount(), |
| getZKDatabase().getSessionCount()); |
| |
| long elapsed = Time.currentElapsedTime() - start; |
| LOG.info("Restore taken in {} ms", elapsed); |
| ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed); |
| |
| return getLastProcessedZxid(); |
| } |
| |
| public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() { |
| return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection(); |
| } |
| |
| @Override |
| public long getDataDirSize() { |
| if (zkDb == null) { |
| return 0L; |
| } |
| File path = zkDb.snapLog.getSnapDir(); |
| return getDirSize(path); |
| } |
| |
| @Override |
| public long getLogDirSize() { |
| if (zkDb == null) { |
| return 0L; |
| } |
| File path = zkDb.snapLog.getDataLogDir(); |
| return getDirSize(path); |
| } |
| |
| private long getDirSize(File file) { |
| long size = 0L; |
| if (file.isDirectory()) { |
| File[] files = file.listFiles(); |
| if (files != null) { |
| for (File f : files) { |
| size += getDirSize(f); |
| } |
| } |
| } else { |
| size = file.length(); |
| } |
| return size; |
| } |
| |
| public long getZxid() { |
| return hzxid.get(); |
| } |
| |
| public SessionTracker getSessionTracker() { |
| return sessionTracker; |
| } |
| |
| long getNextZxid() { |
| return hzxid.incrementAndGet(); |
| } |
| |
| public void setZxid(long zxid) { |
| hzxid.set(zxid); |
| } |
| |
| private void close(long sessionId) { |
| Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null); |
| submitRequest(si); |
| } |
| |
| public void closeSession(long sessionId) { |
| LOG.info("Closing session 0x{}", Long.toHexString(sessionId)); |
| |
| // we do not want to wait for a session close. send it as soon as we |
| // detect it! |
| close(sessionId); |
| } |
| |
| protected void killSession(long sessionId, long zxid) { |
| zkDb.killSession(sessionId, zxid); |
| if (LOG.isTraceEnabled()) { |
| ZooTrace.logTraceMessage( |
| LOG, |
| ZooTrace.SESSION_TRACE_MASK, |
| "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId)); |
| } |
| if (sessionTracker != null) { |
| sessionTracker.removeSession(sessionId); |
| } |
| } |
| |
| public void expire(Session session) { |
| long sessionId = session.getSessionId(); |
| LOG.info( |
| "Expiring session 0x{}, timeout of {}ms exceeded", |
| Long.toHexString(sessionId), |
| session.getTimeout()); |
| close(sessionId); |
| } |
| |
| public void expire(long sessionId) { |
| LOG.info("forcibly expiring session 0x{}", Long.toHexString(sessionId)); |
| |
| close(sessionId); |
| } |
| |
| public static class MissingSessionException extends IOException { |
| |
| private static final long serialVersionUID = 7467414635467261007L; |
| |
| public MissingSessionException(String msg) { |
| super(msg); |
| } |
| |
| } |
| |
| void touch(ServerCnxn cnxn) throws MissingSessionException { |
| if (cnxn == null) { |
| return; |
| } |
| long id = cnxn.getSessionId(); |
| int to = cnxn.getSessionTimeout(); |
| if (!sessionTracker.touchSession(id, to)) { |
| throw new MissingSessionException("No session with sessionid 0x" |
| + Long.toHexString(id) |
| + " exists, probably expired and removed"); |
| } |
| } |
| |
| protected void registerJMX() { |
| // register with JMX |
| try { |
| jmxServerBean = new ZooKeeperServerBean(this); |
| MBeanRegistry.getInstance().register(jmxServerBean, null); |
| |
| try { |
| jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); |
| MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); |
| } catch (Exception e) { |
| LOG.warn("Failed to register with JMX", e); |
| jmxDataTreeBean = null; |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to register with JMX", e); |
| jmxServerBean = null; |
| } |
| } |
| |
| public void startdata() throws IOException, InterruptedException { |
| //check to see if zkDb is not null |
| if (zkDb == null) { |
| zkDb = new ZKDatabase(this.txnLogFactory); |
| } |
| if (!zkDb.isInitialized()) { |
| loadData(); |
| } |
| } |
| |
| public synchronized void startup() { |
| startupWithServerState(State.RUNNING); |
| } |
| |
| public synchronized void startupWithoutServing() { |
| startupWithServerState(State.INITIAL); |
| } |
| |
| public synchronized void startServing() { |
| setState(State.RUNNING); |
| notifyAll(); |
| } |
| |
| private void startupWithServerState(State state) { |
| if (sessionTracker == null) { |
| createSessionTracker(); |
| } |
| startSessionTracker(); |
| setupRequestProcessors(); |
| |
| startRequestThrottler(); |
| |
| registerJMX(); |
| |
| startJvmPauseMonitor(); |
| |
| registerMetrics(); |
| |
| setState(state); |
| |
| requestPathMetricsCollector.start(); |
| |
| localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); |
| |
| notifyAll(); |
| } |
| |
| protected void startJvmPauseMonitor() { |
| if (this.jvmPauseMonitor != null) { |
| this.jvmPauseMonitor.serviceStart(); |
| } |
| } |
| |
| protected void startRequestThrottler() { |
| requestThrottler = createRequestThrottler(); |
| requestThrottler.start(); |
| } |
| |
| protected RequestThrottler createRequestThrottler() { |
| return new RequestThrottler(this); |
| } |
| |
| protected void setupRequestProcessors() { |
| RequestProcessor finalProcessor = new FinalRequestProcessor(this); |
| RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); |
| ((SyncRequestProcessor) syncProcessor).start(); |
| firstProcessor = new PrepRequestProcessor(this, syncProcessor); |
| ((PrepRequestProcessor) firstProcessor).start(); |
| } |
| |
| public ZooKeeperServerListener getZooKeeperServerListener() { |
| return listener; |
| } |
| |
| /** |
| * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to |
| * {@link #startup()} being called |
| * |
| * @param newId ID to use |
| */ |
| public void setCreateSessionTrackerServerId(int newId) { |
| createSessionTrackerServerId = newId; |
| } |
| |
| protected void createSessionTracker() { |
| sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener()); |
| } |
| |
| protected void startSessionTracker() { |
| ((SessionTrackerImpl) sessionTracker).start(); |
| } |
| |
| /** |
| * Sets the state of ZooKeeper server. After changing the state, it notifies |
| * the server state change to a registered shutdown handler, if any. |
| * <p> |
| * The following are the server state transitions: |
| * <ul><li>During startup the server will be in the INITIAL state.</li> |
| * <li>After successfully starting, the server sets the state to RUNNING. |
| * </li> |
| * <li>The server transitions to the ERROR state if it hits an internal |
| * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource |
| * error events, e.g., SyncRequestProcessor not being able to write a txn to |
| * disk.</li> |
| * <li>During shutdown the server sets the state to SHUTDOWN, which |
| * corresponds to the server not running.</li></ul> |
| * |
| * <li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE |
| * </li></ul> |
| * |
| * @param state new server state. |
| */ |
| protected void setState(State state) { |
| this.state = state; |
| // Notify server state changes to the registered shutdown handler, if any. |
| if (zkShutdownHandler != null) { |
| zkShutdownHandler.handle(state); |
| } else { |
| LOG.debug( |
| "ZKShutdownHandler is not registered, so ZooKeeper server" |
| + " won't take any action on ERROR or SHUTDOWN server state changes"); |
| } |
| } |
| |
| /** |
| * This can be used while shutting down the server to see whether the server |
| * is already shutdown or not. |
| * |
| * @return true if the server is running or server hits an error, false |
| * otherwise. |
| */ |
| protected boolean canShutdown() { |
| return state == State.RUNNING || state == State.ERROR; |
| } |
| |
| /** |
| * @return true if the server is running, false otherwise. |
| */ |
| public boolean isRunning() { |
| return state == State.RUNNING; |
| } |
| |
| public void shutdown() { |
| shutdown(false); |
| } |
| |
| /** |
| * Shut down the server instance |
| * @param fullyShutDown true if another server using the same database will not replace this one in the same process |
| */ |
| public synchronized void shutdown(boolean fullyShutDown) { |
| if (!canShutdown()) { |
| if (fullyShutDown && zkDb != null) { |
| zkDb.clear(); |
| } |
| LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!"); |
| return; |
| } |
| LOG.info("shutting down"); |
| |
| // new RuntimeException("Calling shutdown").printStackTrace(); |
| setState(State.SHUTDOWN); |
| |
| // unregister all metrics that are keeping a strong reference to this object |
| // subclasses will do their specific clean up |
| unregisterMetrics(); |
| |
| if (requestThrottler != null) { |
| requestThrottler.shutdown(); |
| } |
| |
| // Since sessionTracker and syncThreads poll we just have to |
| // set running to false and they will detect it during the poll |
| // interval. |
| if (sessionTracker != null) { |
| sessionTracker.shutdown(); |
| } |
| if (firstProcessor != null) { |
| firstProcessor.shutdown(); |
| } |
| if (jvmPauseMonitor != null) { |
| jvmPauseMonitor.serviceStop(); |
| } |
| |
| if (zkDb != null) { |
| if (fullyShutDown) { |
| zkDb.clear(); |
| } else { |
| // else there is no need to clear the database |
| // * When a new quorum is established we can still apply the diff |
| // on top of the same zkDb data |
| // * If we fetch a new snapshot from leader, the zkDb will be |
| // cleared anyway before loading the snapshot |
| try { |
| //This will fast forward the database to the latest recorded transactions |
| zkDb.fastForwardDataBase(); |
| } catch (IOException e) { |
| LOG.error("Error updating DB", e); |
| zkDb.clear(); |
| } |
| } |
| } |
| |
| requestPathMetricsCollector.shutdown(); |
| unregisterJMX(); |
| } |
| |
| protected void unregisterJMX() { |
| // unregister from JMX |
| try { |
| if (jmxDataTreeBean != null) { |
| MBeanRegistry.getInstance().unregister(jmxDataTreeBean); |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to unregister with JMX", e); |
| } |
| try { |
| if (jmxServerBean != null) { |
| MBeanRegistry.getInstance().unregister(jmxServerBean); |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to unregister with JMX", e); |
| } |
| jmxServerBean = null; |
| jmxDataTreeBean = null; |
| } |
| |
| public void incInProcess() { |
| requestsInProcess.incrementAndGet(); |
| } |
| |
| public void decInProcess() { |
| requestsInProcess.decrementAndGet(); |
| if (requestThrottler != null) { |
| requestThrottler.throttleWake(); |
| } |
| } |
| |
| public int getInProcess() { |
| return requestsInProcess.get(); |
| } |
| |
| public int getInflight() { |
| return requestThrottleInflight(); |
| } |
| |
| private int requestThrottleInflight() { |
| if (requestThrottler != null) { |
| return requestThrottler.getInflight(); |
| } |
| return 0; |
| } |
| |
| static class PrecalculatedDigest { |
| final long nodeDigest; |
| final long treeDigest; |
| |
| PrecalculatedDigest(long nodeDigest, long treeDigest) { |
| this.nodeDigest = nodeDigest; |
| this.treeDigest = treeDigest; |
| } |
| } |
| |
| |
| /** |
| * This structure is used to facilitate information sharing between PrepRP |
| * and FinalRP. |
| */ |
| static class ChangeRecord { |
| PrecalculatedDigest precalculatedDigest; |
| byte[] data; |
| |
| ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) { |
| this.zxid = zxid; |
| this.path = path; |
| this.stat = stat; |
| this.childCount = childCount; |
| this.acl = acl; |
| } |
| |
| long zxid; |
| |
| String path; |
| |
| StatPersisted stat; /* Make sure to create a new object when changing */ |
| |
| int childCount; |
| |
| List<ACL> acl; /* Make sure to create a new object when changing */ |
| |
| ChangeRecord duplicate(long zxid) { |
| StatPersisted stat = new StatPersisted(); |
| if (this.stat != null) { |
| DataTree.copyStatPersisted(this.stat, stat); |
| } |
| ChangeRecord changeRecord = new ChangeRecord(zxid, path, stat, childCount, |
| acl == null ? new ArrayList<>() : new ArrayList<>(acl)); |
| changeRecord.precalculatedDigest = precalculatedDigest; |
| changeRecord.data = data; |
| return changeRecord; |
| } |
| |
| } |
| |
| byte[] generatePasswd(long id) { |
| Random r = new Random(id ^ superSecret); |
| byte[] p = new byte[16]; |
| r.nextBytes(p); |
| return p; |
| } |
| |
| protected boolean checkPasswd(long sessionId, byte[] passwd) { |
| return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId)); |
| } |
| |
| long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { |
| if (passwd == null) { |
| // Possible since it's just deserialized from a packet on the wire. |
| passwd = new byte[0]; |
| } |
| long sessionId = sessionTracker.createSession(timeout); |
| Random r = new Random(sessionId ^ superSecret); |
| r.nextBytes(passwd); |
| CreateSessionTxn txn = new CreateSessionTxn(timeout); |
| cnxn.setSessionId(sessionId); |
| Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); |
| submitRequest(si); |
| return sessionId; |
| } |
| |
| /** |
| * set the owner of this session as owner |
| * @param id the session id |
| * @param owner the owner of the session |
| * @throws SessionExpiredException |
| */ |
| public void setOwner(long id, Object owner) throws SessionExpiredException { |
| sessionTracker.setOwner(id, owner); |
| } |
| |
| protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { |
| boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout); |
| if (LOG.isTraceEnabled()) { |
| ZooTrace.logTraceMessage( |
| LOG, |
| ZooTrace.SESSION_TRACE_MASK, |
| "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc); |
| } |
| finishSessionInit(cnxn, rc); |
| } |
| |
| public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException { |
| if (checkPasswd(sessionId, passwd)) { |
| revalidateSession(cnxn, sessionId, sessionTimeout); |
| } else { |
| LOG.warn( |
| "Incorrect password from {} for session 0x{}", |
| cnxn.getRemoteSocketAddress(), |
| Long.toHexString(sessionId)); |
| finishSessionInit(cnxn, false); |
| } |
| } |
| |
| public void finishSessionInit(ServerCnxn cnxn, boolean valid) { |
| // register with JMX |
| try { |
| if (valid) { |
| if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) { |
| serverCnxnFactory.registerConnection(cnxn); |
| } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) { |
| secureServerCnxnFactory.registerConnection(cnxn); |
| } |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to register with JMX", e); |
| } |
| |
| try { |
| ConnectResponse rsp = new ConnectResponse( |
| 0, |
| valid ? cnxn.getSessionTimeout() : 0, |
| valid ? cnxn.getSessionId() : 0, // send 0 if session is no |
| // longer valid |
| valid ? generatePasswd(cnxn.getSessionId()) : new byte[16], |
| this instanceof ReadOnlyZooKeeperServer); |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos); |
| bos.writeInt(-1, "len"); |
| rsp.serialize(bos, "connect"); |
| baos.close(); |
| ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); |
| bb.putInt(bb.remaining() - 4).rewind(); |
| cnxn.sendBuffer(bb); |
| |
| if (valid) { |
| LOG.debug( |
| "Established session 0x{} with negotiated timeout {} for client {}", |
| Long.toHexString(cnxn.getSessionId()), |
| cnxn.getSessionTimeout(), |
| cnxn.getRemoteSocketAddress()); |
| cnxn.enableRecv(); |
| } else { |
| |
| LOG.info( |
| "Invalid session 0x{} for client {}, probably expired", |
| Long.toHexString(cnxn.getSessionId()), |
| cnxn.getRemoteSocketAddress()); |
| cnxn.sendBuffer(ServerCnxnFactory.closeConn); |
| } |
| |
| } catch (Exception e) { |
| LOG.warn("Exception while establishing session, closing", e); |
| cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT); |
| } |
| } |
| |
| public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) { |
| closeSession(cnxn.getSessionId()); |
| } |
| |
| public long getServerId() { |
| return 0; |
| } |
| |
| /** |
| * If the underlying Zookeeper server support local session, this method |
| * will set a isLocalSession to true if a request is associated with |
| * a local session. |
| * |
| * @param si |
| */ |
| protected void setLocalSessionFlag(Request si) { |
| } |
| |
| public void submitRequest(Request si) { |
| if (restoreLatch != null) { |
| try { |
| LOG.info("Blocking request submission while restore is in progress"); |
| restoreLatch.await(); |
| } catch (final InterruptedException e) { |
| LOG.warn("Unexpected interruption", e); |
| } |
| } |
| enqueueRequest(si); |
| } |
| |
| public void enqueueRequest(Request si) { |
| if (requestThrottler == null) { |
| synchronized (this) { |
| try { |
| // Since all requests are passed to the request |
| // processor it should wait for setting up the request |
| // processor chain. The state will be updated to RUNNING |
| // after the setup. |
| while (state == State.INITIAL) { |
| wait(1000); |
| } |
| } catch (InterruptedException e) { |
| LOG.warn("Unexpected interruption", e); |
| } |
| if (requestThrottler == null) { |
| throw new RuntimeException("Not started"); |
| } |
| } |
| } |
| requestThrottler.submitRequest(si); |
| } |
| |
| public void submitRequestNow(Request si) { |
| if (firstProcessor == null) { |
| synchronized (this) { |
| try { |
| // Since all requests are passed to the request |
| // processor it should wait for setting up the request |
| // processor chain. The state will be updated to RUNNING |
| // after the setup. |
| while (state == State.INITIAL) { |
| wait(1000); |
| } |
| } catch (InterruptedException e) { |
| LOG.warn("Unexpected interruption", e); |
| } |
| if (firstProcessor == null || state != State.RUNNING) { |
| throw new RuntimeException("Not started"); |
| } |
| } |
| } |
| try { |
| touch(si.cnxn); |
| boolean validpacket = Request.isValid(si.type); |
| if (validpacket) { |
| setLocalSessionFlag(si); |
| firstProcessor.processRequest(si); |
| if (si.cnxn != null) { |
| incInProcess(); |
| } |
| } else { |
| LOG.warn("Received packet at server of unknown type {}", si.type); |
| // Update request accounting/throttling limits |
| requestFinished(si); |
| new UnimplementedRequestProcessor().processRequest(si); |
| } |
| } catch (MissingSessionException e) { |
| LOG.debug("Dropping request.", e); |
| // Update request accounting/throttling limits |
| requestFinished(si); |
| } catch (RequestProcessorException e) { |
| LOG.error("Unable to process request", e); |
| // Update request accounting/throttling limits |
| requestFinished(si); |
| } |
| } |
| |
| public static int getSnapCount() { |
| int snapCount = Integer.getInteger(SNAP_COUNT, DEFAULT_SNAP_COUNT); |
| // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor |
| if (snapCount < 2) { |
| LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2"); |
| snapCount = 2; |
| } |
| return snapCount; |
| } |
| |
| public int getGlobalOutstandingLimit() { |
| return Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, DEFAULT_GLOBAL_OUTSTANDING_LIMIT); |
| } |
| |
| public static long getSnapSizeInBytes() { |
| long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default |
| if (size <= 0) { |
| LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size); |
| } |
| return size * 1024; // Convert to bytes |
| } |
| |
| public void setServerCnxnFactory(ServerCnxnFactory factory) { |
| serverCnxnFactory = factory; |
| } |
| |
| public ServerCnxnFactory getServerCnxnFactory() { |
| return serverCnxnFactory; |
| } |
| |
| public ServerCnxnFactory getSecureServerCnxnFactory() { |
| return secureServerCnxnFactory; |
| } |
| |
| public void setSecureServerCnxnFactory(ServerCnxnFactory factory) { |
| secureServerCnxnFactory = factory; |
| } |
| |
| /** |
| * return the last processed id from the |
| * datatree |
| */ |
| public long getLastProcessedZxid() { |
| return zkDb.getDataTreeLastProcessedZxid(); |
| } |
| |
| /** |
| * return the outstanding requests |
| * in the queue, which haven't been |
| * processed yet |
| */ |
| public long getOutstandingRequests() { |
| return getInProcess(); |
| } |
| |
| /** |
| * return the total number of client connections that are alive |
| * to this server |
| */ |
| public int getNumAliveConnections() { |
| int numAliveConnections = 0; |
| |
| if (serverCnxnFactory != null) { |
| numAliveConnections += serverCnxnFactory.getNumAliveConnections(); |
| } |
| |
| if (secureServerCnxnFactory != null) { |
| numAliveConnections += secureServerCnxnFactory.getNumAliveConnections(); |
| } |
| |
| return numAliveConnections; |
| } |
| |
| /** |
| * truncate the log to get in sync with others |
| * if in a quorum |
| * @param zxid the zxid that it needs to get in sync |
| * with others |
| * @throws IOException |
| */ |
| public void truncateLog(long zxid) throws IOException { |
| this.zkDb.truncateLog(zxid); |
| } |
| |
| public int getTickTime() { |
| return tickTime; |
| } |
| |
| public void setTickTime(int tickTime) { |
| LOG.info("tickTime set to {} ms", tickTime); |
| this.tickTime = tickTime; |
| } |
| |
| public static int getThrottledOpWaitTime() { |
| return throttledOpWaitTime; |
| } |
| |
| public static void setThrottledOpWaitTime(int time) { |
| LOG.info("throttledOpWaitTime set to {} ms", time); |
| throttledOpWaitTime = time; |
| } |
| |
| public int getMinSessionTimeout() { |
| return minSessionTimeout; |
| } |
| |
| public void setMinSessionTimeout(int min) { |
| this.minSessionTimeout = min == -1 ? tickTime * 2 : min; |
| LOG.info("minSessionTimeout set to {} ms", this.minSessionTimeout); |
| } |
| |
| public int getMaxSessionTimeout() { |
| return maxSessionTimeout; |
| } |
| |
| public void setMaxSessionTimeout(int max) { |
| this.maxSessionTimeout = max == -1 ? tickTime * 20 : max; |
| LOG.info("maxSessionTimeout set to {} ms", this.maxSessionTimeout); |
| } |
| |
| public int getClientPortListenBacklog() { |
| return listenBacklog; |
| } |
| |
| public void setClientPortListenBacklog(int backlog) { |
| this.listenBacklog = backlog; |
| LOG.info("clientPortListenBacklog set to {}", backlog); |
| } |
| |
| public int getClientPort() { |
| return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1; |
| } |
| |
| public int getSecureClientPort() { |
| return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1; |
| } |
| |
| /** Maximum number of connections allowed from particular host (ip) */ |
| public int getMaxClientCnxnsPerHost() { |
| if (serverCnxnFactory != null) { |
| return serverCnxnFactory.getMaxClientCnxnsPerHost(); |
| } |
| if (secureServerCnxnFactory != null) { |
| return secureServerCnxnFactory.getMaxClientCnxnsPerHost(); |
| } |
| return -1; |
| } |
| |
| public void setTxnLogFactory(FileTxnSnapLog txnLog) { |
| this.txnLogFactory = txnLog; |
| } |
| |
| public FileTxnSnapLog getTxnLogFactory() { |
| return this.txnLogFactory; |
| } |
| |
| /** |
| * Returns the elapsed sync of time of transaction log in milliseconds. |
| */ |
| public long getTxnLogElapsedSyncTime() { |
| return txnLogFactory.getTxnLogElapsedSyncTime(); |
| } |
| |
| public String getState() { |
| return "standalone"; |
| } |
| |
| public void dumpEphemerals(PrintWriter pwriter) { |
| zkDb.dumpEphemerals(pwriter); |
| } |
| |
| public Map<Long, Set<String>> getEphemerals() { |
| return zkDb.getEphemerals(); |
| } |
| |
| public double getConnectionDropChance() { |
| return connThrottle.getDropChance(); |
| } |
| |
| @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup") |
| public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException { |
| LOG.debug( |
| "Session establishment request from client {} client's lastZxid is 0x{}", |
| cnxn.getRemoteSocketAddress(), |
| Long.toHexString(request.getLastZxidSeen())); |
| |
| long sessionId = request.getSessionId(); |
| int tokensNeeded = 1; |
| if (connThrottle.isConnectionWeightEnabled()) { |
| if (sessionId == 0) { |
| if (localSessionEnabled) { |
| tokensNeeded = connThrottle.getRequiredTokensForLocal(); |
| } else { |
| tokensNeeded = connThrottle.getRequiredTokensForGlobal(); |
| } |
| } else { |
| tokensNeeded = connThrottle.getRequiredTokensForRenew(); |
| } |
| } |
| |
| if (!connThrottle.checkLimit(tokensNeeded)) { |
| throw new ClientCnxnLimitException(); |
| } |
| ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); |
| ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); |
| |
| if (!cnxn.protocolManager.isReadonlyAvailable()) { |
| LOG.warn( |
| "Connection request from old client {}; will be dropped if server is in r-o mode", |
| cnxn.getRemoteSocketAddress()); |
| } |
| |
| if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) { |
| String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); |
| LOG.info(msg); |
| throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); |
| } |
| if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { |
| String msg = "Refusing session(0x" |
| + Long.toHexString(sessionId) |
| + ") request for client " |
| + cnxn.getRemoteSocketAddress() |
| + " as it has seen zxid 0x" |
| + Long.toHexString(request.getLastZxidSeen()) |
| + " our last zxid is 0x" |
| + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) |
| + " client must try another server"; |
| |
| LOG.info(msg); |
| throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); |
| } |
| int sessionTimeout = request.getTimeOut(); |
| byte[] passwd = request.getPasswd(); |
| int minSessionTimeout = getMinSessionTimeout(); |
| if (sessionTimeout < minSessionTimeout) { |
| sessionTimeout = minSessionTimeout; |
| } |
| int maxSessionTimeout = getMaxSessionTimeout(); |
| if (sessionTimeout > maxSessionTimeout) { |
| sessionTimeout = maxSessionTimeout; |
| } |
| cnxn.setSessionTimeout(sessionTimeout); |
| // We don't want to receive any packets until we are sure that the |
| // session is setup |
| cnxn.disableRecv(); |
| if (sessionId == 0) { |
| long id = createSession(cnxn, passwd, sessionTimeout); |
| LOG.debug( |
| "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", |
| Long.toHexString(id), |
| Long.toHexString(request.getLastZxidSeen()), |
| request.getTimeOut(), |
| cnxn.getRemoteSocketAddress()); |
| } else { |
| validateSession(cnxn, sessionId); |
| LOG.debug( |
| "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", |
| Long.toHexString(sessionId), |
| Long.toHexString(request.getLastZxidSeen()), |
| request.getTimeOut(), |
| cnxn.getRemoteSocketAddress()); |
| if (serverCnxnFactory != null) { |
| serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); |
| } |
| if (secureServerCnxnFactory != null) { |
| secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); |
| } |
| cnxn.setSessionId(sessionId); |
| reopenSession(cnxn, sessionId, passwd, sessionTimeout); |
| ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1); |
| |
| } |
| } |
| |
| /** |
| * Validate if a particular session can be reestablished. |
| * |
| * @param cnxn |
| * @param sessionId |
| */ |
| protected void validateSession(ServerCnxn cnxn, long sessionId) |
| throws IOException { |
| // do nothing |
| } |
| |
| public boolean shouldThrottle(long outStandingCount) { |
| int globalOutstandingLimit = getGlobalOutstandingLimit(); |
| if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) { |
| return outStandingCount > 0; |
| } |
| return false; |
| } |
| |
| long getFlushDelay() { |
| return flushDelay; |
| } |
| |
| static void setFlushDelay(long delay) { |
| LOG.info("{} = {} ms", FLUSH_DELAY, delay); |
| flushDelay = delay; |
| } |
| |
| long getMaxWriteQueuePollTime() { |
| return maxWriteQueuePollTime; |
| } |
| |
| static void setMaxWriteQueuePollTime(long maxTime) { |
| LOG.info("{} = {} ms", MAX_WRITE_QUEUE_POLL_SIZE, maxTime); |
| maxWriteQueuePollTime = maxTime; |
| } |
| |
| int getMaxBatchSize() { |
| return maxBatchSize; |
| } |
| |
| static void setMaxBatchSize(int size) { |
| LOG.info("{}={}", MAX_BATCH_SIZE, size); |
| maxBatchSize = size; |
| } |
| |
| private void initLargeRequestThrottlingSettings() { |
| setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes)); |
| setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1)); |
| } |
| |
| public int getLargeRequestMaxBytes() { |
| return largeRequestMaxBytes; |
| } |
| |
| public void setLargeRequestMaxBytes(int bytes) { |
| if (bytes <= 0) { |
| LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes); |
| LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes); |
| } else { |
| largeRequestMaxBytes = bytes; |
| LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes); |
| } |
| } |
| |
| public int getLargeRequestThreshold() { |
| return largeRequestThreshold; |
| } |
| |
| public void setLargeRequestThreshold(int threshold) { |
| if (threshold == 0 || threshold < -1) { |
| LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold); |
| largeRequestThreshold = -1; |
| } else { |
| largeRequestThreshold = threshold; |
| LOG.info("The large request threshold is set to {}", largeRequestThreshold); |
| } |
| } |
| |
| public int getLargeRequestBytes() { |
| return currentLargeRequestBytes.get(); |
| } |
| |
| private boolean isLargeRequest(int length) { |
| // The large request limit is disabled when threshold is -1 |
| if (largeRequestThreshold == -1) { |
| return false; |
| } |
| return length > largeRequestThreshold; |
| } |
| |
| public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException { |
| if (!isLargeRequest(length)) { |
| return true; |
| } |
| if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) { |
| return true; |
| } else { |
| ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); |
| throw new IOException("Rejecting large request"); |
| } |
| |
| } |
| |
| private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException { |
| if (!isLargeRequest(length)) { |
| return true; |
| } |
| |
| int bytes = currentLargeRequestBytes.addAndGet(length); |
| if (bytes > largeRequestMaxBytes) { |
| currentLargeRequestBytes.addAndGet(-length); |
| ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); |
| throw new IOException("Rejecting large request"); |
| } |
| return true; |
| } |
| |
| public void requestFinished(Request request) { |
| int largeRequestLength = request.getLargeRequestSize(); |
| if (largeRequestLength != -1) { |
| currentLargeRequestBytes.addAndGet(-largeRequestLength); |
| } |
| } |
| |
| public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException { |
| // Need to increase the outstanding request count first, otherwise |
| // there might be a race condition that it enabled recv after |
| // processing request and then disabled when check throttling. |
| // |
| // Be aware that we're actually checking the global outstanding |
| // request before this request. |
| // |
| // It's fine if the IOException thrown before we decrease the count |
| // in cnxn, since it will close the cnxn anyway. |
| cnxn.incrOutstandingAndCheckThrottle(h); |
| |
| if (h.getType() == OpCode.auth) { |
| LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); |
| AuthPacket authPacket = request.readRecord(AuthPacket::new); |
| String scheme = authPacket.getScheme(); |
| ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); |
| Code authReturn = KeeperException.Code.AUTHFAILED; |
| if (ap != null) { |
| try { |
| // handleAuthentication may close the connection, to allow the client to choose |
| // a different server to connect to. |
| authReturn = ap.handleAuthentication( |
| new ServerAuthenticationProvider.ServerObjs(this, cnxn), |
| authPacket.getAuth()); |
| } catch (RuntimeException e) { |
| LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e); |
| authReturn = KeeperException.Code.AUTHFAILED; |
| } |
| } |
| if (authReturn == KeeperException.Code.OK) { |
| LOG.info("Session 0x{}: auth success for scheme {} and address {}", |
| Long.toHexString(cnxn.getSessionId()), scheme, |
| cnxn.getRemoteSocketAddress()); |
| ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue()); |
| cnxn.sendResponse(rh, null, null); |
| } else { |
| if (ap == null) { |
| LOG.warn( |
| "No authentication provider for scheme: {} has {}", |
| scheme, |
| ProviderRegistry.listProviders()); |
| } else { |
| LOG.warn("Authentication failed for scheme: {}", scheme); |
| } |
| // send a response... |
| ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue()); |
| cnxn.sendResponse(rh, null, null); |
| // ... and close connection |
| cnxn.sendBuffer(ServerCnxnFactory.closeConn); |
| cnxn.disableRecv(); |
| } |
| return; |
| } else if (h.getType() == OpCode.sasl) { |
| processSasl(request, cnxn, h); |
| } else { |
| if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { |
| // Authentication enforcement is failed |
| // Already sent response to user about failure and closed the session, lets return |
| return; |
| } else { |
| Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo()); |
| int length = request.limit(); |
| if (isLargeRequest(length)) { |
| // checkRequestSize will throw IOException if request is rejected |
| checkRequestSizeWhenMessageReceived(length); |
| si.setLargeRequestSize(length); |
| } |
| si.setOwner(ServerCnxn.me); |
| submitRequest(si); |
| } |
| } |
| } |
| |
| private static boolean isSaslSuperUser(String id) { |
| if (id == null || id.isEmpty()) { |
| return false; |
| } |
| |
| Properties properties = System.getProperties(); |
| int prefixLen = SASL_SUPER_USER.length(); |
| |
| for (String k : properties.stringPropertyNames()) { |
| if (k.startsWith(SASL_SUPER_USER) |
| && (k.length() == prefixLen || k.charAt(prefixLen) == '.')) { |
| String value = properties.getProperty(k); |
| |
| if (value != null && value.equals(id)) { |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| private static boolean shouldAllowSaslFailedClientsConnect() { |
| return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); |
| } |
| |
| private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { |
| LOG.debug("Responding to client SASL token."); |
| GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new); |
| byte[] clientToken = clientTokenRecord.getToken(); |
| LOG.debug("Size of client SASL token: {}", clientToken.length); |
| byte[] responseToken = null; |
| try { |
| ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer; |
| try { |
| // note that clientToken might be empty (clientToken.length == 0): |
| // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the |
| // SASL negotiation process. |
| responseToken = saslServer.evaluateResponse(clientToken); |
| if (saslServer.isComplete()) { |
| String authorizationID = saslServer.getAuthorizationID(); |
| LOG.info("Session 0x{}: adding SASL authorization for authorizationID: {}", |
| Long.toHexString(cnxn.getSessionId()), authorizationID); |
| cnxn.addAuthInfo(new Id("sasl", authorizationID)); |
| |
| if (isSaslSuperUser(authorizationID)) { |
| cnxn.addAuthInfo(new Id("super", "")); |
| LOG.info( |
| "Session 0x{}: Authenticated Id '{}' as super user", |
| Long.toHexString(cnxn.getSessionId()), |
| authorizationID); |
| } |
| } |
| } catch (SaslException e) { |
| LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e); |
| if (shouldAllowSaslFailedClientsConnect() && !authHelper.isSaslAuthRequired()) { |
| LOG.warn("Maintaining client connection despite SASL authentication failure."); |
| } else { |
| int error; |
| if (authHelper.isSaslAuthRequired()) { |
| LOG.warn( |
| "Closing client connection due to server requires client SASL authenticaiton," |
| + "but client SASL authentication has failed, or client is not configured with SASL " |
| + "authentication."); |
| error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue(); |
| } else { |
| LOG.warn("Closing client connection due to SASL authentication failure."); |
| error = Code.AUTHFAILED.intValue(); |
| } |
| |
| ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error); |
| cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response"); |
| cnxn.sendCloseSession(); |
| cnxn.disableRecv(); |
| return; |
| } |
| } |
| } catch (NullPointerException e) { |
| LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly."); |
| } |
| if (responseToken != null) { |
| LOG.debug("Size of server SASL response: {}", responseToken.length); |
| } |
| |
| ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue()); |
| Record record = new SetSASLResponse(responseToken); |
| cnxn.sendResponse(replyHeader, record, "response"); |
| } |
| |
| // entry point for quorum/Learner.java |
| public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { |
| processTxnForSessionEvents(null, hdr, txn); |
| return processTxnInDB(hdr, txn, null); |
| } |
| |
| // entry point for FinalRequestProcessor.java |
| public ProcessTxnResult processTxn(Request request) { |
| TxnHeader hdr = request.getHdr(); |
| processTxnForSessionEvents(request, hdr, request.getTxn()); |
| |
| final boolean writeRequest = (hdr != null); |
| final boolean quorumRequest = request.isQuorum(); |
| |
| // return fast w/o synchronization when we get a read |
| if (!writeRequest && !quorumRequest) { |
| return new ProcessTxnResult(); |
| } |
| synchronized (outstandingChanges) { |
| ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest()); |
| |
| // request.hdr is set for write requests, which are the only ones |
| // that add to outstandingChanges. |
| if (writeRequest) { |
| long zxid = hdr.getZxid(); |
| while (!outstandingChanges.isEmpty() |
| && outstandingChanges.peek().zxid <= zxid) { |
| ChangeRecord cr = outstandingChanges.remove(); |
| ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1); |
| if (cr.zxid < zxid) { |
| LOG.warn( |
| "Zxid outstanding 0x{} is less than current 0x{}", |
| Long.toHexString(cr.zxid), |
| Long.toHexString(zxid)); |
| } |
| if (outstandingChangesForPath.get(cr.path) == cr) { |
| outstandingChangesForPath.remove(cr.path); |
| } |
| } |
| } |
| |
| // do not add non quorum packets to the queue. |
| if (quorumRequest) { |
| getZKDatabase().addCommittedProposal(request); |
| } |
| return rc; |
| } |
| } |
| |
| private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { |
| int opCode = (request == null) ? hdr.getType() : request.type; |
| long sessionId = (request == null) ? hdr.getClientId() : request.sessionId; |
| |
| if (opCode == OpCode.createSession) { |
| if (hdr != null && txn instanceof CreateSessionTxn) { |
| CreateSessionTxn cst = (CreateSessionTxn) txn; |
| sessionTracker.commitSession(sessionId, cst.getTimeOut()); |
| } else if (request == null || !request.isLocalSession()) { |
| LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString()); |
| } |
| } else if (opCode == OpCode.closeSession) { |
| sessionTracker.removeSession(sessionId); |
| } |
| } |
| |
| private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn, TxnDigest digest) { |
| if (hdr == null) { |
| return new ProcessTxnResult(); |
| } else { |
| return getZKDatabase().processTxn(hdr, txn, digest); |
| } |
| } |
| |
| public Map<Long, Set<Long>> getSessionExpiryMap() { |
| return sessionTracker.getSessionExpiryMap(); |
| } |
| |
| /** |
| * This method is used to register the ZooKeeperServerShutdownHandler to get |
| * server's error or shutdown state change notifications. |
| * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for |
| * every server state changes {@link #setState(State)}. |
| * |
| * @param zkShutdownHandler shutdown handler |
| */ |
| void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) { |
| this.zkShutdownHandler = zkShutdownHandler; |
| } |
| |
| public boolean isResponseCachingEnabled() { |
| return isResponseCachingEnabled; |
| } |
| |
| public void setResponseCachingEnabled(boolean isEnabled) { |
| isResponseCachingEnabled = isEnabled; |
| } |
| |
| public ResponseCache getReadResponseCache() { |
| return isResponseCachingEnabled ? readResponseCache : null; |
| } |
| |
| public ResponseCache getGetChildrenResponseCache() { |
| return isResponseCachingEnabled ? getChildrenResponseCache : null; |
| } |
| |
| protected void registerMetrics() { |
| MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); |
| |
| final ZKDatabase zkdb = this.getZKDatabase(); |
| final ServerStats stats = this.serverStats(); |
| |
| rootContext.registerGauge("avg_latency", stats::getAvgLatency); |
| |
| rootContext.registerGauge("max_latency", stats::getMaxLatency); |
| rootContext.registerGauge("min_latency", stats::getMinLatency); |
| |
| rootContext.registerGauge("packets_received", stats::getPacketsReceived); |
| rootContext.registerGauge("packets_sent", stats::getPacketsSent); |
| rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections); |
| |
| rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests); |
| rootContext.registerGauge("uptime", stats::getUptime); |
| |
| rootContext.registerGauge("znode_count", zkdb::getNodeCount); |
| |
| rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount); |
| rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount); |
| |
| rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize); |
| |
| rootContext.registerGauge("global_sessions", zkdb::getSessionCount); |
| rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount); |
| |
| OSMXBean osMbean = new OSMXBean(); |
| rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount); |
| rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount); |
| rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance); |
| |
| rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize); |
| rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize); |
| rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize); |
| |
| rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum); |
| rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount); |
| rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount); |
| rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount); |
| |
| rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE, |
| () -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree())); |
| rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE, |
| () -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree())); |
| rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE, |
| () -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree())); |
| rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE, |
| () -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree())); |
| } |
| |
| protected void unregisterMetrics() { |
| |
| MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); |
| |
| rootContext.unregisterGauge("avg_latency"); |
| |
| rootContext.unregisterGauge("max_latency"); |
| rootContext.unregisterGauge("min_latency"); |
| |
| rootContext.unregisterGauge("packets_received"); |
| rootContext.unregisterGauge("packets_sent"); |
| rootContext.unregisterGauge("num_alive_connections"); |
| |
| rootContext.unregisterGauge("outstanding_requests"); |
| rootContext.unregisterGauge("uptime"); |
| |
| rootContext.unregisterGauge("znode_count"); |
| |
| rootContext.unregisterGauge("watch_count"); |
| rootContext.unregisterGauge("ephemerals_count"); |
| rootContext.unregisterGauge("approximate_data_size"); |
| |
| rootContext.unregisterGauge("global_sessions"); |
| rootContext.unregisterGauge("local_sessions"); |
| |
| rootContext.unregisterGauge("open_file_descriptor_count"); |
| rootContext.unregisterGauge("max_file_descriptor_count"); |
| rootContext.unregisterGauge("connection_drop_probability"); |
| |
| rootContext.unregisterGauge("last_client_response_size"); |
| rootContext.unregisterGauge("max_client_response_size"); |
| rootContext.unregisterGauge("min_client_response_size"); |
| |
| rootContext.unregisterGauge("auth_failed_count"); |
| rootContext.unregisterGauge("non_mtls_remote_conn_count"); |
| rootContext.unregisterGauge("non_mtls_local_conn_count"); |
| |
| rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE); |
| rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE); |
| rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE); |
| rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE); |
| } |
| |
| /** |
| * Hook into admin server, useful to expose additional data |
| * that do not represent metrics. |
| * |
| * @param response a sink which collects the data. |
| */ |
| public void dumpMonitorValues(BiConsumer<String, Object> response) { |
| ServerStats stats = serverStats(); |
| response.accept("version", Version.getFullVersion()); |
| response.accept("server_state", stats.getServerState()); |
| } |
| |
| /** |
| * Grant or deny authorization to an operation on a node as a function of: |
| * @param cnxn : the server connection or null for admin server commands |
| * @param acl : set of ACLs for the node |
| * @param perm : the permission that the client is requesting |
| * @param ids : the credentials supplied by the client |
| * @param path : the ZNode path |
| * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null. |
| */ |
| public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException { |
| if (skipACL) { |
| return; |
| } |
| |
| LOG.debug("Permission requested: {} ", perm); |
| LOG.debug("ACLs for node: {}", acl); |
| LOG.debug("Client credentials: {}", ids); |
| |
| if (acl == null || acl.size() == 0) { |
| return; |
| } |
| for (Id authId : ids) { |
| if (authId.getScheme().equals("super")) { |
| return; |
| } |
| } |
| for (ACL a : acl) { |
| Id id = a.getId(); |
| if ((a.getPerms() & perm) != 0) { |
| if (id.getScheme().equals("world") && id.getId().equals("anyone")) { |
| return; |
| } |
| ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme()); |
| if (ap != null) { |
| for (Id authId : ids) { |
| if (authId.getScheme().equals(id.getScheme()) |
| && ap.matches( |
| new ServerAuthenticationProvider.ServerObjs(this, cnxn), |
| new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) { |
| return; |
| } |
| } |
| } |
| } |
| } |
| throw new KeeperException.NoAuthException(); |
| } |
| |
| /** |
| * check a path whether exceeded the quota. |
| * |
| * @param path |
| * the path of the node, used for the quota prefix check |
| * @param lastData |
| * the current node data, {@code null} for none |
| * @param data |
| * the data to be set, or {@code null} for none |
| * @param type |
| * currently, create and setData need to check quota |
| */ |
| public void checkQuota(String path, byte[] lastData, byte[] data, int type) throws KeeperException.QuotaExceededException { |
| if (!enforceQuota) { |
| return; |
| } |
| long dataBytes = (data == null) ? 0 : data.length; |
| ZKDatabase zkDatabase = getZKDatabase(); |
| String lastPrefix = zkDatabase.getDataTree().getMaxPrefixWithQuota(path); |
| if (StringUtils.isEmpty(lastPrefix)) { |
| return; |
| } |
| |
| final String namespace = PathUtils.getTopNamespace(path); |
| switch (type) { |
| case OpCode.create: |
| checkQuota(lastPrefix, dataBytes, 1, namespace); |
| break; |
| case OpCode.setData: |
| checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0, namespace); |
| break; |
| default: |
| throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type); |
| } |
| } |
| |
| /** |
| * check a path whether exceeded the quota. |
| * |
| * @param lastPrefix |
| the path of the node which has a quota. |
| * @param bytesDiff |
| * the diff to be added to number of bytes |
| * @param countDiff |
| * the diff to be added to the count |
| * @param namespace |
| * the namespace for collecting quota exceeded errors |
| */ |
| private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace) |
| throws KeeperException.QuotaExceededException { |
| LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff); |
| |
| // now check the quota we set |
| String limitNode = Quotas.limitPath(lastPrefix); |
| DataNode node = getZKDatabase().getNode(limitNode); |
| StatsTrack limitStats; |
| if (node == null) { |
| // should not happen |
| LOG.error("Missing limit node for quota {}", limitNode); |
| return; |
| } |
| synchronized (node) { |
| limitStats = new StatsTrack(node.data); |
| } |
| //check the quota |
| boolean checkCountQuota = countDiff != 0 && (limitStats.getCount() > -1 || limitStats.getCountHardLimit() > -1); |
| boolean checkByteQuota = bytesDiff != 0 && (limitStats.getBytes() > -1 || limitStats.getByteHardLimit() > -1); |
| |
| if (!checkCountQuota && !checkByteQuota) { |
| return; |
| } |
| |
| //check the statPath quota |
| String statNode = Quotas.statPath(lastPrefix); |
| node = getZKDatabase().getNode(statNode); |
| |
| StatsTrack currentStats; |
| if (node == null) { |
| // should not happen |
| LOG.error("Missing node for stat {}", statNode); |
| return; |
| } |
| synchronized (node) { |
| currentStats = new StatsTrack(node.data); |
| } |
| |
| //check the Count Quota |
| if (checkCountQuota) { |
| long newCount = currentStats.getCount() + countDiff; |
| boolean isCountHardLimit = limitStats.getCountHardLimit() > -1; |
| long countLimit = isCountHardLimit ? limitStats.getCountHardLimit() : limitStats.getCount(); |
| |
| if (newCount > countLimit) { |
| String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]"; |
| RATE_LOGGER.rateLimitLog(msg); |
| if (isCountHardLimit) { |
| updateQuotaExceededMetrics(namespace); |
| throw new KeeperException.QuotaExceededException(lastPrefix); |
| } |
| } |
| } |
| |
| //check the Byte Quota |
| if (checkByteQuota) { |
| long newBytes = currentStats.getBytes() + bytesDiff; |
| boolean isByteHardLimit = limitStats.getByteHardLimit() > -1; |
| long byteLimit = isByteHardLimit ? limitStats.getByteHardLimit() : limitStats.getBytes(); |
| if (newBytes > byteLimit) { |
| String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]"; |
| RATE_LOGGER.rateLimitLog(msg); |
| if (isByteHardLimit) { |
| updateQuotaExceededMetrics(namespace); |
| throw new KeeperException.QuotaExceededException(lastPrefix); |
| } |
| } |
| } |
| } |
| |
| public static boolean isDigestEnabled() { |
| return digestEnabled; |
| } |
| |
| public static void setDigestEnabled(boolean digestEnabled) { |
| LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); |
| ZooKeeperServer.digestEnabled = digestEnabled; |
| } |
| |
| public static boolean isSerializeLastProcessedZxidEnabled() { |
| return serializeLastProcessedZxidEnabled; |
| } |
| |
| public static void setSerializeLastProcessedZxidEnabled(boolean serializeLastZxidEnabled) { |
| serializeLastProcessedZxidEnabled = serializeLastZxidEnabled; |
| LOG.info("{} = {}", ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, serializeLastZxidEnabled); |
| } |
| |
| /** |
| * Trim a path to get the immediate predecessor. |
| * |
| * @param path |
| * @return |
| * @throws KeeperException.BadArgumentsException |
| */ |
| private String parentPath(String path) throws KeeperException.BadArgumentsException { |
| int lastSlash = path.lastIndexOf('/'); |
| if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) { |
| throw new KeeperException.BadArgumentsException(path); |
| } |
| return lastSlash == 0 ? "/" : path.substring(0, lastSlash); |
| } |
| |
| private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException { |
| boolean mustCheckACL = false; |
| String path = null; |
| List<ACL> acl = null; |
| |
| switch (request.type) { |
| case OpCode.create: |
| case OpCode.create2: { |
| CreateRequest req = request.readRequestRecordNoException(CreateRequest::new); |
| if (req != null) { |
| mustCheckACL = true; |
| acl = req.getAcl(); |
| path = parentPath(req.getPath()); |
| } |
| break; |
| } |
| case OpCode.delete: { |
| DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new); |
| if (req != null) { |
| path = parentPath(req.getPath()); |
| } |
| break; |
| } |
| case OpCode.setData: { |
| SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new); |
| if (req != null) { |
| path = req.getPath(); |
| } |
| break; |
| } |
| case OpCode.setACL: { |
| SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new); |
| if (req != null) { |
| mustCheckACL = true; |
| acl = req.getAcl(); |
| path = req.getPath(); |
| } |
| break; |
| } |
| } |
| |
| if (mustCheckACL) { |
| /* we ignore the extrapolated ACL returned by fixupACL because |
| * we only care about it being well-formed (and if it isn't, an |
| * exception will be raised). |
| */ |
| PrepRequestProcessor.fixupACL(path, request.authInfo, acl); |
| } |
| |
| return path; |
| } |
| |
| private int effectiveACLPerms(Request request) { |
| switch (request.type) { |
| case OpCode.create: |
| case OpCode.create2: |
| return ZooDefs.Perms.CREATE; |
| case OpCode.delete: |
| return ZooDefs.Perms.DELETE; |
| case OpCode.setData: |
| return ZooDefs.Perms.WRITE; |
| case OpCode.setACL: |
| return ZooDefs.Perms.ADMIN; |
| default: |
| return ZooDefs.Perms.ALL; |
| } |
| } |
| |
| /** |
| * Check Write Requests for Potential Access Restrictions |
| * <p/> |
| * Before a request is being proposed to the quorum, lets check it |
| * against local ACLs. Non-write requests (read, session, etc.) |
| * are passed along. Invalid requests are sent a response. |
| * <p/> |
| * While we are at it, if the request will set an ACL: make sure it's |
| * a valid one. |
| * |
| * @param request |
| * @return true if request is permitted, false if not. |
| * @throws java.io.IOException |
| */ |
| public boolean authWriteRequest(Request request) { |
| int err; |
| String pathToCheck; |
| |
| if (!enableEagerACLCheck) { |
| return true; |
| } |
| |
| err = KeeperException.Code.OK.intValue(); |
| |
| try { |
| pathToCheck = effectiveACLPath(request); |
| if (pathToCheck != null) { |
| checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null); |
| } |
| } catch (KeeperException.NoAuthException e) { |
| LOG.debug("Request failed ACL check", e); |
| err = e.code().intValue(); |
| } catch (KeeperException.InvalidACLException e) { |
| LOG.debug("Request has an invalid ACL check", e); |
| err = e.code().intValue(); |
| } catch (KeeperException.NoNodeException e) { |
| LOG.debug("ACL check against non-existent node: {}", e.getMessage()); |
| } catch (KeeperException.BadArgumentsException e) { |
| LOG.debug("ACL check against illegal node path: {}", e.getMessage()); |
| } catch (Throwable t) { |
| LOG.error("Uncaught exception in authWriteRequest with: ", t); |
| throw t; |
| } finally { |
| if (err != KeeperException.Code.OK.intValue()) { |
| /* This request has a bad ACL, so we are dismissing it early. */ |
| decInProcess(); |
| ReplyHeader rh = new ReplyHeader(request.cxid, 0, err); |
| try { |
| request.cnxn.sendResponse(rh, null, null); |
| } catch (IOException e) { |
| LOG.error("IOException : {}", e); |
| } |
| } |
| } |
| |
| return err == KeeperException.Code.OK.intValue(); |
| } |
| |
| public int getOutstandingHandshakeNum() { |
| if (serverCnxnFactory instanceof NettyServerCnxnFactory) { |
| return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum(); |
| } else { |
| return 0; |
| } |
| } |
| |
| public boolean isReconfigEnabled() { |
| return this.reconfigEnabled; |
| } |
| |
| public ZooKeeperServerShutdownHandler getZkShutdownHandler() { |
| return zkShutdownHandler; |
| } |
| |
| static void updateQuotaExceededMetrics(final String namespace) { |
| if (namespace == null) { |
| return; |
| } |
| ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1); |
| } |
| } |
| |