Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delayed emit timing of connect event. #866

Merged
merged 4 commits into from
Sep 7, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
171 changes: 92 additions & 79 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,74 +129,15 @@ function MqttClient (streamBuilder, options) {
// Inflight callbacks
this.outgoing = {}

// Mark connected on connect
this.on('connect', function () {
if (this.disconnected) {
return
}

this.connected = true
var outStore = this.outgoingStore.createStream()

this.once('close', remove)
outStore.on('end', function () {
that.removeListener('close', remove)
})
outStore.on('error', function (err) {
that.removeListener('close', remove)
that.emit('error', err)
})

function remove () {
outStore.destroy()
outStore = null
}

function storeDeliver () {
// edge case, we wrapped this twice
if (!outStore) {
return
}

var packet = outStore.read(1)
var cb

if (!packet) {
// read when data is available in the future
outStore.once('readable', storeDeliver)
return
}

// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId]
that.outgoing[packet.messageId] = function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}

storeDeliver()
}
that._sendPacket(packet)
} else if (outStore.destroy) {
outStore.destroy()
}
}

// start flowing
storeDeliver()
})
// True if connection is first time.
this.firstConnection = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you make this _firstConnection? this should ideally be private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I committed this. Could you please review?


// Mark disconnected on stream close
this.on('close', function () {
this.connected = false
clearTimeout(this.connackTimer)
})

// Setup ping timer
this.on('connect', this._setupPingTimer)

// Send queued packets
this.on('connect', function () {
var queue = this.queue
Expand Down Expand Up @@ -225,23 +166,6 @@ function MqttClient (streamBuilder, options) {
deliver()
})

var firstConnection = true
// resubscribe
this.on('connect', function () {
if (!firstConnection &&
this.options.clean &&
Object.keys(this._resubscribeTopics).length > 0) {
if (this.options.resubscribe) {
this._resubscribeTopics.resubscribe = true
this.subscribe(this._resubscribeTopics)
} else {
this._resubscribeTopics = {}
}
}

firstConnection = false
})

// Clear ping timer
this.on('close', function () {
if (that.pingTimer !== null) {
Expand Down Expand Up @@ -915,7 +839,7 @@ MqttClient.prototype._handleConnack = function (packet) {

if (rc === 0) {
this.reconnecting = false
this.emit('connect', packet)
this._onConnect(packet)
} else if (rc > 0) {
var err = new Error('Connection refused: ' + errors[rc])
err.code = rc
Expand Down Expand Up @@ -1117,4 +1041,93 @@ MqttClient.prototype.getLastMessageId = function () {
return (this.nextId === 1) ? 65535 : (this.nextId - 1)
}

/**
* _resubscribe
* @api private
*/
MqttClient.prototype._resubscribe = function () {
if (!this.firstConnection &&
this.options.clean &&
Object.keys(this._resubscribeTopics).length > 0) {
if (this.options.resubscribe) {
this._resubscribeTopics.resubscribe = true
this.subscribe(this._resubscribeTopics)
} else {
this._resubscribeTopics = {}
}
}

this.firstConnection = false
}

/**
* _onConnect
*
* @api private
*/
MqttClient.prototype._onConnect = function (packet) {
if (this.disconnected) {
this.emit('connect', packet)
return
}

var that = this

this._setupPingTimer()
this._resubscribe()

this.connected = true
var outStore = this.outgoingStore.createStream()

this.once('close', remove)
outStore.on('end', function () {
that.removeListener('close', remove)
that.emit('connect', packet)
})
outStore.on('error', function (err) {
that.removeListener('close', remove)
that.emit('error', err)
})

function remove () {
outStore.destroy()
outStore = null
}

function storeDeliver () {
// edge case, we wrapped this twice
if (!outStore) {
return
}

var packet = outStore.read(1)
var cb

if (!packet) {
// read when data is available in the future
outStore.once('readable', storeDeliver)
return
}

// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId]
that.outgoing[packet.messageId] = function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}

storeDeliver()
}
that._sendPacket(packet)
} else if (outStore.destroy) {
outStore.destroy()
}
}

// start flowing
storeDeliver()
}

module.exports = MqttClient
67 changes: 67 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,73 @@ module.exports = function (server, config) {
})
})
})

it('should keep message order', function (done) {
var publishCount = 0
var reconnect = false
var client = {}
var incomingStore = new mqtt.Store({ clean: false })
var outgoingStore = new mqtt.Store({ clean: false })
var server2 = new Server(function (c) {
// errors are not interesting for this test
// but they might happen on some platforms
c.on('error', function () {})

c.on('connect', function (packet) {
c.connack({returnCode: 0})
})
c.on('publish', function (packet) {
c.puback({messageId: packet.messageId})
if (reconnect) {
switch (publishCount++) {
case 0:
packet.payload.toString().should.equal('payload1')
break
case 1:
packet.payload.toString().should.equal('payload2')
break
case 2:
packet.payload.toString().should.equal('payload3')
server2.close()
done()
break
}
}
})
})

server2.listen(port + 50, function () {
client = mqtt.connect({
port: port + 50,
host: 'localhost',
clean: false,
clientId: 'cid1',
reconnectPeriod: 0,
incomingStore: incomingStore,
outgoingStore: outgoingStore
})

client.on('connect', function () {
if (!reconnect) {
client.publish('topic', 'payload1', {qos: 1})
client.publish('topic', 'payload2', {qos: 1})
client.end(true)
} else {
client.publish('topic', 'payload3', {qos: 1})
}
})
client.on('close', function () {
if (!reconnect) {
client.reconnect({
clean: false,
incomingStore: incomingStore,
outgoingStore: outgoingStore
})
reconnect = true
}
})
})
})
})

describe('unsubscribing', function () {
Expand Down