blob: dca3d524ae6a5a71264d8d09ae976a099bc8a863 [file] [log] [blame]
package org.apache.helix.integration.multizk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import com.google.common.collect.ImmutableList;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
import org.apache.helix.ThreadLeakageChecker;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.MockTask;
import org.apache.helix.manager.zk.HelixManagerStateListener;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Some Helix users do not have multi-ZK routing configs in the JVM system config but pass
* them in through RealmAwareZkConnectionConfig.
* This test class does not set the JVM routing ZK config and tests Helix functionality.
* The tests are similar to TestMultiZkHelixJavaApis but without "MSDS_SERVER_ENDPOINT_KEY"
* in the system properties.
*/
public class TestMultiZkConectionConfig {
// Logger bound to this test class so log lines are attributed to the right test.
private static final Logger LOG = LoggerFactory.getLogger(TestMultiZkConectionConfig.class);
// Number of in-memory ZooKeeper servers started by beforeClass().
private static final int NUM_ZK = 3;
// ZK address -> in-memory ZooKeeper server listening on that address.
private static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
// ZK address -> dedicated (single-realm) client connected to that server.
private static final Map<String, HelixZkClient> ZK_CLIENT_MAP = new HashMap<>();
// Cluster name -> mock controller started for that cluster.
private static final Map<String, ClusterControllerManager> MOCK_CONTROLLERS = new HashMap<>();
// All mock participants started by the tests; stopped in afterClass().
private static final Set<MockParticipantManager> MOCK_PARTICIPANTS = new HashSet<>();
// One cluster per ZooKeeper server; index i is routed to the i-th server.
private static final List<String> CLUSTER_LIST =
    ImmutableList.of("CLUSTER_1", "CLUSTER_2", "CLUSTER_3");
// For testing different MSDS endpoint configs.
private static final String CLUSTER_ONE = CLUSTER_LIST.get(0);
private static final String CLUSTER_FOUR = "CLUSTER_4";
// Mock metadata store directory server that serves the routing data below.
private MockMetadataStoreDirectoryServer _msds;
// ZK address -> sharding keys ("/" + cluster name) hosted on that ZK.
private static final Map<String, Collection<String>> _rawRoutingData = new HashMap<>();
// Realm-aware (federated) client shared by the tests for admin work.
private RealmAwareZkClient _zkClient;
// Admin created in testCreateParticipants() and reused by later tests; closed in afterClass().
private HelixAdmin _zkHelixAdmin;
// Save System property configs from before this test and pass onto after the test
private final Map<String, String> _configStore = new HashMap<>();
private static final String ZK_PREFIX = "localhost:";
private static final int ZK_START_PORT = 8777;
// MSDS endpoint URL built in beforeClass().
private String _msdsEndpoint;
/**
 * Starts the test fixtures: NUM_ZK in-memory ZooKeeper servers (one cluster routed to each),
 * a mock MSDS serving that routing data, and the FederatedZkClient shared by the tests.
 * Multi-ZK mode is switched on through a System property, while the MSDS endpoint is
 * deliberately kept out of the System properties and supplied through the connection config.
 *
 * @throws Exception if a fixture fails to start or the FederatedZkClient cannot be created
 */
@BeforeClass
public void beforeClass() throws Exception {
  // Create 3 in-memory zookeepers and routing mapping
  for (int i = 0; i < NUM_ZK; i++) {
    String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
    ZK_SERVER_MAP.put(zkAddress, TestHelper.startZkServer(zkAddress));
    ZK_CLIENT_MAP.put(zkAddress, DedicatedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
            new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer())));
    // One cluster per ZkServer created
    _rawRoutingData.put(zkAddress, Collections.singletonList("/" + CLUSTER_LIST.get(i)));
  }

  // Create a Mock MSDS
  final String msdsHostName = "localhost";
  final int msdsPort = 11117;
  final String msdsNamespace = "multiZkTest";
  // MSDS endpoint: http://localhost:11117/admin/v2/namespaces/multiZkTest
  _msdsEndpoint =
      "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + msdsNamespace;
  _msds = new MockMetadataStoreDirectoryServer(msdsHostName, msdsPort, msdsNamespace,
      _rawRoutingData);
  _msds.startServer();

  // Save previously-set system configs so afterClass() can restore them
  String prevMultiZkEnabled = System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
  String prevMsdsServerEndpoint =
      System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
  if (prevMultiZkEnabled != null) {
    _configStore.put(SystemPropertyKeys.MULTI_ZK_ENABLED, prevMultiZkEnabled);
  }
  if (prevMsdsServerEndpoint != null) {
    _configStore
        .put(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY, prevMsdsServerEndpoint);
  }

  // Turn on multiZk mode in System config
  System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
  // This test must run WITHOUT the MSDS endpoint in the System properties: the routing source
  // is passed only through RealmAwareZkConnectionConfig. Drop any value left by an earlier
  // test; the saved value is put back in afterClass().
  System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);

  // Routing data may be set by other tests using the same endpoint; reset() for good measure
  RoutingDataManager.getInstance().reset();

  // Create a FederatedZkClient for admin work
  String routingEndpoint = _msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT;
  try {
    _zkClient =
        new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
            .setRoutingDataSourceEndpoint(routingEndpoint)
            .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name()).build(),
            new RealmAwareZkClient.RealmAwareZkClientConfig());
    _zkClient.setZkSerializer(new ZNRecordSerializer());
  } catch (Exception ex) {
    // Every test needs _zkClient; fail the setup here instead of letting the tests hit a
    // NullPointerException later with the real cause hidden.
    LOG.error("Failed to create FederatedZkClient with routing endpoint {}", routingEndpoint,
        ex);
    throw ex;
  }
  System.out.println("end start");
}
/**
 * Tears down everything the tests created: mock controllers and participants, the clusters,
 * the ZooKeeper clients and servers, the mock MSDS and the shared admin/client connections.
 * The System properties saved in beforeClass() are restored even when the teardown fails, and
 * a thread leakage check is run at the very end.
 *
 * @throws Exception if any teardown step fails
 */
