summaryrefslogtreecommitdiffstats
path: root/node_modules/mongodb/lib/cmap
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/mongodb/lib/cmap')
-rw-r--r--node_modules/mongodb/lib/cmap/connection.js384
-rw-r--r--node_modules/mongodb/lib/cmap/connection_pool.js591
-rw-r--r--node_modules/mongodb/lib/cmap/errors.js35
-rw-r--r--node_modules/mongodb/lib/cmap/events.js154
-rw-r--r--node_modules/mongodb/lib/cmap/message_stream.js196
-rw-r--r--node_modules/mongodb/lib/cmap/stream_description.js45
6 files changed, 1405 insertions, 0 deletions
diff --git a/node_modules/mongodb/lib/cmap/connection.js b/node_modules/mongodb/lib/cmap/connection.js
new file mode 100644
index 0000000..bf71562
--- /dev/null
+++ b/node_modules/mongodb/lib/cmap/connection.js
@@ -0,0 +1,384 @@
+'use strict';
+
+const EventEmitter = require('events');
+const MessageStream = require('./message_stream');
+const MongoError = require('../core/error').MongoError;
+const MongoNetworkError = require('../core/error').MongoNetworkError;
+const MongoNetworkTimeoutError = require('../core/error').MongoNetworkTimeoutError;
+const MongoWriteConcernError = require('../core/error').MongoWriteConcernError;
+const CommandResult = require('../core/connection/command_result');
+const StreamDescription = require('./stream_description').StreamDescription;
+const wp = require('../core/wireprotocol');
+const apm = require('../core/connection/apm');
+const updateSessionFromResponse = require('../core/sessions').updateSessionFromResponse;
+const uuidV4 = require('../core/utils').uuidV4;
+const now = require('../utils').now;
+const calculateDurationInMs = require('../utils').calculateDurationInMs;
+
+const kStream = Symbol('stream');
+const kQueue = Symbol('queue');
+const kMessageStream = Symbol('messageStream');
+const kGeneration = Symbol('generation');
+const kLastUseTime = Symbol('lastUseTime');
+const kClusterTime = Symbol('clusterTime');
+const kDescription = Symbol('description');
+const kIsMaster = Symbol('ismaster');
+const kAutoEncrypter = Symbol('autoEncrypter');
+
+class Connection extends EventEmitter {
+ constructor(stream, options) {
+ super(options);
+
+ this.id = options.id;
+ this.address = streamIdentifier(stream);
+ this.bson = options.bson;
+ this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 0;
+ this.host = options.host || 'localhost';
+ this.port = options.port || 27017;
+ this.monitorCommands =
+ typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false;
+ this.closed = false;
+ this.destroyed = false;
+
+ this[kDescription] = new StreamDescription(this.address, options);
+ this[kGeneration] = options.generation;
+ this[kLastUseTime] = now();
+
+ // retain a reference to an `AutoEncrypter` if present
+ if (options.autoEncrypter) {
+ this[kAutoEncrypter] = options.autoEncrypter;
+ }
+
+ // setup parser stream and message handling
+ this[kQueue] = new Map();
+ this[kMessageStream] = new MessageStream(options);
+ this[kMessageStream].on('message', messageHandler(this));
+ this[kStream] = stream;
+ stream.on('error', () => {
+ /* ignore errors, listen to `close` instead */
+ });
+
+ stream.on('close', () => {
+ if (this.closed) {
+ return;
+ }
+
+ this.closed = true;
+ this[kQueue].forEach(op =>
+ op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`))
+ );
+ this[kQueue].clear();
+
+ this.emit('close');
+ });
+
+ stream.on('timeout', () => {
+ if (this.closed) {
+ return;
+ }
+
+ stream.destroy();
+ this.closed = true;
+ this[kQueue].forEach(op =>
+ op.cb(
+ new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
+ beforeHandshake: this[kIsMaster] == null
+ })
+ )
+ );
+
+ this[kQueue].clear();
+ this.emit('close');
+ });
+
+ // hook the message stream up to the passed in stream
+ stream.pipe(this[kMessageStream]);
+ this[kMessageStream].pipe(stream);
+ }
+
+ get description() {
+ return this[kDescription];
+ }
+
+ get ismaster() {
+ return this[kIsMaster];
+ }
+
+ // the `connect` method stores the result of the handshake ismaster on the connection
+ set ismaster(response) {
+ this[kDescription].receiveResponse(response);
+
+ // TODO: remove this, and only use the `StreamDescription` in the future
+ this[kIsMaster] = response;
+ }
+
+ get generation() {
+ return this[kGeneration] || 0;
+ }
+
+ get idleTime() {
+ return calculateDurationInMs(this[kLastUseTime]);
+ }
+
+ get clusterTime() {
+ return this[kClusterTime];
+ }
+
+ get stream() {
+ return this[kStream];
+ }
+
+ markAvailable() {
+ this[kLastUseTime] = now();
+ }
+
+ destroy(options, callback) {
+ if (typeof options === 'function') {
+ callback = options;
+ options = {};
+ }
+
+ options = Object.assign({ force: false }, options);
+ if (this[kStream] == null || this.destroyed) {
+ this.destroyed = true;
+ if (typeof callback === 'function') {
+ callback();
+ }
+
+ return;
+ }
+
+ if (options.force) {
+ this[kStream].destroy();
+ this.destroyed = true;
+ if (typeof callback === 'function') {
+ callback();
+ }
+
+ return;
+ }
+
+ this[kStream].end(err => {
+ this.destroyed = true;
+ if (typeof callback === 'function') {
+ callback(err);
+ }
+ });
+ }
+
+ // Wire protocol methods
+ command(ns, cmd, options, callback) {
+ wp.command(makeServerTrampoline(this), ns, cmd, options, callback);
+ }
+
+ query(ns, cmd, cursorState, options, callback) {
+ wp.query(makeServerTrampoline(this), ns, cmd, cursorState, options, callback);
+ }
+
+ getMore(ns, cursorState, batchSize, options, callback) {
+ wp.getMore(makeServerTrampoline(this), ns, cursorState, batchSize, options, callback);
+ }
+
+ killCursors(ns, cursorState, callback) {
+ wp.killCursors(makeServerTrampoline(this), ns, cursorState, callback);
+ }
+
+ insert(ns, ops, options, callback) {
+ wp.insert(makeServerTrampoline(this), ns, ops, options, callback);
+ }
+
+ update(ns, ops, options, callback) {
+ wp.update(makeServerTrampoline(this), ns, ops, options, callback);
+ }
+
+ remove(ns, ops, options, callback) {
+ wp.remove(makeServerTrampoline(this), ns, ops, options, callback);
+ }
+}
+
+/// This lets us emulate a legacy `Server` instance so we can work with the existing wire
+/// protocol methods. Eventually, the operation executor will return a `Connection` to execute
+/// against.
+function makeServerTrampoline(connection) {
+ const server = {
+ description: connection.description,
+ clusterTime: connection[kClusterTime],
+ s: {
+ bson: connection.bson,
+ pool: { write: write.bind(connection), isConnected: () => true }
+ }
+ };
+
+ if (connection[kAutoEncrypter]) {
+ server.autoEncrypter = connection[kAutoEncrypter];
+ }
+
+ return server;
+}
+
+function messageHandler(conn) {
+ return function messageHandler(message) {
+ // always emit the message, in case we are streaming
+ conn.emit('message', message);
+ if (!conn[kQueue].has(message.responseTo)) {
+ return;
+ }
+
+ const operationDescription = conn[kQueue].get(message.responseTo);
+ const callback = operationDescription.cb;
+
+ // SERVER-45775: For exhaust responses we should be able to use the same requestId to
+ // track response, however the server currently synthetically produces remote requests
+ // making the `responseTo` change on each response
+ conn[kQueue].delete(message.responseTo);
+ if (message.moreToCome) {
+ // requeue the callback for next synthetic request
+ conn[kQueue].set(message.requestId, operationDescription);
+ } else if (operationDescription.socketTimeoutOverride) {
+ conn[kStream].setTimeout(conn.socketTimeout);
+ }
+
+ try {
+ // Pass in the entire description because it has BSON parsing options
+ message.parse(operationDescription);
+ } catch (err) {
+ callback(new MongoError(err));
+ return;
+ }
+
+ if (message.documents[0]) {
+ const document = message.documents[0];
+ const session = operationDescription.session;
+ if (session) {
+ updateSessionFromResponse(session, document);
+ }
+
+ if (document.$clusterTime) {
+ conn[kClusterTime] = document.$clusterTime;
+ conn.emit('clusterTimeReceived', document.$clusterTime);
+ }
+
+ if (operationDescription.command) {
+ if (document.writeConcernError) {
+ callback(new MongoWriteConcernError(document.writeConcernError, document));
+ return;
+ }
+
+ if (document.ok === 0 || document.$err || document.errmsg || document.code) {
+ callback(new MongoError(document));
+ return;
+ }
+ }
+ }
+
+ // NODE-2382: reenable in our glorious non-leaky abstraction future
+ // callback(null, operationDescription.fullResult ? message : message.documents[0]);
+
+ callback(
+ undefined,
+ new CommandResult(
+ operationDescription.fullResult ? message : message.documents[0],
+ conn,
+ message
+ )
+ );
+ };
+}
+
+function streamIdentifier(stream) {
+ if (typeof stream.address === 'function') {
+ return `${stream.remoteAddress}:${stream.remotePort}`;
+ }
+
+ return uuidV4().toString('hex');
+}
+
+// Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance
+function write(command, options, callback) {
+ if (typeof options === 'function') {
+ callback = options;
+ }
+
+ options = options || {};
+ const operationDescription = {
+ requestId: command.requestId,
+ cb: callback,
+ session: options.session,
+ fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false,
+ noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false,
+ documentsReturnedIn: options.documentsReturnedIn,
+ command: !!options.command,
+
+ // for BSON parsing
+ promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
+ promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
+ promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
+ raw: typeof options.raw === 'boolean' ? options.raw : false
+ };
+
+ if (this[kDescription] && this[kDescription].compressor) {
+ operationDescription.agreedCompressor = this[kDescription].compressor;
+
+ if (this[kDescription].zlibCompressionLevel) {
+ operationDescription.zlibCompressionLevel = this[kDescription].zlibCompressionLevel;
+ }
+ }
+
+ if (typeof options.socketTimeout === 'number') {
+ operationDescription.socketTimeoutOverride = true;
+ this[kStream].setTimeout(options.socketTimeout);
+ }
+
+ // if command monitoring is enabled we need to modify the callback here
+ if (this.monitorCommands) {
+ this.emit('commandStarted', new apm.CommandStartedEvent(this, command));
+
+ operationDescription.started = now();
+ operationDescription.cb = (err, reply) => {
+ if (err) {
+ this.emit(
+ 'commandFailed',
+ new apm.CommandFailedEvent(this, command, err, operationDescription.started)
+ );
+ } else {
+ if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) {
+ this.emit(
+ 'commandFailed',
+ new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started)
+ );
+ } else {
+ this.emit(
+ 'commandSucceeded',
+ new apm.CommandSucceededEvent(this, command, reply, operationDescription.started)
+ );
+ }
+ }
+
+ if (typeof callback === 'function') {
+ callback(err, reply);
+ }
+ };
+ }
+
+ if (!operationDescription.noResponse) {
+ this[kQueue].set(operationDescription.requestId, operationDescription);
+ }
+
+ try {
+ this[kMessageStream].writeCommand(command, operationDescription);
+ } catch (e) {
+ if (!operationDescription.noResponse) {
+ this[kQueue].delete(operationDescription.requestId);
+ operationDescription.cb(e);
+ return;
+ }
+ }
+
+ if (operationDescription.noResponse) {
+ operationDescription.cb();
+ }
+}
+
+module.exports = {
+ Connection
+};
diff --git a/node_modules/mongodb/lib/cmap/connection_pool.js b/node_modules/mongodb/lib/cmap/connection_pool.js
new file mode 100644
index 0000000..4500d9a
--- /dev/null
+++ b/node_modules/mongodb/lib/cmap/connection_pool.js
@@ -0,0 +1,591 @@
+'use strict';
+
+const Denque = require('denque');
+const EventEmitter = require('events').EventEmitter;
+const Logger = require('../core/connection/logger');
+const makeCounter = require('../utils').makeCounter;
+const MongoError = require('../core/error').MongoError;
+const Connection = require('./connection').Connection;
+const eachAsync = require('../core/utils').eachAsync;
+const connect = require('../core/connection/connect');
+const relayEvents = require('../core/utils').relayEvents;
+
+const errors = require('./errors');
+const PoolClosedError = errors.PoolClosedError;
+const WaitQueueTimeoutError = errors.WaitQueueTimeoutError;
+
+const events = require('./events');
+const ConnectionPoolCreatedEvent = events.ConnectionPoolCreatedEvent;
+const ConnectionPoolClosedEvent = events.ConnectionPoolClosedEvent;
+const ConnectionCreatedEvent = events.ConnectionCreatedEvent;
+const ConnectionReadyEvent = events.ConnectionReadyEvent;
+const ConnectionClosedEvent = events.ConnectionClosedEvent;
+const ConnectionCheckOutStartedEvent = events.ConnectionCheckOutStartedEvent;
+const ConnectionCheckOutFailedEvent = events.ConnectionCheckOutFailedEvent;
+const ConnectionCheckedOutEvent = events.ConnectionCheckedOutEvent;
+const ConnectionCheckedInEvent = events.ConnectionCheckedInEvent;
+const ConnectionPoolClearedEvent = events.ConnectionPoolClearedEvent;
+
+const kLogger = Symbol('logger');
+const kConnections = Symbol('connections');
+const kPermits = Symbol('permits');
+const kMinPoolSizeTimer = Symbol('minPoolSizeTimer');
+const kGeneration = Symbol('generation');
+const kConnectionCounter = Symbol('connectionCounter');
+const kCancellationToken = Symbol('cancellationToken');
+const kWaitQueue = Symbol('waitQueue');
+const kCancelled = Symbol('cancelled');
+
+const VALID_POOL_OPTIONS = new Set([
+ // `connect` options
+ 'ssl',
+ 'bson',
+ 'connectionType',
+ 'monitorCommands',
+ 'socketTimeout',
+ 'credentials',
+ 'compression',
+
+ // node Net options
+ 'host',
+ 'port',
+ 'localAddress',
+ 'localPort',
+ 'family',
+ 'hints',
+ 'lookup',
+ 'path',
+
+ // node TLS options
+ 'ca',
+ 'cert',
+ 'sigalgs',
+ 'ciphers',
+ 'clientCertEngine',
+ 'crl',
+ 'dhparam',
+ 'ecdhCurve',
+ 'honorCipherOrder',
+ 'key',
+ 'privateKeyEngine',
+ 'privateKeyIdentifier',
+ 'maxVersion',
+ 'minVersion',
+ 'passphrase',
+ 'pfx',
+ 'secureOptions',
+ 'secureProtocol',
+ 'sessionIdContext',
+ 'allowHalfOpen',
+ 'rejectUnauthorized',
+ 'pskCallback',
+ 'ALPNProtocols',
+ 'servername',
+ 'checkServerIdentity',
+ 'session',
+ 'minDHSize',
+ 'secureContext',
+
+ // spec options
+ 'maxPoolSize',
+ 'minPoolSize',
+ 'maxIdleTimeMS',
+ 'waitQueueTimeoutMS'
+]);
+
+function resolveOptions(options, defaults) {
+ const newOptions = Array.from(VALID_POOL_OPTIONS).reduce((obj, key) => {
+ if (Object.prototype.hasOwnProperty.call(options, key)) {
+ obj[key] = options[key];
+ }
+
+ return obj;
+ }, {});
+
+ return Object.freeze(Object.assign({}, defaults, newOptions));
+}
+
+/**
+ * Configuration options for drivers wrapping the node driver.
+ *
+ * @typedef {Object} ConnectionPoolOptions
+ * @property
+ * @property {string} [host] The host to connect to
+ * @property {number} [port] The port to connect to
+ * @property {bson} [bson] The BSON instance to use for new connections
+ * @property {number} [maxPoolSize=100] The maximum number of connections that may be associated with a pool at a given time. This includes in use and available connections.
+ * @property {number} [minPoolSize=0] The minimum number of connections that MUST exist at any moment in a single connection pool.
+ * @property {number} [maxIdleTimeMS] The maximum amount of time a connection should remain idle in the connection pool before being marked idle.
+ * @property {number} [waitQueueTimeoutMS=0] The maximum amount of time operation execution should wait for a connection to become available. The default is 0 which means there is no limit.
+ */
+
+/**
+ * A pool of connections which dynamically resizes, and emit events related to pool activity
+ *
+ * @property {number} generation An integer representing the SDAM generation of the pool
+ * @property {number} totalConnectionCount An integer expressing how many total connections (active + in use) the pool currently has
+ * @property {number} availableConnectionCount An integer expressing how many connections are currently available in the pool.
+ * @property {string} address The address of the endpoint the pool is connected to
+ *
+ * @emits ConnectionPool#connectionPoolCreated
+ * @emits ConnectionPool#connectionPoolClosed
+ * @emits ConnectionPool#connectionCreated
+ * @emits ConnectionPool#connectionReady
+ * @emits ConnectionPool#connectionClosed
+ * @emits ConnectionPool#connectionCheckOutStarted
+ * @emits ConnectionPool#connectionCheckOutFailed
+ * @emits ConnectionPool#connectionCheckedOut
+ * @emits ConnectionPool#connectionCheckedIn
+ * @emits ConnectionPool#connectionPoolCleared
+ */
+class ConnectionPool extends EventEmitter {
+ /**
+ * Create a new Connection Pool
+ *
+ * @param {ConnectionPoolOptions} options
+ */
+ constructor(options) {
+ super();
+ options = options || {};
+
+ this.closed = false;
+ this.options = resolveOptions(options, {
+ connectionType: Connection,
+ maxPoolSize: typeof options.maxPoolSize === 'number' ? options.maxPoolSize : 100,
+ minPoolSize: typeof options.minPoolSize === 'number' ? options.minPoolSize : 0,
+ maxIdleTimeMS: typeof options.maxIdleTimeMS === 'number' ? options.maxIdleTimeMS : 0,
+ waitQueueTimeoutMS:
+ typeof options.waitQueueTimeoutMS === 'number' ? options.waitQueueTimeoutMS : 0,
+ autoEncrypter: options.autoEncrypter,
+ metadata: options.metadata
+ });
+
+ if (options.minSize > options.maxSize) {
+ throw new TypeError(
+ 'Connection pool minimum size must not be greater than maxiumum pool size'
+ );
+ }
+
+ this[kLogger] = Logger('ConnectionPool', options);
+ this[kConnections] = new Denque();
+ this[kPermits] = this.options.maxPoolSize;
+ this[kMinPoolSizeTimer] = undefined;
+ this[kGeneration] = 0;
+ this[kConnectionCounter] = makeCounter(1);
+ this[kCancellationToken] = new EventEmitter();
+ this[kCancellationToken].setMaxListeners(Infinity);
+ this[kWaitQueue] = new Denque();
+
+ process.nextTick(() => {
+ this.emit('connectionPoolCreated', new ConnectionPoolCreatedEvent(this));
+ ensureMinPoolSize(this);
+ });
+ }
+
+ get address() {
+ return `${this.options.host}:${this.options.port}`;
+ }
+
+ get generation() {
+ return this[kGeneration];
+ }
+
+ get totalConnectionCount() {
+ return this[kConnections].length + (this.options.maxPoolSize - this[kPermits]);
+ }
+
+ get availableConnectionCount() {
+ return this[kConnections].length;
+ }
+
+ get waitQueueSize() {
+ return this[kWaitQueue].length;
+ }
+
+ /**
+ * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it
+ * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
+ * explicitly destroyed by the new owner.
+ *
+ * @param {ConnectionPool~checkOutCallback} callback
+ */
+ checkOut(callback) {
+ this.emit('connectionCheckOutStarted', new ConnectionCheckOutStartedEvent(this));
+
+ if (this.closed) {
+ this.emit('connectionCheckOutFailed', new ConnectionCheckOutFailedEvent(this, 'poolClosed'));
+ callback(new PoolClosedError(this));
+ return;
+ }
+
+ const waitQueueMember = { callback };
+
+ const pool = this;
+ const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
+ if (waitQueueTimeoutMS) {
+ waitQueueMember.timer = setTimeout(() => {
+ waitQueueMember[kCancelled] = true;
+ waitQueueMember.timer = undefined;
+
+ pool.emit('connectionCheckOutFailed', new ConnectionCheckOutFailedEvent(pool, 'timeout'));
+ waitQueueMember.callback(new WaitQueueTimeoutError(pool));
+ }, waitQueueTimeoutMS);
+ }
+
+ this[kWaitQueue].push(waitQueueMember);
+ process.nextTick(() => processWaitQueue(this));
+ }
+
+ /**
+ * Check a connection into the pool.
+ *
+ * @param {Connection} connection The connection to check in
+ */
+ checkIn(connection) {
+ const poolClosed = this.closed;
+ const stale = connectionIsStale(this, connection);
+ const willDestroy = !!(poolClosed || stale || connection.closed);
+
+ if (!willDestroy) {
+ connection.markAvailable();
+ this[kConnections].push(connection);
+ }
+
+ this.emit('connectionCheckedIn', new ConnectionCheckedInEvent(this, connection));
+
+ if (willDestroy) {
+ const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
+ destroyConnection(this, connection, reason);
+ }
+
+ process.nextTick(() => processWaitQueue(this));
+ }
+
+ /**
+ * Clear the pool
+ *
+ * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
+ * previous generation will eventually be pruned during subsequent checkouts.
+ */
+ clear() {
+ this[kGeneration] += 1;
+ this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this));
+ }
+
+ /**
+ * Close the pool
+ *
+ * @param {object} [options] Optional settings
+ * @param {boolean} [options.force] Force close connections
+ * @param {Function} callback
+ */
+ close(options, callback) {
+ if (typeof options === 'function') {
+ callback = options;
+ }
+
+ options = Object.assign({ force: false }, options);
+ if (this.closed) {
+ return callback();
+ }
+
+ // immediately cancel any in-flight connections
+ this[kCancellationToken].emit('cancel');
+
+ // drain the wait queue
+ while (this.waitQueueSize) {
+ const waitQueueMember = this[kWaitQueue].pop();
+ clearTimeout(waitQueueMember.timer);
+ if (!waitQueueMember[kCancelled]) {
+ waitQueueMember.callback(new MongoError('connection pool closed'));
+ }
+ }
+
+ // clear the min pool size timer
+ if (this[kMinPoolSizeTimer]) {
+ clearTimeout(this[kMinPoolSizeTimer]);
+ }
+
+ // end the connection counter
+ if (typeof this[kConnectionCounter].return === 'function') {
+ this[kConnectionCounter].return();
+ }
+
+ // mark the pool as closed immediately
+ this.closed = true;
+
+ eachAsync(
+ this[kConnections].toArray(),
+ (conn, cb) => {
+ this.emit('connectionClosed', new ConnectionClosedEvent(this, conn, 'poolClosed'));
+ conn.destroy(options, cb);
+ },
+ err => {
+ this[kConnections].clear();
+ this.emit('connectionPoolClosed', new ConnectionPoolClosedEvent(this));
+ callback(err);
+ }
+ );
+ }
+
+ /**
+ * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda
+ * has completed by calling back.
+ *
+ * NOTE: please note the required signature of `fn`
+ *
+ * @param {ConnectionPool~withConnectionCallback} fn A function which operates on a managed connection
+ * @param {Function} callback The original callback
+ * @return {Promise}
+ */
+ withConnection(fn, callback) {
+ this.checkOut((err, conn) => {
+ // don't callback with `err` here, we might want to act upon it inside `fn`
+
+ fn(err, conn, (fnErr, result) => {
+ if (typeof callback === 'function') {
+ if (fnErr) {
+ callback(fnErr);
+ } else {
+ callback(undefined, result);
+ }
+ }
+
+ if (conn) {
+ this.checkIn(conn);
+ }
+ });
+ });
+ }
+}
+
+function ensureMinPoolSize(pool) {
+ if (pool.closed || pool.options.minPoolSize === 0) {
+ return;
+ }
+
+ const minPoolSize = pool.options.minPoolSize;
+ for (let i = pool.totalConnectionCount; i < minPoolSize; ++i) {
+ createConnection(pool);
+ }
+
+ pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10);
+}
+
+function connectionIsStale(pool, connection) {
+ return connection.generation !== pool[kGeneration];
+}
+
+function connectionIsIdle(pool, connection) {
+ return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS);
+}
+
+function createConnection(pool, callback) {
+ const connectOptions = Object.assign(
+ {
+ id: pool[kConnectionCounter].next().value,
+ generation: pool[kGeneration]
+ },
+ pool.options
+ );
+
+ pool[kPermits]--;
+ connect(connectOptions, pool[kCancellationToken], (err, connection) => {
+ if (err) {
+ pool[kPermits]++;
+ pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`);
+ if (typeof callback === 'function') {
+ callback(err);
+ }
+
+ return;
+ }
+
+ // The pool might have closed since we started trying to create a connection
+ if (pool.closed) {
+ connection.destroy({ force: true });
+ return;
+ }
+
+ // forward all events from the connection to the pool
+ relayEvents(connection, pool, [
+ 'commandStarted',
+ 'commandFailed',
+ 'commandSucceeded',
+ 'clusterTimeReceived'
+ ]);
+
+ pool.emit('connectionCreated', new ConnectionCreatedEvent(pool, connection));
+
+ connection.markAvailable();
+ pool.emit('connectionReady', new ConnectionReadyEvent(pool, connection));
+
+ // if a callback has been provided, check out the connection immediately
+ if (typeof callback === 'function') {
+ callback(undefined, connection);
+ return;
+ }
+
+ // otherwise add it to the pool for later acquisition, and try to process the wait queue
+ pool[kConnections].push(connection);
+ process.nextTick(() => processWaitQueue(pool));
+ });
+}
+
+function destroyConnection(pool, connection, reason) {
+ pool.emit('connectionClosed', new ConnectionClosedEvent(pool, connection, reason));
+
+ // allow more connections to be created
+ pool[kPermits]++;
+
+ // destroy the connection
+ process.nextTick(() => connection.destroy());
+}
+
+function processWaitQueue(pool) {
+ if (pool.closed) {
+ return;
+ }
+
+ while (pool.waitQueueSize) {
+ const waitQueueMember = pool[kWaitQueue].peekFront();
+ if (waitQueueMember[kCancelled]) {
+ pool[kWaitQueue].shift();
+ continue;
+ }
+
+ if (!pool.availableConnectionCount) {
+ break;
+ }
+
+ const connection = pool[kConnections].shift();
+ const isStale = connectionIsStale(pool, connection);
+ const isIdle = connectionIsIdle(pool, connection);
+ if (!isStale && !isIdle && !connection.closed) {
+ pool.emit('connectionCheckedOut', new ConnectionCheckedOutEvent(pool, connection));
+ clearTimeout(waitQueueMember.timer);
+ pool[kWaitQueue].shift();
+ waitQueueMember.callback(undefined, connection);
+ return;
+ }
+
+ const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle';
+ destroyConnection(pool, connection, reason);
+ }
+
+ const maxPoolSize = pool.options.maxPoolSize;
+ if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) {
+ createConnection(pool, (err, connection) => {
+ const waitQueueMember = pool[kWaitQueue].shift();
+ if (waitQueueMember == null || waitQueueMember[kCancelled]) {
+ if (err == null) {
+ pool[kConnections].push(connection);
+ }
+
+ return;
+ }
+
+ if (err) {
+ pool.emit('connectionCheckOutFailed', new ConnectionCheckOutFailedEvent(pool, err));
+ } else {
+ pool.emit('connectionCheckedOut', new ConnectionCheckedOutEvent(pool, connection));
+ }
+
+ clearTimeout(waitQueueMember.timer);
+ waitQueueMember.callback(err, connection);
+ });
+
+ return;
+ }
+}
+
+/**
+ * A callback provided to `withConnection`
+ *
+ * @callback ConnectionPool~withConnectionCallback
+ * @param {MongoError} error An error instance representing the error during the execution.
+ * @param {Connection} connection The managed connection which was checked out of the pool.
+ * @param {Function} callback A function to call back after connection management is complete
+ */
+
+/**
+ * A callback provided to `checkOut`
+ *
+ * @callback ConnectionPool~checkOutCallback
+ * @param {MongoError} error An error instance representing the error during checkout
+ * @param {Connection} connection A connection from the pool
+ */
+
+/**
+ * Emitted once when the connection pool is created
+ *
+ * @event ConnectionPool#connectionPoolCreated
+ * @type {PoolCreatedEvent}
+ */
+
+/**
+ * Emitted once when the connection pool is closed
+ *
+ * @event ConnectionPool#connectionPoolClosed
+ * @type {PoolClosedEvent}
+ */
+
+/**
+ * Emitted each time a connection is created
+ *
+ * @event ConnectionPool#connectionCreated
+ * @type {ConnectionCreatedEvent}
+ */
+
+/**
+ * Emitted when a connection becomes established, and is ready to use
+ *
+ * @event ConnectionPool#connectionReady
+ * @type {ConnectionReadyEvent}
+ */
+
+/**
+ * Emitted when a connection is closed
+ *
+ * @event ConnectionPool#connectionClosed
+ * @type {ConnectionClosedEvent}
+ */
+
+/**
+ * Emitted when an attempt to check out a connection begins
+ *
+ * @event ConnectionPool#connectionCheckOutStarted
+ * @type {ConnectionCheckOutStartedEvent}
+ */
+
+/**
+ * Emitted when an attempt to check out a connection fails
+ *
+ * @event ConnectionPool#connectionCheckOutFailed
+ * @type {ConnectionCheckOutFailedEvent}
+ */
+
+/**
+ * Emitted each time a connection is successfully checked out of the connection pool
+ *
+ * @event ConnectionPool#connectionCheckedOut
+ * @type {ConnectionCheckedOutEvent}
+ */
+
+/**
+ * Emitted each time a connection is successfully checked into the connection pool
+ *
+ * @event ConnectionPool#connectionCheckedIn
+ * @type {ConnectionCheckedInEvent}
+ */
+
+/**
+ * Emitted each time the connection pool is cleared and it's generation incremented
+ *
+ * @event ConnectionPool#connectionPoolCleared
+ * @type {PoolClearedEvent}
+ */
+
+module.exports = {
+ ConnectionPool
+};
diff --git a/node_modules/mongodb/lib/cmap/errors.js b/node_modules/mongodb/lib/cmap/errors.js
new file mode 100644
index 0000000..d933019
--- /dev/null
+++ b/node_modules/mongodb/lib/cmap/errors.js
@@ -0,0 +1,35 @@
+'use strict';
+const MongoError = require('../core/error').MongoError;
+
+/**
+ * An error indicating a connection pool is closed
+ *
+ * @property {string} address The address of the connection pool
+ * @extends MongoError
+ */
+class PoolClosedError extends MongoError {
+ constructor(pool) {
+ super('Attempted to check out a connection from closed connection pool');
+ this.name = 'MongoPoolClosedError';
+ this.address = pool.address;
+ }
+}
+
+/**
+ * An error thrown when a request to check out a connection times out
+ *
+ * @property {string} address The address of the connection pool
+ * @extends MongoError
+ */
+class WaitQueueTimeoutError extends MongoError {
+ constructor(pool) {
+ super('Timed out while checking out a connection from connection pool');
+ this.name = 'MongoWaitQueueTimeoutError';
+ this.address = pool.address;
+ }
+}
+
+module.exports = {
+ PoolClosedError,
+ WaitQueueTimeoutError
+};
diff --git a/node_modules/mongodb/lib/cmap/events.js b/node_modules/mongodb/lib/cmap/events.js
new file mode 100644
index 0000000..dcc8b67
--- /dev/null
+++ b/node_modules/mongodb/lib/cmap/events.js
@@ -0,0 +1,154 @@
+'use strict';
+
+/**
+ * The base class for all monitoring events published from the connection pool
+ *
+ * @property {number} time A timestamp when the event was created
+ * @property {string} address The address (host/port pair) of the pool
+ */
+class ConnectionPoolMonitoringEvent {
+ constructor(pool) {
+ this.time = new Date();
+ this.address = pool.address;
+ }
+}
+
+/**
+ * An event published when a connection pool is created
+ *
+ * @property {Object} options The options used to create this connection pool
+ */
+class ConnectionPoolCreatedEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool) {
+ super(pool);
+ this.options = pool.options;
+ }
+}
+
+/**
+ * An event published when a connection pool is closed
+ */
+class ConnectionPoolClosedEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool) {
+ super(pool);
+ }
+}
+
+/**
+ * An event published when a connection pool creates a new connection
+ *
+ * @property {number} connectionId A monotonically increasing, per-pool id for the newly created connection
+ */
+class ConnectionCreatedEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool, connection) {
+ super(pool);
+ this.connectionId = connection.id;
+ }
+}
+
+/**
+ * An event published when a connection is ready for use
+ *
+ * @property {number} connectionId The id of the connection
+ */
+class ConnectionReadyEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool, connection) {
+ super(pool);
+ this.connectionId = connection.id;
+ }
+}
+
+/**
+ * An event published when a connection is closed
+ *
+ * @property {number} connectionId The id of the connection
+ * @property {string} reason The reason the connection was closed
+ */
+class ConnectionClosedEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool, connection, reason) {
+ super(pool);
+ this.connectionId = connection.id;
+ this.reason = reason || 'unknown';
+ }
+}
+
+/**
+ * An event published when a request to check a connection out begins
+ */
+class ConnectionCheckOutStartedEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool) {
+ super(pool);
+ }
+}
+
+/**
+ * An event published when a request to check a connection out fails
+ *
+ * @property {string} reason The reason the attempt to check out failed
+ */
+class ConnectionCheckOutFailedEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool, reason) {
+ super(pool);
+ this.reason = reason;
+ }
+}
+
+/**
+ * An event published when a connection is checked out of the connection pool
+ *
+ * @property {number} connectionId The id of the connection
+ */
+class ConnectionCheckedOutEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool, connection) {
+ super(pool);
+ this.connectionId = connection.id;
+ }
+}
+
+/**
+ * An event published when a connection is checked into the connection pool
+ *
+ * @property {number} connectionId The id of the connection
+ */
+class ConnectionCheckedInEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool, connection) {
+ super(pool);
+ this.connectionId = connection.id;
+ }
+}
+
+/**
+ * An event published when a connection pool is cleared
+ */
+class ConnectionPoolClearedEvent extends ConnectionPoolMonitoringEvent {
+ constructor(pool) {
+ super(pool);
+ }
+}
+
+const CMAP_EVENT_NAMES = [
+ 'connectionPoolCreated',
+ 'connectionPoolClosed',
+ 'connectionCreated',
+ 'connectionReady',
+ 'connectionClosed',
+ 'connectionCheckOutStarted',
+ 'connectionCheckOutFailed',
+ 'connectionCheckedOut',
+ 'connectionCheckedIn',
+ 'connectionPoolCleared'
+];
+
+module.exports = {
+ CMAP_EVENT_NAMES,
+ ConnectionPoolCreatedEvent,
+ ConnectionPoolClosedEvent,
+ ConnectionCreatedEvent,
+ ConnectionReadyEvent,
+ ConnectionClosedEvent,
+ ConnectionCheckOutStartedEvent,
+ ConnectionCheckOutFailedEvent,
+ ConnectionCheckedOutEvent,
+ ConnectionCheckedInEvent,
+ ConnectionPoolClearedEvent
+};
diff --git a/node_modules/mongodb/lib/cmap/message_stream.js b/node_modules/mongodb/lib/cmap/message_stream.js
new file mode 100644
index 0000000..c8f458e
--- /dev/null
+++ b/node_modules/mongodb/lib/cmap/message_stream.js
@@ -0,0 +1,196 @@
+'use strict';
+
+const Duplex = require('stream').Duplex;
+const BufferList = require('bl');
+const MongoParseError = require('../core/error').MongoParseError;
+const decompress = require('../core/wireprotocol/compression').decompress;
+const Response = require('../core/connection/commands').Response;
+const BinMsg = require('../core/connection/msg').BinMsg;
+const MongoError = require('../core/error').MongoError;
+const OP_COMPRESSED = require('../core/wireprotocol/shared').opcodes.OP_COMPRESSED;
+const OP_MSG = require('../core/wireprotocol/shared').opcodes.OP_MSG;
+const MESSAGE_HEADER_SIZE = require('../core/wireprotocol/shared').MESSAGE_HEADER_SIZE;
+const COMPRESSION_DETAILS_SIZE = require('../core/wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
+const opcodes = require('../core/wireprotocol/shared').opcodes;
+const compress = require('../core/wireprotocol/compression').compress;
+const compressorIDs = require('../core/wireprotocol/compression').compressorIDs;
+const uncompressibleCommands = require('../core/wireprotocol/compression').uncompressibleCommands;
+const Msg = require('../core/connection/msg').Msg;
+
+const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
+const kBuffer = Symbol('buffer');
+
+/**
+ * A duplex stream that is capable of reading and writing raw wire protocol messages, with
+ * support for optional compression
+ */
+class MessageStream extends Duplex {
+ constructor(options) {
+ options = options || {};
+ super(options);
+
+ this.bson = options.bson;
+ this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;
+
+ this[kBuffer] = new BufferList();
+ }
+
+ _write(chunk, _, callback) {
+ const buffer = this[kBuffer];
+ buffer.append(chunk);
+
+ processIncomingData(this, callback);
+ }
+
+ _read(/* size */) {
+ // NOTE: This implementation is empty because we explicitly push data to be read
+ // when `writeMessage` is called.
+ return;
+ }
+
+ writeCommand(command, operationDescription) {
+ // TODO: agreed compressor should live in `StreamDescription`
+ const shouldCompress = operationDescription && !!operationDescription.agreedCompressor;
+ if (!shouldCompress || !canCompress(command)) {
+ const data = command.toBin();
+ this.push(Array.isArray(data) ? Buffer.concat(data) : data);
+ return;
+ }
+
+ // otherwise, compress the message
+ const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
+ const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
+
+ // Extract information needed for OP_COMPRESSED from the uncompressed message
+ const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
+
+ // Compress the message body
+ compress({ options: operationDescription }, messageToBeCompressed, (err, compressedMessage) => {
+ if (err) {
+ operationDescription.cb(err, null);
+ return;
+ }
+
+ // Create the msgHeader of OP_COMPRESSED
+ const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
+ msgHeader.writeInt32LE(
+ MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
+ 0
+ ); // messageLength
+ msgHeader.writeInt32LE(command.requestId, 4); // requestID
+ msgHeader.writeInt32LE(0, 8); // responseTo (zero)
+ msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode
+
+ // Create the compression details of OP_COMPRESSED
+ const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
+ compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
+ compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
+ compressionDetails.writeUInt8(compressorIDs[operationDescription.agreedCompressor], 8); // compressorID
+
+ this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
+ });
+ }
+}
+
+// Return whether a command contains an uncompressible command term
+// Will return true if command contains no uncompressible command terms
+function canCompress(command) {
+ const commandDoc = command instanceof Msg ? command.command : command.query;
+ const commandName = Object.keys(commandDoc)[0];
+ return !uncompressibleCommands.has(commandName);
+}
+
+function processIncomingData(stream, callback) {
+ const buffer = stream[kBuffer];
+ if (buffer.length < 4) {
+ callback();
+ return;
+ }
+
+ const sizeOfMessage = buffer.readInt32LE(0);
+ if (sizeOfMessage < 0) {
+ callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
+ return;
+ }
+
+ if (sizeOfMessage > stream.maxBsonMessageSize) {
+ callback(
+ new MongoParseError(
+ `Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
+ )
+ );
+ return;
+ }
+
+ if (sizeOfMessage > buffer.length) {
+ callback();
+ return;
+ }
+
+ const message = buffer.slice(0, sizeOfMessage);
+ buffer.consume(sizeOfMessage);
+
+ const messageHeader = {
+ length: message.readInt32LE(0),
+ requestId: message.readInt32LE(4),
+ responseTo: message.readInt32LE(8),
+ opCode: message.readInt32LE(12)
+ };
+
+ let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
+ const responseOptions = stream.responseOptions;
+ if (messageHeader.opCode !== OP_COMPRESSED) {
+ const messageBody = message.slice(MESSAGE_HEADER_SIZE);
+ stream.emit(
+ 'message',
+ new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
+ );
+
+ if (buffer.length >= 4) {
+ processIncomingData(stream, callback);
+ } else {
+ callback();
+ }
+
+ return;
+ }
+
+ messageHeader.fromCompressed = true;
+ messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
+ messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
+ const compressorID = message[MESSAGE_HEADER_SIZE + 8];
+ const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);
+
+ // recalculate based on wrapped opcode
+ ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
+
+ decompress(compressorID, compressedBuffer, (err, messageBody) => {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ if (messageBody.length !== messageHeader.length) {
+ callback(
+ new MongoError(
+ 'Decompressing a compressed message from the server failed. The message is corrupt.'
+ )
+ );
+
+ return;
+ }
+
+ stream.emit(
+ 'message',
+ new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
+ );
+
+ if (buffer.length >= 4) {
+ processIncomingData(stream, callback);
+ } else {
+ callback();
+ }
+ });
+}
+
+module.exports = MessageStream;
diff --git a/node_modules/mongodb/lib/cmap/stream_description.js b/node_modules/mongodb/lib/cmap/stream_description.js
new file mode 100644
index 0000000..e806a5f
--- /dev/null
+++ b/node_modules/mongodb/lib/cmap/stream_description.js
@@ -0,0 +1,45 @@
+'use strict';
+const parseServerType = require('../core/sdam/server_description').parseServerType;
+
+const RESPONSE_FIELDS = [
+ 'minWireVersion',
+ 'maxWireVersion',
+ 'maxBsonObjectSize',
+ 'maxMessageSizeBytes',
+ 'maxWriteBatchSize',
+ '__nodejs_mock_server__'
+];
+
+class StreamDescription {
+ constructor(address, options) {
+ this.address = address;
+ this.type = parseServerType(null);
+ this.minWireVersion = undefined;
+ this.maxWireVersion = undefined;
+ this.maxBsonObjectSize = 16777216;
+ this.maxMessageSizeBytes = 48000000;
+ this.maxWriteBatchSize = 100000;
+ this.compressors =
+ options && options.compression && Array.isArray(options.compression.compressors)
+ ? options.compression.compressors
+ : [];
+ }
+
+ receiveResponse(response) {
+ this.type = parseServerType(response);
+
+ RESPONSE_FIELDS.forEach(field => {
+ if (typeof response[field] !== 'undefined') {
+ this[field] = response[field];
+ }
+ });
+
+ if (response.compression) {
+ this.compressor = this.compressors.filter(c => response.compression.indexOf(c) !== -1)[0];
+ }
+ }
+}
+
+module.exports = {
+ StreamDescription
+};