@@ -15,9 +15,10 @@ const url = process.env.MONGODB_URL || 'mongodb://localhost:27017'
1515const configRoot = require ( '../../config' ) . hostConfig
1616const dsOptions = configRoot . adapters . datasources . DataSourceMongoDb . options || {
1717 runOffline : true ,
18- numConns : 2 ,
18+ numConns : 2
1919}
2020const cacheSize = configRoot . adapters . cacheSize || 3000
21+ let connPool
2122
2223/**
2324 * @type {Map<string,MongoClient> }
@@ -35,55 +36,64 @@ const mongoOpts = {
3536 * even when the database is offline.
3637 */
3738export class DataSourceMongoDb extends DataSource {
38- constructor ( map , name , namespace , options = { } ) {
39+ constructor ( map , name , namespace , options = { } ) {
3940 super ( map , name , namespace , options )
4041 this . cacheSize = cacheSize
4142 this . mongoOpts = mongoOpts
4243 this . runOffline = dsOptions . runOffline
4344 this . url = url
4445 }
4546
46- connect ( client ) {
47- return async function ( ) {
48- let timeout = false
49- const timerId = setTimeout ( ( ) => {
50- timeout = true
51- } , 500 )
52- await client . connect ( )
53- clearTimeout ( timerId )
54- if ( timeout ) throw new Error ( 'mongo conn timeout' )
55- }
56- }
57-
58- async connectionPool ( ) {
47+ /**
48+ *
49+ * @returns {Promise<import('mongodb').Db> }
50+ */
51+ async connectionPool ( ) {
5952 return new Promise ( ( resolve , reject ) => {
6053 if ( this . db ) return resolve ( this . db )
6154 MongoClient . connect (
6255 this . url ,
6356 {
6457 ...this . mongoOpts ,
6558 poolSize : dsOptions . numConns || 2 ,
59+ connectTimeoutMS : 500
6660 } ,
67- ( err , database ) => {
61+ ( err , db ) => {
6862 if ( err ) return reject ( err )
69- resolve ( ( this . db = database . db ( this . namespace ) ) )
63+ this . db = db ( this . namespace )
64+ resolve ( this . db )
7065 }
7166 )
7267 } )
7368 }
7469
75- async connection ( ) {
70+ connect ( client ) {
71+ return async function ( ) {
72+ let timeout = false
73+ const timerId = setTimeout ( ( ) => {
74+ timeout = true
75+ } , 500 )
76+ await client . connect ( )
77+ clearTimeout ( timerId )
78+ if ( timeout ) throw new Error ( 'mongo conn timeout' )
79+ }
80+ }
81+
82+ async connection ( ) {
7683 try {
7784 while ( connections . length < ( dsOptions . numConns || 1 ) ) {
78- const client = new MongoClient ( this . url , this . mongoOpts )
85+ const client = new MongoClient ( this . url , {
86+ ...this . mongoOpts ,
87+ connectTimeoutMS : 500
88+ } )
7989 const thresholds = {
8090 default : {
8191 errorRate : 1 ,
8292 callVolume : 1 ,
8393 intervalMs : 10000 ,
8494 testDelay : 300000 ,
85- // fallbackFn: () => client.emit('connectionClosed ')
86- } ,
95+ fallbackFn : ( ) => console . log ( 'circuit open ')
96+ }
8797 }
8898 const breaker = CircuitBreaker (
8999 'mongodb.connect' ,
@@ -104,17 +114,18 @@ export class DataSourceMongoDb extends DataSource {
104114 }
105115 }
106116
107- async collection ( ) {
117+ async collection ( ) {
108118 try {
109119 return ( await this . connection ( ) ) . db ( this . namespace ) . collection ( this . name )
120+ // return (await this.connectionPool()).collection(this.name)
110121 } catch { }
111122 }
112123
113- async find ( id ) {
124+ async find ( id ) {
114125 try {
115126 return ( await this . collection ( ) ) . findOne ( { _id : id } )
116127 } catch ( error ) {
117- console . error ( { fn : this . findDb . name , error } )
128+ console . error ( { fn : this . find . name , error } )
118129 }
119130 }
120131
@@ -128,7 +139,7 @@ export class DataSourceMongoDb extends DataSource {
128139 * @param {* } id
129140 * @param {* } data
130141 */
131- async save ( id , data ) {
142+ async save ( id , data ) {
132143 try {
133144 const col = await this . collection ( )
134145 col . replaceOne ( { _id : id } , { ...data , _id : id } , { upsert : true } )
@@ -150,19 +161,19 @@ export class DataSourceMongoDb extends DataSource {
150161 * @param {number } highWaterMark num of docs per batch write
151162 * @returns
152163 */
153- createWriteStream ( filter = { } , highWaterMark = HIGHWATERMARK ) {
164+ createWriteStream ( filter = { } , highWaterMark = HIGHWATERMARK ) {
154165 try {
155166 let objects = [ ]
156167 const ctx = this
157168
158- async function upsert ( ) {
169+ async function upsert ( ) {
159170 const operations = objects . map ( obj => {
160171 return {
161172 replaceOne : {
162173 filter : { ...filter , _id : obj . id } ,
163174 replacement : { ...obj , _id : obj . id } ,
164- upsert : true ,
165- } ,
175+ upsert : true
176+ }
166177 }
167178 } )
168179
@@ -181,17 +192,17 @@ export class DataSourceMongoDb extends DataSource {
181192 const writable = new Writable ( {
182193 objectMode : true ,
183194
184- async write ( chunk , _encoding , next ) {
195+ async write ( chunk , _encoding , next ) {
185196 objects . push ( chunk )
186197 // if true time to flush buffer and write to db
187198 if ( objects . length >= highWaterMark ) await upsert ( )
188199 next ( )
189200 } ,
190201
191- end ( chunk , _ , done ) {
202+ end ( chunk , _ , done ) {
192203 objects . push ( chunk )
193204 done ( )
194- } ,
205+ }
195206 } )
196207
197208 writable . on ( 'finish' , async ( ) => await upsert ( ) )
@@ -209,7 +220,7 @@ export class DataSourceMongoDb extends DataSource {
209220 *
210221 * @returns {Promise<import('mongodb').AbstractCursor> }
211222 */
212- async mongoFind ( { filter, sort, limit, aggregate, skip } = { } ) {
223+ async mongoFind ( { filter, sort, limit, aggregate, skip } = { } ) {
213224 console . log ( { fn : this . mongoFind . name , filter } )
214225 let cursor = ( await this . collection ( ) ) . find ( filter )
215226 if ( sort ) cursor = cursor . sort ( sort )
@@ -219,7 +230,7 @@ export class DataSourceMongoDb extends DataSource {
219230 return cursor
220231 }
221232
222- processOptions ( param ) {
233+ processOptions ( param ) {
223234 const { options = { } , query = { } } = param
224235 return { ...options , ...processQuery ( query ) }
225236 }
@@ -229,7 +240,7 @@ export class DataSourceMongoDb extends DataSource {
229240 * @override
230241 * @param {import('../../domain/datasource').listOptions } param
231242 */
232- async list ( param ) {
243+ async list ( param ) {
233244 try {
234245 const options = this . processOptions ( param )
235246 console . log ( { options } )
@@ -248,19 +259,19 @@ export class DataSourceMongoDb extends DataSource {
248259 *
249260 * @override
250261 */
251- async count ( ) {
262+ async count ( ) {
252263 return {
253264 total : await this . countDb ( ) ,
254265 cached : this . getCacheSize ( ) ,
255- bytes : this . getCacheSizeBytes ( ) ,
266+ bytes : this . getCacheSizeBytes ( )
256267 }
257268 }
258269
259270 /**
260271 * @override
261272 * @returns
262273 */
263- async countDb ( ) {
274+ async countDb ( ) {
264275 return ( await this . collection ( ) ) . countDocuments ( )
265276 }
266277
@@ -271,7 +282,7 @@ export class DataSourceMongoDb extends DataSource {
271282 * @override
272283 * @param {* } id
273284 */
274- async delete ( id ) {
285+ async delete ( id ) {
275286 try {
276287 await ( await this . collection ( ) ) . deleteOne ( { _id : id } )
277288 } catch ( error ) {
0 commit comments