The knexening: part 1
This commit is contained in:
parent
f968521936
commit
2a694e73af
|
@ -31,6 +31,7 @@
|
||||||
"graceful-fs": "^4.1.2",
|
"graceful-fs": "^4.1.2",
|
||||||
"http-errors": "^1.3.1",
|
"http-errors": "^1.3.1",
|
||||||
"json-typecheck": "^0.1.3",
|
"json-typecheck": "^0.1.3",
|
||||||
|
"knex": "^0.13.0",
|
||||||
"lodash": "^4.13.1",
|
"lodash": "^4.13.1",
|
||||||
"morgan": "^1.6.1",
|
"morgan": "^1.6.1",
|
||||||
"mysql": "^2.9.0",
|
"mysql": "^2.9.0",
|
||||||
|
|
132
src/database.js
132
src/database.js
|
@ -6,52 +6,68 @@ var net = require("net");
|
||||||
var util = require("./utilities");
|
var util = require("./utilities");
|
||||||
import * as Metrics from 'cytube-common/lib/metrics/metrics';
|
import * as Metrics from 'cytube-common/lib/metrics/metrics';
|
||||||
import { LoggerFactory } from '@calzoneman/jsli';
|
import { LoggerFactory } from '@calzoneman/jsli';
|
||||||
|
import knex from 'knex';
|
||||||
|
|
||||||
const LOGGER = LoggerFactory.getLogger('database');
|
const LOGGER = LoggerFactory.getLogger('database');
|
||||||
|
|
||||||
var pool = null;
|
|
||||||
var global_ipbans = {};
|
var global_ipbans = {};
|
||||||
|
let db = null;
|
||||||
|
|
||||||
|
class Database {
|
||||||
|
constructor() {
|
||||||
|
const config = {
|
||||||
|
client: 'mysql',
|
||||||
|
connection: {
|
||||||
|
host: Config.get('mysql.server'),
|
||||||
|
port: Config.get('mysql.port'),
|
||||||
|
user: Config.get('mysql.user'),
|
||||||
|
password: Config.get('mysql.password'),
|
||||||
|
database: Config.get('mysql.database'),
|
||||||
|
multipleStatements: true, // Legacy thing
|
||||||
|
charset: 'UTF8MB4_GENERAL_CI'
|
||||||
|
},
|
||||||
|
pool: {
|
||||||
|
min: Config.get('mysql.pool-size'),
|
||||||
|
max: Config.get('mysql.pool-size')
|
||||||
|
},
|
||||||
|
debug: !!process.env.KNEX_DEBUG
|
||||||
|
};
|
||||||
|
|
||||||
|
this.knex = knex(config);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
module.exports.init = function () {
|
module.exports.init = function () {
|
||||||
pool = mysql.createPool({
|
db = new Database();
|
||||||
host: Config.get("mysql.server"),
|
db.knex.raw('select 1 from dual')
|
||||||
port: Config.get("mysql.port"),
|
.catch(error => {
|
||||||
user: Config.get("mysql.user"),
|
LOGGER.error('Initial database connection failed: %s', error.stack);
|
||||||
password: Config.get("mysql.password"),
|
process.exit(1);
|
||||||
database: Config.get("mysql.database"),
|
}).then(() => {
|
||||||
multipleStatements: true,
|
process.nextTick(legacySetup);
|
||||||
charset: "UTF8MB4_GENERAL_CI", // Needed for emoji and other non-BMP unicode
|
|
||||||
connectionLimit: Config.get("mysql.pool-size")
|
|
||||||
});
|
|
||||||
|
|
||||||
// Test the connection
|
|
||||||
pool.getConnection(function (err, conn) {
|
|
||||||
if (err) {
|
|
||||||
LOGGER.error("Initial database connection failed: " + err.stack);
|
|
||||||
process.exit(1);
|
|
||||||
} else {
|
|
||||||
tables.init(module.exports.query, function (err) {
|
|
||||||
if (err) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
require("./database/update").checkVersion();
|
|
||||||
module.exports.loadAnnouncement();
|
|
||||||
});
|
});
|
||||||
// Refresh global IP bans
|
|
||||||
module.exports.listGlobalBans();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
pool.on("enqueue", function () {
|
|
||||||
Metrics.incCounter("db:queryQueued", 1);
|
|
||||||
});
|
|
||||||
|
|
||||||
global_ipbans = {};
|
global_ipbans = {};
|
||||||
module.exports.users = require("./database/accounts");
|
module.exports.users = require("./database/accounts");
|
||||||
module.exports.channels = require("./database/channels");
|
module.exports.channels = require("./database/channels");
|
||||||
module.exports.pool = pool;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
module.exports.getDB = function getDB() {
|
||||||
|
return db;
|
||||||
|
};
|
||||||
|
|
||||||
|
function legacySetup() {
|
||||||
|
tables.init(module.exports.query, function (err) {
|
||||||
|
if (err) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
require("./database/update").checkVersion();
|
||||||
|
module.exports.loadAnnouncement();
|
||||||
|
});
|
||||||
|
// Refresh global IP bans
|
||||||
|
module.exports.listGlobalBans();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Execute a database query
|
* Execute a database query
|
||||||
*/
|
*/
|
||||||
|
@ -60,50 +76,26 @@ module.exports.query = function (query, sub, callback) {
|
||||||
// 2nd argument is optional
|
// 2nd argument is optional
|
||||||
if (typeof sub === "function") {
|
if (typeof sub === "function") {
|
||||||
callback = sub;
|
callback = sub;
|
||||||
sub = false;
|
sub = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (typeof callback !== "function") {
|
if (typeof callback !== "function") {
|
||||||
callback = blackHole;
|
callback = blackHole;
|
||||||
}
|
}
|
||||||
|
|
||||||
pool.getConnection(function (err, conn) {
|
if (process.env.SHOW_SQL) {
|
||||||
if (err) {
|
console.log(query);
|
||||||
LOGGER.error("! DB connection failed: " + err);
|
}
|
||||||
callback("Database failure", null);
|
|
||||||
} else {
|
|
||||||
function cback(err, res) {
|
|
||||||
conn.release();
|
|
||||||
if (err) {
|
|
||||||
LOGGER.error("! DB query failed: " + query);
|
|
||||||
if (sub) {
|
|
||||||
LOGGER.error("Substitutions: " + sub);
|
|
||||||
}
|
|
||||||
LOGGER.error(err);
|
|
||||||
callback("Database failure", null);
|
|
||||||
} else {
|
|
||||||
callback(null, res);
|
|
||||||
}
|
|
||||||
Metrics.stopTimer(timer);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (process.env.SHOW_SQL) {
|
db.knex.raw(query, sub)
|
||||||
console.log(query);
|
.then(res => {
|
||||||
}
|
process.nextTick(callback, null, res[0]);
|
||||||
|
}).catch(error => {
|
||||||
try {
|
LOGGER.error('Legacy DB query failed. Query: %s, Substitutions: %j, Error: %s', query, sub, error);
|
||||||
if (sub) {
|
process.nextTick(callback, 'Database failure', null);
|
||||||
conn.query(query, sub, cback);
|
}).finally(() => {
|
||||||
} else {
|
Metrics.stopTimer(timer);
|
||||||
conn.query(query, cback);
|
});
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
LOGGER.error("Broken query: " + error.stack);
|
|
||||||
callback("Broken query", null);
|
|
||||||
conn.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -363,29 +363,15 @@ function populateUsernameDedupeColumn(cb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Promise.map(rows, row => {
|
Promise.map(rows, row => {
|
||||||
return new Promise((resolve, reject) => {
|
const dedupedName = dbUsers.dedupeUsername(row.name);
|
||||||
db.pool.getConnection((error, conn) => {
|
LOGGER.info(`Deduping [${row.name}] as [${dedupedName}]`);
|
||||||
if (error) {
|
return db.getDB().knex.raw("UPDATE users SET name_dedupe = ? WHERE id = ?", [dedupedName, row.id])
|
||||||
reject(error);
|
.catch(error => {
|
||||||
return;
|
if (error.errno === 1062) {
|
||||||
}
|
LOGGER.info(`WARNING: could not set name_dedupe for [${row.name}] due to an existing row for [${dedupedName}]`);
|
||||||
|
} else {
|
||||||
const dedupedName = dbUsers.dedupeUsername(row.name);
|
throw error;
|
||||||
LOGGER.info(`Deduping [${row.name}] as [${dedupedName}]`);
|
}
|
||||||
conn.query("UPDATE users SET name_dedupe = ? WHERE id = ?", [dedupedName, row.id], (error, res) => {
|
|
||||||
conn.release();
|
|
||||||
if (error) {
|
|
||||||
if (error.errno === 1062) {
|
|
||||||
LOGGER.info(`WARNING: could not set name_dedupe for [${row.name}] due to an existing row for [${dedupedName}]`);
|
|
||||||
resolve();
|
|
||||||
} else {
|
|
||||||
reject(error);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
resolve();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
}, { concurrency: 10 }).then(() => {
|
}, { concurrency: 10 }).then(() => {
|
||||||
cb();
|
cb();
|
||||||
|
|
Loading…
Reference in a new issue