123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- import { SubjectSubscriber } from '../Subject';
- import { Observable } from '../Observable';
- import { Subscriber } from '../Subscriber';
- import { Subscription } from '../Subscription';
- import { refCount as higherOrderRefCount } from '../operators/refCount';
- /**
- * @class ConnectableObservable<T>
- */
- export class ConnectableObservable extends Observable {
- constructor(/** @deprecated internal use only */ source,
- /** @deprecated internal use only */ subjectFactory) {
- super();
- this.source = source;
- this.subjectFactory = subjectFactory;
- /** @deprecated internal use only */ this._refCount = 0;
- this._isComplete = false;
- }
- /** @deprecated internal use only */ _subscribe(subscriber) {
- return this.getSubject().subscribe(subscriber);
- }
- /** @deprecated internal use only */ getSubject() {
- const subject = this._subject;
- if (!subject || subject.isStopped) {
- this._subject = this.subjectFactory();
- }
- return this._subject;
- }
- connect() {
- let connection = this._connection;
- if (!connection) {
- this._isComplete = false;
- connection = this._connection = new Subscription();
- connection.add(this.source
- .subscribe(new ConnectableSubscriber(this.getSubject(), this)));
- if (connection.closed) {
- this._connection = null;
- connection = Subscription.EMPTY;
- }
- else {
- this._connection = connection;
- }
- }
- return connection;
- }
- refCount() {
- return higherOrderRefCount()(this);
- }
- }
- const connectableProto = ConnectableObservable.prototype;
- export const connectableObservableDescriptor = {
- operator: { value: null },
- _refCount: { value: 0, writable: true },
- _subject: { value: null, writable: true },
- _connection: { value: null, writable: true },
- _subscribe: { value: connectableProto._subscribe },
- _isComplete: { value: connectableProto._isComplete, writable: true },
- getSubject: { value: connectableProto.getSubject },
- connect: { value: connectableProto.connect },
- refCount: { value: connectableProto.refCount }
- };
- class ConnectableSubscriber extends SubjectSubscriber {
- constructor(destination, connectable) {
- super(destination);
- this.connectable = connectable;
- }
- _error(err) {
- this._unsubscribe();
- super._error(err);
- }
- _complete() {
- this.connectable._isComplete = true;
- this._unsubscribe();
- super._complete();
- }
- /** @deprecated internal use only */ _unsubscribe() {
- const connectable = this.connectable;
- if (connectable) {
- this.connectable = null;
- const connection = connectable._connection;
- connectable._refCount = 0;
- connectable._subject = null;
- connectable._connection = null;
- if (connection) {
- connection.unsubscribe();
- }
- }
- }
- }
- class RefCountOperator {
- constructor(connectable) {
- this.connectable = connectable;
- }
- call(subscriber, source) {
- const { connectable } = this;
- connectable._refCount++;
- const refCounter = new RefCountSubscriber(subscriber, connectable);
- const subscription = source.subscribe(refCounter);
- if (!refCounter.closed) {
- refCounter.connection = connectable.connect();
- }
- return subscription;
- }
- }
- class RefCountSubscriber extends Subscriber {
- constructor(destination, connectable) {
- super(destination);
- this.connectable = connectable;
- }
- /** @deprecated internal use only */ _unsubscribe() {
- const { connectable } = this;
- if (!connectable) {
- this.connection = null;
- return;
- }
- this.connectable = null;
- const refCount = connectable._refCount;
- if (refCount <= 0) {
- this.connection = null;
- return;
- }
- connectable._refCount = refCount - 1;
- if (refCount > 1) {
- this.connection = null;
- return;
- }
- ///
- // Compare the local RefCountSubscriber's connection Subscription to the
- // connection Subscription on the shared ConnectableObservable. In cases
- // where the ConnectableObservable source synchronously emits values, and
- // the RefCountSubscriber's downstream Observers synchronously unsubscribe,
- // execution continues to here before the RefCountOperator has a chance to
- // supply the RefCountSubscriber with the shared connection Subscription.
- // For example:
- // ```
- // Observable.range(0, 10)
- // .publish()
- // .refCount()
- // .take(5)
- // .subscribe();
- // ```
- // In order to account for this case, RefCountSubscriber should only dispose
- // the ConnectableObservable's shared connection Subscription if the
- // connection Subscription exists, *and* either:
- // a. RefCountSubscriber doesn't have a reference to the shared connection
- // Subscription yet, or,
- // b. RefCountSubscriber's connection Subscription reference is identical
- // to the shared connection Subscription
- ///
- const { connection } = this;
- const sharedConnection = connectable._connection;
- this.connection = null;
- if (sharedConnection && (!connection || sharedConnection === connection)) {
- sharedConnection.unsubscribe();
- }
- }
- }
- //# sourceMappingURL=ConnectableObservable.js.map
|