Skip to content

Commit

Permalink
refactor(change-stream): use AggregateOperation for stream cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst authored and daprahamian committed Aug 13, 2019
1 parent 831968d commit ae729f8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 60 deletions.
96 changes: 36 additions & 60 deletions lib/change_stream.js
Expand Up @@ -3,11 +3,10 @@
const EventEmitter = require('events');
const isResumableError = require('./error').isResumableError;
const MongoError = require('./core').MongoError;
const ReadConcern = require('./read_concern');
const MongoDBNamespace = require('./utils').MongoDBNamespace;
const Cursor = require('./cursor');
const relayEvents = require('./core/utils').relayEvents;
const maxWireVersion = require('./core/utils').maxWireVersion;
const AggregateOperation = require('./operations/aggregate');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand Down Expand Up @@ -49,7 +48,7 @@ const CHANGE_DOMAIN_TYPES = {
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
* @class ChangeStream
* @since 3.0.0
* @param {(MongoClient|Db|Collection)} changeDomain The domain against which to create the change stream
* @param {(MongoClient|Db|Collection)} parent The parent object that created this change stream
* @param {Array} pipeline An array of {@link https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
* @param {ChangeStreamOptions} [options] Optional settings
* @fires ChangeStream#close
Expand All @@ -60,7 +59,7 @@ const CHANGE_DOMAIN_TYPES = {
* @return {ChangeStream} a ChangeStream instance.
*/
class ChangeStream extends EventEmitter {
constructor(changeDomain, pipeline, options) {
constructor(parent, pipeline, options) {
super();
const Collection = require('./collection');
const Db = require('./db');
Expand All @@ -69,29 +68,26 @@ class ChangeStream extends EventEmitter {
this.pipeline = pipeline || [];
this.options = options || {};

this.namespace =
changeDomain instanceof MongoClient
? new MongoDBNamespace('admin')
: changeDomain.s.namespace;

if (changeDomain instanceof Collection) {
this.parent = parent;
this.namespace = parent.s.namespace;
if (parent instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
this.topology = changeDomain.s.db.serverConfig;
} else if (changeDomain instanceof Db) {
this.topology = parent.s.db.serverConfig;
} else if (parent instanceof Db) {
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
this.topology = changeDomain.serverConfig;
} else if (changeDomain instanceof MongoClient) {
this.topology = parent.serverConfig;
} else if (parent instanceof MongoClient) {
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
this.topology = changeDomain.topology;
this.topology = parent.topology;
} else {
throw new TypeError(
'changeDomain provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
'parent provided to ChangeStream constructor is not an instance of Collection, Db, or MongoClient'
);
}

this.promiseLibrary = changeDomain.s.promiseLibrary;
if (!this.options.readPreference && changeDomain.s.readPreference) {
this.options.readPreference = changeDomain.s.readPreference;
this.promiseLibrary = parent.s.promiseLibrary;
if (!this.options.readPreference && parent.s.readPreference) {
this.options.readPreference = parent.s.readPreference;
}

// Create contained Change Stream cursor
Expand Down Expand Up @@ -259,9 +255,8 @@ class ChangeStream extends EventEmitter {
}

class ChangeStreamCursor extends Cursor {
constructor(topology, ns, cmd, options) {
// TODO: spread will help a lot here
super(topology, ns, cmd, options);
constructor(topology, operation, options) {
super(topology, operation, options);

options = options || {};
this._resumeToken = null;
Expand Down Expand Up @@ -363,8 +358,22 @@ class ChangeStreamCursor extends Cursor {
*/

// Create a new change stream cursor based on self's configuration
var createChangeStreamCursor = function(self, options) {
const changeStreamCursor = buildChangeStreamAggregationCommand(self, options);
function createChangeStreamCursor(self, options) {
const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
const changeStreamOptions = Object.assign({ batchSize: 1 }, options);
const changeStreamCursor = new ChangeStreamCursor(
self.topology,
new AggregateOperation(self.parent, pipeline, changeStreamOptions),
cursorOptions
);

relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);

/**
Expand Down Expand Up @@ -420,50 +429,17 @@ var createChangeStreamCursor = function(self, options) {
}

return changeStreamCursor;
};
}

function applyKnownOptions(target, source, optionNames) {
optionNames.forEach(name => {
if (source[name]) {
target[name] = source[name];
}
});
}

var buildChangeStreamAggregationCommand = function(self, options) {
options = options || {};
const topology = self.topology;
const namespace = self.namespace;
const pipeline = self.pipeline;

const changeStreamStageOptions = { fullDocument: options.fullDocument || 'default' };
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);

// Map cursor options
const cursorOptions = { cursorFactory: ChangeStreamCursor };
applyKnownOptions(cursorOptions, options, CURSOR_OPTIONS);

if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
changeStreamStageOptions.allChangesForCluster = true;
}

var changeStreamPipeline = [{ $changeStream: changeStreamStageOptions }];

changeStreamPipeline = changeStreamPipeline.concat(pipeline);

var command = {
aggregate: self.type === CHANGE_DOMAIN_TYPES.COLLECTION ? namespace.collection : 1,
pipeline: changeStreamPipeline,
readConcern: new ReadConcern(ReadConcern.MAJORITY),
cursor: {
batchSize: options.batchSize || 1
}
};

// Create and return the cursor
// TODO: switch to passing namespace object later
return topology.cursor(namespace.toString(), command, cursorOptions);
};
return target;
}

// This method performs a basic server selection loop, satisfying the requirements of
// ChangeStream resumability until the new SDAM layer can be used.
Expand Down
1 change: 1 addition & 0 deletions lib/cursor.js
Expand Up @@ -961,6 +961,7 @@ class Cursor extends CoreCursor {
// subclass `CommandOperationV2`. To be removed asap.
if (this.operation && this.operation.cmd == null) {
this.operation.options.explain = true;
this.operation.fullResponse = false;
return executeOperation(this.topology, this.operation, callback);
}

Expand Down

0 comments on commit ae729f8

Please sign in to comment.