-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEventhandler.ts
More file actions
237 lines (207 loc) · 7.86 KB
/
Eventhandler.ts
File metadata and controls
237 lines (207 loc) · 7.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import * as net from 'net';
import peer from 'noise-peer';
import { v4 as getUUID } from 'uuid';
import Delegate from './Utils/Delegate/Delegate';
import { DetailedStatus } from './Utils/enums/DetailedStatus';
import { LogLevel } from './Utils/enums/LogLevel';
import { Configdata } from './Utils/interfaces/Configdata';
import { Eventdata } from './Utils/interfaces/Eventdata';
import { Response } from './Utils/interfaces/Response';
import { ResponseArray } from './Utils/interfaces/ResponseArray';
/**
 * Client-side endpoint of the kernel event bus.
 *
 * An EventHandler connects to the kernel over a noise-peer secured TCP stream,
 * sends requests (each identified by a UUID) and resolves them when a message with
 * the same id comes back, and dispatches incoming events to locally subscribed callbacks.
 */
export default class EventHandler {
    /** Process events on which the handler disposes itself before the process exits. */
    private static readonly shutdownTriggers = [`beforeExit`, `SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`];

    private modulename: string;
    private config: Configdata;
    /** Locally subscribed callbacks, grouped by event name. */
    private bindings: Map<string, Delegate<(...args) => unknown>> = new Map();
    /*private kernelHostname: string;*/
    private kernelPort: number;
    private secStream: peer.NoisePeer;
    /** Resolvers of requests that are still waiting for their answer, keyed by request id. */
    private pendingMessages: Map<string, (value: ResponseArray | PromiseLike<ResponseArray>) => void>;
    disposed = false;
    static shutdownEvent = "control/shutdown";
    /** The instance that most recently ran {@link init}; exposed through {@link getInstance}. */
    private static instance = null;

    /**
     * Shutdown listener registered on every entry of {@link shutdownTriggers}.
     * It is kept as ONE stable function reference because `process.off` only removes a
     * listener when it is handed the very same function that was passed to `process.on`.
     * @param code First listener argument: the exit code (`beforeExit`), the signal name
     * (signals) or the thrown value (`uncaughtException`)
     */
    private readonly onShutdown = async (code: unknown): Promise<void> => {
        console.log(`About to exit with code: ${code}`);
        await this.dispose();
        process.exit(EventHandler.toExitCode(code)); //Do not prevent any kind of user induced shutdown
    };

    /**
     * Creates a new, not initialized, instance of an EventHandler
     * @see {@link init} for initializing
     * @param kernelport Port the kernel is listening on
     * @param config Configuration; this class reads `timeout`, `loglevel` and `pass` from it
     * @param modulename optional - provide a custom name for this Eventhandler
     */
    constructor(/*kernelhost: string,*/ kernelport: number, config: Configdata, modulename?:string) {
        this.config = config;
        /*this.kernelHostname = kernelhost;*/
        this.kernelPort = kernelport;
        this.modulename = modulename;
        this.pendingMessages = new Map<string, (value: ResponseArray | PromiseLike<ResponseArray>) => void>();
    }

    /**
     * Initializes the eventhandler: registers the shutdown listeners, opens the secured
     * stream to the kernel, installs the incoming-message handler and sends `kernel/init`.
     * @returns The responses to the `kernel/init` request
     */
    init(): Promise<ResponseArray> {
        //Dispose logic for shutdowns
        for (const trigger of EventHandler.shutdownTriggers) {
            process.on(trigger, this.onShutdown);
        }
        EventHandler.instance = this;

        const stream = net.connect(this.kernelPort/*, this.kernelHostname*/);
        this.secStream = peer(stream, true);
        this.secStream.on('data', async (body) => {
            let data: Eventdata;
            try {
                data = JSON.parse(body);
            } catch (e) {
                //A malformed frame must not surface as an unhandled rejection of this async listener
                console.error("Eventhandler received a malformed message:", e);
                return;
            }
            if (!data)
                return;

            //A message carrying the id of a pending request is the answer to that request
            const resolvePending = this.pendingMessages.get(data.id);
            if (resolvePending) {
                this.pendingMessages.delete(data.id);
                resolvePending(new ResponseArray(...((data.payload ?? []) as Array<Response>)));
                return;
            }

            //Everything else is an event for the local subscribers
            const delegate = this.bindings.get(data.eventname);
            if (!delegate) //if there is no subscriber, don't process
                return;
            const [results, unfinished] = await delegate.invokeAsync(data.timeout, data.payload); //invoke subscribed functions
            const processedResults: Response = { //pack results
                id: data.id,
                modulename: this.modulename,
                statuscode: unfinished == 0 ? 200 : 207,
                detailedstatus: unfinished == 0 ? "" : DetailedStatus.PARTIAL_TIMEOUT + "|" + unfinished,
                content: results
            };
            this.secStream.write(JSON.stringify(processedResults)); //return results
        });

        return this.doRequest(this.secStream, "kernel/init", this.config.timeout, {});
    }

    /**
     * Request data from other modules using the configured default timeout
     * @param eventname The eventname to request
     * @param payload optional - Additional parameters
     * @returns All responses
     */
    request(eventname: string, payload: unknown = {}): Promise<ResponseArray> {
        return this.requestCustomTimeout(eventname, this.config.timeout, payload);
    }

    /**
     * Request data from other modules with a custom timeout
     * @param eventname The eventname to request
     * @param timeout Timeout in milliseconds
     * @param payload optional - Additional parameters
     * @returns All responses
     */
    requestCustomTimeout(eventname: string, timeout: number, payload: unknown = {}): Promise<ResponseArray> {
        return this.doRequest(this.secStream, eventname, timeout, payload);
    }

