| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.table.action.compact.strategy; |
| |
| import org.apache.hudi.avro.model.HoodieCompactionOperation; |
| import org.apache.hudi.common.model.BaseFile; |
| import org.apache.hudi.common.model.HoodieBaseFile; |
| import org.apache.hudi.common.model.HoodieLogFile; |
| import org.apache.hudi.common.util.Option; |
| import org.apache.hudi.common.util.collection.Pair; |
| import org.apache.hudi.config.HoodieCompactionConfig; |
| import org.apache.hudi.config.HoodieWriteConfig; |
| |
| import org.apache.hadoop.fs.Path; |
| import org.junit.jupiter.api.Test; |
| |
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
| |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| public class TestHoodieCompactionStrategy { |
| |
| private static final long MB = 1024 * 1024L; |
| private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"}; |
| private static final Random RANDOM = new Random(); |
| |
| @Test |
| public void testUnBounded() { |
| Map<Long, List<Long>> sizesMap = new HashMap<>(); |
| sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB)); |
| sizesMap.put(110 * MB, new ArrayList<>()); |
| sizesMap.put(100 * MB, Collections.singletonList(MB)); |
| sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); |
| UnBoundedCompactionStrategy strategy = new UnBoundedCompactionStrategy(); |
| HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp") |
| .withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build(); |
| List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); |
| List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); |
| assertEquals(operations, returned, "UnBounded should not re-order or filter"); |
| } |
| |
| @Test |
| public void testBoundedIOSimple() { |
| Map<Long, List<Long>> sizesMap = new HashMap<>(); |
| sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB)); |
| sizesMap.put(110 * MB, new ArrayList<>()); |
| sizesMap.put(100 * MB, Collections.singletonList(MB)); |
| sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); |
| BoundedIOCompactionStrategy strategy = new BoundedIOCompactionStrategy(); |
| HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( |
| HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) |
| .build(); |
| List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); |
| List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); |
| |
| assertTrue(returned.size() < operations.size(), "BoundedIOCompaction should have resulted in fewer compactions"); |
| assertEquals(2, returned.size(), "BoundedIOCompaction should have resulted in 2 compactions being chosen"); |
| // Total size of all the log files |
| Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB)) |
| .map(Double::longValue).reduce(Long::sum).orElse(0L); |
| assertEquals(610, (long) returnedSize, |
| "Should chose the first 2 compactions which should result in a total IO of 690 MB"); |
| } |
| |
| @Test |
| public void testLogFileSizeCompactionSimple() { |
| Map<Long, List<Long>> sizesMap = new HashMap<>(); |
| sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB)); |
| sizesMap.put(110 * MB, new ArrayList<>()); |
| sizesMap.put(100 * MB, Collections.singletonList(MB)); |
| sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); |
| LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy(); |
| HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( |
| HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) |
| .build(); |
| List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); |
| List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); |
| |
| assertTrue(returned.size() < operations.size(), |
| "LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions"); |
| assertEquals(1, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 1 compaction"); |
| // Total size of all the log files |
| Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB)) |
| .map(Double::longValue).reduce(Long::sum).orElse(0L); |
| assertEquals(1204, (long) returnedSize, |
| "Should chose the first 2 compactions which should result in a total IO of 690 MB"); |
| } |
| |
| @Test |
| public void testDayBasedCompactionSimple() { |
| Map<Long, List<Long>> sizesMap = new HashMap<>(); |
| sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB)); |
| sizesMap.put(110 * MB, new ArrayList<>()); |
| sizesMap.put(100 * MB, Collections.singletonList(MB)); |
| sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); |
| |
| Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() { |
| { |
| put(120 * MB, partitionPaths[2]); |
| put(110 * MB, partitionPaths[2]); |
| put(100 * MB, partitionPaths[1]); |
| put(90 * MB, partitionPaths[0]); |
| } |
| }); |
| |
| DayBasedCompactionStrategy strategy = new DayBasedCompactionStrategy(); |
| HoodieWriteConfig writeConfig = |
| HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() |
| .withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(1).build()).build(); |
| List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap); |
| List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); |
| |
| assertTrue(returned.size() < operations.size(), |
| "DayBasedCompactionStrategy should have resulted in fewer compactions"); |
| assertEquals(2, returned.size(), "DayBasedCompactionStrategy should have resulted in fewer compactions"); |
| |
| int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(), |
| returned.get(0).getPartitionPath()); |
| // Either the partition paths are sorted in descending order or they are equal |
| assertTrue(comparision >= 0, "DayBasedCompactionStrategy should sort partitions in descending order"); |
| } |
| |
| @Test |
| public void testBoundedPartitionAwareCompactionSimple() { |
| Map<Long, List<Long>> sizesMap = new HashMap<>(); |
| sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB)); |
| sizesMap.put(110 * MB, new ArrayList<>()); |
| sizesMap.put(100 * MB, Collections.singletonList(MB)); |
| sizesMap.put(70 * MB, Collections.singletonList(MB)); |
| sizesMap.put(80 * MB, Collections.singletonList(MB)); |
| sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); |
| |
| SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd"); |
| Date today = new Date(); |
| String currentDay = format.format(today); |
| |
| String currentDayMinus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-1)); |
| String currentDayMinus2 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-2)); |
| String currentDayMinus3 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-3)); |
| String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1)); |
| String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5)); |
| |
| Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() { |
| { |
| put(120 * MB, currentDay); |
| put(110 * MB, currentDayMinus1); |
| put(100 * MB, currentDayMinus2); |
| put(80 * MB, currentDayMinus3); |
| put(90 * MB, currentDayPlus1); |
| put(70 * MB, currentDayPlus5); |
| } |
| }); |
| |
| BoundedPartitionAwareCompactionStrategy strategy = new BoundedPartitionAwareCompactionStrategy(); |
| HoodieWriteConfig writeConfig = |
| HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() |
| .withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build(); |
| List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap); |
| List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); |
| |
| assertTrue(returned.size() < operations.size(), |
| "BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions"); |
| assertEquals(5, returned.size(), |
| "BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions"); |
| |
| int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(), |
| returned.get(0).getPartitionPath()); |
| // Either the partition paths are sorted in descending order or they are equal |
| assertTrue(comparision >= 0, "BoundedPartitionAwareCompactionStrategy should sort partitions in descending order"); |
| } |
| |
| @Test |
| public void testUnboundedPartitionAwareCompactionSimple() { |
| Map<Long, List<Long>> sizesMap = new HashMap<>(); |
| sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB)); |
| sizesMap.put(110 * MB, new ArrayList<>()); |
| sizesMap.put(100 * MB, Collections.singletonList(MB)); |
| sizesMap.put(80 * MB, Collections.singletonList(MB)); |
| sizesMap.put(70 * MB, Collections.singletonList(MB)); |
| sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); |
| |
| SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd"); |
| Date today = new Date(); |
| String currentDay = format.format(today); |
| |
| String currentDayMinus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-1)); |
| String currentDayMinus2 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-2)); |
| String currentDayMinus3 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(-3)); |
| String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1)); |
| String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5)); |
| |
| Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() { |
| { |
| put(120 * MB, currentDay); |
| put(110 * MB, currentDayMinus1); |
| put(100 * MB, currentDayMinus2); |
| put(80 * MB, currentDayMinus3); |
| put(90 * MB, currentDayPlus1); |
| put(70 * MB, currentDayPlus5); |
| } |
| }); |
| |
| UnBoundedPartitionAwareCompactionStrategy strategy = new UnBoundedPartitionAwareCompactionStrategy(); |
| HoodieWriteConfig writeConfig = |
| HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder() |
| .withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build(); |
| List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap); |
| List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); |
| |
| assertTrue(returned.size() < operations.size(), |
| "UnBoundedPartitionAwareCompactionStrategy should not include last " |
| + writeConfig.getTargetPartitionsPerDayBasedCompaction() + " partitions or later partitions from today"); |
| assertEquals(1, returned.size(), |
| "BoundedPartitionAwareCompactionStrategy should have resulted in 1 compaction"); |
| } |
| |
| private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config, |
| Map<Long, List<Long>> sizesMap) { |
| Map<Long, String> keyToPartitionMap = sizesMap.keySet().stream() |
| .map(e -> Pair.of(e, partitionPaths[RANDOM.nextInt(partitionPaths.length - 1)])) |
| .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); |
| return createCompactionOperations(config, sizesMap, keyToPartitionMap); |
| } |
| |
| private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config, |
| Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) { |
| List<HoodieCompactionOperation> operations = new ArrayList<>(sizesMap.size()); |
| |
| sizesMap.forEach((k, v) -> { |
| HoodieBaseFile df = TestHoodieBaseFile.newDataFile(k); |
| String partitionPath = keyToPartitionMap.get(k); |
| List<HoodieLogFile> logFiles = v.stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList()); |
| operations.add(new HoodieCompactionOperation(df.getCommitTime(), |
| logFiles.stream().map(s -> s.getPath().toString()).collect(Collectors.toList()), df.getPath(), df.getFileId(), |
| partitionPath, |
| config.getCompactionStrategy().captureMetrics(config, Option.of(df), partitionPath, logFiles), |
| df.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null)) |
| ); |
| }); |
| return operations; |
| } |
| |
| public static class TestHoodieBaseFile extends HoodieBaseFile { |
| |
| private final long size; |
| |
| public TestHoodieBaseFile(long size) { |
| super("/tmp/XYXYXYXYXYYX_11_20180918020003.parquet"); |
| this.size = size; |
| } |
| |
| public static HoodieBaseFile newDataFile(long size) { |
| return new TestHoodieBaseFile(size); |
| } |
| |
| @Override |
| public String getPath() { |
| return "/tmp/test"; |
| } |
| |
| @Override |
| public String getFileId() { |
| return UUID.randomUUID().toString(); |
| } |
| |
| @Override |
| public String getCommitTime() { |
| return "100"; |
| } |
| |
| @Override |
| public long getFileSize() { |
| return size; |
| } |
| } |
| |
| public static class TestHoodieLogFile extends HoodieLogFile { |
| |
| private final long size; |
| |
| public TestHoodieLogFile(long size) { |
| super("/tmp/.ce481ee7-9e53-4a2e-9992-f9e295fa79c0_20180919184844.log.1"); |
| this.size = size; |
| } |
| |
| public static HoodieLogFile newLogFile(long size) { |
| return new TestHoodieLogFile(size); |
| } |
| |
| @Override |
| public Path getPath() { |
| return new Path("/tmp/test-log"); |
| } |
| |
| @Override |
| public long getFileSize() { |
| return size; |
| } |
| } |
| } |