[FLINK-11419][filesystem] Wait for lease to be revoked when truncating file in Hadoop.
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index a716029..be0d134 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -86,20 +86,7 @@
this.targetFile = checkNotNull(recoverable.targetFile());
this.tempFile = checkNotNull(recoverable.tempFile());
- waitUntilLeaseIsRevoked(tempFile);
-
- // truncate back and append
- boolean truncated;
- try {
- truncated = truncate(fs, tempFile, recoverable.offset());
- } catch (Exception e) {
- throw new IOException("Missing data in tmp file: " + tempFile, e);
- }
-
- if (!truncated) {
- // Truncate did not complete immediately, we must wait for the operation to complete and release the lease
- waitUntilLeaseIsRevoked(tempFile);
- }
+ safelyTruncateFile(fs, tempFile, recoverable);
out = fs.append(tempFile);
@@ -162,6 +149,30 @@
// Hadoop 2.7, which have no truncation calls for HDFS.
// ------------------------------------------------------------------------
+ private static void safelyTruncateFile(
+ final FileSystem fileSystem,
+ final Path path,
+ final HadoopFsRecoverable recoverable) throws IOException {
+
+ ensureTruncateInitialized();
+
+ waitUntilLeaseIsRevoked(fileSystem, path);
+
+ // truncate back and append
+ boolean truncated;
+ try {
+ truncated = truncate(fileSystem, path, recoverable.offset());
+ } catch (Exception e) {
+ throw new IOException("Problem while truncating file: " + path, e);
+ }
+
+ if (!truncated) {
+ // Truncate did not complete immediately, we must wait for
+ // the operation to complete and release the lease.
+ waitUntilLeaseIsRevoked(fileSystem, path);
+ }
+ }
+
private static void ensureTruncateInitialized() throws FlinkRuntimeException {
if (truncateHandle == null) {
Method truncateMethod;
@@ -180,7 +191,7 @@
}
}
- static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
+ private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
if (truncateHandle != null) {
try {
return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
@@ -197,7 +208,7 @@
else {
throw new IllegalStateException("Truncation handle has not been initialized");
}
- return true;
+ return false;
}
// ------------------------------------------------------------------------
@@ -268,12 +279,7 @@
if (srcStatus.getLen() > expectedLength) {
// can happen if we go from persist to recovering for commit directly
// truncate the trailing junk away
- try {
- truncate(fs, src, expectedLength);
- } catch (Exception e) {
- // this can happen if the file is smaller than expected
- throw new IOException("Problem while truncating file: " + src, e);
- }
+ safelyTruncateFile(fs, src, recoverable);
}
// rename to final location (if it exists, overwrite it)
@@ -312,7 +318,7 @@
*
* @param path The path to the file we want to resume writing to.
*/
- private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
+ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path path) throws IOException {
Preconditions.checkState(fs instanceof DistributedFileSystem);
final DistributedFileSystem dfs = (DistributedFileSystem) fs;