blob: 3bdbcd908dcf81014ebea86b07afcbe081b0cdeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.server.quorum.ZabUtils.MockLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createMockLeader;
import static org.apache.zookeeper.server.quorum.ZabUtils.createQuorumPeer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.ByteBufferOutputStream;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.TestUtils;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnHeader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Zab1_0Test extends ZKTestCase {
private static final Logger LOG = LoggerFactory.getLogger(Zab1_0Test.class);
private static final File testData = new File(System.getProperty("test.data.dir", "src/test/resources/data"));
/**
 * Runs before every test and sets the {@code zookeeper.admin.enableServer}
 * system property to {@code "false"}.
 */
@BeforeEach
public void setUp() {
    final String adminServerSwitch = "zookeeper.admin.enableServer";
    System.setProperty(adminServerSwitch, "false");
}
/**
 * Thread that runs {@link Leader#lead()} for the wrapped leader and always
 * calls {@code shutdown("lead ended")} on it when {@code lead()} returns or
 * throws.
 */
private static final class LeadThread extends Thread {

    private final Leader leader;

    private LeadThread(Leader leader) {
        this.leader = leader;
    }

    @Override
    public void run() {
        try {
            leadAndLogFailures();
        } finally {
            leader.shutdown("lead ended");
        }
    }

    /** Calls {@code lead()} and logs (rather than propagates) any exception it throws. */
    private void leadAndLogFailures() {
        try {
            leader.lead();
        } catch (InterruptedException interrupted) {
            LOG.info("Leader thread interrupted", interrupted);
        } catch (Exception unexpected) {
            LOG.warn("Unexpected exception in leader thread", unexpected);
        }
    }
}
/**
 * Thread that impersonates a follower by calling into a {@link Leader}
 * directly, without any network traffic.
 *
 * <p>Depending on {@code onlyGetEpochToPropose} it either calls
 * {@code getEpochToPropose} and stores the result in {@link #epoch}, or calls
 * {@code waitForEpochAck} and, if that returns normally, records a message in
 * {@link #msg}. If the call throws, the corresponding field keeps its initial
 * value ({@code -1} / {@code null}), which is what the tests check for.
 *
 * <p>{@code epoch} and {@code msg} are written by this thread and read by the
 * test thread after a <em>timed</em> {@code join}, which gives no visibility
 * guarantee if the join times out; they are therefore {@code volatile}.
 */
public static final class FollowerMockThread extends Thread {

    private final Leader leader;
    private final long followerSid;
    public volatile long epoch = -1;
    public volatile String msg = null;
    private boolean onlyGetEpochToPropose;

    private FollowerMockThread(long followerSid, Leader leader, boolean onlyGetEpochToPropose) {
        this.leader = leader;
        this.followerSid = followerSid;
        this.onlyGetEpochToPropose = onlyGetEpochToPropose;
    }

    @Override
    public void run() {
        if (onlyGetEpochToPropose) {
            try {
                epoch = leader.getEpochToPropose(followerSid, 0);
            } catch (Exception e) {
                // Tests in this class expect this call to fail (time out); log it
                // instead of swallowing it so unexpected failures remain diagnosable.
                LOG.info("FollowerMockThread (id = {}) failed in getEpochToPropose", followerSid, e);
            }
        } else {
            try {
                leader.waitForEpochAck(followerSid, new StateSummary(0, 0));
                msg = "FollowerMockThread (id = " + followerSid + ") returned from waitForEpochAck";
            } catch (Exception e) {
                // Same as above: an exception here leaves msg == null on purpose.
                LOG.info("FollowerMockThread (id = {}) failed in waitForEpochAck", followerSid, e);
            }
        }
    }
}
/**
 * Two mock followers call {@code getEpochToPropose} while the leader itself
 * has not; afterwards the leader (accepted epoch 5) calls it and must get
 * epoch 6 back without an exception.
 */
@Test
public void testLeaderInConnectingFollowers() throws Exception {
    File tmpDir = File.createTempFile("test", "dir", testData);
    tmpDir.delete();
    tmpDir.mkdir();
    Leader leader = null;
    try {
        QuorumPeer peer = createQuorumPeer(tmpDir);
        leader = createLeader(tmpDir, peer);
        peer.leader = leader;
        peer.setAcceptedEpoch(5);

        FollowerMockThread f1 = new FollowerMockThread(1, leader, true);
        FollowerMockThread f2 = new FollowerMockThread(2, leader, true);
        f1.start();
        f2.start();

        // wait until followers time out in getEpochToPropose - they shouldn't return
        // normally because the leader didn't execute getEpochToPropose and so its epoch was not
        // accounted for
        f1.join(leader.self.getInitLimit() * leader.self.getTickTime() + 5000);
        f2.join(leader.self.getInitLimit() * leader.self.getTickTime() + 5000);

        // even though followers timed out, their ids are in connectingFollowers, and their
        // epoch were accounted for, so the leader should not block and since it started with
        // accepted epoch = 5 it should now have 6
        try {
            long epoch = leader.getEpochToPropose(leader.self.getId(), leader.self.getAcceptedEpoch());
            assertEquals(6, epoch, "leader got wrong epoch from getEpochToPropose");
        } catch (Exception e) {
            // Pass the exception along so the test report shows why the call failed.
            fail("leader timed out in getEpochToPropose", e);
        }
    } finally {
        if (leader != null) {
            leader.shutdown("end of test");
        }
        TestUtils.deleteFileRecursively(tmpDir);
    }
}
/**
 * In this test, the leader sets the last accepted epoch to 5. The call
 * to getEpochToPropose should set epoch to 6 and wait until another
 * follower executes it. If in getEpochToPropose we don't check if
 * lastAcceptedEpoch == epoch, then the call from the subsequent
 * follower with lastAcceptedEpoch = 6 doesn't change the value
 * of epoch, and the test fails. It passes with the fix to predicate.
 *
 * <p>See https://issues.apache.org/jira/browse/ZOOKEEPER-1343
 *
 * @throws Exception if setting up or tearing down the leader fails
 */
@Test
public void testLastAcceptedEpoch() throws Exception {
    File tmpDir = File.createTempFile("test", "dir", testData);
    tmpDir.delete();
    tmpDir.mkdir();
    Leader leader = null;
    LeadThread leadThread = null;
    try {
        QuorumPeer peer = createQuorumPeer(tmpDir);
        leader = createMockLeader(tmpDir, peer);
        peer.leader = leader;
        peer.setAcceptedEpoch(5);
        leadThread = new LeadThread(leader);
        leadThread.start();

        // Poll until the lead thread has proposed epoch 6.
        // NOTE(review): this loop has no deadline; if the lead thread dies early the test hangs.
        while (((MockLeader) leader).getCurrentEpochToPropose() != 6) {
            Thread.sleep(20);
        }

        try {
            long epoch = leader.getEpochToPropose(1, 6);
            assertEquals(7, epoch, "New proposed epoch is wrong");
        } catch (Exception e) {
            // Pass the exception along so the test report shows why the call failed.
            fail("Timed out in getEpochToPropose", e);
        }
    } finally {
        if (leader != null) {
            leader.shutdown("end of test");
        }
        if (leadThread != null) {
            leadThread.interrupt();
            leadThread.join();
        }
        TestUtils.deleteFileRecursively(tmpDir);
    }
}
/**
 * Two mock followers call {@code waitForEpochAck} on a leader that never
 * calls it itself; neither follower may return normally, which is detected
 * by {@code msg} still being {@code null} after the join.
 */
@Test
public void testLeaderInElectingFollowers() throws Exception {
    File tmpDir = File.createTempFile("test", "dir", testData);
    tmpDir.delete();
    tmpDir.mkdir();
    Leader leader = null;
    try {
        QuorumPeer peer = createQuorumPeer(tmpDir);
        leader = createLeader(tmpDir, peer);
        peer.leader = leader;

        FollowerMockThread[] followers = {
            new FollowerMockThread(1, leader, false),
            new FollowerMockThread(2, leader, false),
        };

        // things needed for waitForEpochAck to run (usually in leader.lead(), but we're not running leader here)
        leader.leaderStateSummary = new StateSummary(leader.self.getCurrentEpoch(), leader.zk.getLastProcessedZxid());

        for (FollowerMockThread follower : followers) {
            follower.start();
        }

        // wait until followers time out in waitForEpochAck - they shouldn't return
        // normally because the leader didn't execute waitForEpochAck
        for (FollowerMockThread follower : followers) {
            follower.join(leader.self.getInitLimit() * leader.self.getTickTime() + 5000);
        }

        // make sure that they timed out and didn't return normally
        for (FollowerMockThread follower : followers) {
            assertTrue(follower.msg == null, follower.msg + " without waiting for leader");
        }
    } finally {
        if (leader != null) {
            leader.shutdown("end of test");
        }
        TestUtils.deleteFileRecursively(tmpDir);
    }
}
/**
 * Creates a pair of connected TCP sockets on the loopback address.
 *
 * <p>A temporary listening socket is opened on an ephemeral port of
 * 127.0.0.1, a client socket connects to it, and the accepted peer is
 * returned alongside the client. The listening socket is closed before
 * returning; closing it does not affect the already-accepted connection.
 *
 * @return a two-element array: index 0 is the connecting socket, index 1 is
 *         the socket returned by {@code accept()}
 * @throws IOException if binding, connecting or accepting fails
 */
static Socket[] getSocketPair() throws IOException {
    try (ServerSocket ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"))) {
        InetSocketAddress endPoint = (InetSocketAddress) ss.getLocalSocketAddress();
        Socket s = new Socket(endPoint.getAddress(), endPoint.getPort());
        try {
            return new Socket[]{s, ss.accept()};
        } catch (IOException e) {
            // Do not leak the client end when the accept side fails.
            try {
                s.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}
/**
 * Reads records from {@code ia} into {@code qp} until one arrives whose type
 * is not {@link Leader#PING}; on return {@code qp} holds that record.
 *
 * @param ia archive to read from
 * @param qp packet object that is overwritten by every read
 * @throws IOException if reading a record fails
 */
static void readPacketSkippingPing(InputArchive ia, QuorumPacket qp) throws IOException {
    do {
        ia.readRecord(qp, null);
    } while (qp.getType() == Leader.PING);
}
/**
 * Script for the peer side of a conversation with a running {@link Leader}.
 * {@code testLeaderConversation} invokes it with archives wrapped around the
 * socket end opposite to the leader's {@code LearnerHandler}.
 */
public interface LeaderConversation {
void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws Exception;
}
/**
 * Like a leader conversation, but for a leader whose database was populated
 * beforehand. {@code testPopulatedLeaderConversation} passes the zxid of the
 * last transaction it applied as {@code zxid}.
 */
public interface PopulatedLeaderConversation {
void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid) throws Exception;
}
/**
 * Script for the leader side of a conversation with a running
 * {@link Follower}. {@code testFollowerConversation} invokes it with archives
 * wrapped around the socket accepted from the follower.
 */
public interface FollowerConversation {
void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception;
}
/**
 * Script for the leader side of a conversation with a running
 * {@link Observer}. {@code testObserverConversation} invokes it with archives
 * wrapped around the socket accepted from the observer.
 */
public interface ObserverConversation {
void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) throws Exception;
}
/**
 * Starts a leader on a fresh temporary directory, attaches a
 * {@code LearnerHandler} to one end of a loopback socket pair and hands
 * archives for the other end to {@code conversation}.
 *
 * <p>On exit the leader is shut down, the lead thread is joined, both
 * sockets of the pair are closed and the temporary directory is deleted.
 *
 * @param conversation the peer-side script to run against the leader
 * @throws Exception whatever the setup or the conversation throws
 */
public void testLeaderConversation(LeaderConversation conversation) throws Exception {
    Socket[] pair = getSocketPair();
    try {
        Socket leaderSocket = pair[0];
        Socket followerSocket = pair[1];
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        LeadThread leadThread = null;
        Leader leader = null;
        try {
            QuorumPeer peer = createQuorumPeer(tmpDir);
            leader = createLeader(tmpDir, peer);
            peer.leader = leader;
            leadThread = new LeadThread(leader);
            leadThread.start();

            // Wait until lead() has started the connection acceptor.
            while (leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) {
                Thread.sleep(20);
            }

            LearnerHandler lh = new LearnerHandler(leaderSocket, new BufferedInputStream(leaderSocket.getInputStream()), leader);
            lh.start();
            leaderSocket.setSoTimeout(4000);

            InputArchive ia = BinaryInputArchive.getArchive(followerSocket.getInputStream());
            OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket.getOutputStream());
            conversation.converseWithLeader(ia, oa, leader);
        } finally {
            if (leader != null) {
                leader.shutdown("end of test");
            }
            if (leadThread != null) {
                leadThread.interrupt();
                leadThread.join();
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    } finally {
        // Neither end of the pair was closed before; close() on an already closed socket is a no-op.
        for (Socket socket : pair) {
            try {
                socket.close();
            } catch (IOException e) {
                LOG.warn("Failed to close test socket", e);
            }
        }
    }
}
/**
 * Same as {@code testLeaderConversation}, but the leader's data directory is
 * first populated with {@code ops} znodes ({@code /foo-1 .. /foo-ops}, all in
 * epoch 1) and snapshotted, and the peer's accepted and current epochs are
 * set to 1 before the leader starts.
 *
 * <p>On exit the leader is shut down, the lead thread is joined, both
 * sockets of the pair are closed and the temporary directory is deleted.
 *
 * @param conversation the peer-side script; receives the zxid of the last
 *        transaction applied during population
 * @param ops number of znodes to create; must be at least 1
 * @throws Exception whatever the setup or the conversation throws
 */
public void testPopulatedLeaderConversation(PopulatedLeaderConversation conversation, int ops) throws Exception {
    Socket[] pair = getSocketPair();
    try {
        Socket leaderSocket = pair[0];
        Socket followerSocket = pair[1];
        File tmpDir = File.createTempFile("test", "dir", testData);
        tmpDir.delete();
        tmpDir.mkdir();
        LeadThread leadThread = null;
        Leader leader = null;
        try {
            // Setup a database with `ops` znodes
            FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
            ZKDatabase zkDb = new ZKDatabase(snapLog);
            assertTrue(ops >= 1);
            long zxid = ZxidUtils.makeZxid(1, 0);
            for (int i = 1; i <= ops; i++) {
                zxid = ZxidUtils.makeZxid(1, i);
                String path = "/foo-" + i;
                zkDb.processTxn(new TxnHeader(13, 1000 + i, zxid, 30 + i, ZooDefs.OpCode.create),
                        new CreateTxn(path, "fpjwasalsohere".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                Stat stat = new Stat();
                assertEquals("fpjwasalsohere", new String(zkDb.getData(path, stat, null)));
            }
            assertTrue(zxid > ZxidUtils.makeZxid(1, 0));

            // Generate snapshot and close files.
            snapLog.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), false);
            snapLog.close();

            QuorumPeer peer = createQuorumPeer(tmpDir);
            leader = createLeader(tmpDir, peer);
            peer.leader = leader;

            // Set the last accepted epoch and current epochs to be 1
            peer.setAcceptedEpoch(1);
            peer.setCurrentEpoch(1);

            leadThread = new LeadThread(leader);
            leadThread.start();

            // Wait until lead() has started the connection acceptor.
            while (leader.cnxAcceptor == null || !leader.cnxAcceptor.isAlive()) {
                Thread.sleep(20);
            }

            LearnerHandler lh = new LearnerHandler(leaderSocket, new BufferedInputStream(leaderSocket.getInputStream()), leader);
            lh.start();
            leaderSocket.setSoTimeout(4000);

            InputArchive ia = BinaryInputArchive.getArchive(followerSocket.getInputStream());
            OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket.getOutputStream());
            conversation.converseWithLeader(ia, oa, leader, zxid);
        } finally {
            if (leader != null) {
                leader.shutdown("end of test");
            }
            if (leadThread != null) {
                leadThread.interrupt();
                leadThread.join();
            }
            TestUtils.deleteFileRecursively(tmpDir);
        }
    } finally {
        // Neither end of the pair was closed before; close() on an already closed socket is a no-op.
        for (Socket socket : pair) {
            try {
                socket.close();
            } catch (IOException e) {
                LOG.warn("Failed to close test socket", e);
            }
        }
    }
}
/**
 * Starts a follower on a fresh temporary directory, points it at a local
 * listening socket that plays the leader, and hands archives for the
 * accepted connection to {@code conversation}.
 *
 * <p>On exit the follower and peer are shut down, the follower thread is
 * joined, the accepted socket and the listening socket are closed and the
 * temporary directory is deleted.
 *
 * @param conversation the leader-side script to run against the follower
 * @throws Exception whatever the setup or the conversation throws
 */
public void testFollowerConversation(FollowerConversation conversation) throws Exception {
    File tmpDir = File.createTempFile("test", "dir", testData);
    tmpDir.delete();
    tmpDir.mkdir();
    Thread followerThread = null;
    ConversableFollower follower = null;
    QuorumPeer peer = null;
    ServerSocket ss = null;
    Socket leaderSocket = null;
    try {
        peer = createQuorumPeer(tmpDir);
        follower = createFollower(tmpDir, peer);
        peer.follower = follower;

        ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
        QuorumServer leaderQS = new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress());
        follower.setLeaderQuorumServer(leaderQS);

        final Follower followerForThread = follower;
        followerThread = new Thread() {
            @Override
            public void run() {
                try {
                    followerForThread.followLeader();
                } catch (InterruptedException e) {
                    LOG.info("Follower thread interrupted", e);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception in follower thread", e);
                }
            }
        };
        followerThread.start();

        leaderSocket = ss.accept();
        InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream());
        OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream());
        conversation.converseWithFollower(ia, oa, follower);
    } finally {
        if (follower != null) {
            follower.shutdown();
        }
        if (followerThread != null) {
            followerThread.interrupt();
            followerThread.join();
        }
        if (peer != null) {
            peer.shutdown();
        }
        // The fake leader's sockets were never closed before; release them here.
        if (leaderSocket != null) {
            try {
                leaderSocket.close();
            } catch (IOException e) {
                LOG.warn("Failed to close test socket", e);
            }
        }
        if (ss != null) {
            try {
                ss.close();
            } catch (IOException e) {
                LOG.warn("Failed to close test server socket", e);
            }
        }
        TestUtils.deleteFileRecursively(tmpDir);
    }
}
/**
 * Starts an observer (with sync enabled on its peer) on a fresh temporary
 * directory, points it at a local listening socket that plays the leader,
 * and hands archives for the accepted connection to {@code conversation}.
 *
 * <p>On exit the observer and peer are shut down, the observer thread is
 * joined, the accepted socket and the listening socket are closed and the
 * temporary directory is deleted.
 *
 * @param conversation the leader-side script to run against the observer
 * @throws Exception whatever the setup or the conversation throws
 */
