11const path = require ( "path" ) ;
2- const Mali = require ( "mali" ) ;
3- // consider replacing highland with normal node code for converting array to streams
2+ const fs = require ( "fs" ) ;
43const hl = require ( "highland" ) ;
4+ const Mali = require ( "mali" ) ;
5+ // Mali needs the old grpc as a peer dependency so that should be installed as well
56const grpc = require ( "@grpc/grpc-js" ) ;
67
8+ // consider replacing highland with normal node code for converting array to streams
9+
710const PROTO_PATH = path . join ( __dirname , "./protos/hw2.proto" ) ;
811const HOSTPORT = "0.0.0.0:50051" ;
912
10- const dataStream = [
11- {
12- message : "You" ,
13- } ,
14- {
15- message : "Are" ,
16- } ,
17- {
18- message : "doing IT" ,
19- } ,
20- {
21- message : "Champ" ,
22- } ,
23- ] ;
24-
25- /**
26- * Implements the SayHello RPC method.
27- */
28-
2913// Unary stream
3014// ctx = watch execution context
31- function sayHello ( ctx ) {
32- // create new metadata
33- let metadata = new grpc . Metadata ( ) ;
34- metadata . set ( "it" , "works?" ) ;
35- metadata . set ( "indeed" , "it do" ) ;
36- // Watcher creates a watch execution context for the watch
37- // The execution context provides scripts and templates with access to the watch metadata
38- // console.log("received metadata from client request", ctx.metadata)
39- // console.dir(ctx.metadata, { depth: 3, colors: true });
40- // console.log(`got sayHello request name: ${ctx.req.name}`);
41-
42- // an alias to ctx.response.res
43- // This is set only in case of DUPLEX calls, to the the gRPC call reference itself
15+ async function sayHello ( ctx ) {
16+ // ctx contains both req and res objects
17+ // sets key-value pair inside ctx.response.metadata as a replacement for headers
18+ ctx . set ( "UNARY" , "true" ) ;
4419 ctx . res = { message : "Hello " + ctx . req . name } ;
45-
46- // send response header metadata object directly as an argument and that is set and sent
47- metadata . set ( "UNARY" , "yes" ) ;
48- ctx . sendMetadata ( metadata ) ;
49-
50- // console.log(`set sayHello response: ${ctx.res.message}`);
5120}
5221// nested Unary stream
53-
54- function sayHelloNested ( ctx ) {
55- // create new metadata
56- let metadata = new grpc . Metadata ( ) ;
57- metadata . set ( "it" , "works?" ) ;
58- metadata . set ( "indeed" , "it do" ) ;
59- // Watcher creates a watch execution context for the watch
60- // The execution context provides scripts and templates with access to the watch metadata
61- // console.log("received metadata from client request", ctx.metadata)
62- // console.dir(ctx.metadata, { depth: 3, colors: true });
63- // console.log("ctx line 64 from server.js", ctx)
64-
22+ async function sayHelloNested ( ctx ) {
23+ ctx . set ( "UNARY" , "true" ) ;
6524 // nested unary response call
66- let firstPerson = ctx . req . firstPerson . name ;
67- let secondPerson = ctx . req . secondPerson . name ;
68- // console.log("firstPerson line 68 from server.js:", firstPerson)
25+ const firstPerson = ctx . req . firstPerson . name ;
26+ const secondPerson = ctx . req . secondPerson . name ;
6927 ctx . res = {
7028 serverMessage : [
7129 { message : "Hello! " + firstPerson } ,
7230 { message : "Hello! " + secondPerson } ,
7331 ] ,
7432 } ;
75-
76- // send response header metadata object directly as an argument and that is set and sent
77- ctx . sendMetadata ( metadata ) ;
7833}
7934// Server-Side Stream
8035// used highland library to manage asynchronous data
81- async function sayHellos ( ctx ) {
82- // create new metadata
83- let metadata = new grpc . Metadata ( ) ;
84- metadata . set ( "it" , "works?" ) ;
85- metadata . set ( "indeed" , "it do" ) ;
86- // The execution context provides scripts and templates with access to the watch metadata
87- // console.dir(ctx.metadata, { depth: 3, colors: true });
88- // converts a request into strings
89- // console.log(`got sayHellos request name:`, JSON.stringify(ctx.req, null, 4));
90-
91- // alias for ctx.request.req
36+ async function sayHellosSs ( ctx ) {
37+ ctx . set ( "Server-side-stream" , "true" ) ;
9238 // In case of UNARY and RESPONSE_STREAM calls it is simply the gRPC call's request
9339
94- let reqMessages = { message : "hello!!! " + ctx . req . name } ;
95-
96- dataStream . push ( reqMessages ) ;
97- reqMessages = dataStream ;
98- let streamData = await hl ( reqMessages ) ;
99- ctx . res = streamData ;
100- metadata . set ( "serverStream" , "indeed" ) ;
101- dataStream . pop ( ) ;
102-
103- // send response header metadata object directly as an argument and that is set and sent
104- ctx . sendMetadata ( metadata ) ;
105-
40+ const dataStream = [
41+ {
42+ message : "You" ,
43+ } ,
44+ {
45+ message : "Are" ,
46+ } ,
47+ {
48+ message : "doing IT" ,
49+ } ,
50+ {
51+ message : "Champ" ,
52+ } ,
53+ ] ;
54+
55+ const reqMessages = { message : "hello!!! " + ctx . req . name } ;
56+ // combine template with reqMessage
57+ const updatedStream = [ ...dataStream , reqMessages ] ;
58+ const makeStreamData = await hl ( updatedStream ) ;
59+ ctx . res = makeStreamData ;
10660 // ends server stream
10761 ctx . res . end ( ) ;
10862}
10963
11064// Client-Side stream
111- function sayHelloCs ( ctx ) {
65+ async function sayHelloCs ( ctx ) {
11266 // create new metadata
113- let metadata = new grpc . Metadata ( ) ;
114- metadata . set ( "it" , "works?" ) ;
115- metadata . set ( "indeed" , "it do" ) ;
116- metadata . set ( "clientStream" , "indubitably" ) ;
117- // The execution context provides scripts and templates with access to the watch metadata
118- console . dir ( ctx . metadata , { depth : 3 , colors : true } ) ;
119- // console.log('got sayHelloClients')
120- let counter = 0 ;
121- let messages = [ ] ;
122- // client streaming calls to write messages and end writing before you can get the response
67+ ctx . set ( "client-side-stream" , "true" ) ;
68+
69+ const messages = [ ] ;
70+
12371 return new Promise ( ( resolve , reject ) => {
72+ // ctx.req is the incoming readable stream
12473 hl ( ctx . req )
12574 . map ( ( message ) => {
126- counter ++ ;
127- // console.log('message content',message. name)
128- ctx . response . res = { message : "Client stream: " + message . name } ;
129- messages . push ( message . name ) ;
130- ctx . sendMetadata ( metadata ) ;
75+ console . log ( "parsed stream message with name key, " , message ) ;
76+ // currently the proto file is setup to only read streams with the key " name"
77+ // other named keys will be pushed as an empty object
78+ messages . push ( message ) ;
79+ return undefined ;
13180 } )
132- // returns all the elements as an array
13381 . collect ( )
13482 . toCallback ( ( err , result ) => {
13583 if ( err ) return reject ( err ) ;
136- // console.log(`done sayHelloClients counter ${counter}`)
137- ctx . response . res = { message : "SAYHELLOCs Client stream: " + messages } ;
138- // console.log(ctx.response.res)
139- resolve ( ) ;
84+ console . log ( "messages ->" , messages ) ;
85+ ctx . response . res = { message : `received ${ messages . length } messages` } ;
86+ return resolve ( ) ;
14087 } ) ;
14188 } ) ;
14289}
14390
14491// Bi-Di stream
14592function sayHelloBidi ( ctx ) {
14693 // create new metadata
147- let metadata = new grpc . Metadata ( ) ;
148- metadata . set ( "it" , "works?" ) ;
149- metadata . set ( "indeed" , "it do" ) ;
150- // console.log("got sayHelloBidi");
94+ ctx . set ( "bidi-stream" , "true" ) ;
95+ console . log ( "got sayHelloBidi" ) ;
15196 // The execution context provides scripts and templates with access to the watch metadata
15297 console . dir ( ctx . metadata , { depth : 3 , colors : true } ) ;
15398 let counter = 0 ;
154- ctx . req . on ( "data" , ( d ) => {
99+ ctx . req . on ( "data" , ( data ) => {
155100 counter ++ ;
156- ctx . res . write ( { message : "bidi stream: " + d . name } ) ;
101+ ctx . res . write ( { message : "bidi stream: " + data . name } ) ;
157102 } ) ;
158- metadata . set ( "bidiStream" , "ohyes" ) ;
159- ctx . sendMetadata ( metadata ) ;
103+
160104 // calls end to client before closing server
161105 ctx . req . on ( "end" , ( ) => {
162106 // console.log(`done sayHelloBidi counter ${counter}`);
@@ -171,7 +115,7 @@ function sayHelloBidi(ctx) {
171115 */
172116function main ( ) {
173117 const app = new Mali ( PROTO_PATH , "Greeter" ) ;
174- app . use ( { sayHello, sayHelloNested, sayHellos , sayHelloCs, sayHelloBidi } ) ;
118+ app . use ( { sayHello, sayHelloNested, sayHellosSs , sayHelloCs, sayHelloBidi } ) ;
175119 app . start ( HOSTPORT ) ;
176120 console . log ( `Greeter service running @ ${ HOSTPORT } ` ) ;
177121}
0 commit comments