Skip to content

Commit 76c058a

Browse files
committed
wire up nestedUnary calls and test
1 parent c1ab4f8 commit 76c058a

File tree

2 files changed

+173
-94
lines changed

2 files changed

+173
-94
lines changed

grpc_mockData/server.js

Lines changed: 31 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -9,105 +9,58 @@ const grpc = require("@grpc/grpc-js");
99
const PROTO_PATH = path.join(__dirname, "./protos/hw2.proto");
1010
const HOSTPORT = "0.0.0.0:50051";
1111

12-
const dataStream = [
13-
{
14-
message: "You",
15-
},
16-
{
17-
message: "Are",
18-
},
19-
{
20-
message: "doing IT",
21-
},
22-
{
23-
message: "Champ",
24-
},
25-
];
26-
27-
/**
28-
* Implements the SayHello RPC method.
29-
*/
30-
3112
// Unary stream
3213
// ctx = watch execution context
3314
async function sayHello(ctx) {
34-
// create new metadata
35-
36-
// const metadata = new grpc.Metadata();
37-
ctx.set("it", "works?");
38-
ctx.response.set("indeed", "it do");
39-
// Watcher creates a watch execution context for the watch
40-
// The execution context provides scripts and templates with access to the watch metadata
41-
// console.log("received metadata from client request", ctx.metadata);
42-
// console.dir(ctx.metadata, { depth: 3, colors: true });
43-
// console.log(`got sayHello request name: ${ctx.req.name}`);
44-
45-
// an alias to ctx.response.res
46-
// This is set only in case of DUPLEX calls, to the the gRPC call reference itself
47-
// ctx.res = { message: "Hello " + ctx.req.name };
48-
ctx.response.res = { message: "Hello " + ctx.req.name };
49-
// ctx.res will do the same as above
50-
console.log("ctx.res", ctx.res);
51-
console.log("ctx.response", ctx.response);
52-
// send response header metadata object directly as an argument and that is set and sent
53-
// metadata.set("UNARY", "yes");
54-
// ctx.sendMetadata(metadata);
55-
// console.log("metadata is", metadata);
15+
// ctx contains both req and res objects
16+
// sets key-value pair inside ctx.response.metadata as a replacement for headers
17+
ctx.set("UNARY", "true");
18+
ctx.res = { message: "Hello " + ctx.req.name };
5619
console.log(`set sayHello response from gRPC server: ${ctx.res.message}`);
5720
}
5821
// nested Unary stream
59-
60-
function sayHelloNested(ctx) {
61-
// create new metadata
62-
const metadata = new grpc.Metadata();
63-
metadata.set("it", "works?");
64-
metadata.set("indeed", "it do");
65-
// Watcher creates a watch execution context for the watch
66-
// The execution context provides scripts and templates with access to the watch metadata
67-
// console.log("received metadata from client request", ctx.metadata)
68-
// console.dir(ctx.metadata, { depth: 3, colors: true });
69-
// console.log("ctx line 64 from server.js", ctx)
70-
22+
async function sayHelloNested(ctx) {
23+
ctx.set("UNARY", "true");
7124
// nested unary response call
7225
const firstPerson = ctx.req.firstPerson.name;
7326
const secondPerson = ctx.req.secondPerson.name;
74-
// console.log("firstPerson line 68 from server.js:", firstPerson)
7527
ctx.res = {
7628
serverMessage: [
7729
{ message: "Hello! " + firstPerson },
7830
{ message: "Hello! " + secondPerson },
7931
],
8032
};
81-
82-
// send response header metadata object directly as an argument and that is set and sent
83-
ctx.sendMetadata(metadata);
33+
console.log(
34+
`set sayHelloNested response from gRPC server ${ctx.res.serverMessage[0]} ${ctx.res.serverMessage[1]}`
35+
);
8436
}
8537
// Server-Side Stream
8638
// used highland library to manage asynchronous data
8739
async function sayHellos(ctx) {
88-
// create new metadata
89-
const metadata = new grpc.Metadata();
90-
metadata.set("it", "works?");
91-
metadata.set("indeed", "it do");
92-
// The execution context provides scripts and templates with access to the watch metadata
93-
// console.dir(ctx.metadata, { depth: 3, colors: true });
94-
// converts a request into strings
95-
// console.log(`got sayHellos request name:`, JSON.stringify(ctx.req, null, 4));
96-
97-
// alias for ctx.request.req
40+
ctx.set("SERVER-SIDE STREAM", "true");
9841
// In case of UNARY and RESPONSE_STREAM calls it is simply the gRPC call's request
9942

100-
let reqMessages = { message: "hello!!! " + ctx.req.name };
101-
102-
dataStream.push(reqMessages);
103-
reqMessages = dataStream;
104-
const streamData = await hl(reqMessages);
105-
ctx.res = streamData;
106-
metadata.set("serverStream", "indeed");
107-
dataStream.pop();
108-
109-
// send response header metadata object directly as an argument and that is set and sent
110-
ctx.sendMetadata(metadata);
43+
const dataStream = [
44+
{
45+
message: "You",
46+
},
47+
{
48+
message: "Are",
49+
},
50+
{
51+
message: "doing IT",
52+
},
53+
{
54+
message: "Champ",
55+
},
56+
];
57+
58+
const reqMessages = { message: "hello!!! " + ctx.req.name };
59+
// combine template with reqMessage
60+
const updatedStream = [...dataStream, reqMessages];
61+
// research what await hl(array of objects) does
62+
const makeStreamData = await hl(updatedStream);
63+
ctx.res = makeStreamData;
11164

11265
// ends server stream
11366
ctx.res.end();

main.js

Lines changed: 142 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -576,10 +576,6 @@ ipcMain.on("fetch-meta-and-client", (event, data) => {
576576
const currentHeader = metaArr[i];
577577
meta.add(currentHeader.key, currentHeader.value);
578578
}
579-
// just send a hardcoded 'name' - 'test' key value pair
580-
// meta.add("name", "SayHello");
581-
582-
console.log("line 582 meta is", meta);
583579

584580
if (rpcType === "UNARY") {
585581
console.log("\n \n inside UNARY if statement");
@@ -605,22 +601,152 @@ ipcMain.on("fetch-meta-and-client", (event, data) => {
605601
reqResObj.response.events.push(data);
606602
reqResObj.response.times.push(time);
607603
// send stuff back for store
608-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
604+
mainWindow.webContents.send("reqResUpdate", reqResObj);
609605
}) // metadata from server
610-
.on("metadata", (metadata) => {
611-
console.log("\n metadata back from server!! \n");
612-
console.log("\n the data coming BACK from the server is \n", metadata);
606+
.on("metadata", (data) => {
607+
// metadata is a Map, not an object
608+
const metadata = data.internalRepr;
609+
// set metadata Map as headers
610+
metadata.forEach((value, key) => {
611+
reqResObj.response.headers[key] = value[0];
612+
});
613+
mainWindow.webContents.send("reqResUpdate", reqResObj);
614+
});
615+
} else if (rpcType === "CLIENT STREAM") {
616+
// create call and open client stream connection
617+
reqResObj.connection = "open";
618+
const timeSent = Date.now();
619+
reqResObj.timeSent = timeSent;
620+
const call = client[rpc](meta, function (error, response) {
621+
if (error) {
622+
console.log("error in client stream", error);
623+
return undefined;
624+
}
625+
//Close Connection for client Stream
626+
reqResObj.connection = "closed";
627+
const curTime = Date.now();
628+
reqResObj.response.times.forEach((time) => {
629+
time.timeReceived = curTime;
630+
reqResObj.timeReceived = time.timeReceived;
631+
});
632+
reqResObj.response.events.push(response);
633+
store.default.dispatch(actions.reqResUpdate(reqResObj));
634+
}).on("metadata", (metadata) => {
635+
// if metadata is sent back from the server, analyze and handle
636+
const keys = Object.keys(metadata._internal_repr);
637+
for (let i = 0; i < keys.length; i += 1) {
638+
const key = keys[i];
639+
reqResObj.response.headers[key] = metadata._internal_repr[key][0];
640+
}
641+
store.default.dispatch(actions.reqResUpdate(reqResObj));
642+
});
613643

614-
// const keys = Object.keys(metadata._internal_repr);
615-
// for (let i = 0; i < keys.length; i += 1) {
616-
// const key = keys[i];
617-
// reqResObj.response.headers[key] = metadata._internal_repr[key][0];
618-
// }
644+
for (let i = 0; i < queryArr.length; i++) {
645+
const query = queryArr[i];
646+
// Open Connection for client Stream
647+
// this needs additional work to provide correct sent time for each
648+
// request without overwrite
649+
const time = {};
619650

620-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
621-
});
651+
reqResObj.connection = "pending";
652+
653+
time.timeSent = timeSent;
654+
reqResObj.response.times.push(time);
655+
656+
//reqResObj.connectionType = 'plain';
657+
// reqResObj.timeSent = Date.now();
658+
659+
call.write(query);
660+
}
661+
call.end();
622662
}
623-
// mainWindow.webContents.send("meta-and-client", meta, client);
663+
// else if (rpcType === "SERVER STREAM") {
664+
// const timesArr = [];
665+
// // Open Connection for SERVER Stream
666+
// reqResObj.connection = "open";
667+
// reqResObj.timeSent = Date.now();
668+
// const call = client[rpc](reqResObj.queryArr[0], meta);
669+
// call.on("data", (resp) => {
670+
// const time = {};
671+
// time.timeReceived = Date.now();
672+
// time.timeSent = reqResObj.timeSent;
673+
// // add server response to reqResObj and dispatch to state/store
674+
// reqResObj.response.events.push(resp);
675+
// reqResObj.response.times.push(time);
676+
// reqResObj.timeReceived = time.timeReceived; // overwritten on each call to get the final value
677+
678+
// store.default.dispatch(actions.reqResUpdate(reqResObj));
679+
// });
680+
// call.on("error", () => {
681+
// // for fatal error from server
682+
// console.log("server side stream erring out");
683+
// });
684+
// call.on("end", () => {
685+
// // Close Connection for SERVER Stream
686+
// reqResObj.connection = "closed";
687+
// // no need to push response to reqResObj, no event expected from on 'end'
688+
// store.default.dispatch(actions.reqResUpdate(reqResObj));
689+
// });
690+
// call.on("metadata", (metadata) => {
691+
// const keys = Object.keys(metadata._internal_repr);
692+
// for (let i = 0; i < keys.length; i += 1) {
693+
// const key = keys[i];
694+
// reqResObj.response.headers[key] = metadata._internal_repr[key][0];
695+
// }
696+
// store.default.dispatch(actions.reqResUpdate(reqResObj));
697+
// });
698+
// }
699+
// //else BIDIRECTIONAL
700+
// else {
701+
// // Open duplex stream
702+
// let counter = 0;
703+
// const call = client[rpc](meta);
704+
// call.on("data", (response) => {
705+
// const curTimeObj = reqResObj.response.times[counter];
706+
// counter++;
707+
// //Close Individual Server Response for BIDIRECTIONAL Stream
708+
// reqResObj.connection = "pending";
709+
// curTimeObj.timeReceived = Date.now();
710+
// reqResObj.timeReceived = curTimeObj.timeReceived;
711+
// reqResObj.response.events.push(response);
712+
// reqResObj.response.times.push(curTimeObj);
713+
// store.default.dispatch(actions.reqResUpdate(reqResObj));
714+
// }); // metadata from server
715+
// call.on("metadata", (metadata) => {
716+
// const keys = Object.keys(metadata._internal_repr);
717+
// for (let i = 0; i < keys.length; i += 1) {
718+
// const key = keys[i];
719+
// reqResObj.response.headers[key] = metadata._internal_repr[key][0];
720+
// }
721+
// store.default.dispatch(actions.reqResUpdate(reqResObj));
722+
// });
723+
// call.on("error", () => {
724+
// console.log("server ended connection with error");
725+
// });
726+
// call.on("end", (data) => {
727+
// //Close Final Server Connection for BIDIRECTIONAL Stream
728+
// reqResObj.connection = "closed";
729+
// // no need to push response to reqResObj, no event expected from on 'end'
730+
// store.default.dispatch(actions.reqResUpdate(reqResObj));
731+
// });
732+
733+
// for (let i = 0; i < queryArr.length; i++) {
734+
// const time = {};
735+
// const query = queryArr[i];
736+
// //Open Connection for BIDIRECTIONAL Stream
737+
// if (i === 0) {
738+
// reqResObj.connection = "open";
739+
// } else {
740+
// reqResObj.connection = "pending";
741+
// }
742+
// time.timeSent = Date.now();
743+
// reqResObj.timeSent = time.timeSent;
744+
// reqResObj.response.times.push(time);
745+
// call.write(query);
746+
// }
747+
// call.end();
748+
// }
749+
// store.default.dispatch(actions.reqResUpdate(reqResObj));
624750
});
625751

626752
// ====================== OLDER STUFF =======================

0 commit comments

Comments
 (0)