blob: bcbfbaad73c57b8a82df4f390a094ca288d5da26 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DeleteContainerRequest;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.BadArgumentsException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MultiOperationRecord;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.proto.CheckVersionRequest;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.CreateTTLRequest;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.ReconfigRequest;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.server.ZooKeeperServer.PrecalculatedDigest;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumOracleMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.txn.CheckVersionTxn;
import org.apache.zookeeper.txn.CloseSessionTxn;
import org.apache.zookeeper.txn.CreateContainerTxn;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.CreateTTLTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.MultiTxn;
import org.apache.zookeeper.txn.SetACLTxn;
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.Txn;
import org.apache.zookeeper.txn.TxnDigest;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This request processor is generally at the start of a RequestProcessor
* chain. It sets up any transactions associated with requests that change the
* state of the system. It counts on ZooKeeperServer to update
* outstandingRequests, so that it can take into account transactions that are
* in the queue to be applied when generating a transaction.
*/
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessor.class);
/**
* Test-only switch: while it is true, validatePathForCreate rejects every
* create path as invalid. Changed through setFailCreate; should never be
* used outside of tests.
*/
private static boolean failCreate = false;
// Requests waiting to be prepared; run() blocks on take() and handles them one at a time.
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<>();
// Processor that pRequest() hands each request to once preparation is finished.
private final RequestProcessor nextProcessor;
// Value of ZooKeeperServer.isDigestEnabled() captured when this processor was constructed.
private final boolean digestEnabled;
// Only instantiated by the constructor when digestEnabled is true.
private DigestCalculator digestCalculator;
// Server whose outstanding-change bookkeeping, database and session tracker this processor uses.
ZooKeeperServer zks;
/**
 * Selects the kind of digest adjustment requested from {@code precalculateDigest}
 * when a change record is prepared.
 */
public enum DigestOpCode {
    NOOP,
    ADD,
    REMOVE,
    UPDATE
}
/**
 * Builds the prep processor for a server.
 *
 * <p>The thread name embeds the server id and client port, and the server's
 * listener is passed to the superclass. A {@link DigestCalculator} is created
 * only when {@code ZooKeeperServer.isDigestEnabled()} reports true at
 * construction time.
 *
 * @param zks server this processor prepares requests for
 * @param nextProcessor processor that receives each request after preparation
 */
public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
    super(
        "ProcessThread(sid:" + zks.getServerId() + " cport:" + zks.getClientPort() + "):",
        zks.getZooKeeperServerListener());
    this.zks = zks;
    this.nextProcessor = nextProcessor;
    this.digestEnabled = ZooKeeperServer.isDigestEnabled();
    // Leave the calculator unset (null) when digests are turned off.
    this.digestCalculator = this.digestEnabled ? new DigestCalculator() : null;
}
/**
 * Test hook that switches the forced-failure flag consulted by
 * {@code validatePathForCreate}.
 *
 * @param b {@code true} to have every create path rejected as invalid,
 *          {@code false} to restore normal validation
 */
public static void setFailCreate(boolean b) {
    PrepRequestProcessor.failCreate = b;
}
/**
 * Main loop of the processor thread.
 *
 * <p>Repeatedly records the queue size, takes the next submitted request,
 * records how long it waited, optionally trace-logs it, and prepares it via
 * {@link #pRequest}. The loop ends when {@code Request.requestOfDeath} is
 * taken from the queue. Any exception escaping the loop is passed to
 * {@code handleException} together with this thread's name.
 */
