Skip to content

Commit

Permalink
Read port metadata from component when available
Browse files Browse the repository at this point in the history
  • Loading branch information
bergie committed Aug 23, 2017
1 parent 8c86f2d commit 4b31fb9
Showing 1 changed file with 54 additions and 40 deletions.
94 changes: 54 additions & 40 deletions src/protocol/Runtime.coffee
@@ -1,9 +1,8 @@
noflo = require 'noflo'
EventEmitter = require('events').EventEmitter

sendToInport = (component, portName, event, payload) ->
sendToInport = (port, event, payload) ->
socket = noflo.internalSocket.createSocket()
port = component.inPorts[portName]
port.attach socket
switch event
when 'connect' then socket.connect()
Expand All @@ -13,54 +12,72 @@ sendToInport = (component, portName, event, payload) ->
when 'data' then socket.send payload
port.detach socket

portsPayload = (name, graph) ->
inports = []
outports = []
if graph
for pub, internal of graph.inports
inports.push
id: pub
type: 'any' # TODO: lookup on internal
description: internal.metadata?.description
addressable: false
required: false
for pub, internal of graph.outports
outports.push
id: pub
type: 'any' # TODO: lookup on internal
description: internal.metadata?.description
addressable: false
required: false
findPort = (network, name, inPort) ->
return unless network.graph
if inPort
internal = network.graph.inports[name]
else
internal = network.graph.outports[name]
return unless internal?.process
component = network.getNode(internal.process)?.component
return unless component
return component.inPorts[name] if inPort
return component.outPorts[name]

portToPayload = (pub, internal, network, inPort) ->
def =
id: pub
type: 'all'
description: internal.metadata?.description
addressable: false
required: false
port = findPort network, pub, inPort
# Network has been prepared but isn't running yet so
# we don't have full component info
return def unless port
def.type = port.getDataType()
def.description = internal.metadata?.description or port.getDescription()
def.addressable = port.isAddressable()
def.required = port.isRequired()
return def

portsPayload = (name, network) ->
payload =
graph: name
inPorts: inports
outPorts: outports
inPorts: []
outPorts: []
return payload unless network?.graph
for pub, internal of network.graph.inports
payload.inPorts.push portToPayload pub, internal, network, true
for pub, internal of network.graph.outports
payload.outPorts.push portToPayload pub, internal, network, false
return payload

class RuntimeProtocol extends EventEmitter
constructor: (@transport) ->
@outputSockets = {} # graphName -> publicPort -> noflo.Socket
@mainGraph = null

@transport.network.on 'addnetwork', (network, name) =>
@subscribeExportedPorts name, network.graph, true
@subscribeExportedPorts name, network, true
@subscribeOutPorts name, network
@sendPorts name, network.graph
@sendPorts name, network

if network.isStarted()
# processes don't exist until started
@emit 'ports', portsPayload name, network.graph
@emit 'ports', portsPayload name, network
@subscribeOutdata name, network, true
network.on 'start', () =>
# processes don't exist until started
@emit 'ports', portsPayload name, network.graph
@emit 'ports', portsPayload name, network
@subscribeOutdata name, network, true

@transport.network.on 'removenetwork', (network, name) =>
@subscribeOutdata name, network, false
@subscribeOutPorts name, network
@subscribeExportedPorts name, network.graph, false
@sendPorts name, null
@emit 'ports', portsPayload name, network.graph
@emit 'ports', portsPayload name, network

send: (topic, payload, context) ->
@transport.send 'runtime', topic, payload, context
Expand Down Expand Up @@ -122,10 +139,10 @@ class RuntimeProtocol extends EventEmitter
@send 'runtime', payload, context
# send port info about currently set up networks
for name, network of @transport.network.networks
@sendPorts name, network.graph, context
@sendPorts name, network, context

sendPorts: (name, graph, context) ->
payload = portsPayload name, graph
sendPorts: (name, network, context) ->
payload = portsPayload name, network
if not context
@sendAll 'ports', payload
else
Expand All @@ -135,9 +152,9 @@ class RuntimeProtocol extends EventEmitter
@mainGraph = id
# XXX: should send updated runtime info?

subscribeExportedPorts: (name, graph, add) ->
subscribeExportedPorts: (name, network, add) ->
sendExportedPorts = () =>
@sendPorts name, graph
@sendPorts name, network

dependencies = [
'addInport'
Expand All @@ -146,11 +163,11 @@ class RuntimeProtocol extends EventEmitter
'removeOutport'
]
for d in dependencies
graph.removeListener d, sendExportedPorts
network.graph.removeListener d, sendExportedPorts

if add
for d in dependencies
graph.on d, sendExportedPorts
network.graph.on d, sendExportedPorts

subscribeOutPorts: (name, network, add) ->
portRemoved = () =>
Expand Down Expand Up @@ -208,13 +225,10 @@ class RuntimeProtocol extends EventEmitter
socket.on event, sendFunc event

receivePacket: (payload, callback) ->
graph = @transport.graph.graphs[payload.graph]
network = @transport.network.networks[payload.graph]
return callback new Error "Cannot find network for graph #{payload.graph}" if not network

internal = graph.inports[payload.port]
component = network.network.getNode(internal?.process)?.component
return callback new Error "Cannot find internal port for #{payload.port}" if not (internal and component)
sendToInport component, internal.port, payload.event, payload.payload
port = findPort network.network, payload.port, true
return callback new Error "Cannot find internal port for #{payload.port}" if not port
sendToInport port, payload.event, payload.payload

module.exports = RuntimeProtocol

0 comments on commit 4b31fb9

Please sign in to comment.