blob: e30126a8bb063aec34beaa72b11aedd8167504cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;
import java.util.Collections;
import java.util.List;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestMicroBatchBuilder extends TableTestBase {
@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
return new Object[] { 1, 2 };
}
public TestMicroBatchBuilder(int formatVersion) {
super(formatVersion);
}
@Before
public void setupTableProperties() {
table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "3").commit();
}
@Test
public void testGenerateMicroBatch() {
add(table.newAppend(), files("A", "B", "C", "D", "E"));
MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, Long.MAX_VALUE, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 5);
Assert.assertEquals(batch.sizeInBytes(), 50);
Assert.assertTrue(batch.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A", "B", "C", "D", "E"), filesToScan(batch.tasks()));
MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 15L, true);
Assert.assertEquals(batch1.endFileIndex(), 1);
Assert.assertEquals(batch1.sizeInBytes(), 10);
Assert.assertFalse(batch1.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A"), filesToScan(batch1.tasks()));
MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 30L, true);
Assert.assertEquals(batch2.endFileIndex(), 4);
Assert.assertEquals(batch2.sizeInBytes(), 30);
Assert.assertFalse(batch2.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("B", "C", "D"), filesToScan(batch2.tasks()));
MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 50L, true);
Assert.assertEquals(batch3.endFileIndex(), 5);
Assert.assertEquals(batch3.sizeInBytes(), 10);
Assert.assertTrue(batch3.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("E"), filesToScan(batch3.tasks()));
}
@Test
public void testGenerateMicroBatchWithSmallTargetSize() {
add(table.newAppend(), files("A", "B", "C", "D", "E"));
MicroBatch batch = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(0, 10L, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 1);
Assert.assertEquals(batch.sizeInBytes(), 10);
Assert.assertFalse(batch.lastIndexOfSnapshot());
filesMatch(Lists.newArrayList("A"), filesToScan(batch.tasks()));
MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch.endFileIndex(), 5L, true);
Assert.assertEquals(batch1.endFileIndex(), 2);
Assert.assertEquals(batch1.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
Assert.assertFalse(batch1.lastIndexOfSnapshot());
MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch1.endFileIndex(), 10L, true);
Assert.assertEquals(batch2.endFileIndex(), 3);
Assert.assertEquals(batch2.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks()));
Assert.assertFalse(batch2.lastIndexOfSnapshot());
MicroBatch batch3 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch2.endFileIndex(), 10L, true);
Assert.assertEquals(batch3.endFileIndex(), 4);
Assert.assertEquals(batch3.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks()));
Assert.assertFalse(batch3.lastIndexOfSnapshot());
MicroBatch batch4 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch3.endFileIndex(), 5L, true);
Assert.assertEquals(batch4.endFileIndex(), 5);
Assert.assertEquals(batch4.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks()));
Assert.assertTrue(batch4.lastIndexOfSnapshot());
MicroBatch batch5 = MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
.generate(batch4.endFileIndex(), 5L, true);
Assert.assertEquals(batch5.endFileIndex(), 5);
Assert.assertEquals(batch5.sizeInBytes(), 0);
Assert.assertTrue(Iterables.isEmpty(batch5.tasks()));
Assert.assertTrue(batch5.lastIndexOfSnapshot());
}
private static DataFile file(String name) {
return DataFiles.builder(SPEC)
.withPath(name + ".parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0") // easy way to set partition data for now
.withRecordCount(1)
.build();
}
private static void add(AppendFiles appendFiles, List<DataFile> adds) {
for (DataFile f : adds) {
appendFiles.appendFile(f);
}
appendFiles.commit();
}
private static void delete(DeleteFiles deleteFiles, List<DataFile> deletes) {
for (DataFile f : deletes) {
deleteFiles.deleteFile(f);
}
deleteFiles.commit();
}
private static List<DataFile> files(String... names) {
return Lists.transform(Lists.newArrayList(names), TestMicroBatchBuilder::file);
}
private static List<String> filesToScan(Iterable<FileScanTask> tasks) {
Iterable<String> filesToRead = Iterables.transform(tasks, t -> {
String path = t.file().path().toString();
return path.split("\\.")[0];
});
return Lists.newArrayList(filesToRead);
}
private static void filesMatch(List<String> expected, List<String> actual) {
Collections.sort(expected);
Collections.sort(actual);
Assert.assertEquals(expected, actual);
}
}