blob: cadead8f706054a0e3d723e24ec13ef2f099efa0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.http.client.entity.EntityBuilder;
import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.controller.BaseControllerStarter;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerStarter;
import org.apache.pinot.controller.api.access.AllowAllAccessFactory;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
import static org.testng.Assert.assertNotNull;
public class ControllerTest {
/** Name of the default tenant. */
public static final String DEFAULT_TENANT = "DefaultTenant";
/** Host name put into the default controller configuration as the controller host. */
public static final String LOCAL_HOST = "localhost";
/**
 * Port handed to NetUtils.findOpenPort() when building the default configuration; also the fallback port used by
 * startController() when the configuration carries no controller port.
 */
public static final int DEFAULT_CONTROLLER_PORT = 18998;
/** Default controller data directory: a timestamped path under the temp directory, computed once at class load. */
public static final String DEFAULT_DATA_DIR =
new File(FileUtils.getTempDirectoryPath(), "test-controller-" + System.currentTimeMillis()).getAbsolutePath();
// Instance id prefixes for the fake participants; the helpers in this class append a numeric index to them.
public static final String BROKER_INSTANCE_ID_PREFIX = "Broker_localhost_";
public static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_";
public static final String MINION_INSTANCE_ID_PREFIX = "Minion_localhost_";
// Default static ControllerTest instance settings.
// NUM_BROKER_INSTANCES and NUM_SERVER_INSTANCES must be a multiple of MIN_NUM_REPLICAS.
public static final int MIN_NUM_REPLICAS = 2;
public static final int NUM_BROKER_INSTANCES = 4;
public static final int NUM_SERVER_INSTANCES = 4;
// NOTE(review): the totals are twice the per-type counts; the reason is not visible in this part of the file
// (presumably the shared setup adds a second batch of instances) - confirm against the shared test setup.
public static final int TOTAL_NUM_SERVER_INSTANCES = 2 * NUM_SERVER_INSTANCES;
public static final int TOTAL_NUM_BROKER_INSTANCES = 2 * NUM_BROKER_INSTANCES;
/**
 * Default static instance, handed out by getInstance() so that it can be shared across test cases.
 */
private static final ControllerTest DEFAULT_INSTANCE = new ControllerTest();
// Shared HTTP client; created lazily by getHttpClient().
protected static HttpClient _httpClient = null;
// Controller endpoint and configuration; all assigned in startController(Map).
protected int _controllerPort;
protected String _controllerBaseApiUrl;
protected ControllerConf _controllerConfig;
protected ControllerRequestURLBuilder _controllerRequestURLBuilder;
// Per-instance request client; created lazily by getControllerRequestClient() from _controllerRequestURLBuilder.
protected ControllerRequestClient _controllerRequestClient = null;
// Helix managers of every fake broker/server/minion added through this instance; disconnected by stopFakeInstances().
protected final List<HelixManager> _fakeInstanceHelixManagers = new ArrayList<>();
protected String _controllerDataDir;
// Non-null only while a controller started through this instance is running (see startController/stopController).
protected BaseControllerStarter _controllerStarter;
// Helix handles obtained from the running controller in startController(Map).
protected PinotHelixResourceManager _helixResourceManager;
protected HelixManager _helixManager;
protected HelixAdmin _helixAdmin;
protected HelixDataAccessor _helixDataAccessor;
protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
// Local ZooKeeper server handle; non-null between startZk() and a successful stopZk().
private ZkStarter.ZookeeperInstance _zookeeperInstance;
/**
 * Returns the {@link ControllerTest} default instance, which can be shared across different test cases.
 *
 * <p>Every call returns the same statically created object.
 *
 * @return the default instance.
 */
public static ControllerTest getInstance() {
return DEFAULT_INSTANCE;
}
/**
 * Returns the Helix cluster name used by this test: the simple name of the runtime class, so each subclass gets
 * its own cluster name.
 */
public String getHelixClusterName() {
return getClass().getSimpleName();
}
/**
 * Returns the shared static {@link HttpClient}, creating it on first use.
 *
 * <p>The client is created lazily because the HTTP helpers of {@code ControllerTest} depend on TLSUtils installing
 * the security context before the HttpClient can be initialized. Since the client is also used from static helper
 * methods, it cannot be a regular member variable, hence this workaround.
 *
 * @return the shared client
 */
public static HttpClient getHttpClient() {
  HttpClient client = _httpClient;
  if (client == null) {
    client = HttpClient.getInstance();
    _httpClient = client;
  }
  return client;
}
/**
 * Returns the {@link ControllerRequestClient} of this instance, creating and caching it on first use.
 *
 * <p>The client is created lazily because it wraps the shared HttpClient, which in turn depends on TLSUtils
 * installing the security context first. It is built from the URL builder that is current at the time of the first
 * call and is cached in an instance field afterwards.
 *
 * @return the cached request client
 */
public ControllerRequestClient getControllerRequestClient() {
  if (_controllerRequestClient != null) {
    return _controllerRequestClient;
  }
  _controllerRequestClient = new ControllerRequestClient(_controllerRequestURLBuilder, getHttpClient());
  return _controllerRequestClient;
}
/**
 * Starts a local ZooKeeper server through {@link ZkStarter} unless this instance already holds one.
 */
public void startZk() {
  if (_zookeeperInstance != null) {
    // Already started; keep the existing server.
    return;
  }
  _zookeeperInstance = ZkStarter.startLocalZkServer();
}
/**
 * Starts a local ZooKeeper server on the given port unless this instance already holds one.
 *
 * @param port port passed to {@link ZkStarter#startLocalZkServer(int)}
 */
public void startZk(int port) {
  if (_zookeeperInstance != null) {
    // Already started; the requested port is ignored in that case.
    return;
  }
  _zookeeperInstance = ZkStarter.startLocalZkServer(port);
}
/**
 * Stops the local ZooKeeper server held by this instance, if any.
 *
 * <p>This is a best-effort shutdown: any exception raised while stopping is ignored. The server handle is only
 * cleared when the stop call returns normally.
 */
public void stopZk() {
  if (_zookeeperInstance == null) {
    return;
  }
  try {
    ZkStarter.stopLocalZkServer(_zookeeperInstance);
    _zookeeperInstance = null;
  } catch (Exception ignored) {
    // Swallow exceptions
  }
}
/**
 * Returns the URL of the local ZooKeeper server held by this instance.
 *
 * <p>Dereferences the server handle directly, so ZooKeeper must have been started through startZk() first.
 */
public String getZkUrl() {
return _zookeeperInstance.getZkUrl();
}
/**
 * Builds a fresh, mutable map with the default controller configuration for tests: local host, a port obtained
 * from {@code NetUtils.findOpenPort(DEFAULT_CONTROLLER_PORT)}, the default data directory, the local ZooKeeper URL,
 * the Helix cluster name of this test, and Groovy left enabled.
 *
 * @return a new configuration map that callers may modify
 */
public Map<String, Object> getDefaultControllerConfiguration() {
  Map<String, Object> config = new HashMap<>();
  config.put(ControllerConf.CONTROLLER_HOST, LOCAL_HOST);
  config.put(ControllerConf.CONTROLLER_PORT, NetUtils.findOpenPort(DEFAULT_CONTROLLER_PORT));
  config.put(ControllerConf.DATA_DIR, DEFAULT_DATA_DIR);
  String zkUrl = getZkUrl();
  config.put(ControllerConf.ZK_STR, zkUrl);
  String clusterName = getHelixClusterName();
  config.put(ControllerConf.HELIX_CLUSTER_NAME, clusterName);
  // Enable groovy on the controller
  config.put(ControllerConf.DISABLE_GROOVY, false);
  return config;
}
/**
 * Starts a controller with the configuration returned by getDefaultControllerConfiguration().
 *
 * @throws Exception if the controller fails to start
 */
public void startController()
throws Exception {
startController(getDefaultControllerConfiguration());
}
/**
 * Starts a controller with the given configuration and wires up the Helix handles of this instance.
 *
 * <p>The method derives the base API URL from the configured VIP protocol and port (defaulting to "http" and
 * DEFAULT_CONTROLLER_PORT), starts the controller, captures its Helix objects, and finally sets a few cluster-level
 * configs used by the tests.
 *
 * @param properties controller configuration; used to build both the ControllerConf and the PinotConfiguration
 * @throws IllegalStateException if a controller started through this instance is still running
 * @throws Exception if the controller fails to initialize or start
 */
public void startController(Map<String, Object> properties)
throws Exception {
// Only one controller per ControllerTest instance at a time.
Preconditions.checkState(_controllerStarter == null);
_controllerConfig = new ControllerConf(properties);
String controllerScheme = "http";
if (StringUtils.isNotBlank(_controllerConfig.getControllerVipProtocol())) {
controllerScheme = _controllerConfig.getControllerVipProtocol();
}
_controllerPort = DEFAULT_CONTROLLER_PORT;
if (StringUtils.isNotBlank(_controllerConfig.getControllerPort())) {
_controllerPort = Integer.parseInt(_controllerConfig.getControllerPort());
}
_controllerBaseApiUrl = controllerScheme + "://localhost:" + _controllerPort;
_controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl);
_controllerDataDir = _controllerConfig.getDataDir();
// getControllerStarter() creates a new ControllerStarter here because the field is null at this point.
_controllerStarter = getControllerStarter();
_controllerStarter.init(new PinotConfiguration(properties));
_controllerStarter.start();
_helixResourceManager = _controllerStarter.getHelixResourceManager();
_helixManager = _controllerStarter.getHelixControllerManager();
_helixDataAccessor = _helixManager.getHelixDataAccessor();
ConfigAccessor configAccessor = _helixManager.getConfigAccessor();
// HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode.
// NOTE(review): _helixManager is dereferenced unconditionally just above, which would fail if it really is null in
// Pinot only mode - confirm whether the statement above still holds.
HelixConfigScope scope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
.build();
switch (_controllerStarter.getControllerMode()) {
case DUAL:
case PINOT_ONLY:
// The resource manager is available: take the admin and property store from it.
_helixAdmin = _helixResourceManager.getHelixAdmin();
_propertyStore = _helixResourceManager.getPropertyStore();
// TODO: Enable periodic rebalance per 10 seconds as a temporary work-around for the Helix issue:
// https://github.com/apache/helix/issues/331. Remove this after Helix fixing the issue.
configAccessor.set(scope, ClusterConfig.ClusterConfigProperty.REBALANCE_TIMER_PERIOD.name(), "10000");
break;
case HELIX_ONLY:
// No resource manager: fall back to the Helix manager for the admin and property store.
_helixAdmin = _helixManager.getClusterManagmentTool();
_propertyStore = _helixManager.getHelixPropertyStore();
break;
default:
break;
}
// Enable case-insensitive for test cases.
configAccessor.set(scope, Helix.ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(true));
// Set hyperloglog log2m value to 12.
configAccessor.set(scope, Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(12));
}
/**
 * Stops the controller started through this instance and deletes its data directory.
 *
 * <p>The cached {@code ControllerRequestClient} is dropped as well. It is bound to the URL builder of the
 * controller being stopped, while the next {@code startController} call rebuilds the URL builder and may come up
 * on a different port or scheme; keeping the old client would send later requests to the stale address.
 *
 * @throws IllegalStateException if no controller is currently running
 */
