// AxiosTransformStream.js (3.6 KB)
  1. 'use strict';
  2. import stream from 'stream';
  3. import utils from '../utils.js';
  4. const kInternals = Symbol('internals');
  5. class AxiosTransformStream extends stream.Transform{
  6. constructor(options) {
  7. options = utils.toFlatObject(options, {
  8. maxRate: 0,
  9. chunkSize: 64 * 1024,
  10. minChunkSize: 100,
  11. timeWindow: 500,
  12. ticksRate: 2,
  13. samplesCount: 15
  14. }, null, (prop, source) => {
  15. return !utils.isUndefined(source[prop]);
  16. });
  17. super({
  18. readableHighWaterMark: options.chunkSize
  19. });
  20. const internals = this[kInternals] = {
  21. timeWindow: options.timeWindow,
  22. chunkSize: options.chunkSize,
  23. maxRate: options.maxRate,
  24. minChunkSize: options.minChunkSize,
  25. bytesSeen: 0,
  26. isCaptured: false,
  27. notifiedBytesLoaded: 0,
  28. ts: Date.now(),
  29. bytes: 0,
  30. onReadCallback: null
  31. };
  32. this.on('newListener', event => {
  33. if (event === 'progress') {
  34. if (!internals.isCaptured) {
  35. internals.isCaptured = true;
  36. }
  37. }
  38. });
  39. }
  40. _read(size) {
  41. const internals = this[kInternals];
  42. if (internals.onReadCallback) {
  43. internals.onReadCallback();
  44. }
  45. return super._read(size);
  46. }
  47. _transform(chunk, encoding, callback) {
  48. const internals = this[kInternals];
  49. const maxRate = internals.maxRate;
  50. const readableHighWaterMark = this.readableHighWaterMark;
  51. const timeWindow = internals.timeWindow;
  52. const divider = 1000 / timeWindow;
  53. const bytesThreshold = (maxRate / divider);
  54. const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;
  55. const pushChunk = (_chunk, _callback) => {
  56. const bytes = Buffer.byteLength(_chunk);
  57. internals.bytesSeen += bytes;
  58. internals.bytes += bytes;
  59. internals.isCaptured && this.emit('progress', internals.bytesSeen);
  60. if (this.push(_chunk)) {
  61. process.nextTick(_callback);
  62. } else {
  63. internals.onReadCallback = () => {
  64. internals.onReadCallback = null;
  65. process.nextTick(_callback);
  66. };
  67. }
  68. }
  69. const transformChunk = (_chunk, _callback) => {
  70. const chunkSize = Buffer.byteLength(_chunk);
  71. let chunkRemainder = null;
  72. let maxChunkSize = readableHighWaterMark;
  73. let bytesLeft;
  74. let passed = 0;
  75. if (maxRate) {
  76. const now = Date.now();
  77. if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
  78. internals.ts = now;
  79. bytesLeft = bytesThreshold - internals.bytes;
  80. internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;
  81. passed = 0;
  82. }
  83. bytesLeft = bytesThreshold - internals.bytes;
  84. }
  85. if (maxRate) {
  86. if (bytesLeft <= 0) {
  87. // next time window
  88. return setTimeout(() => {
  89. _callback(null, _chunk);
  90. }, timeWindow - passed);
  91. }
  92. if (bytesLeft < maxChunkSize) {
  93. maxChunkSize = bytesLeft;
  94. }
  95. }
  96. if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
  97. chunkRemainder = _chunk.subarray(maxChunkSize);
  98. _chunk = _chunk.subarray(0, maxChunkSize);
  99. }
  100. pushChunk(_chunk, chunkRemainder ? () => {
  101. process.nextTick(_callback, null, chunkRemainder);
  102. } : _callback);
  103. };
  104. transformChunk(chunk, function transformNextChunk(err, _chunk) {
  105. if (err) {
  106. return callback(err);
  107. }
  108. if (_chunk) {
  109. transformChunk(_chunk, transformNextChunk);
  110. } else {
  111. callback(null);
  112. }
  113. });
  114. }
  115. }
  116. export default AxiosTransformStream;