| package org.apache.helix.rest.server; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.logging.Level; |
| import javax.ws.rs.client.Entity; |
| import javax.ws.rs.client.WebTarget; |
| import javax.ws.rs.core.Application; |
| import javax.ws.rs.core.Response; |
| |
| import com.google.common.base.Joiner; |
| import org.I0Itec.zkclient.ZkServer; |
| import org.apache.helix.AccessOption; |
| import org.apache.helix.BaseDataAccessor; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.PropertyPathBuilder; |
| import org.apache.helix.PropertyType; |
| import org.apache.helix.TestHelper; |
| import org.apache.helix.ZNRecord; |
| import org.apache.helix.integration.manager.ClusterControllerManager; |
| import org.apache.helix.integration.manager.MockParticipantManager; |
| import org.apache.helix.integration.task.MockTask; |
| import org.apache.helix.integration.task.TaskTestUtil; |
| import org.apache.helix.manager.zk.ZNRecordSerializer; |
| import org.apache.helix.manager.zk.ZkBaseDataAccessor; |
| import org.apache.helix.manager.zk.client.DedicatedZkClientFactory; |
| import org.apache.helix.manager.zk.client.HelixZkClient; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.participant.StateMachineEngine; |
| import org.apache.helix.rest.common.ContextPropertyKeys; |
| import org.apache.helix.rest.common.HelixRestNamespace; |
| import org.apache.helix.rest.server.auditlog.AuditLog; |
| import org.apache.helix.rest.server.auditlog.AuditLogger; |
| import org.apache.helix.rest.server.filters.AuditLogFilter; |
| import org.apache.helix.rest.server.resources.AbstractResource; |
| import org.apache.helix.store.HelixPropertyStore; |
| import org.apache.helix.store.zk.ZkHelixPropertyStore; |
| import org.apache.helix.task.JobConfig; |
| import org.apache.helix.task.JobContext; |
| import org.apache.helix.task.Task; |
| import org.apache.helix.task.TaskCallbackContext; |
| import org.apache.helix.task.TaskConstants; |
| import org.apache.helix.task.TaskDriver; |
| import org.apache.helix.task.TaskFactory; |
| import org.apache.helix.task.TaskPartitionState; |
| import org.apache.helix.task.TaskState; |
| import org.apache.helix.task.TaskStateModelFactory; |
| import org.apache.helix.task.TaskUtil; |
| import org.apache.helix.task.Workflow; |
| import org.apache.helix.task.WorkflowContext; |
| import org.apache.helix.tools.ClusterSetup; |
| import org.apache.helix.util.ZKClientPool; |
| import org.codehaus.jackson.map.ObjectMapper; |
| import org.glassfish.jersey.client.ClientConfig; |
| import org.glassfish.jersey.server.ResourceConfig; |
| import org.glassfish.jersey.test.DeploymentContext; |
| import org.glassfish.jersey.test.JerseyTestNg; |
| import org.glassfish.jersey.test.spi.TestContainer; |
| import org.glassfish.jersey.test.spi.TestContainerException; |
| import org.glassfish.jersey.test.spi.TestContainerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterSuite; |
| import org.testng.annotations.BeforeSuite; |
| |
| public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { |
| protected static final String ZK_ADDR = "localhost:2123"; |
| protected static final String WORKFLOW_PREFIX = "Workflow_"; |
| protected static final String JOB_PREFIX = "Job_"; |
| protected static int NUM_PARTITIONS = 10; |
| protected static int NUM_REPLICA = 2; |
| protected static int MIN_ACTIVE_REPLICA = 3; |
| protected static ZkServer _zkServer; |
| protected static HelixZkClient _gZkClient; |
| protected static ClusterSetup _gSetupTool; |
| protected static ConfigAccessor _configAccessor; |
| protected static BaseDataAccessor<ZNRecord> _baseAccessor; |
| protected static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); |
| protected static boolean _init = false; |
| |
| // For testing namespaced access |
| protected static ZkServer _zkServerTestNS; |
| protected static final String _zkAddrTestNS = "localhost:2124"; |
| protected static final String TEST_NAMESPACE = "test-namespace"; |
| protected static HelixZkClient _gZkClientTestNS; |
| protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS; |
| protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster"; |
| protected static final String TASK_TEST_CLUSTER = "TaskTestCluster"; |
| protected static final List<String> STOPPABLE_INSTANCES = |
| Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5"); |
| |
| protected static Set<String> _clusters; |
| protected static String _superCluster = "superCluster"; |
| protected static Map<String, Set<String>> _instancesMap = new HashMap<>(); |
| protected static Map<String, Set<String>> _liveInstancesMap = new HashMap<>(); |
| protected static Map<String, Set<String>> _resourcesMap = new HashMap<>(); |
| protected static Map<String, Map<String, Workflow>> _workflowMap = new HashMap<>(); |
| protected static List<ClusterControllerManager> _clusterControllerManagers = new ArrayList<>(); |
| protected static List<MockParticipantManager> _mockParticipantManagers = new ArrayList<>(); |
| protected static MockAuditLogger _auditLogger = new MockAuditLogger(); |
| protected static HelixRestServer _helixRestServer; |
| |
| protected static class MockAuditLogger implements AuditLogger { |
| List<AuditLog> _auditLogList = new ArrayList<>(); |
| |
| @Override |
| public void write(AuditLog auditLog) { |
| _auditLogList.add(auditLog); |
| } |
| |
| public void clearupLogs() { |
| _auditLogList.clear(); |
| } |
| |
| public List<AuditLog> getAuditLogs() { |
| return _auditLogList; |
| } |
| } |
| |
| @Override |
| protected Application configure() { |
| // start zk |
| try { |
| if (_zkServer == null) { |
| _zkServer = TestHelper.startZkServer(ZK_ADDR); |
| Assert.assertTrue(_zkServer != null); |
| ZKClientPool.reset(); |
| } |
| |
| if (_zkServerTestNS == null) { |
| _zkServerTestNS = TestHelper.startZkServer(_zkAddrTestNS); |
| Assert.assertTrue(_zkServerTestNS != null); |
| ZKClientPool.reset(); |
| } |
| } catch (Exception e) { |
| Assert.assertTrue(false, String.format("Failed to start ZK server: %s", e.toString())); |
| } |
| |
| // Configure server context |
| ResourceConfig resourceConfig = new ResourceConfig(); |
| resourceConfig.packages(AbstractResource.class.getPackage().getName()); |
| ServerContext serverContext = new ServerContext(ZK_ADDR); |
| resourceConfig.property(ContextPropertyKeys.SERVER_CONTEXT.name(), serverContext); |
| resourceConfig.register(new AuditLogFilter(Arrays.<AuditLogger>asList(new MockAuditLogger()))); |
| |
| return resourceConfig; |
| } |
| |
| @Override |
| protected TestContainerFactory getTestContainerFactory() throws TestContainerException { |
| return new TestContainerFactory() { |
| @Override |
| public TestContainer create(final URI baseUri, DeploymentContext deploymentContext) { |
| return new TestContainer() { |
| |
| @Override |
| public ClientConfig getClientConfig() { |
| return null; |
| } |
| |
| @Override |
| public URI getBaseUri() { |
| return baseUri; |
| } |
| |
| @Override |
| public void start() { |
| if (_helixRestServer == null) { |
| // Create namespace manifest map |
| List<HelixRestNamespace> namespaces = new ArrayList<>(); |
| // Add test namespace |
| namespaces.add(new HelixRestNamespace(TEST_NAMESPACE, |
| HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false)); |
| // Add default namesapce |
| namespaces.add(new HelixRestNamespace(ZK_ADDR)); |
| try { |
| _helixRestServer = |
| new HelixRestServer(namespaces, baseUri.getPort(), baseUri.getPath(), |
| Arrays.<AuditLogger>asList(_auditLogger)); |
| _helixRestServer.start(); |
| } catch (Exception ex) { |
| throw new TestContainerException(ex); |
| } |
| } |
| } |
| |
| @Override |
| public void stop() { |
| } |
| }; |
| } |
| }; |
| } |
| |
| @BeforeSuite |
| public void beforeSuite() throws Exception { |
| if (!_init) { |
| // TODO: use logging.properties file to config java.util.logging.Logger levels |
| java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger(""); |
| topJavaLogger.setLevel(Level.WARNING); |
| |
| HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig(); |
| |
| clientConfig.setZkSerializer(new ZNRecordSerializer()); |
| _gZkClient = DedicatedZkClientFactory |
| .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig); |
| |
| clientConfig.setZkSerializer(new ZNRecordSerializer()); |
| _gZkClientTestNS = DedicatedZkClientFactory |
| .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS), clientConfig); |
| |
| _gSetupTool = new ClusterSetup(_gZkClient); |
| _configAccessor = new ConfigAccessor(_gZkClient); |
| _baseAccessor = new ZkBaseDataAccessor<>(_gZkClient); |
| _baseAccessorTestNS = new ZkBaseDataAccessor<>(_gZkClientTestNS); |
| |
| // wait for the web service to start |
| Thread.sleep(100); |
| |
| setup(); |
| _init = true; |
| } |
| } |
| |
| @AfterSuite |
| public void afterSuite() throws Exception { |
| // tear down orphan-ed threads |
| for (ClusterControllerManager cm : _clusterControllerManagers) { |
| if (cm != null && cm.isConnected()) { |
| cm.syncStop(); |
| } |
| } |
| |
| for (MockParticipantManager mm: _mockParticipantManagers) { |
| if (mm != null && mm.isConnected()) { |
| mm.syncStop(); |
| } |
| } |
| |
| ZKClientPool.reset(); |
| if (_gZkClient != null) { |
| _gZkClient.close(); |
| _gZkClient = null; |
| } |
| |
| if (_zkServer != null) { |
| TestHelper.stopZkServer(_zkServer); |
| _zkServer = null; |
| } |
| |
| if (_gZkClientTestNS != null) { |
| _gZkClientTestNS.close(); |
| _gZkClientTestNS = null; |
| } |
| if (_zkServerTestNS != null) { |
| TestHelper.stopZkServer(_zkServerTestNS); |
| _zkServerTestNS = null; |
| } |
| |
| if (_helixRestServer != null) { |
| _helixRestServer.shutdown(); |
| _helixRestServer = null; |
| } |
| } |
| |
| protected void setup() throws Exception { |
| _clusters = createClusters(3); |
| _gSetupTool.addCluster(_superCluster, true); |
| _gSetupTool.addCluster(TASK_TEST_CLUSTER, true); |
| _clusters.add(_superCluster); |
| _clusters.add(TASK_TEST_CLUSTER); |
| for (String cluster : _clusters) { |
| Set<String> instances = createInstances(cluster, 10); |
| Set<String> liveInstances = startInstances(cluster, instances, 6); |
| ClusterConfig clusterConfig = new ClusterConfig(cluster); |
| clusterConfig.setFaultZoneType("helixZoneId"); |
| _configAccessor.setClusterConfig(cluster, clusterConfig); |
| createResourceConfigs(cluster, 8); |
| _workflowMap.put(cluster, createWorkflows(cluster, 3)); |
| Set<String> resources = createResources(cluster, 8); |
| _instancesMap.put(cluster, instances); |
| _liveInstancesMap.put(cluster, liveInstances); |
| _resourcesMap.put(cluster, resources); |
| _clusterControllerManagers.add(startController(cluster)); |
| } |
| preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES); |
| } |
| |
| protected Set<String> createInstances(String cluster, int numInstances) throws Exception { |
| Set<String> instances = new HashSet<>(); |
| for (int i = 0; i < numInstances; i++) { |
| String instanceName = cluster + "localhost_" + (12918 + i); |
| _gSetupTool.addInstanceToCluster(cluster, instanceName); |
| instances.add(instanceName); |
| } |
| return instances; |
| } |
| |
| protected Set<String> createResources(String cluster, int numResources) { |
| Set<String> resources = new HashSet<>(); |
| for (int i = 0; i < numResources; i++) { |
| String resource = cluster + "_db_" + i; |
| _gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave"); |
| IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource); |
| idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA); |
| _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState); |
| _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA); |
| resources.add(resource); |
| } |
| return resources; |
| } |
| |
| protected Set<String> createResourceConfigs(String cluster, int numResources) { |
| Set<String> resources = new HashSet<>(); |
| for (int i = 0; i < numResources; i++) { |
| String resource = cluster + "_db_" + i; |
| org.apache.helix.model.ResourceConfig resourceConfig = |
| new org.apache.helix.model.ResourceConfig.Builder(resource).setNumReplica(NUM_REPLICA) |
| .build(); |
| _configAccessor.setResourceConfig(cluster, resource, resourceConfig); |
| resources.add(resource); |
| } |
| return resources; |
| } |
| |
| protected Set<String> startInstances(String cluster, Set<String> instances, |
| int numLiveinstances) { |
| Set<String> liveInstances = new HashSet<>(); |
| int i = 0; |
| for (String instance : instances) { |
| MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, cluster, instance); |
| Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); |
| taskFactoryReg.put(MockTask.TASK_COMMAND, new TaskFactory() { |
| @Override public Task createNewTask(TaskCallbackContext context) { |
| return new MockTask(context); |
| } |
| }); |
| StateMachineEngine stateMachineEngine = participant.getStateMachineEngine(); |
| stateMachineEngine.registerStateModelFactory("Task", |
| new TaskStateModelFactory(participant, taskFactoryReg)); |
| participant.syncStart(); |
| _mockParticipantManagers.add(participant); |
| liveInstances.add(instance); |
| if (++i > numLiveinstances) { |
| break; |
| } |
| } |
| return liveInstances; |
| } |
| |
| protected ClusterControllerManager startController(String cluster) { |
| String controllerName = "controller-" + cluster; |
| ClusterControllerManager controller = |
| new ClusterControllerManager(ZK_ADDR, cluster, controllerName); |
| controller.syncStart(); |
| |
| return controller; |
| } |
| |
| protected Set<String> createClusters(int numClusters) { |
| Set<String> clusters = new HashSet<>(); |
| for (int i = 0; i < numClusters; i++) { |
| String cluster = "TestCluster_" + i; |
| _gSetupTool.addCluster(cluster, true); |
| clusters.add(cluster); |
| } |
| |
| return clusters; |
| } |
| |
| protected Map<String, Workflow> createWorkflows(String cluster, int numWorkflows) { |
| Map<String, Workflow> workflows = new HashMap<>(); |
| HelixPropertyStore<ZNRecord> propertyStore = new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor, |
| PropertyPathBuilder.propertyStore(cluster), null); |
| |
| for (int i = 0; i < numWorkflows; i++) { |
| Workflow.Builder workflow = new Workflow.Builder(WORKFLOW_PREFIX + i); |
| int j = 0; |
| for (JobConfig.Builder job : createJobs(cluster, WORKFLOW_PREFIX + i, 3)) { |
| workflow.addJob(JOB_PREFIX + j++, job); |
| } |
| workflows.put(WORKFLOW_PREFIX + i, workflow.build()); |
| WorkflowContext workflowContext = TaskTestUtil |
| .buildWorkflowContext(WORKFLOW_PREFIX + i, TaskState.IN_PROGRESS, |
| System.currentTimeMillis(), TaskState.COMPLETED, TaskState.COMPLETED, |
| TaskState.IN_PROGRESS); |
| _baseAccessor.set(String.format("/%s/%s%s/%s/%s", cluster, PropertyType.PROPERTYSTORE.name(), |
| TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, TaskConstants.CONTEXT_NODE), |
| workflowContext.getRecord(), AccessOption.PERSISTENT); |
| _configAccessor.setResourceConfig(cluster, WORKFLOW_PREFIX + i, workflow.getWorkflowConfig()); |
| |
| // Add workflow user content |
| propertyStore.create(Joiner.on("/") |
| .join(TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, |
| TaskUtil.USER_CONTENT_NODE), new ZNRecord(TaskUtil.USER_CONTENT_NODE), |
| AccessOption.PERSISTENT); |
| } |
| return workflows; |
| } |
| |
| protected Set<JobConfig.Builder> createJobs(String cluster, String workflowName, int numJobs) { |
| HelixPropertyStore<ZNRecord> propertyStore = |
| new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor, |
| PropertyPathBuilder.propertyStore(cluster), null); |
| Set<JobConfig.Builder> jobCfgs = new HashSet<>(); |
| for (int i = 0; i < numJobs; i++) { |
| JobConfig.Builder job = |
| new JobConfig.Builder().setCommand("DummyCommand").setTargetResource("RESOURCE") |
| .setWorkflow(workflowName); |
| jobCfgs.add(job); |
| JobContext jobContext = TaskTestUtil |
| .buildJobContext(System.currentTimeMillis(), System.currentTimeMillis() + 1, |
| TaskPartitionState.COMPLETED); |
| _baseAccessor.set(String.format("/%s/%s%s/%s/%s", cluster, PropertyType.PROPERTYSTORE.name(), |
| TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i, |
| TaskConstants.CONTEXT_NODE), jobContext.getRecord(), AccessOption.PERSISTENT); |
| _configAccessor.setResourceConfig(cluster, workflowName + "_" + JOB_PREFIX + i, job.build()); |
| |
| // add job content stores |
| ZNRecord contentStore = new ZNRecord(TaskUtil.USER_CONTENT_NODE); |
| contentStore.setMapField(TaskUtil |
| .getNamespacedTaskName(TaskUtil.getNamespacedJobName(workflowName, JOB_PREFIX + i), "0"), |
| Collections.<String, String>emptyMap()); |
| propertyStore.create(Joiner.on("/") |
| .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i, |
| TaskUtil.USER_CONTENT_NODE), contentStore, AccessOption.PERSISTENT); |
| } |
| return jobCfgs; |
| } |
| |
| protected static ZNRecord toZNRecord(String data) throws IOException { |
| return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data); |
| } |
| |
| protected String get(String uri, Map<String, String> queryParams, int expectedReturnStatus, boolean expectBodyReturned) { |
| WebTarget webTarget = target(uri); |
| if (queryParams != null) { |
| for (Map.Entry<String, String> entry : queryParams.entrySet()) { |
| webTarget = webTarget.queryParam(entry.getKey(), entry.getValue()); |
| } |
| } |
| final Response response = webTarget.request().get(); |
| Assert.assertEquals(response.getStatus(), expectedReturnStatus); |
| |
| // NOT_FOUND and BAD_REQUEST will throw text based html |
| if (expectedReturnStatus != Response.Status.NOT_FOUND.getStatusCode() |
| && expectedReturnStatus != Response.Status.BAD_REQUEST.getStatusCode()) { |
| Assert.assertEquals(response.getMediaType().getType(), "application"); |
| } else { |
| Assert.assertEquals(response.getMediaType().getType(), "text"); |
| } |
| |
| String body = response.readEntity(String.class); |
| if (expectBodyReturned) { |
| Assert.assertNotNull(body); |
| } |
| |
| return body; |
| } |
| |
| protected void put(String uri, Map<String, String> queryParams, Entity entity, |
| int expectedReturnStatus) { |
| WebTarget webTarget = target(uri); |
| if (queryParams != null) { |
| for (Map.Entry<String, String> entry : queryParams.entrySet()) { |
| webTarget = webTarget.queryParam(entry.getKey(), entry.getValue()); |
| } |
| } |
| Response response = webTarget.request().put(entity); |
| Assert.assertEquals(response.getStatus(), expectedReturnStatus); |
| } |
| |
| protected void post(String uri, Map<String, String> queryParams, Entity entity, |
| int expectedReturnStatus) { |
| WebTarget webTarget = target(uri); |
| if (queryParams != null) { |
| for (Map.Entry<String, String> entry : queryParams.entrySet()) { |
| webTarget = webTarget.queryParam(entry.getKey(), entry.getValue()); |
| } |
| } |
| Response response = webTarget.request().post(entity); |
| Assert.assertEquals(response.getStatus(), expectedReturnStatus); |
| } |
| |
| protected void delete(String uri, int expectedReturnStatus) { |
| final Response response = target(uri).request().delete(); |
| Assert.assertEquals(response.getStatus(), expectedReturnStatus); |
| } |
| |
| protected TaskDriver getTaskDriver(String clusterName) { |
| return new TaskDriver(_gZkClient, clusterName); |
| } |
| |
| private void preSetupForParallelInstancesStoppableTest(String clusterName, List<String> instances) { |
| _gSetupTool.addCluster(clusterName, true); |
| ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); |
| clusterConfig.setFaultZoneType("helixZoneId"); |
| clusterConfig.setPersistIntermediateAssignment(true); |
| _configAccessor.setClusterConfig(clusterName, clusterConfig); |
| // Create instance configs |
| List<InstanceConfig> instanceConfigs = new ArrayList<>(); |
| for (int i = 0; i < instances.size() - 1; i++) { |
| InstanceConfig instanceConfig = new InstanceConfig(instances.get(i)); |
| instanceConfig.setDomain("helixZoneId=zone1,host=instance" + i); |
| instanceConfigs.add(instanceConfig); |
| } |
| instanceConfigs.add(new InstanceConfig(instances.get(instances.size() - 1))); |
| instanceConfigs.get(instanceConfigs.size() - 1).setDomain("helixZoneId=zone2,host=instance5"); |
| |
| instanceConfigs.get(1).setInstanceEnabled(false); |
| instanceConfigs.get(3).setInstanceEnabledForPartition("FakeResource", "FakePartition", false); |
| |
| for (InstanceConfig instanceConfig : instanceConfigs) { |
| _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig); |
| } |
| |
| // Start participant |
| startInstances(clusterName, new TreeSet<>(instances), 3); |
| createResources(clusterName, 1); |
| _clusterControllerManagers.add(startController(clusterName)); |
| |
| _clusters.add(STOPPABLE_CLUSTER); |
| _workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3)); |
| } |
| } |