Add partition map reload
This commit is contained in:
parent
7faf2829b2
commit
6e772c6837
2
index.js
2
index.js
|
@ -73,5 +73,7 @@ function handleLine(line) {
|
|||
Logger.syslog.log("Switch " + args[0] + " is now " +
|
||||
(Switches.isActive(args[0]) ? "ON" : "OFF"));
|
||||
}
|
||||
} else if (line.indexOf("/reload-partitions") === 0) {
|
||||
sv.reloadPartitionMap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,9 +6,8 @@ class PartitionClusterClient {
|
|||
}
|
||||
|
||||
getSocketConfig(channel) {
|
||||
return Promise.resolve({
|
||||
servers: this.partitionDecider.getPartitionForChannel(channel)
|
||||
});
|
||||
return Promise.resolve(
|
||||
this.partitionDecider.getPartitionForChannel(channel));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,27 +2,28 @@ import { murmurHash1 } from '../util/murmur';
|
|||
|
||||
class PartitionDecider {
|
||||
constructor(config) {
|
||||
this.identity = config.getIdentity();
|
||||
this.partitionMap = config.getPartitionMap();
|
||||
this.pool = config.getPool();
|
||||
this.overrideMap = config.getOverrideMap();
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
getPartitionForChannel(channel) {
|
||||
return this.partitionMap[this.getPartitionIdentityForChannel(channel)];
|
||||
const partitionMap = this.config.getPartitionMap();
|
||||
return partitionMap[this.getPartitionIdentityForChannel(channel)];
|
||||
}
|
||||
|
||||
getPartitionIdentityForChannel(channel) {
|
||||
if (this.overrideMap.hasOwnProperty(channel)) {
|
||||
return this.overrideMap[channel];
|
||||
const overrideMap = this.config.getOverrideMap();
|
||||
if (overrideMap.hasOwnProperty(channel)) {
|
||||
return overrideMap[channel];
|
||||
} else {
|
||||
const i = murmurHash1(channel) % this.pool.length;
|
||||
return this.pool[i];
|
||||
const pool = this.config.getPool();
|
||||
const i = murmurHash1(channel) % pool.length;
|
||||
return pool[i];
|
||||
}
|
||||
}
|
||||
|
||||
isChannelOnThisPartition(channel) {
|
||||
return this.getPartitionIdentityForChannel(channel) === this.identity;
|
||||
return this.getPartitionIdentityForChannel(channel) ===
|
||||
this.config.getIdentity();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -68,12 +68,12 @@ var Server = function () {
|
|||
Switches.setActive(Switches.DUAL_BACKEND, true);
|
||||
}
|
||||
const BackendModule = require('./backend/backendmodule').BackendModule;
|
||||
initModule = new BackendModule();
|
||||
initModule = this.initModule = new BackendModule();
|
||||
} else if (Config.get('enable-partition')) {
|
||||
initModule = new PartitionModule();
|
||||
initModule = this.initModule = new PartitionModule();
|
||||
self.partitionDecider = initModule.getPartitionDecider();
|
||||
} else {
|
||||
initModule = new LegacyModule();
|
||||
initModule = this.initModule = new LegacyModule();
|
||||
}
|
||||
|
||||
// database init ------------------------------------------------------
|
||||
|
@ -302,3 +302,43 @@ Server.prototype.shutdown = function () {
|
|||
process.exit(1);
|
||||
});
|
||||
};
|
||||
|
||||
Server.prototype.reloadPartitionMap = function () {
|
||||
if (!Config.get("enable-partition")) {
|
||||
return;
|
||||
}
|
||||
|
||||
var config;
|
||||
try {
|
||||
config = this.initModule.loadPartitionMap();
|
||||
} catch (error) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.initModule.partitionConfig.config = config.config;
|
||||
|
||||
const channels = Array.prototype.slice.call(this.channels);
|
||||
Promise.reduce(channels, (_, channel) => {
|
||||
if (channel.dead) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) {
|
||||
Logger.syslog.log("Partition changed for " + channel.uniqueName);
|
||||
return channel.saveState().then(() => {
|
||||
channel.broadcastAll("partitionChange",
|
||||
this.partitionDecider.getPartitionForChannel(channel.uniqueName));
|
||||
const users = Array.prototype.slice.call(channel.users);
|
||||
users.forEach(u => {
|
||||
try {
|
||||
u.socket.disconnect();
|
||||
} catch (error) {
|
||||
}
|
||||
});
|
||||
this.unloadChannel(channel);
|
||||
});
|
||||
}
|
||||
}, 0).then(() => {
|
||||
Logger.syslog.log("Partition reload complete");
|
||||
});
|
||||
};
|
||||
|
|
|
@ -1030,6 +1030,12 @@ Callbacks = {
|
|||
"unneeded playlist items, filters, and/or emotes. Changes to the channel " +
|
||||
"will not be saved until the size is reduced to under the limit.")
|
||||
.attr("id", "chandumptoobig");
|
||||
},
|
||||
|
||||
partitionChange: function (socketConfig) {
|
||||
window.socket.disconnect();
|
||||
ioServerConnect(socketConfig);
|
||||
setupCallbacks();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1065,6 +1071,50 @@ setupCallbacks = function() {
|
|||
});
|
||||
};
|
||||
|
||||
function ioServerConnect(socketConfig) {
|
||||
if (socketConfig.error) {
|
||||
makeAlert("Error", "Socket.io configuration returned error: " +
|
||||
socketConfig.error, "alert-danger")
|
||||
.appendTo($("#announcements"));
|
||||
return;
|
||||
}
|
||||
|
||||
var servers;
|
||||
if (socketConfig.alt && socketConfig.alt.length > 0 &&
|
||||
localStorage.useAltServer === "true") {
|
||||
servers = socketConfig.alt;
|
||||
console.log("Using alt servers: " + JSON.stringify(servers));
|
||||
} else {
|
||||
servers = socketConfig.servers;
|
||||
}
|
||||
|
||||
var chosenServer = null;
|
||||
servers.forEach(function (server) {
|
||||
if (chosenServer === null) {
|
||||
chosenServer = server;
|
||||
} else if (server.secure && !chosenServer.secure) {
|
||||
chosenServer = server;
|
||||
} else if (!server.ipv6Only && chosenServer.ipv6Only) {
|
||||
chosenServer = server;
|
||||
}
|
||||
});
|
||||
|
||||
console.log("Connecting to " + JSON.stringify(chosenServer));
|
||||
|
||||
if (chosenServer === null) {
|
||||
makeAlert("Error",
|
||||
"Socket.io configuration was unable to find a suitable server",
|
||||
"alert-danger")
|
||||
.appendTo($("#announcements"));
|
||||
}
|
||||
|
||||
var opts = {
|
||||
secure: chosenServer.secure
|
||||
};
|
||||
|
||||
window.socket = io(chosenServer.url, opts);
|
||||
}
|
||||
|
||||
(function () {
|
||||
if (typeof io === "undefined") {
|
||||
var script = document.getElementById("socketio-js");
|
||||
|
@ -1084,47 +1134,7 @@ setupCallbacks = function() {
|
|||
|
||||
$.getJSON("/socketconfig/" + CHANNEL.name + ".json")
|
||||
.done(function (socketConfig) {
|
||||
if (socketConfig.error) {
|
||||
makeAlert("Error", "Socket.io configuration returned error: " +
|
||||
socketConfig.error, "alert-danger")
|
||||
.appendTo($("#announcements"));
|
||||
return;
|
||||
}
|
||||
|
||||
var servers;
|
||||
if (socketConfig.alt && socketConfig.alt.length > 0 &&
|
||||
localStorage.useAltServer === "true") {
|
||||
servers = socketConfig.alt;
|
||||
console.log("Using alt servers: " + JSON.stringify(servers));
|
||||
} else {
|
||||
servers = socketConfig.servers;
|
||||
}
|
||||
|
||||
var chosenServer = null;
|
||||
servers.forEach(function (server) {
|
||||
if (chosenServer === null) {
|
||||
chosenServer = server;
|
||||
} else if (server.secure && !chosenServer.secure) {
|
||||
chosenServer = server;
|
||||
} else if (!server.ipv6Only && chosenServer.ipv6Only) {
|
||||
chosenServer = server;
|
||||
}
|
||||
});
|
||||
|
||||
console.log("Connecting to " + JSON.stringify(chosenServer));
|
||||
|
||||
if (chosenServer === null) {
|
||||
makeAlert("Error",
|
||||
"Socket.io configuration was unable to find a suitable server",
|
||||
"alert-danger")
|
||||
.appendTo($("#announcements"));
|
||||
}
|
||||
|
||||
var opts = {
|
||||
secure: chosenServer.secure
|
||||
};
|
||||
|
||||
socket = io(chosenServer.url, opts);
|
||||
ioServerConnect(socketConfig);
|
||||
setupCallbacks();
|
||||
}).fail(function () {
|
||||
makeAlert("Error", "Failed to retrieve socket.io configuration",
|
||||
|
|
Loading…
Reference in a new issue