| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.zookeeper.server; |
| |
| import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.PrintWriter; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.BiConsumer; |
| import javax.security.sasl.SaslException; |
| import org.apache.jute.BinaryInputArchive; |
| import org.apache.jute.BinaryOutputArchive; |
| import org.apache.jute.Record; |
| import org.apache.zookeeper.Environment; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.Code; |
| import org.apache.zookeeper.KeeperException.SessionExpiredException; |
| import org.apache.zookeeper.Version; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooDefs.OpCode; |
| import org.apache.zookeeper.ZookeeperBanner; |
| import org.apache.zookeeper.common.Time; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Id; |
| import org.apache.zookeeper.data.StatPersisted; |
| import org.apache.zookeeper.jmx.MBeanRegistry; |
| import org.apache.zookeeper.metrics.MetricsContext; |
| import org.apache.zookeeper.proto.AuthPacket; |
| import org.apache.zookeeper.proto.ConnectRequest; |
| import org.apache.zookeeper.proto.ConnectResponse; |
| import org.apache.zookeeper.proto.CreateRequest; |
| import org.apache.zookeeper.proto.DeleteRequest; |
| import org.apache.zookeeper.proto.GetSASLRequest; |
| import org.apache.zookeeper.proto.ReplyHeader; |
| import org.apache.zookeeper.proto.RequestHeader; |
| import org.apache.zookeeper.proto.SetACLRequest; |
| import org.apache.zookeeper.proto.SetDataRequest; |
| import org.apache.zookeeper.proto.SetSASLResponse; |
| import org.apache.zookeeper.server.DataTree.ProcessTxnResult; |
| import org.apache.zookeeper.server.RequestProcessor.RequestProcessorException; |
| import org.apache.zookeeper.server.ServerCnxn.CloseRequestException; |
| import org.apache.zookeeper.server.SessionTracker.Session; |
| import org.apache.zookeeper.server.SessionTracker.SessionExpirer; |
| import org.apache.zookeeper.server.auth.ProviderRegistry; |
| import org.apache.zookeeper.server.auth.ServerAuthenticationProvider; |
| import org.apache.zookeeper.server.persistence.FileTxnSnapLog; |
| import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; |
| import org.apache.zookeeper.server.util.JvmPauseMonitor; |
| import org.apache.zookeeper.server.util.OSMXBean; |
| import org.apache.zookeeper.server.util.RequestPathMetricsCollector; |
| import org.apache.zookeeper.txn.CreateSessionTxn; |
| import org.apache.zookeeper.txn.TxnHeader; |
| import org.apache.zookeeper.util.ServiceUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class implements a simple standalone ZooKeeperServer. It sets up the |
| * following chain of RequestProcessors to process requests: |
| * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor |
| */ |
| public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider { |
| |
/** Class logger; assigned in the static initializer, which also prints the banner with it. */
protected static final Logger LOG;

/** Property key for the global outstanding-request limit (read elsewhere, not in this view). */
public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";

/** System property enabling eager ACL checks; read with Boolean.getBoolean. */
public static final String ENABLE_EAGER_ACL_CHECK = "zookeeper.enableEagerACLCheck";
/** System property; ACL checks are skipped only when its value is exactly "yes". */
public static final String SKIP_ACL = "zookeeper.skipACL";

// When enabled, the ACL constraints that apply to a request are checked first,
// before the request is sent to the quorum.
static final boolean enableEagerACLCheck;

// True only when -Dzookeeper.skipACL=yes; fixed at class initialization.
static final boolean skipACL;

// Property keys / scheme name for SASL handling (consumed outside this view).
public static final String ALLOW_SASL_FAILED_CLIENTS = "zookeeper.allowSaslFailedClients";
public static final String SESSION_REQUIRE_CLIENT_SASL_AUTH = "zookeeper.sessionRequireClientSASLAuth";
public static final String SASL_AUTH_SCHEME = "sasl";

// Digest feature flag; the static initializer defaults it to "true".
public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
private static boolean digestEnabled;

// Enable/disable switch for the close-session txn feature. It should be removed
// once the feature is confirmed to be stable. Mutable at runtime via
// setCloseSessionTxnEnabled.
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
private static boolean closeSessionTxnEnabled = true;
| |
| static { |
| LOG = LoggerFactory.getLogger(ZooKeeperServer.class); |
| |
| ZookeeperBanner.printBanner(LOG); |
| |
| Environment.logEnv("Server environment:", LOG); |
| |
| enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK); |
| LOG.info("{} = {}", ENABLE_EAGER_ACL_CHECK, enableEagerACLCheck); |
| |
| skipACL = System.getProperty(SKIP_ACL, "no").equals("yes"); |
| if (skipACL) { |
| LOG.info("{}==\"yes\", ACL checks will be skipped", SKIP_ACL); |
| } |
| |
| digestEnabled = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DIGEST_ENABLED, "true")); |
| LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); |
| |
| closeSessionTxnEnabled = Boolean.parseBoolean( |
| System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true")); |
| LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled); |
| } |
| |
| public static boolean isCloseSessionTxnEnabled() { |
| return closeSessionTxnEnabled; |
| } |
| |
| public static void setCloseSessionTxnEnabled(boolean enabled) { |
| ZooKeeperServer.closeSessionTxnEnabled = enabled; |
| LOG.info("Update {} to {}", CLOSE_SESSION_TXN_ENABLED, |
| ZooKeeperServer.closeSessionTxnEnabled); |
| } |
| |
// JMX beans created in registerJMX() and released in unregisterJMX(); null when not registered.
protected ZooKeeperServerBean jmxServerBean;
protected DataTreeBean jmxDataTreeBean;

// NOTE(review): tick time is presumably in milliseconds — confirm.
public static final int DEFAULT_TICK_TIME = 3000;
protected int tickTime = DEFAULT_TICK_TIME;
/** value of -1 indicates unset, use default */
protected int minSessionTimeout = -1;
/** value of -1 indicates unset, use default */
protected int maxSessionTimeout = -1;
/** Socket listen backlog. Value of -1 indicates unset */
protected int listenBacklog = -1;
// Created lazily by startup() via createSessionTracker() when still null.
protected SessionTracker sessionTracker;
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;
// Response caches, sized from system properties in the full constructor.
private ResponseCache readResponseCache;
private ResponseCache getChildrenResponseCache;
// Last zxid handed out; getNextZxid() atomically increments it.
private final AtomicLong hzxid = new AtomicLong(0);
public static final Exception ok = new Exception("No prob");
// Head of the request-processor chain built by setupRequestProcessors().
protected RequestProcessor firstProcessor;
protected JvmPauseMonitor jvmPauseMonitor;
// Lifecycle state; volatile because it is read without holding the monitor (e.g. isRunning()).
protected volatile State state = State.INITIAL;
private boolean isResponseCachingEnabled = true;
/* contains the configuration file content read at startup */
protected String initialConfig;
private final RequestPathMetricsCollector requestPathMetricsCollector;

// Copied from the session tracker at the end of startup().
private boolean localSessionEnabled = false;
| protected enum State { |
| INITIAL, |
| RUNNING, |
| SHUTDOWN, |
| ERROR |
| } |
| |
/**
 * This is the secret that we use to generate passwords. For the moment,
 * it's more of a checksum that's used in reconnection, which carries no
 * security weight, and is treated internally as if it carries no
 * security weight.
 */
private static final long superSecret = 0XB3415C00L;

// Number of requests currently in process; see incInProcess()/decInProcess().
private final AtomicInteger requestsInProcess = new AtomicInteger(0);
// Pending node changes shared between PrepRP and FinalRP (see ChangeRecord).
final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
// this data structure must be accessed under the outstandingChanges lock
final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();

// Plain and TLS connection factories; either may be null.
protected ServerCnxnFactory serverCnxnFactory;
protected ServerCnxnFactory secureServerCnxnFactory;

private final ServerStats serverStats;
private final ZooKeeperServerListener listener;
// Notified of every state change by setState(), when registered.
private ZooKeeperServerShutdownHandler zkShutdownHandler;
// Server id passed to SessionTrackerImpl by createSessionTracker().
private volatile int createSessionTrackerServerId = 1;

// Write-path tuning knobs, initialized from system properties in a static initializer.
private static final String FLUSH_DELAY = "zookeeper.flushDelay";
private static volatile long flushDelay;
// NOTE(review): constant name says "SIZE" but the key and the field it feeds are a poll *time*.
private static final String MAX_WRITE_QUEUE_POLL_SIZE = "zookeeper.maxWriteQueuePollTime";
private static volatile long maxWriteQueuePollTime;
private static final String MAX_BATCH_SIZE = "zookeeper.maxBatchSize";
private static volatile int maxBatchSize;
| |
| /** |
| * Starting size of read and write ByteArroyOuputBuffers. Default is 32 bytes. |
| * Flag not used for small transfers like connectResponses. |
| */ |
| public static final String INT_BUFFER_STARTING_SIZE_BYTES = "zookeeper.intBufferStartingSizeBytes"; |
| public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024; |
| public static final int intBufferStartingSizeBytes; |
| |
| public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize"; |
| public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize"; |
| |
| static { |
| long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0); |
| setFlushDelay(configuredFlushDelay); |
| setMaxWriteQueuePollTime(Long.getLong(MAX_WRITE_QUEUE_POLL_SIZE, configuredFlushDelay / 3)); |
| setMaxBatchSize(Integer.getInteger(MAX_BATCH_SIZE, 1000)); |
| |
| intBufferStartingSizeBytes = Integer.getInteger(INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE); |
| |
| if (intBufferStartingSizeBytes < 32) { |
| String msg = "Buffer starting size must be greater than 0." |
| + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" "; |
| LOG.error(msg); |
| throw new IllegalArgumentException(msg); |
| } |
| |
| LOG.info("{} = {}", INT_BUFFER_STARTING_SIZE_BYTES, intBufferStartingSizeBytes); |
| } |
| |
// Connection throttling; exposed through connThrottle().
private BlueThrottle connThrottle = new BlueThrottle();

