Skip to content

Commit

Permalink
Merge pull request #92 from noflo/ports_api
Browse files Browse the repository at this point in the history
Make runtime ports/packets accessible in JS
  • Loading branch information
jonnor committed Aug 24, 2017
2 parents c4a13b0 + 4698c19 commit 3f71486
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 45 deletions.
32 changes: 29 additions & 3 deletions spec/Runtime.coffee
Expand Up @@ -49,6 +49,7 @@ describe 'Runtime protocol', ->
client.send 'runtime', 'getruntime',
secret: 'super-secret'
describe 'with a graph containing exported ports', ->
ports = null
before ->
runtime = new direct.Runtime
permissions:
Expand All @@ -61,6 +62,7 @@ describe 'Runtime protocol', ->
client = null
runtime = null
runtime = new direct.Runtime
ports = null
it 'should be possible to upload graph', (done) ->
client.on 'error', (err) ->
done err
Expand Down Expand Up @@ -99,17 +101,20 @@ describe 'Runtime protocol', ->
graph: 'bar'
secret: 'second-secret'
client.send 'graph', 'addoutport',
public: 'in'
node: 'Hello'
public: 'out'
node: 'World'
port: 'out'
graph: 'bar'
secret: 'second-secret'
it 'should be possible to start the network', (done) ->
client.on 'error', (err) ->
done err
client.on 'message', (msg) ->
return unless msg.protocol is 'network'
return unless msg.command is 'started'
done()
runtime.runtime.on 'ports', (emittedPorts) ->
ports = emittedPorts
client.send 'network', 'start',
graph: 'bar'
secret: 'second-secret'
Expand All @@ -118,15 +123,36 @@ describe 'Runtime protocol', ->
hello: 'World'
client.on 'error', (err) ->
done err
client.on 'message', (msg) ->
messageListener = (msg) ->
return unless msg.protocol is 'runtime'
return unless msg.command is 'packet'
return unless msg.payload.port is 'out'
return unless msg.payload.event is 'data'
chai.expect(msg.payload.payload).to.eql payload
client.removeListener 'message', messageListener
done()
client.on 'message', messageListener
client.send 'runtime', 'packet',
graph: 'bar'
port: 'in'
event: 'data'
payload: payload
secret: 'second-secret'
it 'should have emitted ports via JS API', ->
chai.expect(ports.inPorts.length).to.equal 1
chai.expect(ports.outPorts.length).to.equal 1
it 'packets sent via JS API to IN should be received at OUT', (done) ->
payload =
hello: 'JavaScript'
runtime.runtime.on 'packet', (msg) ->
return unless msg.event is 'data'
chai.expect(msg.payload).to.eql payload
done()
runtime.runtime.sendPacket
graph: 'bar'
port: 'in'
event: 'data'
payload: payload
secret: 'second-secret'
, (err) ->
return done err if err
108 changes: 66 additions & 42 deletions src/protocol/Runtime.coffee
@@ -1,8 +1,8 @@
noflo = require 'noflo'
EventEmitter = require('events').EventEmitter

sendToInport = (component, portName, event, payload) ->
sendToInport = (port, event, payload) ->
socket = noflo.internalSocket.createSocket()
port = component.inPorts[portName]
port.attach socket
switch event
when 'connect' then socket.connect()
Expand All @@ -12,38 +12,56 @@ sendToInport = (component, portName, event, payload) ->
when 'data' then socket.send payload
port.detach socket

portsPayload = (name, graph) ->
inports = []
outports = []
if graph
for pub, internal of graph.inports
inports.push
id: pub
type: 'any' # TODO: lookup on internal
description: internal.metadata?.description
addressable: false
required: false
for pub, internal of graph.outports
outports.push
id: pub
type: 'any' # TODO: lookup on internal
description: internal.metadata?.description
addressable: false
required: false
findPort = (network, name, inPort) ->
return unless network.graph
if inPort
internal = network.graph.inports[name]
else
internal = network.graph.outports[name]
return unless internal?.process
component = network.getNode(internal.process)?.component
return unless component
return component.inPorts[internal.port] if inPort
return component.outPorts[internal.port]

