blob: e5c506692ac0cf7eee41e09ae9acff9347dba749 [file] [log] [blame]
/**
 * Thin wrapper around a synchronous iterator that adds `pipe()`, which
 * writes every value the iterator yields into a writable stream.
 *
 * `next`, `throw` and `return` delegate to the wrapped iterator.
 */
export class PipeIterator<T> implements IterableIterator<T> {
    /**
     * @param iterator The iterator whose values are forwarded.
     * @param encoding Passed along with every value that `pipe()` writes.
     */
    constructor(protected iterator: IterableIterator<T>, protected encoding?: any) {}

    /** Iterating this wrapper iterates the wrapped iterator itself. */
    [Symbol.iterator]() { return this.iterator; }

    /** Delegates to the wrapped iterator's `next`. */
    next(value?: any) { return this.iterator.next(value); }

    /**
     * Delegates to the wrapped iterator's `throw` when it has one;
     * otherwise reports completion with a `null` value.
     */
    throw(error?: any) {
        if (typeof this.iterator.throw !== 'function') {
            return { done: true, value: null as any };
        }
        return this.iterator.throw(error);
    }

    /**
     * Delegates to the wrapped iterator's `return` when it has one;
     * otherwise reports completion with a `null` value.
     */
    return(value?: any) {
        if (typeof this.iterator.return !== 'function') {
            return { done: true, value: null as any };
        }
        return this.iterator.return(value);
    }

    /**
     * Drains the iterator into `stream` and returns `stream`.
     *
     * Values are written until the iterator is exhausted or `emit` reports a
     * falsy result; `wait` then decides whether to re-arm this pump on the
     * stream's 'error'/'drain' events or to finish.
     */
    pipe(stream: NodeJS.WritableStream) {
        const { encoding } = this;
        let latest: IteratorResult<T>;
        const pump = (failure?: any) => {
            // Detach first so each pass runs with at most one registration.
            stream['removeListener']('error', pump);
            stream['removeListener']('drain', pump);
            // Invoked by an 'error' event: hand the error to the iterator.
            if (failure) { return this.throw(failure); }
            if (stream['writable']) {
                while (!(latest = this.next()).done) {
                    if (!emit(stream, encoding, latest.value)) { break; }
                }
            }
            // `latest` is still unset if the stream was not writable on the first pass.
            return wait(stream, latest && latest.done, pump);
        };
        pump();
        return stream;
    }
}
/**
 * Thin wrapper around an async iterator that adds `pipe()`, which writes
 * every value the iterator yields into a writable stream.
 *
 * `next`, `throw` and `return` delegate to the wrapped iterator.
 */
export class AsyncPipeIterator<T> implements AsyncIterableIterator<T> {
    /**
     * @param iterator The async iterator whose values are forwarded.
     * @param encoding Passed along with every value that `pipe()` writes.
     */
    constructor(protected iterator: AsyncIterableIterator<T>, protected encoding?: any) {}

    /** Iterating this wrapper iterates the wrapped iterator itself. */
    [Symbol.asyncIterator]() { return this.iterator; }

    /** Delegates to the wrapped iterator's `next`. */
    next(value?: any) { return this.iterator.next(value); }

    /**
     * Delegates to the wrapped iterator's `throw` when it has one;
     * otherwise resolves to a completed result with a `null` value.
     */
    async throw(error?: any) {
        if (typeof this.iterator.throw === 'function') {
            return this.iterator.throw(error);
        }
        return { done: true, value: null as any };
    }

    /**
     * Delegates to the wrapped iterator's `return` when it has one;
     * otherwise resolves to a completed result with a `null` value.
     */
    async return(value?: any) {
        if (typeof this.iterator.return === 'function') {
            return this.iterator.return(value);
        }
        return { done: true, value: null as any };
    }

