Streaming: Improve Redis connection options handling (#31623)

This commit is contained in:
Emelia Smith 2024-09-04 16:10:26 +02:00 committed by GitHub
parent 585e369e0b
commit 9ba81eae3e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 142 additions and 42 deletions

View file

@ -111,6 +111,35 @@ const startServer = async () => {
const server = http.createServer(); const server = http.createServer();
const wss = new WebSocketServer({ noServer: true }); const wss = new WebSocketServer({ noServer: true });
/**
* Adds a namespace to Redis keys or channel names
* Fixes: https://github.com/redis/ioredis/issues/1910
* @param {string} keyOrChannel
* @returns {string}
*/
function redisNamespaced(keyOrChannel) {
if (redisConfig.namespace) {
return `${redisConfig.namespace}:${keyOrChannel}`;
} else {
return keyOrChannel;
}
}
/**
* Removes the redis namespace from a channel name
* @param {string} channel
* @returns {string}
*/
function redisUnnamespaced(channel) {
if (typeof redisConfig.namespace === "string") {
// Note: this removes the configured namespace and the colon that is used
// to separate it:
return channel.slice(redisConfig.namespace.length + 1);
} else {
return channel;
}
}
// Set the X-Request-Id header on WebSockets: // Set the X-Request-Id header on WebSockets:
wss.on("headers", function onHeaders(headers, req) { wss.on("headers", function onHeaders(headers, req) {
headers.push(`X-Request-Id: ${req.id}`); headers.push(`X-Request-Id: ${req.id}`);
@ -200,7 +229,6 @@ const startServer = async () => {
const subs = {}; const subs = {};
const redisSubscribeClient = Redis.createClient(redisConfig, logger); const redisSubscribeClient = Redis.createClient(redisConfig, logger);
const { redisPrefix } = redisConfig;
// When checking metrics in the browser, the favicon is requested this // When checking metrics in the browser, the favicon is requested this
// prevents the request from falling through to the API Router, which would // prevents the request from falling through to the API Router, which would
@ -222,7 +250,7 @@ const startServer = async () => {
const interval = 6 * 60; const interval = 6 * 60;
const tellSubscribed = () => { const tellSubscribed = () => {
channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3)); channels.forEach(channel => redisClient.set(redisNamespaced(`subscribed:${channel}`), '1', 'EX', interval * 3));
}; };
tellSubscribed(); tellSubscribed();
@ -240,11 +268,10 @@ const startServer = async () => {
*/ */
const onRedisMessage = (channel, message) => { const onRedisMessage = (channel, message) => {
metrics.redisMessagesReceived.inc(); metrics.redisMessagesReceived.inc();
logger.debug(`New message on channel ${channel}`);
const callbacks = subs[channel]; const key = redisUnnamespaced(channel);
const callbacks = subs[key];
logger.debug(`New message on channel ${redisPrefix}${channel}`);
if (!callbacks) { if (!callbacks) {
return; return;
} }
@ -273,7 +300,8 @@ const startServer = async () => {
if (subs[channel].length === 0) { if (subs[channel].length === 0) {
logger.debug(`Subscribe ${channel}`); logger.debug(`Subscribe ${channel}`);
redisSubscribeClient.subscribe(channel, (err, count) => {
redisSubscribeClient.subscribe(redisNamespaced(channel), (err, count) => {
if (err) { if (err) {
logger.error(`Error subscribing to ${channel}`); logger.error(`Error subscribing to ${channel}`);
} else if (typeof count === 'number') { } else if (typeof count === 'number') {
@ -300,7 +328,9 @@ const startServer = async () => {
if (subs[channel].length === 0) { if (subs[channel].length === 0) {
logger.debug(`Unsubscribe ${channel}`); logger.debug(`Unsubscribe ${channel}`);
redisSubscribeClient.unsubscribe(channel, (err, count) => {
// FIXME: https://github.com/redis/ioredis/issues/1910
redisSubscribeClient.unsubscribe(redisNamespaced(channel), (err, count) => {
if (err) { if (err) {
logger.error(`Error unsubscribing to ${channel}`); logger.error(`Error unsubscribing to ${channel}`);
} else if (typeof count === 'number') { } else if (typeof count === 'number') {
@ -481,14 +511,14 @@ const startServer = async () => {
}); });
res.on('close', () => { res.on('close', () => {
unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener); unsubscribe(accessTokenChannelId, listener);
unsubscribe(`${redisPrefix}${systemChannelId}`, listener); unsubscribe(systemChannelId, listener);
metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
}); });
subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); subscribe(accessTokenChannelId, listener);
subscribe(`${redisPrefix}${systemChannelId}`, listener); subscribe(systemChannelId, listener);
metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2); metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
}; };
@ -805,11 +835,11 @@ const startServer = async () => {
}; };
channelIds.forEach(id => { channelIds.forEach(id => {
subscribe(`${redisPrefix}${id}`, listener); subscribe(id, listener);
}); });
if (typeof attachCloseHandler === 'function') { if (typeof attachCloseHandler === 'function') {
attachCloseHandler(channelIds.map(id => `${redisPrefix}${id}`), listener); attachCloseHandler(channelIds, listener);
} }
return listener; return listener;
@ -1156,7 +1186,7 @@ const startServer = async () => {
} }
channelIds.forEach(channelId => { channelIds.forEach(channelId => {
unsubscribe(`${redisPrefix}${channelId}`, subscription.listener); unsubscribe(channelId, subscription.listener);
}); });
metrics.connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); metrics.connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
@ -1200,8 +1230,8 @@ const startServer = async () => {
}, },
}); });
subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); subscribe(accessTokenChannelId, listener);
subscribe(`${redisPrefix}${systemChannelId}`, listener); subscribe(systemChannelId, listener);
subscriptions[accessTokenChannelId] = { subscriptions[accessTokenChannelId] = {
channelName: 'system', channelName: 'system',

View file

@ -4,44 +4,114 @@ import { parseIntFromEnvValue } from './utils.js';
/** /**
* @typedef RedisConfiguration * @typedef RedisConfiguration
* @property {import('ioredis').RedisOptions} redisParams * @property {string|undefined} namespace
* @property {string} redisPrefix * @property {string|undefined} url
* @property {string|undefined} redisUrl * @property {import('ioredis').RedisOptions} options
*/ */
/**
*
* @param {NodeJS.ProcessEnv} env
* @returns {boolean}
*/
function hasSentinelConfiguration(env) {
return (
typeof env.REDIS_SENTINELS === 'string' &&
env.REDIS_SENTINELS.length > 0 &&
typeof env.REDIS_SENTINEL_MASTER === 'string' &&
env.REDIS_SENTINEL_MASTER.length > 0
);
}
/**
*
* @param {NodeJS.ProcessEnv} env
* @param {import('ioredis').SentinelConnectionOptions} commonOptions
* @returns {import('ioredis').SentinelConnectionOptions}
*/
function getSentinelConfiguration(env, commonOptions) {
const redisDatabase = parseIntFromEnvValue(env.REDIS_DB, 0, 'REDIS_DB');
const sentinelPort = parseIntFromEnvValue(env.REDIS_SENTINEL_PORT, 26379, 'REDIS_SENTINEL_PORT');
const sentinels = env.REDIS_SENTINELS.split(',').map((sentinel) => {
const [host, port] = sentinel.split(':', 2);
/** @type {import('ioredis').SentinelAddress} */
return {
host: host,
port: port ?? sentinelPort,
// Force support for both IPv6 and IPv4, by default ioredis sets this to 4,
// only allowing IPv4 connections:
// https://github.com/redis/ioredis/issues/1576
family: 0
};
});
return {
db: redisDatabase,
name: env.REDIS_SENTINEL_MASTER,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
sentinelUsername: env.REDIS_SENTINEL_USERNAME ?? env.REDIS_USERNAME,
sentinelPassword: env.REDIS_SENTINEL_PASSWORD ?? env.REDIS_PASSWORD,
sentinels,
...commonOptions,
};
}
/** /**
* @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from * @param {NodeJS.ProcessEnv} env the `process.env` value to read configuration from
* @returns {RedisConfiguration} configuration for the Redis connection * @returns {RedisConfiguration} configuration for the Redis connection
*/ */
export function configFromEnv(env) { export function configFromEnv(env) {
// ioredis *can* transparently add prefixes for us, but it doesn't *in some cases*, const redisNamespace = env.REDIS_NAMESPACE;
// which means we can't use it. But this is something that should be looked into.
const redisPrefix = env.REDIS_NAMESPACE ? `${env.REDIS_NAMESPACE}:` : '';
// These options apply for both REDIS_URL based connections and connections
// using the other REDIS_* environment variables:
const commonOptions = {
// Force support for both IPv6 and IPv4, by default ioredis sets this to 4,
// only allowing IPv4 connections:
// https://github.com/redis/ioredis/issues/1576
family: 0
// Note: we don't use auto-prefixing of keys since this doesn't apply to
// subscribe/unsubscribe which have "channel" instead of "key" arguments
};
// If we receive REDIS_URL, don't continue parsing any other REDIS_*
// environment variables:
if (typeof env.REDIS_URL === 'string' && env.REDIS_URL.length > 0) {
return {
url: env.REDIS_URL,
options: commonOptions,
namespace: redisNamespace
};
}
// If we have configuration for Redis Sentinel mode, prefer that:
if (hasSentinelConfiguration(env)) {
return {
options: getSentinelConfiguration(env, commonOptions),
namespace: redisNamespace
};
}
// Finally, handle all the other REDIS_* environment variables:
let redisPort = parseIntFromEnvValue(env.REDIS_PORT, 6379, 'REDIS_PORT'); let redisPort = parseIntFromEnvValue(env.REDIS_PORT, 6379, 'REDIS_PORT');
let redisDatabase = parseIntFromEnvValue(env.REDIS_DB, 0, 'REDIS_DB'); let redisDatabase = parseIntFromEnvValue(env.REDIS_DB, 0, 'REDIS_DB');
/** @type {import('ioredis').RedisOptions} */ /** @type {import('ioredis').RedisOptions} */
const redisParams = { const options = {
host: env.REDIS_HOST || '127.0.0.1', host: env.REDIS_HOST ?? '127.0.0.1',
port: redisPort, port: redisPort,
// Force support for both IPv6 and IPv4, by default ioredis sets this to 4,
// only allowing IPv4 connections:
// https://github.com/redis/ioredis/issues/1576
family: 0,
db: redisDatabase, db: redisDatabase,
password: env.REDIS_PASSWORD || undefined, username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
...commonOptions,
}; };
// redisParams.path takes precedence over host and port.
if (env.REDIS_URL && env.REDIS_URL.startsWith('unix://')) {
redisParams.path = env.REDIS_URL.slice(7);
}
return { return {
redisParams, options,
redisPrefix, namespace: redisNamespace
redisUrl: typeof env.REDIS_URL === 'string' ? env.REDIS_URL : undefined,
}; };
} }
@ -50,13 +120,13 @@ export function configFromEnv(env) {
* @param {import('pino').Logger} logger * @param {import('pino').Logger} logger
* @returns {Redis} * @returns {Redis}
*/ */
export function createClient({ redisParams, redisUrl }, logger) { export function createClient({ url, options }, logger) {
let client; let client;
if (typeof redisUrl === 'string') { if (typeof url === 'string') {
client = new Redis(redisUrl, redisParams); client = new Redis(url, options);
} else { } else {
client = new Redis(redisParams); client = new Redis(options);
} }
client.on('error', (err) => logger.error({ err }, 'Redis Client Error!')); client.on('error', (err) => logger.error({ err }, 'Redis Client Error!'));