Add redis-based channel index
This commit is contained in:
parent
5b9948f709
commit
b6bb0aa56d
|
@ -44,9 +44,9 @@
|
|||
},
|
||||
"scripts": {
|
||||
"build-player": "$npm_node_execpath build-player.js",
|
||||
"build-server": "babel --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/",
|
||||
"build-server": "babel -D --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/",
|
||||
"postinstall": "./postinstall.sh",
|
||||
"server-dev": "babel --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/"
|
||||
"server-dev": "babel -D --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/"
|
||||
},
|
||||
"devDependencies": {
|
||||
"coffee-script": "^1.9.2"
|
||||
|
|
62
src/partition/partitionchannelindex.js
Normal file
62
src/partition/partitionchannelindex.js
Normal file
|
@ -0,0 +1,62 @@
|
|||
import Promise from 'bluebird';
|
||||
import uuid from 'uuid';
|
||||
import { runLuaScript } from 'cytube-common/lib/redis/lualoader';
|
||||
import path from 'path';
|
||||
import Logger from '../logger';
|
||||
|
||||
var SERVER = null;
|
||||
const CHANNEL_INDEX = 'publicChannelList';
|
||||
const CACHE_REFRESH_INTERVAL = 30 * 1000;
|
||||
const CACHE_EXPIRE_DELAY = 40 * 1000;
|
||||
const READ_CHANNEL_LIST = path.join(__dirname, 'read_channel_list.lua')
|
||||
|
||||
class PartitionChannelIndex {
|
||||
constructor(redisClient) {
|
||||
this.redisClient = redisClient;
|
||||
this.uid = uuid.v4();
|
||||
this.cachedList = [];
|
||||
process.nextTick(() => {
|
||||
SERVER = require('../server').getServer();
|
||||
this.refreshCache();
|
||||
setInterval(this.refreshCache.bind(this), CACHE_REFRESH_INTERVAL);
|
||||
});
|
||||
}
|
||||
|
||||
refreshCache() {
|
||||
this.publishLocalChannels();
|
||||
runLuaScript(this.redisClient, READ_CHANNEL_LIST, [
|
||||
0,
|
||||
Date.now() - CACHE_EXPIRE_DELAY
|
||||
]).then(result => {
|
||||
this.cachedList = JSON.parse(result);
|
||||
}).catch(error => {
|
||||
Logger.errlog.log(`Failed to refresh channel list: ${error.stack}`);
|
||||
});
|
||||
}
|
||||
|
||||
publishLocalChannels() {
|
||||
const channels = SERVER.packChannelList(true).map(channel => {
|
||||
return {
|
||||
name: channel.name,
|
||||
mediatitle: channel.mediatitle,
|
||||
pagetitle: channel.pagetitle,
|
||||
usercount: channel.usercount
|
||||
};
|
||||
});
|
||||
|
||||
const entry = JSON.stringify({
|
||||
timestamp: Date.now(),
|
||||
channels
|
||||
});
|
||||
|
||||
this.redisClient.hsetAsync(CHANNEL_INDEX, this.uid, entry).catch(error => {
|
||||
Logger.errlog.log(`Failed to publish local channel list: ${error.stack}`);
|
||||
});
|
||||
}
|
||||
|
||||
listPublicChannels() {
|
||||
return Promise.resolve(this.cachedList);
|
||||
}
|
||||
}
|
||||
|
||||
export { PartitionChannelIndex };
|
|
@ -18,6 +18,10 @@ class PartitionConfig {
|
|||
getIdentity() {
|
||||
return this.config.identity;
|
||||
}
|
||||
|
||||
getRedisConfig() {
|
||||
return this.config.redis;
|
||||
}
|
||||
}
|
||||
|
||||
export { PartitionConfig };
|
||||
|
|
|
@ -2,6 +2,7 @@ import { loadFromToml } from 'cytube-common/lib/configuration/configloader';
|
|||
import { PartitionConfig } from './partitionconfig';
|
||||
import { PartitionDecider } from './partitiondecider';
|
||||
import { PartitionClusterClient } from '../io/cluster/partitionclusterclient';
|
||||
import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider';
|
||||
import logger from 'cytube-common/lib/logger';
|
||||
import LegacyConfig from '../config';
|
||||
import path from 'path';
|
||||
|
@ -58,6 +59,16 @@ class PartitionModule {
|
|||
|
||||
return this.partitionClusterClient;
|
||||
}
|
||||
|
||||
getRedisClientProvider() {
|
||||
if (!this.redisClientProvider) {
|
||||
this.redisClientProvider = new RedisClientProvider(
|
||||
this.partitionConfig.getRedisConfig()
|
||||
);
|
||||
}
|
||||
|
||||
return this.redisClientProvider;
|
||||
}
|
||||
}
|
||||
|
||||
export { PartitionModule };
|
||||
|
|
30
src/partition/read_channel_list.lua
Normal file
30
src/partition/read_channel_list.lua
Normal file
|
@ -0,0 +1,30 @@
|
|||
local entries = redis.call('hgetall', 'publicChannelList')
|
||||
if #entries == 0 then
|
||||
return '[]'
|
||||
end
|
||||
|
||||
local channelList = {}
|
||||
-- ARGV[1] holds the expiration timestamp. Anything older than this
|
||||
-- will be discarded.
|
||||
local expiration = tonumber(ARGV[1])
|
||||
for i = 1, #entries, 2 do
|
||||
local uid = entries[i]
|
||||
local entry = cjson.decode(entries[i+1])
|
||||
local timestamp = tonumber(entry['timestamp'])
|
||||
if timestamp < expiration then
|
||||
redis.call('hdel', 'publicChannelList', uid)
|
||||
else
|
||||
local channels = entry['channels']
|
||||
for j = 1, #channels do
|
||||
channelList[#channelList+1] = channels[j]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
-- Necessary to check for this condition because
|
||||
-- if the table is empty, cjson will encode it as an object ('{}')
|
||||
if #channelList == 0 then
|
||||
return '[]'
|
||||
else
|
||||
return cjson.encode(channelList)
|
||||
end
|
|
@ -43,6 +43,7 @@ var db = require("./database");
|
|||
var Flags = require("./flags");
|
||||
var sio = require("socket.io");
|
||||
import LocalChannelIndex from './web/localchannelindex';
|
||||
import { PartitionChannelIndex } from './partition/partitionchannelindex';
|
||||
import IOConfiguration from './configuration/ioconfig';
|
||||
import WebConfiguration from './configuration/webconfig';
|
||||
import NullClusterClient from './io/cluster/nullclusterclient';
|
||||
|
@ -86,7 +87,14 @@ var Server = function () {
|
|||
const ioConfig = IOConfiguration.fromOldConfig(Config);
|
||||
const webConfig = WebConfiguration.fromOldConfig(Config);
|
||||
const clusterClient = initModule.getClusterClient();
|
||||
const channelIndex = new LocalChannelIndex();
|
||||
var channelIndex;
|
||||
if (Config.get("enable-partition")) {
|
||||
channelIndex = new PartitionChannelIndex(
|
||||
initModule.getRedisClientProvider().get()
|
||||
);
|
||||
} else {
|
||||
channelIndex = new LocalChannelIndex();
|
||||
}
|
||||
self.express = express();
|
||||
require("./web/webserver").init(self.express,
|
||||
webConfig,
|
||||
|
|
Loading…
Reference in a new issue