blob: b50d00babdb2fe7c084f948564d984ec7ed21cfd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.zip.CheckedInputStream;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.WatcherType;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.persistence.FileSnap;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
import org.apache.zookeeper.server.persistence.SnapStream;
import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This class maintains the in-memory database of ZooKeeper server state,
 * which includes the sessions, the data tree and the committed log. It is
 * booted up after reading the logs and snapshots from disk.
 */
public class ZKDatabase {

    private static final Logger LOG = LoggerFactory.getLogger(ZKDatabase.class);

    /**
     * make sure on a clear you take care of
     * all these members.
     */
    protected DataTree dataTree;
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
    protected FileTxnSnapLog snapLog;
    protected long minCommittedLog, maxCommittedLog;

    /**
     * System property holding the ratio (txn log size / snapshot size) used by
     * {@link #calculateTxnLogSizeLimit()}. Default value is to use snapshot if txnlog
     * size exceeds 1/3 the size of snapshot. Values greater than 1 are replaced by the
     * default; a negative value disables on-disk txn sync (see {@link #isTxnLogSyncEnabled()}).
     */
    public static final String SNAPSHOT_SIZE_FACTOR = "zookeeper.snapshotSizeFactor";
    public static final double DEFAULT_SNAPSHOT_SIZE_FACTOR = 0.33;
    private double snapshotSizeFactor;

    /**
     * System property holding the number of committed proposals kept in memory.
     * Values below {@link #DEFAULT_COMMIT_LOG_COUNT} are replaced by the default.
     */
    public static final String COMMIT_LOG_COUNT = "zookeeper.commitLogCount";
    public static final int DEFAULT_COMMIT_LOG_COUNT = 500;
    public int commitLogCount;
    protected Queue<Proposal> committedLog = new ArrayDeque<>();
    protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
    private volatile boolean initialized = false;

    /**
     * Number of txns appended since the counter was last reset (see {@link #resetTxnCount()}).
     */
    private AtomicInteger txnCount = new AtomicInteger(0);

    /**
     * the filetxnsnaplog that this zk database
     * maps to. There is a one to one relationship
     * between a filetxnsnaplog and zkdatabase.
     * @param snapLog the FileTxnSnapLog mapping this zkdatabase
     */
    public ZKDatabase(FileTxnSnapLog snapLog) {
        dataTree = createDataTree();
        sessionsWithTimeouts = new ConcurrentHashMap<>();
        this.snapLog = snapLog;

        try {
            snapshotSizeFactor = Double.parseDouble(
                System.getProperty(SNAPSHOT_SIZE_FACTOR,
                    Double.toString(DEFAULT_SNAPSHOT_SIZE_FACTOR)));
            if (snapshotSizeFactor > 1) {
                snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
                LOG.warn(
                    "The configured {} is invalid, going to use the default {}",
                    SNAPSHOT_SIZE_FACTOR,
                    DEFAULT_SNAPSHOT_SIZE_FACTOR);
            }
        } catch (NumberFormatException e) {
            LOG.error(
                "Error parsing {}, using default value {}",
                SNAPSHOT_SIZE_FACTOR,
                DEFAULT_SNAPSHOT_SIZE_FACTOR);
            snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR;
        }
        LOG.info("{} = {}", SNAPSHOT_SIZE_FACTOR, snapshotSizeFactor);

        try {
            int configuredCommitLogCount = Integer.parseInt(
                System.getProperty(COMMIT_LOG_COUNT,
                    Integer.toString(DEFAULT_COMMIT_LOG_COUNT)));
            if (configuredCommitLogCount < DEFAULT_COMMIT_LOG_COUNT) {
                // Report the value the operator actually configured, not the property name.
                LOG.warn(
                    "The configured commitLogCount {} is less than the recommended {}, going to use the recommended one",
                    configuredCommitLogCount,
                    DEFAULT_COMMIT_LOG_COUNT);
                commitLogCount = DEFAULT_COMMIT_LOG_COUNT;
            } else {
                commitLogCount = configuredCommitLogCount;
            }
        } catch (NumberFormatException e) {
            LOG.error(
                "Error parsing {} - use default value {}",
                COMMIT_LOG_COUNT,
                DEFAULT_COMMIT_LOG_COUNT);
            commitLogCount = DEFAULT_COMMIT_LOG_COUNT;
        }
        LOG.info("{}={}", COMMIT_LOG_COUNT, commitLogCount);
    }

