test.js

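// Tests for the package's default export (required from "."): a write-stream
// "capacitor" that buffers piped data into a temporary file and replays it
// through any number of attached read streams.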
  1. "use strict";
  2. require("leaked-handles");
  3. var _fs = _interopRequireDefault(require("fs"));
  4. var _stream = _interopRequireDefault(require("stream"));
  5. var _tap = _interopRequireDefault(require("tap"));
  6. var _ = _interopRequireDefault(require("."));
  7. function _interopRequireDefault(obj) {
  8. return obj && obj.__esModule ? obj : { default: obj };
  9. }
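// Collects an entire readable stream into a string, rejecting on error and
// failing loudly if a `data` event arrives after `end`.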
const streamToString = stream =>
  new Promise((resolve, reject) => {
    let ended = false;
    let data = "";
    stream
      .on("error", reject)
      .on("data", chunk => {
        if (ended) throw new Error("`data` emitted after `end`");
        data += chunk;
      })
      .on("end", () => {
        ended = true;
        resolve(data);
      });
  });
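// Polls via setImmediate until `stream.bytesWritten` reaches `bytes`, then resolves.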
const waitForBytesWritten = (stream, bytes, resolve) => {
  if (stream.bytesWritten >= bytes) {
    setImmediate(resolve);
    return;
  }
  setImmediate(() => waitForBytesWritten(stream, bytes, resolve));
};
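// The source ends before it is piped into the capacitor; a read stream
// attached afterwards should still receive all of the buffered data.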
_tap.default.test("Data from a complete stream.", async t => {
  let data = "";
  const source = new _stream.default.Readable({
    read() {}
  });

  const chunk1 = "1".repeat(10);
  source.push(chunk1);
  source.push(null);
  data += chunk1;

  let capacitor1 = new _.default();
  t.strictSame(
    capacitor1._readStreams.size,
    0,
    "should start with 0 read streams"
  );
  source.pipe(capacitor1);

  const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
  t.strictSame(
    capacitor1._readStreams.size,
    1,
    "should attach a new read stream before receiving data"
  );

  const result = await streamToString(capacitor1Stream1);
  t.strictSame(result, data, "should stream all data");
  t.strictSame(
    capacitor1._readStreams.size,
    0,
    "should no longer have any attached read streams"
  );
});
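// The source is piped first and pushes its only chunk (and EOF) before any
// read stream is attached.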
_tap.default.test(
  "Data from an open stream, 1 chunk, no read streams.",
  async t => {
    let data = "";
    const source = new _stream.default.Readable({
      read() {}
    });

    let capacitor1 = new _.default();
    t.strictSame(
      capacitor1._readStreams.size,
      0,
      "should start with 0 read streams"
    );
    source.pipe(capacitor1);

    const chunk1 = "1".repeat(10);
    source.push(chunk1);
    source.push(null);
    data += chunk1;

    const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
    t.strictSame(
      capacitor1._readStreams.size,
      1,
      "should attach a new read stream before receiving data"
    );

    const result = await streamToString(capacitor1Stream1);
    t.strictSame(result, data, "should stream all data");
    t.strictSame(
      capacitor1._readStreams.size,
      0,
      "should no longer have any attached read streams"
    );
  }
);
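// A read stream is attached before the source pushes its only chunk and ends.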
_tap.default.test(
  "Data from an open stream, 1 chunk, 1 read stream.",
  async t => {
    let data = "";
    const source = new _stream.default.Readable({
      read() {}
    });

    let capacitor1 = new _.default();
    t.strictSame(
      capacitor1._readStreams.size,
      0,
      "should start with 0 read streams"
    );
    source.pipe(capacitor1);

    const capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
    t.strictSame(
      capacitor1._readStreams.size,
      1,
      "should attach a new read stream before receiving data"
    );

    const chunk1 = "1".repeat(10);
    source.push(chunk1);
    source.push(null);
    data += chunk1;

    const result = await streamToString(capacitor1Stream1);
    t.strictSame(result, data, "should stream all data");
    t.strictSame(
      capacitor1._readStreams.size,
      0,
      "should no longer have any attached read streams"
    );
  }
);
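// Runs the full capacitor lifecycle (temp file creation, incremental writes,
// multiple read streams, destruction, and error propagation) for a given
// chunk size.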
const withChunkSize = size =>
  _tap.default.test(`--- with chunk size: ${size}`, async t => {
    let data = "";
    const source = new _stream.default.Readable({
      read() {}
    });

    let capacitor1;
    let capacitor1Stream1;
    await t.test(
      "can add a read stream before any data has been written",
      async t => {
        capacitor1 = new _.default();
        t.strictSame(
          capacitor1._readStreams.size,
          0,
          "should start with 0 read streams"
        );
        capacitor1Stream1 = capacitor1.createReadStream("capacitor1Stream1");
        t.strictSame(
          capacitor1._readStreams.size,
          1,
          "should attach a new read stream before receiving data"
        );

        await t.test("creates a temporary file", async t => {
          t.plan(3);
          await new Promise(resolve => capacitor1.on("open", resolve));
          t.type(
            capacitor1.path,
            "string",
            "capacitor1.path should be a string"
          );
          t.type(capacitor1.fd, "number", "capacitor1.fd should be a number");
          t.ok(_fs.default.existsSync(capacitor1.path), "creates a temp file");
        });
      }
    );
    source.pipe(capacitor1);

    const chunk1 = "1".repeat(size);
    source.push(chunk1);
    data += chunk1;
    await new Promise(resolve =>
      waitForBytesWritten(capacitor1, size, resolve)
    );

    let capacitor1Stream2;
    t.test("can add a read stream after data has been written", t => {
      capacitor1Stream2 = capacitor1.createReadStream("capacitor1Stream2");
      t.strictSame(
        capacitor1._readStreams.size,
        2,
        "should attach a new read stream after first write"
      );
      t.end();
    });

    const writeEventBytesWritten = new Promise(resolve => {
      capacitor1.once("write", () => {
        resolve(capacitor1.bytesWritten);
      });
    });

    const chunk2 = "2".repeat(size);
    source.push(chunk2);
    data += chunk2;
    await new Promise(resolve =>
      waitForBytesWritten(capacitor1, 2 * size, resolve)
    );

    await t.test("write event emitted after bytes are written", async t => {
      t.strictSame(
        await writeEventBytesWritten,
        2 * size,
        "bytesWritten should include new chunk"
      );
    });
    const finished = new Promise(resolve => capacitor1.once("finish", resolve));
    source.push(null);
    await finished;

    let capacitor1Stream3;
    let capacitor1Stream4;
    t.test("can create a read stream after the source has ended", t => {
      capacitor1Stream3 = capacitor1.createReadStream("capacitor1Stream3");
      capacitor1Stream4 = capacitor1.createReadStream("capacitor1Stream4");
      t.strictSame(
        capacitor1._readStreams.size,
        4,
        "should attach new read streams after end"
      );
      t.end();
    });

    await t.test("streams complete data to a read stream", async t => {
      const result2 = await streamToString(capacitor1Stream2);
      t.strictSame(
        capacitor1Stream2.ended,
        true,
        "should mark read stream as ended"
      );
      t.strictSame(result2, data, "should stream complete data");

      const result4 = await streamToString(capacitor1Stream4);
      t.strictSame(
        capacitor1Stream4.ended,
        true,
        "should mark read stream as ended"
      );
      t.strictSame(result4, data, "should stream complete data");

      t.strictSame(
        capacitor1._readStreams.size,
        2,
        "should detach an ended read stream"
      );
    });
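    // Destruction behavior: destroyed read streams detach immediately, while
    // the capacitor itself defers its own cleanup until no read streams remain.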
    await t.test("can destroy a read stream", async t => {
      await new Promise(resolve => {
        capacitor1Stream1.once("error", resolve);
        capacitor1Stream1.destroy(new Error("test"));
      });
      t.strictSame(
        capacitor1Stream1.destroyed,
        true,
        "should mark read stream as destroyed"
      );
      t.type(
        capacitor1Stream1.error,
        Error,
        "should store an error on read stream"
      );
      t.strictSame(
        capacitor1._readStreams.size,
        1,
        "should detach a destroyed read stream"
      );
    });

    t.test("can delay destruction of a capacitor", t => {
      capacitor1.destroy(null);
      t.strictSame(
        capacitor1.destroyed,
        false,
        "should not destroy while read streams exist"
      );
      t.strictSame(
        capacitor1._destroyPending,
        true,
        "should mark for future destruction"
      );
      t.end();
    });
    await t.test("destroys capacitor once no read streams exist", async t => {
      const readStreamDestroyed = new Promise(resolve =>
        capacitor1Stream3.on("close", resolve)
      );
      const capacitorDestroyed = new Promise(resolve =>
        capacitor1.on("close", resolve)
      );
      capacitor1Stream3.destroy(null);

      await readStreamDestroyed;
      t.strictSame(
        capacitor1Stream3.destroyed,
        true,
        "should mark read stream as destroyed"
      );
      t.strictSame(
        capacitor1Stream3.error,
        null,
        "should not store an error on read stream"
      );
      t.strictSame(
        capacitor1._readStreams.size,
        0,
        "should detach a destroyed read stream"
      );

      await capacitorDestroyed;
      t.strictSame(capacitor1.closed, true, "should mark capacitor as closed");
      t.strictSame(capacitor1.fd, null, "should set fd to null");
      t.strictSame(
        capacitor1.destroyed,
        true,
        "should mark capacitor as destroyed"
      );
      t.notOk(_fs.default.existsSync(capacitor1.path), "removes its temp file");
    });

    t.test("cannot create a read stream after destruction", t => {
      try {
        capacitor1.createReadStream();
      } catch (error) {
        t.ok(
          error instanceof _.ReadAfterDestroyedError,
          "should not create a read stream once destroyed"
        );
        t.end();
      }
    });
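    // A second capacitor verifies that destroying with an error propagates the
    // error to read streams that are still attached, but not to detached ones.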
    const capacitor2 = new _.default();
    const capacitor2Stream1 = capacitor2.createReadStream("capacitor2Stream1");
    const capacitor2Stream2 = capacitor2.createReadStream("capacitor2Stream2");

    const capacitor2ReadStream1Destroyed = new Promise(resolve =>
      capacitor2Stream1.on("close", resolve)
    );
    const capacitor2Destroyed = new Promise(resolve =>
      capacitor2.on("close", resolve)
    );

    capacitor2Stream1.destroy();
    await capacitor2ReadStream1Destroyed;

    await t.test("propagates errors to attached read streams", async t => {
      capacitor2.destroy();
      await new Promise(resolve => setImmediate(resolve));
      t.strictSame(
        capacitor2Stream2.destroyed,
        false,
        "should not immediately mark attached read streams as destroyed"
      );

      capacitor2.destroy(new Error("test"));
      await capacitor2Destroyed;

      t.type(capacitor2.error, Error, "should store an error on capacitor");
      t.strictSame(
        capacitor2.destroyed,
        true,
        "should mark capacitor as destroyed"
      );
      t.type(
        capacitor2Stream2.error,
        Error,
        "should store an error on attached read streams"
      );
      t.strictSame(
        capacitor2Stream2.destroyed,
        true,
        "should mark attached read streams as destroyed"
      );
      t.strictSame(
        capacitor2Stream1.error,
        null,
        "should not store an error on detached read streams"
      );
    });
  });
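// Run the suite with a small chunk and with a chunk large enough to exceed the
// default 16 KiB stream highWaterMark, presumably forcing multiple internal writes.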
withChunkSize(10);
withChunkSize(100000);