diff --git a/package.json b/package.json index 840d59a0..7d0afaf4 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "graceful-fs": "^4.1.2", "http-errors": "^1.3.1", "json-typecheck": "^0.1.3", + "knex": "^0.13.0", "lodash": "^4.13.1", "morgan": "^1.6.1", "mysql": "^2.9.0", diff --git a/src/database.js b/src/database.js index 98608e80..c42dbaef 100644 --- a/src/database.js +++ b/src/database.js @@ -6,52 +6,68 @@ var net = require("net"); var util = require("./utilities"); import * as Metrics from 'cytube-common/lib/metrics/metrics'; import { LoggerFactory } from '@calzoneman/jsli'; +import knex from 'knex'; const LOGGER = LoggerFactory.getLogger('database'); -var pool = null; var global_ipbans = {}; +let db = null; + +class Database { + constructor() { + const config = { + client: 'mysql', + connection: { + host: Config.get('mysql.server'), + port: Config.get('mysql.port'), + user: Config.get('mysql.user'), + password: Config.get('mysql.password'), + database: Config.get('mysql.database'), + multipleStatements: true, // Legacy thing + charset: 'UTF8MB4_GENERAL_CI' + }, + pool: { + min: Config.get('mysql.pool-size'), + max: Config.get('mysql.pool-size') + }, + debug: !!process.env.KNEX_DEBUG + }; + + this.knex = knex(config); + } +} module.exports.init = function () { - pool = mysql.createPool({ - host: Config.get("mysql.server"), - port: Config.get("mysql.port"), - user: Config.get("mysql.user"), - password: Config.get("mysql.password"), - database: Config.get("mysql.database"), - multipleStatements: true, - charset: "UTF8MB4_GENERAL_CI", // Needed for emoji and other non-BMP unicode - connectionLimit: Config.get("mysql.pool-size") - }); - - // Test the connection - pool.getConnection(function (err, conn) { - if (err) { - LOGGER.error("Initial database connection failed: " + err.stack); - process.exit(1); - } else { - tables.init(module.exports.query, function (err) { - if (err) { - return; - } - require("./database/update").checkVersion(); - module.exports.loadAnnouncement(); + db = new Database(); + db.knex.raw('select 1 from dual') + .catch(error => { + LOGGER.error('Initial database connection failed: %s', error.stack); + process.exit(1); + }).then(() => { + process.nextTick(legacySetup); }); - // Refresh global IP bans - module.exports.listGlobalBans(); - } - }); - - pool.on("enqueue", function () { - Metrics.incCounter("db:queryQueued", 1); - }); global_ipbans = {}; module.exports.users = require("./database/accounts"); module.exports.channels = require("./database/channels"); - module.exports.pool = pool; }; +module.exports.getDB = function getDB() { + return db; +}; + +function legacySetup() { + tables.init(module.exports.query, function (err) { + if (err) { + return; + } + require("./database/update").checkVersion(); + module.exports.loadAnnouncement(); + }); + // Refresh global IP bans + module.exports.listGlobalBans(); +} + /** * Execute a database query */ @@ -60,50 +76,26 @@ module.exports.query = function (query, sub, callback) { // 2nd argument is optional if (typeof sub === "function") { callback = sub; - sub = false; + sub = undefined; } if (typeof callback !== "function") { callback = blackHole; } - pool.getConnection(function (err, conn) { - if (err) { - LOGGER.error("! DB connection failed: " + err); - callback("Database failure", null); - } else { - function cback(err, res) { - conn.release(); - if (err) { - LOGGER.error("! DB query failed: " + query); - if (sub) { - LOGGER.error("Substitutions: " + sub); - } - LOGGER.error(err); - callback("Database failure", null); - } else { - callback(null, res); - } - Metrics.stopTimer(timer); - } + if (process.env.SHOW_SQL) { + console.log(query); + } - if (process.env.SHOW_SQL) { - console.log(query); - } - - try { - if (sub) { - conn.query(query, sub, cback); - } else { - conn.query(query, cback); - } - } catch (error) { - LOGGER.error("Broken query: " + error.stack); - callback("Broken query", null); - conn.release(); - } - } - }); + db.knex.raw(query, sub) + .then(res => { + process.nextTick(callback, null, res[0]); + }).catch(error => { + LOGGER.error('Legacy DB query failed. Query: %s, Substitutions: %j, Error: %s', query, sub, error); + process.nextTick(callback, 'Database failure', null); + }).finally(() => { + Metrics.stopTimer(timer); + }); }; /** diff --git a/src/database/update.js b/src/database/update.js index dcee7712..04d010bc 100644 --- a/src/database/update.js +++ b/src/database/update.js @@ -363,29 +363,15 @@ function populateUsernameDedupeColumn(cb) { } Promise.map(rows, row => { - return new Promise((resolve, reject) => { - db.pool.getConnection((error, conn) => { - if (error) { - reject(error); - return; - } - - const dedupedName = dbUsers.dedupeUsername(row.name); - LOGGER.info(`Deduping [${row.name}] as [${dedupedName}]`); - conn.query("UPDATE users SET name_dedupe = ? WHERE id = ?", [dedupedName, row.id], (error, res) => { - conn.release(); - if (error) { - if (error.errno === 1062) { - LOGGER.info(`WARNING: could not set name_dedupe for [${row.name}] due to an existing row for [${dedupedName}]`); - resolve(); - } else { - reject(error); - } - } else { - resolve(); - } - }); - }); + const dedupedName = dbUsers.dedupeUsername(row.name); + LOGGER.info(`Deduping [${row.name}] as [${dedupedName}]`); + return db.getDB().knex.raw("UPDATE users SET name_dedupe = ? WHERE id = ?", [dedupedName, row.id]) + .catch(error => { + if (error.errno === 1062) { + LOGGER.info(`WARNING: could not set name_dedupe for [${row.name}] due to an existing row for [${dedupedName}]`); + } else { + throw error; + } }); }, { concurrency: 10 }).then(() => { cb();