diff --git a/package.json b/package.json index 658856d9..18bc659e 100644 --- a/package.json +++ b/package.json @@ -31,11 +31,14 @@ "nodemailer": "^1.4.0", "oauth": "^0.9.12", "q": "^1.4.1", + "redis": "^2.4.2", "sanitize-html": "git://github.com/calzoneman/sanitize-html", "serve-static": "^1.10.0", "socket.io": "^1.4.0", + "socket.io-redis": "^1.0.0", "source-map-support": "^0.4.0", "status-message-polyfill": "calzoneman/status-message-polyfill", + "uuid": "^2.0.1", "yamljs": "^0.1.6" }, "scripts": { diff --git a/src/backend/backendconfiguration.js b/src/backend/backendconfiguration.js new file mode 100644 index 00000000..2883df42 --- /dev/null +++ b/src/backend/backendconfiguration.js @@ -0,0 +1,23 @@ +class BackendConfiguration { + constructor(config) { + this.config = config; + } + + getRedisConfig() { + return this.config.redis; + } + + getListenerConfig() { + return this.config.proxy.listeners.map(listener => ({ + getHost() { + return listener.host; + }, + + getPort() { + return listener.port; + } + })); + } +} + +export { BackendConfiguration }; diff --git a/src/backend/backendmodule.js b/src/backend/backendmodule.js new file mode 100644 index 00000000..dcca65ea --- /dev/null +++ b/src/backend/backendmodule.js @@ -0,0 +1,74 @@ +import { RedisClusterClient } from '../io/cluster/redisclusterclient'; +import { FrontendPool } from 'cytube-common/lib/redis/frontendpool'; +import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider'; +import { loadFromToml } from 'cytube-common/lib/configuration/configloader'; +import path from 'path'; +import { BackendConfiguration } from './backendconfiguration'; +import logger from 'cytube-common/lib/logger'; +import redisAdapter from 'socket.io-redis'; + +const BACKEND_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'backend.toml'); + +class BackendModule { + constructor() { + this.initConfig(); + } + + initConfig() { + try { + this.backendConfig = loadFromToml(BackendConfiguration, BACKEND_CONFIG_PATH); + } catch (error) { + if (typeof error.line !== 'undefined') { + logger.error(`Error in configuration file: ${error} (line ${error.line})`); + } else { + logger.error(`Error loading configuration: ${error.stack}`); + } + + process.exit(1); + } + } + + onReady() { + const redisClientProvider = this.getRedisClientProvider(); + this.redisAdapter = redisAdapter({ + pubClient: redisClientProvider.get(), + subClient: redisClientProvider.get() + }); + this.sioEmitter = require('socket.io').instance; + this.sioEmitter.adapter(this.redisAdapter); + const IOBackend = require('./iobackend'); + this.ioBackend = new IOBackend( + this.backendConfig.getListenerConfig()[0], + this.sioEmitter, + redisClientProvider.get() + ) + } + + getFrontendPool() { + if (!this.frontendPool) { + this.frontendPool = new FrontendPool(this.getRedisClientProvider().get()); + } + + return this.frontendPool; + } + + getRedisClientProvider() { + if (!this.redisClientProvider) { + this.redisClientProvider = new RedisClientProvider( + this.backendConfig.getRedisConfig() + ); + } + + return this.redisClientProvider; + } + + getClusterClient() { + if (!this.redisClusterClient) { + this.redisClusterClient = new RedisClusterClient(this.getFrontendPool()); + } + + return this.redisClusterClient; + } +} + +export { BackendModule } diff --git a/src/backend/iobackend.js b/src/backend/iobackend.js new file mode 100644 index 00000000..2bb40fe6 --- /dev/null +++ b/src/backend/iobackend.js @@ -0,0 +1,45 @@ +import Server from 'cytube-common/lib/proxy/server'; +import ProxyInterceptor from './proxyinterceptor'; +import uuid from 'uuid'; +import PoolEntryUpdater from 'cytube-common/lib/redis/poolentryupdater'; +import JSONProtocol from 'cytube-common/lib/proxy/protocol'; +import { formatProxyAddress } from 'cytube-common/lib/util/addressutil'; + +const BACKEND_POOL = 'backend-hosts'; + +export default class IOBackend { + constructor(proxyListenerConfig, socketEmitter, poolRedisClient) { + this.proxyListenerConfig = proxyListenerConfig; + this.socketEmitter = socketEmitter; + this.poolRedisClient = poolRedisClient; + this.protocol = new JSONProtocol(); + this.initProxyInterceptor(); + this.initProxyListener(); + this.initBackendPoolUpdater(); + } + + initProxyInterceptor() { + this.proxyInterceptor = new ProxyInterceptor(this.socketEmitter); + } + + initProxyListener() { + this.proxyListener = new Server(this.proxyListenerConfig, this.protocol); + this.proxyListener.on('connection', + this.proxyInterceptor.onConnection.bind(this.proxyInterceptor)); + } + + initBackendPoolUpdater() { + const hostname = this.proxyListenerConfig.getHost(); + const port = this.proxyListenerConfig.getPort(); + const entry = { + address: formatProxyAddress(hostname, port) + } + this.poolEntryUpdater = new PoolEntryUpdater( + this.poolRedisClient, + BACKEND_POOL, + uuid.v4(), + entry + ); + this.poolEntryUpdater.start(); + } +} diff --git a/src/backend/proxiedsocket.js b/src/backend/proxiedsocket.js new file mode 100644 index 00000000..24805779 --- /dev/null +++ b/src/backend/proxiedsocket.js @@ -0,0 +1,46 @@ +import logger from 'cytube-common/lib/logger'; +import { EventEmitter } from 'events'; + +export default class ProxiedSocket extends EventEmitter { + constructor(socketID, socketIP, socketUser, socketEmitter, frontendConnection) { + super(); + this.id = socketID; + this.ip = socketIP; + this._realip = socketIP; + if (socketUser) { + this.user = { + name: socketUser.name, + global_rank: socketUser.globalRank + }; + } + this.socketEmitter = socketEmitter; + this.frontendConnection = frontendConnection; + } + + emit() { + const target = this.socketEmitter.to(this.id); + target.emit.apply(target, arguments); + } + + onProxiedEventReceived() { + try { + EventEmitter.prototype.emit.apply(this, arguments); + } catch (error) { + logger.error(`Emit failed: ${error.stack}`); + } + } + + join(channel) { + this.frontendConnection.write( + this.frontendConnection.protocol.newSocketJoinRoomsEvent( + this.id, [channel] + ) + ); + } + + disconnect() { + this.frontendConnection.write( + this.frontendConnection.protocol.newSocketKickEvent(this.id) + ); + } +} diff --git a/src/backend/proxyinterceptor.js b/src/backend/proxyinterceptor.js new file mode 100644 index 00000000..2f67e0fe --- /dev/null +++ b/src/backend/proxyinterceptor.js @@ -0,0 +1,76 @@ +import logger from 'cytube-common/lib/logger'; +import ioServer from '../io/ioserver'; +import ProxiedSocket from './proxiedsocket'; + +export default class ProxyInterceptor { + constructor(socketEmitter) { + this.socketEmitter = socketEmitter; + this.frontendConnections = {}; + this.frontendProxiedSockets = {}; + } + + /** + * Handle a new frontend proxy connection. + * + * @param {Connection} socket frontend proxy connection + */ + onConnection(socket) { + if (this.frontendConnections.hasOwnProperty(socket.endpoint)) { + logger.error(`Duplicate frontend connection: ${socket.endpoint}`); + return; + } + + this.frontendConnections[socket.endpoint] = socket; + socket.on('close', this.onFrontendDisconnect.bind(this, socket)); + socket.on('SocketConnectEvent', this.onSocketConnect.bind(this, socket)); + socket.on('SocketFrameEvent', this.onSocketFrame.bind(this, socket)); + } + + onFrontendDisconnect(socket) { + const endpoint = socket.endpoint; + if (this.frontendConnections.hasOwnProperty(endpoint)) { + if (this.frontendProxiedSockets.hasOwnProperty(endpoint)) { + logger.warn(`Frontend ${endpoint} disconnected`); + for (const key in this.frontendProxiedSockets[endpoint]) { + const proxySocket = this.frontendProxiedSockets[endpoint][key]; + proxySocket.onProxiedEventReceived('disconnect'); + } + delete this.frontendProxiedSockets[endpoint]; + } + delete this.frontendConnections[endpoint]; + } + } + + onSocketConnect(frontendConnection, socketID, socketIP, socketUser) { + const mapKey = frontendConnection.endpoint; + const proxiedSocket = new ProxiedSocket( + socketID, + socketIP, + socketUser, + this.socketEmitter, + frontendConnection); + + if (!this.frontendProxiedSockets.hasOwnProperty(mapKey)) { + this.frontendProxiedSockets[mapKey] = {}; + } else if (this.frontendProxiedSockets[mapKey].hasOwnProperty(socketID)) { + logger.error(`Duplicate SocketConnectEvent for ${socketID}`); + return; + } + + this.frontendProxiedSockets[mapKey][socketID] = proxiedSocket; + ioServer.handleConnection(proxiedSocket); + } + + onSocketFrame(frontendConnection, socketID, event, args) { + const mapKey = frontendConnection.endpoint; + const socketMap = this.frontendProxiedSockets[mapKey]; + if (!socketMap || !socketMap.hasOwnProperty(socketID)) { + logger.error(`Received SocketFrameEvent for nonexistent socket`, + { socketID, event }); + return; + } + + const socket = socketMap[socketID]; + socket.onProxiedEventReceived.apply(socket, [event].concat(args)); + } +} diff --git a/src/io/cluster/redisclusterclient.js b/src/io/cluster/redisclusterclient.js new file mode 100644 index 00000000..b4c7eb83 --- /dev/null +++ b/src/io/cluster/redisclusterclient.js @@ -0,0 +1,17 @@ +class RedisClusterClient { + constructor(frontendPool) { + this.frontendPool = frontendPool; + } + + getSocketConfig(channel) { + return this.frontendPool.getFrontends(channel).then(result => { + if (!Array.isArray(result)) { + result = []; + } + + return { servers: result }; + }); + } +} + +export { RedisClusterClient }; diff --git a/src/io/ioserver.js b/src/io/ioserver.js index d2e95a87..099fc861 100644 --- a/src/io/ioserver.js +++ b/src/io/ioserver.js @@ -276,7 +276,9 @@ module.exports = { bound[id] = null; }); - } + }, + + handleConnection: handleConnection }; /* Clean out old rate limiters */ diff --git a/src/legacymodule.js b/src/legacymodule.js new file mode 100644 index 00000000..e2d18f3f --- /dev/null +++ b/src/legacymodule.js @@ -0,0 +1,13 @@ +import NullClusterClient from './io/cluster/nullclusterclient'; + +class LegacyModule { + getClusterClient() { + return new NullClusterClient(); + } + + onReady() { + + } +} + +export { LegacyModule }; diff --git a/src/server.js b/src/server.js index 795efece..541834c1 100644 --- a/src/server.js +++ b/src/server.js @@ -47,6 +47,8 @@ import IOConfiguration from './configuration/ioconfig'; import WebConfiguration from './configuration/webconfig'; import NullClusterClient from './io/cluster/nullclusterclient'; import session from './session'; +import { BackendModule } from './backend/backendmodule'; +import { LegacyModule } from './legacymodule'; var Server = function () { var self = this; @@ -58,6 +60,14 @@ var Server = function () { self.infogetter = null; self.servers = {}; + // backend init + var initModule; + if (true) { + initModule = new BackendModule(); + } else { + initModule = new LegacyModule(); + } + // database init ------------------------------------------------------ var Database = require("./database"); self.db = Database; @@ -67,7 +77,7 @@ var Server = function () { // webserver init ----------------------------------------------------- const ioConfig = IOConfiguration.fromOldConfig(Config); const webConfig = WebConfiguration.fromOldConfig(Config); - const clusterClient = new NullClusterClient(ioConfig); + const clusterClient = initModule.getClusterClient(); const channelIndex = new LocalChannelIndex(); self.express = express(); require("./web/webserver").init(self.express, @@ -133,6 +143,8 @@ var Server = function () { // setuid require("./setuid"); + + initModule.onReady(); }; Server.prototype.getHTTPIP = function (req) {