Skip to content

Commit

Permalink
Fix publish interrupt during stored packets processing. (#902)
Browse files Browse the repository at this point in the history
* Fix publish interrupt during stored packets processing.

If `publish()` is called during stored packets processing, store the packet id into `packetIdsDuringStoreProcessing` kvs. The key is packet id and the value is boolean (true: prossesed, false: not processed). At this time, the value is false. If the packet is actually sent, the value is set to true.

When stream process reaches the end, check all of `packetIdsDuringStoreProcessing` are processed, if it doens't, try again flowing process from the beginning. In this process, already processed (but not removed yet because puback is not receieved) packets should be skipped. In order to do that, check  `packetIdsDuringStoreProcessing` value. If it is true, skip it.

* Add '_' prefix to internal properties.

* Refactoring to increase codecov.
  • Loading branch information
redboltz authored and mcollina committed Dec 18, 2018
1 parent 821422e commit 0379bf5
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 54 deletions.
166 changes: 112 additions & 54 deletions lib/client.js
Expand Up @@ -167,6 +167,10 @@ function MqttClient (streamBuilder, options) {
this.connackTimer = null
// Reconnect timer
this.reconnectTimer = null
// Is processing store?
this._storeProcessing = false
// Packet Ids are put into the store during store processing
this._packetIdsDuringStoreProcessing = {}
/**
* MessageIDs starting with 1
* ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810
Expand Down Expand Up @@ -434,10 +438,19 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
case 2:
// Add to callbacks
this.outgoing[packet.messageId] = callback || nop
this._sendPacket(packet, undefined, opts.cbStorePut)
if (this._storeProcessing) {
this._packetIdsDuringStoreProcessing[packet.messageId] = false
this._storePacket(packet, undefined, opts.cbStorePut)
} else {
this._sendPacket(packet, undefined, opts.cbStorePut)
}
break
default:
this._sendPacket(packet, callback, opts.cbStorePut)
if (this._storeProcessing) {
this._storePacket(packet, callback, opts.cbStorePut)
} else {
this._sendPacket(packet, callback, opts.cbStorePut)
}
break
}

Expand Down Expand Up @@ -881,20 +894,7 @@ MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
cbStorePut = cbStorePut || nop

if (!this.connected) {
if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
this.queue.push({ packet: packet, cb: cb })
} else if (packet.qos > 0) {
cb = this.outgoing[packet.messageId]
this.outgoingStore.put(packet, function (err) {
if (err) {
return cb && cb(err)
}
cbStorePut()
})
} else if (cb) {
cb(new Error('No connection to broker'))
}

this._storePacket(packet, cb, cbStorePut)
return
}

Expand Down Expand Up @@ -930,6 +930,32 @@ MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
}
}

/**
* _storePacket - queue a packet
* @param {String} type - packet type (see `protocol`)
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @api private
*/
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
cbStorePut = cbStorePut || nop

if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
this.queue.push({ packet: packet, cb: cb })
} else if (packet.qos > 0) {
cb = this.outgoing[packet.messageId]
this.outgoingStore.put(packet, function (err) {
if (err) {
return cb && cb(err)
}
cbStorePut()
})
} else if (cb) {
cb(new Error('No connection to broker'))
}
}

/**
* _setupPingTimer - setup the ping timer
*
Expand Down Expand Up @@ -1289,57 +1315,89 @@ MqttClient.prototype._onConnect = function (packet) {
this._resubscribe()

this.connected = true
var outStore = this.outgoingStore.createStream()

this.once('close', remove)
outStore.on('end', function () {
that.removeListener('close', remove)
that.emit('connect', packet)
})
outStore.on('error', function (err) {
that.removeListener('close', remove)
that.emit('error', err)
})

function remove () {
outStore.destroy()
outStore = null
}
function startStreamProcess () {
var outStore = that.outgoingStore.createStream()

function storeDeliver () {
// edge case, we wrapped this twice
if (!outStore) {
return
function clearStoreProcessing () {
that._storeProcessing = false
that._packetIdsDuringStoreProcessing = {}
}

var packet = outStore.read(1)
var cb
that.once('close', remove)
outStore.on('error', function (err) {
clearStoreProcessing()
that.removeListener('close', remove)
that.emit('error', err)
})

if (!packet) {
// read when data is available in the future
outStore.once('readable', storeDeliver)
return
function remove () {
outStore.destroy()
outStore = null
clearStoreProcessing()
}

// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId]
that.outgoing[packet.messageId] = function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}
function storeDeliver () {
// edge case, we wrapped this twice
if (!outStore) {
return
}
that._storeProcessing = true

var packet = outStore.read(1)

var cb

if (!packet) {
// read when data is available in the future
outStore.once('readable', storeDeliver)
return
}

// Skip already processed store packets
if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
storeDeliver()
return
}

// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId]
that.outgoing[packet.messageId] = function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}

storeDeliver()
}
that._packetIdsDuringStoreProcessing[packet.messageId] = true
that._sendPacket(packet)
} else if (outStore.destroy) {
outStore.destroy()
}
that._sendPacket(packet)
} else if (outStore.destroy) {
outStore.destroy()
}
}

outStore.on('end', function () {
var allProcessed = true
for (var id in that._packetIdsDuringStoreProcessing) {
if (!that._packetIdsDuringStoreProcessing[id]) {
allProcessed = false
break
}
}
if (allProcessed) {
clearStoreProcessing()
that.removeListener('close', remove)
that.emit('connect', packet)
} else {
startStreamProcess()
}
})
storeDeliver()
}
// start flowing
storeDeliver()
startStreamProcess()
}

module.exports = MqttClient
58 changes: 58 additions & 0 deletions test/abstract_client.js
Expand Up @@ -536,6 +536,64 @@ module.exports = function (server, config) {
})
})

it('should not interrupt messages', function (done) {
var client = null
var incomingStore = new mqtt.Store({ clean: false })
var outgoingStore = new mqtt.Store({ clean: false })
var publishCount = 0
var server2 = new Server(function (c) {
c.on('connect', function () {
c.connack({returnCode: 0})
})
c.on('publish', function (packet) {
if (packet.qos !== 0) {
c.puback({messageId: packet.messageId})
}
switch (publishCount++) {
case 0:
packet.payload.toString().should.equal('payload1')
break
case 1:
packet.payload.toString().should.equal('payload2')
break
case 2:
packet.payload.toString().should.equal('payload3')
break
case 3:
packet.payload.toString().should.equal('payload4')
server2.close()
done()
break
}
})
})

server2.listen(port + 50, function () {
client = mqtt.connect({
port: port + 50,
host: 'localhost',
clean: false,
clientId: 'cid1',
reconnectPeriod: 0,
incomingStore: incomingStore,
outgoingStore: outgoingStore,
queueQoSZero: true
})
client.on('packetreceive', function (packet) {
if (packet.cmd === 'connack') {
setImmediate(
function () {
client.publish('test', 'payload3', {qos: 1})
client.publish('test', 'payload4', {qos: 0})
}
)
}
})
client.publish('test', 'payload1', {qos: 2})
client.publish('test', 'payload2', {qos: 2})
})
})

it('should call cb if an outgoing QoS 0 message is not sent', function (done) {
var client = connect({queueQoSZero: false})
var called = false
Expand Down

0 comments on commit 0379bf5

Please sign in to comment.