blob: f454188964f6e2ca3ab6169a127f745026741737 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.wal;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.utils.FileContextUtils;
import org.apache.apex.malhar.lib.utils.IOUtils;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.local.LocalFs;
import org.apache.hadoop.fs.local.RawLocalFs;
import com.google.common.base.Preconditions;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.netlet.util.Slice;
/**
* A WAL implementation that is file based.
* <p/>
* Note:<br/>
* The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause
* problems. Typically the WAL Reader will only be used in recovery.<br/>
*
* Also, this implementation is not thread-safe: the filesystem WAL writer and reader operations should be performed in
* the operator's thread.
*
* @since 3.4.0
*/
public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
{
// Base path of the WAL; getPartFilePath() appends "_<partNumber>" to it to name each part file.
@NotNull
private String filePath;
// Max length of a WAL part file. When left at 0, setup() replaces it with the block size reported by the
// server defaults of the default file system.
@Min(0)
private long maxLength;
// Start of the WAL: the reader refuses to seek before it and the writer's delete() replaces it.
// Must stay declared before the reader field because the FileSystemWALReader constructor reads it.
private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0);
@NotNull
private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this);
@NotNull
private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this);
//part => tmp file path; the writer adds an entry when it opens a part and removes it when the part is finalized.
private final Map<Integer, String> tempPartFiles = new TreeMap<>();
// Window id handed to the most recent beforeCheckpoint() call; Stateless.WINDOW_ID until then.
private long lastCheckpointedWindow = Stateless.WINDOW_ID;
/**
 * Prepares the WAL for use: resolves the file context of the WAL path, falls back to the block size of the default
 * file system when no max length was configured (value 0), and then opens the writer followed by the reader.
 *
 * @throws RuntimeException wrapping any {@link IOException} raised on the way.
 */
@Override
public void setup()
{
  try {
    final FileContext context = FileContextUtils.getFileContext(this.filePath);
    final boolean maxLengthUnset = this.maxLength == 0;
    if (maxLengthUnset) {
      this.maxLength = context.getDefaultFileSystem().getServerDefaults().getBlockSize();
    }
    this.fileSystemWALWriter.open(context);
    this.fileSystemWALReader.open(context);
  } catch (IOException ioe) {
    throw new RuntimeException("during setup", ioe);
  }
}
/**
 * Remembers the window that is about to be checkpointed and flushes the writer.
 *
 * @param window id of the window being checkpointed.
 * @throws RuntimeException wrapping the {@link IOException} raised by the flush.
 */
@Override
public void beforeCheckpoint(long window)
{
  // a plain assignment cannot fail, so it does not need to live inside the try block
  this.lastCheckpointedWindow = window;
  try {
    this.fileSystemWALWriter.flush();
  } catch (IOException ioe) {
    throw new RuntimeException("during before cp", ioe);
  }
}
/**
 * Finalizes the temporary WAL files that became eligible with the committed window.
 * <p/>
 * A temporary WAL file is not renamed the moment it is closed (completed). It is only tagged for renaming, keyed by
 * the window in which it was closed, and the rename itself is postponed until that window is committed.<br/>
 * <br/>
 * Why: after a failure the WAL is restored to a checkpoint and may expect to keep writing to a temporary file.
 * Example: the writer starts WAL_10_x.tmp in window 59 and is check-pointed. In window 60 it completes WAL_10_x.tmp
 * and closes it. Had the file been renamed immediately and a failure followed, the state restored from window 59
 * would look for WAL_10_x.tmp to continue writing and would not find it.<br/>
 * <br/>
 * Hence the final target (the WAL part file) is only produced once it is certain the file will never be opened for
 * writing again, which is when the window that completed the file gets committed.
 *
 * @param window committed window
 * @throws RuntimeException wrapping the {@link IOException} raised while finalizing.
 */
