Skip to content

Commit eb3af29

Browse files
Merge pull request #33 from sebastiankrll/enhance/ws-bandwith
Enhance/ws bandwith
2 parents 7a76624 + b81c876 commit eb3af29

File tree

13 files changed

+365
-301
lines changed

13 files changed

+365
-301
lines changed

apps/api/src/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,10 @@ app.get(
169169
return;
170170
}
171171

172-
res.json(all);
172+
const buffer = Buffer.from(all, "base64");
173+
res.setHeader("Content-Type", "application/octet-stream");
174+
res.setHeader("Content-Encoding", "gzip");
175+
res.send(buffer);
173176
}),
174177
);
175178

apps/ingestion/src/airport.ts

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ const TAF_URL = "https://aviationweather.gov/data/cache/tafs.cache.xml.gz";
3131
const WEATHER_FETCH_INTERVAL = 600_000;
3232

3333
let cached: AirportLong[] = [];
34-
let deleted: string[] = [];
3534
let updated: AirportShort[] = [];
36-
let added: AirportShort[] = [];
35+
let added: Required<AirportShort>[] = [];
3736

3837
export async function mapAirports(pilotsLong: PilotLong[]): Promise<AirportLong[]> {
3938
await updateWeather();
@@ -126,31 +125,48 @@ export async function mapAirports(pilotsLong: PilotLong[]): Promise<AirportLong[
126125
}
127126

128127
const airportsLong = Object.values(airportRecord);
128+
setAirportDelta(airportsLong);
129129

130-
const deletedLong = cached.filter((a) => !airportsLong.some((b) => b.icao === a.icao));
131-
const addedLong = airportsLong.filter((a) => !cached.some((b) => b.icao === a.icao));
132-
const updatedLong = airportsLong.filter((a) => cached.some((b) => b.icao === a.icao));
130+
return airportsLong;
131+
}
133132

134-
deleted = deletedLong.map((a) => a.icao);
135-
added = addedLong.map(getAirportShort);
136-
updated = updatedLong.map(getAirportShort);
137-
// console.log(airportsLong[0])
133+
function setAirportDelta(airportsLong: AirportLong[]): void {
134+
updated = [];
135+
added = [];
136+
137+
for (const a of airportsLong) {
138+
const cachedAirport = cached.find((c) => c.icao === a.icao);
139+
140+
if (!cachedAirport) {
141+
added.push(getAirportShort(a) as Required<AirportShort>);
142+
} else {
143+
const airportShort = getAirportShort(a, cachedAirport);
144+
updated.push(airportShort);
145+
}
146+
}
138147

139148
cached = airportsLong;
140-
return airportsLong;
141149
}
142150

