WebSocketSubject.js 9.1KB


  1. /** PURE_IMPORTS_START .._.._Subject,.._.._Subscriber,.._.._Observable,.._.._Subscription,.._.._util_root,.._.._ReplaySubject,.._.._util_tryCatch,.._.._util_errorObject,.._.._util_assign PURE_IMPORTS_END */
  /**
   * Classic prototype-inheritance helper: makes constructor `d` extend `b`.
   *
   * Reuses an `__extends` already present on the surrounding `this` when there
   * is one; otherwise falls back to the local implementation below.
   *
   * @param {Function} d the derived constructor
   * @param {Function|null} b the base constructor, or `null` for a prototype-less base
   */
  var __extends = (this && this.__extends) || function (d, b) {
      // Copy the base constructor's own enumerable members onto the derived one.
      for (var key in b) {
          if (b.hasOwnProperty(key)) {
              d[key] = b[key];
          }
      }
      // Intermediate constructor: lets us link prototypes without running `b` itself.
      function Surrogate() { this.constructor = d; }
      if (b === null) {
          d.prototype = Object.create(b);
          return;
      }
      Surrogate.prototype = b.prototype;
      d.prototype = new Surrogate();
  };
  9. import { Subject, AnonymousSubject } from '../../Subject';
  10. import { Subscriber } from '../../Subscriber';
  11. import { Observable } from '../../Observable';
  12. import { Subscription } from '../../Subscription';
  13. import { root } from '../../util/root';
  14. import { ReplaySubject } from '../../ReplaySubject';
  15. import { tryCatch } from '../../util/tryCatch';
  16. import { errorObject } from '../../util/errorObject';
  17. import { assign } from '../../util/assign';
  18. /**
  19. * We need this JSDoc comment for affecting ESDoc.
  20. * @extends {Ignored}
  21. * @hide true
  22. */
  23. export var WebSocketSubject = /*@__PURE__*/ (/*@__PURE__*/ function (_super) {
  24. __extends(WebSocketSubject, _super);
  /**
   * Builds either a "lifted" subject (when given an Observable source) or a
   * root subject that owns a socket connection.
   *
   * @param {string|Object|Observable} urlConfigOrSource a URL string, a config
   * object whose members are copied onto the instance, or a source Observable
   * @param {Object} [destination] destination handed to the parent constructor
   * when `urlConfigOrSource` is an Observable
   * @throws {Error} when no WebSocket constructor is available
   */
  function WebSocketSubject(urlConfigOrSource, destination) {
      if (urlConfigOrSource instanceof Observable) {
          // Lifted form: just chain onto the given source/destination pair.
          _super.call(this, destination, urlConfigOrSource);
          return;
      }
      _super.call(this);
      // Defaults are set first so that a config object can replace them below.
      this.WebSocketCtor = root.WebSocket;
      this._output = new Subject();
      if (typeof urlConfigOrSource !== 'string') {
          // WARNING: config object could override important members here.
          assign(this, urlConfigOrSource);
      }
      else {
          this.url = urlConfigOrSource;
      }
      if (!this.WebSocketCtor) {
          throw new Error('no WebSocket constructor can be found');
      }
      // Messages sent through the subject are directed here until the socket opens.
      this.destination = new ReplaySubject();
  }
  /**
   * Default mapping from a socket message event to the emitted value:
   * parses the event's `data` as JSON.
   *
   * @param {{data: string}} e the message event
   * @return {*} the parsed JSON value
   */
  WebSocketSubject.prototype.resultSelector = function (e) {
      var rawPayload = e.data;
      return JSON.parse(rawPayload);
  };
  /**
   * Creates a subject wrapping a w3c-compatible WebSocket, either the one
   * provided by the environment or a constructor supplied through the config.
   *
   * @example <caption>Wrapping the browser WebSocket</caption>
   *
   * let socket$ = Observable.webSocket('ws://localhost:8081');
   *
   * socket$.subscribe(
   *   (msg) => console.log('message received: ' + msg),
   *   (err) => console.log(err),
   *   () => console.log('complete')
   * );
   *
   * socket$.next(JSON.stringify({ op: 'hello' }));
   *
   * @example <caption>Wrapping the w3cwebsocket from the 'websocket' package (using node.js)</caption>
   *
   * import { w3cwebsocket } from 'websocket';
   *
   * let socket$ = Observable.webSocket({
   *   url: 'ws://localhost:8081',
   *   WebSocketCtor: w3cwebsocket
   * });
   *
   * socket$.subscribe(
   *   (msg) => console.log('message received: ' + msg),
   *   (err) => console.log(err),
   *   () => console.log('complete')
   * );
   *
   * socket$.next(JSON.stringify({ op: 'hello' }));
   *
   * @param {string | WebSocketSubjectConfig} urlConfigOrSource the websocket url, or a structure describing the websocket to create
   * @return {WebSocketSubject}
   * @static true
   * @name webSocket
   * @owner Observable
   */
  WebSocketSubject.create = function (urlConfigOrSource) {
      var subject = new WebSocketSubject(urlConfigOrSource);
      return subject;
  };
  /**
   * Produces a new WebSocketSubject that uses this subject as its source,
   * shares this subject's destination, and carries the given operator.
   *
   * @param {Object} operator the operator to attach to the new subject
   * @return {WebSocketSubject} the derived subject
   */
  WebSocketSubject.prototype.lift = function (operator) {
      var lifted = new WebSocketSubject(this, this.destination);
      lifted.operator = operator;
      return lifted;
  };
  /**
   * Returns the subject to its disconnected state: forgets the socket and
   * installs a fresh output subject. A subject without a `source` also gets a
   * fresh ReplaySubject as its destination.
   */
  WebSocketSubject.prototype._resetState = function () {
      var ownsDestination = !this.source;
      this.socket = null;
      if (ownsDestination) {
          this.destination = new ReplaySubject();
      }
      this._output = new Subject();
  };
  // TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures
  /**
   * Builds an Observable that shares this socket for one logical stream.
   *
   * On subscription the value produced by `subMsg` is sent through the subject;
   * incoming values are forwarded only when `messageFilter` returns a truthy
   * value; on teardown the value produced by `unsubMsg` is sent and the inner
   * subscription is released. An exception thrown by any of the three
   * callbacks is delivered to the observer as an error.
   *
   * @param {function(): *} subMsg produces the message sent when subscribing
   * @param {function(): *} unsubMsg produces the message sent when unsubscribing
   * @param {function(value: *): boolean} messageFilter selects the values to forward
   * @return {Observable} the multiplexed stream
   */
  WebSocketSubject.prototype.multiplex = function (subMsg, unsubMsg, messageFilter) {
      var subject = this;
      return new Observable(function (observer) {
          // Runs a message factory; sends its product, or reports what it threw.
          function sendControlMessage(factory) {
              var message = tryCatch(factory)();
              if (message === errorObject) {
                  observer.error(errorObject.e);
                  return;
              }
              subject.next(message);
          }
          function onValue(value) {
              var verdict = tryCatch(messageFilter)(value);
              if (verdict === errorObject) {
                  observer.error(errorObject.e);
                  return;
              }
              if (verdict) {
                  observer.next(value);
              }
          }
          function onError(err) { return observer.error(err); }
          function onComplete() { return observer.complete(); }
          sendControlMessage(subMsg);
          var inner = subject.subscribe(onValue, onError, onComplete);
          return function () {
              sendControlMessage(unsubMsg);
              inner.unsubscribe();
          };
      });
  };
  /**
   * Creates the socket with `WebSocketCtor` (passing `protocol` when set),
   * records it as `this.socket`, and wires its `onopen`, `onerror`, `onclose`
   * and `onmessage` handlers to this subject.
   *
   * A failure while constructing or configuring the socket is reported as an
   * error on the current `_output` and the method returns without attaching
   * any handler.
   *
   * State is reset from `onerror`/`onclose` only while the firing socket is
   * still the one stored in `this.socket`. Previously the reset was
   * unconditional, so a late event from an old socket (for instance when a
   * consumer re-subscribed in response to the error before the close event
   * arrived) wiped the state belonging to the newer connection.
   */
  WebSocketSubject.prototype._connectSocket = function () {
      var _this = this;
      var WebSocketCtor = this.WebSocketCtor;
      // Captured up front: every event of this socket is reported to the
      // output that was current when the connection was started.
      var observer = this._output;
      var socket = null;
      try {
          socket = this.protocol ?
              new WebSocketCtor(this.url, this.protocol) :
              new WebSocketCtor(this.url);
          this.socket = socket;
          if (this.binaryType) {
              this.socket.binaryType = this.binaryType;
          }
      }
      catch (e) {
          observer.error(e);
          return;
      }
      // Resets the subject only if this socket has not been superseded.
      var resetIfCurrent = function () {
          if (socket === _this.socket) {
              _this._resetState();
          }
      };
      // NOTE(review): this subscription is neither returned nor stored by this
      // method; it only collects the queue subscription created in `onopen`.
      var subscription = new Subscription(function () {
          _this.socket = null;
          if (socket && socket.readyState === 1) {
              socket.close();
          }
      });
      socket.onopen = function (e) {
          var openObserver = _this.openObserver;
          if (openObserver) {
              openObserver.next(e);
          }
          // Whatever was the destination so far; replayed below if it is a ReplaySubject.
          var queue = _this.destination;
          _this.destination = Subscriber.create(function (x) { return socket.readyState === 1 && socket.send(x); }, function (e) {
              var closingObserver = _this.closingObserver;
              if (closingObserver) {
                  closingObserver.next(undefined);
              }
              if (e && e.code) {
                  socket.close(e.code, e.reason);
              }
              else {
                  observer.error(new TypeError('WebSocketSubject.error must be called with an object with an error code, ' +
                      'and an optional reason: { code: number, reason: string }'));
              }
              _this._resetState();
          }, function () {
              var closingObserver = _this.closingObserver;
              if (closingObserver) {
                  closingObserver.next(undefined);
              }
              socket.close();
              _this._resetState();
          });
          if (queue && queue instanceof ReplaySubject) {
              subscription.add(queue.subscribe(_this.destination));
          }
      };
      socket.onerror = function (e) {
          resetIfCurrent();
          observer.error(e);
      };
      socket.onclose = function (e) {
          resetIfCurrent();
          var closeObserver = _this.closeObserver;
          if (closeObserver) {
              closeObserver.next(e);
          }
          // A clean close completes the stream; anything else is surfaced as an error.
          if (e.wasClean) {
              observer.complete();
          }
          else {
              observer.error(e);
          }
      };
      socket.onmessage = function (e) {
          var result = tryCatch(_this.resultSelector)(e);
          if (result === errorObject) {
              observer.error(errorObject.e);
          }
          else {
              observer.next(result);
          }
      };
  };
  /**
   * Subscribes `subscriber` to this subject's output, connecting the socket
   * first when none is currently held. A lifted subject simply delegates to
   * its source. The returned subscription's teardown closes an open socket and
   * resets the subject once the current output has no observers left.
   *
   * @param {Subscriber} subscriber the subscriber to attach
   * @return {Subscription} the subscription for this subscriber
   * @deprecated internal use only
   */
  WebSocketSubject.prototype._subscribe = function (subscriber) {
      var subject = this;
      var upstream = this.source;
      if (upstream) {
          return upstream.subscribe(subscriber);
      }
      if (!this.socket) {
          this._connectSocket();
      }
      var outputSubscription = this._output.subscribe(subscriber);
      var releaseSocketWhenIdle = function () {
          var activeSocket = subject.socket;
          if (subject._output.observers.length !== 0) {
              // Other subscribers still need the connection.
              return;
          }
          if (activeSocket && activeSocket.readyState === 1) {
              activeSocket.close();
          }
          subject._resetState();
      };
      var handle = new Subscription();
      handle.add(outputSubscription);
      handle.add(releaseSocketWhenIdle);
      return handle;
  };
  /**
   * Closes an open socket (resetting the subject's state), runs the parent
   * class's `unsubscribe`, and finally gives a subject that had no `source` a
   * fresh ReplaySubject destination.
   */
  WebSocketSubject.prototype.unsubscribe = function () {
      // Read both before anything below can change them.
      var sourceAtCall = this.source;
      var socketAtCall = this.socket;
      var isOpen = socketAtCall && socketAtCall.readyState === 1;
      if (isOpen) {
          socketAtCall.close();
          this._resetState();
      }
      _super.prototype.unsubscribe.call(this);
      if (!sourceAtCall) {
          this.destination = new ReplaySubject();
      }
  };
  249. return WebSocketSubject;
  250. }(AnonymousSubject));
  251. //# sourceMappingURL=WebSocketSubject.js.map