@AfterClass
public void afterClass() throws Exception {
  String testClassName = getClass().getSimpleName();
  try {
    // Kill all mock controllers and participants
    MOCK_CONTROLLERS.values().forEach(ClusterControllerManager::syncStop);
    MOCK_PARTICIPANTS.forEach(mockParticipantManager -> {
      mockParticipantManager.syncStop();
      StateMachineEngine stateMachine = mockParticipantManager.getStateMachineEngine();
      if (stateMachine != null) {
        StateModelFactory stateModelFactory = stateMachine.getStateModelFactory("Task");
        // instanceof is already false for null, so no separate null check is needed
        if (stateModelFactory instanceof TaskStateModelFactory) {
          ((TaskStateModelFactory) stateModelFactory).shutdown();
        }
      }
    });

    // Tear down all clusters
    CLUSTER_LIST.forEach(cluster -> TestHelper.dropCluster(cluster, _zkClient));

    // Verify that all clusters are gone in each zookeeper
    Assert.assertTrue(TestHelper.verify(() -> {
      for (Map.Entry<String, HelixZkClient> zkClientEntry : ZK_CLIENT_MAP.entrySet()) {
        List<String> children = zkClientEntry.getValue().getChildren("/");
        if (children.stream().anyMatch(CLUSTER_LIST::contains)) {
          return false;
        }
      }
      return true;
    }, TestHelper.WAIT_DURATION));

    // Tear down zookeepers
    ZK_CLIENT_MAP.forEach((zkAddress, zkClient) -> zkClient.close());
    ZK_SERVER_MAP.forEach((zkAddress, zkServer) -> zkServer.shutdown());

    // Stop MockMSDS
    if (_msds != null) {
      _msds.stopServer();
    }

    // Close ZK client connections.
    // _zkHelixAdmin is only assigned in testCreateParticipants(); when that test was skipped
    // or failed early it is still null, and an NPE here would skip closing _zkClient below.
    if (_zkHelixAdmin != null) {
      _zkHelixAdmin.close();
    }
    if (_zkClient != null && !_zkClient.isClosed()) {
      _zkClient.close();
    }
  } finally {
    // Restore System property configs
    if (_configStore.containsKey(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
      System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED,
          _configStore.get(SystemPropertyKeys.MULTI_ZK_ENABLED));
    } else {
      System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
    }
    if (_configStore.containsKey(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY)) {
      System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
          _configStore.get(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY));
    } else {
      System.clearProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY);
    }
  }

  boolean status = false;
  try {
    status = ThreadLeakageChecker.afterClassCheck(testClassName);
  } catch (Exception e) {
    LOG.error("ThreadLeakageChecker exception:", e);
  }
  // todo: We should fail test here once we achieved 0 leakage and remove the following System print
  if (!status) {
    System.out.println(
        "---------- Test Class " + testClassName + " thread leakage detected! ---------------");
  }
}
/**
 * Test cluster creation according to the pre-set routing mapping.
 * Helix Java API tested is ClusterSetup in this method.
 * Two ClusterSetup instances are exercised: one wrapping the shared realm-aware ZkClient and
 * one built from a RealmAwareZkConnectionConfig. Each creates the clusters, which are then
 * verified and deleted; the clusters are created once more at the end for the later tests.
 */
