blob: 13fc13bbd937902e9e1ccbe8bbd60248985956d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.zk;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.BooleanSupplier;
import com.google.common.collect.ImmutableList;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.runtime.LocationId;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
public class TestZkUtils {
// Embedded ZooKeeper server; (re)created in testSetup() and torn down in testTeardown().
private static EmbeddedZookeeper zkServer = null;
// Key builder shared by the tests; built for the "test" path prefix.
private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
// Client connected to the embedded server; assigned in testSetup().
private ZkClient zkClient = null;
// Session timeout passed to the ZkConnection and to ZkUtils.
private static final int SESSION_TIMEOUT_MS = 5000;
// Connection timeout passed to the ZkClient and to ZkUtils.
private static final int CONNECTION_TIMEOUT_MS = 10000;
// Instance under test; built by getZkUtils() and connected in testSetup().
private ZkUtils zkUtils;
@Rule
// Declared public to honor junit contract.
public final ExpectedException expectedException = ExpectedException.none();
// Per-test timeout rule (the field name indicates the value is in milliseconds).
@Rule
public Timeout testTimeOutInMillis = new Timeout(120000);
/**
 * Starts a fresh embedded ZooKeeper server, connects a {@link ZkClient} to it,
 * makes sure the processors path exists and connects the {@link ZkUtils} under test.
 *
 * @throws AssertionError if the server or the client cannot be set up; the original
 *         exception is attached as the cause
 */
@Before
public void testSetup() {
  try {
    zkServer = new EmbeddedZookeeper();
    zkServer.setup();
    zkClient = new ZkClient(
        new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS),
        CONNECTION_TIMEOUT_MS);
  } catch (Exception e) {
    // Fail the test with the root cause attached instead of printing it and
    // failing with a message that hides why the setup broke.
    throw new AssertionError("Client connection setup failed. Aborting tests..", e);
  }
  try {
    zkClient.createPersistent(KEY_BUILDER.getProcessorsPath(), true);
  } catch (ZkNodeExistsException ignored) {
    // The path is already there, which is all this step needs.
  }
  zkUtils = getZkUtils();
  zkUtils.connect();
}
/**
 * Releases whatever {@link #testSetup()} managed to create.
 *
 * <p>Each resource is guarded on its own: a setup that failed half-way (for example the
 * server started but the client could not connect) must still shut the embedded server
 * down, and must not trip over a {@code zkUtils} that was never assigned.
 */
@After
public void testTeardown() {
  try {
    if (zkUtils != null) {
      zkUtils.close();
    } else if (zkClient != null) {
      // Setup failed before zkUtils was built; close the raw client directly.
      zkClient.close();
    }
  } finally {
    if (zkServer != null) {
      zkServer.teardown();
    }
  }
}
/**
 * Builds a new {@link ZkUtils} on top of the shared {@code zkClient}, using the
 * class-level key builder and timeouts and a no-op metrics registry.
 *
 * @return a new, not yet connected, {@link ZkUtils}
 */
private ZkUtils getZkUtils() {
  NoOpMetricsRegistry metricsRegistry = new NoOpMetricsRegistry();
  ZkUtils utils =
      new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, metricsRegistry);
  return utils;
}
/**
 * Registering a processor yields a path under the processors path, and registering the
 * same processor again returns that same path.
 */
