Compare commits

...

16 commits

Author SHA1 Message Date
Calvin Montgomery 5864836ce7 Lock uws to 10.148.1 for now because the maintainer went apeshit 2018-07-26 21:10:01 -07:00
Calvin Montgomery 17cf6023db Merge branch '3.0' into uws 2018-07-26 21:02:01 -07:00
Calvin Montgomery 3584b124e3 Fix double setupCallbacks() 2018-07-11 21:32:31 -07:00
Calvin Montgomery 3322e5404f Make uws listener configurable 2018-07-11 21:24:39 -07:00
Calvin Montgomery 2340da1cb5 Adjust retry mechanism 2018-07-10 21:52:41 -07:00
Calvin Montgomery 07502575eb Add once() to clientside ws shim 2018-07-10 21:52:41 -07:00
Calvin Montgomery 17911d43fc Start working on reconnect logic 2018-07-10 21:52:41 -07:00
Calvin Montgomery f0ba3a998a Handle errors in broadcast emit 2018-07-10 21:52:41 -07:00
Calvin Montgomery 3547a51f2c Remove node.js 6 from .travis.yml since uws no longer supports it 2018-07-10 21:52:41 -07:00
Calvin Montgomery 85947d326e Fix mismatch between single-emit and broadcast encoding 2018-07-10 21:52:41 -07:00
Calvin Montgomery 4cad31d82c Add test for typecheckedOn 2018-07-10 21:52:41 -07:00
Calvin Montgomery 86e023d233 Refactor protocol and add acks 2018-07-10 21:52:41 -07:00
Calvin Montgomery d369f1ebe4 Add uws middleware 2018-07-10 21:52:41 -07:00
Calvin Montgomery 61b856c2c9 Refactor socket.io middleware for future uws compatibility 2018-07-10 21:52:41 -07:00
Calvin Montgomery 1021cc706a More work on uws integration 2018-07-10 21:52:41 -07:00
Calvin Montgomery fb5f92b7ee Initial experiment for UWS support 2018-07-10 21:52:41 -07:00
15 changed files with 845 additions and 28 deletions

View file

@ -11,4 +11,3 @@ env:
node_js:
- "10"
- "8"
- "6"

View file

