From 81ddf9b700bc48a1f8e472209f080f9c1d9a9b09 Mon Sep 17 00:00:00 2001 From: Piotr Russ Date: Wed, 18 Nov 2020 23:26:45 +0100 Subject: rm node_modules --- node_modules/mongodb/lib/change_stream.js | 623 ------------------------------ 1 file changed, 623 deletions(-) delete mode 100644 node_modules/mongodb/lib/change_stream.js (limited to 'node_modules/mongodb/lib/change_stream.js') diff --git a/node_modules/mongodb/lib/change_stream.js b/node_modules/mongodb/lib/change_stream.js deleted file mode 100644 index b226702..0000000 --- a/node_modules/mongodb/lib/change_stream.js +++ /dev/null @@ -1,623 +0,0 @@ -'use strict'; - -const Denque = require('denque'); -const EventEmitter = require('events'); -const isResumableError = require('./error').isResumableError; -const MongoError = require('./core').MongoError; -const Cursor = require('./cursor'); -const relayEvents = require('./core/utils').relayEvents; -const maxWireVersion = require('./core/utils').maxWireVersion; -const maybePromise = require('./utils').maybePromise; -const now = require('./utils').now; -const calculateDurationInMs = require('./utils').calculateDurationInMs; -const AggregateOperation = require('./operations/aggregate'); - -const kResumeQueue = Symbol('resumeQueue'); - -const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; -const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( - CHANGE_STREAM_OPTIONS -); - -const CHANGE_DOMAIN_TYPES = { - COLLECTION: Symbol('Collection'), - DATABASE: Symbol('Database'), - CLUSTER: Symbol('Cluster') -}; - -/** - * @typedef ResumeToken - * @description Represents the logical starting point for a new or resuming {@link ChangeStream} on the server. - * @see https://docs.mongodb.com/master/changeStreams/#change-stream-resume-token - */ - -/** - * @typedef OperationTime - * @description Represents a specific point in time on a server. Can be retrieved by using {@link Db#command} - * @see https://docs.mongodb.com/manual/reference/method/db.runCommand/#response - */ - -/** - * @typedef ChangeStreamOptions - * @description Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified. - * @property {string} [fullDocument='default'] Allowed values: ‘default’, ‘updateLookup’. When set to ‘updateLookup’, the change stream will include both a delta describing the changes to the document, as well as a copy of the entire document that was changed from some time after the change occurred. - * @property {number} [maxAwaitTimeMS] The maximum amount of time for the server to wait on new documents to satisfy a change stream query. - * @property {ResumeToken} [resumeAfter] Allows you to start a changeStream after a specified event. See {@link https://docs.mongodb.com/master/changeStreams/#resumeafter-for-change-streams|ChangeStream documentation}. - * @property {ResumeToken} [startAfter] Similar to resumeAfter, but will allow you to start after an invalidated event. See {@link https://docs.mongodb.com/master/changeStreams/#startafter-for-change-streams|ChangeStream documentation}. - * @property {OperationTime} [startAtOperationTime] Will start the changeStream after the specified operationTime. - * @property {number} [batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. - * @property {object} [collation] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}. - * @property {ReadPreference} [readPreference] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}. - */ - -/** - * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}. - * @class ChangeStream - * @since 3.0.0 - * @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream - * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents - * @param {ChangeStreamOptions} [options] Optional settings - * @fires ChangeStream#close - * @fires ChangeStream#change - * @fires ChangeStream#end - * @fires ChangeStream#error - * @fires ChangeStream#resumeTokenChanged - * @return {ChangeStream} a ChangeStream instance. - */ -class ChangeStream extends EventEmitter { - constructor(parent, pipeline, options) { - super(); - const Collection = require('./collection'); - const Db = require('./db'); - const MongoClient = require('./mongo_client'); - - this.pipeline = pipeline || []; - this.options = options || {}; - - this.parent = parent; - this.namespace = parent.s.namespace; - if (parent instanceof Collection) { - this.type = CHANGE_DOMAIN_TYPES.COLLECTION; - this.topology = parent.s.db.serverConfig; - } else if (parent instanceof Db) { - this.type = CHANGE_DOMAIN_TYPES.DATABASE; - this.topology = parent.serverConfig; - } else if (parent instanceof MongoClient) { - this.type = CHANGE_DOMAIN_TYPES.CLUSTER; - this.topology = parent.topology; - } else { - throw new TypeError( - 'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient' - ); - } - - this.promiseLibrary = parent.s.promiseLibrary; - if (!this.options.readPreference && parent.s.readPreference) { - this.options.readPreference = parent.s.readPreference; - } - - this[kResumeQueue] = new Denque(); - - // Create contained Change Stream cursor - this.cursor = createChangeStreamCursor(this, options); - - this.closed = false; - - // Listen for any `change` listeners being added to ChangeStream - this.on('newListener', eventName => { - if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { - this.cursor.on('data', change => processNewChange(this, change)); - } - }); - - // Listen for all `change` listeners being removed from ChangeStream - this.on('removeListener', eventName => { - if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) { - this.cursor.removeAllListeners('data'); - } - }); - } - - /** - * @property {ResumeToken} resumeToken - * The cached resume token that will be used to resume - * after the most recently returned change. - */ - get resumeToken() { - return this.cursor.resumeToken; - } - - /** - * Check if there is any document still available in the Change Stream - * @function ChangeStream.prototype.hasNext - * @param {ChangeStream~resultCallback} [callback] The result callback. - * @throws {MongoError} - * @returns {Promise|void} returns Promise if no callback passed - */ - hasNext(callback) { - return maybePromise(this.parent, callback, cb => { - getCursor(this, (err, cursor) => { - if (err) return cb(err); // failed to resume, raise an error - cursor.hasNext(cb); - }); - }); - } - - /** - * Get the next available document from the Change Stream, returns null if no more documents are available. - * @function ChangeStream.prototype.next - * @param {ChangeStream~resultCallback} [callback] The result callback. - * @throws {MongoError} - * @returns {Promise|void} returns Promise if no callback passed - */ - next(callback) { - return maybePromise(this.parent, callback, cb => { - getCursor(this, (err, cursor) => { - if (err) return cb(err); // failed to resume, raise an error - cursor.next((error, change) => { - if (error) { - this[kResumeQueue].push(() => this.next(cb)); - processError(this, error, cb); - return; - } - processNewChange(this, change, cb); - }); - }); - }); - } - - /** - * Is the change stream closed - * @method ChangeStream.prototype.isClosed - * @return {boolean} - */ - isClosed() { - return this.closed || (this.cursor && this.cursor.isClosed()); - } - - /** - * Close the Change Stream - * @method ChangeStream.prototype.close - * @param {ChangeStream~resultCallback} [callback] The result callback. - * @return {Promise} returns Promise if no callback passed - */ - close(callback) { - return maybePromise(this.parent, callback, cb => { - if (this.closed) return cb(); - - // flag the change stream as explicitly closed - this.closed = true; - - if (!this.cursor) return cb(); - - // Tidy up the existing cursor - const cursor = this.cursor; - - return cursor.close(err => { - ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - this.cursor = undefined; - - return cb(err); - }); - }); - } - - /** - * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream. - * @method - * @param {Writable} destination The destination for writing data - * @param {object} [options] {@link https://nodejs.org/api/stream.html#stream_readable_pipe_destination_options|Pipe options} - * @return {null} - */ - pipe(destination, options) { - if (!this.pipeDestinations) { - this.pipeDestinations = []; - } - this.pipeDestinations.push(destination); - return this.cursor.pipe(destination, options); - } - - /** - * This method will remove the hooks set up for a previous pipe() call. - * @param {Writable} [destination] The destination for writing data - * @return {null} - */ - unpipe(destination) { - if (this.pipeDestinations && this.pipeDestinations.indexOf(destination) > -1) { - this.pipeDestinations.splice(this.pipeDestinations.indexOf(destination), 1); - } - return this.cursor.unpipe(destination); - } - - /** - * Return a modified Readable stream including a possible transform method. - * @method - * @param {object} [options] Optional settings. - * @param {function} [options.transform] A transformation method applied to each document emitted by the stream. - * @return {Cursor} - */ - stream(options) { - this.streamOptions = options; - return this.cursor.stream(options); - } - - /** - * This method will cause a stream in flowing mode to stop emitting data events. Any data that becomes available will remain in the internal buffer. - * @return {null} - */ - pause() { - return this.cursor.pause(); - } - - /** - * This method will cause the readable stream to resume emitting data events. - * @return {null} - */ - resume() { - return this.cursor.resume(); - } -} - -class ChangeStreamCursor extends Cursor { - constructor(topology, operation, options) { - super(topology, operation, options); - - options = options || {}; - this._resumeToken = null; - this.startAtOperationTime = options.startAtOperationTime; - - if (options.startAfter) { - this.resumeToken = options.startAfter; - } else if (options.resumeAfter) { - this.resumeToken = options.resumeAfter; - } - } - - set resumeToken(token) { - this._resumeToken = token; - this.emit('resumeTokenChanged', token); - } - - get resumeToken() { - return this._resumeToken; - } - - get resumeOptions() { - const result = {}; - for (const optionName of CURSOR_OPTIONS) { - if (this.options[optionName]) result[optionName] = this.options[optionName]; - } - - if (this.resumeToken || this.startAtOperationTime) { - ['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]); - - if (this.resumeToken) { - const resumeKey = - this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter'; - result[resumeKey] = this.resumeToken; - } else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) { - result.startAtOperationTime = this.startAtOperationTime; - } - } - - return result; - } - - cacheResumeToken(resumeToken) { - if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) { - this.resumeToken = this.cursorState.postBatchResumeToken; - } else { - this.resumeToken = resumeToken; - } - this.hasReceived = true; - } - - _processBatch(batchName, response) { - const cursor = response.cursor; - if (cursor.postBatchResumeToken) { - this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken; - - if (cursor[batchName].length === 0) { - this.resumeToken = cursor.postBatchResumeToken; - } - } - } - - _initializeCursor(callback) { - super._initializeCursor((err, result) => { - if (err || result == null) { - callback(err, result); - return; - } - - const response = result.documents[0]; - - if ( - this.startAtOperationTime == null && - this.resumeAfter == null && - this.startAfter == null && - maxWireVersion(this.server) >= 7 - ) { - this.startAtOperationTime = response.operationTime; - } - - this._processBatch('firstBatch', response); - - this.emit('init', result); - this.emit('response'); - callback(err, result); - }); - } - - _getMore(callback) { - super._getMore((err, response) => { - if (err) { - callback(err); - return; - } - - this._processBatch('nextBatch', response); - - this.emit('more', response); - this.emit('response'); - callback(err, response); - }); - } -} - -/** - * @event ChangeStreamCursor#response - * internal event DO NOT USE - * @ignore - */ - -// Create a new change stream cursor based on self's configuration -function createChangeStreamCursor(self, options) { - const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' }; - applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS); - if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) { - changeStreamStageOptions.allChangesForCluster = true; - } - - const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline); - const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS); - - const changeStreamCursor = new ChangeStreamCursor( - self.topology, - new AggregateOperation(self.parent, pipeline, options), - cursorOptions - ); - - relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']); - - /** - * Fired for each new matching change in the specified namespace. Attaching a `change` - * event listener to a Change Stream will switch the stream into flowing mode. Data will - * then be passed as soon as it is available. - * - * @event ChangeStream#change - * @type {object} - */ - if (self.listenerCount('change') > 0) { - changeStreamCursor.on('data', function(change) { - processNewChange(self, change); - }); - } - - /** - * Change stream close event - * - * @event ChangeStream#close - * @type {null} - */ - - /** - * Change stream end event - * - * @event ChangeStream#end - * @type {null} - */ - - /** - * Emitted each time the change stream stores a new resume token. - * - * @event ChangeStream#resumeTokenChanged - * @type {ResumeToken} - */ - - /** - * Fired when the stream encounters an error. - * - * @event ChangeStream#error - * @type {Error} - */ - changeStreamCursor.on('error', function(error) { - processError(self, error); - }); - - if (self.pipeDestinations) { - const cursorStream = changeStreamCursor.stream(self.streamOptions); - for (let pipeDestination of self.pipeDestinations) { - cursorStream.pipe(pipeDestination); - } - } - - return changeStreamCursor; -} - -function applyKnownOptions(target, source, optionNames) { - optionNames.forEach(name => { - if (source[name]) { - target[name] = source[name]; - } - }); - - return target; -} - -// This method performs a basic server selection loop, satisfying the requirements of -// ChangeStream resumability until the new SDAM layer can be used. -const SELECTION_TIMEOUT = 30000; -function waitForTopologyConnected(topology, options, callback) { - setTimeout(() => { - if (options && options.start == null) { - options.start = now(); - } - - const start = options.start || now(); - const timeout = options.timeout || SELECTION_TIMEOUT; - const readPreference = options.readPreference; - if (topology.isConnected({ readPreference })) { - return callback(); - } - - if (calculateDurationInMs(start) > timeout) { - return callback(new MongoError('Timed out waiting for connection')); - } - - waitForTopologyConnected(topology, options, callback); - }, 500); // this is an arbitrary wait time to allow SDAM to transition -} - -function processNewChange(changeStream, change, callback) { - const cursor = changeStream.cursor; - - // a null change means the cursor has been notified, implicitly closing the change stream - if (change == null) { - changeStream.closed = true; - } - - if (changeStream.closed) { - if (callback) callback(new MongoError('ChangeStream is closed')); - return; - } - - if (change && !change._id) { - const noResumeTokenError = new Error( - 'A change stream document has been received that lacks a resume token (_id).' - ); - - if (!callback) return changeStream.emit('error', noResumeTokenError); - return callback(noResumeTokenError); - } - - // cache the resume token - cursor.cacheResumeToken(change._id); - - // wipe the startAtOperationTime if there was one so that there won't be a conflict - // between resumeToken and startAtOperationTime if we need to reconnect the cursor - changeStream.options.startAtOperationTime = undefined; - - // Return the change - if (!callback) return changeStream.emit('change', change); - return callback(undefined, change); -} - -function processError(changeStream, error, callback) { - const topology = changeStream.topology; - const cursor = changeStream.cursor; - - // If the change stream has been closed explictly, do not process error. - if (changeStream.closed) { - if (callback) callback(new MongoError('ChangeStream is closed')); - return; - } - - // if the resume succeeds, continue with the new cursor - function resumeWithCursor(newCursor) { - changeStream.cursor = newCursor; - processResumeQueue(changeStream); - } - - // otherwise, raise an error and close the change stream - function unresumableError(err) { - if (!callback) { - changeStream.emit('error', err); - changeStream.emit('close'); - } - processResumeQueue(changeStream, err); - changeStream.closed = true; - } - - if (cursor && isResumableError(error, maxWireVersion(cursor.server))) { - changeStream.cursor = undefined; - - // stop listening to all events from old cursor - ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); - - // close internal cursor, ignore errors - cursor.close(); - - waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => { - // if the topology can't reconnect, close the stream - if (err) return unresumableError(err); - - // create a new cursor, preserving the old cursor's options - const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - - // attempt to continue in emitter mode - if (!callback) return resumeWithCursor(newCursor); - - // attempt to continue in iterator mode - newCursor.hasNext(err => { - // if there's an error immediately after resuming, close the stream - if (err) return unresumableError(err); - resumeWithCursor(newCursor); - }); - }); - return; - } - - if (!callback) return changeStream.emit('error', error); - return callback(error); -} - -/** - * Safely provides a cursor across resume attempts - * - * @param {ChangeStream} changeStream the parent ChangeStream - * @param {function} callback gets the cursor or error - * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor - */ -function getCursor(changeStream, callback) { - if (changeStream.isClosed()) { - callback(new MongoError('ChangeStream is closed.')); - return; - } - - // if a cursor exists and it is open, return it - if (changeStream.cursor) { - callback(undefined, changeStream.cursor); - return; - } - - // no cursor, queue callback until topology reconnects - changeStream[kResumeQueue].push(callback); -} - -/** - * Drain the resume queue when a new has become available - * - * @param {ChangeStream} changeStream the parent ChangeStream - * @param {ChangeStreamCursor?} changeStream.cursor the new cursor - * @param {Error} [err] error getting a new cursor - */ -function processResumeQueue(changeStream, err) { - while (changeStream[kResumeQueue].length) { - const request = changeStream[kResumeQueue].pop(); - if (changeStream.isClosed() && !err) { - request(new MongoError('Change Stream is not open.')); - return; - } - request(err, changeStream.cursor); - } -} - -/** - * The callback format for results - * @callback ChangeStream~resultCallback - * @param {MongoError} error An error instance representing the error during the execution. - * @param {(object|null)} result The result object if the command was executed successfully. - */ - -module.exports = ChangeStream; -- cgit v1.2.3