public void testObserverConversation(ObserverConversation conversation) throws Exception {
    File tmpDir = File.createTempFile("test", "dir", testData);
    tmpDir.delete();
    tmpDir.mkdir();
    Thread observerThread = null;
    ConversableObserver observer = null;
    QuorumPeer peer = null;
    ServerSocket ss = null;
    Socket leaderSocket = null;
    try {
        peer = createQuorumPeer(tmpDir);
        peer.setSyncEnabled(true);
        observer = createObserver(tmpDir, peer);
        peer.observer = observer;

        ss = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
        QuorumServer leaderQS = new QuorumServer(1, (InetSocketAddress) ss.getLocalSocketAddress());
        observer.setLeaderQuorumServer(leaderQS);

        final Observer observerForThread = observer;
        observerThread = new Thread() {
            @Override
            public void run() {
                try {
                    observerForThread.observeLeader();
                } catch (Exception e) {
                    // Use the class logger, like the follower variant, instead of printStackTrace().
                    LOG.warn("Unexpected exception in observer thread", e);
                }
            }
        };
        observerThread.start();

        leaderSocket = ss.accept();
        InputArchive ia = BinaryInputArchive.getArchive(leaderSocket.getInputStream());
        OutputArchive oa = BinaryOutputArchive.getArchive(leaderSocket.getOutputStream());
        conversation.converseWithObserver(ia, oa, observer);
    } finally {
        if (observer != null) {
            observer.shutdown();
        }
        if (observerThread != null) {
            observerThread.interrupt();
            observerThread.join();
        }
        if (peer != null) {
            peer.shutdown();
        }
        // The fake leader's sockets were never closed before; release them here.
        if (leaderSocket != null) {
            try {
                leaderSocket.close();
            } catch (IOException e) {
                LOG.warn("Failed to close test socket", e);
            }
        }
        if (ss != null) {
            try {
                ss.close();
            } catch (IOException e) {
                LOG.warn("Failed to close test server socket", e);
            }
        }
        TestUtils.deleteFileRecursively(tmpDir);
    }
}
/**
 * Against a leader populated with two znodes in epoch 1, a learner that
 * reports the leader's own last zxid in its ACKEPOCH must be answered with a
 * DIFF packet.
 */
