| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.integration.tests; |
| |
| import com.google.common.primitives.Longs; |
| import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; |
| import java.io.File; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import org.apache.avro.file.DataFileStream; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.http.HttpStatus; |
| import org.apache.kafka.clients.producer.KafkaProducer; |
| import org.apache.kafka.clients.producer.Producer; |
| import org.apache.kafka.clients.producer.ProducerConfig; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
| import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; |
| import org.apache.pinot.common.utils.FileUploadDownloadClient; |
| import org.apache.pinot.controller.ControllerConf; |
| import org.apache.pinot.integration.tests.kafka.schemaregistry.SchemaRegistryStarter; |
| import org.apache.pinot.plugin.inputformat.avro.AvroUtils; |
| import org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder; |
| import org.apache.pinot.spi.config.table.TableConfig; |
| import org.apache.pinot.spi.config.table.TableType; |
| import org.apache.pinot.spi.data.Schema; |
| import org.apache.pinot.spi.env.PinotConfiguration; |
| import org.apache.pinot.spi.stream.StreamConfigProperties; |
| import org.apache.pinot.spi.utils.CommonConstants; |
| import org.apache.pinot.spi.utils.ReadMode; |
| import org.apache.pinot.spi.utils.builder.TableNameBuilder; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| |
| import static java.lang.String.format; |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotNull; |
| import static org.testng.Assert.assertTrue; |
| |
| |
| /** |
 * Integration test that extends RealtimeClusterIntegrationTest but uses the low-level Kafka consumer together with
 * the Confluent Schema Registry Avro message decoder.
| * TODO: Add separate module-level tests and remove the randomness of this test |
| */ |
| public class KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest |
| extends RealtimeClusterIntegrationTest { |
| private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; |
| private static final String TEST_UPDATED_INVERTED_INDEX_QUERY = |
| "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305"; |
| private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = Collections.singletonList("DivActualElapsedTime"); |
| private static final long RANDOM_SEED = System.currentTimeMillis(); |
| private static final Random RANDOM = new Random(RANDOM_SEED); |
| |
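  // Randomly chosen configurations; they are printed together with RANDOM_SEED in setUp() so that a failing
  // combination can be identified from the logs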
| private final boolean _isDirectAlloc = RANDOM.nextBoolean(); |
| private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); |
| private final boolean _enableSplitCommit = RANDOM.nextBoolean(); |
| private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); |
| private final long _startTime = System.currentTimeMillis(); |
| private SchemaRegistryStarter.KafkaSchemaRegistryInstance _schemaRegistry; |
| |
| @Override |
| protected int getNumKafkaBrokers() { |
| return 1; |
| } |
| |
| @Override |
| protected void startKafka() { |
| super.startKafka(); |
| startSchemaRegistry(); |
| } |
| |
| @Override |
| protected void stopKafka() { |
| stopSchemaRegistry(); |
| super.stopKafka(); |
| } |
| |
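  // Starts a local Confluent schema registry instance (no-op if one is already running)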
| private void startSchemaRegistry() { |
| if (_schemaRegistry == null) { |
| _schemaRegistry = SchemaRegistryStarter.startLocalInstance(SchemaRegistryStarter.DEFAULT_PORT); |
| } |
| } |
| |
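  // Stops the local schema registry instance; exceptions are swallowed so that shutdown always proceeds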
| private void stopSchemaRegistry() { |
| try { |
| if (_schemaRegistry != null) { |
| _schemaRegistry.stop(); |
| _schemaRegistry = null; |
| } |
| } catch (Exception e) { |
| // Swallow exceptions |
| } |
| } |
| |
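  /**
   * Pushes the records from the given Avro files into Kafka. Optionally injects tombstone records first, and sends
   * an undecodable payload before every Avro payload so that the decoder's error handling is exercised along with
   * the happy path.
   */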
| @Override |
| protected void pushAvroIntoKafka(List<File> avroFiles) |
| throws Exception { |
| Properties avroProducerProps = new Properties(); |
| avroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort()); |
| avroProducerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, _schemaRegistry.getUrl()); |
| avroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, |
| "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| avroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, |
| "io.confluent.kafka.serializers.KafkaAvroSerializer"); |
| Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps); |
| |
| Properties nonAvroProducerProps = new Properties(); |
| nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort()); |
| nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, |
| "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, |
| "org.apache.kafka.common.serialization.ByteArraySerializer"); |
| Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps); |
| |
| if (injectTombstones()) { |
      // Publish a large number of tombstone (null-payload) records up front; a consumer that cannot handle
      // tombstones would livelock here instead of reaching the real records
      for (int i = 0; i < 1000; i++) {
| nonAvroProducer.send( |
| new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null)); |
| } |
| } |
| for (File avroFile : avroFiles) { |
| try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) { |
| for (GenericRecord genericRecord : reader) { |
          byte[] keyBytes = (getPartitionColumn() == null) ? Longs.toByteArray(System.currentTimeMillis())
              : genericRecord.get(getPartitionColumn()).toString().getBytes(UTF_8);
          // Ignore getKafkaMessageHeader(); also send an undecodable "Rubbish" record alongside each Avro record to
          // verify that the decoder skips messages it cannot parse
| nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8))); |
| avroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, genericRecord)); |
| } |
| } |
| } |
| } |
| |
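  // Plugs the Confluent schema registry Avro decoder and the registry URL into the stream configs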
| @Override |
| protected Map<String, String> getStreamConfigs() { |
| Map<String, String> streamConfigMap = super.getStreamConfigs(); |
| String streamType = "kafka"; |
| streamConfigMap.put( |
| StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS), |
| KafkaConfluentSchemaRegistryAvroMessageDecoder.class.getName()); |
| streamConfigMap.put("stream.kafka.decoder.prop.schema.registry.rest.url", _schemaRegistry.getUrl()); |
| return streamConfigMap; |
| } |
| |
| @Override |
| protected boolean injectTombstones() { |
| return true; |
| } |
| |
| @Override |
| protected boolean useLlc() { |
| return true; |
| } |
| |
| @Override |
| protected String getLoadMode() { |
| return ReadMode.mmap.name(); |
| } |
| |
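  // Starts the controller with HLC tables disallowed and with split commit randomly enabled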
| @Override |
| public void startController() |
| throws Exception { |
| Map<String, Object> properties = getDefaultControllerConfiguration(); |
| |
| properties.put(ControllerConf.ALLOW_HLC_TABLES, false); |
| properties.put(ControllerConf.ENABLE_SPLIT_COMMIT, _enableSplitCommit); |
| |
| startController(properties); |
| enableResourceConfigForLeadControllerResource(_enableLeadControllerResource); |
| } |
| |
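  // Enables off-heap allocation for consuming segments and applies the randomly chosen server settings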
| @Override |
| protected void overrideServerConf(PinotConfiguration configuration) { |
| configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true); |
| configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc); |
| if (_isConsumerDirConfigured) { |
| configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); |
| } |
| if (_enableSplitCommit) { |
| configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true); |
| configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true); |
| } |
| } |
| |
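  /**
   * Builds segments from the given Avro files and uploads them to the controller three times: all segments once,
   * then the first segment again to verify refresh, then the first segment again with a different CRC in the
   * segment ZK metadata.
   */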
| @Override |
| protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig) |
| throws Exception { |
    // Ensure the working directories exist; forceMkdir fails fast if a directory cannot be created
    FileUtils.forceMkdir(_tarDir);
    FileUtils.forceMkdir(_segmentDir);
| |
| // create segments out of the avro files (segments will be placed in _tarDir) |
| List<File> copyOfAvroFiles = new ArrayList<>(avroFiles); |
| ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); |
| |
| // upload segments to controller |
| uploadSegmentsToController(getTableName(), _tarDir, false, false); |
| |
| // upload the first segment again to verify refresh |
| uploadSegmentsToController(getTableName(), _tarDir, true, false); |
| |
| // upload the first segment again to verify refresh with different segment crc |
| uploadSegmentsToController(getTableName(), _tarDir, true, true); |
| |
| // add avro files to the original list so H2 will have the uploaded data as well |
| avroFiles.addAll(copyOfAvroFiles); |
| } |
| |
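  // Uploads the segment tars under the given directory: either just the first segment (optionally with a changed
  // CRC), or all segments in parallel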
| private void uploadSegmentsToController(String tableName, File tarDir, boolean onlyFirstSegment, boolean changeCrc) |
| throws Exception { |
| File[] segmentTarFiles = tarDir.listFiles(); |
| assertNotNull(segmentTarFiles); |
| int numSegments = segmentTarFiles.length; |
| assertTrue(numSegments > 0); |
| if (onlyFirstSegment) { |
| numSegments = 1; |
| } |
| URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort); |
| try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { |
| if (numSegments == 1) { |
| File segmentTarFile = segmentTarFiles[0]; |
| if (changeCrc) { |
| changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString()); |
| } |
| assertEquals( |
| fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, |
| tableName, TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK); |
| } else { |
| // Upload segments in parallel |
| ExecutorService executorService = Executors.newFixedThreadPool(numSegments); |
| List<Future<Integer>> futures = new ArrayList<>(numSegments); |
| for (File segmentTarFile : segmentTarFiles) { |
| futures.add(executorService.submit( |
| () -> fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), |
| segmentTarFile, tableName, TableType.REALTIME).getStatusCode())); |
| } |
| executorService.shutdown(); |
| for (Future<Integer> future : futures) { |
| assertEquals((int) future.get(), HttpStatus.SC_OK); |
| } |
| } |
| } |
| } |
| |
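  // Overwrites the CRC in the segment's ZK metadata so that re-uploading the same tar file is treated as a refresh
  // with a different CRC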
| private void changeCrcInSegmentZKMetadata(String tableName, String segmentFilePath) { |
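    // Extract the segment name from the tar file path (assumes the table name is "mytable")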
| int startIdx = segmentFilePath.indexOf("mytable_"); |
| int endIdx = segmentFilePath.indexOf(".tar.gz"); |
| String segmentName = segmentFilePath.substring(startIdx, endIdx); |
| String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); |
| SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName); |
| segmentZKMetadata.setCrc(111L); |
| _helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata); |
| } |
| |
| @Override |
| protected long getCountStarResult() { |
| // all the data that was ingested from Kafka also got uploaded via the controller's upload endpoint |
| return super.getCountStarResult() * 2; |
| } |
| |
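  // Prints the chosen random configurations before setup so that a failed run's combination is visible in the logs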
| @BeforeClass |
| @Override |
| public void setUp() |
| throws Exception { |
| System.out.println(format( |
| "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, " |
| + "enableLeadControllerResource: %s", RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, |
| _enableSplitCommit, _enableLeadControllerResource)); |
| |
| // Remove the consumer directory |
| FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY)); |
| |
| super.setUp(); |
| } |
| |
| @AfterClass |
| @Override |
| public void tearDown() |
| throws Exception { |
| FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY)); |
| super.tearDown(); |
| } |
| } |