@Test
public void testCreateClusters() {
  // Create two ClusterSetups using two different construction paths.
  // No single ZK address is involved: routing comes from the realm-aware client/config.
  ClusterSetup clusterSetupZkAddr = new ClusterSetup(_zkClient);
  ClusterSetup clusterSetupBuilder = null;
  try {
    clusterSetupBuilder = new ClusterSetup.Builder().setRealmAwareZkConnectionConfig(
        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
            .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT)
            .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name()).build())
        .build();

    createClusters(clusterSetupZkAddr);
    verifyClusterCreation(clusterSetupZkAddr);

    createClusters(clusterSetupBuilder);
    verifyClusterCreation(clusterSetupBuilder);

    // Create clusters again to continue with testing
    createClusters(clusterSetupBuilder);
  } finally {
    // Close both setups even when an assertion above fails, so they are not leaked
    clusterSetupZkAddr.close();
    if (clusterSetupBuilder != null) {
      clusterSetupBuilder.close();
    }
  }
}
/**
 * Adds every cluster in CLUSTER_LIST through the given ClusterSetup, passing {@code false}
 * as the second argument of addCluster.
 *
 * @param clusterSetup the ClusterSetup used to add the clusters
 */
private void createClusters(ClusterSetup clusterSetup) {
  CLUSTER_LIST.forEach(name -> clusterSetup.addCluster(name, false));
}
/**
 * Checks that each cluster exists on the ZooKeeper it is routed to, as seen by both the
 * single-realm client for that ZooKeeper and the federated client, then deletes the cluster.
 *
 * @param clusterSetup the ClusterSetup used to delete the clusters after verification
 */
private void verifyClusterCreation(ClusterSetup clusterSetup) {
  for (Map.Entry<String, Collection<String>> routing : _rawRoutingData.entrySet()) {
    // The routing value already starts with "/", so it doubles as the cluster's ZK path
    String clusterPath = routing.getValue().iterator().next();

    // Single-realm view: the client connected to the ZK this cluster is routed to
    HelixZkClient realmClient = ZK_CLIENT_MAP.get(routing.getKey());
    Assert.assertTrue(realmClient.exists(clusterPath));

    // Realm-aware (federated) view
    Assert.assertTrue(_zkClient.exists(clusterPath));

    // Strip the leading "/" to get the plain cluster name, then remove the cluster
    String clusterName = clusterPath.substring(1);
    clusterSetup.deleteCluster(clusterName);
  }
}
/**
 * Test Helix Participant creation and addition.
 * Helix Java APIs tested in this method are:
 * ZkHelixAdmin and ZKHelixManager (mock participant/controller)
 * After the add/verify/drop round trip, one mock controller and {@code numParticipants} mock
 * participants are started per cluster and kept running for the later tests.
 *
 * @throws Exception if a controller/participant fails to start or verification fails
 */
