/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| package org.apache.zookeeper.server; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotEquals; |
| import static org.junit.Assert.assertTrue; |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.lang.reflect.Field; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.jute.BinaryInputArchive; |
| import org.apache.jute.BinaryOutputArchive; |
| import org.apache.jute.Record; |
| import org.apache.zookeeper.KeeperException.NoNodeException; |
| import org.apache.zookeeper.KeeperException.NodeExistsException; |
| import org.apache.zookeeper.Quotas; |
| import org.apache.zookeeper.ZKTestCase; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.common.PathTrie; |
| import org.apache.zookeeper.data.Stat; |
| import org.apache.zookeeper.metrics.MetricsUtils; |
| import org.apache.zookeeper.txn.CreateTxn; |
| import org.apache.zookeeper.txn.TxnHeader; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class DataTreeTest extends ZKTestCase { |
| |
| protected static final Logger LOG = LoggerFactory.getLogger(DataTreeTest.class); |
| |
| /** |
| * For ZOOKEEPER-1755 - Test race condition when taking dumpEphemerals and |
| * removing the session related ephemerals from DataTree structure |
| */ |
| @Test(timeout = 60000) |
| public void testDumpEphemerals() throws Exception { |
| int count = 1000; |
| long session = 1000; |
| long zxid = 2000; |
| final DataTree dataTree = new DataTree(); |
| LOG.info("Create {} zkclient sessions and its ephemeral nodes", count); |
| createEphemeralNode(session, dataTree, count); |
| final AtomicBoolean exceptionDuringDumpEphemerals = new AtomicBoolean(false); |
| final AtomicBoolean running = new AtomicBoolean(true); |
| Thread thread = new Thread() { |
| public void run() { |
| PrintWriter pwriter = new PrintWriter(new StringWriter()); |
| try { |
| while (running.get()) { |
| dataTree.dumpEphemerals(pwriter); |
| } |
| } catch (Exception e) { |
| LOG.error("Received exception while dumpEphemerals!", e); |
| exceptionDuringDumpEphemerals.set(true); |
| } |
| } |
| }; |
| thread.start(); |
| LOG.debug("Killing {} zkclient sessions and its ephemeral nodes", count); |
| killZkClientSession(session, zxid, dataTree, count); |
| running.set(false); |
| thread.join(); |
| assertFalse("Should have got exception while dumpEphemerals!", exceptionDuringDumpEphemerals.get()); |
| } |
| |
| private void killZkClientSession(long session, long zxid, final DataTree dataTree, int count) { |
| for (int i = 0; i < count; i++) { |
| dataTree.killSession(session + i, zxid); |
| } |
| } |
| |
| private void createEphemeralNode(long session, final DataTree dataTree, int count) throws NoNodeException, NodeExistsException { |
| for (int i = 0; i < count; i++) { |
| dataTree.createNode("/test" + i, new byte[0], null, session + i, dataTree.getNode("/").stat.getCversion() |
| + 1, 1, 1); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testRootWatchTriggered() throws Exception { |
| DataTree dt = new DataTree(); |
| |
| CompletableFuture<Void> fire = new CompletableFuture<>(); |
| // set a watch on the root node |
| dt.getChildren("/", new Stat(), event -> { |
| if (event.getPath().equals("/")) { |
| fire.complete(null); |
| } |
| }); |
| |
| // add a new node, should trigger a watch |
| dt.createNode("/xyz", new byte[0], null, 0, dt.getNode("/").stat.getCversion() + 1, 1, 1); |
| |
| assertTrue("Root node watch not triggered", fire.isDone()); |
| } |
| |
| /** |
| * For ZOOKEEPER-1046 test if cversion is getting incremented correctly. |
| */ |
| @Test(timeout = 60000) |
| public void testIncrementCversion() throws Exception { |
| try { |
| // digestCalculator gets initialized for the new DataTree constructor based on the system property |
| ZooKeeperServer.setDigestEnabled(true); |
| DataTree dt = new DataTree(); |
| dt.createNode("/test", new byte[0], null, 0, dt.getNode("/").stat.getCversion() + 1, 1, 1); |
| DataNode zk = dt.getNode("/test"); |
| int prevCversion = zk.stat.getCversion(); |
| long prevPzxid = zk.stat.getPzxid(); |
| long digestBefore = dt.getTreeDigest(); |
| dt.setCversionPzxid("/test/", prevCversion + 1, prevPzxid + 1); |
| int newCversion = zk.stat.getCversion(); |
| long newPzxid = zk.stat.getPzxid(); |
| assertTrue("<cversion, pzxid> verification failed. Expected: <" |
| + (prevCversion + 1) |
| + ", " |
| + (prevPzxid + 1) |
| + ">, found: <" |
| + newCversion |
| + ", " |
| + newPzxid |
| + ">", (newCversion == prevCversion + 1 && newPzxid == prevPzxid + 1)); |
| assertNotEquals(digestBefore, dt.getTreeDigest()); |
| } finally { |
| ZooKeeperServer.setDigestEnabled(false); |
| } |
| } |
| |
| @Test |
| public void testNoCversionRevert() throws Exception { |
| DataTree dt = new DataTree(); |
| DataNode parent = dt.getNode("/"); |
| dt.createNode("/test", new byte[0], null, 0, parent.stat.getCversion() + 1, 1, 1); |
| int currentCversion = parent.stat.getCversion(); |
| long currentPzxid = parent.stat.getPzxid(); |
| dt.createNode("/test1", new byte[0], null, 0, currentCversion - 1, 1, 1); |
| parent = dt.getNode("/"); |
| int newCversion = parent.stat.getCversion(); |
| long newPzxid = parent.stat.getPzxid(); |
| assertTrue("<cversion, pzxid> verification failed. Expected: <" |
| + currentCversion |
| + ", " |
| + currentPzxid |
| + ">, found: <" |
| + newCversion |
| + ", " |
| + newPzxid |
| + ">", (newCversion >= currentCversion && newPzxid >= currentPzxid)); |
| } |
| |
| @Test |
| public void testPzxidUpdatedWhenDeletingNonExistNode() throws Exception { |
| DataTree dt = new DataTree(); |
| DataNode root = dt.getNode("/"); |
| long currentPzxid = root.stat.getPzxid(); |
| |
| // pzxid updated with deleteNode on higher zxid |
| long zxid = currentPzxid + 1; |
| try { |
| dt.deleteNode("/testPzxidUpdatedWhenDeletingNonExistNode", zxid); |
| } catch (NoNodeException e) { /* expected */ } |
| root = dt.getNode("/"); |
| currentPzxid = root.stat.getPzxid(); |
| assertEquals(currentPzxid, zxid); |
| |
| // pzxid not updated with smaller zxid |
| long prevPzxid = currentPzxid; |
| zxid = prevPzxid - 1; |
| try { |
| dt.deleteNode("/testPzxidUpdatedWhenDeletingNonExistNode", zxid); |
| } catch (NoNodeException e) { /* expected */ } |
| root = dt.getNode("/"); |
| currentPzxid = root.stat.getPzxid(); |
| assertEquals(currentPzxid, prevPzxid); |
| } |
| |
| @Test |
| public void testDigestUpdatedWhenReplayCreateTxnForExistNode() { |
| try { |
| // digestCalculator gets initialized for the new DataTree constructor based on the system property |
| ZooKeeperServer.setDigestEnabled(true); |
| DataTree dt = new DataTree(); |
| |
| dt.processTxn(new TxnHeader(13, 1000, 1, 30, ZooDefs.OpCode.create), new CreateTxn("/foo", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1)); |
| |
| // create the same node with a higher cversion to simulate the |
| // scenario when replaying a create txn for an existing node due |
| // to fuzzy snapshot |
| dt.processTxn(new TxnHeader(13, 1000, 1, 30, ZooDefs.OpCode.create), new CreateTxn("/foo", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 2)); |
| |
| // check the current digest value |
| assertEquals(dt.getTreeDigest(), dt.getLastProcessedZxidDigest().digest); |
| } finally { |
| ZooKeeperServer.setDigestEnabled(false); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testPathTrieClearOnDeserialize() throws Exception { |
| |
| //Create a DataTree with quota nodes so PathTrie get updated |
| DataTree dserTree = new DataTree(); |
| |
| dserTree.createNode("/bug", new byte[20], null, -1, 1, 1, 1); |
| dserTree.createNode(Quotas.quotaZookeeper + "/bug", null, null, -1, 1, 1, 1); |
| dserTree.createNode(Quotas.quotaPath("/bug"), new byte[20], null, -1, 1, 1, 1); |
| dserTree.createNode(Quotas.statPath("/bug"), new byte[20], null, -1, 1, 1, 1); |
| |
| //deserialize a DataTree; this should clear the old /bug nodes and pathTrie |
| DataTree tree = new DataTree(); |
| |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); |
| tree.serialize(oa, "test"); |
| baos.flush(); |
| |
| ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); |
| BinaryInputArchive ia = BinaryInputArchive.getArchive(bais); |
| dserTree.deserialize(ia, "test"); |
| |
| Field pfield = DataTree.class.getDeclaredField("pTrie"); |
| pfield.setAccessible(true); |
| PathTrie pTrie = (PathTrie) pfield.get(dserTree); |
| |
| //Check that the node path is removed from pTrie |
| assertEquals("/bug is still in pTrie", "/", pTrie.findMaxPrefix("/bug")); |
| } |
| |
| /* |
| * ZOOKEEPER-2201 - OutputArchive.writeRecord can block for long periods of |
| * time, we must call it outside of the node lock. |
| * We call tree.serialize, which calls our modified writeRecord method that |
| * blocks until it can verify that a separate thread can lock the DataNode |
| * currently being written, i.e. that DataTree.serializeNode does not hold |
| * the DataNode lock while calling OutputArchive.writeRecord. |
| */ |
| @Test(timeout = 60000) |
| public void testSerializeDoesntLockDataNodeWhileWriting() throws Exception { |
| DataTree tree = new DataTree(); |
| tree.createNode("/marker", new byte[]{42}, null, -1, 1, 1, 1); |
| final DataNode markerNode = tree.getNode("/marker"); |
| final AtomicBoolean ranTestCase = new AtomicBoolean(); |
| DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream()); |
| BinaryOutputArchive oa = new BinaryOutputArchive(out) { |
| @Override |
| public void writeRecord(Record r, String tag) throws IOException { |
| // Need check if the record is a DataNode instance because of changes in ZOOKEEPER-2014 |
| // which adds default ACL to config node. |
| if (r instanceof DataNode) { |
| DataNode node = (DataNode) r; |
| if (node.data.length == 1 && node.data[0] == 42) { |
| final Semaphore semaphore = new Semaphore(0); |
| new Thread(new Runnable() { |
| @Override |
| public void run() { |
| synchronized (markerNode) { |
| //When we lock markerNode, allow writeRecord to continue |
| semaphore.release(); |
| } |
| } |
| }).start(); |
| |
| try { |
| boolean acquired = semaphore.tryAcquire(30, TimeUnit.SECONDS); |
| //This is the real assertion - could another thread lock |
| //the DataNode we're currently writing |
| assertTrue("Couldn't acquire a lock on the DataNode while we were calling tree.serialize", acquired); |
| } catch (InterruptedException e1) { |
| throw new RuntimeException(e1); |
| } |
| ranTestCase.set(true); |
| } |
| } |
| |
| super.writeRecord(r, tag); |
| } |
| }; |
| |
| tree.serialize(oa, "test"); |
| |
| //Let's make sure that we hit the code that ran the real assertion above |
| assertTrue("Didn't find the expected node", ranTestCase.get()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testReconfigACLClearOnDeserialize() throws Exception { |
| |
| DataTree tree = new DataTree(); |
| // simulate the upgrading scenario, where the reconfig znode |
| // doesn't exist and the acl cache is empty |
| tree.deleteNode(ZooDefs.CONFIG_NODE, 1); |
| tree.getReferenceCountedAclCache().aclIndex = 0; |
| |
| assertEquals("expected to have 1 acl in acl cache map", 0, tree.aclCacheSize()); |
| |
| // serialize the data with one znode with acl |
| tree.createNode("/bug", new byte[20], ZooDefs.Ids.OPEN_ACL_UNSAFE, -1, 1, 1, 1); |
| |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos); |
| tree.serialize(oa, "test"); |
| baos.flush(); |
| |
| ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); |
| BinaryInputArchive ia = BinaryInputArchive.getArchive(bais); |
| tree.deserialize(ia, "test"); |
| |
| assertEquals("expected to have 1 acl in acl cache map", 1, tree.aclCacheSize()); |
| assertEquals("expected to have the same acl", ZooDefs.Ids.OPEN_ACL_UNSAFE, tree.getACL("/bug", new Stat())); |
| |
| // simulate the upgrading case where the config node will be created |
| // again after leader election |
| tree.addConfigNode(); |
| |
| assertEquals("expected to have 2 acl in acl cache map", 2, tree.aclCacheSize()); |
| assertEquals("expected to have the same acl", ZooDefs.Ids.OPEN_ACL_UNSAFE, tree.getACL("/bug", new Stat())); |
| } |
| |
| @Test |
| public void testCachedApproximateDataSize() throws Exception { |
| DataTree dt = new DataTree(); |
| long initialSize = dt.approximateDataSize(); |
| assertEquals(dt.cachedApproximateDataSize(), dt.approximateDataSize()); |
| |
| // create a node |
| dt.createNode("/testApproximateDataSize", new byte[20], null, -1, 1, 1, 1); |
| dt.createNode("/testApproximateDataSize1", new byte[20], null, -1, 1, 1, 1); |
| assertEquals(dt.cachedApproximateDataSize(), dt.approximateDataSize()); |
| |
| // update data |
| dt.setData("/testApproximateDataSize1", new byte[32], -1, 1, 1); |
| assertEquals(dt.cachedApproximateDataSize(), dt.approximateDataSize()); |
| |
| // delete a node |
| dt.deleteNode("/testApproximateDataSize", -1); |
| assertEquals(dt.cachedApproximateDataSize(), dt.approximateDataSize()); |
| } |
| |
| @Test |
| public void testGetAllChildrenNumber() throws Exception { |
| DataTree dt = new DataTree(); |
| // create a node |
| dt.createNode("/all_children_test", new byte[20], null, -1, 1, 1, 1); |
| dt.createNode("/all_children_test/nodes", new byte[20], null, -1, 1, 1, 1); |
| dt.createNode("/all_children_test/nodes/node1", new byte[20], null, -1, 1, 1, 1); |
| dt.createNode("/all_children_test/nodes/node2", new byte[20], null, -1, 1, 1, 1); |
| dt.createNode("/all_children_test/nodes/node3", new byte[20], null, -1, 1, 1, 1); |
| assertEquals(4, dt.getAllChildrenNumber("/all_children_test")); |
| assertEquals(3, dt.getAllChildrenNumber("/all_children_test/nodes")); |
| assertEquals(0, dt.getAllChildrenNumber("/all_children_test/nodes/node1")); |
| //add these three init nodes:/zookeeper,/zookeeper/quota,/zookeeper/config,so the number is 8. |
| assertEquals(8, dt.getAllChildrenNumber("/")); |
| } |
| |
| @Test |
| public void testDataTreeMetrics() throws Exception { |
| ServerMetrics.getMetrics().resetAll(); |
| |
| long readBytes1 = 0; |
| long readBytes2 = 0; |
| long writeBytes1 = 0; |
| long writeBytes2 = 0; |
| |
| final String TOP1 = "top1"; |
| final String TOP2 = "ttop2"; |
| final String TOP1PATH = "/" + TOP1; |
| final String TOP2PATH = "/" + TOP2; |
| final String CHILD1 = "child1"; |
| final String CHILD2 = "springishere"; |
| final String CHILD1PATH = TOP1PATH + "/" + CHILD1; |
| final String CHILD2PATH = TOP1PATH + "/" + CHILD2; |
| |
| final int TOP2_LEN = 50; |
| final int CHILD1_LEN = 100; |
| final int CHILD2_LEN = 250; |
| |
| DataTree dt = new DataTree(); |
| dt.createNode(TOP1PATH, null, null, -1, 1, 1, 1); |
| writeBytes1 += TOP1PATH.length(); |
| dt.createNode(TOP2PATH, new byte[TOP2_LEN], null, -1, 1, 1, 1); |
| writeBytes2 += TOP2PATH.length() + TOP2_LEN; |
| dt.createNode(CHILD1PATH, null, null, -1, 1, 1, 1); |
| writeBytes1 += CHILD1PATH.length(); |
| dt.setData(CHILD1PATH, new byte[CHILD1_LEN], 1, -1, 1); |
| writeBytes1 += CHILD1PATH.length() + CHILD1_LEN; |
| dt.createNode(CHILD2PATH, new byte[CHILD2_LEN], null, -1, 1, 1, 1); |
| writeBytes1 += CHILD2PATH.length() + CHILD2_LEN; |
| dt.getData(TOP1PATH, new Stat(), null); |
| readBytes1 += TOP1PATH.length() + DataTree.STAT_OVERHEAD_BYTES; |
| dt.getData(TOP2PATH, new Stat(), null); |
| readBytes2 += TOP2PATH.length() + TOP2_LEN + DataTree.STAT_OVERHEAD_BYTES; |
| dt.statNode(CHILD2PATH, null); |
| readBytes1 += CHILD2PATH.length() + DataTree.STAT_OVERHEAD_BYTES; |
| dt.getChildren(TOP1PATH, new Stat(), null); |
| readBytes1 += TOP1PATH.length() + CHILD1.length() + CHILD2.length() + DataTree.STAT_OVERHEAD_BYTES; |
| dt.deleteNode(TOP1PATH, 1); |
| writeBytes1 += TOP1PATH.length(); |
| |
| Map<String, Object> values = MetricsUtils.currentServerMetrics(); |
| System.out.println("values:" + values); |
| assertEquals(writeBytes1, values.get("sum_" + TOP1 + "_write_per_namespace")); |
| assertEquals(5L, values.get("cnt_" + TOP1 + "_write_per_namespace")); |
| assertEquals(writeBytes2, values.get("sum_" + TOP2 + "_write_per_namespace")); |
| assertEquals(1L, values.get("cnt_" + TOP2 + "_write_per_namespace")); |
| |
| assertEquals(readBytes1, values.get("sum_" + TOP1 + "_read_per_namespace")); |
| assertEquals(3L, values.get("cnt_" + TOP1 + "_read_per_namespace")); |
| assertEquals(readBytes2, values.get("sum_" + TOP2 + "_read_per_namespace")); |
| assertEquals(1L, values.get("cnt_" + TOP2 + "_read_per_namespace")); |
| } |
| |
| /** |
| * Test digest with general ops in DataTree, check that digest are |
| * updated when call different ops. |
| */ |
| @Test |
| public void testDigest() throws Exception { |
| try { |
| // enable diegst check |
| ZooKeeperServer.setDigestEnabled(true); |
| |
| DataTree dt = new DataTree(); |
| |
| // create a node and check the digest is updated |
| long previousDigest = dt.getTreeDigest(); |
| dt.createNode("/digesttest", new byte[0], null, -1, 1, 1, 1); |
| assertNotEquals(dt.getTreeDigest(), previousDigest); |
| |
| // create a child and check the digest is updated |
| previousDigest = dt.getTreeDigest(); |
| dt.createNode("/digesttest/1", "1".getBytes(), null, -1, 2, 2, 2); |
| assertNotEquals(dt.getTreeDigest(), previousDigest); |
| |
| // check the digest is not chhanged when creating the same node |
| previousDigest = dt.getTreeDigest(); |
| try { |
| dt.createNode("/digesttest/1", "1".getBytes(), null, -1, 2, 2, 2); |
| } catch (NodeExistsException e) { /* ignore */ } |
| assertEquals(dt.getTreeDigest(), previousDigest); |
| |
| // check digest with updated data |
| previousDigest = dt.getTreeDigest(); |
| dt.setData("/digesttest/1", "2".getBytes(), 3, 3, 3); |
| assertNotEquals(dt.getTreeDigest(), previousDigest); |
| |
| // check digest with deleted node |
| previousDigest = dt.getTreeDigest(); |
| dt.deleteNode("/digesttest/1", 5); |
| assertNotEquals(dt.getTreeDigest(), previousDigest); |
| } finally { |
| ZooKeeperServer.setDigestEnabled(false); |
| } |
| } |
| |
| } |