trackStream.js 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. export const streamChunk = function* (chunk, chunkSize) {
  2. let len = chunk.byteLength;
  3. if (!chunkSize || len < chunkSize) {
  4. yield chunk;
  5. return;
  6. }
  7. let pos = 0;
  8. let end;
  9. while (pos < len) {
  10. end = pos + chunkSize;
  11. yield chunk.slice(pos, end);
  12. pos = end;
  13. }
  14. }
  15. export const readBytes = async function* (iterable, chunkSize) {
  16. for await (const chunk of readStream(iterable)) {
  17. yield* streamChunk(chunk, chunkSize);
  18. }
  19. }
  20. const readStream = async function* (stream) {
  21. if (stream[Symbol.asyncIterator]) {
  22. yield* stream;
  23. return;
  24. }
  25. const reader = stream.getReader();
  26. try {
  27. for (;;) {
  28. const {done, value} = await reader.read();
  29. if (done) {
  30. break;
  31. }
  32. yield value;
  33. }
  34. } finally {
  35. await reader.cancel();
  36. }
  37. }
  38. export const trackStream = (stream, chunkSize, onProgress, onFinish) => {
  39. const iterator = readBytes(stream, chunkSize);
  40. let bytes = 0;
  41. let done;
  42. let _onFinish = (e) => {
  43. if (!done) {
  44. done = true;
  45. onFinish && onFinish(e);
  46. }
  47. }
  48. return new ReadableStream({
  49. async pull(controller) {
  50. try {
  51. const {done, value} = await iterator.next();
  52. if (done) {
  53. _onFinish();
  54. controller.close();
  55. return;
  56. }
  57. let len = value.byteLength;
  58. if (onProgress) {
  59. let loadedBytes = bytes += len;
  60. onProgress(loadedBytes);
  61. }
  62. controller.enqueue(new Uint8Array(value));
  63. } catch (err) {
  64. _onFinish(err);
  65. throw err;
  66. }
  67. },
  68. cancel(reason) {
  69. _onFinish(reason);
  70. return iterator.return();
  71. }
  72. }, {
  73. highWaterMark: 2
  74. })
  75. }