| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.apex.malhar.lib.io.fs; |
| |
| import java.io.FilterOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.annotation.Nonnull; |
| import javax.validation.constraints.Min; |
| import javax.validation.constraints.NotNull; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.apex.malhar.lib.counters.BasicCounters; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang.mutable.MutableLong; |
| import org.apache.commons.lang3.mutable.MutableInt; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.AbstractFileSystem; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Options; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.RawLocalFileSystem; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Strings; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.common.cache.RemovalListener; |
| import com.google.common.cache.RemovalNotification; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| import com.datatorrent.api.Context; |
| import com.datatorrent.api.Context.OperatorContext; |
| import com.datatorrent.api.DefaultInputPort; |
| import com.datatorrent.api.Operator; |
| import com.datatorrent.api.StreamCodec; |
| import com.datatorrent.api.annotation.OperatorAnnotation; |
| import com.datatorrent.common.util.BaseOperator; |
| |
| /** |
| * This base implementation for a fault tolerant HDFS output operator, |
| * which can handle outputting to multiple files when the output file depends on the tuple. |
| * The file a tuple is output to is determined by the getFilePath method. The operator can |
| * also output files to rolling mode. In rolling mode by default file names have '.#' appended to the |
| * end, where # is an integer. A maximum length for files is specified and whenever the current output |
| * file size exceeds the maximum length, output is rolled over to a new file whose name ends in '.#+1'. |
| * <br/> |
| * <br/> |
| * <b>Note:</b> This operator maintains internal state in a map of files names to offsets in the endOffsets |
| * field. If the user configures this operator to write to an enormous |
| * number of files, there is a risk that the operator will run out of memory. In such a case the |
 * user is responsible for maintaining the internal state to prevent the operator from running out
 * of memory.
 *
| * BenchMark Results |
| * ----------------- |
| * The operator writes 21 MB/s with the following configuration |
| * |
| * Container memory size=4G |
| * Tuple byte size of 32 |
| * output to a single file |
| * Application window length of 1 second |
| * |
| * The operator writes 25 MB/s with the following configuration |
| * |
| * Container memory size=4G |
| * Tuple byte size of 32 |
| * output to a single file |
| * Application window length of 30 seconds |
| * |
| * @displayName FS Writer |
| * @category Output |
| * @tags fs, file, output operator |
| * |
| * @param <INPUT> This is the input tuple type. |
| * |
| * @since 2.0.0 |
| */ |
| @OperatorAnnotation(checkpointableWithinAppWindow = false) |
| public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator implements Operator.CheckpointListener, Operator.CheckpointNotificationListener |
| { |
| private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class); |
| |
| private static final String TMP_EXTENSION = ".tmp"; |
| |
| private static final String APPEND_TMP_FILE = "_APPENDING"; |
| |
| private static final int MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION = 25; |
| |
| /** |
| * Size of the copy buffer used to restore files to checkpointed state. |
| */ |
| private static final int COPY_BUFFER_SIZE = 1024; |
| |
| /** |
| * The default number of max open files. |
| */ |
| public static final int DEFAULT_MAX_OPEN_FILES = 100; |
| |
| /** |
| * Keyname to rolling file number. |
| */ |
| protected Map<String, MutableInt> openPart; |
| |
| /** |
| * Keyname to offset of the current rolling file. |
| */ |
| protected Map<String, MutableLong> endOffsets; |
| |
| /** |
| * Keyname to counts. |
| */ |
| protected Map<String, MutableLong> counts; |
| |
| /** |
| * Filename to rotation state mapping during a rotation period. Look at {@link #rotationWindows} |
| */ |
| @NotNull |
| protected Map<String, RotationState> rotationStates; |
| |
| /** |
| * The path of the directory to where files are written. |
| */ |
| @NotNull |
| protected String filePath; |
| |
| /** |
| * The total number of bytes written by the operator. |
| */ |
| protected long totalBytesWritten = 0; |
| |
| /** |
| * The replication level for your output files. |
| */ |
| @Min(0) |
| protected int replication = 0; |
| |
| /** |
| * The maximum number of open files you can have in your streamsCache. |
| */ |
| @Min(1) |
| protected int maxOpenFiles = DEFAULT_MAX_OPEN_FILES; |
| |
| /** |
| * The maximum length in bytes of a rolling file. The default value of this is Long.MAX_VALUE |
| */ |
| @Min(1) |
| protected Long maxLength = Long.MAX_VALUE; |
| |
| /** |
| * The file rotation window interval. |
| * The files are rotated periodically after the specified value of windows have ended. If set to 0 this feature is |
| * disabled. |
| */ |
| @Min(0) |
| protected int rotationWindows = 0; |
| |
| /** |
| * True if {@link #maxLength} < {@link Long#MAX_VALUE} or {@link #rotationWindows} > 0 |
| */ |
| protected transient boolean rollingFile = false; |
| |
| /** |
| * The file system used to write to. |
| */ |
| protected transient FileSystem fs; |
| protected transient FileContext fileContext; |
| |
| protected short filePermission = 0777; |
| |
| /** |
| * This is the cache which holds open file streams. |
| */ |
| protected transient LoadingCache<String, FSFilterStreamContext> streamsCache; |
| |
| /** |
| * This is the operator context passed at setup. |
| */ |
| protected transient OperatorContext context; |
| |
| /** |
| * StopWatch tracking the total time the operator has spent writing bytes. |
| */ |
| private transient long totalWritingTime; |
| |
| /** |
| * File output counters. |
| */ |
| protected final BasicCounters<MutableLong> fileCounters = new BasicCounters<>(MutableLong.class); |
| |
| protected StreamCodec<INPUT> streamCodec; |
| |
| /** |
| * Number of windows since the last rotation |
| */ |
| private int rotationCount; |
| |
| /** |
| * If a filter stream provider is set it is used to obtain the filter that will be applied to data before it is |
| * stored in the file. If it null no filter is applied and data is written as is. Multiple filters can be chained |
| * together by using a filter stream chain provider. |
| */ |
| protected FilterStreamProvider filterStreamProvider; |
| |
| /** |
| * When true this will write to a filename.tmp instead of filename. This is added because at times when the operator |
| * gets killed the lease of the last file it was writing to is still open in hdfs. So truncating the file during |
| * recovery fails. Ideally this should be false however currently it should remain true otherwise it will lead to |
| * namenode instability and crash. |
| */ |
| protected boolean alwaysWriteToTmp = true; |
| |
| private final Map<String, String> fileNameToTmpName; |
| private final Map<Long, Set<String>> finalizedFiles; |
| protected final Map<String, MutableInt> finalizedPart; |
| |
| protected long currentWindow; |
| |
| /** |
| * The stream is expired (closed and evicted from cache) after the specified duration has passed since it was last |
| * accessed by a read or write. |
| * <p/> |
| * https://code.google.com/p/guava-libraries/wiki/CachesExplained <br/> |
| * Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a |
| * value expires, or anything of the sort. Instead, it performs small amounts of maintenance during write |
| * operations, or during occasional read operations if writes are rare.<br/> |
| * This isn't the most effective way but adds a little bit of optimization. |
| */ |
| private Long expireStreamAfterAccessMillis; |
| private final Set<String> filesWithOpenStreams; |
| |
| private transient boolean initializeContext; |
| |
| /** |
| * This input port receives incoming tuples. |
| */ |
| public final transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>() |
| { |
| @Override |
| public void process(INPUT tuple) |
| { |
| processTuple(tuple); |
| } |
| |
| @Override |
| public StreamCodec<INPUT> getStreamCodec() |
| { |
| if (AbstractFileOutputOperator.this.streamCodec == null) { |
| return super.getStreamCodec(); |
| } else { |
| return streamCodec; |
| } |
| } |
| }; |
| |
| private static class RotationState |
| { |
| boolean notEmpty; |
| boolean rotated; |
| } |
| |
| public AbstractFileOutputOperator() |
| { |
| endOffsets = Maps.newHashMap(); |
| counts = Maps.newHashMap(); |
| openPart = Maps.newHashMap(); |
| rotationStates = Maps.newHashMap(); |
| fileNameToTmpName = Maps.newHashMap(); |
| finalizedFiles = Maps.newTreeMap(); |
| finalizedPart = Maps.newHashMap(); |
| filesWithOpenStreams = Sets.newHashSet(); |
| } |
| |
| /** |
| * Override this method to change the FileSystem instance that is used by the operator. |
| * This method is mainly helpful for unit testing. |
| * @return A FileSystem object. |
| * @throws IOException |
| */ |
| protected FileSystem getFSInstance() throws IOException |
| { |
| FileSystem tempFS = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration()); |
| |
| if (tempFS instanceof LocalFileSystem) { |
| tempFS = ((LocalFileSystem)tempFS).getRaw(); |
| } |
| |
| return tempFS; |
| } |
| |
| @Override |
| public void setup(Context.OperatorContext context) |
| { |
| LOG.debug("setup initiated"); |
| if (expireStreamAfterAccessMillis == null) { |
| expireStreamAfterAccessMillis = (long)(context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * |
| context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT)); |
| } |
| rollingFile = (maxLength < Long.MAX_VALUE) || (rotationWindows > 0); |
| |
| //Getting required file system instance. |
| try { |
| fs = getFSInstance(); |
| } catch (IOException ex) { |
| throw new RuntimeException(ex); |
| } |
| |
| if (replication <= 0) { |
| replication = fs.getDefaultReplication(new Path(filePath)); |
| } |
| |
| LOG.debug("FS class {}", fs.getClass()); |
| |
| //building cache |
| RemovalListener<String, FSFilterStreamContext> removalListener = createCacheRemoveListener(); |
| CacheLoader<String, FSFilterStreamContext> loader = createCacheLoader(); |
| streamsCache = CacheBuilder.newBuilder().maximumSize(maxOpenFiles).expireAfterAccess(expireStreamAfterAccessMillis, |
| TimeUnit.MILLISECONDS).removalListener(removalListener).build(loader); |
| |
| LOG.debug("File system class: {}", fs.getClass()); |
| LOG.debug("end-offsets {}", endOffsets); |
| |
| try { |
| //Restore the files in case they were corrupted and the operator was re-deployed. |
| Path writerPath = new Path(filePath); |
| if (fs.exists(writerPath)) { |
| for (String seenFileName : endOffsets.keySet()) { |
| String seenFileNamePart = getPartFileNamePri(seenFileName); |
| LOG.debug("seenFileNamePart: {}", seenFileNamePart); |
| |
| Path activeFilePath; |
| if (alwaysWriteToTmp) { |
| String tmpFileName = fileNameToTmpName.get(seenFileNamePart); |
| activeFilePath = new Path(filePath + Path.SEPARATOR + tmpFileName); |
| } else { |
| activeFilePath = new Path(filePath + Path.SEPARATOR + seenFileNamePart); |
| } |
| |
| if (fs.exists(activeFilePath)) { |
| recoverFile(seenFileName, seenFileNamePart, activeFilePath); |
| } |
| } |
| } |
| |
| if (rollingFile) { |
| //delete the left over future rolling files produced from the previous crashed instance of this operator. |
| for (String seenFileName : endOffsets.keySet()) { |
| try { |
| Integer fileOpenPart = this.openPart.get(seenFileName).getValue(); |
| int nextPart = fileOpenPart + 1; |
| String seenPartFileName; |
| while (true) { |
| seenPartFileName = getPartFileName(seenFileName, nextPart); |
| Path activePath = null; |
| if (alwaysWriteToTmp) { |
| String tmpFileName = fileNameToTmpName.get(seenPartFileName); |
| if (tmpFileName != null) { |
| activePath = new Path(filePath + Path.SEPARATOR + tmpFileName); |
| } |
| } else { |
| activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName); |
| } |
| if (activePath == null || !fs.exists(activePath)) { |
| break; |
| } |
| |
| fs.delete(activePath, true); |
| nextPart++; |
| } |
| |
| seenPartFileName = getPartFileName(seenFileName, fileOpenPart); |
| Path activePath = null; |
| if (alwaysWriteToTmp) { |
| String tmpFileName = fileNameToTmpName.get(seenPartFileName); |
| if (tmpFileName != null) { |
| activePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName)); |
| } |
| } else { |
| activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName); |
| } |
| |
| if (activePath != null && fs.exists(activePath) && fs.getFileStatus(activePath).getLen() > maxLength) { |
| //Handle the case when restoring to a checkpoint where the current rolling file |
| //already has a length greater than max length. |
| LOG.debug("rotating file at setup."); |
| rotate(seenFileName); |
| } |
| } catch (IOException | ExecutionException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| LOG.debug("setup completed"); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| this.context = context; |
| |
| fileCounters.setCounter(Counters.TOTAL_BYTES_WRITTEN, new MutableLong()); |
| fileCounters.setCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS, new MutableLong()); |
| } |
| |
| /** |
| * Recovers a file which exists on the disk. If the length of the file is not same as the |
| * length which the operator remembers then the file is truncated. <br/> |
| * When always writing to a temporary file, then a file is restored even when the length is same as what the |
| * operator remembers however this is done only for files which had open streams that weren't closed before |
| * failure. |
| * |
| * @param filename name of the actual file. |
| * @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the |
| * latest open part file name. |
| * @param filepath path of the file. When always writing to temp file, this is the path of the temp file; |
| * otherwise path of the actual file. |
| * @throws IOException |
| */ |
| private void recoverFile(String filename, String partFileName, Path filepath) throws IOException |
| { |
| LOG.debug("path exists {}", filepath); |
| long offset = endOffsets.get(filename).longValue(); |
| FSDataInputStream inputStream = fs.open(filepath); |
| FileStatus status = fs.getFileStatus(filepath); |
| |
| if (status.getLen() != offset) { |
| LOG.info("path corrupted {} {} {}", filepath, offset, status.getLen()); |
| byte[] buffer = new byte[COPY_BUFFER_SIZE]; |
| String recoveryFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION; |
| Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName); |
| FSDataOutputStream fsOutput = openStream(recoveryFilePath, false); |
| |
| while (inputStream.getPos() < offset) { |
| long remainingBytes = offset - inputStream.getPos(); |
| int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int)remainingBytes : COPY_BUFFER_SIZE; |
| inputStream.read(buffer); |
| fsOutput.write(buffer, 0, bytesToWrite); |
| } |
| |
| flush(fsOutput); |
| fsOutput.close(); |
| inputStream.close(); |
| |
| LOG.debug("active {} recovery {} ", filepath, recoveryFilePath); |
| |
| if (alwaysWriteToTmp) { |
| //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator |
| //is restored to an earlier check-pointed window, it will look for an older tmp. |
| fileNameToTmpName.put(partFileName, recoveryFileName); |
| } else { |
| LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath()); |
| rename(recoveryFilePath, status.getPath()); |
| } |
| } else { |
| if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) { |
| String currentTmp = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION; |
| FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false); |
| IOUtils.copy(inputStream, outputStream); |
| streamsCache.put(filename, new FSFilterStreamContext(outputStream)); |
| fileNameToTmpName.put(partFileName, currentTmp); |
| } |
| inputStream.close(); |
| } |
| } |
| |
| /** |
| * Creates the {@link CacheLoader} for loading an output stream when it is not present in the cache. |
| * @return cache loader |
| */ |
| private CacheLoader<String, FSFilterStreamContext> createCacheLoader() |
| { |
| return new CacheLoader<String, FSFilterStreamContext>() |
| { |
| @Override |
| public FSFilterStreamContext load(@Nonnull String filename) |
| { |
| if (rollingFile) { |
| RotationState state = getRotationState(filename); |
| if (rollingFile && state.rotated) { |
| openPart.get(filename).add(1); |
| state.rotated = false; |
| MutableLong offset = endOffsets.get(filename); |
| offset.setValue(0); |
| } |
| } |
| |
| String partFileName = getPartFileNamePri(filename); |
| Path originalFilePath = new Path(filePath + Path.SEPARATOR + partFileName); |
| |
| Path activeFilePath; |
| if (!alwaysWriteToTmp) { |
| activeFilePath = originalFilePath; |
| } else { |
| //MLHR-1776 : writing to tmp file |
| String tmpFileName = fileNameToTmpName.get(partFileName); |
| if (tmpFileName == null) { |
| tmpFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION; |
| fileNameToTmpName.put(partFileName, tmpFileName); |
| } |
| activeFilePath = new Path(filePath + Path.SEPARATOR + tmpFileName); |
| } |
| |
| FSDataOutputStream fsOutput; |
| |
| boolean sawThisFileBefore = endOffsets.containsKey(filename); |
| |
| try { |
| if (fs.exists(originalFilePath) || (alwaysWriteToTmp && fs.exists(activeFilePath))) { |
| if (sawThisFileBefore) { |
| FileStatus fileStatus = fs.getFileStatus(activeFilePath); |
| MutableLong endOffset = endOffsets.get(filename); |
| |
| if (endOffset != null) { |
| endOffset.setValue(fileStatus.getLen()); |
| } else { |
| endOffsets.put(filename, new MutableLong(fileStatus.getLen())); |
| } |
| |
| fsOutput = openStream(activeFilePath, true); |
| LOG.debug("appending to {}", activeFilePath); |
| } else { |
| //We never saw this file before and we don't want to append |
| //If the file is rolling we need to delete all its parts. |
| if (rollingFile) { |
| int part = 0; |
| |
| while (true) { |
| Path seenPartFilePath = new Path(filePath + Path.SEPARATOR + getPartFileName(filename, part)); |
| if (!fs.exists(seenPartFilePath)) { |
| break; |
| } |
| |
| fs.delete(seenPartFilePath, true); |
| part = part + 1; |
| } |
| |
| fsOutput = openStream(activeFilePath, false); |
| } else { |
| //Not rolling is easy, just delete the file and create it again. |
| fs.delete(activeFilePath, true); |
| if (alwaysWriteToTmp) { |
| //we need to delete original file if that exists |
| if (fs.exists(originalFilePath)) { |
| fs.delete(originalFilePath, true); |
| } |
| } |
| fsOutput = openStream(activeFilePath, false); |
| } |
| } |
| } else { |
| fsOutput = openStream(activeFilePath, false); |
| } |
| filesWithOpenStreams.add(filename); |
| |
| LOG.info("opened {}, active {}", partFileName, activeFilePath); |
| return new FSFilterStreamContext(fsOutput); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| }; |
| } |
| |
| /** |
| * Creates the removal listener which is attached to the cache. |
| * |
| * @return cache entry removal listener. |
| */ |
| private RemovalListener<String, FSFilterStreamContext> createCacheRemoveListener() |
| { |
| //When an entry is removed from the cache, removal listener is notified and it closes the output stream. |
| return new RemovalListener<String, FSFilterStreamContext>() |
| { |
| @Override |
| public void onRemoval(@Nonnull RemovalNotification<String, FSFilterStreamContext> notification) |
| { |
| FSFilterStreamContext streamContext = notification.getValue(); |
| if (streamContext != null) { |
| try { |
| String filename = notification.getKey(); |
| String partFileName = getPartFileNamePri(filename); |
| |
| LOG.info("closing {}", partFileName); |
| long start = System.currentTimeMillis(); |
| |
| closeStream(streamContext); |
| filesWithOpenStreams.remove(filename); |
| |
| totalWritingTime += System.currentTimeMillis() - start; |
| } catch (IOException e) { |
| LOG.error("removing {}", notification.getValue(), e); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| }; |
| } |
| |
| /** |
| * Opens the stream for the specified file path in either append mode or create mode. |
| * |
| * @param filepath this is the path of either the actual file or the corresponding temporary file. |
| * @param append true for opening the file in append mode; false otherwise. |
| * @return output stream. |
| * @throws IOException |
| */ |
| protected FSDataOutputStream openStream(Path filepath, boolean append) throws IOException |
| { |
| FSDataOutputStream fsOutput; |
| if (append) { |
| fsOutput = openStreamInAppendMode(filepath); |
| } else { |
| fsOutput = fs.create(filepath, (short)replication); |
| fs.setPermission(filepath, FsPermission.createImmutable(filePermission)); |
| } |
| return fsOutput; |
| } |
| |
| /** |
| * Opens the stream for the given file path in append mode. Catch the exception if the FS doesnt support |
| * append operation and calls the openStreamForNonAppendFS(). |
| * @param filepath given file path |
| * @return output stream |
| */ |
| protected FSDataOutputStream openStreamInAppendMode(Path filepath) |
| { |
| FSDataOutputStream fsOutput = null; |
| try { |
| fsOutput = fs.append(filepath); |
| } catch (IOException e) { |
| if (e.getMessage().equals("Not supported")) { |
| fsOutput = openStreamForNonAppendFS(filepath); |
| } |
| } |
| return fsOutput; |
| } |
| |
| /** |
| * Opens the stream for the given file path for the file systems which are not supported append operation. |
| * @param filepath given file path |
| * @return output stream |
| */ |
| protected FSDataOutputStream openStreamForNonAppendFS(Path filepath) |
| { |
| try { |
| Path appendTmpFile = new Path(filepath + APPEND_TMP_FILE); |
| fs.rename(filepath, appendTmpFile); |
| FSDataInputStream fsIn = fs.open(appendTmpFile); |
| FSDataOutputStream fsOut = fs.create(filepath); |
| IOUtils.copy(fsIn, fsOut); |
| flush(fsOut); |
| fsIn.close(); |
| fs.delete(appendTmpFile); |
| return fsOut; |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Closes the stream which has been removed from the cache. |
| * |
| * @param streamContext stream context which is removed from the cache. |
| * @throws IOException |
| */ |
| protected void closeStream(FSFilterStreamContext streamContext) throws IOException |
| { |
| streamContext.close(); |
| } |
| |
| /** |
| * Renames source path to destination atomically. This relies on the FileContext api. If |
| * the underlying filesystem doesn't have an {@link AbstractFileSystem} then this should be overridden. |
| * |
| * @param source source path |
| * @param destination destination path |
| * @throws IOException |
| */ |
| protected void rename(Path source, Path destination) throws IOException |
| { |
| if (fileContext == null) { |
| fileContext = FileContext.getFileContext(fs.getUri()); |
| } |
| fileContext.rename(source, destination, Options.Rename.OVERWRITE); |
| } |
| |
| /** |
| * Requests a file to be finalized. When it is writing to a rolling file, this will |
| * request for finalizing the current open part and all the prev parts which weren't requested yet. |
| * |
| * @param fileName name of the file; part file name in case of rotation. |
| * @throws IOException |
| */ |
| protected void requestFinalize(String fileName) |
| { |
| Set<String> filesPerWindow = finalizedFiles.get(currentWindow); |
| if (filesPerWindow == null) { |
| filesPerWindow = Sets.newHashSet(); |
| finalizedFiles.put(currentWindow, filesPerWindow); |
| } |
| if (rollingFile) { |
| |
| MutableInt part = finalizedPart.get(fileName); |
| if (part == null) { |
| part = new MutableInt(-1); |
| finalizedPart.put(fileName, part); |
| } |
| MutableInt currentOpenPart = openPart.get(fileName); |
| |
| for (int x = part.getValue() + 1; x <= currentOpenPart.getValue(); x++) { |
| String prevPartNotFinalized = getPartFileName(fileName, x); |
| LOG.debug("request finalize {}", prevPartNotFinalized); |
| filesPerWindow.add(prevPartNotFinalized); |
| } |
| fileName = getPartFileNamePri(fileName); |
| part.setValue(currentOpenPart.getValue()); |
| } |
| filesPerWindow.add(fileName); |
| } |
| |
| @Override |
| public void teardown() |
| { |
| List<String> fileNames = new ArrayList<String>(); |
| int numberOfFailures = 0; |
| IOException savedException = null; |
| |
| //Close all the streams you can |
| Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); |
| for (String seenFileName : openStreams.keySet()) { |
| FSFilterStreamContext fsFilterStreamContext = openStreams.get(seenFileName); |
| try { |
| long start = System.currentTimeMillis(); |
| closeStream(fsFilterStreamContext); |
| filesWithOpenStreams.remove(seenFileName); |
| totalWritingTime += System.currentTimeMillis() - start; |
| } catch (IOException ex) { |
| //Count number of failures |
| numberOfFailures++; |
| //Add names of first N failed files to list |
| if (fileNames.size() < MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) { |
| fileNames.add(seenFileName); |
| //save exception |
| savedException = ex; |
| } |
| } |
| } |
| |
| //Try to close the file system |
| boolean fsFailed = false; |
| |
| try { |
| fs.close(); |
| } catch (IOException ex) { |
| //Closing file system failed |
| savedException = ex; |
| fsFailed = true; |
| } |
| |
| //There was a failure closing resources |
| if (savedException != null) { |
| String errorMessage = ""; |
| |
| //File system failed to close |
| if (fsFailed) { |
| errorMessage += "Closing the fileSystem failed. "; |
| } |
| |
| //Print names of atmost first N files that failed to close |
| if (!fileNames.isEmpty()) { |
| errorMessage += "The following files failed closing: "; |
| } |
| |
| for (String seenFileName: fileNames) { |
| errorMessage += seenFileName + ", "; |
| } |
| |
| if (numberOfFailures > MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) { |
| errorMessage += (numberOfFailures - MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION) + |
| " more files failed."; |
| } |
| |
| //Fail |
| throw new RuntimeException(errorMessage, savedException); |
| } |
| |
| long currentTimeStamp = System.currentTimeMillis(); |
| } |
| |
| /** |
| * This method processes received tuples. |
| * Tuples are written out to the appropriate files as determined by the getFileName method. |
| * If the output port is connected incoming tuples are also converted and emitted on the appropriate output port. |
| * @param tuple An incoming tuple which needs to be processed. |
| */ |
| protected void processTuple(INPUT tuple) |
| { |
| String fileName = getFileName(tuple); |
| |
| if (Strings.isNullOrEmpty(fileName)) { |
| return; |
| } |
| |
| try { |
| FilterOutputStream fsOutput = streamsCache.get(fileName).getFilterStream(); |
| byte[] tupleBytes = getBytesForTuple(tuple); |
| long start = System.currentTimeMillis(); |
| fsOutput.write(tupleBytes); |
| totalWritingTime += System.currentTimeMillis() - start; |
| totalBytesWritten += tupleBytes.length; |
| MutableLong currentOffset = endOffsets.get(fileName); |
| |
| if (currentOffset == null) { |
| currentOffset = new MutableLong(0); |
| endOffsets.put(fileName, currentOffset); |
| } |
| |
| currentOffset.add(tupleBytes.length); |
| |
| if (rotationWindows > 0) { |
| getRotationState(fileName).notEmpty = true; |
| } |
| |
| if (rollingFile && currentOffset.longValue() > maxLength) { |
| LOG.debug("Rotating file {} {} {}", fileName, openPart.get(fileName), currentOffset.longValue()); |
| rotate(fileName); |
| } |
| |
| MutableLong count = counts.get(fileName); |
| if (count == null) { |
| count = new MutableLong(0); |
| counts.put(fileName, count); |
| } |
| |
| count.add(1); |
| } catch (IOException | ExecutionException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| /** |
| * This method rolls over to the next files. |
| * @param fileName The file that you are rolling. |
| * @throws IllegalArgumentException |
| * @throws IOException |
| * @throws ExecutionException |
| */ |
| protected void rotate(String fileName) throws IllegalArgumentException, IOException, ExecutionException |
| { |
| if (!this.getRotationState(fileName).rotated) { |
| requestFinalize(fileName); |
| counts.remove(fileName); |
| streamsCache.invalidate(fileName); |
| MutableInt mi = openPart.get(fileName); |
| LOG.debug("Part file rotated {} : {}", fileName, mi.getValue()); |
| |
| //TODO: remove this as rotateHook is deprecated. |
| String partFileName = getPartFileName(fileName, mi.getValue()); |
| rotateHook(partFileName); |
| |
| getRotationState(fileName).rotated = true; |
| } |
| } |
| |
| private RotationState getRotationState(String fileName) |
| { |
| RotationState rotationState = rotationStates.get(fileName); |
| if (rotationState == null) { |
| rotationState = new RotationState(); |
| rotationStates.put(fileName, rotationState); |
| } |
| return rotationState; |
| } |
| |
| /** |
| * This hook is called after a rolling file part has filled up and is closed. The hook is passed |
| * the name of the file part that has just completed closed. |
| * @param finishedFile The name of the file part that has just completed and closed. |
| */ |
| @Deprecated |
| protected void rotateHook(@SuppressWarnings("unused") String finishedFile) |
| { |
| //Do nothing by default |
| } |
| |
| /** |
| * This method is used to force buffers to be flushed at the end of the window. |
| * flush must be used on a local file system, so an if statement checks to |
| * make sure that hflush is used on local file systems. |
| * @param fsOutput output stream |
| * @throws IOException |
| */ |
| protected void flush(FSDataOutputStream fsOutput) throws IOException |
| { |
| if (fs instanceof LocalFileSystem || |
| fs instanceof RawLocalFileSystem) { |
| fsOutput.flush(); |
| } else { |
| fsOutput.hflush(); |
| } |
| } |
| |
| /** |
| * Gets the current rolling file name. |
| * @param fileName The base name of the files you are rolling over. |
| * @return The name of the current rolling file. |
| */ |
| protected String getPartFileNamePri(String fileName) |
| { |
| if (!rollingFile) { |
| return fileName; |
| } |
| |
| MutableInt part = openPart.get(fileName); |
| |
| if (part == null) { |
| part = new MutableInt(0); |
| openPart.put(fileName, part); |
| LOG.debug("First file part number {}", part); |
| } |
| |
| return getPartFileName(fileName, |
| part.intValue()); |
| } |
| |
| /** |
| * This method constructs the next rolling files name based on the |
| * base file name and the number in the sequence of rolling files. |
| * @param fileName The base name of the file. |
| * @param part The part number of the rolling file. |
| * @return The rolling file name. |
| */ |
| protected String getPartFileName(String fileName, int part) |
| { |
| return fileName + "." + part; |
| } |
| |
| @Override |
| public void beginWindow(long windowId) |
| { |
| // All the filter state needs to be flushed to the disk. Not all filters allow a flush option, so the filters have |
| // to be closed and reopened. If no filter being is being used then it is a essentially a noop as the underlying |
| // FSDataOutputStream is not being closed in this operation. |
| if (initializeContext) { |
| try { |
| Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); |
| for (FSFilterStreamContext streamContext : openStreams.values()) { |
| streamContext.initializeContext(); |
| } |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| initializeContext = false; |
| } |
| currentWindow = windowId; |
| } |
| |
| @Override |
| public void endWindow() |
| { |
| if (rotationWindows > 0) { |
| if (++rotationCount == rotationWindows) { |
| rotationCount = 0; |
| // Rotate the files |
| Iterator<Map.Entry<String, MutableInt>> iterator = openPart.entrySet().iterator(); |
| while (iterator.hasNext()) { |
| String filename = iterator.next().getKey(); |
| // Rotate the file if the following conditions are met |
| // 1. The file is not already rotated during this period for other reasons such as max length is reached |
| // or rotate was explicitly called externally |
| // 2. The file is not empty |
| RotationState rotationState = rotationStates.get(filename); |
| boolean rotate = false; |
| if (rotationState != null) { |
| rotate = !rotationState.rotated && rotationState.notEmpty; |
| rotationState.notEmpty = false; |
| } |
| if (rotate) { |
| try { |
| rotate(filename); |
| } catch (IOException | ExecutionException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| } |
| } |
| |
| fileCounters.getCounter(Counters.TOTAL_TIME_WRITING_MILLISECONDS).setValue(totalWritingTime); |
| fileCounters.getCounter(Counters.TOTAL_BYTES_WRITTEN).setValue(totalBytesWritten); |
| context.setCounters(fileCounters); |
| } |
| |
| /** |
| * This method determines the file that a received tuple is written out to. |
| * @param tuple The tuple which can be used to determine the output file name. |
| * @return The name of the file this tuple will be written out to. |
| */ |
| protected abstract String getFileName(INPUT tuple); |
| |
| /** |
| * This method converts a tuple into bytes, so that it can be written out to a filesystem. |
| * @param tuple A received tuple to be converted into bytes. |
| * @return The received tuple in byte form. |
| */ |
| protected abstract byte[] getBytesForTuple(INPUT tuple); |
| |
| /** |
| * Sets the path of the working directory where files are being written. |
| * @param dir The path of the working directory where files are being written. |
| */ |
| public void setFilePath(@Nonnull String dir) |
| { |
| this.filePath = dir; |
| } |
| |
| /** |
| * Returns the path of the working directory where files are being written. |
| * @return The path of the working directory where files are being written. |
| */ |
| public String getFilePath() |
| { |
| return this.filePath; |
| } |
| |
| /** |
| * Sets the maximum length of a an output file in bytes. By default this is Long.MAX_VALUE, |
| * if this is not Long.MAX_VALUE then the output operator is in rolling mode. |
| * @param maxLength The maximum length of an output file in bytes, when in rolling mode. |
| */ |
| public void setMaxLength(long maxLength) |
| { |
| this.maxLength = maxLength; |
| } |
| |
| /** |
| * Sets the maximum length of a an output file in bytes, when in rolling mode. By default |
| * this is null, if this is not null then the operator is in rolling mode. |
| * @return The maximum length of an output file in bytes, when in rolling mode. |
| */ |
| public long getMaxLength() |
| { |
| return maxLength; |
| } |
| |
| /** |
| * Gets the file rotation window interval. |
| * The files are rotated periodically after the specified number of windows have ended. |
| * @return The number of windows |
| */ |
| public int getRotationWindows() |
| { |
| return rotationWindows; |
| } |
| |
| /** |
| * Sets the file rotation window interval. |
| * The files are rotated periodically after the specified number of windows have ended. |
| * @param rotationWindows The number of windows |
| */ |
| public void setRotationWindows(int rotationWindows) |
| { |
| this.rotationWindows = rotationWindows; |
| } |
| |
| /** |
| * Set the maximum number of files which can be written to at a time. |
| * @param maxOpenFiles The maximum number of files which can be written to at a time. |
| */ |
| public void setMaxOpenFiles(int maxOpenFiles) |
| { |
| this.maxOpenFiles = maxOpenFiles; |
| } |
| |
| /** |
| * Get the maximum number of files which can be open. |
| * @return The maximum number of files which can be open. |
| */ |
| public int getMaxOpenFiles() |
| { |
| return this.maxOpenFiles; |
| } |
| |
| /** |
| * Get the permission on the file which is being written. |
| * @return filePermission |
| */ |
| public short getFilePermission() |
| { |
| return filePermission; |
| } |
| |
| /** |
| * Set the permission on the file which is being written. |
| * @param filePermission |
| */ |
| public void setFilePermission(short filePermission) |
| { |
| this.filePermission = filePermission; |
| } |
| |
| /** |
| * Get the filter stream provider |
| * @return The filter stream provider. |
| */ |
| public FilterStreamProvider getFilterStreamProvider() |
| { |
| return filterStreamProvider; |
| } |
| |
| /** |
| * Set the filter stream provider. When a non-null provider is specified it will be used to supply the filter that |
| * will be applied to data before it is stored in the file. |
| * @param filterStreamProvider The filter stream provider |
| */ |
| public void setFilterStreamProvider(FilterStreamProvider filterStreamProvider) |
| { |
| this.filterStreamProvider = filterStreamProvider; |
| } |
| |
| public static enum Counters |
| { |
| /** |
| * An enum for counters representing the total number of bytes written |
| * by the operator. |
| */ |
| TOTAL_BYTES_WRITTEN, |
| |
| /** |
| * An enum for counters representing the total time the operator has |
| * been operational for. |
| */ |
| TOTAL_TIME_WRITING_MILLISECONDS |
| } |
| |
  /**
   * Couples a filesystem output stream with the optional filter chain obtained
   * from {@code filterStreamProvider}. Data is written through the filter
   * stream when a provider is configured, otherwise directly to the raw stream.
   */
  protected class FSFilterStreamContext implements FilterStreamContext<FilterOutputStream>
  {

    // Raw filesystem stream; stays open for the life of this context.
    private FSDataOutputStream outputStream;

    // Current filter chain; null when no filterStreamProvider is configured.
    private FilterStreamContext filterContext;
    // Shields outputStream from close() calls issued by the filter chain.
    private NonCloseableFilterOutputStream outputWrapper;

    /**
     * Wraps the given stream and immediately builds the initial filter chain.
     * @param outputStream the underlying filesystem stream
     * @throws IOException if the filter chain cannot be created
     */
    public FSFilterStreamContext(FSDataOutputStream outputStream) throws IOException
    {
      this.outputStream = outputStream;
      outputWrapper = new NonCloseableFilterOutputStream(outputStream);
      initializeContext();
    }

    /**
     * Returns the stream tuples should be written to: the filter stream when a
     * filter is active, otherwise the raw filesystem stream.
     */
    @Override
    public FilterOutputStream getFilterStream()
    {
      if (filterContext != null) {
        return filterContext.getFilterStream();
      }
      return outputStream;
    }

    /**
     * Flushes any filter state through to the filesystem and returns the filter
     * context to the provider. The raw stream itself remains open.
     * @throws IOException if flushing fails
     */
    @SuppressWarnings("unchecked")
    @Override
    public void finalizeContext() throws IOException
    {
      if (filterContext != null) {
        filterContext.finalizeContext();
        outputWrapper.flush();
      }
      outputStream.hflush();
      if (filterStreamProvider != null) {
        filterStreamProvider.reclaimFilterStreamContext(filterContext);
      }
    }

    /**
     * (Re)creates the filter chain; called from the constructor and again in
     * beginWindow for the first window after a checkpoint.
     * @throws IOException if the provider fails to build the chain
     */
    @SuppressWarnings("unchecked")
    public void initializeContext() throws IOException
    {
      if (filterStreamProvider != null) {
        filterContext = filterStreamProvider.getFilterStreamContext(outputWrapper);
      }
    }

    /**
     * Closes the filter chain (if any) and then the underlying stream; the
     * context must not be used afterwards.
     * @throws IOException if closing either stream fails
     */
    public void close() throws IOException
    {
      if (filterContext != null) {
        filterContext.getFilterStream().close();
      }
      outputStream.close();
    }

  }
| |
| private static class NonCloseableFilterOutputStream extends FilterOutputStream |
| { |
| public NonCloseableFilterOutputStream(OutputStream out) |
| { |
| super(out); |
| } |
| |
| @Override |
| public void close() throws IOException |
| { |
| } |
| } |
| |
| @Override |
| public void beforeCheckpoint(long l) |
| { |
| try { |
| Map<String, FSFilterStreamContext> openStreams = streamsCache.asMap(); |
| for (FSFilterStreamContext streamContext: openStreams.values()) { |
| long start = System.currentTimeMillis(); |
| streamContext.finalizeContext(); |
| totalWritingTime += System.currentTimeMillis() - start; |
| // Re-initialize context when next window starts after checkpoint |
| initializeContext = true; |
| } |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
  /**
   * No-op: nothing needs to happen once a checkpoint completes; tmp files are
   * renamed to their final names only when the window is committed (see
   * {@code committed(long)}).
   */
  @Override
  public void checkpointed(long l)
  {
  }
| |
  /**
   * Finalizes every file whose finalization was requested in a window up to and
   * including the committed window {@code l}, then drops those entries from the
   * pending map. Only applies when writing to tmp files.
   * @param l the committed window id
   */
  @Override
  public void committed(long l)
  {
    if (alwaysWriteToTmp) {
      Iterator<Map.Entry<Long, Set<String>>> finalizedFilesIter = finalizedFiles.entrySet().iterator();
      try {

        while (finalizedFilesIter.hasNext()) {
          Map.Entry<Long, Set<String>> filesPerWindow = finalizedFilesIter.next();
          // NOTE(review): this early break assumes finalizedFiles iterates in
          // ascending window-id order (i.e. a sorted map) — confirm against its
          // declaration; with an unordered map, later windows could be skipped.
          if (filesPerWindow.getKey() > l) {
            break;
          }
          for (String file : filesPerWindow.getValue()) {
            finalizeFile(file);
          }
          finalizedFilesIter.remove();
        }
      } catch (IOException e) {
        throw new RuntimeException("failed to commit", e);
      }
    }
  }
| |
| /** |
| * Finalizing a file means that the same file will never be open again. |
| * |
| * @param fileName name of the file to finalize |
| */ |
| protected void finalizeFile(String fileName) throws IOException |
| { |
| String tmpFileName = fileNameToTmpName.get(fileName); |
| Path srcPath = new Path(filePath + Path.SEPARATOR + tmpFileName); |
| Path destPath = new Path(filePath + Path.SEPARATOR + fileName); |
| |
| if (!fs.exists(destPath)) { |
| LOG.debug("rename from tmp {} actual {} ", tmpFileName, fileName); |
| rename(srcPath, destPath); |
| } else if (fs.exists(srcPath)) { |
| /*if the destination and src both exists that means there was a failure between file rename and clearing the |
| endOffset so we just delete the tmp file*/ |
| LOG.debug("deleting tmp {}", tmpFileName); |
| fs.delete(srcPath, true); |
| } |
| endOffsets.remove(fileName); |
| fileNameToTmpName.remove(fileName); |
| |
| //when writing to tmp files there can be vagrant tmp files which we have to clean |
| FileStatus[] statuses = fs.listStatus(destPath.getParent()); |
| for (FileStatus status : statuses) { |
| String statusName = status.getPath().getName(); |
| if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(destPath.getName())) { |
| //a tmp file has tmp extension always preceded by timestamp |
| String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1)); |
| if (fileName.equals(actualFileName)) { |
| LOG.debug("deleting stray file {}", statusName); |
| fs.delete(status.getPath(), true); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @return true if writing to a tmp file rather than the actual file. false otherwise. |
| */ |
| public boolean isAlwaysWriteToTmp() |
| { |
| return alwaysWriteToTmp; |
| } |
| |
| /** |
| * This controls if data is always written to a tmp file rather than the actual file. Tmp files are renamed to actual |
| * files when files are finalized. |
| * @param alwaysWriteToTmp true if write to a tmp file; false otherwise. |
| */ |
| public void setAlwaysWriteToTmp(boolean alwaysWriteToTmp) |
| { |
| this.alwaysWriteToTmp = alwaysWriteToTmp; |
| } |
| |
| @VisibleForTesting |
| protected Map<String, String> getFileNameToTmpName() |
| { |
| return fileNameToTmpName; |
| } |
| |
| @VisibleForTesting |
| protected Map<Long, Set<String>> getFinalizedFiles() |
| { |
| return finalizedFiles; |
| } |
| |
| public Long getExpireStreamAfterAccessMillis() |
| { |
| return expireStreamAfterAccessMillis; |
| } |
| |
| /** |
| * Sets the duration after which the stream is expired (closed and removed from the cache) since it was last accessed |
| * by a read or write. |
| * |
| * @param millis time in millis. |
| */ |
| public void setExpireStreamAfterAccessMillis(Long millis) |
| { |
| this.expireStreamAfterAccessMillis = millis; |
| } |
| |
| /** |
| * Return the filter to use. If this method returns a filter the filter is applied to data before the data is stored |
| * in the file. If it returns null no filter is applied and data is written as is. Override this method to provide |
| * the filter implementation. Multiple filters can be chained together to return a chain filter. |
| * |
| * @param outputStream |
| * @return |
| */ |
| /* |
| protected FilterStreamContext getStreamContext(OutputStream outputStream) throws IOException |
| { |
| return null; |
| } |
| */ |
| |
| } |