diff options
author | 2020-11-18 23:26:45 +0100 | |
---|---|---|
committer | 2020-11-18 23:26:45 +0100 | |
commit | 81ddf9b700bc48a1f8e472209f080f9c1d9a9b09 (patch) | |
tree | 8b959d50c5a614cbf9fcb346ed556140374d4b6d /node_modules/mongodb/lib/gridfs-stream | |
parent | 1870f3fdf43707a15fda0f609a021f516f45eb63 (diff) | |
download | website_creator-81ddf9b700bc48a1f8e472209f080f9c1d9a9b09.tar.gz website_creator-81ddf9b700bc48a1f8e472209f080f9c1d9a9b09.tar.bz2 website_creator-81ddf9b700bc48a1f8e472209f080f9c1d9a9b09.zip |
rm node_modules
Diffstat (limited to 'node_modules/mongodb/lib/gridfs-stream')
-rw-r--r-- | node_modules/mongodb/lib/gridfs-stream/download.js | 433 | ||||
-rw-r--r-- | node_modules/mongodb/lib/gridfs-stream/index.js | 359 | ||||
-rw-r--r-- | node_modules/mongodb/lib/gridfs-stream/upload.js | 538 |
3 files changed, 0 insertions, 1330 deletions
diff --git a/node_modules/mongodb/lib/gridfs-stream/download.js b/node_modules/mongodb/lib/gridfs-stream/download.js deleted file mode 100644 index 0aab5dc..0000000 --- a/node_modules/mongodb/lib/gridfs-stream/download.js +++ /dev/null @@ -1,433 +0,0 @@ -'use strict'; - -var stream = require('stream'), - util = require('util'); - -module.exports = GridFSBucketReadStream; - -/** - * A readable stream that enables you to read buffers from GridFS. - * - * Do not instantiate this class directly. Use `openDownloadStream()` instead. - * - * @class - * @extends external:Readable - * @param {Collection} chunks Handle for chunks collection - * @param {Collection} files Handle for files collection - * @param {Object} readPreference The read preference to use - * @param {Object} filter The query to use to find the file document - * @param {Object} [options] Optional settings. - * @param {Number} [options.sort] Optional sort for the file find query - * @param {Number} [options.skip] Optional skip for the file find query - * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from - * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before - * @fires GridFSBucketReadStream#error - * @fires GridFSBucketReadStream#file - */ -function GridFSBucketReadStream(chunks, files, readPreference, filter, options) { - this.s = { - bytesRead: 0, - chunks: chunks, - cursor: null, - expected: 0, - files: files, - filter: filter, - init: false, - expectedEnd: 0, - file: null, - options: options, - readPreference: readPreference - }; - - stream.Readable.call(this); -} - -util.inherits(GridFSBucketReadStream, stream.Readable); - -/** - * An error occurred - * - * @event GridFSBucketReadStream#error - * @type {Error} - */ - -/** - * Fires when the stream loaded the file document corresponding to the - * provided id. - * - * @event GridFSBucketReadStream#file - * @type {object} - */ - -/** - * Emitted when a chunk of data is available to be consumed. - * - * @event GridFSBucketReadStream#data - * @type {object} - */ - -/** - * Fired when the stream is exhausted (no more data events). - * - * @event GridFSBucketReadStream#end - * @type {object} - */ - -/** - * Fired when the stream is exhausted and the underlying cursor is killed - * - * @event GridFSBucketReadStream#close - * @type {object} - */ - -/** - * Reads from the cursor and pushes to the stream. - * Private Impl, do not call directly - * @ignore - * @method - */ - -GridFSBucketReadStream.prototype._read = function() { - var _this = this; - if (this.destroyed) { - return; - } - - waitForFile(_this, function() { - doRead(_this); - }); -}; - -/** - * Sets the 0-based offset in bytes to start streaming from. Throws - * an error if this stream has entered flowing mode - * (e.g. if you've already called `on('data')`) - * @method - * @param {Number} start Offset in bytes to start reading at - * @return {GridFSBucketReadStream} Reference to Self - */ - -GridFSBucketReadStream.prototype.start = function(start) { - throwIfInitialized(this); - this.s.options.start = start; - return this; -}; - -/** - * Sets the 0-based offset in bytes to start streaming from. Throws - * an error if this stream has entered flowing mode - * (e.g. if you've already called `on('data')`) - * @method - * @param {Number} end Offset in bytes to stop reading at - * @return {GridFSBucketReadStream} Reference to self - */ - -GridFSBucketReadStream.prototype.end = function(end) { - throwIfInitialized(this); - this.s.options.end = end; - return this; -}; - -/** - * Marks this stream as aborted (will never push another `data` event) - * and kills the underlying cursor. Will emit the 'end' event, and then - * the 'close' event once the cursor is successfully killed. - * - * @method - * @param {GridFSBucket~errorCallback} [callback] called when the cursor is successfully closed or an error occurred. - * @fires GridFSBucketWriteStream#close - * @fires GridFSBucketWriteStream#end - */ - -GridFSBucketReadStream.prototype.abort = function(callback) { - var _this = this; - this.push(null); - this.destroyed = true; - if (this.s.cursor) { - this.s.cursor.close(function(error) { - _this.emit('close'); - callback && callback(error); - }); - } else { - if (!this.s.init) { - // If not initialized, fire close event because we will never - // get a cursor - _this.emit('close'); - } - callback && callback(); - } -}; - -/** - * @ignore - */ - -function throwIfInitialized(self) { - if (self.s.init) { - throw new Error('You cannot change options after the stream has entered' + 'flowing mode!'); - } -} - -/** - * @ignore - */ - -function doRead(_this) { - if (_this.destroyed) { - return; - } - - _this.s.cursor.next(function(error, doc) { - if (_this.destroyed) { - return; - } - if (error) { - return __handleError(_this, error); - } - if (!doc) { - _this.push(null); - - process.nextTick(() => { - _this.s.cursor.close(function(error) { - if (error) { - __handleError(_this, error); - return; - } - - _this.emit('close'); - }); - }); - - return; - } - - var bytesRemaining = _this.s.file.length - _this.s.bytesRead; - var expectedN = _this.s.expected++; - var expectedLength = Math.min(_this.s.file.chunkSize, bytesRemaining); - - if (doc.n > expectedN) { - var errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n + ', expected: ' + expectedN; - return __handleError(_this, new Error(errmsg)); - } - - if (doc.n < expectedN) { - errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n + ', expected: ' + expectedN; - return __handleError(_this, new Error(errmsg)); - } - - var buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer; - - if (buf.length !== expectedLength) { - if (bytesRemaining <= 0) { - errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n; - return __handleError(_this, new Error(errmsg)); - } - - errmsg = - 'ChunkIsWrongSize: Got unexpected length: ' + buf.length + ', expected: ' + expectedLength; - return __handleError(_this, new Error(errmsg)); - } - - _this.s.bytesRead += buf.length; - - if (buf.length === 0) { - return _this.push(null); - } - - var sliceStart = null; - var sliceEnd = null; - - if (_this.s.bytesToSkip != null) { - sliceStart = _this.s.bytesToSkip; - _this.s.bytesToSkip = 0; - } - - const atEndOfStream = expectedN === _this.s.expectedEnd - 1; - const bytesLeftToRead = _this.s.options.end - _this.s.bytesToSkip; - if (atEndOfStream && _this.s.bytesToTrim != null) { - sliceEnd = _this.s.file.chunkSize - _this.s.bytesToTrim; - } else if (_this.s.options.end && bytesLeftToRead < doc.data.length()) { - sliceEnd = bytesLeftToRead; - } - - if (sliceStart != null || sliceEnd != null) { - buf = buf.slice(sliceStart || 0, sliceEnd || buf.length); - } - - _this.push(buf); - }); -} - -/** - * @ignore - */ - -function init(self) { - var findOneOptions = {}; - if (self.s.readPreference) { - findOneOptions.readPreference = self.s.readPreference; - } - if (self.s.options && self.s.options.sort) { - findOneOptions.sort = self.s.options.sort; - } - if (self.s.options && self.s.options.skip) { - findOneOptions.skip = self.s.options.skip; - } - - self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) { - if (error) { - return __handleError(self, error); - } - - if (!doc) { - var identifier = self.s.filter._id ? self.s.filter._id.toString() : self.s.filter.filename; - var errmsg = 'FileNotFound: file ' + identifier + ' was not found'; - var err = new Error(errmsg); - err.code = 'ENOENT'; - return __handleError(self, err); - } - - // If document is empty, kill the stream immediately and don't - // execute any reads - if (doc.length <= 0) { - self.push(null); - return; - } - - if (self.destroyed) { - // If user destroys the stream before we have a cursor, wait - // until the query is done to say we're 'closed' because we can't - // cancel a query. - self.emit('close'); - return; - } - - try { - self.s.bytesToSkip = handleStartOption(self, doc, self.s.options); - } catch (error) { - return __handleError(self, error); - } - - var filter = { files_id: doc._id }; - - // Currently (MongoDB 3.4.4) skip function does not support the index, - // it needs to retrieve all the documents first and then skip them. (CS-25811) - // As work around we use $gte on the "n" field. - if (self.s.options && self.s.options.start != null) { - var skip = Math.floor(self.s.options.start / doc.chunkSize); - if (skip > 0) { - filter['n'] = { $gte: skip }; - } - } - self.s.cursor = self.s.chunks.find(filter).sort({ n: 1 }); - - if (self.s.readPreference) { - self.s.cursor.setReadPreference(self.s.readPreference); - } - - self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize); - self.s.file = doc; - - try { - self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor, self.s.options); - } catch (error) { - return __handleError(self, error); - } - - self.emit('file', doc); - }); -} - -/** - * @ignore - */ - -function waitForFile(_this, callback) { - if (_this.s.file) { - return callback(); - } - - if (!_this.s.init) { - init(_this); - _this.s.init = true; - } - - _this.once('file', function() { - callback(); - }); -} - -/** - * @ignore - */ - -function handleStartOption(stream, doc, options) { - if (options && options.start != null) { - if (options.start > doc.length) { - throw new Error( - 'Stream start (' + - options.start + - ') must not be ' + - 'more than the length of the file (' + - doc.length + - ')' - ); - } - if (options.start < 0) { - throw new Error('Stream start (' + options.start + ') must not be ' + 'negative'); - } - if (options.end != null && options.end < options.start) { - throw new Error( - 'Stream start (' + - options.start + - ') must not be ' + - 'greater than stream end (' + - options.end + - ')' - ); - } - - stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) * doc.chunkSize; - stream.s.expected = Math.floor(options.start / doc.chunkSize); - - return options.start - stream.s.bytesRead; - } -} - -/** - * @ignore - */ - -function handleEndOption(stream, doc, cursor, options) { - if (options && options.end != null) { - if (options.end > doc.length) { - throw new Error( - 'Stream end (' + - options.end + - ') must not be ' + - 'more than the length of the file (' + - doc.length + - ')' - ); - } - if (options.start < 0) { - throw new Error('Stream end (' + options.end + ') must not be ' + 'negative'); - } - - var start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0; - - cursor.limit(Math.ceil(options.end / doc.chunkSize) - start); - - stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize); - - return Math.ceil(options.end / doc.chunkSize) * doc.chunkSize - options.end; - } -} - -/** - * @ignore - */ - -function __handleError(_this, error) { - _this.emit('error', error); -} diff --git a/node_modules/mongodb/lib/gridfs-stream/index.js b/node_modules/mongodb/lib/gridfs-stream/index.js deleted file mode 100644 index 6509839..0000000 --- a/node_modules/mongodb/lib/gridfs-stream/index.js +++ /dev/null @@ -1,359 +0,0 @@ -'use strict'; - -var Emitter = require('events').EventEmitter; -var GridFSBucketReadStream = require('./download'); -var GridFSBucketWriteStream = require('./upload'); -var shallowClone = require('../utils').shallowClone; -var toError = require('../utils').toError; -var util = require('util'); -var executeLegacyOperation = require('../utils').executeLegacyOperation; - -var DEFAULT_GRIDFS_BUCKET_OPTIONS = { - bucketName: 'fs', - chunkSizeBytes: 255 * 1024 -}; - -module.exports = GridFSBucket; - -/** - * Constructor for a streaming GridFS interface - * @class - * @extends external:EventEmitter - * @param {Db} db A db handle - * @param {object} [options] Optional settings. - * @param {string} [options.bucketName="fs"] The 'files' and 'chunks' collections will be prefixed with the bucket name followed by a dot. - * @param {number} [options.chunkSizeBytes=255 * 1024] Number of bytes stored in each chunk. Defaults to 255KB - * @param {object} [options.writeConcern] Optional write concern to be passed to write operations, for instance `{ w: 1 }` - * @param {object} [options.readPreference] Optional read preference to be passed to read operations - * @fires GridFSBucketWriteStream#index - */ - -function GridFSBucket(db, options) { - Emitter.apply(this); - this.setMaxListeners(0); - - if (options && typeof options === 'object') { - options = shallowClone(options); - var keys = Object.keys(DEFAULT_GRIDFS_BUCKET_OPTIONS); - for (var i = 0; i < keys.length; ++i) { - if (!options[keys[i]]) { - options[keys[i]] = DEFAULT_GRIDFS_BUCKET_OPTIONS[keys[i]]; - } - } - } else { - options = DEFAULT_GRIDFS_BUCKET_OPTIONS; - } - - this.s = { - db: db, - options: options, - _chunksCollection: db.collection(options.bucketName + '.chunks'), - _filesCollection: db.collection(options.bucketName + '.files'), - checkedIndexes: false, - calledOpenUploadStream: false, - promiseLibrary: db.s.promiseLibrary || Promise - }; -} - -util.inherits(GridFSBucket, Emitter); - -/** - * When the first call to openUploadStream is made, the upload stream will - * check to see if it needs to create the proper indexes on the chunks and - * files collections. This event is fired either when 1) it determines that - * no index creation is necessary, 2) when it successfully creates the - * necessary indexes. - * - * @event GridFSBucket#index - * @type {Error} - */ - -/** - * Returns a writable stream (GridFSBucketWriteStream) for writing - * buffers to GridFS. The stream's 'id' property contains the resulting - * file's id. - * @method - * @param {string} filename The value of the 'filename' key in the files doc - * @param {object} [options] Optional settings. - * @param {number} [options.chunkSizeBytes] Optional overwrite this bucket's chunkSizeBytes for this file - * @param {object} [options.metadata] Optional object to store in the file document's `metadata` field - * @param {string} [options.contentType] Optional string to store in the file document's `contentType` field - * @param {array} [options.aliases] Optional array of strings to store in the file document's `aliases` field - * @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data - * @return {GridFSBucketWriteStream} - */ - -GridFSBucket.prototype.openUploadStream = function(filename, options) { - if (options) { - options = shallowClone(options); - } else { - options = {}; - } - if (!options.chunkSizeBytes) { - options.chunkSizeBytes = this.s.options.chunkSizeBytes; - } - return new GridFSBucketWriteStream(this, filename, options); -}; - -/** - * Returns a writable stream (GridFSBucketWriteStream) for writing - * buffers to GridFS for a custom file id. The stream's 'id' property contains the resulting - * file's id. - * @method - * @param {string|number|object} id A custom id used to identify the file - * @param {string} filename The value of the 'filename' key in the files doc - * @param {object} [options] Optional settings. - * @param {number} [options.chunkSizeBytes] Optional overwrite this bucket's chunkSizeBytes for this file - * @param {object} [options.metadata] Optional object to store in the file document's `metadata` field - * @param {string} [options.contentType] Optional string to store in the file document's `contentType` field - * @param {array} [options.aliases] Optional array of strings to store in the file document's `aliases` field - * @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data - * @return {GridFSBucketWriteStream} - */ - -GridFSBucket.prototype.openUploadStreamWithId = function(id, filename, options) { - if (options) { - options = shallowClone(options); - } else { - options = {}; - } - - if (!options.chunkSizeBytes) { - options.chunkSizeBytes = this.s.options.chunkSizeBytes; - } - - options.id = id; - - return new GridFSBucketWriteStream(this, filename, options); -}; - -/** - * Returns a readable stream (GridFSBucketReadStream) for streaming file - * data from GridFS. - * @method - * @param {ObjectId} id The id of the file doc - * @param {Object} [options] Optional settings. - * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from - * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before - * @return {GridFSBucketReadStream} - */ - -GridFSBucket.prototype.openDownloadStream = function(id, options) { - var filter = { _id: id }; - options = { - start: options && options.start, - end: options && options.end - }; - - return new GridFSBucketReadStream( - this.s._chunksCollection, - this.s._filesCollection, - this.s.options.readPreference, - filter, - options - ); -}; - -/** - * Deletes a file with the given id - * @method - * @param {ObjectId} id The id of the file doc - * @param {GridFSBucket~errorCallback} [callback] - */ - -GridFSBucket.prototype.delete = function(id, callback) { - return executeLegacyOperation(this.s.db.s.topology, _delete, [this, id, callback], { - skipSessions: true - }); -}; - -/** - * @ignore - */ - -function _delete(_this, id, callback) { - _this.s._filesCollection.deleteOne({ _id: id }, function(error, res) { - if (error) { - return callback(error); - } - - _this.s._chunksCollection.deleteMany({ files_id: id }, function(error) { - if (error) { - return callback(error); - } - - // Delete orphaned chunks before returning FileNotFound - if (!res.result.n) { - var errmsg = 'FileNotFound: no file with id ' + id + ' found'; - return callback(new Error(errmsg)); - } - - callback(); - }); - }); -} - -/** - * Convenience wrapper around find on the files collection - * @method - * @param {Object} filter - * @param {Object} [options] Optional settings for cursor - * @param {number} [options.batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find|find command documentation}. - * @param {number} [options.limit] Optional limit for cursor - * @param {number} [options.maxTimeMS] Optional maxTimeMS for cursor - * @param {boolean} [options.noCursorTimeout] Optionally set cursor's `noCursorTimeout` flag - * @param {number} [options.skip] Optional skip for cursor - * @param {object} [options.sort] Optional sort for cursor - * @return {Cursor} - */ - -GridFSBucket.prototype.find = function(filter, options) { - filter = filter || {}; - options = options || {}; - - var cursor = this.s._filesCollection.find(filter); - - if (options.batchSize != null) { - cursor.batchSize(options.batchSize); - } - if (options.limit != null) { - cursor.limit(options.limit); - } - if (options.maxTimeMS != null) { - cursor.maxTimeMS(options.maxTimeMS); - } - if (options.noCursorTimeout != null) { - cursor.addCursorFlag('noCursorTimeout', options.noCursorTimeout); - } - if (options.skip != null) { - cursor.skip(options.skip); - } - if (options.sort != null) { - cursor.sort(options.sort); - } - - return cursor; -}; - -/** - * Returns a readable stream (GridFSBucketReadStream) for streaming the - * file with the given name from GridFS. If there are multiple files with - * the same name, this will stream the most recent file with the given name - * (as determined by the `uploadDate` field). You can set the `revision` - * option to change this behavior. - * @method - * @param {String} filename The name of the file to stream - * @param {Object} [options] Optional settings - * @param {number} [options.revision=-1] The revision number relative to the oldest file with the given filename. 0 gets you the oldest file, 1 gets you the 2nd oldest, -1 gets you the newest. - * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from - * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before - * @return {GridFSBucketReadStream} - */ - -GridFSBucket.prototype.openDownloadStreamByName = function(filename, options) { - var sort = { uploadDate: -1 }; - var skip = null; - if (options && options.revision != null) { - if (options.revision >= 0) { - sort = { uploadDate: 1 }; - skip = options.revision; - } else { - skip = -options.revision - 1; - } - } - - var filter = { filename: filename }; - options = { - sort: sort, - skip: skip, - start: options && options.start, - end: options && options.end - }; - return new GridFSBucketReadStream( - this.s._chunksCollection, - this.s._filesCollection, - this.s.options.readPreference, - filter, - options - ); -}; - -/** - * Renames the file with the given _id to the given string - * @method - * @param {ObjectId} id the id of the file to rename - * @param {String} filename new name for the file - * @param {GridFSBucket~errorCallback} [callback] - */ - -GridFSBucket.prototype.rename = function(id, filename, callback) { - return executeLegacyOperation(this.s.db.s.topology, _rename, [this, id, filename, callback], { - skipSessions: true - }); -}; - -/** - * @ignore - */ - -function _rename(_this, id, filename, callback) { - var filter = { _id: id }; - var update = { $set: { filename: filename } }; - _this.s._filesCollection.updateOne(filter, update, function(error, res) { - if (error) { - return callback(error); - } - if (!res.result.n) { - return callback(toError('File with id ' + id + ' not found')); - } - callback(); - }); -} - -/** - * Removes this bucket's files collection, followed by its chunks collection. - * @method - * @param {GridFSBucket~errorCallback} [callback] - */ - -GridFSBucket.prototype.drop = function(callback) { - return executeLegacyOperation(this.s.db.s.topology, _drop, [this, callback], { - skipSessions: true - }); -}; - -/** - * Return the db logger - * @method - * @return {Logger} return the db logger - * @ignore - */ -GridFSBucket.prototype.getLogger = function() { - return this.s.db.s.logger; -}; - -/** - * @ignore - */ - -function _drop(_this, callback) { - _this.s._filesCollection.drop(function(error) { - if (error) { - return callback(error); - } - _this.s._chunksCollection.drop(function(error) { - if (error) { - return callback(error); - } - - return callback(); - }); - }); -} - -/** - * Callback format for all GridFSBucket methods that can accept a callback. - * @callback GridFSBucket~errorCallback - * @param {MongoError|undefined} error If present, an error instance representing any errors that occurred - * @param {*} result If present, a returned result for the method - */ diff --git a/node_modules/mongodb/lib/gridfs-stream/upload.js b/node_modules/mongodb/lib/gridfs-stream/upload.js deleted file mode 100644 index 578949a..0000000 --- a/node_modules/mongodb/lib/gridfs-stream/upload.js +++ /dev/null @@ -1,538 +0,0 @@ -'use strict'; - -var core = require('../core'); -var crypto = require('crypto'); -var stream = require('stream'); -var util = require('util'); -var Buffer = require('safe-buffer').Buffer; - -var ERROR_NAMESPACE_NOT_FOUND = 26; - -module.exports = GridFSBucketWriteStream; - -/** - * A writable stream that enables you to write buffers to GridFS. - * - * Do not instantiate this class directly. Use `openUploadStream()` instead. - * - * @class - * @extends external:Writable - * @param {GridFSBucket} bucket Handle for this stream's corresponding bucket - * @param {string} filename The value of the 'filename' key in the files doc - * @param {object} [options] Optional settings. - * @param {string|number|object} [options.id] Custom file id for the GridFS file. - * @param {number} [options.chunkSizeBytes] The chunk size to use, in bytes - * @param {number} [options.w] The write concern - * @param {number} [options.wtimeout] The write concern timeout - * @param {number} [options.j] The journal write concern - * @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data - * @fires GridFSBucketWriteStream#error - * @fires GridFSBucketWriteStream#finish - */ - -function GridFSBucketWriteStream(bucket, filename, options) { - options = options || {}; - this.bucket = bucket; - this.chunks = bucket.s._chunksCollection; - this.filename = filename; - this.files = bucket.s._filesCollection; - this.options = options; - // Signals the write is all done - this.done = false; - - this.id = options.id ? options.id : core.BSON.ObjectId(); - this.chunkSizeBytes = this.options.chunkSizeBytes; - this.bufToStore = Buffer.alloc(this.chunkSizeBytes); - this.length = 0; - this.md5 = !options.disableMD5 && crypto.createHash('md5'); - this.n = 0; - this.pos = 0; - this.state = { - streamEnd: false, - outstandingRequests: 0, - errored: false, - aborted: false, - promiseLibrary: this.bucket.s.promiseLibrary - }; - - if (!this.bucket.s.calledOpenUploadStream) { - this.bucket.s.calledOpenUploadStream = true; - - var _this = this; - checkIndexes(this, function() { - _this.bucket.s.checkedIndexes = true; - _this.bucket.emit('index'); - }); - } -} - -util.inherits(GridFSBucketWriteStream, stream.Writable); - -/** - * An error occurred - * - * @event GridFSBucketWriteStream#error - * @type {Error} - */ - -/** - * `end()` was called and the write stream successfully wrote the file - * metadata and all the chunks to MongoDB. - * - * @event GridFSBucketWriteStream#finish - * @type {object} - */ - -/** - * Write a buffer to the stream. - * - * @method - * @param {Buffer} chunk Buffer to write - * @param {String} encoding Optional encoding for the buffer - * @param {GridFSBucket~errorCallback} callback Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush. - * @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise. - */ - -GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) { - var _this = this; - return waitForIndexes(this, function() { - return doWrite(_this, chunk, encoding, callback); - }); -}; - -/** - * Places this write stream into an aborted state (all future writes fail) - * and deletes all chunks that have already been written. - * - * @method - * @param {GridFSBucket~errorCallback} callback called when chunks are successfully removed or error occurred - * @return {Promise} if no callback specified - */ - -GridFSBucketWriteStream.prototype.abort = function(callback) { - if (this.state.streamEnd) { - var error = new Error('Cannot abort a stream that has already completed'); - if (typeof callback === 'function') { - return callback(error); - } - return this.state.promiseLibrary.reject(error); - } - if (this.state.aborted) { - error = new Error('Cannot call abort() on a stream twice'); - if (typeof callback === 'function') { - return callback(error); - } - return this.state.promiseLibrary.reject(error); - } - this.state.aborted = true; - this.chunks.deleteMany({ files_id: this.id }, function(error) { - if (typeof callback === 'function') callback(error); - }); -}; - -/** - * Tells the stream that no more data will be coming in. The stream will - * persist the remaining data to MongoDB, write the files document, and - * then emit a 'finish' event. - * - * @method - * @param {Buffer} chunk Buffer to write - * @param {String} encoding Optional encoding for the buffer - * @param {GridFSBucket~errorCallback} callback Function to call when all files and chunks have been persisted to MongoDB - */ - -GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) { - var _this = this; - if (typeof chunk === 'function') { - (callback = chunk), (chunk = null), (encoding = null); - } else if (typeof encoding === 'function') { - (callback = encoding), (encoding = null); - } - - if (checkAborted(this, callback)) { - return; - } - this.state.streamEnd = true; - - if (callback) { - this.once('finish', function(result) { - callback(null, result); - }); - } - - if (!chunk) { - waitForIndexes(this, function() { - writeRemnant(_this); - }); - return; - } - - this.write(chunk, encoding, function() { - writeRemnant(_this); - }); -}; - -/** - * @ignore - */ - -function __handleError(_this, error, callback) { - if (_this.state.errored) { - return; - } - _this.state.errored = true; - if (callback) { - return callback(error); - } - _this.emit('error', error); -} - -/** - * @ignore - */ - -function createChunkDoc(filesId, n, data) { - return { - _id: core.BSON.ObjectId(), - files_id: filesId, - n: n, - data: data - }; -} - -/** - * @ignore - */ - -function checkChunksIndex(_this, callback) { - _this.chunks.listIndexes().toArray(function(error, indexes) { - if (error) { - // Collection doesn't exist so create index - if (error.code === ERROR_NAMESPACE_NOT_FOUND) { - var index = { files_id: 1, n: 1 }; - _this.chunks.createIndex(index, { background: false, unique: true }, function(error) { - if (error) { - return callback(error); - } - - callback(); - }); - return; - } - return callback(error); - } - - var hasChunksIndex = false; - indexes.forEach(function(index) { - if (index.key) { - var keys = Object.keys(index.key); - if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) { - hasChunksIndex = true; - } - } - }); - - if (hasChunksIndex) { - callback(); - } else { - index = { files_id: 1, n: 1 }; - var indexOptions = getWriteOptions(_this); - - indexOptions.background = false; - indexOptions.unique = true; - - _this.chunks.createIndex(index, indexOptions, function(error) { - if (error) { - return callback(error); - } - - callback(); - }); - } - }); -} - -/** - * @ignore - */ - -function checkDone(_this, callback) { - if (_this.done) return true; - if (_this.state.streamEnd && _this.state.outstandingRequests === 0 && !_this.state.errored) { - // Set done so we dont' trigger duplicate createFilesDoc - _this.done = true; - // Create a new files doc - var filesDoc = createFilesDoc( - _this.id, - _this.length, - _this.chunkSizeBytes, - _this.md5 && _this.md5.digest('hex'), - _this.filename, - _this.options.contentType, - _this.options.aliases, - _this.options.metadata - ); - - if (checkAborted(_this, callback)) { - return false; - } - - _this.files.insertOne(filesDoc, getWriteOptions(_this), function(error) { - if (error) { - return __handleError(_this, error, callback); - } - _this.emit('finish', filesDoc); - }); - - return true; - } - - return false; -} - -/** - * @ignore - */ - -function checkIndexes(_this, callback) { - _this.files.findOne({}, { _id: 1 }, function(error, doc) { - if (error) { - return callback(error); - } - if (doc) { - return callback(); - } - - _this.files.listIndexes().toArray(function(error, indexes) { - if (error) { - // Collection doesn't exist so create index - if (error.code === ERROR_NAMESPACE_NOT_FOUND) { - var index = { filename: 1, uploadDate: 1 }; - _this.files.createIndex(index, { background: false }, function(error) { - if (error) { - return callback(error); - } - - checkChunksIndex(_this, callback); - }); - return; - } - return callback(error); - } - - var hasFileIndex = false; - indexes.forEach(function(index) { - var keys = Object.keys(index.key); - if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) { - hasFileIndex = true; - } - }); - - if (hasFileIndex) { - checkChunksIndex(_this, callback); - } else { - index = { filename: 1, uploadDate: 1 }; - - var indexOptions = getWriteOptions(_this); - - indexOptions.background = false; - - _this.files.createIndex(index, indexOptions, function(error) { - if (error) { - return callback(error); - } - - checkChunksIndex(_this, callback); - }); - } - }); - }); -} - -/** - * @ignore - */ - -function createFilesDoc(_id, length, chunkSize, md5, filename, contentType, aliases, metadata) { - var ret = { - _id: _id, - length: length, - chunkSize: chunkSize, - uploadDate: new Date(), - filename: filename - }; - - if (md5) { - ret.md5 = md5; - } - - if (contentType) { - ret.contentType = contentType; - } - - if (aliases) { - ret.aliases = aliases; - } - - if (metadata) { - ret.metadata = metadata; - } - - return ret; -} - -/** - * @ignore - */ - -function doWrite(_this, chunk, encoding, callback) { - if (checkAborted(_this, callback)) { - return false; - } - - var inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); - - _this.length += inputBuf.length; - - // Input is small enough to fit in our buffer - if (_this.pos + inputBuf.length < _this.chunkSizeBytes) { - inputBuf.copy(_this.bufToStore, _this.pos); - _this.pos += inputBuf.length; - - callback && callback(); - - // Note that we reverse the typical semantics of write's return value - // to be compatible with node's `.pipe()` function. - // True means client can keep writing. - return true; - } - - // Otherwise, buffer is too big for current chunk, so we need to flush - // to MongoDB. - var inputBufRemaining = inputBuf.length; - var spaceRemaining = _this.chunkSizeBytes - _this.pos; - var numToCopy = Math.min(spaceRemaining, inputBuf.length); - var outstandingRequests = 0; - while (inputBufRemaining > 0) { - var inputBufPos = inputBuf.length - inputBufRemaining; - inputBuf.copy(_this.bufToStore, _this.pos, inputBufPos, inputBufPos + numToCopy); - _this.pos += numToCopy; - spaceRemaining -= numToCopy; - if (spaceRemaining === 0) { - if (_this.md5) { - _this.md5.update(_this.bufToStore); - } - var doc = createChunkDoc(_this.id, _this.n, Buffer.from(_this.bufToStore)); - ++_this.state.outstandingRequests; - ++outstandingRequests; - - if (checkAborted(_this, callback)) { - return false; - } - - _this.chunks.insertOne(doc, getWriteOptions(_this), function(error) { - if (error) { - return __handleError(_this, error); - } - --_this.state.outstandingRequests; - --outstandingRequests; - - if (!outstandingRequests) { - _this.emit('drain', doc); - callback && callback(); - checkDone(_this); - } - }); - - spaceRemaining = _this.chunkSizeBytes; - _this.pos = 0; - ++_this.n; - } - inputBufRemaining -= numToCopy; - numToCopy = Math.min(spaceRemaining, inputBufRemaining); - } - - // Note that we reverse the typical semantics of write's return value - // to be compatible with node's `.pipe()` function. - // False means the client should wait for the 'drain' event. - return false; -} - -/** - * @ignore - */ - -function getWriteOptions(_this) { - var obj = {}; - if (_this.options.writeConcern) { - obj.w = _this.options.writeConcern.w; - obj.wtimeout = _this.options.writeConcern.wtimeout; - obj.j = _this.options.writeConcern.j; - } - return obj; -} - -/** - * @ignore - */ - -function waitForIndexes(_this, callback) { - if (_this.bucket.s.checkedIndexes) { - return callback(false); - } - - _this.bucket.once('index', function() { - callback(true); - }); - - return true; -} - -/** - * @ignore - */ - -function writeRemnant(_this, callback) { - // Buffer is empty, so don't bother to insert - if (_this.pos === 0) { - return checkDone(_this, callback); - } - - ++_this.state.outstandingRequests; - - // Create a new buffer to make sure the buffer isn't bigger than it needs - // to be. - var remnant = Buffer.alloc(_this.pos); - _this.bufToStore.copy(remnant, 0, 0, _this.pos); - if (_this.md5) { - _this.md5.update(remnant); - } - var doc = createChunkDoc(_this.id, _this.n, remnant); - - // If the stream was aborted, do not write remnant - if (checkAborted(_this, callback)) { - return false; - } - - _this.chunks.insertOne(doc, getWriteOptions(_this), function(error) { - if (error) { - return __handleError(_this, error); - } - --_this.state.outstandingRequests; - checkDone(_this); - }); -} - -/** - * @ignore - */ - -function checkAborted(_this, callback) { - if (_this.state.aborted) { - if (typeof callback === 'function') { - callback(new Error('this stream has been aborted')); - } - return true; - } - return false; -} |