Component = require "component"
http = require 'http-message'
util = require 'util'
express = require 'express'
bodyParser = require 'body-parser'
path = require 'path'
q = require 'q'
url = require 'url'
class ModuleFE extends Component

  # Constructor
  # Same parameters as Component. Every positional argument is also stored on
  # the instance through the `@` parameter shorthand.
  #
  constructor: (@runtime, @role, @iid, @incnum, @localData, @resources, @parameters, @requires, @provides) ->
    @runtime.setLogger [ModuleFE]
    super
    # Channels; they are resolved from @provides / @requires in setupChannels().
    @optServiceChannel = null
    @jobsApiChannel = null
    @taskApiChannel = null
    @jobNotificationChannel = null
    # Legacy attribute names: nothing in this class ever assigns them. They are
    # kept (as null) so that code reading them keeps seeing the same value.
    @jobNotificationServiceChannel = null
    @taskExecutorChannel = null
    #Front End specific properties
    @app = null
    @router = null
    @path = '/opc'
    @logger.info "constructor completed"

  ###
  Resolves the channels used by this component from @provides / @requires,
  subscribes to job notifications and configures the request timeouts.
  ###
  setupChannels: () =>
    @logger.info "setupChannels()"
    @logger.info 'provides : ', Object.keys(@provides)
    @logger.info 'requires: ', Object.keys(@requires)
    @optServiceChannel = @provides['optServiceChannel']
    @logger.info "setupChannels initialized #{util.inspect(@optServiceChannel)}"
    @jobsApiChannel = @requires['jobsApi']
    @taskApiChannel = @requires['taskApi']
    @jobNotificationChannel = @requires['jobNotificationReceive']
    @jobNotificationChannel.on 'message', @onJobNotification
    # NOTE(review): timeout values are presumably milliseconds - confirm
    # against the channel implementation.
    @_setChannelTimeout @optServiceChannel, 500000
    @_setChannelTimeout @jobsApiChannel, 240000
    @_setChannelTimeout @taskApiChannel, 240000
    @logger.info "setupChannels() completed"

  # Sets `timeout` on the channel configuration, reusing the configuration
  # object the channel already exposes (or a new empty one when it has none),
  # and hands it back to the channel through setConfig.
  _setChannelTimeout: (channel, timeout) ->
    cfg = channel.config ? {}
    cfg.timeout = timeout
    channel.setConfig cfg

  ###
  Starts the execution: runs the parent implementation, then sets up the
  channels, the router and the express application, in that order.
  ###
  run: () =>
    @logger.info "run()"
    super
    @setupChannels()
    @initRouter()
    @startExpress()
    #@startHttpServer()
    @logger.info "run() completed"

  ###
  Start httpServer (disabled, superseded by startExpress)
  ###
  #startHttpServer: () =>
  #  @logger.info 'startHttpServer'
  #  @httpServer = http.createServer()
  #  @httpServer.on 'request', @onHttpRequest
  #  @httpServer.on 'error', @onHttpError
  #  @httpServer.listen @optServiceChannel

  ###
  Receive and process new http requests (disabled)
  ###
  #onHttpRequest: (request, response) =>
  #  pathname = url.parse(request.url).pathname
  #  if (pathname is "/opc")
  #    postData = ""
  #
  #    request.on "data", (chunk) =>
  #      @logger.info "onHttpRequest onData"
  #      postData = postData + chunk
  #
  #    request.on "end", () =>
  #      message = JSON.parse postData
  #      @logger.info "onHttpRequest onEnd. Data = #{message}"
  #
  #      @handleRequest(message, response)
  #
  #  else
  #    msg =
  #      success: false
  #      message: "Invalid resource #{pathname}"
  #    @sendResponse(response, msg, 400)

  ###
  Flags the given message as successful and sends it back as the response.
  Within this file it is only referenced by the disabled onHttpRequest code.
  ###
  handleRequest: (msg, res) =>
    @logger.info "handleRequest() OK"
    msg.success = true
    @sendResponse(res, msg)

  ###
  Event listener for HTTP server "error" event.
  Legacy handler: within this file it is only referenced by the disabled
  startHttpServer code; startExpress registers _onHttpError instead.
  Errors that do not come from the 'listen' syscall are rethrown untouched;
  listen errors are logged first and then rethrown.
  ###
  onHttpError: (error) =>
    @logger.info "Cfe:onHttpServerError"
    unless (error.syscall is 'listen') then throw error
    switch error.code
      when "EACCES"
        @logger.info 'Cfe:onHttpServerError: port requires elevated privileges'
      when "EADDRINUSE"
        @logger.info 'Cfe:onHttpServerError: port is already in use'
    throw error

  ###
  Configure Router: creates the express router and registers the GET routes
  served below @path.
  ###
  initRouter: () =>
    @logger.info "initRouter()"
    @router = express.Router()
    @router.get '/jobsApi', @jobsApi
    @router.get '/taskApi', @taskApi
    @router.get '/ping', @ping
    # @router.get '/api', @getApi
    @logger.info "initRouter() completed"

  ###
  Check if component is up: always answers with a successful 'PONG'.
  ###
  ping: (req, rep) =>
    @logger.info "ping()"
    response =
      success: true
      message: 'PONG'
    @sendResponse rep, response

  ###
  Returns description of the Task Executor Front-End API
  ###
  ###getApi: (req, res) =>
    @logger.info 'getApi()'
    message =
      success: true
      response: [
        {
          url: 'PUT/jobsApi'
          description: 'Redirects call to job component'
          form:
            action: "action"
            params:
              param1: 'param1'
              param2: 'param2'
              param3: 'param3'
          example:
            action: "execute"
            form:
              idPlan: 'idPlan'
              idAlgorithm: 'idAlgorithm'
            default:
              ipserver: '192.168.183.60'
              user: 'user'
              password: 'c2net'
              db: 'MSQh2dp1hc0r0_001_STables'
        }
        {
          url: 'PUT/taskApi'
          description: 'Redirects call to job component'
          form:
            action: "action"
            params:
              param1: 'param1'
              param2: 'param2'
              param3: 'param3'
          example:
            form:
              action: "action"
              params:
                idPlan: 'idPlan'
                idAlgorithm: 'idAlgorithm'
        }
        {
          url: 'GET/taskExecutor/ping'
          action: "ping"
          description: 'Checks component is alive '
        }
        {
          url: 'GET/taskExecutor/api'
          action: "api"
          description: 'Returns description of the JobsEX Front-End API'
        }
      ]
    @sendResponse res, message###

  ###
  Forwards call to Jobs: the request query is serialized to JSON and sent
  through the jobsApi channel; the JSON reply is relayed to the HTTP client.
  Any failure (channel error or unparsable reply) is answered via sendError.
  ###
  jobsApi: (req, rep) =>
    @logger.info "jobsApi() invoked"
    msgString = JSON.stringify(req.query)
    @logger.info "jobsApi() msgString # #{msgString}"
    @jobsApiChannel.sendRequest msgString
    .then (response) =>
      message = JSON.parse(response.message.toString())
      @logger.info "jobsApi() completed: #{util.inspect(message)}"
      @sendResponse rep, message
    .fail (err) =>
      @logger.error "jobsApi() error #{err.message}"
      @sendError rep, err.message

  ###
  Forwards call to Task Executor: the request query is serialized to JSON and
  sent through the taskApi channel; the JSON reply is relayed to the HTTP
  client. Any failure is answered via sendError.
  ###
  taskApi: (req, rep) =>
    @logger.info "taskApi() started"
    msgString = JSON.stringify(req.query)
    @logger.info "taskApi() msg : #{msgString}"
    @taskApiChannel.sendRequest msgString
    .then (response) =>
      @logger.info "taskApi() response received"
      #[ [ { status: 'OK' }, <Buffer 7b 22 6d ... > ], null ]
      message = JSON.parse(response.message.toString())
      @logger.info "taskApi() completed #{util.inspect(message)}"
      @sendResponse rep, message
    .fail (err) =>
      @logger.error "taskApi() error #{err.message}"
      @sendError rep, err.message

  ###
  A new job notification message has been published.
  The payload is parsed as JSON and logged. A payload that is not valid JSON
  is logged as an error and dropped, so that a single bad notification does
  not throw out of the channel's 'message' listener.
  ###
  onJobNotification: (buffer) =>
    @logger.info "onJobNotification()"
    try
      message = JSON.parse(buffer)
    catch err
      @logger.error "onJobNotification() invalid message: #{err.message}"
      return
    @logger.info "onJobNotification() message received:\n #{util.inspect message}"
    # `message?` also covers a literal JSON `null` payload.
    if message?.response?.job?.result
      @logger.info "onJobNotification() result: \n#{util.inspect message.response.job.result}"

  ###
  Sends notification to Jobs.
  Currently this only logs the message; nothing is sent.
  ###
  sendNotification: (message) =>
    @logger.info "sendNotificationToJobs() #{util.inspect(message)}"

  ###
  Configure express: JSON and urlencoded body parsing, the router mounted
  under @path, a 404 fallback for everything else, and an HTTP server that
  listens on the optService channel.
  ###
  startExpress: () =>
    @logger.info 'startExpress'
    @app = express()
    @app.use bodyParser.json()
    @app.use bodyParser.urlencoded({extended: true})
    # express routes
    @app.use @path, @router
    @logger.info "startExpress() app in path #{@path}"
    # Must stay after the router: it answers whatever no route handled.
    @app.use (req, res, next) ->
      return res.status(404).send('URL Not Found')
    @httpServer = http.createServer(@app)
    @httpServer.on 'error', @_onHttpError
    @httpServer.listen @optServiceChannel
    @logger.info "startExpress() completed"

  ###
  Saves the state and stops the execution.
  ###
  shutdown: ->
    super()
    @logger.info "shutdown()"

  ###
  Changes the component instance parameters
  Returns: 'true' if the reconfig can be taken and 'false' otherwise.
  ###
  reconfig: (resources, parameters) ->
    @logger.info "reconfig()"
    # NOTE(review): only `parameters` is forwarded to the parent; `resources`
    # is dropped. Confirm this matches Component.reconfig's signature.
    result = super parameters
    return result

  ###
  Answers the request with HTTP 500 and a JSON body of the form
  {success: false, message: msg}.
  The response is written exactly once, through sendResponse.
  ###
  sendError: (res, msg) =>
    @logger.error "sendError() #{util.inspect(msg)}"
    @sendResponse res, {success: false, message: msg}, 500

  ###
  Writes `msg` as a JSON response and ends it.
  `status` is the HTTP status code; it defaults to 200 so that existing
  two-argument callers keep their behaviour.
  ###
  sendResponse: (res, msg, status = 200) =>
    res.writeHead status, {"Content-type": "application/json"}
    res.write JSON.stringify msg
    res.end()
    @logger.info "sendResponse() #{util.inspect(msg)}"

  ###
  Event listener for HTTP server "error" event.
  Errors that do not come from the 'listen' syscall are rethrown untouched;
  listen errors are logged with a code-specific message and then rethrown.
  ###
  _onHttpError: (error) =>
    @logger.error 'Cfe:onHttpError'
    throw error unless (error.syscall is 'listen')
    switch error.code
      when 'EACCES'
        @logger.error 'Cfe:onHttpError: requires elevated privileges'
      when 'EADDRINUSE'
        @logger.error 'Cfe:onHttpError: address is already in use'
      else
        @logger.error "Cfe:onHttpError: code = #{error.code}"
    throw error
## Patch for unit-tests: use ports instead of channels
# Rebinds this file's `http` variable to Node's native 'http' module, so that
# the server created afterwards comes from it instead of 'http-message'.
# Returns the native module.
ModuleFE.useNativeHttp = ->
  http = require 'http'

# The component class is the module's export; the hook above is reachable as
# a property of it.
module.exports = ModuleFE