@Test
public void testRegisterProcessorId() {
  String processorsPath = KEY_BUILDER.getProcessorsPath();
  String firstPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));
  Assert.assertTrue(firstPath.startsWith(processorsPath));

  // A repeated registration is expected to hand back the same ephemeral path
  // as long as the session is valid.
  String secondPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));
  Assert.assertTrue(secondPath.equals(firstPath));
}
/** The sorted active processor znode list grows from zero to one after a registration. */
@Test
public void testGetActiveProcessors() {
  int countBefore = zkUtils.getSortedActiveProcessorsZnodes().size();
  Assert.assertEquals(0, countBefore);

  zkUtils.registerProcessorAndGetId(new ProcessorData("processorData", "1"));

  int countAfter = zkUtils.getSortedActiveProcessorsZnodes().size();
  Assert.assertEquals(1, countAfter);
}
/** Looking up processor ids for znodes that were never created yields an empty list. */
@Test
public void testGetActiveProcessorIdShouldReturnEmptyForNonExistingZookeeperNodes() {
  List<String> missingZnodes = ImmutableList.of("node1", "node2");
  List<String> resolvedIds = zkUtils.getActiveProcessorsIDs(missingZnodes);
  Assert.assertEquals(0, resolvedIds.size());
}
/** Task locality entries written for two tasks are both returned by a subsequent read. */
@Test
public void testReadAfterWriteTaskLocality() {
  TaskName firstTask = new TaskName("task-1");
  TaskName secondTask = new TaskName("task-2");
  zkUtils.writeTaskLocality(firstTask, new LocationId("LocationId-1"));
  zkUtils.writeTaskLocality(secondTask, new LocationId("LocationId-2"));

  Map<TaskName, LocationId> expected = ImmutableMap.of(
      new TaskName("task-1"), new LocationId("LocationId-1"),
      new TaskName("task-2"), new LocationId("LocationId-2"));
  Map<TaskName, LocationId> actual = zkUtils.readTaskLocality();
  Assert.assertEquals(expected, actual);
}
/** Reading task locality before anything has been written returns an empty map. */
@Test
public void testReadWhenTaskLocalityDoesNotExist() {
  Map<TaskName, LocationId> locality = zkUtils.readTaskLocality();
  int entryCount = locality.size();
  Assert.assertEquals(0, entryCount);
}
/** Writing a second location for the same task replaces the previously stored one. */
@Test
public void testWriteTaskLocalityShouldUpdateTheExistingValue() {
  // First write: task-1 lives at LocationId-1.
  zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-1"));
  Map<TaskName, LocationId> expectedAfterFirstWrite =
      ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-1"));
  Assert.assertEquals(expectedAfterFirstWrite, zkUtils.readTaskLocality());

  // Second write for the same task: the stored location must now be LocationId-2.
  zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-2"));
  Map<TaskName, LocationId> expectedAfterSecondWrite =
      ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-2"));
  Assert.assertEquals(expectedAfterSecondWrite, zkUtils.readTaskLocality());
}
/** After writing locality for five tasks, a read returns all five entries. */
@Test
public void testReadTaskLocalityShouldReturnAllTheExistingLocalityValue() {
  for (int taskIndex = 1; taskIndex <= 5; taskIndex++) {
    zkUtils.writeTaskLocality(new TaskName("task-" + taskIndex), new LocationId("LocationId-" + taskIndex));
  }

  Map<TaskName, LocationId> expected = ImmutableMap.of(
      new TaskName("task-1"), new LocationId("LocationId-1"),
      new TaskName("task-2"), new LocationId("LocationId-2"),
      new TaskName("task-3"), new LocationId("LocationId-3"),
      new TaskName("task-4"), new LocationId("LocationId-4"),
      new TaskName("task-5"), new LocationId("LocationId-5"));
  Map<TaskName, LocationId> actual = zkUtils.readTaskLocality();
  Assert.assertEquals(expected, actual);
}
/** With no processor registered, the list of all processor nodes is empty. */
@Test
public void testGetAllProcessorNodesShouldReturnEmptyForNonExistingZookeeperNodes() {
  List<ZkUtils.ProcessorNode> processorNodes = zkUtils.getAllProcessorNodes();
  int nodeCount = processorNodes.size();
  Assert.assertEquals(0, nodeCount);
}
/**
 * Exercises {@code validateZkVersion()}: the root znode ends up holding
 * {@code ZkUtils.ZK_PROTOCOL_VERSION}, a repeated validation keeps it, and a stored
 * value that differs from the expected version is rejected with a {@link SamzaException}.
 *
 * <p>The last step overrides the static {@code ZK_PROTOCOL_VERSION} field through
 * reflection; the previous value is put back in a {@code finally} block so the override
 * cannot leak into other tests running in the same JVM.
 */