@Override
public void committed(long window)
{
  try {
    this.fileSystemWALWriter.finalizeFiles(window);
  } catch (IOException ioe) {
    throw new RuntimeException("during committed", ioe);
  }
}
/**
 * Closes the reader and then the writer.
 * <p/>
 * The writer is closed even when closing the reader fails with an {@link IOException}, so a failing reader cannot
 * leave the writer's output stream open. The first failure is the one reported; a failure of the second close is
 * attached to it as a suppressed exception.
 *
 * @throws RuntimeException wrapping the {@link IOException} raised while closing.
 */
@Override
public void teardown()
{
  IOException failure = null;
  try {
    fileSystemWALReader.close();
  } catch (IOException e) {
    failure = e;
  }
  try {
    fileSystemWALWriter.close();
  } catch (IOException e) {
    if (failure == null) {
      failure = e;
    } else {
      failure.addSuppressed(e);
    }
  }
  if (failure != null) {
    throw new RuntimeException("during teardown", failure);
  }
}
/**
 * @return the window id recorded by the most recent {@link #beforeCheckpoint(long)} call.
 */
protected long getLastCheckpointedWindow()
{
  return this.lastCheckpointedWindow;
}
/**
 * Builds the path of a WAL part file: the WAL file path, an underscore and the part number.
 *
 * @param partNumber number of the part.
 * @return path of the part file.
 */
protected String getPartFilePath(int partNumber)
{
  final StringBuilder partPath = new StringBuilder();
  partPath.append(this.filePath).append("_").append(partNumber);
  return partPath.toString();
}
/**
 * @return the reader of this WAL.
 */
@Override
public FileSystemWALReader getReader()
{
  return this.fileSystemWALReader;
}
/**
 * Replaces the File System WAL Reader, which allows the default reader to be overridden.
 *
 * @param fileSystemWALReader wal reader; must not be null.
 */
public void setFileSystemWALReader(@NotNull FileSystemWALReader fileSystemWALReader)
{
  Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
  this.fileSystemWALReader = fileSystemWALReader;
}
/**
 * @return the writer of this WAL.
 */
@Override
public FileSystemWALWriter getWriter()
{
  return this.fileSystemWALWriter;
}
/**
 * Replaces the File System WAL Writer, which allows the default writer to be overridden.
 *
 * @param fileSystemWALWriter wal writer; must not be null.
 */
public void setFileSystemWALWriter(@NotNull FileSystemWALWriter fileSystemWALWriter)
{
  Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
  this.fileSystemWALWriter = fileSystemWALWriter;
}
/**
 * Returns the WAL file path from which the part file paths are derived.
 *
 * @return WAL file path
 */
public String getFilePath()
{
  return this.filePath;
}
/**
 * Sets the WAL file path from which the part file paths are derived.
 *
 * @param filePath WAL file path; must not be null.
 */
public void setFilePath(@NotNull String filePath)
{
  Preconditions.checkNotNull(filePath, "filePath");
  this.filePath = filePath;
}
/**
 * Returns the maximum length of a WAL part file. The value is 0 until it is configured or until
 * {@link #setup()} substitutes the block size of the default file system.
 *
 * @return max length of a WAL part file.
 */
public long getMaxLength()
{
  return this.maxLength;
}
/**
 * Sets the maximum length of a WAL part file. A value of 0 makes {@link #setup()} use the block size of the
 * default file system instead.
 *
 * @param maxLength max length of the WAL part file
 */
public void setMaxLength(long maxLength)
{
  this.maxLength = maxLength;
}
/**
 * A position in the WAL, made of a part file number and an offset within that part.
 * <p/>
 * Pointers are ordered by part number first and by offset second. The part number is fixed; the offset is updated
 * in place by the reader and the writer of the enclosing class.
 */