    /**
     * checks to see if the zk database has been
     * initialized or not.
     * @return true if zk database is initialized and false if not
     */
    public boolean isInitialized() {
        return initialized;
    }

    /**
     * clear the zkdatabase.
     * Note to developers - be careful to see that
     * the clear method does clear out all the
     * data structures in zkdatabase.
     */
    public void clear() {
        minCommittedLog = 0;
        maxCommittedLog = 0;
        /* to be safe we just create a new
         * datatree.
         */
        dataTree.shutdownWatcher();
        dataTree = createDataTree();
        sessionsWithTimeouts.clear();
        WriteLock lock = logLock.writeLock();
        // Acquire before entering try so finally only unlocks a lock we actually hold.
        lock.lock();
        try {
            committedLog.clear();
        } finally {
            lock.unlock();
        }
        initialized = false;
    }

    /**
     * the datatree for this zkdatabase
     * @return the datatree for this zkdatabase
     */
    public DataTree getDataTree() {
        return this.dataTree;
    }

    /**
     * the maximum committed transaction log
     * available in memory
     * @return the zxid of the newest proposal in the in-memory committed log
     */
    public long getmaxCommittedLog() {
        return maxCommittedLog;
    }

    /**
     * the minimum committed transaction log
     * available in memory
     * @return the minimum committed transaction
     * log available in memory
     */
    public long getminCommittedLog() {
        return minCommittedLog;
    }

    /**
     * Get the lock that controls the committedLog. If you want to get the pointer to the committedLog, you need
     * to use this lock to acquire a read lock before calling getCommittedLog()
     * @return the lock that controls the committed log
     */
    public ReentrantReadWriteLock getLogLock() {
        return logLock;
    }

    /**
     * Return the in-memory committed log as an unmodifiable collection.
     * <p>
     * If the calling thread already holds the read lock from {@link #getLogLock()}, the result
     * is an unmodifiable view of the live log (valid only while that lock is held). Otherwise
     * the read lock is taken briefly and the result is an unmodifiable copy.
     *
     * @return the committed proposals in insertion order (oldest first)
     */
    public synchronized Collection<Proposal> getCommittedLog() {
        final Collection<Proposal> result;
        ReadLock rl = logLock.readLock();
        // make a copy if this thread is not already holding a lock
        if (logLock.getReadHoldCount() > 0) {
            result = this.committedLog;
        } else {
            rl.lock();
            try {
                result = new ArrayList<>(this.committedLog);
            } finally {
                rl.unlock();
            }
        }
        return Collections.unmodifiableCollection(result);
    }

    /**
     * get the last processed zxid from a datatree
     * @return the last processed zxid of a datatree
     */
    public long getDataTreeLastProcessedZxid() {
        return dataTree.lastProcessedZxid;
    }

    /**
     * return the sessions in the datatree
     * @return the data tree sessions
     */
    public Collection<Long> getSessions() {
        return dataTree.getSessions();
    }

    /**
     * @return number of (global) sessions
     */
    public long getSessionCount() {
        return sessionsWithTimeouts.size();
    }

    /**
     * get sessions with timeouts
     * @return the hashmap of sessions with timeouts
     */
    public ConcurrentHashMap<Long, Integer> getSessionWithTimeOuts() {
        return sessionsWithTimeouts;
    }

