Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions src/configs/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,29 @@ import { ApiConfig } from "./types/ApiConfig";

export default (): {
api: ApiConfig;
} => ({
api: {
url: process.env.API_SERVICE_HOST as string,
httpPort: parseInt(process.env.API_SERVICE_PORT_HTTP as string),
wsPort: parseInt(
process.env.API_SERVICE_PORT_GAME_SERVER_NODE_WS as string,
),
},
});
} => {
const httpPort = parseInt(process.env.API_SERVICE_PORT_HTTP as string);
const wsPort = parseInt(
process.env.API_SERVICE_PORT_GAME_SERVER_NODE_WS as string,
);

if (isNaN(httpPort)) {
throw new Error(
`Invalid API_SERVICE_PORT_HTTP: '${process.env.API_SERVICE_PORT_HTTP}'`,
);
}

if (isNaN(wsPort)) {
throw new Error(
`Invalid API_SERVICE_PORT_GAME_SERVER_NODE_WS: '${process.env.API_SERVICE_PORT_GAME_SERVER_NODE_WS}'`,
);
}

return {
api: {
url: process.env.API_SERVICE_HOST as string,
httpPort,
wsPort,
},
};
};
14 changes: 10 additions & 4 deletions src/demos/demos.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,15 @@ export class DemosService {
continue;
case 406:
// demo is already uploaded
fs.unlinkSync(demo.fullPath);
if (fs.existsSync(demo.fullPath)) {
fs.unlinkSync(demo.fullPath);
}
continue;
case 410:
// match map not found
fs.unlinkSync(demo.fullPath);
if (fs.existsSync(demo.fullPath)) {
fs.unlinkSync(demo.fullPath);
}
continue;
}

Expand Down Expand Up @@ -135,13 +139,15 @@ export class DemosService {
},
);

fs.unlinkSync(demo.fullPath);
if (fs.existsSync(demo.fullPath)) {
fs.unlinkSync(demo.fullPath);
}
} catch (error) {
this.logger.error(`unable to get presigned url`, error);
} finally {
const matchDir = path.join(this.DEMO_DIR, demo.matchId);
if (await this.checkIfPathEmpty(matchDir)) {
fs.rmdirSync(matchDir, { recursive: true });
fs.rmSync(matchDir, { recursive: true });
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions src/kubernetes/kubernetes.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,17 @@ export class KubernetesService {
}

public async getNode() {
return await this.apiClient.readNode({
name: this.nodeName,
});
try {
return await this.apiClient.readNode({
name: this.nodeName,
});
} catch (error) {
this.logger.error(
`Failed to get node '${this.nodeName}' from K8s API`,
error instanceof Error ? error.message : error,
);
throw error;
}
}

public async getNodeStats(node: V1Node) {
Expand Down
24 changes: 18 additions & 6 deletions src/rcon/rcon.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,20 @@ export class RconService {
password: matchData.id,
});

const SEND_TIMEOUT = 5000;
rcon.send = async (command) => {
const payload = (
await rcon.sendRaw(Buffer.from(command, "utf-8"))
).toString();
const sendPromise = rcon
.sendRaw(Buffer.from(command, "utf-8"))
.then((buf) => buf.toString());

return payload;
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(
() => reject(new Error(`RCON send timeout after ${SEND_TIMEOUT}ms`)),
SEND_TIMEOUT,
);
});

return Promise.race([sendPromise, timeoutPromise]);
};

rcon
Expand All @@ -54,8 +62,9 @@ export class RconService {
});

