Merge pull request #632 from calzoneman/partition-refactor

Refactor partitioning a bit
This commit is contained in:
Calvin Montgomery 2016-10-25 20:21:34 -07:00 committed by GitHub
commit bfad626b2d
8 changed files with 277 additions and 35 deletions

View file

@ -44,6 +44,7 @@
"socket.io-redis": "^1.0.0", "socket.io-redis": "^1.0.0",
"source-map-support": "^0.4.0", "source-map-support": "^0.4.0",
"status-message-polyfill": "git://github.com/calzoneman/status-message-polyfill", "status-message-polyfill": "git://github.com/calzoneman/status-message-polyfill",
"toml": "^2.3.0",
"uuid": "^2.0.1", "uuid": "^2.0.1",
"yamljs": "^0.1.6" "yamljs": "^0.1.6"
}, },

View file

@ -0,0 +1,83 @@
import { PartitionModule } from './partitionmodule';
import { PartitionMap } from './partitionmap';
import fs from 'fs';
const partitionModule = new PartitionModule();
partitionModule.cliMode = true;
function savePartitionMap(filename) {
const reloader = partitionModule.getPartitionMapReloader();
reloader.once('partitionMapChange', map => {
var toml = 'pool = [\n';
map.getPool().forEach((poolEntry, i) => {
toml += ` '${poolEntry}'`;
if (i < map.getPool().length - 1) {
toml += ',';
}
toml += '\n';
});
toml += ']\n\n';
const partitions = map.getPartitions();
Object.keys(partitions).forEach(identity => {
partitions[identity].servers.forEach(serverDef => {
toml += `[[partitions.${identity}.servers]]\n`;
toml += `url = '${serverDef.url}'\n`;
toml += `secure = ${serverDef.secure}\n`;
toml += '\n';
});
});
toml += '[overrides]\n';
const overrides = map.getOverrides();
Object.keys(overrides).forEach(channel => {
toml += `${channel} = '${overrides[channel]}'\n`;
});
fs.writeFileSync(filename, toml);
console.log(`Wrote partition map to ${filename}`);
process.exit(0);
});
}
function loadPartitionMap(filename) {
var newMap;
try {
newMap = PartitionMap.fromFile(filename);
} catch (error) {
console.error(`Failed to load partition map from ${filename}: ${error}`);
console.error(error.stack);
process.exit(1);
}
const client = partitionModule.getRedisClientProvider().get();
const config = partitionModule.partitionConfig;
client.once('ready', () => {
client.multi()
.set(config.getPartitionMapKey(), JSON.stringify(newMap))
.publish(config.getPublishChannel(), new Date().toISOString())
.execAsync()
.then(result => {
console.log(`Result: ${result}`);
console.log(`Published new partition map from ${filename}`);
process.exit(0);
}).catch(error => {
console.error(`Failed to publish partition map: ${error}`);
console.error(error.stack);
process.exit(1);
});
});
}
if (process.argv[2] === 'save') {
savePartitionMap(process.argv[3]);
} else if (process.argv[2] === 'load') {
loadPartitionMap(process.argv[3]);
} else {
console.error('Usage: ' + process.argv[0] + ' ' + process.argv[1] + ' <load|save> <filename>');
console.error(' "save" downloads the partition map and saves it to the specified file');
console.error(' "load" loads the partition map from the specified file and publishes it');
process.exit(1);
}

View file

@ -3,18 +3,6 @@ class PartitionConfig {
this.config = config; this.config = config;
} }
getPartitionMap() {
return this.config.partitions;
}
getOverrideMap() {
return this.config.overrides;
}
getPool() {
return this.config.pool;
}
getIdentity() { getIdentity() {
return this.config.identity; return this.config.identity;
} }
@ -22,6 +10,14 @@ class PartitionConfig {
getRedisConfig() { getRedisConfig() {
return this.config.redis; return this.config.redis;
} }
getPublishChannel() {
return this.config.redis.publishChannel;
}
getPartitionMapKey() {
return this.config.redis.partitionMapKey;
}
} }
export { PartitionConfig }; export { PartitionConfig };

View file

