Skip to content

Commit

Permalink
fix: delete completed incoming QOS 2 messages (#893)
Browse files Browse the repository at this point in the history
* fix: delete completed incoming QOS 2 messages

* delay deleting incoming QOS 2 messages from store until PUBCOMP has been sent

* verify handshake order of QOS 2

* ensure QOS 2 message is only handled once, when sending pubcomp fails and multiple pubrel are received

* add test to check incoming store is empty after QOS 2 handshake
  • Loading branch information
joeblynch authored and mcollina committed Dec 14, 2018
1 parent e6b5f7d commit 9a39faa
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 13 deletions.
12 changes: 4 additions & 8 deletions lib/client.js
Expand Up @@ -1215,18 +1215,14 @@ MqttClient.prototype._handlePubrel = function (packet, callback) {
var comp = {cmd: 'pubcomp', messageId: mid}

that.incomingStore.get(packet, function (err, pub) {
if (!err && pub.cmd !== 'pubrel') {
if (!err) {
that.emit('message', pub.topic, pub.payload, pub)
that.incomingStore.put(packet, function (err) {
that.handleMessage(pub, function (err) {
if (err) {
return callback(err)
}
that.handleMessage(pub, function (err) {
if (err) {
return callback(err)
}
that._sendPacket(comp, callback)
})
that.incomingStore.del(pub, nop)
that._sendPacket(comp, callback)
})
} else {
that._sendPacket(comp, callback)
Expand Down
161 changes: 156 additions & 5 deletions test/abstract_client.js
Expand Up @@ -1008,7 +1008,7 @@ module.exports = function (server, config) {
return new AsyncStore()
}
}
AsyncStore.prototype.put = function (packet, cb) {
AsyncStore.prototype.del = function (packet, cb) {
process.nextTick(function () {
cb(new Error('Error'))
})
Expand All @@ -1031,15 +1031,15 @@ module.exports = function (server, config) {
})

it('should handle success with async incoming store in QoS 2 `handlePubrel` method', function (done) {
var putComplete = false
var delComplete = false
function AsyncStore () {
if (!(this instanceof AsyncStore)) {
return new AsyncStore()
}
}
AsyncStore.prototype.put = function (packet, cb) {
AsyncStore.prototype.del = function (packet, cb) {
process.nextTick(function () {
putComplete = true
delComplete = true
cb(null)
})
}
Expand All @@ -1055,7 +1055,7 @@ module.exports = function (server, config) {
messageId: 1,
qos: 2
}, function () {
putComplete.should.equal(true)
delComplete.should.equal(true)
done()
client.end()
})
Expand Down Expand Up @@ -2002,11 +2002,78 @@ module.exports = function (server, config) {
var testTopic = 'test'
var testMessage = 'message'
var mid = 253
var publishReceived = false
var pubrecReceived = false
var pubrelReceived = false

client.once('connect', function () {
client.subscribe(testTopic, {qos: 2})
})

client.on('packetreceive', (packet) => {
switch (packet.cmd) {
case 'connack':
case 'suback':
// expected, but not specifically part of QOS 2 semantics
break
case 'publish':
pubrecReceived.should.be.false()
pubrelReceived.should.be.false()
publishReceived = true
break
case 'pubrel':
publishReceived.should.be.true()
pubrecReceived.should.be.true()
pubrelReceived = true
break
default:
should.fail()
}
})

server.once('client', function (serverClient) {
serverClient.once('subscribe', function () {
serverClient.publish({
topic: testTopic,
payload: testMessage,
qos: 2,
messageId: mid
})
})

serverClient.on('pubrec', function () {
publishReceived.should.be.true()
pubrelReceived.should.be.false()
pubrecReceived = true
})

serverClient.once('pubcomp', function () {
client.removeAllListeners()
serverClient.removeAllListeners()
publishReceived.should.be.true()
pubrecReceived.should.be.true()
pubrelReceived.should.be.true()
done()
})
})
})

it('should should empty the incoming store after a qos 2 handshake is completed', function (done) {
var client = connect()
var testTopic = 'test'
var testMessage = 'message'
var mid = 253

client.once('connect', function () {
client.subscribe(testTopic, {qos: 2})
})

client.on('packetreceive', (packet) => {
if (packet.cmd === 'pubrel') {
should(client.incomingStore._inflights.size).be.equal(1)
}
})

server.once('client', function (serverClient) {
serverClient.once('subscribe', function () {
serverClient.publish({
Expand All @@ -2018,10 +2085,94 @@ module.exports = function (server, config) {
})

serverClient.once('pubcomp', function () {
should(client.incomingStore._inflights.size).be.equal(0)
client.removeAllListeners()
done()
})
})
})

function testMultiplePubrel (shouldSendPubcompFail, done) {
var client = connect()
var testTopic = 'test'
var testMessage = 'message'
var mid = 253
var pubcompCount = 0
var pubrelCount = 0
var handleMessageCount = 0
var emitMessageCount = 0
var origSendPacket = client._sendPacket
var shouldSendFail

client.handleMessage = function (packet, callback) {
handleMessageCount++
callback()
}

client.on('message', function () {
emitMessageCount++
})

client._sendPacket = function (packet, sendDone) {
shouldSendFail = packet.cmd === 'pubcomp' && shouldSendPubcompFail
if (sendDone) {
sendDone(shouldSendFail ? new Error('testing pubcomp failure') : undefined)
}

// send the mocked response
switch (packet.cmd) {
case 'subscribe':
const suback = {cmd: 'suback', messageId: packet.messageId, granted: [2]}
client._handlePacket(suback, function (err) {
should(err).not.be.ok()
})
break
case 'pubrec':
case 'pubcomp':
// for both pubrec and pubcomp, reply with pubrel, simulating the server not receiving the pubcomp
if (packet.cmd === 'pubcomp') {
pubcompCount++
if (pubcompCount === 2) {
// end the test once the client has gone through two rounds of replying to pubrel messages
pubrelCount.should.be.exactly(2)
handleMessageCount.should.be.exactly(1)
emitMessageCount.should.be.exactly(1)
client._sendPacket = origSendPacket
done()
break
}
}

// simulate the pubrel message, either in response to pubrec or to mock pubcomp failing to be received
const pubrel = {cmd: 'pubrel', messageId: mid}
pubrelCount++
client._handlePacket(pubrel, function (err) {
if (shouldSendFail) {
should(err).be.ok()
} else {
should(err).not.be.ok()
}
})
break
}
}

client.once('connect', function () {
client.subscribe(testTopic, {qos: 2})
const publish = {cmd: 'publish', topic: testTopic, payload: testMessage, qos: 2, messageId: mid}
client._handlePacket(publish, function (err) {
should(err).not.be.ok()
})
})
}

it('handle qos 2 messages exactly once when multiple pubrel received', function (done) {
testMultiplePubrel(false, done)
})

it('handle qos 2 messages exactly once when multiple pubrel received and sending pubcomp fails on client', function (done) {
testMultiplePubrel(true, done)
})
})

describe('auto reconnect', function () {
Expand Down

0 comments on commit 9a39faa

Please sign in to comment.