diff --git a/lib/change_stream.js b/lib/change_stream.js index 423b4ab441..c2a2d3bed7 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -3,11 +3,10 @@ const EventEmitter = require('events'); const isResumableError = require('./error').isResumableError; const MongoError = require('./core').MongoError; -const ReadConcern = require('./read_concern'); -const MongoDBNamespace = require('./utils').MongoDBNamespace; const Cursor = require('./cursor'); const relayEvents = require('./core/utils').relayEvents; const maxWireVersion = require('./core/utils').maxWireVersion; +const AggregateOperation = require('./operations/aggregate'); const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( @@ -49,7 +48,7 @@ const CHANGE_DOMAIN_TYPES = { * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}. * @class ChangeStream * @since 3.0.0 - * @param {(MongoClient|Db|Collection)} changeDomain The domain against which to create the change stream + * @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream * @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents * @param {ChangeStreamOptions} [options] Optional settings * @fires ChangeStream#close @@ -60,7 +59,7 @@ const CHANGE_DOMAIN_TYPES = { * @return {ChangeStream} a ChangeStream instance. */ class ChangeStream extends EventEmitter { - constructor(changeDomain, pipeline, options) { + constructor(parent, pipeline, options) { super(); const Collection = require('./collection'); const Db = require('./db'); @@ -69,29 +68,26 @@ class ChangeStream extends EventEmitter { this.pipeline = pipeline || []; this.options = options || {}; - this.namespace = - changeDomain instanceof MongoClient - ? new MongoDBNamespace('admin') - : changeDomain.s.namespace; - - if (changeDomain instanceof Collection) { + this.parent = parent; + this.namespace = parent.s.namespace; + if (parent instanceof Collection) { this.type = CHANGE_DOMAIN_TYPES.COLLECTION; - this.topology = changeDomain.s.db.serverConfig; - } else if (changeDomain instanceof Db) { + this.topology = parent.s.db.serverConfig; + } else if (parent instanceof Db) { this.type = CHANGE_DOMAIN_TYPES.DATABASE; - this.topology = changeDomain.serverConfig; - } else if (changeDomain instanceof MongoClient) { + this.topology = parent.serverConfig; + } else if (parent instanceof MongoClient) { this.type = CHANGE_DOMAIN_TYPES.CLUSTER; - this.topology = changeDomain.topology; + this.topology = parent.topology; } else { throw new TypeError( - 'changeDomain provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient' + 'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient' ); } - this.promiseLibrary = changeDomain.s.promiseLibrary; - if (!this.options.readPreference && changeDomain.s.readPreference) { - this.options.readPreference = changeDomain.s.readPreference; + this.promiseLibrary = parent.s.promiseLibrary; + if (!this.options.readPreference && parent.s.readPreference) { + this.options.readPreference = parent.s.readPreference; } // Create contained Change Stream cursor @@ -259,9 +255,8 @@ class ChangeStream extends EventEmitter { } class ChangeStreamCursor extends Cursor { - constructor(topology, ns, cmd, options) { - // TODO: spread will help a lot here - super(topology, ns, cmd, options); + constructor(topology, operation, options) { + super(topology, operation, options); options = options || {}; this._resumeToken = null; @@ -363,8 +358,22 @@ class ChangeStreamCursor extends Cursor { */ // Create a new change stream cursor based on self's configuration -var createChangeStreamCursor = function(self, options) { - const changeStreamCursor = buildChangeStreamAggregationCommand(self, options); +function createChangeStreamCursor(self, options) { + const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' }; + applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS); + if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) { + changeStreamStageOptions.allChangesForCluster = true; + } + + const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline); + const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS); + const changeStreamOptions = Object.assign({ batchSize: 1 }, options); + const changeStreamCursor = new ChangeStreamCursor( + self.topology, + new AggregateOperation(self.parent, pipeline, changeStreamOptions), + cursorOptions + ); + relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']); /** @@ -420,7 +429,7 @@ var createChangeStreamCursor = function(self, options) { } return changeStreamCursor; -}; +} function applyKnownOptions(target, source, optionNames) { optionNames.forEach(name => { @@ -428,42 +437,9 @@ function applyKnownOptions(target, source, optionNames) { target[name] = source[name]; } }); -} - -var buildChangeStreamAggregationCommand = function(self, options) { - options = options || {}; - const topology = self.topology; - const namespace = self.namespace; - const pipeline = self.pipeline; - - const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' }; - applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS); - - // Map cursor options - const cursorOptions = { cursorFactory: ChangeStreamCursor }; - applyKnownOptions(cursorOptions, options, CURSOR_OPTIONS); - - if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) { - changeStreamStageOptions.allChangesForCluster = true; - } - var changeStreamPipeline = [{ $changeStream: changeStreamStageOptions }]; - - changeStreamPipeline = changeStreamPipeline.concat(pipeline); - - var command = { - aggregate: self.type === CHANGE_DOMAIN_TYPES.COLLECTION ? namespace.collection : 1, - pipeline: changeStreamPipeline, - readConcern: new ReadConcern(ReadConcern.MAJORITY), - cursor: { - batchSize: options.batchSize || 1 - } - }; - - // Create and return the cursor - // TODO: switch to passing namespace object later - return topology.cursor(namespace.toString(), command, cursorOptions); -}; + return target; +} // This method performs a basic server selection loop, satisfying the requirements of // ChangeStream resumability until the new SDAM layer can be used. diff --git a/lib/cursor.js b/lib/cursor.js index 6e8ac07891..962cb0bf34 100644 --- a/lib/cursor.js +++ b/lib/cursor.js @@ -961,6 +961,7 @@ class Cursor extends CoreCursor { // subclass `CommandOperationV2`. To be removed asap. if (this.operation && this.operation.cmd == null) { this.operation.options.explain = true; + this.operation.fullResponse = false; return executeOperation(this.topology, this.operation, callback); }