delayed_stream.js 2.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. var Stream = require('stream').Stream;
  2. var util = require('util');
  3. module.exports = DelayedStream;
  4. function DelayedStream() {
  5. this.source = null;
  6. this.dataSize = 0;
  7. this.maxDataSize = 1024 * 1024;
  8. this.pauseStream = true;
  9. this._maxDataSizeExceeded = false;
  10. this._released = false;
  11. this._bufferedEvents = [];
  12. }
  13. util.inherits(DelayedStream, Stream);
  14. DelayedStream.create = function(source, options) {
  15. var delayedStream = new this();
  16. options = options || {};
  17. for (var option in options) {
  18. delayedStream[option] = options[option];
  19. }
  20. delayedStream.source = source;
  21. var realEmit = source.emit;
  22. source.emit = function() {
  23. delayedStream._handleEmit(arguments);
  24. return realEmit.apply(source, arguments);
  25. };
  26. source.on('error', function() {});
  27. if (delayedStream.pauseStream) {
  28. source.pause();
  29. }
  30. return delayedStream;
  31. };
  32. Object.defineProperty(DelayedStream.prototype, 'readable', {
  33. configurable: true,
  34. enumerable: true,
  35. get: function() {
  36. return this.source.readable;
  37. }
  38. });
  39. DelayedStream.prototype.setEncoding = function() {
  40. return this.source.setEncoding.apply(this.source, arguments);
  41. };
  42. DelayedStream.prototype.resume = function() {
  43. if (!this._released) {
  44. this.release();
  45. }
  46. this.source.resume();
  47. };
  48. DelayedStream.prototype.pause = function() {
  49. this.source.pause();
  50. };
  51. DelayedStream.prototype.release = function() {
  52. this._released = true;
  53. this._bufferedEvents.forEach(function(args) {
  54. this.emit.apply(this, args);
  55. }.bind(this));
  56. this._bufferedEvents = [];
  57. };
  58. DelayedStream.prototype.pipe = function() {
  59. var r = Stream.prototype.pipe.apply(this, arguments);
  60. this.resume();
  61. return r;
  62. };
  63. DelayedStream.prototype._handleEmit = function(args) {
  64. if (this._released) {
  65. this.emit.apply(this, args);
  66. return;
  67. }
  68. if (args[0] === 'data') {
  69. this.dataSize += args[1].length;
  70. this._checkIfMaxDataSizeExceeded();
  71. }
  72. this._bufferedEvents.push(args);
  73. };
  74. DelayedStream.prototype._checkIfMaxDataSizeExceeded = function() {
  75. if (this._maxDataSizeExceeded) {
  76. return;
  77. }
  78. if (this.dataSize <= this.maxDataSize) {
  79. return;
  80. }
  81. this._maxDataSizeExceeded = true;
  82. var message =
  83. 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'
  84. this.emit('error', new Error(message));
  85. };