@Test(dependsOnMethods = "testCreateClusters")
public void testCreateParticipants() throws Exception {
  // Both admins are built from a realm-aware connection config; no single ZK address is given
  RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfig =
      new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
          .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT)
          .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name()).build();
  HelixAdmin helixAdminBuilder =
      new ZKHelixAdmin.Builder().setRealmAwareZkConnectionConfig(zkConnectionConfig).build();
  try {
    // Long-lived admin reused by later tests; closed in afterClass()
    _zkHelixAdmin =
        new ZKHelixAdmin.Builder().setRealmAwareZkConnectionConfig(zkConnectionConfig).build();

    String participantNamePrefix = "Node_";
    int numParticipants = 5;
    createParticipantsAndVerify(helixAdminBuilder, numParticipants, participantNamePrefix);

    // Create mock controller and participants for next tests
    for (String cluster : CLUSTER_LIST) {
      // Start a controller, connecting through a config sharded on this cluster's path
      RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfigCls =
          new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
              .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT)
              .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name())
              .setZkRealmShardingKey("/" + cluster).build();
      HelixManagerProperty.Builder propertyBuilder = new HelixManagerProperty.Builder();
      HelixManagerProperty helixManagerProperty =
          propertyBuilder.setRealmAWareZkConnectionConfig(zkConnectionConfigCls).build();
      ClusterControllerManager mockController =
          new ClusterControllerManager(cluster, helixManagerProperty);
      mockController.syncStart();
      MOCK_CONTROLLERS.put(cluster, mockController);

      for (int i = 0; i < numParticipants; i++) {
        InstanceConfig instanceConfig = new InstanceConfig(participantNamePrefix + i);
        helixAdminBuilder.addInstance(cluster, instanceConfig);
        MockParticipantManager mockNode =
            new MockParticipantManager(cluster, participantNamePrefix + i,
                helixManagerProperty, 10, null);

        // Register task state model for task framework testing in later methods
        Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
        taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
        // Register a Task state model factory.
        StateMachineEngine stateMachine = mockNode.getStateMachineEngine();
        stateMachine.registerStateModelFactory("Task",
            new TaskStateModelFactory(mockNode, taskFactoryReg));
        mockNode.syncStart();
        MOCK_PARTICIPANTS.add(mockNode);
      }

      // Check that mockNodes are up
      Assert.assertTrue(TestHelper.verify(
          () -> helixAdminBuilder.getInstancesInCluster(cluster).size() == numParticipants,
          TestHelper.WAIT_DURATION));
    }
  } finally {
    // Close the short-lived admin even when a start-up step or assertion above fails
    helixAdminBuilder.close();
  }
}
/**
 * Adds {@code numParticipants} instances named {@code participantNamePrefix + index} to every
 * cluster in CLUSTER_LIST, checks that each cluster's "/INSTANCES" children match those names
 * through both the single-realm and the federated ZkClient, and finally drops the instances.
 *
 * @param admin the HelixAdmin used to add and drop the instances
 * @param numParticipants number of instances to create per cluster
 * @param participantNamePrefix prefix of each instance name; the index is appended to it
 */
