| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.zk; |
| |
| import com.google.common.collect.ImmutableMap; |
| import java.lang.reflect.Field; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.function.BooleanSupplier; |
| import com.google.common.collect.ImmutableList; |
| import org.I0Itec.zkclient.IZkDataListener; |
| import org.I0Itec.zkclient.ZkClient; |
| import org.I0Itec.zkclient.ZkConnection; |
| import org.I0Itec.zkclient.exception.ZkInterruptedException; |
| import org.I0Itec.zkclient.exception.ZkNodeExistsException; |
| import org.apache.commons.lang3.reflect.FieldUtils; |
| import org.apache.samza.SamzaException; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.container.TaskName; |
| import org.apache.samza.job.model.ContainerModel; |
| import org.apache.samza.job.model.JobModel; |
| import org.apache.samza.runtime.LocationId; |
| import org.apache.samza.testUtils.EmbeddedZookeeper; |
| import org.apache.samza.util.NoOpMetricsRegistry; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.rules.Timeout; |
| import org.mockito.Mockito; |
| |
| public class TestZkUtils { |
| private static EmbeddedZookeeper zkServer = null; |
| private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); |
| private ZkClient zkClient = null; |
| private static final int SESSION_TIMEOUT_MS = 5000; |
| private static final int CONNECTION_TIMEOUT_MS = 10000; |
| private ZkUtils zkUtils; |
| |
| @Rule |
| // Declared public to honor junit contract. |
| public final ExpectedException expectedException = ExpectedException.none(); |
| |
| @Rule |
| public Timeout testTimeOutInMillis = new Timeout(120000); |
| |
| @Before |
| public void testSetup() { |
| try { |
| zkServer = new EmbeddedZookeeper(); |
| zkServer.setup(); |
| zkClient = new ZkClient( |
| new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS), |
| CONNECTION_TIMEOUT_MS); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail("Client connection setup failed. Aborting tests.."); |
| } |
| try { |
| zkClient.createPersistent(KEY_BUILDER.getProcessorsPath(), true); |
| } catch (ZkNodeExistsException e) { |
| // Do nothing |
| } |
| |
| zkUtils = getZkUtils(); |
| zkUtils.connect(); |
| } |
| |
| @After |
| public void testTeardown() { |
| if (zkClient != null) { |
| try { |
| zkUtils.close(); |
| } finally { |
| zkServer.teardown(); |
| } |
| } |
| } |
| |
| private ZkUtils getZkUtils() { |
| return new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, |
| SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); |
| } |
| |
| @Test |
| public void testRegisterProcessorId() { |
| String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")); |
| Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath())); |
| |
| // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid |
| Assert.assertTrue(zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")).equals(assignedPath)); |
| } |
| |
| @Test |
| public void testGetActiveProcessors() { |
| Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsZnodes().size()); |
| zkUtils.registerProcessorAndGetId(new ProcessorData("processorData", "1")); |
| Assert.assertEquals(1, zkUtils.getSortedActiveProcessorsZnodes().size()); |
| } |
| |
| @Test |
| public void testGetActiveProcessorIdShouldReturnEmptyForNonExistingZookeeperNodes() { |
| List<String> processorsIDs = zkUtils.getActiveProcessorsIDs(ImmutableList.of("node1", "node2")); |
| |
| Assert.assertEquals(0, processorsIDs.size()); |
| } |
| |
| |
| @Test |
| public void testReadAfterWriteTaskLocality() { |
| zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-1")); |
| zkUtils.writeTaskLocality(new TaskName("task-2"), new LocationId("LocationId-2")); |
| |
| Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-1"), |
| new TaskName("task-2"), new LocationId("LocationId-2")); |
| |
| Assert.assertEquals(taskLocality, zkUtils.readTaskLocality()); |
| } |
| |
| @Test |
| public void testReadWhenTaskLocalityDoesNotExist() { |
| Map<TaskName, LocationId> taskLocality = zkUtils.readTaskLocality(); |
| |
| Assert.assertEquals(0, taskLocality.size()); |
| } |
| |
| @Test |
| public void testWriteTaskLocalityShouldUpdateTheExistingValue() { |
| zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-1")); |
| |
| Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-1")); |
| Assert.assertEquals(taskLocality, zkUtils.readTaskLocality()); |
| |
| zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-2")); |
| |
| taskLocality = ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-2")); |
| Assert.assertEquals(taskLocality, zkUtils.readTaskLocality()); |
| } |
| |
| @Test |
| public void testReadTaskLocalityShouldReturnAllTheExistingLocalityValue() { |
| zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-1")); |
| zkUtils.writeTaskLocality(new TaskName("task-2"), new LocationId("LocationId-2")); |
| zkUtils.writeTaskLocality(new TaskName("task-3"), new LocationId("LocationId-3")); |
| zkUtils.writeTaskLocality(new TaskName("task-4"), new LocationId("LocationId-4")); |
| zkUtils.writeTaskLocality(new TaskName("task-5"), new LocationId("LocationId-5")); |
| |
| Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-1"), |
| new TaskName("task-2"), new LocationId("LocationId-2"), |
| new TaskName("task-3"), new LocationId("LocationId-3"), |
| new TaskName("task-4"), new LocationId("LocationId-4"), |
| new TaskName("task-5"), new LocationId("LocationId-5")); |
| |
| Assert.assertEquals(taskLocality, zkUtils.readTaskLocality()); |
| } |
| |
| @Test |
| public void testGetAllProcessorNodesShouldReturnEmptyForNonExistingZookeeperNodes() { |
| List<ZkUtils.ProcessorNode> processorsIDs = zkUtils.getAllProcessorNodes(); |
| |
| Assert.assertEquals(0, processorsIDs.size()); |
| } |
| |
| @Test |
| public void testZKProtocolVersion() { |
| // first time connect, version should be set to ZkUtils.ZK_PROTOCOL_VERSION |
| ZkLeaderElector le = new ZkLeaderElector("1", zkUtils); |
| zkUtils.validateZkVersion(); |
| |
| String root = zkUtils.getKeyBuilder().getRootPath(); |
| String ver = zkUtils.getZkClient().readData(root); |
| Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver); |
| |
| // do it again (in case original value was null |
| zkUtils.validateZkVersion(); |
| ver = zkUtils.getZkClient().readData(root); |
| Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver); |
| |
| // now negative case |
| zkUtils.getZkClient().writeData(root, "2.0"); |
| try { |
| zkUtils.validateZkVersion(); |
| Assert.fail("Expected to fail because of version mismatch 2.0 vs 1.0"); |
| } catch (SamzaException e) { |
| // expected |
| } |
| |
| // validate future values, let's say that current version should be 3.0 |
| try { |
| Field f = zkUtils.getClass().getDeclaredField("ZK_PROTOCOL_VERSION"); |
| FieldUtils.removeFinalModifier(f); |
| f.set(null, "3.0"); |
| } catch (Exception e) { |
| System.out.println(e); |
| Assert.fail(); |
| } |
| |
| try { |
| zkUtils.validateZkVersion(); |
| Assert.fail("Expected to fail because of version mismatch 2.0 vs 3.0"); |
| } catch (SamzaException e) { |
| // expected |
| } |
| } |
| |
| @Test |
| public void testGetProcessorsIDs() { |
| Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size()); |
| zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1")); |
| List<String> l = zkUtils.getSortedActiveProcessorsIDs(); |
| Assert.assertEquals(1, l.size()); |
| new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId( |
| new ProcessorData("host2", "2")); |
| l = zkUtils.getSortedActiveProcessorsIDs(); |
| Assert.assertEquals(2, l.size()); |
| |
| Assert.assertEquals(" ID1 didn't match", "1", l.get(0)); |
| Assert.assertEquals(" ID2 didn't match", "2", l.get(1)); |
| } |
| |
| @Test |
| public void testSubscribeToJobModelVersionChange() { |
| |
| ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test"); |
| String root = keyBuilder.getRootPath(); |
| zkClient.deleteRecursive(root); |
| |
| class Result { |
| String res = ""; |
| public String getRes() { |
| return res; |
| } |
| public void updateRes(String newRes) { |
| res = newRes; |
| } |
| } |
| |
| Assert.assertFalse(zkUtils.exists(root)); |
| |
| // create the paths |
| zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()}); |
| Assert.assertTrue(zkUtils.exists(root)); |
| Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath())); |
| Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath())); |
| |
| final Result res = new Result(); |
| // define the callback |
| IZkDataListener dataListener = new IZkDataListener() { |
| @Override |
| public void handleDataChange(String dataPath, Object data) |
| throws Exception { |
| res.updateRes((String) data); |
| } |
| |
| @Override |
| public void handleDataDeleted(String dataPath) |
| throws Exception { |
| Assert.fail("Data wasn't deleted;"); |
| } |
| }; |
| // subscribe |
| zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener); |
| zkClient.subscribeDataChanges(keyBuilder.getProcessorsPath(), dataListener); |
| // update |
| zkClient.writeData(keyBuilder.getJobModelVersionPath(), "newVersion"); |
| |
| // verify |
| Assert.assertTrue(testWithDelayBackOff(() -> "newVersion".equals(res.getRes()), 2, 1000)); |
| |
| // update again |
| zkClient.writeData(keyBuilder.getProcessorsPath(), "newProcessor"); |
| |
| Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000)); |
| } |
| |
| /** |
| * Create two duplicate processors with same processorId. |
| * Second creation should fail with exception. |
| */ |
| @Test |
| public void testRegisterProcessorAndGetIdShouldFailForDuplicateProcessorRegistration() { |
| final String testHostName = "localhost"; |
| final String testProcessId = "testProcessorId"; |
| ProcessorData processorData1 = new ProcessorData(testHostName, testProcessId); |
| // Register processor 1 which is not duplicate, this registration should succeed. |
| zkUtils.registerProcessorAndGetId(processorData1); |
| |
| ZkUtils zkUtils1 = getZkUtils(); |
| zkUtils1.connect(); |
| ProcessorData duplicateProcessorData = new ProcessorData(testHostName, testProcessId); |
| // Registration of the duplicate processor should fail. |
| expectedException.expect(SamzaException.class); |
| zkUtils1.registerProcessorAndGetId(duplicateProcessorData); |
| } |
| |
| @Test |
| public void testPublishNewJobModel() { |
| ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test"); |
| String root = keyBuilder.getRootPath(); |
| zkClient.deleteRecursive(root); |
| String version = "1"; |
| String oldVersion = "0"; |
| |
| zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()}); |
| |
| zkUtils.publishJobModelVersion(oldVersion, version); |
| Assert.assertEquals(version, zkUtils.getJobModelVersion()); |
| |
| String newerVersion = Long.toString(Long.valueOf(version) + 1); |
| zkUtils.publishJobModelVersion(version, newerVersion); |
| Assert.assertEquals(newerVersion, zkUtils.getJobModelVersion()); |
| |
| try { |
| zkUtils.publishJobModelVersion(oldVersion, "10"); //invalid new version |
| Assert.fail("publish invalid version should've failed"); |
| } catch (SamzaException e) { |
| // expected |
| } |
| |
| // create job model |
| Map<String, String> configMap = new HashMap<>(); |
| Map<String, ContainerModel> containers = new HashMap<>(); |
| MapConfig config = new MapConfig(configMap); |
| JobModel jobModel = new JobModel(config, containers); |
| |
| zkUtils.publishJobModel(version, jobModel); |
| Assert.assertEquals(jobModel, zkUtils.getJobModel(version)); |
| } |
| |
| @Test |
| public void testCleanUpZkJobModels() { |
| String root = zkUtils.getKeyBuilder().getJobModelPathPrefix(); |
| System.out.println("root=" + root); |
| zkUtils.getZkClient().createPersistent(root, true); |
| |
| // generate multiple version |
| for (int i = 101; i < 110; i++) { |
| zkUtils.publishJobModel(String.valueOf(i), null); |
| } |
| |
| // clean all of the versions except 5 most recent ones |
| zkUtils.deleteOldJobModels(5); |
| Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), zkUtils.getZkClient().getChildren(root)); |
| } |
| |
| @Test |
| public void testCleanUpZkBarrierVersion() { |
| String root = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(); |
| zkUtils.getZkClient().createPersistent(root, true); |
| ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, zkUtils, null, null); |
| for (int i = 200; i < 210; i++) { |
| barrier.create(String.valueOf(i), new ArrayList<>(Arrays.asList(i + "a", i + "b", i + "c"))); |
| } |
| |
| zkUtils.deleteOldBarrierVersions(5); |
| List<String> zNodeIds = zkUtils.getZkClient().getChildren(root); |
| Collections.sort(zNodeIds); |
| Assert.assertEquals(Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209"), |
| zNodeIds); |
| } |
| |
| @Test |
| public void testCleanUpZk() { |
| String pathA = "/path/testA"; |
| String pathB = "/path/testB"; |
| zkUtils.getZkClient().createPersistent(pathA, true); |
| zkUtils.getZkClient().createPersistent(pathB, true); |
| |
| // Create 100 nodes |
| for (int i = 0; i < 20; i++) { |
| String p1 = pathA + "/" + i; |
| zkUtils.getZkClient().createPersistent(p1, true); |
| zkUtils.getZkClient().createPersistent(p1 + "/something1", true); |
| zkUtils.getZkClient().createPersistent(p1 + "/something2", true); |
| |
| String p2 = pathB + "/some_" + i; |
| zkUtils.getZkClient().createPersistent(p2, true); |
| zkUtils.getZkClient().createPersistent(p2 + "/something1", true); |
| zkUtils.getZkClient().createPersistent(p2 + "/something2", true); |
| } |
| |
| List<String> zNodeIds = new ArrayList<>(); |
| // empty list |
| zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, new Comparator<String>() { |
| @Override |
| public int compare(String o1, String o2) { |
| return o1.compareTo(o2); |
| } |
| }); |
| |
| |
| zNodeIds = zkUtils.getZkClient().getChildren(pathA); |
| zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, new Comparator<String>() { |
| @Override |
| public int compare(String o1, String o2) { |
| return Integer.valueOf(o1) - Integer.valueOf(o2); |
| } |
| }); |
| |
| for (int i = 0; i < 10; i++) { |
| // should be gone |
| String p1 = pathA + "/" + i; |
| Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1)); |
| } |
| |
| for (int i = 10; i < 20; i++) { |
| // should be gone |
| String p1 = pathA + "/" + i; |
| Assert.assertTrue("path " + p1 + " exists", zkUtils.getZkClient().exists(p1)); |
| } |
| |
| zNodeIds = zkUtils.getZkClient().getChildren(pathB); |
| zkUtils.deleteOldVersionPath(pathB, zNodeIds, 1, new Comparator<String>() { |
| @Override |
| public int compare(String o1, String o2) { |
| return Integer.valueOf(o1.substring(o1.lastIndexOf("_") + 1)) - Integer |
| .valueOf(o2.substring(o2.lastIndexOf("_") + 1)); |
| } |
| }); |
| |
| for (int i = 0; i < 19; i++) { |
| // should be gone |
| String p1 = pathB + "/" + i; |
| Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1)); |
| } |
| |
| for (int i = 19; i < 20; i++) { |
| // should be gone |
| String p1 = pathB + "/some_" + i; |
| Assert.assertTrue("path " + p1 + " exists", zkUtils.getZkClient().exists(p1)); |
| } |
| |
| } |
| |
| public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) { |
| long delay = startDelayMs; |
| while (delay < maxDelayMs) { |
| if (cond.getAsBoolean()) |
| return true; |
| try { |
| Thread.sleep(delay); |
| } catch (InterruptedException e) { |
| return false; |
| } |
| delay *= 2; |
| } |
| return false; |
| } |
| |
| public static void sleepMs(long delay) { |
| try { |
| Thread.sleep(delay); |
| } catch (InterruptedException e) { |
| Assert.fail("Sleep was interrupted"); |
| } |
| } |
| @Test |
| public void testgetNextJobModelVersion() { |
| // Set up the Zk base paths for testing. |
| ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test"); |
| String root = keyBuilder.getRootPath(); |
| zkClient.deleteRecursive(root); |
| zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()}); |
| |
| String version = "1"; |
| String oldVersion = "0"; |
| |
| // Set zkNode JobModelVersion to 1. |
| zkUtils.publishJobModelVersion(oldVersion, version); |
| |
| Assert.assertEquals(version, zkUtils.getJobModelVersion()); |
| |
| // Publish JobModel with a higher version (2). |
| zkUtils.publishJobModel("2", new JobModel(new MapConfig(), new HashMap<>())); |
| |
| // Get on the JobModel version should return 2, taking into account the published version 2. |
| Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion())); |
| } |
| |
| @Test |
| public void testDeleteProcessorNodeShouldDeleteTheCorrectProcessorNode() { |
| String testProcessorId1 = "processorId1"; |
| String testProcessorId2 = "processorId2"; |
| |
| ZkUtils zkUtils = getZkUtils(); |
| ZkUtils zkUtils1 = getZkUtils(); |
| |
| zkUtils.registerProcessorAndGetId(new ProcessorData("host1", testProcessorId1)); |
| zkUtils1.registerProcessorAndGetId(new ProcessorData("host2", testProcessorId2)); |
| |
| zkUtils.deleteProcessorNode(testProcessorId1); |
| |
| List<String> expectedProcessors = ImmutableList.of(testProcessorId2); |
| List<String> actualProcessors = zkUtils.getSortedActiveProcessorsIDs(); |
| |
| Assert.assertEquals(expectedProcessors, actualProcessors); |
| } |
| |
| @Test |
| public void testCloseShouldRetryOnceOnInterruptedException() { |
| ZkClient zkClient = Mockito.mock(ZkClient.class); |
| ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); |
| |
| Mockito.doThrow(new ZkInterruptedException(new InterruptedException())) |
| .doAnswer(invocation -> null) |
| .when(zkClient).close(); |
| |
| zkUtils.close(); |
| |
| Mockito.verify(zkClient, Mockito.times(2)).close(); |
| } |
| |
| @Test |
| public void testCloseShouldTearDownZkConnectionOnInterruptedException() throws Exception { |
| CountDownLatch latch = new CountDownLatch(1); |
| // Establish connection with the zookeeper server. |
| ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.getPort()); |
| ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); |
| |
| Thread threadToInterrupt = new Thread(() -> { |
| try { |
| latch.await(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| zkUtils.close(); |
| }); |
| |
| threadToInterrupt.start(); |
| |
| Field field = ZkClient.class.getDeclaredField("_closed"); |
| field.setAccessible(true); |
| |
| Assert.assertFalse(field.getBoolean(zkClient)); |
| |
| threadToInterrupt.interrupt(); |
| threadToInterrupt.join(); |
| |
| Assert.assertTrue(field.getBoolean(zkClient)); |
| } |
| } |