blob: bf58bee6b4277a3790b658833d2cd6f4489aff88 [file] [log] [blame]
package org.apache.helix.integration.multizk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
import org.apache.helix.ThreadLeakageChecker;
import org.apache.helix.api.config.RebalanceConfig;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.MockTask;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.api.client.ZkClientType;
import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.exception.MultiZkException;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* TestMultiZkHelixJavaApis spins up multiple in-memory ZooKeepers with a pre-configured
* cluster-Zk realm routing information.
* This test verifies that all Helix Java APIs work as expected.
*/
public class TestMultiZkHelixJavaApis {
private static Logger LOG = LoggerFactory.getLogger(TestMultiZkHelixJavaApis.class);
private static final int NUM_ZK = 3;
private static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
private static final Map<String, HelixZkClient> ZK_CLIENT_MAP = new HashMap<>();
private static final Map<String, ClusterControllerManager> MOCK_CONTROLLERS = new HashMap<>();
private static final Set<MockParticipantManager> MOCK_PARTICIPANTS = new HashSet<>();
private static final List<String> CLUSTER_LIST =
ImmutableList.of("CLUSTER_1", "CLUSTER_2", "CLUSTER_3");
// For testing different MSDS endpoint configs.
private static final String CLUSTER_ONE = CLUSTER_LIST.get(0);
private static final String CLUSTER_FOUR = "CLUSTER_4";
private MockMetadataStoreDirectoryServer _msds;
private static final Map<String, Collection<String>> _rawRoutingData = new HashMap<>();
private RealmAwareZkClient _zkClient;
private HelixAdmin _zkHelixAdmin;
// Save System property configs from before this test and pass onto after the test
private final Map<String, String> _configStore = new HashMap<>();
private static final String ZK_PREFIX = "localhost:";
private static final int ZK_START_PORT = 8777;
private String _msdsEndpoint;
@BeforeClass
public void beforeClass() throws Exception {
// Create 3 in-memory zookeepers and routing mapping
for (int i = 0; i < NUM_ZK; i++) {
String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
ZK_SERVER_MAP.put(zkAddress, TestHelper.startZkServer(zkAddress));
ZK_CLIENT_MAP.put(zkAddress, DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())));
// One cluster per ZkServer created
_rawRoutingData.put(zkAddress, Collections.singletonList("/" + CLUSTER_LIST.get(i)));
}
// Create a Mock MSDS
final String msdsHostName = "localhost";
final int msdsPort = 11117;
final String msdsNamespace = "multiZkTest";
_msdsEndpoint =
"http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace;
_msds = new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace,
_rawRoutingData);
_msds.startServer();
// Save previously-set system configs
String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
String prevMsdsServerEndpoint =
System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
if (prevMultiZkEnabled != null) {
_configStore.put(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
}
if (prevMsdsServerEndpoint != null) {
_configStore
.put(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, prevMsdsServerEndpoint);
}
// Turn on multiZk mode in System config
System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
// MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, _msdsEndpoint);
// Routing data may be set by other tests using the same endpoint; reset() for good measure
RoutingDataManager.getInstance().reset();
// Create a FederatedZkClient for admin work
_zkClient =
new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
new RealmAwareZkClient.RealmAwareZkClientConfig());
}
@AfterClass
public void afterClass() throws Exception {
String testClassName = getClass().getSimpleName();
try {
// Kill all mock controllers and participants
MOCK_CONTROLLERS.values().forEach(ClusterControllerManager::syncStop);
MOCK_PARTICIPANTS.forEach(mockParticipantManager -> {
mockParticipantManager.syncStop();
StateMachineEngine stateMachine = mockParticipantManager.getStateMachineEngine();
if (stateMachine != null) {
StateModelFactory stateModelFactory = stateMachine.getStateModelFactory("Task");
if (stateModelFactory != null && stateModelFactory instanceof TaskStateModelFactory) {
((TaskStateModelFactory) stateModelFactory).shutdown();
}
}
});
// Tear down all clusters
CLUSTER_LIST.forEach(cluster -> TestHelper.dropCluster(cluster, _zkClient));
// Verify that all clusters are gone in each zookeeper
Assert.assertTrue(TestHelper.verify(() -> {
for (Map.Entry<String, HelixZkClient> zkClientEntry : ZK_CLIENT_MAP.entrySet()) {
List<String> children = zkClientEntry.getValue().getChildren("/");
if (children.stream().anyMatch(CLUSTER_LIST::contains)) {
return false;
}
}
return true;
}, TestHelper.WAIT_DURATION));
// Tear down zookeepers
ZK_CLIENT_MAP.forEach((zkAddress, zkClient) -> zkClient.close());
ZK_SERVER_MAP.forEach((zkAddress, zkServer) -> zkServer.shutdown());
// Stop MockMSDS
_msds.stopServer();
// Close ZK client connections
_zkHelixAdmin.close();
if (_zkClient != null && !_zkClient.isClosed()) {
_zkClient.close();
}
} finally {
// Restore System property configs
if (_configStore.containsKey(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED,
_configStore.get(SystemPropertyKeys.MULTI_ZK_ENABLED));
} else {
System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
}
if (_configStore.containsKey(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY)) {
System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
_configStore.get(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY));
} else {
System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
}
}
boolean status = false;
try {
status = ThreadLeakageChecker.afterClassCheck(testClassName);
} catch (Exception e) {
LOG.error("ThreadLeakageChecker exception:", e);
}
// todo: We should fail test here once we achieved 0 leakage and remove the following System print
if (!status) {
System.out.println("---------- Test Class " + testClassName + " thread leakage detected! ---------------");
}
}
/**
* Test cluster creation according to the pre-set routing mapping.
* Helix Java API tested is ClusterSetup in this method.
*/
@Test
public void testCreateClusters() {
// Create two ClusterSetups using two different constructors
// Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
ClusterSetup clusterSetupZkAddr = new ClusterSetup(ZK_SERVER_MAP.keySet().iterator().next());
ClusterSetup clusterSetupBuilder = new ClusterSetup.Builder().build();
createClusters(clusterSetupZkAddr);
verifyClusterCreation(clusterSetupZkAddr);
createClusters(clusterSetupBuilder);
verifyClusterCreation(clusterSetupBuilder);
// Create clusters again to continue with testing
createClusters(clusterSetupBuilder);
clusterSetupZkAddr.close();
clusterSetupBuilder.close();
}
private void createClusters(ClusterSetup clusterSetup) {
// Create clusters
for (String clusterName : CLUSTER_LIST) {
clusterSetup.addCluster(clusterName, false);
}
}
private void verifyClusterCreation(ClusterSetup clusterSetup) {
// Verify that clusters have been created correctly according to routing mapping
_rawRoutingData.forEach((zkAddress, cluster) -> {
// Note: clusterNamePath already contains "/"
String clusterNamePath = cluster.iterator().next();
// Check with single-realm ZkClients
Assert.assertTrue(ZK_CLIENT_MAP.get(zkAddress).exists(clusterNamePath));
// Check with realm-aware ZkClient (federated)
Assert.assertTrue(_zkClient.exists(clusterNamePath));
// Remove clusters
clusterSetup
.deleteCluster(clusterNamePath.substring(1)); // Need to remove "/" at the beginning
});
}
/**
* Test Helix Participant creation and addition.
* Helix Java APIs tested in this method are:
* ZkHelixAdmin and ZKHelixManager (mock participant/controller)
*/
@Test(dependsOnMethods = "testCreateClusters")
public void testCreateParticipants() throws Exception {
// Create two ClusterSetups using two different constructors
// Note: ZK Address here could be anything because multiZk mode is on (it will be ignored)
HelixAdmin helixAdminZkAddr = new ZKHelixAdmin(ZK_SERVER_MAP.keySet().iterator().next());
HelixAdmin helixAdminBuilder = new ZKHelixAdmin.Builder().build();
_zkHelixAdmin = new ZKHelixAdmin.Builder().build();
String participantNamePrefix = "Node_";
int numParticipants = 5;
createParticipantsAndVerify(helixAdminZkAddr, numParticipants, participantNamePrefix);
createParticipantsAndVerify(helixAdminBuilder, numParticipants, participantNamePrefix);
// Create mock controller and participants for next tests
for (String cluster : CLUSTER_LIST) {
// Start a controller
// Note: in multiZK mode, ZK Addr is ignored
ClusterControllerManager mockController =
new ClusterControllerManager("DummyZK", cluster, "controller");
mockController.syncStart();
MOCK_CONTROLLERS.put(cluster, mockController);
for (int i = 0; i < numParticipants; i++) {
// Note: in multiZK mode, ZK Addr is ignored
InstanceConfig instanceConfig = new InstanceConfig(participantNamePrefix + i);
helixAdminBuilder.addInstance(cluster, instanceConfig);
MockParticipantManager mockNode =
new MockParticipantManager("DummyZK", cluster, participantNamePrefix + i);
// Register task state model for task framework testing in later methods
Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
// Register a Task state model factory.
StateMachineEngine stateMachine = mockNode.getStateMachineEngine();
stateMachine
.registerStateModelFactory("Task", new TaskStateModelFactory(mockNode, taskFactoryReg));
mockNode.syncStart();
MOCK_PARTICIPANTS.add(mockNode);
}
// Check that mockNodes are up
Assert.assertTrue(TestHelper
.verify(() -> helixAdminBuilder.getInstancesInCluster(cluster).size() == numParticipants,
TestHelper.WAIT_DURATION));
}
helixAdminZkAddr.close();
helixAdminBuilder.close();
}
private void createParticipantsAndVerify(HelixAdmin admin, int numParticipants,
String participantNamePrefix) {
// Create participants in clusters
Set<String> participantNames = new HashSet<>();
CLUSTER_LIST.forEach(cluster -> {
for (int i = 0; i < numParticipants; i++) {
String participantName = participantNamePrefix + i;
participantNames.add(participantName);
InstanceConfig instanceConfig = new InstanceConfig(participantNamePrefix + i);
admin.addInstance(cluster, instanceConfig);
}
});
// Verify participants have been created properly
_rawRoutingData.forEach((zkAddress, cluster) -> {
// Note: clusterNamePath already contains "/"
String clusterNamePath = cluster.iterator().next();
// Check with single-realm ZkClients
List<String> instances =
ZK_CLIENT_MAP.get(zkAddress).getChildren(clusterNamePath + "/INSTANCES");
Assert.assertEquals(new HashSet<>(instances), participantNames);
// Check with realm-aware ZkClient (federated)
instances = _zkClient.getChildren(clusterNamePath + "/INSTANCES");
Assert.assertEquals(new HashSet<>(instances), participantNames);
// Remove Participants
participantNames.forEach(participant -> {
InstanceConfig instanceConfig = new InstanceConfig(participant);
admin.dropInstance(clusterNamePath.substring(1), instanceConfig);
});
});
}
/**
* Test creation of HelixManager and makes sure it connects correctly.
*/
@Test(dependsOnMethods = "testCreateParticipants")
public void testZKHelixManager() throws Exception {
String clusterName = "CLUSTER_1";
String participantName = "HelixManager";
InstanceConfig instanceConfig = new InstanceConfig(participantName);
_zkHelixAdmin.addInstance(clusterName, instanceConfig);
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
// Try with a connection config without ZK realm sharding key set (should fail)
RealmAwareZkClient.RealmAwareZkConnectionConfig invalidZkConnectionConfig =
connectionConfigBuilder.build();
RealmAwareZkClient.RealmAwareZkConnectionConfig validZkConnectionConfig =
connectionConfigBuilder.setZkRealmShardingKey("/" + clusterName).build();
HelixManagerProperty.Builder propertyBuilder = new HelixManagerProperty.Builder();
try {
HelixManager invalidManager = HelixManagerFactory
.getZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null,
propertyBuilder.setRealmAWareZkConnectionConfig(invalidZkConnectionConfig).build());
Assert.fail("Should see a HelixException here because the connection config doesn't have the "
+ "sharding key set!");
} catch (HelixException e) {
// Expected
}
// Connect as a participant
HelixManager managerParticipant = HelixManagerFactory
.getZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null,
propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build());
managerParticipant.connect();
// Connect as an administrator
HelixManager managerAdministrator = HelixManagerFactory
.getZKHelixManager(clusterName, participantName, InstanceType.ADMINISTRATOR, null,
propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build());
managerAdministrator.connect();
// Perform assert checks to make sure the manager can read and register itself as a participant
InstanceConfig instanceConfigRead = managerAdministrator.getClusterManagmentTool()
.getInstanceConfig(clusterName, participantName);
Assert.assertNotNull(instanceConfigRead);
Assert.assertEquals(instanceConfig.getInstanceName(), participantName);
Assert.assertNotNull(managerAdministrator.getHelixDataAccessor().getProperty(
managerAdministrator.getHelixDataAccessor().keyBuilder().liveInstance(participantName)));
// Clean up
managerParticipant.disconnect();
managerAdministrator.disconnect();
_zkHelixAdmin.dropInstance(clusterName, instanceConfig);
}
/**
* Test that clusters and instances are set up properly.
* Helix Java APIs tested in this method is ZkUtil.
*/
@Test(dependsOnMethods = "testZKHelixManager")
public void testZkUtil() {
CLUSTER_LIST.forEach(cluster -> {
_zkHelixAdmin.getInstancesInCluster(cluster).forEach(instance -> ZKUtil
.isInstanceSetup("DummyZk", cluster, instance, InstanceType.PARTICIPANT));
});
}
/**
* Create resources and see if things get rebalanced correctly.
* Helix Java API tested in this methods are:
* ZkBaseDataAccessor
* ZkHelixClusterVerifier (BestPossible)
*/
@Test(dependsOnMethods = "testZkUtil")
public void testCreateAndRebalanceResources() {
BaseDataAccessor<ZNRecord> dataAccessorZkAddr = new ZkBaseDataAccessor<>("DummyZk");
BaseDataAccessor<ZNRecord> dataAccessorBuilder =
new ZkBaseDataAccessor.Builder<ZNRecord>().build();
String resourceNamePrefix = "DB_";
int numResources = 5;
int numPartitions = 3;
Map<String, Map<String, ZNRecord>> idealStateMap = new HashMap<>();
for (String cluster : CLUSTER_LIST) {
Set<String> resourceNames = new HashSet<>();
Set<String> liveInstancesNames = new HashSet<>(dataAccessorZkAddr
.getChildNames("/" + cluster + "/LIVEINSTANCES", AccessOption.PERSISTENT));
for (int i = 0; i < numResources; i++) {
String resource = cluster + "_" + resourceNamePrefix + i;
_zkHelixAdmin.addResource(cluster, resource, numPartitions, "MasterSlave",
IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
_zkHelixAdmin.rebalance(cluster, resource, 3);
resourceNames.add(resource);
// Update IdealState fields with ZkBaseDataAccessor
String resourcePath = "/" + cluster + "/IDEALSTATES/" + resource;
ZNRecord is = dataAccessorZkAddr.get(resourcePath, null, AccessOption.PERSISTENT);
is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCER_CLASS_NAME.name(),
DelayedAutoRebalancer.class.getName());
is.setSimpleField(RebalanceConfig.RebalanceConfigProperty.REBALANCE_STRATEGY.name(),
CrushEdRebalanceStrategy.class.getName());
dataAccessorZkAddr.set(resourcePath, is, AccessOption.PERSISTENT);
idealStateMap.computeIfAbsent(cluster, recordList -> new HashMap<>())
.putIfAbsent(is.getId(), is); // Save ZNRecord for comparison later
}
// Create a verifier to make sure all resources have been rebalanced
ZkHelixClusterVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(cluster).setResources(resourceNames)
.setExpectLiveInstances(liveInstancesNames)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
try {
Assert.assertTrue(verifier.verifyByPolling());
} finally {
verifier.close();
}
}
// Using the ZkBaseDataAccessor created using the Builder, check that the correct IS is read
for (String cluster : CLUSTER_LIST) {
Map<String, ZNRecord> savedIdealStates = idealStateMap.get(cluster);
List<String> resources = dataAccessorBuilder
.getChildNames("/" + cluster + "/IDEALSTATES", AccessOption.PERSISTENT);
resources.forEach(resource -> {
ZNRecord is = dataAccessorBuilder
.get("/" + cluster + "/IDEALSTATES/" + resource, null, AccessOption.PERSISTENT);
Assert
.assertEquals(is.getSimpleFields(), savedIdealStates.get(is.getId()).getSimpleFields());
});
}
dataAccessorZkAddr.close();
dataAccessorBuilder.close();
}
/**
* This method tests ConfigAccessor.
*/
@Test(dependsOnMethods = "testCreateAndRebalanceResources")
public void testConfigAccessor() {
// Build two ConfigAccessors to read and write:
// 1. ConfigAccessor using a deprecated constructor
// 2. ConfigAccessor using the Builder
ConfigAccessor configAccessorZkAddr = new ConfigAccessor("DummyZk");
ConfigAccessor configAccessorBuilder = new ConfigAccessor.Builder().build();
setClusterConfigAndVerify(configAccessorZkAddr);
setClusterConfigAndVerify(configAccessorBuilder);
configAccessorZkAddr.close();
configAccessorBuilder.close();
}
private void setClusterConfigAndVerify(ConfigAccessor configAccessorMultiZk) {
_rawRoutingData.forEach((zkAddr, clusterNamePathList) -> {
// Need to rid of "/" because this is a sharding key
String cluster = clusterNamePathList.iterator().next().substring(1);
ClusterConfig clusterConfig = new ClusterConfig(cluster);
clusterConfig.getRecord().setSimpleField("configAccessor", cluster);
configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
// Now check with a single-realm ConfigAccessor
ConfigAccessor configAccessorSingleZk =
new ConfigAccessor.Builder().setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkAddress(zkAddr).build();
Assert.assertEquals(configAccessorSingleZk.getClusterConfig(cluster), clusterConfig);
// Also check with a single-realm dedicated ZkClient
ZNRecord clusterConfigRecord =
ZK_CLIENT_MAP.get(zkAddr).readData("/" + cluster + "/CONFIGS/CLUSTER/" + cluster);
Assert.assertEquals(clusterConfigRecord, clusterConfig.getRecord());
// Clean up
clusterConfig = new ClusterConfig(cluster);
configAccessorMultiZk.setClusterConfig(cluster, clusterConfig);
});
}
/**
* This test submits multiple tasks to be run.
* The Helix Java APIs tested in this method are TaskDriver (HelixManager) and
* ZkHelixPropertyStore/ZkCacheBaseDataAccessor.
*/
@Test(dependsOnMethods = "testConfigAccessor")
public void testTaskFramework() throws InterruptedException {
// Note: TaskDriver is like HelixManager - it only operates on one designated
// Create TaskDrivers for all clusters
Map<String, TaskDriver> taskDriverMap = new HashMap<>();
MOCK_CONTROLLERS
.forEach((cluster, controller) -> taskDriverMap.put(cluster, new TaskDriver(controller)));
// Create a Task Framework workload and start
Workflow workflow = WorkflowGenerator.generateNonTargetedSingleWorkflowBuilder("job").build();
for (TaskDriver taskDriver : taskDriverMap.values()) {
taskDriver.start(workflow);
}
// Use multi-ZK ZkHelixPropertyStore/ZkCacheBaseDataAccessor to query for workflow/job states
HelixPropertyStore<ZNRecord> propertyStore =
new ZkHelixPropertyStore.Builder<ZNRecord>().build();
for (Map.Entry<String, TaskDriver> entry : taskDriverMap.entrySet()) {
String cluster = entry.getKey();
TaskDriver driver = entry.getValue();
// Wait until workflow has completed
TaskState wfStateFromTaskDriver =
driver.pollForWorkflowState(workflow.getName(), TaskState.COMPLETED);
String workflowContextPath =
"/" + cluster + "/PROPERTYSTORE/TaskRebalancer/" + workflow.getName() + "/Context";
ZNRecord workflowContextRecord =
propertyStore.get(workflowContextPath, null, AccessOption.PERSISTENT);
WorkflowContext context = new WorkflowContext(workflowContextRecord);
// Compare the workflow state read from PropertyStore and TaskDriver
Assert.assertEquals(context.getWorkflowState(), wfStateFromTaskDriver);
}
}
/**
* This method tests that ZKHelixAdmin::getClusters() works in multi-zk environment.
*/
@Test(dependsOnMethods = "testTaskFramework")
public void testGetAllClusters() {
Assert.assertEquals(new HashSet<>(_zkHelixAdmin.getClusters()), new HashSet<>(CLUSTER_LIST));
}
/**
* This method tests that GenericBaseDataAccessorBuilder and GenericZkHelixApiBuilder work as
* expected. This test focuses on various usage scenarios for ZkBaseDataAccessor.
*
* Use cases tested:
* - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set
* - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address not set, ZK sharding key set
* - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set, ZK sharding key set (ZK addr should override)
* - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set
* - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address not set, ZK sharding key set
* - Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set, ZK sharding key set (ZK addr should override)
* - Create ZkBaseDataAccessor, single-realm, federated ZkClient (should fail)
* - Create ZkBaseDataAccessor, multi-realm, dedicated ZkClient (should fail)
* - Create ZkBaseDataAccessor, multi-realm, shared ZkClient (should fail)
* - Create ZkBaseDataAccessor, multi-realm, federated ZkClient, ZkAddress set (should fail)
* - Create ZkBaseDataAccessor, multi-realm, federated ZkClient, Zk sharding key set (should fail because by definition, multi-realm can access multiple sharding keys)
* - Create ZkBaseDataAccessor, multi-realm, federated ZkClient
* - Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, No ZkAddress set, ConnectionConfig has an invalid ZK sharding key (should fail because it cannot find a valid ZK to connect to)
*/
@Test(dependsOnMethods = "testGetAllClusters")
public void testGenericBaseDataAccessorBuilder() {
// Custom session timeout value is used to count active connections in SharedZkClientFactory
int customSessionTimeout = 10000;
String firstZkAddress = "localhost:8777"; // has "CLUSTER_1"
String firstClusterPath = "/CLUSTER_1";
String secondClusterPath = "/CLUSTER_2";
ZkBaseDataAccessor.Builder<ZNRecord> zkBaseDataAccessorBuilder =
new ZkBaseDataAccessor.Builder<>();
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
connectionConfigBuilder.setSessionTimeout(customSessionTimeout);
BaseDataAccessor<ZNRecord> accessor;
// Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set
int currentSharedZkClientActiveConnectionCount =
SharedZkClientFactory.getInstance().getActiveConnectionCount();
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
// Check that no extra connection has been created
Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
currentSharedZkClientActiveConnectionCount);
accessor.close();
// Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address not set, ZK sharding key set
connectionConfigBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkRealmShardingKey(firstClusterPath);
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkClientType(ZkClientType.DEDICATED).setZkAddress(null)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
currentSharedZkClientActiveConnectionCount);
accessor.close();
// Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, ZK address set, ZK sharding key set (ZK addr should override)
connectionConfigBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkRealmShardingKey(secondClusterPath);
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
currentSharedZkClientActiveConnectionCount);
accessor.close();
// Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set
currentSharedZkClientActiveConnectionCount =
SharedZkClientFactory.getInstance().getActiveConnectionCount();
connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
// Add one to active connection count since this is a shared ZkClientType
Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
currentSharedZkClientActiveConnectionCount + 1);
accessor.close();
// Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address not set, ZK sharding key set
connectionConfigBuilder.setZkRealmShardingKey(firstClusterPath)
.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkClientType(ZkClientType.SHARED).setZkAddress(null)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
// Add one to active connection count since this is a shared ZkClientType
Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
currentSharedZkClientActiveConnectionCount + 1);
accessor.close();
// Create ZkBaseDataAccessor, single-realm, sharedZkClient, ZK address set, ZK sharding key set
// (ZK address should override the sharding key setting)
connectionConfigBuilder.setZkRealmShardingKey(secondClusterPath)
.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
Assert.assertFalse(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
// Add one to active connection count since this is a shared ZkClientType
Assert.assertEquals(SharedZkClientFactory.getInstance().getActiveConnectionCount(),
currentSharedZkClientActiveConnectionCount + 1);
accessor.close();
// Create ZkBaseDataAccessor, single-realm, federated ZkClient (should fail)
connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
try {
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkClientType(ZkClientType.FEDERATED).setZkAddress(firstZkAddress)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.fail("SINGLE_REALM and FEDERATED ZkClientType are an invalid combination!");
} catch (HelixException e) {
// Expected
}
// Create ZkBaseDataAccessor, multi-realm, dedicated ZkClient (should fail)
try {
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
.setZkClientType(ZkClientType.DEDICATED).setZkAddress(firstZkAddress)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.fail("MULTI_REALM and DEDICATED ZkClientType are an invalid combination!");
} catch (HelixException e) {
// Expected
}
// Create ZkBaseDataAccessor, multi-realm, shared ZkClient (should fail)
try {
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
.setZkClientType(ZkClientType.SHARED).setZkAddress(firstZkAddress)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.fail("MULTI_REALM and SHARED ZkClientType are an invalid combination!");
} catch (HelixException e) {
// Expected
}
// Create ZkBaseDataAccessor, multi-realm, federated ZkClient, ZkAddress set (should fail)
try {
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
.setZkClientType(ZkClientType.FEDERATED).setZkAddress(firstZkAddress)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.fail("MULTI_REALM and FEDERATED ZkClientType do not connect to one ZK!");
} catch (HelixException e) {
// Expected
}
// Create ZkBaseDataAccessor, multi-realm, federated ZkClient, Zk sharding key set (should fail
// because by definition, multi-realm can access multiple sharding keys)
try {
connectionConfigBuilder.setZkRealmShardingKey(firstClusterPath)
.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM);
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
.setZkClientType(ZkClientType.FEDERATED).setZkAddress(null)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.fail("MULTI_REALM and FEDERATED ZkClientType do not connect to one ZK!");
} catch (HelixException e) {
// Expected
}
// Create ZkBaseDataAccessor, multi-realm, federated ZkClient
connectionConfigBuilder.setZkRealmShardingKey(null).setRealmMode(null);
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
.setZkClientType(ZkClientType.FEDERATED).setZkAddress(null)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.assertTrue(accessor.exists(firstClusterPath, AccessOption.PERSISTENT));
Assert.assertTrue(accessor.exists(secondClusterPath, AccessOption.PERSISTENT));
accessor.close();
// Create ZkBaseDataAccessor, single-realm, dedicated ZkClient, No ZkAddress set,
// ConnectionConfig has an invalid ZK sharding key (should fail because it cannot find a valid
// ZK to connect to)
connectionConfigBuilder.setZkRealmShardingKey("/NonexistentShardingKey");
try {
accessor = zkBaseDataAccessorBuilder.setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
.setZkClientType(ZkClientType.DEDICATED).setZkAddress(null)
.setRealmAwareZkConnectionConfig(connectionConfigBuilder.build()).build();
Assert.fail("Should fail because it cannot find a valid ZK to connect to!");
} catch (NoSuchElementException e) {
// Expected because the sharding key wouldn't be found
}
}
/**
* Tests Helix Java APIs which use different MSDS endpoint configs. Java API should
* only connect to the configured MSDS but not the others. The APIs are explicitly tested are:
* - ClusterSetup
* - HelixAdmin
* - ZkUtil
* - HelixManager
* - BaseDataAccessor
* - ConfigAccessor
*/
@Test(dependsOnMethods = "testGenericBaseDataAccessorBuilder")
public void testDifferentMsdsEndpointConfigs() throws IOException, InvalidRoutingDataException {
String methodName = TestHelper.getTestMethodName();
System.out.println("Start " + methodName);
final String zkAddress = ZK_SERVER_MAP.keySet().iterator().next();
final Map<String, Collection<String>> secondRoutingData =
ImmutableMap.of(zkAddress, Collections.singletonList(formPath(CLUSTER_FOUR)));
MockMetadataStoreDirectoryServer secondMsds =
new MockMetadataStoreDirectoryServer("localhost", 11118, "multiZkTest", secondRoutingData);
final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
.setRoutingDataSourceEndpoint(secondMsds.getEndpoint()).build();
secondMsds.startServer();
try {
// Verify ClusterSetup
verifyClusterSetupMsdsEndpoint(connectionConfig);
// Verify HelixAdmin
verifyHelixAdminMsdsEndpoint(connectionConfig);
// Verify ZKUtil
verifyZkUtilMsdsEndpoint();
// Verify HelixManager
verifyHelixManagerMsdsEndpoint();
// Verify BaseDataAccessor
verifyBaseDataAccessorMsdsEndpoint(connectionConfig);
// Verify ConfigAccessor
verifyConfigAccessorMsdsEndpoint(connectionConfig);
} finally {
RealmAwareZkClient zkClient = new FederatedZkClient(connectionConfig,
new RealmAwareZkClient.RealmAwareZkClientConfig());
TestHelper.dropCluster(CLUSTER_FOUR, zkClient);
zkClient.close();
secondMsds.stopServer();
}
System.out.println("End " + methodName);
}
private void verifyHelixManagerMsdsEndpoint() {
System.out.println("Start " + TestHelper.getTestMethodName());
// Mock participants are already created and started in the previous test.
// The mock participant only connects to MSDS configured in system property,
// but not the other.
final MockParticipantManager manager = MOCK_PARTICIPANTS.iterator().next();
verifyMsdsZkRealm(CLUSTER_ONE, true,
() -> manager.getZkClient().exists(formPath(manager.getClusterName())));
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> manager.getZkClient().exists(formPath(CLUSTER_FOUR)));
}
private void verifyBaseDataAccessorMsdsEndpoint(
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
System.out.println("Start " + TestHelper.getTestMethodName());
// MSDS endpoint is not configured in builder, so config in system property is used.
BaseDataAccessor<ZNRecord> firstDataAccessor =
new ZkBaseDataAccessor.Builder<ZNRecord>().build();
// Create base data accessor with MSDS endpoint configured in builder.
BaseDataAccessor<ZNRecord> secondDataAccessor =
new ZkBaseDataAccessor.Builder<ZNRecord>().setRealmAwareZkConnectionConfig(connectionConfig)
.build();
String methodName = TestHelper.getTestMethodName();
String clusterOnePath = formPath(CLUSTER_ONE, methodName);
String clusterFourPath = formPath(CLUSTER_FOUR, methodName);
ZNRecord record = new ZNRecord(methodName);
try {
firstDataAccessor.create(clusterOnePath, record, AccessOption.PERSISTENT);
secondDataAccessor.create(clusterFourPath, record, AccessOption.PERSISTENT);
// Verify data accessors that they could only talk to their own configured MSDS endpoint:
// either being set in builder or system property.
Assert.assertTrue(firstDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> firstDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
Assert.assertTrue(secondDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
verifyMsdsZkRealm(CLUSTER_ONE, false,
() -> secondDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
firstDataAccessor.remove(clusterOnePath, AccessOption.PERSISTENT);
secondDataAccessor.remove(clusterFourPath, AccessOption.PERSISTENT);
Assert.assertFalse(firstDataAccessor.exists(clusterOnePath, AccessOption.PERSISTENT));
Assert.assertFalse(secondDataAccessor.exists(clusterFourPath, AccessOption.PERSISTENT));
} finally {
firstDataAccessor.close();
secondDataAccessor.close();
}
}
private void verifyClusterSetupMsdsEndpoint(
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
System.out.println("Start " + TestHelper.getTestMethodName());
ClusterSetup firstClusterSetup = new ClusterSetup.Builder().build();
ClusterSetup secondClusterSetup =
new ClusterSetup.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
try {
verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstClusterSetup.addCluster(CLUSTER_ONE, false));
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> firstClusterSetup.addCluster(CLUSTER_FOUR, false));
verifyMsdsZkRealm(CLUSTER_FOUR, true,
() -> secondClusterSetup.addCluster(CLUSTER_FOUR, false));
verifyMsdsZkRealm(CLUSTER_ONE, false,
() -> secondClusterSetup.addCluster(CLUSTER_ONE, false));
} finally {
firstClusterSetup.close();
secondClusterSetup.close();
}
}
private void verifyZkUtilMsdsEndpoint() {
System.out.println("Start " + TestHelper.getTestMethodName());
String dummyZkAddress = "dummyZkAddress";
// MSDS endpoint 1
verifyMsdsZkRealm(CLUSTER_ONE, true,
() -> ZKUtil.getChildren(dummyZkAddress, formPath(CLUSTER_ONE)));
// Verify MSDS endpoint 2 is not used by this ZKUtil.
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> ZKUtil.getChildren(dummyZkAddress, formPath(CLUSTER_FOUR)));
}
private void verifyHelixAdminMsdsEndpoint(
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
System.out.println("Start " + TestHelper.getTestMethodName());
HelixAdmin firstHelixAdmin = new ZKHelixAdmin.Builder().build();
HelixAdmin secondHelixAdmin =
new ZKHelixAdmin.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
try {
verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstHelixAdmin.enableCluster(CLUSTER_ONE, true));
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> firstHelixAdmin.enableCluster(CLUSTER_FOUR, true));
verifyMsdsZkRealm(CLUSTER_FOUR, true,
() -> secondHelixAdmin.enableCluster(CLUSTER_FOUR, true));
verifyMsdsZkRealm(CLUSTER_ONE, false,
() -> secondHelixAdmin.enableCluster(CLUSTER_ONE, true));
} finally {
firstHelixAdmin.close();
secondHelixAdmin.close();
}
}
private void verifyConfigAccessorMsdsEndpoint(
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
System.out.println("Start " + TestHelper.getTestMethodName());
ConfigAccessor firstConfigAccessor = new ConfigAccessor.Builder().build();
ConfigAccessor secondConfigAccessor =
new ConfigAccessor.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
try {
verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstConfigAccessor.getClusterConfig(CLUSTER_ONE));
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> firstConfigAccessor.getClusterConfig(CLUSTER_FOUR));
verifyMsdsZkRealm(CLUSTER_FOUR, true,
() -> secondConfigAccessor.getClusterConfig(CLUSTER_FOUR));
verifyMsdsZkRealm(CLUSTER_ONE, false,
() -> secondConfigAccessor.getClusterConfig(CLUSTER_ONE));
} finally {
firstConfigAccessor.close();
secondConfigAccessor.close();
}
}
private interface Operation {
void run();
}
private void verifyMsdsZkRealm(String cluster, boolean shouldSucceed, Operation operation) {
try {
operation.run();
if (!shouldSucceed) {
Assert.fail("Should not connect to the MSDS that has /" + cluster);
}
} catch (NoSuchElementException e) {
if (shouldSucceed) {
Assert.fail("ZK Realm should be found for /" + cluster);
} else {
Assert.assertTrue(e.getMessage()
.startsWith("No sharding key found within the provided path. Path: /" + cluster));
}
} catch (IllegalArgumentException e) {
if (shouldSucceed) {
Assert.fail(formPath(cluster) + " should be a valid sharding key.");
} else {
String messageOne = "Given path: /" + cluster + " does not have a "
+ "valid sharding key or its ZK sharding key is not found in the cached routing data";
String messageTwo = "Given path: /" + cluster + "'s ZK sharding key: /" + cluster
+ " does not match the ZK sharding key";
Assert.assertTrue(
e.getMessage().startsWith(messageOne) || e.getMessage().startsWith(messageTwo));
}
} catch (HelixException e) {
// NoSuchElementException: "ZK Realm not found!" is swallowed in ZKUtil.isClusterSetup()
// Instead, HelixException is thrown.
if (shouldSucceed) {
Assert.fail("Cluster: " + cluster + " should have been setup.");
} else {
Assert.assertEquals("fail to get config. cluster: " + cluster + " is NOT setup.",
e.getMessage());
}
}
}
private String formPath(String... pathNames) {
StringBuilder sb = new StringBuilder();
for (String name : pathNames) {
sb.append('/').append(name);
}
return sb.toString();
}
/**
* Testing using ZK as the routing data source. We use BaseDataAccessor as the representative
* Helix API.
* Two modes are tested: ZK and HTTP-ZK fallback
*/
@Test(dependsOnMethods = "testDifferentMsdsEndpointConfigs")
public void testZkRoutingDataSourceConfigs() {
// Set up routing data in ZK by connecting directly to ZK
BaseDataAccessor<ZNRecord> accessor =
new ZkBaseDataAccessor.Builder<ZNRecord>().setZkAddress(ZK_PREFIX + ZK_START_PORT).build();
// Create ZK realm routing data ZNRecord
_rawRoutingData.forEach((realm, keys) -> {
ZNRecord znRecord = new ZNRecord(realm);
znRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY,
new ArrayList<>(keys));
accessor.set(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + realm, znRecord,
AccessOption.PERSISTENT);
});
// Create connection configs with the source type set to each type
final RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigZk =
connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.ZK.name())
.setRoutingDataSourceEndpoint(ZK_PREFIX + ZK_START_PORT).build();
final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttpZkFallback =
connectionConfigBuilder
.setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name())
.setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT).build();
final RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfigHttp =
connectionConfigBuilder.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name())
.setRoutingDataSourceEndpoint(_msdsEndpoint).build();
// Reset cached routing data
RoutingDataManager.getInstance().reset();
// Shutdown MSDS to ensure that these accessors are able to pull routing data from ZK
_msds.stopServer();
// Create a BaseDataAccessor instance with the connection config
BaseDataAccessor<ZNRecord> zkBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
.setRealmAwareZkConnectionConfig(connectionConfigZk).build();
BaseDataAccessor<ZNRecord> httpZkFallbackBasedAccessor =
new ZkBaseDataAccessor.Builder<ZNRecord>()
.setRealmAwareZkConnectionConfig(connectionConfigHttpZkFallback).build();
try {
BaseDataAccessor<ZNRecord> httpBasedAccessor = new ZkBaseDataAccessor.Builder<ZNRecord>()
.setRealmAwareZkConnectionConfig(connectionConfigHttp).build();
Assert.fail("Must fail with a MultiZkException because HTTP connection will be refused.");
} catch (MultiZkException e) {
// Okay
}
// Check that all clusters appear as existing to this accessor
CLUSTER_LIST.forEach(cluster -> {
Assert.assertTrue(zkBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
Assert.assertTrue(httpZkFallbackBasedAccessor.exists("/" + cluster, AccessOption.PERSISTENT));
});
// Close all connections
accessor.close();
zkBasedAccessor.close();
httpZkFallbackBasedAccessor.close();
}
}