@Test
public void testZKProtocolVersion() {
  // The elector is constructed before validating, as in the first-connect flow;
  // the instance itself is not used any further.
  new ZkLeaderElector("1", zkUtils);

  // first time connect, version should be set to ZkUtils.ZK_PROTOCOL_VERSION
  zkUtils.validateZkVersion();
  String root = zkUtils.getKeyBuilder().getRootPath();
  String ver = zkUtils.getZkClient().readData(root);
  Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver);

  // do it again (in case the original value was null)
  zkUtils.validateZkVersion();
  ver = zkUtils.getZkClient().readData(root);
  Assert.assertEquals(ZkUtils.ZK_PROTOCOL_VERSION, ver);

  // now the negative case: the stored version no longer matches
  zkUtils.getZkClient().writeData(root, "2.0");
  try {
    zkUtils.validateZkVersion();
    Assert.fail("Expected to fail because of version mismatch 2.0 vs 1.0");
  } catch (SamzaException e) {
    // expected
  }

  // validate future values, let's say that the current version should be 3.0
  Field versionField;
  Object originalVersion;
  try {
    versionField = zkUtils.getClass().getDeclaredField("ZK_PROTOCOL_VERSION");
    FieldUtils.removeFinalModifier(versionField);
    originalVersion = versionField.get(null);
    versionField.set(null, "3.0");
  } catch (Exception e) {
    // Keep the cause so a reflection problem is diagnosable from the test report.
    throw new AssertionError("Unable to override ZK_PROTOCOL_VERSION", e);
  }
  try {
    zkUtils.validateZkVersion();
    Assert.fail("Expected to fail because of version mismatch 2.0 vs 3.0");
  } catch (SamzaException e) {
    // expected
  } finally {
    // Undo the reflective override so later tests see the real protocol version.
    try {
      versionField.set(null, originalVersion);
    } catch (IllegalAccessException e) {
      throw new AssertionError("Unable to restore ZK_PROTOCOL_VERSION", e);
    }
  }
}
/**
 * Registers two processors (the second through a separate {@link ZkUtils} built on the
 * same client) and checks that the sorted active id list returns them as "1" then "2".
 */
@Test
public void testGetProcessorsIDs() {
  Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size());

  zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
  List<String> activeIds = zkUtils.getSortedActiveProcessorsIDs();
  Assert.assertEquals(1, activeIds.size());

  ZkUtils secondUtils =
      new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
  secondUtils.registerProcessorAndGetId(new ProcessorData("host2", "2"));

  activeIds = zkUtils.getSortedActiveProcessorsIDs();
  Assert.assertEquals(2, activeIds.size());
  String firstId = activeIds.get(0);
  String secondId = activeIds.get(1);
  Assert.assertEquals(" ID1 didn't match", "1", firstId);
  Assert.assertEquals(" ID2 didn't match", "2", secondId);
}
/**
 * Subscribes one data listener to the job-model-version path and to the processors path
 * and checks that a write to either path is delivered to the listener.
 *
 * <p>The listener is invoked by the ZkClient on a thread other than the test thread
 * (the test has to poll for the result), so the value it records is held in a
 * {@code volatile} field to make the update visible to the polling test thread.
 */
@Test
public void testSubscribeToJobModelVersionChange() {
  ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
  String root = keyBuilder.getRootPath();
  zkClient.deleteRecursive(root);

  // Holder for the last value seen by the listener.
  class Result {
    // volatile: written from the listener callback, read from the polling test thread.
    private volatile String res = "";

    public String getRes() {
      return res;
    }

    public void updateRes(String newRes) {
      res = newRes;
    }
  }

  Assert.assertFalse(zkUtils.exists(root));

  // create the paths
  zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
  Assert.assertTrue(zkUtils.exists(root));
  Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath()));
  Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath()));

  final Result res = new Result();

  // define the callback
  IZkDataListener dataListener = new IZkDataListener() {
    @Override
    public void handleDataChange(String dataPath, Object data)
        throws Exception {
      res.updateRes((String) data);
    }

    @Override
    public void handleDataDeleted(String dataPath)
        throws Exception {
      Assert.fail("Data wasn't deleted;");
    }
  };

  // subscribe
  zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
  zkClient.subscribeDataChanges(keyBuilder.getProcessorsPath(), dataListener);

  // update
  zkClient.writeData(keyBuilder.getJobModelVersionPath(), "newVersion");

  // verify
  Assert.assertTrue(testWithDelayBackOff(() -> "newVersion".equals(res.getRes()), 2, 1000));

  // update again
  zkClient.writeData(keyBuilder.getProcessorsPath(), "newProcessor");
  Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
}
/**
 * Registers the same processor id twice, the second time through another
 * {@link ZkUtils} instance. The first registration must succeed and the
 * second one must be rejected with a {@link SamzaException}.
 */
