123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- 'use strict';
- import stream from 'stream';
- import utils from '../utils.js';
- const kInternals = Symbol('internals');
/**
 * Pass-through Transform stream that can throttle throughput and report
 * progress.
 *
 * Data is forwarded unchanged. Incoming chunks are re-sliced so that no
 * pushed piece exceeds the readable high-water mark and, when `maxRate` is
 * set, so that no more than the per-window byte budget is pushed within one
 * `timeWindow`. Once a 'progress' listener has been attached, a 'progress'
 * event carrying the running total of bytes seen is emitted before each push.
 */
class AxiosTransformStream extends stream.Transform {
  /**
   * @param {Object} [options]
   * @param {number} [options.maxRate=0] Throughput limit in bytes per second; a falsy value disables throttling.
   * @param {number} [options.chunkSize=65536] Used as the readable high-water mark, which caps the size of a pushed piece.
   * @param {number|false} [options.minChunkSize=100] A remainder no larger than this is not split off; `false` disables the check.
   * @param {number} [options.timeWindow=500] Length of one throttling window in milliseconds.
   */
  constructor(options) {
    // NOTE(review): presumably copies the caller's options over the defaults,
    // skipping properties whose value is undefined -- confirm against
    // utils.toFlatObject.
    const settings = utils.toFlatObject(options, {
      maxRate: 0,
      chunkSize: 64 * 1024,
      minChunkSize: 100,
      timeWindow: 500,
      ticksRate: 2,
      samplesCount: 15
    }, null, (prop, source) => {
      return !utils.isUndefined(source[prop]);
    });

    super({
      readableHighWaterMark: settings.chunkSize
    });

    const internals = this[kInternals] = {
      timeWindow: settings.timeWindow,
      chunkSize: settings.chunkSize,
      maxRate: settings.maxRate,
      minChunkSize: settings.minChunkSize,
      bytesSeen: 0,
      isCaptured: false,
      notifiedBytesLoaded: 0,
      ts: Date.now(),
      bytes: 0,
      onReadCallback: null
    };

    // Only pay for 'progress' emission once somebody actually listens for it.
    this.on('newListener', (event) => {
      if (event === 'progress') {
        internals.isCaptured = true;
      }
    });
  }

  /**
   * Resumes a push that was paused by back-pressure (if any) and then defers
   * to the default Transform read logic.
   *
   * @param {number} size
   */
  _read(size) {
    const internals = this[kInternals];

    if (internals.onReadCallback) {
      internals.onReadCallback();
    }

    return super._read(size);
  }

  /**
   * Forwards `chunk` downstream piece by piece, honouring the high-water
   * mark, back-pressure and the optional rate limit.
   *
   * @param {Buffer|string} chunk
   * @param {string} encoding
   * @param {Function} callback Invoked once the whole chunk has been pushed.
   */
  _transform(chunk, encoding, callback) {
    const internals = this[kInternals];
    const { maxRate, timeWindow } = internals;
    const readableHighWaterMark = this.readableHighWaterMark;

    // Number of time windows per second.
    const divider = 1000 / timeWindow;

    // Byte budget of a single window. It is kept to a whole number of bytes
    // (and at least one byte while throttling is on): a fractional budget
    // would leave "less than one byte" of credit, which is neither exhausted
    // (<= 0) nor usable, and made the stream push empty slices in a tight
    // process.nextTick loop until the window expired.
    const bytesThreshold = maxRate ? Math.max(1, Math.floor(maxRate / divider)) : 0;

    const minChunkSize = internals.minChunkSize !== false
      ? Math.max(internals.minChunkSize, bytesThreshold * 0.01)
      : 0;

    // Accounts for a piece, emits progress and pushes it; `_callback` runs on
    // a later tick, after the consumer asked for more data if push() signalled
    // back-pressure.
    const pushChunk = (_chunk, _callback) => {
      const bytes = Buffer.byteLength(_chunk);

      internals.bytesSeen += bytes;
      internals.bytes += bytes;

      internals.isCaptured && this.emit('progress', internals.bytesSeen);

      if (this.push(_chunk)) {
        process.nextTick(_callback);
      } else {
        // Parked until _read() fires.
        internals.onReadCallback = () => {
          internals.onReadCallback = null;
          process.nextTick(_callback);
        };
      }
    };

    // Pushes as much of `_chunk` as is currently allowed. `_callback(err, rest)`
    // receives the part that still has to be sent, or nothing when done.
    const transformChunk = (_chunk, _callback) => {
      const chunkSize = Buffer.byteLength(_chunk);
      let chunkRemainder = null;
      let maxChunkSize = readableHighWaterMark;

      if (maxRate) {
        const now = Date.now();
        let passed = 0;

        if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {
          // A new window starts; bytes sent beyond the previous budget are
          // carried over as already spent.
          const overdraft = internals.bytes - bytesThreshold;

          internals.ts = now;
          internals.bytes = overdraft > 0 ? overdraft : 0;
          passed = 0;
        }

        const bytesLeft = bytesThreshold - internals.bytes;

        if (bytesLeft <= 0) {
          // Budget exhausted: retry the same data when the next window opens.
          setTimeout(() => {
            _callback(null, _chunk);
          }, timeWindow - passed);
          return;
        }

        if (bytesLeft < maxChunkSize) {
          maxChunkSize = bytesLeft;
        }
      }

      // Split only when the part left behind is larger than minChunkSize,
      // otherwise send the chunk whole.
      if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {
        chunkRemainder = _chunk.subarray(maxChunkSize);
        _chunk = _chunk.subarray(0, maxChunkSize);
      }

      pushChunk(_chunk, chunkRemainder ? () => {
        process.nextTick(_callback, null, chunkRemainder);
      } : _callback);
    };

    transformChunk(chunk, function transformNextChunk(err, _chunk) {
      if (err) {
        return callback(err);
      }

      if (_chunk) {
        transformChunk(_chunk, transformNextChunk);
      } else {
        callback(null);
      }
    });
  }
}
- export default AxiosTransformStream;
|