@Test
public void testUnnecessarySnap() throws Exception {
    testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long zxid) throws Exception {
            assertEquals(1, l.self.getAcceptedEpoch());
            assertEquals(1, l.self.getCurrentEpoch());

            /* we test a normal run. everything should work out well. */
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.LEADERINFO, qp.getType());
            assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
            // expected value first, so a failure message reads the right way round
            assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());
            assertEquals(2, l.self.getAcceptedEpoch());
            assertEquals(1, l.self.getCurrentEpoch());

            // ACKEPOCH payload: the learner's current epoch (1) as a 4-byte int
            byte[] epochBytes = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            wrappedEpochBytes.putInt(1);
            qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.DIFF, qp.getType());
        }
    }, 2);
}
/**
 * Watcher that records when a {@code NodeDataChanged} event arrives, so tests
 * can block on the callback rather than depending on timing.
 */
class TrackerWatcher implements Watcher {

    boolean changed;

    /** Blocks until a {@code NodeDataChanged} event has been processed. */
    void waitForChange() throws InterruptedException {
        synchronized (this) {
            while (!changed) {
                wait();
            }
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() != EventType.NodeDataChanged) {
            return;
        }
        synchronized (this) {
            changed = true;
            notifyAll();
        }
    }

    /** @return whether a {@code NodeDataChanged} event has been seen so far */
    public boolean changed() {
        synchronized (this) {
            return changed;
        }
    }
}
/**
 * Plays the leader side of a full sync with a real follower:
 * FOLLOWERINFO -> LEADERINFO -> ACKEPOCH -> SNAP -> NEWLEADER -> ACK,
 * followed by a PROPOSAL/COMMIT of a setData on /foo and UPTODATE.
 * Along the way it checks the follower's accepted/current epochs, when
 * takeSnapshot is invoked on the spied server, and that both the snapshot
 * and the committed update can be reloaded from the follower's directories.
 */