@ -1,24 +1,26 @@
import { murmurHash1 } from '../util/murmur'; import { murmurHash1 } from '../util/murmur';
class PartitionDecider { class PartitionDecider {
constructor(config) { constructor(config, partitionMap) {
this.config = config; this.config = config;
this.partitionMap = partitionMap;
} }
getPartitionForChannel(channel) { getPartitionForChannel(channel) {
const partitionMap = this.config.getPartitionMap(); return this.partitionMap.getPartitions()[this.getPartitionIdentityForChannel(channel)];
return partitionMap[this.getPartitionIdentityForChannel(channel)];
} }
getPartitionIdentityForChannel(channel) { getPartitionIdentityForChannel(channel) {
channel = channel.toLowerCase(); channel = channel.toLowerCase();
const overrideMap = this.config.getOverrideMap(); const overrideMap = this.partitionMap.getOverrides();
if (overrideMap.hasOwnProperty(channel)) { if (overrideMap.hasOwnProperty(channel)) {
return overrideMap[channel]; return overrideMap[channel];
} else { } else if (this.partitionMap.getPool().length > 0) {
const pool = this.config.getPool(); const pool = this.partitionMap.getPool();
const i = murmurHash1(channel) % pool.length; const i = murmurHash1(channel) % pool.length;
return pool[i]; return pool[i];
} else {
return { servers: [] };
} }
} }
@ -26,6 +28,10 @@ class PartitionDecider {
return this.getPartitionIdentityForChannel(channel) === return this.getPartitionIdentityForChannel(channel) ===
this.config.getIdentity(); this.config.getIdentity();
} }
setPartitionMap(newMap) {
this.partitionMap = newMap;
}
} }
export { PartitionDecider }; export { PartitionDecider };

View file

@ -0,0 +1,83 @@
import crypto from 'crypto';
import fs from 'fs';
import toml from 'toml';
function sha256(input) {
var hash = crypto.createHash('sha256');
hash.update(input);
return hash.digest('base64');
}
class PartitionMap {
/**
* @param {Map<string, object>} partitions Map of node ids to io configs
* @param {Array<string>} pool List of available nodes
* @param {Map<string, string>} overrides Overrides for node assignment
*/
constructor(partitions, pool, overrides) {
this.partitions = partitions;
this.pool = pool;
this.overrides = overrides || {};
this._hash = sha256(JSON.stringify(this.partitions)
+ JSON.stringify(this.pool)
+ JSON.stringify(this.overrides));
}
getHash() {
return this._hash;
}
getPartitions() {
return this.partitions;
}
getPool() {
return this.pool;
}
getOverrides() {
return this.overrides;
}
toJSON() {
return {
partitions: this.partitions,
pool: this.pool,
overrides: this.overrides,
hash: this._hash
};
}
static fromJSON(json) {
if (json === null) {
throw new Error('Cannot construct PartitionMap: input is null');
} else if (typeof json !== 'object') {
throw new Error(`Cannot construct PartitionMap from input "${json}" of type `
+ typeof json);
} else if (!json.partitions || typeof json.partitions !== 'object') {
throw new Error('Cannot construct PartitionMap: field partitions must be '
+ `an object but was "${json.partitions}"`);
} else if (!json.overrides || typeof json.overrides !== 'object') {
throw new Error('Cannot construct PartitionMap: field overrides must be '
+ `an object but was "${json.overrides}"`);
} else if (!json.pool || !Array.isArray(json.pool)) {
throw new Error('Cannot construct PartitionMap: field pool must be '
+ `an array but was "${json.pool}"`);
}
return new PartitionMap(json.partitions, json.pool, json.overrides);
}
static fromFile(filename) {
const rawData = fs.readFileSync(filename).toString('utf8');
const parsed = toml.parse(rawData);
return PartitionMap.fromJSON(parsed);
}
static empty() {
return new PartitionMap({}, [], {});
}
}
export { PartitionMap };

View file

