Skip to content

Commit

Permalink
refactor(find): use FindOperation for finds
Browse files Browse the repository at this point in the history
This changeset introduces support for using an operation for find
operations. Unlike other operations, this one is still in progress
in terms of becoming a proper subclass of `CommandOperationV2`.
Since we must maintain support for legacy versions of the server,
we cannot simply reuse the command operation.
  • Loading branch information
mbroadst authored and daprahamian committed Aug 13, 2019
1 parent 0f88582 commit 6acef6d
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 19 deletions.
16 changes: 12 additions & 4 deletions lib/collection.js
Expand Up @@ -6,7 +6,6 @@ const checkCollectionName = require('./utils').checkCollectionName;
const ObjectID = require('./core').BSON.ObjectID;
const MongoError = require('./core').MongoError;
const toError = require('./utils').toError;
const handleCallback = require('./utils').handleCallback;
const normalizeHintField = require('./utils').normalizeHintField;
const decorateCommand = require('./utils').decorateCommand;
const decorateWithCollation = require('./utils').decorateWithCollation;
Expand Down Expand Up @@ -45,6 +44,7 @@ const DropCollectionOperation = require('./operations/drop').DropCollectionOpera
const DropIndexOperation = require('./operations/drop_index');
const DropIndexesOperation = require('./operations/drop_indexes');
const EstimatedDocumentCountOperation = require('./operations/estimated_document_count');
const FindOperation = require('./operations/find');
const FindOneOperation = require('./operations/find_one');
const FindAndModifyOperation = require('./operations/find_and_modify');
const FindOneAndDeleteOperation = require('./operations/find_one_and_delete');
Expand Down Expand Up @@ -428,10 +428,18 @@ Collection.prototype.find = deprecateOptions(
throw err;
}

// TODO: pass object cursor
const cursor = this.s.topology.cursor(this.s.namespace.toString(), findCommand, newOptions);
const cursor = this.s.topology.cursor(
new FindOperation(this, this.s.namespace, findCommand, newOptions),
newOptions
);

// TODO: remove this when NODE-2074 is resolved
if (typeof callback === 'function') {
callback(null, cursor);
return;
}

return typeof callback === 'function' ? handleCallback(callback, null, cursor) : cursor;
return cursor;
}
);

Expand Down
23 changes: 21 additions & 2 deletions lib/core/cursor.js
Expand Up @@ -12,6 +12,7 @@ const executeOperation = require('../operations/execute_operation');
const Readable = require('stream').Readable;
const SUPPORTS = require('../utils').SUPPORTS;
const MongoDBNamespace = require('../utils').MongoDBNamespace;
const OperationBase = require('../operations/operation').OperationBase;

