diff --git a/src/io/cluster/partitionclusterclient.js b/src/io/cluster/partitionclusterclient.js new file mode 100644 index 00000000..9d5324ef --- /dev/null +++ b/src/io/cluster/partitionclusterclient.js @@ -0,0 +1,15 @@ +import Promise from 'bluebird'; + +class PartitionClusterClient { + constructor(partitionDecider) { + this.partitionDecider = partitionDecider; + } + + getSocketConfig(channel) { + return Promise.resolve({ + servers: this.partitionDecider.getPartitionForChannel(channel) + }); + } +} + +export { PartitionClusterClient }; diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js new file mode 100644 index 00000000..7684b95e --- /dev/null +++ b/src/partition/partitionconfig.js @@ -0,0 +1,23 @@ +class PartitionConfig { + constructor(config) { + this.config = config; + } + + getPartitionMap() { + return this.config.partitions; + } + + getOverrideMap() { + return this.config.overrides; + } + + getPool() { + return this.config.pool; + } + + getIdentity() { + return this.config.identity; + } +} + +export { PartitionConfig }; diff --git a/src/partition/partitiondecider.js b/src/partition/partitiondecider.js new file mode 100644 index 00000000..425d6207 --- /dev/null +++ b/src/partition/partitiondecider.js @@ -0,0 +1,29 @@ +import { murmurHash1 } from '../util/murmur'; + +class PartitionDecider { + constructor(config) { + this.identity = config.getIdentity(); + this.partitionMap = config.getPartitionMap(); + this.pool = config.getPool(); + this.overrideMap = config.getOverrideMap(); + } + + getPartitionForChannel(channel) { + return this.partitionMap[this.getPartitionIdentityForChannel(channel)]; + } + + getPartitionIdentityForChannel(channel) { + if (this.overrideMap.hasOwnProperty(channel)) { + return this.overrideMap[channel]; + } else { + const i = murmurHash1(channel) % this.pool.length; + return this.pool[i]; + } + } + + isChannelOnThisPartition(channel) { + return this.getPartitionIdentityForChannel(channel) === this.identity; + } +} + +export { PartitionDecider }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js new file mode 100644 index 00000000..4c1adb2f --- /dev/null +++ b/src/partition/partitionmodule.js @@ -0,0 +1,63 @@ +import { loadFromToml } from 'cytube-common/lib/configuration/configloader'; +import { PartitionConfig } from './partitionconfig'; +import { PartitionDecider } from './partitiondecider'; +import { PartitionClusterClient } from '../io/cluster/partitionclusterclient'; +import logger from 'cytube-common/lib/logger'; +import LegacyConfig from '../config'; +import path from 'path'; + +const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', + 'partitions.toml'); + +class PartitionModule { + constructor() { + this.initConfig(); + } + + onReady() { + + } + + initConfig() { + logger.initialize(null, null, LegacyConfig.get('debug')); + try { + this.partitionConfig = this.loadPartitionMap(); + } catch (error) { + process.exit(1); + } + } + + loadPartitionMap() { + try { + return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH); + } catch (error) { + if (typeof error.line !== 'undefined') { + logger.error(`Error in ${PARTITION_CONFIG_PATH}: ${error} ` + + `(line ${error.line})`); + } else { + logger.error(`Error loading ${PARTITION_CONFIG_PATH}: ` + + `${error.stack}`); + } + throw error; + } + } + + getPartitionDecider() { + if (!this.partitionDecider) { + this.partitionDecider = new PartitionDecider(this.partitionConfig); + } + + return this.partitionDecider; + } + + getClusterClient() { + if (!this.partitionClusterClient) { + this.partitionClusterClient = new PartitionClusterClient( + this.getPartitionDecider()); + } + + return this.partitionClusterClient; + } +} + +export { PartitionModule }; diff --git a/src/server.js b/src/server.js index 91ac7a2f..a131e176 100644 --- a/src/server.js +++ b/src/server.js @@ -48,6 +48,7 @@ import WebConfiguration from './configuration/webconfig'; import NullClusterClient from './io/cluster/nullclusterclient'; import session from './session'; import { LegacyModule } from './legacymodule'; +import { PartitionModule } from './partition/partitionmodule'; import * as Switches from './switches'; var Server = function () { @@ -68,6 +69,8 @@ var Server = function () { } const BackendModule = require('./backend/backendmodule').BackendModule; initModule = new BackendModule(); + } else if (Config.get('enable-partition')) { + initModule = new PartitionModule(); } else { initModule = new LegacyModule(); } diff --git a/src/util/murmur.js b/src/util/murmur.js new file mode 100644 index 00000000..60416891 --- /dev/null +++ b/src/util/murmur.js @@ -0,0 +1,35 @@ +const SEED = 0x1234; +const M = 0xc6a4a793; +const R = 16; + +export function murmurHash1(str) { + const buffer = new Buffer(str, 'utf8'); + var length = buffer.length; + var h = SEED ^ (length * M); + + while (length >= 4) { + var k = buffer.readUInt32LE(buffer.length - length); + h += k; + h *= M; + h ^= h >> 16; + length -= 4; + } + + switch (length) { + case 3: + h += buffer[buffer.length - 3] >> 16; + case 2: + h += buffer[buffer.length - 2] >> 8; + case 1: + h += buffer[buffer.length - 1]; + h *= M; + h ^= h >> R; + } + + h *= M; + h ^= h >> 10; + h *= M; + h ^= h >> 17; + + return h; +}