/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner;
import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PinotFSSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests advanced segment push types: METADATA push with and without copying the segment to the
 * deep store, and, with consistent data push enabled, both METADATA and TAR push.
 * TODO: add test for URI push
 */
public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest {
@Override
protected Map<String, String> getStreamConfigs() {
return null;
}
@Override
protected String getSortedColumn() {
return null;
}
@Override
protected List<String> getInvertedIndexColumns() {
return null;
}
@Override
protected List<String> getNoDictionaryColumns() {
return null;
}
@Override
protected List<String> getRangeIndexColumns() {
return null;
}
@Override
protected List<String> getBloomFilterColumns() {
return null;
}
@BeforeMethod
public void setUpTest()
throws IOException {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
}
@BeforeClass
public void setUp()
throws Exception {
// Start Zookeeper
startZk();
// Start the Pinot cluster
startController();
startBroker();
startServer();
}
@Test
public void testUploadAndQuery()
throws Exception {
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig offlineTableConfig = createOfflineTableConfig();
addTableConfig(offlineTableConfig);
List<File> avroFiles = getAllAvroFiles();
// Create 1 segment for METADATA push WITH copy to the deep store (final location)
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, "_with_move",
_segmentDir, _tarDir);
SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner();
SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
PushJobSpec pushJobSpec = new PushJobSpec();
// Set copyToDeepStoreForMetadataPush to true so the push copies the segment tar to the deep store
pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
jobSpec.setPushJobSpec(pushJobSpec);
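// Register LocalPinotFS for the "file" scheme so the standalone runner can read segment tars from local disk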
PinotFSSpec fsSpec = new PinotFSSpec();
fsSpec.setScheme("file");
fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
TableSpec tableSpec = new TableSpec();
tableSpec.setTableName(DEFAULT_TABLE_NAME);
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
jobSpec.setTableSpec(tableSpec);
PinotClusterSpec clusterSpec = new PinotClusterSpec();
clusterSpec.setControllerURI(_controllerBaseApiUrl);
jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
File dataDir = new File(_controllerConfig.getDataDir());
File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);
// Before the push, the segment should exist only in the tar dir, not in the controller data dir
Assert.assertFalse(dataDirSegments.exists());
Assert.assertEquals(_tarDir.listFiles().length, 1);
runner.init(jobSpec);
runner.run();
// Segment should be seen in dataDir
Assert.assertTrue(dataDirSegments.exists());
Assert.assertEquals(dataDirSegments.listFiles().length, 1);
Assert.assertEquals(_tarDir.listFiles().length, 1);
// test segment loaded
JsonNode segmentsList = getSegmentsList();
Assert.assertEquals(segmentsList.size(), 1);
String segmentNameWithMove = segmentsList.get(0).asText();
Assert.assertTrue(segmentNameWithMove.endsWith("_with_move"));
long numDocs = getNumDocs(segmentNameWithMove);
testCountStar(numDocs);
// Clear segment and tar dir
for (File segment : _segmentDir.listFiles()) {
FileUtils.deleteQuietly(segment);
}
for (File tar : _tarDir.listFiles()) {
FileUtils.deleteQuietly(tar);
}
// Create 1 segment for METADATA push WITHOUT copy to the deep store
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema, "_without_move",
_segmentDir, _tarDir);
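// Replace the push job spec with a default one: copyToDeepStoreForMetadataPush defaults to false,
// so this push should leave the segment tar in place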
jobSpec.setPushJobSpec(new PushJobSpec());
runner = new SegmentMetadataPushJobRunner();
Assert.assertEquals(dataDirSegments.listFiles().length, 1);
Assert.assertEquals(_tarDir.listFiles().length, 1);
runner.init(jobSpec);
runner.run();
// The new segment should not be copied to the data dir
Assert.assertEquals(dataDirSegments.listFiles().length, 1);
Assert.assertEquals(_tarDir.listFiles().length, 1);
// test segment loaded
segmentsList = getSegmentsList();
Assert.assertEquals(segmentsList.size(), 2);
String segmentNameWithoutMove = null;
for (JsonNode segment : segmentsList) {
if (segment.asText().endsWith("_without_move")) {
segmentNameWithoutMove = segment.asText();
}
}
Assert.assertNotNull(segmentNameWithoutMove);
numDocs += getNumDocs(segmentNameWithoutMove);
testCountStar(numDocs);
}
/**
 * Runs both SegmentMetadataPushJobRunner and SegmentTarPushJobRunner with consistent data push enabled.
 * Checks that segments are properly loaded and that the segment lineage entries are in the expected states.
 */
@Test
public void testUploadAndQueryWithConsistentPush()
throws Exception {
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig offlineTableConfig = createOfflineTableConfigWithConsistentPush();
addTableConfig(offlineTableConfig);
List<File> avroFiles = getAllAvroFiles();
String firstTimeStamp = Long.toString(System.currentTimeMillis());
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(0), offlineTableConfig, schema, firstTimeStamp,
_segmentDir, _tarDir);
// First test standalone metadata push job runner
BaseSegmentPushJobRunner runner = new SegmentMetadataPushJobRunner();
SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec();
PushJobSpec pushJobSpec = new PushJobSpec();
pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
jobSpec.setPushJobSpec(pushJobSpec);
PinotFSSpec fsSpec = new PinotFSSpec();
fsSpec.setScheme("file");
fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS");
jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec));
jobSpec.setOutputDirURI(_tarDir.getAbsolutePath());
TableSpec tableSpec = new TableSpec();
tableSpec.setTableName(DEFAULT_TABLE_NAME);
tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME));
jobSpec.setTableSpec(tableSpec);
PinotClusterSpec clusterSpec = new PinotClusterSpec();
clusterSpec.setControllerURI(_controllerBaseApiUrl);
jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec});
File dataDir = new File(_controllerConfig.getDataDir());
File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME);
Assert.assertEquals(_tarDir.listFiles().length, 1);
runner.init(jobSpec);
runner.run();
// Segment should be seen in dataDir
Assert.assertTrue(dataDirSegments.exists());
Assert.assertEquals(dataDirSegments.listFiles().length, 1);
Assert.assertEquals(_tarDir.listFiles().length, 1);
// test segment loaded
JsonNode segmentsList = getSegmentsList();
Assert.assertEquals(segmentsList.size(), 1);
String firstSegmentName = segmentsList.get(0).asText();
Assert.assertTrue(firstSegmentName.endsWith(firstTimeStamp));
long numDocs = getNumDocs(firstSegmentName);
testCountStar(numDocs);
// Fetch segment lineage entry after running segment metadata push with consistent push enabled.
String segmentLineageResponse = ControllerTest.sendGetRequest(
ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl)
.forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
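// Expected lineage entry (a sketch inferred from the assertions below; the exact JSON nesting may vary):
//   "state":"COMPLETED", "segmentsFrom":[], "segmentsTo":["<firstSegmentName>"]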
// Segment lineage should be in completed state.
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
// SegmentsFrom should be empty as we started with a blank table.
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[]"));
// SegmentsTo should contain uploaded segment.
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"" + firstSegmentName + "\"]"));
// Clear segment and tar dir
for (File segment : _segmentDir.listFiles()) {
FileUtils.deleteQuietly(segment);
}
for (File tar : _tarDir.listFiles()) {
FileUtils.deleteQuietly(tar);
}
String secondTimeStamp = Long.toString(System.currentTimeMillis());
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(1), offlineTableConfig, schema, secondTimeStamp,
_segmentDir, _tarDir);
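// Use a default push job spec; copyToDeepStoreForMetadataPush only applies to metadata push,
// not to the tar push below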
jobSpec.setPushJobSpec(new PushJobSpec());
// Now test standalone tar push job runner
runner = new SegmentTarPushJobRunner();
Assert.assertEquals(dataDirSegments.listFiles().length, 1);
Assert.assertEquals(_tarDir.listFiles().length, 1);
runner.init(jobSpec);
runner.run();
Assert.assertEquals(_tarDir.listFiles().length, 1);
// test segment loaded
segmentsList = getSegmentsList();
Assert.assertEquals(segmentsList.size(), 2);
String secondSegmentName = null;
for (JsonNode segment : segmentsList) {
if (segment.asText().endsWith(secondTimeStamp)) {
secondSegmentName = segment.asText();
}
}
Assert.assertNotNull(secondSegmentName);
numDocs = getNumDocs(secondSegmentName);
testCountStar(numDocs);
// Fetch segment lineage entry after running segment tar push with consistent push enabled.
segmentLineageResponse = ControllerTest.sendGetRequest(
ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl)
.forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()));
// Segment lineage should be in completed state.
Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\""));
// SegmentsFrom should contain the previous segment
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsFrom\":[\"" + firstSegmentName + "\"]"));
// SegmentsTo should contain uploaded segment.
Assert.assertTrue(segmentLineageResponse.contains("\"segmentsTo\":[\"" + secondSegmentName + "\"]"));
}
protected TableConfig createOfflineTableConfigWithConsistentPush() {
TableConfig offlineTableConfig = createOfflineTableConfig();
IngestionConfig ingestionConfig = new IngestionConfig();
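// Arguments: batchConfigMaps, segmentIngestionType, segmentIngestionFrequency, consistentDataPush;
// the trailing true enables consistent data push, which records segment lineage for REFRESH pushes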
ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY", true));
offlineTableConfig.setIngestionConfig(ingestionConfig);
return offlineTableConfig;
}
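// Reads the total document count ("segment.total.docs") from the controller's segment metadata endpoint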
private long getNumDocs(String segmentName)
throws IOException {
return JsonUtils.stringToJsonNode(
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME, segmentName)))
.get("segment.total.docs").asLong();
}
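// The list-segments API returns one entry per table type; extract the OFFLINE segment names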
private JsonNode getSegmentsList()
throws IOException {
return JsonUtils.stringToJsonNode(sendGetRequest(
_controllerRequestURLBuilder.forSegmentListAPI(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())))
.get(0).get("OFFLINE");
}
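// Polls every 100 ms, for up to 5 minutes, until COUNT(*) returns the expected number of documents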
protected void testCountStar(final long countStarResult) {
TestUtils.waitForCondition(new Function<Void, Boolean>() {
@Nullable
@Override
public Boolean apply(@Nullable Void aVoid) {
try {
return getCurrentCountStarResult() == countStarResult;
} catch (Exception e) {
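// Queries may fail transiently while the segment is still loading; treat that as "condition not met yet"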
return null;
}
}
}, 100L, 300_000, "Failed to load " + countStarResult + " documents", true);
}
@AfterMethod
public void tearDownTest()
throws IOException {
dropOfflineTable(getTableName());
}
@AfterClass
public void tearDown()
throws Exception {
stopServer();
stopBroker();
stopController();
stopZk();
}
}