blob: 87963c8e845ba5279cd15d95b752297bf7d0625e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Hybrid cluster integration test that uploads 8 months of data as offline and 6 months of data as realtime (with a
* two month overlap).
*/
public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet {
private static final String TENANT_NAME = "TestTenant";
private static final int NUM_OFFLINE_SEGMENTS = 8;
private static final int NUM_REALTIME_SEGMENTS = 6;
@Override
protected String getBrokerTenant() {
return TENANT_NAME;
}
@Override
protected String getServerTenant() {
return TENANT_NAME;
}
@Override
protected void overrideServerConf(PinotConfiguration configuration) {
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, false);
}
@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start Zk, Kafka and Pinot
startHybridCluster();
List<File> avroFiles = getAllAvroFiles();
List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, NUM_OFFLINE_SEGMENTS);
List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles, NUM_REALTIME_SEGMENTS);
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig offlineTableConfig = createOfflineTableConfig();
addTableConfig(offlineTableConfig);
addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
// Create and upload segments
ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir,
_tarDir);
uploadSegments(getTableName(), _tarDir);
// Push data into Kafka
pushAvroIntoKafka(realtimeAvroFiles);
// Set up the H2 connection
setUpH2Connection(avroFiles);
// Initialize the query generator
setUpQueryGenerator(avroFiles);
// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
}
protected void startHybridCluster()
throws Exception {
// Start Zk and Kafka
startZk();
startKafka();
// Start the Pinot cluster
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
startController(properties);
startBroker();
startServers(2);
// Create tenants
createBrokerTenant(TENANT_NAME, 1);
createServerTenant(TENANT_NAME, 1, 1);
}
@Test
public void testSegmentMetadataApi()
throws Exception {
String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName()));
JsonNode tableSegmentsMetadata = JsonUtils.stringToJsonNode(jsonOutputStr);
Assert.assertEquals(tableSegmentsMetadata.size(), 8);
JsonNode segmentMetadataFromAllEndpoint = tableSegmentsMetadata.elements().next();
String segmentName = segmentMetadataFromAllEndpoint.get("segmentName").asText();
jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(), segmentName));
JsonNode segmentMetadataFromDirectEndpoint = JsonUtils.stringToJsonNode(jsonOutputStr);
Assert.assertEquals(segmentMetadataFromAllEndpoint.get("totalDocs"),
segmentMetadataFromDirectEndpoint.get("segment.total.docs"));
}
// TODO: This test fails when using `llc` consumer mode. Needs investigation
@Test
public void testSegmentListApi()
throws Exception {
{
String jsonOutputStr = sendGetRequest(
_controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
// There should be one element in the array
JsonNode element = array.get(0);
JsonNode segments = element.get("OFFLINE");
Assert.assertEquals(segments.size(), 8);
}
{
String jsonOutputStr = sendGetRequest(
_controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.REALTIME.toString()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
// There should be one element in the array
JsonNode element = array.get(0);
JsonNode segments = element.get("REALTIME");
Assert.assertEquals(segments.size(), 3);
}
{
String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.forSegmentListAPI(getTableName()));
JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr);
// there should be 2 elements in the array now.
int realtimeIndex = 0;
int offlineIndex = 1;
JsonNode element = array.get(realtimeIndex);
if (!element.has("REALTIME")) {
realtimeIndex = 1;
offlineIndex = 0;
}
JsonNode offlineElement = array.get(offlineIndex);
JsonNode realtimeElement = array.get(realtimeIndex);
JsonNode realtimeSegments = realtimeElement.get("REALTIME");
Assert.assertEquals(realtimeSegments.size(), 3);
JsonNode offlineSegments = offlineElement.get("OFFLINE");
Assert.assertEquals(offlineSegments.size(), 8);
}
}
@Test
public void testReload()
throws Exception {
super.testReload(true);
}
@Test
public void testBrokerDebugOutput()
throws Exception {
String tableName = getTableName();
Assert.assertNotNull(getDebugInfo("debug/timeBoundary/" + tableName));
Assert.assertNotNull(getDebugInfo("debug/timeBoundary/" + TableNameBuilder.OFFLINE.tableNameWithType(tableName)));
Assert.assertNotNull(getDebugInfo("debug/timeBoundary/" + TableNameBuilder.REALTIME.tableNameWithType(tableName)));
Assert.assertNotNull(getDebugInfo("debug/routingTable/" + tableName));
Assert.assertNotNull(getDebugInfo("debug/routingTable/" + TableNameBuilder.OFFLINE.tableNameWithType(tableName)));
Assert.assertNotNull(getDebugInfo("debug/routingTable/" + TableNameBuilder.REALTIME.tableNameWithType(tableName)));
}
@Test
public void testBrokerDebugRoutingTableSQL()
throws Exception {
String tableName = getTableName();
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
String encodedSQL;
encodedSQL = URLEncoder.encode("select * from " + realtimeTableName, "UTF-8");
Assert.assertNotNull(getDebugInfo("debug/routingTable/sql?query=" + encodedSQL));
encodedSQL = URLEncoder.encode("select * from " + offlineTableName, "UTF-8");
Assert.assertNotNull(getDebugInfo("debug/routingTable/sql?query=" + encodedSQL));
}
@Test
public void testQueryTracing()
throws Exception {
JsonNode jsonNode = postQuery("SET trace = true; SELECT COUNT(*) FROM " + getTableName());
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(), getCountStarResult());
Assert.assertTrue(jsonNode.get("exceptions").isEmpty());
JsonNode traceInfo = jsonNode.get("traceInfo");
Assert.assertEquals(traceInfo.size(), 2);
Assert.assertTrue(traceInfo.has("localhost_O"));
Assert.assertTrue(traceInfo.has("localhost_R"));
}
@Test
@Override
public void testHardcodedQueries()
throws Exception {
super.testHardcodedQueries();
}
@Test
@Override
public void testQueriesFromQueryFile()
throws Exception {
super.testQueriesFromQueryFile();
}
@Test
@Override
public void testGeneratedQueries()
throws Exception {
super.testGeneratedQueries();
}
@Test
@Override
public void testQueryExceptions()
throws Exception {
super.testQueryExceptions();
}
@Test
@Override
public void testInstanceShutdown()
throws Exception {
super.testInstanceShutdown();
}
@Test
@Override
public void testBrokerResponseMetadata()
throws Exception {
super.testBrokerResponseMetadata();
}
@Test
@Override
public void testVirtualColumnQueries() {
super.testVirtualColumnQueries();
}
@AfterClass
public void tearDown()
throws Exception {
// Try deleting the tables and check that they have no routing table
String tableName = getTableName();
dropOfflineTable(tableName);
dropRealtimeTable(tableName);
// Routing should be removed after deleting all tables
TestUtils.waitForCondition(aVoid -> {
try {
getDebugInfo("debug/routingTable/" + tableName);
return false;
} catch (Exception e) {
// only return true if 404 not found error is thrown.
return e.getMessage().contains("Got error status code: 404");
}
}, 60_000L, "Routing table is not empty after dropping all tables");
stopServer();
stopBroker();
stopController();
stopKafka();
stopZk();
cleanupHybridCluster();
}
/**
* Can be overridden to preserve segments.
*
* @throws Exception
*/
protected void cleanupHybridCluster()
throws Exception {
FileUtils.deleteDirectory(_tempDir);
}
}