Skip to content

Commit

Permalink
Abort incomplete pipelines upon reconnect
Browse files Browse the repository at this point in the history
Elasticache severs the connection immediately after it returns a
READONLY error.  This can sometimes leave queued up pipelined commands
in an inconsistent state when the connection is reestablished. This fix
will detect any pipeline command sets that only had a partial response
before connection loss, and abort them.  This fix will also check the
offlineQueue for any transaction fragments, so that we don't send
mismatched multi/exec to redis upon reconnection.

- Introduced piplineIndex property on pipelined commands to allow for later
cleanup
- Added a routine to event_handler that aborts any pipelined commands inside
commandQueue and offlineQueue that were interrupted in the middle of the
pipeline
- Introduced inTransaction property on commands to simplify pipeline logic
- Added a flags param to mock_server to allow the Elasticache disconnect
behavior to be simulated
- Added a reconnect_on_error test case for transactions
- Added some test cases testing for correct handling of this unique elasticache
behavior
- Added unit tests to validate inTransaction and pipelineIndex setting

Fixes redis#965
  • Loading branch information
alavers committed Mar 27, 2020
1 parent 4bbdfd6 commit a1f444c
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 16 deletions.
2 changes: 1 addition & 1 deletion examples/basic_operations.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ redis.sadd("set", [1, 3, 5, 7]);
redis.spop("set"); // Promise resolves to "5" or another item in the set

// Most responses are strings, or arrays of strings
redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three")
redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three");
redis.zrange("sortedSet", 0, 2, "WITHSCORES").then(res => console.log(res)); // Promise resolves to ["one", "1", "dos", "2", "three", "3"] as if the command was ` redis> ZRANGE sortedSet 0 2 WITHSCORES `

