From 81ddf9b700bc48a1f8e472209f080f9c1d9a9b09 Mon Sep 17 00:00:00 2001 From: Piotr Russ Date: Wed, 18 Nov 2020 23:26:45 +0100 Subject: rm node_modules --- node_modules/mongodb/lib/cmap/connection.js | 384 ------------- node_modules/mongodb/lib/cmap/connection_pool.js | 591 --------------------- node_modules/mongodb/lib/cmap/errors.js | 35 -- node_modules/mongodb/lib/cmap/events.js | 154 ------ node_modules/mongodb/lib/cmap/message_stream.js | 196 ------- .../mongodb/lib/cmap/stream_description.js | 45 -- 6 files changed, 1405 deletions(-) delete mode 100644 node_modules/mongodb/lib/cmap/connection.js delete mode 100644 node_modules/mongodb/lib/cmap/connection_pool.js delete mode 100644 node_modules/mongodb/lib/cmap/errors.js delete mode 100644 node_modules/mongodb/lib/cmap/events.js delete mode 100644 node_modules/mongodb/lib/cmap/message_stream.js delete mode 100644 node_modules/mongodb/lib/cmap/stream_description.js (limited to 'node_modules/mongodb/lib/cmap') diff --git a/node_modules/mongodb/lib/cmap/connection.js b/node_modules/mongodb/lib/cmap/connection.js deleted file mode 100644 index bf71562..0000000 --- a/node_modules/mongodb/lib/cmap/connection.js +++ /dev/null @@ -1,384 +0,0 @@ -'use strict'; - -const EventEmitter = require('events'); -const MessageStream = require('./message_stream'); -const MongoError = require('../core/error').MongoError; -const MongoNetworkError = require('../core/error').MongoNetworkError; -const MongoNetworkTimeoutError = require('../core/error').MongoNetworkTimeoutError; -const MongoWriteConcernError = require('../core/error').MongoWriteConcernError; -const CommandResult = require('../core/connection/command_result'); -const StreamDescription = require('./stream_description').StreamDescription; -const wp = require('../core/wireprotocol'); -const apm = require('../core/connection/apm'); -const updateSessionFromResponse = require('../core/sessions').updateSessionFromResponse; -const uuidV4 = require('../core/utils').uuidV4; -const now = require('../utils').now; -const calculateDurationInMs = require('../utils').calculateDurationInMs; - -const kStream = Symbol('stream'); -const kQueue = Symbol('queue'); -const kMessageStream = Symbol('messageStream'); -const kGeneration = Symbol('generation'); -const kLastUseTime = Symbol('lastUseTime'); -const kClusterTime = Symbol('clusterTime'); -const kDescription = Symbol('description'); -const kIsMaster = Symbol('ismaster'); -const kAutoEncrypter = Symbol('autoEncrypter'); - -class Connection extends EventEmitter { - constructor(stream, options) { - super(options); - - this.id = options.id; - this.address = streamIdentifier(stream); - this.bson = options.bson; - this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 0; - this.host = options.host || 'localhost'; - this.port = options.port || 27017; - this.monitorCommands = - typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false; - this.closed = false; - this.destroyed = false; - - this[kDescription] = new StreamDescription(this.address, options); - this[kGeneration] = options.generation; - this[kLastUseTime] = now(); - - // retain a reference to an `AutoEncrypter` if present - if (options.autoEncrypter) { - this[kAutoEncrypter] = options.autoEncrypter; - } - - // setup parser stream and message handling - this[kQueue] = new Map(); - this[kMessageStream] = new MessageStream(options); - this[kMessageStream].on('message', messageHandler(this)); - this[kStream] = stream; - stream.on('error', () => { - /* ignore errors, listen to `close` instead */ - }); - - stream.on('close', () => { - if (this.closed) { - return; - } - - this.closed = true; - this[kQueue].forEach(op => - op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)) - ); - this[kQueue].clear(); - - this.emit('close'); - }); - - stream.on('timeout', () => { - if (this.closed) { - return; - } - - stream.destroy(); - this.closed = true; - this[kQueue].forEach(op => - op.cb( - new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, { - beforeHandshake: this[kIsMaster] == null - }) - ) - ); - - this[kQueue].clear(); - this.emit('close'); - }); - - // hook the message stream up to the passed in stream - stream.pipe(this[kMessageStream]); - this[kMessageStream].pipe(stream); - } - - get description() { - return this[kDescription]; - } - - get ismaster() { - return this[kIsMaster]; - } - - // the `connect` method stores the result of the handshake ismaster on the connection - set ismaster(response) { - this[kDescription].receiveResponse(response); - - // TODO: remove this, and only use the `StreamDescription` in the future - this[kIsMaster] = response; - } - - get generation() { - return this[kGeneration] || 0; - } - - get idleTime() { - return calculateDurationInMs(this[kLastUseTime]); - } - - get clusterTime() { - return this[kClusterTime]; - } - - get stream() { - return this[kStream]; - } - - markAvailable() { - this[kLastUseTime] = now(); - } - - destroy(options, callback) { - if (typeof options === 'function') { - callback = options; - options = {}; - } - - options = Object.assign({ force: false }, options); - if (this[kStream] == null || this.destroyed) { - this.destroyed = true; - if (typeof callback === 'function') { - callback(); - } - - return; - } - - if (options.force) { - this[kStream].destroy(); - this.destroyed = true; - if (typeof callback === 'function') { - callback(); - } - - return; - } - - this[kStream].end(err => { - this.destroyed = true; - if (typeof callback === 'function') { - callback(err); - } - }); - } - - // Wire protocol methods - command(ns, cmd, options, callback) { - wp.command(makeServerTrampoline(this), ns, cmd, options, callback); - } - - query(ns, cmd, cursorState, options, callback) { - wp.query(makeServerTrampoline(this), ns, cmd, cursorState, options, callback); - } - - getMore(ns, cursorState, batchSize, options, callback) { - wp.getMore(makeServerTrampoline(this), ns, cursorState, batchSize, options, callback); - } - - killCursors(ns, cursorState, callback) { - wp.killCursors(makeServerTrampoline(this), ns, cursorState, callback); - } - - insert(ns, ops, options, callback) { - wp.insert(makeServerTrampoline(this), ns, ops, options, callback); - } - - update(ns, ops, options, callback) { - wp.update(makeServerTrampoline(this), ns, ops, options, callback); - } - - remove(ns, ops, options, callback) { - wp.remove(makeServerTrampoline(this), ns, ops, options, callback); - } -} - -/// This lets us emulate a legacy `Server` instance so we can work with the existing wire -/// protocol methods. Eventually, the operation executor will return a `Connection` to execute -/// against. -function makeServerTrampoline(connection) { - const server = { - description: connection.description, - clusterTime: connection[kClusterTime], - s: { - bson: connection.bson, - pool: { write: write.bind(connection), isConnected: () => true } - } - }; - - if (connection[kAutoEncrypter]) { - server.autoEncrypter = connection[kAutoEncrypter]; - } - - return server; -} - -function messageHandler(conn) { - return function messageHandler(message) { - // always emit the message, in case we are streaming - conn.emit('message', message); - if (!conn[kQueue].has(message.responseTo)) { - return; - } - - const operationDescription = conn[kQueue].get(message.responseTo); - const callback = operationDescription.cb; - - // SERVER-45775: For exhaust responses we should be able to use the same requestId to - // track response, however the server currently synthetically produces remote requests - // making the `responseTo` change on each response - conn[kQueue].delete(message.responseTo); - if (message.moreToCome) { - // requeue the callback for next synthetic request - conn[kQueue].set(message.requestId, operationDescription); - } else if (operationDescription.socketTimeoutOverride) { - conn[kStream].setTimeout(conn.socketTimeout); - } - - try { - // Pass in the entire description because it has BSON parsing options - message.parse(operationDescription); - } catch (err) { - callback(new MongoError(err)); - return; - } - - if (message.documents[0]) { - const document = message.documents[0]; - const session = operationDescription.session; - if (session) { - updateSessionFromResponse(session, document); - } - - if (document.$clusterTime) { - conn[kClusterTime] = document.$clusterTime; - conn.emit('clusterTimeReceived', document.$clusterTime); - } - - if (operationDescription.command) { - if (document.writeConcernError) { - callback(new MongoWriteConcernError(document.writeConcernError, document)); - return; - } - - if (document.ok === 0 || document.$err || document.errmsg || document.code) { - callback(new MongoError(document)); - return; - } - } - } - - // NODE-2382: reenable in our glorious non-leaky abstraction future - // callback(null, operationDescription.fullResult ? message : message.documents[0]); - - callback( - undefined, - new CommandResult( - operationDescription.fullResult ? message : message.documents[0], - conn, - message - ) - ); - }; -} - -function streamIdentifier(stream) { - if (typeof stream.address === 'function') { - return `${stream.remoteAddress}:${stream.remotePort}`; - } - - return uuidV4().toString('hex'); -} - -// Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance -function write(command, options, callback) { - if (typeof options === 'function') { - callback = options; - } - - options = options || {}; - const operationDescription = { - requestId: command.requestId, - cb: callback, - session: options.session, - fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false, - noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false, - documentsReturnedIn: options.documentsReturnedIn, - command: !!options.command, - - // for BSON parsing - promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true, - promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true, - promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false, - raw: typeof options.raw === 'boolean' ? options.raw : false - }; - - if (this[kDescription] && this[kDescription].compressor) { - operationDescription.agreedCompressor = this[kDescription].compressor; - - if (this[kDescription].zlibCompressionLevel) { - operationDescription.zlibCompressionLevel = this[kDescription].zlibCompressionLevel; - } - } - - if (typeof options.socketTimeout === 'number') { - operationDescription.socketTimeoutOverride = true; - this[kStream].setTimeout(options.socketTimeout); - } - - // if command monitoring is enabled we need to modify the callback here - if (this.monitorCommands) { - this.emit('commandStarted', new apm.CommandStartedEvent(this, command)); - - operationDescription.started = now(); - operationDescription.cb = (err, reply) => { - if (err) { - this.emit( - 'commandFailed', - new apm.CommandFailedEvent(this, command, err, operationDescription.started) - ); - } else { - if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) { - this.emit( - 'commandFailed', - new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started) - ); - } else { - this.emit( - 'commandSucceeded', - new apm.CommandSucceededEvent(this, command, reply, operationDescription.started) - ); - } - } - - if (typeof callback === 'function') { - callback(err, reply); - } - }; - } - - if (!operationDescription.noResponse) { - this[kQueue].set(operationDescription.requestId, operationDescription); - } - - try { - this[kMessageStream].writeCommand(command, operationDescription); - } catch (e) { - if (!operationDescription.noResponse) { - this[kQueue].delete(operationDescription.requestId); - operationDescription.cb(e); - return; - } - } - - if (operationDescription.noResponse) { - operationDescription.cb(); - } -} - -module.exports = { - Connection -}; diff --git a/node_modules/mongodb/lib/cmap/connection_pool.js b/node_modules/mongodb/lib/cmap/connection_pool.js deleted file mode 100644 index 4500d9a..0000000 --- a/node_modules/mongodb/lib/cmap/connection_pool.js +++ /dev/null @@ -1,591 +0,0 @@ -'use strict'; - -const Denque = require('denque'); -const EventEmitter = require('events').EventEmitter; -const Logger = require('../core/connection/logger'); -const makeCounter = require('../utils').makeCounter; -const MongoError = require('../core/error').MongoError; -const Connection = require('./connection').Connection; -const eachAsync = require('../core/utils').eachAsync; -const connect = require('../core/connection/connect'); -const relayEvents = require('../core/utils').relayEvents; - -const errors = require('./errors'); -const PoolClosedError = errors.PoolClosedError; -const WaitQueueTimeoutError = errors.WaitQueueTimeoutError; - -const events = require('./events'); -const ConnectionPoolCreatedEvent = events.ConnectionPoolCreatedEvent; -const ConnectionPoolClosedEvent = events.ConnectionPoolClosedEvent; -const ConnectionCreatedEvent = events.ConnectionCreatedEvent; -const ConnectionReadyEvent = events.ConnectionReadyEvent; -const ConnectionClosedEvent = events.ConnectionClosedEvent; -const ConnectionCheckOutStartedEvent = events.ConnectionCheckOutStartedEvent; -const ConnectionCheckOutFailedEvent = events.ConnectionCheckOutFailedEvent; -const ConnectionCheckedOutEvent = events.ConnectionCheckedOutEvent; -const ConnectionCheckedInEvent = events.ConnectionCheckedInEvent; -const ConnectionPoolClearedEvent = events.ConnectionPoolClearedEvent; - -const kLogger = Symbol('logger'); -const kConnections = Symbol('connections'); -const kPermits = Symbol('permits'); -const kMinPoolSizeTimer = Symbol('minPoolSizeTimer'); -const kGeneration = Symbol('generation'); -const kConnectionCounter = Symbol('connectionCounter'); -const kCancellationToken = Symbol('cancellationToken'); -const kWaitQueue = Symbol('waitQueue'); -const kCancelled = Symbol('cancelled'); - -const VALID_POOL_OPTIONS = new Set([ - // `connect` options - 'ssl', - 'bson', - 'connectionType', - 'monitorCommands', - 'socketTimeout', - 'credentials', - 'compression', - - // node Net options - 'host', - 'port', - 'localAddress', - 'localPort', - 'family', - 'hints', - 'lookup', - 'path', - - // node TLS options - 'ca', - 'cert', - 'sigalgs', - 'ciphers', - 'clientCertEngine', - 'crl', - 'dhparam', - 'ecdhCurve', - 'honorCipherOrder', - 'key', - 'privateKeyEngine', - 'privateKeyIdentifier', - 'maxVersion', - 'minVersion', - 'passphrase', - 'pfx', - 'secureOptions', - 'secureProtocol', - 'sessionIdContext', - 'allowHalfOpen', - 'rejectUnauthorized', - 'pskCallback', - 'ALPNProtocols', - 'servername', - 'checkServerIdentity', - 'session', - 'minDHSize', - 'secureContext', - - // spec options - 'maxPoolSize', - 'minPoolSize', - 'maxIdleTimeMS', - 'waitQueueTimeoutMS' -]); - -function resolveOptions(options, defaults) { - const newOptions = Array.from(VALID_POOL_OPTIONS).reduce((obj, key) => { - if (Object.prototype.hasOwnProperty.call(options, key)) { - obj[key] = options[key]; - } - - return obj; - }, {}); - - return Object.freeze(Object.assign({}, defaults, newOptions)); -} - -/** - * Configuration options for drivers wrapping the node driver. - * - * @typedef {Object} ConnectionPoolOptions - * @property - * @property {string} [host] The host to connect to - * @property {number} [port] The port to connect to - * @property {bson} [bson] The BSON instance to use for new connections - * @property {number} [maxPoolSize=100] The maximum number of connections that may be associated with a pool at a given time. This includes in use and available connections. - * @property {number} [minPoolSize=0] The minimum number of connections that MUST exist at any moment in a single connection pool. - * @property {number} [maxIdleTimeMS] The maximum amount of time a connection should remain idle in the connection pool before being marked idle. - * @property {number} [waitQueueTimeoutMS=0] The maximum amount of time operation execution should wait for a connection to become available. The default is 0 which means there is no limit. - */ - -/** - * A pool of connections which dynamically resizes, and emit events related to pool activity - * - * @property {number} generation An integer representing the SDAM generation of the pool - * @property {number} totalConnectionCount An integer expressing how many total connections (active + in use) the pool currently has - * @property {number} availableConnectionCount An integer expressing how many connections are currently available in the pool. - * @property {string} address The address of the endpoint the pool is connected to - * - * @emits ConnectionPool#connectionPoolCreated - * @emits ConnectionPool#connectionPoolClosed - * @emits ConnectionPool#connectionCreated - * @emits ConnectionPool#connectionReady - * @emits ConnectionPool#connectionClosed - * @emits ConnectionPool#connectionCheckOutStarted - * @emits ConnectionPool#connectionCheckOutFailed - * @emits ConnectionPool#connectionCheckedOut - * @emits ConnectionPool#connectionCheckedIn - * @emits ConnectionPool#connectionPoolCleared - */ -class ConnectionPool extends EventEmitter { - /** - * Create a new Connection Pool - * - * @param {ConnectionPoolOptions} options - */ - constructor(options) { - super(); - options = options || {}; - - this.closed = false; - this.options = resolveOptions(options, { - connectionType: Connection, - maxPoolSize: typeof options.maxPoolSize === 'number' ? options.maxPoolSize : 100, - minPoolSize: typeof options.minPoolSize === 'number' ? options.minPoolSize : 0, - maxIdleTimeMS: typeof options.maxIdleTimeMS === 'number' ? options.maxIdleTimeMS : 0, - waitQueueTimeoutMS: - typeof options.waitQueueTimeoutMS === 'number' ? options.waitQueueTimeoutMS : 0, - autoEncrypter: options.autoEncrypter, - metadata: options.metadata - }); - - if (options.minSize > options.maxSize) { - throw new TypeError( - 'Connection pool minimum size must not be greater than maxiumum pool size' - ); - } - - this[kLogger] = Logger('ConnectionPool', options); - this[kConnections] = new Denque(); - this[kPermits] = this.options.maxPoolSize; - this[kMinPoolSizeTimer] = undefined; - this[kGeneration] = 0; - this[kConnectionCounter] = makeCounter(1); - this[kCancellationToken] = new EventEmitter(); - this[kCancellationToken].setMaxListeners(Infinity); - this[kWaitQueue] = new Denque(); - - process.nextTick(() => { - this.emit('connectionPoolCreated', new ConnectionPoolCreatedEvent(this)); - ensureMinPoolSize(this); - }); - } - - get address() { - return `${this.options.host}:${this.options.port}`; - } - - get generation() { - return this[kGeneration]; - } - - get totalConnectionCount() { - return this[kConnections].length + (this.options.maxPoolSize - this[kPermits]); - } - - get availableConnectionCount() { - return this[kConnections].length; - } - - get waitQueueSize() { - return this[kWaitQueue].length; - } - - /** - * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it - * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or - * explicitly destroyed by the new owner. - * - * @param {ConnectionPool~checkOutCallback} callback - */ - checkOut(callback) { - this.emit('connectionCheckOutStarted', new ConnectionCheckOutStartedEvent(this)); - - if (this.closed) { - this.emit('connectionCheckOutFailed', new ConnectionCheckOutFailedEvent(this, 'poolClosed')); - callback(new PoolClosedError(this)); - return; - } - - const waitQueueMember = { callback }; - - const pool = this; - const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS; - if (waitQueueTimeoutMS) { - waitQueueMember.timer = setTimeout(() => { - waitQueueMember[kCancelled] = true; - waitQueueMember.timer = undefined; - - pool.emit('connectionCheckOutFailed', new ConnectionCheckOutFailedEvent(pool, 'timeout')); - waitQueueMember.callback(new WaitQueueTimeoutError(pool)); - }, waitQueueTimeoutMS); - } - - this[kWaitQueue].push(waitQueueMember); - process.nextTick(() => processWaitQueue(this)); - } - - /** - * Check a connection into the pool. - * - * @param {Connection} connection The connection to check in - */ - checkIn(connection) { - const poolClosed = this.closed; - const stale = connectionIsStale(this, connection); - const willDestroy = !!(poolClosed || stale || connection.closed); - - if (!willDestroy) { - connection.markAvailable(); - this[kConnections].push(connection); - } - - this.emit('connectionCheckedIn', new ConnectionCheckedInEvent(this, connection)); - - if (willDestroy) { - const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale'; - destroyConnection(this, connection, reason); - } - - process.nextTick(() => processWaitQueue(this)); - } - - /** - * Clear the pool - * - * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a - * previous generation will eventually be pruned during subsequent checkouts. - */ - clear() { - this[kGeneration] += 1; - this.emit('connectionPoolCleared', new ConnectionPoolClearedEvent(this)); - } - - /** - * Close the pool - * - * @param {object} [options] Optional settings - * @param {boolean} [options.force] Force close connections - * @param {Function} callback - */ - close(options, callback) { - if (typeof options === 'function') { - callback = options; - } - - options = Object.assign({ force: false }, options); - if (this.closed) { - return callback(); - } - - // immediately cancel any in-flight connections - this[kCancellationToken].emit('cancel'); - - // drain the wait queue - while (this.waitQueueSize) { - const waitQueueMember = this[kWaitQueue].pop(); - clearTimeout(waitQueueMember.timer); - if (!waitQueueMember[kCancelled]) { - waitQueueMember.callback(new MongoError('connection pool closed')); - } - } - - // clear the min pool size timer - if (this[kMinPoolSizeTimer]) { - clearTimeout(this[kMinPoolSizeTimer]); - } - - // end the connection counter - if (typeof this[kConnectionCounter].return === 'function') { - this[kConnectionCounter].return(); - } - - // mark the pool as closed immediately - this.closed = true; - - eachAsync( - this[kConnections].toArray(), - (conn, cb) => { - this.emit('connectionClosed', new ConnectionClosedEvent(this, conn, 'poolClosed')); - conn.destroy(options, cb); - }, - err => { - this[kConnections].clear(); - this.emit('connectionPoolClosed', new ConnectionPoolClosedEvent(this)); - callback(err); - } - ); - } - - /** - * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda - * has completed by calling back. - * - * NOTE: please note the required signature of `fn` - * - * @param {ConnectionPool~withConnectionCallback} fn A function which operates on a managed connection - * @param {Function} callback The original callback - * @return {Promise} - */ - withConnection(fn, callback) { - this.checkOut((err, conn) => { - // don't callback with `err` here, we might want to act upon it inside `fn` - - fn(err, conn, (fnErr, result) => { - if (typeof callback === 'function') { - if (fnErr) { - callback(fnErr); - } else { - callback(undefined, result); - } - } - - if (conn) { - this.checkIn(conn); - } - }); - }); - } -} - -function ensureMinPoolSize(pool) { - if (pool.closed || pool.options.minPoolSize === 0) { - return; - } - - const minPoolSize = pool.options.minPoolSize; - for (let i = pool.totalConnectionCount; i < minPoolSize; ++i) { - createConnection(pool); - } - - pool[kMinPoolSizeTimer] = setTimeout(() => ensureMinPoolSize(pool), 10); -} - -function connectionIsStale(pool, connection) { - return connection.generation !== pool[kGeneration]; -} - -function connectionIsIdle(pool, connection) { - return !!(pool.options.maxIdleTimeMS && connection.idleTime > pool.options.maxIdleTimeMS); -} - -function createConnection(pool, callback) { - const connectOptions = Object.assign( - { - id: pool[kConnectionCounter].next().value, - generation: pool[kGeneration] - }, - pool.options - ); - - pool[kPermits]--; - connect(connectOptions, pool[kCancellationToken], (err, connection) => { - if (err) { - pool[kPermits]++; - pool[kLogger].debug(`connection attempt failed with error [${JSON.stringify(err)}]`); - if (typeof callback === 'function') { - callback(err); - } - - return; - } - - // The pool might have closed since we started trying to create a connection - if (pool.closed) { - connection.destroy({ force: true }); - return; - } - - // forward all events from the connection to the pool - relayEvents(connection, pool, [ - 'commandStarted', - 'commandFailed', - 'commandSucceeded', - 'clusterTimeReceived' - ]); - - pool.emit('connectionCreated', new ConnectionCreatedEvent(pool, connection)); - - connection.markAvailable(); - pool.emit('connectionReady', new ConnectionReadyEvent(pool, connection)); - - // if a callback has been provided, check out the connection immediately - if (typeof callback === 'function') { - callback(undefined, connection); - return; - } - - // otherwise add it to the pool for later acquisition, and try to process the wait queue - pool[kConnections].push(connection); - process.nextTick(() => processWaitQueue(pool)); - }); -} - -function destroyConnection(pool, connection, reason) { - pool.emit('connectionClosed', new ConnectionClosedEvent(pool, connection, reason)); - - // allow more connections to be created - pool[kPermits]++; - - // destroy the connection - process.nextTick(() => connection.destroy()); -} - -function processWaitQueue(pool) { - if (pool.closed) { - return; - } - - while (pool.waitQueueSize) { - const waitQueueMember = pool[kWaitQueue].peekFront(); - if (waitQueueMember[kCancelled]) { - pool[kWaitQueue].shift(); - continue; - } - - if (!pool.availableConnectionCount) { - break; - } - - const connection = pool[kConnections].shift(); - const isStale = connectionIsStale(pool, connection); - const isIdle = connectionIsIdle(pool, connection); - if (!isStale && !isIdle && !connection.closed) { - pool.emit('connectionCheckedOut', new ConnectionCheckedOutEvent(pool, connection)); - clearTimeout(waitQueueMember.timer); - pool[kWaitQueue].shift(); - waitQueueMember.callback(undefined, connection); - return; - } - - const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle'; - destroyConnection(pool, connection, reason); - } - - const maxPoolSize = pool.options.maxPoolSize; - if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) { - createConnection(pool, (err, connection) => { - const waitQueueMember = pool[kWaitQueue].shift(); - if (waitQueueMember == null || waitQueueMember[kCancelled]) { - if (err == null) { - pool[kConnections].push(connection); - } - - return; - } - - if (err) { - pool.emit('connectionCheckOutFailed', new ConnectionCheckOutFailedEvent(pool, err)); - } else { - pool.emit('connectionCheckedOut', new ConnectionCheckedOutEvent(pool, connection)); - } - - clearTimeout(waitQueueMember.timer); - waitQueueMember.callback(err, connection); - }); - - return; - } -} - -/** - * A callback provided to `withConnection` - * - * @callback ConnectionPool~withConnectionCallback - * @param {MongoError} error An error instance representing the error during the execution. - * @param {Connection} connection The managed connection which was checked out of the pool. - * @param {Function} callback A function to call back after connection management is complete - */ - -/** - * A callback provided to `checkOut` - * - * @callback ConnectionPool~checkOutCallback - * @param {MongoError} error An error instance representing the error during checkout - * @param {Connection} connection A connection from the pool - */ - -/** - * Emitted once when the connection pool is created - * - * @event ConnectionPool#connectionPoolCreated - * @type {PoolCreatedEvent} - */ - -/** - * Emitted once when the connection pool is closed - * - * @event ConnectionPool#connectionPoolClosed - * @type {PoolClosedEvent} - */ - -/** - * Emitted each time a connection is created - * - * @event ConnectionPool#connectionCreated - * @type {ConnectionCreatedEvent} - */ - -/** - * Emitted when a connection becomes established, and is ready to use - * - * @event ConnectionPool#connectionReady - * @type {ConnectionReadyEvent} - */ - -/** - * Emitted when a connection is closed - * - * @event ConnectionPool#connectionClosed - * @type {ConnectionClosedEvent} - */ - -/** - * Emitted when an attempt to check out a connection begins - * - * @event ConnectionPool#connectionCheckOutStarted - * @type {ConnectionCheckOutStartedEvent} - */ - -/** - * Emitted when an attempt to check out a connection fails - * - * @event ConnectionPool#connectionCheckOutFailed - * @type {ConnectionCheckOutFailedEvent} - */ - -/** - * Emitted each time a connection is successfully checked out of the connection pool - * - * @event ConnectionPool#connectionCheckedOut - * @type {ConnectionCheckedOutEvent} - */ - -/** - * Emitted each time a connection is successfully checked into the connection pool - * - * @event ConnectionPool#connectionCheckedIn - * @type {ConnectionCheckedInEvent} - */ - -/** - * Emitted each time the connection pool is cleared and it's generation incremented - * - * @event ConnectionPool#connectionPoolCleared - * @type {PoolClearedEvent} - */ - -module.exports = { - ConnectionPool -}; diff --git a/node_modules/mongodb/lib/cmap/errors.js b/node_modules/mongodb/lib/cmap/errors.js deleted file mode 100644 index d933019..0000000 --- a/node_modules/mongodb/lib/cmap/errors.js +++ /dev/null @@ -1,35 +0,0 @@ -'use strict'; -const MongoError = require('../core/error').MongoError; - -/** - * An error indicating a connection pool is closed - * - * @property {string} address The address of the connection pool - * @extends MongoError - */ -class PoolClosedError extends MongoError { - constructor(pool) { - super('Attempted to check out a connection from closed connection pool'); - this.name = 'MongoPoolClosedError'; - this.address = pool.address; - } -} - -/** - * An error thrown when a request to check out a connection times out - * - * @property {string} address The address of the connection pool - * @extends MongoError - */ -class WaitQueueTimeoutError extends MongoError { - constructor(pool) { - super('Timed out while checking out a connection from connection pool'); - this.name = 'MongoWaitQueueTimeoutError'; - this.address = pool.address; - } -} - -module.exports = { - PoolClosedError, - WaitQueueTimeoutError -}; diff --git a/node_modules/mongodb/lib/cmap/events.js b/node_modules/mongodb/lib/cmap/events.js deleted file mode 100644 index dcc8b67..0000000 --- a/node_modules/mongodb/lib/cmap/events.js +++ /dev/null @@ -1,154 +0,0 @@ -'use strict'; - -/** - * The base class for all monitoring events published from the connection pool - * - * @property {number} time A timestamp when the event was created - * @property {string} address The address (host/port pair) of the pool - */ -class ConnectionPoolMonitoringEvent { - constructor(pool) { - this.time = new Date(); - this.address = pool.address; - } -} - -/** - * An event published when a connection pool is created - * - * @property {Object} options The options used to create this connection pool - */ -class ConnectionPoolCreatedEvent extends ConnectionPoolMonitoringEvent { - constructor(pool) { - super(pool); - this.options = pool.options; - } -} - -/** - * An event published when a connection pool is closed - */ -class ConnectionPoolClosedEvent extends ConnectionPoolMonitoringEvent { - constructor(pool) { - super(pool); - } -} - -/** - * An event published when a connection pool creates a new connection - * - * @property {number} connectionId A monotonically increasing, per-pool id for the newly created connection - */ -class ConnectionCreatedEvent extends ConnectionPoolMonitoringEvent { - constructor(pool, connection) { - super(pool); - this.connectionId = connection.id; - } -} - -/** - * An event published when a connection is ready for use - * - * @property {number} connectionId The id of the connection - */ -class ConnectionReadyEvent extends ConnectionPoolMonitoringEvent { - constructor(pool, connection) { - super(pool); - this.connectionId = connection.id; - } -} - -/** - * An event published when a connection is closed - * - * @property {number} connectionId The id of the connection - * @property {string} reason The reason the connection was closed - */ -class ConnectionClosedEvent extends ConnectionPoolMonitoringEvent { - constructor(pool, connection, reason) { - super(pool); - this.connectionId = connection.id; - this.reason = reason || 'unknown'; - } -} - -/** - * An event published when a request to check a connection out begins - */ -class ConnectionCheckOutStartedEvent extends ConnectionPoolMonitoringEvent { - constructor(pool) { - super(pool); - } -} - -/** - * An event published when a request to check a connection out fails - * - * @property {string} reason The reason the attempt to check out failed - */ -class ConnectionCheckOutFailedEvent extends ConnectionPoolMonitoringEvent { - constructor(pool, reason) { - super(pool); - this.reason = reason; - } -} - -/** - * An event published when a connection is checked out of the connection pool - * - * @property {number} connectionId The id of the connection - */ -class ConnectionCheckedOutEvent extends ConnectionPoolMonitoringEvent { - constructor(pool, connection) { - super(pool); - this.connectionId = connection.id; - } -} - -/** - * An event published when a connection is checked into the connection pool - * - * @property {number} connectionId The id of the connection - */ -class ConnectionCheckedInEvent extends ConnectionPoolMonitoringEvent { - constructor(pool, connection) { - super(pool); - this.connectionId = connection.id; - } -} - -/** - * An event published when a connection pool is cleared - */ -class ConnectionPoolClearedEvent extends ConnectionPoolMonitoringEvent { - constructor(pool) { - super(pool); - } -} - -const CMAP_EVENT_NAMES = [ - 'connectionPoolCreated', - 'connectionPoolClosed', - 'connectionCreated', - 'connectionReady', - 'connectionClosed', - 'connectionCheckOutStarted', - 'connectionCheckOutFailed', - 'connectionCheckedOut', - 'connectionCheckedIn', - 'connectionPoolCleared' -]; - -module.exports = { - CMAP_EVENT_NAMES, - ConnectionPoolCreatedEvent, - ConnectionPoolClosedEvent, - ConnectionCreatedEvent, - ConnectionReadyEvent, - ConnectionClosedEvent, - ConnectionCheckOutStartedEvent, - ConnectionCheckOutFailedEvent, - ConnectionCheckedOutEvent, - ConnectionCheckedInEvent, - ConnectionPoolClearedEvent -}; diff --git a/node_modules/mongodb/lib/cmap/message_stream.js b/node_modules/mongodb/lib/cmap/message_stream.js deleted file mode 100644 index c8f458e..0000000 --- a/node_modules/mongodb/lib/cmap/message_stream.js +++ /dev/null @@ -1,196 +0,0 @@ -'use strict'; - -const Duplex = require('stream').Duplex; -const BufferList = require('bl'); -const MongoParseError = require('../core/error').MongoParseError; -const decompress = require('../core/wireprotocol/compression').decompress; -const Response = require('../core/connection/commands').Response; -const BinMsg = require('../core/connection/msg').BinMsg; -const MongoError = require('../core/error').MongoError; -const OP_COMPRESSED = require('../core/wireprotocol/shared').opcodes.OP_COMPRESSED; -const OP_MSG = require('../core/wireprotocol/shared').opcodes.OP_MSG; -const MESSAGE_HEADER_SIZE = require('../core/wireprotocol/shared').MESSAGE_HEADER_SIZE; -const COMPRESSION_DETAILS_SIZE = require('../core/wireprotocol/shared').COMPRESSION_DETAILS_SIZE; -const opcodes = require('../core/wireprotocol/shared').opcodes; -const compress = require('../core/wireprotocol/compression').compress; -const compressorIDs = require('../core/wireprotocol/compression').compressorIDs; -const uncompressibleCommands = require('../core/wireprotocol/compression').uncompressibleCommands; -const Msg = require('../core/connection/msg').Msg; - -const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; -const kBuffer = Symbol('buffer'); - -/** - * A duplex stream that is capable of reading and writing raw wire protocol messages, with - * support for optional compression - */ -class MessageStream extends Duplex { - constructor(options) { - options = options || {}; - super(options); - - this.bson = options.bson; - this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize; - - this[kBuffer] = new BufferList(); - } - - _write(chunk, _, callback) { - const buffer = this[kBuffer]; - buffer.append(chunk); - - processIncomingData(this, callback); - } - - _read(/* size */) { - // NOTE: This implementation is empty because we explicitly push data to be read - // when `writeMessage` is called. - return; - } - - writeCommand(command, operationDescription) { - // TODO: agreed compressor should live in `StreamDescription` - const shouldCompress = operationDescription && !!operationDescription.agreedCompressor; - if (!shouldCompress || !canCompress(command)) { - const data = command.toBin(); - this.push(Array.isArray(data) ? Buffer.concat(data) : data); - return; - } - - // otherwise, compress the message - const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin()); - const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); - - // Extract information needed for OP_COMPRESSED from the uncompressed message - const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); - - // Compress the message body - compress({ options: operationDescription }, messageToBeCompressed, (err, compressedMessage) => { - if (err) { - operationDescription.cb(err, null); - return; - } - - // Create the msgHeader of OP_COMPRESSED - const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); - msgHeader.writeInt32LE( - MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, - 0 - ); // messageLength - msgHeader.writeInt32LE(command.requestId, 4); // requestID - msgHeader.writeInt32LE(0, 8); // responseTo (zero) - msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode - - // Create the compression details of OP_COMPRESSED - const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE); - compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode - compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader - compressionDetails.writeUInt8(compressorIDs[operationDescription.agreedCompressor], 8); // compressorID - - this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); - }); - } -} - -// Return whether a command contains an uncompressible command term -// Will return true if command contains no uncompressible command terms -function canCompress(command) { - const commandDoc = command instanceof Msg ? command.command : command.query; - const commandName = Object.keys(commandDoc)[0]; - return !uncompressibleCommands.has(commandName); -} - -function processIncomingData(stream, callback) { - const buffer = stream[kBuffer]; - if (buffer.length < 4) { - callback(); - return; - } - - const sizeOfMessage = buffer.readInt32LE(0); - if (sizeOfMessage < 0) { - callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); - return; - } - - if (sizeOfMessage > stream.maxBsonMessageSize) { - callback( - new MongoParseError( - `Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}` - ) - ); - return; - } - - if (sizeOfMessage > buffer.length) { - callback(); - return; - } - - const message = buffer.slice(0, sizeOfMessage); - buffer.consume(sizeOfMessage); - - const messageHeader = { - length: message.readInt32LE(0), - requestId: message.readInt32LE(4), - responseTo: message.readInt32LE(8), - opCode: message.readInt32LE(12) - }; - - let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response; - const responseOptions = stream.responseOptions; - if (messageHeader.opCode !== OP_COMPRESSED) { - const messageBody = message.slice(MESSAGE_HEADER_SIZE); - stream.emit( - 'message', - new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) - ); - - if (buffer.length >= 4) { - processIncomingData(stream, callback); - } else { - callback(); - } - - return; - } - - messageHeader.fromCompressed = true; - messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE); - messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4); - const compressorID = message[MESSAGE_HEADER_SIZE + 8]; - const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9); - - // recalculate based on wrapped opcode - ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response; - - decompress(compressorID, compressedBuffer, (err, messageBody) => { - if (err) { - callback(err); - return; - } - - if (messageBody.length !== messageHeader.length) { - callback( - new MongoError( - 'Decompressing a compressed message from the server failed. The message is corrupt.' - ) - ); - - return; - } - - stream.emit( - 'message', - new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) - ); - - if (buffer.length >= 4) { - processIncomingData(stream, callback); - } else { - callback(); - } - }); -} - -module.exports = MessageStream; diff --git a/node_modules/mongodb/lib/cmap/stream_description.js b/node_modules/mongodb/lib/cmap/stream_description.js deleted file mode 100644 index e806a5f..0000000 --- a/node_modules/mongodb/lib/cmap/stream_description.js +++ /dev/null @@ -1,45 +0,0 @@ -'use strict'; -const parseServerType = require('../core/sdam/server_description').parseServerType; - -const RESPONSE_FIELDS = [ - 'minWireVersion', - 'maxWireVersion', - 'maxBsonObjectSize', - 'maxMessageSizeBytes', - 'maxWriteBatchSize', - '__nodejs_mock_server__' -]; - -class StreamDescription { - constructor(address, options) { - this.address = address; - this.type = parseServerType(null); - this.minWireVersion = undefined; - this.maxWireVersion = undefined; - this.maxBsonObjectSize = 16777216; - this.maxMessageSizeBytes = 48000000; - this.maxWriteBatchSize = 100000; - this.compressors = - options && options.compression && Array.isArray(options.compression.compressors) - ? options.compression.compressors - : []; - } - - receiveResponse(response) { - this.type = parseServerType(response); - - RESPONSE_FIELDS.forEach(field => { - if (typeof response[field] !== 'undefined') { - this[field] = response[field]; - } - }); - - if (response.compression) { - this.compressor = this.compressors.filter(c => response.compression.indexOf(c) !== -1)[0]; - } - } -} - -module.exports = { - StreamDescription -}; -- cgit v1.2.3