Merge pull request #507 from calzoneman/channel-store

Refactor channel storage to allow database store
This commit is contained in:
Calvin Montgomery 2015-10-04 23:22:05 -07:00
commit 9f4461a779
15 changed files with 484 additions and 105 deletions

19
NEWS.md
View file

@ -1,3 +1,22 @@
2015-10-04
==========
* The channel data storage system has been refactored a bit. For
compatibility, the default remains to store JSON objects for each channel in
the `chandump` folder, however there is now also the option of storing
channel data in the database. You can take advantage of this by setting
`channel-storage: type: 'database'` in your `config.yaml`.
- In order to migrate existing channel data from the `chandump` files to the
database, run `node lib/channel-storage/migrate.js`.
* The database storage method uses foreign keys to associate the channel data
with the corresponding row in the `channels` table. This requires that the
tables be stored using the InnoDB engine rather than MyISAM. If your CyTube
tables defaulted to MyISAM, you can fix them by running
```sql
ALTER TABLE `channels` ENGINE = InnoDB;
```
2015-09-21 2015-09-21
========== ==========

View file

@ -219,3 +219,13 @@ setuid:
user: 'user' user: 'user'
# how long to wait in ms before changing uid/gid # how long to wait in ms before changing uid/gid
timeout: 15 timeout: 15
# Determines channel data storage mechanism.
# Defaults to 'file', in which channel data is JSON stringified and saved to a file
# in the `chandump/` folder. This is the legacy behavior of CyTube.
# The other possible option is 'database', in which case each key-value pair of
# channel data is stored as a row in the `channel_data` database table.
# To migrate legacy chandump files to the database, shut down CyTube (to prevent
# concurrent updates), then run `node lib/channel-storage/migrate.js`.
channel-storage:
type: 'file'

View file

@ -2,7 +2,7 @@
"author": "Calvin Montgomery", "author": "Calvin Montgomery",
"name": "CyTube", "name": "CyTube",
"description": "Online media synchronizer and chat", "description": "Online media synchronizer and chat",
"version": "3.10.0", "version": "3.11.0",
"repository": { "repository": {
"url": "http://github.com/calzoneman/sync" "url": "http://github.com/calzoneman/sync"
}, },
@ -10,10 +10,12 @@
"dependencies": { "dependencies": {
"babel": "^5.8.23", "babel": "^5.8.23",
"bcrypt": "^0.8.5", "bcrypt": "^0.8.5",
"bluebird": "^2.10.1",
"body-parser": "^1.14.0", "body-parser": "^1.14.0",
"cheerio": "^0.19.0", "cheerio": "^0.19.0",
"compression": "^1.5.2", "compression": "^1.5.2",
"cookie-parser": "^1.4.0", "cookie-parser": "^1.4.0",
"create-error": "^0.3.1",
"csrf": "^3.0.0", "csrf": "^3.0.0",
"cytube-mediaquery": "git://github.com/CyTube/mediaquery", "cytube-mediaquery": "git://github.com/CyTube/mediaquery",
"cytubefilters": "git://github.com/calzoneman/cytubefilters#095b7956", "cytubefilters": "git://github.com/calzoneman/cytubefilters#095b7956",
@ -37,9 +39,9 @@
}, },
"scripts": { "scripts": {
"build-player": "$npm_node_execpath build-player.js", "build-player": "$npm_node_execpath build-player.js",
"build-server": "babel --source-maps --out-dir lib/ src/", "build-server": "babel --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/",
"postinstall": "./postinstall.sh", "postinstall": "./postinstall.sh",
"server-dev": "babel --watch --source-maps --out-dir lib/ src/" "server-dev": "babel --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/"
}, },
"devDependencies": { "devDependencies": {
"coffee-script": "^1.9.2" "coffee-script": "^1.9.2"

View file

@ -64,7 +64,9 @@ function initChannelDumper(Server) {
for (var i = 0; i < Server.channels.length; i++) { for (var i = 0; i < Server.channels.length; i++) {
var chan = Server.channels[i]; var chan = Server.channels[i];
if (!chan.dead && chan.users && chan.users.length > 0) { if (!chan.dead && chan.users && chan.users.length > 0) {
chan.saveState(); chan.saveState().catch(err => {
Logger.errlog.log(`Failed to save /r/${chan.name}: ${err.stack}`);
});
} }
} }
}, CHANNEL_SAVE_INTERVAL); }, CHANNEL_SAVE_INTERVAL);

