diff options
author | 2020-11-16 00:10:28 +0100 | |
---|---|---|
committer | 2020-11-16 00:10:28 +0100 | |
commit | e06ec920f7a5d784e674c4c4b4e6d1da3dc7391d (patch) | |
tree | 55713f725f77b44ebfec86e4eec3ce33e71458ca /node_modules/mongodb/lib/topologies | |
download | website_creator-e06ec920f7a5d784e674c4c4b4e6d1da3dc7391d.tar.gz website_creator-e06ec920f7a5d784e674c4c4b4e6d1da3dc7391d.tar.bz2 website_creator-e06ec920f7a5d784e674c4c4b4e6d1da3dc7391d.zip |
api, login, auth
Diffstat (limited to 'node_modules/mongodb/lib/topologies')
-rw-r--r-- | node_modules/mongodb/lib/topologies/mongos.js | 445 | ||||
-rw-r--r-- | node_modules/mongodb/lib/topologies/native_topology.js | 78 | ||||
-rw-r--r-- | node_modules/mongodb/lib/topologies/replset.js | 489 | ||||
-rw-r--r-- | node_modules/mongodb/lib/topologies/server.js | 448 | ||||
-rw-r--r-- | node_modules/mongodb/lib/topologies/topology_base.js | 417 |
5 files changed, 1877 insertions, 0 deletions
diff --git a/node_modules/mongodb/lib/topologies/mongos.js b/node_modules/mongodb/lib/topologies/mongos.js new file mode 100644 index 0000000..bf30d20 --- /dev/null +++ b/node_modules/mongodb/lib/topologies/mongos.js @@ -0,0 +1,445 @@ +'use strict'; + +const TopologyBase = require('./topology_base').TopologyBase; +const MongoError = require('../core').MongoError; +const CMongos = require('../core').Mongos; +const Cursor = require('../cursor'); +const Server = require('./server'); +const Store = require('./topology_base').Store; +const MAX_JS_INT = require('../utils').MAX_JS_INT; +const translateOptions = require('../utils').translateOptions; +const filterOptions = require('../utils').filterOptions; +const mergeOptions = require('../utils').mergeOptions; + +/** + * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is + * used to construct connections. + * + * **Mongos Should not be used, use MongoClient.connect** + */ + +// Allowed parameters +var legalOptionNames = [ + 'ha', + 'haInterval', + 'acceptableLatencyMS', + 'poolSize', + 'ssl', + 'checkServerIdentity', + 'sslValidate', + 'sslCA', + 'sslCRL', + 'sslCert', + 'ciphers', + 'ecdhCurve', + 'sslKey', + 'sslPass', + 'socketOptions', + 'bufferMaxEntries', + 'store', + 'auto_reconnect', + 'autoReconnect', + 'emitError', + 'keepAlive', + 'keepAliveInitialDelay', + 'noDelay', + 'connectTimeoutMS', + 'socketTimeoutMS', + 'loggerLevel', + 'logger', + 'reconnectTries', + 'appname', + 'domainsEnabled', + 'servername', + 'promoteLongs', + 'promoteValues', + 'promoteBuffers', + 'promiseLibrary', + 'monitorCommands' +]; + +/** + * Creates a new Mongos instance + * @class + * @deprecated + * @param {Server[]} servers A seedlist of servers participating in the replicaset. + * @param {object} [options] Optional settings. + * @param {booelan} [options.ha=true] Turn on high availability monitoring. + * @param {number} [options.haInterval=5000] Time between each replicaset status check. + * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons. + * @param {number} [options.acceptableLatencyMS=15] Cutoff latency point in MS for MongoS proxy selection + * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support) + * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. + * @param {boolean} [options.sslValidate=false] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {array} [options.sslCA] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {array} [options.sslCRL] Array of revocation certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {string} [options.ciphers] Passed directly through to tls.createSecureContext. See https://nodejs.org/dist/latest-v9.x/docs/api/tls.html#tls_tls_createsecurecontext_options for more info. + * @param {string} [options.ecdhCurve] Passed directly through to tls.createSecureContext. See https://nodejs.org/dist/latest-v9.x/docs/api/tls.html#tls_tls_createsecurecontext_options for more info. + * @param {(Buffer|string)} [options.sslCert] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {(Buffer|string)} [options.sslKey] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {(Buffer|string)} [options.sslPass] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {string} [options.servername] String containing the server name requested via TLS SNI. + * @param {object} [options.socketOptions] Socket options + * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option. + * @param {boolean} [options.socketOptions.keepAlive=true] TCP Connection keep alive enabled + * @param {number} [options.socketOptions.keepAliveInitialDelay=120000] The number of milliseconds to wait before initiating keepAlive on the TCP socket + * @param {number} [options.socketOptions.connectTimeoutMS=10000] How long to wait for a connection to be established before timing out + * @param {number} [options.socketOptions.socketTimeoutMS=0] How long a send or receive on a socket can take before timing out + * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit. + * @param {boolean} [options.monitorCommands=false] Enable command monitoring for this topology + * @fires Mongos#connect + * @fires Mongos#ha + * @fires Mongos#joined + * @fires Mongos#left + * @fires Mongos#fullsetup + * @fires Mongos#open + * @fires Mongos#close + * @fires Mongos#error + * @fires Mongos#timeout + * @fires Mongos#parseError + * @fires Mongos#commandStarted + * @fires Mongos#commandSucceeded + * @fires Mongos#commandFailed + * @property {string} parserType the parser type used (c++ or js). + * @return {Mongos} a Mongos instance. + */ +class Mongos extends TopologyBase { + constructor(servers, options) { + super(); + + options = options || {}; + var self = this; + + // Filter the options + options = filterOptions(options, legalOptionNames); + + // Ensure all the instances are Server + for (var i = 0; i < servers.length; i++) { + if (!(servers[i] instanceof Server)) { + throw MongoError.create({ + message: 'all seed list instances must be of the Server type', + driver: true + }); + } + } + + // Stored options + var storeOptions = { + force: false, + bufferMaxEntries: + typeof options.bufferMaxEntries === 'number' ? options.bufferMaxEntries : MAX_JS_INT + }; + + // Shared global store + var store = options.store || new Store(self, storeOptions); + + // Build seed list + var seedlist = servers.map(function(x) { + return { host: x.host, port: x.port }; + }); + + // Get the reconnect option + var reconnect = typeof options.auto_reconnect === 'boolean' ? options.auto_reconnect : true; + reconnect = typeof options.autoReconnect === 'boolean' ? options.autoReconnect : reconnect; + + // Clone options + var clonedOptions = mergeOptions( + {}, + { + disconnectHandler: store, + cursorFactory: Cursor, + reconnect: reconnect, + emitError: typeof options.emitError === 'boolean' ? options.emitError : true, + size: typeof options.poolSize === 'number' ? options.poolSize : 5, + monitorCommands: + typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false + } + ); + + // Translate any SSL options and other connectivity options + clonedOptions = translateOptions(clonedOptions, options); + + // Socket options + var socketOptions = + options.socketOptions && Object.keys(options.socketOptions).length > 0 + ? options.socketOptions + : options; + + // Translate all the options to the core types + clonedOptions = translateOptions(clonedOptions, socketOptions); + + // Internal state + this.s = { + // Create the Mongos + coreTopology: new CMongos(seedlist, clonedOptions), + // Server capabilities + sCapabilities: null, + // Debug turned on + debug: clonedOptions.debug, + // Store option defaults + storeOptions: storeOptions, + // Cloned options + clonedOptions: clonedOptions, + // Actual store of callbacks + store: store, + // Options + options: options, + // Server Session Pool + sessionPool: null, + // Active client sessions + sessions: new Set(), + // Promise library + promiseLibrary: options.promiseLibrary || Promise + }; + } + + // Connect + connect(_options, callback) { + var self = this; + if ('function' === typeof _options) (callback = _options), (_options = {}); + if (_options == null) _options = {}; + if (!('function' === typeof callback)) callback = null; + _options = Object.assign({}, this.s.clonedOptions, _options); + self.s.options = _options; + + // Update bufferMaxEntries + self.s.storeOptions.bufferMaxEntries = + typeof _options.bufferMaxEntries === 'number' ? _options.bufferMaxEntries : -1; + + // Error handler + var connectErrorHandler = function() { + return function(err) { + // Remove all event handlers + var events = ['timeout', 'error', 'close']; + events.forEach(function(e) { + self.removeListener(e, connectErrorHandler); + }); + + self.s.coreTopology.removeListener('connect', connectErrorHandler); + // Force close the topology + self.close(true); + + // Try to callback + try { + callback(err); + } catch (err) { + process.nextTick(function() { + throw err; + }); + } + }; + }; + + // Actual handler + var errorHandler = function(event) { + return function(err) { + if (event !== 'error') { + self.emit(event, err); + } + }; + }; + + // Error handler + var reconnectHandler = function() { + self.emit('reconnect'); + self.s.store.execute(); + }; + + // relay the event + var relay = function(event) { + return function(t, server) { + self.emit(event, t, server); + }; + }; + + // Connect handler + var connectHandler = function() { + // Clear out all the current handlers left over + var events = ['timeout', 'error', 'close', 'fullsetup']; + events.forEach(function(e) { + self.s.coreTopology.removeAllListeners(e); + }); + + // Set up listeners + self.s.coreTopology.on('timeout', errorHandler('timeout')); + self.s.coreTopology.on('error', errorHandler('error')); + self.s.coreTopology.on('close', errorHandler('close')); + + // Set up serverConfig listeners + self.s.coreTopology.on('fullsetup', function() { + self.emit('fullsetup', self); + }); + + // Emit open event + self.emit('open', null, self); + + // Return correctly + try { + callback(null, self); + } catch (err) { + process.nextTick(function() { + throw err; + }); + } + }; + + // Clear out all the current handlers left over + var events = [ + 'timeout', + 'error', + 'close', + 'serverOpening', + 'serverDescriptionChanged', + 'serverHeartbeatStarted', + 'serverHeartbeatSucceeded', + 'serverHeartbeatFailed', + 'serverClosed', + 'topologyOpening', + 'topologyClosed', + 'topologyDescriptionChanged', + 'commandStarted', + 'commandSucceeded', + 'commandFailed' + ]; + events.forEach(function(e) { + self.s.coreTopology.removeAllListeners(e); + }); + + // Set up SDAM listeners + self.s.coreTopology.on('serverDescriptionChanged', relay('serverDescriptionChanged')); + self.s.coreTopology.on('serverHeartbeatStarted', relay('serverHeartbeatStarted')); + self.s.coreTopology.on('serverHeartbeatSucceeded', relay('serverHeartbeatSucceeded')); + self.s.coreTopology.on('serverHeartbeatFailed', relay('serverHeartbeatFailed')); + self.s.coreTopology.on('serverOpening', relay('serverOpening')); + self.s.coreTopology.on('serverClosed', relay('serverClosed')); + self.s.coreTopology.on('topologyOpening', relay('topologyOpening')); + self.s.coreTopology.on('topologyClosed', relay('topologyClosed')); + self.s.coreTopology.on('topologyDescriptionChanged', relay('topologyDescriptionChanged')); + self.s.coreTopology.on('commandStarted', relay('commandStarted')); + self.s.coreTopology.on('commandSucceeded', relay('commandSucceeded')); + self.s.coreTopology.on('commandFailed', relay('commandFailed')); + + // Set up listeners + self.s.coreTopology.once('timeout', connectErrorHandler('timeout')); + self.s.coreTopology.once('error', connectErrorHandler('error')); + self.s.coreTopology.once('close', connectErrorHandler('close')); + self.s.coreTopology.once('connect', connectHandler); + // Join and leave events + self.s.coreTopology.on('joined', relay('joined')); + self.s.coreTopology.on('left', relay('left')); + + // Reconnect server + self.s.coreTopology.on('reconnect', reconnectHandler); + + // Start connection + self.s.coreTopology.connect(_options); + } +} + +Object.defineProperty(Mongos.prototype, 'haInterval', { + enumerable: true, + get: function() { + return this.s.coreTopology.s.haInterval; + } +}); + +/** + * A mongos connect event, used to verify that the connection is up and running + * + * @event Mongos#connect + * @type {Mongos} + */ + +/** + * The mongos high availability event + * + * @event Mongos#ha + * @type {function} + * @param {string} type The stage in the high availability event (start|end) + * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only + * @param {number} data.id The id for this high availability request + * @param {object} data.state An object containing the information about the current replicaset + */ + +/** + * A server member left the mongos set + * + * @event Mongos#left + * @type {function} + * @param {string} type The type of member that left (primary|secondary|arbiter) + * @param {Server} server The server object that left + */ + +/** + * A server member joined the mongos set + * + * @event Mongos#joined + * @type {function} + * @param {string} type The type of member that joined (primary|secondary|arbiter) + * @param {Server} server The server object that joined + */ + +/** + * Mongos fullsetup event, emitted when all proxies in the topology have been connected to. + * + * @event Mongos#fullsetup + * @type {Mongos} + */ + +/** + * Mongos open event, emitted when mongos can start processing commands. + * + * @event Mongos#open + * @type {Mongos} + */ + +/** + * Mongos close event + * + * @event Mongos#close + * @type {object} + */ + +/** + * Mongos error event, emitted if there is an error listener. + * + * @event Mongos#error + * @type {MongoError} + */ + +/** + * Mongos timeout event + * + * @event Mongos#timeout + * @type {object} + */ + +/** + * Mongos parseError event + * + * @event Mongos#parseError + * @type {object} + */ + +/** + * An event emitted indicating a command was started, if command monitoring is enabled + * + * @event Mongos#commandStarted + * @type {object} + */ + +/** + * An event emitted indicating a command succeeded, if command monitoring is enabled + * + * @event Mongos#commandSucceeded + * @type {object} + */ + +/** + * An event emitted indicating a command failed, if command monitoring is enabled + * + * @event Mongos#commandFailed + * @type {object} + */ + +module.exports = Mongos; diff --git a/node_modules/mongodb/lib/topologies/native_topology.js b/node_modules/mongodb/lib/topologies/native_topology.js new file mode 100644 index 0000000..cb7d91d --- /dev/null +++ b/node_modules/mongodb/lib/topologies/native_topology.js @@ -0,0 +1,78 @@ +'use strict'; + +const Topology = require('../core').Topology; +const ServerCapabilities = require('./topology_base').ServerCapabilities; +const Cursor = require('../cursor'); +const translateOptions = require('../utils').translateOptions; + +class NativeTopology extends Topology { + constructor(servers, options) { + options = options || {}; + + let clonedOptions = Object.assign( + {}, + { + cursorFactory: Cursor, + reconnect: false, + emitError: typeof options.emitError === 'boolean' ? options.emitError : true, + maxPoolSize: + typeof options.maxPoolSize === 'number' + ? options.maxPoolSize + : typeof options.poolSize === 'number' + ? options.poolSize + : 10, + minPoolSize: + typeof options.minPoolSize === 'number' + ? options.minPoolSize + : typeof options.minSize === 'number' + ? options.minSize + : 0, + monitorCommands: + typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false + } + ); + + // Translate any SSL options and other connectivity options + clonedOptions = translateOptions(clonedOptions, options); + + // Socket options + var socketOptions = + options.socketOptions && Object.keys(options.socketOptions).length > 0 + ? options.socketOptions + : options; + + // Translate all the options to the core types + clonedOptions = translateOptions(clonedOptions, socketOptions); + + super(servers, clonedOptions); + } + + capabilities() { + if (this.s.sCapabilities) return this.s.sCapabilities; + if (this.lastIsMaster() == null) return null; + this.s.sCapabilities = new ServerCapabilities(this.lastIsMaster()); + return this.s.sCapabilities; + } + + // Command + command(ns, cmd, options, callback) { + super.command(ns.toString(), cmd, options, callback); + } + + // Insert + insert(ns, ops, options, callback) { + super.insert(ns.toString(), ops, options, callback); + } + + // Update + update(ns, ops, options, callback) { + super.update(ns.toString(), ops, options, callback); + } + + // Remove + remove(ns, ops, options, callback) { + super.remove(ns.toString(), ops, options, callback); + } +} + +module.exports = NativeTopology; diff --git a/node_modules/mongodb/lib/topologies/replset.js b/node_modules/mongodb/lib/topologies/replset.js new file mode 100644 index 0000000..80701f5 --- /dev/null +++ b/node_modules/mongodb/lib/topologies/replset.js @@ -0,0 +1,489 @@ +'use strict'; + +const Server = require('./server'); +const Cursor = require('../cursor'); +const MongoError = require('../core').MongoError; +const TopologyBase = require('./topology_base').TopologyBase; +const Store = require('./topology_base').Store; +const CReplSet = require('../core').ReplSet; +const MAX_JS_INT = require('../utils').MAX_JS_INT; +const translateOptions = require('../utils').translateOptions; +const filterOptions = require('../utils').filterOptions; +const mergeOptions = require('../utils').mergeOptions; + +/** + * @fileOverview The **ReplSet** class is a class that represents a Replicaset topology and is + * used to construct connections. + * + * **ReplSet Should not be used, use MongoClient.connect** + */ + +// Allowed parameters +var legalOptionNames = [ + 'ha', + 'haInterval', + 'replicaSet', + 'rs_name', + 'secondaryAcceptableLatencyMS', + 'connectWithNoPrimary', + 'poolSize', + 'ssl', + 'checkServerIdentity', + 'sslValidate', + 'sslCA', + 'sslCert', + 'ciphers', + 'ecdhCurve', + 'sslCRL', + 'sslKey', + 'sslPass', + 'socketOptions', + 'bufferMaxEntries', + 'store', + 'auto_reconnect', + 'autoReconnect', + 'emitError', + 'keepAlive', + 'keepAliveInitialDelay', + 'noDelay', + 'connectTimeoutMS', + 'socketTimeoutMS', + 'strategy', + 'debug', + 'family', + 'loggerLevel', + 'logger', + 'reconnectTries', + 'appname', + 'domainsEnabled', + 'servername', + 'promoteLongs', + 'promoteValues', + 'promoteBuffers', + 'maxStalenessSeconds', + 'promiseLibrary', + 'minSize', + 'monitorCommands' +]; + +/** + * Creates a new ReplSet instance + * @class + * @deprecated + * @param {Server[]} servers A seedlist of servers participating in the replicaset. + * @param {object} [options] Optional settings. + * @param {boolean} [options.ha=true] Turn on high availability monitoring. + * @param {number} [options.haInterval=10000] Time between each replicaset status check. + * @param {string} [options.replicaSet] The name of the replicaset to connect to. + * @param {number} [options.secondaryAcceptableLatencyMS=15] Sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms) + * @param {boolean} [options.connectWithNoPrimary=false] Sets if the driver should connect even if no primary is available + * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons. + * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support) + * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. + * @param {boolean} [options.sslValidate=false] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {array} [options.sslCA] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {array} [options.sslCRL] Array of revocation certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {(Buffer|string)} [options.sslCert] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher. + * @param {string} [options.ciphers] Passed directly through to tls.createSecureContext. See https://nodejs.org/dist/latest-v9.x/docs/api/tls.html#tls_tls_createsecurecontext_options for more info. + * @param {string} [options.ecdhCurve] Passed directly through to tls.createSecureContext. See https://nodejs.org/dist/latest-v9.x/docs/api/tls.html#tls_tls_createsecurecontext_options for more info. + * @param {(Buffer|string)} [options.sslKey] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {(Buffer|string)} [options.sslPass] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {string} [options.servername] String containing the server name requested via TLS SNI. + * @param {object} [options.socketOptions] Socket options + * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option. + * @param {boolean} [options.socketOptions.keepAlive=true] TCP Connection keep alive enabled + * @param {number} [options.socketOptions.keepAliveInitialDelay=120000] The number of milliseconds to wait before initiating keepAlive on the TCP socket + * @param {number} [options.socketOptions.connectTimeoutMS=10000] How long to wait for a connection to be established before timing out + * @param {number} [options.socketOptions.socketTimeoutMS=360000] How long a send or receive on a socket can take before timing out + * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit. + * @param {number} [options.maxStalenessSeconds=undefined] The max staleness to secondary reads (values under 10 seconds cannot be guaranteed); + * @param {boolean} [options.monitorCommands=false] Enable command monitoring for this topology + * @fires ReplSet#connect + * @fires ReplSet#ha + * @fires ReplSet#joined + * @fires ReplSet#left + * @fires ReplSet#fullsetup + * @fires ReplSet#open + * @fires ReplSet#close + * @fires ReplSet#error + * @fires ReplSet#timeout + * @fires ReplSet#parseError + * @fires ReplSet#commandStarted + * @fires ReplSet#commandSucceeded + * @fires ReplSet#commandFailed + * @property {string} parserType the parser type used (c++ or js). + * @return {ReplSet} a ReplSet instance. + */ +class ReplSet extends TopologyBase { + constructor(servers, options) { + super(); + + options = options || {}; + var self = this; + + // Filter the options + options = filterOptions(options, legalOptionNames); + + // Ensure all the instances are Server + for (var i = 0; i < servers.length; i++) { + if (!(servers[i] instanceof Server)) { + throw MongoError.create({ + message: 'all seed list instances must be of the Server type', + driver: true + }); + } + } + + // Stored options + var storeOptions = { + force: false, + bufferMaxEntries: + typeof options.bufferMaxEntries === 'number' ? options.bufferMaxEntries : MAX_JS_INT + }; + + // Shared global store + var store = options.store || new Store(self, storeOptions); + + // Build seed list + var seedlist = servers.map(function(x) { + return { host: x.host, port: x.port }; + }); + + // Clone options + var clonedOptions = mergeOptions( + {}, + { + disconnectHandler: store, + cursorFactory: Cursor, + reconnect: false, + emitError: typeof options.emitError === 'boolean' ? options.emitError : true, + size: typeof options.poolSize === 'number' ? options.poolSize : 5, + monitorCommands: + typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false + } + ); + + // Translate any SSL options and other connectivity options + clonedOptions = translateOptions(clonedOptions, options); + + // Socket options + var socketOptions = + options.socketOptions && Object.keys(options.socketOptions).length > 0 + ? options.socketOptions + : options; + + // Translate all the options to the core types + clonedOptions = translateOptions(clonedOptions, socketOptions); + + // Create the ReplSet + var coreTopology = new CReplSet(seedlist, clonedOptions); + + // Listen to reconnect event + coreTopology.on('reconnect', function() { + self.emit('reconnect'); + store.execute(); + }); + + // Internal state + this.s = { + // Replicaset + coreTopology: coreTopology, + // Server capabilities + sCapabilities: null, + // Debug tag + tag: options.tag, + // Store options + storeOptions: storeOptions, + // Cloned options + clonedOptions: clonedOptions, + // Store + store: store, + // Options + options: options, + // Server Session Pool + sessionPool: null, + // Active client sessions + sessions: new Set(), + // Promise library + promiseLibrary: options.promiseLibrary || Promise + }; + + // Debug + if (clonedOptions.debug) { + // Last ismaster + Object.defineProperty(this, 'replset', { + enumerable: true, + get: function() { + return coreTopology; + } + }); + } + } + + // Connect method + connect(_options, callback) { + var self = this; + if ('function' === typeof _options) (callback = _options), (_options = {}); + if (_options == null) _options = {}; + if (!('function' === typeof callback)) callback = null; + _options = Object.assign({}, this.s.clonedOptions, _options); + self.s.options = _options; + + // Update bufferMaxEntries + self.s.storeOptions.bufferMaxEntries = + typeof _options.bufferMaxEntries === 'number' ? _options.bufferMaxEntries : -1; + + // Actual handler + var errorHandler = function(event) { + return function(err) { + if (event !== 'error') { + self.emit(event, err); + } + }; + }; + + // Clear out all the current handlers left over + var events = [ + 'timeout', + 'error', + 'close', + 'serverOpening', + 'serverDescriptionChanged', + 'serverHeartbeatStarted', + 'serverHeartbeatSucceeded', + 'serverHeartbeatFailed', + 'serverClosed', + 'topologyOpening', + 'topologyClosed', + 'topologyDescriptionChanged', + 'commandStarted', + 'commandSucceeded', + 'commandFailed', + 'joined', + 'left', + 'ping', + 'ha' + ]; + events.forEach(function(e) { + self.s.coreTopology.removeAllListeners(e); + }); + + // relay the event + var relay = function(event) { + return function(t, server) { + self.emit(event, t, server); + }; + }; + + // Replset events relay + var replsetRelay = function(event) { + return function(t, server) { + self.emit(event, t, server.lastIsMaster(), server); + }; + }; + + // Relay ha + var relayHa = function(t, state) { + self.emit('ha', t, state); + + if (t === 'start') { + self.emit('ha_connect', t, state); + } else if (t === 'end') { + self.emit('ha_ismaster', t, state); + } + }; + + // Set up serverConfig listeners + self.s.coreTopology.on('joined', replsetRelay('joined')); + self.s.coreTopology.on('left', relay('left')); + self.s.coreTopology.on('ping', relay('ping')); + self.s.coreTopology.on('ha', relayHa); + + // Set up SDAM listeners + self.s.coreTopology.on('serverDescriptionChanged', relay('serverDescriptionChanged')); + self.s.coreTopology.on('serverHeartbeatStarted', relay('serverHeartbeatStarted')); + self.s.coreTopology.on('serverHeartbeatSucceeded', relay('serverHeartbeatSucceeded')); + self.s.coreTopology.on('serverHeartbeatFailed', relay('serverHeartbeatFailed')); + self.s.coreTopology.on('serverOpening', relay('serverOpening')); + self.s.coreTopology.on('serverClosed', relay('serverClosed')); + self.s.coreTopology.on('topologyOpening', relay('topologyOpening')); + self.s.coreTopology.on('topologyClosed', relay('topologyClosed')); + self.s.coreTopology.on('topologyDescriptionChanged', relay('topologyDescriptionChanged')); + self.s.coreTopology.on('commandStarted', relay('commandStarted')); + self.s.coreTopology.on('commandSucceeded', relay('commandSucceeded')); + self.s.coreTopology.on('commandFailed', relay('commandFailed')); + + self.s.coreTopology.on('fullsetup', function() { + self.emit('fullsetup', self, self); + }); + + self.s.coreTopology.on('all', function() { + self.emit('all', null, self); + }); + + // Connect handler + var connectHandler = function() { + // Set up listeners + self.s.coreTopology.once('timeout', errorHandler('timeout')); + self.s.coreTopology.once('error', errorHandler('error')); + self.s.coreTopology.once('close', errorHandler('close')); + + // Emit open event + self.emit('open', null, self); + + // Return correctly + try { + callback(null, self); + } catch (err) { + process.nextTick(function() { + throw err; + }); + } + }; + + // Error handler + var connectErrorHandler = function() { + return function(err) { + ['timeout', 'error', 'close'].forEach(function(e) { + self.s.coreTopology.removeListener(e, connectErrorHandler); + }); + + self.s.coreTopology.removeListener('connect', connectErrorHandler); + // Destroy the replset + self.s.coreTopology.destroy(); + + // Try to callback + try { + callback(err); + } catch (err) { + if (!self.s.coreTopology.isConnected()) + process.nextTick(function() { + throw err; + }); + } + }; + }; + + // Set up listeners + self.s.coreTopology.once('timeout', connectErrorHandler('timeout')); + self.s.coreTopology.once('error', connectErrorHandler('error')); + self.s.coreTopology.once('close', connectErrorHandler('close')); + self.s.coreTopology.once('connect', connectHandler); + + // Start connection + self.s.coreTopology.connect(_options); + } + + close(forceClosed, callback) { + ['timeout', 'error', 'close', 'joined', 'left'].forEach(e => this.removeAllListeners(e)); + super.close(forceClosed, callback); + } +} + +Object.defineProperty(ReplSet.prototype, 'haInterval', { + enumerable: true, + get: function() { + return this.s.coreTopology.s.haInterval; + } +}); + +/** + * A replset connect event, used to verify that the connection is up and running + * + * @event ReplSet#connect + * @type {ReplSet} + */ + +/** + * The replset high availability event + * + * @event ReplSet#ha + * @type {function} + * @param {string} type The stage in the high availability event (start|end) + * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only + * @param {number} data.id The id for this high availability request + * @param {object} data.state An object containing the information about the current replicaset + */ + +/** + * A server member left the replicaset + * + * @event ReplSet#left + * @type {function} + * @param {string} type The type of member that left (primary|secondary|arbiter) + * @param {Server} server The server object that left + */ + +/** + * A server member joined the replicaset + * + * @event ReplSet#joined + * @type {function} + * @param {string} type The type of member that joined (primary|secondary|arbiter) + * @param {Server} server The server object that joined + */ + +/** + * ReplSet open event, emitted when replicaset can start processing commands. + * + * @event ReplSet#open + * @type {Replset} + */ + +/** + * ReplSet fullsetup event, emitted when all servers in the topology have been connected to. + * + * @event ReplSet#fullsetup + * @type {Replset} + */ + +/** + * ReplSet close event + * + * @event ReplSet#close + * @type {object} + */ + +/** + * ReplSet error event, emitted if there is an error listener. + * + * @event ReplSet#error + * @type {MongoError} + */ + +/** + * ReplSet timeout event + * + * @event ReplSet#timeout + * @type {object} + */ + +/** + * ReplSet parseError event + * + * @event ReplSet#parseError + * @type {object} + */ + +/** + * An event emitted indicating a command was started, if command monitoring is enabled + * + * @event ReplSet#commandStarted + * @type {object} + */ + +/** + * An event emitted indicating a command succeeded, if command monitoring is enabled + * + * @event ReplSet#commandSucceeded + * @type {object} + */ + +/** + * An event emitted indicating a command failed, if command monitoring is enabled + * + * @event ReplSet#commandFailed + * @type {object} + */ + +module.exports = ReplSet; diff --git a/node_modules/mongodb/lib/topologies/server.js b/node_modules/mongodb/lib/topologies/server.js new file mode 100644 index 0000000..0abaad3 --- /dev/null +++ b/node_modules/mongodb/lib/topologies/server.js @@ -0,0 +1,448 @@ +'use strict'; + +const CServer = require('../core').Server; +const Cursor = require('../cursor'); +const TopologyBase = require('./topology_base').TopologyBase; +const Store = require('./topology_base').Store; +const MongoError = require('../core').MongoError; +const MAX_JS_INT = require('../utils').MAX_JS_INT; +const translateOptions = require('../utils').translateOptions; +const filterOptions = require('../utils').filterOptions; +const mergeOptions = require('../utils').mergeOptions; + +/** + * @fileOverview The **Server** class is a class that represents a single server topology and is + * used to construct connections. + * + * **Server Should not be used, use MongoClient.connect** + */ + +// Allowed parameters +var legalOptionNames = [ + 'ha', + 'haInterval', + 'acceptableLatencyMS', + 'poolSize', + 'ssl', + 'checkServerIdentity', + 'sslValidate', + 'sslCA', + 'sslCRL', + 'sslCert', + 'ciphers', + 'ecdhCurve', + 'sslKey', + 'sslPass', + 'socketOptions', + 'bufferMaxEntries', + 'store', + 'auto_reconnect', + 'autoReconnect', + 'emitError', + 'keepAlive', + 'keepAliveInitialDelay', + 'noDelay', + 'connectTimeoutMS', + 'socketTimeoutMS', + 'family', + 'loggerLevel', + 'logger', + 'reconnectTries', + 'reconnectInterval', + 'monitoring', + 'appname', + 'domainsEnabled', + 'servername', + 'promoteLongs', + 'promoteValues', + 'promoteBuffers', + 'compression', + 'promiseLibrary', + 'monitorCommands' +]; + +/** + * Creates a new Server instance + * @class + * @deprecated + * @param {string} host The host for the server, can be either an IP4, IP6 or domain socket style host. + * @param {number} [port] The server port if IP4. + * @param {object} [options] Optional settings. + * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons. + * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support) + * @param {boolean} [options.sslValidate=false] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function. + * @param {array} [options.sslCA] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {array} [options.sslCRL] Array of revocation certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {(Buffer|string)} [options.sslCert] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {string} [options.ciphers] Passed directly through to tls.createSecureContext. See https://nodejs.org/dist/latest-v9.x/docs/api/tls.html#tls_tls_createsecurecontext_options for more info. + * @param {string} [options.ecdhCurve] Passed directly through to tls.createSecureContext. See https://nodejs.org/dist/latest-v9.x/docs/api/tls.html#tls_tls_createsecurecontext_options for more info. + * @param {(Buffer|string)} [options.sslKey] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {(Buffer|string)} [options.sslPass] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher) + * @param {string} [options.servername] String containing the server name requested via TLS SNI. + * @param {object} [options.socketOptions] Socket options + * @param {boolean} [options.socketOptions.autoReconnect=true] Reconnect on error. + * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option. + * @param {boolean} [options.socketOptions.keepAlive=true] TCP Connection keep alive enabled + * @param {number} [options.socketOptions.keepAliveInitialDelay=120000] The number of milliseconds to wait before initiating keepAlive on the TCP socket + * @param {number} [options.socketOptions.connectTimeoutMS=10000] How long to wait for a connection to be established before timing out + * @param {number} [options.socketOptions.socketTimeoutMS=0] How long a send or receive on a socket can take before timing out + * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times + * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries + * @param {boolean} [options.monitoring=true] Triggers the server instance to call ismaster + * @param {number} [options.haInterval=10000] The interval of calling ismaster when monitoring is enabled. + * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit. + * @param {boolean} [options.monitorCommands=false] Enable command monitoring for this topology + * @fires Server#connect + * @fires Server#close + * @fires Server#error + * @fires Server#timeout + * @fires Server#parseError + * @fires Server#reconnect + * @fires Server#commandStarted + * @fires Server#commandSucceeded + * @fires Server#commandFailed + * @property {string} parserType the parser type used (c++ or js). + * @return {Server} a Server instance. + */ +class Server extends TopologyBase { + constructor(host, port, options) { + super(); + var self = this; + + // Filter the options + options = filterOptions(options, legalOptionNames); + + // Promise library + const promiseLibrary = options.promiseLibrary; + + // Stored options + var storeOptions = { + force: false, + bufferMaxEntries: + typeof options.bufferMaxEntries === 'number' ? options.bufferMaxEntries : MAX_JS_INT + }; + + // Shared global store + var store = options.store || new Store(self, storeOptions); + + // Detect if we have a socket connection + if (host.indexOf('/') !== -1) { + if (port != null && typeof port === 'object') { + options = port; + port = null; + } + } else if (port == null) { + throw MongoError.create({ message: 'port must be specified', driver: true }); + } + + // Get the reconnect option + var reconnect = typeof options.auto_reconnect === 'boolean' ? options.auto_reconnect : true; + reconnect = typeof options.autoReconnect === 'boolean' ? options.autoReconnect : reconnect; + + // Clone options + var clonedOptions = mergeOptions( + {}, + { + host: host, + port: port, + disconnectHandler: store, + cursorFactory: Cursor, + reconnect: reconnect, + emitError: typeof options.emitError === 'boolean' ? options.emitError : true, + size: typeof options.poolSize === 'number' ? options.poolSize : 5, + monitorCommands: + typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false + } + ); + + // Translate any SSL options and other connectivity options + clonedOptions = translateOptions(clonedOptions, options); + + // Socket options + var socketOptions = + options.socketOptions && Object.keys(options.socketOptions).length > 0 + ? options.socketOptions + : options; + + // Translate all the options to the core types + clonedOptions = translateOptions(clonedOptions, socketOptions); + + // Define the internal properties + this.s = { + // Create an instance of a server instance from core module + coreTopology: new CServer(clonedOptions), + // Server capabilities + sCapabilities: null, + // Cloned options + clonedOptions: clonedOptions, + // Reconnect + reconnect: clonedOptions.reconnect, + // Emit error + emitError: clonedOptions.emitError, + // Pool size + poolSize: clonedOptions.size, + // Store Options + storeOptions: storeOptions, + // Store + store: store, + // Host + host: host, + // Port + port: port, + // Options + options: options, + // Server Session Pool + sessionPool: null, + // Active client sessions + sessions: new Set(), + // Promise library + promiseLibrary: promiseLibrary || Promise + }; + } + + // Connect + connect(_options, callback) { + var self = this; + if ('function' === typeof _options) (callback = _options), (_options = {}); + if (_options == null) _options = this.s.clonedOptions; + if (!('function' === typeof callback)) callback = null; + _options = Object.assign({}, this.s.clonedOptions, _options); + self.s.options = _options; + + // Update bufferMaxEntries + self.s.storeOptions.bufferMaxEntries = + typeof _options.bufferMaxEntries === 'number' ? _options.bufferMaxEntries : -1; + + // Error handler + var connectErrorHandler = function() { + return function(err) { + // Remove all event handlers + var events = ['timeout', 'error', 'close']; + events.forEach(function(e) { + self.s.coreTopology.removeListener(e, connectHandlers[e]); + }); + + self.s.coreTopology.removeListener('connect', connectErrorHandler); + + // Try to callback + try { + callback(err); + } catch (err) { + process.nextTick(function() { + throw err; + }); + } + }; + }; + + // Actual handler + var errorHandler = function(event) { + return function(err) { + if (event !== 'error') { + self.emit(event, err); + } + }; + }; + + // Error handler + var reconnectHandler = function() { + self.emit('reconnect', self); + self.s.store.execute(); + }; + + // Reconnect failed + var reconnectFailedHandler = function(err) { + self.emit('reconnectFailed', err); + self.s.store.flush(err); + }; + + // Destroy called on topology, perform cleanup + var destroyHandler = function() { + self.s.store.flush(); + }; + + // relay the event + var relay = function(event) { + return function(t, server) { + self.emit(event, t, server); + }; + }; + + // Connect handler + var connectHandler = function() { + // Clear out all the current handlers left over + ['timeout', 'error', 'close', 'destroy'].forEach(function(e) { + self.s.coreTopology.removeAllListeners(e); + }); + + // Set up listeners + self.s.coreTopology.on('timeout', errorHandler('timeout')); + self.s.coreTopology.once('error', errorHandler('error')); + self.s.coreTopology.on('close', errorHandler('close')); + // Only called on destroy + self.s.coreTopology.on('destroy', destroyHandler); + + // Emit open event + self.emit('open', null, self); + + // Return correctly + try { + callback(null, self); + } catch (err) { + process.nextTick(function() { + throw err; + }); + } + }; + + // Set up listeners + var connectHandlers = { + timeout: connectErrorHandler('timeout'), + error: connectErrorHandler('error'), + close: connectErrorHandler('close') + }; + + // Clear out all the current handlers left over + [ + 'timeout', + 'error', + 'close', + 'serverOpening', + 'serverDescriptionChanged', + 'serverHeartbeatStarted', + 'serverHeartbeatSucceeded', + 'serverHeartbeatFailed', + 'serverClosed', + 'topologyOpening', + 'topologyClosed', + 'topologyDescriptionChanged', + 'commandStarted', + 'commandSucceeded', + 'commandFailed' + ].forEach(function(e) { + self.s.coreTopology.removeAllListeners(e); + }); + + // Add the event handlers + self.s.coreTopology.once('timeout', connectHandlers.timeout); + self.s.coreTopology.once('error', connectHandlers.error); + self.s.coreTopology.once('close', connectHandlers.close); + self.s.coreTopology.once('connect', connectHandler); + // Reconnect server + self.s.coreTopology.on('reconnect', reconnectHandler); + self.s.coreTopology.on('reconnectFailed', reconnectFailedHandler); + + // Set up SDAM listeners + self.s.coreTopology.on('serverDescriptionChanged', relay('serverDescriptionChanged')); + self.s.coreTopology.on('serverHeartbeatStarted', relay('serverHeartbeatStarted')); + self.s.coreTopology.on('serverHeartbeatSucceeded', relay('serverHeartbeatSucceeded')); + self.s.coreTopology.on('serverHeartbeatFailed', relay('serverHeartbeatFailed')); + self.s.coreTopology.on('serverOpening', relay('serverOpening')); + self.s.coreTopology.on('serverClosed', relay('serverClosed')); + self.s.coreTopology.on('topologyOpening', relay('topologyOpening')); + self.s.coreTopology.on('topologyClosed', relay('topologyClosed')); + self.s.coreTopology.on('topologyDescriptionChanged', relay('topologyDescriptionChanged')); + self.s.coreTopology.on('commandStarted', relay('commandStarted')); + self.s.coreTopology.on('commandSucceeded', relay('commandSucceeded')); + self.s.coreTopology.on('commandFailed', relay('commandFailed')); + self.s.coreTopology.on('attemptReconnect', relay('attemptReconnect')); + self.s.coreTopology.on('monitoring', relay('monitoring')); + + // Start connection + self.s.coreTopology.connect(_options); + } +} + +Object.defineProperty(Server.prototype, 'poolSize', { + enumerable: true, + get: function() { + return this.s.coreTopology.connections().length; + } +}); + +Object.defineProperty(Server.prototype, 'autoReconnect', { + enumerable: true, + get: function() { + return this.s.reconnect; + } +}); + +Object.defineProperty(Server.prototype, 'host', { + enumerable: true, + get: function() { + return this.s.host; + } +}); + +Object.defineProperty(Server.prototype, 'port', { + enumerable: true, + get: function() { + return this.s.port; + } +}); + +/** + * Server connect event + * + * @event Server#connect + * @type {object} + */ + +/** + * Server close event + * + * @event Server#close + * @type {object} + */ + +/** + * Server reconnect event + * + * @event Server#reconnect + * @type {object} + */ + +/** + * Server error event + * + * @event Server#error + * @type {MongoError} + */ + +/** + * Server timeout event + * + * @event Server#timeout + * @type {object} + */ + +/** + * Server parseError event + * + * @event Server#parseError + * @type {object} + */ + +/** + * An event emitted indicating a command was started, if command monitoring is enabled + * + * @event Server#commandStarted + * @type {object} + */ + +/** + * An event emitted indicating a command succeeded, if command monitoring is enabled + * + * @event Server#commandSucceeded + * @type {object} + */ + +/** + * An event emitted indicating a command failed, if command monitoring is enabled + * + * @event Server#commandFailed + * @type {object} + */ + +module.exports = Server; diff --git a/node_modules/mongodb/lib/topologies/topology_base.js b/node_modules/mongodb/lib/topologies/topology_base.js new file mode 100644 index 0000000..938f1a2 --- /dev/null +++ b/node_modules/mongodb/lib/topologies/topology_base.js @@ -0,0 +1,417 @@ +'use strict'; + +const EventEmitter = require('events'), + MongoError = require('../core').MongoError, + f = require('util').format, + ReadPreference = require('../core').ReadPreference, + ClientSession = require('../core').Sessions.ClientSession; + +// The store of ops +var Store = function(topology, storeOptions) { + var self = this; + var storedOps = []; + storeOptions = storeOptions || { force: false, bufferMaxEntries: -1 }; + + // Internal state + this.s = { + storedOps: storedOps, + storeOptions: storeOptions, + topology: topology + }; + + Object.defineProperty(this, 'length', { + enumerable: true, + get: function() { + return self.s.storedOps.length; + } + }); +}; + +Store.prototype.add = function(opType, ns, ops, options, callback) { + if (this.s.storeOptions.force) { + return callback(MongoError.create({ message: 'db closed by application', driver: true })); + } + + if (this.s.storeOptions.bufferMaxEntries === 0) { + return callback( + MongoError.create({ + message: f( + 'no connection available for operation and number of stored operation > %s', + this.s.storeOptions.bufferMaxEntries + ), + driver: true + }) + ); + } + + if ( + this.s.storeOptions.bufferMaxEntries > 0 && + this.s.storedOps.length > this.s.storeOptions.bufferMaxEntries + ) { + while (this.s.storedOps.length > 0) { + var op = this.s.storedOps.shift(); + op.c( + MongoError.create({ + message: f( + 'no connection available for operation and number of stored operation > %s', + this.s.storeOptions.bufferMaxEntries + ), + driver: true + }) + ); + } + + return; + } + + this.s.storedOps.push({ t: opType, n: ns, o: ops, op: options, c: callback }); +}; + +Store.prototype.addObjectAndMethod = function(opType, object, method, params, callback) { + if (this.s.storeOptions.force) { + return callback(MongoError.create({ message: 'db closed by application', driver: true })); + } + + if (this.s.storeOptions.bufferMaxEntries === 0) { + return callback( + MongoError.create({ + message: f( + 'no connection available for operation and number of stored operation > %s', + this.s.storeOptions.bufferMaxEntries + ), + driver: true + }) + ); + } + + if ( + this.s.storeOptions.bufferMaxEntries > 0 && + this.s.storedOps.length > this.s.storeOptions.bufferMaxEntries + ) { + while (this.s.storedOps.length > 0) { + var op = this.s.storedOps.shift(); + op.c( + MongoError.create({ + message: f( + 'no connection available for operation and number of stored operation > %s', + this.s.storeOptions.bufferMaxEntries + ), + driver: true + }) + ); + } + + return; + } + + this.s.storedOps.push({ t: opType, m: method, o: object, p: params, c: callback }); +}; + +Store.prototype.flush = function(err) { + while (this.s.storedOps.length > 0) { + this.s.storedOps + .shift() + .c( + err || + MongoError.create({ message: f('no connection available for operation'), driver: true }) + ); + } +}; + +var primaryOptions = ['primary', 'primaryPreferred', 'nearest', 'secondaryPreferred']; +var secondaryOptions = ['secondary', 'secondaryPreferred']; + +Store.prototype.execute = function(options) { + options = options || {}; + // Get current ops + var ops = this.s.storedOps; + // Reset the ops + this.s.storedOps = []; + + // Unpack options + var executePrimary = typeof options.executePrimary === 'boolean' ? options.executePrimary : true; + var executeSecondary = + typeof options.executeSecondary === 'boolean' ? options.executeSecondary : true; + + // Execute all the stored ops + while (ops.length > 0) { + var op = ops.shift(); + + if (op.t === 'cursor') { + if (executePrimary && executeSecondary) { + op.o[op.m].apply(op.o, op.p); + } else if ( + executePrimary && + op.o.options && + op.o.options.readPreference && + primaryOptions.indexOf(op.o.options.readPreference.mode) !== -1 + ) { + op.o[op.m].apply(op.o, op.p); + } else if ( + !executePrimary && + executeSecondary && + op.o.options && + op.o.options.readPreference && + secondaryOptions.indexOf(op.o.options.readPreference.mode) !== -1 + ) { + op.o[op.m].apply(op.o, op.p); + } + } else if (op.t === 'auth') { + this.s.topology[op.t].apply(this.s.topology, op.o); + } else { + if (executePrimary && executeSecondary) { + this.s.topology[op.t](op.n, op.o, op.op, op.c); + } else if ( + executePrimary && + op.op && + op.op.readPreference && + primaryOptions.indexOf(op.op.readPreference.mode) !== -1 + ) { + this.s.topology[op.t](op.n, op.o, op.op, op.c); + } else if ( + !executePrimary && + executeSecondary && + op.op && + op.op.readPreference && + secondaryOptions.indexOf(op.op.readPreference.mode) !== -1 + ) { + this.s.topology[op.t](op.n, op.o, op.op, op.c); + } + } + } +}; + +Store.prototype.all = function() { + return this.s.storedOps; +}; + +// Server capabilities +var ServerCapabilities = function(ismaster) { + var setup_get_property = function(object, name, value) { + Object.defineProperty(object, name, { + enumerable: true, + get: function() { + return value; + } + }); + }; + + // Capabilities + var aggregationCursor = false; + var writeCommands = false; + var textSearch = false; + var authCommands = false; + var listCollections = false; + var listIndexes = false; + var maxNumberOfDocsInBatch = ismaster.maxWriteBatchSize || 1000; + var commandsTakeWriteConcern = false; + var commandsTakeCollation = false; + + if (ismaster.minWireVersion >= 0) { + textSearch = true; + } + + if (ismaster.maxWireVersion >= 1) { + aggregationCursor = true; + authCommands = true; + } + + if (ismaster.maxWireVersion >= 2) { + writeCommands = true; + } + + if (ismaster.maxWireVersion >= 3) { + listCollections = true; + listIndexes = true; + } + + if (ismaster.maxWireVersion >= 5) { + commandsTakeWriteConcern = true; + commandsTakeCollation = true; + } + + // If no min or max wire version set to 0 + if (ismaster.minWireVersion == null) { + ismaster.minWireVersion = 0; + } + + if (ismaster.maxWireVersion == null) { + ismaster.maxWireVersion = 0; + } + + // Map up read only parameters + setup_get_property(this, 'hasAggregationCursor', aggregationCursor); + setup_get_property(this, 'hasWriteCommands', writeCommands); + setup_get_property(this, 'hasTextSearch', textSearch); + setup_get_property(this, 'hasAuthCommands', authCommands); + setup_get_property(this, 'hasListCollectionsCommand', listCollections); + setup_get_property(this, 'hasListIndexesCommand', listIndexes); + setup_get_property(this, 'minWireVersion', ismaster.minWireVersion); + setup_get_property(this, 'maxWireVersion', ismaster.maxWireVersion); + setup_get_property(this, 'maxNumberOfDocsInBatch', maxNumberOfDocsInBatch); + setup_get_property(this, 'commandsTakeWriteConcern', commandsTakeWriteConcern); + setup_get_property(this, 'commandsTakeCollation', commandsTakeCollation); +}; + +class TopologyBase extends EventEmitter { + constructor() { + super(); + this.setMaxListeners(Infinity); + } + + // Sessions related methods + hasSessionSupport() { + return this.logicalSessionTimeoutMinutes != null; + } + + startSession(options, clientOptions) { + const session = new ClientSession(this, this.s.sessionPool, options, clientOptions); + + session.once('ended', () => { + this.s.sessions.delete(session); + }); + + this.s.sessions.add(session); + return session; + } + + endSessions(sessions, callback) { + return this.s.coreTopology.endSessions(sessions, callback); + } + + get clientMetadata() { + return this.s.coreTopology.s.options.metadata; + } + + // Server capabilities + capabilities() { + if (this.s.sCapabilities) return this.s.sCapabilities; + if (this.s.coreTopology.lastIsMaster() == null) return null; + this.s.sCapabilities = new ServerCapabilities(this.s.coreTopology.lastIsMaster()); + return this.s.sCapabilities; + } + + // Command + command(ns, cmd, options, callback) { + this.s.coreTopology.command(ns.toString(), cmd, ReadPreference.translate(options), callback); + } + + // Insert + insert(ns, ops, options, callback) { + this.s.coreTopology.insert(ns.toString(), ops, options, callback); + } + + // Update + update(ns, ops, options, callback) { + this.s.coreTopology.update(ns.toString(), ops, options, callback); + } + + // Remove + remove(ns, ops, options, callback) { + this.s.coreTopology.remove(ns.toString(), ops, options, callback); + } + + // IsConnected + isConnected(options) { + options = options || {}; + options = ReadPreference.translate(options); + + return this.s.coreTopology.isConnected(options); + } + + // IsDestroyed + isDestroyed() { + return this.s.coreTopology.isDestroyed(); + } + + // Cursor + cursor(ns, cmd, options) { + options = options || {}; + options = ReadPreference.translate(options); + options.disconnectHandler = this.s.store; + options.topology = this; + + return this.s.coreTopology.cursor(ns, cmd, options); + } + + lastIsMaster() { + return this.s.coreTopology.lastIsMaster(); + } + + selectServer(selector, options, callback) { + return this.s.coreTopology.selectServer(selector, options, callback); + } + + /** + * Unref all sockets + * @method + */ + unref() { + return this.s.coreTopology.unref(); + } + + /** + * All raw connections + * @method + * @return {array} + */ + connections() { + return this.s.coreTopology.connections(); + } + + close(forceClosed, callback) { + // If we have sessions, we want to individually move them to the session pool, + // and then send a single endSessions call. + this.s.sessions.forEach(session => session.endSession()); + + if (this.s.sessionPool) { + this.s.sessionPool.endAllPooledSessions(); + } + + // We need to wash out all stored processes + if (forceClosed === true) { + this.s.storeOptions.force = forceClosed; + this.s.store.flush(); + } + + this.s.coreTopology.destroy( + { + force: typeof forceClosed === 'boolean' ? forceClosed : false + }, + callback + ); + } +} + +// Properties +Object.defineProperty(TopologyBase.prototype, 'bson', { + enumerable: true, + get: function() { + return this.s.coreTopology.s.bson; + } +}); + +Object.defineProperty(TopologyBase.prototype, 'parserType', { + enumerable: true, + get: function() { + return this.s.coreTopology.parserType; + } +}); + +Object.defineProperty(TopologyBase.prototype, 'logicalSessionTimeoutMinutes', { + enumerable: true, + get: function() { + return this.s.coreTopology.logicalSessionTimeoutMinutes; + } +}); + +Object.defineProperty(TopologyBase.prototype, 'type', { + enumerable: true, + get: function() { + return this.s.coreTopology.type; + } +}); + +exports.Store = Store; +exports.ServerCapabilities = ServerCapabilities; +exports.TopologyBase = TopologyBase; |