Skip to content

Commit

Permalink
refactor(cursor): modernize and deduplicate cursor classes
Browse files Browse the repository at this point in the history
Since we merged core into native, there is no longer great need
to maintain a fully functional "core" cursor type, it has become
a private implementation detail. This changeset renames that class
to `CoreCursor`, converts it to an ES6 class, and then builds the
three public cursors on top of it. Here is a summary of changes:

 - All cursor classes are now ES6 classes
 - The `Cursor` from "core" has been renamed `CoreCursor`
 - a `CursorState` has been exported from the `CoreCursor` module
   so that all cursors may share a common state definition
 - `CoreCursor` extends `Readable` stream. Each of its subclasses
   inherited this, so this is where it naturally fits in the
   hierarchy. This wasn't possible before because the cursor
   exported by `mongodb-core` did not want to concern itself with
   higher concepts like streams. We are no longer shackled by this
   idea.
 - `CoreCursor` only exposes a public `_next` method because of
   the way the other cursors have traditionally subclassed it.
 - We still have `cursorState` and `s`, but they are only in _two_
   locations now (`CoreCursor` has a `cursorState`, and `Cursor`
   has a `s). Eventually I think even these should be merged, but
   it did not seem important to distinguish this now.
 - All cursors had a _lot_ of duplicated properties. As much as
   possible these have been deduplicated, and include the following
   stored properties: bson, options, topology, topologyOptions,
   streamOptions, namespace, ns, cmd, promiseLibrary
 - generally sessions are stored and referred to by the core
   cursor's `cursorState`. We could move all functionality there
   eventually.
  • Loading branch information
mbroadst authored and daprahamian committed Aug 13, 2019
1 parent feec4ba commit 0f88582
Show file tree
Hide file tree
Showing 31 changed files with 1,719 additions and 1,862 deletions.
401 changes: 177 additions & 224 deletions lib/aggregation_cursor.js

Large diffs are not rendered by default.

265 changes: 85 additions & 180 deletions lib/command_cursor.js
@@ -1,12 +1,9 @@
'use strict';

const inherits = require('util').inherits;
const ReadPreference = require('./core').ReadPreference;
const MongoError = require('./core').MongoError;
const Readable = require('stream').Readable;
const CoreCursor = require('./cursor');
const SUPPORTS = require('./utils').SUPPORTS;
const MongoDBNamespace = require('./utils').MongoDBNamespace;
const Cursor = require('./cursor');
const CursorState = require('./core/cursor').CursorState;

/**
* @fileOverview The **CommandCursor** class is an internal class that embodies a
Expand Down Expand Up @@ -55,62 +52,92 @@ const MongoDBNamespace = require('./utils').MongoDBNamespace;
* @fires CommandCursor#readable
* @return {CommandCursor} an CommandCursor instance.
*/
var CommandCursor = function(topology, ns, cmd, options) {
CoreCursor.apply(this, Array.prototype.slice.call(arguments, 0));
var state = CommandCursor.INIT;
var streamOptions = {};
const bson = topology.s.bson;
const topologyOptions = topology.s.options;

if (typeof ns !== 'string') {
this.operation = ns;
ns = this.operation.ns.toString();
options = this.operation.options;
cmd = {};
class CommandCursor extends Cursor {
constructor(topology, ns, cmd, options) {
super(topology, ns, cmd, options);
}

// MaxTimeMS
var maxTimeMS = null;

// Get the promiseLibrary
var promiseLibrary = options.promiseLibrary || Promise;

// Set up
Readable.call(this, { objectMode: true });

// Internal state
this.s = {
// MaxTimeMS
maxTimeMS: maxTimeMS,
// State
state: state,
// Stream options
streamOptions: streamOptions,
// BSON
bson: bson,
// Namespace
namespace: MongoDBNamespace.fromString(ns),
// Command
cmd: cmd,
// Options
options: options,
// Topology
topology: topology,
// Topology Options
topologyOptions: topologyOptions,
// Promise library
promiseLibrary: promiseLibrary,
// Optional ClientSession
session: options.session
};
};

Object.defineProperty(CommandCursor.prototype, 'namespace', {
enumerable: true,
get: function() {
return this.s.namespace.toString();
/**
* Set the ReadPreference for the cursor.
* @method
* @param {(string|ReadPreference)} readPreference The new read preference for the cursor.
* @throws {MongoError}
* @return {Cursor}
*/
setReadPreference(readPreference) {
if (this.s.state === CursorState.CLOSED || this.isDead()) {
throw MongoError.create({ message: 'Cursor is closed', driver: true });
}

if (this.s.state !== CursorState.INIT) {
throw MongoError.create({
message: 'cannot change cursor readPreference after cursor has been accessed',
driver: true
});
}

if (readPreference instanceof ReadPreference) {
this.options.readPreference = readPreference;
} else if (typeof readPreference === 'string') {
this.options.readPreference = new ReadPreference(readPreference);
} else {
throw new TypeError('Invalid read preference: ' + readPreference);
}

return this;
}
});

/**
* Set the batch size for the cursor.
* @method
* @param {number} value The batchSize for the cursor.
* @throws {MongoError}
* @return {CommandCursor}
*/
batchSize(value) {
if (this.s.state === CursorState.CLOSED || this.isDead()) {
throw MongoError.create({ message: 'Cursor is closed', driver: true });
}

if (typeof value !== 'number') {
throw MongoError.create({ message: 'batchSize requires an integer', driver: true });
}

if (this.cmd.cursor) {
this.cmd.cursor.batchSize = value;
}

this.setCursorBatchSize(value);
return this;
}

/**
* Add a maxTimeMS stage to the aggregation pipeline
* @method
* @param {number} value The state maxTimeMS value.
* @return {CommandCursor}
*/
maxTimeMS(value) {
if (this.topology.lastIsMaster().minWireVersion > 2) {
this.cmd.maxTimeMS = value;
}

return this;
}

/**
* Return the cursor logger
* @method
* @return {Logger} return the cursor logger
* @ignore
*/
getLogger() {
return this.logger;
}
}

// aliases
CommandCursor.prototype.get = CommandCursor.prototype.toArray;

/**
* CommandCursor stream data event, fired for each document in the cursor.
Expand Down Expand Up @@ -140,124 +167,6 @@ Object.defineProperty(CommandCursor.prototype, 'namespace', {
* @type {null}
*/

// Inherit from Readable
inherits(CommandCursor, Readable);

// Set the methods to inherit from prototype
var methodsToInherit = [
'_next',
'next',
'hasNext',
'each',
'forEach',
'toArray',
'rewind',
'bufferedCount',
'readBufferedDocuments',
'close',
'isClosed',
'kill',
'setCursorBatchSize',
'_find',
'_initializeCursor',
'_getMore',
'_killcursor',
'isDead',
'explain',
'isNotified',
'isKilled',
'_endSession'
];

// Only inherit the types we need
for (var i = 0; i < methodsToInherit.length; i++) {
CommandCursor.prototype[methodsToInherit[i]] = CoreCursor.prototype[methodsToInherit[i]];
}

if (SUPPORTS.ASYNC_ITERATOR) {
CommandCursor.prototype[Symbol.asyncIterator] = require('./async/async_iterator').asyncIterator;
}

/**
* Set the ReadPreference for the cursor.
* @method
* @param {(string|ReadPreference)} readPreference The new read preference for the cursor.
* @throws {MongoError}
* @return {Cursor}
*/
CommandCursor.prototype.setReadPreference = function(readPreference) {
if (this.s.state === CommandCursor.CLOSED || this.isDead()) {
throw MongoError.create({ message: 'Cursor is closed', driver: true });
}

if (this.s.state !== CommandCursor.INIT) {
throw MongoError.create({
message: 'cannot change cursor readPreference after cursor has been accessed',
driver: true
});
}

if (readPreference instanceof ReadPreference) {
this.s.options.readPreference = readPreference;
} else if (typeof readPreference === 'string') {
this.s.options.readPreference = new ReadPreference(readPreference);
} else {
throw new TypeError('Invalid read preference: ' + readPreference);
}

return this;
};

/**
* Set the batch size for the cursor.
* @method
* @param {number} value The batchSize for the cursor.
* @throws {MongoError}
* @return {CommandCursor}
*/
CommandCursor.prototype.batchSize = function(value) {
if (this.s.state === CommandCursor.CLOSED || this.isDead()) {
throw MongoError.create({ message: 'Cursor is closed', driver: true });
}

if (typeof value !== 'number') {
throw MongoError.create({ message: 'batchSize requires an integer', driver: true });
}

if (this.cmd.cursor) {
this.cmd.cursor.batchSize = value;
}

this.setCursorBatchSize(value);
return this;
};

/**
* Add a maxTimeMS stage to the aggregation pipeline
* @method
* @param {number} value The state maxTimeMS value.
* @return {CommandCursor}
*/
CommandCursor.prototype.maxTimeMS = function(value) {
if (this.s.topology.lastIsMaster().minWireVersion > 2) {
this.cmd.maxTimeMS = value;
}

return this;
};

/**
* Return the cursor logger
* @method
* @return {Logger} return the cursor logger
* @ignore
*/
CommandCursor.prototype.getLogger = function() {
return this.logger;
};

CommandCursor.prototype.get = CommandCursor.prototype.toArray;

/**
* Get the next available document from the cursor, returns null if no more documents are available.
* @function CommandCursor.prototype.next
Expand Down Expand Up @@ -357,8 +266,4 @@ CommandCursor.prototype.get = CommandCursor.prototype.toArray;
* @return {null}
*/

CommandCursor.INIT = 0;
CommandCursor.OPEN = 1;
CommandCursor.CLOSED = 2;

module.exports = CommandCursor;

0 comments on commit 0f88582

Please sign in to comment.