blob: 4db5e17ddda0bae6d604d689417c0e04f3cb6011 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager;
import org.apache.apex.malhar.lib.utils.FileContextUtils;
import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.Slice;
/**
* An {@link WindowDataManager} that uses FS to persist state every completed application window.<p/>
*
* FSWindowDataManager uses {@link FSWindowReplayWAL} to write to files. While saving an artifact corresponding
 * to a window, the window data manager saves:
* <ol>
* <li>Window id</li>
* <li>Artifact</li>
* </ol>
* In order to ensure that all the entries corresponding to a window id are appended to the same wal part file, the
* wal operates in batch mode. In batch mode, the rotation of a wal part is done only after a batch is complete.<br/>
* <p/>
*
* <b>Replaying data of a completed window</b><br/>
* Main support that {@link WindowDataManager} provides to input operators is to be able to replay windows which
* were completely processed but not checkpointed. This is necessary for making input operators idempotent.<br/>
* The {@link FileSystemWAL}, however, ignores any data which is not checkpointed after failure. Therefore,
* {@link FSWindowDataManager} cannot rely solely on the state in wal after failures and so during recovery it modifies
* the wal state by traversing through the wal files.<br/>
* <br/>
* {@link IncrementalCheckpointManager}, however, relies only on the checkpointed state and therefore sets
* {@link #relyOnCheckpoints} to true. This is because {@link IncrementalCheckpointManager} only saves data per
* checkpoint window.
* <p/>
*
* <b>Purging of stale artifacts</b><br/>
* When a window gets committed, it indicates that all the operators in the DAG have completely finished processing that
* window. This means that the data of this window can be deleted as it will never be requested for replaying.
* Operators can invoke {@link #committed(long)} callback of {@link FSWindowDataManager} to trigger deletion of stale
* artifacts.<br/>
* <p/>
*
* <b>Dynamic partitioning support provided</b><br/>
* An operator can call {@link #partition(int, Set)} to get new instances of {@link FSWindowDataManager} during
* re-partitioning. When operator partitions are removed, then one of the new instances will handle the state of
* all deleted instances.<br/>
 * After re-partitioning, the largest completed window is the min of max completed windows across all partitions.<br/>
*
* <p/>
* At times, after re-partitioning, a physical operator may want to read the data saved by all the partitions for a
* completed window id. For example, {@link AbstractFileInputOperator}, needs to redistribute files based on the hash
* of file-paths and its partition keys, so it reads artifacts saved by all partitions during replay of a completed
* window. {@link #retrieveAllPartitions(long)} retrieves the artifacts of all partitions wrt a completed window.
*
*
* @since 3.4.0
*/
public class FSWindowDataManager implements WindowDataManager
{
  private static final Logger LOG = LoggerFactory.getLogger(FSWindowDataManager.class);

  private static final String DEF_STATE_PATH = "idempotentState";
  private static final String WAL_FILE_NAME = "wal";

  /**
   * State path relative to app filePath where state is saved.
   */
  @NotNull
  private String statePath = DEF_STATE_PATH;
  private boolean isStatePathRelativeToAppPath = true;

  /**
   * This is not null only for one physical instance.<br/>
   * It consists of operator ids which have been deleted but have some state that can be replayed.
   * Only one of the instances would be handling (modifying) the files that belong to this state. <br/>
   * The value is assigned during partitioning.
   */
  private Set<Integer> deletedOperators;

  //set to true by partition(); during setup this triggers creation of read-only wals for other partitions.
  private boolean repartitioned;

  /**
   * Used when it is not necessary to replay every streaming/app window.
   * Used by {@link IncrementalCheckpointManager}
   */
  private boolean relyOnCheckpoints;

  private transient long largestCompletedWindow = Stateless.WINDOW_ID;

  private final FSWindowReplayWAL wal = new FSWindowReplayWAL();

  //operator id -> wals (sorted)
  private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>();

  private transient String fullStatePath;
  private transient int operatorId;

  private final transient Kryo kryo = new Kryo();

  private transient FileContext fileContext;

  private transient SerializationBuffer serializationBuffer;

  public FSWindowDataManager()
  {
    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
  }

