| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 | 'use strict';import stream from 'stream';import utils from '../utils.js';const kInternals = Symbol('internals');class AxiosTransformStream extends stream.Transform{  constructor(options) {    options = utils.toFlatObject(options, {      maxRate: 0,      chunkSize: 64 * 1024,      minChunkSize: 100,      timeWindow: 500,      ticksRate: 2,      samplesCount: 15    }, null, (prop, source) => {      return !utils.isUndefined(source[prop]);    });    super({      readableHighWaterMark: options.chunkSize    });    const internals = this[kInternals] = {      timeWindow: options.timeWindow,      chunkSize: options.chunkSize,      maxRate: options.maxRate,      minChunkSize: options.minChunkSize,      bytesSeen: 0,      isCaptured: false,      notifiedBytesLoaded: 0,      ts: Date.now(),      bytes: 0,      onReadCallback: null    };    this.on('newListener', event => {      if (event === 'progress') {        if (!internals.isCaptured) {          internals.isCaptured = true;        }      }    });  }  _read(size) {    const internals = this[kInternals];    if (internals.onReadCallback) {      internals.onReadCallback();    }    return super._read(size);  }  _transform(chunk, encoding, callback) {    const internals = this[kInternals];    const maxRate = internals.maxRate;    const readableHighWaterMark = this.readableHighWaterMark;    const timeWindow = internals.timeWindow;    const divider = 1000 / timeWindow;    const bytesThreshold = (maxRate / divider);    const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0;    const pushChunk = (_chunk, _callback) => {      const bytes = Buffer.byteLength(_chunk);      internals.bytesSeen += bytes;      internals.bytes += bytes;      internals.isCaptured && this.emit('progress', internals.bytesSeen);      if (this.push(_chunk)) {        process.nextTick(_callback);      } else {        internals.onReadCallback = () => {          internals.onReadCallback = null;          process.nextTick(_callback);        };      }    }    const transformChunk = (_chunk, _callback) => {      const chunkSize = Buffer.byteLength(_chunk);      let chunkRemainder = null;      let maxChunkSize = readableHighWaterMark;      let bytesLeft;      let passed = 0;      if (maxRate) {        const now = Date.now();        if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) {          internals.ts = now;          bytesLeft = bytesThreshold - internals.bytes;          internals.bytes = bytesLeft < 0 ? -bytesLeft : 0;          passed = 0;        }        bytesLeft = bytesThreshold - internals.bytes;      }      if (maxRate) {        if (bytesLeft <= 0) {          // next time window          return setTimeout(() => {            _callback(null, _chunk);          }, timeWindow - passed);        }        if (bytesLeft < maxChunkSize) {          maxChunkSize = bytesLeft;        }      }      if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) {        chunkRemainder = _chunk.subarray(maxChunkSize);        _chunk = _chunk.subarray(0, maxChunkSize);      }      pushChunk(_chunk, chunkRemainder ? () => {        process.nextTick(_callback, null, chunkRemainder);      } : _callback);    };    transformChunk(chunk, function transformNextChunk(err, _chunk) {      if (err) {        return callback(err);      }      if (_chunk) {        transformChunk(_chunk, transformNextChunk);      } else {        callback(null);      }    });  }}export default AxiosTransformStream;
 |