blob: 67638a69d3f5e7a5af710f2b23be4e1f44901876 [file] [log] [blame]
package org.apache.helix.integration;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.serialize.BytesPushThroughSerializer;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
* Test controller, spectator and participant roles when compression is enabled.
* Compression can be enabled for a specific resource by setting enableCompression=true in the
* idealstate of the resource. Generally this is used when the number of partitions is large
*/
public class TestEnableCompression extends ZkTestBase {
private static final int ENABLE_COMPRESSION_WAIT = 20 * 60 * 1000;
private static final int ENABLE_COMPRESSION_POLL_INTERVAL = 2000;
@Test(timeOut = 10 * 10 * 1000L)
public void testEnableCompressionResource() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
MockParticipantManager[] participants = new MockParticipantManager[5];
// ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
int numNodes = 10;
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
"localhost", // participant name prefix
"TestDB", // resource name prefix
0, // no resources, will be added later
0, // partitions per resource
numNodes, // number of nodes
0, // replicas
"OnlineOffline", false); // dont rebalance
List<String> instancesInCluster =
_gSetupTool.getClusterManagementTool().getInstancesInCluster(clusterName);
String resourceName = "TestResource";
Set<String> expectedResources = new HashSet<>();
expectedResources.add(resourceName);
CustomModeISBuilder customModeISBuilder = new CustomModeISBuilder(resourceName);
int numPartitions = 10000;
int numReplica = 3;
customModeISBuilder.setNumPartitions(numPartitions);
customModeISBuilder.setNumReplica(numReplica);
customModeISBuilder.setStateModel("OnlineOffline");
for (int p = 0; p < numPartitions; p++) {
String partitionName = resourceName + "_" + p;
customModeISBuilder.add(partitionName);
for (int r = 0; r < numReplica; r++) {
String instanceName = instancesInCluster.get((p % numNodes + r) % numNodes);
customModeISBuilder.assignInstanceAndState(partitionName, instanceName, "ONLINE");
}
}
IdealState idealstate = customModeISBuilder.build();
idealstate.getRecord().setBooleanField("enableCompression", true);
_gSetupTool.getClusterManagementTool().addResource(clusterName, resourceName, idealstate);
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(new BytesPushThroughSerializer())
.setOperationRetryTimeout((long) (60 * 1000)).setConnectInitTimeout(60 * 1000);
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
controller.syncStart();
Set<String> expectedLiveInstances = new HashSet<>();
// start participants
for (int i = 0; i < 5; i++) {
String instanceName = "localhost_" + (12918 + i);
participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participants[i].syncStart();
expectedLiveInstances.add(instanceName);
}
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(clusterName).setZkClient(_gZkClient)
.setExpectLiveInstances(expectedLiveInstances).setResources(expectedResources)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
boolean reuslt = verifier.verifyByPolling(ENABLE_COMPRESSION_WAIT, ENABLE_COMPRESSION_POLL_INTERVAL);
Assert.assertTrue((reuslt));
List<String> compressedPaths = new ArrayList<>();
findCompressedZNodes(zkClient, "/" + clusterName, compressedPaths);
System.out.println("compressed paths:" + compressedPaths);
// ONLY IDEALSTATE and EXTERNAL VIEW must be compressed
Assert.assertEquals(compressedPaths.size(), 2);
String idealstatePath = PropertyPathBuilder.idealState(clusterName, resourceName);
String externalViewPath = PropertyPathBuilder.externalView(clusterName, resourceName);
Assert.assertTrue(compressedPaths.contains(idealstatePath));
Assert.assertTrue(compressedPaths.contains(externalViewPath));
// clean up
controller.syncStop();
for (int i = 0; i < 5; i++) {
participants[i].syncStop();
}
deleteCluster(clusterName);
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
private void findCompressedZNodes(HelixZkClient zkClient, String path,
List<String> compressedPaths) {
List<String> children = zkClient.getChildren(path);
if (children != null && children.size() > 0) {
for (String child : children) {
String childPath = (path.equals("/") ? "" : path) + "/" + child;
findCompressedZNodes(zkClient, childPath, compressedPaths);
}
} else {
byte[] data = zkClient.readData(path);
if (GZipCompressionUtil.isCompressed(data)) {
compressedPaths.add(path);
}
}
}
}