ReadableBase - node__stream.d.ts - Node documentation
class ReadableBase
implements [NodeJS.ReadableStream]
extends Stream

Usage in Deno

```typescript
import { ReadableBase } from "node:node__stream.d.ts";
```

Constructors

new
ReadableBase(opts?: ReadableOptions)

Properties

readonly
closed: boolean
Is `true` after `'close'` has been emitted.
destroyed: boolean
Is `true` after `readable.destroy()` has been called.
readonly
errored: Error | null
Returns error if the stream has been destroyed with an error.
readable: boolean
Is `true` if it is safe to call read, which means the stream has not been destroyed or emitted `'error'` or `'end'`.
readonly
readableAborted: boolean
Returns whether the stream was destroyed or errored before emitting `'end'`.
readonly
readableDidRead: boolean
Returns whether `'data'` has been emitted.
readonly
readableEncoding: BufferEncoding | null
Getter for the property `encoding` of a given `Readable` stream. The `encoding` property can be set using the setEncoding method.
readonly
readableEnded: boolean
Becomes `true` when [`'end'`](https://nodejs.org/docs/latest-v22.x/api/stream.html#event-end) event is emitted.
readonly
readableFlowing: boolean | null
This property reflects the current state of a `Readable` stream as described in the [Three states](https://nodejs.org/docs/latest-v22.x/api/stream.html#three-states) section.
readonly
readableHighWaterMark: number
Returns the value of `highWaterMark` passed when creating this `Readable`.
readonly
readableLength: number
This property contains the number of bytes (or objects) in the queue ready to be read. The value provides introspection data regarding the status of the `highWaterMark`.
readonly
readableObjectMode: boolean
Getter for the property `objectMode` of a given `Readable` stream.
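
As a rough illustration of how some of these getters relate, the sketch below buffers one chunk in a stream that has no consumer yet and reads the introspection properties. The `highWaterMark` of 4 and the no-op `read()` are assumptions chosen only for this example.

```js
import { Readable } from 'node:stream';

// A stream with a no-op read(); data is supplied manually via push().
const stream = new Readable({ highWaterMark: 4, read() {} });
stream.push('abc'); // Stored as a 3-byte Buffer because no encoding is set.

const state = {
  readableLength: stream.readableLength,
  readableHighWaterMark: stream.readableHighWaterMark,
  readableFlowing: stream.readableFlowing, // null until a consumer attaches
  readableObjectMode: stream.readableObjectMode,
  readableEncoding: stream.readableEncoding,
};

stream.pause();
const flowingAfterPause = stream.readableFlowing; // false once explicitly paused

console.log(state, flowingAfterPause);
```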

Methods

[Symbol.asyncDispose](): Promise<void>
Calls `readable.destroy()` with an `AbortError` and returns a promise that fulfills when the stream is finished.
[Symbol.asyncIterator](): AsyncIterableIterator<any>
abstract
_construct(callback: (error?: Error | null) => void): void
_destroy(
error: Error | null,
callback: (error?: Error | null) => void,
): void
_read(size: number): void
addListener(
event: "close",
listener: () => void,
): this
Event emitter. The defined events include: `close`, `data`, `end`, `error`, `pause`, `readable`, and `resume`.
addListener(
event: "data",
listener: (chunk: any) => void,
): this
addListener(
event: "end",
listener: () => void,
): this
addListener(
event: "error",
listener: (err: Error) => void,
): this
addListener(
event: "pause",
listener: () => void,
): this
addListener(
event: "readable",
listener: () => void,
): this
addListener(
event: "resume",
listener: () => void,
): this
addListener(
event: string | symbol,
listener: (...args: any[]) => void,
): this
asIndexedPairs(options?: Pick<ArrayOptions, "signal">): Readable
This method returns a new stream with chunks of the underlying stream paired with a counter in the form `[index, chunk]`. The first index value is `0` and it increases by 1 for each chunk produced.
destroy(error?: Error): this
Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'` event (unless `emitClose` is set to `false`). After this call, the readable stream will release any internal resources and subsequent calls to `push()` will be ignored. Once `destroy()` has been called any further calls will be a no-op and no further errors except from `_destroy()` may be emitted as `'error'`. Implementors should not override this method, but instead implement `readable._destroy()`.
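
A minimal sketch of the behaviour described above, assuming a stream created with `Readable.from()`. The helper name `inspectDestroyedStream` is hypothetical; it destroys the stream with an error, waits for `'close'`, and reports the related state properties.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: destroy a stream with an error and report its state.
async function inspectDestroyedStream() {
  const stream = Readable.from(['a', 'b']);
  stream.on('error', () => {}); // Avoid an unhandled 'error' event.

  const closed = new Promise((resolve) => stream.once('close', resolve));
  stream.destroy(new Error('boom'));
  await closed;

  return {
    destroyed: stream.destroyed,
    closed: stream.closed,
    errored: stream.errored,
    readableAborted: stream.readableAborted,
    readableEnded: stream.readableEnded,
  };
}

console.log(await inspectDestroyedStream());
```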
drop(
limit: number,
options?: Pick<ArrayOptions, "signal">,
): Readable
This method returns a new stream with the first *limit* chunks dropped from the start.
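
For instance, `drop()` could be used to skip a header row. This is only a sketch; the helper name `skipHeader` and the sample rows are made up for illustration.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: discard the first chunk and collect the rest.
async function skipHeader(rows) {
  return Readable.from(rows).drop(1).toArray();
}

console.log(await skipHeader(['name,age', 'ada,36', 'alan,41']));
```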
emit(event: "close"): boolean
emit(
event: "data",
chunk: any,
): boolean
emit(event: "end"): boolean
emit(
event: "error",
err: Error,
): boolean
emit(event: "pause"): boolean
emit(event: "readable"): boolean
emit(event: "resume"): boolean
emit(
event: string | symbol,
...args: any[],
): boolean
every(
fn: (
data: any,
options?: Pick<ArrayOptions, "signal">,
) => boolean | Promise<boolean>
,
options?: ArrayOptions,
): Promise<boolean>
This method is similar to `Array.prototype.every` and calls *fn* on each chunk in the stream to check whether every awaited return value is truthy. Once the awaited return value of an *fn* call on a chunk is falsy, the stream is destroyed and the promise is fulfilled with `false`. If all of the *fn* calls on the chunks return a truthy value, the promise is fulfilled with `true`.
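
A short sketch of `every()`, assuming an object-mode stream of numbers; `allEven` is a hypothetical helper name.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: fulfills with true only if every chunk is even.
async function allEven(numbers) {
  return Readable.from(numbers).every((n) => n % 2 === 0);
}

console.log(await allEven([2, 4, 6]));
console.log(await allEven([2, 3, 6]));
```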
filter(
fn: (
data: any,
options?: Pick<ArrayOptions, "signal">,
) => boolean | Promise<boolean>
,
options?: ArrayOptions,
): Readable
This method allows filtering the stream. For each chunk in the stream the *fn* function will be called and if it returns a truthy value, the chunk will be passed to the result stream. If the *fn* function returns a promise - that promise will be `await`ed.
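
The following sketch shows `filter()` with an asynchronous predicate. The allow-list and the names `isAllowed` and `filterAllowed` are assumptions for the example, standing in for whatever asynchronous check an application would perform.

```js
import { Readable } from 'node:stream';

const allowed = new Set(['alice', 'carol']);

// Hypothetical async predicate; the returned promise is awaited by filter().
async function isAllowed(name) {
  return allowed.has(name);
}

// Hypothetical helper: keep only the names that pass the predicate.
async function filterAllowed(names) {
  return Readable.from(names).filter(isAllowed).toArray();
}

console.log(await filterAllowed(['alice', 'bob', 'carol']));
```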
find<T>(
fn: (
data: any,
options?: Pick<ArrayOptions, "signal">,
) => data is T
,
options?: ArrayOptions,
): Promise<T | undefined>
This method is similar to `Array.prototype.find` and calls *fn* on each chunk in the stream to find a chunk with a truthy value for *fn*. Once an *fn* call's awaited return value is truthy, the stream is destroyed and the promise is fulfilled with the value for which *fn* returned a truthy value. If all of the *fn* calls on the chunks return a falsy value, the promise is fulfilled with `undefined`.
find(
fn: (
data: any,
options?: Pick<ArrayOptions, "signal">,
) => boolean | Promise<boolean>
,
options?: ArrayOptions,
): Promise<any>
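
As a small illustration of `find()`, the sketch below looks for the first word of at least a given length. The helper name `firstLongWord` and its parameters are hypothetical.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: fulfills with the first matching chunk, or undefined.
async function firstLongWord(words, minLength) {
  return Readable.from(words).find((word) => word.length >= minLength);
}

console.log(await firstLongWord(['a', 'abcd', 'abcdef'], 3));
```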
flatMap(
fn: (
data: any,
options?: Pick<ArrayOptions, "signal">,
) => any
,
options?: ArrayOptions,
): Readable
This method returns a new stream by applying the given callback to each chunk of the stream and then flattening the result. It is possible to return a stream or another iterable or async iterable from *fn* and the result streams will be merged (flattened) into the returned stream.
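
A minimal sketch of `flatMap()`, assuming each chunk is a line of space-separated words; `splitWords` is a hypothetical helper. The callback returns an array (an iterable), which is flattened into the resulting stream.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: turn a stream of lines into a flat list of words.
async function splitWords(lines) {
  return Readable.from(lines)
    .flatMap((line) => line.split(' '))
    .toArray();
}

console.log(await splitWords(['a b', 'c']));
```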
forEach(
fn: (
data: any,
options?: Pick<ArrayOptions, "signal">,
) => void | Promise<void>
,
options?: ArrayOptions,
): Promise<void>
This method allows iterating a stream. For each chunk in the stream the *fn* function will be called. If the *fn* function returns a promise - that promise will be `await`ed. This method is different from `for await...of` loops in that it can optionally process chunks concurrently. In addition, a `forEach` iteration can only be stopped by having passed a `signal` option and aborting the related AbortController while `for await...of` can be stopped with `break` or `return`. In either case the stream will be destroyed. This method is different from listening to the `'data'` event in that it uses the `'readable'` event in the underlying machinery and can limit the number of concurrent *fn* calls.
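
The sketch below shows `forEach()` with a concurrency limit. The helper name `collectWithLimit` and the limit of 2 are assumptions for the example; with concurrent callbacks, the order in which asynchronous work completes is not something the sketch relies on.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: visit every chunk, running at most two callbacks at once.
async function collectWithLimit(items) {
  const seen = [];
  await Readable.from(items).forEach(
    async (item) => {
      seen.push(item);
    },
    { concurrency: 2 },
  );
  return seen;
}

console.log(await collectWithLimit([1, 2, 3]));
```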
isPaused(): boolean
The `readable.isPaused()` method returns the current operating state of the `Readable`. This is used primarily by the mechanism that underlies the `readable.pipe()` method. In most typical cases, there will be no reason to use this method directly.

```js
import stream from 'node:stream';

// A no-op read() keeps the example self-contained.
const readable = new stream.Readable({ read() {} });

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
```
iterator(options?: { destroyOnReturn?: boolean; }): AsyncIterableIterator<any>
The iterator created by this method gives users the option to cancel the destruction of the stream when the `for await...of` loop is exited by `return`, `break`, or `throw`; setting `destroyOnReturn` to `false` keeps the stream open in that case. It also covers whether the iterator should destroy the stream if the stream emitted an error during iteration.
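
A sketch of the `destroyOnReturn: false` case, assuming a stream created with `Readable.from()`. The helper `takeFirst` is hypothetical; it exits the loop early, and the same stream is then consumed by a second loop.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: read only the first chunk without destroying the stream.
async function takeFirst(stream) {
  for await (const chunk of stream.iterator({ destroyOnReturn: false })) {
    return chunk;
  }
}

const stream = Readable.from(['header', 'body-1', 'body-2']);
const first = await takeFirst(stream);
const stillOpen = !stream.destroyed;

// The remaining chunks are still available on the same stream.
const rest = [];
for await (const chunk of stream) {
  rest.push(chunk);
}

console.log(first, stillOpen, rest);
```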
map(
fn: (
data: any,
options?: Pick<ArrayOptions, "signal">,
) => any
,
options?: ArrayOptions,
): Readable
This method allows mapping over the stream. The *fn* function will be called for every chunk in the stream. If the *fn* function returns a promise - that promise will be `await`ed before being passed to the result stream.
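
A brief sketch of `map()` with an asynchronous callback. The helper name `upperCaseAll` and the `concurrency` value of 2 are assumptions for the example.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: transform each chunk; results are awaited before
// being passed to the resulting stream.
async function upperCaseAll(words) {
  return Readable.from(words)
    .map(async (word) => word.toUpperCase(), { concurrency: 2 })
    .toArray();
}

console.log(await upperCaseAll(['a', 'b', 'c']));
```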
on(
event: "close",
listener: () => void,
): this
on(
event: "data",
listener: (chunk: any) => void,
): this
on(
event: "end",
listener: () => void,
): this
on(
event: "error",
listener: (err: Error) => void,
): this
on(
event: "pause",
listener: () => void,
): this
on(
event: "readable",
listener: () => void,
): this
on(
event: "resume",
listener: () => void,
): this
on(
event: string | symbol,
listener: (...args: any[]) => void,
): this
once(
event: "close",
listener: () => void,
): this
once(
event: "data",
listener: (chunk: any) => void,
): this
once(
event: "end",
listener: () => void,
): this
once(
event: "error",
listener: (err: Error) => void,
): this
once(
event: "pause",
listener: () => void,
): this
once(
event: "readable",
listener: () => void,
): this
once(
event: "resume",
listener: () => void,
): this
once(
event: string | symbol,
listener: (...args: any[]) => void,
): this
pause(): this
The `readable.pause()` method will cause a stream in flowing mode to stop emitting `'data'` events, switching out of flowing mode. Any data that becomes available will remain in the internal buffer.

```js
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});
```

The `readable.pause()` method has no effect if there is a `'readable'` event listener.
prependListener(
event: "close",
listener: () => void,
): this
prependListener(
event: "data",
listener: (chunk: any) => void,
): this
prependListener(
event: "end",
listener: () => void,
): this
prependListener(
event: "error",
listener: (err: Error) => void,
): this
prependListener(
event: "pause",
listener: () => void,
): this
prependListener(
event: "readable",
listener: () => void,
): this
prependListener(
event: "resume",
listener: () => void,
): this
prependListener(
event: string | symbol,
listener: (...args: any[]) => void,
): this
prependOnceListener(
event: "close",
listener: () => void,
): this
prependOnceListener(
event: "data",
listener: (chunk: any) => void,
): this
prependOnceListener(
event: "end",
listener: () => void,
): this
prependOnceListener(
event: "error",
listener: (err: Error) => void,
): this
prependOnceListener(
event: "pause",
listener: () => void,
): this
prependOnceListener(
event: "readable",
listener: () => void,
): this
prependOnceListener(
event: "resume",
listener: () => void,
): this
prependOnceListener(
event: string | symbol,
listener: (...args: any[]) => void,
): this
push(
chunk: any,
encoding?: BufferEncoding,
): boolean
read(size?: number): any
The `readable.read()` method reads data out of the internal buffer and returns it. If no data is available to be read, `null` is returned. By default, the data is returned as a `Buffer` object unless an encoding has been specified using the `readable.setEncoding()` method or the stream is operating in object mode.

The optional `size` argument specifies a specific number of bytes to read. If `size` bytes are not available to be read, `null` will be returned _unless_ the stream has ended, in which case all of the data remaining in the internal buffer will be returned.

If the `size` argument is not specified, all of the data contained in the internal buffer will be returned.

The `size` argument must be less than or equal to 1 GiB.

The `readable.read()` method should only be called on `Readable` streams operating in paused mode. In flowing mode, `readable.read()` is called automatically until the internal buffer is fully drained.

```js
const readable = getReadableStreamSomehow();

// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
  let chunk;
  console.log('Stream is readable (new data received in buffer)');
  // Use a loop to make sure we read all currently available data
  while (null !== (chunk = readable.read())) {
    console.log(`Read ${chunk.length} bytes of data...`);
  }
});

// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
  console.log('Reached end of stream.');
});
```

Each call to `readable.read()` returns a chunk of data, or `null`. The chunks are not concatenated. A `while` loop is necessary to consume all data currently in the buffer. When reading a large file `.read()` may return `null`, having consumed all buffered content so far, but there is still more data to come not yet buffered. In this case a new `'readable'` event will be emitted when there is more data in the buffer. Finally the `'end'` event will be emitted when there is no more data to come.

Therefore to read a file's whole contents from a `readable`, it is necessary to collect chunks across multiple `'readable'` events:

```js
const chunks = [];

readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk);
  }
});

readable.on('end', () => {
  const content = chunks.join('');
});
```

A `Readable` stream in object mode will always return a single item from a call to `readable.read(size)`, regardless of the value of the `size` argument.

If the `readable.read()` method returns a chunk of data, a `'data'` event will also be emitted.

Calling read after the `'end'` event has been emitted will return `null`. No runtime error will be raised.
reduce<T = any>(
fn: (
previous: any,
data: any,
options?: Pick<ArrayOptions, "signal">,
) => T
,
initial?: undefined,
options?: Pick<ArrayOptions, "signal">,
): Promise<T>
This method calls *fn* on each chunk of the stream in order, passing it the result from the calculation on the previous element. It returns a promise for the final value of the reduction. If no *initial* value is supplied the first chunk of the stream is used as the initial value. If the stream is empty, the promise is rejected with a `TypeError` with the `ERR_INVALID_ARGS` code property. The reducer function iterates the stream element-by-element, which means that there is no *concurrency* parameter or parallelism. To perform a reduce concurrently, you can extract the async function to the `readable.map` method.
reduce<T = any>(
fn: (
previous: T,
data: any,
options?: Pick<ArrayOptions, "signal">,
) => T
,
initial: T,
options?: Pick<ArrayOptions, "signal">,
): Promise<T>
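
As an illustration of `reduce()` with an explicit initial value, the sketch below sums a stream of numbers. The helper name `sum` is hypothetical.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: fold the stream into a single total, starting from 0.
async function sum(numbers) {
  return Readable.from(numbers).reduce((total, n) => total + n, 0);
}

console.log(await sum([1, 2, 3]));
```

Supplying the initial value also means an empty stream yields that value instead of a rejection.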
removeListener(
event: "close",
listener: () => void,
): this
removeListener(
event: "data",
listener: (chunk: any) => void,
): this
removeListener(
event: "end",
listener: () => void,
): this
removeListener(
event: "error",
listener: (err: Error) => void,
): this
removeListener(
event: "pause",
listener: () => void,
): this
removeListener(
event: "readable",
listener: () => void,
): this
removeListener(
event: "resume",
listener: () => void,
): this
removeListener(
event: string | symbol,
listener: (...args: any[]) => void,
): this
resume(): this
The `readable.resume()` method causes an explicitly paused `Readable` stream to resume emitting `'data'` events, switching the stream into flowing mode.

The `readable.resume()` method can be used to fully consume the data from a stream without actually processing any of that data:

```js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });
```

The `readable.resume()` method has no effect if there is a `'readable'` event listener.
setEncoding(encoding: BufferEncoding): this
The `readable.setEncoding()` method sets the character encoding for data read from the `Readable` stream.

By default, no encoding is assigned and stream data will be returned as `Buffer` objects. Setting an encoding causes the stream data to be returned as strings of the specified encoding rather than as `Buffer` objects. For instance, calling `readable.setEncoding('utf8')` will cause the output data to be interpreted as UTF-8 data, and passed as strings. Calling `readable.setEncoding('hex')` will cause the data to be encoded in hexadecimal string format.

The `Readable` stream will properly handle multi-byte characters delivered through the stream that would otherwise become improperly decoded if simply pulled from the stream as `Buffer` objects.

```js
import assert from 'node:assert';

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('Got %d characters of string data:', chunk.length);
});
```
some(
fn: (
data: any,
options?: Pick<ArrayOptions, "signal">,
) => boolean | Promise<boolean>
,
options?: ArrayOptions,
): Promise<boolean>
This method is similar to `Array.prototype.some` and calls *fn* on each chunk in the stream until the awaited return value is `true` (or any truthy value). Once the awaited return value of an *fn* call on a chunk is truthy, the stream is destroyed and the promise is fulfilled with `true`. If none of the *fn* calls on the chunks return a truthy value, the promise is fulfilled with `false`.
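
A short sketch of `some()`, assuming a stream of numbers; `hasNegative` is a hypothetical helper name.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: fulfills with true as soon as one chunk is negative.
async function hasNegative(numbers) {
  return Readable.from(numbers).some((n) => n < 0);
}

console.log(await hasNegative([1, -2, 3]));
console.log(await hasNegative([1, 2, 3]));
```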
take(
limit: number,
options?: Pick<ArrayOptions, "signal">,
): Readable
This method returns a new stream with the first *limit* chunks.
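
For example, `take()` could be used to preview the start of a stream. The helper name `firstTwo` is an assumption for this sketch.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: collect only the first two chunks.
async function firstTwo(items) {
  return Readable.from(items).take(2).toArray();
}

console.log(await firstTwo([1, 2, 3, 4]));
```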
toArray(options?: Pick<ArrayOptions, "signal">): Promise<any[]>
This method allows easily obtaining the contents of a stream. As this method reads the entire stream into memory, it negates the benefits of streams. It's intended for interoperability and convenience, not as the primary way to consume streams.
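
A minimal sketch of `toArray()` on a byte stream, assuming the data is small enough to hold in memory. The helper name `readAllText` is hypothetical. The number of `Buffer` chunks returned may differ from the number of `push()` calls, so the sketch concatenates them rather than relying on chunk boundaries.

```js
import { Readable } from 'node:stream';

// Hypothetical helper: read a whole byte stream into one UTF-8 string.
async function readAllText(stream) {
  const chunks = await stream.toArray();
  return Buffer.concat(chunks).toString('utf8');
}

const stream = new Readable({ read() {} });
stream.push('he');
stream.push('llo');
stream.push(null); // Signal end of stream.

const text = await readAllText(stream);
console.log(text);
```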
unpipe(destination?: WritableStream): this
The `readable.unpipe()` method detaches a `Writable` stream previously attached using the pipe method.

If the `destination` is not specified, then _all_ pipes are detached.

If the `destination` is specified, but no pipe is set up for it, then the method does nothing.

```js
import fs from 'node:fs';

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt.');
  readable.unpipe(writable);
  console.log('Manually close the file stream.');
  writable.end();
}, 1000);
```
unshift(
chunk: any,
encoding?: BufferEncoding,
): void
Passing `chunk` as `null` signals the end of the stream (EOF) and behaves the same as `readable.push(null)`, after which no more data can be written. The EOF signal is put at the end of the buffer and any buffered data will still be flushed.

The `readable.unshift()` method pushes a chunk of data back into the internal buffer. This is useful in certain situations where a stream is being consumed by code that needs to "un-consume" some amount of data that it has optimistically pulled out of the source, so that the data can be passed on to some other party.

The `stream.unshift(chunk)` method cannot be called after the `'end'` event has been emitted or a runtime error will be thrown.

Developers using `stream.unshift()` often should consider switching to use of a `Transform` stream instead. See the `API for stream implementers` section for more information.

```js
// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
import { StringDecoder } from 'node:string_decoder';

function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.includes('\n\n')) {
        // Found the header boundary.
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // Remove the 'readable' listener before unshifting.
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // Now the body of the message can be read from the stream.
        callback(null, header, stream);
        return;
      }
      // Still reading the header.
      header += str;
    }
  }
}
```

Unlike push, `stream.unshift(chunk)` will not end the reading process by resetting the internal reading state of the stream. This can cause unexpected results if `readable.unshift()` is called during a read (i.e. from within a _read implementation on a custom stream). Following the call to `readable.unshift()` with an immediate push will reset the reading state appropriately, however it is best to simply avoid calling `readable.unshift()` while in the process of performing a read.
wrap(stream: ReadableStream): this
Prior to Node.js 0.10, streams did not implement the entire `node:stream` module API as it is currently defined. (See `Compatibility` for more information.)

When using an older Node.js library that emits `'data'` events and has a pause method that is advisory only, the `readable.wrap()` method can be used to create a `Readable` stream that uses the old stream as its data source.

It will rarely be necessary to use `readable.wrap()` but the method has been provided as a convenience for interacting with older Node.js applications and libraries.

```js
import { OldReader } from './old-api-module.js';
import { Readable } from 'node:stream';

const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});
```

Static Methods

from(
iterable: Iterable<any> | AsyncIterable<any>,
options?: ReadableOptions,
): Readable
A utility method for creating Readable Streams out of iterators.
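
A sketch of `from()` with an async generator as the source. The generator `countTo` is a hypothetical example; any iterable or async iterable could be passed instead.

```js
import { Readable } from 'node:stream';

// Hypothetical async generator used as the data source.
async function* countTo(limit) {
  for (let i = 1; i <= limit; i++) {
    yield i;
  }
}

const stream = Readable.from(countTo(3));

const received = [];
for await (const chunk of stream) {
  received.push(chunk);
}

console.log(received);
```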
isDisturbed(stream: Readable | ReadableStream): boolean
Returns whether the stream has been read from or cancelled.
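
As a rough illustration, the sketch below checks `isDisturbed()` before and after consuming a stream. The variable names are assumptions for the example.

```js
import { Readable } from 'node:stream';

const stream = Readable.from(['x']);

// Nothing has been read yet.
const disturbedBefore = Readable.isDisturbed(stream);

// Consume the stream completely.
await stream.toArray();
const disturbedAfter = Readable.isDisturbed(stream);

console.log(disturbedBefore, disturbedAfter);
```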