From f2ee0968d4f84c5cff791ceb390b69e589aed2f9 Mon Sep 17 00:00:00 2001 From: Matt Willian Date: Tue, 4 Nov 2025 16:25:38 -0800 Subject: [PATCH 1/2] feat: improve redis cluster handling --- lib/queue-factory.ts | 89 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 75 insertions(+), 14 deletions(-) diff --git a/lib/queue-factory.ts b/lib/queue-factory.ts index db64dff..985aaa8 100644 --- a/lib/queue-factory.ts +++ b/lib/queue-factory.ts @@ -1,4 +1,5 @@ import { Redis, Cluster, RedisOptions } from "ioredis"; +import { ConnectionOptions as TlsConnectionOptions } from "tls"; import { QueueType, getQueueType, redisOptsFromUrl } from "./utils"; import { Queue } from "bullmq"; @@ -16,6 +17,13 @@ const maxTime = 40000; // We keep a redis client that we can reuse for all the queues. let redisClients: Record = {} as any; +type NestedRedisOptions = Partial & { + tls?: TlsConnectionOptions; +}; +type RedisOptionsWithNested = RedisOptions & { + redisOptions?: NestedRedisOptions; +}; + export interface FoundQueue { prefix: string; name: string; @@ -167,21 +175,52 @@ export function getRedisClient( if (!redisClients[key]) { if (clusterNodes && clusterNodes.length) { - const { username, password } = redisOptsFromUrl(clusterNodes[0]); + const { username: nodeUsername, password: nodePassword } = + redisOptsFromUrl(clusterNodes[0]); + const { + username: redisOptsUsername, + password: redisOptsPassword, + tls: redisOptsTls, + redisOptions: suppliedRedisOptionsRaw, + ...clusterLevelOpts + } = redisOpts as RedisOptionsWithNested; + const suppliedRedisOptions: NestedRedisOptions = + suppliedRedisOptionsRaw ?? {}; + const baseRedisOptions: NestedRedisOptions = { + ...suppliedRedisOptions, + }; + delete baseRedisOptions.username; + delete baseRedisOptions.password; + delete baseRedisOptions.tls; + + const finalUsername = + redisOptsUsername ?? + suppliedRedisOptions.username ?? + nodeUsername; + if (finalUsername !== undefined) { + baseRedisOptions.username = finalUsername; + } + + const finalPassword = + redisOptsPassword ?? + suppliedRedisOptions.password ?? + nodePassword; + if (finalPassword !== undefined) { + baseRedisOptions.password = finalPassword; + } + + const mergedTls = mergeTlsConfigs( + getClusterTlsFromEnv(), + suppliedRedisOptions.tls, + redisOptsTls + ); + if (mergedTls) { + baseRedisOptions.tls = mergedTls; + } + redisClients[key] = new Redis.Cluster(clusterNodes, { - ...redisOpts, - redisOptions: { - username, - password, - tls: process.env.REDIS_CLUSTER_TLS - ? { - cert: Buffer.from( - process.env.REDIS_CLUSTER_TLS ?? "", - "base64" - ).toString("ascii"), - } - : undefined, - }, + ...clusterLevelOpts, + redisOptions: baseRedisOptions, }); } else { redisClients[key] = new Redis(redisOpts); @@ -300,3 +339,25 @@ export function createQueue( ); } } + +function getClusterTlsFromEnv(): TlsConnectionOptions | undefined { + if (!process.env.REDIS_CLUSTER_TLS) { + return undefined; + } + return { + cert: Buffer.from( + process.env.REDIS_CLUSTER_TLS ?? "", + "base64" + ).toString("ascii"), + }; +} + +function mergeTlsConfigs( + ...configs: Array +): TlsConnectionOptions | undefined { + const valid = configs.filter(Boolean) as TlsConnectionOptions[]; + if (!valid.length) { + return undefined; + } + return Object.assign({}, ...valid); +} From c8e51dea881f2514c5e643bd726cbad4271e93f4 Mon Sep 17 00:00:00 2001 From: Matt Willian Date: Mon, 10 Nov 2025 11:30:45 -0800 Subject: [PATCH 2/2] Inline cluster redis option merging --- lib/queue-factory.ts | 112 +++++++++++++++++-------------------------- 1 file changed, 45 insertions(+), 67 deletions(-) diff --git a/lib/queue-factory.ts b/lib/queue-factory.ts index 985aaa8..cce89fd 100644 --- a/lib/queue-factory.ts +++ b/lib/queue-factory.ts @@ -17,13 +17,6 @@ const maxTime = 40000; // We keep a redis client that we can reuse for all the queues. let redisClients: Record = {} as any; -type NestedRedisOptions = Partial & { - tls?: TlsConnectionOptions; -}; -type RedisOptionsWithNested = RedisOptions & { - redisOptions?: NestedRedisOptions; -}; - export interface FoundQueue { prefix: string; name: string; @@ -175,52 +168,59 @@ export function getRedisClient( if (!redisClients[key]) { if (clusterNodes && clusterNodes.length) { - const { username: nodeUsername, password: nodePassword } = - redisOptsFromUrl(clusterNodes[0]); - const { - username: redisOptsUsername, - password: redisOptsPassword, - tls: redisOptsTls, - redisOptions: suppliedRedisOptionsRaw, - ...clusterLevelOpts - } = redisOpts as RedisOptionsWithNested; - const suppliedRedisOptions: NestedRedisOptions = - suppliedRedisOptionsRaw ?? {}; - const baseRedisOptions: NestedRedisOptions = { - ...suppliedRedisOptions, + const firstClusterNode = clusterNodes[0]; + const nodeCredentials: Partial = firstClusterNode + ? redisOptsFromUrl(firstClusterNode) + : {}; + const userRedisOptions = + ((redisOpts as any).redisOptions as Record) ?? {}; + const redisOptions: Record = { + ...userRedisOptions, }; - delete baseRedisOptions.username; - delete baseRedisOptions.password; - delete baseRedisOptions.tls; - - const finalUsername = - redisOptsUsername ?? - suppliedRedisOptions.username ?? - nodeUsername; - if (finalUsername !== undefined) { - baseRedisOptions.username = finalUsername; + + const resolvedUsername = + redisOpts.username ?? + userRedisOptions.username ?? + nodeCredentials.username; + if (resolvedUsername !== undefined) { + redisOptions.username = resolvedUsername; + } else { + delete redisOptions.username; } - const finalPassword = - redisOptsPassword ?? - suppliedRedisOptions.password ?? - nodePassword; - if (finalPassword !== undefined) { - baseRedisOptions.password = finalPassword; + const resolvedPassword = + redisOpts.password ?? + userRedisOptions.password ?? + nodeCredentials.password; + if (resolvedPassword !== undefined) { + redisOptions.password = resolvedPassword; + } else { + delete redisOptions.password; } - const mergedTls = mergeTlsConfigs( - getClusterTlsFromEnv(), - suppliedRedisOptions.tls, - redisOptsTls - ); - if (mergedTls) { - baseRedisOptions.tls = mergedTls; + const tlsFromEnv = process.env.REDIS_CLUSTER_TLS + ? ({ + cert: Buffer.from( + process.env.REDIS_CLUSTER_TLS, + "base64" + ).toString("ascii"), + } as TlsConnectionOptions) + : undefined; + const mergedTls = Object.assign( + {}, + tlsFromEnv ?? {}, + (userRedisOptions.tls as TlsConnectionOptions | undefined) ?? {}, + (redisOpts.tls as TlsConnectionOptions | undefined) ?? {} + ) as TlsConnectionOptions; + if (Object.keys(mergedTls).length) { + redisOptions.tls = mergedTls; + } else { + delete redisOptions.tls; } redisClients[key] = new Redis.Cluster(clusterNodes, { - ...clusterLevelOpts, - redisOptions: baseRedisOptions, + ...redisOpts, + redisOptions, }); } else { redisClients[key] = new Redis(redisOpts); @@ -339,25 +339,3 @@ export function createQueue( ); } } - -function getClusterTlsFromEnv(): TlsConnectionOptions | undefined { - if (!process.env.REDIS_CLUSTER_TLS) { - return undefined; - } - return { - cert: Buffer.from( - process.env.REDIS_CLUSTER_TLS ?? "", - "base64" - ).toString("ascii"), - }; -} - -function mergeTlsConfigs( - ...configs: Array -): TlsConnectionOptions | undefined { - const valid = configs.filter(Boolean) as TlsConnectionOptions[]; - if (!valid.length) { - return undefined; - } - return Object.assign({}, ...valid); -}