Skip to content

Commit bb3f97f

Browse files
committed
finish gRPC refactor with nodeIntegration off
1 parent 002c04c commit bb3f97f

File tree

3 files changed

+110
-331
lines changed

3 files changed

+110
-331
lines changed

grpc_mockData/server.js

Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const path = require("path");
2+
const fs = require("fs");
23
const hl = require("highland");
34
const Mali = require("mali");
45
// Mali needs the old grpc as a peer dependency so that should be installed as well
@@ -16,7 +17,6 @@ async function sayHello(ctx) {
1617
// sets key-value pair inside ctx.response.metadata as a replacement for headers
1718
ctx.set("UNARY", "true");
1819
ctx.res = { message: "Hello " + ctx.req.name };
19-
console.log(`set sayHello response from gRPC server: ${ctx.res.message}`);
2020
}
2121
// nested Unary stream
2222
async function sayHelloNested(ctx) {
@@ -30,14 +30,11 @@ async function sayHelloNested(ctx) {
3030
{ message: "Hello! " + secondPerson },
3131
],
3232
};
33-
console.log(
34-
`set sayHelloNested response from gRPC server ${ctx.res.serverMessage[0]} ${ctx.res.serverMessage[1]}`
35-
);
3633
}
3734
// Server-Side Stream
3835
// used highland library to manage asynchronous data
3936
async function sayHellosSs(ctx) {
40-
ctx.set("SERVER-SIDE STREAM", "true");
37+
ctx.set("Server-side-stream", "true");
4138
// In case of UNARY and RESPONSE_STREAM calls it is simply the gRPC call's request
4239

4340
const dataStream = [
@@ -58,64 +55,52 @@ async function sayHellosSs(ctx) {
5855
const reqMessages = { message: "hello!!! " + ctx.req.name };
5956
// combine template with reqMessage
6057
const updatedStream = [...dataStream, reqMessages];
61-
// research what await hl(array of objects) does
6258
const makeStreamData = await hl(updatedStream);
6359
ctx.res = makeStreamData;
64-
6560
// ends server stream
6661
ctx.res.end();
6762
}
6863

6964
// Client-Side stream
70-
function sayHelloCs(ctx) {
65+
async function sayHelloCs(ctx) {
7166
// create new metadata
72-
const metadata = new grpc.Metadata();
73-
metadata.set("it", "works?");
74-
metadata.set("indeed", "it do");
75-
metadata.set("clientStream", "indubitably");
76-
// The execution context provides scripts and templates with access to the watch metadata
77-
console.dir(ctx.metadata, { depth: 3, colors: true });
78-
// console.log('got sayHelloClients')
79-
let counter = 0;
67+
ctx.set("client-side-stream", "true");
68+
8069
const messages = [];
81-
// client streaming calls to write messages and end writing before you can get the response
70+
8271
return new Promise((resolve, reject) => {
72+
// ctx.req is the incoming readable stream
8373
hl(ctx.req)
8474
.map((message) => {
85-
counter++;
86-
// console.log('message content',message.name)
87-
ctx.response.res = { message: "Client stream: " + message.name };
88-
messages.push(message.name);
89-
ctx.sendMetadata(metadata);
75+
console.log("parsed stream message with name key, ", message);
76+
// currently the proto file is setup to only read streams with the key "name"
77+
// other named keys will be pushed as an empty object
78+
messages.push(message);
79+
return undefined;
9080
})
91-
// returns all the elements as an array
9281
.collect()
9382
.toCallback((err, result) => {
9483
if (err) return reject(err);
95-
// console.log(`done sayHelloClients counter ${counter}`)
96-
ctx.response.res = { message: "SAYHELLOCs Client stream: " + messages };
97-
// console.log(ctx.response.res)
98-
resolve();
84+
console.log("messages ->", messages);
85+
ctx.response.res = { message: `received ${messages.length} messages` };
86+
return resolve();
9987
});
10088
});
10189
}
10290

10391
// Bi-Di stream
10492
function sayHelloBidi(ctx) {
10593
// create new metadata
106-
const metadata = new grpc.Metadata();
107-
metadata.set("it", "works?");
108-
metadata.set("indeed", "it do");
109-
// console.log("got sayHelloBidi");
94+
ctx.set("bidi-stream", "true");
95+
console.log("got sayHelloBidi");
11096
// The execution context provides scripts and templates with access to the watch metadata
11197
console.dir(ctx.metadata, { depth: 3, colors: true });
11298
let counter = 0;
113-
ctx.req.on("data", (d) => {
99+
ctx.req.on("data", (data) => {
114100
counter++;
115-
ctx.res.write({ message: "bidi stream: " + d.name });
101+
ctx.res.write({ message: "bidi stream: " + data.name });
116102
});
117-
metadata.set("bidiStream", "ohyes");
118-
ctx.sendMetadata(metadata);
103+
119104
// calls end to client before closing server
120105
ctx.req.on("end", () => {
121106
// console.log(`done sayHelloBidi counter ${counter}`);

main.js

Lines changed: 90 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,6 @@ ipcMain.on("fetch-meta-and-client", (event, data) => {
553553
const { rpcType } = data;
554554
const PROTO_PATH = reqResObj.protoPath;
555555
const { packageName, service, url, rpc, queryArr } = data.reqResObj;
556-
557556
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
558557
keepCase: true,
559558
longs: String,
@@ -612,7 +611,44 @@ ipcMain.on("fetch-meta-and-client", (event, data) => {
612611
});
613612
mainWindow.webContents.send("reqResUpdate", reqResObj);
614613
});
614+
} else if (rpcType === "SERVER STREAM") {
615+
console.log("SERVER STREAM inside gRPC");
616+
const timesArr = [];
617+
// Open Connection for SERVER Stream
618+
reqResObj.connection = "open";
619+
reqResObj.timeSent = Date.now();
620+
const call = client[rpc](reqResObj.queryArr[0], meta);
621+
call.on("data", (resp) => {
622+
const time = {};
623+
time.timeReceived = Date.now();
624+
time.timeSent = reqResObj.timeSent;
625+
// add server response to reqResObj and dispatch to state/store
626+
reqResObj.response.events.push(resp);
627+
reqResObj.response.times.push(time);
628+
reqResObj.timeReceived = time.timeReceived; // overwritten on each call to get the final value
629+
630+
mainWindow.webContents.send("reqResUpdate", reqResObj);
631+
});
632+
call.on("error", () => {
633+
// for fatal error from server
634+
console.log("server side stream error");
635+
});
636+
call.on("end", () => {
637+
// Close Connection for SERVER Stream
638+
reqResObj.connection = "closed";
639+
// no need to push response to reqResObj, no event expected from on 'end'
640+
mainWindow.webContents.send("reqResUpdate", reqResObj);
641+
});
642+
call.on("metadata", (data) => {
643+
const metadata = data.internalRepr;
644+
// set metadata Map as headers
645+
metadata.forEach((value, key) => {
646+
reqResObj.response.headers[key] = value[0];
647+
});
648+
mainWindow.webContents.send("reqResUpdate", reqResObj);
649+
});
615650
} else if (rpcType === "CLIENT STREAM") {
651+
console.log("CLIENT STREAM inside gRPC");
616652
// create call and open client stream connection
617653
reqResObj.connection = "open";
618654
const timeSent = Date.now();
@@ -660,93 +696,59 @@ ipcMain.on("fetch-meta-and-client", (event, data) => {
660696
}
661697
call.end();
662698
}
663-
// else if (rpcType === "SERVER STREAM") {
664-
// const timesArr = [];
665-
// // Open Connection for SERVER Stream
666-
// reqResObj.connection = "open";
667-
// reqResObj.timeSent = Date.now();
668-
// const call = client[rpc](reqResObj.queryArr[0], meta);
669-
// call.on("data", (resp) => {
670-
// const time = {};
671-
// time.timeReceived = Date.now();
672-
// time.timeSent = reqResObj.timeSent;
673-
// // add server response to reqResObj and dispatch to state/store
674-
// reqResObj.response.events.push(resp);
675-
// reqResObj.response.times.push(time);
676-
// reqResObj.timeReceived = time.timeReceived; // overwritten on each call to get the final value
677-
678-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
679-
// });
680-
// call.on("error", () => {
681-
// // for fatal error from server
682-
// console.log("server side stream erring out");
683-
// });
684-
// call.on("end", () => {
685-
// // Close Connection for SERVER Stream
686-
// reqResObj.connection = "closed";
687-
// // no need to push response to reqResObj, no event expected from on 'end'
688-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
689-
// });
690-
// call.on("metadata", (metadata) => {
691-
// const keys = Object.keys(metadata._internal_repr);
692-
// for (let i = 0; i < keys.length; i += 1) {
693-
// const key = keys[i];
694-
// reqResObj.response.headers[key] = metadata._internal_repr[key][0];
695-
// }
696-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
697-
// });
698-
// }
699-
// //else BIDIRECTIONAL
700-
// else {
701-
// // Open duplex stream
702-
// let counter = 0;
703-
// const call = client[rpc](meta);
704-
// call.on("data", (response) => {
705-
// const curTimeObj = reqResObj.response.times[counter];
706-
// counter++;
707-
// //Close Individual Server Response for BIDIRECTIONAL Stream
708-
// reqResObj.connection = "pending";
709-
// curTimeObj.timeReceived = Date.now();
710-
// reqResObj.timeReceived = curTimeObj.timeReceived;
711-
// reqResObj.response.events.push(response);
712-
// reqResObj.response.times.push(curTimeObj);
713-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
714-
// }); // metadata from server
715-
// call.on("metadata", (metadata) => {
716-
// const keys = Object.keys(metadata._internal_repr);
717-
// for (let i = 0; i < keys.length; i += 1) {
718-
// const key = keys[i];
719-
// reqResObj.response.headers[key] = metadata._internal_repr[key][0];
720-
// }
721-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
722-
// });
723-
// call.on("error", () => {
724-
// console.log("server ended connection with error");
725-
// });
726-
// call.on("end", (data) => {
727-
// //Close Final Server Connection for BIDIRECTIONAL Stream
728-
// reqResObj.connection = "closed";
729-
// // no need to push response to reqResObj, no event expected from on 'end'
730-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
731-
// });
732-
733-
// for (let i = 0; i < queryArr.length; i++) {
734-
// const time = {};
735-
// const query = queryArr[i];
736-
// //Open Connection for BIDIRECTIONAL Stream
737-
// if (i === 0) {
738-
// reqResObj.connection = "open";
739-
// } else {
740-
// reqResObj.connection = "pending";
741-
// }
742-
// time.timeSent = Date.now();
743-
// reqResObj.timeSent = time.timeSent;
744-
// reqResObj.response.times.push(time);
745-
// call.write(query);
746-
// }
747-
// call.end();
748-
// }
749-
// store.default.dispatch(actions.reqResUpdate(reqResObj));
699+
700+
//else BIDIRECTIONAL
701+
else {
702+
console.log("BIDIRECTIONAL gRPC");
703+
let counter = 0;
704+
const call = client[rpc](meta);
705+
call.on("data", (response) => {
706+
const curTimeObj = reqResObj.response.times[counter];
707+
counter++;
708+
//Close Individual Server Response for BIDIRECTIONAL Stream
709+
reqResObj.connection = "pending";
710+
curTimeObj.timeReceived = Date.now();
711+
reqResObj.timeReceived = curTimeObj.timeReceived;
712+
reqResObj.response.events.push(response);
713+
reqResObj.response.times.push(curTimeObj);
714+
// update redux store
715+
mainWindow.webContents.send("reqResUpdate", reqResObj);
716+
}); // metadata from server
717+
call.on("metadata", (data) => {
718+
const metadata = data.internalRepr;
719+
720+
metadata.forEach((value, key) => {
721+
reqResObj.response.headers[key] = value[0];
722+
});
723+
mainWindow.webContents.send("reqResUpdate", reqResObj);
724+
});
725+
call.on("error", () => {
726+
console.log("server ended connection with error");
727+
});
728+
call.on("end", (data) => {
729+
//Close Final Server Connection for BIDIRECTIONAL Stream
730+
reqResObj.connection = "closed";
731+
// no need to push response to reqResObj, no event expected from on 'end'
732+
mainWindow.webContents.send("reqResUpdate", reqResObj);
733+
});
734+
735+
for (let i = 0; i < queryArr.length; i++) {
736+
const time = {};
737+
const query = queryArr[i];
738+
//Open Connection for BIDIRECTIONAL Stream
739+
if (i === 0) {
740+
reqResObj.connection = "open";
741+
} else {
742+
reqResObj.connection = "pending";
743+
}
744+
time.timeSent = Date.now();
745+
reqResObj.timeSent = time.timeSent;
746+
reqResObj.response.times.push(time);
747+
call.write(query);
748+
}
749+
call.end();
750+
}
751+
mainWindow.webContents.send("reqResUpdate", reqResObj);
750752
});
751753

752754
// ====================== OLDER STUFF =======================

0 commit comments

Comments
 (0)