@Test
public void testRegisterProcessorAndGetIdShouldFailForDuplicateProcessorRegistration() {
  final String hostName = "localhost";
  final String processorId = "testProcessorId";

  // The initial registration is not a duplicate and is expected to go through.
  ProcessorData original = new ProcessorData(hostName, processorId);
  zkUtils.registerProcessorAndGetId(original);

  ZkUtils otherUtils = getZkUtils();
  otherUtils.connect();
  ProcessorData duplicate = new ProcessorData(hostName, processorId);

  // From here on a SamzaException is the expected outcome of the test.
  expectedException.expect(SamzaException.class);
  otherUtils.registerProcessorAndGetId(duplicate);
}
/**
 * Publishes job model versions in sequence, checks that publishing against a stale
 * old version is rejected, and finally round-trips an empty {@link JobModel}.
 */
@Test
public void testPublishNewJobModel() {
  ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
  String rootPath = keyBuilder.getRootPath();
  zkClient.deleteRecursive(rootPath);

  String previousVersion = "0";
  String currentVersion = "1";
  String[] requiredPaths = {rootPath, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()};
  zkUtils.validatePaths(requiredPaths);

  // 0 -> 1
  zkUtils.publishJobModelVersion(previousVersion, currentVersion);
  Assert.assertEquals(currentVersion, zkUtils.getJobModelVersion());

  // 1 -> 2
  String nextVersion = String.valueOf(Long.parseLong(currentVersion) + 1);
  zkUtils.publishJobModelVersion(currentVersion, nextVersion);
  Assert.assertEquals(nextVersion, zkUtils.getJobModelVersion());

  // Publishing on top of the stale version "0" has to be refused.
  boolean rejected = false;
  try {
    zkUtils.publishJobModelVersion(previousVersion, "10"); //invalid new version
  } catch (SamzaException e) {
    // expected
    rejected = true;
  }
  if (!rejected) {
    Assert.fail("publish invalid version should've failed");
  }

  // create job model
  Map<String, String> emptyConfig = new HashMap<>();
  Map<String, ContainerModel> noContainers = new HashMap<>();
  JobModel publishedModel = new JobModel(new MapConfig(emptyConfig), noContainers);
  zkUtils.publishJobModel(currentVersion, publishedModel);
  Assert.assertEquals(publishedModel, zkUtils.getJobModel(currentVersion));
}
/**
 * Publishes job model versions 101..109 and checks that {@code deleteOldJobModels(5)}
 * leaves exactly the five most recent ones (105..109).
 *
 * <p>The remaining children are sorted before the comparison, as the barrier clean-up
 * test does, because ZooKeeper does not promise any ordering for {@code getChildren}.
 */
@Test
public void testCleanUpZkJobModels() {
  String root = zkUtils.getKeyBuilder().getJobModelPathPrefix();
  System.out.println("root=" + root);
  zkUtils.getZkClient().createPersistent(root, true);

  // generate multiple versions
  for (int i = 101; i < 110; i++) {
    zkUtils.publishJobModel(String.valueOf(i), null);
  }

  // clean all of the versions except the 5 most recent ones
  zkUtils.deleteOldJobModels(5);

  // Sort a copy so the assertion does not depend on the order the server returns.
  List<String> remainingVersions = new ArrayList<>(zkUtils.getZkClient().getChildren(root));
  Collections.sort(remainingVersions);
  Assert.assertEquals(Arrays.asList("105", "106", "107", "108", "109"), remainingVersions);
}
/**
 * Creates barriers for versions 200..209 and checks that
 * {@code deleteOldBarrierVersions(5)} keeps only {@code barrier_205}..{@code barrier_209}.
 */
@Test
public void testCleanUpZkBarrierVersion() {
  String barrierRoot = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
  zkUtils.getZkClient().createPersistent(barrierRoot, true);
  ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(barrierRoot, zkUtils, null, null);

  int version = 200;
  while (version < 210) {
    List<String> participants = new ArrayList<>(Arrays.asList(version + "a", version + "b", version + "c"));
    barrier.create(String.valueOf(version), participants);
    version++;
  }

  zkUtils.deleteOldBarrierVersions(5);

  // Sort the survivors before comparing them with the expected names.
  List<String> survivingBarriers = zkUtils.getZkClient().getChildren(barrierRoot);
  Collections.sort(survivingBarriers);
  List<String> expectedBarriers =
      Arrays.asList("barrier_205", "barrier_206", "barrier_207", "barrier_208", "barrier_209");
  Assert.assertEquals(expectedBarriers, survivingBarriers);
}
/**
 * Exercises {@code deleteOldVersionPath} on two parents:
 * <ul>
 *   <li>{@code /path/testA} with children {@code 0..19}: keeping 10 must remove
 *       {@code 0..9} and leave {@code 10..19};</li>
 *   <li>{@code /path/testB} with children {@code some_0..some_19}: keeping 1 must remove
 *       {@code some_0..some_18} and leave {@code some_19}.</li>
 * </ul>
 * Every child has two sub-nodes of its own. A call with an empty id list is made first
 * and must not remove anything that the later assertions rely on.
 */
