blob: d5506793ab6c3ee281daa7d45760bf1eb763e5f7 [file] [log] [blame]
package org.apache.helix.messaging.handling;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.mock.participant.DummyProcess;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
public static final String TEST_FACTORY = "TestFactory";
public static final String ONLINE_OFFLINE = "OnlineOffline";
public static final String OFFLINE_TO_SLAVE = "OFFLINE.SLAVE";
public static final String SLAVE_TO_MASTER = "SLAVE.MASTER";
@Test
public void TestThreadPoolSizeConfig() {
String resourceName = "NextDB";
int numPartition = 64;
int numReplica = 3;
int threadPoolSize = 12;
setResourceThreadPoolSize(resourceName, threadPoolSize);
_gSetupTool.addResourceToCluster(CLUSTER_NAME, resourceName, numPartition, STATE_MODEL);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, numReplica);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
long taskcount = 0;
for (int i = 0; i < NODE_NR; i++) {
DefaultMessagingService svc =
(DefaultMessagingService) (_participants[i].getMessagingService());
HelixTaskExecutor helixExecutor = svc.getExecutor();
ThreadPoolExecutor executor =
(ThreadPoolExecutor) (helixExecutor._executorMap.get(MessageType.STATE_TRANSITION + "."
+ resourceName));
Assert.assertNotNull(executor);
Assert.assertEquals(threadPoolSize, executor.getMaximumPoolSize());
taskcount += executor.getCompletedTaskCount();
Assert.assertTrue(executor.getCompletedTaskCount() > 0);
}
// (numPartition * numReplica) O->S, numPartition S->M
// Plus possible racing condition: when preference list is [n1, n2, n3],
// but n2 or n3 becomes Slave before n1 and captured by controller, i.e. [n1:O, n2:S, n3:O],
// controller will set n2 to Master first and then change it back to n1
Assert.assertTrue(taskcount >= numPartition * (numReplica + 1));
}
@Test (dependsOnMethods = "TestThreadPoolSizeConfig")
public void TestCustomizedResourceThreadPool() {
int customizedPoolSize = 7;
int configuredPoolSize = 9;
for (MockParticipantManager participant : _participants) {
participant.getStateMachineEngine().registerStateModelFactory(ONLINE_OFFLINE,
new TestOnlineOfflineStateModelFactory(customizedPoolSize, 0), TEST_FACTORY);
}
// add db with default thread pool
_gSetupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "1", 64,
STATE_MODEL);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "1", 3);
// add db with customized thread pool
IdealState idealState = new FullAutoModeISBuilder(WorkflowGenerator.DEFAULT_TGT_DB + "2")
.setStateModel(ONLINE_OFFLINE).setStateModelFactoryName(TEST_FACTORY).setNumPartitions(10)
.setNumReplica(1).build();
_gSetupTool.getClusterManagementTool()
.addResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "2", idealState);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "2", 1);
// add db with configured pool size
idealState = new FullAutoModeISBuilder(WorkflowGenerator.DEFAULT_TGT_DB + "3")
.setStateModel(ONLINE_OFFLINE).setStateModelFactoryName(TEST_FACTORY).setNumPartitions(10)
.setNumReplica(1).build();
_gSetupTool.getClusterManagementTool()
.addResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "3", idealState);
setResourceThreadPoolSize(WorkflowGenerator.DEFAULT_TGT_DB + "3", configuredPoolSize);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "3", 1);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
for (int i = 0; i < NODE_NR; i++) {
DefaultMessagingService svc =
(DefaultMessagingService) (_participants[i].getMessagingService());
HelixTaskExecutor helixExecutor = svc.getExecutor();
ThreadPoolExecutor executor = (ThreadPoolExecutor) (helixExecutor._executorMap
.get(MessageType.STATE_TRANSITION + "." + WorkflowGenerator.DEFAULT_TGT_DB + "1"));
Assert.assertNull(executor);
executor = (ThreadPoolExecutor) (helixExecutor._executorMap
.get(MessageType.STATE_TRANSITION + "." + WorkflowGenerator.DEFAULT_TGT_DB + "2"));
Assert.assertNotNull(executor);
Assert.assertEquals(customizedPoolSize, executor.getMaximumPoolSize());
executor = (ThreadPoolExecutor) (helixExecutor._executorMap
.get(MessageType.STATE_TRANSITION + "." + WorkflowGenerator.DEFAULT_TGT_DB + "3"));
Assert.assertNotNull(executor);
Assert.assertEquals(configuredPoolSize, executor.getMaximumPoolSize());
}
}
@Test (dependsOnMethods = "TestCustomizedResourceThreadPool")
public void TestPerStateTransitionTypeThreadPool() throws InterruptedException {
String MASTER_SLAVE = "MasterSlave";
int customizedPoolSize = 22;
for (MockParticipantManager participant : _participants) {
participant.getStateMachineEngine().registerStateModelFactory(MASTER_SLAVE,
new TestMasterSlaveStateModelFactory(customizedPoolSize), TEST_FACTORY);
}
// add db with customized thread pool
IdealState idealState = new FullAutoModeISBuilder(WorkflowGenerator.DEFAULT_TGT_DB + "4")
.setStateModel(MASTER_SLAVE).setStateModelFactoryName(TEST_FACTORY).setNumPartitions(10)
.setNumReplica(1).build();
_gSetupTool.getClusterManagementTool()
.addResource(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "4", idealState);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB + "4", 1);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// Verify OFFLINE -> SLAVE and SLAVE -> MASTER have different threadpool size.
for (int i = 0; i < NODE_NR; i++) {
DefaultMessagingService svc =
(DefaultMessagingService) (_participants[i].getMessagingService());
HelixTaskExecutor helixExecutor = svc.getExecutor();
ThreadPoolExecutor executorOfflineToSlave = (ThreadPoolExecutor) (helixExecutor._executorMap
.get(MessageType.STATE_TRANSITION + "." + WorkflowGenerator.DEFAULT_TGT_DB + "4" + "."
+ OFFLINE_TO_SLAVE));
Assert.assertNotNull(executorOfflineToSlave);
Assert.assertEquals(customizedPoolSize, executorOfflineToSlave.getMaximumPoolSize());
ThreadPoolExecutor executorSlaveToMaster = (ThreadPoolExecutor) (helixExecutor._executorMap
.get(MessageType.STATE_TRANSITION + "." + WorkflowGenerator.DEFAULT_TGT_DB + "4" + "."
+ SLAVE_TO_MASTER));
Assert.assertNotNull(executorSlaveToMaster);
Assert.assertEquals(customizedPoolSize + 5, executorSlaveToMaster.getMaximumPoolSize());
}
}
@Test (dependsOnMethods = "TestPerStateTransitionTypeThreadPool")
public void testBatchMessageThreadPoolSize() throws InterruptedException {
int customizedPoolSize = 5;
_participants[0].getStateMachineEngine().registerStateModelFactory("OnlineOffline",
new TestOnlineOfflineStateModelFactory(customizedPoolSize, 2000), "TestFactory");
for (int i = 1; i < _participants.length; i++) {
_participants[i].syncStop();
}
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// Add 10 dbs with batch message enabled. Each db has 10 partitions.
// So it will have 10 batch messages and each batch message has 10 sub messages.
int numberOfDbs = 10;
for (int i = 0; i < numberOfDbs; i++) {
String dbName = "TestDBABatch" + i;
IdealState idealState = new FullAutoModeISBuilder(dbName).setStateModel("OnlineOffline")
.setStateModelFactoryName("TestFactory").setNumPartitions(10).setNumReplica(1).build();
idealState.setBatchMessageMode(true);
_gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, idealState);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);
}
Assert.assertTrue(_clusterVerifier.verifyByPolling());
DefaultMessagingService svc =
(DefaultMessagingService) (_participants[0].getMessagingService());
HelixTaskExecutor helixExecutor = svc.getExecutor();
ThreadPoolExecutor executor = (ThreadPoolExecutor) (helixExecutor._batchMessageExecutorService);
Assert.assertNotNull(executor);
Assert.assertTrue(executor.getPoolSize() >= numberOfDbs);
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
.setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(verifier.verifyByPolling());
}
private void setResourceThreadPoolSize(String resourceName, int threadPoolSize) {
HelixManager manager = _participants[0];
ConfigAccessor accessor = manager.getConfigAccessor();
HelixConfigScope scope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
.forCluster(manager.getClusterName()).forResource(resourceName).build();
accessor.set(scope, HelixTaskExecutor.MAX_THREADS, "" + threadPoolSize);
}
public static class TestOnlineOfflineStateModelFactory
extends DummyProcess.DummyOnlineOfflineStateModelFactory {
int _threadPoolSize;
ExecutorService _threadPoolExecutor;
public TestOnlineOfflineStateModelFactory(int threadPoolSize, int delay) {
super(0);
if (threadPoolSize > 0) {
_threadPoolExecutor = Executors.newFixedThreadPool(threadPoolSize);
}
}
@Override public ExecutorService getExecutorService(String resourceName) {
return _threadPoolExecutor;
}
}
public static class TestMasterSlaveStateModelFactory
extends DummyProcess.DummyMasterSlaveStateModelFactory {
int _startThreadPoolSize;
Map<String, ExecutorService> _threadPoolExecutorMap;
public TestMasterSlaveStateModelFactory(int startThreadPoolSize) {
super(0);
_startThreadPoolSize = startThreadPoolSize;
_threadPoolExecutorMap = new HashMap<String, ExecutorService>();
if (_startThreadPoolSize > 0) {
_threadPoolExecutorMap
.put(OFFLINE_TO_SLAVE, Executors.newFixedThreadPool(_startThreadPoolSize));
_threadPoolExecutorMap
.put(SLAVE_TO_MASTER, Executors.newFixedThreadPool(_startThreadPoolSize + 5));
}
}
@Override
public ExecutorService getExecutorService(String resourceName, String fromState,
String toState) {
return _threadPoolExecutorMap.get(fromState + "." + toState);
}
}
}