blob: d64c7321618c018ecd8367ace0384c40486fe3a4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Collections;
import java.util.Map;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.pinot.broker.broker.helix.BaseBrokerStarter;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.FailureDetector;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
/**
* Integration test that extends OfflineClusterIntegrationTest but start multiple brokers and servers.
*/
public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterIntegrationTest {
private static final int NUM_BROKERS = 2;
private static final int NUM_SERVERS = 3;
@Override
protected int getNumBrokers() {
return NUM_BROKERS;
}
@Override
protected int getNumServers() {
return NUM_SERVERS;
}
@Override
protected int getNumReplicas() {
return NUM_SERVERS;
}
@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(FailureDetector.CONFIG_OF_TYPE, FailureDetector.Type.CONNECTION.name());
}
@Test
public void testUpdateBrokerResource()
throws Exception {
// Add a new broker to the cluster
BaseBrokerStarter brokerStarter = startOneBroker(NUM_BROKERS);
// Check if broker is added to all the tables in broker resource
String clusterName = getHelixClusterName();
String brokerId = brokerStarter.getInstanceId();
IdealState brokerResourceIdealState =
_helixAdmin.getResourceIdealState(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
for (Map<String, String> brokerAssignment : brokerResourceIdealState.getRecord().getMapFields().values()) {
assertEquals(brokerAssignment.get(brokerId), BrokerResourceStateModel.ONLINE);
}
TestUtils.waitForCondition(aVoid -> {
ExternalView brokerResourceExternalView =
_helixAdmin.getResourceExternalView(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
for (Map<String, String> brokerAssignment : brokerResourceExternalView.getRecord().getMapFields().values()) {
if (!brokerAssignment.containsKey(brokerId)) {
return false;
}
}
return true;
}, 60_000L, "Failed to find broker in broker resource ExternalView");
// Stop the broker
brokerStarter.stop();
// Dropping the broker should fail because it is still in the broker resource
try {
sendDeleteRequest(_controllerRequestURLBuilder.forInstance(brokerId));
fail("Dropping instance should fail because it is still in the broker resource");
} catch (Exception e) {
// Expected
}
// Untag the broker and update the broker resource so that it is removed from the broker resource
sendPutRequest(_controllerRequestURLBuilder.forInstanceUpdateTags(brokerId, Collections.emptyList(), true));
// Check if broker is removed from all the tables in broker resource
brokerResourceIdealState =
_helixAdmin.getResourceIdealState(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
for (Map<String, String> brokerAssignment : brokerResourceIdealState.getRecord().getMapFields().values()) {
assertFalse(brokerAssignment.containsKey(brokerId));
}
TestUtils.waitForCondition(aVoid -> {
ExternalView brokerResourceExternalView =
_helixAdmin.getResourceExternalView(clusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
for (Map<String, String> brokerAssignment : brokerResourceExternalView.getRecord().getMapFields().values()) {
if (brokerAssignment.containsKey(brokerId)) {
return false;
}
}
return true;
}, 60_000L, "Failed to remove broker from broker resource ExternalView");
// Dropping the broker should success now
sendDeleteRequest(_controllerRequestURLBuilder.forInstance(brokerId));
// Check if broker is dropped from the cluster
assertFalse(_helixAdmin.getInstancesInCluster(clusterName).contains(brokerId));
}
@Test
public void testServerHardFailure()
throws Exception {
long expectedCountStarResult = getCountStarResult();
testCountStarQuery(3, false);
assertEquals(getCurrentCountStarResult(), expectedCountStarResult);
// Take a server and shut down its query server to mimic a hard failure
BaseServerStarter serverStarter = _serverStarters.get(NUM_SERVERS - 1);
serverStarter.getServerInstance().shutDown();
// First query should hit all servers and get connection refused exception
testCountStarQuery(NUM_SERVERS, true);
// Second query should not hit the failed server, and should return the correct result
testCountStarQuery(NUM_SERVERS - 1, false);
// Restart the failed server, and it should be included in the routing again
serverStarter.stop();
serverStarter = startOneServer(NUM_SERVERS - 1);
_serverStarters.set(NUM_SERVERS - 1, serverStarter);
TestUtils.waitForCondition(aVoid -> {
try {
JsonNode queryResult = postQuery("SELECT COUNT(*) FROM mytable");
// Result should always be correct
assertEquals(queryResult.get("resultTable").get("rows").get(0).get(0).longValue(), getCountStarResult());
return queryResult.get("numServersQueried").intValue() == NUM_SERVERS;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 10_000L, "Failed to include the restarted server into the routing");
}
private void testCountStarQuery(int expectedNumServersQueried, boolean exceptionExpected)
throws Exception {
JsonNode queryResult = postQuery("SELECT COUNT(*) FROM mytable");
assertEquals(queryResult.get("numServersQueried").intValue(), expectedNumServersQueried);
if (exceptionExpected) {
// 2 exceptions expected: 1. connection refused from the failed server; 2. servers not responded
JsonNode exceptions = queryResult.get("exceptions");
assertEquals(exceptions.size(), 2);
JsonNode firstException = exceptions.get(0);
assertEquals(firstException.get("errorCode").intValue(), QueryException.BROKER_REQUEST_SEND_ERROR_CODE);
assertTrue(firstException.get("message").textValue().contains("Connection refused"));
JsonNode secondException = exceptions.get(1);
assertEquals(secondException.get("errorCode").intValue(), QueryException.SERVER_NOT_RESPONDING_ERROR_CODE);
} else {
assertEquals(queryResult.get("resultTable").get("rows").get(0).get(0).longValue(), getCountStarResult());
assertTrue(queryResult.get("exceptions").isEmpty());
}
}
// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testStarTreeTriggering() {
// Ignored
}
// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testDefaultColumns() {
// Ignored
}
// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testBloomFilterTriggering() {
// Ignored
}
// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testRangeIndexTriggering() {
// Ignored
}
// Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
@Test(enabled = false)
@Override
public void testInvertedIndexTriggering() {
// Ignored
}
@Test(enabled = false)
@Override
public void testHardcodedServerPartitionedSqlQueries() {
// Ignored
}
}