const BSON = retrieveBSON();
const Long = BSON.Long;
Expand Down Expand Up @@ -77,11 +78,11 @@ class CoreCursor extends Readable {
super({ objectMode: true });
options = options || {};

if (typeof ns !== 'string') {
if (ns instanceof OperationBase) {
this.operation = ns;
ns = this.operation.ns.toString();
options = this.operation.options;
cmd = {};
cmd = this.operation.cmd ? this.operation.cmd : {};
}

// Cursor pool
Expand Down Expand Up @@ -161,6 +162,11 @@ class CoreCursor extends Readable {
this.cursorState.cursorId = cmd;
this.cursorState.lastCursorId = cmd;
}

// TODO: remove as part of NODE-2104
if (this.operation) {
this.operation.cursorState = this.cursorState;
}
}

setCursorBatchSize(value) {
Expand Down Expand Up @@ -385,6 +391,11 @@ class CoreCursor extends Readable {

if (session && (options.force || session.owner === this)) {
this.cursorState.session = undefined;

if (this.operation) {
this.operation.clearSession();
}

session.endSession(callback);
return true;
}
Expand Down Expand Up @@ -523,6 +534,14 @@ class CoreCursor extends Readable {
};

if (cursor.operation) {
if (cursor.logger.isDebug()) {
cursor.logger.debug(
`issue initial query [${JSON.stringify(cursor.cmd)}] with flags [${JSON.stringify(
cursor.query
)}]`
);
}

executeOperation(cursor.topology, cursor.operation, (err, result) => {
if (err) {
done(err);
Expand Down
17 changes: 6 additions & 11 deletions lib/cursor.js
Expand Up @@ -106,13 +106,8 @@ const fields = ['numberOfRetries', 'tailableRetryInterval'];
class Cursor extends CoreCursor {
constructor(topology, ns, cmd, options) {
super(topology, ns, cmd, options);
const streamOptions = {};

if (typeof ns !== 'string') {
this.operation = ns;
ns = this.operation.ns.toString();
if (this.operation) {
options = this.operation.options;
cmd = {};
}

// Tailable cursor options
Expand All @@ -131,8 +126,6 @@ class Cursor extends CoreCursor {
currentNumberOfRetries: currentNumberOfRetries,
// State
state: CursorState.INIT,
// Stream options
streamOptions,
// Promise library
promiseLibrary,
// Current doc
Expand All @@ -153,8 +146,8 @@ class Cursor extends CoreCursor {

// Get the batchSize
let batchSize = 1000;
if (cmd.cursor && cmd.cursor.batchSize) {
batchSize = cmd.cursor && cmd.cursor.batchSize;
if (this.cmd.cursor && this.cmd.cursor.batchSize) {
batchSize = this.cmd.cursor.batchSize;
} else if (options.cursor && options.cursor.batchSize) {
batchSize = options.cursor.batchSize;
} else if (typeof options.batchSize === 'number') {
Expand Down Expand Up @@ -960,7 +953,9 @@ class Cursor extends CoreCursor {
* @return {Promise} returns Promise if no callback passed
*/
explain(callback) {
if (this.operation) {
// NOTE: the next line includes a special case for operations which do not
// subclass `CommandOperationV2`. To be removed asap.
if (this.operation && this.operation.cmd == null) {
this.operation.options.explain = true;
return executeOperation(this.topology, this.operation, callback);
}
Expand Down
33 changes: 33 additions & 0 deletions lib/operations/find.js
@@ -0,0 +1,33 @@
'use strict';

const OperationBase = require('./operation').OperationBase;
const Aspect = require('./operation').Aspect;
const defineAspects = require('./operation').defineAspects;

class FindOperation extends OperationBase {
constructor(collection, ns, command, options) {
super(options);

this.ns = ns;
this.cmd = command;
}

execute(server, callback) {
// copied from `CommandOperationV2`, to be subclassed in the future
this.server = server;

const cursorState = this.cursorState || {};

// TOOD: use `MongoDBNamespace` through and through
server.query(this.ns.toString(), this.cmd, cursorState, this.options, callback);
}
}

defineAspects(FindOperation, [
Aspect.READ_OPERATION,
Aspect.RETRYABLE,
Aspect.EXECUTE_WITH_SELECTION,
Aspect.SKIP_SESSION
]);

module.exports = FindOperation;
5 changes: 4 additions & 1 deletion test/functional/cursor_tests.js
Expand Up @@ -297,6 +297,7 @@ describe('Cursor', function() {
const client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 });

client.connect((err, client) => {
expect(err).to.not.exist;
const db = client.db(configuration.db);

let internalClientCursor;
Expand All @@ -311,7 +312,9 @@ describe('Cursor', function() {
const cursor = db.collection('countTEST').find({ qty: { $gt: 4 } });
cursor.count(true, { readPreference: ReadPreference.SECONDARY }, err => {
expect(err).to.be.null;
expect(internalClientCursor.getCall(0).args[2])

const operation = internalClientCursor.getCall(0).args[0];
expect(operation.options)
.to.have.nested.property('readPreference')
.that.deep.equals(expectedReadPreference);
client.close();
Expand Down
1 change: 1 addition & 0 deletions test/functional/find_tests.js
Expand Up @@ -36,6 +36,7 @@ describe('Find', function() {

// Ensure correct insertion testing via the cursor and the count function
collection.find().toArray(function(err, documents) {
expect(err).to.not.exist;
test.equal(2, documents.length);

collection.count(function(err, count) {
Expand Down
3 changes: 2 additions & 1 deletion test/functional/retryable_reads_tests.js
Expand Up @@ -25,7 +25,8 @@ describe('Retryable Reads', function() {
spec.description.match(/listCollections/i) ||
spec.description.match(/listCollectionNames/i) ||
spec.description.match(/estimatedDocumentCount/i) ||
spec.description.match(/count/i)
spec.description.match(/count/i) ||
spec.description.match(/find/i)
);
});
});

0 comments on commit 6acef6d

Please sign in to comment.