diff --git a/streaming/index.js b/streaming/index.js index 3a01be66a5..dd1a8d5462 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -74,6 +74,7 @@ const startMaster = () => { if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) { log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.'); } + log.info(`Starting streaming API server master with ${numWorkers} workers`); }; @@ -616,16 +617,9 @@ const startWorker = (workerId) => { }); }, 30000); - if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) { - server.listen(process.env.SOCKET || process.env.PORT, () => { - fs.chmodSync(server.address(), 0o666); - log.info(`Worker ${workerId} now listening on ${server.address()}`); - }); - } else { - server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => { - log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`); - }); - } + attachServerWithConfig(server, address => { + log.info(`Worker ${workerId} now listening on ${address}`); + }); const onExit = () => { log.info(`Worker ${workerId} exiting, bye bye`); @@ -645,9 +639,49 @@ const startWorker = (workerId) => { process.on('uncaughtException', onError); }; -throng({ - workers: numWorkers, - lifetime: Infinity, - start: startWorker, - master: startMaster, +const attachServerWithConfig = (server, onSuccess) => { + if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) { + server.listen(process.env.SOCKET || process.env.PORT, () => { + fs.chmodSync(server.address(), 0o666); + + if (onSuccess) { + onSuccess(server.address()); + } + }); + } else { + server.listen(+process.env.PORT || 4000, process.env.BIND || '0.0.0.0', () => { + if (onSuccess) { + onSuccess(`${server.address().address}:${server.address().port}`); + } + }); + } +}; + +const onPortAvailable = onSuccess => { + const testServer = http.createServer(); + + testServer.once('error', err => { + onSuccess(err); + }); + + testServer.once('listening', () => { + testServer.once('close', () => onSuccess()); + testServer.close(); + }); + + attachServerWithConfig(testServer); +}; + +onPortAvailable(err => { + if (err) { + log.error('Could not start server, the port or socket is in use'); + return; + } + + throng({ + workers: numWorkers, + lifetime: Infinity, + start: startWorker, + master: startMaster, + }); });