@@ -6,59 +6,56 @@ import type { ACL } from '../../acl';
66import type Logger from '@matrixai/logger' ;
77import type { VaultsGitInfoGetMessage } from './types' ;
88import type { VaultAction } from '../../vaults/types' ;
9+ import type { JSONRPCRequest } from '@/rpc/types' ;
10+ import type { ContextTimed } from '@matrixai/contexts' ;
11+ import type { JSONValue } from '@/types' ;
12+ import { ReadableStream } from 'stream/web' ;
913import * as agentErrors from '../errors' ;
1014import * as vaultsUtils from '../../vaults/utils' ;
1115import * as vaultsErrors from '../../vaults/errors' ;
12- import { ServerHandler } from '../../rpc/handlers' ;
16+ import { RawHandler } from '../../rpc/handlers' ;
1317import { validateSync } from '../../validation' ;
14- import { matchSync } from '../../utils' ;
18+ import { matchSync , never } from '../../utils' ;
1519import * as validationUtils from '../../validation/utils' ;
1620import * as nodesUtils from '../../nodes/utils' ;
1721import * as agentUtils from '../utils' ;
22+ import * as utils from '../../utils' ;
1823
19- class VaultsGitInfoGetHandler extends ServerHandler <
20- {
21- db : DB ;
22- vaultManager : VaultManager ;
23- acl : ACL ;
24- logger : Logger ;
25- } ,
26- AgentRPCRequestParams < VaultsGitInfoGetMessage > ,
27- AgentRPCResponseResult < VaultInfo | GitPackMessage >
28- > {
29- public async * handle (
30- input : AgentRPCRequestParams < VaultsGitInfoGetMessage > ,
31- _cancel ,
32- meta ,
33- ) : AsyncGenerator < VaultInfo | GitPackMessage > {
24+ class VaultsGitInfoGetHandler extends RawHandler < {
25+ db : DB ;
26+ vaultManager : VaultManager ;
27+ acl : ACL ;
28+ logger : Logger ;
29+ } > {
30+ public async handle (
31+ input : [ JSONRPCRequest , ReadableStream < Uint8Array > ] ,
32+ cancel : ( reason ?: any ) => void ,
33+ meta : Record < string , JSONValue > | undefined ,
34+ ctx : ContextTimed ,
35+ ) : Promise < [ JSONValue , ReadableStream < Uint8Array > ] > {
3436 const { db, vaultManager, acl } = this . container ;
35- yield * db . withTransactionG ( async function * (
36- tran ,
37- ) : AsyncGenerator < VaultInfo | GitPackMessage > {
37+ const [ headerMessage , inputStream ] = input ;
38+ const params = headerMessage . params ;
39+ if ( params == null || ! utils . isObject ( params ) ) never ( ) ;
40+ if (
41+ ! ( 'vaultNameOrId' in params ) ||
42+ typeof params . vaultNameOrId != 'string'
43+ ) {
44+ never ( ) ;
45+ }
46+ if ( ! ( 'action' in params ) || typeof params . action != 'string' ) never ( ) ;
47+ const vaultNameOrId = params . vaultNameOrId ;
48+ const actionType = validationUtils . parseVaultAction ( params . action ) ;
49+ const data = await db . withTransactionF ( async ( tran ) => {
3850 const vaultIdFromName = await vaultManager . getVaultId (
39- input . vaultNameOrId ,
51+ vaultNameOrId ,
4052 tran ,
4153 ) ;
4254 const vaultId =
43- vaultIdFromName ?? vaultsUtils . decodeVaultId ( input . vaultNameOrId ) ;
55+ vaultIdFromName ?? vaultsUtils . decodeVaultId ( vaultNameOrId ) ;
4456 if ( vaultId == null ) {
4557 throw new vaultsErrors . ErrorVaultsVaultUndefined ( ) ;
4658 }
47- const {
48- actionType,
49- } : {
50- actionType : VaultAction ;
51- } = validateSync (
52- ( keyPath , value ) => {
53- return matchSync ( keyPath ) (
54- [ [ 'actionType' ] , ( ) => validationUtils . parseVaultAction ( value ) ] ,
55- ( ) => value ,
56- ) ;
57- } ,
58- {
59- actionType : input . action ,
60- } ,
61- ) ;
6259 const vaultName = ( await vaultManager . getVaultMeta ( vaultId , tran ) )
6360 ?. vaultName ;
6461 if ( vaultName == null ) {
@@ -85,20 +82,35 @@ class VaultsGitInfoGetHandler extends ServerHandler<
8582 ) ;
8683 }
8784
88- yield {
89- vaultName : vaultName ,
90- vaultIdEncoded : vaultsUtils . encodeVaultId ( vaultId ) ,
85+ return {
86+ vaultId ,
87+ vaultName ,
9188 } ;
92- for await ( const byte of vaultManager . handleInfoRequest ( vaultId , tran ) ) {
93- if ( byte !== null ) {
94- yield {
95- chunk : byte . toString ( 'binary' ) ,
96- } ;
97- } else {
98- return ;
89+ } ) ;
90+
91+ // TODO: Needs to handle cancellation
92+ const stream = new ReadableStream ( {
93+ start : async ( controller ) => {
94+ for await ( const buffer of vaultManager . handleInfoRequest (
95+ data . vaultId ,
96+ ) ) {
97+ if ( buffer != null ) {
98+ controller . enqueue ( buffer ) ;
99+ } else {
100+ break ;
101+ }
99102 }
100- }
103+ controller . close ( ) ;
104+ } ,
101105 } ) ;
106+
107+ return [
108+ {
109+ vaultName : data . vaultName ,
110+ vaultIdEncoded : vaultsUtils . encodeVaultId ( data . vaultId ) ,
111+ } ,
112+ stream ,
113+ ] ;
102114 }
103115}
104116
0 commit comments