portToPayload = (pub, internal, network, inPort) ->
def =
id: pub
type: 'all'
description: internal.metadata?.description
addressable: false
required: false
port = findPort network, pub, inPort
# Network has been prepared but isn't running yet so
# we don't have full component info
return def unless port
def.type = port.getDataType()
def.description = internal.metadata?.description or port.getDescription()
def.addressable = port.isAddressable()
def.required = port.isRequired()
return def

portsPayload = (name, network) ->
payload =
graph: name
inPorts: inports
outPorts: outports

class RuntimeProtocol
inPorts: []
outPorts: []
return payload unless network?.graph
for pub, internal of network.graph.inports
payload.inPorts.push portToPayload pub, internal, network, true
for pub, internal of network.graph.outports
payload.outPorts.push portToPayload pub, internal, network, false
return payload

class RuntimeProtocol extends EventEmitter
constructor: (@transport) ->
@outputSockets = {} # graphName -> publicPort -> noflo.Socket
@mainGraph = null

@transport.network.on 'addnetwork', (network, name) =>
@subscribeExportedPorts name, network.graph, true
@subscribeExportedPorts name, network, true
@subscribeOutPorts name, network
@sendPorts name, network.graph
@sendPorts name, network

if network.isStarted()
# processes don't exist until started
Expand Down Expand Up @@ -74,7 +92,10 @@ class RuntimeProtocol

switch topic
when 'getruntime' then @getRuntime payload, context
when 'packet' then @receivePacket payload, context
when 'packet' then @sendPacket payload, (err) =>
if err
@sendError err.message, context
return

getRuntime: (payload, context) ->
type = @transport.options.type
Expand Down Expand Up @@ -115,10 +136,11 @@ class RuntimeProtocol
@send 'runtime', payload, context
# send port info about currently set up networks
for name, network of @transport.network.networks
@sendPorts name, network.graph, context
@sendPorts name, network, context

sendPorts: (name, graph, context) ->
payload = portsPayload name, graph
sendPorts: (name, network, context) ->
payload = portsPayload name, network
@emit 'ports', payload
if not context
@sendAll 'ports', payload
else
Expand All @@ -128,9 +150,9 @@ class RuntimeProtocol
@mainGraph = id
# XXX: should send updated runtime info?

subscribeExportedPorts: (name, graph, add) ->
subscribeExportedPorts: (name, network, add) ->
sendExportedPorts = () =>
@sendPorts name, graph
@sendPorts name, network

dependencies = [
'addInport'
Expand All @@ -139,11 +161,11 @@ class RuntimeProtocol
'removeOutport'
]
for d in dependencies
graph.removeListener d, sendExportedPorts
network.graph.removeListener d, sendExportedPorts

if add
for d in dependencies
graph.on d, sendExportedPorts
network.graph.on d, sendExportedPorts

subscribeOutPorts: (name, network, add) ->
portRemoved = () =>
Expand Down Expand Up @@ -187,6 +209,11 @@ class RuntimeProtocol
component.outPorts[internal.port].attach socket
sendFunc = (event) =>
(payload) =>
@emit 'packet',
port: pub
event: event
graph: graphName
payload: payload
@sendAll 'packet',
port: pub
event: event
Expand All @@ -195,14 +222,11 @@ class RuntimeProtocol
for event in events
socket.on event, sendFunc event

receivePacket: (payload, context) ->
graph = @transport.graph.graphs[payload.graph]
sendPacket: (payload, callback) ->
network = @transport.network.networks[payload.graph]
return @sendError "Cannot find network for graph #{payload.graph}", context if not network

internal = graph.inports[payload.port]
component = network.network.getNode(internal?.process)?.component
return @sendError "Cannot find internal port for #{payload.port}", context if not (internal and component)
sendToInport component, internal.port, payload.event, payload.payload
return callback new Error "Cannot find network for graph #{payload.graph}" if not network
port = findPort network.network, payload.port, true
return callback new Error "Cannot find internal port for #{payload.port}" if not port
sendToInport port, payload.event, payload.payload

module.exports = RuntimeProtocol

0 comments on commit 3f71486

Please sign in to comment.