diff --git a/src/configs/api.ts b/src/configs/api.ts index 2fec6d8..14bdb96 100644 --- a/src/configs/api.ts +++ b/src/configs/api.ts @@ -2,12 +2,29 @@ import { ApiConfig } from "./types/ApiConfig"; export default (): { api: ApiConfig; -} => ({ - api: { - url: process.env.API_SERVICE_HOST as string, - httpPort: parseInt(process.env.API_SERVICE_PORT_HTTP as string), - wsPort: parseInt( - process.env.API_SERVICE_PORT_GAME_SERVER_NODE_WS as string, - ), - }, -}); +} => { + const httpPort = parseInt(process.env.API_SERVICE_PORT_HTTP as string); + const wsPort = parseInt( + process.env.API_SERVICE_PORT_GAME_SERVER_NODE_WS as string, + ); + + if (isNaN(httpPort)) { + throw new Error( + `Invalid API_SERVICE_PORT_HTTP: '${process.env.API_SERVICE_PORT_HTTP}'`, + ); + } + + if (isNaN(wsPort)) { + throw new Error( + `Invalid API_SERVICE_PORT_GAME_SERVER_NODE_WS: '${process.env.API_SERVICE_PORT_GAME_SERVER_NODE_WS}'`, + ); + } + + return { + api: { + url: process.env.API_SERVICE_HOST as string, + httpPort, + wsPort, + }, + }; +}; diff --git a/src/demos/demos.service.ts b/src/demos/demos.service.ts index cbdd5a6..dd057a7 100644 --- a/src/demos/demos.service.ts +++ b/src/demos/demos.service.ts @@ -75,11 +75,15 @@ export class DemosService { continue; case 406: // demo is already uploaded - fs.unlinkSync(demo.fullPath); + if (fs.existsSync(demo.fullPath)) { + fs.unlinkSync(demo.fullPath); + } continue; case 410: // match map not found - fs.unlinkSync(demo.fullPath); + if (fs.existsSync(demo.fullPath)) { + fs.unlinkSync(demo.fullPath); + } continue; } @@ -135,13 +139,15 @@ export class DemosService { }, ); - fs.unlinkSync(demo.fullPath); + if (fs.existsSync(demo.fullPath)) { + fs.unlinkSync(demo.fullPath); + } } catch (error) { this.logger.error(`unable to get presigned url`, error); } finally { const matchDir = path.join(this.DEMO_DIR, demo.matchId); if (await this.checkIfPathEmpty(matchDir)) { - fs.rmdirSync(matchDir, { recursive: true }); + fs.rmSync(matchDir, { recursive: true }); } } } diff --git a/src/kubernetes/kubernetes.service.ts b/src/kubernetes/kubernetes.service.ts index 070c214..542b48a 100644 --- a/src/kubernetes/kubernetes.service.ts +++ b/src/kubernetes/kubernetes.service.ts @@ -67,9 +67,17 @@ export class KubernetesService { } public async getNode() { - return await this.apiClient.readNode({ - name: this.nodeName, - }); + try { + return await this.apiClient.readNode({ + name: this.nodeName, + }); + } catch (error) { + this.logger.error( + `Failed to get node '${this.nodeName}' from K8s API`, + error instanceof Error ? error.message : error, + ); + throw error; + } } public async getNodeStats(node: V1Node) { diff --git a/src/rcon/rcon.service.ts b/src/rcon/rcon.service.ts index d9fba3e..b4130c0 100644 --- a/src/rcon/rcon.service.ts +++ b/src/rcon/rcon.service.ts @@ -34,12 +34,20 @@ export class RconService { password: matchData.id, }); + const SEND_TIMEOUT = 5000; rcon.send = async (command) => { - const payload = ( - await rcon.sendRaw(Buffer.from(command, "utf-8")) - ).toString(); + const sendPromise = rcon + .sendRaw(Buffer.from(command, "utf-8")) + .then((buf) => buf.toString()); - return payload; + const timeoutPromise = new Promise((_, reject) => { + setTimeout( + () => reject(new Error(`RCON send timeout after ${SEND_TIMEOUT}ms`)), + SEND_TIMEOUT, + ); + }); + + return Promise.race([sendPromise, timeoutPromise]); }; rcon @@ -54,8 +62,9 @@ export class RconService { }); try { + let connectTimer: ReturnType; const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => { + connectTimer = setTimeout(() => { reject( new Error( `RCON connection timeout after ${this.CONNECTION_TIMEOUT}ms`, @@ -65,6 +74,7 @@ export class RconService { }); await Promise.race([rcon.connect(), timeoutPromise]); + clearTimeout(connectTimer!); } catch (error) { this.logger.warn("RCON connect error:", error); try { @@ -74,11 +84,13 @@ export class RconService { } catch (cleanupError) { this.logger.warn("Error during RCON cleanup:", cleanupError); } + return null; } + this.connections[matchId] = rcon; this.setupConnectionTimeout(matchId); - return (this.connections[matchId] = rcon); + return rcon; } private setupConnectionTimeout(matchId: string) { diff --git a/src/redis/redis-manager/redis-manager.service.ts b/src/redis/redis-manager/redis-manager.service.ts index b3587f8..3bdc860 100644 --- a/src/redis/redis-manager/redis-manager.service.ts +++ b/src/redis/redis-manager/redis-manager.service.ts @@ -1,16 +1,18 @@ -import { Injectable, Logger } from "@nestjs/common"; +import { Injectable, Logger, OnApplicationShutdown } from "@nestjs/common"; import IORedis, { Redis, RedisOptions } from "ioredis"; import { ConfigService } from "@nestjs/config"; import { RedisConfig } from "../../configs/types/RedisConfig"; @Injectable() -export class RedisManagerService { +export class RedisManagerService implements OnApplicationShutdown { private config: RedisConfig; protected connections: { [key: string]: Redis; } = {}; + private healthCheckIntervals: ReturnType[] = []; + constructor( private readonly logger: Logger, private readonly configService: ConfigService, @@ -18,6 +20,22 @@ export class RedisManagerService { this.config = this.configService.get("redis")!; } + public async onApplicationShutdown() { + for (const interval of this.healthCheckIntervals) { + clearInterval(interval); + } + this.healthCheckIntervals = []; + + for (const [name, connection] of Object.entries(this.connections)) { + try { + connection.disconnect(); + } catch (error) { + this.logger.warn(`Error disconnecting Redis "${name}":`, error); + } + } + this.connections = {}; + } + public getConnection(connection = "default"): Redis { if (!this.connections[connection]) { const currentConnection: Redis = (this.connections[connection] = @@ -45,7 +63,7 @@ export class RedisManagerService { const pingTimeoutError = `did not receive ping in time (5 seconds)`; - setInterval(async () => { + const healthCheckInterval = setInterval(async () => { if (currentConnection.status === "ready") { await new Promise(async (resolve, reject) => { const timer = setTimeout(() => { @@ -65,6 +83,8 @@ export class RedisManagerService { }); } }, 5000); + + this.healthCheckIntervals.push(healthCheckInterval); }); } return this.connections[connection]; diff --git a/src/system/network.service.ts b/src/system/network.service.ts index 3631d01..5ce8b68 100644 --- a/src/system/network.service.ts +++ b/src/system/network.service.ts @@ -1,12 +1,22 @@ import os from "os"; -import { spawn } from "child_process"; -import { Injectable, Logger, OnApplicationBootstrap } from "@nestjs/common"; +import { ChildProcess, spawn } from "child_process"; +import { + Injectable, + Logger, + OnApplicationBootstrap, + OnApplicationShutdown, +} from "@nestjs/common"; @Injectable() -export class NetworkService implements OnApplicationBootstrap { +export class NetworkService + implements OnApplicationBootstrap, OnApplicationShutdown +{ public publicIP: string; public networkLimit?: number; + private ipCheckInterval: ReturnType; + private spawnedProcesses = new Set(); + constructor(private readonly logger: Logger) {} public async getNetworkLimit() { @@ -24,7 +34,7 @@ export class NetworkService implements OnApplicationBootstrap { public async onApplicationBootstrap() { await this.getPublicIP(); - setInterval( + this.ipCheckInterval = setInterval( async () => { await this.getPublicIP(); }, @@ -32,6 +42,15 @@ export class NetworkService implements OnApplicationBootstrap { ); } + public async onApplicationShutdown() { + clearInterval(this.ipCheckInterval); + + for (const proc of this.spawnedProcesses) { + proc.kill(); + } + this.spawnedProcesses.clear(); + } + public getLanIP() { return this.getLanInterface().ipv4?.address; } @@ -305,16 +324,20 @@ export class NetworkService implements OnApplicationBootstrap { done`, ]); + this.spawnedProcesses.add(monitor); + process.on(process.env.DEV ? "SIGUSR2" : "SIGTERM", () => { monitor.kill(); }); monitor.stdin.on("error", async (error) => { this.logger.error("Error running processs", error); + monitor.kill(); reject(error); }); monitor.stderr.on("data", (error) => { + monitor.kill(); reject(error.toString()); }); @@ -327,6 +350,7 @@ export class NetworkService implements OnApplicationBootstrap { }); monitor.on("close", () => { + this.spawnedProcesses.delete(monitor); resolve( returnData || "No data written to memory due to onData() handler", ); diff --git a/src/system/system.service.ts b/src/system/system.service.ts index 40b829c..63d9925 100644 --- a/src/system/system.service.ts +++ b/src/system/system.service.ts @@ -151,11 +151,14 @@ export class SystemService { for (const file of cpuGovernorFiles) { try { - governors[ - parseInt( - path.basename(path.dirname(path.dirname(file))).replace("cpu", ""), - ) - ] = fs.readFileSync(file, "utf8").trim(); + const cpuIndex = parseInt( + path.basename(path.dirname(path.dirname(file))).replace("cpu", ""), + ); + if (isNaN(cpuIndex)) { + this.logger.warn(`Could not parse CPU index from path: ${file}`); + continue; + } + governors[cpuIndex] = fs.readFileSync(file, "utf8").trim(); } catch (error) { this.logger.error(`Error getting CPU governor [${file}]: ${error}`); } diff --git a/src/utilities/throttle.ts b/src/utilities/throttle.ts index 3d1fc87..9e5b9a1 100644 --- a/src/utilities/throttle.ts +++ b/src/utilities/throttle.ts @@ -48,13 +48,14 @@ class Throttle { private totalBytes = 0; private isProcessing = false; private bytesPerSecond: number; + private intervalId: ReturnType; private queue: Array<{ chunk: Buffer; send: () => void; }> = []; constructor() { - setInterval(() => { + this.intervalId = setInterval(() => { this.totalBytes = 0; if (!this.isProcessing) { this.process(); @@ -62,6 +63,10 @@ class Throttle { }, 1000); } + public destroy() { + clearInterval(this.intervalId); + } + public setBytesPerSecond(bytesPerSecond: number) { if (this.bytesPerSecond !== bytesPerSecond) { this.bytesPerSecond = bytesPerSecond; diff --git a/src/webrtc/webrtc.service.ts b/src/webrtc/webrtc.service.ts index 813b286..8734257 100644 --- a/src/webrtc/webrtc.service.ts +++ b/src/webrtc/webrtc.service.ts @@ -100,15 +100,22 @@ export class WebrtcService { const latencyTestKey = `latency-test:${sessionId}`; - void this.redis - .multi() - .hset( - latencyTestKey, - region.toLowerCase().replace(" ", "_"), - JSON.stringify(results), - ) - .expire(latencyTestKey, 60 * 60) - .exec(); + try { + await this.redis + .multi() + .hset( + latencyTestKey, + region.toLowerCase().replace(" ", "_"), + JSON.stringify(results), + ) + .expire(latencyTestKey, 60 * 60) + .exec(); + } catch (error) { + this.logger.error( + `Failed to store latency results for session ${sessionId}`, + error, + ); + } datachannel.sendMessage( JSON.stringify({