public void stopController() {
  Preconditions.checkState(_controllerStarter != null);
  _controllerStarter.stop();
  _controllerStarter = null;
  // Force getControllerRequestClient() to build a new client against the next controller's URL builder.
  _controllerRequestClient = null;
  FileUtils.deleteQuietly(new File(_controllerDataDir));
}
/**
 * Counts the broker instances in the cluster: those tagged {@code DefaultTenant_BROKER} plus those carrying the
 * untagged-broker tag.
 *
 * @return sum of both instance counts
 */
public int getFakeBrokerInstanceCount() {
  String clusterName = getHelixClusterName();
  int taggedCount = _helixAdmin.getInstancesInClusterWithTag(clusterName, "DefaultTenant_BROKER").size();
  int untaggedCount = _helixAdmin.getInstancesInClusterWithTag(clusterName, UNTAGGED_BROKER_INSTANCE).size();
  return taggedCount + untaggedCount;
}
/**
 * Adds {@code numInstances} fake broker instances with ids {@code BROKER_INSTANCE_ID_PREFIX + 0 .. numInstances-1}.
 *
 * @param numInstances number of fake brokers to add
 * @param isSingleTenant whether the brokers get the default tenant broker tag instead of the untagged tag
 * @throws Exception if an instance fails to join the cluster
 */
public void addFakeBrokerInstancesToAutoJoinHelixCluster(int numInstances, boolean isSingleTenant)
    throws Exception {
  for (int index = 0; index < numInstances; index++) {
    String instanceId = BROKER_INSTANCE_ID_PREFIX + index;
    addFakeBrokerInstanceToAutoJoinHelixCluster(instanceId, isSingleTenant);
  }
}
/**
 * Adds a single fake broker instance with the given id to the Helix cluster.
 *
 * <p>The instance connects as a Helix participant with a fake broker-resource state model, is tagged, and is
 * remembered in {@code _fakeInstanceHelixManagers} so it can be disconnected later.
 *
 * @param instanceId Helix instance id of the fake broker
 * @param isSingleTenant if true the instance gets {@code TagNameUtils.getBrokerTagForTenant(null)}, otherwise the
 *     untagged-broker tag
 * @throws Exception if the participant fails to connect
 */
