Streaming: refactor to custom Error classes (#28632)

Co-authored-by: Renaud Chaput <renchap@gmail.com>
Co-authored-by: Claire <claire.github-309c@sitedethib.com>
This commit is contained in:
Emelia Smith 2024-02-22 14:20:20 +01:00 committed by GitHub
parent ebe2086087
commit 491dd97642
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 103 additions and 42 deletions

51
streaming/errors.js Normal file
View file

@ -0,0 +1,51 @@
// @ts-check
/**
* Typed as a string because otherwise it's a const string, which means we can't
* override it in let statements.
* @type {string}
*/
const UNEXPECTED_ERROR_MESSAGE = 'An unexpected error occurred';
exports.UNKNOWN_ERROR_MESSAGE = UNEXPECTED_ERROR_MESSAGE;
/**
* Extracts the status and message properties from the error object, if
* available for public use. The `unknown` is for catch statements
* @param {Error | AuthenticationError | RequestError | unknown} err
*/
exports.extractStatusAndMessage = function(err) {
let statusCode = 500;
let errorMessage = UNEXPECTED_ERROR_MESSAGE;
if (err instanceof AuthenticationError || err instanceof RequestError) {
statusCode = err.status;
errorMessage = err.message;
}
return { statusCode, errorMessage };
};
class RequestError extends Error {
/**
* @param {string} message
*/
constructor(message) {
super(message);
this.name = "RequestError";
this.status = 400;
}
}
exports.RequestError = RequestError;
class AuthenticationError extends Error {
/**
* @param {string} message
*/
constructor(message) {
super(message);
this.name = "AuthenticationError";
this.status = 401;
}
}
exports.AuthenticationError = AuthenticationError;

View file

@ -14,6 +14,8 @@ const pg = require('pg');
const dbUrlToConfig = require('pg-connection-string').parse; const dbUrlToConfig = require('pg-connection-string').parse;
const WebSocket = require('ws'); const WebSocket = require('ws');
const errors = require('./errors');
const { AuthenticationError, RequestError } = require('./errors');
const { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } = require('./logging'); const { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } = require('./logging');
const { setupMetrics } = require('./metrics'); const { setupMetrics } = require('./metrics');
const { isTruthy, normalizeHashtag, firstParam } = require("./utils"); const { isTruthy, normalizeHashtag, firstParam } = require("./utils");
@ -324,7 +326,7 @@ const startServer = async () => {
// Unfortunately for using the on('upgrade') setup, we need to manually // Unfortunately for using the on('upgrade') setup, we need to manually
// write a HTTP Response to the Socket to close the connection upgrade // write a HTTP Response to the Socket to close the connection upgrade
// attempt, so the following code is to handle all of that. // attempt, so the following code is to handle all of that.
const statusCode = err.status ?? 401; const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);
/** @type {Record<string, string | number | import('pino-http').ReqId>} */ /** @type {Record<string, string | number | import('pino-http').ReqId>} */
const headers = { const headers = {
@ -332,7 +334,7 @@ const startServer = async () => {
'Content-Type': 'text/plain', 'Content-Type': 'text/plain',
'Content-Length': 0, 'Content-Length': 0,
'X-Request-Id': request.id, 'X-Request-Id': request.id,
'X-Error-Message': err.status ? err.toString() : 'An unexpected error occurred' 'X-Error-Message': errorMessage
}; };
// Ensure the socket is closed once we've finished writing to it: // Ensure the socket is closed once we've finished writing to it:
@ -350,7 +352,7 @@ const startServer = async () => {
statusCode, statusCode,
headers headers
} }
}, err.toString()); }, errorMessage);
return; return;
} }
@ -535,11 +537,7 @@ const startServer = async () => {
} }
if (result.rows.length === 0) { if (result.rows.length === 0) {
err = new Error('Invalid access token'); reject(new AuthenticationError('Invalid access token'));
// @ts-ignore
err.status = 401;
reject(err);
return; return;
} }
@ -570,11 +568,7 @@ const startServer = async () => {
const accessToken = location.query.access_token || req.headers['sec-websocket-protocol']; const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];
if (!authorization && !accessToken) { if (!authorization && !accessToken) {
const err = new Error('Missing access token'); reject(new AuthenticationError('Missing access token'));
// @ts-ignore
err.status = 401;
reject(err);
return; return;
} }
@ -651,11 +645,7 @@ const startServer = async () => {
return; return;
} }
const err = new Error('Access token does not cover required scopes'); reject(new AuthenticationError('Access token does not have the required scopes'));
// @ts-ignore
err.status = 401;
reject(err);
}); });
/** /**
@ -731,11 +721,7 @@ const startServer = async () => {
// If no channelName can be found for the request, then we should terminate // If no channelName can be found for the request, then we should terminate
// the connection, as there's nothing to stream back // the connection, as there's nothing to stream back
if (!channelName) { if (!channelName) {
const err = new Error('Unknown channel requested'); next(new RequestError('Unknown channel requested'));
// @ts-ignore
err.status = 400;
next(err);
return; return;
} }
@ -762,10 +748,7 @@ const startServer = async () => {
return; return;
} }
const hasStatusCode = Object.hasOwnProperty.call(err, 'status'); const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);
// @ts-ignore
const statusCode = hasStatusCode ? err.status : 500;
const errorMessage = hasStatusCode ? err.toString() : 'An unexpected error occurred';
res.writeHead(statusCode, { 'Content-Type': 'application/json' }); res.writeHead(statusCode, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: errorMessage })); res.end(JSON.stringify({ error: errorMessage }));
@ -1140,7 +1123,7 @@ const startServer = async () => {
}; };
/** /**
* @param {any} res * @param {http.ServerResponse} res
*/ */
const httpNotFound = res => { const httpNotFound = res => {
res.writeHead(404, { 'Content-Type': 'application/json' }); res.writeHead(404, { 'Content-Type': 'application/json' });
@ -1155,16 +1138,29 @@ const startServer = async () => {
api.use(errorMiddleware); api.use(errorMiddleware);
api.get('/api/v1/streaming/*', (req, res) => { api.get('/api/v1/streaming/*', (req, res) => {
// @ts-ignore const channelName = channelNameFromPath(req);
channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
// FIXME: In theory we'd never actually reach here due to
// authenticationMiddleware catching this case, however, we need to refactor
// how those middlewares work, so I'm adding the extra check in here.
if (!channelName) {
httpNotFound(res);
return;
}
channelNameToIds(req, channelName, req.query).then(({ channelIds, options }) => {
const onSend = streamToHttp(req, res); const onSend = streamToHttp(req, res);
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
// @ts-ignore // @ts-ignore
streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering); streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering);
}).catch(err => { }).catch(err => {
res.log.info({ err }, 'Subscription error:', err.toString()); const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);
httpNotFound(res);
res.log.info({ err }, 'Eventsource subscription error');
res.writeHead(statusCode, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: errorMessage }));
}); });
}); });
@ -1265,8 +1261,8 @@ const startServer = async () => {
break; break;
case 'hashtag': case 'hashtag':
if (!params.tag || params.tag.length === 0) { if (!params.tag) {
reject('No tag for stream provided'); reject(new RequestError('Missing tag name parameter'));
} else { } else {
resolve({ resolve({
channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`], channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`],
@ -1276,8 +1272,8 @@ const startServer = async () => {
break; break;
case 'hashtag:local': case 'hashtag:local':
if (!params.tag || params.tag.length === 0) { if (!params.tag) {
reject('No tag for stream provided'); reject(new RequestError('Missing tag name parameter'));
} else { } else {
resolve({ resolve({
channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`], channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`],
@ -1287,19 +1283,23 @@ const startServer = async () => {
break; break;
case 'list': case 'list':
// @ts-ignore if (!params.list) {
reject(new RequestError('Missing list name parameter'));
return;
}
authorizeListAccess(params.list, req).then(() => { authorizeListAccess(params.list, req).then(() => {
resolve({ resolve({
channelIds: [`timeline:list:${params.list}`], channelIds: [`timeline:list:${params.list}`],
options: { needsFiltering: false }, options: { needsFiltering: false },
}); });
}).catch(() => { }).catch(() => {
reject('Not authorized to stream this list'); reject(new AuthenticationError('Not authorized to stream this list'));
}); });
break; break;
default: default:
reject('Unknown stream type'); reject(new RequestError('Unknown stream type'));
} }
}); });
@ -1353,8 +1353,17 @@ const startServer = async () => {
stopHeartbeat, stopHeartbeat,
}; };
}).catch(err => { }).catch(err => {
logger.error({ err }, 'Subscription error'); const {statusCode, errorMessage } = errors.extractStatusAndMessage(err);
websocket.send(JSON.stringify({ error: err.toString() }));
logger.error({ err }, 'Websocket subscription error');
// If we have a socket that is alive and open still, send the error back to the client:
if (websocket.isAlive && websocket.readyState === websocket.OPEN) {
websocket.send(JSON.stringify({
error: errorMessage,
status: statusCode
}));
}
}); });
}; };
@ -1393,10 +1402,11 @@ const startServer = async () => {
channelNameToIds(request, channelName, params).then(({ channelIds }) => { channelNameToIds(request, channelName, params).then(({ channelIds }) => {
removeSubscription(session, channelIds); removeSubscription(session, channelIds);
}).catch(err => { }).catch(err => {
logger.error({err}, 'Unsubscribe error'); logger.error({err}, 'Websocket unsubscribe error');
// If we have a socket that is alive and open still, send the error back to the client: // If we have a socket that is alive and open still, send the error back to the client:
if (websocket.isAlive && websocket.readyState === websocket.OPEN) { if (websocket.isAlive && websocket.readyState === websocket.OPEN) {
// TODO: Use a better error response here
websocket.send(JSON.stringify({ error: "Error unsubscribing from channel" })); websocket.send(JSON.stringify({ error: "Error unsubscribing from channel" }));
} }
}); });