Add message bus for #677

This commit is contained in:
Calvin Montgomery 2017-08-15 18:23:03 -07:00
parent 9ee650461f
commit d16cfb7328
9 changed files with 207 additions and 5 deletions

View file

@ -2,7 +2,7 @@
"author": "Calvin Montgomery", "author": "Calvin Montgomery",
"name": "CyTube", "name": "CyTube",
"description": "Online media synchronizer and chat", "description": "Online media synchronizer and chat",
"version": "3.46.2", "version": "3.47.0",
"repository": { "repository": {
"url": "http://github.com/calzoneman/sync" "url": "http://github.com/calzoneman/sync"
}, },

View file

@ -1,6 +1,7 @@
import NullClusterClient from './io/cluster/nullclusterclient'; import NullClusterClient from './io/cluster/nullclusterclient';
import Config from './config'; import Config from './config';
import IOConfiguration from './configuration/ioconfig'; import IOConfiguration from './configuration/ioconfig';
import { EventEmitter } from 'events';
class LegacyModule { class LegacyModule {
getIOConfig() { getIOConfig() {
@ -15,6 +16,10 @@ class LegacyModule {
return new NullClusterClient(this.getIOConfig()); return new NullClusterClient(this.getIOConfig());
} }
getGlobalMessageBus() {
return new EventEmitter();
}
onReady() { onReady() {
} }

View file

@ -22,6 +22,10 @@ class PartitionConfig {
getAnnouncementChannel() { getAnnouncementChannel() {
return this.config.redis.announcementChannel || 'serverAnnouncements'; return this.config.redis.announcementChannel || 'serverAnnouncements';
} }
getGlobalMessageBusChannel() {
return this.config.redis.globalMessageBusChannel || 'globalMessages';
}
} }
export { PartitionConfig }; export { PartitionConfig };

View file

@ -7,6 +7,7 @@ import LegacyConfig from '../config';
import path from 'path'; import path from 'path';
import { AnnouncementRefresher } from './announcementrefresher'; import { AnnouncementRefresher } from './announcementrefresher';
import { RedisPartitionMapReloader } from './redispartitionmapreloader'; import { RedisPartitionMapReloader } from './redispartitionmapreloader';
import { RedisMessageBus } from '../pubsub/redis';
const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf',
'partitions.toml'); 'partitions.toml');
@ -104,6 +105,19 @@ class PartitionModule {
return this.announcementRefresher; return this.announcementRefresher;
} }
getGlobalMessageBus() {
if (!this.globalMessageBus) {
const provider = this.getRedisClientProvider();
this.globalMessageBus = new RedisMessageBus(
provider.get(),
provider.get(),
this.partitionConfig.getGlobalMessageBusChannel()
);
}
return this.globalMessageBus;
}
} }
export { PartitionModule }; export { PartitionModule };

70
src/pubsub/redis.js Normal file
View file

@ -0,0 +1,70 @@
import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';
const LOGGER = require('@calzoneman/jsli')('redis-messagebus');
class RedisMessageBus extends EventEmitter {
constructor(pubClient, subClient, channel) {
super();
this.pubClient = pubClient;
this.subClient = subClient;
this.channel = channel;
this.publisherID = uuidv4();
subClient.once('ready', this.subscribe.bind(this));
}
subscribe() {
this.subClient.subscribe(this.channel);
this.subClient.on('message', this.onMessage.bind(this));
LOGGER.info('Subscribed to Redis messages on channel %s', this.channel);
}
onMessage(channel, message) {
if (channel !== this.channel) {
LOGGER.warn('Ignoring message from mismatched channel "%s"', channel);
return;
}
try {
const { event, payload } = JSON.parse(message);
this._emit(event, payload);
} catch (error) {
if (error instanceof SyntaxError) {
LOGGER.error(
'Malformed message received: %s (message: "%s")',
message,
error
);
} else {
LOGGER.error('Unexpected error decoding message: %s', error.stack);
}
return;
}
}
async emit(event, payload) {
try {
const message = JSON.stringify({
time: new Date(),
publisher: this.publisherID,
event,
payload
});
await this.pubClient.publish(this.channel, message);
} catch (error) {
LOGGER.error('Unable to send event %s: %s', event, error);
}
}
}
Object.assign(RedisMessageBus.prototype, {
_emit: EventEmitter.prototype.emit
});
export { RedisMessageBus };

View file