@Test
public void testNormalFollowerRun() throws Exception {
testFollowerConversation(new FollowerConversation() {
@Override
public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception {
// Scratch directory for the database that is serialized and sent as the SNAP payload.
File tmpDir = File.createTempFile("test", "dir", testData);
tmpDir.delete();
tmpDir.mkdir();
// Parents of the follower's txn-log and snapshot directories; used below to
// reload what the follower wrote to disk.
File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
//Spy on ZK so we can check if a snapshot happened or not.
f.zk = spy(f.zk);
try {
assertEquals(0, f.self.getAcceptedEpoch());
assertEquals(0, f.self.getCurrentEpoch());
// Setup a database with a single /foo node
ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
final long firstZxid = ZxidUtils.makeZxid(1, 1);
zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
Stat stat = new Stat();
assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
// The follower opens the conversation with FOLLOWERINFO.
QuorumPacket qp = new QuorumPacket();
readPacketSkippingPing(ia, qp);
assertEquals(Leader.FOLLOWERINFO, qp.getType());
assertEquals(qp.getZxid(), 0);
LearnerInfo learnInfo = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
assertEquals(learnInfo.getProtocolVersion(), 0x10000);
assertEquals(learnInfo.getServerid(), 0);
// We are simulating an established leader, so the epoch is 1
qp.setType(Leader.LEADERINFO);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
byte[] protoBytes = new byte[4];
ByteBuffer.wrap(protoBytes).putInt(0x10000);
qp.setData(protoBytes);
oa.writeRecord(qp, null);
// The follower answers with ACKEPOCH; its accepted epoch is now 1, current epoch still 0.
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACKEPOCH, qp.getType());
assertEquals(0, qp.getZxid());
assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(0, f.self.getCurrentEpoch());
// Send the snapshot we created earlier
qp.setType(Leader.SNAP);
qp.setData(new byte[0]);
qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
oa.writeRecord(qp, null);
zkDb.serializeSnapshot(oa);
// Signature string written right after the serialized snapshot.
oa.writeString("BenWasHere", null);
Thread.sleep(10); //Give it some time to process the snap
//No Snapshot taken yet, the SNAP was applied in memory
verify(f.zk, never()).takeSnapshot();
qp.setType(Leader.NEWLEADER);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
oa.writeRecord(qp, null);
// Get the ack of the new leader
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, f.self.getAcceptedEpoch());
assertEquals(1, f.self.getCurrentEpoch());
//Make sure that we did take the snapshot now
verify(f.zk).takeSnapshot(true);
assertEquals(firstZxid, f.fzk.getLastProcessedZxid());
// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
long lastZxid = zkDb2.loadDataBase();
assertEquals("data1", new String(zkDb2.getData("/foo", stat, null)));
assertEquals(firstZxid, lastZxid);
// Propose an update
long proposalZxid = ZxidUtils.makeZxid(1, 1000);
proposeSetData(qp, proposalZxid, "data2", 2);
oa.writeRecord(qp, null);
TrackerWatcher watcher = new TrackerWatcher();
// The change should not have happened yet, since we haven't committed
// (this read also registers the watcher on /foo)
assertEquals("data1", new String(f.fzk.getZKDatabase().getData("/foo", stat, watcher)));
// The change should happen now
qp.setType(Leader.COMMIT);
qp.setZxid(proposalZxid);
oa.writeRecord(qp, null);
qp.setType(Leader.UPTODATE);
qp.setZxid(0);
oa.writeRecord(qp, null);
// Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
// Read the ack for the proposal
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(proposalZxid, qp.getZxid());
// Block until the watcher has seen NodeDataChanged instead of sleeping.
watcher.waitForChange();
assertEquals("data2", new String(f.fzk.getZKDatabase().getData("/foo", stat, null)));
// check and make sure the change is persisted
zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
lastZxid = zkDb2.loadDataBase();
assertEquals("data2", new String(zkDb2.getData("/foo", stat, null)));
assertEquals(proposalZxid, lastZxid);
} finally {
TestUtils.deleteFileRecursively(tmpDir);
}
}
/**
 * Turns {@code qp} into a PROPOSAL with the given zxid whose payload is a
 * serialized TxnHeader (setData) followed by a SetDataTxn for "/foo".
 */
private void proposeSetData(QuorumPacket qp, long zxid, String data, int version) throws IOException {
qp.setType(Leader.PROPOSAL);
qp.setZxid(zxid);
TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
SetDataTxn sdt = new SetDataTxn("/foo", data.getBytes(), version);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeRecord(hdr, null);
boa.writeRecord(sdt, null);
qp.setData(baos.toByteArray());
}
});
}
/**
 * Plays the leader side of a DIFF-based sync with a real follower:
 * FOLLOWERINFO -> LEADERINFO -> ACKEPOCH -> DIFF, a PROPOSAL/COMMIT of a
 * createSession txn, NEWLEADER and UPTODATE. It then checks that the
 * follower processed and persisted the session and that takeSnapshot was
 * never invoked on the spied server.
 */
