/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.iceberg.types.Types.NestedField.optional;
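
/**
 * Tests how {@link TableScan#planTasks()} bin-packs data files into combined scan tasks
 * under different split size, open file cost, and lookback settings.
 *
 * <p>The expected bin counts below assume that split planning weighs each file as roughly
 * max(file length, open file cost) and packs files into bins of the configured split size;
 * the per-test comments walk through the resulting arithmetic.
 */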
@RunWith(Parameterized.class)
public class TestSplitPlanning extends TableTestBase {

  private static final Configuration CONF = new Configuration();
  private static final HadoopTables TABLES = new HadoopTables(CONF);
  private static final Schema SCHEMA = new Schema(
      optional(1, "id", Types.IntegerType.get()),
      optional(2, "data", Types.StringType.get())
  );

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  private Table table = null;
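
  // Run every test against both table format versions (v1 and v2).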
  @Parameterized.Parameters(name = "formatVersion = {0}")
  public static Object[] parameters() {
    return new Object[] { 1, 2 };
  }

  public TestSplitPlanning(int formatVersion) {
    super(formatVersion);
  }

  @Before
  public void setupTable() throws IOException {
    File tableDir = temp.newFolder();
    String tableLocation = tableDir.toURI().toString();
    table = TABLES.create(SCHEMA, tableLocation);
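    // Pin the planning properties so every test starts from the same baseline:
    // 128MB target split size, 4MB open file cost, and an effectively
    // unbounded bin-packing lookback.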
    table.updateProperties()
        .set(TableProperties.SPLIT_SIZE, String.valueOf(128 * 1024 * 1024))
        .set(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(4 * 1024 * 1024))
        .set(TableProperties.SPLIT_LOOKBACK, String.valueOf(Integer.MAX_VALUE))
        .commit();
  }

  @Test
  public void testBasicSplitPlanning() {
    List<DataFile> files128Mb = newFiles(4, 128 * 1024 * 1024);
    appendFiles(files128Mb);
    // we expect 4 bins since the split size is 128MB and we have 4 files of 128MB each
    Assert.assertEquals(4, Iterables.size(table.newScan().planTasks()));

    List<DataFile> files32Mb = newFiles(16, 32 * 1024 * 1024);
    appendFiles(files32Mb);
    // we expect 8 bins after adding 16 files of 32MB each, as they form an additional 4 bins
    Assert.assertEquals(8, Iterables.size(table.newScan().planTasks()));
  }

  @Test
  public void testSplitPlanningWithSmallFiles() {
    List<DataFile> files60Mb = newFiles(50, 60 * 1024 * 1024);
    List<DataFile> files5Kb = newFiles(370, 5 * 1024);
    Iterable<DataFile> files = Iterables.concat(files60Mb, files5Kb);
    appendFiles(files);
    // 50 files of 60MB form 25 bins, as the split size is 128MB.
    // Each of those bins has 8MB of capacity left, so without "read.split.open-file-cost"
    // all 370 files of 5KB would end up in one of them.
    // As "read.split.open-file-cost" is 4MB, each of the original 25 bins takes at most 2 small files,
    // so 50 of the 370 small files are packed into the existing 25 bins; the remaining 320 files,
    // at 4MB of weight each (32 per bin), form an additional 10 bins, for 35 bins in total.
    Assert.assertEquals(35, Iterables.size(table.newScan().planTasks()));
  }

  @Test
  public void testSplitPlanningWithNoMinWeight() {
    table.updateProperties()
        .set(TableProperties.SPLIT_OPEN_FILE_COST, "0")
        .commit();
    List<DataFile> files60Mb = newFiles(2, 60 * 1024 * 1024);
    List<DataFile> files5Kb = newFiles(100, 5 * 1024);
    Iterable<DataFile> files = Iterables.concat(files60Mb, files5Kb);
    appendFiles(files);
    // with "read.split.open-file-cost" set to 0, the small files carry almost no weight,
    // so everything (120MB of large files plus ~0.5MB of small ones) fits in a single 128MB bin
    Assert.assertEquals(1, Iterables.size(table.newScan().planTasks()));
  }

  @Test
  public void testSplitPlanningWithOverriddenSize() {
    List<DataFile> files128Mb = newFiles(4, 128 * 1024 * 1024);
    appendFiles(files128Mb);
    // we expect 2 bins since we are overriding the split size in the scan to 256MB
    TableScan scan = table.newScan()
        .option(TableProperties.SPLIT_SIZE, String.valueOf(256L * 1024 * 1024));
    Assert.assertEquals(2, Iterables.size(scan.planTasks()));
  }

  @Test
  public void testSplitPlanningWithOverriddenSizeForMetadataJsonFile() {
    List<DataFile> files8Mb = newFiles(32, 8 * 1024 * 1024, FileFormat.METADATA);
    appendFiles(files8Mb);
    // we expect 16 bins since we are overriding the split size in the scan to 16MB
    TableScan scan = table.newScan()
        .option(TableProperties.SPLIT_SIZE, String.valueOf(16L * 1024 * 1024));
    Assert.assertEquals(16, Iterables.size(scan.planTasks()));
  }

  @Test
  public void testSplitPlanningWithOverriddenSizeForLargeMetadataJsonFile() {
    List<DataFile> files128Mb = newFiles(4, 128 * 1024 * 1024, FileFormat.METADATA);
    appendFiles(files128Mb);
    // although we override the split size in the scan to 8MB, we still expect 4 bins
    // since metadata files are not splittable
    TableScan scan = table.newScan()
        .option(TableProperties.SPLIT_SIZE, String.valueOf(8L * 1024 * 1024));
    Assert.assertEquals(4, Iterables.size(scan.planTasks()));
  }

  @Test
  public void testSplitPlanningWithOverriddenLookback() {
    List<DataFile> files120Mb = newFiles(1, 120 * 1024 * 1024);
    List<DataFile> file128Mb = newFiles(1, 128 * 1024 * 1024);
    Iterable<DataFile> files = Iterables.concat(files120Mb, file128Mb);
    appendFiles(files);
    // we expect 2 bins from the non-overridden table properties
    TableScan scan = table.newScan()
        .option(TableProperties.SPLIT_LOOKBACK, "1");
    CloseableIterable<CombinedScanTask> tasks = scan.planTasks();
    Assert.assertEquals(2, Iterables.size(tasks));

    // since the lookback was overridden to 1, we expect the first bin to be the larger of the two
    CombinedScanTask combinedScanTask = tasks.iterator().next();
    FileScanTask task = combinedScanTask.files().iterator().next();
    Assert.assertEquals(128 * 1024 * 1024, task.length());
  }

  @Test
  public void testSplitPlanningWithOverriddenOpenCostSize() {
    List<DataFile> files16Mb = newFiles(16, 16 * 1024 * 1024);
    appendFiles(files16Mb);
    // we expect 4 bins since we are overriding the open file cost in the scan to 32MB:
    // at most 128MB / 32MB = 4 files fit per bin
    TableScan scan = table.newScan()
        .option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(32L * 1024 * 1024));
    Assert.assertEquals(4, Iterables.size(scan.planTasks()));
  }
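
  // Appends the given data files to the table in a single commit.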
  private void appendFiles(Iterable<DataFile> files) {
    AppendFiles appendFiles = table.newAppend();
    files.forEach(appendFiles::appendFile);
    appendFiles.commit();
  }
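
  // Creates the given number of Parquet data file entries of the given size.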
  private List<DataFile> newFiles(int numFiles, long sizeInBytes) {
    return newFiles(numFiles, sizeInBytes, FileFormat.PARQUET);
  }

  private List<DataFile> newFiles(int numFiles, long sizeInBytes, FileFormat fileFormat) {
    List<DataFile> files = Lists.newArrayList();
    for (int fileNum = 0; fileNum < numFiles; fileNum++) {
      files.add(newFile(sizeInBytes, fileFormat));
    }
    return files;
  }
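
  // Builds a DataFile entry with the declared size and format only; no bytes are
  // actually written, so split planning sees exactly the sizes the tests claim.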
  private DataFile newFile(long sizeInBytes, FileFormat fileFormat) {
    String fileName = UUID.randomUUID().toString();
    return DataFiles.builder(PartitionSpec.unpartitioned())
        .withPath(fileFormat.addExtension(fileName))
        .withFileSizeInBytes(sizeInBytes)
        .withRecordCount(2)
        .build();
  }
}