    /**
     * Request function for internal use. The logger is set to null so the request itself
     * is not logged; otherwise every log request would trigger another log request.
     * @param eventname The eventname to request
     * @param timeout Timeout in milliseconds
     * @param payload optional - Additional parameters
     * @returns All responses
     */
    private requestInternal(eventname: string, timeout: number, payload: unknown = {}): Promise<ResponseArray>{
        return this.doRequest(this.secStream, eventname, timeout, payload, null);
    }

    /**
     * Subscribe to future events of the given event.
     * The kernel is only notified when the first callback for an event name is added.
     * @param eventname Name of the event
     * @param func Callback function
     * @param classcontext optional - classcontext to execute function in
     */
    subscribe(eventname: string, func: (...args) => unknown, classcontext?: unknown):void {
        if (!this.bindings.has(eventname)) {
            this.bindings.set(eventname, new Delegate());
            this.request("kernel/subscribe", {
                eventname: eventname
            });
        }
        this.bindings.get(eventname).bind(func, classcontext);
    }

    /**
     * Unsubscribe from future events of the given event.
     * The kernel is only notified once the last callback for an event name is removed.
     * @param eventname Name of the event
     * @param func Callback function
     * @param classcontext optional - classcontext the function was subscribed with
     */
    unsubscribe(eventname: string, func: (...args) => unknown, classcontext?: unknown):void {
        const delegate = this.bindings.get(eventname);
        if (!delegate)
            return;
        delegate.unbind(func, classcontext);
        if (delegate.funcs.length > 0) //other callbacks still need the event
            return;
        this.bindings.delete(eventname);
        this.request("kernel/unsubscribe", {
            eventname: eventname
        });
    }

    /**
     * Dispose the eventhandler: removes the shutdown listeners, drops all bindings,
     * notifies the kernel and closes the stream. Calling it again is a no-op.
     * Errors are logged to the console and not rethrown.
     */
    async dispose(): Promise<void> {
        if (this.disposed)
            return;
        this.disposed = true; //to prevent double disposal
        for (const trigger of EventHandler.shutdownTriggers) { //remove listeners to allow the process to stop
            process.off(trigger, this.onShutdown);
        }
        this.bindings.clear(); //Remove all bindings
        try {
            await this.request("kernel/dispose"); //Notify kernel of dispose
        } catch (e) {
            console.error(e);
        }
        try {
            this.secStream?.destroy(); //Close the stream even if the kernel could not be notified
        } catch (e) {
            console.error(e);
        }
    }

    /**
     * Get the provided modulename of this EventHandler
     * @returns Modulename
     */
    getModuleName(): string {
        return this.modulename;
    }

    /**
     * Special Log function. Currently hardcoded on Kernel side, will call Log modules later on.
     * Messages below the configured loglevel are dropped. The log request is not awaited.
     * @param logLevel The Loglevel to use
     * @param content The message to write
     */
    async Log(logLevel: LogLevel, content: string): Promise<void> {
        if (logLevel < this.config.loglevel)
            return;
        this.requestInternal("kernel/log", this.config.timeout, {
            message: `[${new Date().toISOString()}] ${("[" + LogLevel[logLevel] + "]").padEnd(9, " ")} [${this.modulename}] ${content}`
        }).catch((e) => console.error(e)); //logging is best effort; never leave a floating rejection behind
    }

    /**
     * Wraps all internal Request/Response Logics for easy use
     * @param SecStream The secured noise-peer stream
     * @param path The path to request
     * @param timeout Timeout in milliseconds, forwarded inside the message
     * @param payload JSON payload
     * @param logger EventHandler used to debug-log the outgoing message; pass null to skip logging (prevents loops)
     * @returns The Responses from the kernel/the modules; rejects if the message could not be written
     */
    private doRequest(SecStream: peer.NoisePeer, path: string, timeout:number, payload: unknown, logger: EventHandler | null = this): Promise<ResponseArray> {
        const uuid = getUUID();
        return new Promise<ResponseArray>((resolve, reject) => {
            this.pendingMessages.set(uuid, resolve); //resolved by the 'data' listener once the answer arrives
            const data: Eventdata = {
                id: uuid,
                pass: this.config.pass,
                modulename: this.modulename,
                eventname: path,
                timeout: timeout,
                payload: payload
            };
            try {
                // NOTE(review): the logged JSON includes `pass` - consider redacting it
                logger?.Log(LogLevel.Debug, "Eventhandler wrote: " + JSON.stringify(data));
                SecStream.write(JSON.stringify(data));
            } catch (e) {
                this.pendingMessages.delete(uuid); //nobody will ever answer a message that was not sent
                reject(e);
            }
        });
    }

    /**
     * Get the EventHandler that most recently ran {@link init}
     * @returns That instance, or null if no EventHandler has been initialized yet
     */
    static getInstance() {
        return EventHandler.instance;
    }

    /**
     * Maps the first argument of a shutdown listener to a valid process exit code.
     * `process.exit` must not be handed a signal name or an error object.
     * @param code Exit code, signal name or thrown value
     * @returns The code itself if it is an integer, 0 for a signal name, otherwise 1
     */
    private static toExitCode(code: unknown): number {
        if (typeof code === "number" && Number.isInteger(code))
            return code;
        if (typeof code === "string" && EventHandler.shutdownTriggers.includes(code))
            return 0; //user induced shutdown via signal
        return 1; //uncaughtException: the argument is the thrown value
    }
}