| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pinot.integration.tests; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.google.common.base.Function; |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import javax.annotation.Nullable; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader; |
| import org.apache.pinot.plugin.segmentuploader.SegmentUploaderDefault; |
| import org.apache.pinot.plugin.segmentwriter.filebased.FileBasedSegmentWriter; |
| import org.apache.pinot.spi.config.table.TableConfig; |
| import org.apache.pinot.spi.config.table.TableType; |
| import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; |
| import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; |
| import org.apache.pinot.spi.data.Schema; |
| import org.apache.pinot.spi.data.readers.GenericRow; |
| import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; |
| import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; |
| import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; |
| import org.apache.pinot.spi.utils.JsonUtils; |
| import org.apache.pinot.spi.utils.builder.TableNameBuilder; |
| import org.apache.pinot.util.TestUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| |
| /** |
| * Tests creating segments via the {@link SegmentWriter} implementations |
| */ |
| public class SegmentWriterUploaderIntegrationTest extends BaseClusterIntegrationTest { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(SegmentWriterUploaderIntegrationTest.class); |
| |
| private Schema _schema; |
| private String _tableNameWithType; |
| private List<File> _avroFiles; |
| |
| @BeforeClass |
| public void setUp() |
| throws Exception { |
| TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); |
| |
| // Start the Pinot cluster |
| startZk(); |
| startController(); |
| startBroker(); |
| startServer(); |
| |
| // Create and upload the schema |
| _schema = createSchema(); |
| addSchema(_schema); |
| _tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(getTableName()); |
| |
| // Get avro files |
| _avroFiles = getAllAvroFiles(); |
| } |
| |
| @Nullable |
| protected IngestionConfig getIngestionConfig() { |
| Map<String, String> batchConfigMap = new HashMap<>(); |
| batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, _tarDir.getAbsolutePath()); |
| batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false"); |
| batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI, _controllerBaseApiUrl); |
| IngestionConfig ingestionConfig = new IngestionConfig(); |
| ingestionConfig.setBatchIngestionConfig( |
| new BatchIngestionConfig(Collections.singletonList(batchConfigMap), "APPEND", "HOURLY")); |
| return ingestionConfig; |
| } |
| |
| /** |
| * Write the records from 3 avro files into the Pinot table using the {@link FileBasedSegmentWriter} |
| * Calls {@link SegmentWriter#flush()} after writing records from each avro file |
| * Checks the number of segments created and total docs from the query |
| */ |
| @Test |
| public void testFileBasedSegmentWriterAndDefaultUploader() |
| throws Exception { |
| |
| TableConfig offlineTableConfig = createOfflineTableConfig(); |
| addTableConfig(offlineTableConfig); |
| |
| SegmentWriter segmentWriter = new FileBasedSegmentWriter(); |
| segmentWriter.init(offlineTableConfig, _schema); |
| SegmentUploader segmentUploader = new SegmentUploaderDefault(); |
| segmentUploader.init(offlineTableConfig); |
| |
| GenericRow reuse = new GenericRow(); |
| long totalDocs = 0; |
| for (int i = 0; i < 3; i++) { |
| AvroRecordReader avroRecordReader = new AvroRecordReader(); |
| avroRecordReader.init(_avroFiles.get(i), null, null); |
| |
| long numDocsInSegment = 0; |
| while (avroRecordReader.hasNext()) { |
| avroRecordReader.next(reuse); |
| segmentWriter.collect(reuse); |
| numDocsInSegment++; |
| totalDocs++; |
| } |
| // flush to segment |
| URI segmentTarURI = segmentWriter.flush(); |
| // upload |
| segmentUploader.uploadSegment(segmentTarURI, null); |
| |
| // check num segments |
| Assert.assertEquals(getNumSegments(), i + 1); |
| // check numDocs in latest segment |
| Assert.assertEquals(getNumDocsInLatestSegment(), numDocsInSegment); |
| // check totalDocs in query |
| checkTotalDocsInQuery(totalDocs); |
| } |
| segmentWriter.close(); |
| |
| dropAllSegments(_tableNameWithType, TableType.OFFLINE); |
| checkNumSegments(0); |
| |
| // upload all together using dir |
| segmentUploader.uploadSegmentsFromDir(_tarDir.toURI(), null); |
| // check num segments |
| Assert.assertEquals(getNumSegments(), 3); |
| // check totalDocs in query |
| checkTotalDocsInQuery(totalDocs); |
| |
| dropOfflineTable(_tableNameWithType); |
| } |
| |
| private int getNumSegments() |
| throws IOException { |
| String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder. |
| forSegmentListAPI(_tableNameWithType, TableType.OFFLINE.toString())); |
| JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr); |
| return array.get(0).get("OFFLINE").size(); |
| } |
| |
| private int getTotalDocsFromQuery() |
| throws Exception { |
| JsonNode response = postQuery(String.format("select count(*) from %s", _tableNameWithType), _brokerBaseApiUrl); |
| return response.get("resultTable").get("rows").get(0).get(0).asInt(); |
| } |
| |
| private int getNumDocsInLatestSegment() |
| throws IOException { |
| String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder. |
| forSegmentListAPI(_tableNameWithType, TableType.OFFLINE.toString())); |
| JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr); |
| JsonNode segments = array.get(0).get("OFFLINE"); |
| String segmentName = segments.get(segments.size() - 1).asText(); |
| |
| jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder. |
| forSegmentMetadata(_tableNameWithType, segmentName)); |
| JsonNode metadata = JsonUtils.stringToJsonNode(jsonOutputStr); |
| return metadata.get("segment.total.docs").asInt(); |
| } |
| |
| private void checkTotalDocsInQuery(long expectedTotalDocs) { |
| TestUtils.waitForCondition(new Function<Void, Boolean>() { |
| @Nullable |
| @Override |
| public Boolean apply(@Nullable Void aVoid) { |
| try { |
| return getTotalDocsFromQuery() == expectedTotalDocs; |
| } catch (Exception e) { |
| LOGGER.error("Caught exception when getting totalDocs from query: {}", e.getMessage()); |
| return null; |
| } |
| } |
| }, 100L, 120_000, "Failed to load " + expectedTotalDocs + " documents", true); |
| } |
| |
| private void checkNumSegments(int expectedNumSegments) { |
| TestUtils.waitForCondition(new Function<Void, Boolean>() { |
| @Nullable |
| @Override |
| public Boolean apply(@Nullable Void aVoid) { |
| try { |
| return getNumSegments() == expectedNumSegments; |
| } catch (Exception e) { |
| LOGGER.error("Caught exception when getting num segments: {}", e.getMessage()); |
| return null; |
| } |
| } |
| }, 100L, 120_000, "Failed to load get num segments", true); |
| } |
| |
| @AfterClass |
| public void tearDown() |
| throws Exception { |
| stopServer(); |
| stopBroker(); |
| stopController(); |
| stopZk(); |
| |
| FileUtils.deleteDirectory(_tempDir); |
| } |
| } |