@ -41,6 +41,7 @@
"sprintf-js": "^1.0.3",
"toml": "^2.3.0",
"uuid": "^3.2.1",
"uws": "10.148.1",
"yamljs": "^0.2.8"
},
"scripts": {

View file

@ -725,6 +725,7 @@ Channel.prototype.handleReadLog = function (user) {
Channel.prototype.broadcastToRoom = function (msg, data, ns) {
sio.instance.in(ns).emit(msg, data);
require('../io/uws').in(ns).emit(msg, data);
};
Channel.prototype.broadcastAll = function (msg, data) {

View file

@ -6,11 +6,47 @@ export default class IOConfiguration {
getSocketEndpoints() {
return this.config.endpoints.slice();
}
getUWSEndpoints() {
return this.config.uwsEndpoints.slice();
}
}
function getUWSEndpoints(oldConfig) {
const uwsEndpoints = oldConfig.get('listen').filter(it => it.uws)
.map(it => {
let domain;
if (it.https) {
domain = oldConfig.get('https.domain')
.replace(/^https/, 'wss');
} else {
domain = oldConfig.get('io.domain')
.replace(/^http/, 'ws');
}
return {
secure: !!it.https,
url: `${domain}:${it.port}`
};
});
uwsEndpoints.sort((a, b) => {
if (a.secure && !b.secure) {
return -1;
} else if (b.secure && !a.secure) {
return 1;
} else {
return 0;
}
});
return uwsEndpoints;
}
IOConfiguration.fromOldConfig = function (oldConfig) {
const config = {
endpoints: []
endpoints: [],
uwsEndpoints: getUWSEndpoints(oldConfig)
};
if (oldConfig.get('io.ipv4-ssl')) {

View file

@ -7,8 +7,10 @@ export default class NullClusterClient {
getSocketConfig(_channel) {
const servers = this.ioConfig.getSocketEndpoints();
const uwsServers = this.ioConfig.getUWSEndpoints();
return Promise.resolve({
servers: servers
servers: servers,
uwsServers: uwsServers
});
}
}

View file

@ -18,6 +18,7 @@ import { Counter, Gauge } from 'prom-client';
import Socket from 'socket.io/lib/socket';
import { TokenBucket } from '../util/token-bucket';
import http from 'http';
import { UWSServer } from './uws';
const LOGGER = require('@calzoneman/jsli')('ioserver');
@ -34,6 +35,20 @@ const authFailureCount = new Counter({
help: 'Number of failed authentications from session middleware'
});
class SocketIOContext {
constructor(socket) {
socket.handshake.connection = {
remoteAddress: socket.handshake.address
};
this.upgradeReq = socket.handshake;
this.ipAddress = null;
this.torConnection = null;
this.ipSessionFirstSeen = null;
this.user = null;
}
}
class IOServer {
constructor(options = {
proxyTrustFn: proxyaddr.compile('127.0.0.1')
@ -49,21 +64,16 @@ class IOServer {
// Map proxied sockets to the real IP address via X-Forwarded-For
// If the resulting address is a known Tor exit, flag it as such
ipProxyMiddleware(socket, next) {
if (!socket.context) socket.context = {};
try {
socket.handshake.connection = {
remoteAddress: socket.handshake.address
};
socket.context.ipAddress = proxyaddr(
socket.handshake,
socket.context.upgradeReq,
this.proxyTrustFn
);
if (!socket.context.ipAddress) {
throw new Error(
`Assertion failed: unexpected IP ${socket.context.ipAddress}`
'Could not determine IP address from ' +
socket.context.upgradeReq.connection.remoteAddress
);
}
} catch (error) {
@ -163,7 +173,7 @@ class IOServer {
// Parse cookies
cookieParsingMiddleware(socket, next) {
const req = socket.handshake;
const req = socket.context.upgradeReq;
if (req.headers.cookie) {
cookieParser(req, null, () => next());
} else {
@ -176,7 +186,7 @@ class IOServer {
// Determine session age from ip-session cookie
// (Used for restricting chat)
ipSessionCookieMiddleware(socket, next) {
const cookie = socket.handshake.signedCookies['ip-session'];
const cookie = socket.context.upgradeReq.signedCookies['ip-session'];
if (!cookie) {
socket.context.ipSessionFirstSeen = new Date();
next();
@ -197,7 +207,7 @@ class IOServer {
socket.context.aliases = [];
const promises = [];
const auth = socket.handshake.signedCookies.auth;
const auth = socket.context.upgradeReq.signedCookies.auth;
if (auth) {
promises.push(verifySession(auth).then(user => {
socket.context.user = Object.assign({}, user);
@ -266,6 +276,10 @@ class IOServer {
patchTypecheckedFunctions();
const io = this.io = sio.instance = sio();
io.use((socket, next) => {
socket.context = new SocketIOContext(socket);
next();
});
io.use(this.ipProxyMiddleware.bind(this));
io.use(this.ipBanMiddleware.bind(this));
io.use(this.ipThrottleMiddleware.bind(this));
@ -276,14 +290,29 @@ class IOServer {
io.on('connection', this.handleConnection.bind(this));
}
bindTo(servers) {
initUWS() {
const uws = this.uws = new UWSServer();
uws.use(this.ipProxyMiddleware.bind(this));
uws.use(this.ipBanMiddleware.bind(this));
uws.use(this.ipThrottleMiddleware.bind(this));
uws.use(this.cookieParsingMiddleware.bind(this));
uws.use(this.ipSessionCookieMiddleware.bind(this));
uws.use(this.authUserMiddleware.bind(this));
uws.on('connection', this.handleConnection.bind(this));
}
bindTo(sioServers, uwsServers) {
if (!this.io) {
throw new Error('Cannot bind: socket.io has not been initialized yet');
}
servers.forEach(server => {
sioServers.forEach(server => {
this.io.attach(server);
});
uwsServers.forEach(server => {
this.uws.attach(server);
});
}
}
@ -417,15 +446,23 @@ module.exports = {
});
ioServer.initSocketIO();
ioServer.initUWS();
const uniqueListenAddresses = new Set();
const servers = [];
const uwsServers = [];
Config.get("listen").forEach(function (bind) {
if (!bind.io) {
if (bind.io && bind.uws) {
throw new Error(
'Cannot bind both socket.io and uws to the same listener'
);
} else if (!bind.io && !bind.uws) {
return;
}
const serverList = bind.io ? servers : uwsServers;
const id = bind.ip + ":" + bind.port;
if (uniqueListenAddresses.has(id)) {
LOGGER.warn("Ignoring duplicate listen address %s", id);
@ -433,19 +470,21 @@ module.exports = {
}
if (srv.servers.hasOwnProperty(id)) {
servers.push(srv.servers[id]);
serverList.push(srv.servers[id]);
} else {
const server = http.createServer().listen(bind.port, bind.ip);
servers.push(server);
serverList.push(server);
}
uniqueListenAddresses.add(id);
});
ioServer.bindTo(servers);
ioServer.bindTo(servers, uwsServers);
},
IOServer: IOServer
IOServer: IOServer,
SocketIOContext: SocketIOContext
};
/* Clean out old rate limiters */

261
src/io/uws.js Normal file
View file

@ -0,0 +1,261 @@
import { EventEmitter } from 'events';
import { Multimap } from '../util/multimap';
import clone from 'clone';
import typecheck from 'json-typecheck';
import uws from 'uws';
const LOGGER = require('@calzoneman/jsli')('uws');
const TYPE_FRAME = 0;
const TYPE_ACK = 1;
const rooms = new Multimap();
class UWSContext {
constructor(upgradeReq) {
this.upgradeReq = upgradeReq;
this.ipAddress = null;
this.torConnection = null;
this.ipSessionFirstSeen = null;
this.user = null;
}
}
class UWSWrapper extends EventEmitter {
constructor(socket) {
super();
this._uwsSocket = socket;
this._joined = new Set();
this.disconnected = false;
this.context = new UWSContext({
connection: {
remoteAddress: socket._socket.remoteAddress
},
headers: clone(socket.upgradeReq.headers)
});
// socket.io metrics compatibility
this.client = {
conn: {
on: function(){},
transport: {
name: 'uws'
}
}
};
this._uwsSocket.on('message', message => {
try {
this._decode(message);
} catch (error) {
LOGGER.warn(
'Decode failed for client %s: %s',
this.context.ipAddress,
error
);
this.disconnect();
}
});
this._uwsSocket.on('close', () => {
this.disconnected = true;
for (let room of this._joined) {
rooms.delete(room, this);
}
this._joined.clear();
this._emit('disconnect');
});
this._uwsSocket.on('error', error => {
// TODO: determine what conditions cause this
LOGGER.error(
'Error for client %s: %s',
this.context.ipAddress,
error.stack
);
});
}
disconnect() {
this._uwsSocket.terminate();
}
emit(frame, payload) {
sendSafe(this, encode(frame, payload));
}
join(room) {
this._joined.add(room);
rooms.set(room, this);
}
leave(room) {
this._joined.delete(room);
rooms.delete(room, this);
}
typecheckedOn(frame, typeDef, cb) {
this.on(frame, (data, ack) => {
typecheck(data, typeDef, (err, data) => {
if (err) {
this.emit('errorMsg', {
msg: 'Unexpected error for message ' + frame + ': ' +
err.message
});
} else {
cb(data, ack);
}
});
});
}
typecheckedOnce(frame, typeDef, cb) {
this.once(frame, (data, ack) => {
typecheck(data, typeDef, (err, data) => {
if (err) {
this.emit('errorMsg', {
msg: 'Unexpected error for message ' + frame + ': ' +
err.message
});
} else {
cb(data, ack);
}
});
});
}
_ack(ackId, payload) {
sendSafe(
this,
JSON.stringify({
type: TYPE_ACK,
ackId,
payload
})
);
}
_decode(message) {
const { frame, type, ackId, payload } = JSON.parse(message);
if (type !== TYPE_FRAME) {
LOGGER.warn(
'Unexpected message type %s from client; dropping',
type
);
return;
}
const args = [payload];
if (typeof ackId === 'number') {
args.push(payload => this._ack(ackId, payload));
}
this._emit(frame, ...args);
}
}
Object.assign(UWSWrapper.prototype, { _emit: EventEmitter.prototype.emit });
class UWSServer extends EventEmitter {
constructor() {
super();
this._servers = [];
this._middleware = [];
}
use(cb) {
this._middleware.push(cb);
}
attach(server) {
const uwsServer = new uws.Server({
server,
perMessageDeflate: false
});
this._servers.push(uwsServer);
uwsServer.on('connection', socket => this._onConnection(socket));
server.on('listening', () => this.emit('listening'));
uwsServer.on('error', e => this.emit('error', e));
}
_onConnection(uwsSocket) {
const socket = new UWSWrapper(uwsSocket);
if (this._middleware.length === 0) {
this._acceptConnection(socket);
return;
}
let i = 0;
const self = this;
function next(error) {
if (error) {
socket.emit('error', error.message);
socket.disconnect();
return;
}
if (i >= self._middleware.length) {
self._acceptConnection(socket);
return;
}
process.nextTick(self._middleware[i], socket, next);
i++;
}
process.nextTick(next, null);
}
_acceptConnection(socket) {
socket.emit('connect');
this.emit('connection', socket);
}
shutdown() {
this._servers.forEach(sv => sv.close());
}
}
function encode(frame, payload) {
return JSON.stringify({
type: TYPE_FRAME,
frame,
payload
});
}
function sendSafe(socket, message) {
try {
socket._uwsSocket.send(message);
} catch (error) {
LOGGER.error(
'Error sending to client %s: %s',
socket.context.ipAddress,
error.stack
);
socket.disconnect();
}
}
function inRoom(room) {
return {
emit(frame, payload) {
const encoded = encode(frame, payload);
for (let wrapper of rooms.get(room)) {
sendSafe(wrapper, encoded);
}
}
};
}
export { UWSServer };
exports['in'] = inRoom;

45
src/util/multimap.js Normal file
View file

@ -0,0 +1,45 @@
class Multimap {
constructor() {
this._items = new Map();
}
get(key) {
if (this._items.has(key)) {
return this._items.get(key);
}
return new Set();
}
has(key, value) {
if (!this._items.has(key)) {
return false;
}
return this._items.get(key).has(value);
}
set(key, value) {
if (!this._items.has(key)) {
this._items.set(key, new Set());
}
return this._items.get(key).add(value);
}
delete(key, value) {
if (!this._items.has(key)) {
return false;
}
const res = this._items.get(key).delete(value);
if (this._items.get(key).size == 0) {
this._items.delete(key);
}
return res;
}
}
export { Multimap };

View file

@ -247,6 +247,7 @@ html(lang="en")
script(src="/js/player.js")
script(src="/js/paginator.js")
script(src="/js/ui.js")
script(src="/js/ws.js")
script(src="/js/callbacks.js")
script(defer, src="https://www.youtube.com/iframe_api")
script(defer, src="https://api.dmcdn.net/all.js")

View file

@ -1,5 +1,6 @@
const assert = require('assert');
const IOServer = require('../../lib/io/ioserver').IOServer;
const SocketIOContext = require('../../lib/io/ioserver').SocketIOContext;
describe('IOServer', () => {
let server;
@ -7,9 +8,6 @@ describe('IOServer', () => {
beforeEach(() => {
server = new IOServer();
socket = {
context: {
ipAddress: '9.9.9.9'
},
handshake: {
address: '127.0.0.1',
headers: {
@ -17,6 +15,7 @@ describe('IOServer', () => {
}
}
};
socket.context = new SocketIOContext(socket);
});
describe('#ipProxyMiddleware', () => {
@ -29,7 +28,7 @@ describe('IOServer', () => {
});
it('does not proxy from a non-trusted address', done => {
socket.handshake.address = '5.6.7.8';
socket.context.upgradeReq.connection.remoteAddress = '5.6.7.8';
server.ipProxyMiddleware(socket, error => {
assert(!error);
assert.strictEqual(socket.context.ipAddress, '5.6.7.8');

229
test/io/uws.js Normal file
View file

@ -0,0 +1,229 @@
const { EventEmitter } = require('events');
const assert = require('assert');
const { UWSServer } = require('../../lib/io/uws');
const inRoom = require('../../lib/io/uws')['in'];
const WebSocket = require('uws');
const http = require('http');
describe('UWSServer', () => {
const endpoint = 'ws://127.0.0.1:3000';
let httpServer;
let server;
let socket;
beforeEach(done => {
httpServer = http.createServer();
httpServer.listen(3000);
server = new UWSServer();
server.attach(httpServer);
server.on('error', e => { throw e; });
server.once('listening', done);
});
function connect() {
let socket = new WebSocket(endpoint);
socket.test = new EventEmitter();
socket.onmessage = message => {
const { type, frame, payload, ackId } = JSON.parse(message.data);
if (type === 0) {
socket.test.emit(frame, payload);
} else if (type === 1) {
socket.test.emit('ack', ackId, payload);
}
};
socket.onerror = e => { throw e; };
return socket;
}
afterEach(() => {
if (socket) socket.terminate();
socket = null;
if (server) server.shutdown();
server = null;
if (httpServer) httpServer.close();
httpServer = null;
});
it('accepts a connection immediately if there is no middleware', done => {
socket = connect();
socket.test.on('connect', done);
});
it('accepts a connection with middleware', done => {
let m1 = false, m2 = false;
server.use((socket, next) => {
m1 = true;
next();
});
server.use((socket, next) => {
m2 = true;
next();
});
socket = connect();
socket.test.on('connect', () => {
assert(m1);
assert(m2);
done();
});
});
it('rejects a connection with middleware', done => {
let m1 = false, m2 = false;
server.use((socket, next) => {
m1 = true;
next(new Error('broken'));
});
server.use((socket, next) => {
m2 = true;
next();
});
socket = connect();
socket.test.on('connect', () => {
throw new Error('Unexpected connect callback');
});
socket.test.on('error', e => {
assert.strictEqual(e, 'broken');
assert(!m2);
done();
});
});
it('receives a normal frame', done => {
server.on('connection', s => {
s.on('test', data => {
assert.deepStrictEqual(data, {foo: 'bar'});
done();
});
});
socket = connect();
socket.onopen = () => {
socket.send(JSON.stringify({
type: 0,
frame: 'test',
payload: { foo: 'bar' }
}));
};
});
it('sends a normal frame', done => {
server.on('connection', s => {
s.emit('test', { foo: 'bar' });
});
socket = connect();
socket.test.on('test', data => {
assert.deepStrictEqual(data, { foo: 'bar' });
done();
});
});
it('broadcasts to a room', done => {
server.on('connection', s => {
s.join('testroom');
inRoom('testroom').emit('test', { foo: 'bar' });
});
socket = connect();
socket.test.on('test', data => {
assert.deepStrictEqual(data, { foo: 'bar' });
done();
});
});
it('responds with an ack frame', done => {
server.on('connection', s => {
s.on('test', (data, ack) => {
assert.deepStrictEqual(data, {foo: 'bar'});
ack({ baz: 'quux' });
});
});
socket = connect();
socket.onopen = () => {
socket.send(JSON.stringify({
type: 0,
frame: 'test',
payload: { foo: 'bar' },
ackId: 1
}));
socket.test.on('ack', (ackId, payload) => {
assert.strictEqual(ackId, 1);
assert.deepStrictEqual(payload, { baz: 'quux' });
done();
});
};
});
it('typechecks input frames', done => {
server.on('connection', s => {
s.typecheckedOn('test', { foo: 'string' }, data => {
assert.fail('Should not have reached callback');
});
});
socket = connect();
socket.onopen = () => {
socket.send(JSON.stringify({
type: 0,
frame: 'test',
payload: { foo: 123 }
}));
socket.test.on('errorMsg', payload => {
assert.equal(
payload.msg,
'Unexpected error for message test: ' +
'Expected key foo to be of type string, instead got number'
);
done();
});
};
});
it('catches errors during socket.emit()', done => {
server.on('connection', s => {
s.join('testroom');
s._uwsSocket.send = () => { throw new Error('well darn'); };
s.emit('test', { foo: 'bar' });
done();
});
socket = connect();
});
it('catches errors during inRoom().emit()', done => {
server.on('connection', s => {
s.join('testroom');
s._uwsSocket.send = () => { throw new Error('well darn'); };
inRoom('testroom').emit('test', { foo: 'bar' });
done();
});
socket = connect();
});
it('sets disconnected = true after a disconnect', done => {
server.on('connection', s => {
assert.strictEqual(s.disconnected, false);
s.on('disconnect', () => {
assert.strictEqual(s.disconnected, true);
done();
});
s.disconnect();
});
socket = connect();
});
});

34
test/util/multimap.js Normal file
View file

@ -0,0 +1,34 @@
const assert = require('assert');
const { Multimap } = require('../../lib/util/multimap');
describe('Multimap', () => {
let map;
beforeEach(() => {
map = new Multimap();
});
it('returns the empty set for an unset key', () => {
assert.deepEqual(map.get('unknown'), new Set());
});
it('returns a set of values for a given key', () => {
map.set('a', 1);
map.set('a', 2);
map.set('a', 1);
assert.deepEqual(map.get('a'), new Set([1, 2]));
});
it('deletes a value for a given key', () => {
map.set('a', 1);
map.set('a', 2);
map.delete('a', 1);
assert.deepEqual(map.get('a'), new Set([2]));
map.delete('a', 2);
assert.deepEqual(map.get('a'), new Set());
});
});

View file

@ -1221,7 +1221,17 @@ function ioServerConnect(socketConfig) {
var USING_LETS_ENCRYPT = false;
function initSocket(socketConfig) {
if (socketConfig.uwsServers && socketConfig.uwsServers.length > 0) {
initWS(socketConfig.uwsServers);
} else {
initSocketIO(socketConfig);
}
}
function initSocketIO(socketConfig) {
console.log('Using socket.io');
function genericConnectionError() {
var message = "The socket.io library could not be loaded from <code>" +
source + "</code>. Ensure that it is not being blocked " +
@ -1295,10 +1305,17 @@ function checkLetsEncrypt(socketConfig, nonLetsEncryptError) {
});
}
function initWS(servers) {
console.log('Using WSShim');
console.log("Connecting to " + JSON.stringify(servers[0]));
window.socket = new WSShim(servers[0].url);
setupCallbacks();
}
(function () {
$.getJSON("/socketconfig/" + CHANNEL.name + ".json")
.done(function (socketConfig) {
initSocketIO(socketConfig);
initSocket(socketConfig);
}).fail(function () {
makeAlert("Error", "Failed to retrieve socket.io configuration. " +
"Please try again in a few minutes.",

View file

@ -3304,19 +3304,23 @@ function maybePromptToUpgradeUserscript() {
function backoffRetry(fn, cb, options) {
var jitter = options.jitter || 0;
var factor = options.factor || 1;
var currentFactor = factor;
var isRetryable = options.isRetryable || function () { return true; };
var maxDelay = options.maxDelay || Infinity;
var tries = 0;
function callback(error, result) {
tries++;
factor *= factor;
if (error) {
if (tries >= options.maxTries) {
console.log('Max tries exceeded');
cb(error, result);
} else if (isRetryable(error)) {
var offset = Math.random() * jitter;
var delay = options.delay * factor + offset;
var delay = Math.min(
options.delay * currentFactor,
maxDelay
) + offset;
console.log('Retrying on error: ' + error);
console.log('Waiting ' + delay + ' ms before retrying');
@ -3327,6 +3331,8 @@ function backoffRetry(fn, cb, options) {
} else {
cb(error, result);
}
currentFactor *= factor;
}
fn(callback);

147
www/js/ws.js Normal file
View file

@ -0,0 +1,147 @@
(function () {
var TYPE_FRAME = 0;
var TYPE_ACK = 1;
function WSShim(url) {
this._url = url;
this._listeners = Object.create(null);
this._connected = false;
this._ackId = 0;
this._pendingAcks = Object.create(null);
this._openWS();
}
WSShim.prototype.listeners = function listeners(frame) {
if (!Object.prototype.hasOwnProperty.call(this._listeners, frame)) {
this._listeners[frame] = [];
}
return this._listeners[frame];
};
WSShim.prototype.on = function on(frame, callback) {
this.listeners(frame).push(callback);
};
WSShim.prototype.once = function on(frame, callback) {
callback._once = true;
this.listeners(frame).push(callback);
};
WSShim.prototype.emit = function emit(frame, payload, ack) {
var message = {
type: TYPE_FRAME,
frame: frame,
payload: payload
};
if (ack && typeof ack === 'function') {
message.ackId = ++this._ackId;
this._pendingAcks[message.ackId] = ack;
}
this._ws.send(JSON.stringify(message));
};
WSShim.prototype._emit = function _emit(frame, payload) {
var hasOnce = false;
this.listeners(frame).forEach(function (cb) {
try {
if (cb._once) {
hasOnce = true;
}
cb(payload);
} catch (error) {
console.error('Error in callback for ' + frame + ': ' + error);
}
});
if (hasOnce) {
this._listeners[frame] = this._listeners[frame].filter(function (cb) {
return !cb._once;
});
}
};
WSShim.prototype._onopen = function _onopen() {
this._connected = true;
};
WSShim.prototype._onclose = function _onclose() {
if (!this._connected) {
return;
}
this._connected = false;
this._emit('disconnect');
// TODO: checking for KICKED here is insufficient;
// need to have some sort of explicit disconnect vs. connection loss
// check
if (!KICKED) {
var self = this;
function reconnectAsync(cb) {
self._openWS();
self._ws.addEventListener('open', function () {
cb(null);
});
self._ws.addEventListener('error', function (error) {
cb(error);
});
}
var retryOpts = {
delay: 1000,
jitter: 1000,
factor: 2,
maxDelay: 20000
};
setTimeout(function () {
backoffRetry(reconnectAsync, function(){}, retryOpts);
}, 1000);
}
};
WSShim.prototype._onmessage = function _onmessage(message) {
try {
var parsed = JSON.parse(message.data);
var type = parsed.type;
var frame = parsed.frame;
var payload = parsed.payload;
var ackId = parsed.ackId;
if (type === TYPE_ACK && ackId in this._pendingAcks) {
this._pendingAcks[ackId](payload);
delete this._pendingAcks[ackId];
} else if (type === TYPE_FRAME) {
this._emit(frame, payload);
}
} catch (error) {
console.error('Unparseable message from server: ' + message);
console.error(error.stack);
return;
}
};
WSShim.prototype._openWS = function _openWS() {
if (this._connected) {
throw new Error('Cannot _openWS() when already connected');
}
this._ws = new WebSocket(this._url);
this._ws.onopen = this._onopen.bind(this);
this._ws.onclose = this._onclose.bind(this);
this._ws.onmessage = this._onmessage.bind(this);
this._connected = false;
};
window.WSShim = WSShim;
})();