Skip to content

Commit

Permalink
Merge pull request mqttjs#869 from ogis-fujiwara/cb-store-put
Browse files Browse the repository at this point in the history
Add new callback called when message is put into `outgoingStore`.
  • Loading branch information
mcollina authored and redboltz committed May 19, 2019
1 parent 5ab6175 commit 3fa438c
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ Publish a message to a topic
* `qos` QoS level, `Number`, default `0`
* `retain` retain flag, `Boolean`, default `false`
* `dup` mark as duplicate flag, `Boolean`, default `false`
* `cbStorePut` - `function ()`, fired when message is put into `outgoingStore` if QoS is `1` or `2`.
* `callback` - `function (err)`, fired when the QoS handling completes,
or at the next tick if QoS 0. An error occurs if client is disconnecting.

Expand Down
19 changes: 12 additions & 7 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ function flush (queue) {
}
}

function storeAndSend (client, packet, cb) {
function storeAndSend (client, packet, cb, cbStorePut) {
client.outgoingStore.put(packet, function storedPacket (err) {
if (err) {
return cb && cb(err)
}
cbStorePut()
sendPacket(client, packet, cb)
})
}
Expand Down Expand Up @@ -306,6 +307,7 @@ MqttClient.prototype._checkDisconnecting = function (callback) {
* {Number} qos - qos level to publish on
* {Boolean} retain - whether or not to retain the message
* {Boolean} dup - whether or not mark a message as duplicate
* {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
* @param {Function} [callback] - function(err){}
* called when publish succeeds or fails
* @returns {MqttClient} this - for chaining
Expand Down Expand Up @@ -346,13 +348,12 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
switch (opts.qos) {
case 1:
case 2:

// Add to callbacks
this.outgoing[packet.messageId] = callback || nop
this._sendPacket(packet)
this._sendPacket(packet, undefined, opts.cbStorePut)
break
default:
this._sendPacket(packet, callback)
this._sendPacket(packet, callback, opts.cbStorePut)
break
}

Expand Down Expand Up @@ -715,9 +716,12 @@ MqttClient.prototype._cleanUp = function (forced, done) {
* @param {String} type - packet type (see `protocol`)
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @api private
*/
MqttClient.prototype._sendPacket = function (packet, cb) {
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
cbStorePut = cbStorePut || nop

if (!this.connected) {
if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
this.queue.push({ packet: packet, cb: cb })
Expand All @@ -727,6 +731,7 @@ MqttClient.prototype._sendPacket = function (packet, cb) {
if (err) {
return cb && cb(err)
}
cbStorePut()
})
} else if (cb) {
cb(new Error('No connection to broker'))
Expand All @@ -742,7 +747,7 @@ MqttClient.prototype._sendPacket = function (packet, cb) {
case 'publish':
break
case 'pubrel':
storeAndSend(this, packet, cb)
storeAndSend(this, packet, cb, cbStorePut)
return
default:
sendPacket(this, packet, cb)
Expand All @@ -752,7 +757,7 @@ MqttClient.prototype._sendPacket = function (packet, cb) {
switch (packet.qos) {
case 2:
case 1:
storeAndSend(this, packet, cb)
storeAndSend(this, packet, cb, cbStorePut)
break
/**
* no need of case here since it will be caught by default
Expand Down
42 changes: 42 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,48 @@ module.exports = function (server, config) {
})
})
})

function testCallbackStorePutByQoS (qos, clean, expected, done) {
var client = connect({
clean: clean,
clientId: 'testId'
})

var callbacks = []

function cbStorePut () {
callbacks.push('storeput')
}

client.on('connect', function () {
client.publish('test', 'test', {qos: qos, cbStorePut: cbStorePut}, function (err) {
if (err) done(err)
callbacks.push('publish')
should.deepEqual(callbacks, expected)
done()
})
client.end()
})
}

it('should not call cbStorePut when publishing message with QoS `0` and clean `true`', function (done) {
testCallbackStorePutByQoS(0, true, ['publish'], done)
})
it('should not call cbStorePut when publishing message with QoS `0` and clean `false`', function (done) {
testCallbackStorePutByQoS(0, false, ['publish'], done)
})
it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `true`', function (done) {
testCallbackStorePutByQoS(1, true, ['storeput', 'publish'], done)
})
it('should call cbStorePut before publish completes when publishing message with QoS `1` and clean `false`', function (done) {
testCallbackStorePutByQoS(1, false, ['storeput', 'publish'], done)
})
it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `true`', function (done) {
testCallbackStorePutByQoS(2, true, ['storeput', 'publish'], done)
})
it('should call cbStorePut before publish completes when publishing message with QoS `2` and clean `false`', function (done) {
testCallbackStorePutByQoS(2, false, ['storeput', 'publish'], done)
})
})

describe('unsubscribing', function () {
Expand Down
8 changes: 6 additions & 2 deletions types/lib/client.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export declare type OnMessageCallback = (topic: string, payload: Buffer, packet:
export declare type OnPacketCallback = (packet: Packet) => void
export declare type OnErrorCallback = (error: Error) => void
export declare type PacketCallback = (error?: Error, packet?: Packet) => any
export declare type StorePutCallback = () => void
export declare type CloseCallback = () => void

export interface IStream extends events.EventEmitter {
Expand Down Expand Up @@ -92,6 +93,9 @@ export declare class MqttClient extends events.EventEmitter {
*
* @param {Function} [callback] - function(err){}
* called when publish succeeds or fails
*
* @param {Function} [cbStorePut] - function(){}
* called when message is put into outgoingStore
* @returns {Client} this - for chaining
* @api public
*
Expand All @@ -101,9 +105,9 @@ export declare class MqttClient extends events.EventEmitter {
* @example client.publish('topic', 'message', console.log)
*/
public publish (topic: string, message: string | Buffer,
opts: IClientPublishOptions, callback?: PacketCallback): this
opts: IClientPublishOptions, callback?: PacketCallback, cbStorePut?: StorePutCallback): this
public publish (topic: string, message: string | Buffer,
callback?: PacketCallback): this
callback?: PacketCallback, cbStorePut?: StorePutCallback): this

/**
* subscribe - subscribe to <topic>
Expand Down

0 comments on commit 3fa438c

Please sign in to comment.