    /**
     * Drains the iterator into `stream` and returns `stream`.
     *
     * Values are written until the iterator is exhausted or `emit` reports a
     * falsy result; `wait` then decides whether to re-arm the pump on the
     * stream's 'error'/'drain' events or to finish.
     *
     * A failure of the source iterator (a rejected `next()` or `throw()`) is
     * reported through the stream's 'error' event instead of being dropped as
     * a floating promise, because the returned stream is the only handle the
     * caller has.
     */
    pipe(stream: NodeJS.WritableStream) {
        const { encoding } = this;
        let res: IteratorResult<T>;
        // One pump pass. Rejects when the source iterator fails.
        const pump = async (err?: any) => {
            stream['removeListener']('error', write);
            stream['removeListener']('drain', write);
            // Invoked by an 'error' event: hand the error to the iterator.
            if (err) { return this.throw(err); }
            if (stream['writable']) {
                do {
                    if ((res = await this.next()).done) { break; }
                } while (emit(stream, encoding, res.value));
            }
            // `res` is still unset if the stream was not writable on the first pass.
            return wait(stream, res && res.done, write);
        };
        // Synchronous entry point / event listener that owns the pump's promise.
        const write = (err?: any) => {
            pump(err).catch((failure: any) => {
                // The iterator merely re-raised the stream's own error and some
                // other listener has already received it: do not deliver it twice.
                if (err && failure === err && stream['listenerCount']('error') > 0) { return; }
                // With no 'error' listener attached, emit() throws here, which
                // surfaces as an unhandled rejection exactly as before.
                stream['emit']('error', failure);
            });
        };
        write();
        return stream;
    }
}
/**
 * Converts a Uint8Array into a Buffer over the same bytes when a global
 * `Buffer` exists, and returns the array unchanged otherwise. The choice is
 * made once, when this module is evaluated.
 */
const toBufferOrUint8Array = (() => {
    // Some node APIs ('http' etc.) only work when handed Buffer instances, so
    // Uint8Arrays are converted whenever Buffer is available.
    // Looking Buffer up through eval also keeps closure-compiler, which does
    // not recognize the Buffer constructor, away from the reference.
    const NodeBuffer = eval('typeof Buffer !== "undefined" ? Buffer : null');
    if (NodeBuffer) {
        // Built from the underlying ArrayBuffer plus offset/length of the view.
        return (arr: Uint8Array) => NodeBuffer.from(arr.buffer, arr.byteOffset, arr.byteLength);
    }
    return (arr: Uint8Array) => arr;
})();
/**
 * Writes one value to `stream` and returns whatever `stream.write` returns.
 *
 * With the 'utf8' encoding the value is concatenated with a trailing newline;
 * with any other encoding it is passed through `toBufferOrUint8Array` first.
 *
 * @param stream Destination stream.
 * @param encoding Encoding forwarded to `stream.write`.
 * @param value The value to write.
 */
function emit(stream: NodeJS.WritableStream, encoding: string, value: any) {
    const isText = encoding === 'utf8';
    const chunk = isText ? value + '\n' : toBufferOrUint8Array(value);
    return stream['write'](chunk as any, encoding);
}
/**
 * Decides what happens after a write pass.
 *
 * While `done` is falsy, `write` is registered as a one-shot listener for
 * both the 'error' and 'drain' events of `stream`. Once `done` is truthy the
 * stream is ended, unless it is `process.stdout`, has a truthy `isTTY`, or no
 * global `process` exists.
 *
 * @param stream The destination stream.
 * @param done Whether the source has been fully consumed.
 * @param write Callback to re-run on the next 'error' or 'drain' event.
 */
function wait(stream: NodeJS.WritableStream, done: boolean, write: (x?: any) => void) {
    if (!done) {
        stream['once']('error', write);
        stream['once']('drain', write);
        return;
    }
    // eval defeats closure compiler. The typeof guard matters because a bare
    // `process` reference throws a ReferenceError where `process` is not
    // declared, which would make the falsy check below unreachable.
    const p = eval('typeof process !== "undefined" ? process : null');
    if (p && stream !== p.stdout && !(stream as any)['isTTY']) {
        stream['end']();
    }
}