summaryrefslogtreecommitdiffstats
path: root/node_modules/mongodb/lib/cmap/connection.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/mongodb/lib/cmap/connection.js')
-rw-r--r--node_modules/mongodb/lib/cmap/connection.js384
1 files changed, 384 insertions, 0 deletions
diff --git a/node_modules/mongodb/lib/cmap/connection.js b/node_modules/mongodb/lib/cmap/connection.js
new file mode 100644
index 0000000..bf71562
--- /dev/null
+++ b/node_modules/mongodb/lib/cmap/connection.js
@@ -0,0 +1,384 @@
+'use strict';
+
+const EventEmitter = require('events');
+const MessageStream = require('./message_stream');
+const MongoError = require('../core/error').MongoError;
+const MongoNetworkError = require('../core/error').MongoNetworkError;
+const MongoNetworkTimeoutError = require('../core/error').MongoNetworkTimeoutError;
+const MongoWriteConcernError = require('../core/error').MongoWriteConcernError;
+const CommandResult = require('../core/connection/command_result');
+const StreamDescription = require('./stream_description').StreamDescription;
+const wp = require('../core/wireprotocol');
+const apm = require('../core/connection/apm');
+const updateSessionFromResponse = require('../core/sessions').updateSessionFromResponse;
+const uuidV4 = require('../core/utils').uuidV4;
+const now = require('../utils').now;
+const calculateDurationInMs = require('../utils').calculateDurationInMs;
+
+const kStream = Symbol('stream');
+const kQueue = Symbol('queue');
+const kMessageStream = Symbol('messageStream');
+const kGeneration = Symbol('generation');
+const kLastUseTime = Symbol('lastUseTime');
+const kClusterTime = Symbol('clusterTime');
+const kDescription = Symbol('description');
+const kIsMaster = Symbol('ismaster');
+const kAutoEncrypter = Symbol('autoEncrypter');
+
+class Connection extends EventEmitter {
+ constructor(stream, options) {
+ super(options);
+
+ this.id = options.id;
+ this.address = streamIdentifier(stream);
+ this.bson = options.bson;
+ this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 0;
+ this.host = options.host || 'localhost';
+ this.port = options.port || 27017;
+ this.monitorCommands =
+ typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false;
+ this.closed = false;
+ this.destroyed = false;
+
+ this[kDescription] = new StreamDescription(this.address, options);
+ this[kGeneration] = options.generation;
+ this[kLastUseTime] = now();
+
+ // retain a reference to an `AutoEncrypter` if present
+ if (options.autoEncrypter) {
+ this[kAutoEncrypter] = options.autoEncrypter;
+ }
+
+ // setup parser stream and message handling
+ this[kQueue] = new Map();
+ this[kMessageStream] = new MessageStream(options);
+ this[kMessageStream].on('message', messageHandler(this));
+ this[kStream] = stream;
+ stream.on('error', () => {
+ /* ignore errors, listen to `close` instead */
+ });
+
+ stream.on('close', () => {
+ if (this.closed) {
+ return;
+ }
+
+ this.closed = true;
+ this[kQueue].forEach(op =>
+ op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`))
+ );
+ this[kQueue].clear();
+
+ this.emit('close');
+ });
+
+ stream.on('timeout', () => {
+ if (this.closed) {
+ return;
+ }
+
+ stream.destroy();
+ this.closed = true;
+ this[kQueue].forEach(op =>
+ op.cb(
+ new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
+ beforeHandshake: this[kIsMaster] == null
+ })
+ )
+ );
+
+ this[kQueue].clear();
+ this.emit('close');
+ });
+
+ // hook the message stream up to the passed in stream
+ stream.pipe(this[kMessageStream]);
+ this[kMessageStream].pipe(stream);
+ }
+
+ get description() {
+ return this[kDescription];
+ }
+
+ get ismaster() {
+ return this[kIsMaster];
+ }
+
+ // the `connect` method stores the result of the handshake ismaster on the connection
+ set ismaster(response) {
+ this[kDescription].receiveResponse(response);
+
+ // TODO: remove this, and only use the `StreamDescription` in the future
+ this[kIsMaster] = response;
+ }
+
+ get generation() {
+ return this[kGeneration] || 0;
+ }
+
+ get idleTime() {
+ return calculateDurationInMs(this[kLastUseTime]);
+ }
+
+ get clusterTime() {
+ return this[kClusterTime];
+ }
+
+ get stream() {
+ return this[kStream];
+ }
+
+ markAvailable() {
+ this[kLastUseTime] = now();
+ }
+
+ destroy(options, callback) {
+ if (typeof options === 'function') {
+ callback = options;
+ options = {};
+ }
+
+ options = Object.assign({ force: false }, options);
+ if (this[kStream] == null || this.destroyed) {
+ this.destroyed = true;
+ if (typeof callback === 'function') {
+ callback();
+ }
+
+ return;
+ }
+
+ if (options.force) {
+ this[kStream].destroy();
+ this.destroyed = true;
+ if (typeof callback === 'function') {
+ callback();
+ }
+
+ return;
+ }
+
+ this[kStream].end(err => {
+ this.destroyed = true;
+ if (typeof callback === 'function') {
+ callback(err);
+ }
+ });
+ }
+
+ // Wire protocol methods
+ command(ns, cmd, options, callback) {
+ wp.command(makeServerTrampoline(this), ns, cmd, options, callback);
+ }
+
+ query(ns, cmd, cursorState, options, callback) {
+ wp.query(makeServerTrampoline(this), ns, cmd, cursorState, options, callback);
+ }
+
+ getMore(ns, cursorState, batchSize, options, callback) {
+ wp.getMore(makeServerTrampoline(this), ns, cursorState, batchSize, options, callback);
+ }
+
+ killCursors(ns, cursorState, callback) {
+ wp.killCursors(makeServerTrampoline(this), ns, cursorState, callback);
+ }
+
+ insert(ns, ops, options, callback) {
+ wp.insert(makeServerTrampoline(this), ns, ops, options, callback);
+ }
+
+ update(ns, ops, options, callback) {
+ wp.update(makeServerTrampoline(this), ns, ops, options, callback);
+ }
+
+ remove(ns, ops, options, callback) {
+ wp.remove(makeServerTrampoline(this), ns, ops, options, callback);
+ }
+}
+
+/// This lets us emulate a legacy `Server` instance so we can work with the existing wire
+/// protocol methods. Eventually, the operation executor will return a `Connection` to execute
+/// against.
+function makeServerTrampoline(connection) {
+ const server = {
+ description: connection.description,
+ clusterTime: connection[kClusterTime],
+ s: {
+ bson: connection.bson,
+ pool: { write: write.bind(connection), isConnected: () => true }
+ }
+ };
+
+ if (connection[kAutoEncrypter]) {
+ server.autoEncrypter = connection[kAutoEncrypter];
+ }
+
+ return server;
+}
+
+function messageHandler(conn) {
+ return function messageHandler(message) {
+ // always emit the message, in case we are streaming
+ conn.emit('message', message);
+ if (!conn[kQueue].has(message.responseTo)) {
+ return;
+ }
+
+ const operationDescription = conn[kQueue].get(message.responseTo);
+ const callback = operationDescription.cb;
+
+ // SERVER-45775: For exhaust responses we should be able to use the same requestId to
+ // track response, however the server currently synthetically produces remote requests
+ // making the `responseTo` change on each response
+ conn[kQueue].delete(message.responseTo);
+ if (message.moreToCome) {
+ // requeue the callback for next synthetic request
+ conn[kQueue].set(message.requestId, operationDescription);
+ } else if (operationDescription.socketTimeoutOverride) {
+ conn[kStream].setTimeout(conn.socketTimeout);
+ }
+
+ try {
+ // Pass in the entire description because it has BSON parsing options
+ message.parse(operationDescription);
+ } catch (err) {
+ callback(new MongoError(err));
+ return;
+ }
+
+ if (message.documents[0]) {
+ const document = message.documents[0];
+ const session = operationDescription.session;
+ if (session) {
+ updateSessionFromResponse(session, document);
+ }
+
+ if (document.$clusterTime) {
+ conn[kClusterTime] = document.$clusterTime;
+ conn.emit('clusterTimeReceived', document.$clusterTime);
+ }
+
+ if (operationDescription.command) {
+ if (document.writeConcernError) {
+ callback(new MongoWriteConcernError(document.writeConcernError, document));
+ return;
+ }
+
+ if (document.ok === 0 || document.$err || document.errmsg || document.code) {
+ callback(new MongoError(document));
+ return;
+ }
+ }
+ }
+
+ // NODE-2382: reenable in our glorious non-leaky abstraction future
+ // callback(null, operationDescription.fullResult ? message : message.documents[0]);
+
+ callback(
+ undefined,
+ new CommandResult(
+ operationDescription.fullResult ? message : message.documents[0],
+ conn,
+ message
+ )
+ );
+ };
+}
+
+function streamIdentifier(stream) {
+ if (typeof stream.address === 'function') {
+ return `${stream.remoteAddress}:${stream.remotePort}`;
+ }
+
+ return uuidV4().toString('hex');
+}
+
+// Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance
+function write(command, options, callback) {
+ if (typeof options === 'function') {
+ callback = options;
+ }
+
+ options = options || {};
+ const operationDescription = {
+ requestId: command.requestId,
+ cb: callback,
+ session: options.session,
+ fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false,
+ noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false,
+ documentsReturnedIn: options.documentsReturnedIn,
+ command: !!options.command,
+
+ // for BSON parsing
+ promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
+ promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
+ promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
+ raw: typeof options.raw === 'boolean' ? options.raw : false
+ };
+
+ if (this[kDescription] && this[kDescription].compressor) {
+ operationDescription.agreedCompressor = this[kDescription].compressor;
+
+ if (this[kDescription].zlibCompressionLevel) {
+ operationDescription.zlibCompressionLevel = this[kDescription].zlibCompressionLevel;
+ }
+ }
+
+ if (typeof options.socketTimeout === 'number') {
+ operationDescription.socketTimeoutOverride = true;
+ this[kStream].setTimeout(options.socketTimeout);
+ }
+
+ // if command monitoring is enabled we need to modify the callback here
+ if (this.monitorCommands) {
+ this.emit('commandStarted', new apm.CommandStartedEvent(this, command));
+
+ operationDescription.started = now();
+ operationDescription.cb = (err, reply) => {
+ if (err) {
+ this.emit(
+ 'commandFailed',
+ new apm.CommandFailedEvent(this, command, err, operationDescription.started)
+ );
+ } else {
+ if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) {
+ this.emit(
+ 'commandFailed',
+ new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started)
+ );
+ } else {
+ this.emit(
+ 'commandSucceeded',
+ new apm.CommandSucceededEvent(this, command, reply, operationDescription.started)
+ );
+ }
+ }
+
+ if (typeof callback === 'function') {
+ callback(err, reply);
+ }
+ };
+ }
+
+ if (!operationDescription.noResponse) {
+ this[kQueue].set(operationDescription.requestId, operationDescription);
+ }
+
+ try {
+ this[kMessageStream].writeCommand(command, operationDescription);
+ } catch (e) {
+ if (!operationDescription.noResponse) {
+ this[kQueue].delete(operationDescription.requestId);
+ operationDescription.cb(e);
+ return;
+ }
+ }
+
+ if (operationDescription.noResponse) {
+ operationDescription.cb();
+ }
+}
+
+module.exports = {
+ Connection
+};