@Test
public void testNormalFollowerRunWithDiff() throws Exception {
    testFollowerConversation(new FollowerConversation() {
        @Override
        public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) throws Exception {
            File tmpDir = File.createTempFile("test", "dir", testData);
            tmpDir.delete();
            tmpDir.mkdir();
            // Parents of the follower's txn-log and snapshot directories; used below to
            // reload what the follower wrote to disk.
            File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
            File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
            //Spy on ZK so we can check if a snapshot happened or not.
            f.zk = spy(f.zk);
            try {
                assertEquals(0, f.self.getAcceptedEpoch());
                assertEquals(0, f.self.getCurrentEpoch());

                // Setup a database with a single /foo node
                ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
                final long firstZxid = ZxidUtils.makeZxid(1, 1);
                zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
                Stat stat = new Stat();
                assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));

                QuorumPacket qp = new QuorumPacket();
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.FOLLOWERINFO, qp.getType());
                // expected value first, so failure messages read the right way round
                assertEquals(0, qp.getZxid());
                LearnerInfo learnInfo = new LearnerInfo();
                ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
                assertEquals(0x10000, learnInfo.getProtocolVersion());
                assertEquals(0, learnInfo.getServerid());

                // We are simulating an established leader, so the epoch is 1
                qp.setType(Leader.LEADERINFO);
                qp.setZxid(ZxidUtils.makeZxid(1, 0));
                byte[] protoBytes = new byte[4];
                ByteBuffer.wrap(protoBytes).putInt(0x10000);
                qp.setData(protoBytes);
                oa.writeRecord(qp, null);

                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.ACKEPOCH, qp.getType());
                assertEquals(0, qp.getZxid());
                assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
                assertEquals(1, f.self.getAcceptedEpoch());
                assertEquals(0, f.self.getCurrentEpoch());

                // Send a diff
                qp.setType(Leader.DIFF);
                qp.setData(new byte[0]);
                qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
                oa.writeRecord(qp, null);

                final long createSessionZxid = ZxidUtils.makeZxid(1, 2);
                proposeNewSession(qp, createSessionZxid, 0x333);
                oa.writeRecord(qp, null);

                qp.setType(Leader.COMMIT);
                qp.setZxid(createSessionZxid);
                oa.writeRecord(qp, null);

                qp.setType(Leader.NEWLEADER);
                qp.setZxid(ZxidUtils.makeZxid(1, 0));
                qp.setData(null);
                oa.writeRecord(qp, null);

                qp.setType(Leader.UPTODATE);
                qp.setZxid(0);
                oa.writeRecord(qp, null);

                // Read the uptodate ack
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.ACK, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());

                // Get the ack of the new leader
                readPacketSkippingPing(ia, qp);
                assertEquals(Leader.ACK, qp.getType());
                assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
                assertEquals(1, f.self.getAcceptedEpoch());
                assertEquals(1, f.self.getCurrentEpoch());

                //Wait for the transactions to be written out. The thread that writes them out
                // does not send anything back when it is done.
                // The loop exits as soon as the condition holds, so a generous deadline only
                // costs time when the test is going to fail anyway; 50 ms was too tight for
                // slow disks / loaded CI machines.
                long start = System.currentTimeMillis();
                while (createSessionZxid != f.fzk.getLastProcessedZxid()
                        && (System.currentTimeMillis() - start) < ClientBase.CONNECTION_TIMEOUT) {
                    Thread.sleep(1);
                }
                assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());

                // Make sure the data was recorded in the filesystem ok
                ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
                start = System.currentTimeMillis();
                zkDb2.loadDataBase();
                // Reload until the session shows up on disk or the deadline passes.
                while (zkDb2.getSessionWithTimeOuts().isEmpty()
                        && (System.currentTimeMillis() - start) < ClientBase.CONNECTION_TIMEOUT) {
                    Thread.sleep(1);
                    zkDb2.loadDataBase();
                }
                LOG.info("zkdb2 sessions:{}", zkDb2.getSessions());
                LOG.info("zkdb2 with timeouts:{}", zkDb2.getSessionWithTimeOuts());
                // 4 is the client id hard-coded in the TxnHeader built by proposeNewSession.
                assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));

                //Snapshot was never taken during very simple sync
                verify(f.zk, never()).takeSnapshot();
            } finally {
                TestUtils.deleteFileRecursively(tmpDir);
            }
        }

        /**
         * Turns {@code qp} into a PROPOSAL with the given zxid whose payload is a
         * serialized TxnHeader (createSession) followed by a CreateSessionTxn with
         * a timeout of 30000.
         *
         * <p>NOTE(review): {@code sessionId} is not used; the header is built with
         * client id 4, which is the key the caller later looks up. Confirm whether
         * the parameter should feed the header instead.
         */
        private void proposeNewSession(QuorumPacket qp, long zxid, long sessionId) throws IOException {
            qp.setType(Leader.PROPOSAL);
            qp.setZxid(zxid);
            TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.createSession);
            CreateSessionTxn cst = new CreateSessionTxn(30000);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            OutputArchive boa = BinaryOutputArchive.getArchive(baos);
            boa.writeRecord(hdr, null);
            boa.writeRecord(cst, null);
            qp.setData(baos.toByteArray());
        }
    });
}
/**
 * Plays the follower side of a normal handshake with a fresh leader:
 * FOLLOWERINFO -> LEADERINFO -> ACKEPOCH -> DIFF -> NEWLEADER -> ACK ->
 * UPTODATE, checking the leader's accepted and current epochs on the way.
 */