public void addFakeBrokerInstanceToAutoJoinHelixCluster(String instanceId, boolean isSingleTenant)
    throws Exception {
  HelixManager participant =
      HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT, getZkUrl());
  participant.getStateMachineEngine()
      .registerStateModelFactory(FakeBrokerResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
          new FakeBrokerResourceOnlineOfflineStateModelFactory());
  participant.connect();
  HelixAdmin admin = participant.getClusterManagmentTool();
  String brokerTag = isSingleTenant ? TagNameUtils.getBrokerTagForTenant(null) : Helix.UNTAGGED_BROKER_INSTANCE;
  admin.addInstanceTag(getHelixClusterName(), instanceId, brokerTag);
  _fakeInstanceHelixManagers.add(participant);
}
/**
 * Adds fake broker instances until the total number of broker instances equals {@code maxCount}.
 *
 * <p>Nothing is added when the current count, as reported by {@code getFakeBrokerInstanceCount()}, is already at or
 * above {@code maxCount}. New instance ids are numbered starting from the current count.
 *
 * @param maxCount target number of broker instances
 * @param isSingleTenant whether new brokers get the default tenant broker tag instead of the untagged tag
 * @throws Exception if an instance fails to join the cluster
 */
public void addMoreFakeBrokerInstancesToAutoJoinHelixCluster(int maxCount, boolean isSingleTenant)
    throws Exception {
  // The loop condition doubles as the "only if fewer than maxCount" check.
  for (int index = getFakeBrokerInstanceCount(); index < maxCount; index++) {
    addFakeBrokerInstanceToAutoJoinHelixCluster(BROKER_INSTANCE_ID_PREFIX + index, isSingleTenant);
  }
}
/**
 * State model factory registered by the fake broker instances under the
 * "BrokerResourceOnlineOfflineStateModel" state model definition.
 *
 * <p>The state models it creates perform no work: every transition handler only logs the received message at debug
 * level.
 */
public static class FakeBrokerResourceOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
private static final String STATE_MODEL_DEF = "BrokerResourceOnlineOfflineStateModel";
private FakeBrokerResourceOnlineOfflineStateModelFactory() {
}
@Override
public StateModel createNewStateModel(String resourceName, String partitionName) {
// A new no-op state model per call; resourceName and partitionName are not used.
return new FakeBrokerResourceOnlineOfflineStateModel();
}
/**
 * No-op state model with OFFLINE, ONLINE and DROPPED states, starting in OFFLINE.
 */
@SuppressWarnings("unused")
@StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
public static class FakeBrokerResourceOnlineOfflineStateModel extends StateModel {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeBrokerResourceOnlineOfflineStateModel.class);
private FakeBrokerResourceOnlineOfflineStateModel() {
}
// Each handler below only logs the transition message at debug level.
@Transition(from = "OFFLINE", to = "ONLINE")
public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
}
@Transition(from = "OFFLINE", to = "DROPPED")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
}
@Transition(from = "ONLINE", to = "OFFLINE")
public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
}
@Transition(from = "ONLINE", to = "DROPPED")
public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
}
@Transition(from = "ERROR", to = "OFFLINE")
public void onBecomeOfflineFromError(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOfflineFromError(): {}", message);
}
}
}
/**
 * Adds {@code numInstances} fake server instances, using {@code Server.DEFAULT_ADMIN_API_PORT} as the base admin
 * port.
 *
 * @throws Exception if an instance fails to join the cluster
 */
public void addFakeServerInstancesToAutoJoinHelixCluster(int numInstances, boolean isSingleTenant)
throws Exception {
addFakeServerInstancesToAutoJoinHelixCluster(numInstances, isSingleTenant, Server.DEFAULT_ADMIN_API_PORT);
}
/**
 * Adds {@code numInstances} fake server instances with ids {@code SERVER_INSTANCE_ID_PREFIX + i}; the i-th instance
 * is configured with admin port {@code baseAdminPort + i}.
 *
 * @param numInstances number of fake servers to add
 * @param isSingleTenant whether the servers get the default tenant tags instead of the untagged tag
 * @param baseAdminPort admin port of the first instance
 * @throws Exception if an instance fails to join the cluster
 */
public void addFakeServerInstancesToAutoJoinHelixCluster(int numInstances, boolean isSingleTenant, int baseAdminPort)
    throws Exception {
  for (int index = 0; index < numInstances; index++) {
    String instanceId = SERVER_INSTANCE_ID_PREFIX + index;
    int adminPort = baseAdminPort + index;
    addFakeServerInstanceToAutoJoinHelixCluster(instanceId, isSingleTenant, adminPort);
  }
}
/**
 * Counts the server instances in the cluster: those tagged {@code DefaultTenant_OFFLINE} plus those carrying the
 * untagged-server tag.
 *
 * @return sum of both instance counts
 */
public int getFakeServerInstanceCount() {
  String clusterName = getHelixClusterName();
  int taggedCount = _helixAdmin.getInstancesInClusterWithTag(clusterName, "DefaultTenant_OFFLINE").size();
  int untaggedCount = _helixAdmin.getInstancesInClusterWithTag(clusterName, UNTAGGED_SERVER_INSTANCE).size();
  return taggedCount + untaggedCount;
}
/**
 * Adds a single fake server instance with the given id, using {@code Server.DEFAULT_ADMIN_API_PORT} as its admin
 * port.
 *
 * @throws Exception if the instance fails to join the cluster
 */
public void addFakeServerInstanceToAutoJoinHelixCluster(String instanceId, boolean isSingleTenant)
throws Exception {
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, isSingleTenant, Server.DEFAULT_ADMIN_API_PORT);
}
/**
 * Adds a single fake server instance with the given id and admin port to the Helix cluster.
 *
 * <p>The instance connects as a Helix participant with a fake segment state model, is tagged, gets its admin port
 * written to the participant-scoped Helix config, and is remembered in {@code _fakeInstanceHelixManagers}.
 *
 * @param instanceId Helix instance id of the fake server
 * @param isSingleTenant if true the instance gets both the offline and realtime tags of the default tenant,
 *     otherwise the untagged-server tag
 * @param adminPort value stored under {@code Helix.Instance.ADMIN_PORT_KEY}
 * @throws Exception if the participant fails to connect
 */
public void addFakeServerInstanceToAutoJoinHelixCluster(String instanceId, boolean isSingleTenant, int adminPort)
    throws Exception {
  HelixManager participant =
      HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT, getZkUrl());
  participant.getStateMachineEngine()
      .registerStateModelFactory(FakeSegmentOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
          new FakeSegmentOnlineOfflineStateModelFactory());
  participant.connect();
  HelixAdmin admin = participant.getClusterManagmentTool();
  if (!isSingleTenant) {
    admin.addInstanceTag(getHelixClusterName(), instanceId, Helix.UNTAGGED_SERVER_INSTANCE);
  } else {
    admin.addInstanceTag(getHelixClusterName(), instanceId, TagNameUtils.getOfflineTagForTenant(null));
    admin.addInstanceTag(getHelixClusterName(), instanceId, TagNameUtils.getRealtimeTagForTenant(null));
  }
  // Publish the admin port in the participant-scoped config of this instance.
  HelixConfigScope participantScope =
      new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, getHelixClusterName())
          .forParticipant(instanceId)
          .build();
  admin.setConfig(participantScope,
      Collections.singletonMap(Helix.Instance.ADMIN_PORT_KEY, Integer.toString(adminPort)));
  _fakeInstanceHelixManagers.add(participant);
}
/**
 * Adds fake server instances until the total number of server instances reaches maxCount, using
 * {@code DEFAULT_ADMIN_API_PORT} as the base admin port.
 *
 * @throws Exception if an instance fails to join the cluster
 */
