blob: 11b68a722ab217de62e5f50cca1cc7b660c003d1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.lib.io;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
/**
* An idempotent storage manager allows an operator to emit the same tuples in every replayed application window. An idempotent
* storage manager cannot make any guarantees about the tuples emitted in the application window during which the failure occurred.
*
* The order of tuples is guaranteed for ordered input sources.
*
* <b>Important:</b> In order for an idempotent storage manager to function correctly it cannot allow
* checkpoints to occur within an application window and checkpoints must be aligned with
* application window boundaries.
*
* @since 2.0.0
* @deprecated use {@link WindowDataManager}
*/
@Deprecated
public interface IdempotentStorageManager extends StorageAgent, Component<Context.OperatorContext>
{
  /**
   * Gets the largest window for which there is recovery data.
   *
   * @return the id of the largest window that has recovery data
   */
  long getLargestRecoveryWindow();

  /**
   * Loads the state that every physical operator instance saved for the given window.
   * <p/>
   * When an operator can partition itself dynamically then there is no guarantee that an input state which was being
   * handled by one instance previously will be handled by the same instance after partitioning. <br/>
   * For example, an {@link AbstractFileInputOperator} instance which reads a file X till offset l (not check-pointed)
   * may no longer be the instance that handles file X after repartitioning, as the number of instances may have
   * changed and file X is re-hashed to another instance. <br/>
   * The new instance wouldn't know from what point to read file X unless it reads the idempotent storage of all the
   * operators for the window being replayed and fixes its state.
   *
   * @param windowId window id.
   * @return mapping of operator id to the corresponding state
   * @throws IOException if the stored state cannot be read
   */
  Map<Integer, Object> load(long windowId) throws IOException;

  /**
   * Delete the artifacts of the operator for windows <= windowId.
   *
   * @param operatorId id of the operator whose artifacts are deleted
   * @param windowId   the largest window id whose artifacts are deleted (inclusive)
   * @throws IOException if the artifacts cannot be deleted
   */
  void deleteUpTo(int operatorId, long windowId) throws IOException;

  /**
   * This informs the idempotent storage manager that operator is partitioned so that it can set properties and
   * distribute state.
   *
   * @param newManagers        all the new idempotent storage managers.
   * @param removedOperatorIds set of operator ids which were removed after partitioning.
   */
  void partitioned(Collection<IdempotentStorageManager> newManagers, Set<Integer> removedOperatorIds);

  /**
   * Creates a new instance of the same kind of idempotent storage manager.
   *
   * @return a newly constructed storage manager
   */
  IdempotentStorageManager newInstance();

  /**
   * An {@link IdempotentStorageManager} that uses FS to persist state.
   * <p/>
   * State is kept under {@code <application path>/<recoveryPath>}: one sub-directory per operator id, containing one
   * file per window whose name is the window id in hexadecimal (this is the layout {@link #setup} parses).
   */
  public static class FSIdempotentStorageManager implements IdempotentStorageManager
  {
    private static final String DEF_RECOVERY_PATH = "idempotentState";

    protected transient FSStorageAgent storageAgent;

    /**
     * Recovery path relative to app path where state is saved.
     */
    @NotNull
    protected String recoveryPath;

    /**
     * largest window for which there is recovery data across all physical operator instances.
     */
    protected transient long largestRecoveryWindow;

    /**
     * This is not null only for one physical instance.<br/>
     * It consists of operator ids which have been deleted but have some state that can be replayed.
     * Only one of the instances would be handling (modifying) the files that belong to this state.
     */
    protected Set<Integer> deletedOperators;

    /**
     * Sorted mapping from window id to all the operators that have state to replay for that window.
     * It is populated only in {@link #setup} from the files already on disk; {@link #save} does not add to it.
     */
    protected final transient TreeMultimap<Long, Integer> replayState;

    protected transient FileSystem fs;
    protected transient Path appPath;

    public FSIdempotentStorageManager()
    {
      replayState = TreeMultimap.create();
      largestRecoveryWindow = Stateless.WINDOW_ID;
      recoveryPath = DEF_RECOVERY_PATH;
    }

    /**
     * Creates the storage agent and file system for the recovery path, then scans the existing recovery files to
     * build {@link #replayState} and {@link #largestRecoveryWindow}.
     *
     * @param context operator context providing {@link DAG#APPLICATION_PATH}
     * @throws RuntimeException wrapping any {@link IOException} raised while scanning the recovery path
     */
    @Override
    public void setup(Context.OperatorContext context)
    {
      Configuration configuration = new Configuration();
      appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);

