From d2cce4f166dd7a3b6bd5dab3f396e7cb9e72c561 Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Sat, 15 Oct 2016 12:36:20 -0700 Subject: [PATCH 1/4] Work on auto reloading partition map from redis --- package.json | 1 + src/partition/partitionconfig.js | 12 ---- src/partition/partitiondecider.js | 18 ++++-- src/partition/partitionmap.js | 67 ++++++++++++++++++++++ src/partition/partitionmodule.js | 24 +++++++- src/partition/redispartitionmapreloader.js | 54 +++++++++++++++++ src/server.js | 23 +++----- 7 files changed, 164 insertions(+), 35 deletions(-) create mode 100644 src/partition/partitionmap.js create mode 100644 src/partition/redispartitionmapreloader.js diff --git a/package.json b/package.json index 9cbf6187..ff1f2026 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "socket.io-redis": "^1.0.0", "source-map-support": "^0.4.0", "status-message-polyfill": "git://github.com/calzoneman/status-message-polyfill", + "toml": "^2.3.0", "uuid": "^2.0.1", "yamljs": "^0.1.6" }, diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index b9ec4cdd..b3ccd46d 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -3,18 +3,6 @@ class PartitionConfig { this.config = config; } - getPartitionMap() { - return this.config.partitions; - } - - getOverrideMap() { - return this.config.overrides; - } - - getPool() { - return this.config.pool; - } - getIdentity() { return this.config.identity; } diff --git a/src/partition/partitiondecider.js b/src/partition/partitiondecider.js index 0f6226cc..a93668f3 100644 --- a/src/partition/partitiondecider.js +++ b/src/partition/partitiondecider.js @@ -1,24 +1,26 @@ import { murmurHash1 } from '../util/murmur'; class PartitionDecider { - constructor(config) { + constructor(config, partitionMap) { this.config = config; + this.partitionMap = partitionMap; } getPartitionForChannel(channel) { - const partitionMap = this.config.getPartitionMap(); - return partitionMap[this.getPartitionIdentityForChannel(channel)]; + return this.partitionMap.getPartitions()[this.getPartitionIdentityForChannel(channel)]; } getPartitionIdentityForChannel(channel) { channel = channel.toLowerCase(); - const overrideMap = this.config.getOverrideMap(); + const overrideMap = this.partitionMap.getOverrides(); if (overrideMap.hasOwnProperty(channel)) { return overrideMap[channel]; - } else { - const pool = this.config.getPool(); + } else if (this.partitionMap.getPool().length > 0) { + const pool = this.partitionMap.getPool(); const i = murmurHash1(channel) % pool.length; return pool[i]; + } else { + return { servers: [] }; } } @@ -26,6 +28,10 @@ class PartitionDecider { return this.getPartitionIdentityForChannel(channel) === this.config.getIdentity(); } + + setPartitionMap(newMap) { + this.partitionMap = newMap; + } } export { PartitionDecider }; diff --git a/src/partition/partitionmap.js b/src/partition/partitionmap.js new file mode 100644 index 00000000..2d7a3c96 --- /dev/null +++ b/src/partition/partitionmap.js @@ -0,0 +1,67 @@ +import crypto from 'crypto'; +import fs from 'fs'; +import toml from 'toml'; + +function sha256(input) { + var hash = crypto.createHash('sha256'); + hash.update(input); + return hash.digest('base64'); +} + +class PartitionMap { + /** + * @param {Map} partitions Map of node ids to io configs + * @param {Array} pool List of available nodes + * @param {Map} overrides Overrides for node assignment + */ + constructor(partitions, pool, overrides) { + this.partitions = partitions; + this.pool = pool; + this.overrides = overrides || {}; + this._hash = sha256(JSON.stringify(this.partitions) + + JSON.stringify(this.pool) + + JSON.stringify(this.overrides)); + } + + getHash() { + return this._hash; + } + + getPartitions() { + return this.partitions; + } + + getPool() { + return this.pool; + } + + getOverrides() { + return this.overrides; + } + + toJSON() { + return { + partitions: this.partitions, + pool: this.pool, + overrides: this.overrides, + hash: this._hash + }; + } + + static fromJSON(json) { + return new PartitionMap(json.partitions, json.pool, json.overrides); + } + + static fromFile(filename) { + const rawData = fs.readFileSync(filename).toString('utf8'); + const parsed = toml.parse(rawData); + + return PartitionMap.fromJSON(parsed); + } + + static empty() { + return new PartitionMap({}, [], {}); + } +} + +export { PartitionMap }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index 6c3b61cb..42639b53 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -7,6 +7,7 @@ import logger from 'cytube-common/lib/logger'; import LegacyConfig from '../config'; import path from 'path'; import { AnnouncementRefresher } from './announcementrefresher'; +import { RedisPartitionMapReloader } from './redispartitionmapreloader'; const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', 'partitions.toml'); @@ -23,13 +24,13 @@ class PartitionModule { initConfig() { logger.initialize(null, null, LegacyConfig.get('debug')); try { - this.partitionConfig = this.loadPartitionMap(); + this.partitionConfig = this.loadPartitionConfig(); } catch (error) { process.exit(1); } } - loadPartitionMap() { + loadPartitionConfig() { try { return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH); } catch (error) { @@ -44,9 +45,26 @@ class PartitionModule { } } + getPartitionMapReloader() { + if (!this.partitionMapReloader) { + const redisProvider = this.getRedisClientProvider(); + this.partitionMapReloader = new RedisPartitionMapReloader( + redisProvider.get(), // Client for GET partitionMap + redisProvider.get()); // Subscribe client + } + + return this.partitionMapReloader; + } + getPartitionDecider() { if (!this.partitionDecider) { - this.partitionDecider = new PartitionDecider(this.partitionConfig); + const reloader = this.getPartitionMapReloader(); + this.partitionDecider = new PartitionDecider(this.partitionConfig, + reloader.getPartitionMap()); + reloader.on('partitionMapChange', newMap => { + this.partitionDecider.setPartitionMap(newMap); + require('../server').getServer().handlePartitionMapChange(); + }); } return this.partitionDecider; diff --git a/src/partition/redispartitionmapreloader.js b/src/partition/redispartitionmapreloader.js new file mode 100644 index 00000000..114a94ba --- /dev/null +++ b/src/partition/redispartitionmapreloader.js @@ -0,0 +1,54 @@ +import { PartitionMap } from './partitionmap'; +import logger from 'cytube-common/lib/logger'; +import { EventEmitter } from 'events'; + +class RedisPartitionMapReloader extends EventEmitter { + constructor(redisClient, subClient) { + super(); + this.redisClient = redisClient; + this.subClient = subClient; + this.partitionMap = PartitionMap.empty(); + redisClient.once('ready', () => this.reload()); + subClient.once('ready', () => this.subscribe()); + } + + subscribe() { + this.subClient.subscribe('partitionMap'); + this.subClient.on('message', (channel, message) => { + if (channel !== 'partitionMap') { + logger.warn('RedisPartitionMapReloader received unexpected message ' + + `on redis channel ${channel}`); + return; + } + + this.reload(); + }); + } + + reload() { + this.redisClient.getAsync('partitionMap').then(result => { + var newMap = null; + try { + newMap = PartitionMap.fromJSON(JSON.parse(result)); + } catch (error) { + logger.error(`Failed to decode received partition map: ${error}`, + { payload: result }); + return; + } + + if (this.partitionMap.getHash() !== newMap.getHash()) { + logger.info(`Partition map changed (hash=${newMap.getHash()})`); + this.partitionMap = newMap; + this.emit('partitionMapChange', newMap); + } + }).catch(error => { + logger.error(`Failed to retrieve partition map from redis: ${error}`); + }); + } + + getPartitionMap() { + return this.partitionMap; + } +} + +export { RedisPartitionMapReloader }; diff --git a/src/server.js b/src/server.js index c5d629ae..e00cff90 100644 --- a/src/server.js +++ b/src/server.js @@ -333,20 +333,7 @@ Server.prototype.shutdown = function () { }); }; -Server.prototype.reloadPartitionMap = function () { - if (!Config.get("enable-partition")) { - return; - } - - var config; - try { - config = this.initModule.loadPartitionMap(); - } catch (error) { - return; - } - - this.initModule.partitionConfig.config = config.config; - +Server.prototype.handlePartitionMapChange = function () { const channels = Array.prototype.slice.call(this.channels); Promise.map(channels, channel => { if (channel.dead) { @@ -375,3 +362,11 @@ Server.prototype.reloadPartitionMap = function () { Logger.syslog.log("Partition reload complete"); }); }; + +Server.prototype.reloadPartitionMap = function () { + if (!Config.get("enable-partitions")) { + return; + } + + this.initModule.getPartitionMapReloader().reload(); +}; From 7117cd0a5e68abb5ae7c18c0d8c4e8f86284717c Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Sat, 15 Oct 2016 16:09:27 -0700 Subject: [PATCH 2/4] Fix typo --- src/server.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.js b/src/server.js index e00cff90..99e67059 100644 --- a/src/server.js +++ b/src/server.js @@ -364,7 +364,7 @@ Server.prototype.handlePartitionMapChange = function () { }; Server.prototype.reloadPartitionMap = function () { - if (!Config.get("enable-partitions")) { + if (!Config.get("enable-partition")) { return; } From 654d57b53e0f8720e035644678a4cb07ec26fba6 Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Sun, 16 Oct 2016 16:58:28 -0700 Subject: [PATCH 3/4] Add CLI for loading/saving partition map --- src/partition/partitioncli.js | 82 ++++++++++++++++++++++++++++++++ src/partition/partitionmodule.js | 5 +- 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 src/partition/partitioncli.js diff --git a/src/partition/partitioncli.js b/src/partition/partitioncli.js new file mode 100644 index 00000000..d7d2e0ca --- /dev/null +++ b/src/partition/partitioncli.js @@ -0,0 +1,82 @@ +import { PartitionModule } from './partitionmodule'; +import { PartitionMap } from './partitionmap'; +import fs from 'fs'; + +const partitionModule = new PartitionModule(); +partitionModule.cliMode = true; + +function savePartitionMap(filename) { + const reloader = partitionModule.getPartitionMapReloader(); + reloader.once('partitionMapChange', map => { + var toml = 'pool = [\n'; + map.getPool().forEach((poolEntry, i) => { + toml += ` '${poolEntry}'`; + if (i < map.getPool().length - 1) { + toml += ','; + } + + toml += '\n'; + }); + toml += ']\n\n'; + + const partitions = map.getPartitions(); + Object.keys(partitions).forEach(identity => { + partitions[identity].servers.forEach(serverDef => { + toml += `[[partitions.${identity}.servers]]\n`; + toml += `url = '${serverDef.url}'\n`; + toml += `secure = ${serverDef.secure}\n`; + toml += '\n'; + }); + }); + + toml += '[overrides]\n'; + const overrides = map.getOverrides(); + Object.keys(overrides).forEach(channel => { + toml += `${channel} = '${overrides[channel]}'\n`; + }); + + fs.writeFileSync(filename, toml); + console.log(`Wrote partition map to ${filename}`); + process.exit(0); + }); +} + +function loadPartitionMap(filename) { + var newMap; + + try { + newMap = PartitionMap.fromFile(filename); + } catch (error) { + console.error(`Failed to load partition map from ${filename}: ${error}`); + console.error(error.stack); + process.exit(1); + } + + const client = partitionModule.getRedisClientProvider().get(); + client.once('ready', () => { + client.multi() + .set('partitionMap', JSON.stringify(newMap)) + .publish('partitionMap', new Date().toISOString()) + .execAsync() + .then(result => { + console.log(`Result: ${result}`); + console.log(`Published new partition map from ${filename}`); + process.exit(0); + }).catch(error => { + console.error(`Failed to publish partition map: ${error}`); + console.error(error.stack); + process.exit(1); + }); + }); +} + +if (process.argv[2] === 'save') { + savePartitionMap(process.argv[3]); +} else if (process.argv[2] === 'load') { + loadPartitionMap(process.argv[3]); +} else { + console.error('Usage: ' + process.argv[0] + ' ' + process.argv[1] + ' '); + console.error(' "save" downloads the partition map and saves it to the specified file'); + console.error(' "load" loads the partition map from the specified file and publishes it'); + process.exit(1); +} diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index 42639b53..d698b012 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -15,6 +15,7 @@ const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', class PartitionModule { constructor() { this.initConfig(); + this.cliMode = false; } onReady() { @@ -63,7 +64,9 @@ class PartitionModule { reloader.getPartitionMap()); reloader.on('partitionMapChange', newMap => { this.partitionDecider.setPartitionMap(newMap); - require('../server').getServer().handlePartitionMapChange(); + if (!this.cliMode) { + require('../server').getServer().handlePartitionMapChange(); + } }); } From d159a16aca06f1afc37228db7e98de69eb91c070 Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Tue, 18 Oct 2016 23:13:25 -0700 Subject: [PATCH 4/4] Add configuration for redis key --- src/partition/partitioncli.js | 5 +++-- src/partition/partitionconfig.js | 8 ++++++++ src/partition/partitionmap.js | 16 ++++++++++++++++ src/partition/partitionmodule.js | 1 + src/partition/redispartitionmapreloader.js | 10 ++++++---- 5 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/partition/partitioncli.js b/src/partition/partitioncli.js index d7d2e0ca..8a97e18b 100644 --- a/src/partition/partitioncli.js +++ b/src/partition/partitioncli.js @@ -53,10 +53,11 @@ function loadPartitionMap(filename) { } const client = partitionModule.getRedisClientProvider().get(); + const config = partitionModule.partitionConfig; client.once('ready', () => { client.multi() - .set('partitionMap', JSON.stringify(newMap)) - .publish('partitionMap', new Date().toISOString()) + .set(config.getPartitionMapKey(), JSON.stringify(newMap)) + .publish(config.getPublishChannel(), new Date().toISOString()) .execAsync() .then(result => { console.log(`Result: ${result}`); diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index b3ccd46d..e2e3523b 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -10,6 +10,14 @@ class PartitionConfig { getRedisConfig() { return this.config.redis; } + + getPublishChannel() { + return this.config.redis.publishChannel; + } + + getPartitionMapKey() { + return this.config.redis.partitionMapKey; + } } export { PartitionConfig }; diff --git a/src/partition/partitionmap.js b/src/partition/partitionmap.js index 2d7a3c96..d3a96330 100644 --- a/src/partition/partitionmap.js +++ b/src/partition/partitionmap.js @@ -49,6 +49,22 @@ class PartitionMap { } static fromJSON(json) { + if (json === null) { + throw new Error('Cannot construct PartitionMap: input is null'); + } else if (typeof json !== 'object') { + throw new Error(`Cannot construct PartitionMap from input "${json}" of type ` + + typeof json); + } else if (!json.partitions || typeof json.partitions !== 'object') { + throw new Error('Cannot construct PartitionMap: field partitions must be ' + + `an object but was "${json.partitions}"`); + } else if (!json.overrides || typeof json.overrides !== 'object') { + throw new Error('Cannot construct PartitionMap: field overrides must be ' + + `an object but was "${json.overrides}"`); + } else if (!json.pool || !Array.isArray(json.pool)) { + throw new Error('Cannot construct PartitionMap: field pool must be ' + + `an array but was "${json.pool}"`); + } + return new PartitionMap(json.partitions, json.pool, json.overrides); } diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index d698b012..52a74bab 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -50,6 +50,7 @@ class PartitionModule { if (!this.partitionMapReloader) { const redisProvider = this.getRedisClientProvider(); this.partitionMapReloader = new RedisPartitionMapReloader( + this.partitionConfig, redisProvider.get(), // Client for GET partitionMap redisProvider.get()); // Subscribe client } diff --git a/src/partition/redispartitionmapreloader.js b/src/partition/redispartitionmapreloader.js index 114a94ba..a4eddc94 100644 --- a/src/partition/redispartitionmapreloader.js +++ b/src/partition/redispartitionmapreloader.js @@ -3,8 +3,9 @@ import logger from 'cytube-common/lib/logger'; import { EventEmitter } from 'events'; class RedisPartitionMapReloader extends EventEmitter { - constructor(redisClient, subClient) { + constructor(config, redisClient, subClient) { super(); + this.config = config; this.redisClient = redisClient; this.subClient = subClient; this.partitionMap = PartitionMap.empty(); @@ -13,20 +14,21 @@ class RedisPartitionMapReloader extends EventEmitter { } subscribe() { - this.subClient.subscribe('partitionMap'); + this.subClient.subscribe(this.config.getPublishChannel()); this.subClient.on('message', (channel, message) => { - if (channel !== 'partitionMap') { + if (channel !== this.config.getPublishChannel()) { logger.warn('RedisPartitionMapReloader received unexpected message ' + `on redis channel ${channel}`); return; } + logger.info(`Received partition map update message published at ${message}`); this.reload(); }); } reload() { - this.redisClient.getAsync('partitionMap').then(result => { + this.redisClient.getAsync(this.config.getPartitionMapKey()).then(result => { var newMap = null; try { newMap = PartitionMap.fromJSON(JSON.parse(result));