Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/redis/redis-manager/redis-manager.service.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,41 @@
import { Injectable, Logger } from "@nestjs/common";
import { Injectable, Logger, OnApplicationShutdown } from "@nestjs/common";
import IORedis, { Redis, RedisOptions } from "ioredis";
import { ConfigService } from "@nestjs/config";
import { RedisConfig } from "../../configs/types/RedisConfig";

@Injectable()
export class RedisManagerService {
export class RedisManagerService implements OnApplicationShutdown {
private config: RedisConfig;

protected connections: {
[key: string]: Redis;
} = {};

private healthCheckIntervals: ReturnType<typeof setInterval>[] = [];

constructor(
private readonly logger: Logger,
private readonly configService: ConfigService,
) {
this.config = this.configService.get<RedisConfig>("redis")!;
}

public async onApplicationShutdown() {
for (const interval of this.healthCheckIntervals) {
clearInterval(interval);
}
this.healthCheckIntervals = [];

for (const [name, connection] of Object.entries(this.connections)) {
try {
connection.disconnect();
} catch (error) {
this.logger.warn(`Error disconnecting Redis "${name}":`, error);
}
}
this.connections = {};
}

public getConnection(connection = "default"): Redis {
if (!this.connections[connection]) {
const currentConnection: Redis = (this.connections[connection] =
Expand Down Expand Up @@ -45,7 +63,7 @@ export class RedisManagerService {

const pingTimeoutError = `did not receive ping in time (5 seconds)`;

setInterval(async () => {
const healthCheckInterval = setInterval(async () => {
if (currentConnection.status === "ready") {
await new Promise(async (resolve, reject) => {
const timer = setTimeout(() => {
Expand All @@ -65,6 +83,8 @@ export class RedisManagerService {
});
}
}, 5000);

this.healthCheckIntervals.push(healthCheckInterval);
});
}
return this.connections[connection];
Expand Down
34 changes: 27 additions & 7 deletions src/system/network.service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import os from "os";
import { spawn } from "child_process";
import { Injectable, Logger, OnApplicationBootstrap } from "@nestjs/common";
import { ChildProcess, spawn } from "child_process";
import {
Injectable,
Logger,
OnApplicationBootstrap,
OnApplicationShutdown,
} from "@nestjs/common";

@Injectable()
export class NetworkService implements OnApplicationBootstrap {
export class NetworkService
implements OnApplicationBootstrap, OnApplicationShutdown
{
public publicIP: string;
public networkLimit?: number;

private ipCheckInterval: ReturnType<typeof setInterval>;
private spawnedProcesses = new Set<ChildProcess>();

constructor(private readonly logger: Logger) {}

public async getNetworkLimit() {
Expand All @@ -24,14 +34,23 @@ export class NetworkService implements OnApplicationBootstrap {
public async onApplicationBootstrap() {
await this.getPublicIP();

setInterval(
this.ipCheckInterval = setInterval(
async () => {
await this.getPublicIP();
},
5 * 60 * 1000,
);
}

public async onApplicationShutdown() {
clearInterval(this.ipCheckInterval);

for (const proc of this.spawnedProcesses) {
proc.kill();
}
this.spawnedProcesses.clear();
}

public getLanIP() {
return this.getLanInterface().ipv4?.address;
}
Expand Down Expand Up @@ -305,16 +324,16 @@ export class NetworkService implements OnApplicationBootstrap {
done`,
]);

process.on(process.env.DEV ? "SIGUSR2" : "SIGTERM", () => {
monitor.kill();
});
this.spawnedProcesses.add(monitor);

monitor.stdin.on("error", async (error) => {
this.logger.error("Error running processs", error);
monitor.kill();
reject(error);
});

monitor.stderr.on("data", (error) => {
monitor.kill();
reject(error.toString());
});

Expand All @@ -327,6 +346,7 @@ export class NetworkService implements OnApplicationBootstrap {
});

monitor.on("close", () => {
this.spawnedProcesses.delete(monitor);
resolve(
returnData || "No data written to memory due to onData() handler",
);
Expand Down
7 changes: 6 additions & 1 deletion src/utilities/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,25 @@ class Throttle {
private totalBytes = 0;
private isProcessing = false;
private bytesPerSecond: number;
private intervalId: ReturnType<typeof setInterval>;
private queue: Array<{
chunk: Buffer;
send: () => void;
}> = [];

constructor() {
setInterval(() => {
this.intervalId = setInterval(() => {
this.totalBytes = 0;
if (!this.isProcessing) {
this.process();
}
}, 1000);
}

public destroy() {
clearInterval(this.intervalId);
}

public setBytesPerSecond(bytesPerSecond: number) {
if (this.bytesPerSecond !== bytesPerSecond) {
this.bytesPerSecond = bytesPerSecond;
Expand Down
Loading