      try {
        storageAgent = new FSStorageAgent(appPath.toString(), configuration);

        fs = FileSystem.newInstance(appPath.toUri(), configuration);

        if (fs.exists(appPath)) {
          FileStatus[] fileStatuses = fs.listStatus(appPath);

          // Each child directory is named by an operator id; each file inside it by a hexadecimal window id.
          for (FileStatus operatorDirStatus : fileStatuses) {
            int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());

            for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
              String fileName = status.getPath().getName();
              if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
                // temporary files are not recovery data (presumably writes that never completed)
                continue;
              }
              long windowId = Long.parseLong(fileName, 16);
              replayState.put(windowId, operatorId);
              if (windowId > largestRecoveryWindow) {
                largestRecoveryWindow = windowId;
              }
            }
          }
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public void save(Object object, int operatorId, long windowId) throws IOException
    {
      storageAgent.save(object, operatorId, windowId);
    }

    /**
     * Loads the state of one operator for one window.
     *
     * @return the saved state, or {@code null} if no state for this operator/window was found during {@link #setup}
     */
    @Override
    public Object load(int operatorId, long windowId) throws IOException
    {
      // Multimap#get never returns null, so a direct entry lookup is equivalent to the set-membership check.
      if (!replayState.containsEntry(windowId, operatorId)) {
        return null;
      }
      return storageAgent.load(operatorId, windowId);
    }

    @Override
    public void delete(int operatorId, long windowId) throws IOException
    {
      storageAgent.delete(operatorId, windowId);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Returns an empty map (never {@code null}) when no operator has state for {@code windowId}.
     */
    @Override
    public Map<Integer, Object> load(long windowId) throws IOException
    {
      // TreeMultimap#get returns an empty set (never null) for a window without state.
      Set<Integer> operators = replayState.get(windowId);
      Map<Integer, Object> data = Maps.newHashMap();
      for (int operatorId : operators) {
        data.put(operatorId, load(operatorId, windowId));
      }
      return data;
    }

    /**
     * Returns the ids of the windows for which the operator has saved state.
     *
     * @return the window ids, or {@code null} when the operator directory is missing or empty
     */
    @Override
    public long[] getWindowIds(int operatorId) throws IOException
    {
      Path operatorPath = new Path(appPath, String.valueOf(operatorId));
      if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) {
        return null;
      }
      return storageAgent.getWindowIds(operatorId);
    }

    /**
     * This deletes all the recovery files of window ids <= windowId.
     * <p/>
     * If this instance manages state of deleted operators, their replayable state up to {@code windowId} is removed
     * too, and an operator directory that becomes empty is deleted along with its id.
     *
     * @param operatorId operator id.
     * @param windowId   the largest window id for which the states will be deleted.
     * @throws IOException if listing or deleting recovery files fails
     */
    @Override
    public void deleteUpTo(int operatorId, long windowId) throws IOException
    {
      //deleting the replay state
      if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
        Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
        while (iterator.hasNext()) {
          Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
          long lwindow = windowEntry.getKey();
          if (lwindow > windowId) {
            // replayState is sorted by window id, so all remaining entries are newer than windowId
            break;
          }
          for (Integer loperator : windowEntry.getValue()) {

            if (deletedOperators.contains(loperator)) {
              storageAgent.delete(loperator, lwindow);

              Path loperatorPath = new Path(appPath, Integer.toString(loperator));
              if (fs.listStatus(loperatorPath).length == 0) {
                //The operator was deleted and it has nothing to replay.
                deletedOperators.remove(loperator);
                fs.delete(loperatorPath, true);
              }
            } else if (loperator == operatorId) {
              storageAgent.delete(loperator, lwindow);
            }
          }
          iterator.remove();
        }
      }

