| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.broker.broker; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.helix.model.ExternalView; |
| import org.apache.helix.model.IdealState; |
| import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; |
| import org.apache.pinot.broker.routing.RoutingManager; |
| import org.apache.pinot.broker.routing.RoutingTable; |
| import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo; |
| import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; |
| import org.apache.pinot.common.request.BrokerRequest; |
| import org.apache.pinot.common.utils.config.TagNameUtils; |
| import org.apache.pinot.controller.api.exception.InvalidTableConfigException; |
| import org.apache.pinot.controller.helix.ControllerTest; |
| import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; |
| import org.apache.pinot.pql.parsers.Pql2Compiler; |
| import org.apache.pinot.spi.config.table.TableConfig; |
| import org.apache.pinot.spi.config.table.TableType; |
| import org.apache.pinot.spi.data.DateTimeFieldSpec; |
| import org.apache.pinot.spi.data.DateTimeFormatSpec; |
| import org.apache.pinot.spi.data.DateTimeGranularitySpec; |
| import org.apache.pinot.spi.data.FieldSpec; |
| import org.apache.pinot.spi.data.Schema; |
| import org.apache.pinot.spi.env.PinotConfiguration; |
| import org.apache.pinot.spi.utils.CommonConstants.Helix; |
| import org.apache.pinot.spi.utils.builder.TableConfigBuilder; |
| import org.apache.pinot.spi.utils.builder.TableNameBuilder; |
| import org.apache.pinot.util.TestUtils; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertTrue; |
| |
| |
| public class HelixBrokerStarterTest extends ControllerTest { |
| private static final Pql2Compiler COMPILER = new Pql2Compiler(); |
| private static final String RAW_TABLE_NAME = "testTable"; |
| private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); |
| private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); |
| private static final String TIME_COLUMN_NAME = "daysSinceEpoch"; |
| private static final int NUM_BROKERS = 3; |
| private static final int NUM_SERVERS = 1; |
| private static final int NUM_OFFLINE_SEGMENTS = 5; |
| private static final int EXPECTED_VERSION = -1; |
| |
| private HelixBrokerStarter _brokerStarter; |
| |
| @BeforeClass |
| public void setUp() |
| throws Exception { |
| startZk(); |
| startController(); |
| |
| Map<String, Object> properties = new HashMap<>(); |
| properties.put(Helix.KEY_OF_BROKER_QUERY_PORT, 18099); |
| properties.put(Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName()); |
| properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl()); |
| |
| _brokerStarter = new HelixBrokerStarter(); |
| _brokerStarter.init(new PinotConfiguration(properties)); |
| _brokerStarter.start(); |
| |
| addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKERS - 1, true); |
| addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true); |
| |
| Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) |
| .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.INT, |
| new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), DateTimeFieldSpec.TimeFormat.EPOCH.toString()) |
| .getFormat(), new DateTimeGranularitySpec(1, TimeUnit.DAYS).getGranularity()).build(); |
| _helixResourceManager.addSchema(schema, true); |
| TableConfig offlineTableConfig = |
| new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) |
| .setTimeType(TimeUnit.DAYS.name()).build(); |
| _helixResourceManager.addTable(offlineTableConfig); |
| TableConfig realtimeTimeConfig = |
| new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME) |
| .setTimeType(TimeUnit.DAYS.name()).setStreamConfigs(getStreamConfigs()).build(); |
| _helixResourceManager.addTable(realtimeTimeConfig); |
| |
| for (int i = 0; i < NUM_OFFLINE_SEGMENTS; i++) { |
| _helixResourceManager |
| .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), |
| "downloadUrl"); |
| } |
| |
| TestUtils.waitForCondition(aVoid -> { |
| ExternalView offlineTableExternalView = |
| _helixAdmin.getResourceExternalView(getHelixClusterName(), OFFLINE_TABLE_NAME); |
| return offlineTableExternalView != null |
| && offlineTableExternalView.getPartitionSet().size() == NUM_OFFLINE_SEGMENTS; |
| }, 30_000L, "Failed to find all OFFLINE segments in the ExternalView"); |
| } |
| |
| private Map<String, String> getStreamConfigs() { |
| Map<String, String> streamConfigs = new HashMap<>(); |
| streamConfigs.put("streamType", "kafka"); |
| streamConfigs.put("stream.kafka.consumer.type", "highLevel"); |
| streamConfigs.put("stream.kafka.topic.name", "kafkaTopic"); |
| streamConfigs |
| .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder"); |
| return streamConfigs; |
| } |
| |
| @Test |
| public void testResourceAndTagAssignment() |
| throws Exception { |
| assertEquals( |
| _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), TagNameUtils.getBrokerTagForTenant(null)) |
| .size(), NUM_BROKERS); |
| |
| IdealState brokerResourceIdealState = |
| _helixAdmin.getResourceIdealState(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE); |
| assertEquals(brokerResourceIdealState.getInstanceSet(OFFLINE_TABLE_NAME).size(), NUM_BROKERS); |
| assertEquals(brokerResourceIdealState.getInstanceSet(REALTIME_TABLE_NAME).size(), NUM_BROKERS); |
| |
| ExternalView brokerResourceExternalView = |
| _helixAdmin.getResourceExternalView(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE); |
| assertEquals(brokerResourceExternalView.getStateMap(OFFLINE_TABLE_NAME).size(), NUM_BROKERS); |
| assertEquals(brokerResourceExternalView.getStateMap(REALTIME_TABLE_NAME).size(), NUM_BROKERS); |
| |
| RoutingManager routingManager = _brokerStarter.getRoutingManager(); |
| assertTrue(routingManager.routingExists(OFFLINE_TABLE_NAME)); |
| assertTrue(routingManager.routingExists(REALTIME_TABLE_NAME)); |
| |
| BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest("SELECT * FROM " + OFFLINE_TABLE_NAME); |
| RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest); |
| assertNotNull(routingTable); |
| assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS); |
| assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().size(), NUM_OFFLINE_SEGMENTS); |
| assertTrue(routingTable.getUnavailableSegments().isEmpty()); |
| |
| // Add a new segment into the OFFLINE table |
| _helixResourceManager |
| .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl"); |
| |
| TestUtils.waitForCondition(aVoid -> |
| routingManager.getRoutingTable(brokerRequest).getServerInstanceToSegmentsMap().values().iterator().next().size() |
| == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment into the routing table"); |
| |
| // Add a new table with different broker tenant |
| String newRawTableName = "newTable"; |
| String newOfflineTableName = TableNameBuilder.OFFLINE.tableNameWithType(newRawTableName); |
| TableConfig newTableConfig = |
| new TableConfigBuilder(TableType.OFFLINE).setTableName(newRawTableName).setBrokerTenant("testBroker").build(); |
| try { |
| _helixResourceManager.addTable(newTableConfig); |
| Assert.fail("Table creation should fail as testBroker does not exist"); |
| } catch (InvalidTableConfigException e) { |
| // expected |
| } |
| |
| // Add a new table with same broker tenant |
| newTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(newRawTableName) |
| .setServerTenant(TagNameUtils.DEFAULT_TENANT_NAME).build(); |
| _helixResourceManager.addTable(newTableConfig); |
| |
| // Broker tenant should be overridden to DefaultTenant |
| TableConfig newTableConfigInCluster = _helixResourceManager.getTableConfig(newOfflineTableName); |
| assertNotNull(newTableConfigInCluster); |
| assertEquals(newTableConfigInCluster.getTenantConfig().getBroker(), TagNameUtils.DEFAULT_TENANT_NAME); |
| |
| brokerResourceIdealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE); |
| assertEquals(brokerResourceIdealState.getInstanceSet(newOfflineTableName).size(), NUM_BROKERS); |
| |
| TestUtils.waitForCondition(aVoid -> { |
| Map<String, String> newTableStateMap = |
| _helixAdmin.getResourceExternalView(getHelixClusterName(), Helix.BROKER_RESOURCE_INSTANCE) |
| .getStateMap(newOfflineTableName); |
| return newTableStateMap != null && newTableStateMap.size() == NUM_BROKERS; |
| }, 30_000L, "Failed to find all brokers for the new table in the brokerResource ExternalView"); |
| |
| assertTrue(routingManager.routingExists(newOfflineTableName)); |
| } |
| |
| /** |
| * This test verifies that when the segments of an OFFLINE are refreshed, the TimeBoundaryInfo is also updated. |
| */ |
| @Test |
| public void testTimeBoundaryUpdate() { |
| RoutingManager routingManager = _brokerStarter.getRoutingManager(); |
| |
| // Time boundary should be 1 day smaller than the end time |
| int currentEndTime = 10; |
| TimeBoundaryInfo timeBoundaryInfo = routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME); |
| assertNotNull(timeBoundaryInfo); |
| assertEquals(timeBoundaryInfo.getTimeValue(), Integer.toString(currentEndTime - 1)); |
| |
| // Refresh a segment with a new end time |
| String segmentToRefresh = _helixResourceManager.getSegmentsFor(OFFLINE_TABLE_NAME).get(0); |
| int newEndTime = currentEndTime + 10; |
| SegmentZKMetadata segmentZKMetadata = |
| _helixResourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, segmentToRefresh); |
| _helixResourceManager.refreshSegment(OFFLINE_TABLE_NAME, |
| SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_TABLE_NAME, segmentToRefresh, newEndTime), |
| segmentZKMetadata, EXPECTED_VERSION, "downloadUrl", null); |
| |
| TestUtils.waitForCondition(aVoid -> routingManager.getTimeBoundaryInfo(OFFLINE_TABLE_NAME).getTimeValue() |
| .equals(Integer.toString(newEndTime - 1)), 30_000L, "Failed to update the time boundary for refreshed segment"); |
| } |
| |
| @AfterClass |
| public void tearDown() { |
| stopFakeInstances(); |
| _brokerStarter.stop(); |
| stopController(); |
| stopZk(); |
| } |
| } |