Revert "Remove channel reference counter"

This reverts commit d678fa56d1.  The
reference counter, flawed as it is, was masking far more issues than I
realized.  It would require a more significant rearchitecture of the
code to remove it.  Probably better to keep it and try to improve it for
now.
This commit is contained in:
Calvin Montgomery 2021-01-09 13:03:38 -08:00
parent 3262f7822f
commit 00e9acbe4d
9 changed files with 158 additions and 64 deletions

View file

@ -16,6 +16,10 @@ describe('KickbanModule', () => {
beforeEach(() => {
mockChannel = {
name: channelName,
refCounter: {
ref() { },
unref() { }
},
logger: {
log() { }
},
@ -65,22 +69,11 @@ describe('KickbanModule', () => {
});
});
function patch(fn, after) {
let existing = kickban[fn];
kickban[fn] = async function () {
try {
await existing.apply(this, arguments)
} finally {
after();
}
};
}
describe('#handleCmdBan', () => {
it('inserts a valid ban', done => {
let kicked = false;
patch('banName', () => {
mockChannel.refCounter.unref = () => {
assert(kicked, 'Expected user to be kicked');
database.getDB().runTransaction(async tx => {
@ -97,7 +90,7 @@ describe('KickbanModule', () => {
done();
});
});
};
mockChannel.users = [{
getLowerName() {
@ -254,7 +247,7 @@ describe('KickbanModule', () => {
let firstUserKicked = false;
let secondUserKicked = false;
patch('banAll', () => {
mockChannel.refCounter.unref = () => {
assert(firstUserKicked, 'Expected banned user to be kicked');
assert(
secondUserKicked,
@ -286,7 +279,7 @@ describe('KickbanModule', () => {
done();
});
});
};
mockChannel.users = [{
getLowerName() {
@ -320,7 +313,7 @@ describe('KickbanModule', () => {
});
it('inserts a valid range ban', done => {
patch('banIP', () => {
mockChannel.refCounter.unref = () => {
database.getDB().runTransaction(async tx => {
const ipBan = await tx.table('channel_bans')
.where({
@ -335,7 +328,7 @@ describe('KickbanModule', () => {
done();
});
});
};
kickban.handleCmdIPBan(
mockUser,
@ -345,7 +338,7 @@ describe('KickbanModule', () => {
});
it('inserts a valid wide-range ban', done => {
patch('banIP', () => {
mockChannel.refCounter.unref = () => {
database.getDB().runTransaction(async tx => {
const ipBan = await tx.table('channel_bans')
.where({
@ -360,7 +353,7 @@ describe('KickbanModule', () => {
done();
});
});
};
kickban.handleCmdIPBan(
mockUser,
@ -372,7 +365,7 @@ describe('KickbanModule', () => {
it('inserts a valid IPv6 ban', done => {
const longIP = require('../../lib/utilities').expandIPv6('::abcd');
patch('banAll', () => {
mockChannel.refCounter.unref = () => {
database.getDB().runTransaction(async tx => {
const ipBan = await tx.table('channel_bans')
.where({
@ -387,7 +380,7 @@ describe('KickbanModule', () => {
done();
});
});
};
database.getDB().runTransaction(async tx => {
await tx.table('aliases')
@ -553,7 +546,7 @@ describe('KickbanModule', () => {
});
it('still adds the IP ban even if the name is already banned', done => {
patch('banIP', () => {
mockChannel.refCounter.unref = () => {
database.getDB().runTransaction(async tx => {
const ipBan = await tx.table('channel_bans')
.where({
@ -568,7 +561,7 @@ describe('KickbanModule', () => {
done();
});
});
};
database.getDB().runTransaction(tx => {
return tx.table('channel_bans')

View file

@ -2,7 +2,7 @@
"author": "Calvin Montgomery",
"name": "CyTube",
"description": "Online media synchronizer and chat",
"version": "3.74.1",
"version": "3.74.2",
"repository": {
"url": "http://github.com/calzoneman/sync"
},

View file

@ -14,6 +14,67 @@ const LOGGER = require('@calzoneman/jsli')('channel');
const USERCOUNT_THROTTLE = 10000;
class ReferenceCounter {
constructor(channel) {
this.channel = channel;
this.channelName = channel.name;
this.refCount = 0;
this.references = {};
}
ref(caller) {
if (caller) {
if (this.references.hasOwnProperty(caller)) {
this.references[caller]++;
} else {
this.references[caller] = 1;
}
}
this.refCount++;
}
unref(caller) {
if (caller) {
if (this.references.hasOwnProperty(caller)) {
this.references[caller]--;
if (this.references[caller] === 0) {
delete this.references[caller];
}
} else {
LOGGER.error("ReferenceCounter::unref() called by caller [" +
caller + "] but this caller had no active references! " +
`(channel: ${this.channelName})`);
return;
}
}
this.refCount--;
this.checkRefCount();
}
checkRefCount() {
if (this.refCount === 0) {
if (Object.keys(this.references).length > 0) {
LOGGER.error("ReferenceCounter::refCount reached 0 but still had " +
"active references: " +
JSON.stringify(Object.keys(this.references)) +
` (channel: ${this.channelName})`);
for (var caller in this.references) {
this.refCount += this.references[caller];
}
} else if (this.channel.users && this.channel.users.length > 0) {
LOGGER.error("ReferenceCounter::refCount reached 0 but still had " +
this.channel.users.length + " active users" +
` (channel: ${this.channelName})`);
this.refCount = this.channel.users.length;
} else {
this.channel.emit("empty");
}
}
}
}
function Channel(name) {
this.name = name;
this.uniqueName = name.toLowerCase();
@ -24,6 +85,7 @@ function Channel(name) {
)
);
this.users = [];
this.refCounter = new ReferenceCounter(this);
this.flags = 0;
this.id = 0;
this.ownerName = null;
@ -220,16 +282,17 @@ Channel.prototype.saveState = async function () {
Channel.prototype.checkModules = function (fn, args, cb) {
const self = this;
const refCaller = `Channel::checkModules/${fn}`;
this.waitFlag(Flags.C_READY, function () {
if (self.dead) return;
self.refCounter.ref(refCaller);
var keys = Object.keys(self.modules);
var next = function (err, result) {
if (self.dead) return;
if (result !== ChannelModule.PASSTHROUGH) {
/* Either an error occured, or the module denied the user access */
cb(err, result);
self.refCounter.unref(refCaller);
return;
}
@ -237,6 +300,7 @@ Channel.prototype.checkModules = function (fn, args, cb) {
if (m === undefined) {
/* No more modules to check */
cb(null, ChannelModule.PASSTHROUGH);
self.refCounter.unref(refCaller);
return;
}
@ -275,32 +339,28 @@ Channel.prototype.notifyModules = function (fn, args) {
Channel.prototype.joinUser = function (user, data) {
const self = this;
self.refCounter.ref("Channel::user");
self.waitFlag(Flags.C_READY, function () {
/* User closed the connection before the channel finished loading */
if (user.socket.disconnected) {
return;
}
if (self.dead) {
user.kick('Channel is not loaded');
self.refCounter.unref("Channel::user");
return;
}
user.channel = self;
user.waitFlag(Flags.U_LOGGED_IN, () => {
if (self.dead) {
user.kick('Channel is not loaded');
LOGGER.warn(
'Got U_LOGGED_IN for %s after channel already unloaded',
user.getName()
);
return;
}
if (user.is(Flags.U_REGISTERED)) {
db.channels.getRank(self.name, user.getName(), (error, rank) => {
if (!error) {
if (self.dead) {
user.kick('Channel is not loaded');
return;
}
user.setChannelRank(rank);
user.setFlag(Flags.U_HAS_CHANNEL_RANK);
if (user.inChannel()) {
@ -314,6 +374,13 @@ Channel.prototype.joinUser = function (user, data) {
}
});
if (user.socket.disconnected) {
self.refCounter.unref("Channel::user");
return;
} else if (self.dead) {
return;
}
self.checkModules("onUserPreJoin", [user, data], function (err, result) {
if (result === ChannelModule.PASSTHROUGH) {
user.channel = self;
@ -322,6 +389,7 @@ Channel.prototype.joinUser = function (user, data) {
user.channel = null;
user.account.channelRank = 0;
user.account.effectiveRank = user.account.globalRank;
self.refCounter.unref("Channel::user");
}
});
});
@ -425,8 +493,8 @@ Channel.prototype.partUser = function (user) {
});
this.broadcastUsercount();
this.refCounter.unref("Channel::user");
user.die();
if (this.users.length === 0) this.emit('empty');
};
Channel.prototype.maybeResendUserlist = function maybeResendUserlist(user, newRank, oldRank) {
@ -587,14 +655,13 @@ Channel.prototype.sendUserJoin = function (users, user) {
Channel.prototype.readLog = function (cb) {
const maxLen = 102400;
const file = this.logger.filename;
this.refCounter.ref("Channel::readLog");
const self = this;
fs.stat(file, function (err, data) {
if (err) {
self.refCounter.unref("Channel::readLog");
return cb(err, null);
}
if (self.dead) {
return cb(new Error('Channel unloaded'), null);
}
const start = Math.max(data.size - maxLen, 0);
const end = data.size - 1;
@ -610,6 +677,7 @@ Channel.prototype.readLog = function (cb) {
});
read.on("end", function () {
cb(null, buffer);
self.refCounter.unref("Channel::readLog");
});
});
};
@ -676,6 +744,10 @@ Channel.prototype.packInfo = function (isAdmin) {
}
}
if (isAdmin) {
data.activeLockCount = this.refCounter.refCount;
}
var self = this;
var keys = Object.keys(this.modules);
keys.forEach(function (k) {

View file

@ -77,11 +77,10 @@ KickBanModule.prototype.onUserPostJoin = function (user) {
}
const chan = this.channel;
const refCaller = "KickBanModule::onUserPostJoin";
user.waitFlag(Flags.U_LOGGED_IN, function () {
chan.refCounter.ref(refCaller);
db.channels.isNameBanned(chan.name, user.getName(), function (err, banned) {
if (chan.dead) {
return;
}
if (!err && banned) {
user.kick("You are banned from this channel.");
if (chan.modules.chat) {
@ -89,6 +88,7 @@ KickBanModule.prototype.onUserPostJoin = function (user) {
"name is banned)");
}
}
chan.refCounter.unref(refCaller);
});
});
@ -226,9 +226,14 @@ KickBanModule.prototype.handleCmdBan = function (user, msg, _meta) {
var name = args.shift().toLowerCase();
var reason = args.join(" ");
const chan = this.channel;
chan.refCounter.ref("KickBanModule::handleCmdBan");
this.banName(user, name, reason).catch(error => {
const message = error.message || error;
user.socket.emit("errorMsg", { msg: message });
}).then(() => {
chan.refCounter.unref("KickBanModule::handleCmdBan");
});
};
@ -252,9 +257,15 @@ KickBanModule.prototype.handleCmdIPBan = function (user, msg, _meta) {
}
var reason = args.join(" ");
const chan = this.channel;
chan.refCounter.ref("KickBanModule::handleCmdIPBan");
this.banAll(user, name, range, reason).catch(error => {
//console.log('!!!', error.stack);
const message = error.message || error;
user.socket.emit("errorMsg", { msg: message });
}).then(() => {
chan.refCounter.unref("KickBanModule::handleCmdIPBan");
});
};
@ -416,15 +427,14 @@ KickBanModule.prototype.handleUnban = function (user, data) {
}
var self = this;
this.channel.refCounter.ref("KickBanModule::handleUnban");
db.channels.unbanId(this.channel.name, data.id, function (err) {
if (err) {
self.channel.refCounter.unref("KickBanModule::handleUnban");
return user.socket.emit("errorMsg", {
msg: err
});
}
if (self.channel.dead) {
return;
}
self.sendUnban(self.channel.users, data);
self.channel.logger.log("[mod] " + user.getName() + " unbanned " + data.name);
@ -435,6 +445,7 @@ KickBanModule.prototype.handleUnban = function (user, data) {
banperm
);
}
self.channel.refCounter.unref("KickBanModule::handleUnban");
});
};

View file

@ -55,15 +55,18 @@ LibraryModule.prototype.handleUncache = function (user, data) {
}
const chan = this.channel;
chan.refCounter.ref("LibraryModule::handleUncache");
db.channels.deleteFromLibrary(chan.name, data.id, function (err, _res) {
if (chan.dead) {
return;
} else if (err) {
chan.refCounter.unref("LibraryModule::handleUncache");
return;
}
chan.logger.log("[library] " + user.getName() + " deleted " + data.id +
"from the library");
chan.refCounter.unref("LibraryModule::handleUncache");
});
};

View file

@ -47,8 +47,9 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) {
}
const self = this;
self.channel.refCounter.ref("MediaRefresherModule::initVimeo");
Vimeo.extract(data.id).then(function (direct) {
if (self.channel.dead) {
if (self.dead || self.channel.dead) {
self.unload();
return;
}
@ -62,11 +63,9 @@ MediaRefresherModule.prototype.initVimeo = function (data, cb) {
if (cb) cb();
}).catch(function (err) {
LOGGER.error("Unexpected vimeo::extract() fail: " + err.stack);
if (self.channel.dead) {
self.unload();
return;
}
if (cb) cb();
}).finally(() => {
self.channel.refCounter.unref("MediaRefresherModule::initVimeo");
});
};

View file

@ -511,19 +511,19 @@ PlaylistModule.prototype.queueStandard = function (user, data) {
};
const self = this;
this.channel.refCounter.ref("PlaylistModule::queueStandard");
counters.add("playlist:queue:count", 1);
this.semaphore.queue(function (lock) {
InfoGetter.getMedia(data.id, data.type, function (err, media) {
if (err) {
error(XSS.sanitizeText(String(err)));
return lock.release();
}
if (self.channel.dead) {
self.channel.refCounter.unref("PlaylistModule::queueStandard");
return lock.release();
}
self._addItem(media, data, user, function () {
lock.release();
self.channel.refCounter.unref("PlaylistModule::queueStandard");
});
});
});
@ -546,7 +546,7 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) {
return lock.release();
}
if (self.channel.dead) {
if (self.dead) {
return lock.release();
}
@ -562,6 +562,8 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) {
}
}
self.channel.refCounter.ref("PlaylistModule::queueYouTubePlaylist");
if (self.channel.modules.library && data.shouldAddToLibrary) {
self.channel.modules.library.cacheMediaList(vids);
data.shouldAddToLibrary = false;
@ -572,6 +574,8 @@ PlaylistModule.prototype.queueYouTubePlaylist = function (user, data) {
self._addItem(media, data, user);
});
self.channel.refCounter.unref("PlaylistModule::queueYouTubePlaylist");
lock.release();
});
});
@ -589,6 +593,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) {
}
var plitem = this.items.find(data);
self.channel.refCounter.ref("PlaylistModule::handleDelete");
this.semaphore.queue(function (lock) {
if (self._delete(data)) {
self.channel.logger.log("[playlist] " + user.getName() + " deleted " +
@ -596,6 +601,7 @@ PlaylistModule.prototype.handleDelete = function (user, data) {
}
lock.release();
self.channel.refCounter.unref("PlaylistModule::handleDelete");
});
};
@ -631,24 +637,26 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) {
}
const self = this;
self.channel.refCounter.ref("PlaylistModule::handleMoveMedia");
self.semaphore.queue(function (lock) {
if (self.channel.dead) {
return lock.release();
}
if (!self.items.remove(data.from)) {
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
return lock.release();
}
if (data.after === "prepend") {
if (!self.items.prepend(from)) {
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
return lock.release();
}
} else if (data.after === "append") {
if (!self.items.append(from)) {
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
return lock.release();
}
} else {
if (!self.items.insertAfter(from, data.after)) {
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
return lock.release();
}
}
@ -660,6 +668,7 @@ PlaylistModule.prototype.handleMoveMedia = function (user, data) {
(after ? " after " + after.media.title : ""));
self._listDirty = true;
lock.release();
self.channel.refCounter.unref("PlaylistModule::handleMoveMedia");
});
};
@ -1351,15 +1360,14 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) {
};
const self = this;
self.channel.refCounter.ref("PlaylistModule::handleQueuePlaylist");
db.getUserPlaylist(user.getName(), data.name, function (err, pl) {
if (err) {
self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist");
return user.socket.emit("errorMsg", {
msg: "Playlist load failed: " + err
});
}
if (self.channel.dead) {
return;
}
try {
if (data.pos === "next") {
@ -1394,6 +1402,8 @@ PlaylistModule.prototype.handleQueuePlaylist = function (user, data) {
msg: "Internal error occurred when loading playlist.",
link: null
});
} finally {
self.channel.refCounter.unref("PlaylistModule::handleQueuePlaylist");
}
});
};

View file

@ -357,9 +357,11 @@ Server.prototype.unloadChannel = function (chan, options) {
LOGGER.info("Unloaded channel " + chan.name);
chan.broadcastUsercount.cancel();
// Empty all outward references from the channel | TODO does this actually help?
// Empty all outward references from the channel
Object.keys(chan).forEach(key => {
delete chan[key];
if (key !== "refCounter") {
delete chan[key];
}
});
chan.dead = true;
promActiveChannels.dec();

View file

@ -421,6 +421,10 @@ function showChannelDetailModal(c) {
$("<td/>").text("Public").appendTo(tr);
$("<td/>").text(c.public).appendTo(tr);
tr = $("<tr/>").appendTo(table);
$("<td/>").text("ActiveLock Count").appendTo(tr);
$("<td/>").text(c.activeLockCount).appendTo(tr);
tr = $("<tr/>").appendTo(table);
$("<td/>").text("Chat Filter Count").appendTo(tr);
$("<td/>").text(c.chatFilterCount).appendTo(tr);