private void createParticipantsAndVerify(HelixAdmin admin, int numParticipants,
    String participantNamePrefix) {
  // Register the participants in every cluster, remembering the names that were used
  Set<String> expectedNames = new HashSet<>();
  for (String cluster : CLUSTER_LIST) {
    for (int idx = 0; idx < numParticipants; idx++) {
      String name = participantNamePrefix + idx;
      expectedNames.add(name);
      admin.addInstance(cluster, new InstanceConfig(name));
    }
  }

  // Check each cluster on the ZooKeeper it is routed to, then clean it up again
  for (Map.Entry<String, Collection<String>> routing : _rawRoutingData.entrySet()) {
    // The routing value already starts with "/"
    String clusterPath = routing.getValue().iterator().next();
    String instancesPath = clusterPath + "/INSTANCES";

    // Single-realm view
    Set<String> seenByRealmClient =
        new HashSet<>(ZK_CLIENT_MAP.get(routing.getKey()).getChildren(instancesPath));
    Assert.assertEquals(seenByRealmClient, expectedNames);

    // Realm-aware (federated) view
    Set<String> seenByFederatedClient = new HashSet<>(_zkClient.getChildren(instancesPath));
    Assert.assertEquals(seenByFederatedClient, expectedNames);

    // Drop the participants again; the cluster name is the path without its leading "/"
    String clusterName = clusterPath.substring(1);
    for (String participant : expectedNames) {
      admin.dropInstance(clusterName, new InstanceConfig(participant));
    }
  }
}
/**
 * Test creation of HelixManager and makes sure it connects correctly.
 * A manager built from a connection config without a ZK realm sharding key is expected to be
 * rejected with a HelixException; managers built from a valid config connect as participant
 * and as administrator, and the instance config and live instance are read back.
 *
 * @throws Exception if a manager fails to connect or disconnect
 */
@Test(dependsOnMethods = "testCreateParticipants")
public void testZKHelixManager() throws Exception {
  String clusterName = "CLUSTER_1";
  String participantName = "HelixManager";
  InstanceConfig instanceConfig = new InstanceConfig(participantName);
  _zkHelixAdmin.addInstance(clusterName, instanceConfig);

  RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
      new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
  // Try with a connection config without ZK realm sharding key set (should fail)
  RealmAwareZkClient.RealmAwareZkConnectionConfig invalidZkConnectionConfig =
      connectionConfigBuilder.build();
  RealmAwareZkClient.RealmAwareZkConnectionConfig validZkConnectionConfig =
      connectionConfigBuilder
          .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT)
          .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name())
          .setZkRealmShardingKey("/" + clusterName).build();
  HelixManagerProperty.Builder propertyBuilder = new HelixManagerProperty.Builder();
  try {
    HelixManagerFactory
        .getZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null,
            propertyBuilder.setRealmAWareZkConnectionConfig(invalidZkConnectionConfig).build());
    Assert.fail("Should see a HelixException here because the connection config doesn't have the "
        + "sharding key set!");
  } catch (HelixException e) {
    // Expected
  }

  // Connect as a participant
  HelixManager managerParticipant = HelixManagerFactory
      .getZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null,
          propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build());
  managerParticipant.connect();

  HelixManager managerAdministrator = null;
  try {
    // Connect as an administrator
    managerAdministrator = HelixManagerFactory
        .getZKHelixManager(clusterName, participantName, InstanceType.ADMINISTRATOR, null,
            propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build());
    managerAdministrator.connect();

    // Perform assert checks to make sure the manager can read and register itself as a
    // participant
    InstanceConfig instanceConfigRead = managerAdministrator.getClusterManagmentTool()
        .getInstanceConfig(clusterName, participantName);
    Assert.assertNotNull(instanceConfigRead);
    // Check the config read back from ZK, not the local object the test created itself
    Assert.assertEquals(instanceConfigRead.getInstanceName(), participantName);
    Assert.assertNotNull(managerAdministrator.getHelixDataAccessor().getProperty(
        managerAdministrator.getHelixDataAccessor().keyBuilder().liveInstance(participantName)));
  } finally {
    // Clean up the connections even when an assertion above fails
    managerParticipant.disconnect();
    if (managerAdministrator != null) {
      managerAdministrator.disconnect();
    }
  }
  _zkHelixAdmin.dropInstance(clusterName, instanceConfig);
}
/**
 * Test how a ZKHelixManager treats a HelixCloudProperty passed in through
 * HelixManagerProperty when it connects.
 * A participant is connected with a cloud property built from a dummy CloudConfig. After the
 * connect, the test asserts that cloud-enabled is false and cloud id/provider are null, while
 * the customized properties, info sources and info processor name keep the values set here.
 *
 * @throws Exception if the manager fails to connect or disconnect
 */