@Test
public void testNormalRun() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException {
            assertEquals(0, l.self.getAcceptedEpoch());
            assertEquals(0, l.self.getCurrentEpoch());

            /* we test a normal run. everything should work out well. */
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.LEADERINFO, qp.getType());
            assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            // expected value first, so a failure message reads the right way round
            assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());
            assertEquals(1, l.self.getAcceptedEpoch());
            assertEquals(0, l.self.getCurrentEpoch());

            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.DIFF, qp.getType());

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.NEWLEADER, qp.getType());
            assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            assertEquals(1, l.self.getAcceptedEpoch());
            assertCurrentEpochGotUpdated(1, l.self, ClientBase.CONNECTION_TIMEOUT);

            // Acknowledge NEWLEADER with the zxid it carried.
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.UPTODATE, qp.getType());
        }
    });
}
/**
 * Completes a normal handshake with the leader, has the leader propose a
 * create txn, and then never acknowledges it while still echoing every
 * packet back as a PING. The test passes when a read ends in
 * {@link EOFException} within {@code 2 * ZabUtils.SYNC_LIMIT + 2} reads,
 * and fails otherwise.
 */
@Test
public void testTxnTimeout() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        @Override
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException, Leader.XidRolloverException {
            assertEquals(0, l.self.getAcceptedEpoch());
            assertEquals(0, l.self.getCurrentEpoch());

            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.LEADERINFO, qp.getType());
            assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            // expected value first, so a failure message reads the right way round
            assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());
            assertEquals(1, l.self.getAcceptedEpoch());
            assertEquals(0, l.self.getCurrentEpoch());

            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.DIFF, qp.getType());

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.NEWLEADER, qp.getType());
            assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            assertEquals(1, l.self.getAcceptedEpoch());
            assertCurrentEpochGotUpdated(1, l.self, ClientBase.CONNECTION_TIMEOUT);

            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.UPTODATE, qp.getType());

            // Have the leader propose a txn that this fake follower will never ACK.
            long zxid = l.zk.getZxid();
            l.propose(new Request(1, 1, ZooDefs.OpCode.create, new TxnHeader(1, 1, zxid, 1, ZooDefs.OpCode.create), new CreateTxn("/test", "hola".getBytes(), null, true, 0), zxid));

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.PROPOSAL, qp.getType());
            LOG.info("Proposal sent.");

            // Keep answering with PINGs only; EOF on read means the connection was closed.
            for (int i = 0; i < (2 * ZabUtils.SYNC_LIMIT) + 2; i++) {
                try {
                    ia.readRecord(qp, null);
                    LOG.info("Ping received: {}", i);
                    qp = new QuorumPacket(Leader.PING, qp.getZxid(), "".getBytes(), null);
                    oa.writeRecord(qp, null);
                } catch (EOFException e) {
                    return;
                }
            }
            fail("Connection hasn't been closed by leader after transaction times out.");
        }
    });
}
/**
 * Reads a snapshot from {@code ia} into a throw-away {@link ZKDatabase} and
 * asserts that the string following it is {@code "BenWasHere"}.
 *
 * @param ia archive positioned at the start of a serialized snapshot
 * @throws IOException if reading from the archive fails
 */
private void deserializeSnapshot(InputArchive ia) throws IOException {
    new ZKDatabase(null).deserializeSnapshot(ia);
    assertEquals("BenWasHere", ia.readString("signature"));
}
/**
 * Drives an observer through a complete conversation in which the test plays the leader:
 * the OBSERVERINFO / LEADERINFO / ACKEPOCH handshake, a SNAP sync followed by NEWLEADER,
 * a PROPOSAL + COMMIT for /foo1, an INFORM for /foo2, and finally UPTODATE.
 * Along the way it checks the observer's accepted/current epochs, its in-memory data,
 * and the state reloaded from the observer's log and snapshot directories.
 */
