blob: 1866623fb65b656945771f32fa6170aefe9d4cbd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.minion;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
public class SegmentPurgerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentPurgerTest");
private static final File ORIGINAL_SEGMENT_DIR = new File(TEMP_DIR, "originalSegment");
private static final File PURGED_SEGMENT_DIR = new File(TEMP_DIR, "purgedSegment");
private static final Random RANDOM = new Random();
private static final int NUM_ROWS = 10000;
private static final String TABLE_NAME = "testTable";
private static final String SEGMENT_NAME = "testSegment";
private static final String D1 = "d1";
private static final String D2 = "d2";
private TableConfig _tableConfig;
private File _originalIndexDir;
private int _expectedNumRecordsPurged;
private int _expectedNumRecordsModified;
@BeforeClass
public void setUp()
throws Exception {
FileUtils.deleteDirectory(TEMP_DIR);
_tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setInvertedIndexColumns(Collections.singletonList(D1)).setCreateInvertedIndexDuringSegmentGeneration(true)
.build();
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT)
.addSingleValueDimension(D2, FieldSpec.DataType.INT).build();
List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
for (int i = 0; i < NUM_ROWS; i++) {
GenericRow row = new GenericRow();
int value1 = RANDOM.nextInt(100);
int value2 = RANDOM.nextInt(100);
if (value1 == 0) {
_expectedNumRecordsPurged++;
} else if (value2 == 0) {
_expectedNumRecordsModified++;
}
row.putValue(D1, value1);
row.putValue(D2, value2);
rows.add(row);
}
GenericRowRecordReader genericRowRecordReader = new GenericRowRecordReader(rows);
SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, schema);
config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
config.setSegmentName(SEGMENT_NAME);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(config, genericRowRecordReader);
driver.build();
_originalIndexDir = new File(ORIGINAL_SEGMENT_DIR, SEGMENT_NAME);
}
@Test
public void testPurgeSegment()
throws Exception {
// Purge records with d1 = 0
SegmentPurger.RecordPurger recordPurger = row -> row.getValue(D1).equals(0);
// Modify records with d2 = 0 to d2 = Integer.MAX_VALUE
SegmentPurger.RecordModifier recordModifier = row -> {
if (row.getValue(D2).equals(0)) {
row.putValue(D2, Integer.MAX_VALUE);
return true;
} else {
return false;
}
};
SegmentPurger segmentPurger =
new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, recordPurger, recordModifier);
File purgedIndexDir = segmentPurger.purgeSegment();
// Check the purge/modify counter in segment purger
assertEquals(segmentPurger.getNumRecordsPurged(), _expectedNumRecordsPurged);
assertEquals(segmentPurger.getNumRecordsModified(), _expectedNumRecordsModified);
// Check crc and index creation time
SegmentMetadataImpl purgedSegmentMetadata = new SegmentMetadataImpl(purgedIndexDir);
SegmentMetadataImpl originalSegmentMetadata = new SegmentMetadataImpl(_originalIndexDir);
assertNotEquals(purgedSegmentMetadata.getCrc(), originalSegmentMetadata.getCrc());
assertEquals(purgedSegmentMetadata.getIndexCreationTime(), originalSegmentMetadata.getIndexCreationTime());
try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(purgedIndexDir)) {
int numRecordsRemaining = 0;
int numRecordsModified = 0;
GenericRow row = new GenericRow();
while (pinotSegmentRecordReader.hasNext()) {
row = pinotSegmentRecordReader.next(row);
// Purged segment should not have any record with d1 = 0 or d2 = 0
assertNotEquals(row.getValue(D1), 0);
assertNotEquals(row.getValue(D2), 0);
numRecordsRemaining++;
if (row.getValue(D2).equals(Integer.MAX_VALUE)) {
numRecordsModified++;
}
}
assertEquals(numRecordsRemaining, NUM_ROWS - _expectedNumRecordsPurged);
assertEquals(numRecordsModified, _expectedNumRecordsModified);
}
// Check inverted index
Map<String, Object> props = new HashMap<>();
props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(purgedIndexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setTableConfig(_tableConfig)
.setSegmentName(purgedSegmentMetadata.getName()).setSegmentDirectoryConfigs(new PinotConfiguration(props))
.build()); SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
assertTrue(reader.hasIndexFor(D1, ColumnIndexType.INVERTED_INDEX));
assertFalse(reader.hasIndexFor(D2, ColumnIndexType.INVERTED_INDEX));
}
}
@AfterClass
public void tearDown()
throws Exception {
FileUtils.deleteDirectory(TEMP_DIR);
}
}