blob: a0dc29c61cb252569b20986cb7df9fcc6eba88a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;
import java.io.File;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* Integration test that converts Avro data for 12 segments and runs queries against it.
*/
public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest {
private static final String TENANT_NAME = "TestTenant";
private static final int NUM_OFFLINE_SEGMENTS = 8;
private static final int NUM_REALTIME_SEGMENTS = 6;
@Override
protected String getBrokerTenant() {
return TENANT_NAME;
}
@Override
protected String getServerTenant() {
return TENANT_NAME;
}
@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, "grpc");
}
@Override
protected void overrideServerConf(PinotConfiguration serverConf) {
serverConf.setProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, true);
}
@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// Start Zk, Kafka and Pinot
startHybridCluster();
List<File> avroFiles = getAllAvroFiles();
List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, NUM_OFFLINE_SEGMENTS);
List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles, NUM_REALTIME_SEGMENTS);
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig offlineTableConfig = createOfflineTableConfig();
addTableConfig(offlineTableConfig);
addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
// Create and upload segments
ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir,
_tarDir);
uploadSegments(getTableName(), _tarDir);
// Push data into Kafka
pushAvroIntoKafka(realtimeAvroFiles);
// Set up the H2 connection
setUpH2Connection(avroFiles);
// Initialize the query generator
setUpQueryGenerator(avroFiles);
// TODO: this doesn't work so we simple wait for 5 second here. will be fixed after:
// https://github.com/apache/pinot/pull/7839
// waitForAllDocsLoaded(600_000L);
Thread.sleep(5000);
}
protected void startHybridCluster()
throws Exception {
// Start Zk and Kafka
startZk();
startKafka();
// Start the Pinot cluster
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
startController(properties);
startBrokers(1);
startServers(2);
// Create tenants
createBrokerTenant(TENANT_NAME, 1);
createServerTenant(TENANT_NAME, 1, 1);
}
@Test
public void testGrpcBrokerRequestHandlerOnSelectionOnlyQuery()
throws Exception {
String query = "SELECT * FROM mytable LIMIT 1000000";
testQuery(query);
query = "SELECT * FROM mytable WHERE DaysSinceEpoch > 16312 LIMIT 10000000";
testQuery(query);
query = "SELECT ArrTime, DaysSinceEpoch, Carrier FROM mytable LIMIT 10000000";
testQuery(query);
}
@AfterClass
public void tearDown()
throws Exception {
dropOfflineTable(getTableName());
stopServer();
stopBroker();
stopController();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
}