@Test
public void testNormalObserverRun() throws Exception {
testObserverConversation(new ObserverConversation() {
@Override
public void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) throws Exception {
// Scratch directory backing the leader-side database built below; removed in the finally block.
// createTempFile produces a file, so it is deleted and recreated as a directory of the same name.
File tmpDir = File.createTempFile("test", "dir", testData);
tmpDir.delete();
tmpDir.mkdir();
// Parent directories of the observer's txn-log and snapshot dirs, used later to reload its on-disk state.
File logDir = o.zk.getTxnLogFactory().getDataDir().getParentFile();
File snapDir = o.zk.getTxnLogFactory().getSnapDir().getParentFile();
try {
assertEquals(0, o.self.getAcceptedEpoch());
assertEquals(0, o.self.getCurrentEpoch());
// Set up a database containing two nodes, /foo1 and /foo2, both holding "data1"
ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
final long foo1Zxid = ZxidUtils.makeZxid(1, 1);
final long foo2Zxid = ZxidUtils.makeZxid(1, 2);
zkDb.processTxn(new TxnHeader(13, 1313, foo1Zxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo1", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
zkDb.processTxn(new TxnHeader(13, 1313, foo2Zxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo2", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1), null);
Stat stat = new Stat();
assertEquals("data1", new String(zkDb.getData("/foo1", stat, null)));
assertEquals("data1", new String(zkDb.getData("/foo2", stat, null)));
// The first non-ping packet from the observer must be OBSERVERINFO with zxid 0,
// carrying a LearnerInfo with protocol version 0x10000 and server id 0
QuorumPacket qp = new QuorumPacket();
readPacketSkippingPing(ia, qp);
assertEquals(Leader.OBSERVERINFO, qp.getType());
assertEquals(qp.getZxid(), 0);
LearnerInfo learnInfo = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
assertEquals(learnInfo.getProtocolVersion(), 0x10000);
assertEquals(learnInfo.getServerid(), 0);
// We are simulating an established leader, so the epoch is 1
qp.setType(Leader.LEADERINFO);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
// LEADERINFO payload: the protocol version 0x10000 as a 4-byte int
byte[] protoBytes = new byte[4];
ByteBuffer.wrap(protoBytes).putInt(0x10000);
qp.setData(protoBytes);
oa.writeRecord(qp, null);
// The observer answers with ACKEPOCH; its accepted epoch is now 1 while its current epoch is still 0
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACKEPOCH, qp.getType());
assertEquals(0, qp.getZxid());
assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
assertEquals(1, o.self.getAcceptedEpoch());
assertEquals(0, o.self.getCurrentEpoch());
// Send the snapshot we created earlier: a SNAP packet, the serialized database,
// then the "BenWasHere" signature string, followed by NEWLEADER for epoch 1
qp.setType(Leader.SNAP);
qp.setData(new byte[0]);
qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
oa.writeRecord(qp, null);
zkDb.serializeSnapshot(oa);
oa.writeString("BenWasHere", null);
qp.setType(Leader.NEWLEADER);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
oa.writeRecord(qp, null);
// Get the ack of the new leader; by now both epochs are 1 and the
// observer's last processed zxid matches the last txn in the snapshot
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
assertEquals(1, o.self.getAcceptedEpoch());
assertEquals(1, o.self.getCurrentEpoch());
assertEquals(foo2Zxid, o.zk.getLastProcessedZxid());
// Make sure the data was recorded in the filesystem ok
ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
long lastZxid = zkDb2.loadDataBase();
assertEquals("data1", new String(zkDb2.getData("/foo1", stat, null)));
assertEquals(foo2Zxid, lastZxid);
// Register watch (on /foo2, as a side effect of reading its data)
TrackerWatcher watcher = new TrackerWatcher();
assertEquals("data1", new String(o.zk.getZKDatabase().getData("/foo2", stat, watcher)));
// Propose /foo1 update
long proposalZxid = ZxidUtils.makeZxid(1, 1000);
proposeSetData(qp, "/foo1", proposalZxid, "data2", 2);
oa.writeRecord(qp, null);
// Commit /foo1 update
qp.setType(Leader.COMMIT);
qp.setZxid(proposalZxid);
oa.writeRecord(qp, null);
// Inform /foo2 update: reuse the proposal payload but send it as an INFORM packet
long informZxid = ZxidUtils.makeZxid(1, 1001);
proposeSetData(qp, "/foo2", informZxid, "data2", 2);
qp.setType(Leader.INFORM);
oa.writeRecord(qp, null);
// Send UPTODATE with zxid 0
qp.setType(Leader.UPTODATE);
qp.setZxid(0);
oa.writeRecord(qp, null);
// Read the uptodate ack
readPacketSkippingPing(ia, qp);
assertEquals(Leader.ACK, qp.getType());
assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
// Data should get updated; wait on the watcher passed to the /foo2 read above before checking
watcher.waitForChange();
assertEquals("data2", new String(o.zk.getZKDatabase().getData("/foo1", stat, null)));
assertEquals("data2", new String(o.zk.getZKDatabase().getData("/foo2", stat, null)));
// The shutdown sequence guarantees that all pending requests
// in the sync request processor get flushed to disk
o.zk.shutdown();
// Reload from the observer's directories and confirm both updates were persisted
zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
lastZxid = zkDb2.loadDataBase();
assertEquals("data2", new String(zkDb2.getData("/foo1", stat, null)));
assertEquals("data2", new String(zkDb2.getData("/foo2", stat, null)));
assertEquals(informZxid, lastZxid);
} finally {
TestUtils.deleteFileRecursively(tmpDir);
}
}
/**
 * Turns qp into a PROPOSAL packet at the given zxid whose payload is a serialized
 * TxnHeader (setData op) followed by a SetDataTxn for the given path, data and version.
 */
private void proposeSetData(QuorumPacket qp, String path, long zxid, String data, int version) throws IOException {
qp.setType(Leader.PROPOSAL);
qp.setZxid(zxid);
TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.setData);
SetDataTxn sdt = new SetDataTxn(path, data.getBytes(), version);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeRecord(hdr, null);
boa.writeRecord(sdt, null);
qp.setData(baos.toByteArray());
}
});
}
/**
 * Plays a follower that reports a last-accepted epoch (20) against the leader under test
 * and checks that the leader answers with epoch 21 in LEADERINFO and NEWLEADER,
 * syncs with a DIFF, and finishes the handshake with UPTODATE.
 */
@Test
public void testLeaderBehind() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException {
            /* the follower introduces itself with protocol version 0x10000 */
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            /* we are going to say we last acked epoch 20 */
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null);
            oa.writeRecord(qp, null);

            /* the leader must propose an epoch past the one we reported */
            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.LEADERINFO, qp.getType());
            assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
            // expected value first, so a failure message reports the right sides
            assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());

            qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.DIFF, qp.getType());

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.NEWLEADER, qp.getType());
            assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());

            /* ack NEWLEADER with the zxid the leader just sent */
            qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.UPTODATE, qp.getType());
        }
    });
}
/**
 * Tests that when a quorum of followers send LearnerInfo but do not ack the epoch (which is sent
 * by the leader upon receipt of LearnerInfo from a quorum), the leader does not start using this epoch
 * as it would in the normal case (when a quorum do ack the epoch). This tests ZK-1192
 *
 * @throws Exception if the conversation with the leader fails
 */
@Test
public void testAbandonBeforeACKEpoch() throws Exception {
    testLeaderConversation(new LeaderConversation() {
        public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException {
            /* introduce ourselves as a follower, but never send ACKEPOCH afterwards */
            LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
            byte[] liBytes = new byte[20];
            ByteBufferOutputStream.record2ByteBuffer(li, ByteBuffer.wrap(liBytes));
            QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null);
            oa.writeRecord(qp, null);

            readPacketSkippingPing(ia, qp);
            assertEquals(Leader.LEADERINFO, qp.getType());
            assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
            // expected value first, so a failure message reports the right sides
            assertEquals(0x10000, ByteBuffer.wrap(qp.getData()).getInt());

            // Stay silent for initLimit ticks plus five seconds of slack instead of acking the epoch
            Thread.sleep(l.self.getInitLimit() * l.self.getTickTime() + 5000);
            // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced
            assertEquals(0, l.self.getCurrentEpoch());
        }
    });
}
/**
 * Follower whose leader lookup is replaced by a settable value:
 * {@link #findLeader()} returns whatever was last passed to
 * {@link #setLeaderQuorumServer(QuorumServer)}.
 */
static class ConversableFollower extends Follower {

    /** Server handed back by {@link #findLeader()}; null until a setter call assigns it. */
    QuorumServer leaderQuorumServer;

    ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {
        super(self, zk);
    }

