From d678fa56d127b93cfa10ccc53af81a6f3c9fa830 Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Wed, 11 Nov 2020 22:04:04 -0800 Subject: [PATCH] Remove channel reference counter This was an old attempt at gracefully unloading channels that still had pending callbacks. Its implementation was always flawed, and the number of places where it was used is small enough to replace with straightforward checks for whether the channel has been unloaded after an asynchronous operation. Hopefully fixes the stuck 0 user channels issue. --- integration_test/channel/kickban.js | 39 +++++----- package.json | 2 +- src/channel/channel.js | 106 +++++----------------------- src/channel/kickban.js | 23 ++---- src/channel/library.js | 3 - src/channel/mediarefresher.js | 9 +-- src/channel/playlist.js | 30 +++----- src/server.js | 6 +- www/js/acp.js | 4 -- 9 files changed, 64 insertions(+), 158 deletions(-) diff --git a/integration_test/channel/kickban.js b/integration_test/channel/kickban.js index f153bb93..c633adc6 100644 --- a/integration_test/channel/kickban.js +++ b/integration_test/channel/kickban.js @@ -16,10 +16,6 @@ describe('KickbanModule', () => { beforeEach(() => { mockChannel = { name: channelName, - refCounter: { - ref() { }, - unref() { } - }, logger: { log() { } }, @@ -69,11 +65,22 @@ describe('KickbanModule', () => { }); }); + function patch(fn, after) { + let existing = kickban[fn]; + kickban[fn] = async function () { + try { + await existing.apply(this, arguments) + } finally { + after(); + } + }; + } + describe('#handleCmdBan', () => { it('inserts a valid ban', done => { let kicked = false; - mockChannel.refCounter.unref = () => { + patch('banName', () => { assert(kicked, 'Expected user to be kicked'); database.getDB().runTransaction(async tx => { @@ -90,7 +97,7 @@ describe('KickbanModule', () => { done(); }); - }; + }); mockChannel.users = [{ getLowerName() { @@ -247,7 +254,7 @@ describe('KickbanModule', () => { let firstUserKicked = false; let secondUserKicked = false; - mockChannel.refCounter.unref = () => { + patch('banAll', () => { assert(firstUserKicked, 'Expected banned user to be kicked'); assert( secondUserKicked, @@ -279,7 +286,7 @@ describe('KickbanModule', () => { done(); }); - }; + }); mockChannel.users = [{ getLowerName() { @@ -313,7 +320,7 @@ describe('KickbanModule', () => { }); it('inserts a valid range ban', done => { - mockChannel.refCounter.unref = () => { + patch('banIP', () => { database.getDB().runTransaction(async tx => { const ipBan = await tx.table('channel_bans') .where({ @@ -328,7 +335,7 @@ describe('KickbanModule', () => { done(); }); - }; + }); kickban.handleCmdIPBan( mockUser, @@ -338,7 +345,7 @@ describe('KickbanModule', () => { }); it('inserts a valid wide-range ban', done => { - mockChannel.refCounter.unref = () => { + patch('banIP', () => { database.getDB().runTransaction(async tx => { const ipBan = await tx.table('channel_bans') .where({ @@ -353,7 +360,7 @@ describe('KickbanModule', () => { done(); }); - }; + }); kickban.handleCmdIPBan( mockUser, @@ -365,7 +372,7 @@ describe('KickbanModule', () => { it('inserts a valid IPv6 ban', done => { const longIP = require('../../lib/utilities').expandIPv6('::abcd'); - mockChannel.refCounter.unref = () => { + patch('banAll', () => { database.getDB().runTransaction(async tx => { const ipBan = await tx.table('channel_bans') .where({ @@ -380,7 +387,7 @@ describe('KickbanModule', () => { done(); }); - }; + }); database.getDB().runTransaction(async tx => { await tx.table('aliases') @@ -546,7 +553,7 @@ describe('KickbanModule', () => { }); it('still adds the IP ban even if the name is already banned', done => { - mockChannel.refCounter.unref = () => { + patch('banIP', () => { database.getDB().runTransaction(async tx => { const ipBan = await tx.table('channel_bans') .where({ @@ -561,7 +568,7 @@ describe('KickbanModule', () => { done(); }); - }; + }); database.getDB().runTransaction(tx => { return tx.table('channel_bans') diff --git a/package.json b/package.json index 8b225dbe..ae936509 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Calvin Montgomery", "name": "CyTube", "description": "Online media synchronizer and chat", - "version": "3.73.0", + "version": "3.74.0", "repository": { "url": "http://github.com/calzoneman/sync" }, diff --git a/src/channel/channel.js b/src/channel/channel.js index f60b1d5a..eb3fc8eb 100644 --- a/src/channel/channel.js +++ b/src/channel/channel.js @@ -14,67 +14,6 @@ const LOGGER = require('@calzoneman/jsli')('channel'); const USERCOUNT_THROTTLE = 10000; -class ReferenceCounter { - constructor(channel) { - this.channel = channel; - this.channelName = channel.name; - this.refCount = 0; - this.references = {}; - } - - ref(caller) { - if (caller) { - if (this.references.hasOwnProperty(caller)) { - this.references[caller]++; - } else { - this.references[caller] = 1; - } - } - - this.refCount++; - } - - unref(caller) { - if (caller) { - if (this.references.hasOwnProperty(caller)) { - this.references[caller]--; - if (this.references[caller] === 0) { - delete this.references[caller]; - } - } else { - LOGGER.error("ReferenceCounter::unref() called by caller [" + - caller + "] but this caller had no active references! " + - `(channel: ${this.channelName})`); - return; - } - } - - this.refCount--; - this.checkRefCount(); - } - - checkRefCount() { - if (this.refCount === 0) { - if (Object.keys(this.references).length > 0) { - LOGGER.error("ReferenceCounter::refCount reached 0 but still had " + - "active references: " + - JSON.stringify(Object.keys(this.references)) + - ` (channel: ${this.channelName})`); - for (var caller in this.references) { - this.refCount += this.references[caller]; - } - } else if (this.channel.users && this.channel.users.length > 0) { - LOGGER.error("ReferenceCounter::refCount reached 0 but still had " + - this.channel.users.length + " active users" + - ` (channel: ${this.channelName})`); - this.refCount = this.channel.users.length; - } else { - this.channel.emit("empty"); - } - } - } -} - function Channel(name) { this.name = name; this.uniqueName = name.toLowerCase(); @@ -85,7 +24,6 @@ function Channel(name) { ) ); this.users = []; - this.refCounter = new ReferenceCounter(this); this.flags = 0; this.id = 0; this.ownerName = null; @@ -282,17 +220,16 @@ Channel.prototype.saveState = async function () { Channel.prototype.checkModules = function (fn, args, cb) { const self = this; - const refCaller = `Channel::checkModules/${fn}`; this.waitFlag(Flags.C_READY, function () { if (self.dead) return; - self.refCounter.ref(refCaller); var keys = Object.keys(self.modules); var next = function (err, result) { + if (self.dead) return; + if (result !== ChannelModule.PASSTHROUGH) { /* Either an error occured, or the module denied the user access */ cb(err, result); - self.refCounter.unref(refCaller); return; } @@ -300,7 +237,6 @@ Channel.prototype.checkModules = function (fn, args, cb) { if (m === undefined) { /* No more modules to check */ cb(null, ChannelModule.PASSTHROUGH); - self.refCounter.unref(refCaller); return; } @@ -339,28 +275,32 @@ Channel.prototype.notifyModules = function (fn, args) { Channel.prototype.joinUser = function (user, data) { const self = this; - self.refCounter.ref("Channel::user"); self.waitFlag(Flags.C_READY, function () { - /* User closed the connection before the channel finished loading */ if (user.socket.disconnected) { - self.refCounter.unref("Channel::user"); + return; + } + + if (self.dead) { + user.kick('Channel is not loaded'); return; } user.channel = self; user.waitFlag(Flags.U_LOGGED_IN, () => { if (self.dead) { - LOGGER.warn( - 'Got U_LOGGED_IN for %s after channel already unloaded', - user.getName() - ); + user.kick('Channel is not loaded'); return; } if (user.is(Flags.U_REGISTERED)) { db.channels.getRank(self.name, user.getName(), (error, rank) => { if (!error) { + if (self.dead) { + user.kick('Channel is not loaded'); + return; + } + user.setChannelRank(rank); user.setFlag(Flags.U_HAS_CHANNEL_RANK); if (user.inChannel()) { @@ -374,13 +314,6 @@ Channel.prototype.joinUser = function (user, data) { } }); - if (user.socket.disconnected) { - self.refCounter.unref("Channel::user"); - return; - } else if (self.dead) { - return; - } - self.checkModules("onUserPreJoin", [user, data], function (err, result) { if (result === ChannelModule.PASSTHROUGH) { user.channel = self; @@ -389,7 +322,6 @@ Channel.prototype.joinUser = function (user, data) { user.channel = null; user.account.channelRank = 0; user.account.effectiveRank = user.account.globalRank; - self.refCounter.unref("Channel::user"); } }); }); @@ -493,8 +425,8 @@ Channel.prototype.partUser = function (user) { }); this.broadcastUsercount(); - this.refCounter.unref("Channel::user"); user.die(); + if (this.users.length === 0) this.emit('empty'); }; Channel.prototype.maybeResendUserlist = function maybeResendUserlist(user, newRank, oldRank) { @@ -655,13 +587,14 @@ Channel.prototype.sendUserJoin = function (users, user) { Channel.prototype.readLog = function (cb) { const maxLen = 102400; const file = this.logger.filename; - this.refCounter.ref("Channel::readLog"); const self = this; fs.stat(file, function (err, data) { if (err) { - self.refCounter.unref("Channel::readLog"); return cb(err, null); } + if (self.dead) { + return cb(new Error('Channel unloaded'), null); + } const start = Math.max(data.size - maxLen, 0); const end = data.size - 1; @@ -677,7 +610,6 @@ Channel.prototype.readLog = function (cb) { }); read.on("end", function () { cb(null, buffer); - self.refCounter.unref("Channel::readLog"); }); }); }; @@ -744,10 +676,6 @@ Channel.prototype.packInfo = function (isAdmin) { } } - if (isAdmin) { - data.activeLockCount = this.refCounter.refCount; - } - var self = this; var keys = Object.keys(this.modules); keys.forEach(function (k) { diff --git a/src/channel/kickban.js b/src/channel/kickban.js index 9c342bf3..a325d20e 100644 --- a/src/channel/kickban.js +++ b/src/channel/kickban.js @@ -77,10 +77,11 @@ KickBanModule.prototype.onUserPostJoin = function (user) { } const chan = this.channel; - const refCaller = "KickBanModule::onUserPostJoin"; user.waitFlag(Flags.U_LOGGED_IN, function () { - chan.refCounter.ref(refCaller); db.channels.isNameBanned(chan.name, user.getName(), function (err, banned) { + if (chan.dead) { + return; + } if (!err && banned) { user.kick("You are banned from this channel."); if (chan.modules.chat) { @@ -88,7 +89,6 @@ KickBanModule.prototype.onUserPostJoin = function (user) { "name is banned)"); } } - chan.refCounter.unref(refCaller); }); }); @@ -226,14 +226,9 @@ KickBanModule.prototype.handleCmdBan = function (user, msg, _meta) { var name = args.shift().toLowerCase(); var reason = args.join(" "); - const chan = this.channel; - chan.refCounter.ref("KickBanModule::handleCmdBan"); - this.banName(user, name, reason).catch(error => { const message = error.message || error; user.socket.emit("errorMsg", { msg: message }); - }).then(() => { - chan.refCounter.unref("KickBanModule::handleCmdBan"); }); }; @@ -257,15 +252,9 @@ KickBanModule.prototype.handleCmdIPBan = function (user, msg, _meta) { } var reason = args.join(" "); - const chan = this.channel; - chan.refCounter.ref("KickBanModule::handleCmdIPBan"); - this.banAll(user, name, range, reason).catch(error => { - //console.log('!!!', error.stack); const message = error.message || error; user.socket.emit("errorMsg", { msg: message }); - }).then(() => { - chan.refCounter.unref("KickBanModule::handleCmdIPBan"); }); }; @@ -427,14 +416,15 @@ KickBanModule.prototype.handleUnban = function (user, data) { } var self = this; - this.channel.refCounter.ref("KickBanModule::handleUnban"); db.channels.unbanId(this.channel.name, data.id, function (err) { if (err) { - self.channel.refCounter.unref("KickBanModule::handleUnban"); return user.socket.emit("errorMsg", { msg: err }); } + if (self.channel.dead) { + return; + } self.sendUnban(self.channel.users, data); self.channel.logger.log("[mod] " + user.getName() + " unbanned " + data.name); @@ -445,7 +435,6 @@ KickBanModule.prototype.handleUnban = function (user, data) { banperm ); } - self.channel.refCounter.unref("KickBanModule::handleUnban"); }); }; diff --git a/src/channel/library.js b/src/channel/library.js index 31b34316..c0df5b45 100644 --- a/src/channel/library.js +++ b/src/channel/library.js @@ -55,18 +55,15 @@ LibraryModule.prototype.handleUncache = function (user, data) { } const chan = this.channel; - chan.refCounter.ref("LibraryModule::handleUncache"); db.channels.deleteFromLibrary(chan.name, data.id, function (err, _res) { if (chan.dead) { return; } else if (err) { - chan.refCounter.unref("LibraryModule::handleUncache"); return; } chan.logger.log("[library] " + user.getName() + " deleted " + data.id + "from the library"); - chan.refCounter.unref("LibraryModule::handleUncache"); }); }; diff --git a/src/channel/mediarefresher.js b/src/channel/mediarefresher.js index f8c53f57..4f9c4dbe 100644 --- a/src/channel/mediarefresher.js +++ b/src/channel/mediarefresher.js @@ -47,9 +47,8 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) { } const self = this; - self.channel.refCounter.ref("MediaRefresherModule::initVimeo"); Vimeo.extract(data.id).then(function (direct) { - if (self.dead || self.channel.dead) { + if (self.channel.dead) { self.unload(); return; } @@ -63,9 +62,11 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) { if (cb) cb(); }).catch(function (err) { LOGGER.error("Unexpected vimeo::extract() fail: " + err.stack); + if (self.channel.dead) { + self.unload(); + return; + } if (cb) cb(); - }).finally(() => { - self.channel.refCounter.unref("MediaRefresherModule::initVimeo"); }); }; diff --git a/src/channel/playlist.js b/src/channel/playlist.js index b398e7f7..96ca8391 100644 --- a/src/channel/playlist.js +++ b/src/channel/playlist.js @@ -511,19 +511,19 @@ PlaylistModule.prototype.queueStandard = function (user, data) { }; const self = this; - this.channel.refCounter.ref("PlaylistModule::queueStandard"); counters.add("playlist:queue:count", 1); this.semaphore.queue(function (lock) { InfoGetter.getMedia(data.id, data.type, function (err, media) { if (err) { error(XSS.sanitizeText(String(err))); - self.channel.refCounter.unref("PlaylistModule::queueStandard"); + return lock.release(); + } + if (self.channel.dead) { return lock.release(); } self._addItem(media, data, user, function () { lock.release(); - self.channel.refCounter.unref("PlaylistModule::queueStandard"); }); }); }); @@ -546,7 +546,7 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) { return lock.release(); } - if (self.dead) { + if (self.channel.dead) { return lock.release(); } @@ -562,8 +562,6 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) { } } - self.channel.refCounter.ref("PlaylistModule::queueYouTubePlaylist"); - if (self.channel.modules.library && data.shouldAddToLibrary) { self.channel.modules.library.cacheMediaList(vids); data.shouldAddToLibrary = false; @@ -574,8 +572,6 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) { self._addItem(media, data, user); }); - self.channel.refCounter.unref("PlaylistModule::queueYouTubePlaylist"); - lock.release(); }); }); @@ -593,7 +589,6 @@ PlaylistModule.prototype.handleDelete = function (user, data) { } var plitem = this.items.find(data); - self.channel.refCounter.ref("PlaylistModule::handleDelete"); this.semaphore.queue(function (lock) { if (self._delete(data)) { self.channel.logger.log("[playlist] " + user.getName() + " deleted " + @@ -601,7 +596,6 @@ PlaylistModule.prototype.handleDelete = function (user, data) { } lock.release(); - self.channel.refCounter.unref("PlaylistModule::handleDelete"); }); }; @@ -637,26 +631,24 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) { } const self = this; - self.channel.refCounter.ref("PlaylistModule::handleMoveMedia"); self.semaphore.queue(function (lock) { + if (self.channel.dead) { + return lock.release(); + } if (!self.items.remove(data.from)) { - self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } if (data.after === "prepend") { if (!self.items.prepend(from)) { - self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } else if (data.after === "append") { if (!self.items.append(from)) { - self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } else { if (!self.items.insertAfter(from, data.after)) { - self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); return lock.release(); } } @@ -668,7 +660,6 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) { (after ? " after " + after.media.title : "")); self._listDirty = true; lock.release(); - self.channel.refCounter.unref("PlaylistModule::handleMoveMedia"); }); }; @@ -1356,14 +1347,15 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { }; const self = this; - self.channel.refCounter.ref("PlaylistModule::handleQueuePlaylist"); db.getUserPlaylist(user.getName(), data.name, function (err, pl) { if (err) { - self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist"); return user.socket.emit("errorMsg", { msg: "Playlist load failed: " + err }); } + if (self.channel.dead) { + return; + } try { if (data.pos === "next") { @@ -1398,8 +1390,6 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) { msg: "Internal error occurred when loading playlist.", link: null }); - } finally { - self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist"); } }); }; diff --git a/src/server.js b/src/server.js index b6be7c70..ec487854 100644 --- a/src/server.js +++ b/src/server.js @@ -357,11 +357,9 @@ Server.prototype.unloadChannel = function (chan, options) { LOGGER.info("Unloaded channel " + chan.name); chan.broadcastUsercount.cancel(); - // Empty all outward references from the channel + // Empty all outward references from the channel | TODO does this actually help? Object.keys(chan).forEach(key => { - if (key !== "refCounter") { - delete chan[key]; - } + delete chan[key]; }); chan.dead = true; promActiveChannels.dec(); diff --git a/www/js/acp.js b/www/js/acp.js index 55de7ae6..1613ab21 100644 --- a/www/js/acp.js +++ b/www/js/acp.js @@ -421,10 +421,6 @@ function showChannelDetailModal(c) { $("").text("Public").appendTo(tr); $("").text(c.public).appendTo(tr); - tr = $("").appendTo(table); - $("").text("ActiveLock Count").appendTo(tr); - $("").text(c.activeLockCount).appendTo(tr); - tr = $("").appendTo(table); $("").text("Chat Filter Count").appendTo(tr); $("").text(c.chatFilterCount).appendTo(tr);