  /**
   * Resolves the full state path, opens the {@link FileContext} for it and initializes the wal(s).
   *
   * @param context operator context
   */
  @Override
  public void setup(Context.OperatorContext context)
  {
    serializationBuffer = new SerializationBuffer(new WindowedBlockStream());
    operatorId = context.getId();

    if (isStatePathRelativeToAppPath) {
      fullStatePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + statePath;
    } else {
      fullStatePath = statePath;
    }

    try {
      fileContext = FileContextUtils.getFileContext(fullStatePath);
      setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Discovers existing wal files, recovers the wal state (start/end pointers, temp part files) and
   * computes {@link #largestCompletedWindow}.<br/>
   * When re-partitioned, the read-only wals of other partitions are also recovered and the largest
   * completed window is the min of the max completed window across all partitions.
   *
   * @param activationWindow the activation window id of the operator
   * @throws IOException in case of IO failures while reading wal files
   */
  private void setupWals(long activationWindow) throws IOException
  {
    findFiles(wal, operatorId);
    configureWal(wal, operatorId, !relyOnCheckpoints);

    if (repartitioned) {
      createReadOnlyWals();
      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
        findFiles(entry.getValue(), entry.getKey());
        configureWal(entry.getValue(), entry.getKey(), true);
      }
    }

    //find largest completed window
    if (!relyOnCheckpoints) {
      long completedWindow = findLargestCompletedWindow(wal, null);
      //committed will not delete temp files so it is possible that when reading from files, a smaller window
      //than the activation window is found.
      if (completedWindow > activationWindow) {
        largestCompletedWindow = completedWindow;
      }
      if (wal.getReader().getCurrentPointer() != null) {
        //resume writing right after the last completely saved entry
        wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy());
      }
    } else {
      wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer();
      largestCompletedWindow = wal.getLastCheckpointedWindow();
    }

    if (repartitioned && largestCompletedWindow > Stateless.WINDOW_ID) {
      //find the min of max window ids: a downstream will not finish a window until all the upstream have finished it.
      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
        long completedWindow = Stateless.WINDOW_ID;
        if (!relyOnCheckpoints) {
          long window = findLargestCompletedWindow(entry.getValue(), null);
          if (window > activationWindow) {
            completedWindow = window;
          }
        } else {
          completedWindow = findLargestCompletedWindow(entry.getValue(), activationWindow);
        }
        if (completedWindow < largestCompletedWindow) {
          largestCompletedWindow = completedWindow;
        }
      }
    }

    //reset readers so replay starts from the beginning of each wal
    wal.getReader().seek(wal.walStartPointer);
    for (FSWindowReplayWAL readOnlyWal : readOnlyWals.values()) {
      readOnlyWal.getReader().seek(readOnlyWal.walStartPointer);
    }

    wal.setup();
    for (FSWindowReplayWAL readOnlyWal : readOnlyWals.values()) {
      readOnlyWal.setup();
    }
  }

  /**
   * Creates read-only wals for every other partition's operator directory found under the state path.
   *
   * @throws IOException in case of IO failures while listing the state path
   */
  protected void createReadOnlyWals() throws IOException
  {
    Path statePath = new Path(fullStatePath);
    if (fileContext.util().exists(statePath)) {
      RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(statePath);
      while (operatorsIter.hasNext()) {
        FileStatus status = operatorsIter.next();
        int operatorId = Integer.parseInt(status.getPath().getName());

        if (operatorId != this.operatorId) {
          //create read-only wal for other partitions
          FSWindowReplayWAL readOnlyWal = new FSWindowReplayWAL(true);
          readOnlyWals.put(operatorId, readOnlyWal);
        }
      }
    }
  }

