1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018 |
- const proc = typeof process === 'object' && process
- ? process
- : {
- stdout: null,
- stderr: null,
- };
- import { EventEmitter } from 'node:events';
- import Stream from 'node:stream';
- import { StringDecoder } from 'node:string_decoder';
- /**
- * Return true if the argument is a Minipass stream, Node stream, or something
- * else that Minipass can interact with.
- */
- export const isStream = (s) => !!s &&
- typeof s === 'object' &&
- (s instanceof Minipass ||
- s instanceof Stream ||
- isReadable(s) ||
- isWritable(s));
- /**
- * Return true if the argument is a valid {@link Minipass.Readable}
- */
- export const isReadable = (s) => !!s &&
- typeof s === 'object' &&
- s instanceof EventEmitter &&
- typeof s.pipe === 'function' &&
- // node core Writable streams have a pipe() method, but it throws
- s.pipe !== Stream.Writable.prototype.pipe;
- /**
- * Return true if the argument is a valid {@link Minipass.Writable}
- */
- export const isWritable = (s) => !!s &&
- typeof s === 'object' &&
- s instanceof EventEmitter &&
- typeof s.write === 'function' &&
- typeof s.end === 'function';
- const EOF = Symbol('EOF');
- const MAYBE_EMIT_END = Symbol('maybeEmitEnd');
- const EMITTED_END = Symbol('emittedEnd');
- const EMITTING_END = Symbol('emittingEnd');
- const EMITTED_ERROR = Symbol('emittedError');
- const CLOSED = Symbol('closed');
- const READ = Symbol('read');
- const FLUSH = Symbol('flush');
- const FLUSHCHUNK = Symbol('flushChunk');
- const ENCODING = Symbol('encoding');
- const DECODER = Symbol('decoder');
- const FLOWING = Symbol('flowing');
- const PAUSED = Symbol('paused');
- const RESUME = Symbol('resume');
- const BUFFER = Symbol('buffer');
- const PIPES = Symbol('pipes');
- const BUFFERLENGTH = Symbol('bufferLength');
- const BUFFERPUSH = Symbol('bufferPush');
- const BUFFERSHIFT = Symbol('bufferShift');
- const OBJECTMODE = Symbol('objectMode');
- // internal event when stream is destroyed
- const DESTROYED = Symbol('destroyed');
- // internal event when stream has an error
- const ERROR = Symbol('error');
- const EMITDATA = Symbol('emitData');
- const EMITEND = Symbol('emitEnd');
- const EMITEND2 = Symbol('emitEnd2');
- const ASYNC = Symbol('async');
- const ABORT = Symbol('abort');
- const ABORTED = Symbol('aborted');
- const SIGNAL = Symbol('signal');
- const DATALISTENERS = Symbol('dataListeners');
- const DISCARDED = Symbol('discarded');
- const defer = (fn) => Promise.resolve().then(fn);
- const nodefer = (fn) => fn();
- const isEndish = (ev) => ev === 'end' || ev === 'finish' || ev === 'prefinish';
- const isArrayBufferLike = (b) => b instanceof ArrayBuffer ||
- (!!b &&
- typeof b === 'object' &&
- b.constructor &&
- b.constructor.name === 'ArrayBuffer' &&
- b.byteLength >= 0);
- const isArrayBufferView = (b) => !Buffer.isBuffer(b) && ArrayBuffer.isView(b);
- /**
- * Internal class representing a pipe to a destination stream.
- *
- * @internal
- */
- class Pipe {
- src;
- dest;
- opts;
- ondrain;
- constructor(src, dest, opts) {
- this.src = src;
- this.dest = dest;
- this.opts = opts;
- this.ondrain = () => src[RESUME]();
- this.dest.on('drain', this.ondrain);
- }
- unpipe() {
- this.dest.removeListener('drain', this.ondrain);
- }
- // only here for the prototype
- /* c8 ignore start */
- proxyErrors(_er) { }
- /* c8 ignore stop */
- end() {
- this.unpipe();
- if (this.opts.end)
- this.dest.end();
- }
- }
- /**
- * Internal class representing a pipe to a destination stream where
- * errors are proxied.
- *
- * @internal
- */
- class PipeProxyErrors extends Pipe {
- unpipe() {
- this.src.removeListener('error', this.proxyErrors);
- super.unpipe();
- }
- constructor(src, dest, opts) {
- super(src, dest, opts);
- this.proxyErrors = er => dest.emit('error', er);
- src.on('error', this.proxyErrors);
- }
- }
- const isObjectModeOptions = (o) => !!o.objectMode;
- const isEncodingOptions = (o) => !o.objectMode && !!o.encoding && o.encoding !== 'buffer';
- /**
- * Main export, the Minipass class
- *
- * `RType` is the type of data emitted, defaults to Buffer
- *
- * `WType` is the type of data to be written, if RType is buffer or string,
- * then any {@link Minipass.ContiguousData} is allowed.
- *
- * `Events` is the set of event handler signatures that this object
- * will emit, see {@link Minipass.Events}
- */
- export class Minipass extends EventEmitter {
- [FLOWING] = false;
- [PAUSED] = false;
- [PIPES] = [];
- [BUFFER] = [];
- [OBJECTMODE];
- [ENCODING];
- [ASYNC];
- [DECODER];
- [EOF] = false;
- [EMITTED_END] = false;
- [EMITTING_END] = false;
- [CLOSED] = false;
- [EMITTED_ERROR] = null;
- [BUFFERLENGTH] = 0;
- [DESTROYED] = false;
- [SIGNAL];
- [ABORTED] = false;
- [DATALISTENERS] = 0;
- [DISCARDED] = false;
- /**
- * true if the stream can be written
- */
- writable = true;
- /**
- * true if the stream can be read
- */
- readable = true;
- /**
- * If `RType` is Buffer, then options do not need to be provided.
- * Otherwise, an options object must be provided to specify either
- * {@link Minipass.SharedOptions.objectMode} or
- * {@link Minipass.SharedOptions.encoding}, as appropriate.
- */
- constructor(...args) {
- const options = (args[0] ||
- {});
- super();
- if (options.objectMode && typeof options.encoding === 'string') {
- throw new TypeError('Encoding and objectMode may not be used together');
- }
- if (isObjectModeOptions(options)) {
- this[OBJECTMODE] = true;
- this[ENCODING] = null;
- }
- else if (isEncodingOptions(options)) {
- this[ENCODING] = options.encoding;
- this[OBJECTMODE] = false;
- }
- else {
- this[OBJECTMODE] = false;
- this[ENCODING] = null;
- }
- this[ASYNC] = !!options.async;
- this[DECODER] = this[ENCODING]
- ? new StringDecoder(this[ENCODING])
- : null;
- //@ts-ignore - private option for debugging and testing
- if (options && options.debugExposeBuffer === true) {
- Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] });
- }
- //@ts-ignore - private option for debugging and testing
- if (options && options.debugExposePipes === true) {
- Object.defineProperty(this, 'pipes', { get: () => this[PIPES] });
- }
- const { signal } = options;
- if (signal) {
- this[SIGNAL] = signal;
- if (signal.aborted) {
- this[ABORT]();
- }
- else {
- signal.addEventListener('abort', () => this[ABORT]());
- }
- }
- }
- /**
- * The amount of data stored in the buffer waiting to be read.
- *
- * For Buffer strings, this will be the total byte length.
- * For string encoding streams, this will be the string character length,
- * according to JavaScript's `string.length` logic.
- * For objectMode streams, this is a count of the items waiting to be
- * emitted.
- */
- get bufferLength() {
- return this[BUFFERLENGTH];
- }
- /**
- * The `BufferEncoding` currently in use, or `null`
- */
- get encoding() {
- return this[ENCODING];
- }
- /**
- * @deprecated - This is a read only property
- */
- set encoding(_enc) {
- throw new Error('Encoding must be set at instantiation time');
- }
- /**
- * @deprecated - Encoding may only be set at instantiation time
- */
- setEncoding(_enc) {
- throw new Error('Encoding must be set at instantiation time');
- }
- /**
- * True if this is an objectMode stream
- */
- get objectMode() {
- return this[OBJECTMODE];
- }
- /**
- * @deprecated - This is a read-only property
- */
- set objectMode(_om) {
- throw new Error('objectMode must be set at instantiation time');
- }
- /**
- * true if this is an async stream
- */
- get ['async']() {
- return this[ASYNC];
- }
- /**
- * Set to true to make this stream async.
- *
- * Once set, it cannot be unset, as this would potentially cause incorrect
- * behavior. Ie, a sync stream can be made async, but an async stream
- * cannot be safely made sync.
- */
- set ['async'](a) {
- this[ASYNC] = this[ASYNC] || !!a;
- }
- // drop everything and get out of the flow completely
- [ABORT]() {
- this[ABORTED] = true;
- this.emit('abort', this[SIGNAL]?.reason);
- this.destroy(this[SIGNAL]?.reason);
- }
- /**
- * True if the stream has been aborted.
- */
- get aborted() {
- return this[ABORTED];
- }
- /**
- * No-op setter. Stream aborted status is set via the AbortSignal provided
- * in the constructor options.
- */
- set aborted(_) { }
- write(chunk, encoding, cb) {
- if (this[ABORTED])
- return false;
- if (this[EOF])
- throw new Error('write after end');
- if (this[DESTROYED]) {
- this.emit('error', Object.assign(new Error('Cannot call write after a stream was destroyed'), { code: 'ERR_STREAM_DESTROYED' }));
- return true;
- }
- if (typeof encoding === 'function') {
- cb = encoding;
- encoding = 'utf8';
- }
- if (!encoding)
- encoding = 'utf8';
- const fn = this[ASYNC] ? defer : nodefer;
- // convert array buffers and typed array views into buffers
- // at some point in the future, we may want to do the opposite!
- // leave strings and buffers as-is
- // anything is only allowed if in object mode, so throw
- if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
- if (isArrayBufferView(chunk)) {
- //@ts-ignore - sinful unsafe type changing
- chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength);
- }
- else if (isArrayBufferLike(chunk)) {
- //@ts-ignore - sinful unsafe type changing
- chunk = Buffer.from(chunk);
- }
- else if (typeof chunk !== 'string') {
- throw new Error('Non-contiguous data written to non-objectMode stream');
- }
- }
- // handle object mode up front, since it's simpler
- // this yields better performance, fewer checks later.
- if (this[OBJECTMODE]) {
- // maybe impossible?
- /* c8 ignore start */
- if (this[FLOWING] && this[BUFFERLENGTH] !== 0)
- this[FLUSH](true);
- /* c8 ignore stop */
- if (this[FLOWING])
- this.emit('data', chunk);
- else
- this[BUFFERPUSH](chunk);
- if (this[BUFFERLENGTH] !== 0)
- this.emit('readable');
- if (cb)
- fn(cb);
- return this[FLOWING];
- }
- // at this point the chunk is a buffer or string
- // don't buffer it up or send it to the decoder
- if (!chunk.length) {
- if (this[BUFFERLENGTH] !== 0)
- this.emit('readable');
- if (cb)
- fn(cb);
- return this[FLOWING];
- }
- // fast-path writing strings of same encoding to a stream with
- // an empty buffer, skipping the buffer/decoder dance
- if (typeof chunk === 'string' &&
- // unless it is a string already ready for us to use
- !(encoding === this[ENCODING] && !this[DECODER]?.lastNeed)) {
- //@ts-ignore - sinful unsafe type change
- chunk = Buffer.from(chunk, encoding);
- }
- if (Buffer.isBuffer(chunk) && this[ENCODING]) {
- //@ts-ignore - sinful unsafe type change
- chunk = this[DECODER].write(chunk);
- }
- // Note: flushing CAN potentially switch us into not-flowing mode
- if (this[FLOWING] && this[BUFFERLENGTH] !== 0)
- this[FLUSH](true);
- if (this[FLOWING])
- this.emit('data', chunk);
- else
- this[BUFFERPUSH](chunk);
- if (this[BUFFERLENGTH] !== 0)
- this.emit('readable');
- if (cb)
- fn(cb);
- return this[FLOWING];
- }
- /**
- * Low-level explicit read method.
- *
- * In objectMode, the argument is ignored, and one item is returned if
- * available.
- *
- * `n` is the number of bytes (or in the case of encoding streams,
- * characters) to consume. If `n` is not provided, then the entire buffer
- * is returned, or `null` is returned if no data is available.
- *
- * If `n` is greater that the amount of data in the internal buffer,
- * then `null` is returned.
- */
- read(n) {
- if (this[DESTROYED])
- return null;
- this[DISCARDED] = false;
- if (this[BUFFERLENGTH] === 0 ||
- n === 0 ||
- (n && n > this[BUFFERLENGTH])) {
- this[MAYBE_EMIT_END]();
- return null;
- }
- if (this[OBJECTMODE])
- n = null;
- if (this[BUFFER].length > 1 && !this[OBJECTMODE]) {
- // not object mode, so if we have an encoding, then RType is string
- // otherwise, must be Buffer
- this[BUFFER] = [
- (this[ENCODING]
- ? this[BUFFER].join('')
- : Buffer.concat(this[BUFFER], this[BUFFERLENGTH])),
- ];
- }
- const ret = this[READ](n || null, this[BUFFER][0]);
- this[MAYBE_EMIT_END]();
- return ret;
- }
- [READ](n, chunk) {
- if (this[OBJECTMODE])
- this[BUFFERSHIFT]();
- else {
- const c = chunk;
- if (n === c.length || n === null)
- this[BUFFERSHIFT]();
- else if (typeof c === 'string') {
- this[BUFFER][0] = c.slice(n);
- chunk = c.slice(0, n);
- this[BUFFERLENGTH] -= n;
- }
- else {
- this[BUFFER][0] = c.subarray(n);
- chunk = c.subarray(0, n);
- this[BUFFERLENGTH] -= n;
- }
- }
- this.emit('data', chunk);
- if (!this[BUFFER].length && !this[EOF])
- this.emit('drain');
- return chunk;
- }
- end(chunk, encoding, cb) {
- if (typeof chunk === 'function') {
- cb = chunk;
- chunk = undefined;
- }
- if (typeof encoding === 'function') {
- cb = encoding;
- encoding = 'utf8';
- }
- if (chunk !== undefined)
- this.write(chunk, encoding);
- if (cb)
- this.once('end', cb);
- this[EOF] = true;
- this.writable = false;
- // if we haven't written anything, then go ahead and emit,
- // even if we're not reading.
- // we'll re-emit if a new 'end' listener is added anyway.
- // This makes MP more suitable to write-only use cases.
- if (this[FLOWING] || !this[PAUSED])
- this[MAYBE_EMIT_END]();
- return this;
- }
- // don't let the internal resume be overwritten
- [RESUME]() {
- if (this[DESTROYED])
- return;
- if (!this[DATALISTENERS] && !this[PIPES].length) {
- this[DISCARDED] = true;
- }
- this[PAUSED] = false;
- this[FLOWING] = true;
- this.emit('resume');
- if (this[BUFFER].length)
- this[FLUSH]();
- else if (this[EOF])
- this[MAYBE_EMIT_END]();
- else
- this.emit('drain');
- }
- /**
- * Resume the stream if it is currently in a paused state
- *
- * If called when there are no pipe destinations or `data` event listeners,
- * this will place the stream in a "discarded" state, where all data will
- * be thrown away. The discarded state is removed if a pipe destination or
- * data handler is added, if pause() is called, or if any synchronous or
- * asynchronous iteration is started.
- */
- resume() {
- return this[RESUME]();
- }
- /**
- * Pause the stream
- */
- pause() {
- this[FLOWING] = false;
- this[PAUSED] = true;
- this[DISCARDED] = false;
- }
- /**
- * true if the stream has been forcibly destroyed
- */
- get destroyed() {
- return this[DESTROYED];
- }
- /**
- * true if the stream is currently in a flowing state, meaning that
- * any writes will be immediately emitted.
- */
- get flowing() {
- return this[FLOWING];
- }
- /**
- * true if the stream is currently in a paused state
- */
- get paused() {
- return this[PAUSED];
- }
- [BUFFERPUSH](chunk) {
- if (this[OBJECTMODE])
- this[BUFFERLENGTH] += 1;
- else
- this[BUFFERLENGTH] += chunk.length;
- this[BUFFER].push(chunk);
- }
- [BUFFERSHIFT]() {
- if (this[OBJECTMODE])
- this[BUFFERLENGTH] -= 1;
- else
- this[BUFFERLENGTH] -= this[BUFFER][0].length;
- return this[BUFFER].shift();
- }
- [FLUSH](noDrain = false) {
- do { } while (this[FLUSHCHUNK](this[BUFFERSHIFT]()) &&
- this[BUFFER].length);
- if (!noDrain && !this[BUFFER].length && !this[EOF])
- this.emit('drain');
- }
- [FLUSHCHUNK](chunk) {
- this.emit('data', chunk);
- return this[FLOWING];
- }
- /**
- * Pipe all data emitted by this stream into the destination provided.
- *
- * Triggers the flow of data.
- */
- pipe(dest, opts) {
- if (this[DESTROYED])
- return dest;
- this[DISCARDED] = false;
- const ended = this[EMITTED_END];
- opts = opts || {};
- if (dest === proc.stdout || dest === proc.stderr)
- opts.end = false;
- else
- opts.end = opts.end !== false;
- opts.proxyErrors = !!opts.proxyErrors;
- // piping an ended stream ends immediately
- if (ended) {
- if (opts.end)
- dest.end();
- }
- else {
- // "as" here just ignores the WType, which pipes don't care about,
- // since they're only consuming from us, and writing to the dest
- this[PIPES].push(!opts.proxyErrors
- ? new Pipe(this, dest, opts)
- : new PipeProxyErrors(this, dest, opts));
- if (this[ASYNC])
- defer(() => this[RESUME]());
- else
- this[RESUME]();
- }
- return dest;
- }
- /**
- * Fully unhook a piped destination stream.
- *
- * If the destination stream was the only consumer of this stream (ie,
- * there are no other piped destinations or `'data'` event listeners)
- * then the flow of data will stop until there is another consumer or
- * {@link Minipass#resume} is explicitly called.
- */
- unpipe(dest) {
- const p = this[PIPES].find(p => p.dest === dest);
- if (p) {
- if (this[PIPES].length === 1) {
- if (this[FLOWING] && this[DATALISTENERS] === 0) {
- this[FLOWING] = false;
- }
- this[PIPES] = [];
- }
- else
- this[PIPES].splice(this[PIPES].indexOf(p), 1);
- p.unpipe();
- }
- }
- /**
- * Alias for {@link Minipass#on}
- */
- addListener(ev, handler) {
- return this.on(ev, handler);
- }
- /**
- * Mostly identical to `EventEmitter.on`, with the following
- * behavior differences to prevent data loss and unnecessary hangs:
- *
- * - Adding a 'data' event handler will trigger the flow of data
- *
- * - Adding a 'readable' event handler when there is data waiting to be read
- * will cause 'readable' to be emitted immediately.
- *
- * - Adding an 'endish' event handler ('end', 'finish', etc.) which has
- * already passed will cause the event to be emitted immediately and all
- * handlers removed.
- *
- * - Adding an 'error' event handler after an error has been emitted will
- * cause the event to be re-emitted immediately with the error previously
- * raised.
- */
- on(ev, handler) {
- const ret = super.on(ev, handler);
- if (ev === 'data') {
- this[DISCARDED] = false;
- this[DATALISTENERS]++;
- if (!this[PIPES].length && !this[FLOWING]) {
- this[RESUME]();
- }
- }
- else if (ev === 'readable' && this[BUFFERLENGTH] !== 0) {
- super.emit('readable');
- }
- else if (isEndish(ev) && this[EMITTED_END]) {
- super.emit(ev);
- this.removeAllListeners(ev);
- }
- else if (ev === 'error' && this[EMITTED_ERROR]) {
- const h = handler;
- if (this[ASYNC])
- defer(() => h.call(this, this[EMITTED_ERROR]));
- else
- h.call(this, this[EMITTED_ERROR]);
- }
- return ret;
- }
- /**
- * Alias for {@link Minipass#off}
- */
- removeListener(ev, handler) {
- return this.off(ev, handler);
- }
- /**
- * Mostly identical to `EventEmitter.off`
- *
- * If a 'data' event handler is removed, and it was the last consumer
- * (ie, there are no pipe destinations or other 'data' event listeners),
- * then the flow of data will stop until there is another consumer or
- * {@link Minipass#resume} is explicitly called.
- */
- off(ev, handler) {
- const ret = super.off(ev, handler);
- // if we previously had listeners, and now we don't, and we don't
- // have any pipes, then stop the flow, unless it's been explicitly
- // put in a discarded flowing state via stream.resume().
- if (ev === 'data') {
- this[DATALISTENERS] = this.listeners('data').length;
- if (this[DATALISTENERS] === 0 &&
- !this[DISCARDED] &&
- !this[PIPES].length) {
- this[FLOWING] = false;
- }
- }
- return ret;
- }
- /**
- * Mostly identical to `EventEmitter.removeAllListeners`
- *
- * If all 'data' event handlers are removed, and they were the last consumer
- * (ie, there are no pipe destinations), then the flow of data will stop
- * until there is another consumer or {@link Minipass#resume} is explicitly
- * called.
- */
- removeAllListeners(ev) {
- const ret = super.removeAllListeners(ev);
- if (ev === 'data' || ev === undefined) {
- this[DATALISTENERS] = 0;
- if (!this[DISCARDED] && !this[PIPES].length) {
- this[FLOWING] = false;
- }
- }
- return ret;
- }
- /**
- * true if the 'end' event has been emitted
- */
- get emittedEnd() {
- return this[EMITTED_END];
- }
- [MAYBE_EMIT_END]() {
- if (!this[EMITTING_END] &&
- !this[EMITTED_END] &&
- !this[DESTROYED] &&
- this[BUFFER].length === 0 &&
- this[EOF]) {
- this[EMITTING_END] = true;
- this.emit('end');
- this.emit('prefinish');
- this.emit('finish');
- if (this[CLOSED])
- this.emit('close');
- this[EMITTING_END] = false;
- }
- }
- /**
- * Mostly identical to `EventEmitter.emit`, with the following
- * behavior differences to prevent data loss and unnecessary hangs:
- *
- * If the stream has been destroyed, and the event is something other
- * than 'close' or 'error', then `false` is returned and no handlers
- * are called.
- *
- * If the event is 'end', and has already been emitted, then the event
- * is ignored. If the stream is in a paused or non-flowing state, then
- * the event will be deferred until data flow resumes. If the stream is
- * async, then handlers will be called on the next tick rather than
- * immediately.
- *
- * If the event is 'close', and 'end' has not yet been emitted, then
- * the event will be deferred until after 'end' is emitted.
- *
- * If the event is 'error', and an AbortSignal was provided for the stream,
- * and there are no listeners, then the event is ignored, matching the
- * behavior of node core streams in the presense of an AbortSignal.
- *
- * If the event is 'finish' or 'prefinish', then all listeners will be
- * removed after emitting the event, to prevent double-firing.
- */
- emit(ev, ...args) {
- const data = args[0];
- // error and close are only events allowed after calling destroy()
- if (ev !== 'error' &&
- ev !== 'close' &&
- ev !== DESTROYED &&
- this[DESTROYED]) {
- return false;
- }
- else if (ev === 'data') {
- return !this[OBJECTMODE] && !data
- ? false
- : this[ASYNC]
- ? (defer(() => this[EMITDATA](data)), true)
- : this[EMITDATA](data);
- }
- else if (ev === 'end') {
- return this[EMITEND]();
- }
- else if (ev === 'close') {
- this[CLOSED] = true;
- // don't emit close before 'end' and 'finish'
- if (!this[EMITTED_END] && !this[DESTROYED])
- return false;
- const ret = super.emit('close');
- this.removeAllListeners('close');
- return ret;
- }
- else if (ev === 'error') {
- this[EMITTED_ERROR] = data;
- super.emit(ERROR, data);
- const ret = !this[SIGNAL] || this.listeners('error').length
- ? super.emit('error', data)
- : false;
- this[MAYBE_EMIT_END]();
- return ret;
- }
- else if (ev === 'resume') {
- const ret = super.emit('resume');
- this[MAYBE_EMIT_END]();
- return ret;
- }
- else if (ev === 'finish' || ev === 'prefinish') {
- const ret = super.emit(ev);
- this.removeAllListeners(ev);
- return ret;
- }
- // Some other unknown event
- const ret = super.emit(ev, ...args);
- this[MAYBE_EMIT_END]();
- return ret;
- }
- [EMITDATA](data) {
- for (const p of this[PIPES]) {
- if (p.dest.write(data) === false)
- this.pause();
- }
- const ret = this[DISCARDED] ? false : super.emit('data', data);
- this[MAYBE_EMIT_END]();
- return ret;
- }
- [EMITEND]() {
- if (this[EMITTED_END])
- return false;
- this[EMITTED_END] = true;
- this.readable = false;
- return this[ASYNC]
- ? (defer(() => this[EMITEND2]()), true)
- : this[EMITEND2]();
- }
- [EMITEND2]() {
- if (this[DECODER]) {
- const data = this[DECODER].end();
- if (data) {
- for (const p of this[PIPES]) {
- p.dest.write(data);
- }
- if (!this[DISCARDED])
- super.emit('data', data);
- }
- }
- for (const p of this[PIPES]) {
- p.end();
- }
- const ret = super.emit('end');
- this.removeAllListeners('end');
- return ret;
- }
- /**
- * Return a Promise that resolves to an array of all emitted data once
- * the stream ends.
- */
- async collect() {
- const buf = Object.assign([], {
- dataLength: 0,
- });
- if (!this[OBJECTMODE])
- buf.dataLength = 0;
- // set the promise first, in case an error is raised
- // by triggering the flow here.
- const p = this.promise();
- this.on('data', c => {
- buf.push(c);
- if (!this[OBJECTMODE])
- buf.dataLength += c.length;
- });
- await p;
- return buf;
- }
- /**
- * Return a Promise that resolves to the concatenation of all emitted data
- * once the stream ends.
- *
- * Not allowed on objectMode streams.
- */
- async concat() {
- if (this[OBJECTMODE]) {
- throw new Error('cannot concat in objectMode');
- }
- const buf = await this.collect();
- return (this[ENCODING]
- ? buf.join('')
- : Buffer.concat(buf, buf.dataLength));
- }
- /**
- * Return a void Promise that resolves once the stream ends.
- */
- async promise() {
- return new Promise((resolve, reject) => {
- this.on(DESTROYED, () => reject(new Error('stream destroyed')));
- this.on('error', er => reject(er));
- this.on('end', () => resolve());
- });
- }
- /**
- * Asynchronous `for await of` iteration.
- *
- * This will continue emitting all chunks until the stream terminates.
- */
- [Symbol.asyncIterator]() {
- // set this up front, in case the consumer doesn't call next()
- // right away.
- this[DISCARDED] = false;
- let stopped = false;
- const stop = async () => {
- this.pause();
- stopped = true;
- return { value: undefined, done: true };
- };
- const next = () => {
- if (stopped)
- return stop();
- const res = this.read();
- if (res !== null)
- return Promise.resolve({ done: false, value: res });
- if (this[EOF])
- return stop();
- let resolve;
- let reject;
- const onerr = (er) => {
- this.off('data', ondata);
- this.off('end', onend);
- this.off(DESTROYED, ondestroy);
- stop();
- reject(er);
- };
- const ondata = (value) => {
- this.off('error', onerr);
- this.off('end', onend);
- this.off(DESTROYED, ondestroy);
- this.pause();
- resolve({ value, done: !!this[EOF] });
- };
- const onend = () => {
- this.off('error', onerr);
- this.off('data', ondata);
- this.off(DESTROYED, ondestroy);
- stop();
- resolve({ done: true, value: undefined });
- };
- const ondestroy = () => onerr(new Error('stream destroyed'));
- return new Promise((res, rej) => {
- reject = rej;
- resolve = res;
- this.once(DESTROYED, ondestroy);
- this.once('error', onerr);
- this.once('end', onend);
- this.once('data', ondata);
- });
- };
- return {
- next,
- throw: stop,
- return: stop,
- [Symbol.asyncIterator]() {
- return this;
- },
- };
- }
- /**
- * Synchronous `for of` iteration.
- *
- * The iteration will terminate when the internal buffer runs out, even
- * if the stream has not yet terminated.
- */
- [Symbol.iterator]() {
- // set this up front, in case the consumer doesn't call next()
- // right away.
- this[DISCARDED] = false;
- let stopped = false;
- const stop = () => {
- this.pause();
- this.off(ERROR, stop);
- this.off(DESTROYED, stop);
- this.off('end', stop);
- stopped = true;
- return { done: true, value: undefined };
- };
- const next = () => {
- if (stopped)
- return stop();
- const value = this.read();
- return value === null ? stop() : { done: false, value };
- };
- this.once('end', stop);
- this.once(ERROR, stop);
- this.once(DESTROYED, stop);
- return {
- next,
- throw: stop,
- return: stop,
- [Symbol.iterator]() {
- return this;
- },
- };
- }
- /**
- * Destroy a stream, preventing it from being used for any further purpose.
- *
- * If the stream has a `close()` method, then it will be called on
- * destruction.
- *
- * After destruction, any attempt to write data, read data, or emit most
- * events will be ignored.
- *
- * If an error argument is provided, then it will be emitted in an
- * 'error' event.
- */
- destroy(er) {
- if (this[DESTROYED]) {
- if (er)
- this.emit('error', er);
- else
- this.emit(DESTROYED);
- return this;
- }
- this[DESTROYED] = true;
- this[DISCARDED] = true;
- // throw away all buffered data, it's never coming out
- this[BUFFER].length = 0;
- this[BUFFERLENGTH] = 0;
- const wc = this;
- if (typeof wc.close === 'function' && !this[CLOSED])
- wc.close();
- if (er)
- this.emit('error', er);
- // if no error to emit, still reject pending promises
- else
- this.emit(DESTROYED);
- return this;
- }
- /**
- * Alias for {@link isStream}
- *
- * Former export location, maintained for backwards compatibility.
- *
- * @deprecated
- */
- static get isStream() {
- return isStream;
- }
- }
- //# sourceMappingURL=index.js.map
|