// blob: 8826204cdafe11521a0d5af31df6c9793becb30b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Operates on marker files for a given write action (commit, delta commit, compaction).
 *
 * <p>A marker file is an empty file created under the marker directory of an instant. Its name is
 * the data file name followed by {@link HoodieTableMetaClient#MARKER_EXTN} and the name of the
 * {@link IOType} that produced the data file (see {@link #create(String, String, IOType)}).
 *
 * <p>The class is {@link Serializable}, but {@code fs} and {@code markerDirPath} are
 * {@code transient} and therefore not part of the serialized form. Code that runs inside the
 * lambdas handed to {@link HoodieEngineContext} only relies on {@code basePath} and
 * {@code instantTime}, and re-creates paths and file systems from strings and a
 * {@link SerializableConfiguration}.
 */
public class MarkerFiles implements Serializable {

  private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);

  private final String instantTime;
  private final transient FileSystem fs;
  private final transient Path markerDirPath;
  private final String basePath;

  /**
   * Creates a handle on the marker directory of an instant.
   *
   * @param fs               file system used for all direct (non-parallelized) operations.
   * @param basePath         base path of the table.
   * @param markerFolderPath path of the marker directory for {@code instantTime}.
   * @param instantTime      instant the markers belong to.
   */
  public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
    this.instantTime = instantTime;
    this.fs = fs;
    this.markerDirPath = new Path(markerFolderPath);
    this.basePath = basePath;
  }

  /**
   * Creates a handle on the marker directory of an instant, taking the file system, base path and
   * marker folder path from the table's meta client.
   *
   * @param table       table the markers belong to.
   * @param instantTime instant the markers belong to.
   */
  public MarkerFiles(HoodieTable table, String instantTime) {
    this(table.getMetaClient().getFs(),
        table.getMetaClient().getBasePath(),
        table.getMetaClient().getMarkerFolderPath(instantTime),
        instantTime);
  }

  /**
   * Best-effort variant of {@link #deleteMarkerDir(HoodieEngineContext, int)}: any failure is
   * logged as a warning and never propagated to the caller.
   *
   * @param context     HoodieEngineContext.
   * @param parallelism parallelism for deletion.
   */
  public void quietDeleteMarkerDir(HoodieEngineContext context, int parallelism) {
    try {
      deleteMarkerDir(context, parallelism);
    } catch (Exception e) {
      // Catch broadly on purpose: the parallel delete runs through the engine context, which may
      // surface failures as something other than HoodieIOException. A "quiet" delete must not
      // fail the caller because of them.
      LOG.warn("Error deleting marker directory for instant " + instantTime, e);
    }
  }

  /**
   * Delete Marker directory corresponding to an instant.
   *
   * <p>The direct children of the marker directory are deleted recursively through
   * {@code context.foreach}, then the marker directory itself is deleted.
   *
   * @param context     HoodieEngineContext.
   * @param parallelism parallelism for deletion; capped at the number of direct children.
   * @return the result of deleting the marker directory, or {@code false} if the directory does
   *         not exist.
   * @throws HoodieIOException if the file system reports an {@link IOException}.
   */
  public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
    try {
      if (!fs.exists(markerDirPath)) {
        return false;
      }

      List<String> markerDirSubPaths = Arrays.stream(fs.listStatus(markerDirPath))
          .map(fileStatus -> fileStatus.getPath().toString())
          .collect(Collectors.toList());

      if (!markerDirSubPaths.isEmpty()) {
        // Paths travel as strings and the configuration in a serializable wrapper, so the lambda
        // does not depend on the transient fields of this class.
        SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
        int deleteParallelism = Math.min(markerDirSubPaths.size(), parallelism);
        context.foreach(markerDirSubPaths, subPathStr -> {
          Path subPath = new Path(subPathStr);
          FileSystem fileSystem = subPath.getFileSystem(conf.get());
          fileSystem.delete(subPath, true);
        }, deleteParallelism);
      }

      boolean result = fs.delete(markerDirPath, true);
      LOG.info("Removing marker directory at " + markerDirPath);
      return result;
    } catch (IOException ioe) {
      throw new HoodieIOException(ioe.getMessage(), ioe);
    }
  }

  /**
   * Checks whether the marker directory exists.
   *
   * @return {@code true} if the marker directory exists on the file system.
   * @throws IOException if the existence check fails.
   */
  public boolean doesMarkerDirExist() throws IOException {
    return fs.exists(markerDirPath);
  }

  /**
   * Collects the data file paths, relative to the marker directory, of every marker that was not
   * written for an {@link IOType#APPEND}.
   *
   * <p>Marker files directly under the marker directory are handled inline; each sub-directory is
   * listed recursively through {@code context.flatMap}.
   *
   * @param context     HoodieEngineContext.
   * @param parallelism parallelism for listing; capped at the number of sub-directories.
   * @return relative data file paths with the marker suffix stripped.
   * @throws IOException if listing the marker directory fails.
   */
  public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int parallelism) throws IOException {
    Set<String> dataFiles = new HashSet<>();
    List<String> subDirectories = new ArrayList<>();

    for (FileStatus topLevelStatus : fs.listStatus(markerDirPath)) {
      String pathStr = topLevelStatus.getPath().toString();
      if (!topLevelStatus.isFile()) {
        subDirectories.add(pathStr);
      } else if (isNonAppendMarker(pathStr)) {
        dataFiles.add(translateMarkerToDataPath(pathStr));
      }
    }

    if (!subDirectories.isEmpty()) {
      int listParallelism = Math.min(subDirectories.size(), parallelism);
      SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
      dataFiles.addAll(context.flatMap(subDirectories, directory -> {
        Path path = new Path(directory);
        FileSystem fileSystem = path.getFileSystem(serializedConf.get());
        RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
        List<String> result = new ArrayList<>();
        while (itr.hasNext()) {
          String pathStr = itr.next().getPath().toString();
          if (isNonAppendMarker(pathStr)) {
            result.add(translateMarkerToDataPath(pathStr));
          }
        }
        return result.stream();
      }, listParallelism));
    }

    return dataFiles;
  }

  /**
   * Tells whether a path names a marker file whose IO type is not {@link IOType#APPEND}.
   */
  private static boolean isNonAppendMarker(String pathStr) {
    return pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name());
  }

  /**
   * Converts a full marker path into the data file path relative to the marker directory.
   */
  private String translateMarkerToDataPath(String markerPath) {
    String rPath = stripMarkerFolderPrefix(markerPath);
    return MarkerFiles.stripMarkerSuffix(rPath);
  }

  /**
   * Removes the marker suffix (the marker extension and everything after it) from a path.
   *
   * <p>The last occurrence of the extension is used, because the suffix is appended at the end of
   * the marker name; an earlier occurrence inside a partition or file name is left untouched.
   *
   * @param path marker path, absolute or relative.
   * @return {@code path} up to, but excluding, the marker extension.
   * @throws IllegalArgumentException if {@code path} does not contain the marker extension.
   */
  public static String stripMarkerSuffix(String path) {
    int suffixStart = path.lastIndexOf(HoodieTableMetaClient.MARKER_EXTN);
    ValidationUtils.checkArgument(suffixStart >= 0,
        "Not a marker path, missing " + HoodieTableMetaClient.MARKER_EXTN + ". Path=" + path);
    return path.substring(0, suffixStart);
  }

  /**
   * Lists marker files under the marker directory via {@code FSUtils.processFiles}.
   *
   * @return marker file paths relative to the marker directory, marker suffix included.
   * @throws IOException if listing fails.
   */
  public List<String> allMarkerFilePaths() throws IOException {
    List<String> markerFiles = new ArrayList<>();
    FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
      markerFiles.add(stripMarkerFolderPrefix(fileStatus.getPath().toString()));
      return true;
    }, false);
    return markerFiles;
  }

  /**
   * Strips everything up to and including the marker root directory from a full marker path.
   *
   * <p>The marker root is rebuilt from {@code basePath} and {@code instantTime} instead of being
   * read from {@code markerDirPath}. NOTE(review): presumably because this method is also called
   * from lambdas where the transient {@code markerDirPath} may be unavailable - confirm.
   */
  private String stripMarkerFolderPrefix(String fullMarkerPath) {
    ValidationUtils.checkArgument(fullMarkerPath.contains(HoodieTableMetaClient.MARKER_EXTN));
    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
        new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime))).toString();
    int begin = fullMarkerPath.indexOf(markerRootPath);
    ValidationUtils.checkArgument(begin >= 0,
        "Not in marker dir. Marker Path=" + fullMarkerPath + ", Expected Marker Root=" + markerRootPath);
    // "+ 1" skips the path separator that follows the marker root.
    return fullMarkerPath.substring(begin + markerRootPath.length() + 1);
  }

  /**
   * Creates an empty marker file for a data file, creating the partition directory under the
   * marker directory first if needed.
   *
   * <p>The marker path will be
   * {@code <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename.marker.writeIOType}.
   *
   * @param partitionPath partition path of the data file, relative to the table base path.
   * @param dataFileName  name of the data file the marker stands for.
   * @param type          IO type recorded in the marker file name.
   * @return path of the created marker file.
   * @throws HoodieIOException if the partition directory cannot be created.
   * @throws HoodieException   if the marker file cannot be created; creation does not overwrite
   *                           an existing file.
   */
  public Path create(String partitionPath, String dataFileName, IOType type) {
    Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath);
    try {
      fs.mkdirs(path); // create a new partition as needed.
    } catch (IOException e) {
      throw new HoodieIOException("Failed to make dir " + path, e);
    }
    String markerFileName = String.format("%s%s.%s", dataFileName, HoodieTableMetaClient.MARKER_EXTN, type.name());
    Path markerPath = new Path(path, markerFileName);
    try {
      LOG.info("Creating Marker Path=" + markerPath);
      fs.create(markerPath, false).close();
    } catch (IOException e) {
      throw new HoodieException("Failed to create marker file " + markerPath, e);
    }
    return markerPath;
  }
}