public void addMoreFakeServerInstancesToAutoJoinHelixCluster(int maxCount, boolean isSingleTenant)
throws Exception {
addMoreFakeServerInstancesToAutoJoinHelixCluster(maxCount, isSingleTenant, DEFAULT_ADMIN_API_PORT);
}
/**
 * Adds fake server instances until the total number of server instances reaches {@code maxCount}.
 *
 * <p>Nothing is added when the current count, as reported by {@code getFakeServerInstanceCount()}, is already at or
 * above {@code maxCount}. New instance ids are numbered starting from the current count, and instance {@code i} is
 * configured with admin port {@code baseAdminPort + i}.
 *
 * @param maxCount target number of server instances
 * @param isSingleTenant whether new servers get the default tenant tags instead of the untagged tag
 * @param baseAdminPort admin port offset base
 * @throws Exception if an instance fails to join the cluster
 */
public void addMoreFakeServerInstancesToAutoJoinHelixCluster(int maxCount, boolean isSingleTenant, int baseAdminPort)
    throws Exception {
  // The loop condition doubles as the "only if fewer than maxCount" check.
  for (int index = getFakeServerInstanceCount(); index < maxCount; index++) {
    addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + index, isSingleTenant,
        baseAdminPort + index);
  }
}
/**
 * State model factory registered by the fake server instances under the "SegmentOnlineOfflineStateModel" state
 * model definition.
 *
 * <p>The state models it creates perform no work: every transition handler only logs the received message at debug
 * level.
 */
public static class FakeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
private static final String STATE_MODEL_DEF = "SegmentOnlineOfflineStateModel";
private FakeSegmentOnlineOfflineStateModelFactory() {
}
@Override
public StateModel createNewStateModel(String resourceName, String partitionName) {
// A new no-op state model per call; resourceName and partitionName are not used.
return new FakeSegmentOnlineOfflineStateModel();
}
/**
 * No-op state model with OFFLINE, ONLINE, CONSUMING and DROPPED states, starting in OFFLINE.
 */
@SuppressWarnings("unused")
@StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'CONSUMING', 'DROPPED'}", initialState = "OFFLINE")
public static class FakeSegmentOnlineOfflineStateModel extends StateModel {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSegmentOnlineOfflineStateModel.class);
private FakeSegmentOnlineOfflineStateModel() {
}
// Each handler below only logs the transition message at debug level.
@Transition(from = "OFFLINE", to = "ONLINE")
public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
}
@Transition(from = "OFFLINE", to = "CONSUMING")
public void onBecomeConsumingFromOffline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeConsumingFromOffline(): {}", message);
}
@Transition(from = "OFFLINE", to = "DROPPED")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
}
@Transition(from = "ONLINE", to = "OFFLINE")
public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
}
@Transition(from = "ONLINE", to = "DROPPED")
public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
}
@Transition(from = "CONSUMING", to = "OFFLINE")
public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOfflineFromConsuming(): {}", message);
}
@Transition(from = "CONSUMING", to = "ONLINE")
public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOnlineFromConsuming(): {}", message);
}
@Transition(from = "CONSUMING", to = "DROPPED")
public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) {
LOGGER.debug("onBecomeDroppedFromConsuming(): {}", message);
}
@Transition(from = "ERROR", to = "OFFLINE")
public void onBecomeOfflineFromError(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOfflineFromError(): {}", message);
}
}
}
/**
 * Adds {@code numInstances} fake minion instances with ids {@code MINION_INSTANCE_ID_PREFIX + 0 .. numInstances-1}.
 *
 * @param numInstances number of fake minions to add
 * @throws Exception if an instance fails to join the cluster
 */
public void addFakeMinionInstancesToAutoJoinHelixCluster(int numInstances)
    throws Exception {
  for (int index = 0; index < numInstances; index++) {
    String instanceId = MINION_INSTANCE_ID_PREFIX + index;
    addFakeMinionInstanceToAutoJoinHelixCluster(instanceId);
  }
}
/**
 * Adds a single fake minion instance with the given id to the Helix cluster.
 *
 * <p>The instance connects as a Helix participant with a fake minion-resource state model, is tagged with the
 * untagged-minion tag, and is remembered in {@code _fakeInstanceHelixManagers}.
 *
 * @param instanceId Helix instance id of the fake minion
 * @throws Exception if the participant fails to connect
 */
public void addFakeMinionInstanceToAutoJoinHelixCluster(String instanceId)
    throws Exception {
  HelixManager participant =
      HelixManagerFactory.getZKHelixManager(getHelixClusterName(), instanceId, InstanceType.PARTICIPANT, getZkUrl());
  participant.getStateMachineEngine()
      .registerStateModelFactory(FakeMinionResourceOnlineOfflineStateModelFactory.STATE_MODEL_DEF,
          new FakeMinionResourceOnlineOfflineStateModelFactory());
  participant.connect();
  participant.getClusterManagmentTool()
      .addInstanceTag(getHelixClusterName(), instanceId, Helix.UNTAGGED_MINION_INSTANCE);
  _fakeInstanceHelixManagers.add(participant);
}
/**
 * State model factory registered by the fake minion instances under the
 * "MinionResourceOnlineOfflineStateModel" state model definition.
 *
 * <p>The state models it creates perform no work: every transition handler only logs the received message at debug
 * level.
 */
public static class FakeMinionResourceOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
private static final String STATE_MODEL_DEF = "MinionResourceOnlineOfflineStateModel";
private FakeMinionResourceOnlineOfflineStateModelFactory() {
}
@Override
public StateModel createNewStateModel(String resourceName, String partitionName) {
// A new no-op state model per call; resourceName and partitionName are not used.
return new FakeMinionResourceOnlineOfflineStateModel();
}
/**
 * No-op state model with OFFLINE, ONLINE and DROPPED states, starting in OFFLINE.
 */
@SuppressWarnings("unused")
@StateModelInfo(states = "{'OFFLINE', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE")
public static class FakeMinionResourceOnlineOfflineStateModel extends StateModel {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeMinionResourceOnlineOfflineStateModel.class);
private FakeMinionResourceOnlineOfflineStateModel() {
}
// Each handler below only logs the transition message at debug level.
@Transition(from = "OFFLINE", to = "ONLINE")
public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOnlineFromOffline(): {}", message);
}
@Transition(from = "OFFLINE", to = "DROPPED")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeDroppedFromOffline(): {}", message);
}
@Transition(from = "ONLINE", to = "OFFLINE")
public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOfflineFromOnline(): {}", message);
}
@Transition(from = "ONLINE", to = "DROPPED")
public void onBecomeDroppedFromOnline(Message message, NotificationContext context) {
LOGGER.debug("onBecomeDroppedFromOnline(): {}", message);
}
@Transition(from = "ERROR", to = "OFFLINE")
public void onBecomeOfflineFromError(Message message, NotificationContext context) {
LOGGER.debug("onBecomeOfflineFromError(): {}", message);
}
}
}
/**
 * Disconnects every fake instance added through this object, in the order they were added, and forgets them.
 */
