/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.util;

import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static com.uber.hoodie.common.model.HoodieTestUtils.getDefaultHadoopConf;
import static com.uber.hoodie.common.util.CompactionTestUtils.createCompactionPlan;
import static com.uber.hoodie.common.util.CompactionTestUtils.scheduleCompaction;
import static com.uber.hoodie.common.util.CompactionTestUtils.setupAndValidateCompactionOperations;

import com.google.common.collect.ImmutableMap;
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.CompactionTestUtils.TestHoodieDataFile;
import com.uber.hoodie.common.util.collection.Pair;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
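
/**
 * Unit tests for {@link CompactionUtils}: building compaction operations and plans from file-slices,
 * round-tripping between the avro and in-memory operation representations, and fetching pending
 * compaction operations.
 */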
public class TestCompactionUtils {
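
  // Canned metrics map returned by the metrics capture function for every file-slice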
private static final Map<String, Double> metrics =
new ImmutableMap.Builder<String, Double>()
.put("key1", 1.0)
.put("key2", 3.0).build();

  @Rule
  public TemporaryFolder tmpFolder = new TemporaryFolder();

private HoodieTableMetaClient metaClient;
private String basePath;
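  // Metrics capture function that ignores its (partition-path, file-slice) input and returns the canned metrics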
private Function<Pair<String, FileSlice>, Map<String, Double>> metricsCaptureFn = (partitionFileSlice) -> metrics;
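
  /**
   * Initializes a MERGE_ON_READ table under the temporary folder before each test.
   */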
@Before
public void init() throws IOException {
metaClient = HoodieTestUtils.initTableType(getDefaultHadoopConf(),
tmpFolder.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
basePath = metaClient.getBasePath();
  }

@Test
public void testBuildFromFileSlice() {
    // Empty File-Slice with neither data-file nor log files
FileSlice emptyFileSlice = new FileSlice("000", "empty1");
HoodieCompactionOperation op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], emptyFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(emptyFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
// File Slice with data-file but no log files
FileSlice noLogFileSlice = new FileSlice("000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
    // File Slice with no data-file but log files present
FileSlice noDataFileSlice = new FileSlice("000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], noDataFileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
    // File Slice with data-file and log files present
FileSlice fileSlice = new FileSlice("000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
op = CompactionUtils.buildFromFileSlice(
DEFAULT_PARTITION_PATHS[0], fileSlice, Optional.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0]);
  }

  /**
   * Generates input for the compaction plan tests.
   */
  private Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> buildCompactionPlan() {
FileSlice emptyFileSlice = new FileSlice("000", "empty1");
FileSlice fileSlice = new FileSlice("000", "noData1");
fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
fileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
FileSlice noLogFileSlice = new FileSlice("000", "noLog1");
noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog.parquet"));
FileSlice noDataFileSlice = new FileSlice("000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
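    // Cover all four slice shapes: empty, log-files only, data-file with log files, and data-file only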
List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice);
List<Pair<String, FileSlice>> input = fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f))
.collect(Collectors.toList());
return Pair.of(input, CompactionUtils.buildFromFileSlices(input, Optional.empty(), Optional.of(metricsCaptureFn)));
  }

@Test
public void testBuildFromFileSlices() {
Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> inputAndPlan = buildCompactionPlan();
testFileSlicesCompactionPlanEquality(inputAndPlan.getKey(), inputAndPlan.getValue());
  }

  @Test
  public void testCompactionTransformation() {
    // Check that the HoodieCompactionOperation <=> CompactionOperation transformation round-trips cleanly
Pair<List<Pair<String, FileSlice>>, HoodieCompactionPlan> inputAndPlan = buildCompactionPlan();
HoodieCompactionPlan plan = inputAndPlan.getRight();
List<HoodieCompactionOperation> originalOps = plan.getOperations();
List<HoodieCompactionOperation> regeneratedOps =
originalOps.stream().map(op -> {
// Convert to CompactionOperation
return CompactionUtils.buildCompactionOperation(op);
}).map(op2 -> {
// Convert back to HoodieCompactionOperation and check for equality
return CompactionUtils.buildHoodieCompactionOperation(op2);
}).collect(Collectors.toList());
    Assert.assertTrue("Expected a non-empty list of operations, so the transformation is actually exercised",
        originalOps.size() > 0);
    Assert.assertEquals("All fields set correctly in transformations", originalOps, regeneratedOps);
  }

@Test(expected = IllegalStateException.class)
public void testGetAllPendingCompactionOperationsWithDupFileId() throws IOException {
    // Case where there are duplicate file-ids across compaction requests
HoodieCompactionPlan plan1 = createCompactionPlan(metaClient, "000", "001", 10, true, true);
HoodieCompactionPlan plan2 = createCompactionPlan(metaClient, "002", "003", 0, false, false);
scheduleCompaction(metaClient, "001", plan1);
scheduleCompaction(metaClient, "003", plan2);
// schedule same plan again so that there will be duplicates
scheduleCompaction(metaClient, "005", plan1);
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
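    // Expected to throw IllegalStateException, since plan1's file-ids appear in two pending compaction plans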
    CompactionUtils.getAllPendingCompactionOperations(metaClient);
  }

  @Test
  public void testGetAllPendingCompactionOperations() throws IOException {
    // Case with 4 compaction requests, one of which is empty
setupAndValidateCompactionOperations(metaClient, false, 10, 10, 10, 0);
  }

  @Test
  public void testGetAllPendingInflightCompactionOperations() throws IOException {
    // Case with 4 compaction requests, one of which is empty; all of them are marked inflight
setupAndValidateCompactionOperations(metaClient, true, 10, 10, 10, 0);
  }

  @Test
  public void testGetAllPendingCompactionOperationsForEmptyCompactions() throws IOException {
    // Case with 4 compaction requests, all of them empty
setupAndValidateCompactionOperations(metaClient, false, 0, 0, 0, 0);
  }

  /**
   * Validates that the generated compaction plan matches the input file-slices.
   *
   * @param input File-slices with partition-paths
   * @param plan  Compaction plan
   */
private void testFileSlicesCompactionPlanEquality(List<Pair<String, FileSlice>> input,
HoodieCompactionPlan plan) {
Assert.assertEquals("All file-slices present", input.size(), plan.getOperations().size());
IntStream.range(0, input.size()).boxed().forEach(idx ->
testFileSliceCompactionOpEquality(input.get(idx).getValue(), plan.getOperations().get(idx),
input.get(idx).getKey()));
  }

  /**
   * Validates that the generated compaction operation matches the input file-slice and partition path.
   *
   * @param slice            File Slice
   * @param op               HoodieCompactionOperation
   * @param expPartitionPath Expected partition path
   */
private void testFileSliceCompactionOpEquality(FileSlice slice, HoodieCompactionOperation op,
String expPartitionPath) {
Assert.assertEquals("Partition path is correct", expPartitionPath, op.getPartitionPath());
Assert.assertEquals("Same base-instant", slice.getBaseInstantTime(), op.getBaseInstantTime());
Assert.assertEquals("Same file-id", slice.getFileId(), op.getFileId());
if (slice.getDataFile().isPresent()) {
Assert.assertEquals("Same data-file", slice.getDataFile().get().getPath(), op.getDataFilePath());
}
List<String> paths = slice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList());
IntStream.range(0, paths.size()).boxed().forEach(idx -> {
Assert.assertEquals("Log File Index " + idx, paths.get(idx), op.getDeltaFilePaths().get(idx));
});
Assert.assertEquals("Metrics set", metrics, op.getMetrics());
}
}