package org.apache.helix.rest.server;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.base.Joiner;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.MockTask;
import org.apache.helix.integration.task.TaskTestUtil;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.rest.common.ContextPropertyKeys;
import org.apache.helix.rest.common.HelixRestNamespace;
import org.apache.helix.rest.server.auditlog.AuditLog;
import org.apache.helix.rest.server.auditlog.AuditLogger;
import org.apache.helix.rest.server.filters.AuditLogFilter;
import org.apache.helix.rest.server.resources.AbstractResource;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.util.ZKClientPool;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.introspect.CodehausJacksonIntrospector;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.DeploymentContext;
import org.glassfish.jersey.test.JerseyTestNg;
import org.glassfish.jersey.test.spi.TestContainer;
import org.glassfish.jersey.test.spi.TestContainerException;
import org.glassfish.jersey.test.spi.TestContainerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
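
/**
 * Base class for Helix REST server test suites. It boots two local ZooKeeper servers and a
 * {@link HelixRestServer} once per suite, seeds them with clusters, instances, resources, and
 * task workflows, and exposes HTTP helpers ({@link #get}, {@link #put}, {@link #post},
 * {@link #delete}) for exercising the REST endpoints.
 *
 * A subclass typically looks like the following (hypothetical sketch; the endpoint path and
 * assertion are illustrative only):
 * <pre>{@code
 * public class TestClusterList extends AbstractTestClass {
 *   @Test
 *   public void testListClusters() throws IOException {
 *     String body = get("clusters", null, Response.Status.OK.getStatusCode(), true);
 *     Map<String, Object> parsed = OBJECT_MAPPER.readValue(body, Map.class);
 *     Assert.assertTrue(parsed.containsKey("clusters"));
 *   }
 * }
 * }</pre>
 */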
public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest {
/**
* Constants for multi-ZK environment.
*/
private static final String MULTI_ZK_PROPERTY_KEY = "multiZk";
private static final String NUM_ZK_PROPERTY_KEY = "numZk";
protected static final String ZK_PREFIX = "localhost:";
protected static final int ZK_START_PORT = 2123;
// The following map must be a static map because it needs to be shared throughout tests
protected static final Map<String, ZkServer> ZK_SERVER_MAP = new HashMap<>();
// For a single-ZK/Helix environment
protected static final String ZK_ADDR = "localhost:2123";
protected static final String WORKFLOW_PREFIX = "Workflow_";
protected static final String JOB_PREFIX = "JOB";
protected static int NUM_PARTITIONS = 10;
protected static int NUM_REPLICA = 2;
protected static int MIN_ACTIVE_REPLICA = 3;
protected static ZkServer _zkServer;
protected static HelixZkClient _gZkClient;
protected static ClusterSetup _gSetupTool;
protected static ConfigAccessor _configAccessor;
protected static BaseDataAccessor<ZNRecord> _baseAccessor;
protected static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
protected static ObjectReader ZNRECORD_READER = new ObjectMapper()
.setAnnotationIntrospector(new CodehausJacksonIntrospector())
.readerFor(ZNRecord.class);
protected static boolean _init = false;
// For testing namespaced access
protected static ZkServer _zkServerTestNS;
protected static final String _zkAddrTestNS = "localhost:2124";
protected static final String TEST_NAMESPACE = "test-namespace";
protected static HelixZkClient _gZkClientTestNS;
protected static BaseDataAccessor<ZNRecord> _baseAccessorTestNS;
protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster";
protected static final String TASK_TEST_CLUSTER = "TaskTestCluster";
protected static final List<String> STOPPABLE_INSTANCES =
Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5");
protected static Set<String> _clusters;
protected static String _superCluster = "superCluster";
protected static Map<String, Set<String>> _instancesMap = new HashMap<>();
protected static Map<String, Set<String>> _liveInstancesMap = new HashMap<>();
protected static Map<String, Set<String>> _resourcesMap = new HashMap<>();
protected static Map<String, Map<String, Workflow>> _workflowMap = new HashMap<>();
protected static List<ClusterControllerManager> _clusterControllerManagers = new ArrayList<>();
protected static List<MockParticipantManager> _mockParticipantManagers = new ArrayList<>();
protected static MockAuditLogger _auditLogger = new MockAuditLogger();
protected static HelixRestServer _helixRestServer;
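
  /**
   * An {@link AuditLogger} that collects audit logs in memory so tests can inspect the
   * REST access history.
   */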
protected static class MockAuditLogger implements AuditLogger {
List<AuditLog> _auditLogList = new ArrayList<>();
@Override
public void write(AuditLog auditLog) {
_auditLogList.add(auditLog);
}
public void clearupLogs() {
_auditLogList.clear();
}
public List<AuditLog> getAuditLogs() {
return _auditLogList;
}
}
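
  /**
   * Builds the Jersey {@link Application} under test: scans the REST resource package,
   * attaches a {@link ServerContext} for the default ZooKeeper, and registers the
   * audit-log filter.
   */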
@Override
protected Application configure() {
// Configure server context
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.packages(AbstractResource.class.getPackage().getName());
ServerContext serverContext = new ServerContext(ZK_ADDR);
resourceConfig.property(ContextPropertyKeys.SERVER_CONTEXT.name(), serverContext);
resourceConfig.register(new AuditLogFilter(Collections.singletonList(new MockAuditLogger())));
return resourceConfig;
}
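
  /**
   * Returns a test container factory whose container starts the shared
   * {@link HelixRestServer} (lazily, on first start) instead of an in-memory Jersey
   * container, so tests run against the real server stack.
   */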
@Override
protected TestContainerFactory getTestContainerFactory()
throws TestContainerException {
return new TestContainerFactory() {
@Override
public TestContainer create(final URI baseUri, DeploymentContext deploymentContext) {
return new TestContainer() {
@Override
public ClientConfig getClientConfig() {
return null;
}
@Override
public URI getBaseUri() {
return baseUri;
}
@Override
public void start() {
if (_helixRestServer == null) {
_helixRestServer = startRestServer();
}
}
@Override
public void stop() {
}
};
}
};
}
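
  /**
   * One-time suite setup: starts the ZooKeeper servers, builds ZK clients and data
   * accessors for the default and test namespaces, and seeds the Helix resources used
   * by the tests.
   */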
@BeforeSuite
public void beforeSuite()
throws Exception {
if (!_init) {
setupZooKeepers();
// TODO: use logging.properties file to config java.util.logging.Logger levels
java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
topJavaLogger.setLevel(Level.WARNING);
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(new ZNRecordSerializer());
_gZkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
clientConfig.setZkSerializer(new ZNRecordSerializer());
_gZkClientTestNS = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddrTestNS), clientConfig);
_gSetupTool = new ClusterSetup(_gZkClient);
_configAccessor = new ConfigAccessor(_gZkClient);
_baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
_baseAccessorTestNS = new ZkBaseDataAccessor<>(_gZkClientTestNS);
// wait for the web service to start
Thread.sleep(100);
setupHelixResources();
_init = true;
}
}
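
  /**
   * One-time suite teardown: stops controllers and participants, closes ZK clients,
   * shuts down the REST server, and stops all ZooKeeper servers.
   */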
@AfterSuite
public void afterSuite()
throws Exception {
    // Tear down orphaned controller and participant threads
for (ClusterControllerManager cm : _clusterControllerManagers) {
if (cm != null && cm.isConnected()) {
cm.syncStop();
}
}
for (MockParticipantManager mm : _mockParticipantManagers) {
if (mm != null && mm.isConnected()) {
mm.syncStop();
}
}
ZKClientPool.reset();
if (_gZkClient != null) {
_gZkClient.close();
_gZkClient = null;
}
if (_gZkClientTestNS != null) {
_gZkClientTestNS.close();
_gZkClientTestNS = null;
}
if (_helixRestServer != null) {
_helixRestServer.shutdown();
_helixRestServer = null;
}
// Stop all ZkServers
ZK_SERVER_MAP.forEach((zkAddr, zkServer) -> TestHelper.stopZkServer(zkServer));
}
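
  /**
   * Starts the two baseline ZooKeeper servers (default and test namespace) and, when the
   * "multiZk"/"numZk" system properties request it, additional servers on consecutive ports.
   */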
private void setupZooKeepers() {
// start zk
try {
if (_zkServer == null) {
_zkServer = TestHelper.startZkServer(ZK_ADDR);
Assert.assertNotNull(_zkServer);
ZK_SERVER_MAP.put(ZK_ADDR, _zkServer);
ZKClientPool.reset();
}
if (_zkServerTestNS == null) {
_zkServerTestNS = TestHelper.startZkServer(_zkAddrTestNS);
Assert.assertNotNull(_zkServerTestNS);
ZK_SERVER_MAP.put(_zkAddrTestNS, _zkServerTestNS);
ZKClientPool.reset();
}
} catch (Exception e) {
Assert.fail(String.format("Failed to start ZK servers: %s", e.toString()));
}
// Start additional ZKs in a multi-ZK setup if applicable
String multiZkConfig = System.getProperty(MULTI_ZK_PROPERTY_KEY);
if (multiZkConfig != null && multiZkConfig.equalsIgnoreCase(Boolean.TRUE.toString())) {
String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
if (numZkFromConfig != null) {
try {
int numZkFromConfigInt = Integer.parseInt(numZkFromConfig);
          // Start (numZkFromConfigInt - 2) additional ZooKeepers; the servers for the
          // default and test namespaces already occupy the first two ports
for (int i = 2; i < numZkFromConfigInt; i++) {
String zkAddr = ZK_PREFIX + (ZK_START_PORT + i);
ZkServer zkServer = TestHelper.startZkServer(zkAddr);
Assert.assertNotNull(zkServer);
ZK_SERVER_MAP.put(zkAddr, zkServer);
}
        } catch (Exception e) {
          Assert.fail(String.format("Failed to create multiple ZooKeepers: %s", e));
        }
}
}
}
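
  /**
   * Creates the test clusters (plus the super cluster and the task test cluster) and
   * populates each with instances, live participants, configs, resources, workflows,
   * and a controller.
   */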
protected void setupHelixResources() {
_clusters = createClusters(4);
_gSetupTool.addCluster(_superCluster, true);
_gSetupTool.addCluster(TASK_TEST_CLUSTER, true);
_clusters.add(_superCluster);
_clusters.add(TASK_TEST_CLUSTER);
for (String cluster : _clusters) {
Set<String> instances = createInstances(cluster, 10);
Set<String> liveInstances = startInstances(cluster, instances, 6);
ClusterConfig clusterConfig = new ClusterConfig(cluster);
clusterConfig.setFaultZoneType("helixZoneId");
_configAccessor.setClusterConfig(cluster, clusterConfig);
createResourceConfigs(cluster, 8);
_workflowMap.put(cluster, createWorkflows(cluster, 3));
Set<String> resources = createResources(cluster, 8);
_instancesMap.put(cluster, instances);
_liveInstancesMap.put(cluster, liveInstances);
_resourcesMap.put(cluster, resources);
_clusterControllerManagers.add(startController(cluster));
}
preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES);
}
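
  /**
   * Adds {@code numInstances} instances to the cluster and returns their names.
   */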
protected Set<String> createInstances(String cluster, int numInstances) {
Set<String> instances = new HashSet<>();
for (int i = 0; i < numInstances; i++) {
String instanceName = cluster + "localhost_" + (12918 + i);
_gSetupTool.addInstanceToCluster(cluster, instanceName);
instances.add(instanceName);
}
return instances;
}
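
  /**
   * Adds {@code numResources} MasterSlave resources to the cluster, sets their minimum
   * active replica count, rebalances them, and returns their names.
   */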
protected Set<String> createResources(String cluster, int numResources) {
Set<String> resources = new HashSet<>();
for (int i = 0; i < numResources; i++) {
String resource = cluster + "_db_" + i;
_gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave");
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource);
idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA);
_gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState);
_gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA);
resources.add(resource);
}
return resources;
}
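
  /**
   * Writes a resource config with {@code NUM_REPLICA} replicas for each of
   * {@code numResources} resources and returns the resource names.
   */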
protected Set<String> createResourceConfigs(String cluster, int numResources) {
Set<String> resources = new HashSet<>();
for (int i = 0; i < numResources; i++) {
String resource = cluster + "_db_" + i;
org.apache.helix.model.ResourceConfig resourceConfig =
new org.apache.helix.model.ResourceConfig.Builder(resource).setNumReplica(NUM_REPLICA)
.build();
_configAccessor.setResourceConfig(cluster, resource, resourceConfig);
resources.add(resource);
}
return resources;
}
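
  /**
   * Starts {@code numLiveInstances} mock participants (each with the mock Task state
   * model registered) from the given instance set and returns the names of the
   * participants that were started.
   */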
  protected Set<String> startInstances(String cluster, Set<String> instances,
      int numLiveInstances) {
Set<String> liveInstances = new HashSet<>();
int i = 0;
for (String instance : instances) {
MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, cluster, instance);
Map<String, TaskFactory> taskFactoryReg = new HashMap<>();
taskFactoryReg.put(MockTask.TASK_COMMAND, MockTask::new);
StateMachineEngine stateMachineEngine = participant.getStateMachineEngine();
stateMachineEngine.registerStateModelFactory("Task",
new TaskStateModelFactory(participant, taskFactoryReg));
participant.syncStart();
_mockParticipantManagers.add(participant);
liveInstances.add(instance);
      // Stop once the requested number of live participants has been started
      if (++i >= numLiveInstances) {
        break;
      }
}
return liveInstances;
}
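
  /**
   * Starts and returns a cluster controller for the given cluster.
   */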
protected ClusterControllerManager startController(String cluster) {
String controllerName = "controller-" + cluster;
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, cluster, controllerName);
controller.syncStart();
return controller;
}
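
  /**
   * Creates {@code numClusters} clusters named {@code TestCluster_N} and returns their names.
   */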
protected Set<String> createClusters(int numClusters) {
Set<String> clusters = new HashSet<>();
for (int i = 0; i < numClusters; i++) {
String cluster = "TestCluster_" + i;
_gSetupTool.addCluster(cluster, true);
clusters.add(cluster);
}
return clusters;
}
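
  /**
   * Builds {@code numWorkflows} workflows of three jobs each; for every workflow it
   * persists a terminal {@link WorkflowContext}, a resource config, and a user-content
   * node, then returns the workflows keyed by name.
   */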
protected Map<String, Workflow> createWorkflows(String cluster, int numWorkflows) {
Map<String, Workflow> workflows = new HashMap<>();
HelixPropertyStore<ZNRecord> propertyStore =
new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
PropertyPathBuilder.propertyStore(cluster), null);
for (int i = 0; i < numWorkflows; i++) {
Workflow.Builder workflow = new Workflow.Builder(WORKFLOW_PREFIX + i);
int j = 0;
for (JobConfig.Builder job : createJobs(cluster, WORKFLOW_PREFIX + i, 3)) {
workflow.addJob(JOB_PREFIX + j++, job);
}
workflows.put(WORKFLOW_PREFIX + i, workflow.build());
WorkflowContext workflowContext = TaskTestUtil
.buildWorkflowContext(WORKFLOW_PREFIX + i, TaskState.FAILED,
System.currentTimeMillis(), TaskState.COMPLETED, TaskState.COMPLETED,
TaskState.FAILED);
_baseAccessor.set(String.format("/%s/%s%s/%s/%s", cluster, PropertyType.PROPERTYSTORE.name(),
TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i, TaskConstants.CONTEXT_NODE),
workflowContext.getRecord(), AccessOption.PERSISTENT);
_configAccessor.setResourceConfig(cluster, WORKFLOW_PREFIX + i, workflow.getWorkflowConfig());
// Add workflow user content
propertyStore.create(Joiner.on("/")
.join(TaskConstants.REBALANCER_CONTEXT_ROOT, WORKFLOW_PREFIX + i,
TaskUtil.USER_CONTENT_NODE), new ZNRecord(TaskUtil.USER_CONTENT_NODE),
AccessOption.PERSISTENT);
}
return workflows;
}
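
  /**
   * Builds {@code numJobs} job configs for the given workflow; for every job it persists
   * a completed {@link JobContext}, a resource config, and a user-content node.
   */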
protected Set<JobConfig.Builder> createJobs(String cluster, String workflowName, int numJobs) {
HelixPropertyStore<ZNRecord> propertyStore =
new ZkHelixPropertyStore<>((ZkBaseDataAccessor<ZNRecord>) _baseAccessor,
PropertyPathBuilder.propertyStore(cluster), null);
Set<JobConfig.Builder> jobCfgs = new HashSet<>();
for (int i = 0; i < numJobs; i++) {
JobConfig.Builder job =
new JobConfig.Builder().setCommand("DummyCommand").setTargetResource("RESOURCE")
.setWorkflow(workflowName).setJobId(workflowName + "_" + JOB_PREFIX + i);
jobCfgs.add(job);
JobContext jobContext = TaskTestUtil
.buildJobContext(System.currentTimeMillis(), System.currentTimeMillis() + 1,
TaskPartitionState.COMPLETED);
_baseAccessor.set(String.format("/%s/%s%s/%s/%s", cluster, PropertyType.PROPERTYSTORE.name(),
TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i,
TaskConstants.CONTEXT_NODE), jobContext.getRecord(), AccessOption.PERSISTENT);
_configAccessor.setResourceConfig(cluster, workflowName + "_" + JOB_PREFIX + i, job.build());
// add job content stores
ZNRecord contentStore = new ZNRecord(TaskUtil.USER_CONTENT_NODE);
contentStore.setMapField(TaskUtil
.getNamespacedTaskName(TaskUtil.getNamespacedJobName(workflowName, JOB_PREFIX + i), "0"),
Collections.<String, String>emptyMap());
propertyStore.create(Joiner.on("/")
.join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowName + "_" + JOB_PREFIX + i,
TaskUtil.USER_CONTENT_NODE), contentStore, AccessOption.PERSISTENT);
}
return jobCfgs;
}
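
  /**
   * Deserializes a JSON string into a {@link ZNRecord}.
   */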
protected static ZNRecord toZNRecord(String data)
throws IOException {
return ZNRECORD_READER.readValue(data);
}
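
  /**
   * Issues a GET request against the REST server, asserts the expected status code and
   * media type, and returns the response body.
   */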
protected String get(String uri, Map<String, String> queryParams, int expectedReturnStatus,
boolean expectBodyReturned) {
WebTarget webTarget = target(uri);
if (queryParams != null) {
for (Map.Entry<String, String> entry : queryParams.entrySet()) {
webTarget = webTarget.queryParam(entry.getKey(), entry.getValue());
}
}
final Response response = webTarget.request().get();
Assert.assertEquals(response.getStatus(), expectedReturnStatus);
    // NOT_FOUND and BAD_REQUEST responses carry a text-based (HTML) error body instead of JSON
if (expectedReturnStatus != Response.Status.NOT_FOUND.getStatusCode()
&& expectedReturnStatus != Response.Status.BAD_REQUEST.getStatusCode()) {
Assert.assertEquals(response.getMediaType().getType(), "application");
} else {
Assert.assertEquals(response.getMediaType().getType(), "text");
}
String body = response.readEntity(String.class);
if (expectBodyReturned) {
Assert.assertNotNull(body);
}
return body;
}
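
  /**
   * Issues a PUT request with the given entity and asserts the expected status code.
   */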
protected void put(String uri, Map<String, String> queryParams, Entity entity,
int expectedReturnStatus) {
WebTarget webTarget = target(uri);
if (queryParams != null) {
for (Map.Entry<String, String> entry : queryParams.entrySet()) {
webTarget = webTarget.queryParam(entry.getKey(), entry.getValue());
}
}
Response response = webTarget.request().put(entity);
Assert.assertEquals(response.getStatus(), expectedReturnStatus);
}
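
  /**
   * Issues a POST request and asserts the expected status code, discarding the body.
   */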
protected void post(String uri, Map<String, String> queryParams, Entity entity,
int expectedReturnStatus) {
    post(uri, queryParams, entity, expectedReturnStatus, false);
}
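
  /**
   * Issues a POST request, asserts the expected status code, and returns the response body.
   */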
protected String post(String uri, Map<String, String> queryParams, Entity entity,
int expectedReturnStatus, boolean expectBodyReturned) {
WebTarget webTarget = target(uri);
if (queryParams != null) {
for (Map.Entry<String, String> entry : queryParams.entrySet()) {
webTarget = webTarget.queryParam(entry.getKey(), entry.getValue());
}
}
Response response = webTarget.request().post(entity);
String result = response.readEntity(String.class);
Assert.assertEquals(response.getStatus(), expectedReturnStatus);
return result;
}
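
  /**
   * Issues a DELETE request and asserts the expected status code.
   */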
protected void delete(String uri, int expectedReturnStatus) {
final Response response = target(uri).request().delete();
Assert.assertEquals(response.getStatus(), expectedReturnStatus);
}
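
  /**
   * Returns a {@link TaskDriver} bound to the given cluster.
   */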
protected TaskDriver getTaskDriver(String clusterName) {
return new TaskDriver(_gZkClient, clusterName);
}
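
  /**
   * Creates the cluster used by the "stoppable instances" tests: a fault-zone-aware
   * cluster config, instance configs spread across two zones (with one instance disabled
   * and one partition disabled), live participants, a resource, a controller, and
   * workflows.
   */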
private void preSetupForParallelInstancesStoppableTest(String clusterName,
List<String> instances) {
_gSetupTool.addCluster(clusterName, true);
ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
clusterConfig.setFaultZoneType("helixZoneId");
clusterConfig.setPersistIntermediateAssignment(true);
_configAccessor.setClusterConfig(clusterName, clusterConfig);
// Create instance configs
List<InstanceConfig> instanceConfigs = new ArrayList<>();
for (int i = 0; i < instances.size() - 1; i++) {
InstanceConfig instanceConfig = new InstanceConfig(instances.get(i));
instanceConfig.setDomain("helixZoneId=zone1,host=instance" + i);
instanceConfigs.add(instanceConfig);
}
instanceConfigs.add(new InstanceConfig(instances.get(instances.size() - 1)));
instanceConfigs.get(instanceConfigs.size() - 1).setDomain("helixZoneId=zone2,host=instance5");
instanceConfigs.get(1).setInstanceEnabled(false);
instanceConfigs.get(3).setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
for (InstanceConfig instanceConfig : instanceConfigs) {
_gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig);
}
// Start participant
startInstances(clusterName, new TreeSet<>(instances), 3);
createResources(clusterName, 1);
_clusterControllerManagers.add(startController(clusterName));
    _clusters.add(clusterName);
    _workflowMap.put(clusterName, createWorkflows(clusterName, 3));
}
  /**
   * Starts a HelixRestServer for the test suite.
   * @return the started HelixRestServer
   */
protected HelixRestServer startRestServer() {
// Create namespace manifest map
List<HelixRestNamespace> namespaces = new ArrayList<>();
// Add test namespace
namespaces.add(new HelixRestNamespace(TEST_NAMESPACE,
HelixRestNamespace.HelixMetadataStoreType.ZOOKEEPER, _zkAddrTestNS, false));
    // Add default namespace
namespaces.add(new HelixRestNamespace(ZK_ADDR));
HelixRestServer server;
try {
server =
new HelixRestServer(namespaces, getBaseUri().getPort(), getBaseUri().getPath(),
Collections.singletonList(_auditLogger));
server.start();
} catch (Exception ex) {
throw new TestContainerException(ex);
}
return server;
}
}