public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer>
{
  private final int partNum;
  private long offset;

  private FileSystemWALPointer()
  {
    //for kryo
    this.partNum = -1;
  }

  /**
   * @param partNum number of the WAL part file.
   * @param offset  offset within that part file.
   */
  public FileSystemWALPointer(int partNum, long offset)
  {
    this.offset = offset;
    this.partNum = partNum;
  }

  /**
   * Orders pointers by part number and, within the same part, by offset.
   */
  @Override
  public int compareTo(@NotNull FileSystemWALPointer o)
  {
    if (this.partNum != o.partNum) {
      return Integer.compare(this.partNum, o.partNum);
    }
    return Long.compare(this.offset, o.offset);
  }

  /**
   * @return number of the WAL part file.
   */
  public int getPartNum()
  {
    return this.partNum;
  }

  /**
   * @return offset within the part file.
   */
  public long getOffset()
  {
    return this.offset;
  }

  @Override
  public String toString()
  {
    final StringBuilder text = new StringBuilder("FileSystemWalPointer{");
    text.append("partNum=").append(this.partNum);
    text.append(", offset=").append(this.offset);
    text.append('}');
    return text.toString();
  }
}
/**
 * A FileSystem WAL Reader.
 * <p/>
 * Reads length-prefixed entries (a 4 byte length followed by that many payload bytes) from the WAL part files. For
 * each part it reads the temporary file while one is registered with the enclosing {@link FileSystemWAL} and the
 * finalized part file otherwise.
 */
public static class FileSystemWALReader implements WAL.WALReader<FileSystemWALPointer>
{
  private FileSystemWALPointer currentPointer;
  private transient DataInputStream inputStream;
  private transient Path currentOpenPath;
  private transient boolean isOpenPathTmp;
  private final FileSystemWAL fileSystemWAL;
  private transient FileContext fileContext;

  private FileSystemWALReader()
  {
    //for kryo
    fileSystemWAL = null;
  }

  /**
   * Creates a reader positioned at a copy of the WAL's start pointer.
   *
   * @param fileSystemWal the WAL this reader belongs to; must not be null.
   */
  public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
  {
    this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
    currentPointer = new FileSystemWALPointer(fileSystemWal.walStartPointer.partNum,
        fileSystemWal.walStartPointer.offset);
  }

  /**
   * Stores the file context used for all later reads; no file is opened here.
   *
   * @param fileContext file context; must not be null.
   */
  protected void open(@NotNull FileContext fileContext) throws IOException
  {
    this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext");
  }

  /**
   * Closes the currently open input stream, if any.
   */
  protected void close() throws IOException
  {
    if (inputStream != null) {
      inputStream.close();
      inputStream = null;
    }
  }

  /**
   * Positions the reader at the given pointer.
   * <p/>
   * The reader keeps the given pointer instance and advances its offset as entries are read.
   *
   * @param pointer position to read from; must not lie before the WAL start pointer.
   * @throws IllegalArgumentException if the pointer lies before the WAL start pointer.
   */
  @Override
  public void seek(FileSystemWALPointer pointer) throws IOException
  {
    Preconditions.checkArgument(pointer.compareTo(fileSystemWAL.walStartPointer) >= 0, "invalid pointer");
    if (inputStream != null) {
      close();
    }
    inputStream = getInputStream(pointer);
    currentPointer = pointer;
  }

  /**
   * Move to the next WAL segment.
   *
   * @return true if the next part file exists and is opened; false otherwise.
   * @throws IOException
   */
  private boolean nextSegment() throws IOException
  {
    if (inputStream != null) {
      close();
    }
    currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    inputStream = getInputStream(currentPointer);
    return inputStream != null;
  }

