blob: 842519b4d89c5186bf5ff931970503acd73bcbb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.reef.runtime.yarn.driver.restart;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.util.CloseableIterable;
import java.io.*;
import java.nio.charset.StandardCharsets;
/**
* The DFS evaluator logger that does not support append and does append by overwrite.
* dfs.support.append should be false.
*/
@Private
public final class DFSEvaluatorLogOverwriteReaderWriter implements DFSEvaluatorLogReaderWriter {
private final FileSystem fileSystem;
private final DFSLineReader reader;
private final Path changeLogPath;
private final Path changeLogAltPath;
// This is the last path we will be writing to.
private Path pathToWriteTo = null;
private boolean fsClosed = false;
DFSEvaluatorLogOverwriteReaderWriter(final FileSystem fileSystem, final Path changeLogPath) {
this.fileSystem = fileSystem;
this.changeLogPath = changeLogPath;
this.changeLogAltPath = new Path(changeLogPath + ".alt");
this.reader = new DFSLineReader(fileSystem);
}
/**
* Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
* The log is appended to by reading first, adding on the information, and then overwriting the entire log.
* Since the {@link FileSystem} does not support appends, this {@link DFSEvaluatorLogReaderWriter}
* uses a two-file approach, where when we write, we always overwrite the older file.
* @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
* @throws IOException when file cannot be written.
*/
@Override
public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
final Path writePath = getWritePath();
// readPath is always not the writePath.
final Path readPath = getAlternativePath(writePath);
try (FSDataOutputStream outputStream = this.fileSystem.create(writePath, true)) {
InputStream inputStream = null;
try {
final InputStream newEntryInputStream = new ByteArrayInputStream(
formattedEntry.getBytes(StandardCharsets.UTF_8));
if (fileSystem.exists(readPath)) {
inputStream = new SequenceInputStream(
this.fileSystem.open(readPath), newEntryInputStream);
} else {
inputStream = newEntryInputStream;
}
IOUtils.copyBytes(inputStream, outputStream, 4096, false);
} finally {
outputStream.hsync();
if (inputStream != null) {
inputStream.close();
}
}
}
}
/**
* Since the {@link FileSystem} does not support appends, this {@link DFSEvaluatorLogReaderWriter}
* uses a two-file approach, where when we read, we always read from the newer file.
*/
@Override
public synchronized CloseableIterable<String> readFromEvaluatorLog() throws IOException {
return reader.readLinesFromFile(getLongerFile());
}
/**
* Gets the alternative path. Returns one of changeLogPath and changeLogAltPath.
*/
private synchronized Path getAlternativePath(final Path path) {
if (path.equals(changeLogPath)) {
return changeLogAltPath;
}
return changeLogPath;
}
/**
* Gets the path to write to.
*/
private synchronized Path getWritePath() throws IOException {
if (pathToWriteTo == null) {
// If we have not yet written before, check existence of files.
final boolean originalExists = fileSystem.exists(changeLogPath);
final boolean altExists = fileSystem.exists(changeLogAltPath);
if (originalExists && altExists) {
final FileStatus originalStatus = fileSystem.getFileStatus(changeLogPath);
final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath);
// Return the shorter file.
// TODO[JIRA REEF-1413]: This approach will not be able to work in REEF-1413.
// TODO[JIRA REEF-1413]: Note that we cannot use last modified time because Azure blob's HDFS API only
// TODO[JIRA REEF-1413]: supports time resolution up to a second.
final long originalLen = originalStatus.getLen();
final long altLen = altStatus.getLen();
if (originalLen < altLen) {
pathToWriteTo = changeLogPath;
} else {
pathToWriteTo = changeLogAltPath;
}
} else if (originalExists) {
// Return the file that does not exist.
pathToWriteTo = changeLogAltPath;
} else {
pathToWriteTo = changeLogPath;
}
}
final Path returnPath = pathToWriteTo;
pathToWriteTo = getAlternativePath(pathToWriteTo);
return returnPath;
}
private synchronized Path getLongerFile() throws IOException {
final boolean originalExists = fileSystem.exists(changeLogPath);
final boolean altExists = fileSystem.exists(changeLogAltPath);
// If both files exist, return the newest file path.
if (originalExists && altExists) {
final FileStatus originalStatus = fileSystem.getFileStatus(changeLogPath);
final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath);
final long originalLastModTime = originalStatus.getLen();
final long altLastModTime = altStatus.getLen();
// Return the newer file.
if (originalLastModTime >= altLastModTime) {
return changeLogPath;
}
return changeLogAltPath;
} else if (altExists) {
// If only the alt file exists, return the alt file path.
return changeLogAltPath;
}
// If only the original file exists or if neither exist, return the original file path.
return changeLogPath;
}
/**
* Closes the FileSystem.
* @throws Exception
*/
@Override
public synchronized void close() throws Exception {
if (this.fileSystem != null && !this.fsClosed) {
this.fileSystem.close();
this.fsClosed = true;
}
}
}