From 0013991b7fbf239ffd74311266bb9e63e22b46cb Mon Sep 17 00:00:00 2001 From: Andrew Lavers Date: Sat, 28 Mar 2020 06:13:06 -0400 Subject: [PATCH] fix: abort incomplete pipelines upon reconnect (#1084) Elasticache severs the connection immediately after it returns a READONLY error. This can sometimes leave queued up pipelined commands in an inconsistent state when the connection is reestablished. For example, if a pipeline has 6 commands and the second one generates a READONLY error, Elasticache will only return results for the first two before severing the connection. Upon reconnect, the pipeline still thinks it has 6 commands to send but the commandQueue has only 4. This fix will detect any pipeline command sets that only had a partial response before connection loss, and abort them. This Elasticache behavior also affects transactions. If reconnectOnError returns 2, some transaction fragments may end up in the offlineQueue. This fix will check the offlineQueue for any such transaction fragments and abort them, so that we don't send mismatched multi/exec to redis upon reconnection. - Introduced piplineIndex property on pipelined commands to allow for later cleanup - Added a routine to event_handler that aborts any pipelined commands inside commandQueue and offlineQueue that were interrupted in the middle of the pipeline - Added a routine to event_handler that removes any transaction fragments from the offline queue - Introduced inTransaction property on commands to simplify pipeline logic - Added a flags param to mock_server to allow the Elasticache disconnect behavior to be simulated - Added a reconnect_on_error test case for transactions - Added some test cases testing for correct handling of this unique elasticache behavior - Added unit tests to validate inTransaction and pipelineIndex setting Fixes #965 --- examples/basic_operations.js | 2 +- lib/command.ts | 2 + lib/pipeline.ts | 21 ++-- lib/redis/event_handler.ts | 62 +++++++++++ test/functional/elasticache.ts | 153 ++++++++++++++++++++++++++ test/functional/reconnect_on_error.ts | 35 ++++++ test/helpers/mock_server.ts | 16 ++- test/unit/pipeline.ts | 69 ++++++++++++ 8 files changed, 344 insertions(+), 16 deletions(-) create mode 100644 test/functional/elasticache.ts create mode 100644 test/unit/pipeline.ts diff --git a/examples/basic_operations.js b/examples/basic_operations.js index ec3876fb..72681293 100644 --- a/examples/basic_operations.js +++ b/examples/basic_operations.js @@ -33,7 +33,7 @@ redis.sadd("set", [1, 3, 5, 7]); redis.spop("set"); // Promise resolves to "5" or another item in the set // Most responses are strings, or arrays of strings -redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three") +redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three"); redis.zrange("sortedSet", 0, 2, "WITHSCORES").then(res => console.log(res)); // Promise resolves to ["one", "1", "dos", "2", "three", "3"] as if the command was ` redis> ZRANGE sortedSet 0 2 WITHSCORES ` // Some responses have transformers to JS values diff --git a/lib/command.ts b/lib/command.ts index 90e36252..66a7d96f 100644 --- a/lib/command.ts +++ b/lib/command.ts @@ -154,6 +154,8 @@ export default class Command implements ICommand { private callback: CallbackFunction; private transformed: boolean = false; public isCustomCommand: boolean = false; + public inTransaction: boolean = false; + public pipelineIndex?: number; private slot?: number | null; private keys?: Array; diff --git a/lib/pipeline.ts b/lib/pipeline.ts index ca436959..b9278049 100644 --- a/lib/pipeline.ts +++ b/lib/pipeline.ts @@ -65,15 +65,9 @@ Pipeline.prototype.fillResult = function(value, position) { if (this.isCluster) { let retriable = true; let commonError: { name: string; message: string }; - let inTransaction: boolean; for (let i = 0; i < this._result.length; ++i) { var error = this._result[i][0]; var command = this._queue[i]; - if (command.name === "multi") { - inTransaction = true; - } else if (command.name === "exec") { - inTransaction = false; - } if (error) { if ( command.name === "exec" && @@ -94,7 +88,7 @@ Pipeline.prototype.fillResult = function(value, position) { retriable = false; break; } - } else if (!inTransaction) { + } else if (!command.inTransaction) { var isReadOnly = exists(command.name) && hasFlag(command.name, "readonly"); if (!isReadOnly) { @@ -107,7 +101,7 @@ Pipeline.prototype.fillResult = function(value, position) { var _this = this; var errv = commonError.message.split(" "); var queue = this._queue; - inTransaction = false; + let inTransaction = false; this._queue = []; for (let i = 0; i < queue.length; ++i) { if ( @@ -122,11 +116,7 @@ Pipeline.prototype.fillResult = function(value, position) { } queue[i].initPromise(); this.sendCommand(queue[i]); - if (queue[i].name === "multi") { - inTransaction = true; - } else if (queue[i].name === "exec") { - inTransaction = false; - } + inTransaction = queue[i].inTransaction; } let matched = true; @@ -174,7 +164,12 @@ Pipeline.prototype.fillResult = function(value, position) { }; Pipeline.prototype.sendCommand = function(command) { + if (this._transactions > 0) { + command.inTransaction = true; + } + const position = this._queue.length; + command.pipelineIndex = position; command.promise .then(result => { diff --git a/lib/redis/event_handler.ts b/lib/redis/event_handler.ts index 15e158e7..709c9f30 100644 --- a/lib/redis/event_handler.ts +++ b/lib/redis/event_handler.ts @@ -1,7 +1,10 @@ "use strict"; +import Deque = require("denque"); +import { AbortError } from "redis-errors"; import Command from "../command"; import { MaxRetriesPerRequestError } from "../errors"; +import { ICommandItem, ICommand } from "../types"; import { Debug, noop, CONNECTION_CLOSED_ERROR_MSG } from "../utils"; import DataHandler from "../DataHandler"; @@ -77,6 +80,61 @@ export function connectHandler(self) { }; } +function abortError(command: ICommand) { + const err = new AbortError("Command aborted due to connection close"); + (err as any).command = { + name: command.name, + args: command.args + }; + return err; +} + +// If a contiguous set of pipeline commands starts from index zero then they +// can be safely reattempted. If however we have a chain of pipelined commands +// starting at index 1 or more it means we received a partial response before +// the connection close and those pipelined commands must be aborted. For +// example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after +// aborting and purging we'll have a queue that looks like this: [0, 1, 2] +function abortIncompletePipelines(commandQueue: Deque) { + let expectedIndex = 0; + for (let i = 0; i < commandQueue.length; ) { + const command = commandQueue.peekAt(i).command as Command; + const pipelineIndex = command.pipelineIndex; + if (pipelineIndex === undefined || pipelineIndex === 0) { + expectedIndex = 0; + } + if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) { + commandQueue.remove(i, 1); + command.reject(abortError(command)); + continue; + } + i++; + } +} + +// If only a partial transaction result was received before connection close, +// we have to abort any transaction fragments that may have ended up in the +// offline queue +function abortTransactionFragments(commandQueue: Deque) { + for (let i = 0; i < commandQueue.length; ) { + const command = commandQueue.peekAt(i).command as Command; + if (command.name === "multi") { + break; + } + if (command.name === "exec") { + commandQueue.remove(i, 1); + command.reject(abortError(command)); + break; + } + if ((command as Command).inTransaction) { + commandQueue.remove(i, 1); + command.reject(abortError(command)); + } else { + i++; + } + } +} + export function closeHandler(self) { return function() { self.setStatus("close"); @@ -85,8 +143,12 @@ export function closeHandler(self) { self.prevCondition = self.condition; } if (self.commandQueue.length) { + abortIncompletePipelines(self.commandQueue); self.prevCommandQueue = self.commandQueue; } + if (self.offlineQueue.length) { + abortTransactionFragments(self.offlineQueue); + } if (self.manuallyClosing) { self.manuallyClosing = false; diff --git a/test/functional/elasticache.ts b/test/functional/elasticache.ts new file mode 100644 index 00000000..2af3bb1f --- /dev/null +++ b/test/functional/elasticache.ts @@ -0,0 +1,153 @@ +import Redis from "../../lib/redis"; +import { expect } from "chai"; +import MockServer from "../helpers/mock_server"; + +// AWS Elasticache closes the connection immediately when it encounters a READONLY error +function simulateElasticache(options: { + reconnectOnErrorValue: boolean | number; +}) { + let inTransaction = false; + const mockServer = new MockServer(30000, (argv, socket, flags) => { + switch (argv[0]) { + case "multi": + inTransaction = true; + return MockServer.raw("+OK\r\n"); + case "del": + flags.disconnect = true; + return new Error( + "READONLY You can't write against a read only replica." + ); + case "get": + return inTransaction ? MockServer.raw("+QUEUED\r\n") : argv[1]; + case "exec": + inTransaction = false; + return []; + } + }); + + return new Redis({ + port: 30000, + reconnectOnError(err: Error): boolean | number { + // bring the mock server back up + mockServer.connect(); + return options.reconnectOnErrorValue; + } + }); +} + +function expectReplyError(err) { + expect(err).to.exist; + expect(err.name).to.eql("ReplyError"); +} + +function expectAbortError(err) { + expect(err).to.exist; + expect(err.name).to.eql("AbortError"); + expect(err.message).to.eql("Command aborted due to connection close"); +} + +describe("elasticache", function() { + it("should abort a failed transaction when connection is lost", function(done) { + const redis = simulateElasticache({ reconnectOnErrorValue: true }); + + redis + .multi() + .del("foo") + .del("bar") + .exec(err => { + expectAbortError(err); + expect(err.command).to.eql({ + name: "exec", + args: [] + }); + expect(err.previousErrors).to.have.lengthOf(2); + expectReplyError(err.previousErrors[0]); + expect(err.previousErrors[0].command).to.eql({ + name: "del", + args: ["foo"] + }); + expectAbortError(err.previousErrors[1]); + expect(err.previousErrors[1].command).to.eql({ + name: "del", + args: ["bar"] + }); + + // ensure we've recovered into a healthy state + redis.get("foo", (err, res) => { + expect(res).to.eql("foo"); + done(); + }); + }); + }); + + it("should not resend failed transaction commands", function(done) { + const redis = simulateElasticache({ reconnectOnErrorValue: 2 }); + redis + .multi() + .del("foo") + .get("bar") + .exec(err => { + expectAbortError(err); + expect(err.command).to.eql({ + name: "exec", + args: [] + }); + expect(err.previousErrors).to.have.lengthOf(2); + expectAbortError(err.previousErrors[0]); + expect(err.previousErrors[0].command).to.eql({ + name: "del", + args: ["foo"] + }); + expectAbortError(err.previousErrors[1]); + expect(err.previousErrors[1].command).to.eql({ + name: "get", + args: ["bar"] + }); + + // ensure we've recovered into a healthy state + redis.get("foo", (err, res) => { + expect(res).to.eql("foo"); + done(); + }); + }); + }); + + it("should resend intact pipelines", function(done) { + const redis = simulateElasticache({ reconnectOnErrorValue: true }); + + let p1Result; + redis + .pipeline() + .del("foo") + .get("bar") + .exec((err, result) => (p1Result = result)); + + redis + .pipeline() + .get("baz") + .get("qux") + .exec((err, p2Result) => { + // First pipeline should have been aborted + expect(p1Result).to.have.lengthOf(2); + expect(p1Result[0]).to.have.lengthOf(1); + expect(p1Result[1]).to.have.lengthOf(1); + expectReplyError(p1Result[0][0]); + expect(p1Result[0][0].command).to.eql({ + name: "del", + args: ["foo"] + }); + expectAbortError(p1Result[1][0]); + expect(p1Result[1][0].command).to.eql({ + name: "get", + args: ["bar"] + }); + + // Second pipeline was intact and should have been retried successfully + expect(p2Result).to.have.lengthOf(2); + expect(p2Result[0]).to.eql([null, "baz"]); + expect(p2Result[1]).to.eql([null, "qux"]); + + done(); + }); + }); +}); diff --git a/test/functional/reconnect_on_error.ts b/test/functional/reconnect_on_error.ts index 803730f5..b8f1bd2c 100644 --- a/test/functional/reconnect_on_error.ts +++ b/test/functional/reconnect_on_error.ts @@ -1,5 +1,6 @@ import Redis from "../../lib/redis"; import { expect } from "chai"; +import * as sinon from "sinon"; describe("reconnectOnError", function() { it("should pass the error as the first param", function(done) { @@ -109,4 +110,38 @@ describe("reconnectOnError", function() { done(); }); }); + + it("should work with pipelined multi", function(done) { + var redis = new Redis({ + reconnectOnError: function() { + // deleting foo allows sadd below to succeed on the second try + redis.del("foo"); + return 2; + } + }); + var delSpy = sinon.spy(redis, "del"); + + redis.set("foo", "bar"); + redis.set("i", 1); + redis + .pipeline() + .sadd("foo", "a") // trigger a WRONGTYPE error + .multi() + .get("foo") + .incr("i") + .exec() + .exec(function(err, res) { + expect(delSpy.calledOnce).to.eql(true); + expect(delSpy.firstCall.args[0]).to.eql("foo"); + expect(err).to.be.null; + expect(res).to.eql([ + [null, 1], + [null, "OK"], + [null, "QUEUED"], + [null, "QUEUED"], + [null, ["bar", 2]] + ]); + done(); + }); + }); }); diff --git a/test/helpers/mock_server.ts b/test/helpers/mock_server.ts index a9a6408a..b6905aa6 100644 --- a/test/helpers/mock_server.ts +++ b/test/helpers/mock_server.ts @@ -32,7 +32,14 @@ export function getConnectionName(socket: Socket): string | undefined { return connectionNameMap.get(socket); } -export type MockServerHandler = (reply: any, socket: Socket) => any; +interface IFlags { + disconnect?: boolean; +} +export type MockServerHandler = ( + reply: any, + socket: Socket, + flags: IFlags +) => any; export default class MockServer extends EventEmitter { static REDIS_OK = "+OK"; @@ -84,7 +91,12 @@ export default class MockServer extends EventEmitter { this.write(c, this.slotTable); return; } - this.write(c, this.handler && this.handler(reply, c)); + let flags: Flags = {}; + let handlerResult = this.handler && this.handler(reply, c, flags); + this.write(c, handlerResult); + if (flags.disconnect) { + this.disconnect(); + } }, returnError: function() {} }); diff --git a/test/unit/pipeline.ts b/test/unit/pipeline.ts new file mode 100644 index 00000000..b3e2a63b --- /dev/null +++ b/test/unit/pipeline.ts @@ -0,0 +1,69 @@ +import * as sinon from "sinon"; +import { expect } from "chai"; +import Pipeline from "../../lib/pipeline"; +import Commander from "../../lib/commander"; +import Redis from "../../lib/redis"; + +describe("Pipeline", function() { + beforeEach(() => { + sinon.stub(Redis.prototype, "connect").resolves(); + sinon.stub(Commander.prototype, "sendCommand").callsFake(command => { + return command; + }); + }); + + afterEach(() => { + sinon.restore(); + }); + + it("should properly mark commands as transactions", function() { + const redis = new Redis(); + const p = new Pipeline(redis); + let i = 0; + + function validate(name, inTransaction) { + const command = p._queue[i++]; + expect(command.name).to.eql(name); + expect(command.inTransaction).to.eql(inTransaction); + } + + p.get(); + p.multi(); + p.get(); + p.multi(); + p.exec(); + p.exec(); + p.get(); + + validate("get", false); + validate("multi", true); + validate("get", true); + validate("multi", true); + validate("exec", true); + validate("exec", false); + validate("get", false); + }); + + it("should properly set pipelineIndex on commands", function() { + const redis = new Redis(); + const p = new Pipeline(redis); + let i = 0; + + function validate(name) { + const command = p._queue[i]; + expect(command.name).to.eql(name); + expect(command.pipelineIndex).to.eql(i); + i++; + } + + p.get(); + p.set(); + p.del(); + p.ping(); + + validate("get"); + validate("set"); + validate("del"); + validate("ping"); + }); +});