    /** Adds every txn replayed from disk to the in-memory committed log. */
    private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
        @Override
        public void onTxnLoaded(TxnHeader hdr, Record txn, TxnDigest digest) {
            addCommittedProposal(hdr, txn, digest);
        }
    };

    /**
     * load the database from the disk onto memory and also add
     * the transactions to the committedlog in memory.
     * @return the last valid zxid on disk
     * @throws IOException
     */
    public long loadDataBase() throws IOException {
        long startTime = Time.currentElapsedTime();
        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        long loadTime = Time.currentElapsedTime() - startTime;
        ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
        LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
            loadTime, Long.toHexString(zxid), dataTree.getTreeDigest());
        return zxid;
    }

    /**
     * Fast forward the database adding transactions from the committed log into memory.
     * @return the last valid zxid.
     * @throws IOException
     */
    public long fastForwardDataBase() throws IOException {
        long zxid = snapLog.fastForwardFromEdits(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }

    /**
     * Wrap a replayed txn in a {@link Request} and append it to the committed log.
     */
    private void addCommittedProposal(TxnHeader hdr, Record txn, TxnDigest digest) {
        Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        r.setTxnDigest(digest);
        addCommittedProposal(r);
    }

    /**
     * maintains a list of last <i>committedLog</i>
     * or so committed requests. This is used for
     * fast follower synchronization.
     * <p>
     * The oldest entry is evicted before appending once the log holds more than
     * {@link #commitLogCount} entries, so the log can contain up to
     * {@code commitLogCount + 1} proposals.
     *
     * @param request committed request
     */
    public void addCommittedProposal(Request request) {
        WriteLock wl = logLock.writeLock();
        // Acquire before entering try so finally only unlocks a lock we actually hold.
        wl.lock();
        try {
            if (committedLog.size() > commitLogCount) {
                // poll/peek instead of remove/peek().packet: commitLogCount is a public,
                // mutable field, and a value <= 0 could otherwise empty the log here and fail.
                committedLog.poll();
                Proposal oldest = committedLog.peek();
                if (oldest != null) {
                    minCommittedLog = oldest.packet.getZxid();
                }
            }
            if (committedLog.isEmpty()) {
                minCommittedLog = request.zxid;
                maxCommittedLog = request.zxid;
            }
            byte[] data = SerializeUtils.serializeRequest(request);
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
            committedLog.add(p);
            maxCommittedLog = p.packet.getZxid();
        } finally {
            wl.unlock();
        }
    }

    /**
     * Whether follower sync from the on-disk txn log is enabled.
     * @return true when snapshotSizeFactor is non-negative
     */
    public boolean isTxnLogSyncEnabled() {
        boolean enabled = snapshotSizeFactor >= 0;
        if (enabled) {
            LOG.info("On disk txn sync enabled with snapshotSizeFactor {}", snapshotSizeFactor);
        } else {
            LOG.info("On disk txn sync disabled");
        }
        return enabled;
    }

    /**
     * Compute the maximum txn log size to serve from disk.
     * @return the size of the most recent snapshot multiplied by snapshotSizeFactor,
     *         or 0 when no snapshot is found or its size cannot be determined
     */
    public long calculateTxnLogSizeLimit() {
        long snapSize = 0;
        try {
            File snapFile = snapLog.findMostRecentSnapshot();
            if (snapFile != null) {
                snapSize = snapFile.length();
            }
        } catch (IOException e) {
            LOG.error("Unable to get size of most recent snapshot", e);
        }
        return (long) (snapSize * snapshotSizeFactor);
    }

    /**
     * Get proposals from txnlog. Only packet part of proposal is populated.
     *
     * @param startZxid the starting zxid of the proposal
     * @param sizeLimit maximum on-disk size of txnlog to fetch
     *                  0 is unlimited, negative value means disable.
     * @return list of proposal (request part of each proposal is null)
     */
    public Iterator<Proposal> getProposalsFromTxnLog(long startZxid, long sizeLimit) {
        if (sizeLimit < 0) {
            LOG.debug("Negative size limit - retrieving proposal via txnlog is disabled");
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }

        TxnIterator itr = null;
        try {
            itr = snapLog.readTxnLog(startZxid, false);

            // If we cannot guarantee that this is strictly the starting txn
            // after a given zxid, we should fail.
            if ((itr.getHeader() != null) && (itr.getHeader().getZxid() > startZxid)) {
                LOG.warn(
                    "Unable to find proposals from txnlog for zxid: 0x{}",
                    Long.toHexString(startZxid));
                itr.close();
                return TxnLogProposalIterator.EMPTY_ITERATOR;
            }

            if (sizeLimit > 0) {
                long txnSize = itr.getStorageSize();
                if (txnSize > sizeLimit) {
                    LOG.info("Txnlog size: {} exceeds sizeLimit: {}", txnSize, sizeLimit);
                    itr.close();
                    return TxnLogProposalIterator.EMPTY_ITERATOR;
                }
            }
        } catch (IOException e) {
            LOG.error("Unable to read txnlog from disk", e);
            try {
                if (itr != null) {
                    itr.close();
                }
            } catch (IOException ioe) {
                LOG.warn("Error closing file iterator", ioe);
            }
            return TxnLogProposalIterator.EMPTY_ITERATOR;
        }
        return new TxnLogProposalIterator(itr);
    }

    /**
     * get the acl list of a data node
     * @param n the data node
     * @return the acl list of the node
     */
    public List<ACL> aclForNode(DataNode n) {
        return dataTree.getACL(n);
    }

    /**
     * remove a cnxn from the datatree
     * @param cnxn the cnxn to remove from the datatree
     */
    public void removeCnxn(ServerCnxn cnxn) {
        dataTree.removeCnxn(cnxn);
    }

    /**
     * kill a given session in the datatree
     * @param sessionId the session id to be killed
     * @param zxid the zxid of kill session transaction
     */
    public void killSession(long sessionId, long zxid) {
        dataTree.killSession(sessionId, zxid);
    }

    /**
     * write a text dump of all the ephemerals in the datatree
     * @param pwriter the output to write to
     */
    public void dumpEphemerals(PrintWriter pwriter) {
        dataTree.dumpEphemerals(pwriter);
    }

    /**
     * @return the ephemeral paths of the datatree, keyed by session id
     */
    public Map<Long, Set<String>> getEphemerals() {
        return dataTree.getEphemerals();
    }

    /**
     * the node count of the datatree
     * @return the node count of datatree
     */
    public int getNodeCount() {
        return dataTree.getNodeCount();
    }

    /**
     * the paths for ephemeral session id
     * @param sessionId the session id for which paths match to
     * @return the paths for a session id
     */
    public Set<String> getEphemerals(long sessionId) {
        return dataTree.getEphemerals(sessionId);
    }

    /**
     * the last processed zxid in the datatree
     * @param zxid the last processed zxid in the datatree
     */
    public void setlastProcessedZxid(long zxid) {
        dataTree.lastProcessedZxid = zxid;
    }

    /**
     * Process the txn on the data tree and perform the digest comparison.
     * @param hdr the txnheader for the txn
     * @param txn the transaction that needs to be processed
     * @param digest the expected digest. A null value would skip the check
     * @return the result of processing the transaction on this
     * datatree/zkdatabase
     */
    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn, TxnDigest digest) {
        return dataTree.processTxn(hdr, txn, digest);
    }

    /**
     * stat the path
     * @param path the path for which stat is to be done
     * @param serverCnxn the servercnxn attached to this request
     * @return the stat of this node
     * @throws KeeperException.NoNodeException
     */
    public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
        return dataTree.statNode(path, serverCnxn);
    }

    /**
     * get the datanode for this path
     * @param path the path to lookup
     * @return the datanode for getting the path
     */
    public DataNode getNode(String path) {
        return dataTree.getNode(path);
    }

    /**
     * get data and stat for a path
     * @param path the path being queried
     * @param stat the stat for this path
     * @param watcher the watcher function
     * @return the data of the node
     * @throws KeeperException.NoNodeException
     */
    public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
        return dataTree.getData(path, stat, watcher);
    }

    /**
     * set watches on the datatree
     * @param relativeZxid the relative zxid that client has seen
     * @param dataWatches the data watches the client wants to reset
     * @param existWatches the exists watches the client wants to reset
     * @param childWatches the child watches the client wants to reset
     * @param persistentWatches the persistent watches the client wants to reset
     * @param persistentRecursiveWatches the persistent recursive watches the client wants to reset
     * @param watcher the watcher function
     */
    public void setWatches(long relativeZxid, List<String> dataWatches, List<String> existWatches, List<String> childWatches,
                           List<String> persistentWatches, List<String> persistentRecursiveWatches, Watcher watcher) {
        dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, persistentWatches, persistentRecursiveWatches, watcher);
    }

    /**
     * Add a watch
     *
     * @param basePath
     *            watch base
     * @param watcher
     *            the watcher
     * @param mode
     *            a mode from ZooDefs.AddWatchModes
     */
    public void addWatch(String basePath, Watcher watcher, int mode) {
        dataTree.addWatch(basePath, watcher, mode);
    }

    /**
     * get acl for a path
     * @param path the path to query for acl
     * @param stat the stat for the node
     * @return the acl list for this path
     * @throws NoNodeException
     */
    public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
        return dataTree.getACL(path, stat);
    }

    /**
     * get children list for this path
     * @param path the path of the node
     * @param stat the stat of the node
     * @param watcher the watcher function for this path
     * @return the list of children for this path
     * @throws KeeperException.NoNodeException
     */
    public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
        return dataTree.getChildren(path, stat, watcher);
    }

    /**
     * get all sub-children number of this node
     * @param path the path of the node
     * @return the number of all descendants of the node
     * @throws KeeperException.NoNodeException
     */
    public int getAllChildrenNumber(String path) throws KeeperException.NoNodeException {
        return dataTree.getAllChildrenNumber(path);
    }

    /**
     * check if the path is special or not
     * @param path the input path
     * @return true if path is special and false if not
     */
    public boolean isSpecialPath(String path) {
        return dataTree.isSpecialPath(path);
    }

    /**
     * get the acl size of the datatree
     * @return the acl size of the datatree
     */
    public int getAclSize() {
        return dataTree.aclCacheSize();
    }

    /**
     * Truncate the ZKDatabase to the specified zxid.
     * The database is cleared first; if the log truncation fails it is left cleared.
     * @param zxid the zxid to truncate zk database to
     * @return true if the truncate is successful and false if not
     * @throws IOException
     */
    public boolean truncateLog(long zxid) throws IOException {
        clear();

        // truncate the log
        boolean truncated = snapLog.truncateLog(zxid);

        if (!truncated) {
            return false;
        }

        loadDataBase();
        return true;
    }

    /**
     * deserialize a snapshot from an input archive
     * @param ia the input archive you want to deserialize from
     * @throws IOException
     */
    public void deserializeSnapshot(InputArchive ia) throws IOException {
        clear();
        SerializeUtils.deserializeSnapshot(getDataTree(), ia, getSessionWithTimeOuts());
        initialized = true;
    }

    /**
     * Deserialize a snapshot that contains FileHeader from an input archive. It is used by
     * the admin restore command.
     *
     * @param ia the input archive to deserialize from
     * @param is the CheckInputStream to check integrity
     *
     * @throws IOException
     */
    public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
        clear();

        // deserialize data tree
        final DataTree dataTree = getDataTree();
        FileSnap.deserialize(dataTree, getSessionWithTimeOuts(), ia);
        SnapStream.checkSealIntegrity(is, ia);

        // deserialize digest and check integrity
        if (dataTree.deserializeZxidDigest(ia, 0)) {
            SnapStream.checkSealIntegrity(is, ia);
        }

        // deserialize lastProcessedZxid and check integrity
        if (dataTree.deserializeLastProcessedZxid(ia)) {
            SnapStream.checkSealIntegrity(is, ia);
        }

        // compare the digest to find inconsistency
        if (dataTree.getDigestFromLoadedSnapshot() != null) {
            dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid);
        }

        initialized = true;
    }

    /**
     * serialize the snapshot
     * @param oa the output archive to which the snapshot needs to be serialized
     * @throws IOException
     * @throws InterruptedException
     */
    public void serializeSnapshot(OutputArchive oa) throws IOException, InterruptedException {
        SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts());
    }

    /**
     * append to the underlying transaction log
     * @param si the request to append
     * @return true if the append was successful and false if not
     * @throws IOException
     */
    public boolean append(Request si) throws IOException {
        if (this.snapLog.append(si)) {
            txnCount.incrementAndGet();
            return true;
        }
        return false;
    }

    /**
     * roll the underlying log and reset the txn counters
     * @throws IOException
     */
    public void rollLog() throws IOException {
        this.snapLog.rollLog();
        resetTxnCount();
    }

    /**
     * commit to the underlying transaction log
     * @throws IOException
     */
    public void commit() throws IOException {
        this.snapLog.commit();
    }

    /**
     * close this database. free the resources
     * @throws IOException
     */
    public void close() throws IOException {
        this.snapLog.close();
    }

    /**
     * Write the given quorum configuration into the config znode, creating the
     * node first if it is missing.
     * @param qv the quorum verifier whose string form is stored; null is ignored
     */
    public synchronized void initConfigInZKDatabase(QuorumVerifier qv) {
        if (qv == null) {
            return; // only happens during tests
        }
        try {
            if (this.dataTree.getNode(ZooDefs.CONFIG_NODE) == null) {
                // should only happen during upgrade
                LOG.warn("configuration znode missing (should only happen during upgrade), creating the node");
                this.dataTree.addConfigNode();
            }
            this.dataTree.setData(
                ZooDefs.CONFIG_NODE,
                qv.toString().getBytes(UTF_8),
                -1,
                qv.getVersion(),
                Time.currentWallTime());
        } catch (NoNodeException e) {
            // Route through the class logger (not stdout) and keep the cause.
            LOG.error("configuration node missing - should not happen", e);
        }
    }

    /**
     * Use for unit testing, so we can turn this feature on/off
     * @param snapshotSizeFactor Set to minus value to turn this off.
     */
    public void setSnapshotSizeFactor(double snapshotSizeFactor) {
        this.snapshotSizeFactor = snapshotSizeFactor;
    }

    /**
     * Check whether the given watcher exists in datatree
     *
     * @param path
     *            node to check watcher existence
     * @param type
     *            type of watcher
     * @param watcher
     *            watcher function
     * @return true if the watcher is registered
     */
    public boolean containsWatcher(String path, WatcherType type, Watcher watcher) {
        return dataTree.containsWatcher(path, type, watcher);
    }

    /**
     * Remove watch from the datatree
     *
     * @param path
     *            node to remove watches from
     * @param type
     *            type of watcher to remove
     * @param watcher
     *            watcher function to remove
     * @return the result of {@code DataTree.removeWatch}
     */
    public boolean removeWatch(String path, WatcherType type, Watcher watcher) {
        return dataTree.removeWatch(path, type, watcher);
    }

    // visible for testing
    public DataTree createDataTree() {
        return new DataTree();
    }

    /**
     * Reset the number of txn since last rollLog, and the tracked total log size.
     */
    public void resetTxnCount() {
        txnCount.set(0);
        snapLog.setTotalLogSize(0);
    }

    /**
     * Get the number of txns appended since the counter was last reset
     * (by {@link #rollLog()} or {@link #resetTxnCount()}).
     * @return the txn count
     */
    public int getTxnCount() {
        return txnCount.get();
    }

    /**
     * Get the total txn log size tracked by the snap log since it was last reset
     * (by {@link #resetTxnCount()}).
     * @return the tracked txn log size
     */
    public long getTxnSize() {
        return snapLog.getTotalLogSize();
    }

    /**
     * Compare the digest of the data tree against the expected digest for a txn.
     * @param header the txn header
     * @param txn the txn
     * @param digest the expected digest
     * @return the result of {@code DataTree.compareDigest}
     */
    public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest) {
        return dataTree.compareDigest(header, txn, digest);
    }

}