  /**
   * Opens the file backing the part of the given pointer and positions the stream at the pointer's offset.
   *
   * @param walPointer position to open.
   * @return the positioned stream, or null when the backing file does not exist.
   */
  private DataInputStream getInputStream(FileSystemWALPointer walPointer) throws IOException
  {
    Preconditions.checkArgument(inputStream == null, "input stream not null");
    Path pathToReadFrom;
    String tmpPath = fileSystemWAL.tempPartFiles.get(walPointer.getPartNum());
    if (tmpPath != null) {
      pathToReadFrom = new Path(tmpPath);
      isOpenPathTmp = true;
    } else {
      pathToReadFrom = new Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
      isOpenPathTmp = false;
    }

    LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer);
    if (fileContext.util().exists(pathToReadFrom)) {
      DataInputStream stream = fileContext.open(pathToReadFrom);
      try {
        skipFully(stream, walPointer.offset);
      } catch (IOException | RuntimeException e) {
        //do not leak the freshly opened stream when positioning fails
        try {
          stream.close();
        } catch (IOException closeFailure) {
          e.addSuppressed(closeFailure);
        }
        throw e;
      }
      currentOpenPath = pathToReadFrom;
      return stream;
    }
    return null;
  }

  /**
   * Skips the requested number of bytes, retrying because a single skip call may skip fewer bytes than asked.
   * Stops quietly when the end of the stream is reached first.
   */
  private static void skipFully(DataInputStream stream, long bytesToSkip) throws IOException
  {
    long remaining = bytesToSkip;
    while (remaining > 0) {
      long skipped = stream.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
      } else if (stream.read() >= 0) {
        //skip made no progress although data is available; consume a single byte instead
        remaining--;
      } else {
        //end of stream
        return;
      }
    }
  }

  /**
   * Reads the next entry, moving on to following part files as each one is exhausted.
   *
   * @return the payload of the next entry, or null when the current part holds no more data and the following
   * part file does not exist.
   */
  @Override
  public Slice next() throws IOException
  {
    do {
      if (inputStream == null) {
        inputStream = getInputStream(currentPointer);
      }

      if (inputStream != null && isOpenPathTmp && !fileSystemWAL.tempPartFiles.containsKey(currentPointer.partNum)) {
        //if the tmp path was finalized the path may not exist any more
        close();
        inputStream = getInputStream(currentPointer);
      }

      if (inputStream != null && currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) {
        int len = inputStream.readInt();
        Preconditions.checkState(len >= 0, "negative length");

        byte[] data = new byte[len];
        inputStream.readFully(data);

        //4 bytes for the length prefix
        currentPointer.offset += data.length + 4;
        return new Slice(data);
      }
    } while (nextSegment());
    close();
    return null;
  }

  /**
   * @return the start pointer of the enclosing WAL.
   */
  @Override
  public FileSystemWALPointer getStartPointer()
  {
    return fileSystemWAL.walStartPointer;
  }
}
/**
 * A FileSystem WAL Writer.
 * <p/>
 * Appends length-prefixed entries (a 4 byte length followed by the payload) to temporary part files, rotates to a
 * new part once the configured max length would be exceeded, and renames temporary files to their final part
 * names once the window in which they were completed has been committed.
 */
public static class FileSystemWALWriter implements WAL.WALWriter<FileSystemWALPointer>
{
  private FileSystemWALPointer currentPointer = new FileSystemWALPointer(0, 0);
  private transient DataOutputStream outputStream;
  //windowId => Latest part which can be finalized.
  private final Map<Long, Integer> pendingFinalization = new TreeMap<>();
  private final FileSystemWAL fileSystemWAL;
  private transient FileContext fileContext;
  private int latestFinalizedPart = -1;
  private int lowestDeletedPart = -1;

  private FileSystemWALWriter()
  {
    //for kryo
    fileSystemWAL = null;
  }