@Test(dependsOnMethods = "testZKHelixManager")
public void testZKHelixManagerCloudConfig() throws Exception {
  String clusterName = "CLUSTER_1";
  String participantName = "HelixManager";
  InstanceConfig instanceConfig = new InstanceConfig(participantName);
  _zkHelixAdmin.addInstance(clusterName, instanceConfig);

  RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
      new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
  RealmAwareZkClient.RealmAwareZkConnectionConfig validZkConnectionConfig =
      connectionConfigBuilder
          .setRoutingDataSourceEndpoint(_msdsEndpoint + "," + ZK_PREFIX + ZK_START_PORT)
          .setRoutingDataSourceType(RoutingDataReaderType.HTTP_ZK_FALLBACK.name())
          .setZkRealmShardingKey("/" + clusterName).build();
  HelixManagerProperty.Builder propertyBuilder = new HelixManagerProperty.Builder();

  // Create a dummy cloud config and pass it to the manager. It should be overwritten by
  // a default config because there is no CloudConfig ZNode in ZK.
  CloudConfig.Builder cloudConfigBuilder = new CloudConfig.Builder();
  cloudConfigBuilder.setCloudEnabled(true);
  // Set to Customized so CloudInfoSources and CloudInfoProcessorName will be read from cloud
  // config instead of properties
  cloudConfigBuilder.setCloudProvider(CloudProvider.CUSTOMIZED);
  cloudConfigBuilder.setCloudID("TestID");
  List<String> infoURL = new ArrayList<>();
  infoURL.add("TestURL");
  cloudConfigBuilder.setCloudInfoSources(infoURL);
  cloudConfigBuilder.setCloudInfoProcessorName("TestProcessor");
  CloudConfig cloudConfig = cloudConfigBuilder.build();

  HelixCloudProperty oldCloudProperty = new HelixCloudProperty(cloudConfig);
  HelixManagerProperty helixManagerProperty =
      propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig)
          .setHelixCloudProperty(oldCloudProperty).build();
  // Cloud property populated with fields defined in cloud config
  oldCloudProperty.populateFieldsWithCloudConfig(cloudConfig);
  // Add some property fields to cloud property that are not in cloud config
  Properties properties = new Properties();
  oldCloudProperty.setCustomizedCloudProperties(properties);

  // Local subclass that exposes the manager's HelixManagerProperty for inspection
  class TestZKHelixManager extends ZKHelixManager {
    public TestZKHelixManager(String clusterName, String participantName,
        InstanceType instanceType, String zkAddress, HelixManagerStateListener stateListener,
        HelixManagerProperty helixManagerProperty) {
      super(clusterName, participantName, instanceType, zkAddress, stateListener,
          helixManagerProperty);
    }

    public HelixManagerProperty getHelixManagerProperty() {
      return _helixManagerProperty;
    }
  }

  // Connect as a participant
  TestZKHelixManager managerParticipant =
      new TestZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null, null,
          helixManagerProperty);
  managerParticipant.connect();
  try {
    HelixCloudProperty newCloudProperty =
        managerParticipant.getHelixManagerProperty().getHelixCloudProperty();

    // Test reading from zk cloud config overwrite property fields included in cloud config
    Assert.assertFalse(newCloudProperty.getCloudEnabled());
    Assert.assertNull(newCloudProperty.getCloudId());
    Assert.assertNull(newCloudProperty.getCloudProvider());

    // Test non-cloud config fields are not overwritten after reading cloud config from zk
    Assert.assertEquals(newCloudProperty.getCustomizedCloudProperties(), properties);
    Assert.assertEquals(newCloudProperty.getCloudInfoSources(), infoURL);
    Assert.assertEquals(newCloudProperty.getCloudInfoProcessorName(), "TestProcessor");
  } finally {
    // Clean up the connection even when an assertion above fails
    managerParticipant.disconnect();
  }
  _zkHelixAdmin.dropInstance(clusterName, instanceConfig);
}
}