      // Guard on existence (as getWindowIds does): FileSystem#listStatus throws FileNotFoundException for a
      // missing path, e.g. when the operator has never saved any state.
      Path operatorPath = new Path(appPath, Integer.toString(operatorId));
      if (fs.exists(operatorPath) && fs.listStatus(operatorPath).length > 0) {
        long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId);
        // delete oldest first so an interrupted cleanup leaves only the newer windows behind
        Arrays.sort(windowsAfterReplay);
        for (long lwindow : windowsAfterReplay) {
          if (lwindow <= windowId) {
            storageAgent.delete(operatorId, lwindow);
          }
        }
      }
    }

    @Override
    public long getLargestRecoveryWindow()
    {
      return largestRecoveryWindow;
    }

    /**
     * Propagates {@link #recoveryPath} and the storage agent to the new managers and makes sure exactly one of them
     * is responsible for cleaning up state of removed operators.
     *
     * @throws IllegalArgumentException if {@code newManagers} is null or empty
     */
    @Override
    public void partitioned(Collection<IdempotentStorageManager> newManagers, Set<Integer> removedOperatorIds)
    {
      Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(),
          "there has to be one idempotent storage manager");
      FSIdempotentStorageManager deletedOperatorsManager = null;

      if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
        if (this.deletedOperators == null) {
          this.deletedOperators = Sets.newHashSet();
        }
        this.deletedOperators.addAll(removedOperatorIds);
      }

      for (IdempotentStorageManager storageManager : newManagers) {

        FSIdempotentStorageManager lmanager = (FSIdempotentStorageManager)storageManager;
        lmanager.recoveryPath = this.recoveryPath;
        lmanager.storageAgent = this.storageAgent;

        if (lmanager.deletedOperators != null) {
          deletedOperatorsManager = lmanager;
        }
        //only one physical instance can manage deleted operators so clearing this field for rest of the instances.
        if (lmanager != deletedOperatorsManager) {
          lmanager.deletedOperators = null;
        }
      }

      if (removedOperatorIds == null || removedOperatorIds.isEmpty()) {
        //Nothing to do
        return;
      }
      if (this.deletedOperators != null) {

        //If some operators were removed then there needs to be a manager which can clean their state when it is not
        //needed.
        if (deletedOperatorsManager == null) {
          //None of the managers were handling deleted operators data.
          deletedOperatorsManager = (FSIdempotentStorageManager)newManagers.iterator().next();
          deletedOperatorsManager.deletedOperators = Sets.newHashSet();
        }

        deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds);
      }
    }

    /**
     * Closes the file system created in {@link #setup}.
     *
     * @throws RuntimeException wrapping an {@link IOException} raised while closing
     */
    @Override
    public void teardown()
    {
      if (fs == null) {
        // setup never created a file system, so there is nothing to close
        return;
      }
      try {
        fs.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    /**
     * @return recovery path relative to the application path where state is saved
     */
    public String getRecoveryPath()
    {
      return recoveryPath;
    }

    /**
     * Sets the recovery path relative to the application path where state is saved.
     *
     * @param recoveryPath relative recovery path
     */
    public void setRecoveryPath(String recoveryPath)
    {
      this.recoveryPath = recoveryPath;
    }

    @Override
    public FSIdempotentStorageManager newInstance()
    {
      return new FSIdempotentStorageManager();
    }
  }

  /**
   * This {@link IdempotentStorageManager} will never do recovery. This is a convenience class so that operators
   * can use the same logic for maintaining idempotency and avoiding idempotency.
   * <p/>
   * Every save/delete is a no-op, loads return {@code null}, and the largest recovery window is
   * {@link Stateless#WINDOW_ID}.
   */
  public static class NoopIdempotentStorageManager implements IdempotentStorageManager
  {
    @Override
    public long getLargestRecoveryWindow()
    {
      return Stateless.WINDOW_ID;
    }

    @Override
    public Map<Integer, Object> load(long windowId) throws IOException
    {
      return null;
    }

    @Override
    public void partitioned(Collection<IdempotentStorageManager> newManagers, Set<Integer> removedOperatorIds)
    {
    }

    @Override
    public void setup(Context.OperatorContext context)
    {
    }

    @Override
    public void teardown()
    {
    }

    @Override
    public void save(Object object, int operatorId, long windowId) throws IOException
    {
    }

    @Override
    public Object load(int operatorId, long windowId) throws IOException
    {
      return null;
    }

    @Override
    public void delete(int operatorId, long windowId) throws IOException
    {
    }

    @Override
    public void deleteUpTo(int operatorId, long windowId) throws IOException
    {
    }

    /**
     * @return an empty array, since this manager never stores anything
     */
    @Override
    public long[] getWindowIds(int operatorId) throws IOException
    {
      return new long[0];
    }

    @Override
    public NoopIdempotentStorageManager newInstance()
    {
      return new NoopIdempotentStorageManager();
    }
  }
}