123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- "use strict";
- require("leaked-handles");
- var _fs = _interopRequireDefault(require("fs"));
- var _stream = _interopRequireDefault(require("stream"));
- var _tap = _interopRequireDefault(require("tap"));
- var _ = _interopRequireDefault(require("."));
- function _interopRequireDefault(obj) {
- return obj && obj.__esModule ? obj : { default: obj };
- }
- const streamToString = stream =>
- new Promise((resolve, reject) => {
- let ended = false;
- let data = "";
- stream
- .on("error", reject)
- .on("data", chunk => {
- if (ended) throw new Error("`data` emitted after `end`");
- data += chunk;
- })
- .on("end", () => {
- ended = true;
- resolve(data);
- });
- });
- const waitForBytesWritten = (stream, bytes, resolve) => {
- if (stream.bytesWritten >= bytes) {
- setImmediate(resolve);
- return;
- }
- setImmediate(() => waitForBytesWritten(stream, bytes, resolve));
- };
- _tap.default.test("Data from a complete stream.", async t => {
- let data = "";
- const source = new _stream.default.Readable({
- read() {}
- });
- const chunk1 = "1".repeat(10);
- source.push(chunk1);
- source.push(null);
- data += chunk1;
- let capacitor1 = new _.default();
- t.strictSame(
- capacitor1._readStreams.size,
- 0,
- "should start with 0 read streams"
- );
- source.pipe(capacitor1);
- const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
- t.strictSame(
- capacitor1._readStreams.size,
- 1,
- "should attach a new read stream before receiving data"
- );
- const result = await streamToString(capacitor1Stream1);
- t.sameStrict(result, data, "should stream all data");
- t.sameStrict(
- capacitor1._readStreams.size,
- 0,
- "should no longer have any attacheds read streams"
- );
- });
- _tap.default.test(
- "Data from an open stream, 1 chunk, no read streams.",
- async t => {
- let data = "";
- const source = new _stream.default.Readable({
- read() {}
- });
- let capacitor1 = new _.default();
- t.strictSame(
- capacitor1._readStreams.size,
- 0,
- "should start with 0 read streams"
- );
- source.pipe(capacitor1);
- const chunk1 = "1".repeat(10);
- source.push(chunk1);
- source.push(null);
- data += chunk1;
- const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
- t.strictSame(
- capacitor1._readStreams.size,
- 1,
- "should attach a new read stream before receiving data"
- );
- const result = await streamToString(capacitor1Stream1);
- t.sameStrict(result, data, "should stream all data");
- t.sameStrict(
- capacitor1._readStreams.size,
- 0,
- "should no longer have any attacheds read streams"
- );
- }
- );
- _tap.default.test(
- "Data from an open stream, 1 chunk, 1 read stream.",
- async t => {
- let data = "";
- const source = new _stream.default.Readable({
- read() {}
- });
- let capacitor1 = new _.default();
- t.strictSame(
- capacitor1._readStreams.size,
- 0,
- "should start with 0 read streams"
- );
- source.pipe(capacitor1);
- const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
- t.strictSame(
- capacitor1._readStreams.size,
- 1,
- "should attach a new read stream before receiving data"
- );
- const chunk1 = "1".repeat(10);
- source.push(chunk1);
- source.push(null);
- data += chunk1;
- const result = await streamToString(capacitor1Stream1);
- t.sameStrict(result, data, "should stream all data");
- t.sameStrict(
- capacitor1._readStreams.size,
- 0,
- "should no longer have any attacheds read streams"
- );
- }
- );
- const withChunkSize = size =>
- _tap.default.test(`--- with chunk size: ${size}`, async t => {
- let data = "";
- const source = new _stream.default.Readable({
- read() {}
- });
- let capacitor1;
- let capacitor1Stream1;
- await t.test(
- "can add a read stream before any data has been written",
- async t => {
- capacitor1 = new _.default();
- t.strictSame(
- capacitor1._readStreams.size,
- 0,
- "should start with 0 read streams"
- );
- capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
- t.strictSame(
- capacitor1._readStreams.size,
- 1,
- "should attach a new read stream before receiving data"
- );
- await t.test("creates a temporary file", async t => {
- t.plan(3);
- await new Promise(resolve => capacitor1.on("open", resolve));
- t.type(
- capacitor1.path,
- "string",
- "capacitor1.path should be a string"
- );
- t.type(capacitor1.fd, "number", "capacitor1.fd should be a number");
- t.ok(_fs.default.existsSync(capacitor1.path), "creates a temp file");
- });
- }
- );
- source.pipe(capacitor1);
- const chunk1 = "1".repeat(size);
- source.push(chunk1);
- data += chunk1;
- await new Promise(resolve =>
- waitForBytesWritten(capacitor1, size, resolve)
- );
- let capacitor1Stream2;
- t.test("can add a read stream after data has been written", t => {
- capacitor1Stream2 = capacitor1.createReadStream("capacitor1Stream2");
- t.strictSame(
- capacitor1._readStreams.size,
- 2,
- "should attach a new read stream after first write"
- );
- t.end();
- });
- const writeEventBytesWritten = new Promise(resolve => {
- capacitor1.once("write", () => {
- resolve(capacitor1.bytesWritten);
- });
- });
- const chunk2 = "2".repeat(size);
- source.push(chunk2);
- data += chunk2;
- await new Promise(resolve =>
- waitForBytesWritten(capacitor1, 2 * size, resolve)
- );
- await t.test("write event emitted after bytes are written", async t => {
- t.strictSame(
- await writeEventBytesWritten,
- 2 * size,
- "bytesWritten should include new chunk"
- );
- });
- const finished = new Promise(resolve => capacitor1.once("finish", resolve));
- source.push(null);
- await finished;
- let capacitor1Stream3;
- let capacitor1Stream4;
- t.test("can create a read stream after the source has ended", t => {
- capacitor1Stream3 = capacitor1.createReadStream("capacitor1Stream3");
- capacitor1Stream4 = capacitor1.createReadStream("capacitor1Stream4");
- t.strictSame(
- capacitor1._readStreams.size,
- 4,
- "should attach new read streams after end"
- );
- t.end();
- });
- await t.test("streams complete data to a read stream", async t => {
- const result2 = await streamToString(capacitor1Stream2);
- t.strictSame(
- capacitor1Stream2.ended,
- true,
- "should mark read stream as ended"
- );
- t.strictSame(result2, data, "should stream complete data");
- const result4 = await streamToString(capacitor1Stream4);
- t.strictSame(
- capacitor1Stream4.ended,
- true,
- "should mark read stream as ended"
- );
- t.strictSame(result4, data, "should stream complete data");
- t.strictSame(
- capacitor1._readStreams.size,
- 2,
- "should detach an ended read stream"
- );
- });
- await t.test("can destroy a read stream", async t => {
- await new Promise(resolve => {
- capacitor1Stream1.once("error", resolve);
- capacitor1Stream1.destroy(new Error("test"));
- });
- t.strictSame(
- capacitor1Stream1.destroyed,
- true,
- "should mark read stream as destroyed"
- );
- t.type(
- capacitor1Stream1.error,
- Error,
- "should store an error on read stream"
- );
- t.strictSame(
- capacitor1._readStreams.size,
- 1,
- "should detach a destroyed read stream"
- );
- });
- t.test("can delay destruction of a capacitor", t => {
- capacitor1.destroy(null);
- t.strictSame(
- capacitor1.destroyed,
- false,
- "should not destroy while read streams exist"
- );
- t.strictSame(
- capacitor1._destroyPending,
- true,
- "should mark for future destruction"
- );
- t.end();
- });
- await t.test("destroys capacitor once no read streams exist", async t => {
- const readStreamDestroyed = new Promise(resolve =>
- capacitor1Stream3.on("close", resolve)
- );
- const capacitorDestroyed = new Promise(resolve =>
- capacitor1.on("close", resolve)
- );
- capacitor1Stream3.destroy(null);
- await readStreamDestroyed;
- t.strictSame(
- capacitor1Stream3.destroyed,
- true,
- "should mark read stream as destroyed"
- );
- t.strictSame(
- capacitor1Stream3.error,
- null,
- "should not store an error on read stream"
- );
- t.strictSame(
- capacitor1._readStreams.size,
- 0,
- "should detach a destroyed read stream"
- );
- await capacitorDestroyed;
- t.strictSame(capacitor1.closed, true, "should mark capacitor as closed");
- t.strictSame(capacitor1.fd, null, "should set fd to null");
- t.strictSame(
- capacitor1.destroyed,
- true,
- "should mark capacitor as destroyed"
- );
- t.notOk(_fs.default.existsSync(capacitor1.path), "removes its temp file");
- });
- t.test("cannot create a read stream after destruction", t => {
- try {
- capacitor1.createReadStream();
- } catch (error) {
- t.ok(
- error instanceof _.ReadAfterDestroyedError,
- "should not create a read stream once destroyed"
- );
- t.end();
- }
- });
- const capacitor2 = new _.default();
- const capacitor2Stream1 = capacitor2.createReadStream("capacitor2Stream1");
- const capacitor2Stream2 = capacitor2.createReadStream("capacitor2Stream2");
- const capacitor2ReadStream1Destroyed = new Promise(resolve =>
- capacitor2Stream1.on("close", resolve)
- );
- const capacitor2Destroyed = new Promise(resolve =>
- capacitor2.on("close", resolve)
- );
- capacitor2Stream1.destroy();
- await capacitor2ReadStream1Destroyed;
- await t.test("propagates errors to attached read streams", async t => {
- capacitor2.destroy();
- await new Promise(resolve => setImmediate(resolve));
- t.strictSame(
- capacitor2Stream2.destroyed,
- false,
- "should not immediately mark attached read streams as destroyed"
- );
- capacitor2.destroy(new Error("test"));
- await capacitor2Destroyed;
- t.type(capacitor2.error, Error, "should store an error on capacitor");
- t.strictSame(
- capacitor2.destroyed,
- true,
- "should mark capacitor as destroyed"
- );
- t.type(
- capacitor2Stream2.error,
- Error,
- "should store an error on attached read streams"
- );
- t.strictSame(
- capacitor2Stream2.destroyed,
- true,
- "should mark attached read streams as destroyed"
- );
- t.strictSame(
- capacitor2Stream1.error,
- null,
- "should not store an error on detached read streams"
- );
- });
- });
- withChunkSize(10);
- withChunkSize(100000);
|