Compare commits
16 commits
postgreSQL
...
uws
Author | SHA1 | Date | |
---|---|---|---|
5864836ce7 | |||
17cf6023db | |||
3584b124e3 | |||
3322e5404f | |||
2340da1cb5 | |||
07502575eb | |||
17911d43fc | |||
f0ba3a998a | |||
3547a51f2c | |||
85947d326e | |||
4cad31d82c | |||
86e023d233 | |||
d369f1ebe4 | |||
61b856c2c9 | |||
1021cc706a | |||
fb5f92b7ee |
|
@ -11,4 +11,3 @@ env:
|
|||
node_js:
|
||||
- "10"
|
||||
- "8"
|
||||
- "6"
|
||||
|
|
|
@ -41,6 +41,7 @@
|
|||
"sprintf-js": "^1.0.3",
|
||||
"toml": "^2.3.0",
|
||||
"uuid": "^3.2.1",
|
||||
"uws": "10.148.1",
|
||||
"yamljs": "^0.2.8"
|
||||
},
|
||||
"scripts": {
|
||||
|
|
|
@ -725,6 +725,7 @@ Channel.prototype.handleReadLog = function (user) {
|
|||
|
||||
Channel.prototype.broadcastToRoom = function (msg, data, ns) {
|
||||
sio.instance.in(ns).emit(msg, data);
|
||||
require('../io/uws').in(ns).emit(msg, data);
|
||||
};
|
||||
|
||||
Channel.prototype.broadcastAll = function (msg, data) {
|
||||
|
|
|
@ -6,11 +6,47 @@ export default class IOConfiguration {
|
|||
getSocketEndpoints() {
|
||||
return this.config.endpoints.slice();
|
||||
}
|
||||
|
||||
getUWSEndpoints() {
|
||||
return this.config.uwsEndpoints.slice();
|
||||
}
|
||||
}
|
||||
|
||||
function getUWSEndpoints(oldConfig) {
|
||||
const uwsEndpoints = oldConfig.get('listen').filter(it => it.uws)
|
||||
.map(it => {
|
||||
let domain;
|
||||
if (it.https) {
|
||||
domain = oldConfig.get('https.domain')
|
||||
.replace(/^https/, 'wss');
|
||||
} else {
|
||||
domain = oldConfig.get('io.domain')
|
||||
.replace(/^http/, 'ws');
|
||||
}
|
||||
|
||||
return {
|
||||
secure: !!it.https,
|
||||
url: `${domain}:${it.port}`
|
||||
};
|
||||
});
|
||||
|
||||
uwsEndpoints.sort((a, b) => {
|
||||
if (a.secure && !b.secure) {
|
||||
return -1;
|
||||
} else if (b.secure && !a.secure) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
});
|
||||
|
||||
return uwsEndpoints;
|
||||
}
|
||||
|
||||
IOConfiguration.fromOldConfig = function (oldConfig) {
|
||||
const config = {
|
||||
endpoints: []
|
||||
endpoints: [],
|
||||
uwsEndpoints: getUWSEndpoints(oldConfig)
|
||||
};
|
||||
|
||||
if (oldConfig.get('io.ipv4-ssl')) {
|
||||
|
|
|
@ -7,8 +7,10 @@ export default class NullClusterClient {
|
|||
|
||||
getSocketConfig(_channel) {
|
||||
const servers = this.ioConfig.getSocketEndpoints();
|
||||
const uwsServers = this.ioConfig.getUWSEndpoints();
|
||||
return Promise.resolve({
|
||||
servers: servers
|
||||
servers: servers,
|
||||
uwsServers: uwsServers
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import { Counter, Gauge } from 'prom-client';
|
|||
import Socket from 'socket.io/lib/socket';
|
||||
import { TokenBucket } from '../util/token-bucket';
|
||||
import http from 'http';
|
||||
import { UWSServer } from './uws';
|
||||
|
||||
const LOGGER = require('@calzoneman/jsli')('ioserver');
|
||||
|
||||
|
@ -34,6 +35,20 @@ const authFailureCount = new Counter({
|
|||
help: 'Number of failed authentications from session middleware'
|
||||
});
|
||||
|
||||
class SocketIOContext {
|
||||
constructor(socket) {
|
||||
socket.handshake.connection = {
|
||||
remoteAddress: socket.handshake.address
|
||||
};
|
||||
|
||||
this.upgradeReq = socket.handshake;
|
||||
this.ipAddress = null;
|
||||
this.torConnection = null;
|
||||
this.ipSessionFirstSeen = null;
|
||||
this.user = null;
|
||||
}
|
||||
}
|
||||
|
||||
class IOServer {
|
||||
constructor(options = {
|
||||
proxyTrustFn: proxyaddr.compile('127.0.0.1')
|
||||
|
@ -49,21 +64,16 @@ class IOServer {
|
|||
// Map proxied sockets to the real IP address via X-Forwarded-For
|
||||
// If the resulting address is a known Tor exit, flag it as such
|
||||
ipProxyMiddleware(socket, next) {
|
||||
if (!socket.context) socket.context = {};
|
||||
|
||||
try {
|
||||
socket.handshake.connection = {
|
||||
remoteAddress: socket.handshake.address
|
||||
};
|
||||
|
||||
socket.context.ipAddress = proxyaddr(
|
||||
socket.handshake,
|
||||
socket.context.upgradeReq,
|
||||
this.proxyTrustFn
|
||||
);
|
||||
|
||||
if (!socket.context.ipAddress) {
|
||||
throw new Error(
|
||||
`Assertion failed: unexpected IP ${socket.context.ipAddress}`
|
||||
'Could not determine IP address from ' +
|
||||
socket.context.upgradeReq.connection.remoteAddress
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
|
@ -163,7 +173,7 @@ class IOServer {
|
|||
|
||||
// Parse cookies
|
||||
cookieParsingMiddleware(socket, next) {
|
||||
const req = socket.handshake;
|
||||
const req = socket.context.upgradeReq;
|
||||
if (req.headers.cookie) {
|
||||
cookieParser(req, null, () => next());
|
||||
} else {
|
||||
|
@ -176,7 +186,7 @@ class IOServer {
|
|||
// Determine session age from ip-session cookie
|
||||
// (Used for restricting chat)
|
||||
ipSessionCookieMiddleware(socket, next) {
|
||||
const cookie = socket.handshake.signedCookies['ip-session'];
|
||||
const cookie = socket.context.upgradeReq.signedCookies['ip-session'];
|
||||
if (!cookie) {
|
||||
socket.context.ipSessionFirstSeen = new Date();
|
||||
next();
|
||||
|
@ -197,7 +207,7 @@ class IOServer {
|
|||
socket.context.aliases = [];
|
||||
|
||||
const promises = [];
|
||||
const auth = socket.handshake.signedCookies.auth;
|
||||
const auth = socket.context.upgradeReq.signedCookies.auth;
|
||||
if (auth) {
|
||||
promises.push(verifySession(auth).then(user => {
|
||||
socket.context.user = Object.assign({}, user);
|
||||
|
@ -266,6 +276,10 @@ class IOServer {
|
|||
patchTypecheckedFunctions();
|
||||
|
||||
const io = this.io = sio.instance = sio();
|
||||
io.use((socket, next) => {
|
||||
socket.context = new SocketIOContext(socket);
|
||||
next();
|
||||
});
|
||||
io.use(this.ipProxyMiddleware.bind(this));
|
||||
io.use(this.ipBanMiddleware.bind(this));
|
||||
io.use(this.ipThrottleMiddleware.bind(this));
|
||||
|
@ -276,14 +290,29 @@ class IOServer {
|
|||
io.on('connection', this.handleConnection.bind(this));
|
||||
}
|
||||
|
||||
bindTo(servers) {
|
||||
initUWS() {
|
||||
const uws = this.uws = new UWSServer();
|
||||
|
||||
uws.use(this.ipProxyMiddleware.bind(this));
|
||||
uws.use(this.ipBanMiddleware.bind(this));
|
||||
uws.use(this.ipThrottleMiddleware.bind(this));
|
||||
uws.use(this.cookieParsingMiddleware.bind(this));
|
||||
uws.use(this.ipSessionCookieMiddleware.bind(this));
|
||||
uws.use(this.authUserMiddleware.bind(this));
|
||||
uws.on('connection', this.handleConnection.bind(this));
|
||||
}
|
||||
|
||||
bindTo(sioServers, uwsServers) {
|
||||
if (!this.io) {
|
||||
throw new Error('Cannot bind: socket.io has not been initialized yet');
|
||||
}
|
||||
|
||||
servers.forEach(server => {
|
||||
sioServers.forEach(server => {
|
||||
this.io.attach(server);
|
||||
});
|
||||
uwsServers.forEach(server => {
|
||||
this.uws.attach(server);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -417,15 +446,23 @@ module.exports = {
|
|||
});
|
||||
|
||||
ioServer.initSocketIO();
|
||||
ioServer.initUWS();
|
||||
|
||||
const uniqueListenAddresses = new Set();
|
||||
const servers = [];
|
||||
const uwsServers = [];
|
||||
|
||||
Config.get("listen").forEach(function (bind) {
|
||||
if (!bind.io) {
|
||||
if (bind.io && bind.uws) {
|
||||
throw new Error(
|
||||
'Cannot bind both socket.io and uws to the same listener'
|
||||
);
|
||||
} else if (!bind.io && !bind.uws) {
|
||||
return;
|
||||
}
|
||||
|
||||
const serverList = bind.io ? servers : uwsServers;
|
||||
|
||||
const id = bind.ip + ":" + bind.port;
|
||||
if (uniqueListenAddresses.has(id)) {
|
||||
LOGGER.warn("Ignoring duplicate listen address %s", id);
|
||||
|
@ -433,19 +470,21 @@ module.exports = {
|
|||
}
|
||||
|
||||
if (srv.servers.hasOwnProperty(id)) {
|
||||
servers.push(srv.servers[id]);
|
||||
serverList.push(srv.servers[id]);
|
||||
} else {
|
||||
const server = http.createServer().listen(bind.port, bind.ip);
|
||||
servers.push(server);
|
||||
serverList.push(server);
|
||||
}
|
||||
|
||||
uniqueListenAddresses.add(id);
|
||||
});
|
||||
|
||||
ioServer.bindTo(servers);
|
||||
ioServer.bindTo(servers, uwsServers);
|
||||
},
|
||||
|
||||
IOServer: IOServer
|
||||
IOServer: IOServer,
|
||||
|
||||
SocketIOContext: SocketIOContext
|
||||
};
|
||||
|
||||
/* Clean out old rate limiters */
|
||||
|
|
261
src/io/uws.js
Normal file
261
src/io/uws.js
Normal file
|
@ -0,0 +1,261 @@
|
|||
import { EventEmitter } from 'events';
|
||||
import { Multimap } from '../util/multimap';
|
||||
import clone from 'clone';
|
||||
import typecheck from 'json-typecheck';
|
||||
import uws from 'uws';
|
||||
|
||||
const LOGGER = require('@calzoneman/jsli')('uws');
|
||||
|
||||
const TYPE_FRAME = 0;
|
||||
const TYPE_ACK = 1;
|
||||
|
||||
const rooms = new Multimap();
|
||||
|
||||
class UWSContext {
|
||||
constructor(upgradeReq) {
|
||||
this.upgradeReq = upgradeReq;
|
||||
this.ipAddress = null;
|
||||
this.torConnection = null;
|
||||
this.ipSessionFirstSeen = null;
|
||||
this.user = null;
|
||||
}
|
||||
}
|
||||
|
||||
class UWSWrapper extends EventEmitter {
|
||||
constructor(socket) {
|
||||
super();
|
||||
|
||||
this._uwsSocket = socket;
|
||||
this._joined = new Set();
|
||||
this.disconnected = false;
|
||||
|
||||
this.context = new UWSContext({
|
||||
connection: {
|
||||
remoteAddress: socket._socket.remoteAddress
|
||||
},
|
||||
headers: clone(socket.upgradeReq.headers)
|
||||
});
|
||||
// socket.io metrics compatibility
|
||||
this.client = {
|
||||
conn: {
|
||||
on: function(){},
|
||||
transport: {
|
||||
name: 'uws'
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
this._uwsSocket.on('message', message => {
|
||||
try {
|
||||
this._decode(message);
|
||||
} catch (error) {
|
||||
LOGGER.warn(
|
||||
'Decode failed for client %s: %s',
|
||||
this.context.ipAddress,
|
||||
error
|
||||
);
|
||||
this.disconnect();
|
||||
}
|
||||
});
|
||||
|
||||
this._uwsSocket.on('close', () => {
|
||||
this.disconnected = true;
|
||||
|
||||
for (let room of this._joined) {
|
||||
rooms.delete(room, this);
|
||||
}
|
||||
|
||||
this._joined.clear();
|
||||
this._emit('disconnect');
|
||||
});
|
||||
|
||||
this._uwsSocket.on('error', error => {
|
||||
// TODO: determine what conditions cause this
|
||||
LOGGER.error(
|
||||
'Error for client %s: %s',
|
||||
this.context.ipAddress,
|
||||
error.stack
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
this._uwsSocket.terminate();
|
||||
}
|
||||
|
||||
emit(frame, payload) {
|
||||
sendSafe(this, encode(frame, payload));
|
||||
}
|
||||
|
||||
join(room) {
|
||||
this._joined.add(room);
|
||||
rooms.set(room, this);
|
||||
}
|
||||
|
||||
leave(room) {
|
||||
this._joined.delete(room);
|
||||
rooms.delete(room, this);
|
||||
}
|
||||
|
||||
typecheckedOn(frame, typeDef, cb) {
|
||||
this.on(frame, (data, ack) => {
|
||||
typecheck(data, typeDef, (err, data) => {
|
||||
if (err) {
|
||||
this.emit('errorMsg', {
|
||||
msg: 'Unexpected error for message ' + frame + ': ' +
|
||||
err.message
|
||||
});
|
||||
} else {
|
||||
cb(data, ack);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
typecheckedOnce(frame, typeDef, cb) {
|
||||
this.once(frame, (data, ack) => {
|
||||
typecheck(data, typeDef, (err, data) => {
|
||||
if (err) {
|
||||
this.emit('errorMsg', {
|
||||
msg: 'Unexpected error for message ' + frame + ': ' +
|
||||
err.message
|
||||
});
|
||||
} else {
|
||||
cb(data, ack);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
_ack(ackId, payload) {
|
||||
sendSafe(
|
||||
this,
|
||||
JSON.stringify({
|
||||
type: TYPE_ACK,
|
||||
ackId,
|
||||
payload
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
_decode(message) {
|
||||
const { frame, type, ackId, payload } = JSON.parse(message);
|
||||
|
||||
if (type !== TYPE_FRAME) {
|
||||
LOGGER.warn(
|
||||
'Unexpected message type %s from client; dropping',
|
||||
type
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const args = [payload];
|
||||
|
||||
if (typeof ackId === 'number') {
|
||||
args.push(payload => this._ack(ackId, payload));
|
||||
}
|
||||
|
||||
this._emit(frame, ...args);
|
||||
}
|
||||
}
|
||||
|
||||
Object.assign(UWSWrapper.prototype, { _emit: EventEmitter.prototype.emit });
|
||||
|
||||
class UWSServer extends EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
|
||||
this._servers = [];
|
||||
this._middleware = [];
|
||||
}
|
||||
|
||||
use(cb) {
|
||||
this._middleware.push(cb);
|
||||
}
|
||||
|
||||
attach(server) {
|
||||
const uwsServer = new uws.Server({
|
||||
server,
|
||||
perMessageDeflate: false
|
||||
});
|
||||
this._servers.push(uwsServer);
|
||||
|
||||
uwsServer.on('connection', socket => this._onConnection(socket));
|
||||
server.on('listening', () => this.emit('listening'));
|
||||
uwsServer.on('error', e => this.emit('error', e));
|
||||
}
|
||||
|
||||
_onConnection(uwsSocket) {
|
||||
const socket = new UWSWrapper(uwsSocket);
|
||||
|
||||
if (this._middleware.length === 0) {
|
||||
this._acceptConnection(socket);
|
||||
return;
|
||||
}
|
||||
|
||||
let i = 0;
|
||||
const self = this;
|
||||
function next(error) {
|
||||
if (error) {
|
||||
socket.emit('error', error.message);
|
||||
socket.disconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
if (i >= self._middleware.length) {
|
||||
self._acceptConnection(socket);
|
||||
return;
|
||||
}
|
||||
|
||||
process.nextTick(self._middleware[i], socket, next);
|
||||
i++;
|
||||
}
|
||||
|
||||
process.nextTick(next, null);
|
||||
}
|
||||
|
||||
_acceptConnection(socket) {
|
||||
socket.emit('connect');
|
||||
this.emit('connection', socket);
|
||||
}
|
||||
|
||||
shutdown() {
|
||||
this._servers.forEach(sv => sv.close());
|
||||
}
|
||||
}
|
||||
|
||||
function encode(frame, payload) {
|
||||
return JSON.stringify({
|
||||
type: TYPE_FRAME,
|
||||
frame,
|
||||
payload
|
||||
});
|
||||
}
|
||||
|
||||
function sendSafe(socket, message) {
|
||||
try {
|
||||
socket._uwsSocket.send(message);
|
||||
} catch (error) {
|
||||
LOGGER.error(
|
||||
'Error sending to client %s: %s',
|
||||
socket.context.ipAddress,
|
||||
error.stack
|
||||
);
|
||||
socket.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
function inRoom(room) {
|
||||
return {
|
||||
emit(frame, payload) {
|
||||
const encoded = encode(frame, payload);
|
||||
|
||||
for (let wrapper of rooms.get(room)) {
|
||||
sendSafe(wrapper, encoded);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export { UWSServer };
|
||||
exports['in'] = inRoom;
|
45
src/util/multimap.js
Normal file
45
src/util/multimap.js
Normal file
|
@ -0,0 +1,45 @@
|
|||
class Multimap {
|
||||
constructor() {
|
||||
this._items = new Map();
|
||||
}
|
||||
|
||||
get(key) {
|
||||
if (this._items.has(key)) {
|
||||
return this._items.get(key);
|
||||
}
|
||||
|
||||
return new Set();
|
||||
}
|
||||
|
||||
has(key, value) {
|
||||
if (!this._items.has(key)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return this._items.get(key).has(value);
|
||||
}
|
||||
|
||||
set(key, value) {
|
||||
if (!this._items.has(key)) {
|
||||
this._items.set(key, new Set());
|
||||
}
|
||||
|
||||
return this._items.get(key).add(value);
|
||||
}
|
||||
|
||||
delete(key, value) {
|
||||
if (!this._items.has(key)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const res = this._items.get(key).delete(value);
|
||||
|
||||
if (this._items.get(key).size == 0) {
|
||||
this._items.delete(key);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
export { Multimap };
|
|
@ -247,6 +247,7 @@ html(lang="en")
|
|||
script(src="/js/player.js")
|
||||
script(src="/js/paginator.js")
|
||||
script(src="/js/ui.js")
|
||||
script(src="/js/ws.js")
|
||||
script(src="/js/callbacks.js")
|
||||
script(defer, src="https://www.youtube.com/iframe_api")
|
||||
script(defer, src="https://api.dmcdn.net/all.js")
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const assert = require('assert');
|
||||
const IOServer = require('../../lib/io/ioserver').IOServer;
|
||||
const SocketIOContext = require('../../lib/io/ioserver').SocketIOContext;
|
||||
|
||||
describe('IOServer', () => {
|
||||
let server;
|
||||
|
@ -7,9 +8,6 @@ describe('IOServer', () => {
|
|||
beforeEach(() => {
|
||||
server = new IOServer();
|
||||
socket = {
|
||||
context: {
|
||||
ipAddress: '9.9.9.9'
|
||||
},
|
||||
handshake: {
|
||||
address: '127.0.0.1',
|
||||
headers: {
|
||||
|
@ -17,6 +15,7 @@ describe('IOServer', () => {
|
|||
}
|
||||
}
|
||||
};
|
||||
socket.context = new SocketIOContext(socket);
|
||||
});
|
||||
|
||||
describe('#ipProxyMiddleware', () => {
|
||||
|
@ -29,7 +28,7 @@ describe('IOServer', () => {
|
|||
});
|
||||
|
||||
it('does not proxy from a non-trusted address', done => {
|
||||
socket.handshake.address = '5.6.7.8';
|
||||
socket.context.upgradeReq.connection.remoteAddress = '5.6.7.8';
|
||||
server.ipProxyMiddleware(socket, error => {
|
||||
assert(!error);
|
||||
assert.strictEqual(socket.context.ipAddress, '5.6.7.8');
|
||||
|
|
229
test/io/uws.js
Normal file
229
test/io/uws.js
Normal file
|
@ -0,0 +1,229 @@
|
|||
const { EventEmitter } = require('events');
|
||||
const assert = require('assert');
|
||||
const { UWSServer } = require('../../lib/io/uws');
|
||||
const inRoom = require('../../lib/io/uws')['in'];
|
||||
const WebSocket = require('uws');
|
||||
const http = require('http');
|
||||
|
||||
describe('UWSServer', () => {
|
||||
const endpoint = 'ws://127.0.0.1:3000';
|
||||
|
||||
let httpServer;
|
||||
let server;
|
||||
let socket;
|
||||
beforeEach(done => {
|
||||
httpServer = http.createServer();
|
||||
httpServer.listen(3000);
|
||||
|
||||
server = new UWSServer();
|
||||
server.attach(httpServer);
|
||||
server.on('error', e => { throw e; });
|
||||
server.once('listening', done);
|
||||
});
|
||||
|
||||
function connect() {
|
||||
let socket = new WebSocket(endpoint);
|
||||
socket.test = new EventEmitter();
|
||||
|
||||
socket.onmessage = message => {
|
||||
const { type, frame, payload, ackId } = JSON.parse(message.data);
|
||||
|
||||
if (type === 0) {
|
||||
socket.test.emit(frame, payload);
|
||||
} else if (type === 1) {
|
||||
socket.test.emit('ack', ackId, payload);
|
||||
}
|
||||
};
|
||||
socket.onerror = e => { throw e; };
|
||||
|
||||
return socket;
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
if (socket) socket.terminate();
|
||||
socket = null;
|
||||
if (server) server.shutdown();
|
||||
server = null;
|
||||
if (httpServer) httpServer.close();
|
||||
httpServer = null;
|
||||
});
|
||||
|
||||
it('accepts a connection immediately if there is no middleware', done => {
|
||||
socket = connect();
|
||||
socket.test.on('connect', done);
|
||||
});
|
||||
|
||||
it('accepts a connection with middleware', done => {
|
||||
let m1 = false, m2 = false;
|
||||
server.use((socket, next) => {
|
||||
m1 = true;
|
||||
next();
|
||||
});
|
||||
server.use((socket, next) => {
|
||||
m2 = true;
|
||||
next();
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
socket.test.on('connect', () => {
|
||||
assert(m1);
|
||||
assert(m2);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('rejects a connection with middleware', done => {
|
||||
let m1 = false, m2 = false;
|
||||
server.use((socket, next) => {
|
||||
m1 = true;
|
||||
next(new Error('broken'));
|
||||
});
|
||||
server.use((socket, next) => {
|
||||
m2 = true;
|
||||
next();
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
socket.test.on('connect', () => {
|
||||
throw new Error('Unexpected connect callback');
|
||||
});
|
||||
socket.test.on('error', e => {
|
||||
assert.strictEqual(e, 'broken');
|
||||
assert(!m2);
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('receives a normal frame', done => {
|
||||
server.on('connection', s => {
|
||||
s.on('test', data => {
|
||||
assert.deepStrictEqual(data, {foo: 'bar'});
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
socket.onopen = () => {
|
||||
socket.send(JSON.stringify({
|
||||
type: 0,
|
||||
frame: 'test',
|
||||
payload: { foo: 'bar' }
|
||||
}));
|
||||
};
|
||||
});
|
||||
|
||||
it('sends a normal frame', done => {
|
||||
server.on('connection', s => {
|
||||
s.emit('test', { foo: 'bar' });
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
socket.test.on('test', data => {
|
||||
assert.deepStrictEqual(data, { foo: 'bar' });
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('broadcasts to a room', done => {
|
||||
server.on('connection', s => {
|
||||
s.join('testroom');
|
||||
inRoom('testroom').emit('test', { foo: 'bar' });
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
socket.test.on('test', data => {
|
||||
assert.deepStrictEqual(data, { foo: 'bar' });
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('responds with an ack frame', done => {
|
||||
server.on('connection', s => {
|
||||
s.on('test', (data, ack) => {
|
||||
assert.deepStrictEqual(data, {foo: 'bar'});
|
||||
ack({ baz: 'quux' });
|
||||
});
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
socket.onopen = () => {
|
||||
socket.send(JSON.stringify({
|
||||
type: 0,
|
||||
frame: 'test',
|
||||
payload: { foo: 'bar' },
|
||||
ackId: 1
|
||||
}));
|
||||
|
||||
socket.test.on('ack', (ackId, payload) => {
|
||||
assert.strictEqual(ackId, 1);
|
||||
assert.deepStrictEqual(payload, { baz: 'quux' });
|
||||
done();
|
||||
});
|
||||
};
|
||||
});
|
||||
|
||||
it('typechecks input frames', done => {
|
||||
server.on('connection', s => {
|
||||
s.typecheckedOn('test', { foo: 'string' }, data => {
|
||||
assert.fail('Should not have reached callback');
|
||||
});
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
socket.onopen = () => {
|
||||
socket.send(JSON.stringify({
|
||||
type: 0,
|
||||
frame: 'test',
|
||||
payload: { foo: 123 }
|
||||
}));
|
||||
|
||||
socket.test.on('errorMsg', payload => {
|
||||
assert.equal(
|
||||
payload.msg,
|
||||
'Unexpected error for message test: ' +
|
||||
'Expected key foo to be of type string, instead got number'
|
||||
);
|
||||
done();
|
||||
});
|
||||
};
|
||||
});
|
||||
|
||||
it('catches errors during socket.emit()', done => {
|
||||
server.on('connection', s => {
|
||||
s.join('testroom');
|
||||
s._uwsSocket.send = () => { throw new Error('well darn'); };
|
||||
|
||||
s.emit('test', { foo: 'bar' });
|
||||
done();
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
});
|
||||
|
||||
it('catches errors during inRoom().emit()', done => {
|
||||
server.on('connection', s => {
|
||||
s.join('testroom');
|
||||
s._uwsSocket.send = () => { throw new Error('well darn'); };
|
||||
|
||||
inRoom('testroom').emit('test', { foo: 'bar' });
|
||||
done();
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
});
|
||||
|
||||
it('sets disconnected = true after a disconnect', done => {
|
||||
server.on('connection', s => {
|
||||
assert.strictEqual(s.disconnected, false);
|
||||
|
||||
s.on('disconnect', () => {
|
||||
assert.strictEqual(s.disconnected, true);
|
||||
done();
|
||||
});
|
||||
|
||||
s.disconnect();
|
||||
});
|
||||
|
||||
socket = connect();
|
||||
});
|
||||
});
|
34
test/util/multimap.js
Normal file
34
test/util/multimap.js
Normal file
|
@ -0,0 +1,34 @@
|
|||
const assert = require('assert');
|
||||
const { Multimap } = require('../../lib/util/multimap');
|
||||
|
||||
describe('Multimap', () => {
|
||||
let map;
|
||||
|
||||
beforeEach(() => {
|
||||
map = new Multimap();
|
||||
});
|
||||
|
||||
it('returns the empty set for an unset key', () => {
|
||||
assert.deepEqual(map.get('unknown'), new Set());
|
||||
});
|
||||
|
||||
it('returns a set of values for a given key', () => {
|
||||
map.set('a', 1);
|
||||
map.set('a', 2);
|
||||
map.set('a', 1);
|
||||
|
||||
assert.deepEqual(map.get('a'), new Set([1, 2]));
|
||||
});
|
||||
|
||||
it('deletes a value for a given key', () => {
|
||||
map.set('a', 1);
|
||||
map.set('a', 2);
|
||||
map.delete('a', 1);
|
||||
|
||||
assert.deepEqual(map.get('a'), new Set([2]));
|
||||
|
||||
map.delete('a', 2);
|
||||
|
||||
assert.deepEqual(map.get('a'), new Set());
|
||||
});
|
||||
});
|
|
@ -1221,7 +1221,17 @@ function ioServerConnect(socketConfig) {
|
|||
|
||||
var USING_LETS_ENCRYPT = false;
|
||||
|
||||
function initSocket(socketConfig) {
|
||||
if (socketConfig.uwsServers && socketConfig.uwsServers.length > 0) {
|
||||
initWS(socketConfig.uwsServers);
|
||||
} else {
|
||||
initSocketIO(socketConfig);
|
||||
}
|
||||
}
|
||||
|
||||
function initSocketIO(socketConfig) {
|
||||
console.log('Using socket.io');
|
||||
|
||||
function genericConnectionError() {
|
||||
var message = "The socket.io library could not be loaded from <code>" +
|
||||
source + "</code>. Ensure that it is not being blocked " +
|
||||
|
@ -1295,10 +1305,17 @@ function checkLetsEncrypt(socketConfig, nonLetsEncryptError) {
|
|||
});
|
||||
}
|
||||
|
||||
function initWS(servers) {
|
||||
console.log('Using WSShim');
|
||||
console.log("Connecting to " + JSON.stringify(servers[0]));
|
||||
window.socket = new WSShim(servers[0].url);
|
||||
setupCallbacks();
|
||||
}
|
||||
|
||||
(function () {
|
||||
$.getJSON("/socketconfig/" + CHANNEL.name + ".json")
|
||||
.done(function (socketConfig) {
|
||||
initSocketIO(socketConfig);
|
||||
initSocket(socketConfig);
|
||||
}).fail(function () {
|
||||
makeAlert("Error", "Failed to retrieve socket.io configuration. " +
|
||||
"Please try again in a few minutes.",
|
||||
|
|
|
@ -3304,19 +3304,23 @@ function maybePromptToUpgradeUserscript() {
|
|||
function backoffRetry(fn, cb, options) {
|
||||
var jitter = options.jitter || 0;
|
||||
var factor = options.factor || 1;
|
||||
var currentFactor = factor;
|
||||
var isRetryable = options.isRetryable || function () { return true; };
|
||||
var maxDelay = options.maxDelay || Infinity;
|
||||
var tries = 0;
|
||||
|
||||
function callback(error, result) {
|
||||
tries++;
|
||||
factor *= factor;
|
||||
if (error) {
|
||||
if (tries >= options.maxTries) {
|
||||
console.log('Max tries exceeded');
|
||||
cb(error, result);
|
||||
} else if (isRetryable(error)) {
|
||||
var offset = Math.random() * jitter;
|
||||
var delay = options.delay * factor + offset;
|
||||
var delay = Math.min(
|
||||
options.delay * currentFactor,
|
||||
maxDelay
|
||||
) + offset;
|
||||
console.log('Retrying on error: ' + error);
|
||||
console.log('Waiting ' + delay + ' ms before retrying');
|
||||
|
||||
|
@ -3327,6 +3331,8 @@ function backoffRetry(fn, cb, options) {
|
|||
} else {
|
||||
cb(error, result);
|
||||
}
|
||||
|
||||
currentFactor *= factor;
|
||||
}
|
||||
|
||||
fn(callback);
|
||||
|
|
147
www/js/ws.js
Normal file
147
www/js/ws.js
Normal file
|
@ -0,0 +1,147 @@
|
|||
(function () {
|
||||
var TYPE_FRAME = 0;
|
||||
var TYPE_ACK = 1;
|
||||
|
||||
function WSShim(url) {
|
||||
this._url = url;
|
||||
this._listeners = Object.create(null);
|
||||
this._connected = false;
|
||||
|
||||
this._ackId = 0;
|
||||
this._pendingAcks = Object.create(null);
|
||||
|
||||
this._openWS();
|
||||
}
|
||||
|
||||
WSShim.prototype.listeners = function listeners(frame) {
|
||||
if (!Object.prototype.hasOwnProperty.call(this._listeners, frame)) {
|
||||
this._listeners[frame] = [];
|
||||
}
|
||||
|
||||
return this._listeners[frame];
|
||||
};
|
||||
|
||||
WSShim.prototype.on = function on(frame, callback) {
|
||||
this.listeners(frame).push(callback);
|
||||
};
|
||||
|
||||
WSShim.prototype.once = function on(frame, callback) {
|
||||
callback._once = true;
|
||||
this.listeners(frame).push(callback);
|
||||
};
|
||||
|
||||
WSShim.prototype.emit = function emit(frame, payload, ack) {
|
||||
var message = {
|
||||
type: TYPE_FRAME,
|
||||
frame: frame,
|
||||
payload: payload
|
||||
};
|
||||
|
||||
if (ack && typeof ack === 'function') {
|
||||
message.ackId = ++this._ackId;
|
||||
this._pendingAcks[message.ackId] = ack;
|
||||
}
|
||||
|
||||
this._ws.send(JSON.stringify(message));
|
||||
};
|
||||
|
||||
WSShim.prototype._emit = function _emit(frame, payload) {
|
||||
var hasOnce = false;
|
||||
|
||||
this.listeners(frame).forEach(function (cb) {
|
||||
try {
|
||||
if (cb._once) {
|
||||
hasOnce = true;
|
||||
}
|
||||
|
||||
cb(payload);
|
||||
} catch (error) {
|
||||
console.error('Error in callback for ' + frame + ': ' + error);
|
||||
}
|
||||
});
|
||||
|
||||
if (hasOnce) {
|
||||
this._listeners[frame] = this._listeners[frame].filter(function (cb) {
|
||||
return !cb._once;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
WSShim.prototype._onopen = function _onopen() {
|
||||
this._connected = true;
|
||||
};
|
||||
|
||||
WSShim.prototype._onclose = function _onclose() {
|
||||
if (!this._connected) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._connected = false;
|
||||
this._emit('disconnect');
|
||||
|
||||
// TODO: checking for KICKED here is insufficient;
|
||||
// need to have some sort of explicit disconnect vs. connection loss
|
||||
// check
|
||||
if (!KICKED) {
|
||||
var self = this;
|
||||
|
||||
function reconnectAsync(cb) {
|
||||
self._openWS();
|
||||
|
||||
self._ws.addEventListener('open', function () {
|
||||
cb(null);
|
||||
});
|
||||
|
||||
self._ws.addEventListener('error', function (error) {
|
||||
cb(error);
|
||||
});
|
||||
}
|
||||
|
||||
var retryOpts = {
|
||||
delay: 1000,
|
||||
jitter: 1000,
|
||||
factor: 2,
|
||||
maxDelay: 20000
|
||||
};
|
||||
|
||||
setTimeout(function () {
|
||||
backoffRetry(reconnectAsync, function(){}, retryOpts);
|
||||
}, 1000);
|
||||
}
|
||||
};
|
||||
|
||||
WSShim.prototype._onmessage = function _onmessage(message) {
|
||||
try {
|
||||
var parsed = JSON.parse(message.data);
|
||||
var type = parsed.type;
|
||||
var frame = parsed.frame;
|
||||
var payload = parsed.payload;
|
||||
var ackId = parsed.ackId;
|
||||
|
||||
if (type === TYPE_ACK && ackId in this._pendingAcks) {
|
||||
this._pendingAcks[ackId](payload);
|
||||
delete this._pendingAcks[ackId];
|
||||
} else if (type === TYPE_FRAME) {
|
||||
this._emit(frame, payload);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Unparseable message from server: ' + message);
|
||||
console.error(error.stack);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
WSShim.prototype._openWS = function _openWS() {
|
||||
if (this._connected) {
|
||||
throw new Error('Cannot _openWS() when already connected');
|
||||
}
|
||||
|
||||
this._ws = new WebSocket(this._url);
|
||||
this._ws.onopen = this._onopen.bind(this);
|
||||
this._ws.onclose = this._onclose.bind(this);
|
||||
this._ws.onmessage = this._onmessage.bind(this);
|
||||
this._connected = false;
|
||||
};
|
||||
|
||||
window.WSShim = WSShim;
|
||||
})();
|
Loading…
Reference in a new issue