blob: 8803a73cbab15c8070bdb68c1c123a94cae33c95 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import javax.security.sasl.SaslException;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
protected static final Logger LOG = LoggerFactory.getLogger(SessionUpgradeQuorumTest.class);
public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
public static final int SERVER_COUNT = 3;
private MainThread[] mt;
private int[] clientPorts;
private TestQPMainDropSessionUpgrading[] qpMain;
@BeforeEach
public void setUp() throws Exception {
LOG.info("STARTING quorum {}", getClass().getName());
// setup the env with RetainDB and local session upgrading
ClientBase.setupTestEnv();
mt = new MainThread[SERVER_COUNT];
clientPorts = new int[SERVER_COUNT];
StringBuilder sb = new StringBuilder();
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
sb.append("server.").append(i).append("=127.0.0.1:").append(PortAssignment.unique()).append(":").append(PortAssignment.unique()).append("\n");
}
sb.append("localSessionsEnabled=true\n");
sb.append("localSessionsUpgradingEnabled=true\n");
String cfg = sb.toString();
// create a 3 server ensemble
qpMain = new TestQPMainDropSessionUpgrading[SERVER_COUNT];
for (int i = 0; i < SERVER_COUNT; i++) {
final TestQPMainDropSessionUpgrading qp = new TestQPMainDropSessionUpgrading();
qpMain[i] = qp;
mt[i] = new MainThread(i, clientPorts[i], cfg, false) {
@Override
public TestQPMain getTestQPMain() {
return qp;
}
};
mt[i].start();
}
for (int i = 0; i < SERVER_COUNT; i++) {
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT), "waiting for server " + i + " being up");
}
}
@AfterEach
public void tearDown() throws Exception {
LOG.info("STOPPING quorum {}", getClass().getName());
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
}
}
@Test
public void testLocalSessionUpgradeSnapshot() throws IOException, InterruptedException {
// select the candidate of follower
int leader = -1;
int followerA = -1;
for (int i = SERVER_COUNT - 1; i >= 0; i--) {
if (mt[i].main.quorumPeer.leader != null) {
leader = i;
} else if (followerA == -1) {
followerA = i;
}
}
LOG.info("follower A is {}", followerA);
qpMain[followerA].setDropCreateSession(true);
// create a client, and create an ephemeral node to trigger the
// upgrading process
final String node = "/node-1";
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[followerA], ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk, States.CONNECTED);
// clone the session id and passwd for later usage
long sessionId = zk.getSessionId();
// should fail because of the injection
try {
zk.create(node, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
fail("expect to failed to upgrade session due to the "
+ "TestQPMainDropSessionUpgrading is being used");
} catch (KeeperException e) {
LOG.info("KeeperException when create ephemeral node.", e);
}
// force to take snapshot
qpMain[followerA].quorumPeer.follower.zk.takeSnapshot(true);
// wait snapshot finish
Thread.sleep(500);
// shutdown all servers
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
}
ArrayList<States> waitStates = new ArrayList<States>();
waitStates.add(States.CONNECTING);
waitStates.add(States.CLOSED);
waitForOne(zk, waitStates);
// start the servers again, start follower A last as we want to
// keep it running as follower
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].start();
}
for (int i = 0; i < SERVER_COUNT; i++) {
assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT), "waiting for server " + i + " being up");
}
// check global session not exist on follower A
for (int i = 0; i < SERVER_COUNT; i++) {
ConcurrentHashMap<Long, Integer> sessions = mt[i].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
assertFalse(sessions.containsKey(sessionId),
"server " + i + " should not have global " + "session " + sessionId);
}
zk.close();
}
@Test
public void testOnlyUpgradeSessionOnce() throws IOException, InterruptedException, KeeperException {
// create a client, and create an ephemeral node to trigger the
// upgrading process
final String node = "/node-1";
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[0], ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk, States.CONNECTED);
long sessionId = zk.getSessionId();
QuorumZooKeeperServer server = (QuorumZooKeeperServer) mt[0].main.quorumPeer.getActiveServer();
Request create1 = createEphemeralRequest("/data-1", sessionId);
Request create2 = createEphemeralRequest("/data-2", sessionId);
assertNotNull(server.checkUpgradeSession(create1), "failed to upgrade on a ephemeral create");
assertNull(server.checkUpgradeSession(create2), "tried to upgrade again");
// clean al the setups and close the zk
zk.close();
}
@Test
public void testCloseSessionWhileUpgradeOnLeader()
throws IOException, KeeperException, InterruptedException {
int leaderId = -1;
for (int i = SERVER_COUNT - 1; i >= 0; i--) {
if (mt[i].main.quorumPeer.leader != null) {
leaderId = i;
}
}
if (leaderId > 0) {
makeSureEphemeralIsGone(leaderId);
}
}
@Test
public void testCloseSessionWhileUpgradeOnLearner()
throws IOException, KeeperException, InterruptedException {
int learnerId = -1;
for (int i = SERVER_COUNT - 1; i >= 0; i--) {
if (mt[i].main.quorumPeer.follower != null) {
learnerId = i;
}
}
if (learnerId > 0) {
makeSureEphemeralIsGone(learnerId);
}
}
private void makeSureEphemeralIsGone(int sid)
throws IOException, KeeperException, InterruptedException {
// Delay submit request to simulate the request queued in
// RequestThrottler
qpMain[sid].setSubmitDelayMs(200);
// Create a client and an ephemeral node
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk, States.CONNECTED);
final String node = "/node-1";
zk.create(node, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL, new StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx,
String name) {}
}, null);
// close the client
zk.close();
// make sure the ephemeral is gone
zk = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk, States.CONNECTED);
assertNull(zk.exists(node, false));
zk.close();
}
private static class TestQPMainDropSessionUpgrading extends TestQPMain {
private volatile boolean shouldDrop = false;
private volatile int submitDelayMs = 0;
public void setDropCreateSession(boolean dropCreateSession) {
shouldDrop = dropCreateSession;
}
public void setSubmitDelayMs(int delay) {
this.submitDelayMs = delay;
}
@Override
protected QuorumPeer getQuorumPeer() throws SaslException {
return new QuorumPeer() {
@Override
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
return new Leader(this, new LeaderZooKeeperServer(
logFactory, this, this.getZkDb()) {
@Override
public void submitRequestNow(Request si) {
if (submitDelayMs > 0) {
try {
Thread.sleep(submitDelayMs);
} catch (Exception e) {}
}
super.submitRequestNow(si);
}
});
}
@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
@Override
public void submitRequestNow(Request si) {
if (submitDelayMs > 0) {
try {
Thread.sleep(submitDelayMs);
} catch (Exception e) {}
}
super.submitRequestNow(si);
}
}) {
@Override
protected void request(Request request) throws IOException {
if (!shouldDrop) {
super.request(request);
return;
}
LOG.info("request is {}, cnxn {}", request.type, request.cnxn);
if (request.type == ZooDefs.OpCode.createSession) {
LOG.info("drop createSession request {}", request);
return;
}
if (request.type == ZooDefs.OpCode.create && request.cnxn != null) {
CreateRequest createRequest = new CreateRequest();
request.readRequestRecord(createRequest);
try {
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isEphemeral()) {
request.cnxn.sendCloseSession();
}
} catch (KeeperException ignore) {
}
return;
}
super.request(request);
}
};
}
};
}
}
private void waitForOne(ZooKeeper zk, ArrayList<States> states) throws InterruptedException {
int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
while (!states.contains(zk.getState())) {
if (iterations-- == 0) {
LOG.info("state is {}", zk.getState());
throw new RuntimeException("Waiting too long");
}
Thread.sleep(500);
}
}
private Request createEphemeralRequest(String path, long sessionId) throws IOException {
ByteArrayOutputStream boas = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
createRequest.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<Id>());
}
}