blob: 73470b8cadad8077d240a5aee4bfde70ffb16b78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.coord.zk;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ZookeeperTestUtil;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.test.BaseTest;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestZookeeperClient extends BaseTest {
private final static String root = "/test";
private final static String path = "test-key";
private final static String abspath = PathUtils.join(root, path);
private final static byte[] data = "testing".getBytes(StandardCharsets.UTF_8);
private final static CreateMode mode = CreateMode.PERSISTENT;
private TestingServer server;
private CuratorFramework curator;
private ZookeeperClient client;
private static class ClientWithMockCache extends ZookeeperClient {
private final PathChildrenCache cacheMock = Mockito.mock(PathChildrenCache.class);
ClientWithMockCache(final CuratorFramework curator, final String root, final CreateMode mode) {
super(curator, root, mode);
}
@Override
public PathChildrenCache getCache() {
return cacheMock;
}
}
@Before
public void setUp() throws Exception {
ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
server = new TestingServer();
final RetryPolicy policy = new RetryNTimes(1, 1000);
curator = CuratorFrameworkFactory.newClient(server.getConnectString(), policy);
client = new ClientWithMockCache(curator, root, mode);
server.start();
curator.start();
client.start();
}
@After
public void tearDown() throws Exception {
client.close();
curator.close();
server.close();
}
@Test
public void testStartingClientEnablesCacheAndEnsuresRootNodeExists() throws Exception {
Assert.assertTrue("start must create the root node", client.hasPath("", true));
Mockito
.verify(client.getCache())
.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
}
@Test
public void testHasPathWithEventualConsistencyHitsCache() {
final String path = "test-key";
final String absPath = PathUtils.join(root, path);
Mockito
.when(client.getCache().getCurrentData(absPath))
.thenReturn(null);
Assert.assertFalse(client.hasPath(path)); // test convenience method
Mockito
.when(client.getCache().getCurrentData(absPath))
.thenReturn(new ChildData(absPath, null, null));
Assert.assertTrue(client.hasPath(path, false)); // test actual method
}
@Test(expected = DrillRuntimeException.class)
public void testHasPathThrowsDrillRuntimeException() {
final String path = "test-key";
final String absPath = PathUtils.join(root, path);
Mockito
.when(client.getCache().getCurrentData(absPath))
.thenThrow(RuntimeException.class);
client.hasPath(path);
}
@Test
public void testHasPathTrueWithVersion() {
client.put(path, data);
DataChangeVersion version0 = new DataChangeVersion();
assertTrue(client.hasPath(path, true, version0));
assertEquals("Versions should match", 0, version0.getVersion());
client.put(path, data);
DataChangeVersion version1 = new DataChangeVersion();
assertTrue(client.hasPath(path, true, version1));
assertEquals("Versions should match", 1, version1.getVersion());
}
@Test
public void testHasPathFalseWithVersion() {
DataChangeVersion version0 = new DataChangeVersion();
version0.setVersion(-1);
assertFalse(client.hasPath("unknown_path", true, version0));
assertEquals("Versions should not have changed", -1, version0.getVersion());
}
@Test
public void testPutAndGetWorks() {
client.put(path, data);
final byte[] actual = client.get(path, true);
Assert.assertArrayEquals("data mismatch", data, actual);
}
@Test
public void testGetWithEventualConsistencyHitsCache() {
Mockito
.when(client.getCache().getCurrentData(abspath))
.thenReturn(null);
assertNull("get should return null", client.get(path));
Mockito
.when(client.getCache().getCurrentData(abspath))
.thenReturn(new ChildData(abspath, null, data));
assertEquals("get should return data", data, client.get(path, false));
}
@Test
public void testCreate() throws Exception {
client.create(path);
Assert.assertTrue("path must exist", client.hasPath(path, true));
// ensure invoking create also rebuilds cache
Mockito
.verify(client.getCache(), Mockito.times(1))
.rebuildNode(abspath);
}
@Test
public void testDelete() throws Exception {
client.create(path);
Assert.assertTrue("path must exist", client.hasPath(path, true));
client.delete(path);
Assert.assertFalse("path must not exist", client.hasPath(path, true));
// ensure cache is rebuilt
Mockito
.verify(client.getCache(), Mockito.times(2))
.rebuildNode(abspath);
}
@Test
public void testEntriesReturnsRelativePaths() {
final ChildData child = Mockito.mock(ChildData.class);
Mockito
.when(child.getPath())
.thenReturn(abspath);
Mockito
.when(child.getData())
.thenReturn(data);
final List<ChildData> children = Lists.newArrayList(child);
Mockito
.when(client.getCache().getCurrentData())
.thenReturn(children);
final Iterator<Map.Entry<String, byte[]>> entries = client.entries();
// returned entry must contain the given relative path
final Map.Entry<String, byte[]> expected = new ImmutableEntry<>(path, data);
assertEquals("entries do not match", expected, entries.next());
}
@Test
public void testGetWithVersion() {
client.put(path, data);
DataChangeVersion version0 = new DataChangeVersion();
client.get(path, version0);
assertEquals("Versions should match", 0, version0.getVersion());
client.put(path, data);
DataChangeVersion version1 = new DataChangeVersion();
client.get(path, version1);
assertEquals("Versions should match", 1, version1.getVersion());
}
@Test
public void testPutWithMatchingVersion() {
client.put(path, data);
DataChangeVersion version = new DataChangeVersion();
client.get(path, version);
client.put(path, data, version);
}
@Test (expected = VersionMismatchException.class)
public void testPutWithNonMatchingVersion() {
client.put(path, data);
DataChangeVersion version = new DataChangeVersion();
version.setVersion(123);
client.put(path, data, version);
}
@Test
public void testPutIfAbsentWhenAbsent() {
assertNull(client.putIfAbsent(path, data));
}
@Test
public void testPutIfAbsentWhenPresent() {
client.putIfAbsent(path, data);
assertEquals("Data should match", new String(data), new String(client.putIfAbsent(path, "new_data".getBytes(StandardCharsets.UTF_8))));
}
}