Skip to content

Commit 900c8ef

Browse files
committed
improvement(data-drains): extract normalizePrefix and buildObjectKey to shared utils
1 parent 74a7833 commit 900c8ef

4 files changed

Lines changed: 42 additions & 82 deletions

File tree

apps/sim/lib/data-drains/destinations/azure_blob.ts

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@sim/logger'
22
import { generateShortId } from '@sim/utils/id'
33
import { z } from 'zod'
4+
import { buildObjectKey, normalizePrefix } from '@/lib/data-drains/destinations/utils'
45
import type { DrainDestination } from '@/lib/data-drains/types'
56

67
const logger = createLogger('DataDrainAzureBlobDestination')
@@ -93,31 +94,6 @@ async function buildClients(
9394
return { containerClient: blobServiceClient.getContainerClient(config.containerName) }
9495
}
9596

96-
/**
 * Normalizes a configured blob-name prefix.
 *
 * All leading and trailing `/` characters are dropped; if anything is left,
 * exactly one `/` is appended so the result can be prepended to a blob name.
 *
 * @param raw - Prefix as configured; may be undefined or empty.
 * @returns `''` when there is no usable prefix, otherwise `<prefix>/`.
 */
function normalizePrefix(raw: string | undefined): string {
  if (!raw) {
    return ''
  }
  const withoutLeading = raw.replace(/^\/+/, '')
  const core = withoutLeading.replace(/\/+$/, '')
  if (core.length === 0) {
    return ''
  }
  return `${core}/`
}
101-
102-
/**
 * Builds the blob name for one delivered chunk.
 *
 * Layout: `<prefix><source>/<drainId>/<YYYY>/<MM>/<DD>/<runId>-<seq>.ndjson`.
 * The date parts are the UTC year/month/day of `metadata.runStartedAt`, and
 * `<seq>` is `metadata.sequence` left-padded with zeros to five digits.
 *
 * @param config - Destination config; only `config.prefix` is read here.
 * @param metadata - Identifiers and start time of the run being delivered.
 * @returns The blob name, including the normalized prefix.
 */
