blob: 9cacf1faac27f7120b24123399424d7972c30d55 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.testutils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
import static org.apache.hudi.common.testutils.FileCreateUtils.createCleanFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCleanFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCompaction;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedReplaceCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
public class HoodieTestTable {
protected final String basePath;
protected final FileSystem fs;
protected HoodieTableMetaClient metaClient;
protected String currentInstantTime;
protected HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) {
ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath()));
ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs()));
this.basePath = basePath;
this.fs = fs;
this.metaClient = metaClient;
}
public static HoodieTestTable of(HoodieTableMetaClient metaClient) {
return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient);
}
public static String makeNewCommitTime(int sequence) {
return String.format("%09d", sequence);
}
public static String makeNewCommitTime() {
return makeNewCommitTime(Instant.now());
}
public static String makeNewCommitTime(Instant dateTime) {
return COMMIT_FORMATTER.format(Date.from(dateTime));
}
public static List<String> makeIncrementalCommitTimes(int num) {
return makeIncrementalCommitTimes(num, 1);
}
public static List<String> makeIncrementalCommitTimes(int num, int firstOffsetSeconds) {
final Instant now = Instant.now();
return IntStream.range(0, num)
.mapToObj(i -> makeNewCommitTime(now.plus(firstOffsetSeconds + i, SECONDS)))
.collect(Collectors.toList());
}
public HoodieTestTable addRequestedCommit(String instantTime) throws Exception {
createRequestedCommit(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception {
createRequestedDeltaCommit(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addInflightCommit(String instantTime) throws Exception {
createRequestedCommit(basePath, instantTime);
createInflightCommit(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception {
createRequestedDeltaCommit(basePath, instantTime);
createInflightDeltaCommit(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addCommit(String instantTime) throws Exception {
createRequestedCommit(basePath, instantTime);
createInflightCommit(basePath, instantTime);
createCommit(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addDeltaCommit(String instantTime) throws Exception {
createRequestedDeltaCommit(basePath, instantTime);
createInflightDeltaCommit(basePath, instantTime);
createDeltaCommit(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addReplaceCommit(String instantTime, HoodieReplaceCommitMetadata metadata) throws Exception {
createRequestedReplaceCommit(basePath, instantTime);
createInflightReplaceCommit(basePath, instantTime);
createReplaceCommit(basePath, instantTime, metadata);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
createRequestedCleanFile(basePath, instantTime, cleanerPlan);
createInflightCleanFile(basePath, instantTime, cleanerPlan);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata) throws IOException {
createRequestedCleanFile(basePath, instantTime, cleanerPlan);
createInflightCleanFile(basePath, instantTime, cleanerPlan);
createCleanFile(basePath, instantTime, metadata);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addRequestedCompaction(String instantTime) throws IOException {
createRequestedCompaction(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable addCompaction(String instantTime) throws IOException {
createRequestedCompaction(basePath, instantTime);
createInflightCompaction(basePath, instantTime);
currentInstantTime = instantTime;
metaClient = HoodieTableMetaClient.reload(metaClient);
return this;
}
public HoodieTestTable forCommit(String instantTime) {
currentInstantTime = instantTime;
return this;
}
public HoodieTestTable forDeltaCommit(String instantTime) {
currentInstantTime = instantTime;
return this;
}
public HoodieTestTable forCompaction(String instantTime) {
currentInstantTime = instantTime;
return this;
}
public HoodieTestTable withPartitionMetaFiles(String... partitionPaths) throws IOException {
for (String partitionPath : partitionPaths) {
FileCreateUtils.createPartitionMetaFile(basePath, partitionPath);
}
return this;
}
public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException {
return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType);
}
public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException {
createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType);
return this;
}
public HoodieTestTable withMarkerFiles(String partitionPath, int num, IOType ioType) throws IOException {
String[] fileIds = IntStream.range(0, num).mapToObj(i -> UUID.randomUUID().toString()).toArray(String[]::new);
return withMarkerFiles(partitionPath, fileIds, ioType);
}
public HoodieTestTable withMarkerFiles(String partitionPath, String[] fileIds, IOType ioType) throws IOException {
for (String fileId : fileIds) {
createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType);
}
return this;
}
/**
* Insert one base file to each of the given distinct partitions.
*
* @return A {@link Map} of partition and its newly inserted file's id.
*/
public Map<String, String> withBaseFilesInPartitions(String... partitions) throws Exception {
Map<String, String> partitionFileIdMap = new HashMap<>();
for (String p : partitions) {
String fileId = UUID.randomUUID().toString();
FileCreateUtils.createBaseFile(basePath, p, currentInstantTime, fileId);
partitionFileIdMap.put(p, fileId);
}
return partitionFileIdMap;
}
public HoodieTestTable withBaseFilesInPartitions(Map<String, String> partitionAndFileId) throws Exception {
for (Map.Entry<String, String> pair : partitionAndFileId.entrySet()) {
withBaseFilesInPartition(pair.getKey(), pair.getValue());
}
return this;
}
public HoodieTestTable withBaseFilesInPartition(String partition, String... fileIds) throws Exception {
for (String f : fileIds) {
FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, f);
}
return this;
}
public HoodieTestTable withBaseFilesInPartition(String partition, int... lengths) throws Exception {
for (int l : lengths) {
String fileId = UUID.randomUUID().toString();
FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, fileId, l);
}
return this;
}
public String withLogFile(String partitionPath) throws Exception {
String fileId = UUID.randomUUID().toString();
withLogFile(partitionPath, fileId);
return fileId;
}
public HoodieTestTable withLogFile(String partitionPath, String fileId) throws Exception {
return withLogFile(partitionPath, fileId, 0);
}
public HoodieTestTable withLogFile(String partitionPath, String fileId, int version) throws Exception {
FileCreateUtils.createLogFile(basePath, partitionPath, currentInstantTime, fileId, version);
return this;
}
public boolean inflightCommitsExist(String... instantTime) {
return Arrays.stream(instantTime).allMatch(this::inflightCommitExists);
}
public boolean inflightCommitExists(String instantTime) {
try {
return fs.exists(getInflightCommitFilePath(instantTime));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
}
public boolean commitsExist(String... instantTime) {
return Arrays.stream(instantTime).allMatch(this::commitExists);
}
public boolean commitExists(String instantTime) {
try {
return fs.exists(getCommitFilePath(instantTime));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
}
public boolean baseFilesExist(Map<String, String> partitionAndFileId, String instantTime) {
return partitionAndFileId.entrySet().stream().allMatch(entry -> {
String partition = entry.getKey();
String fileId = entry.getValue();
return baseFileExists(partition, instantTime, fileId);
});
}
public boolean baseFilesExist(String partition, String instantTime, String... fileIds) {
return Arrays.stream(fileIds).allMatch(f -> baseFileExists(partition, instantTime, f));
}
public boolean baseFileExists(String partition, String instantTime, String fileId) {
try {
return fs.exists(new Path(Paths.get(basePath, partition, baseFileName(instantTime, fileId)).toString()));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
}
public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) {
return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v));
}
public boolean logFileExists(String partition, String instantTime, String fileId, int version) {
try {
return fs.exists(new Path(Paths.get(basePath, partition, logFileName(instantTime, fileId, version)).toString()));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
}
public Path getInflightCommitFilePath(String instantTime) {
return new Path(Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME, instantTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION).toUri());
}
public Path getCommitFilePath(String instantTime) {
return new Path(Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME, instantTime + HoodieTimeline.COMMIT_EXTENSION).toUri());
}
public Path getRequestedCompactionFilePath(String instantTime) {
return new Path(Paths.get(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME, instantTime + HoodieTimeline.REQUESTED_COMPACTION_EXTENSION).toUri());
}
public Path getPartitionPath(String partition) {
return new Path(Paths.get(basePath, partition).toUri());
}
public Path getBaseFilePath(String partition, String fileId) {
return new Path(Paths.get(basePath, partition, getBaseFileNameById(fileId)).toUri());
}
public String getBaseFileNameById(String fileId) {
return baseFileName(currentInstantTime, fileId);
}
public FileStatus[] listAllBaseFiles() throws IOException {
return listAllBaseFiles(HoodieFileFormat.PARQUET.getFileExtension());
}
public FileStatus[] listAllBaseFiles(String fileExtension) throws IOException {
return FileSystemTestUtils.listRecursive(fs, new Path(basePath)).stream()
.filter(status -> status.getPath().getName().endsWith(fileExtension))
.toArray(FileStatus[]::new);
}
public FileStatus[] listAllLogFiles() throws IOException {
return listAllLogFiles(HoodieFileFormat.HOODIE_LOG.getFileExtension());
}
public FileStatus[] listAllLogFiles(String fileExtension) throws IOException {
return FileSystemTestUtils.listRecursive(fs, new Path(basePath)).stream()
.filter(status -> status.getPath().getName().contains(fileExtension))
.toArray(FileStatus[]::new);
}
public FileStatus[] listAllBaseAndLogFiles() throws IOException {
return Stream.concat(Stream.of(listAllBaseFiles()), Stream.of(listAllLogFiles())).toArray(FileStatus[]::new);
}
public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException {
return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).toArray(new FileStatus[0]);
}
public FileStatus[] listAllFilesInTempFolder() throws IOException {
return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]);
}
public static class HoodieTestTableException extends RuntimeException {
public HoodieTestTableException(Throwable t) {
super(t);
}
}
}