| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package com.datatorrent.lib.io; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import javax.validation.constraints.NotNull; |
| |
| import org.apache.apex.malhar.lib.wal.WindowDataManager; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.collect.TreeMultimap; |
| |
| import com.datatorrent.api.Component; |
| import com.datatorrent.api.Context; |
| import com.datatorrent.api.DAG; |
| import com.datatorrent.api.StorageAgent; |
| import com.datatorrent.api.annotation.Stateless; |
| import com.datatorrent.common.util.FSStorageAgent; |
| import com.datatorrent.lib.io.fs.AbstractFileInputOperator; |
| |
| /** |
| * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window. An idempotent agent |
| * cannot make any guarantees about the tuples emitted in the application window which fails. |
| * |
| * The order of tuples is guaranteed for ordered input sources. |
| * |
| * <b>Important:</b> In order for an idempotent storage manager to function correctly it cannot allow |
| * checkpoints to occur within an application window and checkpoints must be aligned with |
| * application window boundaries. |
| * |
| * @since 2.0.0 |
| * @deprecated use {@link WindowDataManager} |
| */ |
| @Deprecated |
| public interface IdempotentStorageManager extends StorageAgent, Component<Context.OperatorContext> |
| { |
| /** |
| * Gets the largest window for which there is recovery data. |
| * @return Returns the window id |
| */ |
| long getLargestRecoveryWindow(); |
| |
| /** |
| * When an operator can partition itself dynamically then there is no guarantee that an input state which was being handled |
| * by one instance previously will be handled by the same instance after partitioning. <br/> |
| * For eg. An {@link AbstractFileInputOperator} instance which reads a File X till offset l (not check-pointed) may no longer be the |
| * instance that handles file X after repartitioning as no. of instances may have changed and file X is re-hashed to another instance. <br/> |
| * The new instance wouldn't know from what point to read the File X unless it reads the idempotent storage of all the operators for the window |
| * being replayed and fix it's state. |
| * |
| * @param windowId window id. |
| * @return mapping of operator id to the corresponding state |
| * @throws IOException |
| */ |
| Map<Integer, Object> load(long windowId) throws IOException; |
| |
| /** |
| * Delete the artifacts of the operator for windows <= windowId. |
| * |
| * @param operatorId |
| * @param windowId |
| * @throws IOException |
| */ |
| public void deleteUpTo(int operatorId, long windowId) throws IOException; |
| |
| /** |
| * This informs the idempotent storage manager that operator is partitioned so that it can set properties and distribute state. |
| * |
| * @param newManagers all the new idempotent storage managers. |
| * @param removedOperatorIds set of operator ids which were removed after partitioning. |
| */ |
| void partitioned(Collection<IdempotentStorageManager> newManagers, Set<Integer> removedOperatorIds); |
| |
| IdempotentStorageManager newInstance(); |
| |
| /** |
| * An {@link IdempotentStorageManager} that uses FS to persist state. |
| */ |
| public static class FSIdempotentStorageManager implements IdempotentStorageManager |
| { |
| private static final String DEF_RECOVERY_PATH = "idempotentState"; |
| |
| protected transient FSStorageAgent storageAgent; |
| |
| /** |
| * Recovery path relative to app path where state is saved. |
| */ |
| @NotNull |
| protected String recoveryPath; |
| |
| /** |
| * largest window for which there is recovery data across all physical operator instances. |
| */ |
| protected transient long largestRecoveryWindow; |
| |
| /** |
| * This is not null only for one physical instance.<br/> |
| * It consists of operator ids which have been deleted but have some state that can be replayed. |
| * Only one of the instances would be handling (modifying) the files that belong to this state. |
| */ |
| protected Set<Integer> deletedOperators; |
| |
| /** |
| * Sorted mapping from window id to all the operators that have state to replay for that window. |
| */ |
| protected final transient TreeMultimap<Long, Integer> replayState; |
| |
| protected transient FileSystem fs; |
| protected transient Path appPath; |
| |
/**
 * Creates a manager with an empty replay state, the default recovery path and
 * {@code Stateless.WINDOW_ID} as the largest recovery window.
 */
public FSIdempotentStorageManager()
{
  this.recoveryPath = DEF_RECOVERY_PATH;
  this.largestRecoveryWindow = Stateless.WINDOW_ID;
  this.replayState = TreeMultimap.create();
}
| |
| @Override |
| public void setup(Context.OperatorContext context) |
| { |
| Configuration configuration = new Configuration(); |
| appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath); |
| |
| try { |
| storageAgent = new FSStorageAgent(appPath.toString(), configuration); |
| |
| fs = FileSystem.newInstance(appPath.toUri(), configuration); |
| |
| if (fs.exists(appPath)) { |
| FileStatus[] fileStatuses = fs.listStatus(appPath); |
| |
| for (FileStatus operatorDirStatus : fileStatuses) { |
| int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName()); |
| |
| for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) { |
| String fileName = status.getPath().getName(); |
| if (fileName.endsWith(FSStorageAgent.TMP_FILE)) { |
| continue; |
| } |
| long windowId = Long.parseLong(fileName, 16); |
| replayState.put(windowId, operatorId); |
| if (windowId > largestRecoveryWindow) { |
| largestRecoveryWindow = windowId; |
| } |
| } |
| } |
| } |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public void save(Object object, int operatorId, long windowId) throws IOException |
| { |
| storageAgent.save(object, operatorId, windowId); |
| } |
| |
| @Override |
| public Object load(int operatorId, long windowId) throws IOException |
| { |
| Set<Integer> operators = replayState.get(windowId); |
| if (operators == null || !operators.contains(operatorId)) { |
| return null; |
| } |
| return storageAgent.load(operatorId, windowId); |
| } |
| |
| @Override |
| public void delete(int operatorId, long windowId) throws IOException |
| { |
| storageAgent.delete(operatorId, windowId); |
| } |
| |
| @Override |
| public Map<Integer, Object> load(long windowId) throws IOException |
| { |
| Set<Integer> operators = replayState.get(windowId); |
| if (operators == null) { |
| return null; |
| } |
| Map<Integer, Object> data = Maps.newHashMap(); |
| for (int operatorId : operators) { |
| data.put(operatorId, load(operatorId, windowId)); |
| } |
| return data; |
| } |
| |
| @Override |
| public long[] getWindowIds(int operatorId) throws IOException |
| { |
| Path operatorPath = new Path(appPath, String.valueOf(operatorId)); |
| if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) { |
| return null; |
| } |
| return storageAgent.getWindowIds(operatorId); |
| } |
| |
| /** |
| * This deletes all the recovery files of window ids <= windowId. |
| * |
| * @param operatorId operator id. |
| * @param windowId the largest window id for which the states will be deleted. |
| * @throws IOException |
| */ |
| @Override |
| public void deleteUpTo(int operatorId, long windowId) throws IOException |
| { |
| //deleting the replay state |
| if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) { |
| Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator(); |
| while (iterator.hasNext()) { |
| Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next(); |
| long lwindow = windowEntry.getKey(); |
| if (lwindow > windowId) { |
| break; |
| } |
| for (Integer loperator : windowEntry.getValue()) { |
| |
| if (deletedOperators.contains(loperator)) { |
| storageAgent.delete(loperator, lwindow); |
| |
| Path loperatorPath = new Path(appPath, Integer.toString(loperator)); |
| if (fs.listStatus(loperatorPath).length == 0) { |
| //The operator was deleted and it has nothing to replay. |
| deletedOperators.remove(loperator); |
| fs.delete(loperatorPath, true); |
| } |
| } else if (loperator == operatorId) { |
| storageAgent.delete(loperator, lwindow); |
| } |
| } |
| iterator.remove(); |
| } |
| } |
| |
| if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) { |
| long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId); |
| Arrays.sort(windowsAfterReplay); |
| for (long lwindow : windowsAfterReplay) { |
| if (lwindow <= windowId) { |
| storageAgent.delete(operatorId, lwindow); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public long getLargestRecoveryWindow() |
| { |
| return largestRecoveryWindow; |
| } |
| |
| @Override |
| public void partitioned(Collection<IdempotentStorageManager> newManagers, Set<Integer> removedOperatorIds) |
| { |
| Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(), "there has to be one idempotent storage manager"); |
| FSIdempotentStorageManager deletedOperatorsManager = null; |
| |
| if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) { |
| if (this.deletedOperators == null) { |
| this.deletedOperators = Sets.newHashSet(); |
| } |
| this.deletedOperators.addAll(removedOperatorIds); |
| } |
| |
| for (IdempotentStorageManager storageManager : newManagers) { |
| |
| FSIdempotentStorageManager lmanager = (FSIdempotentStorageManager)storageManager; |
| lmanager.recoveryPath = this.recoveryPath; |
| lmanager.storageAgent = this.storageAgent; |
| |
| if (lmanager.deletedOperators != null) { |
| deletedOperatorsManager = lmanager; |
| } |
| //only one physical instance can manage deleted operators so clearing this field for rest of the instances. |
| if (lmanager != deletedOperatorsManager) { |
| lmanager.deletedOperators = null; |
| } |
| } |
| |
| if (removedOperatorIds == null || removedOperatorIds.isEmpty()) { |
| //Nothing to do |
| return; |
| } |
| if (this.deletedOperators != null) { |
| |
| //If some operators were removed then there needs to be a manager which can clean there state when it is not needed. |
| if (deletedOperatorsManager == null) { |
| //None of the managers were handling deleted operators data. |
| deletedOperatorsManager = (FSIdempotentStorageManager)newManagers.iterator().next(); |
| deletedOperatorsManager.deletedOperators = Sets.newHashSet(); |
| } |
| |
| deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds); |
| } |
| } |
| |
| @Override |
| public void teardown() |
| { |
| try { |
| fs.close(); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public String getRecoveryPath() |
| { |
| return recoveryPath; |
| } |
| |
| public void setRecoveryPath(String recoveryPath) |
| { |
| this.recoveryPath = recoveryPath; |
| } |
| |
| @Override |
| public FSIdempotentStorageManager newInstance() |
| { |
| return new FSIdempotentStorageManager(); |
| } |
| } |
| |
| /** |
| * This {@link IdempotentStorageManager} will never do recovery. This is a convenience class so that operators |
| * can use the same logic for maintaining idempotency and avoiding idempotency. |
| */ |
| public static class NoopIdempotentStorageManager implements IdempotentStorageManager |
| { |
| @Override |
| public long getLargestRecoveryWindow() |
| { |
| return Stateless.WINDOW_ID; |
| } |
| |
| @Override |
| public Map<Integer, Object> load(long windowId) throws IOException |
| { |
| return null; |
| } |
| |
| @Override |
| public void partitioned(Collection<IdempotentStorageManager> newManagers, Set<Integer> removedOperatorIds) |
| { |
| } |
| |
| @Override |
| public void setup(Context.OperatorContext context) |
| { |
| } |
| |
| @Override |
| public void teardown() |
| { |
| } |
| |
| @Override |
| public void save(Object object, int operatorId, long windowId) throws IOException |
| { |
| } |
| |
| @Override |
| public Object load(int operatorId, long windowId) throws IOException |
| { |
| return null; |
| } |
| |
| @Override |
| public void delete(int operatorId, long windowId) throws IOException |
| { |
| } |
| |
| @Override |
| public void deleteUpTo(int operatorId, long windowId) throws IOException |
| { |
| } |
| |
| @Override |
| public long[] getWindowIds(int operatorId) throws IOException |
| { |
| return new long[0]; |
| } |
| |
| @Override |
| public NoopIdempotentStorageManager newInstance() |
| { |
| return new NoopIdempotentStorageManager(); |
| } |
| } |
| } |