Skip to content

Commit

Permalink
resubscribe mqtt5 fix (#946)
Browse files Browse the repository at this point in the history
* resubscribe mqtt5 fix

* for loop rebase
  • Loading branch information
scarry1992 authored and mcollina committed May 14, 2019
1 parent 93046a5 commit e8de240
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
19 changes: 16 additions & 3 deletions lib/client.js
Expand Up @@ -542,6 +542,7 @@ MqttClient.prototype.subscribe = function () {
currentOpts.nl = opts.nl
currentOpts.rap = opts.rap
currentOpts.rh = opts.rh
currentOpts.properties = opts.properties
}
subs.push(currentOpts)
}
Expand All @@ -561,6 +562,7 @@ MqttClient.prototype.subscribe = function () {
currentOpts.nl = obj[k].nl
currentOpts.rap = obj[k].rap
currentOpts.rh = obj[k].rh
currentOpts.properties = opts.properties
}
subs.push(currentOpts)
}
Expand Down Expand Up @@ -595,6 +597,7 @@ MqttClient.prototype.subscribe = function () {
topic.nl = sub.nl || false
topic.rap = sub.rap || false
topic.rh = sub.rh || 0
topic.properties = sub.properties
}
that._resubscribeTopics[sub.topic] = topic
topics.push(sub.topic)
Expand Down Expand Up @@ -1305,12 +1308,22 @@ MqttClient.prototype.getLastMessageId = function () {
* @api private
*/
MqttClient.prototype._resubscribe = function (connack) {
var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
if (!this._firstConnection &&
(this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) &&
Object.keys(this._resubscribeTopics).length > 0) {
_resubscribeTopicsKeys.length > 0) {
if (this.options.resubscribe) {
this._resubscribeTopics.resubscribe = true
this.subscribe(this._resubscribeTopics)
if (this.options.protocolVersion === 5) {
for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
var resubscribeTopic = {}
resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
resubscribeTopic.resubscribe = true
this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties})
}
} else {
this._resubscribeTopics.resubscribe = true
this.subscribe(this._resubscribeTopics)
}
} else {
this._resubscribeTopics = {}
}
Expand Down
1 change: 1 addition & 0 deletions test/abstract_client.js
Expand Up @@ -1777,6 +1777,7 @@ module.exports = function (server, config) {
result.nl = false
result.rap = false
result.rh = 0
result.properties = undefined
}
granted.should.containEql(result)
done()
Expand Down
54 changes: 54 additions & 0 deletions test/client.js
Expand Up @@ -751,6 +751,60 @@ describe('MqttClient', function () {
})
})

it('should resubscribe when reconnecting with protocolVersion 5 and properties', function (done) {
this.timeout(15000)
var tryReconnect = true
var reconnectEvent = false
var server326 = new Server(function (client) {
client.on('connect', function (packet) {
client.on('subscribe', function (packet) {
if (!reconnectEvent) {
client.suback({
messageId: packet.messageId,
granted: packet.subscriptions.map(function (e) {
return e.qos
})
})
} else {
if (!tryReconnect) {
should(packet.properties.userProperties.test).be.equal('test')
client.end()
server326.close()
done()
}
}
})
client.connack({
reasonCode: 0,
sessionPresent: false
})
})
}).listen(port + 326)
var opts = {
host: 'localhost',
port: port + 326,
protocolVersion: 5
}
var client = mqtt.connect(opts)

client.on('reconnect', function () {
reconnectEvent = true
})

client.on('connect', function (connack) {
should(connack.sessionPresent).be.equal(false)
if (tryReconnect) {
client.subscribe('hello', { properties: { userProperties: { test: 'test' } } }, function () {
client.stream.end()
})

tryReconnect = false
} else {
reconnectEvent.should.equal(true)
}
})
})

var serverErr = new Server(function (client) {
client.on('connect', function (packet) {
client.connack({
Expand Down

0 comments on commit e8de240

Please sign in to comment.