View file

@ -0,0 +1,36 @@
import { FileStore } from './filestore';
import { DatabaseStore } from './dbstore';
import Config from '../config';
import Promise from 'bluebird';
var CHANNEL_STORE = null;
export function init() {
CHANNEL_STORE = loadChannelStore();
}
export function load(channelName) {
if (CHANNEL_STORE === null) {
return Promise.reject(new Error('ChannelStore not initialized yet'));
}
return CHANNEL_STORE.load(channelName);
}
export function save(channelName, data) {
if (CHANNEL_STORE === null) {
return Promise.reject(new Error('ChannelStore not initialized yet'));
}
return CHANNEL_STORE.save(channelName, data);
}
function loadChannelStore() {
switch (Config.get('channel-storage.type')) {
case 'database':
return new DatabaseStore();
case 'file':
default:
return new FileStore();
}
}

View file

@ -0,0 +1,91 @@
import Promise from 'bluebird';
import { ChannelStateSizeError,
ChannelNotFoundError } from '../errors';
import db from '../database';
import Logger from '../logger';
const SIZE_LIMIT = 1048576;
const QUERY_CHANNEL_ID_FOR_NAME = 'SELECT id FROM channels WHERE name = ?';
const QUERY_CHANNEL_DATA = 'SELECT `key`, `value` FROM channel_data WHERE channel_id = ?';
function queryAsync(query, substitutions) {
return new Promise((resolve, reject) => {
db.query(query, substitutions, (err, res) => {
if (err) {
if (!(err instanceof Error)) {
err = new Error(err);
}
reject(err);
} else {
resolve(res);
}
});
});
}
function buildUpdateQuery(numEntries) {
const values = [];
for (let i = 0; i < numEntries; i++) {
values.push('(?, ?, ?)');
}
return `INSERT INTO channel_data VALUES ${values.join(', ')} ` +
'ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)';
}
export class DatabaseStore {
load(channelName) {
return queryAsync(QUERY_CHANNEL_ID_FOR_NAME, [channelName]).then((rows) => {
if (rows.length === 0) {
throw new ChannelNotFoundError(`Channel does not exist: "${channelName}"`);
}
return queryAsync(QUERY_CHANNEL_DATA, [rows[0].id]);
}).then(rows => {
const data = {};
rows.forEach(row => {
try {
data[row.key] = JSON.parse(row.value);
} catch (e) {
Logger.errlog.log(`Channel data for channel "${channelName}", ` +
`key "${row.key}" is invalid: ${e}`);
}
});
return data;
});
}
save(channelName, data) {
return queryAsync(QUERY_CHANNEL_ID_FOR_NAME, [channelName]).then((rows) => {
if (rows.length === 0) {
throw new ChannelNotFoundError(`Channel does not exist: "${channelName}"`);
}
let totalSize = 0;
let rowCount = 0;
const id = rows[0].id;
const substitutions = [];
for (const key in data) {
if (typeof data[key] === 'undefined') {
continue;
}
rowCount++;
const value = JSON.stringify(data[key]);
totalSize += value.length;
substitutions.push(id);
substitutions.push(key);
substitutions.push(value);
}
if (totalSize > SIZE_LIMIT) {
throw new ChannelStateSizeError('Channel state size is too large', {
limit: SIZE_LIMIT,
actual: totalSize
});
}
return queryAsync(buildUpdateQuery(rowCount), substitutions);
});
}
}

View file