  /**
   * Sets the wal's file path and file context. If {@code updateWalState} is true, also derives the
   * wal start pointer from the discovered part files and registers the last temp part file (if any)
   * so that recovery can continue appending to it.
   *
   * @param wal            wal to configure
   * @param operatorId     id of the operator owning the wal
   * @param updateWalState whether to recover start pointer/temp part state from the files on disk
   * @throws IOException in case of IO failures
   */
  private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean updateWalState) throws IOException
  {
    String operatorDir = fullStatePath + Path.SEPARATOR + operatorId;
    wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME);
    wal.fileContext = fileContext;

    if (updateWalState) {
      if (!wal.fileDescriptors.isEmpty()) {
        SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet();

        wal.walStartPointer = new FileSystemWAL.FileSystemWALPointer(sortedParts.first(), 0);

        FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(sortedParts.last()).last();
        if (last.isTmp) {
          wal.tempPartFiles.put(last.part, last.filePath.toString());
        }
      }
    }
  }

  /**
   * Populates the wal's file descriptors from the part files present in the operator's directory.
   *
   * @param wal        wal whose file descriptors are populated
   * @param operatorId id of the operator owning the wal
   * @throws IOException in case of IO failures while listing the directory
   */
  private void findFiles(FSWindowReplayWAL wal, int operatorId) throws IOException
  {
    String operatorDir = fullStatePath + Path.SEPARATOR + operatorId;
    Path operatorPath = new Path(operatorDir);

    if (fileContext.util().exists(operatorPath)) {
      RemoteIterator<FileStatus> walFilesIter = fileContext.listStatus(operatorPath);
      while (walFilesIter.hasNext()) {
        FileStatus fileStatus = walFilesIter.next();
        FSWindowReplayWAL.FileDescriptor descriptor = FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath());
        wal.fileDescriptors.put(descriptor.part, descriptor);
      }
    }
  }

  /**
   * Scans the wal from the highest part file downwards and returns the largest window id for which
   * both entries (window id and artifact) were completely persisted.
   *
   * @param wal           wal to scan
   * @param ceilingWindow when not null, windows greater than this value are ignored
   * @return the largest completed window id; {@link Stateless#WINDOW_ID} when none is found.
   * @throws IOException in case of IO failures while reading the wal
   */
  private long findLargestCompletedWindow(FSWindowReplayWAL wal, Long ceilingWindow) throws IOException
  {
    if (!wal.fileDescriptors.isEmpty()) {
      FileSystemWAL.FileSystemWALReader reader = wal.getReader();

      //to find the largest window, we only need to look at the last file.
      NavigableSet<Integer> descendingParts = new TreeSet<>(wal.fileDescriptors.keySet()).descendingSet();
      for (int part : descendingParts) {
        FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(part).last();
        reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0));

        long endOffset = -1;
        long lastWindow = Stateless.WINDOW_ID;
        Slice slice = readNext(reader);

        while (slice != null) {
          boolean skipComplete = skipNext(reader); //skip the artifact because we need just the largest window id.
          if (!skipComplete) {
            //artifact not saved so this window was not finished.
            break;
          }
          long offset = reader.getCurrentPointer().getOffset();
          long window = Longs.fromByteArray(slice.toByteArray());

          if (ceilingWindow != null && window > ceilingWindow) {
            break;
          }
          endOffset = offset;
          lastWindow = window;
          slice = readNext(reader); //either null or next window
        }

        if (endOffset != -1) {
          wal.walEndPointerAfterRecovery = new FileSystemWAL.FileSystemWALPointer(last.part, endOffset);
          wal.windowWalParts.put(lastWindow, wal.walEndPointerAfterRecovery.getPartNum());
          return lastWindow;
        }
      }
    }
    return Stateless.WINDOW_ID;
  }

  /**
   * Helper method that catches IOException while reading from wal to check if an entry was saved completely or not.
   * @param reader wal reader
   * @return wal entry; null when the entry could not be read completely.
   */
  protected Slice readNext(FileSystemWAL.FileSystemWALReader reader)
  {
    try {
      return reader.next();
    } catch (IOException ex) {
      //exception while reading wal entry which can be because there may have been failure while persisting an
      //artifact so this window is not a finished window.
      try {
        reader.close();
      } catch (IOException ioe) {
        //closing the reader quietly.
      }
      return null;
    }
  }

  /**
   * Helper method that catches IOException while skipping an entry from wal to check if an entry was saved
   * completely or not.
   * @param reader wal reader
   * @return true if skip was successful; false otherwise.
   */
  private boolean skipNext(FileSystemWAL.FileSystemWALReader reader)
  {
    try {
      reader.skipNext();
      return true;
    } catch (IOException ex) {
      //exception while skipping wal entry which can be because there may have been failure while persisting an
      //artifact so this window is not a finished window.
      try {
        reader.close();
      } catch (IOException e) {
        //closing the reader quietly
      }
      return false;
    }
  }

  /**
   * Closes all reader streams and removes the read-only wals which are no longer needed, i.e., those
   * whose operators were not deleted (deleted operators' wals are kept until their state is purged).
   *
   * @throws IOException in case of IO failures while closing a reader
   */
  private void closeReaders() throws IOException
  {
    //close all reader stream and remove read-only wals
    wal.getReader().close();
    if (readOnlyWals.size() > 0) {
      Iterator<Map.Entry<Integer, FSWindowReplayWAL>> walIterator = readOnlyWals.entrySet().iterator();
      while (walIterator.hasNext()) {
        Map.Entry<Integer, FSWindowReplayWAL> entry = walIterator.next();
        entry.getValue().getReader().close();

        int operatorId = entry.getKey();
        if (deletedOperators == null || !deletedOperators.contains(operatorId)) {
          //the read only wal can be removed.
          walIterator.remove();
        }
      }
    }
  }

  /**
   * Save writes 2 entries to the wal: <br/>
   * <ol>
   * <li>window id</li>
   * <li>artifact</li>
   * </ol>
   * Note: The wal is being used in batch mode so the part file will never be rotated between the 2 entries.<br/>
   * The wal part file may be rotated after both the entries, when
   * {@link FileSystemWAL.FileSystemWALWriter#rotateIfNecessary()} is triggered.
   *
   * @param object state
   * @param windowId window id
   * @throws IOException
   */
  @Override
  public void save(Object object, long windowId) throws IOException
  {
    closeReaders();

    FileSystemWAL.FileSystemWALWriter writer = wal.getWriter();

    byte[] windowIdBytes = Longs.toByteArray(windowId);
    writer.append(new Slice(windowIdBytes));

    /**
     * writer.append() will copy the data to the file output stream.
     * So the data in the buffer is not needed any more, and it is safe to reset the serializationBuffer.
     *
     * And as the data in stream memory can be cleaned all at once. So don't need to separate data by different windows,
     * so beginWindow() and endWindow() don't need to be called
     */
    writer.append(toSlice(object));
    serializationBuffer.reset();

    wal.beforeCheckpoint(windowId);
    wal.windowWalParts.put(windowId, writer.getCurrentPointer().getPartNum());
    writer.rotateIfNecessary();
  }

  /**
   * The implementation assumes that artifacts are retrieved in increasing order of window ids. Typically it is used
   * to replay tuples of successive windows in input operators after failure.
   * @param windowId window id
   * @return saved state for the window id.
   * @throws IOException
   */
  @Override
  public Object retrieve(long windowId) throws IOException
  {
    return retrieve(wal, windowId);
  }

  /**
   * Retrieves the artifacts saved by all partitions (this one and read-only wals) for the given
   * completed window.
   *
   * @param windowId window id
   * @return operator id -> artifact; null when the window is not yet completed.
   * @throws IOException in case of IO failures while reading the wals
   */
  @Override
  public Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException
  {
    if (windowId > largestCompletedWindow) {
      return null;
    }
    Map<Integer, Object> artifacts = Maps.newHashMap();
    Object artifact = retrieve(wal, windowId);
    if (artifact != null) {
      artifacts.put(operatorId, artifact);
    }
    if (repartitioned) {
      for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) {
        artifact = retrieve(entry.getValue(), windowId);
        if (artifact != null) {
          artifacts.put(entry.getKey(), artifact);
        }
      }
    }
    return artifacts;
  }

  /**
   * Reads forward in the given wal until the entry for {@code windowId} is found, skipping data of
   * smaller windows. Relies on callers retrieving windows in increasing order.
   *
   * @param wal      wal to read from
   * @param windowId window id
   * @return the saved artifact; null if there is no artifact for the window.
   * @throws IOException in case of IO failures while reading the wal
   */
  private Object retrieve(FSWindowReplayWAL wal, long windowId) throws IOException
  {
    if (windowId > largestCompletedWindow || wal.walEndPointerAfterRecovery == null) {
      return null;
    }

    FileSystemWAL.FileSystemWALReader reader = wal.getReader();

    while (reader.getCurrentPointer() == null ||
        reader.getCurrentPointer().compareTo(wal.walEndPointerAfterRecovery) < 0) {
      long currentWindow;

      if (wal.retrievedWindow == null) {
        wal.retrievedWindow = readNext(reader);
        Preconditions.checkNotNull(wal.retrievedWindow);
      }
      currentWindow = Longs.fromByteArray(wal.retrievedWindow.toByteArray());

      if (windowId == currentWindow) {
        Slice data = readNext(reader);
        Preconditions.checkNotNull(data, "data is null");

        wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum());
        wal.retrievedWindow = readNext(reader); //null or next window

        return fromSlice(data);
      } else if (windowId < currentWindow) {
        //no artifact saved corresponding to that window and artifact is not read.
        return null;
      } else {
        //windowId > current window so we skip the data
        skipNext(reader);
        wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum());

        wal.retrievedWindow = readNext(reader); //null or next window
        if (wal.retrievedWindow == null) {
          //nothing else to read
          return null;
        }
      }
    }
    return null;
  }

  /**
   * Deletes artifacts for all windows less than equal to committed window id.<p/>
   *
   * {@link FSWindowDataManager} uses {@link FSWindowReplayWAL} to record data which writes to temp part files.
   * The temp part files are finalized only when they are rotated. So when a window is committed, artifacts for
   * windows <= committed window may still be in temporary files. These temporary files are needed for Wal recovery so
   * we do not alter them and we delete a part file completely (opposed to partial deletion) for efficiency.<br/>
   * Therefore, data of a window gets deleted only when it satisfies all the following criteria:
   * <ul>
   * <li>window <= committed window id</li>
   * <li>the part file of the artifact is rotated.</li>
   * <li>the part file doesn't contain artifacts for windows greater than the artifact's window to avoid partial
   * file deletion.</li>
   * </ul>
   *
   * In addition to this we also delete:
   * <ol>
   * <li>Some stray temporary files are also deleted which correspond to completely deleted parts.</li>
   * <li>Once the committed window > largest recovery window, we delete the files of partitions that were removed.</li>
   * </ol>
   *
   * @param committedWindowId window id
   * @throws IOException
   */
  @Override
  public void committed(long committedWindowId) throws IOException
  {
    closeReaders();

    //find the largest window <= committed window id and the part file corresponding to it is finalized.
    Map.Entry<Long, Integer> largestEntryForDeletion = null;

    Iterator<Map.Entry<Long, Integer>> iterator = wal.windowWalParts.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<Long, Integer> entry = iterator.next();

      //only completely finalized part files are deleted.
      if (entry.getKey() <= committedWindowId && !wal.tempPartFiles.containsKey(entry.getValue())) {
        largestEntryForDeletion = entry;
        iterator.remove();
      }
      if (entry.getKey() > committedWindowId) {
        break;
      }
    }

    if (largestEntryForDeletion != null && !wal.windowWalParts.containsValue(
        largestEntryForDeletion.getValue()) /* no artifacts for higher window present*/) {

      int highestPartToDelete = largestEntryForDeletion.getValue();
      wal.getWriter().delete(new FileSystemWAL.FileSystemWALPointer(highestPartToDelete + 1, 0));

      //also delete any old stray temp files that correspond to parts < deleteTillPointer.partNum
      Iterator<Map.Entry<Integer, FSWindowReplayWAL.FileDescriptor>> fileIterator =
          wal.fileDescriptors.entries().iterator();
      while (fileIterator.hasNext()) {
        Map.Entry<Integer, FSWindowReplayWAL.FileDescriptor> entry = fileIterator.next();
        if (entry.getKey() <= highestPartToDelete && entry.getValue().isTmp) {
          if (fileContext.util().exists(entry.getValue().filePath)) {
            fileContext.delete(entry.getValue().filePath, true);
          }
        } else if (entry.getKey() > highestPartToDelete) {
          break;
        }
      }
    }

    //delete data of partitions that have been removed
    if (deletedOperators != null) {
      Iterator<Integer> operatorIter = deletedOperators.iterator();

      while (operatorIter.hasNext()) {
        int deletedOperatorId = operatorIter.next();

        if (committedWindowId > largestCompletedWindow) {
          Path operatorDir = new Path(fullStatePath + Path.SEPARATOR + deletedOperatorId);

          if (fileContext.util().exists(operatorDir)) {
            fileContext.delete(operatorDir, true);
          }
          //fix: the read-only wal may be absent for this operator id; guard against NPE.
          FSWindowReplayWAL deletedWal = readOnlyWals.remove(deletedOperatorId);
          if (deletedWal != null) {
            deletedWal.teardown();
          }
          operatorIter.remove();
        }
      }
      if (deletedOperators.isEmpty()) {
        deletedOperators = null;
      }
    }
  }

  /**
   * Serializes an object with kryo into the reusable serialization buffer.
   *
   * @param object object to serialize
   * @return slice over the serialized bytes; valid until {@link SerializationBuffer#reset()}.
   */
  private Slice toSlice(Object object)
  {
    kryo.writeClassAndObject(serializationBuffer, object);
    return serializationBuffer.toSlice();
  }

  /**
   * De-serializes an object from a wal entry with kryo.
   *
   * @param slice serialized bytes
   * @return the de-serialized object
   */
  protected Object fromSlice(Slice slice)
  {
    Input input = new Input(slice.buffer, slice.offset, slice.length);
    Object object = kryo.readClassAndObject(input);
    input.close();
    return object;
  }

  /**
   * @return the largest window that was completed before failure.
   */
  public long getLargestCompletedWindow()
  {
    return largestCompletedWindow;
  }

  /**
   * Creates {@code newCount} clones of this manager for re-partitioning. When partitions were
   * removed, the first clone takes over the state of all deleted partitions.
   *
   * @param newCount           number of new partitions
   * @param removedOperatorIds ids of removed operator partitions; may be null or empty.
   * @return new window data manager instances, one per partition.
   */
  @Override
  public List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds)
  {
    repartitioned = true;
    KryoCloneUtils<FSWindowDataManager> cloneUtils = KryoCloneUtils.createCloneUtils(this);

    FSWindowDataManager[] windowDataManagers = cloneUtils.getClones(newCount);
    if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
      //one of the new instances handles the state of all the deleted instances.
      windowDataManagers[0].deletedOperators = removedOperatorIds;
    }

    List<WindowDataManager> managers = new ArrayList<>();
    managers.addAll(Arrays.asList(windowDataManagers));
    return managers;
  }

  @Override
  public void teardown()
  {
    wal.teardown();

    for (FSWindowReplayWAL readOnlyWal : readOnlyWals.values()) {
      readOnlyWal.teardown();
    }
  }

  /**
   * Makes the manager rely only on the checkpointed wal state instead of re-scanning the wal files
   * during recovery. Used by {@link IncrementalCheckpointManager}.
   *
   * @param relyOnCheckpoints true to rely only on checkpointed state.
   */
  protected void setRelyOnCheckpoints(boolean relyOnCheckpoints)
  {
    this.relyOnCheckpoints = relyOnCheckpoints;
  }

  /**
   * @return wal instance
   */
  protected FSWindowReplayWAL getWal()
  {
    return wal;
  }

  @VisibleForTesting
  public Set<Integer> getDeletedOperators()
  {
    if (deletedOperators == null) {
      return null;
    }
    return ImmutableSet.copyOf(deletedOperators);
  }

  /**
   * @return recovery filePath
   */
  public String getStatePath()
  {
    return statePath;
  }

  /**
   * Sets the state path. If {@link #isStatePathRelativeToAppPath} is true then this filePath is handled
   * relative
   * to the application filePath; otherwise it is handled as an absolute filePath.
   *
   * @param statePath recovery filePath
   */
  public void setStatePath(String statePath)
  {
    this.statePath = statePath;
  }

  /**
   * @return true if state path is relative to app path; false otherwise.
   */
  public boolean isStatePathRelativeToAppPath()
  {
    return isStatePathRelativeToAppPath;
  }

  /**
   * Specifies whether the state path is relative to application filePath.
   *
   * @param statePathRelativeToAppPath true if state path is relative to application path; false
   *                                   otherwise.
   */
  public void setStatePathRelativeToAppPath(boolean statePathRelativeToAppPath)
  {
    isStatePathRelativeToAppPath = statePathRelativeToAppPath;
  }
}