  /**
   * @param fileSystemWal the WAL this writer belongs to; must not be null.
   */
  public FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal)
  {
    this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal");
  }

  /**
   * Stores the file context and runs recovery of the temporary files.
   *
   * @param fileContext file context; must not be null.
   */
  protected void open(@NotNull FileContext fileContext) throws IOException
  {
    this.fileContext = Preconditions.checkNotNull(fileContext, "file context");
    recover();
  }

  /**
   * Brings the temporary files in line with the current pointer.
   * <p/>
   * If the temporary file of the current part exists, its first {@code currentPointer.offset} bytes are copied to
   * a fresh temporary file, which stays open for writing, and the old one is deleted. Afterwards every file in
   * the WAL's parent directory whose name starts with the WAL file name and ends with the temporary extension,
   * but is not a tracked temporary file of a part up to the current one, is deleted.
   */
  private void recover() throws IOException
  {
    LOG.debug("current point {}", currentPointer);
    String tmpFilePath = fileSystemWAL.tempPartFiles.get(currentPointer.getPartNum());
    if (tmpFilePath != null) {

      Path tmpPath = new Path(tmpFilePath);
      if (fileContext.util().exists(tmpPath)) {
        LOG.debug("tmp path exists {}", tmpPath);

        outputStream = getOutputStream(new FileSystemWALPointer(currentPointer.partNum, 0));
        //the old tmp stream is closed even when the copy fails
        try (DataInputStream inputStreamOldTmp = fileContext.open(tmpPath)) {
          IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream);
          outputStream.flush();
        }
        //remove old tmp
        LOG.debug("delete tmp {}", tmpPath);
        fileContext.delete(tmpPath, true);
      }
    }

    //find all valid path names
    Set<String> validPathNames = new HashSet<>();
    for (Map.Entry<Integer, String> entry : fileSystemWAL.tempPartFiles.entrySet()) {
      if (entry.getKey() <= currentPointer.partNum) {
        validPathNames.add(new Path(entry.getValue()).getName());
      }
    }
    LOG.debug("valid names {}", validPathNames);

    //there can be a failure just between the flush and the actual checkpoint which can leave some stray tmp files
    //which aren't accounted by tmp files map
    Path walPath = new Path(fileSystemWAL.filePath);
    Path parentWAL = walPath.getParent();
    if (parentWAL != null && fileContext.util().exists(parentWAL)) {
      RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL);
      while (remoteIterator.hasNext()) {
        FileStatus status = remoteIterator.next();
        String fileName = status.getPath().getName();
        if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) &&
            !validPathNames.contains(fileName)) {
          LOG.debug("delete stray tmp {}", status.getPath());
          fileContext.delete(status.getPath(), true);
        }
      }
    }
  }

  /**
   * Closes the currently open output stream, if any.
   */
  protected void close() throws IOException
  {
    if (outputStream != null) {
      outputStream.close();
      outputStream = null;
      LOG.debug("closed {}", currentPointer.partNum);
    }
  }

  /**
   * Appends an entry as a 4 byte length followed by the payload. Rotates to the next part first when the entry
   * would push the current part beyond the max length, and right after the write when the part reached exactly
   * the max length.
   *
   * @param entry payload to append.
   * @return number of bytes written: the payload length plus 4 bytes for the length prefix.
   */
  @Override
  public int append(Slice entry) throws IOException
  {
    if (outputStream == null) {
      outputStream = getOutputStream(currentPointer);
    }

    int entryLength = entry.length + 4;

    // rotate if needed
    if (shouldRotate(entryLength)) {
      rotate(true);
    }

    outputStream.writeInt(entry.length);
    outputStream.write(entry.buffer, entry.offset, entry.length);
    currentPointer.offset += entryLength;

    if (currentPointer.offset == fileSystemWAL.maxLength) {
      //if the file is completed then we can rotate it. do not have to wait for next entry
      rotate(false);
    }

    return entryLength;
  }

  /**
   * @return the current write position; its offset advances as entries are appended.
   */
  @Override
  public FileSystemWALPointer getPointer()
  {
    return currentPointer;
  }

  /**
   * Makes the given pointer the new WAL start and removes finalized data before it. Pointers beyond the current
   * write position are ignored.
   *
   * @param pointer new start of the WAL.
   */
  @Override
  public void delete(FileSystemWALPointer pointer) throws IOException
  {
    if (pointer.compareTo(currentPointer) <= 0) {
      fileSystemWAL.walStartPointer = pointer;
      deleteFinalizedParts(pointer);
    }
  }

  /**
   * Deletes finalized part files with a part number below the pointer's part and, when the pointer's own part is
   * finalized, rewrites that part so that it only keeps the bytes from the pointer's offset onwards.
   */
  private void deleteFinalizedParts(FileSystemWALPointer pointer) throws IOException
  {
    int lastPartDeleted = -1;

    //delete all part files completely which are smaller than pointer.partNum
    for (int i = lowestDeletedPart + 1; i < pointer.partNum; i++) {
      if (i <= latestFinalizedPart) {
        //delete a part only if it is finalized.
        Path partPath = new Path(fileSystemWAL.getPartFilePath(i));
        if (fileContext.util().exists(partPath)) {
          LOG.debug("delete {}", partPath);
          fileContext.delete(partPath, true);
          lastPartDeleted = i;
        } else {
          break;
        }
      }
    }

    if (lastPartDeleted != -1) {
      lowestDeletedPart = lastPartDeleted;
    }

    //truncate the pointer.partNum till pointer.offset
    if (pointer.partNum <= latestFinalizedPart && pointer.offset > 0) {
      String part = fileSystemWAL.getPartFilePath(pointer.partNum);
      Path inputPartPath = new Path(part);
      long length = fileContext.getFileStatus(inputPartPath).getLen();

      LOG.debug("truncate {} from {} length {}", part, pointer.offset, length);

      if (length > pointer.offset) {
        String temp = getTmpFilePath(part);
        Path tmpPart = new Path(temp);

        //both streams are closed even when the copy fails
        try (DataInputStream partInput = fileContext.open(inputPartPath);
            DataOutputStream truncatedOutput = fileContext.create(tmpPart,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
                Options.CreateOpts.CreateParent.createParent())) {
          IOUtils.copyPartial(partInput, pointer.offset, length - pointer.offset, truncatedOutput);
        }

        if (fileSystemWAL.walStartPointer.partNum == pointer.partNum) {
          //Since the file is abridged
          fileSystemWAL.walStartPointer.offset = 0;
        }
        fileContext.rename(tmpPart, inputPartPath, Options.Rename.OVERWRITE);
      }
    }
  }

  /**
   * Makes the written data durable. On the local file system the stream is closed instead; on any other file
   * system the stream is hflush-ed and hsync-ed.
   */
  protected void flush() throws IOException
  {
    if (outputStream != null) {
      if (isLocalFileSystem()) {
        //until the stream is closed on the local FS, readers don't see any data.
        close();
      } else {
        Syncable syncableOutputStream = (Syncable)outputStream;
        syncableOutputStream.hflush();
        syncableOutputStream.hsync();
      }
    }
  }

  /**
   * @param entryLength total length of the entry about to be written, including the length prefix.
   * @return true if writing the entry would push the current part beyond the max length.
   */
  protected boolean shouldRotate(int entryLength)
  {
    return currentPointer.offset + entryLength > fileSystemWAL.maxLength;
  }

  /**
   * Completes the current part, registers it for finalization under the last checkpointed window and moves the
   * pointer to the start of the next part.
   *
   * @param openNextFile whether the temporary file of the next part is opened right away.
   */
  protected void rotate(boolean openNextFile) throws IOException
  {
    flush();
    close();
    //all parts up to current part can be finalized.
    pendingFinalization.put(fileSystemWAL.getLastCheckpointedWindow(), currentPointer.partNum);

    LOG.debug("rotate {} to {}", currentPointer.partNum, currentPointer.partNum + 1);
    currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0);
    if (openNextFile) {
      //if adding the new entry to the file can cause the current file to exceed the max length then it is rotated.
      outputStream = getOutputStream(currentPointer);
    }
  }

  /**
   * Renames the temporary files registered for finalization under windows before the committed window to their
   * final part names, then retries the deletion of finalized parts that lie before the WAL start pointer.
   *
   * @param window committed window.
   */
  protected void finalizeFiles(long window) throws IOException
  {
    if (!fileSystemWAL.tempPartFiles.isEmpty()) {
      //finalize temporary files
      //tempPartFiles is a sorted map, so its first key is the smallest part that still has a temporary file
      int smallestPartAvailable = fileSystemWAL.tempPartFiles.keySet().iterator().next();

      Iterator<Map.Entry<Long, Integer>> pendingFinalizeIter = pendingFinalization.entrySet().iterator();
      while (pendingFinalizeIter.hasNext()) {
        Map.Entry<Long, Integer> entry = pendingFinalizeIter.next();

        if (entry.getKey() >= window) {
          //finalize files which were requested for finalization in the window < committed window
          break;
        }
        pendingFinalizeIter.remove();

        int partToFinalizeTill = entry.getValue();
        for (int i = smallestPartAvailable; i <= partToFinalizeTill; i++) {
          String tmpToFinalize = fileSystemWAL.tempPartFiles.remove(i);
          if (tmpToFinalize == null) {
            //no temporary file is tracked for this part, so there is nothing to rename
            continue;
          }
          Path tmpPath = new Path(tmpToFinalize);

          if (fileContext.util().exists(tmpPath)) {
            LOG.debug("finalize {} of part {}", tmpToFinalize, i);
            fileContext.rename(tmpPath, new Path(fileSystemWAL.getPartFilePath(i)), Options.Rename.OVERWRITE);
            latestFinalizedPart = i;
          }
        }
        smallestPartAvailable = partToFinalizeTill + 1;
      }
    }

    if (lowestDeletedPart != -1 && lowestDeletedPart < fileSystemWAL.walStartPointer.partNum) {
      //delete any pending finalized files which were not deleted when the delete request was made.
      deleteFinalizedParts(fileSystemWAL.walStartPointer);
    }
  }

  /**
   * Opens the stream to write at the given pointer. With an offset greater than 0 on the local file system the
   * tracked temporary file of the part is re-opened in append mode; otherwise a new temporary file is created
   * for the part and registered with the enclosing WAL.
   */
  private DataOutputStream getOutputStream(FileSystemWALPointer pointer) throws IOException
  {
    Preconditions.checkArgument(outputStream == null, "output stream is not null");

    if (pointer.offset > 0 && isLocalFileSystem()) {
      //on local file system the stream is closed instead of flush so we open it again in append mode if the
      //offset > 0.
      return fileContext.create(new Path(fileSystemWAL.tempPartFiles.get(pointer.partNum)),
          EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), Options.CreateOpts.CreateParent.createParent());
    }

    String partFile = fileSystemWAL.getPartFilePath(pointer.partNum);
    String tmpFilePath = getTmpFilePath(partFile);
    fileSystemWAL.tempPartFiles.put(pointer.partNum, tmpFilePath);

    Preconditions.checkArgument(pointer.offset == 0, "offset > 0");
    LOG.debug("open {} => {}", pointer.partNum, tmpFilePath);
    outputStream = fileContext.create(new Path(tmpFilePath),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent());
    return outputStream;
  }

  /**
   * @return true if the default file system of the file context is one of the local file system implementations.
   */
  private boolean isLocalFileSystem()
  {
    return fileContext.getDefaultFileSystem() instanceof LocalFs ||
        fileContext.getDefaultFileSystem() instanceof RawLocalFs;
  }
}
/**
 * Derives a temporary file path from the given path by appending a dot, the current time in milliseconds and the
 * temporary file extension.
 *
 * @param filePath path the temporary path is derived from.
 * @return the temporary file path.
 */
private static String getTmpFilePath(String filePath)
{
  final long timestamp = System.currentTimeMillis();
  final StringBuilder tmpPath = new StringBuilder();
  tmpPath.append(filePath).append('.').append(timestamp).append(TMP_EXTENSION);
  return tmpPath.toString();
}
// Suffix given to temporary part files; the writer's recovery also uses it to recognise stray temporary files.
private static final String TMP_EXTENSION = ".tmp";
// Logger shared by the WAL and its nested reader and writer classes.
private static final Logger LOG = LoggerFactory.getLogger(FileSystemWAL.class);
}