@ -0,0 +1,56 @@
import * as Promise from 'bluebird';
import { stat } from 'fs';
import * as fs from 'graceful-fs';
import path from 'path';
import { ChannelStateSizeError } from '../errors';
const readFileAsync = Promise.promisify(fs.readFile);
const writeFileAsync = Promise.promisify(fs.writeFile);
const readdirAsync = Promise.promisify(fs.readdir);
const statAsync = Promise.promisify(stat);
const SIZE_LIMIT = 1048576;
const CHANDUMP_DIR = path.resolve(__dirname, '..', '..', 'chandump');
export class FileStore {
filenameForChannel(channelName) {
return path.join(CHANDUMP_DIR, channelName);
}
load(channelName) {
const filename = this.filenameForChannel(channelName);
return statAsync(filename).then(stats => {
if (stats.size > SIZE_LIMIT) {
throw new ChannelStateSizeError('Channel state file is too large', {
limit: SIZE_LIMIT,
actual: stats.size
});
} else {
return readFileAsync(filename);
}
}).then(fileContents => {
try {
return JSON.parse(fileContents);
} catch (e) {
throw new Error('Channel state file is not valid JSON: ' + e);
}
});
}
save(channelName, data) {
const filename = this.filenameForChannel(channelName);
const fileContents = new Buffer(JSON.stringify(data), 'utf8');
if (fileContents.length > SIZE_LIMIT) {
return Promise.reject(new ChannelStateSizeError(
'Channel state size is too large', {
limit: SIZE_LIMIT,
actual: fileContents.length
}));
}
return writeFileAsync(filename, fileContents);
}
listChannels() {
return readdirAsync(CHANDUMP_DIR);
}
}

View file

@ -0,0 +1,164 @@
import Config from '../config';
import Promise from 'bluebird';
import db from '../database';
import { FileStore } from './filestore';
import { DatabaseStore } from './dbstore';
import { sanitizeHTML } from '../xss';
import { ChannelNotFoundError } from '../errors';
const QUERY_CHANNEL_NAMES = 'SELECT name FROM channels WHERE 1';
const EXPECTED_KEYS = [
'chatbuffer',
'chatmuted',
'css',
'emotes',
'filters',
'js',
'motd',
'openPlaylist',
'opts',
'permissions',
'playlist',
'poll'
];
function queryAsync(query, substitutions) {
return new Promise((resolve, reject) => {
db.query(query, substitutions, (err, res) => {
if (err) {
if (!(err instanceof Error)) {
err = new Error(err);
}
reject(err);
} else {
resolve(res);
}
});
});
}
function fixOldChandump(data) {
const converted = {};
EXPECTED_KEYS.forEach(key => {
converted[key] = data[key];
});
if (data.queue) {
converted.playlist = {
pl: data.queue.map(item => {
return {
media: {
id: item.id,
title: item.title,
seconds: item.seconds,
duration: item.duration,
type: item.type,
meta: {}
},
queueby: item.queueby,
temp: item.temp
};
}),
pos: data.position,
time: data.currentTime
};
}
if (data.hasOwnProperty('openqueue')) {
converted.openPlaylist = data.openqueue;
}
if (data.hasOwnProperty('playlistLock')) {
converted.openPlaylist = !data.playlistLock;
}
if (data.chatbuffer) {
converted.chatbuffer = data.chatbuffer.map(entry => {
return {
username: entry.username,
msg: entry.msg,
meta: entry.meta || {
addClass: entry.msgclass ? entry.msgclass : undefined
},
time: entry.time
};
});
}
if (data.motd && data.motd.motd) {
converted.motd = sanitizeHTML(data.motd.motd).replace(/\n/g, '<br>\n');
}
if (data.opts && data.opts.customcss) {
converted.opts.externalcss = data.opts.customcss;
}
if (data.opts && data.opts.customjs) {
converted.opts.externaljs = data.opts.customjs;
}
if (data.filters && data.filters.length > 0 && Array.isArray(data.filters[0])) {
converted.filters = data.filters.map(filter => {
let [source, replace, active] = filter;
return {
source: source,
replace: replace,
flags: 'g',
active: active,
filterlinks: false
};
});
}
return converted;
}
function migrate(src, dest) {
return src.listChannels().then(names => {
return Promise.reduce(names, (_, name) => {
// A long time ago there was a bug where CyTube would save a different
// chandump depending on the capitalization of the channel name in the URL.
// This was fixed, but there are still some really old chandumps with
// uppercase letters in the name.
//
// If another chandump exists which is all lowercase, then that one is
// canonical. Otherwise, it's safe to load the existing capitalization,
// convert it, and save.
if (name !== name.toLowerCase()) {
if (names.indexOf(name.toLowerCase()) >= 0) {
return Promise.resolve();
}
}
return src.load(name).then(data => {
data = fixOldChandump(data);
return dest.save(name, data);
}).then(() => {
console.log(`Migrated /r/${name}`);
}).catch(ChannelNotFoundError, err => {
console.log(`Skipping /r/${name} (not present in the database)`);
}).catch(err => {
console.error(`Failed to migrate /r/${name}: ${err.stack}`);
});
});
});
}
function main() {
Config.load('config.yaml');
db.init();
const src = new FileStore();
const dest = new DatabaseStore();
Promise.delay(1000).then(() => {
return migrate(src, dest);
}).then(() => {
console.log('Migration complete');
process.exit(0);
}).catch(err => {
console.error(`Migration failed: ${err.stack}`);
process.exit(1);
});
}
main();