@Override
public void run() {
    LOG.info(String.format("PrepRequestProcessor (sid:%d) started, reconfigEnabled=%s", zks.getServerId(), zks.reconfigEnabled));
    try {
        for (;;) {
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
            final Request next = submittedRequests.take();
            ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
                .add(Time.currentElapsedTime() - next.prepQueueStartTime);

            if (LOG.isTraceEnabled()) {
                // Pings get their own trace mask so they can be filtered separately.
                final long mask = (next.type == OpCode.ping)
                    ? ZooTrace.CLIENT_PING_TRACE_MASK
                    : ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                ZooTrace.logRequest(LOG, mask, 'P', next, "");
            }

            // The sentinel request asks this thread to stop.
            if (Request.requestOfDeath == next) {
                break;
            }

            next.prepStartTime = Time.currentElapsedTime();
            pRequest(next);
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}
/**
 * Returns the latest known change record for a path.
 *
 * <p>An outstanding (not yet applied) change for the path wins. Otherwise a
 * record is built from the node currently stored in the database, using zxid
 * {@code -1}, the node's stat, its child count, its ACL and its data; when
 * digests are enabled the node's calculated digest is attached as well.
 *
 * @param path node path to look up
 * @return the outstanding or freshly built record, never {@code null}
 * @throws KeeperException.NoNodeException if neither source knows the path, or
 *         the record has a {@code null} stat (delete records are created with
 *         a {@code null} stat elsewhere in this class)
 */
private ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException {
    ChangeRecord record;
    synchronized (zks.outstandingChanges) {
        record = zks.outstandingChangesForPath.get(path);
        if (record == null) {
            final DataNode node = zks.getZKDatabase().getNode(path);
            if (node != null) {
                // Fetch the child set while holding the node's monitor.
                final Set<String> kids;
                synchronized (node) {
                    kids = node.getChildren();
                }
                record = new ChangeRecord(-1, path, node.stat, kids.size(), zks.getZKDatabase().aclForNode(node));
                if (digestEnabled) {
                    record.precalculatedDigest = new PrecalculatedDigest(
                        digestCalculator.calculateDigest(path, node), 0);
                }
                record.data = node.getData();
            }
        }
    }
    if (record != null && record.stat != null) {
        return record;
    }
    throw new KeeperException.NoNodeException(path);
}
/**
 * Looks up the outstanding change record for a path, without falling back to
 * the database.
 *
 * @param path node path to look up
 * @return the outstanding record, or {@code null} when none is registered
 */
private ChangeRecord getOutstandingChange(String path) {
    final ChangeRecord pending;
    synchronized (zks.outstandingChanges) {
        pending = zks.outstandingChangesForPath.get(path);
    }
    return pending;
}
/**
 * Registers a change record as outstanding: appends it to the server's
 * outstanding-change collection, makes it the current record for its path,
 * and bumps the queued-changes metric. All three steps happen while holding
 * the {@code zks.outstandingChanges} monitor.
 *
 * @param c record to register
 */
protected void addChangeRecord(ChangeRecord c) {
    synchronized (zks.outstandingChanges) {
        zks.outstandingChanges.add(c);
        final String key = c.path;
        zks.outstandingChangesForPath.put(key, c);
        ServerMetrics.getMetrics().OUTSTANDING_CHANGES_QUEUED.add(1);
    }
}
/**
 * Snapshots the outstanding change records touched by a multi-op so they can
 * be restored by {@link #rollbackPendingChanges} if the multi-op fails.
 *
 * <p>For every op, both the record of the op's own path and the record of its
 * parent path are captured when they exist. Keeping the parent's record is
 * what lets a rollback restore the parent state used for sequential node
 * naming (ZOOKEEPER-1624); without it a later sequential create would
 * generate a wrong name.
 *
 * @param multiRequest the multi-op whose paths are inspected
 * @return map from path to the outstanding record that existed before the
 *         multi-op was prepared; paths without one are absent
 */
private Map<String, ChangeRecord> getPendingChanges(MultiOperationRecord multiRequest) {
    final Map<String, ChangeRecord> snapshot = new HashMap<>();
    for (Op op : multiRequest) {
        final String opPath = op.getPath();

        // Only records that already exist can need restoring.
        final ChangeRecord own = getOutstandingChange(opPath);
        if (own != null) {
            snapshot.put(opPath, own);
        }

        // A path without a slash or containing NUL has no usable parent path.
        final int slash = opPath.lastIndexOf('/');
        final boolean hasParent = slash != -1 && opPath.indexOf('\0') == -1;
        if (hasParent) {
            final String parent = opPath.substring(0, slash);
            final ChangeRecord parentPending = getOutstandingChange(parent);
            if (parentPending != null) {
                snapshot.put(parent, parentPending);
            }
        }
    }
    return snapshot;
}
/**
 * Undoes the change records queued by a failed multi-op.
 *
 * <p>Starting from the newest outstanding change and moving backwards, every
 * record carrying the multi-op's zxid is dropped from both the outstanding
 * collection and the per-path map. Afterwards the records captured before the
 * multi-op are put back into the per-path map, but only those whose zxid is
 * not older than the first remaining outstanding change; older ones may have
 * completed and been removed in the meantime.
 *
 * @param zxid zxid shared by all ops of the failed multi-op
 * @param pendingChangeRecords records captured by {@code getPendingChanges}
 */
void rollbackPendingChanges(long zxid, Map<String, ChangeRecord> pendingChangeRecords) {
    synchronized (zks.outstandingChanges) {
        // Walk from the tail so the multi-op's own records are met first.
        final Iterator<ChangeRecord> newestFirst = zks.outstandingChanges.descendingIterator();
        while (newestFirst.hasNext()) {
            final ChangeRecord candidate = newestFirst.next();
            if (candidate.zxid != zxid) {
                break;
            }
            newestFirst.remove();
            // Prior records for this path, if still valid, are re-added below.
            zks.outstandingChangesForPath.remove(candidate.path);
        }

        // With nothing left outstanding there is nothing to restore.
        if (!zks.outstandingChanges.isEmpty()) {
            final long oldestZxid = zks.outstandingChanges.peek().zxid;
            for (ChangeRecord prior : pendingChangeRecords.values()) {
                if (prior.zxid >= oldestZxid) {
                    zks.outstandingChangesForPath.put(prior.path, prior);
                }
            }
        }
    }
}
/**
 * Runs the basic sanity checks on a create path and derives its parent path.
 *
 * <p>The path is rejected when it contains no {@code '/'}, when it contains a
 * NUL character, or when the test-only {@code failCreate} flag is set.
 *
 * @param path path of the node to be created
 * @param sessionId session issuing the request, used only for logging
 * @return the portion of {@code path} before its last {@code '/'}
 * @throws BadArgumentsException if the path is rejected
 */
private String validatePathForCreate(String path, long sessionId) throws BadArgumentsException {
    final int slash = path.lastIndexOf('/');
    final boolean acceptable = slash != -1 && path.indexOf('\0') == -1 && !failCreate;
    if (!acceptable) {
        LOG.info("Invalid path {} with session 0x{}", path, Long.toHexString(sessionId));
        throw new KeeperException.BadArgumentsException(path);
    }
    return path.substring(0, slash);
}
/**
 * Validates one state-changing operation and records the resulting
 * transaction on {@code request}.
 *
 * <p>If the request has no header yet, one is created from the session id,
 * the cxid, the supplied {@code zxid}, the current wall time and
 * {@code type}. Each supported op type then runs its own checks, sets the
 * transaction on the request and queues the affected {@link ChangeRecord}s
 * through {@link #addChangeRecord}. Unknown types are only logged. Finally,
 * when digests are enabled and the op did not attach a digest itself, one is
 * attached via {@code setTxnDigest(request)}.
 *
 * <p>This method will be called inside the ProcessRequestThread, which is a
 * singleton, so there will be a single thread calling this code.
 *
 * @param type op code selecting the branch to run
 * @param zxid zxid used for a newly created header and for the mzxid of
 *        setData/reconfig records
 * @param request request being prepared; receives header, txn and digest
 * @param record deserialized request body, cast to the op-specific request
 *        class; not read by the createSession and closeSession branches
 * @throws KeeperException if a session, path, ACL, quota, version or
 *         reconfig check fails
 * @throws IOException if reading the request record fails (createSession)
 * @throws RequestProcessorException declared for callees; not thrown
 *         directly by the code in this method
 */
protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException, IOException, RequestProcessorException {
    if (request.getHdr() == null) {
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                Time.currentWallTime(), type));
    }

    switch (type) {
    case OpCode.create:
    case OpCode.create2:
    case OpCode.createTTL:
    case OpCode.createContainer: {
        pRequest2TxnCreate(type, request, record);
        break;
    }
    case OpCode.deleteContainer: {
        // Unlike OpCode.delete, this branch performs no session, ACL or version check.
        DeleteContainerRequest txn = (DeleteContainerRequest) record;
        String path = txn.getPath();
        String parentPath = getParentPathAndValidate(path);
        ChangeRecord nodeRecord = getRecordForPath(path);
        if (nodeRecord.childCount > 0) {
            throw new KeeperException.NotEmptyException(path);
        }
        // Nodes whose ephemeral-owner type is NORMAL are refused here.
        if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
            throw new KeeperException.BadVersionException(path);
        }
        ChangeRecord parentRecord = getRecordForPath(parentPath);
        request.setTxn(new DeleteTxn(path));
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount--;
        parentRecord.stat.setPzxid(request.getHdr().getZxid());
        parentRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
        addChangeRecord(parentRecord);

        // A record with a null stat marks the node as deleted for later lookups.
        nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
        nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);
        break;
    }
    case OpCode.delete:
        // NOTE: the locals declared in this branch (path, parentPath, parentRecord,
        // nodeRecord) are switch-scoped and reused by the branches below.
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        DeleteRequest deleteRequest = (DeleteRequest) record;
        String path = deleteRequest.getPath();
        String parentPath = getParentPathAndValidate(path);
        ChangeRecord parentRecord = getRecordForPath(parentPath);
        // DELETE permission is checked against the parent's ACL.
        zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
        ChangeRecord nodeRecord = getRecordForPath(path);
        // Only the version check matters here; the incremented value is not used.
        checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
        if (nodeRecord.childCount > 0) {
            throw new KeeperException.NotEmptyException(path);
        }
        request.setTxn(new DeleteTxn(path));
        parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
        parentRecord.childCount--;
        parentRecord.stat.setPzxid(request.getHdr().getZxid());
        parentRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
        addChangeRecord(parentRecord);

        // A record with a null stat marks the node as deleted for later lookups.
        nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
        nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);
        break;
    case OpCode.setData:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        SetDataRequest setDataRequest = (SetDataRequest) record;
        path = setDataRequest.getPath();
        validatePath(path, request.sessionId);
        nodeRecord = getRecordForPath(path);
        zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
        zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
        int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
        request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
        nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
        nodeRecord.stat.setVersion(newVersion);
        nodeRecord.stat.setMtime(request.getHdr().getTime());
        nodeRecord.stat.setMzxid(zxid);
        nodeRecord.data = setDataRequest.getData();
        nodeRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);
        break;
    case OpCode.reconfig:
        if (!zks.isReconfigEnabled()) {
            LOG.error("Reconfig operation requested but reconfig feature is disabled.");
            throw new KeeperException.ReconfigDisabledException();
        }

        if (ZooKeeperServer.skipACL) {
            LOG.warn("skipACL is set, reconfig operation will skip ACL checks!");
        }

        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        if (!(zks instanceof LeaderZooKeeperServer)) {
            // standalone mode - reconfiguration currently not supported
            throw new KeeperException.UnimplementedException();
        }
        LeaderZooKeeperServer lzks = (LeaderZooKeeperServer) zks;
        QuorumVerifier lastSeenQV = lzks.self.getLastSeenQuorumVerifier();
        // check that there's no reconfig in progress
        if (lastSeenQV.getVersion() != lzks.self.getQuorumVerifier().getVersion()) {
            throw new KeeperException.ReconfigInProgress();
        }
        ReconfigRequest reconfigRequest = (ReconfigRequest) record;
        // A config id of -1 means the caller did not condition the reconfig on a version.
        long configId = reconfigRequest.getCurConfigId();

        if (configId != -1 && configId != lzks.self.getLastSeenQuorumVerifier().getVersion()) {
            String msg = "Reconfiguration from version "
                         + configId
                         + " failed -- last seen version is "
                         + lzks.self.getLastSeenQuorumVerifier().getVersion();
            throw new KeeperException.BadVersionException(msg);
        }

        String newMembers = reconfigRequest.getNewMembers();

        if (newMembers != null) { //non-incremental membership change
            LOG.info("Non-incremental reconfig");

            // Input may be delimited by either commas or newlines so convert to common newline separated format
            newMembers = newMembers.replace(',', '\n');

            try {
                Properties props = new Properties();
                props.load(new StringReader(newMembers));
                request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false, lastSeenQV.getOraclePath());
                request.qv.setVersion(request.getHdr().getZxid());
            } catch (IOException | ConfigException e) {
                throw new KeeperException.BadArgumentsException(e.getMessage());
            }
        } else { //incremental change - must be a majority quorum system
            LOG.info("Incremental reconfig");

            List<String> joiningServers = null;
            String joiningServersString = reconfigRequest.getJoiningServers();
            if (joiningServersString != null) {
                joiningServers = StringUtils.split(joiningServersString, ",");
            }

            List<String> leavingServers = null;
            String leavingServersString = reconfigRequest.getLeavingServers();
            if (leavingServersString != null) {
                leavingServers = StringUtils.split(leavingServersString, ",");
            }

            if (!(lastSeenQV instanceof QuorumMaj) && !(lastSeenQV instanceof QuorumOracleMaj)) {
                String msg = "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system";
                LOG.warn(msg);
                throw new KeeperException.BadArgumentsException(msg);
            }
            // Start from the last seen membership, then apply leavers followed by joiners.
            Map<Long, QuorumServer> nextServers = new HashMap<>(lastSeenQV.getAllMembers());
            try {
                if (leavingServers != null) {
                    for (String leaving : leavingServers) {
                        // NOTE(review): a non-numeric id makes parseLong throw
                        // NumberFormatException, which is not caught in this method.
                        long sid = Long.parseLong(leaving);
                        nextServers.remove(sid);
                    }
                }
                if (joiningServers != null) {
                    for (String joiner : joiningServers) {
                        // joiner should have the following format: server.x = server_spec;client_spec
                        String[] parts = StringUtils.split(joiner, "=").toArray(new String[0]);
                        if (parts.length != 2) {
                            throw new KeeperException.BadArgumentsException("Wrong format of server string");
                        }
                        // extract server id x from first part of joiner: server.x
                        Long sid = Long.parseLong(parts[0].substring(parts[0].lastIndexOf('.') + 1));
                        QuorumServer qs = new QuorumServer(sid, parts[1]);
                        if ((qs.clientAddr == null && qs.secureClientAddr == null) || qs.electionAddr == null || qs.addr == null) {
                            throw new KeeperException.BadArgumentsException("Wrong format of server string - each server should have at least 3 ports specified");
                        }

                        // check duplication of addresses and ports
                        for (QuorumServer nqs : nextServers.values()) {
                            if (qs.id == nqs.id) {
                                continue;
                            }
                            qs.checkAddressDuplicate(nqs);
                        }

                        nextServers.remove(qs.id);
                        nextServers.put(qs.id, qs);
                    }
                }
            } catch (ConfigException e) {
                throw new KeeperException.BadArgumentsException("Reconfiguration failed");
            }

            if (lastSeenQV instanceof QuorumMaj) {
                request.qv = new QuorumMaj(nextServers);
            } else {
                request.qv = new QuorumOracleMaj(nextServers, lastSeenQV.getOraclePath());
            }

            request.qv.setVersion(request.getHdr().getZxid());
        }
        if (QuorumPeerConfig.isStandaloneEnabled() && request.qv.getVotingMembers().size() < 2) {
            String msg = "Reconfig failed - new configuration must include at least 2 followers";
            LOG.warn(msg);
            throw new KeeperException.BadArgumentsException(msg);
        } else if (request.qv.getVotingMembers().size() < 1) {
            String msg = "Reconfig failed - new configuration must include at least 1 follower";
            LOG.warn(msg);
            throw new KeeperException.BadArgumentsException(msg);
        }

        if (!lzks.getLeader().isQuorumSynced(request.qv)) {
            String msg2 = "Reconfig failed - there must be a connected and synced quorum in new configuration";
            LOG.warn(msg2);
            throw new KeeperException.NewConfigNoQuorum();
        }

        nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);
        zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, null, null);
        // Encode with an explicit charset so the stored bytes do not depend on
        // the platform default of the JVM that prepares the transaction.
        SetDataTxn setDataTxn = new SetDataTxn(
                ZooDefs.CONFIG_NODE,
                request.qv.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8),
                -1);
        request.setTxn(setDataTxn);
        nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
        nodeRecord.stat.setVersion(-1);
        nodeRecord.stat.setMtime(request.getHdr().getTime());
        nodeRecord.stat.setMzxid(zxid);
        nodeRecord.data = setDataTxn.getData();
        // Reconfig is currently a noop from digest computation
        // perspective since config node is not covered by the digests.
        nodeRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.NOOP, ZooDefs.CONFIG_NODE, nodeRecord.data, nodeRecord.stat);
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);

        break;
    case OpCode.setACL:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        SetACLRequest setAclRequest = (SetACLRequest) record;
        path = setAclRequest.getPath();
        validatePath(path, request.sessionId);
        List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
        nodeRecord = getRecordForPath(path);
        zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo, path, listACL);
        // ACL changes are versioned by aversion, not by the data version.
        newVersion = checkAndIncVersion(nodeRecord.stat.getAversion(), setAclRequest.getVersion(), path);
        request.setTxn(new SetACLTxn(path, listACL, newVersion));
        nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
        nodeRecord.stat.setAversion(newVersion);
        nodeRecord.precalculatedDigest = precalculateDigest(
                DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
        setTxnDigest(request, nodeRecord.precalculatedDigest);
        addChangeRecord(nodeRecord);
        break;
    case OpCode.createSession:
        // The session txn is read from the request itself; the record argument is unused.
        CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
        request.setTxn(createSessionTxn);
        // only add the global session tracker but not to ZKDb
        zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
        zks.setOwner(request.sessionId, request.getOwner());
        break;
    case OpCode.closeSession:
        // We don't want to do this check since the session expiration thread
        // queues up this operation without being the session owner.
        // this request is the last of the session so it should be ok
        //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        long startTime = Time.currentElapsedTime();
        synchronized (zks.outstandingChanges) {
            // need to move getEphemerals into zks.outstandingChanges
            // synchronized block, otherwise there will be a race
            // condition with the on flying deleteNode txn, and we'll
            // delete the node again here, which is not correct
            Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
            // Reconcile the database view with changes that are still outstanding.
            for (ChangeRecord c : zks.outstandingChanges) {
                if (c.stat == null) {
                    // Doing a delete
                    es.remove(c.path);
                } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                    es.add(c.path);
                }
            }
            for (String path2Delete : es) {
                // Parent records are queued only when digests are enabled; they
                // refresh pzxid and the digest, the child count is left as is.
                if (digestEnabled) {
                    parentPath = getParentPathAndValidate(path2Delete);
                    parentRecord = getRecordForPath(parentPath);
                    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
                    parentRecord.stat.setPzxid(request.getHdr().getZxid());
                    parentRecord.precalculatedDigest = precalculateDigest(
                            DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
                    addChangeRecord(parentRecord);
                }
                nodeRecord = new ChangeRecord(
                        request.getHdr().getZxid(), path2Delete, null, 0, null);
                nodeRecord.precalculatedDigest = precalculateDigest(
                        DigestOpCode.REMOVE, path2Delete);
                addChangeRecord(nodeRecord);
            }
            if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
                request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
            }
            zks.sessionTracker.setSessionClosing(request.sessionId);
        }
        ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
        break;
    case OpCode.check:
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
        CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
        path = checkVersionRequest.getPath();
        validatePath(path, request.sessionId);
        nodeRecord = getRecordForPath(path);
        zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo, path, null);
        // No change record is queued for a check; only the txn is set.
        request.setTxn(new CheckVersionTxn(
                path,
                checkAndIncVersion(nodeRecord.stat.getVersion(), checkVersionRequest.getVersion(), path)));
        break;
    default:
        LOG.warn("unknown type {}", type);
        break;
    }

    // If the txn is not going to mutate anything, like createSession,
    // we just set the current tree digest in it
    if (request.getTxnDigest() == null && digestEnabled) {
        setTxnDigest(request);
    }
}
/**
 * Prepares a create, create2, createTTL or createContainer operation.
 *
 * <p>Steps, in order: unpack the request (a TTL request carries a ttl, all
 * others use {@code -1}); validate the create mode and the path; normalise
 * the ACL; check CREATE permission on the parent; append the ten-digit parent
 * cversion for sequential modes; refuse an already existing node or a parent
 * whose ephemeral-owner type is NORMAL; check quota; set the matching
 * transaction; and finally queue the updated parent record followed by the
 * new node's record.
 *
 * @param type one of the create op codes
 * @param request request being prepared; its header must already be set
 * @param record a {@code CreateTTLRequest} for createTTL, otherwise a {@code CreateRequest}
 * @throws IOException declared for callees
 * @throws KeeperException if any validation step fails
 */