// Request throttler; created and started by startRequestThrottler(), shut down in shutdown().
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification =
"Internally the throttler has a BlockingQueue so "
+ "once the throttler is created and started, it is thread-safe")
private RequestThrottler requestThrottler;
// Property key for the snapshot count (consumed outside this view).
public static final String SNAP_COUNT = "zookeeper.snapCount";
| |
| /** |
| * This setting sets a limit on the total number of large requests that |
| * can be inflight and is designed to prevent ZooKeeper from accepting |
| * too many large requests such that the JVM runs out of usable heap and |
| * ultimately crashes. |
| * |
| * The limit is enforced by the {@link checkRequestSize(int, boolean)} |
| * method which is called by the connection layer ({@link NIOServerCnxn}, |
| * {@link NettyServerCnxn}) before allocating a byte buffer and pulling |
| * data off the TCP socket. The limit is then checked again by the |
| * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which |
| * also atomically updates {@link currentLargeRequestBytes}. The request is |
| * then marked as a large request, with the request size stored in the Request |
| * object so that it can later be decremented from {@link currentLargeRequestsBytes}. |
| * |
| * When a request is completed or dropped, the relevant code path calls the |
| * {@link requestFinished(Request)} method which performs the decrement if |
| * needed. |
| */ |
| private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; |
| |
| /** |
| * The size threshold after which a request is considered a large request |
| * and is checked against the large request byte limit. |
| */ |
| private volatile int largeRequestThreshold = -1; |
| |
| private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); |
| |
| void removeCnxn(ServerCnxn cnxn) { |
| zkDb.removeCnxn(cnxn); |
| } |
| |
/**
 * Creates an unconfigured ZooKeeperServer. Only the listener, the stats
 * object and the request-path metrics collector are built; everything else
 * (data dir, log dir, tick time, ...) must be supplied through the setters
 * before use.
 */
public ZooKeeperServer() {
    listener = new ZooKeeperServerListenerImpl(this);
    serverStats = new ServerStats(this);
    requestPathMetricsCollector = new RequestPathMetricsCollector();
}
| |
| /** |
| * Creates a ZooKeeperServer instance. It sets everything up, but doesn't |
| * actually start listening for clients until run() is invoked. |
| * |
| */ |
| public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) { |
| serverStats = new ServerStats(this); |
| this.txnLogFactory = txnLogFactory; |
| this.txnLogFactory.setServerStats(this.serverStats); |
| this.zkDb = zkDb; |
| this.tickTime = tickTime; |
| setMinSessionTimeout(minSessionTimeout); |
| setMaxSessionTimeout(maxSessionTimeout); |
| this.listenBacklog = clientPortListenBacklog; |
| |
| listener = new ZooKeeperServerListenerImpl(this); |
| |
| readResponseCache = new ResponseCache(Integer.getInteger( |
| GET_DATA_RESPONSE_CACHE_SIZE, |
| ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE)); |
| |
| getChildrenResponseCache = new ResponseCache(Integer.getInteger( |
| GET_CHILDREN_RESPONSE_CACHE_SIZE, |
| ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE)); |
| |
| this.initialConfig = initialConfig; |
| |
| this.requestPathMetricsCollector = new RequestPathMetricsCollector(); |
| |
| this.initLargeRequestThrottlingSettings(); |
| |
| LOG.info( |
| "Created server with" |
| + " tickTime {}" |
| + " minSessionTimeout {}" |
| + " maxSessionTimeout {}" |
| + " clientPortListenBacklog {}" |
| + " datadir {}" |
| + " snapdir {}", |
| tickTime, |
| getMinSessionTimeout(), |
| getMaxSessionTimeout(), |
| getClientPortListenBacklog(), |
| txnLogFactory.getDataDir(), |
| txnLogFactory.getSnapDir()); |
| } |
| |
| public String getInitialConfig() { |
| return initialConfig; |
| } |
| |
/**
 * Same as
 * {@link #ZooKeeperServer(FileTxnSnapLog, int, int, int, int, ZKDatabase, String)},
 * additionally attaching a {@link JvmPauseMonitor} that is started with the server.
 *
 * @param jvmPauseMonitor pause monitor to attach; may be null for none
 */
public ZooKeeperServer(JvmPauseMonitor jvmPauseMonitor, FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig) {
    this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig);
    this.jvmPauseMonitor = jvmPauseMonitor;
    if (jvmPauseMonitor == null) {
        return;
    }
    LOG.info("Added JvmPauseMonitor to server");
}
| |
/**
 * Creates a server with default session timeouts and listen backlog, backed
 * by a fresh {@link ZKDatabase} built on the given log factory.
 *
 * @param txnLogFactory the file transaction/snapshot logging class
 * @param tickTime the tick time for the server
 * @param initialConfig configuration file content read at startup
 * @throws IOException if the database cannot be created
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initialConfig) throws IOException {
    this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig);
}
| |
| public ServerStats serverStats() { |
| return serverStats; |
| } |
| |
| public RequestPathMetricsCollector getRequestPathMetricsCollector() { |
| return requestPathMetricsCollector; |
| } |
| |
| public BlueThrottle connThrottle() { |
| return connThrottle; |
| } |
| |
| public void dumpConf(PrintWriter pwriter) { |
| pwriter.print("clientPort="); |
| pwriter.println(getClientPort()); |
| pwriter.print("secureClientPort="); |
| pwriter.println(getSecureClientPort()); |
| pwriter.print("dataDir="); |
| pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath()); |
| pwriter.print("dataDirSize="); |
| pwriter.println(getDataDirSize()); |
| pwriter.print("dataLogDir="); |
| pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath()); |
| pwriter.print("dataLogSize="); |
| pwriter.println(getLogDirSize()); |
| pwriter.print("tickTime="); |
| pwriter.println(getTickTime()); |
| pwriter.print("maxClientCnxns="); |
| pwriter.println(getMaxClientCnxnsPerHost()); |
| pwriter.print("minSessionTimeout="); |
| pwriter.println(getMinSessionTimeout()); |
| pwriter.print("maxSessionTimeout="); |
| pwriter.println(getMaxSessionTimeout()); |
| pwriter.print("clientPortListenBacklog="); |
| pwriter.println(getClientPortListenBacklog()); |
| |
| pwriter.print("serverId="); |
| pwriter.println(getServerId()); |
| } |
| |
| public ZooKeeperServerConf getConf() { |
| return new ZooKeeperServerConf( |
| getClientPort(), |
| zkDb.snapLog.getSnapDir().getAbsolutePath(), |
| zkDb.snapLog.getDataDir().getAbsolutePath(), |
| getTickTime(), |
| getMaxClientCnxnsPerHost(), |
| getMinSessionTimeout(), |
| getMaxSessionTimeout(), |
| getServerId(), |
| getClientPortListenBacklog()); |
| } |
| |
/**
 * Convenience constructor kept for backward compatibility with existing
 * unit tests. Wraps the two directories in a {@link FileTxnSnapLog} and uses
 * an empty initial config.
 *
 * @param snapDir first directory argument passed to FileTxnSnapLog
 * @param logDir second directory argument passed to FileTxnSnapLog
 * @param tickTime the tick time for the server
 * @throws IOException if the log or database cannot be opened
 */
public ZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
    this(new FileTxnSnapLog(snapDir, logDir), tickTime, "");
}
| |
/**
 * Creates a server on the given log factory using {@link #DEFAULT_TICK_TIME},
 * default session timeouts and backlog (-1), a fresh {@link ZKDatabase}, and
 * an empty initial config.
 *
 * @param txnLogFactory transaction/snapshot storage
 * @throws IOException if the database cannot be created
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException {
    this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, -1, new ZKDatabase(txnLogFactory), "");
}
| |
| /** |
| * get the zookeeper database for this server |
| * @return the zookeeper database for this server |
| */ |
| public ZKDatabase getZKDatabase() { |
| return this.zkDb; |
| } |
| |
| /** |
| * set the zkdatabase for this zookeeper server |
| * @param zkDb |
| */ |
| public void setZKDatabase(ZKDatabase zkDb) { |
| this.zkDb = zkDb; |
| } |
| |
| /** |
| * Restore sessions and data |
| */ |
| public void loadData() throws IOException, InterruptedException { |
| /* |
| * When a new leader starts executing Leader#lead, it |
| * invokes this method. The database, however, has been |
| * initialized before running leader election so that |
| * the server could pick its zxid for its initial vote. |
| * It does it by invoking QuorumPeer#getLastLoggedZxid. |
| * Consequently, we don't need to initialize it once more |
| * and avoid the penalty of loading it a second time. Not |
| * reloading it is particularly important for applications |
| * that host a large database. |
| * |
| * The following if block checks whether the database has |
| * been initialized or not. Note that this method is |
| * invoked by at least one other method: |
| * ZooKeeperServer#startdata. |
| * |
| * See ZOOKEEPER-1642 for more detail. |
| */ |
| if (zkDb.isInitialized()) { |
| setZxid(zkDb.getDataTreeLastProcessedZxid()); |
| } else { |
| setZxid(zkDb.loadDataBase()); |
| } |
| |
| // Clean up dead sessions |
| List<Long> deadSessions = new ArrayList<>(); |
| for (Long session : zkDb.getSessions()) { |
| if (zkDb.getSessionWithTimeOuts().get(session) == null) { |
| deadSessions.add(session); |
| } |
| } |
| |
| for (long session : deadSessions) { |
| // TODO: Is lastProcessedZxid really the best thing to use? |
| killSession(session, zkDb.getDataTreeLastProcessedZxid()); |
| } |
| |
| // Make a clean snapshot |
| takeSnapshot(); |
| } |
| |
| public void takeSnapshot() { |
| takeSnapshot(false); |
| } |
| |
| public void takeSnapshot(boolean syncSnap) { |
| long start = Time.currentElapsedTime(); |
| try { |
| txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); |
| } catch (IOException e) { |
| LOG.error("Severe unrecoverable error, exiting", e); |
| // This is a severe error that we cannot recover from, |
| // so we need to exit |
| ServiceUtils.requestSystemExit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); |
| } |
| long elapsed = Time.currentElapsedTime() - start; |
| LOG.info("Snapshot taken in {} ms", elapsed); |
| ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed); |
| } |
| |
| @Override |
| public long getDataDirSize() { |
| if (zkDb == null) { |
| return 0L; |
| } |
| File path = zkDb.snapLog.getDataDir(); |
| return getDirSize(path); |
| } |
| |
| @Override |
| public long getLogDirSize() { |
| if (zkDb == null) { |
| return 0L; |
| } |
| File path = zkDb.snapLog.getSnapDir(); |
| return getDirSize(path); |
| } |
| |
| private long getDirSize(File file) { |
| long size = 0L; |
| if (file.isDirectory()) { |
| File[] files = file.listFiles(); |
| if (files != null) { |
| for (File f : files) { |
| size += getDirSize(f); |
| } |
| } |
| } else { |
| size = file.length(); |
| } |
| return size; |
| } |
| |
| public long getZxid() { |
| return hzxid.get(); |
| } |
| |
| public SessionTracker getSessionTracker() { |
| return sessionTracker; |
| } |
| |
| long getNextZxid() { |
| return hzxid.incrementAndGet(); |
| } |
| |
| public void setZxid(long zxid) { |
| hzxid.set(zxid); |
| } |
| |
| private void close(long sessionId) { |
| Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null); |
| submitRequest(si); |
| } |
| |
| public void closeSession(long sessionId) { |
| LOG.info("Closing session 0x{}", Long.toHexString(sessionId)); |
| |
| // we do not want to wait for a session close. send it as soon as we |
| // detect it! |
| close(sessionId); |
| } |
| |
| protected void killSession(long sessionId, long zxid) { |
| zkDb.killSession(sessionId, zxid); |
| if (LOG.isTraceEnabled()) { |
| ZooTrace.logTraceMessage( |
| LOG, |
| ZooTrace.SESSION_TRACE_MASK, |
| "ZooKeeperServer --- killSession: 0x" + Long.toHexString(sessionId)); |
| } |
| if (sessionTracker != null) { |
| sessionTracker.removeSession(sessionId); |
| } |
| } |
| |
| public void expire(Session session) { |
| long sessionId = session.getSessionId(); |
| LOG.info( |
| "Expiring session 0x{}, timeout of {}ms exceeded", |
| Long.toHexString(sessionId), |
| session.getTimeout()); |
| close(sessionId); |
| } |
| |
| public static class MissingSessionException extends IOException { |
| |
| private static final long serialVersionUID = 7467414635467261007L; |
| |
| public MissingSessionException(String msg) { |
| super(msg); |
| } |
| |
| } |
| |
| void touch(ServerCnxn cnxn) throws MissingSessionException { |
| if (cnxn == null) { |
| return; |
| } |
| long id = cnxn.getSessionId(); |
| int to = cnxn.getSessionTimeout(); |
| if (!sessionTracker.touchSession(id, to)) { |
| throw new MissingSessionException("No session with sessionid 0x" |
| + Long.toHexString(id) |
| + " exists, probably expired and removed"); |
| } |
| } |
| |
| protected void registerJMX() { |
| // register with JMX |
| try { |
| jmxServerBean = new ZooKeeperServerBean(this); |
| MBeanRegistry.getInstance().register(jmxServerBean, null); |
| |
| try { |
| jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree()); |
| MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean); |
| } catch (Exception e) { |
| LOG.warn("Failed to register with JMX", e); |
| jmxDataTreeBean = null; |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to register with JMX", e); |
| jmxServerBean = null; |
| } |
| } |
| |
| public void startdata() throws IOException, InterruptedException { |
| //check to see if zkDb is not null |
| if (zkDb == null) { |
| zkDb = new ZKDatabase(this.txnLogFactory); |
| } |
| if (!zkDb.isInitialized()) { |
| loadData(); |
| } |
| } |
| |
/**
 * Starts the server: session tracker, request-processor chain, request
 * throttler, JMX beans, JVM pause monitor and metrics, then moves to
 * RUNNING. The order of the calls below is significant and is kept as is.
 */
public synchronized void startup() {
    // Subclasses or callers may have installed a tracker already.
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    startRequestThrottler();

    registerJMX();

    startJvmPauseMonitor();

    registerMetrics();

    setState(State.RUNNING);

    requestPathMetricsCollector.start();

    localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
    // Wake threads waiting on this monitor (NOTE(review): presumably callers
    // blocked until the server is running — confirm).
    notifyAll();
}
| |
| protected void startJvmPauseMonitor() { |
| if (this.jvmPauseMonitor != null) { |
| this.jvmPauseMonitor.serviceStart(); |
| } |
| } |
| |
| protected void startRequestThrottler() { |
| requestThrottler = new RequestThrottler(this); |
| requestThrottler.start(); |
| |
| } |
| |
| protected void setupRequestProcessors() { |
| RequestProcessor finalProcessor = new FinalRequestProcessor(this); |
| RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); |
| ((SyncRequestProcessor) syncProcessor).start(); |
| firstProcessor = new PrepRequestProcessor(this, syncProcessor); |
| ((PrepRequestProcessor) firstProcessor).start(); |
| } |
| |
| public ZooKeeperServerListener getZooKeeperServerListener() { |
| return listener; |
| } |
| |
| /** |
| * Change the server ID used by {@link #createSessionTracker()}. Must be called prior to |
| * {@link #startup()} being called |
| * |
| * @param newId ID to use |
| */ |
| public void setCreateSessionTrackerServerId(int newId) { |
| createSessionTrackerServerId = newId; |
| } |
| |
| protected void createSessionTracker() { |
| sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener()); |
| } |
| |
| protected void startSessionTracker() { |
| ((SessionTrackerImpl) sessionTracker).start(); |
| } |
| |
| /** |
| * Sets the state of ZooKeeper server. After changing the state, it notifies |
| * the server state change to a registered shutdown handler, if any. |
| * <p> |
| * The following are the server state transitions: |
| * <ul><li>During startup the server will be in the INITIAL state.</li> |
| * <li>After successfully starting, the server sets the state to RUNNING. |
| * </li> |
| * <li>The server transitions to the ERROR state if it hits an internal |
| * error. {@link ZooKeeperServerListenerImpl} notifies any critical resource |
| * error events, e.g., SyncRequestProcessor not being able to write a txn to |
| * disk.</li> |
| * <li>During shutdown the server sets the state to SHUTDOWN, which |
| * corresponds to the server not running.</li></ul> |
| * |
| * @param state new server state. |
| */ |
| protected void setState(State state) { |
| this.state = state; |
| // Notify server state changes to the registered shutdown handler, if any. |
| if (zkShutdownHandler != null) { |
| zkShutdownHandler.handle(state); |
| } else { |
| LOG.debug( |
| "ZKShutdownHandler is not registered, so ZooKeeper server" |
| + " won't take any action on ERROR or SHUTDOWN server state changes"); |
| } |
| } |
| |
| /** |
| * This can be used while shutting down the server to see whether the server |
| * is already shutdown or not. |
| * |
| * @return true if the server is running or server hits an error, false |
| * otherwise. |
| */ |
| protected boolean canShutdown() { |
| return state == State.RUNNING || state == State.ERROR; |
| } |
| |
| /** |
| * @return true if the server is running, false otherwise. |
| */ |
| public boolean isRunning() { |
| return state == State.RUNNING; |
| } |
| |
| public void shutdown() { |
| shutdown(false); |
| } |
| |
/**
 * Shut down the server instance. Does nothing unless {@link #canShutdown()}
 * returns true. Stops the throttler, session tracker, processor chain and
 * pause monitor, then either clears the database or fast-forwards it.
 *
 * @param fullyShutDown true if another server using the same database will
 *        not replace this one in the same process; the database is then
 *        cleared. Otherwise it is fast-forwarded to the latest logged
 *        transactions (and cleared if that fails).
 */
