summaryrefslogtreecommitdiffstats
path: root/node_modules/mongodb/lib/change_stream.js
diff options
context:
space:
mode:
authorGravatar Piotr Russ <mail@pruss.it> 2020-11-16 00:10:28 +0100
committerGravatar Piotr Russ <mail@pruss.it> 2020-11-16 00:10:28 +0100
commite06ec920f7a5d784e674c4c4b4e6d1da3dc7391d (patch)
tree55713f725f77b44ebfec86e4eec3ce33e71458ca /node_modules/mongodb/lib/change_stream.js
downloadwebsite_creator-e06ec920f7a5d784e674c4c4b4e6d1da3dc7391d.tar.gz
website_creator-e06ec920f7a5d784e674c4c4b4e6d1da3dc7391d.tar.bz2
website_creator-e06ec920f7a5d784e674c4c4b4e6d1da3dc7391d.zip
api, login, auth
Diffstat (limited to 'node_modules/mongodb/lib/change_stream.js')
-rw-r--r--node_modules/mongodb/lib/change_stream.js623
1 files changed, 623 insertions, 0 deletions
diff --git a/node_modules/mongodb/lib/change_stream.js b/node_modules/mongodb/lib/change_stream.js
new file mode 100644
index 0000000..b226702
--- /dev/null
+++ b/node_modules/mongodb/lib/change_stream.js
@@ -0,0 +1,623 @@
+'use strict';
+
+const Denque = require('denque');
+const EventEmitter = require('events');
+const isResumableError = require('./error').isResumableError;
+const MongoError = require('./core').MongoError;
+const Cursor = require('./cursor');
+const relayEvents = require('./core/utils').relayEvents;
+const maxWireVersion = require('./core/utils').maxWireVersion;
+const maybePromise = require('./utils').maybePromise;
+const now = require('./utils').now;
+const calculateDurationInMs = require('./utils').calculateDurationInMs;
+const AggregateOperation = require('./operations/aggregate');
+
+const kResumeQueue = Symbol('resumeQueue');
+
+const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
+const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
+ CHANGE_STREAM_OPTIONS
+);
+
+const CHANGE_DOMAIN_TYPES = {
+ COLLECTION: Symbol('Collection'),
+ DATABASE: Symbol('Database'),
+ CLUSTER: Symbol('Cluster')
+};
+
+/**
+ * @typedef ResumeToken
+ * @description Represents the logical starting point for a new or resuming {@link ChangeStream} on the server.
+ * @see https://docs.mongodb.com/master/changeStreams/#change-stream-resume-token
+ */
+
+/**
+ * @typedef OperationTime
+ * @description Represents a specific point in time on a server. Can be retrieved by using {@link Db#command}
+ * @see https://docs.mongodb.com/manual/reference/method/db.runCommand/#response
+ */
+
+/**
+ * @typedef ChangeStreamOptions
+ * @description Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
+ * @property {string} [fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred.
+ * @property {number} [maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query.
+ * @property {ResumeToken} [resumeAfter] Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/master/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}.
+ * @property {ResumeToken} [startAfter] Similar to resumeAfter, but will allow you to start after an invalidated event. See {@link https://docs.mongodb.com/master/changeStreams/#startafter-for-change-streams|ChangeStream documentation}.
+ * @property {OperationTime} [startAtOperationTime] Will start the changeStream after the specified operationTime.
+ * @property {number} [batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @property {object} [collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
+ * @property {ReadPreference} [readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
+ */
+
+/**
+ * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
+ * @class ChangeStream
+ * @since 3.0.0
+ * @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream
+ * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
+ * @param {ChangeStreamOptions} [options] Optional settings
+ * @fires ChangeStream#close
+ * @fires ChangeStream#change
+ * @fires ChangeStream#end
+ * @fires ChangeStream#error
+ * @fires ChangeStream#resumeTokenChanged
+ * @return {ChangeStream} a ChangeStream instance.
+ */
+class ChangeStream extends EventEmitter {
+ constructor(parent, pipeline, options) {
+ super();
+ const Collection = require('./collection');
+ const Db = require('./db');
+ const MongoClient = require('./mongo_client');
+
+ this.pipeline = pipeline || [];
+ this.options = options || {};
+
+ this.parent = parent;
+ this.namespace = parent.s.namespace;
+ if (parent instanceof Collection) {
+ this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
+ this.topology = parent.s.db.serverConfig;
+ } else if (parent instanceof Db) {
+ this.type = CHANGE_DOMAIN_TYPES.DATABASE;
+ this.topology = parent.serverConfig;
+ } else if (parent instanceof MongoClient) {
+ this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
+ this.topology = parent.topology;
+ } else {
+ throw new TypeError(
+ 'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
+ );
+ }
+
+ this.promiseLibrary = parent.s.promiseLibrary;
+ if (!this.options.readPreference && parent.s.readPreference) {
+ this.options.readPreference = parent.s.readPreference;
+ }
+
+ this[kResumeQueue] = new Denque();
+
+ // Create contained Change Stream cursor
+ this.cursor = createChangeStreamCursor(this, options);
+
+ this.closed = false;
+
+ // Listen for any `change` listeners being added to ChangeStream
+ this.on('newListener', eventName => {
+ if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
+ this.cursor.on('data', change => processNewChange(this, change));
+ }
+ });
+
+ // Listen for all `change` listeners being removed from ChangeStream
+ this.on('removeListener', eventName => {
+ if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
+ this.cursor.removeAllListeners('data');
+ }
+ });
+ }
+
+ /**
+ * @property {ResumeToken} resumeToken
+ * The cached resume token that will be used to resume
+ * after the most recently returned change.
+ */
+ get resumeToken() {
+ return this.cursor.resumeToken;
+ }
+
+ /**
+ * Check if there is any document still available in the Change Stream
+ * @function ChangeStream.prototype.hasNext
+ * @param {ChangeStream~resultCallback} [callback] The result callback.
+ * @throws {MongoError}
+ * @returns {Promise|void} returns Promise if no callback passed
+ */
+ hasNext(callback) {
+ return maybePromise(this.parent, callback, cb => {
+ getCursor(this, (err, cursor) => {
+ if (err) return cb(err); // failed to resume, raise an error
+ cursor.hasNext(cb);
+ });
+ });
+ }
+
+ /**
+ * Get the next available document from the Change Stream, returns null if no more documents are available.
+ * @function ChangeStream.prototype.next
+ * @param {ChangeStream~resultCallback} [callback] The result callback.
+ * @throws {MongoError}
+ * @returns {Promise|void} returns Promise if no callback passed
+ */
+ next(callback) {
+ return maybePromise(this.parent, callback, cb => {
+ getCursor(this, (err, cursor) => {
+ if (err) return cb(err); // failed to resume, raise an error
+ cursor.next((error, change) => {
+ if (error) {
+ this[kResumeQueue].push(() => this.next(cb));
+ processError(this, error, cb);
+ return;
+ }
+ processNewChange(this, change, cb);
+ });
+ });
+ });
+ }
+
+ /**
+ * Is the change stream closed
+ * @method ChangeStream.prototype.isClosed
+ * @return {boolean}
+ */
+ isClosed() {
+ return this.closed || (this.cursor && this.cursor.isClosed());
+ }
+
+ /**
+ * Close the Change Stream
+ * @method ChangeStream.prototype.close
+ * @param {ChangeStream~resultCallback} [callback] The result callback.
+ * @return {Promise} returns Promise if no callback passed
+ */
+ close(callback) {
+ return maybePromise(this.parent, callback, cb => {
+ if (this.closed) return cb();
+
+ // flag the change stream as explicitly closed
+ this.closed = true;
+
+ if (!this.cursor) return cb();
+
+ // Tidy up the existing cursor
+ const cursor = this.cursor;
+
+ return cursor.close(err => {
+ ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
+ this.cursor = undefined;
+
+ return cb(err);
+ });
+ });
+ }
+
+ /**
+ * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
+ * @method
+ * @param {Writable} destination The destination for writing data
+ * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options}
+ * @return {null}
+ */
+ pipe(destination, options) {
+ if (!this.pipeDestinations) {
+ this.pipeDestinations = [];
+ }
+ this.pipeDestinations.push(destination);
+ return this.cursor.pipe(destination, options);
+ }
+
+ /**
+ * This method will remove the hooks set up for a previous pipe() call.
+ * @param {Writable} [destination] The destination for writing data
+ * @return {null}
+ */
+ unpipe(destination) {
+ if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) {
+ this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1);
+ }
+ return this.cursor.unpipe(destination);
+ }
+
+ /**
+ * Return a modified Readable stream including a possible transform method.
+ * @method
+ * @param {object} [options] Optional settings.
+ * @param {function} [options.transform] A transformation method applied to each document emitted by the stream.
+ * @return {Cursor}
+ */
+ stream(options) {
+ this.streamOptions = options;
+ return this.cursor.stream(options);
+ }
+
+ /**
+ * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
+ * @return {null}
+ */
+ pause() {
+ return this.cursor.pause();
+ }
+
+ /**
+ * This method will cause the readable stream to resume emitting data events.
+ * @return {null}
+ */
+ resume() {
+ return this.cursor.resume();
+ }
+}
+
+class ChangeStreamCursor extends Cursor {
+ constructor(topology, operation, options) {
+ super(topology, operation, options);
+
+ options = options || {};
+ this._resumeToken = null;
+ this.startAtOperationTime = options.startAtOperationTime;
+
+ if (options.startAfter) {
+ this.resumeToken = options.startAfter;
+ } else if (options.resumeAfter) {
+ this.resumeToken = options.resumeAfter;
+ }
+ }
+
+ set resumeToken(token) {
+ this._resumeToken = token;
+ this.emit('resumeTokenChanged', token);
+ }
+
+ get resumeToken() {
+ return this._resumeToken;
+ }
+
+ get resumeOptions() {
+ const result = {};
+ for (const optionName of CURSOR_OPTIONS) {
+ if (this.options[optionName]) result[optionName] = this.options[optionName];
+ }
+
+ if (this.resumeToken || this.startAtOperationTime) {
+ ['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);
+
+ if (this.resumeToken) {
+ const resumeKey =
+ this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
+ result[resumeKey] = this.resumeToken;
+ } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
+ result.startAtOperationTime = this.startAtOperationTime;
+ }
+ }
+
+ return result;
+ }
+
+ cacheResumeToken(resumeToken) {
+ if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
+ this.resumeToken = this.cursorState.postBatchResumeToken;
+ } else {
+ this.resumeToken = resumeToken;
+ }
+ this.hasReceived = true;
+ }
+
+ _processBatch(batchName, response) {
+ const cursor = response.cursor;
+ if (cursor.postBatchResumeToken) {
+ this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;
+
+ if (cursor[batchName].length === 0) {
+ this.resumeToken = cursor.postBatchResumeToken;
+ }
+ }
+ }
+
+ _initializeCursor(callback) {
+ super._initializeCursor((err, result) => {
+ if (err || result == null) {
+ callback(err, result);
+ return;
+ }
+
+ const response = result.documents[0];
+
+ if (
+ this.startAtOperationTime == null &&
+ this.resumeAfter == null &&
+ this.startAfter == null &&
+ maxWireVersion(this.server) >= 7
+ ) {
+ this.startAtOperationTime = response.operationTime;
+ }
+
+ this._processBatch('firstBatch', response);
+
+ this.emit('init', result);
+ this.emit('response');
+ callback(err, result);
+ });
+ }
+
+ _getMore(callback) {
+ super._getMore((err, response) => {
+ if (err) {
+ callback(err);
+ return;
+ }
+
+ this._processBatch('nextBatch', response);
+
+ this.emit('more', response);
+ this.emit('response');
+ callback(err, response);
+ });
+ }
+}
+
+/**
+ * @event ChangeStreamCursor#response
+ * internal event DO NOT USE
+ * @ignore
+ */
+
+// Create a new change stream cursor based on self's configuration
+function createChangeStreamCursor(self, options) {
+ const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
+ applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
+ if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
+ changeStreamStageOptions.allChangesForCluster = true;
+ }
+
+ const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
+ const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
+
+ const changeStreamCursor = new ChangeStreamCursor(
+ self.topology,
+ new AggregateOperation(self.parent, pipeline, options),
+ cursorOptions
+ );
+
+ relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
+
+ /**
+ * Fired for each new matching change in the specified namespace. Attaching a `change`
+ * event listener to a Change Stream will switch the stream into flowing mode. Data will
+ * then be passed as soon as it is available.
+ *
+ * @event ChangeStream#change
+ * @type {object}
+ */
+ if (self.listenerCount('change') > 0) {
+ changeStreamCursor.on('data', function(change) {
+ processNewChange(self, change);
+ });
+ }
+
+ /**
+ * Change stream close event
+ *
+ * @event ChangeStream#close
+ * @type {null}
+ */
+
+ /**
+ * Change stream end event
+ *
+ * @event ChangeStream#end
+ * @type {null}
+ */
+
+ /**
+ * Emitted each time the change stream stores a new resume token.
+ *
+ * @event ChangeStream#resumeTokenChanged
+ * @type {ResumeToken}
+ */
+
+ /**
+ * Fired when the stream encounters an error.
+ *
+ * @event ChangeStream#error
+ * @type {Error}
+ */
+ changeStreamCursor.on('error', function(error) {
+ processError(self, error);
+ });
+
+ if (self.pipeDestinations) {
+ const cursorStream = changeStreamCursor.stream(self.streamOptions);
+ for (let pipeDestination of self.pipeDestinations) {
+ cursorStream.pipe(pipeDestination);
+ }
+ }
+
+ return changeStreamCursor;
+}
+
+function applyKnownOptions(target, source, optionNames) {
+ optionNames.forEach(name => {
+ if (source[name]) {
+ target[name] = source[name];
+ }
+ });
+
+ return target;
+}
+
+// This method performs a basic server selection loop, satisfying the requirements of
+// ChangeStream resumability until the new SDAM layer can be used.
+const SELECTION_TIMEOUT = 30000;
+function waitForTopologyConnected(topology, options, callback) {
+ setTimeout(() => {
+ if (options && options.start == null) {
+ options.start = now();
+ }
+
+ const start = options.start || now();
+ const timeout = options.timeout || SELECTION_TIMEOUT;
+ const readPreference = options.readPreference;
+ if (topology.isConnected({ readPreference })) {
+ return callback();
+ }
+
+ if (calculateDurationInMs(start) > timeout) {
+ return callback(new MongoError('Timed out waiting for connection'));
+ }
+
+ waitForTopologyConnected(topology, options, callback);
+ }, 500); // this is an arbitrary wait time to allow SDAM to transition
+}
+
+function processNewChange(changeStream, change, callback) {
+ const cursor = changeStream.cursor;
+
+ // a null change means the cursor has been notified, implicitly closing the change stream
+ if (change == null) {
+ changeStream.closed = true;
+ }
+
+ if (changeStream.closed) {
+ if (callback) callback(new MongoError('ChangeStream is closed'));
+ return;
+ }
+
+ if (change && !change._id) {
+ const noResumeTokenError = new Error(
+ 'A change stream document has been received that lacks a resume token (_id).'
+ );
+
+ if (!callback) return changeStream.emit('error', noResumeTokenError);
+ return callback(noResumeTokenError);
+ }
+
+ // cache the resume token
+ cursor.cacheResumeToken(change._id);
+
+ // wipe the startAtOperationTime if there was one so that there won't be a conflict
+ // between resumeToken and startAtOperationTime if we need to reconnect the cursor
+ changeStream.options.startAtOperationTime = undefined;
+
+ // Return the change
+ if (!callback) return changeStream.emit('change', change);
+ return callback(undefined, change);
+}
+
+function processError(changeStream, error, callback) {
+ const topology = changeStream.topology;
+ const cursor = changeStream.cursor;
+
+ // If the change stream has been closed explictly, do not process error.
+ if (changeStream.closed) {
+ if (callback) callback(new MongoError('ChangeStream is closed'));
+ return;
+ }
+
+ // if the resume succeeds, continue with the new cursor
+ function resumeWithCursor(newCursor) {
+ changeStream.cursor = newCursor;
+ processResumeQueue(changeStream);
+ }
+
+ // otherwise, raise an error and close the change stream
+ function unresumableError(err) {
+ if (!callback) {
+ changeStream.emit('error', err);
+ changeStream.emit('close');
+ }
+ processResumeQueue(changeStream, err);
+ changeStream.closed = true;
+ }
+
+ if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
+ changeStream.cursor = undefined;
+
+ // stop listening to all events from old cursor
+ ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event));
+
+ // close internal cursor, ignore errors
+ cursor.close();
+
+ waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => {
+ // if the topology can't reconnect, close the stream
+ if (err) return unresumableError(err);
+
+ // create a new cursor, preserving the old cursor's options
+ const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions);
+
+ // attempt to continue in emitter mode
+ if (!callback) return resumeWithCursor(newCursor);
+
+ // attempt to continue in iterator mode
+ newCursor.hasNext(err => {
+ // if there's an error immediately after resuming, close the stream
+ if (err) return unresumableError(err);
+ resumeWithCursor(newCursor);
+ });
+ });
+ return;
+ }
+
+ if (!callback) return changeStream.emit('error', error);
+ return callback(error);
+}
+
+/**
+ * Safely provides a cursor across resume attempts
+ *
+ * @param {ChangeStream} changeStream the parent ChangeStream
+ * @param {function} callback gets the cursor or error
+ * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor
+ */
+function getCursor(changeStream, callback) {
+ if (changeStream.isClosed()) {
+ callback(new MongoError('ChangeStream is closed.'));
+ return;
+ }
+
+ // if a cursor exists and it is open, return it
+ if (changeStream.cursor) {
+ callback(undefined, changeStream.cursor);
+ return;
+ }
+
+ // no cursor, queue callback until topology reconnects
+ changeStream[kResumeQueue].push(callback);
+}
+
+/**
+ * Drain the resume queue when a new has become available
+ *
+ * @param {ChangeStream} changeStream the parent ChangeStream
+ * @param {ChangeStreamCursor?} changeStream.cursor the new cursor
+ * @param {Error} [err] error getting a new cursor
+ */
+function processResumeQueue(changeStream, err) {
+ while (changeStream[kResumeQueue].length) {
+ const request = changeStream[kResumeQueue].pop();
+ if (changeStream.isClosed() && !err) {
+ request(new MongoError('Change Stream is not open.'));
+ return;
+ }
+ request(err, changeStream.cursor);
+ }
+}
+
+/**
+ * The callback format for results
+ * @callback ChangeStream~resultCallback
+ * @param {MongoError} error An error instance representing the error during the execution.
+ * @param {(object|null)} result The result object if the command was executed successfully.
+ */
+
+module.exports = ChangeStream;