@@ -21,6 +21,7 @@ import parseSchema from 'mongodb-schema';
2121import path from 'path' ;
2222import { signatures } from '@mongosh/shell-api' ;
2323import translator from '@mongosh/i18n' ;
24+ import { isAtlasStream } from 'mongodb-build-info' ;
2425import { Worker as WorkerThreads } from 'worker_threads' ;
2526
2627import { ExportToLanguageMode } from '../types/playgroundType' ;
@@ -65,6 +66,7 @@ export default class MongoDBService {
6566 _currentConnectionOptions ?: MongoClientOptions ;
6667
6768 _databaseCompletionItems : CompletionItem [ ] = [ ] ;
69+ _streamProcessorCompletionItems : CompletionItem [ ] = [ ] ;
6870 _shellSymbolCompletionItems : { [ symbol : string ] : CompletionItem [ ] } = { } ;
6971 _globalSymbolCompletionItems : CompletionItem [ ] = [ ] ;
7072 _collections : { [ database : string ] : string [ ] } = { } ;
@@ -147,6 +149,7 @@ export default class MongoDBService {
147149 databases : true ,
148150 collections : true ,
149151 fields : true ,
152+ streamProcessors : true ,
150153 } ) ;
151154 await this . _closeCurrentConnection ( ) ;
152155 }
@@ -174,6 +177,22 @@ export default class MongoDBService {
174177 ) ;
175178 }
176179
180+ if ( isAtlasStream ( connectionString || '' ) ) {
181+ await this . _getAndCacheStreamProcessors ( ) ;
182+ } else {
183+ await this . _getAndCacheDatabases ( ) ;
184+ }
185+
186+ this . _connection . console . log (
187+ `CliServiceProvider active connection has changed: { connectionId: ${ connectionId } }`
188+ ) ;
189+ return {
190+ successfullyConnected : true ,
191+ connectionId,
192+ } ;
193+ }
194+
195+ async _getAndCacheDatabases ( ) {
177196 try {
178197 // Get database names for the current connection.
179198 const databases = await this . _getDatabases ( ) ;
@@ -184,14 +203,17 @@ export default class MongoDBService {
184203 `LS get databases error: ${ util . inspect ( error ) } `
185204 ) ;
186205 }
206+ }
187207
188- this . _connection . console . log (
189- `CliServiceProvider active connection has changed: { connectionId: ${ connectionId } }`
190- ) ;
191- return {
192- successfullyConnected : true ,
193- connectionId,
194- } ;
208+ async _getAndCacheStreamProcessors ( ) {
209+ try {
210+ const processors = await this . _getStreamProcessors ( ) ;
211+ this . _cacheStreamProcessorCompletionItems ( processors ) ;
212+ } catch ( error ) {
213+ this . _connection . console . error (
214+ `LS get stream processors error: ${ util . inspect ( error ) } `
215+ ) ;
216+ }
195217 }
196218
197219 /**
@@ -245,9 +267,16 @@ export default class MongoDBService {
245267 )
246268 ) ;
247269
248- worker ?. on (
249- 'message' ,
250- ( { error, data } : { data ?: ShellEvaluateResult ; error ?: any } ) => {
270+ worker ?. on ( 'message' , ( { name, payload } ) => {
271+ if ( name === ServerCommands . SHOW_CONSOLE_OUTPUT ) {
272+ void this . _connection . sendNotification ( name , payload ) ;
273+ }
274+
275+ if ( name === ServerCommands . CODE_EXECUTION_RESULT ) {
276+ const { error, data } = payload as {
277+ data ?: ShellEvaluateResult ;
278+ error ?: any ;
279+ } ;
251280 if ( error ) {
252281 this . _connection . console . error (
253282 `WORKER error: ${ util . inspect ( error ) } `
@@ -261,7 +290,7 @@ export default class MongoDBService {
261290 resolve ( data ) ;
262291 } ) ;
263292 }
264- ) ;
293+ } ) ;
265294
266295 worker . postMessage ( {
267296 name : ServerCommands . EXECUTE_CODE_FROM_PLAYGROUND ,
@@ -294,6 +323,24 @@ export default class MongoDBService {
294323 } ) ;
295324 }
296325
326+ /**
327+ * Get stream processors names for the current connection.
328+ */
329+ async _getStreamProcessors ( ) : Promise < Document [ ] > {
330+ if ( this . _serviceProvider ) {
331+ try {
332+ const cmd = { listStreamProcessors : 1 } ;
333+ const result = await this . _serviceProvider . runCommand ( 'admin' , cmd ) ;
334+ return result . streamProcessors ?? [ ] ;
335+ } catch ( error ) {
336+ this . _connection . console . error (
337+ `LS get stream processors error: ${ error } `
338+ ) ;
339+ }
340+ }
341+ return [ ] ;
342+ }
343+
297344 /**
298345 * Get database names for the current connection.
299346 */
@@ -377,7 +424,7 @@ export default class MongoDBService {
377424 }
378425
379426 /**
380- * Return 'db' and 'use' completion items.
427+ * Return 'db', 'sp' and 'use' completion items.
381428 */
382429 _cacheGlobalSymbolCompletionItems ( ) {
383430 this . _globalSymbolCompletionItems = [
@@ -386,6 +433,11 @@ export default class MongoDBService {
386433 kind : CompletionItemKind . Method ,
387434 preselect : true ,
388435 } ,
436+ {
437+ label : 'sp' ,
438+ kind : CompletionItemKind . Method ,
439+ preselect : true ,
440+ } ,
389441 {
390442 label : 'use' ,
391443 kind : CompletionItemKind . Function ,
@@ -783,6 +835,18 @@ export default class MongoDBService {
783835 }
784836 }
785837
838+ /**
839+ * If the current node is 'sp.processor.<trigger>' or 'sp["processor"].<trigger>'.
840+ */
841+ _provideStreamProcessorSymbolCompletionItems ( state : CompletionState ) {
842+ if ( state . isStreamProcessorSymbol ) {
843+ this . _connection . console . log (
844+ 'VISITOR found stream processor symbol completions'
845+ ) ;
846+ return this . _shellSymbolCompletionItems . StreamProcessor ;
847+ }
848+ }
849+
786850 /**
787851 * If the current node is 'db.collection.find().<trigger>'.
788852 */
@@ -895,6 +959,37 @@ export default class MongoDBService {
895959 }
896960 }
897961
962+ /**
963+ * If the current node is 'sp.<trigger>'.
964+ */
965+ _provideSpSymbolCompletionItems ( state : CompletionState ) {
966+ if ( state . isSpSymbol ) {
967+ if ( state . isStreamProcessorName ) {
968+ this . _connection . console . log (
969+ 'VISITOR found sp symbol and stream processor name completions'
970+ ) ;
971+ return this . _shellSymbolCompletionItems . Streams . concat (
972+ this . _streamProcessorCompletionItems
973+ ) ;
974+ }
975+
976+ this . _connection . console . log ( 'VISITOR found sp symbol completions' ) ;
977+ return this . _shellSymbolCompletionItems . Streams ;
978+ }
979+ }
980+
981+ /**
982+ * If the current node is 'sp.get(<trigger>)'.
983+ */
984+ _provideStreamProcessorNameCompletionItems ( state : CompletionState ) {
985+ if ( state . isStreamProcessorName ) {
986+ this . _connection . console . log (
987+ 'VISITOR found stream processor name completions'
988+ ) ;
989+ return this . _streamProcessorCompletionItems ;
990+ }
991+ }
992+
898993 /**
899994 * If the current node can be used as a collection name
900995 * e.g. 'db.<trigger>.find()' or 'let a = db.<trigger>'.
@@ -965,6 +1060,7 @@ export default class MongoDBService {
9651060 this . _provideIdentifierObjectValueCompletionItems . bind ( this , state ) ,
9661061 this . _provideTextObjectValueCompletionItems . bind ( this , state ) ,
9671062 this . _provideCollectionSymbolCompletionItems . bind ( this , state ) ,
1063+ this . _provideStreamProcessorSymbolCompletionItems . bind ( this , state ) ,
9681064 this . _provideFindCursorCompletionItems . bind ( this , state ) ,
9691065 this . _provideAggregationCursorCompletionItems . bind ( this , state ) ,
9701066 this . _provideGlobalSymbolCompletionItems . bind ( this , state ) ,
@@ -974,13 +1070,15 @@ export default class MongoDBService {
9741070 currentLineText ,
9751071 position
9761072 ) ,
1073+ this . _provideSpSymbolCompletionItems . bind ( this , state ) ,
9771074 this . _provideCollectionNameCompletionItems . bind (
9781075 this ,
9791076 state ,
9801077 currentLineText ,
9811078 position
9821079 ) ,
9831080 this . _provideDbNameCompletionItems . bind ( this , state ) ,
1081+ this . _provideStreamProcessorNameCompletionItems . bind ( this , state ) ,
9841082 ] ;
9851083
9861084 for ( const func of completionOptions ) {
@@ -1117,6 +1215,18 @@ export default class MongoDBService {
11171215 this . _collections [ database ] = collections . map ( ( item ) => item . name ) ;
11181216 }
11191217
1218+ _cacheStreamProcessorCompletionItems ( processors : Document [ ] ) : void {
1219+ this . _streamProcessorCompletionItems = processors . map ( ( { name } ) => ( {
1220+ kind : CompletionItemKind . Folder ,
1221+ preselect : true ,
1222+ label : name ,
1223+ } ) ) ;
1224+ }
1225+
1226+ clearCachedStreamProcessors ( ) : void {
1227+ this . _streamProcessorCompletionItems = [ ] ;
1228+ }
1229+
11201230 clearCachedFields ( ) : void {
11211231 this . _fields = { } ;
11221232 }
@@ -1142,13 +1252,16 @@ export default class MongoDBService {
11421252
11431253 clearCachedCompletions ( clear : ClearCompletionsCache ) : void {
11441254 if ( clear . fields ) {
1145- this . _fields = { } ;
1255+ this . clearCachedFields ( ) ;
11461256 }
11471257 if ( clear . databases ) {
1148- this . _databaseCompletionItems = [ ] ;
1258+ this . clearCachedDatabases ( ) ;
11491259 }
11501260 if ( clear . collections ) {
1151- this . _collections = { } ;
1261+ this . clearCachedCollections ( ) ;
1262+ }
1263+ if ( clear . streamProcessors ) {
1264+ this . clearCachedStreamProcessors ( ) ;
11521265 }
11531266 }
11541267}
0 commit comments