public void stopFakeInstances() {
  for (int i = 0; i < _fakeInstanceHelixManagers.size(); i++) {
    _fakeInstanceHelixManagers.get(i).disconnect();
  }
  _fakeInstanceHelixManagers.clear();
}
/**
 * Disconnects and forgets the first fake instance whose instance name matches {@code instanceId}, ignoring case.
 * Does nothing when no tracked instance matches.
 *
 * @param instanceId Helix instance id of the fake instance to stop
 */
public void stopFakeInstance(String instanceId) {
  for (int i = 0; i < _fakeInstanceHelixManagers.size(); i++) {
    HelixManager candidate = _fakeInstanceHelixManagers.get(i);
    if (!candidate.getInstanceName().equalsIgnoreCase(instanceId)) {
      continue;
    }
    candidate.disconnect();
    _fakeInstanceHelixManagers.remove(candidate);
    // Only the first match is stopped.
    return;
  }
}
/**
 * Creates a small schema named after the given table: two STRING dimensions ({@code dimA}, {@code dimB}), an INT
 * metric ({@code metricA}), a DOUBLE metric ({@code metricB}) and a LONG date-time column ({@code timeColumn}).
 *
 * @param tableName name to use as the schema name
 * @return the new schema
 */
public Schema createDummySchema(String tableName) {
  Schema dummySchema = new Schema();
  dummySchema.setSchemaName(tableName);

  // Dimensions.
  // NOTE(review): dimB is a STRING column but is given the integer 0 as default null value - confirm intended.
  DimensionFieldSpec dimA = new DimensionFieldSpec("dimA", FieldSpec.DataType.STRING, true, "");
  dummySchema.addField(dimA);
  DimensionFieldSpec dimB = new DimensionFieldSpec("dimB", FieldSpec.DataType.STRING, true, 0);
  dummySchema.addField(dimB);

  // Metrics.
  MetricFieldSpec metricA = new MetricFieldSpec("metricA", FieldSpec.DataType.INT, 0);
  dummySchema.addField(metricA);
  MetricFieldSpec metricB = new MetricFieldSpec("metricB", FieldSpec.DataType.DOUBLE, -1);
  dummySchema.addField(metricB);

  // Time column.
  DateTimeFieldSpec timeColumn =
      new DateTimeFieldSpec("timeColumn", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:DAYS");
  dummySchema.addField(timeColumn);
  return dummySchema;
}
/**
 * Creates the dummy schema for the given table and declares {@code dimA} as its only primary key column.
 *
 * @param tableName name to use as the schema name
 * @return the new schema with the primary key set
 */
public Schema createDummySchemaForUpsertTable(String tableName) {
  Schema upsertSchema = createDummySchema(tableName);
  List<String> primaryKeyColumns = Collections.singletonList("dimA");
  upsertSchema.setPrimaryKeyColumns(primaryKeyColumns);
  return upsertSchema;
}
/**
 * Creates the dummy schema for the given table via createDummySchema() and adds it to the controller.
 *
 * @throws IOException if the controller request fails
 */
public void addDummySchema(String tableName)
throws IOException {
addSchema(createDummySchema(tableName));
}
/**
 * Adds a schema to the controller through the controller request client.
 *
 * @param schema schema to add
 * @throws IOException if the controller request fails
 */
public void addSchema(Schema schema)
throws IOException {
getControllerRequestClient().addSchema(schema);
}
/**
 * Reads the schema with the given name from the Helix resource manager and asserts that it exists.
 *
 * @param schemaName name of the schema to look up
 * @return the schema, never null (the assertion fails otherwise)
 */
public Schema getSchema(String schemaName) {
  Schema storedSchema = _helixResourceManager.getSchema(schemaName);
  Assert.assertNotNull(storedSchema);
  return storedSchema;
}
/**
 * Deletes the schema with the given name through the controller request client.
 *
 * @throws IOException if the controller request fails
 */
public void deleteSchema(String schemaName)
throws IOException {
getControllerRequestClient().deleteSchema(schemaName);
}
/**
 * Adds the given table config through the controller request client.
 *
 * @throws IOException if the controller request fails
 */
public void addTableConfig(TableConfig tableConfig)
throws IOException {
getControllerRequestClient().addTableConfig(tableConfig);
}
/**
 * Updates the given table config through the controller request client.
 *
 * @throws IOException if the controller request fails
 */
public void updateTableConfig(TableConfig tableConfig)
throws IOException {
getControllerRequestClient().updateTableConfig(tableConfig);
}
/**
 * Reads the offline table config of the given table from the Helix resource manager and asserts that it exists.
 *
 * @param tableName table to look up
 * @return the offline table config, never null (the assertion fails otherwise)
 */
public TableConfig getOfflineTableConfig(String tableName) {
  TableConfig config = _helixResourceManager.getOfflineTableConfig(tableName);
  Assert.assertNotNull(config);
  return config;
}
/**
 * Reads the realtime table config of the given table from the Helix resource manager and asserts that it exists.
 *
 * @param tableName table to look up
 * @return the realtime table config, never null (the assertion fails otherwise)
 */
public TableConfig getRealtimeTableConfig(String tableName) {
  TableConfig config = _helixResourceManager.getRealtimeTableConfig(tableName);
  Assert.assertNotNull(config);
  return config;
}
/**
 * Deletes the offline table of the given name through the controller request client; the name is converted to its
 * OFFLINE type-suffixed form first.
 *
 * @param tableName table name
 * @throws IOException if the controller request fails
 */
public void dropOfflineTable(String tableName)
    throws IOException {
  ControllerRequestClient client = getControllerRequestClient();
  String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
  client.deleteTable(offlineTableName);
}
/**
 * Deletes the realtime table of the given name through the controller request client; the name is converted to its
 * REALTIME type-suffixed form first.
 *
 * @param tableName table name
 * @throws IOException if the controller request fails
 */
public void dropRealtimeTable(String tableName)
    throws IOException {
  ControllerRequestClient client = getControllerRequestClient();
  String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
  client.deleteTable(realtimeTableName);
}
/**
 * Deletes the segments of the given table and table type through the controller request client.
 *
 * @throws IOException if the controller request fails
 */
public void dropAllSegments(String tableName, TableType tableType)
throws IOException {
getControllerRequestClient().deleteSegments(tableName, tableType);
}
/**
 * Returns the size of the given table as reported by the controller request client.
 *
 * @throws IOException if the controller request fails
 */
public long getTableSize(String tableName)
throws IOException {
return getControllerRequestClient().getTableSize(tableName);
}
/**
 * Reloads the offline table of the given name without forcing a download.
 *
 * @throws IOException if the controller request fails
 */
public void reloadOfflineTable(String tableName)
throws IOException {
reloadOfflineTable(tableName, false);
}
/**
 * Reloads the offline table of the given name through the controller request client.
 *
 * @param forceDownload passed through to the reload request
 * @throws IOException if the controller request fails
 */
public void reloadOfflineTable(String tableName, boolean forceDownload)
throws IOException {
getControllerRequestClient().reloadTable(tableName, TableType.OFFLINE, forceDownload);
}
/**
 * Reloads a single segment through the controller request client.
 *
 * <p>The table name is passed on exactly as given; no table type is added here.
 *
 * @param forceDownload passed through to the reload request
 * @throws IOException if the controller request fails
 */
public void reloadOfflineSegment(String tableName, String segmentName, boolean forceDownload)
throws IOException {
getControllerRequestClient().reloadSegment(tableName, segmentName, forceDownload);
}
/**
 * Reloads the realtime table of the given name through the controller request client, without forcing a download.
 *
 * @throws IOException if the controller request fails
 */
public void reloadRealtimeTable(String tableName)
throws IOException {
getControllerRequestClient().reloadTable(tableName, TableType.REALTIME, false);
}
/**
 * Creates a broker tenant with the given name and number of brokers through the controller request client.
 *
 * @throws IOException if the controller request fails
 */
public void createBrokerTenant(String tenantName, int numBrokers)
throws IOException {
getControllerRequestClient().createBrokerTenant(tenantName, numBrokers);
}
/**
 * Updates the broker tenant with the given name to the given number of brokers through the controller request
 * client.
 *
 * @throws IOException if the controller request fails
 */
public void updateBrokerTenant(String tenantName, int numBrokers)
throws IOException {
getControllerRequestClient().updateBrokerTenant(tenantName, numBrokers);
}
/**
 * Creates a server tenant with the given name and offline/realtime server counts through the controller request
 * client.
 *
 * @throws IOException if the controller request fails
 */
public void createServerTenant(String tenantName, int numOfflineServers, int numRealtimeServers)
throws IOException {
getControllerRequestClient().createServerTenant(tenantName, numOfflineServers, numRealtimeServers);
}
/**
 * Updates the server tenant with the given name to the given offline/realtime server counts through the controller
 * request client.
 *
 * @throws IOException if the controller request fails
 */
public void updateServerTenant(String tenantName, int numOfflineServers, int numRealtimeServers)
throws IOException {
getControllerRequestClient().updateServerTenant(tenantName, numOfflineServers, numRealtimeServers);
}
/**
 * Sets the "enabled" flag in the resource config of the lead controller resource.
 *
 * <p>The resource config is only written back when the stored flag differs from the requested value.
 *
 * @param enable desired value of {@code Helix.LEAD_CONTROLLER_RESOURCE_ENABLED_KEY}
 */
public void enableResourceConfigForLeadControllerResource(boolean enable) {
  ConfigAccessor accessor = _helixManager.getConfigAccessor();
  ResourceConfig leadControllerConfig =
      accessor.getResourceConfig(getHelixClusterName(), Helix.LEAD_CONTROLLER_RESOURCE_NAME);
  String storedFlag = leadControllerConfig.getSimpleConfig(Helix.LEAD_CONTROLLER_RESOURCE_ENABLED_KEY);
  if (Boolean.parseBoolean(storedFlag) == enable) {
    // Already in the requested state; avoid a redundant write.
    return;
  }
  leadControllerConfig.putSimpleConfig(Helix.LEAD_CONTROLLER_RESOURCE_ENABLED_KEY, Boolean.toString(enable));
  accessor.setResourceConfig(getHelixClusterName(), Helix.LEAD_CONTROLLER_RESOURCE_NAME, leadControllerConfig);
}
/**
 * Sends a GET request to the given URL without extra headers (null is passed as the header map).
 *
 * @return the response body
 * @throws IOException if the request fails
 */
public static String sendGetRequest(String urlString)
throws IOException {
return sendGetRequest(urlString, null);
}
/**
 * Sends a GET request with the given headers through the shared HttpClient and returns the response body.
 *
 * @param urlString target URL
 * @param headers request headers
 * @return the response body
 * @throws IOException if the request fails; an invalid URI or an HTTP error status is wrapped in an IOException
 */
public static String sendGetRequest(String urlString, Map<String, String> headers)
    throws IOException {
  try {
    SimpleHttpResponse rawResponse = getHttpClient().sendGetRequest(new URL(urlString).toURI(), headers);
    SimpleHttpResponse checkedResponse = HttpClient.wrapAndThrowHttpException(rawResponse);
    return constructResponse(checkedResponse);
  } catch (URISyntaxException | HttpErrorStatusException e) {
    throw new IOException(e);
  }
}
/**
 * Fetches the given URL by opening a plain {@link URL} stream (not the shared HttpClient) and returns the content
 * as a string.
 *
 * <p>The stream is always closed before returning, and the content is decoded as UTF-8 rather than with the
 * platform default charset, so the result does not depend on the JVM's locale settings.
 *
 * @param urlString URL to read from
 * @return the content read from the URL
 * @throws IOException if the URL is malformed or the content cannot be read
 */
public static String sendGetRequestRaw(String urlString)
    throws IOException {
  try (InputStream inputStream = new URL(urlString).openStream()) {
    return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
  }
}
/**
 * Sends a JSON POST request with the given payload and an empty header map.
 *
 * @return the response body
 * @throws IOException if the request fails
 */
public static String sendPostRequest(String urlString, String payload)
throws IOException {
return sendPostRequest(urlString, payload, Collections.emptyMap());
}
/**
 * Sends a JSON POST request with the given payload and headers through the shared HttpClient and returns the
 * response body.
 *
 * @param urlString target URL
 * @param payload request body
 * @param headers request headers
 * @return the response body
 * @throws IOException if the request fails; an invalid URI or an HTTP error status is wrapped in an IOException
 */
public static String sendPostRequest(String urlString, String payload, Map<String, String> headers)
    throws IOException {
  try {
    SimpleHttpResponse rawResponse =
        getHttpClient().sendJsonPostRequest(new URL(urlString).toURI(), payload, headers);
    SimpleHttpResponse checkedResponse = HttpClient.wrapAndThrowHttpException(rawResponse);
    return constructResponse(checkedResponse);
  } catch (URISyntaxException | HttpErrorStatusException e) {
    throw new IOException(e);
  }
}
/**
 * Sends a POST request whose body is the given payload wrapped in a text entity (built with
 * {@link EntityBuilder}) rather than sent through the JSON helper, and returns the response body.
 *
 * @param urlString target URL
 * @param payload text to send as the request entity
 * @param headers request headers
 * @return the response body
 * @throws IOException if the request fails; an invalid URI or an HTTP error status is wrapped in an IOException
 */
public static String sendPostRequestRaw(String urlString, String payload, Map<String, String> headers)
    throws IOException {
  try {
    EntityBuilder entityBuilder = EntityBuilder.create();
    entityBuilder.setText(payload);
    SimpleHttpResponse rawResponse =
        getHttpClient().sendPostRequest(new URL(urlString).toURI(), entityBuilder.build(), headers);
    SimpleHttpResponse checkedResponse = HttpClient.wrapAndThrowHttpException(rawResponse);
    return constructResponse(checkedResponse);
  } catch (URISyntaxException | HttpErrorStatusException e) {
    throw new IOException(e);
  }
}
/**
 * Sends a JSON PUT request to the given URL with a null payload.
 *
 * @return the response body
 * @throws IOException if the request fails
 */
public static String sendPutRequest(String urlString)
throws IOException {
return sendPutRequest(urlString, null);
}
/**
 * Sends a JSON PUT request with the given payload and an empty header map.
 *
 * @return the response body
 * @throws IOException if the request fails
 */
public static String sendPutRequest(String urlString, String payload)
throws IOException {
return sendPutRequest(urlString, payload, Collections.emptyMap());
}
/**
 * Sends a JSON PUT request with the given payload and headers through the shared HttpClient and returns the
 * response body.
 *
 * @param urlString target URL
 * @param payload request body
 * @param headers request headers
 * @return the response body
 * @throws IOException if the request fails; an invalid URI or an HTTP error status is wrapped in an IOException
 */
public static String sendPutRequest(String urlString, String payload, Map<String, String> headers)
    throws IOException {
  try {
    SimpleHttpResponse rawResponse =
        getHttpClient().sendJsonPutRequest(new URL(urlString).toURI(), payload, headers);
    SimpleHttpResponse checkedResponse = HttpClient.wrapAndThrowHttpException(rawResponse);
    return constructResponse(checkedResponse);
  } catch (URISyntaxException | HttpErrorStatusException e) {
    throw new IOException(e);
  }
}
/**
 * Sends a DELETE request to the given URL with an empty header map.
 *
 * @return the response body
 * @throws IOException if the request fails
 */
public static String sendDeleteRequest(String urlString)
throws IOException {
return sendDeleteRequest(urlString, Collections.emptyMap());
}
/**
 * Sends a DELETE request with the given headers through the shared HttpClient and returns the response body.
 *
 * @param urlString target URL
 * @param headers request headers
 * @return the response body
 * @throws IOException if the request fails; an invalid URI or an HTTP error status is wrapped in an IOException
 */
public static String sendDeleteRequest(String urlString, Map<String, String> headers)
    throws IOException {
  try {
    SimpleHttpResponse rawResponse = getHttpClient().sendDeleteRequest(new URL(urlString).toURI(), headers);
    SimpleHttpResponse checkedResponse = HttpClient.wrapAndThrowHttpException(rawResponse);
    return constructResponse(checkedResponse);
  } catch (URISyntaxException | HttpErrorStatusException e) {
    throw new IOException(e);
  }
}
/**
 * Extracts the response body string from the given HTTP response; shared by the send*Request helpers.
 */
private static String constructResponse(SimpleHttpResponse resp) {
return resp.getResponse();
}
/**
 * Sends a multipart POST request with the given body and an empty header map.
 *
 * @return the response as returned by the shared HttpClient
 * @throws IOException if the request fails
 */
public static SimpleHttpResponse sendMultipartPostRequest(String url, String body)
throws IOException {
return sendMultipartPostRequest(url, body, Collections.emptyMap());
}
/**
 * Sends a multipart POST request with the given body and headers through the shared HttpClient.
 *
 * <p>Unlike the send*Request helpers that return a String, the response is returned as is, without the
 * wrapAndThrowHttpException check.
 *
 * @return the response as returned by the shared HttpClient
 * @throws IOException if the request fails
 */
public static SimpleHttpResponse sendMultipartPostRequest(String url, String body, Map<String, String> headers)
throws IOException {
return getHttpClient().sendMultipartPostRequest(url, body, headers);
}
/**
 * Sends a multipart PUT request with the given body and no extra headers (null is passed as the header map).
 *
 * @return the response as returned by the shared HttpClient
 * @throws IOException if the request fails
 */
public static SimpleHttpResponse sendMultipartPutRequest(String url, String body)
throws IOException {
return sendMultipartPutRequest(url, body, null);
}
/**
 * Sends a multipart PUT request with the given body and headers through the shared HttpClient.
 *
 * <p>Unlike the send*Request helpers that return a String, the response is returned as is, without the
 * wrapAndThrowHttpException check.
 *
 * @return the response as returned by the shared HttpClient
 * @throws IOException if the request fails
 */
public static SimpleHttpResponse sendMultipartPutRequest(String url, String body, Map<String, String> headers)
throws IOException {
return getHttpClient().sendMultipartPutRequest(url, body, headers);
}
/**
 * Sums, over all broker tenants known to the Helix resource manager, the number of instances of each tenant.
 *
 * @return Number of instances used by all the broker tenants
 */
public int getTaggedBrokerCount() {
  int total = 0;
  for (String tenantName : _helixResourceManager.getAllBrokerTenantNames()) {
    total += _helixResourceManager.getAllInstancesForBrokerTenant(tenantName).size();
  }
  return total;
}
/**
 * Sums, over all server tenants known to the Helix resource manager, the number of instances of each tenant.
 *
 * @return Number of instances used by all the server tenants
 */
public int getTaggedServerCount() {
  int total = 0;
  for (String tenantName : _helixResourceManager.getAllServerTenantNames()) {
    total += _helixResourceManager.getAllInstancesForServerTenant(tenantName).size();
  }
  return total;
}
/**
 * @return the current value of the {@code _controllerRequestURLBuilder} field
 */
public ControllerRequestURLBuilder getControllerRequestURLBuilder() {
return _controllerRequestURLBuilder;
}
/**
 * @return the current value of the {@code _helixAdmin} field
 */
public HelixAdmin getHelixAdmin() {
return _helixAdmin;
}
/**
 * @return the current value of the {@code _helixResourceManager} field
 */
public PinotHelixResourceManager getHelixResourceManager() {
return _helixResourceManager;
}
/**
 * @return the current value of the {@code _controllerBaseApiUrl} field
 */
public String getControllerBaseApiUrl() {
return _controllerBaseApiUrl;
}
/**
 * @return the current value of the {@code _helixManager} field
 */
public HelixManager getHelixManager() {
return _helixManager;
}
/**
 * @return the current value of the {@code _propertyStore} field
 */
public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
return _propertyStore;
}
/**
 * @return the current value of the {@code _controllerPort} field
 */