View file

@ -8,8 +8,9 @@ var fs = require("graceful-fs");
var path = require("path"); var path = require("path");
var sio = require("socket.io"); var sio = require("socket.io");
var db = require("../database"); var db = require("../database");
import * as ChannelStore from '../channel-storage/channelstore';
const SIZE_LIMIT = 1048576; import { ChannelStateSizeError } from '../errors';
import Promise from 'bluebird';
/** /**
* Previously, async channel functions were riddled with race conditions due to * Previously, async channel functions were riddled with race conditions due to
@ -150,17 +151,15 @@ Channel.prototype.getDiskSize = function (cb) {
}; };
Channel.prototype.loadState = function () { Channel.prototype.loadState = function () {
var self = this;
var file = path.join(__dirname, "..", "..", "chandump", self.uniqueName);
/* Don't load from disk if not registered */ /* Don't load from disk if not registered */
if (!self.is(Flags.C_REGISTERED)) { if (!this.is(Flags.C_REGISTERED)) {
self.modules.permissions.loadUnregistered(); this.modules.permissions.loadUnregistered();
self.setFlag(Flags.C_READY); this.setFlag(Flags.C_READY);
return; return;
} }
var errorLoad = function (msg) { const self = this;
function errorLoad(msg) {
if (self.modules.customization) { if (self.modules.customization) {
self.modules.customization.load({ self.modules.customization.load({
motd: msg motd: msg
@ -168,100 +167,69 @@ Channel.prototype.loadState = function () {
} }
self.setFlag(Flags.C_READY | Flags.C_ERROR); self.setFlag(Flags.C_READY | Flags.C_ERROR);
}; }
fs.stat(file, function (err, stats) { ChannelStore.load(this.uniqueName).then(data => {
if (!err) { Object.keys(this.modules).forEach(m => {
var mb = stats.size / 1048576;
mb = Math.floor(mb * 100) / 100;
if (mb > SIZE_LIMIT / 1048576) {
Logger.errlog.log("Large chandump detected: " + self.uniqueName +
" (" + mb + " MiB)");
var msg = "This channel's state size has exceeded the memory limit " +
"enforced by this server. Please contact an administrator " +
"for assistance.";
errorLoad(msg);
return;
}
}
continueLoad();
});
var continueLoad = function () {
fs.readFile(file, function (err, data) {
if (err) {
/* ENOENT means the file didn't exist. This is normal for new channels */
if (err.code === "ENOENT") {
self.setFlag(Flags.C_READY);
Object.keys(self.modules).forEach(function (m) {
self.modules[m].load({});
});
} else {
Logger.errlog.log("Failed to open channel dump " + self.uniqueName);
Logger.errlog.log(err);
errorLoad("Unknown error occurred when loading channel state. " +
"Contact an administrator for assistance.");
}
return;
}
self.logger.log("[init] Loading channel state from disk");
try { try {
data = JSON.parse(data); this.modules[m].load(data);
Object.keys(self.modules).forEach(function (m) {
self.modules[m].load(data);
});
self.setFlag(Flags.C_READY);
} catch (e) { } catch (e) {
Logger.errlog.log("Channel dump for " + self.uniqueName + " is not " + Logger.errlog.log("Failed to load module " + m + " for channel " +
"valid"); this.uniqueName);
Logger.errlog.log(e);
errorLoad("Unknown error occurred when loading channel state. Contact " +
"an administrator for assistance.");
} }
}); });
}; this.setFlag(Flags.C_READY);
}).catch(ChannelStateSizeError, err => {
const message = "This channel's state size has exceeded the memory limit " +
"enforced by this server. Please contact an administrator " +
"for assistance.";
Logger.errlog.log(err.stack);
errorLoad(message);
}).catch(err => {
if (err.code === 'ENOENT') {
Object.keys(this.modules).forEach(m => {
this.modules[m].load({});
});
this.setFlag(Flags.C_READY);
return;
} else {
const message = "An error occurred when loading this channel's data from " +
"disk. Please contact an administrator for assistance. " +
`The error was: ${err}`;
Logger.errlog.log(err.stack);
errorLoad(message);
}
});
}; };
Channel.prototype.saveState = function () { Channel.prototype.saveState = function () {
var self = this; if (!this.is(Flags.C_REGISTERED)) {
var file = path.join(__dirname, "..", "..", "chandump", self.uniqueName); return Promise.resolve();
/**
* Don't overwrite saved state data if the current state is dirty,
* or if this channel is unregistered
*/
if (self.is(Flags.C_ERROR) || !self.is(Flags.C_REGISTERED)) {
return;
} }
self.logger.log("[init] Saving channel state to disk"); if (this.is(Flags.C_ERROR)) {
var data = {}; return Promise.reject(new Error(`Channel is in error state`));
Object.keys(this.modules).forEach(function (m) { }
self.modules[m].save(data);
this.logger.log("[init] Saving channel state to disk");
const data = {};
Object.keys(this.modules).forEach(m => {
this.modules[m].save(data);
}); });
var json = JSON.stringify(data); return ChannelStore.save(this.uniqueName, data).catch(ChannelStateSizeError, err => {
/** this.users.forEach(u => {
* Synchronous on purpose. if (u.account.effectiveRank >= 2) {
* When the server is shutting down, saveState() is called on all channels and u.socket.emit("warnLargeChandump", {
* then the process terminates. Async writeFile causes a race condition that wipes limit: err.limit,
* channels. actual: err.actual
*/ });
var err = fs.writeFileSync(file, json); }
});
// Check for large chandump and warn moderators/admins throw err;
self.getDiskSize(function (err, size) {
if (!err && size > SIZE_LIMIT && self.users) {
self.users.forEach(function (u) {
if (u.account.effectiveRank >= 2) {
u.socket.emit("warnLargeChandump", {
limit: SIZE_LIMIT,
actual: size
});
}
});
}
}); });
}; };