@Test
public void testCleanUpZk() {
  String pathA = "/path/testA";
  String pathB = "/path/testB";
  zkUtils.getZkClient().createPersistent(pathA, true);
  zkUtils.getZkClient().createPersistent(pathB, true);

  // Create 20 version nodes under each parent, each with two children.
  for (int i = 0; i < 20; i++) {
    String p1 = pathA + "/" + i;
    zkUtils.getZkClient().createPersistent(p1, true);
    zkUtils.getZkClient().createPersistent(p1 + "/something1", true);
    zkUtils.getZkClient().createPersistent(p1 + "/something2", true);

    String p2 = pathB + "/some_" + i;
    zkUtils.getZkClient().createPersistent(p2, true);
    zkUtils.getZkClient().createPersistent(p2 + "/something1", true);
    zkUtils.getZkClient().createPersistent(p2 + "/something2", true);
  }

  // empty list
  List<String> zNodeIds = new ArrayList<>();
  zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10, String::compareTo);

  // Numeric ordering of the plain integer names under pathA.
  zNodeIds = zkUtils.getZkClient().getChildren(pathA);
  zkUtils.deleteOldVersionPath(pathA, zNodeIds, 10,
      (o1, o2) -> Integer.compare(Integer.parseInt(o1), Integer.parseInt(o2)));

  for (int i = 0; i < 10; i++) {
    // should be gone
    String p1 = pathA + "/" + i;
    Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1));
  }
  for (int i = 10; i < 20; i++) {
    // should have been kept
    String p1 = pathA + "/" + i;
    Assert.assertTrue("path " + p1 + " does not exist", zkUtils.getZkClient().exists(p1));
  }

  // Numeric ordering of the suffix after the last underscore under pathB.
  zNodeIds = zkUtils.getZkClient().getChildren(pathB);
  zkUtils.deleteOldVersionPath(pathB, zNodeIds, 1,
      (o1, o2) -> Integer.compare(
          Integer.parseInt(o1.substring(o1.lastIndexOf("_") + 1)),
          Integer.parseInt(o2.substring(o2.lastIndexOf("_") + 1))));

  for (int i = 0; i < 19; i++) {
    // should be gone; the nodes were created as "some_<i>", so that is the name to check
    String p1 = pathB + "/some_" + i;
    Assert.assertFalse("path " + p1 + " exists", zkUtils.getZkClient().exists(p1));
  }
  for (int i = 19; i < 20; i++) {
    // should have been kept
    String p1 = pathB + "/some_" + i;
    Assert.assertTrue("path " + p1 + " does not exist", zkUtils.getZkClient().exists(p1));
  }
}
/**
 * Polls {@code cond} with an exponentially growing pause until it holds or the pause
 * would reach {@code maxDelayMs}.
 *
 * <p>The condition is evaluated before every sleep and once more after the last sleep,
 * so it is always checked at least once and the final wait is never wasted.
 *
 * @param cond condition to wait for
 * @param startDelayMs first pause in milliseconds; values below 1 are treated as 1 so
 *        that doubling always makes progress
 * @param maxDelayMs polling stops once the next pause would be at least this long
 * @return {@code true} as soon as {@code cond} holds; {@code false} if it never held or
 *         the thread was interrupted while waiting (the interrupt flag is restored)
 */
public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
  long delay = Math.max(1L, startDelayMs);
  while (delay < maxDelayMs) {
    if (cond.getAsBoolean()) {
      return true;
    }
    try {
      Thread.sleep(delay);
    } catch (InterruptedException e) {
      // Preserve the interrupt for callers further up the stack.
      Thread.currentThread().interrupt();
      return false;
    }
    delay *= 2;
  }
  // One last look: the condition may have become true during the final sleep.
  return cond.getAsBoolean();
}
/**
 * Sleeps for {@code delay} milliseconds and fails the calling test if the sleep is
 * interrupted.
 *
 * @param delay time to sleep, in milliseconds
 */
