From ab11ffa7c9a6c2bd047ff23c4c28f7909b22fe2d Mon Sep 17 00:00:00 2001 From: Paul Popus Date: Thu, 26 Feb 2026 13:32:10 +0000 Subject: [PATCH] fix(storage-azure): add stream aborts for error handling and connection closure --- packages/storage-azure/src/staticHandler.ts | 87 +++++++++++++++------ 1 file changed, 62 insertions(+), 25 deletions(-) diff --git a/packages/storage-azure/src/staticHandler.ts b/packages/storage-azure/src/staticHandler.ts index 24467cb95cc..2f9dfd482e3 100644 --- a/packages/storage-azure/src/staticHandler.ts +++ b/packages/storage-azure/src/staticHandler.ts @@ -1,12 +1,43 @@ -import type { ContainerClient } from '@azure/storage-blob' +import type { BlobDownloadResponseParsed, ContainerClient } from '@azure/storage-blob' import type { StaticHandler } from '@payloadcms/plugin-cloud-storage/types' import type { CollectionConfig } from 'payload' +import type { Readable } from 'stream' import { RestError } from '@azure/storage-blob' import { getFilePrefix } from '@payloadcms/plugin-cloud-storage/utilities' import path from 'path' import { getRangeRequestInfo } from 'payload/internal' +const isNodeReadableStream = ( + body: BlobDownloadResponseParsed['readableStreamBody'], +): body is Readable => { + return ( + typeof body === 'object' && + body !== null && + 'pipe' in body && + typeof body.pipe === 'function' && + 'destroy' in body && + typeof body.destroy === 'function' + ) +} + +const abortRequestAndDestroyStream = ({ + abortController, + blob, +}: { + abortController: AbortController + blob?: BlobDownloadResponseParsed +}) => { + try { + abortController.abort() + } catch { + /* noop */ + } + if (blob?.readableStreamBody && isNodeReadableStream(blob.readableStreamBody)) { + blob.readableStreamBody.destroy() + } +} + interface Args { collection: CollectionConfig getStorageClient: () => ContainerClient @@ -14,6 +45,16 @@ interface Args { export const getHandler = ({ collection, getStorageClient }: Args): StaticHandler => { return async (req, { headers: incomingHeaders, params: { clientUploadContext, filename } }) => { + let blob: BlobDownloadResponseParsed | undefined = undefined + let streamed = false + + const abortController = new AbortController() + if (req.signal) { + req.signal.addEventListener('abort', () => { + abortRequestAndDestroyStream({ abortController, blob }) + }) + } + try { const prefix = await getFilePrefix({ clientUploadContext, collection, filename, req }) const blockBlobClient = getStorageClient().getBlockBlobClient( @@ -40,13 +81,14 @@ export const getHandler = ({ collection, getStorageClient }: Args): StaticHandle } // Download with range if partial - const blob = + blob = rangeResult.type === 'partial' ? await blockBlobClient.download( rangeResult.rangeStart, rangeResult.rangeEnd - rangeResult.rangeStart + 1, + { abortSignal: abortController.signal }, ) - : await blockBlobClient.download() + : await blockBlobClient.download(0, undefined, { abortSignal: abortController.signal }) let headers = new Headers(incomingHeaders) @@ -83,36 +125,31 @@ export const getHandler = ({ collection, getStorageClient }: Args): StaticHandle }) } - // Manually create a ReadableStream for the web from a Node.js stream. - const readableStream = new ReadableStream({ - start(controller) { - const nodeStream = blob.readableStreamBody - if (!nodeStream) { - throw new Error('No readable stream body') - } - - nodeStream.on('data', (chunk) => { - controller.enqueue(new Uint8Array(chunk)) - }) - nodeStream.on('end', () => { - controller.close() - }) - nodeStream.on('error', (err) => { - controller.error(err) - }) - }, - }) + if (!blob.readableStreamBody || !isNodeReadableStream(blob.readableStreamBody)) { + return new Response('Internal Server Error', { status: 500 }) + } - return new Response(readableStream, { - headers, - status: rangeResult.status, + const stream = blob.readableStreamBody + stream.on('error', (err: Error) => { + req.payload.logger.error({ + err, + msg: 'Error while streaming Azure blob (aborting)', + }) + abortRequestAndDestroyStream({ abortController, blob }) }) + + streamed = true + return new Response(stream, { headers, status: rangeResult.status }) } catch (err: unknown) { if (err instanceof RestError && err.statusCode === 404) { return new Response(null, { status: 404, statusText: 'Not Found' }) } req.payload.logger.error(err) return new Response('Internal Server Error', { status: 500 }) + } finally { + if (!streamed) { + abortRequestAndDestroyStream({ abortController, blob }) + } } } }