summaryrefslogtreecommitdiffstats
path: root/node_modules/mongodb/lib/gridfs-stream
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/mongodb/lib/gridfs-stream')
-rw-r--r--node_modules/mongodb/lib/gridfs-stream/download.js433
-rw-r--r--node_modules/mongodb/lib/gridfs-stream/index.js359
-rw-r--r--node_modules/mongodb/lib/gridfs-stream/upload.js538
3 files changed, 0 insertions, 1330 deletions
diff --git a/node_modules/mongodb/lib/gridfs-stream/download.js b/node_modules/mongodb/lib/gridfs-stream/download.js
deleted file mode 100644
index 0aab5dc..0000000
--- a/node_modules/mongodb/lib/gridfs-stream/download.js
+++ /dev/null
@@ -1,433 +0,0 @@
-'use strict';
-
-var stream = require('stream'),
- util = require('util');
-
-module.exports = GridFSBucketReadStream;
-
-/**
- * A readable stream that enables you to read buffers from GridFS.
- *
- * Do not instantiate this class directly. Use `openDownloadStream()` instead.
- *
- * @class
- * @extends external:Readable
- * @param {Collection} chunks Handle for chunks collection
- * @param {Collection} files Handle for files collection
- * @param {Object} readPreference The read preference to use
- * @param {Object} filter The query to use to find the file document
- * @param {Object} [options] Optional settings.
- * @param {Number} [options.sort] Optional sort for the file find query
- * @param {Number} [options.skip] Optional skip for the file find query
- * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
- * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
- * @fires GridFSBucketReadStream#error
- * @fires GridFSBucketReadStream#file
- */
-function GridFSBucketReadStream(chunks, files, readPreference, filter, options) {
- this.s = {
- bytesRead: 0,
- chunks: chunks,
- cursor: null,
- expected: 0,
- files: files,
- filter: filter,
- init: false,
- expectedEnd: 0,
- file: null,
- options: options,
- readPreference: readPreference
- };
-
- stream.Readable.call(this);
-}
-
-util.inherits(GridFSBucketReadStream, stream.Readable);
-
-/**
- * An error occurred
- *
- * @event GridFSBucketReadStream#error
- * @type {Error}
- */
-
-/**
- * Fires when the stream loaded the file document corresponding to the
- * provided id.
- *
- * @event GridFSBucketReadStream#file
- * @type {object}
- */
-
-/**
- * Emitted when a chunk of data is available to be consumed.
- *
- * @event GridFSBucketReadStream#data
- * @type {object}
- */
-
-/**
- * Fired when the stream is exhausted (no more data events).
- *
- * @event GridFSBucketReadStream#end
- * @type {object}
- */
-
-/**
- * Fired when the stream is exhausted and the underlying cursor is killed
- *
- * @event GridFSBucketReadStream#close
- * @type {object}
- */
-
-/**
- * Reads from the cursor and pushes to the stream.
- * Private Impl, do not call directly
- * @ignore
- * @method
- */
-
-GridFSBucketReadStream.prototype._read = function() {
- var _this = this;
- if (this.destroyed) {
- return;
- }
-
- waitForFile(_this, function() {
- doRead(_this);
- });
-};
-
-/**
- * Sets the 0-based offset in bytes to start streaming from. Throws
- * an error if this stream has entered flowing mode
- * (e.g. if you've already called `on('data')`)
- * @method
- * @param {Number} start Offset in bytes to start reading at
- * @return {GridFSBucketReadStream} Reference to Self
- */
-
-GridFSBucketReadStream.prototype.start = function(start) {
- throwIfInitialized(this);
- this.s.options.start = start;
- return this;
-};
-
-/**
- * Sets the 0-based offset in bytes to start streaming from. Throws
- * an error if this stream has entered flowing mode
- * (e.g. if you've already called `on('data')`)
- * @method
- * @param {Number} end Offset in bytes to stop reading at
- * @return {GridFSBucketReadStream} Reference to self
- */
-
-GridFSBucketReadStream.prototype.end = function(end) {
- throwIfInitialized(this);
- this.s.options.end = end;
- return this;
-};
-
-/**
- * Marks this stream as aborted (will never push another `data` event)
- * and kills the underlying cursor. Will emit the 'end' event, and then
- * the 'close' event once the cursor is successfully killed.
- *
- * @method
- * @param {GridFSBucket~errorCallback} [callback] called when the cursor is successfully closed or an error occurred.
- * @fires GridFSBucketWriteStream#close
- * @fires GridFSBucketWriteStream#end
- */
-
-GridFSBucketReadStream.prototype.abort = function(callback) {
- var _this = this;
- this.push(null);
- this.destroyed = true;
- if (this.s.cursor) {
- this.s.cursor.close(function(error) {
- _this.emit('close');
- callback && callback(error);
- });
- } else {
- if (!this.s.init) {
- // If not initialized, fire close event because we will never
- // get a cursor
- _this.emit('close');
- }
- callback && callback();
- }
-};
-
-/**
- * @ignore
- */
-
-function throwIfInitialized(self) {
- if (self.s.init) {
- throw new Error('You cannot change options after the stream has entered' + 'flowing mode!');
- }
-}
-
-/**
- * @ignore
- */
-
-function doRead(_this) {
- if (_this.destroyed) {
- return;
- }
-
- _this.s.cursor.next(function(error, doc) {
- if (_this.destroyed) {
- return;
- }
- if (error) {
- return __handleError(_this, error);
- }
- if (!doc) {
- _this.push(null);
-
- process.nextTick(() => {
- _this.s.cursor.close(function(error) {
- if (error) {
- __handleError(_this, error);
- return;
- }
-
- _this.emit('close');
- });
- });
-
- return;
- }
-
- var bytesRemaining = _this.s.file.length - _this.s.bytesRead;
- var expectedN = _this.s.expected++;
- var expectedLength = Math.min(_this.s.file.chunkSize, bytesRemaining);
-
- if (doc.n > expectedN) {
- var errmsg = 'ChunkIsMissing: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
- return __handleError(_this, new Error(errmsg));
- }
-
- if (doc.n < expectedN) {
- errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n + ', expected: ' + expectedN;
- return __handleError(_this, new Error(errmsg));
- }
-
- var buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;
-
- if (buf.length !== expectedLength) {
- if (bytesRemaining <= 0) {
- errmsg = 'ExtraChunk: Got unexpected n: ' + doc.n;
- return __handleError(_this, new Error(errmsg));
- }
-
- errmsg =
- 'ChunkIsWrongSize: Got unexpected length: ' + buf.length + ', expected: ' + expectedLength;
- return __handleError(_this, new Error(errmsg));
- }
-
- _this.s.bytesRead += buf.length;
-
- if (buf.length === 0) {
- return _this.push(null);
- }
-
- var sliceStart = null;
- var sliceEnd = null;
-
- if (_this.s.bytesToSkip != null) {
- sliceStart = _this.s.bytesToSkip;
- _this.s.bytesToSkip = 0;
- }
-
- const atEndOfStream = expectedN === _this.s.expectedEnd - 1;
- const bytesLeftToRead = _this.s.options.end - _this.s.bytesToSkip;
- if (atEndOfStream && _this.s.bytesToTrim != null) {
- sliceEnd = _this.s.file.chunkSize - _this.s.bytesToTrim;
- } else if (_this.s.options.end && bytesLeftToRead < doc.data.length()) {
- sliceEnd = bytesLeftToRead;
- }
-
- if (sliceStart != null || sliceEnd != null) {
- buf = buf.slice(sliceStart || 0, sliceEnd || buf.length);
- }
-
- _this.push(buf);
- });
-}
-
-/**
- * @ignore
- */
-
-function init(self) {
- var findOneOptions = {};
- if (self.s.readPreference) {
- findOneOptions.readPreference = self.s.readPreference;
- }
- if (self.s.options && self.s.options.sort) {
- findOneOptions.sort = self.s.options.sort;
- }
- if (self.s.options && self.s.options.skip) {
- findOneOptions.skip = self.s.options.skip;
- }
-
- self.s.files.findOne(self.s.filter, findOneOptions, function(error, doc) {
- if (error) {
- return __handleError(self, error);
- }
-
- if (!doc) {
- var identifier = self.s.filter._id ? self.s.filter._id.toString() : self.s.filter.filename;
- var errmsg = 'FileNotFound: file ' + identifier + ' was not found';
- var err = new Error(errmsg);
- err.code = 'ENOENT';
- return __handleError(self, err);
- }
-
- // If document is empty, kill the stream immediately and don't
- // execute any reads
- if (doc.length <= 0) {
- self.push(null);
- return;
- }
-
- if (self.destroyed) {
- // If user destroys the stream before we have a cursor, wait
- // until the query is done to say we're 'closed' because we can't
- // cancel a query.
- self.emit('close');
- return;
- }
-
- try {
- self.s.bytesToSkip = handleStartOption(self, doc, self.s.options);
- } catch (error) {
- return __handleError(self, error);
- }
-
- var filter = { files_id: doc._id };
-
- // Currently (MongoDB 3.4.4) skip function does not support the index,
- // it needs to retrieve all the documents first and then skip them. (CS-25811)
- // As work around we use $gte on the "n" field.
- if (self.s.options && self.s.options.start != null) {
- var skip = Math.floor(self.s.options.start / doc.chunkSize);
- if (skip > 0) {
- filter['n'] = { $gte: skip };
- }
- }
- self.s.cursor = self.s.chunks.find(filter).sort({ n: 1 });
-
- if (self.s.readPreference) {
- self.s.cursor.setReadPreference(self.s.readPreference);
- }
-
- self.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
- self.s.file = doc;
-
- try {
- self.s.bytesToTrim = handleEndOption(self, doc, self.s.cursor, self.s.options);
- } catch (error) {
- return __handleError(self, error);
- }
-
- self.emit('file', doc);
- });
-}
-
-/**
- * @ignore
- */
-
-function waitForFile(_this, callback) {
- if (_this.s.file) {
- return callback();
- }
-
- if (!_this.s.init) {
- init(_this);
- _this.s.init = true;
- }
-
- _this.once('file', function() {
- callback();
- });
-}
-
-/**
- * @ignore
- */
-
-function handleStartOption(stream, doc, options) {
- if (options && options.start != null) {
- if (options.start > doc.length) {
- throw new Error(
- 'Stream start (' +
- options.start +
- ') must not be ' +
- 'more than the length of the file (' +
- doc.length +
- ')'
- );
- }
- if (options.start < 0) {
- throw new Error('Stream start (' + options.start + ') must not be ' + 'negative');
- }
- if (options.end != null && options.end < options.start) {
- throw new Error(
- 'Stream start (' +
- options.start +
- ') must not be ' +
- 'greater than stream end (' +
- options.end +
- ')'
- );
- }
-
- stream.s.bytesRead = Math.floor(options.start / doc.chunkSize) * doc.chunkSize;
- stream.s.expected = Math.floor(options.start / doc.chunkSize);
-
- return options.start - stream.s.bytesRead;
- }
-}
-
-/**
- * @ignore
- */
-
-function handleEndOption(stream, doc, cursor, options) {
- if (options && options.end != null) {
- if (options.end > doc.length) {
- throw new Error(
- 'Stream end (' +
- options.end +
- ') must not be ' +
- 'more than the length of the file (' +
- doc.length +
- ')'
- );
- }
- if (options.start < 0) {
- throw new Error('Stream end (' + options.end + ') must not be ' + 'negative');
- }
-
- var start = options.start != null ? Math.floor(options.start / doc.chunkSize) : 0;
-
- cursor.limit(Math.ceil(options.end / doc.chunkSize) - start);
-
- stream.s.expectedEnd = Math.ceil(options.end / doc.chunkSize);
-
- return Math.ceil(options.end / doc.chunkSize) * doc.chunkSize - options.end;
- }
-}
-
-/**
- * @ignore
- */
-
-function __handleError(_this, error) {
- _this.emit('error', error);
-}
diff --git a/node_modules/mongodb/lib/gridfs-stream/index.js b/node_modules/mongodb/lib/gridfs-stream/index.js
deleted file mode 100644
index 6509839..0000000
--- a/node_modules/mongodb/lib/gridfs-stream/index.js
+++ /dev/null
@@ -1,359 +0,0 @@
-'use strict';
-
-var Emitter = require('events').EventEmitter;
-var GridFSBucketReadStream = require('./download');
-var GridFSBucketWriteStream = require('./upload');
-var shallowClone = require('../utils').shallowClone;
-var toError = require('../utils').toError;
-var util = require('util');
-var executeLegacyOperation = require('../utils').executeLegacyOperation;
-
-var DEFAULT_GRIDFS_BUCKET_OPTIONS = {
- bucketName: 'fs',
- chunkSizeBytes: 255 * 1024
-};
-
-module.exports = GridFSBucket;
-
-/**
- * Constructor for a streaming GridFS interface
- * @class
- * @extends external:EventEmitter
- * @param {Db} db A db handle
- * @param {object} [options] Optional settings.
- * @param {string} [options.bucketName="fs"] The 'files' and 'chunks' collections will be prefixed with the bucket name followed by a dot.
- * @param {number} [options.chunkSizeBytes=255 * 1024] Number of bytes stored in each chunk. Defaults to 255KB
- * @param {object} [options.writeConcern] Optional write concern to be passed to write operations, for instance `{ w: 1 }`
- * @param {object} [options.readPreference] Optional read preference to be passed to read operations
- * @fires GridFSBucketWriteStream#index
- */
-
-function GridFSBucket(db, options) {
- Emitter.apply(this);
- this.setMaxListeners(0);
-
- if (options && typeof options === 'object') {
- options = shallowClone(options);
- var keys = Object.keys(DEFAULT_GRIDFS_BUCKET_OPTIONS);
- for (var i = 0; i < keys.length; ++i) {
- if (!options[keys[i]]) {
- options[keys[i]] = DEFAULT_GRIDFS_BUCKET_OPTIONS[keys[i]];
- }
- }
- } else {
- options = DEFAULT_GRIDFS_BUCKET_OPTIONS;
- }
-
- this.s = {
- db: db,
- options: options,
- _chunksCollection: db.collection(options.bucketName + '.chunks'),
- _filesCollection: db.collection(options.bucketName + '.files'),
- checkedIndexes: false,
- calledOpenUploadStream: false,
- promiseLibrary: db.s.promiseLibrary || Promise
- };
-}
-
-util.inherits(GridFSBucket, Emitter);
-
-/**
- * When the first call to openUploadStream is made, the upload stream will
- * check to see if it needs to create the proper indexes on the chunks and
- * files collections. This event is fired either when 1) it determines that
- * no index creation is necessary, 2) when it successfully creates the
- * necessary indexes.
- *
- * @event GridFSBucket#index
- * @type {Error}
- */
-
-/**
- * Returns a writable stream (GridFSBucketWriteStream) for writing
- * buffers to GridFS. The stream's 'id' property contains the resulting
- * file's id.
- * @method
- * @param {string} filename The value of the 'filename' key in the files doc
- * @param {object} [options] Optional settings.
- * @param {number} [options.chunkSizeBytes] Optional overwrite this bucket's chunkSizeBytes for this file
- * @param {object} [options.metadata] Optional object to store in the file document's `metadata` field
- * @param {string} [options.contentType] Optional string to store in the file document's `contentType` field
- * @param {array} [options.aliases] Optional array of strings to store in the file document's `aliases` field
- * @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data
- * @return {GridFSBucketWriteStream}
- */
-
-GridFSBucket.prototype.openUploadStream = function(filename, options) {
- if (options) {
- options = shallowClone(options);
- } else {
- options = {};
- }
- if (!options.chunkSizeBytes) {
- options.chunkSizeBytes = this.s.options.chunkSizeBytes;
- }
- return new GridFSBucketWriteStream(this, filename, options);
-};
-
-/**
- * Returns a writable stream (GridFSBucketWriteStream) for writing
- * buffers to GridFS for a custom file id. The stream's 'id' property contains the resulting
- * file's id.
- * @method
- * @param {string|number|object} id A custom id used to identify the file
- * @param {string} filename The value of the 'filename' key in the files doc
- * @param {object} [options] Optional settings.
- * @param {number} [options.chunkSizeBytes] Optional overwrite this bucket's chunkSizeBytes for this file
- * @param {object} [options.metadata] Optional object to store in the file document's `metadata` field
- * @param {string} [options.contentType] Optional string to store in the file document's `contentType` field
- * @param {array} [options.aliases] Optional array of strings to store in the file document's `aliases` field
- * @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data
- * @return {GridFSBucketWriteStream}
- */
-
-GridFSBucket.prototype.openUploadStreamWithId = function(id, filename, options) {
- if (options) {
- options = shallowClone(options);
- } else {
- options = {};
- }
-
- if (!options.chunkSizeBytes) {
- options.chunkSizeBytes = this.s.options.chunkSizeBytes;
- }
-
- options.id = id;
-
- return new GridFSBucketWriteStream(this, filename, options);
-};
-
-/**
- * Returns a readable stream (GridFSBucketReadStream) for streaming file
- * data from GridFS.
- * @method
- * @param {ObjectId} id The id of the file doc
- * @param {Object} [options] Optional settings.
- * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
- * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
- * @return {GridFSBucketReadStream}
- */
-
-GridFSBucket.prototype.openDownloadStream = function(id, options) {
- var filter = { _id: id };
- options = {
- start: options && options.start,
- end: options && options.end
- };
-
- return new GridFSBucketReadStream(
- this.s._chunksCollection,
- this.s._filesCollection,
- this.s.options.readPreference,
- filter,
- options
- );
-};
-
-/**
- * Deletes a file with the given id
- * @method
- * @param {ObjectId} id The id of the file doc
- * @param {GridFSBucket~errorCallback} [callback]
- */
-
-GridFSBucket.prototype.delete = function(id, callback) {
- return executeLegacyOperation(this.s.db.s.topology, _delete, [this, id, callback], {
- skipSessions: true
- });
-};
-
-/**
- * @ignore
- */
-
-function _delete(_this, id, callback) {
- _this.s._filesCollection.deleteOne({ _id: id }, function(error, res) {
- if (error) {
- return callback(error);
- }
-
- _this.s._chunksCollection.deleteMany({ files_id: id }, function(error) {
- if (error) {
- return callback(error);
- }
-
- // Delete orphaned chunks before returning FileNotFound
- if (!res.result.n) {
- var errmsg = 'FileNotFound: no file with id ' + id + ' found';
- return callback(new Error(errmsg));
- }
-
- callback();
- });
- });
-}
-
-/**
- * Convenience wrapper around find on the files collection
- * @method
- * @param {Object} filter
- * @param {Object} [options] Optional settings for cursor
- * @param {number} [options.batchSize=1000] The number of documents to return per batch. See {@link https://docs.mongodb.com/manual/reference/command/find|find command documentation}.
- * @param {number} [options.limit] Optional limit for cursor
- * @param {number} [options.maxTimeMS] Optional maxTimeMS for cursor
- * @param {boolean} [options.noCursorTimeout] Optionally set cursor's `noCursorTimeout` flag
- * @param {number} [options.skip] Optional skip for cursor
- * @param {object} [options.sort] Optional sort for cursor
- * @return {Cursor}
- */
-
-GridFSBucket.prototype.find = function(filter, options) {
- filter = filter || {};
- options = options || {};
-
- var cursor = this.s._filesCollection.find(filter);
-
- if (options.batchSize != null) {
- cursor.batchSize(options.batchSize);
- }
- if (options.limit != null) {
- cursor.limit(options.limit);
- }
- if (options.maxTimeMS != null) {
- cursor.maxTimeMS(options.maxTimeMS);
- }
- if (options.noCursorTimeout != null) {
- cursor.addCursorFlag('noCursorTimeout', options.noCursorTimeout);
- }
- if (options.skip != null) {
- cursor.skip(options.skip);
- }
- if (options.sort != null) {
- cursor.sort(options.sort);
- }
-
- return cursor;
-};
-
-/**
- * Returns a readable stream (GridFSBucketReadStream) for streaming the
- * file with the given name from GridFS. If there are multiple files with
- * the same name, this will stream the most recent file with the given name
- * (as determined by the `uploadDate` field). You can set the `revision`
- * option to change this behavior.
- * @method
- * @param {String} filename The name of the file to stream
- * @param {Object} [options] Optional settings
- * @param {number} [options.revision=-1] The revision number relative to the oldest file with the given filename. 0 gets you the oldest file, 1 gets you the 2nd oldest, -1 gets you the newest.
- * @param {Number} [options.start] Optional 0-based offset in bytes to start streaming from
- * @param {Number} [options.end] Optional 0-based offset in bytes to stop streaming before
- * @return {GridFSBucketReadStream}
- */
-
-GridFSBucket.prototype.openDownloadStreamByName = function(filename, options) {
- var sort = { uploadDate: -1 };
- var skip = null;
- if (options && options.revision != null) {
- if (options.revision >= 0) {
- sort = { uploadDate: 1 };
- skip = options.revision;
- } else {
- skip = -options.revision - 1;
- }
- }
-
- var filter = { filename: filename };
- options = {
- sort: sort,
- skip: skip,
- start: options && options.start,
- end: options && options.end
- };
- return new GridFSBucketReadStream(
- this.s._chunksCollection,
- this.s._filesCollection,
- this.s.options.readPreference,
- filter,
- options
- );
-};
-
-/**
- * Renames the file with the given _id to the given string
- * @method
- * @param {ObjectId} id the id of the file to rename
- * @param {String} filename new name for the file
- * @param {GridFSBucket~errorCallback} [callback]
- */
-
-GridFSBucket.prototype.rename = function(id, filename, callback) {
- return executeLegacyOperation(this.s.db.s.topology, _rename, [this, id, filename, callback], {
- skipSessions: true
- });
-};
-
-/**
- * @ignore
- */
-
-function _rename(_this, id, filename, callback) {
- var filter = { _id: id };
- var update = { $set: { filename: filename } };
- _this.s._filesCollection.updateOne(filter, update, function(error, res) {
- if (error) {
- return callback(error);
- }
- if (!res.result.n) {
- return callback(toError('File with id ' + id + ' not found'));
- }
- callback();
- });
-}
-
-/**
- * Removes this bucket's files collection, followed by its chunks collection.
- * @method
- * @param {GridFSBucket~errorCallback} [callback]
- */
-
-GridFSBucket.prototype.drop = function(callback) {
- return executeLegacyOperation(this.s.db.s.topology, _drop, [this, callback], {
- skipSessions: true
- });
-};
-
-/**
- * Return the db logger
- * @method
- * @return {Logger} return the db logger
- * @ignore
- */
-GridFSBucket.prototype.getLogger = function() {
- return this.s.db.s.logger;
-};
-
-/**
- * @ignore
- */
-
-function _drop(_this, callback) {
- _this.s._filesCollection.drop(function(error) {
- if (error) {
- return callback(error);
- }
- _this.s._chunksCollection.drop(function(error) {
- if (error) {
- return callback(error);
- }
-
- return callback();
- });
- });
-}
-
-/**
- * Callback format for all GridFSBucket methods that can accept a callback.
- * @callback GridFSBucket~errorCallback
- * @param {MongoError|undefined} error If present, an error instance representing any errors that occurred
- * @param {*} result If present, a returned result for the method
- */
diff --git a/node_modules/mongodb/lib/gridfs-stream/upload.js b/node_modules/mongodb/lib/gridfs-stream/upload.js
deleted file mode 100644
index 578949a..0000000
--- a/node_modules/mongodb/lib/gridfs-stream/upload.js
+++ /dev/null
@@ -1,538 +0,0 @@
-'use strict';
-
-var core = require('../core');
-var crypto = require('crypto');
-var stream = require('stream');
-var util = require('util');
-var Buffer = require('safe-buffer').Buffer;
-
-var ERROR_NAMESPACE_NOT_FOUND = 26;
-
-module.exports = GridFSBucketWriteStream;
-
-/**
- * A writable stream that enables you to write buffers to GridFS.
- *
- * Do not instantiate this class directly. Use `openUploadStream()` instead.
- *
- * @class
- * @extends external:Writable
- * @param {GridFSBucket} bucket Handle for this stream's corresponding bucket
- * @param {string} filename The value of the 'filename' key in the files doc
- * @param {object} [options] Optional settings.
- * @param {string|number|object} [options.id] Custom file id for the GridFS file.
- * @param {number} [options.chunkSizeBytes] The chunk size to use, in bytes
- * @param {number} [options.w] The write concern
- * @param {number} [options.wtimeout] The write concern timeout
- * @param {number} [options.j] The journal write concern
- * @param {boolean} [options.disableMD5=false] If true, disables adding an md5 field to file data
- * @fires GridFSBucketWriteStream#error
- * @fires GridFSBucketWriteStream#finish
- */
-
-function GridFSBucketWriteStream(bucket, filename, options) {
- options = options || {};
- this.bucket = bucket;
- this.chunks = bucket.s._chunksCollection;
- this.filename = filename;
- this.files = bucket.s._filesCollection;
- this.options = options;
- // Signals the write is all done
- this.done = false;
-
- this.id = options.id ? options.id : core.BSON.ObjectId();
- this.chunkSizeBytes = this.options.chunkSizeBytes;
- this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
- this.length = 0;
- this.md5 = !options.disableMD5 && crypto.createHash('md5');
- this.n = 0;
- this.pos = 0;
- this.state = {
- streamEnd: false,
- outstandingRequests: 0,
- errored: false,
- aborted: false,
- promiseLibrary: this.bucket.s.promiseLibrary
- };
-
- if (!this.bucket.s.calledOpenUploadStream) {
- this.bucket.s.calledOpenUploadStream = true;
-
- var _this = this;
- checkIndexes(this, function() {
- _this.bucket.s.checkedIndexes = true;
- _this.bucket.emit('index');
- });
- }
-}
-
-util.inherits(GridFSBucketWriteStream, stream.Writable);
-
-/**
- * An error occurred
- *
- * @event GridFSBucketWriteStream#error
- * @type {Error}
- */
-
-/**
- * `end()` was called and the write stream successfully wrote the file
- * metadata and all the chunks to MongoDB.
- *
- * @event GridFSBucketWriteStream#finish
- * @type {object}
- */
-
-/**
- * Write a buffer to the stream.
- *
- * @method
- * @param {Buffer} chunk Buffer to write
- * @param {String} encoding Optional encoding for the buffer
- * @param {GridFSBucket~errorCallback} callback Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
- * @return {Boolean} False if this write required flushing a chunk to MongoDB. True otherwise.
- */
-
-GridFSBucketWriteStream.prototype.write = function(chunk, encoding, callback) {
- var _this = this;
- return waitForIndexes(this, function() {
- return doWrite(_this, chunk, encoding, callback);
- });
-};
-
-/**
- * Places this write stream into an aborted state (all future writes fail)
- * and deletes all chunks that have already been written.
- *
- * @method
- * @param {GridFSBucket~errorCallback} callback called when chunks are successfully removed or error occurred
- * @return {Promise} if no callback specified
- */
-
-GridFSBucketWriteStream.prototype.abort = function(callback) {
- if (this.state.streamEnd) {
- var error = new Error('Cannot abort a stream that has already completed');
- if (typeof callback === 'function') {
- return callback(error);
- }
- return this.state.promiseLibrary.reject(error);
- }
- if (this.state.aborted) {
- error = new Error('Cannot call abort() on a stream twice');
- if (typeof callback === 'function') {
- return callback(error);
- }
- return this.state.promiseLibrary.reject(error);
- }
- this.state.aborted = true;
- this.chunks.deleteMany({ files_id: this.id }, function(error) {
- if (typeof callback === 'function') callback(error);
- });
-};
-
-/**
- * Tells the stream that no more data will be coming in. The stream will
- * persist the remaining data to MongoDB, write the files document, and
- * then emit a 'finish' event.
- *
- * @method
- * @param {Buffer} chunk Buffer to write
- * @param {String} encoding Optional encoding for the buffer
- * @param {GridFSBucket~errorCallback} callback Function to call when all files and chunks have been persisted to MongoDB
- */
-
-GridFSBucketWriteStream.prototype.end = function(chunk, encoding, callback) {
- var _this = this;
- if (typeof chunk === 'function') {
- (callback = chunk), (chunk = null), (encoding = null);
- } else if (typeof encoding === 'function') {
- (callback = encoding), (encoding = null);
- }
-
- if (checkAborted(this, callback)) {
- return;
- }
- this.state.streamEnd = true;
-
- if (callback) {
- this.once('finish', function(result) {
- callback(null, result);
- });
- }
-
- if (!chunk) {
- waitForIndexes(this, function() {
- writeRemnant(_this);
- });
- return;
- }
-
- this.write(chunk, encoding, function() {
- writeRemnant(_this);
- });
-};
-
-/**
- * @ignore
- */
-
-function __handleError(_this, error, callback) {
- if (_this.state.errored) {
- return;
- }
- _this.state.errored = true;
- if (callback) {
- return callback(error);
- }
- _this.emit('error', error);
-}
-
-/**
- * @ignore
- */
-
-function createChunkDoc(filesId, n, data) {
- return {
- _id: core.BSON.ObjectId(),
- files_id: filesId,
- n: n,
- data: data
- };
-}
-
-/**
- * @ignore
- */
-
-function checkChunksIndex(_this, callback) {
- _this.chunks.listIndexes().toArray(function(error, indexes) {
- if (error) {
- // Collection doesn't exist so create index
- if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
- var index = { files_id: 1, n: 1 };
- _this.chunks.createIndex(index, { background: false, unique: true }, function(error) {
- if (error) {
- return callback(error);
- }
-
- callback();
- });
- return;
- }
- return callback(error);
- }
-
- var hasChunksIndex = false;
- indexes.forEach(function(index) {
- if (index.key) {
- var keys = Object.keys(index.key);
- if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
- hasChunksIndex = true;
- }
- }
- });
-
- if (hasChunksIndex) {
- callback();
- } else {
- index = { files_id: 1, n: 1 };
- var indexOptions = getWriteOptions(_this);
-
- indexOptions.background = false;
- indexOptions.unique = true;
-
- _this.chunks.createIndex(index, indexOptions, function(error) {
- if (error) {
- return callback(error);
- }
-
- callback();
- });
- }
- });
-}
-
-/**
- * @ignore
- */
-
-function checkDone(_this, callback) {
- if (_this.done) return true;
- if (_this.state.streamEnd && _this.state.outstandingRequests === 0 && !_this.state.errored) {
- // Set done so we dont' trigger duplicate createFilesDoc
- _this.done = true;
- // Create a new files doc
- var filesDoc = createFilesDoc(
- _this.id,
- _this.length,
- _this.chunkSizeBytes,
- _this.md5 && _this.md5.digest('hex'),
- _this.filename,
- _this.options.contentType,
- _this.options.aliases,
- _this.options.metadata
- );
-
- if (checkAborted(_this, callback)) {
- return false;
- }
-
- _this.files.insertOne(filesDoc, getWriteOptions(_this), function(error) {
- if (error) {
- return __handleError(_this, error, callback);
- }
- _this.emit('finish', filesDoc);
- });
-
- return true;
- }
-
- return false;
-}
-
-/**
- * @ignore
- */
-
-function checkIndexes(_this, callback) {
- _this.files.findOne({}, { _id: 1 }, function(error, doc) {
- if (error) {
- return callback(error);
- }
- if (doc) {
- return callback();
- }
-
- _this.files.listIndexes().toArray(function(error, indexes) {
- if (error) {
- // Collection doesn't exist so create index
- if (error.code === ERROR_NAMESPACE_NOT_FOUND) {
- var index = { filename: 1, uploadDate: 1 };
- _this.files.createIndex(index, { background: false }, function(error) {
- if (error) {
- return callback(error);
- }
-
- checkChunksIndex(_this, callback);
- });
- return;
- }
- return callback(error);
- }
-
- var hasFileIndex = false;
- indexes.forEach(function(index) {
- var keys = Object.keys(index.key);
- if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
- hasFileIndex = true;
- }
- });
-
- if (hasFileIndex) {
- checkChunksIndex(_this, callback);
- } else {
- index = { filename: 1, uploadDate: 1 };
-
- var indexOptions = getWriteOptions(_this);
-
- indexOptions.background = false;
-
- _this.files.createIndex(index, indexOptions, function(error) {
- if (error) {
- return callback(error);
- }
-
- checkChunksIndex(_this, callback);
- });
- }
- });
- });
-}
-
-/**
- * @ignore
- */
-
-function createFilesDoc(_id, length, chunkSize, md5, filename, contentType, aliases, metadata) {
- var ret = {
- _id: _id,
- length: length,
- chunkSize: chunkSize,
- uploadDate: new Date(),
- filename: filename
- };
-
- if (md5) {
- ret.md5 = md5;
- }
-
- if (contentType) {
- ret.contentType = contentType;
- }
-
- if (aliases) {
- ret.aliases = aliases;
- }
-
- if (metadata) {
- ret.metadata = metadata;
- }
-
- return ret;
-}
-
-/**
- * @ignore
- */
-
-function doWrite(_this, chunk, encoding, callback) {
- if (checkAborted(_this, callback)) {
- return false;
- }
-
- var inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
-
- _this.length += inputBuf.length;
-
- // Input is small enough to fit in our buffer
- if (_this.pos + inputBuf.length < _this.chunkSizeBytes) {
- inputBuf.copy(_this.bufToStore, _this.pos);
- _this.pos += inputBuf.length;
-
- callback && callback();
-
- // Note that we reverse the typical semantics of write's return value
- // to be compatible with node's `.pipe()` function.
- // True means client can keep writing.
- return true;
- }
-
- // Otherwise, buffer is too big for current chunk, so we need to flush
- // to MongoDB.
- var inputBufRemaining = inputBuf.length;
- var spaceRemaining = _this.chunkSizeBytes - _this.pos;
- var numToCopy = Math.min(spaceRemaining, inputBuf.length);
- var outstandingRequests = 0;
- while (inputBufRemaining > 0) {
- var inputBufPos = inputBuf.length - inputBufRemaining;
- inputBuf.copy(_this.bufToStore, _this.pos, inputBufPos, inputBufPos + numToCopy);
- _this.pos += numToCopy;
- spaceRemaining -= numToCopy;
- if (spaceRemaining === 0) {
- if (_this.md5) {
- _this.md5.update(_this.bufToStore);
- }
- var doc = createChunkDoc(_this.id, _this.n, Buffer.from(_this.bufToStore));
- ++_this.state.outstandingRequests;
- ++outstandingRequests;
-
- if (checkAborted(_this, callback)) {
- return false;
- }
-
- _this.chunks.insertOne(doc, getWriteOptions(_this), function(error) {
- if (error) {
- return __handleError(_this, error);
- }
- --_this.state.outstandingRequests;
- --outstandingRequests;
-
- if (!outstandingRequests) {
- _this.emit('drain', doc);
- callback && callback();
- checkDone(_this);
- }
- });
-
- spaceRemaining = _this.chunkSizeBytes;
- _this.pos = 0;
- ++_this.n;
- }
- inputBufRemaining -= numToCopy;
- numToCopy = Math.min(spaceRemaining, inputBufRemaining);
- }
-
- // Note that we reverse the typical semantics of write's return value
- // to be compatible with node's `.pipe()` function.
- // False means the client should wait for the 'drain' event.
- return false;
-}
-
-/**
- * @ignore
- */
-
-function getWriteOptions(_this) {
- var obj = {};
- if (_this.options.writeConcern) {
- obj.w = _this.options.writeConcern.w;
- obj.wtimeout = _this.options.writeConcern.wtimeout;
- obj.j = _this.options.writeConcern.j;
- }
- return obj;
-}
-
-/**
- * @ignore
- */
-
-function waitForIndexes(_this, callback) {
- if (_this.bucket.s.checkedIndexes) {
- return callback(false);
- }
-
- _this.bucket.once('index', function() {
- callback(true);
- });
-
- return true;
-}
-
-/**
- * @ignore
- */
-
-function writeRemnant(_this, callback) {
- // Buffer is empty, so don't bother to insert
- if (_this.pos === 0) {
- return checkDone(_this, callback);
- }
-
- ++_this.state.outstandingRequests;
-
- // Create a new buffer to make sure the buffer isn't bigger than it needs
- // to be.
- var remnant = Buffer.alloc(_this.pos);
- _this.bufToStore.copy(remnant, 0, 0, _this.pos);
- if (_this.md5) {
- _this.md5.update(remnant);
- }
- var doc = createChunkDoc(_this.id, _this.n, remnant);
-
- // If the stream was aborted, do not write remnant
- if (checkAborted(_this, callback)) {
- return false;
- }
-
- _this.chunks.insertOne(doc, getWriteOptions(_this), function(error) {
- if (error) {
- return __handleError(_this, error);
- }
- --_this.state.outstandingRequests;
- checkDone(_this);
- });
-}
-
-/**
- * @ignore
- */
-
-function checkAborted(_this, callback) {
- if (_this.state.aborted) {
- if (typeof callback === 'function') {
- callback(new Error('this stream has been aborted'));
- }
- return true;
- }
- return false;
-}