12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- 'use strict';
-
- var RingBuffer = require('./ring_buffer');
-
/**
 * Wraps a single method of a session object so that the results of
 * asynchronous invocations are delivered to their callbacks in the same
 * order in which `call()` was invoked, regardless of the order in which the
 * session completes them.
 *
 * Once any record carries an error the functor stops: later `call()`s are
 * ignored, and when the failed record reaches the head of the queue every
 * record queued behind it is discarded.
 *
 * @constructor
 * @param {Object} session - object exposing the method named by `method`;
 *   that method is invoked as `session[method](message, handler)` where
 *   `handler` has the signature `(err, msg)`.
 * @param {string} method - name of the session method to invoke.
 */
var Functor = function(session, method) {
  this._session = session;
  this._method = method;
  // Records waiting for delivery. Only push/peek/shift/clear/length are used.
  this._queue = new RingBuffer(Functor.QUEUE_SIZE);
  this._stopped = false;
  // NOTE(review): nothing in this block increments `pending`; it is only
  // decremented on delivery and resynchronised in `_stop()`. Presumably the
  // owner of the functor increments it per call -- confirm against callers.
  this.pending = 0;
};

// Value handed to the RingBuffer constructor (presumably its capacity hint).
Functor.QUEUE_SIZE = 8;

/**
 * Queues a message for processing and arranges for `callback` to be invoked
 * once this record and every record queued before it are done.
 *
 * @param {*} error - if truthy, the session method is not invoked; the
 *   record is marked done immediately and the functor is stopped.
 * @param {*} message - value passed to the session method.
 * @param {Function} callback - invoked as `callback.call(context, error, message)`.
 * @param {*} [context] - `this` value for `callback`.
 * @returns {undefined}
 */
Functor.prototype.call = function(error, message, callback, context) {
  if (this._stopped) return;

  var record = {error: error, message: message, callback: callback, context: context, done: false};
  var called = false;
  var self = this;

  this._queue.push(record);

  if (record.error) {
    record.done = true;
    this._stop();
    return this._flushQueue();
  }

  var handler = function(err, msg) {
    // Only the first invocation counts; this also protects against the
    // session calling back and then throwing (see the catch below).
    if (called) return;
    called = true;

    if (err) {
      self._stop();
      record.error = err;
      record.message = null;
    } else {
      record.message = msg;
    }

    record.done = true;
    self._flushQueue();
  };

  try {
    this._session[this._method](message, handler);
  } catch (err) {
    // A synchronous throw from the session is treated like an error result.
    // NOTE(review): if the session invoked `handler` synchronously and a
    // delivered callback threw, that exception also lands here and is
    // dropped, because `handler` has already been marked as called.
    handler(err);
  }
};

/**
 * Marks the functor as stopped so further `call()`s are ignored, and sets
 * `pending` to the number of records currently queued.
 */
Functor.prototype._stop = function() {
  this.pending = this._queue.length;
  this._stopped = true;
};

/**
 * Delivers records from the head of the queue for as long as the head record
 * is done. A record carrying an error zeroes `pending` and clears the rest of
 * the queue before its callback is invoked; a successful record decrements
 * `pending` by one.
 */
Functor.prototype._flushQueue = function() {
  var queue = this._queue;
  var record;

  while (queue.length > 0 && queue.peek().done) {
    record = queue.shift();
    if (record.error) {
      this.pending = 0;
      queue.clear();
    } else {
      this.pending -= 1;
    }
    record.callback.call(record.context, record.error, record.message);
  }
};
-
- module.exports = Functor;
|