// Some responses have transformers to JS values
Expand Down
2 changes: 2 additions & 0 deletions lib/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ export default class Command implements ICommand {
private callback: CallbackFunction;
private transformed: boolean = false;
public isCustomCommand: boolean = false;
public inTransaction: boolean = false;
public pipelineIndex?: number;

private slot?: number | null;
private keys?: Array<string | Buffer>;
Expand Down
21 changes: 8 additions & 13 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,9 @@ Pipeline.prototype.fillResult = function(value, position) {
if (this.isCluster) {
let retriable = true;
let commonError: { name: string; message: string };
let inTransaction: boolean;
for (let i = 0; i < this._result.length; ++i) {
var error = this._result[i][0];
var command = this._queue[i];
if (command.name === "multi") {
inTransaction = true;
} else if (command.name === "exec") {
inTransaction = false;
}
if (error) {
if (
command.name === "exec" &&
Expand All @@ -94,7 +88,7 @@ Pipeline.prototype.fillResult = function(value, position) {
retriable = false;
break;
}
} else if (!inTransaction) {
} else if (!command.inTransaction) {
var isReadOnly =
exists(command.name) && hasFlag(command.name, "readonly");
if (!isReadOnly) {
Expand All @@ -107,7 +101,7 @@ Pipeline.prototype.fillResult = function(value, position) {
var _this = this;
var errv = commonError.message.split(" ");
var queue = this._queue;
inTransaction = false;
let inTransaction = false;
this._queue = [];
for (let i = 0; i < queue.length; ++i) {
if (
Expand All @@ -122,11 +116,7 @@ Pipeline.prototype.fillResult = function(value, position) {
}
queue[i].initPromise();
this.sendCommand(queue[i]);
if (queue[i].name === "multi") {
inTransaction = true;
} else if (queue[i].name === "exec") {
inTransaction = false;
}
inTransaction = queue[i].inTransaction;
}

let matched = true;
Expand Down Expand Up @@ -174,7 +164,12 @@ Pipeline.prototype.fillResult = function(value, position) {
};

Pipeline.prototype.sendCommand = function(command) {
if (this._transactions > 0) {
command.inTransaction = true;
}

const position = this._queue.length;
command.pipelineIndex = position;

command.promise
.then(result => {
Expand Down
62 changes: 62 additions & 0 deletions lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"use strict";

import Deque = require("denque");
import { AbortError } from "redis-errors";
import Command from "../command";
import { MaxRetriesPerRequestError } from "../errors";
import { ICommandItem, ICommand } from "../types";
import { Debug, noop, CONNECTION_CLOSED_ERROR_MSG } from "../utils";
import DataHandler from "../DataHandler";

Expand Down Expand Up @@ -77,6 +80,61 @@ export function connectHandler(self) {
};
}

function abortError(command: ICommand) {
const err = new AbortError("Command aborted due to connection close");
(err as any).command = {
name: command.name,
args: command.args
};
return err;
}

// If a contiguous set of pipeline commands starts from index zero then they
// can be safely reattempted. If however we have a chain of pipelined commands
// starting at index 1 or more it means we received a partial response before
// the connection close and those pipelined commands must be aborted. For
// example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after
// aborting and purging we'll have a queue that looks like this: [0, 1, 2]
function abortIncompletePipelines(commandQueue: Deque<ICommandItem>) {
let expectedIndex = 0;
for (let i = 0; i < commandQueue.length; ) {
const command = commandQueue.peekAt(i).command as Command;
const pipelineIndex = command.pipelineIndex;
if (pipelineIndex === undefined || pipelineIndex === 0) {
expectedIndex = 0;
}
if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) {
commandQueue.remove(i, 1);
command.reject(abortError(command));
continue;
}
i++;
}
}

// If only a partial transaction result was received before connection close,
// we have to abort any transaction fragments that may have ended up in the
// offline queue
function abortTransactionFragments(commandQueue: Deque<ICommandItem>) {
for (let i = 0; i < commandQueue.length; ) {
const command = commandQueue.peekAt(i).command as Command;
if (command.name === "multi") {
break;
}
if (command.name === "exec") {
commandQueue.remove(i, 1);
command.reject(abortError(command));
break;
}
if ((command as Command).inTransaction) {
commandQueue.remove(i, 1);
command.reject(abortError(command));
} else {
i++;
}
}
}

export function closeHandler(self) {
return function() {
self.setStatus("close");
Expand All @@ -85,8 +143,12 @@ export function closeHandler(self) {
self.prevCondition = self.condition;
}
if (self.commandQueue.length) {
abortIncompletePipelines(self.commandQueue);
self.prevCommandQueue = self.commandQueue;
}
if (self.offlineQueue.length) {
abortTransactionFragments(self.offlineQueue);
}

if (self.manuallyClosing) {
self.manuallyClosing = false;
Expand Down
154 changes: 154 additions & 0 deletions test/functional/elasticache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import * as sinon from "sinon";
import Redis from "../../lib/redis";
import { expect } from "chai";
import MockServer from "../helpers/mock_server";

// AWS Elasticache closes the connection immediately when it encounters a READONLY error
function simulateElasticache(options: {
reconnectOnErrorValue: boolean | number;
}) {
let inTransaction = false;
const mockServer = new MockServer(30000, (argv, socket, flags) => {
switch (argv[0]) {
case "multi":
inTransaction = true;
return MockServer.raw("+OK\r\n");
case "del":
flags.disconnect = true;
return new Error(
"READONLY You can't write against a read only replica."
);
case "get":
return inTransaction ? MockServer.raw("+QUEUED\r\n") : argv[1];
case "exec":
inTransaction = false;
return [];
}
});

return new Redis({
port: 30000,
reconnectOnError(err: Error): boolean | number {
// bring the mock server back up
mockServer.connect();
return options.reconnectOnErrorValue;
}
});
}

function expectReplyError(err) {
expect(err).to.exist;
expect(err.name).to.eql("ReplyError");
}

function expectAbortError(err) {
expect(err).to.exist;
expect(err.name).to.eql("AbortError");
expect(err.message).to.eql("Command aborted due to connection close");
}

describe("elasticache", function() {
it("should abort a failed transaction when connection is lost", function(done) {
const redis = simulateElasticache({ reconnectOnErrorValue: true });

redis
.multi()
.del("foo")
.del("bar")
.exec(err => {
expectAbortError(err);
expect(err.command).to.eql({
name: "exec",
args: []
});
expect(err.previousErrors).to.have.lengthOf(2);
expectReplyError(err.previousErrors[0]);
expect(err.previousErrors[0].command).to.eql({
name: "del",
args: ["foo"]
});
expectAbortError(err.previousErrors[1]);
expect(err.previousErrors[1].command).to.eql({
name: "del",
args: ["bar"]
});

// ensure we've recovered into a healthy state
redis.get("foo", (err, res) => {
expect(res).to.eql("foo");
done();
});
});
});

it("should not resend failed transaction commands", function(done) {
const redis = simulateElasticache({ reconnectOnErrorValue: 2 });
redis
.multi()
.del("foo")
.get("bar")
.exec(err => {
expectAbortError(err);
expect(err.command).to.eql({
name: "exec",
args: []
});
expect(err.previousErrors).to.have.lengthOf(2);
expectAbortError(err.previousErrors[0]);
expect(err.previousErrors[0].command).to.eql({
name: "del",
args: ["foo"]
});
expectAbortError(err.previousErrors[1]);
expect(err.previousErrors[1].command).to.eql({
name: "get",
args: ["bar"]
});

// ensure we've recovered into a healthy state
redis.get("foo", (err, res) => {
expect(res).to.eql("foo");
done();
});
});
});

it("should resend intact pipelines", function(done) {
const redis = simulateElasticache({ reconnectOnErrorValue: true });

let p1Result;
const p1 = redis
.pipeline()
.del("foo")
.get("bar")
.exec((err, result) => (p1Result = result));

const p2 = redis
.pipeline()
.get("baz")
.get("qux")
.exec((err, p2Result) => {
// First pipeline should have been aborted
expect(p1Result).to.have.lengthOf(2);
expect(p1Result[0]).to.have.lengthOf(1);
expect(p1Result[1]).to.have.lengthOf(1);
expectReplyError(p1Result[0][0]);
expect(p1Result[0][0].command).to.eql({
name: "del",
args: ["foo"]
});
expectAbortError(p1Result[1][0]);
expect(p1Result[1][0].command).to.eql({
name: "get",
args: ["bar"]
});

// Second pipeline was intact and should have been retried successfully
expect(p2Result).to.have.lengthOf(2);
expect(p2Result[0]).to.eql([null, "baz"]);
expect(p2Result[1]).to.eql([null, "qux"]);

done();
});
});
});
35 changes: 35 additions & 0 deletions test/functional/reconnect_on_error.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Redis from "../../lib/redis";
import { expect } from "chai";
import * as sinon from "sinon";

describe("reconnectOnError", function() {
it("should pass the error as the first param", function(done) {
Expand Down Expand Up @@ -109,4 +110,38 @@ describe("reconnectOnError", function() {
done();
});
});

it("should work with pipelined multi", function(done) {
var redis = new Redis({
reconnectOnError: function() {
// deleting foo allows sadd below to succeed on the second try
redis.del("foo");
return 2;
}
});
var delSpy = sinon.spy(redis, "del");

redis.set("foo", "bar");
redis.set("i", 1);
redis
.pipeline()
.sadd("foo", "a") // trigger a WRONGTYPE error
.multi()
.get("foo")
.incr("i")
.exec()
.exec(function(err, res) {
expect(delSpy.calledOnce).to.eql(true);
expect(delSpy.firstCall.args[0]).to.eql("foo");
expect(err).to.be.null;
expect(res).to.eql([
[null, 1],
[null, "OK"],
[null, "QUEUED"],
[null, "QUEUED"],
[null, ["bar", 2]]
]);
done();
});
});
});
16 changes: 14 additions & 2 deletions test/helpers/mock_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ export function getConnectionName(socket: Socket): string | undefined {
return connectionNameMap.get(socket);
}

export type MockServerHandler = (reply: any, socket: Socket) => any;
interface Flags {
disconnect?: boolean;
}
export type MockServerHandler = (
reply: any,
socket: Socket,
flags: Flags
) => any;

export default class MockServer extends EventEmitter {
static REDIS_OK = "+OK";
Expand Down Expand Up @@ -84,7 +91,12 @@ export default class MockServer extends EventEmitter {
this.write(c, this.slotTable);
return;
}
this.write(c, this.handler && this.handler(reply, c));
let flags: Flags = {};
let handlerResult = this.handler && this.handler(reply, c, flags);
this.write(c, handlerResult);
if (flags.disconnect) {
this.disconnect();
}
},
returnError: function() {}
});
Expand Down

0 comments on commit a1f444c

Please sign in to comment.