Skip to content

Commit

Permalink
Feat: Add pipelining with method chaining (#312 @funnisimo )
Browse files Browse the repository at this point in the history
Added a 'pipeline' method which allows chaining of methods like ioredis.

Updated discard and multi to work with the pipeline.
  • Loading branch information
funnisimo authored and stipsan committed Oct 3, 2017
1 parent 7ea436e commit 8368c2b
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 15 deletions.
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -11,7 +11,7 @@
"pretest": "printf \"node \" && node --version",
"test": "mocha --compilers js:babel-register --recursive",
"test:watch": "npm test -- --watch --growl",
"lint": "eslint .",
"lint": "./node_modules/.bin/eslint .",
"coveralls": "node_modules/.bin/babel-node node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec --recursive test && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js",
"codeclimate": "codeclimate-test-reporter < ./coverage/lcov.info",
"coverage": "npm run coveralls && npm run codeclimate",
Expand Down
6 changes: 4 additions & 2 deletions src/commands/discard.js
@@ -1,5 +1,7 @@
export function discard() {
this.batch.length = 0;

if (!this.batch) {
throw new Error('ERR DISCARD without MULTI');
}
this.batch = undefined;
return 'OK';
}
25 changes: 18 additions & 7 deletions src/index.js
Expand Up @@ -6,12 +6,13 @@ import * as commands from './commands';
import createCommand from './command';
import createData from './data';
import createExpires from './expires';
import Pipeline from './pipeline';

class RedisMock extends EventEmitter {
constructor({ data = {} } = { }) {
constructor({ data = {} } = {}) {
super();
this.channels = {};
this.batch = [];
this.batch = undefined;

this.expires = createExpires();

Expand All @@ -26,14 +27,24 @@ class RedisMock extends EventEmitter {
this.emit('ready');
});
}
multi(batch) {
this.batch = batch.map(([command, ...options]) => this[command].bind(this, ...options));
multi(batch = []) {
this.batch = new Pipeline(this);

return this;
batch.forEach(([command, ...options]) => this.batch[command](...options));

return this.batch;
}
pipeline() {
this.batch = new Pipeline(this);
return this.batch;
}
exec(callback) {
return Promise.all(this.batch.map(promise => promise()))
.then(results => results.map(result => [null, result])).nodeify(callback);
if (!this.batch) {
return Promise.reject(new Error('ERR EXEC without MULTI'));
}
const pipeline = this.batch;
this.batch = undefined;
return pipeline.exec(callback);
}
}

Expand Down
40 changes: 40 additions & 0 deletions src/pipeline.js
@@ -0,0 +1,40 @@
import Promise from 'bluebird';

import * as commands from './commands';

function createCommand(pipeline, emulate) {
return (...args) => {
const lastArgIndex = args.length - 1;
let callback = args[lastArgIndex];
if (typeof callback !== 'function') {
callback = undefined;
} else {
args.length = lastArgIndex; // eslint-disable-line no-param-reassign
}

// transform non-buffer arguments to strings to simulate real ioredis behavior
const stringArgs = args.map(arg => // eslint-disable-line no-confusing-arrow
arg instanceof Buffer ? arg : arg.toString()
);

pipeline.batch.push(() => emulate(...stringArgs));
return pipeline;
};
}

class Pipeline {
constructor(redis) {
this.batch = [];

Object.keys(commands).forEach((command) => {
this[command] = createCommand(this, commands[command].bind(redis));
});
}
exec(callback) {
const batch = this.batch;
this.batch = [];
return Promise.resolve(batch.map(cmd => [null, cmd()])).asCallback(callback);
}
}

export default Pipeline;
11 changes: 10 additions & 1 deletion test/commands/discard.js
Expand Up @@ -12,7 +12,16 @@ describe('discard', () => {
]);
return redis.discard().then((result) => {
expect(result).toBe('OK');
expect(redis.batch.length).toBe(0);
expect(redis.batch).toBe(undefined);
});
});

it('errors if you discard without starting a pipeline', () => {
const redis = new MockRedis();

return redis.discard()
.catch((err) => {
expect(err).toBeA(Error);
});
});
});
32 changes: 28 additions & 4 deletions test/multi.js
Expand Up @@ -10,9 +10,33 @@ describe('multi', () => {
['incr', 'user_next'],
['incr', 'post_next'],
]);
expect(redis.batch).toBeA('array');
expect(redis.batch.length).toBe(2);
expect(redis.batch[0]).toBeA('function');
expect(redis.batch[1]).toBeA('function');
expect(redis.batch).toBeA('object');
expect(redis.batch.batch).toBeA('array');
expect(redis.batch.batch.length).toBe(2);
expect(redis.batch.batch[0]).toBeA('function');
expect(redis.batch.batch[1]).toBeA('function');
});

it('allows for pipelining methods', () => {
const redis = new MockRedis();

return redis.pipeline()
.incr('user_next')
.incr('post_next')
.exec()
.then((results) => {
expect(results).toBeA('array');
expect(results.length).toBe(2);
expect(results[0]).toEqual([null, 1]);
expect(results[1]).toEqual([null, 1]);
});
});

it('errors if you exec without starting a pipeline', () => {
const redis = new MockRedis();

return redis.exec().catch((err) => {
expect(err).toBeA(Error);
});
});
});

0 comments on commit 8368c2b

Please sign in to comment.