@ -7,6 +7,7 @@ import logger from 'cytube-common/lib/logger';
import LegacyConfig from '../config'; import LegacyConfig from '../config';
import path from 'path'; import path from 'path';
import { AnnouncementRefresher } from './announcementrefresher'; import { AnnouncementRefresher } from './announcementrefresher';
import { RedisPartitionMapReloader } from './redispartitionmapreloader';
const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf',
'partitions.toml'); 'partitions.toml');
@ -14,6 +15,7 @@ const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf',
class PartitionModule { class PartitionModule {
constructor() { constructor() {
this.initConfig(); this.initConfig();
this.cliMode = false;
} }
onReady() { onReady() {
@ -23,13 +25,13 @@ class PartitionModule {
initConfig() { initConfig() {
logger.initialize(null, null, LegacyConfig.get('debug')); logger.initialize(null, null, LegacyConfig.get('debug'));
try { try {
this.partitionConfig = this.loadPartitionMap(); this.partitionConfig = this.loadPartitionConfig();
} catch (error) { } catch (error) {
process.exit(1); process.exit(1);
} }
} }
loadPartitionMap() { loadPartitionConfig() {
try { try {
return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH); return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH);
} catch (error) { } catch (error) {
@ -44,9 +46,29 @@ class PartitionModule {
} }
} }
getPartitionMapReloader() {
if (!this.partitionMapReloader) {
const redisProvider = this.getRedisClientProvider();
this.partitionMapReloader = new RedisPartitionMapReloader(
this.partitionConfig,
redisProvider.get(), // Client for GET partitionMap
redisProvider.get()); // Subscribe client
}
return this.partitionMapReloader;
}
getPartitionDecider() { getPartitionDecider() {
if (!this.partitionDecider) { if (!this.partitionDecider) {
this.partitionDecider = new PartitionDecider(this.partitionConfig); const reloader = this.getPartitionMapReloader();
this.partitionDecider = new PartitionDecider(this.partitionConfig,
reloader.getPartitionMap());
reloader.on('partitionMapChange', newMap => {
this.partitionDecider.setPartitionMap(newMap);
if (!this.cliMode) {
require('../server').getServer().handlePartitionMapChange();
}
});
} }
return this.partitionDecider; return this.partitionDecider;

View file

@ -0,0 +1,56 @@
import { PartitionMap } from './partitionmap';
import logger from 'cytube-common/lib/logger';
import { EventEmitter } from 'events';
class RedisPartitionMapReloader extends EventEmitter {
constructor(config, redisClient, subClient) {
super();
this.config = config;
this.redisClient = redisClient;
this.subClient = subClient;
this.partitionMap = PartitionMap.empty();
redisClient.once('ready', () => this.reload());
subClient.once('ready', () => this.subscribe());
}
subscribe() {
this.subClient.subscribe(this.config.getPublishChannel());
this.subClient.on('message', (channel, message) => {
if (channel !== this.config.getPublishChannel()) {
logger.warn('RedisPartitionMapReloader received unexpected message '
+ `on redis channel ${channel}`);
return;
}
logger.info(`Received partition map update message published at ${message}`);
this.reload();
});
}
reload() {
this.redisClient.getAsync(this.config.getPartitionMapKey()).then(result => {
var newMap = null;
try {
newMap = PartitionMap.fromJSON(JSON.parse(result));
} catch (error) {
logger.error(`Failed to decode received partition map: ${error}`,
{ payload: result });
return;
}
if (this.partitionMap.getHash() !== newMap.getHash()) {
logger.info(`Partition map changed (hash=${newMap.getHash()})`);
this.partitionMap = newMap;
this.emit('partitionMapChange', newMap);
}
}).catch(error => {
logger.error(`Failed to retrieve partition map from redis: ${error}`);
});
}
getPartitionMap() {
return this.partitionMap;
}
}
export { RedisPartitionMapReloader };

View file

@ -333,20 +333,7 @@ Server.prototype.shutdown = function () {
}); });
}; };
Server.prototype.reloadPartitionMap = function () { Server.prototype.handlePartitionMapChange = function () {
if (!Config.get("enable-partition")) {
return;
}
var config;
try {
config = this.initModule.loadPartitionMap();
} catch (error) {
return;
}
this.initModule.partitionConfig.config = config.config;
const channels = Array.prototype.slice.call(this.channels); const channels = Array.prototype.slice.call(this.channels);
Promise.map(channels, channel => { Promise.map(channels, channel => {
if (channel.dead) { if (channel.dead) {
@ -375,3 +362,11 @@ Server.prototype.reloadPartitionMap = function () {
Logger.syslog.log("Partition reload complete"); Logger.syslog.log("Partition reload complete");
}); });
}; };
Server.prototype.reloadPartitionMap = function () {
if (!Config.get("enable-partition")) {
return;
}
this.initModule.getPartitionMapReloader().reload();
};