try {
let connectTimer: ReturnType<typeof setTimeout>;
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => {
connectTimer = setTimeout(() => {
reject(
new Error(
`RCON connection timeout after ${this.CONNECTION_TIMEOUT}ms`,
Expand All @@ -65,6 +74,7 @@ export class RconService {
});

await Promise.race([rcon.connect(), timeoutPromise]);
clearTimeout(connectTimer!);
} catch (error) {
this.logger.warn("RCON connect error:", error);
try {
Expand All @@ -74,11 +84,13 @@ export class RconService {
} catch (cleanupError) {
this.logger.warn("Error during RCON cleanup:", cleanupError);
}
return null;
}

this.connections[matchId] = rcon;
this.setupConnectionTimeout(matchId);

return (this.connections[matchId] = rcon);
return rcon;
}

private setupConnectionTimeout(matchId: string) {
Expand Down
26 changes: 23 additions & 3 deletions src/redis/redis-manager/redis-manager.service.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,41 @@
import { Injectable, Logger } from "@nestjs/common";
import { Injectable, Logger, OnApplicationShutdown } from "@nestjs/common";
import IORedis, { Redis, RedisOptions } from "ioredis";
import { ConfigService } from "@nestjs/config";
import { RedisConfig } from "../../configs/types/RedisConfig";

@Injectable()
export class RedisManagerService {
export class RedisManagerService implements OnApplicationShutdown {
private config: RedisConfig;

protected connections: {
[key: string]: Redis;
} = {};

private healthCheckIntervals: ReturnType<typeof setInterval>[] = [];

constructor(
private readonly logger: Logger,
private readonly configService: ConfigService,
) {
this.config = this.configService.get<RedisConfig>("redis")!;
}

public async onApplicationShutdown() {
for (const interval of this.healthCheckIntervals) {
clearInterval(interval);
}
this.healthCheckIntervals = [];

for (const [name, connection] of Object.entries(this.connections)) {
try {
connection.disconnect();
} catch (error) {
this.logger.warn(`Error disconnecting Redis "${name}":`, error);
}
}
this.connections = {};
}

public getConnection(connection = "default"): Redis {
if (!this.connections[connection]) {
const currentConnection: Redis = (this.connections[connection] =
Expand Down Expand Up @@ -45,7 +63,7 @@ export class RedisManagerService {

const pingTimeoutError = `did not receive ping in time (5 seconds)`;

setInterval(async () => {
const healthCheckInterval = setInterval(async () => {
if (currentConnection.status === "ready") {
await new Promise(async (resolve, reject) => {
const timer = setTimeout(() => {
Expand All @@ -65,6 +83,8 @@ export class RedisManagerService {
});
}
}, 5000);

this.healthCheckIntervals.push(healthCheckInterval);
});
}
return this.connections[connection];
Expand Down
32 changes: 28 additions & 4 deletions src/system/network.service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import os from "os";
import { spawn } from "child_process";
import { Injectable, Logger, OnApplicationBootstrap } from "@nestjs/common";
import { ChildProcess, spawn } from "child_process";
import {
Injectable,
Logger,
OnApplicationBootstrap,
OnApplicationShutdown,
} from "@nestjs/common";

@Injectable()
export class NetworkService implements OnApplicationBootstrap {
export class NetworkService
implements OnApplicationBootstrap, OnApplicationShutdown
{
public publicIP: string;
public networkLimit?: number;

private ipCheckInterval: ReturnType<typeof setInterval>;
private spawnedProcesses = new Set<ChildProcess>();

constructor(private readonly logger: Logger) {}

public async getNetworkLimit() {
Expand All @@ -24,14 +34,23 @@ export class NetworkService implements OnApplicationBootstrap {
public async onApplicationBootstrap() {
await this.getPublicIP();

setInterval(
this.ipCheckInterval = setInterval(
async () => {
await this.getPublicIP();
},
5 * 60 * 1000,
);
}

public async onApplicationShutdown() {
clearInterval(this.ipCheckInterval);

for (const proc of this.spawnedProcesses) {
proc.kill();
}
this.spawnedProcesses.clear();
}

public getLanIP() {
return this.getLanInterface().ipv4?.address;
}
Expand Down Expand Up @@ -305,16 +324,20 @@ export class NetworkService implements OnApplicationBootstrap {
done`,
]);

this.spawnedProcesses.add(monitor);

process.on(process.env.DEV ? "SIGUSR2" : "SIGTERM", () => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is required for local dev.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restored the process.on(DEV ? 'SIGUSR2' : 'SIGTERM') signal handler.

monitor.kill();
});

monitor.stdin.on("error", async (error) => {
this.logger.error("Error running processs", error);
monitor.kill();
reject(error);
});

monitor.stderr.on("data", (error) => {
monitor.kill();
reject(error.toString());
});

Expand All @@ -327,6 +350,7 @@ export class NetworkService implements OnApplicationBootstrap {
});

monitor.on("close", () => {
this.spawnedProcesses.delete(monitor);
resolve(
returnData || "No data written to memory due to onData() handler",
);
Expand Down
13 changes: 8 additions & 5 deletions src/system/system.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,14 @@ export class SystemService {

for (const file of cpuGovernorFiles) {
try {
governors[
parseInt(
path.basename(path.dirname(path.dirname(file))).replace("cpu", ""),
)
] = fs.readFileSync(file, "utf8").trim();
const cpuIndex = parseInt(
path.basename(path.dirname(path.dirname(file))).replace("cpu", ""),
);
if (isNaN(cpuIndex)) {
this.logger.warn(`Could not parse CPU index from path: ${file}`);
continue;
}
governors[cpuIndex] = fs.readFileSync(file, "utf8").trim();
} catch (error) {
this.logger.error(`Error getting CPU governor [${file}]: ${error}`);
}
Expand Down
7 changes: 6 additions & 1 deletion src/utilities/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,25 @@ class Throttle {
private totalBytes = 0;
private isProcessing = false;
private bytesPerSecond: number;
private intervalId: ReturnType<typeof setInterval>;
private queue: Array<{
chunk: Buffer;
send: () => void;
}> = [];

constructor() {
setInterval(() => {
this.intervalId = setInterval(() => {
this.totalBytes = 0;
if (!this.isProcessing) {
this.process();
}
}, 1000);
}

public destroy() {
clearInterval(this.intervalId);
}

public setBytesPerSecond(bytesPerSecond: number) {
if (this.bytesPerSecond !== bytesPerSecond) {
this.bytesPerSecond = bytesPerSecond;
Expand Down
25 changes: 16 additions & 9 deletions src/webrtc/webrtc.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,22 @@ export class WebrtcService {

const latencyTestKey = `latency-test:${sessionId}`;

void this.redis
.multi()
.hset(
latencyTestKey,
region.toLowerCase().replace(" ", "_"),
JSON.stringify(results),
)
.expire(latencyTestKey, 60 * 60)
.exec();
try {
await this.redis
.multi()
.hset(
latencyTestKey,
region.toLowerCase().replace(" ", "_"),
JSON.stringify(results),
)
.expire(latencyTestKey, 60 * 60)
.exec();
} catch (error) {
this.logger.error(
`Failed to store latency results for session ${sessionId}`,
error,
);
}

datachannel.sendMessage(
JSON.stringify({
Expand Down
Loading