streams.js 4.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. 'use strict';
  2. /**
  3. Streams in a WebSocket connection
  4. ---------------------------------
  5. We model a WebSocket as two duplex streams: one stream is for the wire protocol
  6. over an I/O socket, and the other is for incoming/outgoing messages.
                          +----------+      +---------+      +----------+
      [1] write(chunk) -->| ~~~~~~~~ +----->| parse() +----->| ~~~~~~~~ +--> emit('data') [2]
                          |          |      +----+----+      |          |
                          |          |           |           |          |
                          |    IO    |           | [5]       | Messages |
                          |          |           V           |          |
                          |          |      +---------+      |          |
      [4] emit('data') <--+ ~~~~~~~~ |<-----+ frame() |<-----+ ~~~~~~~~ |<-- write(chunk) [3]
                          +----------+      +---------+      +----------+
  16. Message transfer in each direction is simple: IO receives a byte stream [1] and
  17. sends this stream for parsing. The parser will periodically emit a complete
  18. message text on the Messages stream [2]. Similarly, when messages are written
  19. to the Messages stream [3], they are framed using the WebSocket wire format and
  20. emitted via IO [4].
  21. There is a feedback loop via [5] since some input from [1] will be things like
  22. ping, pong and close frames. In these cases the protocol responds by emitting
  23. responses directly back to [4] rather than emitting messages via [2].
  24. For the purposes of flow control, we consider the sources of each Readable
  25. stream to be as follows:
  26. * [2] receives input from [1]
  27. * [4] receives input from [1] and [3]
  28. The classes below express the relationships described above without prescribing
  29. anything about how parse() and frame() work, other than assuming they emit
  30. 'data' events to the IO and Messages streams. They will work with any protocol
  31. driver having these two methods.
  32. **/
  33. var Stream = require('stream').Stream,
  34. util = require('util');
  /**
   * Wire-side stream of a WebSocket connection: bytes written here are handed
   * to the driver's parser.
   *
   * @param {Object} driver - protocol driver; this class uses its `parse()`
   *                          method and its `messages` stream.
   */
  var IO = function(driver) {
    this.writable = true;
    this.readable = true;
    this._paused  = false;
    this._driver  = driver;
  };
  util.inherits(IO, Stream);

  // Marks a stream as no longer paused and announces it with 'drain' so that
  // whatever is writing to it may continue.
  function clearPauseAndDrain(stream) {
    stream._paused = false;
    stream.emit('drain');
  }

  // pause() and resume() are invoked when the socket this stream is piped into
  // backs up and later drains. IO output [4] is fed by both IO input [1] and
  // Messages input [3], so both streams' write() calls must report false while
  // IO is paused; hence the flag is set on the Messages stream as well.
  IO.prototype.pause = function() {
    this._paused = true;

    var messages = this._driver.messages;
    messages._paused = true;
  };

  // Unpauses this stream first and the Messages stream second, emitting
  // 'drain' on each one in that order.
  IO.prototype.resume = function() {
    clearPauseAndDrain(this);
    clearPauseAndDrain(this._driver.messages);
  };

  // Input arriving from the socket goes straight to the parser. The return
  // value tells the source whether it may keep writing (false = back off).
  // Once the stream is no longer writable the chunk is dropped.
  IO.prototype.write = function(chunk) {
    if (this.writable) {
      this._driver.parse(chunk);
      return !this._paused;
    }
    return false;
  };

  // end() is called when the socket piping into this stream emits 'close' or
  // 'end', i.e. the socket has closed. An optional final chunk is parsed, and
  // since the Messages stream cannot produce any more data after that, it is
  // shut down and emits 'end' (once only).
  IO.prototype.end = function(chunk) {
    if (this.writable) {
      if (typeof chunk !== 'undefined') this.write(chunk);
      this.writable = false;

      var messages = this._driver.messages;
      if (!messages.readable) return;

      messages.writable = false;
      messages.readable = false;
      messages.emit('end');
    }
  };

  // Destroying the stream is the same as ending it with no final chunk.
  IO.prototype.destroy = function() {
    this.end();
  };
  /**
   * Application-side stream of a WebSocket connection: messages written here
   * are passed to the driver to be sent as text or binary.
   *
   * @param {Object} driver - protocol driver; this class uses its `text()` and
   *                          `binary()` methods and its `io` stream.
   */
  var Messages = function(driver) {
    this.writable = true;
    this.readable = true;
    this._paused  = false;
    this._driver  = driver;
  };
  util.inherits(Messages, Stream);

  // pause() and resume() are invoked when the application consuming messages
  // backs up and later drains. Message output [2] is fed by IO input [1], so
  // the way to slow the flow is to mark the IO stream as paused, which makes
  // its write() return false to the source.
  Messages.prototype.pause = function() {
    var io = this._driver.io;
    io._paused = true;
  };

  // Lifts the pause on the IO stream and emits 'drain' on it so its source
  // can start writing again.
  Messages.prototype.resume = function() {
    var io = this._driver.io;
    io._paused = false;
    io.emit('drain');
  };

  // Messages from the user go to the driver: strings via text(), anything
  // else via binary(). The return value tells the source whether it may keep
  // writing (false = back off). Once the stream is no longer writable the
  // message is dropped.
  Messages.prototype.write = function(message) {
    if (this.writable) {
      var isText = typeof message === 'string';
      if (isText) {
        this._driver.text(message);
      } else {
        this._driver.binary(message);
      }
      return !this._paused;
    }
    return false;
  };

  // end() is called when a stream piped into this one emits 'end'. Several
  // streams may be piped into the same WebSocket, and one of them finishing
  // does not mean the socket is done: just send the final message, if any,
  // and leave everything open.
  Messages.prototype.end = function(message) {
    if (message === undefined) return;
    this.write(message);
  };

  // Intentionally a no-op.
  Messages.prototype.destroy = function() {};
  // Public API of this module: the wire-side stream class and the
  // application-side stream class.
  exports.IO = IO;
  exports.Messages = Messages;