blob: c228c4b4659367c56b393f9c411d38a8a5793dce [file] [log] [blame]
package org.apache.helix.manager.zk.client;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.concurrent.TimeUnit;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestHelixZkClient extends ZkUnitTestBase {
private final String TEST_NODE = "/test_helix_zkclient";
@Test
public void testZkConnectionManager() {
final String TEST_ROOT = "/testZkConnectionManager/IDEALSTATES";
final String TEST_PATH = TEST_ROOT + TEST_NODE;
ZkConnectionManager zkConnectionManager =
new ZkConnectionManager(new ZkConnection(ZK_ADDR), HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
null);
Assert.assertTrue(zkConnectionManager.waitUntilConnected(1, TimeUnit.SECONDS));
// This client can write/read from ZK
zkConnectionManager.createPersistent(TEST_PATH, true);
zkConnectionManager.writeData(TEST_PATH, "Test");
Assert.assertNotNull(zkConnectionManager.readData(TEST_PATH));
zkConnectionManager.deleteRecursively(TEST_ROOT);
// This client can be shared, and cannot close when sharing
SharedZkClient sharedZkClient =
new SharedZkClient(zkConnectionManager, new HelixZkClient.ZkClientConfig(), null);
try {
zkConnectionManager.close();
Assert.fail("Dedicated ZkClient cannot be closed while sharing!");
} catch (ZkClientException hex) {
// expected
}
// This client can be closed normally when sharing ends
sharedZkClient.close();
Assert.assertTrue(sharedZkClient.isClosed());
Assert.assertFalse(sharedZkClient.waitUntilConnected(100, TimeUnit.MILLISECONDS));
zkConnectionManager.close();
Assert.assertTrue(zkConnectionManager.isClosed());
Assert.assertFalse(zkConnectionManager.waitUntilConnected(100, TimeUnit.MILLISECONDS));
// Sharing a closed dedicated ZkClient shall fail
try {
new SharedZkClient(zkConnectionManager, new HelixZkClient.ZkClientConfig(), null);
Assert.fail("Sharing a closed dedicated ZkClient shall fail.");
} catch (ZkClientException hex) {
// expected
}
deleteCluster("testZkConnectionManager");
}
@Test(dependsOnMethods = "testZkConnectionManager")
public void testSharingZkClient() throws Exception {
final String TEST_ROOT = "/testSharingZkClient/IDEALSTATES";
final String TEST_PATH = TEST_ROOT + TEST_NODE;
// A factory just for this tests, this for avoiding the impact from other tests running in
// parallel.
final SharedZkClientFactory testFactory = new SharedZkClientFactory();
HelixZkClient.ZkConnectionConfig connectionConfig =
new HelixZkClient.ZkConnectionConfig(ZK_ADDR);
HelixZkClient sharedZkClientA =
testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
Assert.assertTrue(sharedZkClientA.waitUntilConnected(1, TimeUnit.SECONDS));
HelixZkClient sharedZkClientB =
testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
Assert.assertTrue(sharedZkClientB.waitUntilConnected(1, TimeUnit.SECONDS));
Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
// client A and B is sharing the same session.
Assert.assertEquals(sharedZkClientA.getSessionId(), sharedZkClientB.getSessionId());
long sessionId = sharedZkClientA.getSessionId();
final int[] notificationCountA = {0, 0};
sharedZkClientA.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) {
notificationCountA[0]++;
}
@Override
public void handleDataDeleted(String s) {
notificationCountA[1]++;
}
});
final int[] notificationCountB = {0, 0};
sharedZkClientB.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) {
notificationCountB[0]++;
}
@Override
public void handleDataDeleted(String s) {
notificationCountB[1]++;
}
});
// Modify using client A and client B will get notification.
sharedZkClientA.createPersistent(TEST_PATH, true);
Assert.assertTrue(TestHelper.verify(() -> notificationCountB[0] == 1, 1000));
Assert.assertEquals(notificationCountB[1], 0);
sharedZkClientA.deleteRecursively(TEST_ROOT);
Assert.assertTrue(TestHelper.verify(() -> notificationCountB[1] == 1, 1000));
Assert.assertEquals(notificationCountB[0], 1);
try {
sharedZkClientA.createEphemeral(TEST_PATH, true);
Assert.fail("Create Ephemeral nodes using shared client should fail.");
} catch (UnsupportedOperationException e) {
// expected.
}
sharedZkClientA.close();
// Shared client A closed.
Assert.assertTrue(sharedZkClientA.isClosed());
Assert.assertFalse(sharedZkClientA.waitUntilConnected(100, TimeUnit.MILLISECONDS));
// Shared client B still open.
Assert.assertFalse(sharedZkClientB.isClosed());
Assert.assertTrue(sharedZkClientB.waitUntilConnected(100, TimeUnit.MILLISECONDS));
// client A cannot do any modify once closed.
try {
sharedZkClientA.createPersistent(TEST_PATH, true);
Assert.fail("Should not be able to create node with a closed client.");
} catch (Exception e) {
// expected to be here.
}
// client B needs to re-install the data watch
sharedZkClientB.watchForData(TEST_PATH);
// Now modify using client B, and client A won't get notification.
sharedZkClientB.createPersistent(TEST_PATH, true);
Assert.assertTrue(TestHelper.verify(() -> notificationCountB[0] == 2, 1000));
Assert.assertFalse(TestHelper.verify(() -> notificationCountA[0] == 2, 1000));
sharedZkClientB.deleteRecursively(TEST_ROOT);
Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
sharedZkClientB.close();
// Shared client B closed.
Assert.assertTrue(sharedZkClientB.isClosed());
Assert.assertFalse(sharedZkClientB.waitUntilConnected(100, TimeUnit.MILLISECONDS));
Assert.assertEquals(testFactory.getActiveConnectionCount(), 0);
// Try to create new shared ZkClient, will get a different session
HelixZkClient sharedZkClientC =
testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
Assert.assertFalse(sessionId == sharedZkClientC.getSessionId());
Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
sharedZkClientC.close();
// Shared client C closed.
Assert.assertTrue(sharedZkClientC.isClosed());
Assert.assertFalse(sharedZkClientC.waitUntilConnected(100, TimeUnit.MILLISECONDS));
Assert.assertEquals(testFactory.getActiveConnectionCount(), 0);
deleteCluster("testSharingZkClient");
}
}