Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 62 additions & 25 deletions packages/storage-azure/src/staticHandler.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,60 @@
import type { ContainerClient } from '@azure/storage-blob'
import type { BlobDownloadResponseParsed, ContainerClient } from '@azure/storage-blob'
import type { StaticHandler } from '@payloadcms/plugin-cloud-storage/types'
import type { CollectionConfig } from 'payload'
import type { Readable } from 'stream'

import { RestError } from '@azure/storage-blob'
import { getFilePrefix } from '@payloadcms/plugin-cloud-storage/utilities'
import path from 'path'
import { getRangeRequestInfo } from 'payload/internal'

const isNodeReadableStream = (
body: BlobDownloadResponseParsed['readableStreamBody'],
): body is Readable => {
return (
typeof body === 'object' &&
body !== null &&
'pipe' in body &&
typeof body.pipe === 'function' &&
'destroy' in body &&
typeof body.destroy === 'function'
)
}

const abortRequestAndDestroyStream = ({
abortController,
blob,
}: {
abortController: AbortController
blob?: BlobDownloadResponseParsed
}) => {
try {
abortController.abort()
} catch {
/* noop */
}
if (blob?.readableStreamBody && isNodeReadableStream(blob.readableStreamBody)) {
blob.readableStreamBody.destroy()
}
}

interface Args {
collection: CollectionConfig
getStorageClient: () => ContainerClient
}

export const getHandler = ({ collection, getStorageClient }: Args): StaticHandler => {
return async (req, { headers: incomingHeaders, params: { clientUploadContext, filename } }) => {
let blob: BlobDownloadResponseParsed | undefined = undefined
let streamed = false

const abortController = new AbortController()
if (req.signal) {
req.signal.addEventListener('abort', () => {
abortRequestAndDestroyStream({ abortController, blob })
})
}

try {
const prefix = await getFilePrefix({ clientUploadContext, collection, filename, req })
const blockBlobClient = getStorageClient().getBlockBlobClient(
Expand All @@ -40,13 +81,14 @@ export const getHandler = ({ collection, getStorageClient }: Args): StaticHandle
}

// Download with range if partial
const blob =
blob =
rangeResult.type === 'partial'
? await blockBlobClient.download(
rangeResult.rangeStart,
rangeResult.rangeEnd - rangeResult.rangeStart + 1,
{ abortSignal: abortController.signal },
)
: await blockBlobClient.download()
: await blockBlobClient.download(0, undefined, { abortSignal: abortController.signal })

let headers = new Headers(incomingHeaders)

Expand Down Expand Up @@ -83,36 +125,31 @@ export const getHandler = ({ collection, getStorageClient }: Args): StaticHandle
})
}

// Manually create a ReadableStream for the web from a Node.js stream.
const readableStream = new ReadableStream({
start(controller) {
const nodeStream = blob.readableStreamBody
if (!nodeStream) {
throw new Error('No readable stream body')
}

nodeStream.on('data', (chunk) => {
controller.enqueue(new Uint8Array(chunk))
})
nodeStream.on('end', () => {
controller.close()
})
nodeStream.on('error', (err) => {
controller.error(err)
})
},
})
if (!blob.readableStreamBody || !isNodeReadableStream(blob.readableStreamBody)) {
return new Response('Internal Server Error', { status: 500 })
}

return new Response(readableStream, {
headers,
status: rangeResult.status,
const stream = blob.readableStreamBody
stream.on('error', (err: Error) => {
req.payload.logger.error({
err,
msg: 'Error while streaming Azure blob (aborting)',
})
abortRequestAndDestroyStream({ abortController, blob })
})

streamed = true
return new Response(stream, { headers, status: rangeResult.status })
} catch (err: unknown) {
if (err instanceof RestError && err.statusCode === 404) {
return new Response(null, { status: 404, statusText: 'Not Found' })
}
req.payload.logger.error(err)
return new Response('Internal Server Error', { status: 500 })
} finally {
if (!streamed) {
abortRequestAndDestroyStream({ abortController, blob })
}
}
}
}