blob: 248f9f471a9e14aa935477ee8078621401ad5984 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.realtime.converter;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.converter.ColumnIndicesForRealtimeTable;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
public class RealtimeSegmentConverterTest {
private static final String STRING_COLUMN1 = "string_col1";
private static final String STRING_COLUMN2 = "string_col2";
private static final String STRING_COLUMN3 = "string_col3";
private static final String STRING_COLUMN4 = "string_col4";
private static final String LONG_COLUMN1 = "long_col1";
private static final String LONG_COLUMN2 = "long_col2";
private static final String LONG_COLUMN3 = "long_col3";
private static final String LONG_COLUMN4 = "long_col4";
private static final String MV_INT_COLUMN = "mv_col";
private static final String DATE_TIME_COLUMN = "date_time_col";
private static final File TMP_DIR =
new File(FileUtils.getTempDirectory(), RealtimeSegmentConverterTest.class.getName());
public void setup() {
Preconditions.checkState(TMP_DIR.mkdirs());
}
@Test
public void testNoVirtualColumnsInSchema() {
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING)
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "col1"),
new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.DAYS, "col2")).build();
String segmentName = "segment1";
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, segmentName);
Assert.assertEquals(schema.getColumnNames().size(), 5);
Schema newSchema = RealtimeSegmentConverter.getUpdatedSchema(schema);
Assert.assertEquals(newSchema.getColumnNames().size(), 2);
}
@Test
public void testNoRecordsIndexed()
throws Exception {
File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
.setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
.setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
.setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2))
.setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
.setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).build();
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
.addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
.addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
.addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
.addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
.addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT).addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
.addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
String tableNameWithType = tableConfig.getTableName();
String segmentName = "testTable__0__0__123456";
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
.setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN).setCapacity(1000)
.setAvgNumMultiValues(3).setNoDictionaryColumns(Sets.newHashSet(LONG_COLUMN2))
.setVarLengthDictionaryColumns(Sets.newHashSet(STRING_COLUMN3))
.setInvertedIndexColumns(Sets.newHashSet(STRING_COLUMN1))
.setSegmentZKMetadata(getSegmentZKMetadata(segmentName)).setOffHeap(true)
.setMemoryManager(new DirectMemoryManager(segmentName))
.setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new File(tmpDir, "stats")))
.setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
// create mutable segment impl
MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
File outputDir = new File(tmpDir, "outputDir");
SegmentZKPropsConfig segmentZKPropsConfig = new SegmentZKPropsConfig();
segmentZKPropsConfig.setStartOffset("1");
segmentZKPropsConfig.setEndOffset("100");
ColumnIndicesForRealtimeTable cdc = new ColumnIndicesForRealtimeTable(indexingConfig.getSortedColumn().get(0),
indexingConfig.getInvertedIndexColumns(), null, null,
indexingConfig.getNoDictionaryColumns(), indexingConfig.getVarLengthDictionaryColumns());
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(mutableSegmentImpl, segmentZKPropsConfig, outputDir.getAbsolutePath(), schema,
tableNameWithType, tableConfig, segmentName, cdc, false);
converter.build(SegmentVersion.v3, null);
SegmentMetadataImpl metadata = new SegmentMetadataImpl(new File(outputDir, segmentName));
Assert.assertEquals(metadata.getTotalDocs(), 0);
Assert.assertEquals(metadata.getTimeColumn(), DATE_TIME_COLUMN);
Assert.assertEquals(metadata.getTimeUnit(), TimeUnit.MILLISECONDS);
Assert.assertEquals(metadata.getStartTime(), metadata.getEndTime());
Assert.assertTrue(metadata.getAllColumns().containsAll(schema.getColumnNames()));
Assert.assertEquals(
metadata.getPropertiesConfiguration().getProperty(CommonConstants.Segment.Realtime.START_OFFSET), "1");
Assert.assertEquals(metadata.getPropertiesConfiguration().getProperty(CommonConstants.Segment.Realtime.END_OFFSET),
"100");
}
private SegmentZKMetadata getSegmentZKMetadata(String segmentName) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentName);
segmentZKMetadata.setCreationTime(System.currentTimeMillis());
return segmentZKMetadata;
}
public void destroy() {
FileUtils.deleteQuietly(TMP_DIR);
}
}