private void pRequest2TxnCreate(int type, Request request, Record record) throws IOException, KeeperException {
    final int flags;
    String path;
    final List<ACL> acl;
    final byte[] data;
    final long ttl;
    if (type != OpCode.createTTL) {
        final CreateRequest plain = (CreateRequest) record;
        flags = plain.getFlags();
        path = plain.getPath();
        acl = plain.getAcl();
        data = plain.getData();
        ttl = -1;
    } else {
        final CreateTTLRequest withTtl = (CreateTTLRequest) record;
        flags = withTtl.getFlags();
        path = withTtl.getPath();
        acl = withTtl.getAcl();
        data = withTtl.getData();
        ttl = withTtl.getTtl();
    }

    final CreateMode createMode = CreateMode.fromFlag(flags);
    validateCreateRequest(path, createMode, request, ttl);
    final String parentPath = validatePathForCreate(path, request.sessionId);

    final List<ACL> listACL = fixupACL(path, request.authInfo, acl);
    ChangeRecord parentRecord = getRecordForPath(parentPath);
    zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);

    // Sequential nodes get the parent's current cversion as a zero-padded suffix.
    final int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) {
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);

    // A successful lookup means the node exists; NoNode is the expected outcome.
    boolean alreadyPresent;
    try {
        alreadyPresent = getRecordForPath(path) != null;
    } catch (KeeperException.NoNodeException e) {
        alreadyPresent = false;
    }
    if (alreadyPresent) {
        throw new KeeperException.NodeExistsException(path);
    }

    if (EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL) {
        throw new KeeperException.NoChildrenForEphemeralsException(path);
    }

    final int newCversion = parentRecord.stat.getCversion() + 1;
    zks.checkQuota(path, null, data, OpCode.create);

    switch (type) {
    case OpCode.createContainer:
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
        break;
    case OpCode.createTTL:
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
        break;
    default:
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
        break;
    }

    // The ephemeral-owner field encodes the node kind; 0 is used for the remaining modes.
    final long ephemeralOwner;
    if (createMode.isContainer()) {
        ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
    } else if (createMode.isTTL()) {
        ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
    } else if (createMode.isEphemeral()) {
        ephemeralOwner = request.sessionId;
    } else {
        ephemeralOwner = 0;
    }

    final TxnHeader hdr = request.getHdr();
    final StatPersisted nodeStat = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);

    // Parent first: one more child, bumped cversion, refreshed pzxid and digest.
    parentRecord = parentRecord.duplicate(hdr.getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    parentRecord.stat.setPzxid(hdr.getZxid());
    parentRecord.precalculatedDigest = precalculateDigest(
        DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
    addChangeRecord(parentRecord);

    // Then the new node itself, whose digest becomes the txn digest.
    final ChangeRecord created = new ChangeRecord(hdr.getZxid(), path, nodeStat, 0, listACL);
    created.data = data;
    created.precalculatedDigest = precalculateDigest(
        DigestOpCode.ADD, path, created.data, nodeStat);
    setTxnDigest(request, created.precalculatedDigest);
    addChangeRecord(created);
}
/**
 * Checks a path with {@code PathUtils.validatePath} and converts a rejection
 * into a {@link BadArgumentsException} after logging the reason.
 *
 * @param path path to check
 * @param sessionId session issuing the request, used only for logging
 * @throws BadArgumentsException if {@code PathUtils} rejects the path
 */
private void validatePath(String path, long sessionId) throws BadArgumentsException {
    try {
        PathUtils.validatePath(path);
    } catch (IllegalArgumentException rejected) {
        final String sessionHex = Long.toHexString(sessionId);
        LOG.info("Invalid path {} with session 0x{}, reason: {}", path, sessionHex, rejected.getMessage());
        throw new BadArgumentsException(path);
    }
}
/**
 * Derives the parent path of a node that is about to be deleted.
 *
 * @param path path of the node
 * @return the portion of {@code path} before its last {@code '/'}
 * @throws BadArgumentsException if the path has no {@code '/'}, contains a
 *         NUL character, or is reported as a special path by the database
 */
private String getParentPathAndValidate(String path) throws BadArgumentsException {
    final int slash = path.lastIndexOf('/');
    if (slash == -1) {
        throw new BadArgumentsException(path);
    }
    if (path.indexOf('\0') != -1) {
        throw new BadArgumentsException(path);
    }
    if (zks.getZKDatabase().isSpecialPath(path)) {
        throw new BadArgumentsException(path);
    }
    return path.substring(0, slash);
}
/**
 * Verifies an expected version against the current one and computes the next
 * version.
 *
 * <p>An expected version of {@code -1} disables the comparison. Because
 * {@code -1} carries that special meaning, it is never handed out as a new
 * version: when the increment would produce {@code -1} (which happens once
 * the counter has wrapped around through {@code Integer.MIN_VALUE}), the
 * result skips ahead to {@code 0}. Otherwise a client that received
 * {@code -1} could not ask for a real version check on its next request.
 * See ZOOKEEPER-4743.
 *
 * @param currentVersion version currently recorded for the node
 * @param expectedVersion version the client expects, or {@code -1} for "any"
 * @param path node path, used for the exception
 * @return the version to store after the change
 * @throws KeeperException.BadVersionException if a check was requested and
 *         the versions differ
 */
private static int checkAndIncVersion(int currentVersion, int expectedVersion, String path) throws KeeperException.BadVersionException {
    final boolean versionMatters = expectedVersion != -1;
    if (versionMatters && expectedVersion != currentVersion) {
        throw new KeeperException.BadVersionException(path);
    }
    final int bumped = currentVersion + 1;
    return bumped == -1 ? 0 : bumped;
}
/**
 * Prepares one request and forwards it to the next processor.
 *
 * <p>The header and txn are cleared first. Unless the request is throttled,
 * {@code pRequestHelper} then builds them. Afterwards the request is stamped
 * with the server's current zxid, the preparation time is recorded, the
 * request is handed to the next processor, and the time spent in that
 * hand-off is recorded as well.
 *
 * <p>This method will be called inside the ProcessRequestThread, which is a
 * singleton, so there will be a single thread calling this code.
 *
 * @param request request to prepare and forward
 * @throws RequestProcessorException if the next processor fails
 */
protected void pRequest(Request request) throws RequestProcessorException {
    request.setHdr(null);
    request.setTxn(null);

    final boolean throttled = request.isThrottled();
    if (!throttled) {
        pRequestHelper(request);
    }

    request.zxid = zks.getZxid();

    final long preparedAt = Time.currentElapsedTime();
    ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(preparedAt - request.prepStartTime);

    nextProcessor.processRequest(request);

    ServerMetrics.getMetrics().PROPOSAL_PROCESS_TIME.add(Time.currentElapsedTime() - preparedAt);
}
/**
* This method is a helper to pRequest method
*/
private void pRequestHelper(Request request) {
try {
switch (request.type) {
case OpCode.createContainer:
case OpCode.create:
case OpCode.create2:
CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
break;
case OpCode.createTTL:
CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
break;
case OpCode.deleteContainer:
DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
break;
case OpCode.delete:
DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
break;
case OpCode.setData:
SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
break;
case OpCode.reconfig:
ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
break;
case OpCode.multi:
MultiOperationRecord multiRequest;
try {
multiRequest = request.readRequestRecord(MultiOperationRecord::new);
} catch (IOException e) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
throw e;
}
List<Txn> txns = new ArrayList<>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;
//Store off current pending change records in case we need to rollback
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), request.type));
for (Op op : multiRequest) {
Record subrequest = op.toRequestRecord();
int type;
Record txn;
/* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
type = OpCode.error;
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} else {
/* Prep the request and convert to a Txn */
try {
pRequest2Txn(op.getType(), zxid, request, subrequest);
type = op.getType();
txn = request.getTxn();
} catch (KeeperException e) {
ke = e;
type = OpCode.error;
txn = new ErrorTxn(e.code().intValue());
if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info("Got user-level KeeperException when processing {} aborting"
+ " remaining multi ops. Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
/* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
}
// TODO: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
byte[] bb = RequestRecord.fromRecord(txn).readBytes();
txns.add(new Txn(type, bb));
}
request.setTxn(new MultiTxn(txns));
if (digestEnabled) {
setTxnDigest(request);
}
break;
//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request, null);
}
break;
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getAllChildrenNumber:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
case OpCode.setWatches2:
case OpCode.checkWatches:
case OpCode.removeWatches:
case OpCode.getEphemerals:
case OpCode.multiRead:
case OpCode.addWatch:
case OpCode.whoAmI:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
default:
LOG.warn("unknown type {}", request.type);
break;
}
} catch (KeeperException e) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));
}
if (e.code().intValue() > Code.APIERROR.intValue()) {
LOG.info(
"Got user-level KeeperException when processing {} Error Path:{} Error:{}",
request.toString(),
e.getPath(),
e.getMessage());
}
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process {}", request, e);
String digest = request.requestDigest();
LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
if (request.getHdr() == null) {
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type));
}
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
}
}
/**
 * Returns the given ACLs with repeated entries dropped, keeping the first
 * occurrence of each entry in its original position.
 *
 * @param acls the ACL list to filter; may be null or empty
 * @return an empty list when {@code acls} is null or empty, otherwise a new
 *         list holding each distinct entry once
 */
private static List<ACL> removeDuplicates(final List<ACL> acls) {
    final boolean nothingToFilter = acls == null || acls.isEmpty();
    if (nothingToFilter) {
        return Collections.emptyList();
    }
    // A Set would be the natural choice here, but ACL hashcode/equals do not
    // allow for null values, so membership is checked with List.contains.
    final List<ACL> distinct = new ArrayList<>(acls.size());
    for (final ACL candidate : acls) {
        if (distinct.contains(candidate)) {
            continue;
        }
        distinct.add(candidate);
    }
    return distinct;
}
/**
 * Validates a create request before it is converted into a transaction.
 *
 * <p>The checks run in this order: TTL create modes are rejected when extended
 * ephemeral types are disabled; the TTL value is validated against the create
 * mode; finally the session is checked, using the global-session check for
 * ephemeral nodes and the plain session check otherwise.
 *
 * @param path path of the node being created, reported in the
 *             {@link BadArgumentsException} raised for an invalid TTL
 * @param createMode requested create mode
 * @param request the request being prepared; supplies the session id, owner
 *                and any exception already recorded on it
 * @param ttl TTL value supplied with the request
 * @throws KeeperException if TTL modes are unsupported, the TTL is invalid,
 *                         the request already carries an exception (ephemeral
 *                         case), or the session check fails
 */
private void validateCreateRequest(String path, CreateMode createMode, Request request, long ttl) throws KeeperException {
    if (createMode.isTTL() && !EphemeralType.extendedEphemeralTypesEnabled()) {
        throw new KeeperException.UnimplementedException();
    }
    try {
        EphemeralType.validateTTL(createMode, ttl);
    } catch (IllegalArgumentException e) {
        // Keep the underlying validation failure attached so the reason the
        // TTL was rejected is not lost when the exception is logged.
        BadArgumentsException badArguments = new BadArgumentsException(path);
        badArguments.initCause(e);
        throw badArguments;
    }
    if (createMode.isEphemeral()) {
        // Exception is set when local session failed to upgrade
        // so we just need to report the error
        if (request.getException() != null) {
            throw request.getException();
        }
        zks.sessionTracker.checkGlobalSession(request.sessionId, request.getOwner());
    } else {
        zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
    }
}
/**
 * This method checks out the acl making sure it isn't null or empty,
 * it has valid schemes and ids, and expanding any relative ids that
 * depend on the requestor's authentication information.
 *
 * <p>Duplicate entries are removed first. Each remaining entry is then
 * handled by scheme: {@code world:anyone} is accepted as is, {@code auth}
 * is replaced by one ACL per authenticated id in {@code authInfo}, and any
 * other scheme/id pair must be accepted by the server authentication
 * provider registered for that scheme.
 *
 * @param path path of the node the ACLs apply to; carried in the
 *             {@link KeeperException.InvalidACLException} on failure
 * @param authInfo list of ACL IDs associated with the client connection
 * @param acls list of ACLs being assigned to the node (create or setACL operation)
 * @return verified and expanded ACLs
 * @throws KeeperException.InvalidACLException if the list is null or empty,
 *         an entry or its id/scheme is null, an {@code auth} entry matches no
 *         authenticated id, or no provider accepts the entry's id
 */
public static List<ACL> fixupACL(String path, List<Id> authInfo, List<ACL> acls) throws KeeperException.InvalidACLException {
    // check for well formed ACLs
    // This resolves https://issues.apache.org/jira/browse/ZOOKEEPER-1877
    List<ACL> uniqacls = removeDuplicates(acls);
    if (uniqacls == null || uniqacls.isEmpty()) {
        throw new KeeperException.InvalidACLException(path);
    }
    List<ACL> rv = new ArrayList<>();
    for (ACL a : uniqacls) {
        LOG.debug("Processing ACL: {}", a);
        if (a == null) {
            throw new KeeperException.InvalidACLException(path);
        }
        Id id = a.getId();
        if (id == null || id.getScheme() == null) {
            throw new KeeperException.InvalidACLException(path);
        }
        String scheme = id.getScheme();
        // Constant-first comparison: a "world" entry with a null id must not
        // raise a NullPointerException here; it falls through to the provider
        // validation branch below like any other non-"anyone" world id.
        if ("world".equals(scheme) && "anyone".equals(id.getId())) {
            rv.add(a);
        } else if ("auth".equals(scheme)) {
            // This is the "auth" id, so we have to expand it to the
            // authenticated ids of the requestor
            boolean authIdValid = false;
            for (Id cid : authInfo) {
                ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(cid.getScheme());
                if (ap == null) {
                    LOG.error("Missing AuthenticationProvider for {}", cid.getScheme());
                } else if (ap.isAuthenticated()) {
                    authIdValid = true;
                    rv.add(new ACL(a.getPerms(), cid));
                }
            }
            if (!authIdValid) {
                throw new KeeperException.InvalidACLException(path);
            }
        } else {
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            if (ap == null || !ap.isValid(id.getId())) {
                throw new KeeperException.InvalidACLException(path);
            }
            rv.add(a);
        }
    }
    return rv;
}
/**
 * Queues a request for preparation.
 *
 * <p>Records the current elapsed time on the request as its prep-queue start
 * time, appends the request to {@code submittedRequests}, and increments the
 * {@code PREP_PROCESSOR_QUEUED} metric by one.
 *
 * @param request the request to enqueue
 */
public void processRequest(Request request) {
// The start time is stamped before the request becomes visible in the queue.
request.prepQueueStartTime = Time.currentElapsedTime();
submittedRequests.add(request);
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
}
/**
 * Shuts this processor down and propagates the shutdown to the next
 * processor in the chain.
 *
 * <p>Any requests still waiting in {@code submittedRequests} are discarded,
 * then {@code Request.requestOfDeath} is enqueued.
 */
public void shutdown() {
LOG.info("Shutting down");
// Drop whatever is still queued; those requests are not processed.
submittedRequests.clear();
// NOTE(review): presumably the consumer loop exits when it dequeues
// requestOfDeath - confirm against the run() method.
submittedRequests.add(Request.requestOfDeath);
nextProcessor.shutdown();
}
/**
 * Computes the node digest and the resulting tree digest that a pending
 * change will produce.
 *
 * <p>The new tree digest is the current tree digest minus the node's previous
 * digest plus its new digest. ADD has no previous digest, REMOVE has no new
 * digest, UPDATE has both, and NOOP leaves the tree digest unchanged.
 *
 * @param type the kind of digest change being applied
 * @param path the path of the node
 * @param data the data of the node
 * @param s the stat of the node
 *
 * @return the node and tree digest pair, or null when digests are disabled
 *         or {@code type} is not one of the handled values
 * @throws KeeperException.NoNodeException declared because REMOVE and UPDATE
 *         look up the existing record for {@code path}
 */
private PrecalculatedDigest precalculateDigest(DigestOpCode type, String path,
        byte[] data, StatPersisted s) throws KeeperException.NoNodeException {
    if (!digestEnabled) {
        return null;
    }
    // Both sides default to "no contribution"; each case fills in only the
    // side(s) that apply to it.
    long oldNodeDigest = 0;
    long updatedNodeDigest = 0;
    switch (type) {
    case ADD:
        updatedNodeDigest = digestCalculator.calculateDigest(path, data, s);
        break;
    case REMOVE:
        oldNodeDigest = getRecordForPath(path).precalculatedDigest.nodeDigest;
        break;
    case UPDATE:
        // Look up the existing record first so a missing node is reported
        // before any new digest is calculated.
        oldNodeDigest = getRecordForPath(path).precalculatedDigest.nodeDigest;
        updatedNodeDigest = digestCalculator.calculateDigest(path, data, s);
        break;
    case NOOP:
        break;
    default:
        return null;
    }
    final long currentTreeDigest = getCurrentTreeDigest();
    return new PrecalculatedDigest(updatedNodeDigest, currentTreeDigest - oldNodeDigest + updatedNodeDigest);
}
/**
 * Convenience overload of
 * {@link #precalculateDigest(DigestOpCode, String, byte[], StatPersisted)}
 * that passes null for both the node data and the node stat.
 *
 * @param type the type of operations about the digest change
 * @param path the path of the node
 *
 * @return whatever the four-argument overload returns for these arguments
 */
private PrecalculatedDigest precalculateDigest(
DigestOpCode type, String path) throws KeeperException.NoNodeException {
return precalculateDigest(type, path, null, null);
}
/**
 * Returns the tree digest that the next change should build on.
 *
 * <p>While holding the lock on {@code zks.outstandingChanges}: if changes are
 * outstanding, the tree digest recorded on the last one is used; otherwise
 * the digest is read from the data tree.
 *
 * @return current tree digest
 */
private long getCurrentTreeDigest() {
    synchronized (zks.outstandingChanges) {
        if (!zks.outstandingChanges.isEmpty()) {
            final long pendingDigest = zks.outstandingChanges.peekLast().precalculatedDigest.treeDigest;
            LOG.debug("Digest got from outstandingChanges is: {}", pendingDigest);
            return pendingDigest;
        }
        final long treeDigest = zks.getZKDatabase().getDataTree().getTreeDigest();
        LOG.debug("Digest got from data tree is: {}", treeDigest);
        return treeDigest;
    }
}
/**
 * Sets the transaction digest on the request, built from the digest
 * calculator's version and the current tree digest.
 *
 * @param request the request to attach the digest to
 */
private void setTxnDigest(Request request) {
request.setTxnDigest(new TxnDigest(digestCalculator.getDigestVersion(), getCurrentTreeDigest()));
}
/**
 * Sets the transaction digest on the request using an already computed
 * digest pair. Does nothing when no digest pair is supplied.
 *
 * @param request the request to attach the digest to
 * @param preCalculatedDigest digest pair whose tree digest is recorded; may be null
 */
private void setTxnDigest(Request request, PrecalculatedDigest preCalculatedDigest) {
    if (preCalculatedDigest != null) {
        final TxnDigest txnDigest = new TxnDigest(digestCalculator.getDigestVersion(), preCalculatedDigest.treeDigest);
        request.setTxnDigest(txnDigest);
    }
}
}