| 'use strict'; |
| |
| const Readable = require('stream').Readable; |
| const EventEmitter = require('events').EventEmitter; |
| const path = require('path'); |
| const normalizeOptions = require('./normalize-options'); |
| const stat = require('./stat'); |
| const call = require('./call'); |
| |
| /** |
| * Asynchronously reads the contents of a directory and streams the results |
| * via a {@link stream.Readable}. |
| */ |
| class DirectoryReader { |
| /** |
| * @param {string} dir - The absolute or relative directory path to read |
| * @param {object} [options] - User-specified options, if any (see {@link normalizeOptions}) |
| * @param {object} internalOptions - Internal options that aren't part of the public API |
| * @class |
| */ |
| constructor (dir, options, internalOptions) { |
| this.options = options = normalizeOptions(options, internalOptions); |
| |
| // Indicates whether we should keep reading |
| // This is set false if stream.Readable.push() returns false. |
| this.shouldRead = true; |
| |
| // The directories to read |
| // (initialized with the top-level directory) |
| this.queue = [{ |
| path: dir, |
| basePath: options.basePath, |
| posixBasePath: options.posixBasePath, |
| depth: 0 |
| }]; |
| |
| // The number of directories that are currently being processed |
| this.pending = 0; |
| |
| // The data that has been read, but not yet emitted |
| this.buffer = []; |
| |
| this.stream = new Readable({ objectMode: true }); |
| this.stream._read = () => { |
| // Start (or resume) reading |
| this.shouldRead = true; |
| |
| // If we have data in the buffer, then send the next chunk |
| if (this.buffer.length > 0) { |
| this.pushFromBuffer(); |
| } |
| |
| // If we have directories queued, then start processing the next one |
| if (this.queue.length > 0) { |
| if (this.options.facade.sync) { |
| while (this.queue.length > 0) { |
| this.readNextDirectory(); |
| } |
| } |
| else { |
| this.readNextDirectory(); |
| } |
| } |
| |
| this.checkForEOF(); |
| }; |
| } |
| |
| /** |
| * Reads the next directory in the queue |
| */ |
| readNextDirectory () { |
| let facade = this.options.facade; |
| let dir = this.queue.shift(); |
| this.pending++; |
| |
| // Read the directory listing |
| call.safe(facade.fs.readdir, dir.path, (err, items) => { |
| if (err) { |
| // fs.readdir threw an error |
| this.emit('error', err); |
| return this.finishedReadingDirectory(); |
| } |
| |
| try { |
| // Process each item in the directory (simultaneously, if async) |
| facade.forEach( |
| items, |
| this.processItem.bind(this, dir), |
| this.finishedReadingDirectory.bind(this, dir) |
| ); |
| } |
| catch (err2) { |
| // facade.forEach threw an error |
| // (probably because fs.readdir returned an invalid result) |
| this.emit('error', err2); |
| this.finishedReadingDirectory(); |
| } |
| }); |
| } |
| |
| /** |
| * This method is called after all items in a directory have been processed. |
| * |
| * NOTE: This does not necessarily mean that the reader is finished, since there may still |
| * be other directories queued or pending. |
| */ |
| finishedReadingDirectory () { |
| this.pending--; |
| |
| if (this.shouldRead) { |
| // If we have directories queued, then start processing the next one |
| if (this.queue.length > 0 && this.options.facade.async) { |
| this.readNextDirectory(); |
| } |
| |
| this.checkForEOF(); |
| } |
| } |
| |
| /** |
| * Determines whether the reader has finished processing all items in all directories. |
| * If so, then the "end" event is fired (via {@Readable#push}) |
| */ |
| checkForEOF () { |
| if (this.buffer.length === 0 && // The stuff we've already read |
| this.pending === 0 && // The stuff we're currently reading |
| this.queue.length === 0) { // The stuff we haven't read yet |
| // There's no more stuff! |
| this.stream.push(null); |
| } |
| } |
| |
| /** |
| * Processes a single item in a directory. |
| * |
| * If the item is a directory, and `option.deep` is enabled, then the item will be added |
| * to the directory queue. |
| * |
| * If the item meets the filter criteria, then it will be emitted to the reader's stream. |
| * |
| * @param {object} dir - A directory object from the queue |
| * @param {string} item - The name of the item (name only, no path) |
| * @param {function} done - A callback function that is called after the item has been processed |
| */ |
| processItem (dir, item, done) { |
| let stream = this.stream; |
| let options = this.options; |
| |
| let itemPath = dir.basePath + item; |
| let posixPath = dir.posixBasePath + item; |
| let fullPath = path.join(dir.path, item); |
| |
| // If `options.deep` is a number, and we've already recursed to the max depth, |
| // then there's no need to check fs.Stats to know if it's a directory. |
| // If `options.deep` is a function, then we'll need fs.Stats |
| let maxDepthReached = dir.depth >= options.recurseDepth; |
| |
| // Do we need to call `fs.stat`? |
| let needStats = |
| !maxDepthReached || // we need the fs.Stats to know if it's a directory |
| options.stats || // the user wants fs.Stats objects returned |
| options.recurseFn || // we need fs.Stats for the recurse function |
| options.filterFn || // we need fs.Stats for the filter function |
| EventEmitter.listenerCount(stream, 'file') || // we need the fs.Stats to know if it's a file |
| EventEmitter.listenerCount(stream, 'directory') || // we need the fs.Stats to know if it's a directory |
| EventEmitter.listenerCount(stream, 'symlink'); // we need the fs.Stats to know if it's a symlink |
| |
| // If we don't need stats, then exit early |
| if (!needStats) { |
| if (this.filter(itemPath, posixPath)) { |
| this.pushOrBuffer({ data: itemPath }); |
| } |
| return done(); |
| } |
| |
| // Get the fs.Stats object for this path |
| stat(options.facade.fs, fullPath, (err, stats) => { |
| if (err) { |
| // fs.stat threw an error |
| this.emit('error', err); |
| return done(); |
| } |
| |
| try { |
| // Add the item's path to the fs.Stats object |
| // The base of this path, and its separators are determined by the options |
| // (i.e. options.basePath and options.sep) |
| stats.path = itemPath; |
| |
| // Add depth of the path to the fs.Stats object for use this in the filter function |
| stats.depth = dir.depth; |
| |
| if (this.shouldRecurse(stats, posixPath, maxDepthReached)) { |
| // Add this subdirectory to the queue |
| this.queue.push({ |
| path: fullPath, |
| basePath: itemPath + options.sep, |
| posixBasePath: posixPath + '/', |
| depth: dir.depth + 1, |
| }); |
| } |
| |
| // Determine whether this item matches the filter criteria |
| if (this.filter(stats, posixPath)) { |
| this.pushOrBuffer({ |
| data: options.stats ? stats : itemPath, |
| file: stats.isFile(), |
| directory: stats.isDirectory(), |
| symlink: stats.isSymbolicLink(), |
| }); |
| } |
| |
| done(); |
| } |
| catch (err2) { |
| // An error occurred while processing the item |
| // (probably during a user-specified function, such as options.deep, options.filter, etc.) |
| this.emit('error', err2); |
| done(); |
| } |
| }); |
| } |
| |
| /** |
| * Pushes the given chunk of data to the stream, or adds it to the buffer, |
| * depending on the state of the stream. |
| * |
| * @param {object} chunk |
| */ |
| pushOrBuffer (chunk) { |
| // Add the chunk to the buffer |
| this.buffer.push(chunk); |
| |
| // If we're still reading, then immediately emit the next chunk in the buffer |
| // (which may or may not be the chunk that we just added) |
| if (this.shouldRead) { |
| this.pushFromBuffer(); |
| } |
| } |
| |
| /** |
| * Immediately pushes the next chunk in the buffer to the reader's stream. |
| * The "data" event will always be fired (via {@link Readable#push}). |
| * In addition, the "file", "directory", and/or "symlink" events may be fired, |
| * depending on the type of properties of the chunk. |
| */ |
| pushFromBuffer () { |
| let stream = this.stream; |
| let chunk = this.buffer.shift(); |
| |
| // Stream the data |
| try { |
| this.shouldRead = stream.push(chunk.data); |
| } |
| catch (err) { |
| this.emit('error', err); |
| } |
| |
| // Also emit specific events, based on the type of chunk |
| chunk.file && this.emit('file', chunk.data); |
| chunk.symlink && this.emit('symlink', chunk.data); |
| chunk.directory && this.emit('directory', chunk.data); |
| } |
| |
| /** |
| * Determines whether the given directory meets the user-specified recursion criteria. |
| * If the user didn't specify recursion criteria, then this function will default to true. |
| * |
| * @param {fs.Stats} stats - The directory's {@link fs.Stats} object |
| * @param {string} posixPath - The item's POSIX path (used for glob matching) |
| * @param {boolean} maxDepthReached - Whether we've already crawled the user-specified depth |
| * @returns {boolean} |
| */ |
| shouldRecurse (stats, posixPath, maxDepthReached) { |
| let options = this.options; |
| |
| if (maxDepthReached) { |
| // We've already crawled to the maximum depth. So no more recursion. |
| return false; |
| } |
| else if (!stats.isDirectory()) { |
| // It's not a directory. So don't try to crawl it. |
| return false; |
| } |
| else if (options.recurseGlob) { |
| // Glob patterns are always tested against the POSIX path, even on Windows |
| // https://github.com/isaacs/node-glob#windows |
| return options.recurseGlob.test(posixPath); |
| } |
| else if (options.recurseRegExp) { |
| // Regular expressions are tested against the normal path |
| // (based on the OS or options.sep) |
| return options.recurseRegExp.test(stats.path); |
| } |
| else if (options.recurseFn) { |
| try { |
| // Run the user-specified recursion criteria |
| return options.recurseFn.call(null, stats); |
| } |
| catch (err) { |
| // An error occurred in the user's code. |
| // In Sync and Async modes, this will return an error. |
| // In Streaming mode, we emit an "error" event, but continue processing |
| this.emit('error', err); |
| } |
| } |
| else { |
| // No recursion function was specified, and we're within the maximum depth. |
| // So crawl this directory. |
| return true; |
| } |
| } |
| |
| /** |
| * Determines whether the given item meets the user-specified filter criteria. |
| * If the user didn't specify a filter, then this function will always return true. |
| * |
| * @param {string|fs.Stats} value - Either the item's path, or the item's {@link fs.Stats} object |
| * @param {string} posixPath - The item's POSIX path (used for glob matching) |
| * @returns {boolean} |
| */ |
| filter (value, posixPath) { |
| let options = this.options; |
| |
| if (options.filterGlob) { |
| // Glob patterns are always tested against the POSIX path, even on Windows |
| // https://github.com/isaacs/node-glob#windows |
| return options.filterGlob.test(posixPath); |
| } |
| else if (options.filterRegExp) { |
| // Regular expressions are tested against the normal path |
| // (based on the OS or options.sep) |
| return options.filterRegExp.test(value.path || value); |
| } |
| else if (options.filterFn) { |
| try { |
| // Run the user-specified filter function |
| return options.filterFn.call(null, value); |
| } |
| catch (err) { |
| // An error occurred in the user's code. |
| // In Sync and Async modes, this will return an error. |
| // In Streaming mode, we emit an "error" event, but continue processing |
| this.emit('error', err); |
| } |
| } |
| else { |
| // No filter was specified, so match everything |
| return true; |
| } |
| } |
| |
| /** |
| * Emits an event. If one of the event listeners throws an error, |
| * then an "error" event is emitted. |
| * |
| * @param {string} eventName |
| * @param {*} data |
| */ |
| emit (eventName, data) { |
| let stream = this.stream; |
| |
| try { |
| stream.emit(eventName, data); |
| } |
| catch (err) { |
| if (eventName === 'error') { |
| // Don't recursively emit "error" events. |
| // If the first one fails, then just throw |
| throw err; |
| } |
| else { |
| stream.emit('error', err); |
| } |
| } |
| } |
| } |
| |
| module.exports = DirectoryReader; |