summaryrefslogtreecommitdiffstats
path: root/node_modules/worker-farm/lib
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/worker-farm/lib')
-rw-r--r--node_modules/worker-farm/lib/child/index.js56
-rw-r--r--node_modules/worker-farm/lib/farm.js348
-rw-r--r--node_modules/worker-farm/lib/fork.js33
-rw-r--r--node_modules/worker-farm/lib/index.js34
4 files changed, 0 insertions, 471 deletions
diff --git a/node_modules/worker-farm/lib/child/index.js b/node_modules/worker-farm/lib/child/index.js
deleted file mode 100644
index 78f6337..0000000
--- a/node_modules/worker-farm/lib/child/index.js
+++ /dev/null
@@ -1,56 +0,0 @@
-'use strict'
-
-let $module
-
-/*
- let contextProto = this.context;
- while (contextProto = Object.getPrototypeOf(contextProto)) {
- completionGroups.push(Object.getOwnPropertyNames(contextProto));
- }
-*/
-
-
-function handle (data) {
- let idx = data.idx
- , child = data.child
- , method = data.method
- , args = data.args
- , callback = function () {
- let _args = Array.prototype.slice.call(arguments)
- if (_args[0] instanceof Error) {
- let e = _args[0]
- _args[0] = {
- '$error' : '$error'
- , 'type' : e.constructor.name
- , 'message' : e.message
- , 'stack' : e.stack
- }
- Object.keys(e).forEach(function(key) {
- _args[0][key] = e[key]
- })
- }
- process.send({ owner: 'farm', idx: idx, child: child, args: _args })
- }
- , exec
-
- if (method == null && typeof $module == 'function')
- exec = $module
- else if (typeof $module[method] == 'function')
- exec = $module[method]
-
- if (!exec)
- return console.error('NO SUCH METHOD:', method)
-
- exec.apply(null, args.concat([ callback ]))
-}
-
-
-process.on('message', function (data) {
- if (data.owner !== 'farm') {
- return;
- }
-
- if (!$module) return $module = require(data.module)
- if (data.event == 'die') return process.exit(0)
- handle(data)
-})
diff --git a/node_modules/worker-farm/lib/farm.js b/node_modules/worker-farm/lib/farm.js
deleted file mode 100644
index 60720dc..0000000
--- a/node_modules/worker-farm/lib/farm.js
+++ /dev/null
@@ -1,348 +0,0 @@
-'use strict'
-
-const DEFAULT_OPTIONS = {
- workerOptions : {}
- , maxCallsPerWorker : Infinity
- , maxConcurrentWorkers : (require('os').cpus() || { length: 1 }).length
- , maxConcurrentCallsPerWorker : 10
- , maxConcurrentCalls : Infinity
- , maxCallTime : Infinity // exceed this and the whole worker is terminated
- , maxRetries : Infinity
- , forcedKillTime : 100
- , autoStart : false
- , onChild : function() {}
- }
-
-const fork = require('./fork')
- , TimeoutError = require('errno').create('TimeoutError')
- , ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
- , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
-
-
-function Farm (options, path) {
- this.options = Object.assign({}, DEFAULT_OPTIONS, options)
- this.path = path
- this.activeCalls = 0
-}
-
-
-// make a handle to pass back in the form of an external API
-Farm.prototype.mkhandle = function (method) {
- return function () {
- let args = Array.prototype.slice.call(arguments)
- if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) {
- let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')')
- if (typeof args[args.length - 1] == 'function')
- return process.nextTick(args[args.length - 1].bind(null, err))
- throw err
- }
- this.addCall({
- method : method
- , callback : args.pop()
- , args : args
- , retries : 0
- })
- }.bind(this)
-}
-
-
-// a constructor of sorts
-Farm.prototype.setup = function (methods) {
- let iface
- if (!methods) { // single-function export
- iface = this.mkhandle()
- } else { // multiple functions on the export
- iface = {}
- methods.forEach(function (m) {
- iface[m] = this.mkhandle(m)
- }.bind(this))
- }
-
- this.searchStart = -1
- this.childId = -1
- this.children = {}
- this.activeChildren = 0
- this.callQueue = []
-
- if (this.options.autoStart) {
- while (this.activeChildren < this.options.maxConcurrentWorkers)
- this.startChild()
- }
-
- return iface
-}
-
-
-// when a child exits, check if there are any outstanding jobs and requeue them
-Farm.prototype.onExit = function (childId) {
- // delay this to give any sends a chance to finish
- setTimeout(function () {
- let doQueue = false
- if (this.children[childId] && this.children[childId].activeCalls) {
- this.children[childId].calls.forEach(function (call, i) {
- if (!call) return
- else if (call.retries >= this.options.maxRetries) {
- this.receive({
- idx : i
- , child : childId
- , args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ]
- })
- } else {
- call.retries++
- this.callQueue.unshift(call)
- doQueue = true
- }
- }.bind(this))
- }
- this.stopChild(childId)
- doQueue && this.processQueue()
- }.bind(this), 10)
-}
-
-
-// start a new worker
-Farm.prototype.startChild = function () {
- this.childId++
-
- let forked = fork(this.path, this.options.workerOptions)
- , id = this.childId
- , c = {
- send : forked.send
- , child : forked.child
- , calls : []
- , activeCalls : 0
- , exitCode : null
- }
-
- this.options.onChild(forked.child);
-
- forked.child.on('message', function(data) {
- if (data.owner !== 'farm') {
- return;
- }
- this.receive(data);
- }.bind(this))
- forked.child.once('exit', function (code) {
- c.exitCode = code
- this.onExit(id)
- }.bind(this))
-
- this.activeChildren++
- this.children[id] = c
-}
-
-
-// stop a worker, identified by id
-Farm.prototype.stopChild = function (childId) {
- let child = this.children[childId]
- if (child) {
- child.send({owner: 'farm', event: 'die'})
- setTimeout(function () {
- if (child.exitCode === null)
- child.child.kill('SIGKILL')
- }, this.options.forcedKillTime).unref()
- ;delete this.children[childId]
- this.activeChildren--
- }
-}
-
-
-// called from a child process, the data contains information needed to
-// look up the child and the original call so we can invoke the callback
-Farm.prototype.receive = function (data) {
- let idx = data.idx
- , childId = data.child
- , args = data.args
- , child = this.children[childId]
- , call
-
- if (!child) {
- return console.error(
- 'Worker Farm: Received message for unknown child. '
- + 'This is likely as a result of premature child death, '
- + 'the operation will have been re-queued.'
- )
- }
-
- call = child.calls[idx]
- if (!call) {
- return console.error(
- 'Worker Farm: Received message for unknown index for existing child. '
- + 'This should not happen!'
- )
- }
-
- if (this.options.maxCallTime !== Infinity)
- clearTimeout(call.timer)
-
- if (args[0] && args[0].$error == '$error') {
- let e = args[0]
- switch (e.type) {
- case 'TypeError': args[0] = new TypeError(e.message); break
- case 'RangeError': args[0] = new RangeError(e.message); break
- case 'EvalError': args[0] = new EvalError(e.message); break
- case 'ReferenceError': args[0] = new ReferenceError(e.message); break
- case 'SyntaxError': args[0] = new SyntaxError(e.message); break
- case 'URIError': args[0] = new URIError(e.message); break
- default: args[0] = new Error(e.message)
- }
- args[0].type = e.type
- args[0].stack = e.stack
-
- // Copy any custom properties to pass it on.
- Object.keys(e).forEach(function(key) {
- args[0][key] = e[key];
- });
- }
-
- process.nextTick(function () {
- call.callback.apply(null, args)
- })
-
- ;delete child.calls[idx]
- child.activeCalls--
- this.activeCalls--
-
- if (child.calls.length >= this.options.maxCallsPerWorker
- && !Object.keys(child.calls).length) {
- // this child has finished its run, kill it
- this.stopChild(childId)
- }
-
- // allow any outstanding calls to be processed
- this.processQueue()
-}
-
-
-Farm.prototype.childTimeout = function (childId) {
- let child = this.children[childId]
- , i
-
- if (!child)
- return
-
- for (i in child.calls) {
- this.receive({
- idx : i
- , child : childId
- , args : [ new TimeoutError('worker call timed out!') ]
- })
- }
- this.stopChild(childId)
-}
-
-
-// send a call to a worker, identified by id
-Farm.prototype.send = function (childId, call) {
- let child = this.children[childId]
- , idx = child.calls.length
-
- child.calls.push(call)
- child.activeCalls++
- this.activeCalls++
-
- child.send({
- owner : 'farm'
- , idx : idx
- , child : childId
- , method : call.method
- , args : call.args
- })
-
- if (this.options.maxCallTime !== Infinity) {
- call.timer =
- setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime)
- }
-}
-
-
-// a list of active worker ids, in order, but the starting offset is
-// shifted each time this method is called, so we work our way through
-// all workers when handing out jobs
-Farm.prototype.childKeys = function () {
- let cka = Object.keys(this.children)
- , cks
-
- if (this.searchStart >= cka.length - 1)
- this.searchStart = 0
- else
- this.searchStart++
-
- cks = cka.splice(0, this.searchStart)
-
- return cka.concat(cks)
-}
-
-
-// Calls are added to a queue, this processes the queue and is called
-// whenever there might be a chance to send more calls to the workers.
-// The various options all impact on when we're able to send calls,
-// they may need to be kept in a queue until a worker is ready.
-Farm.prototype.processQueue = function () {
- let cka, i = 0, childId
-
- if (!this.callQueue.length)
- return this.ending && this.end()
-
- if (this.activeChildren < this.options.maxConcurrentWorkers)
- this.startChild()
-
- for (cka = this.childKeys(); i < cka.length; i++) {
- childId = +cka[i]
- if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
- && this.children[childId].calls.length < this.options.maxCallsPerWorker) {
-
- this.send(childId, this.callQueue.shift())
- if (!this.callQueue.length)
- return this.ending && this.end()
- } /*else {
- console.log(
- , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
- , this.children[childId].calls.length < this.options.maxCallsPerWorker
- , this.children[childId].calls.length , this.options.maxCallsPerWorker)
- }*/
- }
-
- if (this.ending)
- this.end()
-}
-
-
-// add a new call to the call queue, then trigger a process of the queue
-Farm.prototype.addCall = function (call) {
- if (this.ending)
- return this.end() // don't add anything new to the queue
- this.callQueue.push(call)
- this.processQueue()
-}
-
-
-// kills child workers when they're all done
-Farm.prototype.end = function (callback) {
- let complete = true
- if (this.ending === false)
- return
- if (callback)
- this.ending = callback
- else if (this.ending == null)
- this.ending = true
- Object.keys(this.children).forEach(function (child) {
- if (!this.children[child])
- return
- if (!this.children[child].activeCalls)
- this.stopChild(child)
- else
- complete = false
- }.bind(this))
-
- if (complete && typeof this.ending == 'function') {
- process.nextTick(function () {
- this.ending()
- this.ending = false
- }.bind(this))
- }
-}
-
-
-module.exports = Farm
-module.exports.TimeoutError = TimeoutError
diff --git a/node_modules/worker-farm/lib/fork.js b/node_modules/worker-farm/lib/fork.js
deleted file mode 100644
index 5a035d9..0000000
--- a/node_modules/worker-farm/lib/fork.js
+++ /dev/null
@@ -1,33 +0,0 @@
-'use strict'
-
-const childProcess = require('child_process')
- , childModule = require.resolve('./child/index')
-
-
-function fork (forkModule, workerOptions) {
- // suppress --debug / --inspect flags while preserving others (like --harmony)
- let filteredArgs = process.execArgv.filter(function (v) {
- return !(/^--(debug|inspect)/).test(v)
- })
- , options = Object.assign({
- execArgv : filteredArgs
- , env : process.env
- , cwd : process.cwd()
- }, workerOptions)
- , child = childProcess.fork(childModule, process.argv, options)
-
- child.on('error', function() {
- // this *should* be picked up by onExit and the operation requeued
- })
-
- child.send({ owner: 'farm', module: forkModule })
-
- // return a send() function for this child
- return {
- send : child.send.bind(child)
- , child : child
- }
-}
-
-
-module.exports = fork
diff --git a/node_modules/worker-farm/lib/index.js b/node_modules/worker-farm/lib/index.js
deleted file mode 100644
index 8c14222..0000000
--- a/node_modules/worker-farm/lib/index.js
+++ /dev/null
@@ -1,34 +0,0 @@
-'use strict'
-
-const Farm = require('./farm')
-
-let farms = [] // keep record of farms so we can end() them if required
-
-
-function farm (options, path, methods) {
- if (typeof options == 'string') {
- methods = path
- path = options
- options = {}
- }
-
- let f = new Farm(options, path)
- , api = f.setup(methods)
-
- farms.push({ farm: f, api: api })
-
- // return the public API
- return api
-}
-
-
-function end (api, callback) {
- for (let i = 0; i < farms.length; i++)
- if (farms[i] && farms[i].api === api)
- return farms[i].farm.end(callback)
- process.nextTick(callback.bind(null, new Error('Worker farm not found!')))
-}
-
-
-module.exports = farm
-module.exports.end = end