| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.integration.tests; |
| |
| import java.io.File; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.pinot.controller.ControllerConf; |
| import org.apache.pinot.spi.config.table.TableConfig; |
| import org.apache.pinot.spi.data.Schema; |
| import org.apache.pinot.spi.env.PinotConfiguration; |
| import org.apache.pinot.spi.utils.CommonConstants.Broker; |
| import org.apache.pinot.spi.utils.CommonConstants.Server; |
| import org.apache.pinot.util.TestUtils; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| |
| /** |
| * Integration test that converts Avro data for 12 segments and runs queries against it. |
| */ |
| public class GrpcBrokerClusterIntegrationTest extends BaseClusterIntegrationTest { |
| private static final String TENANT_NAME = "TestTenant"; |
| private static final int NUM_OFFLINE_SEGMENTS = 8; |
| private static final int NUM_REALTIME_SEGMENTS = 6; |
| |
| @Override |
| protected String getBrokerTenant() { |
| return TENANT_NAME; |
| } |
| |
| @Override |
| protected String getServerTenant() { |
| return TENANT_NAME; |
| } |
| |
| @Override |
| protected void overrideBrokerConf(PinotConfiguration brokerConf) { |
| brokerConf.setProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, "grpc"); |
| } |
| |
| @Override |
| protected void overrideServerConf(PinotConfiguration serverConf) { |
| serverConf.setProperty(Server.CONFIG_OF_ENABLE_GRPC_SERVER, true); |
| } |
| |
| @BeforeClass |
| public void setUp() |
| throws Exception { |
| TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); |
| |
| // Start Zk, Kafka and Pinot |
| startHybridCluster(); |
| |
| List<File> avroFiles = getAllAvroFiles(); |
| List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, NUM_OFFLINE_SEGMENTS); |
| List<File> realtimeAvroFiles = getRealtimeAvroFiles(avroFiles, NUM_REALTIME_SEGMENTS); |
| |
| // Create and upload the schema and table config |
| Schema schema = createSchema(); |
| addSchema(schema); |
| TableConfig offlineTableConfig = createOfflineTableConfig(); |
| addTableConfig(offlineTableConfig); |
| addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0))); |
| |
| // Create and upload segments |
| ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 0, _segmentDir, |
| _tarDir); |
| uploadSegments(getTableName(), _tarDir); |
| |
| // Push data into Kafka |
| pushAvroIntoKafka(realtimeAvroFiles); |
| |
| // Set up the H2 connection |
| setUpH2Connection(avroFiles); |
| |
| // Initialize the query generator |
| setUpQueryGenerator(avroFiles); |
| |
| // TODO: this doesn't work so we simple wait for 5 second here. will be fixed after: |
| // https://github.com/apache/pinot/pull/7839 |
| // waitForAllDocsLoaded(600_000L); |
| Thread.sleep(5000); |
| } |
| |
| protected void startHybridCluster() |
| throws Exception { |
| // Start Zk and Kafka |
| startZk(); |
| startKafka(); |
| |
| // Start the Pinot cluster |
| Map<String, Object> properties = getDefaultControllerConfiguration(); |
| properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false); |
| startController(properties); |
| startBrokers(1); |
| startServers(2); |
| |
| // Create tenants |
| createBrokerTenant(TENANT_NAME, 1); |
| createServerTenant(TENANT_NAME, 1, 1); |
| } |
| |
| @Test |
| public void testGrpcBrokerRequestHandlerOnSelectionOnlyQuery() |
| throws Exception { |
| String query = "SELECT * FROM mytable LIMIT 1000000"; |
| testQuery(query); |
| query = "SELECT * FROM mytable WHERE DaysSinceEpoch > 16312 LIMIT 10000000"; |
| testQuery(query); |
| query = "SELECT ArrTime, DaysSinceEpoch, Carrier FROM mytable LIMIT 10000000"; |
| testQuery(query); |
| } |
| |
| @AfterClass |
| public void tearDown() |
| throws Exception { |
| dropOfflineTable(getTableName()); |
| |
| stopServer(); |
| stopBroker(); |
| stopController(); |
| stopZk(); |
| |
| FileUtils.deleteDirectory(_tempDir); |
| } |
| } |