public int getControllerPort() {
return _controllerPort;
}
/**
 * Returns the controller starter to use.
 *
 * @return {@code _controllerStarter} when it is set; otherwise a newly constructed {@link ControllerStarter}.
 *         The fallback instance is not stored in the field, so every call made while the field is
 *         {@code null} creates another one.
 */
public BaseControllerStarter getControllerStarter() {
  if (_controllerStarter != null) {
    return _controllerStarter;
  }
  return new ControllerStarter();
}
/**
 * @return the current value of the {@code _controllerConfig} field
 */
public ControllerConf getControllerConfig() {
return _controllerConfig;
}
/**
 * Builds the controller configuration used by the shared test setup: the default controller configuration plus
 * a handful of test specific overrides.
 *
 * <p>The method is {@code final} because this configuration is shared across the whole default TestNG group.
 *
 * @return the map obtained from {@code getDefaultControllerConfiguration()} with the overrides applied
 */
public final Map<String, Object> getSharedControllerConfiguration() {
  // Placeholder secret value; PinotControllerAppConfigsTest uses it to test obfuscation.
  final String secretPlaceholder = "*personal*";

  Map<String, Object> sharedConfig = getDefaultControllerConfiguration();

  // TODO: move these test specific configs into respective test classes.
  sharedConfig.put(ControllerConf.ACCESS_CONTROL_FACTORY_CLASS, AllowAllAccessFactory.class.getName());

  // Used in PinotTableRestletResourceTest
  sharedConfig.put(ControllerConf.TABLE_MIN_REPLICAS, MIN_NUM_REPLICAS);

  // Used in PinotControllerAppConfigsTest to test obfuscation
  sharedConfig.put("controller.segment.fetcher.auth.token", secretPlaceholder);
  sharedConfig.put("controller.admin.access.control.principals.user.password", secretPlaceholder);

  return sharedConfig;
}
/**
 * Initializes the shared test state without any extra controller properties.
 *
 * @throws Exception propagated from {@link #startSharedTestSetup(Map)}
 */