143-
export function getAirportShort(a: AirportLong): AirportShort {
144-
return {
145-
icao: a.icao,
146-
dep_traffic: a.dep_traffic,
147-
arr_traffic: a.arr_traffic,
148-
};
151+
export function getAirportShort(a: AirportLong, c?: AirportLong): AirportShort {
152+
if (!c) {
153+
return {
154+
icao: a.icao,
155+
dep_traffic: a.dep_traffic,
156+
arr_traffic: a.arr_traffic,
157+
};
158+
} else {
159+
const airportShort: AirportShort = { icao: a.icao };
160+
161+
if (JSON.stringify(a.dep_traffic) !== JSON.stringify(c.dep_traffic)) airportShort.dep_traffic = a.dep_traffic;
162+
if (JSON.stringify(a.arr_traffic) !== JSON.stringify(c.arr_traffic)) airportShort.arr_traffic = a.arr_traffic;
163+
164+
return airportShort;
165+
}
149166
}
150167

151168
export function getAirportDelta(): AirportDelta {
152169
return {
153-
deleted,
154170
added,
155171
updated,
156172
};

apps/ingestion/src/controller.ts

Lines changed: 71 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,30 @@ import type { FIRFeature, SimAwareTraconFeature } from "@sr24/types/db";
33
import type { ControllerDelta, ControllerLong, ControllerMerged, ControllerShort, PilotLong, VatsimData } from "@sr24/types/vatsim";
44
import { haversineDistance } from "./utils/helpers.js";
55

6-
let cachedMerged: ControllerMerged[] = [];
7-
let deleted: string[] = [];
6+
let cached: ControllerMerged[] = [];
87
let updated: ControllerMerged[] = [];
98
let added: ControllerMerged[] = [];
109

1110
export async function mapControllers(vatsimData: VatsimData, pilotsLong: PilotLong[]): Promise<[ControllerLong[], ControllerMerged[]]> {
12-
const controllersLong: ControllerLong[] = vatsimData.controllers.map((controller) => {
13-
return {
14-
callsign: controller.callsign,
15-
frequency: parseFrequencyToKHz(controller.frequency),
16-
facility: controller.facility,
17-
atis: controller.text_atis,
18-
connections: 0,
19-
cid: controller.cid,
20-
name: controller.name,
21-
rating: controller.rating,
22-
server: controller.server,
23-
visual_range: controller.visual_range,
24-
logon_time: new Date(controller.logon_time),
25-
timestamp: new Date(controller.last_updated),
26-
};
27-
});
11+
const controllersLong: ControllerLong[] = vatsimData.controllers
12+
.map((controller) => {
13+
if (controller.facility === 0 && !controller.callsign.includes("OBS")) return null;
14+
return {
15+
callsign: controller.callsign,
16+
frequency: parseFrequencyToKHz(controller.frequency),
17+
facility: controller.facility,
18+
atis: controller.text_atis,
19+
connections: 0,
20+
cid: controller.cid,
21+
name: controller.name,
22+
rating: controller.rating,
23+
server: controller.server,
24+
visual_range: controller.visual_range,
25+
logon_time: new Date(controller.logon_time),
26+
timestamp: new Date(controller.last_updated),
27+
};
28+
})
29+
.filter((c) => c !== null);
2830

2931
getConnectionsCount(vatsimData, controllersLong, pilotsLong);
3032

@@ -46,19 +48,63 @@ export async function mapControllers(vatsimData: VatsimData, pilotsLong: PilotLo
4648
});
4749

4850
const merged = await mergeControllers(controllersLong);
51+
setControllerDelta(merged);
4952

50-
const deletedMerged = cachedMerged.filter((a) => !merged.some((b) => b.id === a.id));
51-
deleted = deletedMerged.map((c) => c.id);
52-
added = merged.filter((a) => !cachedMerged.some((b) => b.id === a.id));
53-
updated = merged.filter((a) => cachedMerged.some((b) => b.id === a.id));
54-
55-
cachedMerged = merged;
5653
return [controllersLong, merged];
5754
}
5855

56+
function setControllerDelta(merged: ControllerMerged[]): void {
57+
added = [];
58+
updated = [];
59+
60+
for (const m of merged) {
61+
const cachedMerged = cached.find((c) => c.id === m.id);
62+
if (!cachedMerged) {
63+
added.push(m);
64+
} else {
65+
const updatedControllers: ControllerShort[] = [];
66+
67+
for (const controller of m.controllers) {
68+
const cachedController = cachedMerged.controllers.find((c) => c.callsign === controller.callsign);
69+
const controllerShort = getControllerShort(controller, cachedController);
70+
updatedControllers.push(controllerShort);
71+
}
72+
73+
if (updatedControllers.length > 0) {
74+
updated.push({
75+
id: m.id,
76+
facility: m.facility,
77+
controllers: updatedControllers,
78+
});
79+
}
80+
}
81+
}
82+
83+
cached = merged;
84+
}
85+
86+
function getControllerShort(controller: ControllerShort, cachedController?: ControllerShort): ControllerShort {
87+
if (!cachedController) {
88+
return {
89+
callsign: controller.callsign,
90+
frequency: controller.frequency,
91+
facility: controller.facility,
92+
atis: controller.atis,
93+
connections: controller.connections,
94+
};
95+
} else {
96+
const controllerShort: ControllerShort = { callsign: controller.callsign, facility: controller.facility };
97+
98+
if (controller.frequency !== cachedController.frequency) controllerShort.frequency = controller.frequency;
99+
if (JSON.stringify(controller.atis) !== JSON.stringify(cachedController.atis)) controllerShort.atis = controller.atis;
100+
if (controller.connections !== cachedController.connections) controllerShort.connections = controller.connections;
101+
102+
return controllerShort;
103+
}
104+
}
105+
59106
export function getControllerDelta(): ControllerDelta {
60107
return {
61-
deleted,
62108
added,
63109
updated,
64110
};

apps/ingestion/src/index.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import "dotenv/config";
2+
import { promisify } from "node:util";
3+
import { gzip } from "node:zlib";
24
import { pgDeleteStalePilots, pgUpsertPilots } from "@sr24/db/pg";
35
import { rdsConnect, rdsPub, rdsSetMultiple, rdsSetMultipleTimeSeries, rdsSetSingle } from "@sr24/db/redis";
4-
import type { TrackPoint, VatsimData, VatsimTransceivers, WsAll, WsDelta } from "@sr24/types/vatsim";
6+
import type { AirportShort, PilotShort, TrackPoint, VatsimData, VatsimTransceivers, WsAll, WsDelta } from "@sr24/types/vatsim";
57
import axios from "axios";
68
import { getAirportDelta, getAirportShort, mapAirports } from "./airport.js";
79
import { getControllerDelta, mapControllers } from "./controller.js";
@@ -12,6 +14,8 @@ const VATSIM_DATA_URL = "https://data.vatsim.net/v3/vatsim-data.json";
1214
const VATSIM_TRANSCEIVERS_URL = "https://data.vatsim.net/v3/transceivers-data.json";
1315
const FETCH_INTERVAL = 5_000;
1416

17+
const gzipAsync = promisify(gzip);
18+
1519
let dbsInitialized = false;
1620
let updating = false;
1721
let lastVatsimUpdate = 0;
@@ -47,15 +51,17 @@ async function fetchVatsimData(): Promise<void> {
4751
airports: getAirportDelta(),
4852
controllers: getControllerDelta(),
4953
};
50-
rdsPub("ws:delta", delta);
54+
const gzDelta = await gzipAsync(JSON.stringify(delta));
55+
rdsPub("ws:delta", gzDelta.toString("base64"));
5156

5257
// Set full websocket data on redis ws:all
5358
const all: WsAll = {
54-
pilots: pilotsLong.map(getPilotShort),
55-
airports: airportsLong.map(getAirportShort),
59+
pilots: pilotsLong.map((p) => getPilotShort(p) as Required<PilotShort>),
60+
airports: airportsLong.map((a) => getAirportShort(a) as Required<AirportShort>),
5661
controllers: controllersMerged,
5762
};
58-
rdsSetSingle("ws:all", all);
63+
const gzAll = await gzipAsync(JSON.stringify(all));
64+
rdsSetSingle("ws:all", gzAll.toString("base64"));
5965

6066
// Set pilots, controllers and airports data in redis
6167
rdsSetMultiple(pilotsLong, "pilot", (p) => p.id, "pilots:live", 120);

apps/ingestion/src/pilot.ts

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,10 @@ const MILITARY_RATINGS = [
8080
];
8181

8282
let cached: PilotLong[] = [];
83-
let deleted: string[] = [];
8483
let updated: PilotShort[] = [];
85-
let added: PilotShort[] = [];
84+
let added: Required<PilotShort>[] = [];
8685

8786
export async function mapPilots(latestVatsimData: VatsimData): Promise<[PilotLong[], PilotLong[]]> {
88-
deleted = [];
89-
updated = [];
90-
added = [];
91-
9287
const pilotsLongPromises: Promise<PilotLong>[] = latestVatsimData.pilots.map(async (pilot) => {
9388
const id = getPilotId(pilot);
9489
const cachedPilot = cached.find((c) => c.id === id);
@@ -145,19 +140,12 @@ export async function mapPilots(latestVatsimData: VatsimData): Promise<[PilotLon
145140
});
146141

147142
const pilotsLong = await Promise.all(pilotsLongPromises);
148-
149143
const deletedLong = cached.filter((a) => !pilotsLong.some((b) => b.id === a.id));
150144
deletedLong.forEach((p) => {
151145
p.live = false;
152146
});
153147

154-
const addedLong = pilotsLong.filter((a) => !cached.some((b) => b.id === a.id));
155-
const updatedLong = pilotsLong.filter((a) => cached.some((b) => b.id === a.id));
156-
157-
added = addedLong.map(getPilotShort);
158-
updated = updatedLong.map(getPilotShort);
159-
deleted = deletedLong.map((p) => p.id);
160-
cached = pilotsLong;
148+
setPilotDelta(pilotsLong);
161149

162150
return [pilotsLong, deletedLong];
163151
}
@@ -175,31 +163,67 @@ function getPilotId(pilot: VatsimPilot): string {
175163
return b64url.slice(0, 10);
176164
}
177165

166+
function setPilotDelta(pilotsLong: PilotLong[]): void {
167+
added = [];
168+
updated = [];
169+
170+
for (const p of pilotsLong) {
171+
const cachedPilot = cached.find((c) => c.id === p.id);
172+
if (!cachedPilot) {
173+
added.push(getPilotShort(p) as Required<PilotShort>);
174+
} else {
175+
const pilotShort = getPilotShort(p, cachedPilot);
176+
updated.push(pilotShort);
177+
}
178+
}
179+
180+
cached = pilotsLong;
181+
}
182+
178183
export function getPilotDelta(): PilotDelta {
179184
return {
180-
deleted,
181185
added,
182186
updated,
183187
};
184188
}
185189

186-
export function getPilotShort(p: PilotLong): PilotShort {
187-
return {
188-
id: p.id,
189-
latitude: p.latitude,
190-
longitude: p.longitude,
191-
altitude_agl: p.altitude_agl,
192-
altitude_ms: p.altitude_ms,
193-
groundspeed: p.groundspeed,
194-
vertical_speed: p.vertical_speed,
195-
heading: p.heading,
196-
callsign: p.callsign,
197-
aircraft: p.aircraft,
198-
transponder: p.transponder,
199-
frequency: p.frequency,
200-
route: p.route,
201-
ghost: p.ghost,
202-
};
190+
export function getPilotShort(p: PilotLong, c?: PilotLong): PilotShort {
191+
if (!c) {
192+
return {
193+
id: p.id,
194+
latitude: p.latitude,
195+
longitude: p.longitude,
196+
altitude_agl: p.altitude_agl,
197+
altitude_ms: p.altitude_ms,
198+
groundspeed: p.groundspeed,
199+
vertical_speed: p.vertical_speed,
200+
heading: p.heading,
201+
callsign: p.callsign,
202+
aircraft: p.aircraft,
203+
transponder: p.transponder,
204+
frequency: p.frequency,
205+
route: p.route,
206+
ghost: p.ghost,
207+
};
208+
} else {
209+
const pilotShort: PilotShort = { id: p.id };
210+
211+
if (p.latitude !== c.latitude) pilotShort.latitude = p.latitude;
212+
if (p.longitude !== c.longitude) pilotShort.longitude = p.longitude;
213+
if (p.altitude_agl !== c.altitude_agl) pilotShort.altitude_agl = p.altitude_agl;
214+
if (p.altitude_ms !== c.altitude_ms) pilotShort.altitude_ms = p.altitude_ms;
215+
if (p.groundspeed !== c.groundspeed) pilotShort.groundspeed = p.groundspeed;
216+
if (p.vertical_speed !== c.vertical_speed) pilotShort.vertical_speed = p.vertical_speed;
217+
if (p.heading !== c.heading) pilotShort.heading = p.heading;
218+
if (p.callsign !== c.callsign) pilotShort.callsign = p.callsign;
219+
if (p.aircraft !== c.aircraft) pilotShort.aircraft = p.aircraft;
220+
if (p.transponder !== c.transponder) pilotShort.transponder = p.transponder;
221+
if (p.frequency !== c.frequency) pilotShort.frequency = p.frequency;
222+
if (p.route !== c.route) pilotShort.route = p.route;
223+
if (p.ghost !== c.ghost) pilotShort.ghost = p.ghost;
224+
225+
return pilotShort;
226+
}
203227
}
204228

205229
function calculateVerticalSpeed(current: PilotLong, cache: PilotLong | undefined): number {

0 commit comments

Comments
 (0)