Skip to content

Commit

Permalink
fix bug in weixin min program and add support to ali min program (#898)
Browse files Browse the repository at this point in the history
* add support for Ali Mini Program

* remove test for wx and ali Mini Program

* refactor lib/connect/ali.js

* rewrite wx.js

* fix wx.js

* remove unuse console, and fix connect/index.js for alis

* revert ws.js

* fix ali.js

* remove weapp-test script in package.json

* fix README.md

* fix ali.js to adapt IDE
  • Loading branch information
SyMind authored and mcollina committed Dec 14, 2018
1 parent 1684819 commit e6b5f7d
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 172 deletions.
26 changes: 22 additions & 4 deletions README.md
Expand Up @@ -525,22 +525,40 @@ at https://unpkg.com/mqtt/dist/mqtt.min.js.
See http://unpkg.com for the full documentation on version ranges.

<a name="weapp"></a>
## Weixin App
Surport [Weixin App](https://mp.weixin.qq.com/). See [Doc](https://mp.weixin.qq.com/debug/wxadoc/dev/api/network-socket.html).
## WeChat Mini Program
Surport [WeChat Mini Program](https://mp.weixin.qq.com/). See [Doc](https://mp.weixin.qq.com/debug/wxadoc/dev/api/network-socket.html).
<a name="example"></a>

## Example(js)

```js
var mqtt = require('mqtt')
var client = mqtt.connect('wxs://test.mosquitto.org')
var client = mqtt.connect('wxs://test.mosquitto.org')
```

## Example(ts)

```ts
import { connect } from 'mqtt';
const client = connect('wxs://test.mosquitto.org');
const client = connect('wxs://test.mosquitto.org');
```

## Ali Mini Program
Surport [Ali Mini Program](https://open.alipay.com/channel/miniIndex.htm). See [Doc](https://docs.alipay.com/mini/developer/getting-started).
<a name="example"></a>

## Example(js)

```js
var mqtt = require('mqtt')
var client = mqtt.connect('alis://test.mosquitto.org')
```

## Example(ts)

```ts
import { connect } from 'mqtt';
const client = connect('alis://test.mosquitto.org');
```

<a name="browserify"></a>
Expand Down
130 changes: 130 additions & 0 deletions lib/connect/ali.js
@@ -0,0 +1,130 @@
'use strict'

var Transform = require('readable-stream').Transform
var duplexify = require('duplexify')
var base64 = require('base64-js')

/* global FileReader */
var my
var proxy
var stream
var isInitialized = false

function buildProxy () {
var proxy = new Transform()
proxy._write = function (chunk, encoding, next) {
my.sendSocketMessage({
data: chunk,
success: function () {
next()
},
fail: function () {
next(new Error())
}
})
}
proxy._flush = function socketEnd (done) {
my.closeSocket({
success: function () {
done()
}
})
}

return proxy
}

function setDefaultOpts (opts) {
if (!opts.hostname) {
opts.hostname = 'localhost'
}
if (!opts.path) {
opts.path = '/'
}

if (!opts.wsOptions) {
opts.wsOptions = {}
}
}

function buildUrl (opts, client) {
var protocol = opts.protocol === 'alis' ? 'wss' : 'ws'
var url = protocol + '://' + opts.hostname + opts.path
if (opts.port && opts.port !== 80 && opts.port !== 443) {
url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path
}
if (typeof (opts.transformWsUrl) === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
}

function bindEventHandler () {
if (isInitialized) return

isInitialized = true

my.onSocketOpen(function () {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
})

my.onSocketMessage(function (res) {
if (typeof res.data === 'string') {
var array = base64.toByteArray(res.data)
var buffer = Buffer.from(array)
proxy.push(buffer)
} else {
var reader = new FileReader()
reader.addEventListener('load', function () {
var data = reader.result

if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
})
reader.readAsArrayBuffer(res.data)
}
})

my.onSocketClose(function () {
stream.end()
stream.destroy()
})

my.onSocketError(function (res) {
stream.destroy(res)
})
}

function buildStream (client, opts) {
opts.hostname = opts.hostname || opts.host

if (!opts.hostname) {
throw new Error('Could not determine host. Specify host manually.')
}

var websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'

setDefaultOpts(opts)

var url = buildUrl(opts, client)
my = opts.my
my.connectSocket({
url: url,
protocols: websocketSubProtocol
})

proxy = buildProxy()
stream = duplexify.obj()

bindEventHandler()

return stream
}

module.exports = buildStream
12 changes: 10 additions & 2 deletions lib/connect/index.js
Expand Up @@ -15,6 +15,9 @@ if (process.title !== 'browser') {
} else {
protocols.wx = require('./wx')
protocols.wxs = require('./wx')

protocols.ali = require('./ali')
protocols.alis = require('./ali')
}

protocols.ws = require('./ws')
Expand Down Expand Up @@ -76,7 +79,7 @@ function connect (brokerUrl, opts) {

if (opts.cert && opts.key) {
if (opts.protocol) {
if (['mqtts', 'wss', 'wxs'].indexOf(opts.protocol) === -1) {
if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) {
switch (opts.protocol) {
case 'mqtt':
opts.protocol = 'mqtts'
Expand All @@ -87,6 +90,9 @@ function connect (brokerUrl, opts) {
case 'wx':
opts.protocol = 'wxs'
break
case 'ali':
opts.protocol = 'alis'
break
default:
throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!')
}
Expand All @@ -105,7 +111,9 @@ function connect (brokerUrl, opts) {
'ws',
'wss',
'wx',
'wxs'
'wxs',
'ali',
'alis'
].filter(function (key, index) {
if (isSecure && index % 2 === 0) {
// Skip insecure protocols when requesting a secure one.
Expand Down
147 changes: 75 additions & 72 deletions lib/connect/wx.js
@@ -1,66 +1,50 @@
'use strict'

/* global wx */
var socketOpen = false
var socketMsgQueue = []
var Transform = require('readable-stream').Transform
var duplexify = require('duplexify')

function sendSocketMessage (msg) {
if (socketOpen) {
wx.sendSocketMessage({
data: msg.buffer || msg
/* global wx */
var socketTask
var proxy
var stream

function buildProxy () {
var proxy = new Transform()
proxy._write = function (chunk, encoding, next) {
socketTask.send({
data: chunk,
success: function () {
next()
},
fail: function (errMsg) {
next(new Error(errMsg))
}
})
}
proxy._flush = function socketEnd (done) {
socketTask.close({
success: function () {
done()
}
})
} else {
socketMsgQueue.push(msg)
}

return proxy
}

function WebSocket (url, protocols) {
var ws = {
OPEN: 1,
CLOSING: 2,
CLOSED: 3,
readyState: socketOpen ? 1 : 0,
send: sendSocketMessage,
close: wx.closeSocket,
onopen: null,
onmessage: null,
onclose: null,
onerror: null
function setDefaultOpts (opts) {
if (!opts.hostname) {
opts.hostname = 'localhost'
}
if (!opts.path) {
opts.path = '/'
}

wx.connectSocket({
url: url,
protocols: protocols
})
wx.onSocketOpen(function (res) {
ws.readyState = ws.OPEN
socketOpen = true
for (var i = 0; i < socketMsgQueue.length; i++) {
sendSocketMessage(socketMsgQueue[i])
}
socketMsgQueue = []

ws.onopen && ws.onopen.apply(ws, arguments)
})
wx.onSocketMessage(function (res) {
ws.onmessage && ws.onmessage.apply(ws, arguments)
})
wx.onSocketClose(function () {
ws.onclose && ws.onclose.apply(ws, arguments)
ws.readyState = ws.CLOSED
socketOpen = false
})
wx.onSocketError(function () {
ws.onerror && ws.onerror.apply(ws, arguments)
ws.readyState = ws.CLOSED
socketOpen = false
})

return ws
if (!opts.wsOptions) {
opts.wsOptions = {}
}
}

var websocket = require('websocket-stream')

function buildUrl (opts, client) {
var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws'
var url = protocol + '://' + opts.hostname + opts.path
Expand All @@ -73,38 +57,57 @@ function buildUrl (opts, client) {
return url
}

function setDefaultOpts (opts) {
if (!opts.hostname) {
opts.hostname = 'localhost'
}
if (!opts.path) {
opts.path = '/'
}
function bindEventHandler () {
socketTask.onOpen(function () {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
})

if (!opts.wsOptions) {
opts.wsOptions = {}
}
socketTask.onMessage(function (res) {
var data = res.data

if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
})

socketTask.onClose(function () {
stream.end()
stream.destroy()
})

socketTask.onError(function (res) {
stream.destroy(res)
})
}

function createWebSocket (client, opts) {
function buildStream (client, opts) {
opts.hostname = opts.hostname || opts.host

if (!opts.hostname) {
throw new Error('Could not determine host. Specify host manually.')
}

var websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'

setDefaultOpts(opts)

var url = buildUrl(opts, client)
return websocket(WebSocket(url, [websocketSubProtocol]))
}
socketTask = wx.connectSocket({
url: url,
protocols: websocketSubProtocol
})

function buildBuilder (client, opts) {
opts.hostname = opts.hostname || opts.host
proxy = buildProxy()
stream = duplexify.obj()

if (!opts.hostname) {
throw new Error('Could not determine host. Specify host manually.')
}
bindEventHandler()

return createWebSocket(client, opts)
return stream
}

module.exports = buildBuilder
module.exports = buildStream

0 comments on commit e6b5f7d

Please sign in to comment.