Skip to content

Commit a7cf780

Browse files
committed
refactor streaming logic for simplified datasource
1 parent c826431 commit a7cf780

File tree

4 files changed

+142
-131
lines changed

4 files changed

+142
-131
lines changed

src/adapters/controllers/get-models.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export default function getModelsFactory (listModels) {
2020
httpRequest.stream = true
2121
return
2222
}
23-
23+
2424
const { content, contentType } = getContent(httpRequest, models)
2525

2626
return {

src/adapters/datasources/datasource-mongodb.js

Lines changed: 32 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

3-
import CircuitBreaker from '../../domain/circuit-breaker'
4-
import DataSource from '../../domain/datasource'
3+
const CircuitBreaker = require('../../domain/circuit-breaker').default
4+
const DataSource = require('../../domain/datasource').default
55

66
const HIGHWATERMARK = 50
77

@@ -55,6 +55,23 @@ export class DataSourceMongoDb extends DataSource {
5555
}
5656
}
5757

58+
async connectionPool () {
59+
return new Promise((resolve, reject) => {
60+
if (this.db) return resolve(this.db)
61+
MongoClient.connect(
62+
this.url,
63+
{
64+
...this.mongoOpts,
65+
poolSize: dsOptions.numConns || 2
66+
},
67+
(err, database) => {
68+
if (err) return reject(err)
69+
resolve((this.db = database.db(this.namespace)))
70+
}
71+
)
72+
})
73+
}
74+
5875
async connection () {
5976
try {
6077
while (connections.length < (dsOptions.numConns || 1)) {
@@ -69,7 +86,7 @@ export class DataSourceMongoDb extends DataSource {
6986
}
7087
}
7188
const breaker = CircuitBreaker(
72-
'mongo.conn',
89+
'mongodb.connect',
7390
this.connect(client),
7491
thresholds
7592
)
@@ -186,14 +203,12 @@ export class DataSourceMongoDb extends DataSource {
186203
/**
187204
*
188205
* @param {Object} filter Supposed to be a valid Mongo Filter
189-
* @param {Object} options Options to sort limit aggregate etc...
190-
* @param {Object} options.sort a valid Mongo sort object
191-
* @param {Number} options.limit a valid Mongo limit
192-
* @param {Object} options.aggregate a valid Mongo aggregate object
206+
* @param {Object} sort a valid Mongo sort object
207+
* @param {Number} limit a valid Mongo limit
208+
* @param {Object} aggregate a valid Mongo aggregate object
193209
*
194-
* @returns
210+
* @returns {Promise<import('mongodb').AbstractCursor>}
195211
*/
196-
197212
async mongoFind ({ filter, sort, limit, aggregate, skip } = {}) {
198213
console.log({ fn: this.mongoFind.name, filter })
199214
let cursor = (await this.collection()).find(filter)
@@ -204,112 +219,23 @@ export class DataSourceMongoDb extends DataSource {
204219
return cursor
205220
}
206221

207-
/**
208-
* Pipes to writable and streams list. List can be filtered. Stream
209-
* is serialized by default. Stream can be modified by transform.
210-
*
211-
* @param {{
212-
* filter:*
213-
* transform:Transform
214-
* serialize:boolean
215-
* }} param0
216-
* @returns
217-
*/
218-
streamList ({ writable, serialize, transform, options }) {
219-
try {
220-
let first = true
221-
222-
const serializer = new Transform({
223-
writableObjectMode: true,
224-
225-
// start of array
226-
construct (callback) {
227-
this.push('[')
228-
callback()
229-
},
230-
231-
// each chunk is a record
232-
transform (chunk, _encoding, next) {
233-
// comma-separate
234-
if (first) first = false
235-
else this.push(',')
236-
237-
// serialize record
238-
this.push(JSON.stringify(chunk))
239-
next()
240-
},
241-
242-
// end of array
243-
flush (callback) {
244-
this.push(']')
245-
callback()
246-
}
247-
})
248-
249-
return new Promise(async (resolve, reject) => {
250-
const readable = (await this.mongoFind(options)).stream()
251-
252-
readable.on('error', reject)
253-
readable.on('end', resolve)
254-
255-
// optionally transform db stream then pipe to output
256-
if (transform && serialize)
257-
readable
258-
.pipe(transform)
259-
.pipe(serializer)
260-
.pipe(writable)
261-
else if (transform) readable.pipe(transform).pipe(writable)
262-
else if (serialize) readable.pipe(serializer).pipe(writable)
263-
else readable.pipe(writable)
264-
})
265-
} catch (error) {}
266-
}
267-
268222
processOptions (param) {
269223
const { options = {}, query = {} } = param
270224
return { ...options, ...processQuery(query) }
271225
}
272226

273227
/**
274-
* Returns the set of objects satisfying the `filter` if specified;
275-
* otherwise returns all objects. If a `writable`stream is provided and `cached`
276-
* is false, the list is streamed. Otherwise the list is returned in
277-
* an array. A custom transform can be specified to modify the streamed
278-
* results. Using {@link createWriteStream} updates can be streamed back
279-
* to the db. With streams, we can support queries of very large tables,
280-
* with minimal memory overhead on the node server.
281228
*
282229
* @override
283-
* @param {{key1:string, keyN:string}} filter - e.g. http query
284-
* @param {{
285-
* writable: WritableStream,
286-
* cached: boolean,
287-
* serialize: boolean,
288-
* transform: Transform
289-
* }} params
290-
* - details
291-
* - `serialize` serialize input to writable
292-
* - `cached` list cache only
293-
* - `transform` transform stream before writing
294-
* - `writable` writable stream for output
230+
* @param {import('../../domain/datasource').listOptions} param
295231
*/
296-
async list (param = {}) {
297-
const {
298-
writable = null,
299-
transform = null,
300-
serialize = false,
301-
query = {}
302-
} = param
303-
232+
async list (param) {
304233
try {
305-
if (query.__cached) return super.listSync(query)
306-
if (query.__count) return this.count()
307-
308234
const options = this.processOptions(param)
309235
console.log({ options })
310236

311-
if (writable) {
312-
return this.streamList({ writable, serialize, transform, options })
237+
if (param.streamRequested) {
238+
return (await this.mongoFind(options)).stream()
313239
}
314240

315241
return (await this.mongoFind(options)).toArray()
@@ -318,6 +244,10 @@ export class DataSourceMongoDb extends DataSource {
318244
}
319245
}
320246

247+
/**
248+
*
249+
* @override
250+
*/
321251
async count () {
322252
return {
323253
total: await this.countDb(),

src/domain/datasource-factory.js

Lines changed: 76 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict'
22

3-
import { Transform } from 'stream'
3+
import { Readable, Transform, Writable } from 'node:stream'
44
import { isMainThread } from 'worker_threads'
55
/** @typedef {import('.').Model} Model */
66

@@ -94,39 +94,99 @@ const DsCoreExtensions = superclass =>
9494
}
9595
}
9696

97-
transform () {
97+
hydrate () {
9898
const ctx = this
9999

100100
return new Transform({
101101
objectMode: true,
102102

103-
transform (chunk, _encoding, next) {
103+
transform (chunk, encoding, next) {
104104
this.push(ModelFactory.loadModel(broker, ctx, chunk, ctx.name))
105105
next()
106106
}
107107
})
108108
}
109109

110+
serialize () {
111+
let first = true
112+
113+
return new Transform({
114+
objectMode: true,
115+
116+
// start of array
117+
construct (callback) {
118+
this.push('[')
119+
callback()
120+
},
121+
122+
// each chunk is a record
123+
transform (chunk, encoding, next) {
124+
// comma-separate
125+
if (first) first = false
126+
else this.push(',')
127+
128+
// serialize record
129+
this.push(JSON.stringify(chunk))
130+
next()
131+
},
132+
133+
// end of array
134+
flush (callback) {
135+
this.push(']')
136+
callback()
137+
}
138+
})
139+
}
140+
110141
/**
142+
*
143+
* @param {Model[]} list
144+
* @param {import('./datasource').listOptions} options
145+
* @returns {Array<Readable|Transform>}
146+
*/
147+
stream (list, options) {
148+
return new Promise((resolve, reject) => {
149+
options.writable.on('error', reject)
150+
options.writable.on('end', resolve)
151+
152+
if (!isMainThread) list.push(this.hydrate())
153+
if (options.transform) list.concat(options.transform)
154+
if (options.serialize) list.push(this.serialize())
155+
156+
return list.reduce((p, c) => p.pipe(c)).pipe(options.writable)
157+
})
158+
}
159+
160+
/**
161+
* Returns the set of objects satisfying the `filter` if specified;
162+
* otherwise returns all objects. If a `writable` stream is provided and
163+
* `cached` is false, the list is streamed. Otherwise the list is returned
164+
* in an array. One or more custom transforms can be specified to modify the
165+
* streamed results. Using {@link createWriteStream}, updates can be streamed
166+
* back to the storage provider. With streams, we can support queries of very
167+
* large tables, with minimal memory overhead on the node server.
168+
*
111169
* @override
112-
* @param {*} options
113-
* @returns
170+
* @param {import('../../domain/datasource').listOptions} param
114171
*/
115172
async list (options) {
116173
try {
117-
if (options?.writable)
118-
return isMainThread
119-
? super.list(options)
120-
: super.list({ ...options, transform: this.transform() })
174+
if (options?.query.__count) return this.count()
175+
if (options?.query.__cached) return this.listSync(options.query)
121176

122-
const arr = await super.list(options)
177+
const opts = { ...options, streamRequested: options.writable }
178+
const list = [await super.list(opts)].flat()
123179

124-
if (Array.isArray(arr))
125-
return isMainThread
126-
? arr
127-
: arr.map(model =>
128-
ModelFactory.loadModel(broker, this, model, this.name)
129-
)
180+
if (list.length < 1) return []
181+
182+
if (list[0] instanceof Readable || list[0] instanceof Transform)
183+
return this.stream(list, options)
184+
185+
return isMainThread
186+
? list
187+
: list.map(model =>
188+
ModelFactory.loadModel(broker, this, model, this.name)
189+
)
130190
} catch (error) {
131191
console.error({ fn: this.list.name, error })
132192
throw error

src/domain/datasource.js

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,38 @@
22

33
import { changeDataCapture } from './util/change-data-capture'
44

5+
/**
6+
* @typedef {{
7+
* 'some-key': 'some-val',
8+
* }} SumOfType
9+
*/
10+
11+
/**
12+
* @typedef {object} QueryType
13+
* @property {boolean} [__cached] list cache only
14+
* @property {number|SumOfType} [__count] number of objects to return
15+
* @property {'and'|'or'|'not'} [__operand] operation to use for key-value pairs
16+
* @property {string|number|boolean} [key1] key-value pair 1
17+
* @property {string|number|boolean} [keyN] key-value pair n
18+
*/
19+
20+
/**
21+
* Query syntax provided by the storage vendor's native API
22+
* @typedef {object} VendorType
23+
*/
24+
25+
/**
26+
* @typedef {object} listOptions
27+
* @property {QueryType} query url query params
28+
* @property {VendorType} options Vendor-specific native query syntax
29+
* @property {import('stream').Writable} writable writable stream for output
30+
* @property {import('stream').Transform|import('stream').Transform[]} transform
31+
* transform stream before writing
32+
* @property {boolean} serialize serialize input to writable
33+
* @property {boolean} streamRequested true if caller provided a writable stream -
34+
* indicates to the datasource that it should return a readable stream if supported
35+
*/
36+
537
/** change data capture */
638
const cdcEnabled = false // /true/i.test('CHANGE_DATA_CAPTURE')
739

@@ -122,18 +154,7 @@ export default class DataSource {
122154

123155
/**
124156
* list model instances
125-
* @param {{key1:string, keyN:string}} filter - e.g. http query
126-
* @param {{
127-
* writable: WritableStream,
128-
* cached: boolean,
129-
* serialize: boolean,
130-
* transform: Transform
131-
* }} options
132-
* - details
133-
* - `serialize` seriailize input to writable
134-
* - `cached` list cache only
135-
* - `transform` transform stream before writing
136-
* - `writable` writable stream for output
157+
* @param {listOptions} options
137158
* @returns {Promise<any[]>}
138159
*/
139160
async list (options) {

0 commit comments

Comments
 (0)