// blob: c6cd93b4d18021b6380f42f7bb9fcdb62e09e38b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MultiOperationRecord;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
/**
 * Abstract base class for all ZooKeeperServers that participate in
 * a quorum.
 *
 * <p>In addition to what {@link ZooKeeperServer} provides, this class keeps a
 * reference to the owning {@link QuorumPeer}, turns local sessions into
 * session-creation requests when they try to create ephemeral nodes, flags
 * session requests as local where appropriate, and adds quorum-related
 * entries to the configuration dump, the metrics and the monitor values.
 */
public abstract class QuorumZooKeeperServer extends ZooKeeperServer {

    /** The quorum peer this server was created for. */
    public final QuorumPeer self;

    /**
     * The session tracker viewed as an {@link UpgradeableSessionTracker};
     * assigned in {@link #startSessionTracker()}.
     */
    protected UpgradeableSessionTracker upgradeableSessionTracker;

    /**
     * Creates the server and remembers the owning peer.
     *
     * <p>All arguments except {@code self} are forwarded unchanged to the
     * {@link ZooKeeperServer} constructor, together with
     * {@code self.getInitialConfig()}.
     *
     * @param self the owning quorum peer; dereferenced here, so it must not be null
     */
    protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout,
                                    int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) {
        super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout, listenBacklog, zkDb, self.getInitialConfig());
        this.self = self;
    }

    /**
     * Publishes the inherited {@code sessionTracker} as an
     * {@link UpgradeableSessionTracker} and starts it.
     */
    @Override
    protected void startSessionTracker() {
        upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker;
        upgradeableSessionTracker.start();
    }

    /**
     * Decides whether a request forces its local session to be upgraded.
     *
     * @param request the incoming client request
     * @return a {@code createSession} request for the upgraded session, or
     *         {@code null} when no upgrade is needed (the request is not a
     *         create/create2/multi, the session is not local, or nothing
     *         ephemeral is being created)
     * @throws IOException if the request payload cannot be deserialized
     * @throws KeeperException if a create mode flag is rejected by
     *         {@link CreateMode#fromFlag(int)}, or - as
     *         {@link KeeperException.EphemeralOnLocalSessionException} - if an
     *         upgrade is required but local session upgrading is disabled
     */
    public Request checkUpgradeSession(Request request) throws IOException, KeeperException {
        // If this is a request for a local session and it is to
        // create an ephemeral node, then upgrade the session and return
        // a new session request for the leader.
        // This is called by the request processor thread (either follower
        // or observer request processor), which is unique to a learner.
        // So will not be called concurrently by two threads.
        boolean mayCreateNode = request.type == OpCode.create
            || request.type == OpCode.create2
            || request.type == OpCode.multi;
        // The tracker is only consulted for request types that can create a node.
        if (!mayCreateNode || !upgradeableSessionTracker.isLocalSession(request.sessionId)) {
            return null;
        }

        boolean createsEphemeral = OpCode.multi == request.type
            ? multiContainsEphemeralCreate(request)
            : singleCreateIsEphemeral(request);
        if (!createsEphemeral) {
            return null;
        }

        // Uh oh. We need to upgrade before we can proceed.
        if (!self.isLocalSessionsUpgradingEnabled()) {
            throw new KeeperException.EphemeralOnLocalSessionException();
        }
        return makeUpgradeRequest(request.sessionId);
    }

    /** Reports whether any create/create2 operation inside a multi request uses an ephemeral create mode. */
    private static boolean multiContainsEphemeralCreate(Request request) throws IOException, KeeperException {
        MultiOperationRecord multiTransactionRecord = new MultiOperationRecord();
        // Rewind before decoding, and again afterwards so the payload can be read again later.
        request.request.rewind();
        ByteBufferInputStream.byteBuffer2Record(request.request, multiTransactionRecord);
        request.request.rewind();
        for (Op op : multiTransactionRecord) {
            if (op.getType() == OpCode.create || op.getType() == OpCode.create2) {
                CreateRequest createRequest = (CreateRequest) op.toRequestRecord();
                if (CreateMode.fromFlag(createRequest.getFlags()).isEphemeral()) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Reports whether a single create/create2 request uses an ephemeral create mode. */
    private static boolean singleCreateIsEphemeral(Request request) throws IOException, KeeperException {
        CreateRequest createRequest = new CreateRequest();
        // Rewind before decoding, and again afterwards so the payload can be read again later.
        request.request.rewind();
        ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
        request.request.rewind();
        return CreateMode.fromFlag(createRequest.getFlags()).isEphemeral();
    }

    /**
     * Upgrades a local session and builds the matching session-creation request.
     *
     * @param sessionId the session to upgrade
     * @return a {@code createSession} request whose 4-byte payload holds the
     *         timeout returned by the tracker, or {@code null} if the session
     *         is no longer local by the time the lock is held
     */
    private Request makeUpgradeRequest(long sessionId) {
        // Make sure to atomically check local session status, upgrade
        // session, and make the session creation request. This is to
        // avoid another thread upgrading the session in parallel.
        synchronized (upgradeableSessionTracker) {
            if (upgradeableSessionTracker.isLocalSession(sessionId)) {
                int timeout = upgradeableSessionTracker.upgradeSession(sessionId);
                // NOTE(review): the buffer is handed over without flip()/rewind();
                // presumably consumers rewind before reading - confirm.
                ByteBuffer to = ByteBuffer.allocate(4);
                to.putInt(timeout);
                return new Request(null, sessionId, 0, OpCode.createSession, to, null);
            }
        }
        return null;
    }

    /**
     * Implements the SessionUpgrader interface: upgrades the given session if
     * it is still local and submits the resulting session-creation request.
     * Does nothing when the session is not local.
     *
     * @param sessionId the id of the session to upgrade
     */
    public void upgrade(long sessionId) {
        Request request = makeUpgradeRequest(sessionId);
        if (request != null) {
            LOG.info("Upgrading session 0x{}", Long.toHexString(sessionId));
            // This must be a global request
            submitRequest(request);
        }
    }

    /**
     * Marks session requests as local where applicable.
     *
     * <p>{@code createSession} requests are marked local whenever local
     * sessions are enabled on the peer; {@code closeSession} requests are
     * marked local when the tracker reports the session as local. All other
     * request types are left untouched.
     *
     * @param si the request to inspect and possibly flag
     */
    @Override
    protected void setLocalSessionFlag(Request si) {
        // We need to set isLocalSession to true for these type of request
        // so that the request processor can process them correctly.
        switch (si.type) {
        case OpCode.createSession:
            if (self.areLocalSessionsEnabled()) {
                // All new sessions local by default.
                si.setLocalSession(true);
            }
            break;
        case OpCode.closeSession:
            String reqType = "global";
            if (upgradeableSessionTracker.isLocalSession(si.sessionId)) {
                si.setLocalSession(true);
                reqType = "local";
            }
            LOG.info("Submitting {} closeSession request for session 0x{}", reqType, Long.toHexString(si.sessionId));
            break;
        default:
            break;
        }
    }

    /**
     * Writes the inherited configuration followed by the quorum settings read
     * from the peer: limits, election type, election and quorum ports
     * (joined with {@code |}), learner type ordinal and the quorum verifier.
     *
     * @param pwriter the writer receiving the configuration dump
     */
    @Override
    public void dumpConf(PrintWriter pwriter) {
        super.dumpConf(pwriter);
        pwriter.print("initLimit=");
        pwriter.println(self.getInitLimit());
        pwriter.print("syncLimit=");
        pwriter.println(self.getSyncLimit());
        pwriter.print("electionAlg=");
        pwriter.println(self.getElectionType());
        pwriter.print("electionPort=");
        pwriter.println(self.getElectionAddress().getAllPorts()
            .stream().map(Objects::toString).collect(Collectors.joining("|")));
        pwriter.print("quorumPort=");
        pwriter.println(self.getQuorumAddress().getAllPorts()
            .stream().map(Objects::toString).collect(Collectors.joining("|")));
        pwriter.print("peerType=");
        pwriter.println(self.getLearnerType().ordinal());
        pwriter.println("membership: ");
        // Print the text as-is. Encoding it to bytes and decoding it again with the
        // platform default charset adds nothing and can mangle characters that the
        // default charset cannot represent.
        pwriter.print(self.getQuorumVerifier().toString());
    }

    /**
     * Stores the new server state directly in the inherited {@code state} field.
     *
     * @param state the state to record
     */
    @Override
    protected void setState(State state) {
        this.state = state;
    }

    /**
     * Registers the inherited metrics plus a {@code quorum_size} gauge backed
     * by {@code self.getQuorumSize()}.
     */
    @Override
    protected void registerMetrics() {
        super.registerMetrics();
        MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
        rootContext.registerGauge("quorum_size", self::getQuorumSize);
    }

    /** Unregisters the inherited metrics and the {@code quorum_size} gauge. */
    @Override
    protected void unregisterMetrics() {
        super.unregisterMetrics();
        MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();
        rootContext.unregisterGauge("quorum_size");
    }

    /**
     * Reports the inherited monitor values plus {@code peer_state}, taken
     * from {@code self.getDetailedPeerState()}.
     *
     * @param response sink receiving each monitor key and its value
     */
    @Override
    public void dumpMonitorValues(BiConsumer<String, Object> response) {
        super.dumpMonitorValues(response);
        response.accept("peer_state", self.getDetailedPeerState());
    }
}