/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.time.Duration;

import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An implementation of the {@link RecoverableFsDataOutputStream} for Hadoop's
* file system abstraction.
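 *
 * <p>A minimal usage sketch, assuming the stream was handed out through Flink's
 * {@code RecoverableWriter} API (as {@code HadoopRecoverableWriter} does); the
 * variable names are illustrative only:
 * <pre>{@code
 * RecoverableFsDataOutputStream out = writer.open(targetPath);
 * out.write(data);
 * ResumeRecoverable checkpoint = out.persist(); // durable point to resume from
 * // once the file is complete, publish it atomically under the target path:
 * out.closeForCommit().commit();
 * }</pre>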
*/
@Internal
class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
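    /** How long to wait (in milliseconds) for the NameNode to revoke the lease on the temp file during recovery. */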
private static final long LEASE_TIMEOUT = 100_000L;
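    /** Reflectively loaded {@code FileSystem#truncate} method; only available on Hadoop 2.7+. */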
private static Method truncateHandle;
private final FileSystem fs;
private final Path targetFile;
private final Path tempFile;
private final FSDataOutputStream out;
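    /**
     * Creates a new stream that writes into the given temp file. The data becomes
     * visible under {@code targetFile} only after {@link #closeForCommit()} and a
     * successful commit.
     */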
HadoopRecoverableFsDataOutputStream(
FileSystem fs,
Path targetFile,
Path tempFile) throws IOException {
ensureTruncateInitialized();
this.fs = checkNotNull(fs);
this.targetFile = checkNotNull(targetFile);
this.tempFile = checkNotNull(tempFile);
this.out = fs.create(tempFile);
}
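    /**
     * Restores a stream from a {@link HadoopFsRecoverable}: truncates the temp file
     * back to the persisted offset, waits for the previous writer's lease to be
     * revoked, and re-opens the temp file for appending.
     */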
HadoopRecoverableFsDataOutputStream(
FileSystem fs,
HadoopFsRecoverable recoverable) throws IOException {
ensureTruncateInitialized();
this.fs = checkNotNull(fs);
this.targetFile = checkNotNull(recoverable.targetFile());
this.tempFile = checkNotNull(recoverable.tempFile());
// truncate back and append
try {
truncate(fs, tempFile, recoverable.offset());
} catch (Exception e) {
throw new IOException("Missing data in tmp file: " + tempFile, e);
}
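        // the previous (failed) writer may still hold the HDFS lease on the temp
        // file; if it is not revoked within the timeout, the append below fails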
waitUntilLeaseIsRevoked(tempFile);
out = fs.append(tempFile);
// sanity check
long pos = out.getPos();
if (pos != recoverable.offset()) {
IOUtils.closeQuietly(out);
throw new IOException("Truncate failed: " + tempFile +
" (requested=" + recoverable.offset() + " ,size=" + pos + ')');
}
}
@Override
public void write(int b) throws IOException {
out.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
}
@Override
public void flush() throws IOException {
out.hflush();
}
@Override
public void sync() throws IOException {
out.hflush();
out.hsync();
}
@Override
public long getPos() throws IOException {
return out.getPos();
}
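    /**
     * Flushes and syncs all data written so far and returns a {@link ResumeRecoverable}
     * capturing the current position, from which a failed writer can be resumed.
     */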
@Override
public ResumeRecoverable persist() throws IOException {
sync();
return new HadoopFsRecoverable(targetFile, tempFile, getPos());
}
@Override
public Committer closeForCommit() throws IOException {
final long pos = getPos();
close();
return new HadoopFsCommitter(fs, new HadoopFsRecoverable(targetFile, tempFile, pos));
}
@Override
public void close() throws IOException {
out.close();
}
// ------------------------------------------------------------------------
// Reflection utils for truncation
// These are needed to compile against Hadoop versions before
// Hadoop 2.7, which have no truncation calls for HDFS.
// ------------------------------------------------------------------------
private static void ensureTruncateInitialized() throws FlinkRuntimeException {
if (truncateHandle == null) {
Method truncateMethod;
try {
truncateMethod = FileSystem.class.getMethod("truncate", Path.class, long.class);
}
            catch (NoSuchMethodException e) {
                throw new FlinkRuntimeException(
                        "Could not find a truncate method in the Hadoop File System. " +
                        "Hadoop versions before 2.7 do not support truncate.", e);
            }
if (!Modifier.isPublic(truncateMethod.getModifiers())) {
throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
}
truncateHandle = truncateMethod;
}
}
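    /**
     * Truncates the given file to the given length, using the reflectively
     * resolved {@code FileSystem#truncate} call.
     */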
static void truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
if (truncateHandle != null) {
try {
truncateHandle.invoke(hadoopFs, file, length);
}
catch (InvocationTargetException e) {
ExceptionUtils.rethrowIOException(e.getTargetException());
}
            catch (Throwable t) {
                throw new IOException(
                        "Truncation of file failed because of access/linking problems with Hadoop's truncate call. " +
                        "This is most likely a dependency conflict or class loading problem.", t);
            }
}
else {
throw new IllegalStateException("Truncation handle has not been initialized");
}
}
// ------------------------------------------------------------------------
// Committer
// ------------------------------------------------------------------------
/**
* Implementation of a committer for the Hadoop File System abstraction.
* This implementation commits by renaming the temp file to the final file path.
* The temp file is truncated before renaming in case there is trailing garbage data.
*/
static class HadoopFsCommitter implements Committer {
private final FileSystem fs;
private final HadoopFsRecoverable recoverable;
HadoopFsCommitter(FileSystem fs, HadoopFsRecoverable recoverable) {
this.fs = checkNotNull(fs);
this.recoverable = checkNotNull(recoverable);
}
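        /**
         * Publishes the temp file under the target path via rename; fails if the
         * temp file's length no longer matches the committed offset.
         */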
@Override
public void commit() throws IOException {
final Path src = recoverable.tempFile();
final Path dest = recoverable.targetFile();
final long expectedLength = recoverable.offset();
final FileStatus srcStatus;
try {
srcStatus = fs.getFileStatus(src);
}
            catch (IOException e) {
                throw new IOException("Cannot clean commit: Staging file does not exist.", e);
            }
if (srcStatus.getLen() != expectedLength) {
// something was done to this file since the committer was created.
// this is not the "clean" case
throw new IOException("Cannot clean commit: File has trailing junk data.");
}
            final boolean renamed;
            try {
                renamed = fs.rename(src, dest);
            }
            catch (IOException e) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
            }
            if (!renamed) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest);
            }
}
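        /**
         * Like {@link #commit()}, but tolerant of conditions that can arise after a
         * failure: trailing junk past the committed offset is truncated away, and a
         * temp file that was already renamed by a previous attempt is accepted.
         */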
@Override
public void commitAfterRecovery() throws IOException {
final Path src = recoverable.tempFile();
final Path dest = recoverable.targetFile();
final long expectedLength = recoverable.offset();
FileStatus srcStatus = null;
try {
srcStatus = fs.getFileStatus(src);
}
catch (FileNotFoundException e) {
// status remains null
}
catch (IOException e) {
throw new IOException("Committing during recovery failed: Could not access status of source file.");
}
if (srcStatus != null) {
if (srcStatus.getLen() > expectedLength) {
// can happen if we go from persist to recovering for commit directly
// truncate the trailing junk away
                try {
                    truncate(fs, src, expectedLength);
                } catch (Exception e) {
                    // surface the problem rather than committing a file with trailing junk
                    throw new IOException("Problem while truncating file: " + src, e);
                }
}
            // rename to the final location
            final boolean renamed;
            try {
                renamed = fs.rename(src, dest);
            }
            catch (IOException e) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
            }
            if (!renamed) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest);
            }
}
            else if (!fs.exists(dest)) {
                // neither file exists - that can be a sign of
                //   (1) a serious problem (file system loss of data), or
                //   (2) a recovery from a savepoint so old that the user has
                //       removed the files in the meantime
                // TODO how to handle this?
                // We probably need an option that lets users choose whether this
                // should log a warning, or result in an exception or an unrecoverable exception
            }
}
@Override
public CommitRecoverable getRecoverable() {
return recoverable;
}
}
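    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------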
    /**
     * Called when resuming execution after a failure; waits until the lease
     * on the file we are resuming is free.
     *
     * <p>The lease on the file we are resuming writing/committing to may still
     * belong to the process that failed previously and whose state we are
     * recovering.
     *
     * @param path The path to the file we want to resume writing to.
     * @return True if the lease was revoked within the timeout, false otherwise.
     */
private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
        Preconditions.checkState(
                fs instanceof DistributedFileSystem,
                "Lease recovery is only supported for HDFS (DistributedFileSystem).");
final DistributedFileSystem dfs = (DistributedFileSystem) fs;
dfs.recoverLease(path);
        final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));

        boolean isClosed = dfs.isFileClosed(path);
while (!isClosed && deadline.hasTimeLeft()) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Recovering the lease failed: " + path, e);
            }
isClosed = dfs.isFileClosed(path);
}
return isClosed;
}
}