diff --git a/index.js b/index.js index fb8e3cc0..729d6cb4 100644 --- a/index.js +++ b/index.js @@ -73,5 +73,7 @@ function handleLine(line) { Logger.syslog.log("Switch " + args[0] + " is now " + (Switches.isActive(args[0]) ? "ON" : "OFF")); } + } else if (line.indexOf("/reload-partitions") === 0) { + sv.reloadPartitionMap(); } } diff --git a/src/io/cluster/partitionclusterclient.js b/src/io/cluster/partitionclusterclient.js index 9d5324ef..13d29ff2 100644 --- a/src/io/cluster/partitionclusterclient.js +++ b/src/io/cluster/partitionclusterclient.js @@ -6,9 +6,8 @@ class PartitionClusterClient { } getSocketConfig(channel) { - return Promise.resolve({ - servers: this.partitionDecider.getPartitionForChannel(channel) - }); + return Promise.resolve( + this.partitionDecider.getPartitionForChannel(channel)); } } diff --git a/src/partition/partitiondecider.js b/src/partition/partitiondecider.js index 425d6207..b9677d95 100644 --- a/src/partition/partitiondecider.js +++ b/src/partition/partitiondecider.js @@ -2,27 +2,28 @@ import { murmurHash1 } from '../util/murmur'; class PartitionDecider { constructor(config) { - this.identity = config.getIdentity(); - this.partitionMap = config.getPartitionMap(); - this.pool = config.getPool(); - this.overrideMap = config.getOverrideMap(); + this.config = config; } getPartitionForChannel(channel) { - return this.partitionMap[this.getPartitionIdentityForChannel(channel)]; + const partitionMap = this.config.getPartitionMap(); + return partitionMap[this.getPartitionIdentityForChannel(channel)]; } getPartitionIdentityForChannel(channel) { - if (this.overrideMap.hasOwnProperty(channel)) { - return this.overrideMap[channel]; + const overrideMap = this.config.getOverrideMap(); + if (overrideMap.hasOwnProperty(channel)) { + return overrideMap[channel]; } else { - const i = murmurHash1(channel) % this.pool.length; - return this.pool[i]; + const pool = this.config.getPool(); + const i = murmurHash1(channel) % pool.length; + return pool[i]; } } isChannelOnThisPartition(channel) { - return this.getPartitionIdentityForChannel(channel) === this.identity; + return this.getPartitionIdentityForChannel(channel) === + this.config.getIdentity(); } } diff --git a/src/server.js b/src/server.js index 95f5d3bd..97e57d52 100644 --- a/src/server.js +++ b/src/server.js @@ -68,12 +68,12 @@ var Server = function () { Switches.setActive(Switches.DUAL_BACKEND, true); } const BackendModule = require('./backend/backendmodule').BackendModule; - initModule = new BackendModule(); + initModule = this.initModule = new BackendModule(); } else if (Config.get('enable-partition')) { - initModule = new PartitionModule(); + initModule = this.initModule = new PartitionModule(); self.partitionDecider = initModule.getPartitionDecider(); } else { - initModule = new LegacyModule(); + initModule = this.initModule = new LegacyModule(); } // database init ------------------------------------------------------ @@ -302,3 +302,43 @@ Server.prototype.shutdown = function () { process.exit(1); }); }; + +Server.prototype.reloadPartitionMap = function () { + if (!Config.get("enable-partition")) { + return; + } + + var config; + try { + config = this.initModule.loadPartitionMap(); + } catch (error) { + return; + } + + this.initModule.partitionConfig.config = config.config; + + const channels = Array.prototype.slice.call(this.channels); + Promise.reduce(channels, (_, channel) => { + if (channel.dead) { + return; + } + + if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) { + Logger.syslog.log("Partition changed for " + channel.uniqueName); + return channel.saveState().then(() => { + channel.broadcastAll("partitionChange", + this.partitionDecider.getPartitionForChannel(channel.uniqueName)); + const users = Array.prototype.slice.call(channel.users); + users.forEach(u => { + try { + u.socket.disconnect(); + } catch (error) { + } + }); + this.unloadChannel(channel); + }); + } + }, 0).then(() => { + Logger.syslog.log("Partition reload complete"); + }); +}; diff --git a/www/js/callbacks.js b/www/js/callbacks.js index eb716e41..0794b40c 100644 --- a/www/js/callbacks.js +++ b/www/js/callbacks.js @@ -1030,6 +1030,12 @@ Callbacks = { "unneeded playlist items, filters, and/or emotes. Changes to the channel " + "will not be saved until the size is reduced to under the limit.") .attr("id", "chandumptoobig"); + }, + + partitionChange: function (socketConfig) { + window.socket.disconnect(); + ioServerConnect(socketConfig); + setupCallbacks(); } } @@ -1065,6 +1071,50 @@ setupCallbacks = function() { }); }; +function ioServerConnect(socketConfig) { + if (socketConfig.error) { + makeAlert("Error", "Socket.io configuration returned error: " + + socketConfig.error, "alert-danger") + .appendTo($("#announcements")); + return; + } + + var servers; + if (socketConfig.alt && socketConfig.alt.length > 0 && + localStorage.useAltServer === "true") { + servers = socketConfig.alt; + console.log("Using alt servers: " + JSON.stringify(servers)); + } else { + servers = socketConfig.servers; + } + + var chosenServer = null; + servers.forEach(function (server) { + if (chosenServer === null) { + chosenServer = server; + } else if (server.secure && !chosenServer.secure) { + chosenServer = server; + } else if (!server.ipv6Only && chosenServer.ipv6Only) { + chosenServer = server; + } + }); + + console.log("Connecting to " + JSON.stringify(chosenServer)); + + if (chosenServer === null) { + makeAlert("Error", + "Socket.io configuration was unable to find a suitable server", + "alert-danger") + .appendTo($("#announcements")); + } + + var opts = { + secure: chosenServer.secure + }; + + window.socket = io(chosenServer.url, opts); +} + (function () { if (typeof io === "undefined") { var script = document.getElementById("socketio-js"); @@ -1084,47 +1134,7 @@ setupCallbacks = function() { $.getJSON("/socketconfig/" + CHANNEL.name + ".json") .done(function (socketConfig) { - if (socketConfig.error) { - makeAlert("Error", "Socket.io configuration returned error: " + - socketConfig.error, "alert-danger") - .appendTo($("#announcements")); - return; - } - - var servers; - if (socketConfig.alt && socketConfig.alt.length > 0 && - localStorage.useAltServer === "true") { - servers = socketConfig.alt; - console.log("Using alt servers: " + JSON.stringify(servers)); - } else { - servers = socketConfig.servers; - } - - var chosenServer = null; - servers.forEach(function (server) { - if (chosenServer === null) { - chosenServer = server; - } else if (server.secure && !chosenServer.secure) { - chosenServer = server; - } else if (!server.ipv6Only && chosenServer.ipv6Only) { - chosenServer = server; - } - }); - - console.log("Connecting to " + JSON.stringify(chosenServer)); - - if (chosenServer === null) { - makeAlert("Error", - "Socket.io configuration was unable to find a suitable server", - "alert-danger") - .appendTo($("#announcements")); - } - - var opts = { - secure: chosenServer.secure - }; - - socket = io(chosenServer.url, opts); + ioServerConnect(socketConfig); setupCallbacks(); }).fail(function () { makeAlert("Error", "Failed to retrieve socket.io configuration",