View file

@ -110,6 +110,9 @@ var defaults = {
"user": "nobody", "user": "nobody",
"timeout": 15 "timeout": 15
}, },
"channel-storage": {
type: "file"
}
}; };
/** /**

View file

@ -556,6 +556,11 @@ module.exports.listStats = function (callback) {
/* Misc */ /* Misc */
module.exports.loadAnnouncement = function () { module.exports.loadAnnouncement = function () {
// Temporary workaround
if (!Server.getServer || !Server.getServer()) {
return;
}
var query = "SELECT * FROM `meta` WHERE `key`='announcement'"; var query = "SELECT * FROM `meta` WHERE `key`='announcement'";
module.exports.query(query, function (err, rows) { module.exports.query(query, function (err, rows) {
if (err) { if (err) {

View file

@ -104,6 +104,15 @@ const TBL_BANS = "" +
"INDEX (`ip`, `channel`), INDEX (`name`, `channel`)" + "INDEX (`ip`, `channel`), INDEX (`name`, `channel`)" +
") CHARACTER SET utf8"; ") CHARACTER SET utf8";
const TBL_CHANNEL_DATA = "" +
"CREATE TABLE IF NOT EXISTS `channel_data` (" +
"`channel_id` INT NOT NULL," +
"`key` VARCHAR(20) NOT NULL," +
"`value` MEDIUMTEXT CHARACTER SET utf8mb4 NOT NULL," +
"PRIMARY KEY (`channel_id`, `key`)," +
"FOREIGN KEY (`channel_id`) REFERENCES `channels`(`id`) ON DELETE CASCADE" +
") CHARACTER SET utf8";
module.exports.init = function (queryfn, cb) { module.exports.init = function (queryfn, cb) {
var tables = { var tables = {
users: TBL_USERS, users: TBL_USERS,
@ -116,7 +125,8 @@ module.exports.init = function (queryfn, cb) {
user_playlists: TBL_USER_PLAYLISTS, user_playlists: TBL_USER_PLAYLISTS,
aliases: TBL_ALIASES, aliases: TBL_ALIASES,
stats: TBL_STATS, stats: TBL_STATS,
meta: TBL_META meta: TBL_META,
channel_data: TBL_CHANNEL_DATA
}; };
var AsyncQueue = require("../asyncqueue"); var AsyncQueue = require("../asyncqueue");

4
src/errors.js Normal file
View file

@ -0,0 +1,4 @@
import createError from 'create-error';
export const ChannelStateSizeError = createError('ChannelStateSizeError');
export const ChannelNotFoundError = createError('ChannelNotFoundError');

View file

@ -1,6 +1,8 @@
const VERSION = require("../package.json").version; const VERSION = require("../package.json").version;
var singleton = null; var singleton = null;
var Config = require("./config"); var Config = require("./config");
var Promise = require("bluebird");
import * as ChannelStore from './channel-storage/channelstore';
module.exports = { module.exports = {
init: function () { init: function () {
@ -55,6 +57,7 @@ var Server = function () {
var Database = require("./database"); var Database = require("./database");
self.db = Database; self.db = Database;
self.db.init(); self.db.init();
ChannelStore.init();
// webserver init ----------------------------------------------------- // webserver init -----------------------------------------------------
self.express = express(); self.express = express();
@ -226,13 +229,18 @@ Server.prototype.announce = function (data) {
Server.prototype.shutdown = function () { Server.prototype.shutdown = function () {
Logger.syslog.log("Unloading channels"); Logger.syslog.log("Unloading channels");
for (var i = 0; i < this.channels.length; i++) { Promise.map(this.channels, channel => {
if (this.channels[i].is(Flags.C_REGISTERED)) { return channel.saveState().tap(() => {
Logger.syslog.log("Saving /r/" + this.channels[i].name); Logger.syslog.log(`Saved /r/${channel.name}`);
this.channels[i].saveState(); }).catch(err => {
} Logger.errlog.log(`Failed to save /r/${channel.name}: ${err.stack}`);
} });
Logger.syslog.log("Goodbye"); }).then(() => {
process.exit(0); Logger.syslog.log("Goodbye");
process.exit(0);
}).catch(err => {
Logger.errlog.log(`Caught error while saving channels: ${err.stack}`);
process.exit(1);
});
}; };

View file

@ -1072,8 +1072,9 @@ Callbacks = {
errDialog("This channel currently exceeds the maximum size of " + errDialog("This channel currently exceeds the maximum size of " +
toHumanReadable(data.limit) + " (channel size is " + toHumanReadable(data.limit) + " (channel size is " +
toHumanReadable(data.actual) + "). Please reduce the size by removing " + toHumanReadable(data.actual) + "). Please reduce the size by removing " +
"unneeded playlist items, filters, and/or emotes or else the channel will " + "unneeded playlist items, filters, and/or emotes. Changes to the channel " +
"be unable to load the next time it is reloaded").attr("id", "chandumptoobig"); "will not be saved until the size is reduced to under the limit.")
.attr("id", "chandumptoobig");
} }
} }