public void startSharedTestSetup()
    throws Exception {
  Map<String, Object> noExtraProperties = Collections.emptyMap();
  startSharedTestSetup(noExtraProperties);
}
/**
 * Initializes the shared state for the TestNG default test group: starts ZooKeeper, starts the controller with
 * the shared configuration, then registers the fake broker and server instances.
 *
 * @param extraProperties entries layered on top of the shared controller configuration; on a key clash the
 *                        entry from this map wins
 * @throws Exception if any of the startup steps fails
 */
public void startSharedTestSetup(Map<String, Object> extraProperties)
    throws Exception {
  startZk();

  // Caller supplied properties are applied last so they override the shared defaults.
  Map<String, Object> controllerProps = getSharedControllerConfiguration();
  controllerProps.putAll(extraProperties);
  startController(controllerProps);

  // First batch of fake instances (flag = true).
  addMoreFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
  addMoreFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);

  // Second batch of fake instances (flag = false).
  addMoreFakeBrokerInstancesToAutoJoinHelixCluster(TOTAL_NUM_BROKER_INSTANCES, false);
  addMoreFakeServerInstancesToAutoJoinHelixCluster(TOTAL_NUM_SERVER_INSTANCES, false);
}
/**
 * Tears down the shared state used in the TestNG default test group.
 *
 * <p>Calls, in this order: {@link #cleanup()}, {@code stopFakeInstances()}, {@code stopController()} and
 * {@code stopZk()}.
 */