    public void setLeaderQuorumServer(QuorumServer quorumServer) {
        this.leaderQuorumServer = quorumServer;
    }

    @Override
    protected QuorumServer findLeader() {
        return this.leaderQuorumServer;
    }

}
/**
 * Builds a {@link ConversableFollower} for the given peer, using tmpDir as both
 * the txn-log and snapshot directory, and registers the new txn factory and
 * database on the peer.
 *
 * @param tmpDir directory used for both arguments of the FileTxnSnapLog
 * @param peer   quorum peer the follower is attached to
 * @return the newly created follower
 * @throws IOException if the txn/snapshot log cannot be set up
 */
private ConversableFollower createFollower(File tmpDir, QuorumPeer peer) throws IOException {
    final FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    peer.setTxnFactory(snapLog);

    final ZKDatabase database = new ZKDatabase(snapLog);
    final FollowerZooKeeperServer followerServer = new FollowerZooKeeperServer(snapLog, peer, database);
    peer.setZKDatabase(database);

    final ConversableFollower follower = new ConversableFollower(peer, followerServer);
    return follower;
}
/**
 * Observer whose leader lookup is replaced by a settable value:
 * {@link #findLeader()} returns whatever was last passed to
 * {@link #setLeaderQuorumServer(QuorumServer)}.
 */
static class ConversableObserver extends Observer {

    /** Server handed back by {@link #findLeader()}; null until a setter call assigns it. */
    QuorumServer leaderQuorumServer;

    ConversableObserver(QuorumPeer self, ObserverZooKeeperServer zk) {
        super(self, zk);
    }

    public void setLeaderQuorumServer(QuorumServer quorumServer) {
        this.leaderQuorumServer = quorumServer;
    }

    @Override
    protected QuorumServer findLeader() {
        return this.leaderQuorumServer;
    }

}
/**
 * Builds a {@link ConversableObserver} for the given peer, using tmpDir as both
 * the txn-log and snapshot directory, and registers the new txn factory and
 * database on the peer.
 *
 * @param tmpDir directory used for both arguments of the FileTxnSnapLog
 * @param peer   quorum peer the observer is attached to
 * @return the newly created observer
 * @throws IOException if the txn/snapshot log cannot be set up
 */
private ConversableObserver createObserver(File tmpDir, QuorumPeer peer) throws IOException {
    final FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir);
    peer.setTxnFactory(snapLog);

    final ZKDatabase database = new ZKDatabase(snapLog);
    final ObserverZooKeeperServer observerServer = new ObserverZooKeeperServer(snapLog, peer, database);
    peer.setZKDatabase(database);

    final ConversableObserver observer = new ConversableObserver(peer, observerServer);
    return observer;
}
/**
 * Returns the first line of the given file.
 *
 * @param f file to read
 * @return the first line without its line terminator, or {@code null} if the file is empty
 * @throws IOException if the file cannot be opened or read
 */
private String readContentsOfFile(File f) throws IOException {
    // try-with-resources so the underlying file handle is released even if readLine() throws
    try (BufferedReader reader = new BufferedReader(new FileReader(f))) {
        return reader.readLine();
    }
}
/**
 * Writes an empty snapshot plus one logged txn at zxid (epoch 3, counter 3), then checks
 * that after {@code getLastLoggedZxid()} the peer reports accepted and current epoch 3,
 * and that both epoch files under version-2 contain 3 as well.
 */
@Test
public void testInitialAcceptedCurrent() throws Exception {
    final File workDir = File.createTempFile("test", ".dir", testData);
    // createTempFile produced a file; swap it for a directory of the same name
    workDir.delete();
    workDir.mkdir();
    try {
        final FileTxnSnapLog snapLog = new FileTxnSnapLog(workDir, workDir);
        final File versionDir = new File(workDir, "version-2");
        versionDir.mkdir();

        // Persist an empty tree, then append and commit a single error txn at epoch 3
        final DataTree emptyTree = new DataTree();
        final ConcurrentHashMap<Long, Integer> emptyMap = new ConcurrentHashMap<Long, Integer>();
        snapLog.save(emptyTree, emptyMap, false);
        final long loggedZxid = ZxidUtils.makeZxid(3, 3);
        final TxnHeader header = new TxnHeader(1, 1, loggedZxid, 1, ZooDefs.OpCode.error);
        final ErrorTxn errorTxn = new ErrorTxn(1);
        final Request logged = new Request(1, 1, ZooDefs.OpCode.error, header, errorTxn, loggedZxid);
        snapLog.append(logged);
        snapLog.commit();

        final ZKDatabase database = new ZKDatabase(snapLog);
        final QuorumPeer quorumPeer = QuorumPeer.testingQuorumPeer();
        quorumPeer.setZKDatabase(database);
        quorumPeer.setTxnFactory(snapLog);
        quorumPeer.getLastLoggedZxid();

        // In-memory epochs first, then the files on disk
        assertEquals(3, quorumPeer.getAcceptedEpoch());
        assertEquals(3, quorumPeer.getCurrentEpoch());
        final File currentEpochFile = new File(versionDir, QuorumPeer.CURRENT_EPOCH_FILENAME);
        assertEquals(3, Integer.parseInt(readContentsOfFile(currentEpochFile)));
        final File acceptedEpochFile = new File(versionDir, QuorumPeer.ACCEPTED_EPOCH_FILENAME);
        assertEquals(3, Integer.parseInt(readContentsOfFile(acceptedEpochFile)));
    } finally {
        TestUtils.deleteFileRecursively(workDir);
    }
}
/**
 * Epoch is first written to file then updated in memory. Give some time to
 * write the epoch in file and then go for assert.
 *
 * <p>Polls {@code self.getCurrentEpoch()} every 10 ms until it equals
 * {@code expected} or the accumulated sleep time reaches {@code timeout},
 * then asserts on the final value.
 *
 * @param expected epoch the peer is expected to reach
 * @param self     peer whose current epoch is polled
 * @param timeout  maximum time to keep polling, in milliseconds of accumulated sleep
 * @throws IOException if reading the current epoch fails
 */
private void assertCurrentEpochGotUpdated(int expected, QuorumPeer self, long timeout)
        throws IOException {
    long elapsedTime = 0;
    final long waitInterval = 10;
    while (self.getCurrentEpoch() != expected && elapsedTime < timeout) {
        try {
            Thread.sleep(waitInterval);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it,
            // and keep the cause attached to the test failure.
            Thread.currentThread().interrupt();
            fail("CurrentEpoch update failed", e);
        }
        elapsedTime += waitInterval;
    }
    assertEquals(expected, self.getCurrentEpoch(), "CurrentEpoch update failed");
}
}