public synchronized void shutdown(boolean fullyShutDown) {
    if (!canShutdown()) {
        LOG.debug("ZooKeeper server is not running, so not proceeding to shutdown!");
        return;
    }
    LOG.info("shutting down");

    setState(State.SHUTDOWN);

    // unregister all metrics that are keeping a strong reference to this object
    // subclasses will do their specific clean up
    unregisterMetrics();

    if (requestThrottler != null) {
        requestThrottler.shutdown();
    }

    // Since sessionTracker and syncThreads poll we just have to
    // set running to false and they will detect it during the poll
    // interval.
    if (sessionTracker != null) {
        sessionTracker.shutdown();
    }
    if (firstProcessor != null) {
        firstProcessor.shutdown();
    }
    if (jvmPauseMonitor != null) {
        jvmPauseMonitor.serviceStop();
    }

    if (zkDb != null) {
        if (fullyShutDown) {
            zkDb.clear();
        } else {
            // else there is no need to clear the database
            //  * When a new quorum is established we can still apply the diff
            //    on top of the same zkDb data
            //  * If we fetch a new snapshot from leader, the zkDb will be
            //    cleared anyway before loading the snapshot
            try {
                //This will fast forward the database to the latest recorded transactions
                zkDb.fastForwardDataBase();
            } catch (IOException e) {
                // Fast-forward failed: fall back to an empty database.
                LOG.error("Error updating DB", e);
                zkDb.clear();
            }
        }
    }

    requestPathMetricsCollector.shutdown();
    unregisterJMX();
}
| |
| protected void unregisterJMX() { |
| // unregister from JMX |
| try { |
| if (jmxDataTreeBean != null) { |
| MBeanRegistry.getInstance().unregister(jmxDataTreeBean); |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to unregister with JMX", e); |
| } |
| try { |
| if (jmxServerBean != null) { |
| MBeanRegistry.getInstance().unregister(jmxServerBean); |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to unregister with JMX", e); |
| } |
| jmxServerBean = null; |
| jmxDataTreeBean = null; |
| } |
| |
| public void incInProcess() { |
| requestsInProcess.incrementAndGet(); |
| } |
| |
| public void decInProcess() { |
| requestsInProcess.decrementAndGet(); |
| if (requestThrottler != null) { |
| requestThrottler.throttleWake(); |
| } |
| } |
| |
| public int getInProcess() { |
| return requestsInProcess.get(); |
| } |
| |
| public int getInflight() { |
| return requestThrottleInflight(); |
| } |
| |
| private int requestThrottleInflight() { |
| if (requestThrottler != null) { |
| return requestThrottler.getInflight(); |
| } |
| return 0; |
| } |
| |
| /** |
| * This structure is used to facilitate information sharing between PrepRP |
| * and FinalRP. |
| */ |
| static class ChangeRecord { |
| |
| ChangeRecord(long zxid, String path, StatPersisted stat, int childCount, List<ACL> acl) { |
| this.zxid = zxid; |
| this.path = path; |
| this.stat = stat; |
| this.childCount = childCount; |
| this.acl = acl; |
| } |
| |
| long zxid; |
| |
| String path; |
| |
| StatPersisted stat; /* Make sure to create a new object when changing */ |
| |
| int childCount; |
| |
| List<ACL> acl; /* Make sure to create a new object when changing */ |
| |
| ChangeRecord duplicate(long zxid) { |
| StatPersisted stat = new StatPersisted(); |
| if (this.stat != null) { |
| DataTree.copyStatPersisted(this.stat, stat); |
| } |
| return new ChangeRecord(zxid, path, stat, childCount, acl == null ? new ArrayList<>() : new ArrayList<>(acl)); |
| } |
| |
| } |
| |
| byte[] generatePasswd(long id) { |
| Random r = new Random(id ^ superSecret); |
| byte[] p = new byte[16]; |
| r.nextBytes(p); |
| return p; |
| } |
| |
| protected boolean checkPasswd(long sessionId, byte[] passwd) { |
| return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId)); |
| } |
| |
| long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { |
| if (passwd == null) { |
| // Possible since it's just deserialized from a packet on the wire. |
| passwd = new byte[0]; |
| } |
| long sessionId = sessionTracker.createSession(timeout); |
| Random r = new Random(sessionId ^ superSecret); |
| r.nextBytes(passwd); |
| ByteBuffer to = ByteBuffer.allocate(4); |
| to.putInt(timeout); |
| cnxn.setSessionId(sessionId); |
| Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); |
| submitRequest(si); |
| return sessionId; |
| } |
| |
| /** |
| * set the owner of this session as owner |
| * @param id the session id |
| * @param owner the owner of the session |
| * @throws SessionExpiredException |
| */ |
| public void setOwner(long id, Object owner) throws SessionExpiredException { |
| sessionTracker.setOwner(id, owner); |
| } |
| |
| protected void revalidateSession(ServerCnxn cnxn, long sessionId, int sessionTimeout) throws IOException { |
| boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout); |
| if (LOG.isTraceEnabled()) { |
| ZooTrace.logTraceMessage( |
| LOG, |
| ZooTrace.SESSION_TRACE_MASK, |
| "Session 0x" + Long.toHexString(sessionId) + " is valid: " + rc); |
| } |
| finishSessionInit(cnxn, rc); |
| } |
| |
| public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException { |
| if (checkPasswd(sessionId, passwd)) { |
| revalidateSession(cnxn, sessionId, sessionTimeout); |
| } else { |
| LOG.warn( |
| "Incorrect password from {} for session 0x{}", |
| cnxn.getRemoteSocketAddress(), |
| Long.toHexString(sessionId)); |
| finishSessionInit(cnxn, false); |
| } |
| } |
| |
/**
 * Completes session establishment for a connection by sending it a
 * ConnectResponse.
 *
 * <p>For a valid session, the connection is first registered with JMX
 * through whichever connection factory (plain or secure) currently holds it.
 * A failure there is only logged.
 *
 * <p>The response carries the negotiated timeout, the session id and the
 * session password. For an invalid session these are 0, 0 and 16 zero bytes.
 * Clients not flagged as old also receive a trailing readOnly flag.
 *
 * <p>On success, receiving is re-enabled on the connection
 * (processConnectRequest disables it until setup completes). Otherwise the
 * close-connection marker buffer is queued after the response. Any exception
 * while building or sending the response closes the connection.
 *
 * @param cnxn the connection being initialized
 * @param valid whether the session was accepted
 */
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // register with JMX
    try {
        if (valid) {
            if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
                serverCnxnFactory.registerConnection(cnxn);
            } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
                secureServerCnxnFactory.registerConnection(cnxn);
            }
        }
    } catch (Exception e) {
        // JMX registration is best effort; session setup continues without it.
        LOG.warn("Failed to register with JMX", e);
    }

    try {
        ConnectResponse rsp = new ConnectResponse(
            0,
            valid ? cnxn.getSessionTimeout() : 0,
            valid ? cnxn.getSessionId() : 0, // send 0 if session is no
            // longer valid
            valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        // Placeholder for the frame length; patched below once the size is known.
        bos.writeInt(-1, "len");
        rsp.serialize(bos, "connect");
        if (!cnxn.isOldClient) {
            bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
        }
        baos.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        // Overwrite the placeholder with the payload length (total minus the 4-byte length field).
        bb.putInt(bb.remaining() - 4).rewind();
        cnxn.sendBuffer(bb);

        if (valid) {
            LOG.debug(
                "Established session 0x{} with negotiated timeout {} for client {}",
                Long.toHexString(cnxn.getSessionId()),
                cnxn.getSessionTimeout(),
                cnxn.getRemoteSocketAddress());
            cnxn.enableRecv();
        } else {

            LOG.info(
                "Invalid session 0x{} for client {}, probably expired",
                Long.toHexString(cnxn.getSessionId()),
                cnxn.getRemoteSocketAddress());
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
        }

    } catch (Exception e) {
        LOG.warn("Exception while establishing session, closing", e);
        cnxn.close(ServerCnxn.DisconnectReason.IO_EXCEPTION_IN_SESSION_INIT);
    }
}
| |
| public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) { |
| closeSession(cnxn.getSessionId()); |
| } |
| |
| public long getServerId() { |
| return 0; |
| } |
| |
| /** |
| * If the underlying Zookeeper server support local session, this method |
| * will set a isLocalSession to true if a request is associated with |
| * a local session. |
| * |
| * @param si |
| */ |
| protected void setLocalSessionFlag(Request si) { |
| } |
| |
| public void submitRequest(Request si) { |
| enqueueRequest(si); |
| } |
| |
| public void enqueueRequest(Request si) { |
| if (requestThrottler == null) { |
| synchronized (this) { |
| try { |
| // Since all requests are passed to the request |
| // processor it should wait for setting up the request |
| // processor chain. The state will be updated to RUNNING |
| // after the setup. |
| while (state == State.INITIAL) { |
| wait(1000); |
| } |
| } catch (InterruptedException e) { |
| LOG.warn("Unexpected interruption", e); |
| } |
| if (requestThrottler == null) { |
| throw new RuntimeException("Not started"); |
| } |
| } |
| } |
| requestThrottler.submitRequest(si); |
| } |
| |
/**
 * Passes a request directly to the first request processor.
 *
 * <p>If the first processor is not set yet, this waits on this object's
 * monitor, re-checking every second, while the state is {@code INITIAL}. It
 * then throws {@code RuntimeException("Not started")} if there is still no
 * first processor or the state is not RUNNING.
 *
 * <p>{@code touch(si.cnxn)} is called before processing. Requests of an
 * unknown type are answered by an {@code UnimplementedRequestProcessor}. The
 * in-process counter is incremented only for requests that have a connection.
 * Whenever a request is rejected or processing fails,
 * {@link #requestFinished(Request)} is called so large-request byte accounting
 * is released.
 *
 * @param si the request to process
 */
