blob: 7c57e21a70bed3134e5c458f3430e363dc9d8676 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.controller.helix;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "stateless")
public class ControllerTenantStatelessTest extends ControllerTest {
private static final String BROKER_TAG_PREFIX = "brokerTag_";
private static final String SERVER_TAG_PREFIX = "serverTag_";
private static final int NUM_INSTANCES = 10;
private static final int NUM_BROKER_TAGS = 3;
private static final int NUM_BROKERS_PER_TAG = 3;
private static final int NUM_SERVER_TAGS = 2;
private static final int NUM_OFFLINE_SERVERS_PER_TAG = 2;
private static final int NUM_REALTIME_SERVERS_PER_TAG = 1;
private static final int NUM_SERVERS_PER_TAG = NUM_OFFLINE_SERVERS_PER_TAG + NUM_REALTIME_SERVERS_PER_TAG;
@BeforeClass
public void setUp()
throws Exception {
startZk();
startController();
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, false);
addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
}
@Test
public void testBrokerTenant()
throws IOException {
// Create broker tenants
for (int i = 1; i <= NUM_BROKER_TAGS; i++) {
String brokerTenant = BROKER_TAG_PREFIX + i;
createBrokerTenant(brokerTenant, NUM_BROKERS_PER_TAG);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getBrokerTagForTenant(brokerTenant))
.size(),
NUM_BROKERS_PER_TAG);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
.size(),
NUM_INSTANCES - i * NUM_BROKERS_PER_TAG);
}
// Get broker tenants
JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTenantGet()));
Assert.assertEquals(response.get("BROKER_TENANTS").size(), NUM_BROKER_TAGS);
for (int i = 1; i <= NUM_BROKER_TAGS; i++) {
String brokerTag = BROKER_TAG_PREFIX + i;
response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantGet(brokerTag)));
Assert.assertEquals(response.get("BrokerInstances").size(), NUM_BROKERS_PER_TAG);
Assert.assertEquals(response.get("tenantName").asText(), brokerTag);
response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTenantGet(brokerTag)));
Assert.assertEquals(response.get("BrokerInstances").size(), NUM_BROKERS_PER_TAG);
Assert.assertEquals(response.get("ServerInstances").size(), 0);
Assert.assertEquals(response.get("tenantName").asText(), brokerTag);
}
// Update broker tenants
for (int i = 0; i <= NUM_INSTANCES - (NUM_BROKER_TAGS - 1) * NUM_BROKERS_PER_TAG; i++) {
String brokerTenant = BROKER_TAG_PREFIX + 1;
updateBrokerTenant(brokerTenant, i);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getBrokerTagForTenant(brokerTenant))
.size(),
i);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
.size(),
NUM_INSTANCES - (NUM_BROKER_TAGS - 1) * NUM_BROKERS_PER_TAG - i);
}
// Delete broker tenants
for (int i = 1; i <= NUM_BROKER_TAGS; i++) {
String brokerTenant = BROKER_TAG_PREFIX + i;
sendDeleteRequest(_controllerRequestURLBuilder.forBrokerTenantDelete(brokerTenant));
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getBrokerTagForTenant(brokerTenant))
.size(),
0);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE)
.size(),
NUM_INSTANCES - (NUM_BROKER_TAGS - i) * NUM_BROKERS_PER_TAG);
}
}
@Test
public void testEmptyServerTenant() {
try {
sendGetRequest(_controllerRequestURLBuilder.forServerTenantGet("doesn't_exist"));
Assert.fail();
} catch (Exception e) {
}
}
@Test
public void testServerTenant()
throws IOException {
// Create server tenants
for (int i = 1; i <= NUM_SERVER_TAGS; i++) {
String serverTenant = SERVER_TAG_PREFIX + i;
createServerTenant(serverTenant, NUM_OFFLINE_SERVERS_PER_TAG, NUM_REALTIME_SERVERS_PER_TAG);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getOfflineTagForTenant(serverTenant))
.size(), NUM_OFFLINE_SERVERS_PER_TAG);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getRealtimeTagForTenant(serverTenant))
.size(), NUM_REALTIME_SERVERS_PER_TAG);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
.size(),
NUM_INSTANCES - i * NUM_SERVERS_PER_TAG);
}
// Get server tenants
JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTenantGet()));
Assert.assertEquals(response.get("SERVER_TENANTS").size(), NUM_SERVER_TAGS);
for (int i = 1; i <= NUM_SERVER_TAGS; i++) {
String serverTag = SERVER_TAG_PREFIX + i;
response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forServerTenantGet(serverTag)));
Assert.assertEquals(response.get("ServerInstances").size(), NUM_SERVERS_PER_TAG);
Assert.assertEquals(response.get("tenantName").asText(), serverTag);
response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTenantGet(serverTag)));
Assert.assertEquals(response.get("BrokerInstances").size(), 0);
Assert.assertEquals(response.get("ServerInstances").size(), NUM_SERVERS_PER_TAG);
Assert.assertEquals(response.get("tenantName").asText(), serverTag);
}
// Update server tenants
// Note: server tenants cannot scale down
for (int i = 0; i <= (NUM_INSTANCES - NUM_SERVER_TAGS * NUM_SERVERS_PER_TAG) / 2; i++) {
String serverTenant = SERVER_TAG_PREFIX + 1;
updateServerTenant(serverTenant, NUM_OFFLINE_SERVERS_PER_TAG + i, NUM_REALTIME_SERVERS_PER_TAG + i);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getOfflineTagForTenant(serverTenant))
.size(), NUM_OFFLINE_SERVERS_PER_TAG + i);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getRealtimeTagForTenant(serverTenant))
.size(), NUM_REALTIME_SERVERS_PER_TAG + i);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
.size(),
NUM_INSTANCES - NUM_SERVER_TAGS * NUM_SERVERS_PER_TAG - i * 2);
}
// Delete server tenants
for (int i = 1; i < NUM_SERVER_TAGS; i++) {
String serverTenant = SERVER_TAG_PREFIX + i;
sendDeleteRequest(_controllerRequestURLBuilder.forServerTenantDelete(serverTenant));
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getOfflineTagForTenant(serverTenant))
.size(), 0);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getRealtimeTagForTenant(serverTenant))
.size(), 0);
Assert.assertEquals(_helixAdmin
.getInstancesInClusterWithTag(getHelixClusterName(), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE)
.size(),
NUM_INSTANCES - (NUM_SERVER_TAGS - i) * NUM_SERVERS_PER_TAG);
}
}
@AfterClass
public void tearDown() {
stopFakeInstances();
stopController();
stopZk();
}
}