function buildBlobName(
  config: AzureBlobDestinationConfig,
  metadata: {
    drainId: string
    runId: string
    source: string
    sequence: number
    runStartedAt: Date
  }
): string {
  const { drainId, runId, source, sequence, runStartedAt } = metadata
  const zeroPad = (value: number, width: number): string => value.toString().padStart(width, '0')
  const datePath = [
    zeroPad(runStartedAt.getUTCFullYear(), 4),
    // getUTCMonth() is zero-based, so shift to 1..12.
    zeroPad(runStartedAt.getUTCMonth() + 1, 2),
    zeroPad(runStartedAt.getUTCDate(), 2),
  ].join('/')
  const fileName = `${runId}-${zeroPad(sequence, 5)}.ndjson`
  return `${normalizePrefix(config.prefix)}${source}/${drainId}/${datePath}/${fileName}`
}
120-
12197
interface AzureRestErrorLike {
12298
statusCode?: number
12399
code?: string
@@ -182,7 +158,7 @@ export const azureBlobDestination: DrainDestination<
182158
async deliver({ body, contentType, metadata, signal }) {
183159
if (clientsPromise === null) clientsPromise = buildClients(config, credentials)
184160
const { containerClient } = await clientsPromise
185-
const blobName = buildBlobName(config, metadata)
161+
const blobName = buildObjectKey(config.prefix, metadata)
186162
const blockBlobClient = containerClient.getBlockBlobClient(blobName)
187163
await withAzureErrorContext('put-object', () =>
188164
blockBlobClient.upload(body, body.byteLength, {

apps/sim/lib/data-drains/destinations/gcs.ts

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { generateShortId } from '@sim/utils/id'
44
import { JWT } from 'google-auth-library'
55
import { z } from 'zod'
66
import {
7+
buildObjectKey,
8+
normalizePrefix,
79
type ParsedServiceAccount,
810
parseServiceAccount,
911
refineServiceAccountJson,
@@ -62,31 +64,6 @@ const gcsCredentialsSchema = z
6264
export type GCSDestinationConfig = z.infer<typeof gcsConfigSchema>
6365
export type GCSDestinationCredentials = z.infer<typeof gcsCredentialsSchema>
6466

65-
/**
 * Turns a configured object-name prefix into either `''` or `<prefix>/`.
 * Leading and trailing slashes are removed first; interior slashes are kept.
 *
 * @param raw - Prefix as configured; may be undefined or empty.
 * @returns The prefix with exactly one trailing `/`, or `''` if nothing remains.
 */
function normalizePrefix(raw: string | undefined): string {
  if (!raw) return ''
  const inner = raw.replace(/^\/+/, '').replace(/\/+$/, '')
  return inner ? `${inner}/` : ''
}
70-
71-
/**
 * Builds the object name for one delivered chunk.
 *
 * Layout: `<prefix><source>/<drainId>/<YYYY>/<MM>/<DD>/<runId>-<seq>.ndjson`.
 * The date directory comes from the UTC date of `metadata.runStartedAt`;
 * `<seq>` is `metadata.sequence` zero-padded to five digits.
 *
 * @param config - Destination config; only `config.prefix` is read here.
 * @param metadata - Identifiers and start time of the run being delivered.
 * @returns The object name, including the normalized prefix.
 */
function buildObjectName(
  config: GCSDestinationConfig,
  metadata: {
    drainId: string
    runId: string
    source: string
    sequence: number
    runStartedAt: Date
  }
): string {
  const startedAt = metadata.runStartedAt
  const year = startedAt.getUTCFullYear().toString().padStart(4, '0')
  // getUTCMonth() is zero-based, so shift to 1..12.
  const month = (startedAt.getUTCMonth() + 1).toString().padStart(2, '0')
  const day = startedAt.getUTCDate().toString().padStart(2, '0')
  const chunkNumber = metadata.sequence.toString().padStart(5, '0')
  const directory = `${normalizePrefix(config.prefix)}${metadata.source}/${metadata.drainId}/${year}/${month}/${day}`
  return `${directory}/${metadata.runId}-${chunkNumber}.ndjson`
}
89-
9067
/**
 * Creates a `JWT` client from a parsed service account, using its client
 * email and private key and requesting the single scope `SCOPE`.
 */
function buildJwt(account: ParsedServiceAccount): JWT {
  const { clientEmail, privateKey } = account
  return new JWT({
    email: clientEmail,
    key: privateKey,
    scopes: [SCOPE],
  })
}
@@ -278,7 +255,7 @@ export const gcsDestination: DrainDestination<GCSDestinationConfig, GCSDestinati
278255
const jwt = buildJwt(account)
279256
return {
280257
async deliver({ body, contentType, metadata, signal }) {
281-
const objectName = buildObjectName(config, metadata)
258+
const objectName = buildObjectKey(config.prefix, metadata)
282259
await uploadObject('put-object', {
283260
bucket: config.bucket,
284261
objectName,

apps/sim/lib/data-drains/destinations/s3.ts

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { generateShortId } from '@sim/utils/id'
99
import { z } from 'zod'
1010
import { validateExternalUrl } from '@/lib/core/security/input-validation'
1111
import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
12+
import { buildObjectKey, normalizePrefix } from '@/lib/data-drains/destinations/utils'
1213
import type { DrainDestination } from '@/lib/data-drains/types'
1314

1415
const logger = createLogger('DataDrainS3Destination')
@@ -58,35 +59,6 @@ function buildClient(config: S3DestinationConfig, credentials: S3DestinationCred
5859
})
5960
}
6061

61-
/**
 * Normalizes a configured key prefix to either `''` or `<prefix>/`.
 *
 * @param raw - Prefix as configured; may be undefined or empty.
 * @returns The prefix without leading slashes and with exactly one trailing
 *   slash, or `''` when nothing but slashes (or nothing at all) was given.
 */
function normalizePrefix(raw: string | undefined): string {
  if (!raw) {
    return ''
  }
  // A leading `/` would make the key start with an empty-name segment, so it
  // is dropped; trailing slashes are dropped too so that exactly one boundary
  // slash is re-added below.
  let key = raw.replace(/^\/+/, '')
  key = key.replace(/\/+$/, '')
  return key.length > 0 ? `${key}/` : ''
}
68-
69-
/**
 * Builds the object key for one delivered chunk.
 *
 * Layout: `<prefix><source>/<drainId>/<YYYY>/<MM>/<DD>/<runId>-<seq>.ndjson`.
 *
 * @param config - Destination config; only `config.prefix` is read here.
 * @param metadata - Identifiers and start time of the run being delivered.
 * @returns The key, including the normalized prefix.
 */
function buildKey(
  config: S3DestinationConfig,
  metadata: {
    drainId: string
    runId: string
    source: string
    sequence: number
    runStartedAt: Date
  }
): string {
  const pad = (value: number, width: number): string => value.toString().padStart(width, '0')
  // The date directory is derived from the run's start time (in UTC), not the
  // delivery time, so every chunk of a run lands under the same date prefix
  // even when delivery continues past midnight.
  const { runStartedAt } = metadata
  const yyyy = pad(runStartedAt.getUTCFullYear(), 4)
  const mm = pad(runStartedAt.getUTCMonth() + 1, 2)
  const dd = pad(runStartedAt.getUTCDate(), 2)
  const fileName = `${metadata.runId}-${pad(metadata.sequence, 5)}.ndjson`
  return `${normalizePrefix(config.prefix)}${metadata.source}/${metadata.drainId}/${yyyy}/${mm}/${dd}/${fileName}`
}
89-
9062
function isS3ServiceException(error: unknown): error is S3ServiceException {
9163
return (
9264
typeof error === 'object' &&
@@ -192,7 +164,7 @@ export const s3Destination: DrainDestination<S3DestinationConfig, S3DestinationC
192164
async deliver({ body, contentType, metadata, signal }) {
193165
if (endpointCheck === null) endpointCheck = assertEndpointIsPublic(config.endpoint)
194166
await endpointCheck
195-
const key = buildKey(config, metadata)
167+
const key = buildObjectKey(config.prefix, metadata)
196168
await withS3ErrorContext('put-object', () =>
197169
client.send(
198170
new PutObjectCommand({

apps/sim/lib/data-drains/destinations/utils.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,41 @@ export function sleepUntilAborted(ms: number, signal: AbortSignal): Promise<void
2121
})
2222
}
2323

24+
/**
 * Normalizes an object-key prefix so it can be prepended directly to a key.
 *
 * Every leading and trailing `/` is removed and, if anything remains, exactly
 * one trailing `/` is appended, giving a single boundary between the prefix
 * and the rest of the key. Slashes inside the prefix are left untouched. A
 * leading `/` is stripped because it would give the key an empty-name first
 * segment.
 *
 * Trimming is done with index scans rather than `/\/+$/`: an unanchored
 * `\/+$` is retried from every slash of a long run that is not at the end of
 * the string, which is quadratic in the length of that run.
 *
 * @param raw - Prefix as configured; `undefined` and `''` mean "no prefix".
 * @returns `''` when no non-slash content remains, otherwise `<prefix>/`.
 */
export function normalizePrefix(raw: string | undefined): string {
  if (!raw) return ''
  let start = 0
  let end = raw.length
  // Skip leading slashes.
  while (start < end && raw[start] === '/') start += 1
  // Skip trailing slashes, never crossing `start`.
  while (end > start && raw[end - 1] === '/') end -= 1
  return start === end ? '' : `${raw.slice(start, end)}/`
}
35+
36+
export interface ObjectKeyMetadata {
37+
drainId: string
38+
runId: string
39+
source: string
40+
sequence: number
41+
runStartedAt: Date
42+
}
43+
44+
/**
 * Produces the date-partitioned object key under which one NDJSON chunk of a
 * drain run is stored by the blob-store destinations.
 *
 * Layout: `<prefix><source>/<drainId>/<YYYY>/<MM>/<DD>/<runId>-<seq>.ndjson`
 * - `<prefix>` is `prefix` passed through `normalizePrefix` (may be empty).
 * - `<YYYY>/<MM>/<DD>` is the UTC date of `metadata.runStartedAt`. Using the
 *   run's start time keeps every chunk of a run under one date prefix even
 *   when delivery crosses a UTC midnight boundary.
 * - `<seq>` is `metadata.sequence` left-padded with zeros to five digits.
 *
 * @param prefix - Optional configured key prefix.
 * @param metadata - Identifiers and start time of the run being delivered.
 * @returns The full object key.
 */
export function buildObjectKey(prefix: string | undefined, metadata: ObjectKeyMetadata): string {
  const { source, drainId, runId, sequence, runStartedAt } = metadata
  const year = runStartedAt.getUTCFullYear().toString().padStart(4, '0')
  // getUTCMonth() is zero-based, so shift to 1..12.
  const month = (runStartedAt.getUTCMonth() + 1).toString().padStart(2, '0')
  const day = runStartedAt.getUTCDate().toString().padStart(2, '0')
  const fileName = `${runId}-${sequence.toString().padStart(5, '0')}.ndjson`
  return `${normalizePrefix(prefix)}${source}/${drainId}/${year}/${month}/${day}/${fileName}`
}
58+
2459
export interface ParsedServiceAccount {
2560
clientEmail: string
2661
privateKey: string

0 commit comments

Comments
 (0)