public void stopSharedTestSetup() {
// NOTE(review): cleanup() works through getHelixResourceManager(), so it presumably has to run while the
// controller and ZooKeeper are still up - confirm before reordering.
cleanup();
stopFakeInstances();
stopController();
stopZk();
}
/**
 * Makes sure the shared state is set up and valid before a test case class runs, using no extra controller
 * properties.
 *
 * @throws Exception propagated from {@link #setupSharedStateAndValidate(Map)}
 */
public void setupSharedStateAndValidate()
    throws Exception {
  Map<String, Object> noExtraProperties = Collections.emptyMap();
  setupSharedStateAndValidate(noExtraProperties);
}
/**
 * Makes sure the shared state is set up, then asserts that it is in the expected pristine condition.
 *
 * <p>The shared setup is started only when {@code _zookeeperInstance} or {@code _helixResourceManager} is
 * still {@code null}. Afterwards the tenant counts, tagged and untagged instance counts and the absence of
 * tables are verified with TestNG assertions.
 *
 * @param extraProperties extra controller properties, used only when the shared setup has to be started here
 * @throws Exception if starting the shared setup fails
 */
public void setupSharedStateAndValidate(Map<String, Object> extraProperties)
    throws Exception {
  boolean sharedStateReady = _zookeeperInstance != null && _helixResourceManager != null;
  if (!sharedStateReady) {
    // Expected only when a single test case runs outside of the testNG group, i.e. one at a time within IntelliJ
    // or through the maven command line. Under a testNG group the state has already been set up by the
    // @BeforeGroups method in ControllerTestSetup.
    startSharedTestSetup(extraProperties);
  }

  // Fetched after the optional setup above so that a freshly started resource manager is picked up.
  PinotHelixResourceManager resourceManager = getHelixResourceManager();

  // Exactly one broker tenant and one server tenant.
  Assert.assertEquals(resourceManager.getAllBrokerTenantNames().size(), 1);
  Assert.assertEquals(resourceManager.getAllServerTenantNames().size(), 1);

  // Tagged broker and server instance counts.
  Assert.assertEquals(getTaggedBrokerCount(), NUM_BROKER_INSTANCES);
  Assert.assertEquals(getTaggedServerCount(), NUM_SERVER_INSTANCES);

  // No tables left over from a previous test.
  Assert.assertEquals(resourceManager.getAllTables().size(), 0);

  // Instance counts of the "DefaultBroker" / "DefaultServer" tenants.
  Assert.assertEquals(resourceManager.getAllInstancesForBrokerTenant("DefaultBroker").size(), 0);
  Assert.assertEquals(resourceManager.getAllInstancesForServerTenant("DefaultServer").size(), 0);

  // Online untagged instance counts.
  Assert.assertEquals(resourceManager.getOnlineUnTaggedBrokerInstanceList().size(), NUM_BROKER_INSTANCES);
  Assert.assertEquals(resourceManager.getOnlineUnTaggedServerInstanceList().size(), NUM_SERVER_INSTANCES);
}
/**
 * Cleans the shared state after a test case class has finished: removes every table and schema and all broker
 * and server tenants whose name does not start with {@code DEFAULT_TENANT}. Individual tests may need
 * additional cleanup depending on what they do.
 */
public void cleanup() {
  PinotHelixResourceManager resourceManager = getHelixResourceManager();

  // Tables: both the offline and the realtime delete are issued for every listed table name.
  for (String tableName : resourceManager.getAllTables()) {
    resourceManager.deleteOfflineTable(tableName);
    resourceManager.deleteRealtimeTable(tableName);
  }

  // Schemas: the list may be null; an empty list simply results in no iterations.
  List<String> schemaNames = resourceManager.getSchemaNames();
  if (schemaNames != null) {
    for (String schemaName : schemaNames) {
      resourceManager.deleteSchema(resourceManager.getSchema(schemaName));
    }
  }

  // Broker tenants, keeping the default tenant.
  for (String brokerTenant : resourceManager.getAllBrokerTenantNames()) {
    if (brokerTenant.startsWith(DEFAULT_TENANT)) {
      continue;
    }
    resourceManager.deleteBrokerTenantFor(brokerTenant);
  }

  // Server tenants, keeping the default tenant.
  for (String serverTenant : resourceManager.getAllServerTenantNames()) {
    if (serverTenant.startsWith(DEFAULT_TENANT)) {
      continue;
    }
    resourceManager.deleteOfflineServerTenantFor(serverTenant);
    resourceManager.deleteRealtimeServerTenantFor(serverTenant);
  }
}
}