public void submitRequestNow(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                // NOTE(review): the thread's interrupt status is not restored here.
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            setLocalSessionFlag(si);
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type {}", si.type);
            // Update request accounting/throttling limits
            requestFinished(si);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        LOG.debug("Dropping request.", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request", e);
        // Update request accounting/throttling limits
        requestFinished(si);
    }
}
| |
| public static int getSnapCount() { |
| String sc = System.getProperty(SNAP_COUNT); |
| try { |
| int snapCount = Integer.parseInt(sc); |
| |
| // snapCount must be 2 or more. See org.apache.zookeeper.server.SyncRequestProcessor |
| if (snapCount < 2) { |
| LOG.warn("SnapCount should be 2 or more. Now, snapCount is reset to 2"); |
| snapCount = 2; |
| } |
| return snapCount; |
| } catch (Exception e) { |
| return 100000; |
| } |
| } |
| |
| public int getGlobalOutstandingLimit() { |
| String sc = System.getProperty(GLOBAL_OUTSTANDING_LIMIT); |
| int limit; |
| try { |
| limit = Integer.parseInt(sc); |
| } catch (Exception e) { |
| limit = 1000; |
| } |
| return limit; |
| } |
| |
| public static long getSnapSizeInBytes() { |
| long size = Long.getLong("zookeeper.snapSizeLimitInKb", 4194304L); // 4GB by default |
| if (size <= 0) { |
| LOG.info("zookeeper.snapSizeLimitInKb set to a non-positive value {}; disabling feature", size); |
| } |
| return size * 1024; // Convert to bytes |
| } |
| |
| public void setServerCnxnFactory(ServerCnxnFactory factory) { |
| serverCnxnFactory = factory; |
| } |
| |
| public ServerCnxnFactory getServerCnxnFactory() { |
| return serverCnxnFactory; |
| } |
| |
| public ServerCnxnFactory getSecureServerCnxnFactory() { |
| return secureServerCnxnFactory; |
| } |
| |
| public void setSecureServerCnxnFactory(ServerCnxnFactory factory) { |
| secureServerCnxnFactory = factory; |
| } |
| |
| /** |
| * return the last proceesed id from the |
| * datatree |
| */ |
| public long getLastProcessedZxid() { |
| return zkDb.getDataTreeLastProcessedZxid(); |
| } |
| |
| /** |
| * return the outstanding requests |
| * in the queue, which havent been |
| * processed yet |
| */ |
| public long getOutstandingRequests() { |
| return getInProcess(); |
| } |
| |
| /** |
| * return the total number of client connections that are alive |
| * to this server |
| */ |
| public int getNumAliveConnections() { |
| int numAliveConnections = 0; |
| |
| if (serverCnxnFactory != null) { |
| numAliveConnections += serverCnxnFactory.getNumAliveConnections(); |
| } |
| |
| if (secureServerCnxnFactory != null) { |
| numAliveConnections += secureServerCnxnFactory.getNumAliveConnections(); |
| } |
| |
| return numAliveConnections; |
| } |
| |
| /** |
| * trunccate the log to get in sync with others |
| * if in a quorum |
| * @param zxid the zxid that it needs to get in sync |
| * with others |
| * @throws IOException |
| */ |
| public void truncateLog(long zxid) throws IOException { |
| this.zkDb.truncateLog(zxid); |
| } |
| |
| public int getTickTime() { |
| return tickTime; |
| } |
| |
| public void setTickTime(int tickTime) { |
| LOG.info("tickTime set to {}", tickTime); |
| this.tickTime = tickTime; |
| } |
| |
| public int getMinSessionTimeout() { |
| return minSessionTimeout; |
| } |
| |
| public void setMinSessionTimeout(int min) { |
| this.minSessionTimeout = min == -1 ? tickTime * 2 : min; |
| LOG.info("minSessionTimeout set to {}", this.minSessionTimeout); |
| } |
| |
| public int getMaxSessionTimeout() { |
| return maxSessionTimeout; |
| } |
| |
| public void setMaxSessionTimeout(int max) { |
| this.maxSessionTimeout = max == -1 ? tickTime * 20 : max; |
| LOG.info("maxSessionTimeout set to {}", this.maxSessionTimeout); |
| } |
| |
| public int getClientPortListenBacklog() { |
| return listenBacklog; |
| } |
| |
| public void setClientPortListenBacklog(int backlog) { |
| this.listenBacklog = backlog; |
| LOG.info("clientPortListenBacklog set to {}", backlog); |
| } |
| |
| public int getClientPort() { |
| return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1; |
| } |
| |
| public int getSecureClientPort() { |
| return secureServerCnxnFactory != null ? secureServerCnxnFactory.getLocalPort() : -1; |
| } |
| |
| /** Maximum number of connections allowed from particular host (ip) */ |
| public int getMaxClientCnxnsPerHost() { |
| if (serverCnxnFactory != null) { |
| return serverCnxnFactory.getMaxClientCnxnsPerHost(); |
| } |
| if (secureServerCnxnFactory != null) { |
| return secureServerCnxnFactory.getMaxClientCnxnsPerHost(); |
| } |
| return -1; |
| } |
| |
| public void setTxnLogFactory(FileTxnSnapLog txnLog) { |
| this.txnLogFactory = txnLog; |
| } |
| |
| public FileTxnSnapLog getTxnLogFactory() { |
| return this.txnLogFactory; |
| } |
| |
| /** |
| * Returns the elapsed sync of time of transaction log in milliseconds. |
| */ |
| public long getTxnLogElapsedSyncTime() { |
| return txnLogFactory.getTxnLogElapsedSyncTime(); |
| } |
| |
| public String getState() { |
| return "standalone"; |
| } |
| |
| public void dumpEphemerals(PrintWriter pwriter) { |
| zkDb.dumpEphemerals(pwriter); |
| } |
| |
| public Map<Long, Set<String>> getEphemerals() { |
| return zkDb.getEphemerals(); |
| } |
| |
| public double getConnectionDropChance() { |
| return connThrottle.getDropChance(); |
| } |
| |
| @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup") |
| public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) |
| throws IOException, ClientCnxnLimitException { |
| |
| BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); |
| ConnectRequest connReq = new ConnectRequest(); |
| connReq.deserialize(bia, "connect"); |
| LOG.debug( |
| "Session establishment request from client {} client's lastZxid is 0x{}", |
| cnxn.getRemoteSocketAddress(), |
| Long.toHexString(connReq.getLastZxidSeen())); |
| |
| long sessionId = connReq.getSessionId(); |
| int tokensNeeded = 1; |
| if (connThrottle.isConnectionWeightEnabled()) { |
| if (sessionId == 0) { |
| if (localSessionEnabled) { |
| tokensNeeded = connThrottle.getRequiredTokensForLocal(); |
| } else { |
| tokensNeeded = connThrottle.getRequiredTokensForGlobal(); |
| } |
| } else { |
| tokensNeeded = connThrottle.getRequiredTokensForRenew(); |
| } |
| } |
| |
| if (!connThrottle.checkLimit(tokensNeeded)) { |
| throw new ClientCnxnLimitException(); |
| } |
| ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); |
| |
| ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); |
| |
| boolean readOnly = false; |
| try { |
| readOnly = bia.readBool("readOnly"); |
| cnxn.isOldClient = false; |
| } catch (IOException e) { |
| // this is ok -- just a packet from an old client which |
| // doesn't contain readOnly field |
| LOG.warn( |
| "Connection request from old client {}; will be dropped if server is in r-o mode", |
| cnxn.getRemoteSocketAddress()); |
| } |
| if (!readOnly && this instanceof ReadOnlyZooKeeperServer) { |
| String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); |
| LOG.info(msg); |
| throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); |
| } |
| if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { |
| String msg = "Refusing session request for client " |
| + cnxn.getRemoteSocketAddress() |
| + " as it has seen zxid 0x" |
| + Long.toHexString(connReq.getLastZxidSeen()) |
| + " our last zxid is 0x" |
| + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) |
| + " client must try another server"; |
| |
| LOG.info(msg); |
| throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); |
| } |
| int sessionTimeout = connReq.getTimeOut(); |
| byte[] passwd = connReq.getPasswd(); |
| int minSessionTimeout = getMinSessionTimeout(); |
| if (sessionTimeout < minSessionTimeout) { |
| sessionTimeout = minSessionTimeout; |
| } |
| int maxSessionTimeout = getMaxSessionTimeout(); |
| if (sessionTimeout > maxSessionTimeout) { |
| sessionTimeout = maxSessionTimeout; |
| } |
| cnxn.setSessionTimeout(sessionTimeout); |
| // We don't want to receive any packets until we are sure that the |
| // session is setup |
| cnxn.disableRecv(); |
| if (sessionId == 0) { |
| long id = createSession(cnxn, passwd, sessionTimeout); |
| LOG.debug( |
| "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", |
| Long.toHexString(id), |
| Long.toHexString(connReq.getLastZxidSeen()), |
| connReq.getTimeOut(), |
| cnxn.getRemoteSocketAddress()); |
| } else { |
| long clientSessionId = connReq.getSessionId(); |
| LOG.debug( |
| "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", |
| Long.toHexString(clientSessionId), |
| Long.toHexString(connReq.getLastZxidSeen()), |
| connReq.getTimeOut(), |
| cnxn.getRemoteSocketAddress()); |
| if (serverCnxnFactory != null) { |
| serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); |
| } |
| if (secureServerCnxnFactory != null) { |
| secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); |
| } |
| cnxn.setSessionId(sessionId); |
| reopenSession(cnxn, sessionId, passwd, sessionTimeout); |
| ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1); |
| |
| } |
| } |
| |
| public boolean shouldThrottle(long outStandingCount) { |
| if (getGlobalOutstandingLimit() < getInflight()) { |
| return outStandingCount > 0; |
| } |
| return false; |
| } |
| |
| long getFlushDelay() { |
| return flushDelay; |
| } |
| |
| static void setFlushDelay(long delay) { |
| LOG.info("{}={}", FLUSH_DELAY, delay); |
| flushDelay = delay; |
| } |
| |
| long getMaxWriteQueuePollTime() { |
| return maxWriteQueuePollTime; |
| } |
| |
| static void setMaxWriteQueuePollTime(long maxTime) { |
| LOG.info("{}={}", MAX_WRITE_QUEUE_POLL_SIZE, maxTime); |
| maxWriteQueuePollTime = maxTime; |
| } |
| |
| int getMaxBatchSize() { |
| return maxBatchSize; |
| } |
| |
| static void setMaxBatchSize(int size) { |
| LOG.info("{}={}", MAX_BATCH_SIZE, size); |
| maxBatchSize = size; |
| } |
| |
| private void initLargeRequestThrottlingSettings() { |
| setLargeRequestMaxBytes(Integer.getInteger("zookeeper.largeRequestMaxBytes", largeRequestMaxBytes)); |
| setLargeRequestThreshold(Integer.getInteger("zookeeper.largeRequestThreshold", -1)); |
| } |
| |
| public int getLargeRequestMaxBytes() { |
| return largeRequestMaxBytes; |
| } |
| |
| public void setLargeRequestMaxBytes(int bytes) { |
| if (bytes <= 0) { |
| LOG.warn("Invalid max bytes for all large requests {}. It should be a positive number.", bytes); |
| LOG.warn("Will not change the setting. The max bytes stay at {}", largeRequestMaxBytes); |
| } else { |
| largeRequestMaxBytes = bytes; |
| LOG.info("The max bytes for all large requests are set to {}", largeRequestMaxBytes); |
| } |
| } |
| |
| public int getLargeRequestThreshold() { |
| return largeRequestThreshold; |
| } |
| |
| public void setLargeRequestThreshold(int threshold) { |
| if (threshold == 0 || threshold < -1) { |
| LOG.warn("Invalid large request threshold {}. It should be -1 or positive. Setting to -1 ", threshold); |
| largeRequestThreshold = -1; |
| } else { |
| largeRequestThreshold = threshold; |
| LOG.info("The large request threshold is set to {}", largeRequestThreshold); |
| } |
| } |
| |
| public int getLargeRequestBytes() { |
| return currentLargeRequestBytes.get(); |
| } |
| |
| private boolean isLargeRequest(int length) { |
| // The large request limit is disabled when threshold is -1 |
| if (largeRequestThreshold == -1) { |
| return false; |
| } |
| return length > largeRequestThreshold; |
| } |
| |
| public boolean checkRequestSizeWhenReceivingMessage(int length) throws IOException { |
| if (!isLargeRequest(length)) { |
| return true; |
| } |
| if (currentLargeRequestBytes.get() + length <= largeRequestMaxBytes) { |
| return true; |
| } else { |
| ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); |
| throw new IOException("Rejecting large request"); |
| } |
| |
| } |
| |
| private boolean checkRequestSizeWhenMessageReceived(int length) throws IOException { |
| if (!isLargeRequest(length)) { |
| return true; |
| } |
| |
| int bytes = currentLargeRequestBytes.addAndGet(length); |
| if (bytes > largeRequestMaxBytes) { |
| currentLargeRequestBytes.addAndGet(-length); |
| ServerMetrics.getMetrics().LARGE_REQUESTS_REJECTED.add(1); |
| throw new IOException("Rejecting large request"); |
| } |
| return true; |
| } |
| |
| public void requestFinished(Request request) { |
| int largeRequestLength = request.getLargeRequestSize(); |
| if (largeRequestLength != -1) { |
| currentLargeRequestBytes.addAndGet(-largeRequestLength); |
| } |
| } |
| |
/**
 * Processes one request packet from a client connection.
 *
 * <p>The request header is deserialized first. The connection's outstanding
 * count is then incremented and throttling checked before anything else.
 * After that:
 * <ul>
 * <li>{@code auth} packets go to the authentication provider registered for
 *     their scheme.
 *     <ul>
 *     <li>If the provider returns OK, an OK reply is sent.</li>
 *     <li>Otherwise (no provider, a non-OK result, or a RuntimeException from
 *         the provider) an AUTHFAILED reply is sent, the close-connection
 *         marker is queued and receiving is disabled.</li>
 *     </ul></li>
 * <li>{@code sasl} packets are handled by {@code processSasl}.</li>
 * <li>Any other packet is rejected with SESSIONCLOSEDREQUIRESASLAUTH when the
 *     server requires SASL and the connection has no SASL identity.
 *     Otherwise it is wrapped in a {@link Request}, charged against the
 *     large-request byte budget if it is large, and submitted.</li>
 * </ul>
 *
 * @param cnxn the connection the packet arrived on
 * @param incomingBuffer the serialized request (header followed by body)
 * @throws IOException on deserialization failure, when a large request
 *         exceeds the byte budget, or as propagated from the throttling check
 */
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header");

    // Need to increase the outstanding request count first, otherwise
    // there might be a race condition that it enabled recv after
    // processing request and then disabled when check throttling.
    //
    // Be aware that we're actually checking the global outstanding
    // request before this request.
    //
    // It's fine if the IOException thrown before we decrease the count
    // in cnxn, since it will close the cnxn anyway.
    cnxn.incrOutstandingAndCheckThrottle(h);

    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) {
        LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        // Default to failure; only an explicit OK from a provider grants auth.
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                // handleAuthentication may close the connection, to allow the client to choose
                // a different server to connect to.
                authReturn = ap.handleAuthentication(
                    new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                    authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        if (authReturn == KeeperException.Code.OK) {
            LOG.debug("Authentication succeeded for scheme: {}", scheme);
            LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            if (ap == null) {
                LOG.warn(
                    "No authentication provider for scheme: {} has {}",
                    scheme,
                    ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: {}", scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    } else if (h.getType() == OpCode.sasl) {
        processSasl(incomingBuffer, cnxn, h);
    } else {
        if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
            ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
            cnxn.sendResponse(replyHeader, null, "response");
            cnxn.sendCloseSession();
            cnxn.disableRecv();
        } else {
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
            int length = incomingBuffer.limit();
            if (isLargeRequest(length)) {
                // checkRequestSize will throw IOException if request is rejected
                checkRequestSizeWhenMessageReceived(length);
                // Recorded so requestFinished() can release the reserved bytes later.
                si.setLargeRequestSize(length);
            }
            si.setOwner(ServerCnxn.me);
            submitRequest(si);
        }
    }
}
| |
| private static boolean shouldAllowSaslFailedClientsConnect() { |
| return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); |
| } |
| |
| private static boolean shouldRequireClientSaslAuth() { |
| return Boolean.getBoolean(SESSION_REQUIRE_CLIENT_SASL_AUTH); |
| } |
| |
| private boolean hasCnxSASLAuthenticated(ServerCnxn cnxn) { |
| for (Id id : cnxn.getAuthInfo()) { |
| if (id.getScheme().equals(SASL_AUTH_SCHEME)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
/**
 * Handles one step of a SASL token exchange on a connection.
 *
 * <p>The client token is evaluated by the connection's SASL server. When the
 * negotiation completes, the connection gets a {@code sasl} auth id for the
 * authorization id. It also gets a {@code super} id if the authorization id
 * equals the {@code zookeeper.superUser} system property.
 *
 * <p>On a SaslException, the connection is kept only if failed SASL clients
 * are allowed and SASL is not required. Otherwise an error reply
 * (SESSIONCLOSEDREQUIRESASLAUTH or AUTHFAILED) is sent and the session is
 * closed.
 *
 * <p>In every other case an OK reply carrying the server's response token is
 * sent. The token may be null.
 *
 * @param incomingBuffer the serialized GetSASLRequest
 * @param cnxn the connection being authenticated
 * @param requestHeader the request header; its xid is echoed in the reply
 * @throws IOException on deserialization failure or as propagated from sending the reply
 */
private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
    LOG.debug("Responding to client SASL token.");
    GetSASLRequest clientTokenRecord = new GetSASLRequest();
    ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
    byte[] clientToken = clientTokenRecord.getToken();
    LOG.debug("Size of client SASL token: {}", clientToken.length);
    byte[] responseToken = null;
    try {
        ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer;
        try {
            // note that clientToken might be empty (clientToken.length == 0):
            // if using the DIGEST-MD5 mechanism, clientToken will be empty at the beginning of the
            // SASL negotiation process.
            responseToken = saslServer.evaluateResponse(clientToken);
            if (saslServer.isComplete()) {
                String authorizationID = saslServer.getAuthorizationID();
                LOG.info("adding SASL authorization for authorizationID: {}", authorizationID);
                cnxn.addAuthInfo(new Id("sasl", authorizationID));
                if (System.getProperty("zookeeper.superUser") != null
                    && authorizationID.equals(System.getProperty("zookeeper.superUser"))) {
                    cnxn.addAuthInfo(new Id("super", ""));
                }
            }
        } catch (SaslException e) {
            LOG.warn("Client {} failed to SASL authenticate: {}", cnxn.getRemoteSocketAddress(), e);
            if (shouldAllowSaslFailedClientsConnect() && !shouldRequireClientSaslAuth()) {
                LOG.warn("Maintaining client connection despite SASL authentication failure.");
            } else {
                int error;
                if (shouldRequireClientSaslAuth()) {
                    LOG.warn(
                        "Closing client connection due to server requires client SASL authenticaiton,"
                            + "but client SASL authentication has failed, or client is not configured with SASL "
                            + "authentication.");
                    error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
                } else {
                    LOG.warn("Closing client connection due to SASL authentication failure.");
                    error = Code.AUTHFAILED.intValue();
                }

                ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
                cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
                cnxn.sendCloseSession();
                cnxn.disableRecv();
                return;
            }
        }
    } catch (NullPointerException e) {
        // NOTE(review): this catch also swallows NPEs from other calls in the
        // try block (e.g. a null authorizationID), not just a null saslServer;
        // an explicit null check would be narrower.
        LOG.error("cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
    }
    if (responseToken != null) {
        LOG.debug("Size of server SASL response: {}", responseToken.length);
    }

    ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
    Record record = new SetSASLResponse(responseToken);
    cnxn.sendResponse(replyHeader, record, "response");
}
| |
| // entry point for quorum/Learner.java |
| public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { |
| processTxnForSessionEvents(null, hdr, txn); |
| return processTxnInDB(hdr, txn); |
| } |
| |
// entry point for FinalRequestProcessor.java
/**
 * Applies a request's transaction.
 *
 * <p>Session-tracker side effects are applied first. A request that is
 * neither a write (no transaction header) nor a quorum request returns an
 * empty result without taking any lock.
 *
 * <p>Otherwise, while holding the {@code outstandingChanges} monitor:
 * <ul>
 * <li>the transaction is applied to the database;</li>
 * <li>every outstanding change with a zxid up to this transaction's zxid is
 *     retired, with a warning for strictly older ones;</li>
 * <li>quorum requests are passed to {@code addCommittedProposal}.</li>
 * </ul>
 *
 * @param request the request to apply
 * @return the result of applying the transaction; empty when there is no header
 */
public ProcessTxnResult processTxn(Request request) {
    TxnHeader hdr = request.getHdr();
    processTxnForSessionEvents(request, hdr, request.getTxn());

    final boolean writeRequest = (hdr != null);
    final boolean quorumRequest = request.isQuorum();

    // return fast w/o synchronization when we get a read
    if (!writeRequest && !quorumRequest) {
        return new ProcessTxnResult();
    }
    synchronized (outstandingChanges) {
        ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn());

        // request.hdr is set for write requests, which are the only ones
        // that add to outstandingChanges.
        if (writeRequest) {
            long zxid = hdr.getZxid();
            while (!outstandingChanges.isEmpty()
                   && outstandingChanges.peek().zxid <= zxid) {
                ChangeRecord cr = outstandingChanges.remove();
                ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                if (cr.zxid < zxid) {
                    LOG.warn(
                        "Zxid outstanding 0x{} is less than current 0x{}",
                        Long.toHexString(cr.zxid),
                        Long.toHexString(zxid));
                }
                // Only drop the per-path entry if it still refers to this record;
                // presumably a newer change for the same path may have replaced it.
                if (outstandingChangesForPath.get(cr.path) == cr) {
                    outstandingChangesForPath.remove(cr.path);
                }
            }
        }

        // do not add non quorum packets to the queue.
        if (quorumRequest) {
            getZKDatabase().addCommittedProposal(request);
        }
        return rc;
    }
}
| |
| private void processTxnForSessionEvents(Request request, TxnHeader hdr, Record txn) { |
| int opCode = (request == null) ? hdr.getType() : request.type; |
| long sessionId = (request == null) ? hdr.getClientId() : request.sessionId; |
| |
| if (opCode == OpCode.createSession) { |
| if (hdr != null && txn instanceof CreateSessionTxn) { |
| CreateSessionTxn cst = (CreateSessionTxn) txn; |
| sessionTracker.commitSession(sessionId, cst.getTimeOut()); |
| } else if (request == null || !request.isLocalSession()) { |
| LOG.warn("*****>>>>> Got {} {}", txn.getClass(), txn.toString()); |
| } |
| } else if (opCode == OpCode.closeSession) { |
| sessionTracker.removeSession(sessionId); |
| } |
| } |
| |
| private ProcessTxnResult processTxnInDB(TxnHeader hdr, Record txn) { |
| if (hdr == null) { |
| return new ProcessTxnResult(); |
| } else { |
| return getZKDatabase().processTxn(hdr, txn); |
| } |
| } |
| |
| public Map<Long, Set<Long>> getSessionExpiryMap() { |
| return sessionTracker.getSessionExpiryMap(); |
| } |
| |
| /** |
| * This method is used to register the ZooKeeperServerShutdownHandler to get |
| * server's error or shutdown state change notifications. |
| * {@link ZooKeeperServerShutdownHandler#handle(State)} will be called for |
| * every server state changes {@link #setState(State)}. |
| * |
| * @param zkShutdownHandler shutdown handler |
| */ |
| void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) { |
| this.zkShutdownHandler = zkShutdownHandler; |
| } |
| |
| public boolean isResponseCachingEnabled() { |
| return isResponseCachingEnabled; |
| } |
| |
| public void setResponseCachingEnabled(boolean isEnabled) { |
| isResponseCachingEnabled = isEnabled; |
| } |
| |
| public ResponseCache getReadResponseCache() { |
| return isResponseCachingEnabled ? readResponseCache : null; |
| } |
| |
| public ResponseCache getGetChildrenResponseCache() { |
| return isResponseCachingEnabled ? getChildrenResponseCache : null; |
| } |
| |
/**
 * Registers this server's gauges with the root metrics context.
 *
 * <p>The gauges cover server statistics, database and data-tree counters,
 * session counts, OS file-descriptor counts, connection drop chance, client
 * response sizes and outstanding TLS handshakes. Each gauge is bound to a
 * method reference on this server or one of its components, so every name
 * registered here should also be unregistered in {@code unregisterMetrics()}.
 */
protected void registerMetrics() {
    MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();

    final ZKDatabase zkdb = this.getZKDatabase();
    final ServerStats stats = this.serverStats();

    rootContext.registerGauge("avg_latency", stats::getAvgLatency);

    rootContext.registerGauge("max_latency", stats::getMaxLatency);
    rootContext.registerGauge("min_latency", stats::getMinLatency);

    rootContext.registerGauge("packets_received", stats::getPacketsReceived);
    rootContext.registerGauge("packets_sent", stats::getPacketsSent);
    rootContext.registerGauge("num_alive_connections", stats::getNumAliveClientConnections);

    rootContext.registerGauge("outstanding_requests", stats::getOutstandingRequests);
    rootContext.registerGauge("uptime", stats::getUptime);

    rootContext.registerGauge("znode_count", zkdb::getNodeCount);

    rootContext.registerGauge("watch_count", zkdb.getDataTree()::getWatchCount);
    rootContext.registerGauge("ephemerals_count", zkdb.getDataTree()::getEphemeralsCount);

    rootContext.registerGauge("approximate_data_size", zkdb.getDataTree()::cachedApproximateDataSize);

    rootContext.registerGauge("global_sessions", zkdb::getSessionCount);
    rootContext.registerGauge("local_sessions", this.getSessionTracker()::getLocalSessionCount);

    OSMXBean osMbean = new OSMXBean();
    rootContext.registerGauge("open_file_descriptor_count", osMbean::getOpenFileDescriptorCount);
    rootContext.registerGauge("max_file_descriptor_count", osMbean::getMaxFileDescriptorCount);
    rootContext.registerGauge("connection_drop_probability", this::getConnectionDropChance);

    rootContext.registerGauge("last_client_response_size", stats.getClientResponseStats()::getLastBufferSize);
    rootContext.registerGauge("max_client_response_size", stats.getClientResponseStats()::getMaxBufferSize);
    rootContext.registerGauge("min_client_response_size", stats.getClientResponseStats()::getMinBufferSize);

    rootContext.registerGauge("outstanding_tls_handshake", this::getOutstandingHandshakeNum);
}
| |
| protected void unregisterMetrics() { |
| |
| MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext(); |
| |
| rootContext.unregisterGauge("avg_latency"); |
| |
| rootContext.unregisterGauge("max_latency"); |
| rootContext.unregisterGauge("min_latency"); |
| |
| rootContext.unregisterGauge("packets_received"); |
| rootContext.unregisterGauge("packets_sent"); |
| rootContext.unregisterGauge("num_alive_connections"); |
| |
| rootContext.unregisterGauge("outstanding_requests"); |
| rootContext.unregisterGauge("uptime"); |
| |
| rootContext.unregisterGauge("znode_count"); |
| |
| rootContext.unregisterGauge("watch_count"); |
| rootContext.unregisterGauge("ephemerals_count"); |
| rootContext.unregisterGauge("approximate_data_size"); |
| |
| rootContext.unregisterGauge("global_sessions"); |
| rootContext.unregisterGauge("local_sessions"); |
| |
| rootContext.unregisterGauge("open_file_descriptor_count"); |
| rootContext.unregisterGauge("max_file_descriptor_count"); |
| rootContext.unregisterGauge("connection_drop_probability"); |
| |
| rootContext.unregisterGauge("last_client_response_size"); |
| rootContext.unregisterGauge("max_client_response_size"); |
| rootContext.unregisterGauge("min_client_response_size"); |
| |
| } |
| |
| /** |
| * Hook into admin server, useful to expose additional data |
| * that do not represent metrics. |
| * |
| * @param response a sink which collects the data. |
| */ |
| public void dumpMonitorValues(BiConsumer<String, Object> response) { |
| ServerStats stats = serverStats(); |
| response.accept("version", Version.getFullVersion()); |
| response.accept("server_state", stats.getServerState()); |
| } |
| |
| /** |
| * Grant or deny authorization to an operation on a node as a function of: |
| * @param cnxn : the server connection |
| * @param acl : set of ACLs for the node |
| * @param perm : the permission that the client is requesting |
| * @param ids : the credentials supplied by the client |
| * @param path : the ZNode path |
| * @param setAcls : for set ACL operations, the list of ACLs being set. Otherwise null. |
| */ |
| public void checkACL(ServerCnxn cnxn, List<ACL> acl, int perm, List<Id> ids, String path, List<ACL> setAcls) throws KeeperException.NoAuthException { |
| if (skipACL) { |
| return; |
| } |
| |
| LOG.debug("Permission requested: {} ", perm); |
| LOG.debug("ACLs for node: {}", acl); |
| LOG.debug("Client credentials: {}", ids); |
| |
| if (acl == null || acl.size() == 0) { |
| return; |
| } |
| for (Id authId : ids) { |
| if (authId.getScheme().equals("super")) { |
| return; |
| } |
| } |
| for (ACL a : acl) { |
| Id id = a.getId(); |
| if ((a.getPerms() & perm) != 0) { |
| if (id.getScheme().equals("world") && id.getId().equals("anyone")) { |
| return; |
| } |
| ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(id.getScheme()); |
| if (ap != null) { |
| for (Id authId : ids) { |
| if (authId.getScheme().equals(id.getScheme()) |
| && ap.matches( |
| new ServerAuthenticationProvider.ServerObjs(this, cnxn), |
| new ServerAuthenticationProvider.MatchValues(path, authId.getId(), id.getId(), perm, setAcls))) { |
| return; |
| } |
| } |
| } |
| } |
| } |
| throw new KeeperException.NoAuthException(); |
| } |
| |
| public static boolean isDigestEnabled() { |
| return digestEnabled; |
| } |
| |
| public static void setDigestEnabled(boolean digestEnabled) { |
| LOG.info("{} = {}", ZOOKEEPER_DIGEST_ENABLED, digestEnabled); |
| ZooKeeperServer.digestEnabled = digestEnabled; |
| } |
| |
| /** |
| * Trim a path to get the immediate predecessor. |
| * |
| * @param path |
| * @return |
| * @throws KeeperException.BadArgumentsException |
| */ |
| private String parentPath(String path) throws KeeperException.BadArgumentsException { |
| int lastSlash = path.lastIndexOf('/'); |
| if (lastSlash == -1 || path.indexOf('\0') != -1 || getZKDatabase().isSpecialPath(path)) { |
| throw new KeeperException.BadArgumentsException(path); |
| } |
| return lastSlash == 0 ? "/" : path.substring(0, lastSlash); |
| } |
| |
| private String effectiveACLPath(Request request) throws KeeperException.BadArgumentsException, KeeperException.InvalidACLException { |
| boolean mustCheckACL = false; |
| String path = null; |
| List<ACL> acl = null; |
| |
| switch (request.type) { |
| case OpCode.create: |
| case OpCode.create2: { |
| CreateRequest req = new CreateRequest(); |
| if (buffer2Record(request.request, req)) { |
| mustCheckACL = true; |
| acl = req.getAcl(); |
| path = parentPath(req.getPath()); |
| } |
| break; |
| } |
| case OpCode.delete: { |
| DeleteRequest req = new DeleteRequest(); |
| if (buffer2Record(request.request, req)) { |
| path = parentPath(req.getPath()); |
| } |
| break; |
| } |
| case OpCode.setData: { |
| SetDataRequest req = new SetDataRequest(); |
| if (buffer2Record(request.request, req)) { |
| path = req.getPath(); |
| } |
| break; |
| } |
| case OpCode.setACL: { |
| SetACLRequest req = new SetACLRequest(); |
| if (buffer2Record(request.request, req)) { |
| mustCheckACL = true; |
| acl = req.getAcl(); |
| path = req.getPath(); |
| } |
| break; |
| } |
| } |
| |
| if (mustCheckACL) { |
| /* we ignore the extrapolated ACL returned by fixupACL because |
| * we only care about it being well-formed (and if it isn't, an |
| * exception will be raised). |
| */ |
| PrepRequestProcessor.fixupACL(path, request.authInfo, acl); |
| } |
| |
| return path; |
| } |
| |
| private int effectiveACLPerms(Request request) { |
| switch (request.type) { |
| case OpCode.create: |
| case OpCode.create2: |
| return ZooDefs.Perms.CREATE; |
| case OpCode.delete: |
| return ZooDefs.Perms.DELETE; |
| case OpCode.setData: |
| return ZooDefs.Perms.WRITE; |
| case OpCode.setACL: |
| return ZooDefs.Perms.ADMIN; |
| default: |
| return ZooDefs.Perms.ALL; |
| } |
| } |
| |
| /** |
| * Check Write Requests for Potential Access Restrictions |
| * <p/> |
| * Before a request is being proposed to the quorum, lets check it |
| * against local ACLs. Non-write requests (read, session, etc.) |
| * are passed along. Invalid requests are sent a response. |
| * <p/> |
| * While we are at it, if the request will set an ACL: make sure it's |
| * a valid one. |
| * |
| * @param request |
| * @return true if request is permitted, false if not. |
| * @throws java.io.IOException |
| */ |
| public boolean authWriteRequest(Request request) { |
| int err; |
| String pathToCheck; |
| |
| if (!enableEagerACLCheck) { |
| return true; |
| } |
| |
| err = KeeperException.Code.OK.intValue(); |
| |
| try { |
| pathToCheck = effectiveACLPath(request); |
| if (pathToCheck != null) { |
| checkACL(request.cnxn, zkDb.getACL(pathToCheck, null), effectiveACLPerms(request), request.authInfo, pathToCheck, null); |
| } |
| } catch (KeeperException.NoAuthException e) { |
| LOG.debug("Request failed ACL check", e); |
| err = e.code().intValue(); |
| } catch (KeeperException.InvalidACLException e) { |
| LOG.debug("Request has an invalid ACL check", e); |
| err = e.code().intValue(); |
| } catch (KeeperException.NoNodeException e) { |
| LOG.debug("ACL check against non-existent node: {}", e.getMessage()); |
| } catch (KeeperException.BadArgumentsException e) { |
| LOG.debug("ACL check against illegal node path: {}", e.getMessage()); |
| } catch (Throwable t) { |
| LOG.error("Uncaught exception in authWriteRequest with: ", t); |
| throw t; |
| } finally { |
| if (err != KeeperException.Code.OK.intValue()) { |
| /* This request has a bad ACL, so we are dismissing it early. */ |
| decInProcess(); |
| ReplyHeader rh = new ReplyHeader(request.cxid, 0, err); |
| try { |
| request.cnxn.sendResponse(rh, null, null); |
| } catch (IOException e) { |
| LOG.error("IOException : {}", e); |
| } |
| } |
| } |
| |
| return err == KeeperException.Code.OK.intValue(); |
| } |
| |
| private boolean buffer2Record(ByteBuffer request, Record record) { |
| boolean rv = false; |
| try { |
| ByteBufferInputStream.byteBuffer2Record(request, record); |
| request.rewind(); |
| rv = true; |
| } catch (IOException ex) { |
| } |
| |
| return rv; |
| } |
| |
| public int getOutstandingHandshakeNum() { |
| if (serverCnxnFactory instanceof NettyServerCnxnFactory) { |
| return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum(); |
| } else { |
| return 0; |
| } |
| } |
| } |