public static void sleepMs(long delay) {
  try {
    Thread.sleep(delay);
  } catch (InterruptedException e) {
    // Restore the interrupt flag before failing so the interruption is not lost.
    Thread.currentThread().interrupt();
    Assert.fail("Sleep was interrupted");
  }
}
/**
 * With the job model version znode at "1" and a job model already published under
 * version "2", the next job model version is expected to be "3".
 */
@Test
public void testgetNextJobModelVersion() {
  // Start from a clean tree containing only the base paths this test needs.
  ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
  String rootPath = keyBuilder.getRootPath();
  zkClient.deleteRecursive(rootPath);
  String[] basePaths = {rootPath, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()};
  zkUtils.validatePaths(basePaths);

  // Move the job model version znode from "0" to "1".
  String initialVersion = "0";
  String publishedVersion = "1";
  zkUtils.publishJobModelVersion(initialVersion, publishedVersion);
  Assert.assertEquals(publishedVersion, zkUtils.getJobModelVersion());

  // Publish a job model under the higher version "2".
  JobModel emptyModel = new JobModel(new MapConfig(), new HashMap<>());
  zkUtils.publishJobModel("2", emptyModel);

  // The already published version "2" is taken into account, so the next version is "3".
  String currentVersion = zkUtils.getJobModelVersion();
  Assert.assertEquals("3", zkUtils.getNextJobModelVersion(currentVersion));
}
/**
 * Registers two processors through two separate {@link ZkUtils} instances, deletes the
 * node of the first one and checks that only the second processor id is still listed.
 */
@Test
public void testDeleteProcessorNodeShouldDeleteTheCorrectProcessorNode() {
  final String deletedProcessorId = "processorId1";
  final String survivingProcessorId = "processorId2";

  ZkUtils firstUtils = getZkUtils();
  ZkUtils secondUtils = getZkUtils();
  firstUtils.registerProcessorAndGetId(new ProcessorData("host1", deletedProcessorId));
  secondUtils.registerProcessorAndGetId(new ProcessorData("host2", survivingProcessorId));

  firstUtils.deleteProcessorNode(deletedProcessorId);

  List<String> remainingIds = firstUtils.getSortedActiveProcessorsIDs();
  Assert.assertEquals(ImmutableList.of(survivingProcessorId), remainingIds);
}
/**
 * With a mocked client whose first {@code close()} throws a
 * {@link ZkInterruptedException} and whose second one succeeds,
 * {@code ZkUtils.close()} must end up calling {@code close()} exactly twice.
 */
@Test
public void testCloseShouldRetryOnceOnInterruptedException() {
  ZkClient mockedClient = Mockito.mock(ZkClient.class);
  ZkUtils utilsUnderTest =
      new ZkUtils(KEY_BUILDER, mockedClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());

  // First close() attempt is interrupted, the following one completes normally.
  ZkInterruptedException interrupted = new ZkInterruptedException(new InterruptedException());
  Mockito.doThrow(interrupted)
      .doAnswer(invocation -> null)
      .when(mockedClient).close();

  utilsUnderTest.close();

  Mockito.verify(mockedClient, Mockito.times(2)).close();
}
/**
 * Calls {@code ZkUtils.close()} from a thread whose interrupt flag is set and verifies,
 * through the client's private {@code _closed} field, that the client still ends up closed.
 *
 * @throws Exception if the reflective field access or the thread join fails
 */
@Test
public void testCloseShouldTearDownZkConnectionOnInterruptedException() throws Exception {
  // Never counted down: the worker only gets past await() by being interrupted.
  CountDownLatch neverReleased = new CountDownLatch(1);

  // Establish connection with the zookeeper server.
  ZkClient dedicatedClient = new ZkClient("127.0.0.1:" + zkServer.getPort());
  ZkUtils utilsUnderTest =
      new ZkUtils(KEY_BUILDER, dedicatedClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());

  Runnable closeWhileInterrupted = () -> {
    try {
      neverReleased.await();
    } catch (InterruptedException e) {
      // Put the flag back so close() runs on an interrupted thread.
      Thread.currentThread().interrupt();
    }
    utilsUnderTest.close();
  };
  Thread worker = new Thread(closeWhileInterrupted);
  worker.start();

  Field closedFlag = ZkClient.class.getDeclaredField("_closed");
  closedFlag.setAccessible(true);
  Assert.assertFalse(closedFlag.getBoolean(dedicatedClient));

  worker.interrupt();
  worker.join();

  Assert.assertTrue(closedFlag.getBoolean(dedicatedClient));
}
}