@ -72,6 +72,9 @@ var Server = function () {
initModule = this.initModule = new LegacyModule(); initModule = this.initModule = new LegacyModule();
} }
const globalMessageBus = this.initModule.getGlobalMessageBus();
globalMessageBus.on('UserProfileChanged', this.handleUserProfileChange.bind(this));
// database init ------------------------------------------------------ // database init ------------------------------------------------------
var Database = require("./database"); var Database = require("./database");
self.db = Database; self.db = Database;
@ -96,7 +99,8 @@ var Server = function () {
ioConfig, ioConfig,
clusterClient, clusterClient,
channelIndex, channelIndex,
session); session,
globalMessageBus);
// http/https/sio server init ----------------------------------------- // http/https/sio server init -----------------------------------------
var key = "", cert = "", ca = undefined; var key = "", cert = "", ca = undefined;
@ -391,3 +395,36 @@ Server.prototype.reloadPartitionMap = function () {
this.initModule.getPartitionMapReloader().reload(); this.initModule.getPartitionMapReloader().reload();
}; };
Server.prototype.handleUserProfileChange = function (event) {
try {
const lname = event.user.toLowerCase();
// Probably not the most efficient thing in the world, but w/e
// profile changes are not high volume
this.channels.forEach(channel => {
if (channel.dead) return;
channel.users.forEach(user => {
if (user.getLowerName() === lname && user.account.user) {
user.account.user.profile = {
image: event.profile.image,
text: event.profile.text
};
user.account.update();
channel.sendUserProfile(channel.users, user);
LOGGER.info(
'Updated profile for user %s in channel %s',
lname,
channel.name
);
}
});
});
} catch (error) {
LOGGER.error('handleUserProfileChange failed: %s', error);
}
};

View file

@ -17,6 +17,8 @@ const url = require("url");
const LOGGER = require('@calzoneman/jsli')('database/accounts'); const LOGGER = require('@calzoneman/jsli')('database/accounts');
let globalMessageBus;
/** /**
* Handles a GET request for /account/edit * Handles a GET request for /account/edit
*/ */
@ -455,6 +457,14 @@ async function handleAccountProfile(req, res) {
return; return;
} }
globalMessageBus.emit('UserProfileChanged', {
user: user.name,
profile: {
image,
text
}
});
sendPug(res, "account-profile", { sendPug(res, "account-profile", {
profileImage: image, profileImage: image,
profileText: text, profileText: text,
@ -661,7 +671,9 @@ module.exports = {
/** /**
* Initialize the module * Initialize the module
*/ */
init: function (app) { init: function (app, _globalMessageBus) {
globalMessageBus = _globalMessageBus;
app.get("/account/edit", handleAccountEditPage); app.get("/account/edit", handleAccountEditPage);
app.post("/account/edit", handleAccountEdit); app.post("/account/edit", handleAccountEdit);
app.get("/account/channels", handleAccountChannelPage); app.get("/account/channels", handleAccountChannelPage);

View file

@ -162,7 +162,15 @@ module.exports = {
/** /**
* Initializes webserver callbacks * Initializes webserver callbacks
*/ */
init: function (app, webConfig, ioConfig, clusterClient, channelIndex, session) { init: function (
app,
webConfig,
ioConfig,
clusterClient,
channelIndex,
session,
globalMessageBus
) {
const chanPath = Config.get('channel-path'); const chanPath = Config.get('channel-path');
initPrometheus(app); initPrometheus(app);
@ -217,7 +225,7 @@ module.exports = {
app.get('/useragreement', handleUserAgreement); app.get('/useragreement', handleUserAgreement);
require('./routes/contact')(app, webConfig); require('./routes/contact')(app, webConfig);
require('./auth').init(app); require('./auth').init(app);
require('./account').init(app); require('./account').init(app, globalMessageBus);
require('./acp').init(app); require('./acp').init(app);
require('../google2vtt').attach(app); require('../google2vtt').attach(app);
require('./routes/google_drive_userscript')(app); require('./routes/google_drive_userscript')(app);

52
test/pubsub/redis.js Normal file
View file

@ -0,0 +1,52 @@
const assert = require('assert');
const { RedisMessageBus } = require('../../lib/pubsub/redis');
const { EventEmitter } = require('events');
const sinon = require('sinon');
describe('RedisMessageBus', () => {
let pubClient, subClient, messageBus, publishSpy, subscribeSpy;
beforeEach(() => {
pubClient = { publish: () => {} };
subClient = new EventEmitter();
subClient.subscribe = () => {};
subscribeSpy = sinon.spy(subClient, 'subscribe');
publishSpy = sinon.spy(pubClient, 'publish');
messageBus = new RedisMessageBus(pubClient, subClient, 'test');
subClient.emit('ready');
});
describe('#onMessage', () => {
it('processes a valid message', done => {
messageBus.once('testEvent', payload => {
assert(subscribeSpy.withArgs('test').calledOnce);
assert.deepStrictEqual(payload, { foo: 'bar' });
done();
});
messageBus.onMessage('test', '{"event":"testEvent","payload":{"foo":"bar"}}');
});
it('processes a syntactically invalid message', done => {
messageBus.onMessage('test', 'not valid json lol');
done();
});
});
describe('#emit', () => {
it('emits messages', () => {
messageBus.emit('testEvent', { foo: 'bar' });
assert(publishSpy.withArgs('test', sinon.match(arg => {
arg = JSON.parse(arg);
return arg.event === 'testEvent' && arg.payload.foo === 'bar';
})).calledOnce);
});
});
});