[FLINK-11419][filesystem] Wait for lease to be revoked when truncating file in Hadoop.
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index a716029..be0d134 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -86,20 +86,7 @@
 		this.targetFile = checkNotNull(recoverable.targetFile());
 		this.tempFile = checkNotNull(recoverable.tempFile());
 
-		waitUntilLeaseIsRevoked(tempFile);
-
-		// truncate back and append
-		boolean truncated;
-		try {
-			truncated = truncate(fs, tempFile, recoverable.offset());
-		} catch (Exception e) {
-			throw new IOException("Missing data in tmp file: " + tempFile, e);
-		}
-
-		if (!truncated) {
-			// Truncate did not complete immediately, we must wait for the operation to complete and release the lease
-			waitUntilLeaseIsRevoked(tempFile);
-		}
+		safelyTruncateFile(fs, tempFile, recoverable);
 
 		out = fs.append(tempFile);
 
@@ -162,6 +149,30 @@
 	//    Hadoop 2.7, which have no truncation calls for HDFS.
 	// ------------------------------------------------------------------------
 
+	private static void safelyTruncateFile(
+			final FileSystem fileSystem,
+			final Path path,
+			final HadoopFsRecoverable recoverable) throws IOException {
+
+		ensureTruncateInitialized();
+
+		waitUntilLeaseIsRevoked(fileSystem, path);
+
+		// truncate back and append
+		boolean truncated;
+		try {
+			truncated = truncate(fileSystem, path, recoverable.offset());
+		} catch (Exception e) {
+			throw new IOException("Problem while truncating file: " + path, e);
+		}
+
+		if (!truncated) {
+			// Truncate did not complete immediately, we must wait for
+			// the operation to complete and release the lease.
+			waitUntilLeaseIsRevoked(fileSystem, path);
+		}
+	}
+
 	private static void ensureTruncateInitialized() throws FlinkRuntimeException {
 		if (truncateHandle == null) {
 			Method truncateMethod;
@@ -180,7 +191,7 @@
 		}
 	}
 
-	static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
+	private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
 		if (truncateHandle != null) {
 			try {
 				return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
@@ -197,7 +208,7 @@
 		else {
 			throw new IllegalStateException("Truncation handle has not been initialized");
 		}
-		return true;
+		return false;
 	}
 
 	// ------------------------------------------------------------------------
@@ -268,12 +279,7 @@
 				if (srcStatus.getLen() > expectedLength) {
 					// can happen if we go from persist to recovering for commit directly
 					// truncate the trailing junk away
-					try {
-						truncate(fs, src, expectedLength);
-					} catch (Exception e) {
-						// this can happen if the file is smaller than  expected
-						throw new IOException("Problem while truncating file: " + src, e);
-					}
+					safelyTruncateFile(fs, src, recoverable);
 				}
 
 				// rename to final location (if it exists, overwrite it)
@@ -312,7 +318,7 @@
 	 *
 	 * @param path The path to the file we want to resume writing to.
 	 */
-	private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
+	private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path path) throws IOException {
 		Preconditions.checkState(fs instanceof DistributedFileSystem);
 
 		final DistributedFileSystem dfs = (DistributedFileSystem) fs;