From 9fd6d7159a8ec7d2fd4facc089be1b6185510e68 Mon Sep 17 00:00:00 2001 From: hector Date: Fri, 18 Oct 2024 13:56:02 +0200 Subject: [PATCH] (Feat) Add Basic API on RTB Objects --- .gitignore | 2 + handlers/ApiV2/BoxApi.py | 101 ++++++++++++++ handlers/ApiV2/CorporationApi.py | 90 ++++++++++++ handlers/ApiV2/FlagApi.py | 86 ++++++++++++ handlers/ApiV2/TeamApi.py | 43 ++++++ handlers/ApiV2/__init__.py | 0 handlers/__init__.py | 5 + models/Corporation.py | 2 + models/Team.py | 9 +- tests/testHandlersApiV2.py | 233 +++++++++++++++++++++++++++++++ 10 files changed, 570 insertions(+), 1 deletion(-) create mode 100644 handlers/ApiV2/BoxApi.py create mode 100644 handlers/ApiV2/CorporationApi.py create mode 100644 handlers/ApiV2/FlagApi.py create mode 100644 handlers/ApiV2/TeamApi.py create mode 100644 handlers/ApiV2/__init__.py create mode 100644 tests/testHandlersApiV2.py diff --git a/.gitignore b/.gitignore index 1aca22c7..e91d98ae 100644 --- a/.gitignore +++ b/.gitignore @@ -78,3 +78,5 @@ __pycache__ # Readme !*README.md + +venv/* \ No newline at end of file diff --git a/handlers/ApiV2/BoxApi.py b/handlers/ApiV2/BoxApi.py new file mode 100644 index 00000000..7a1ad741 --- /dev/null +++ b/handlers/ApiV2/BoxApi.py @@ -0,0 +1,101 @@ +import json +from ..BaseHandlers import BaseHandler +from libs.SecurityDecorators import apikey, restrict_ip_address +from models.Box import Box +from models.Corporation import Corporation +from models.GameLevel import GameLevel +import logging +from models import dbsession + +logger = logging.getLogger() + + +class BoxApiHandler(BaseHandler): + + @apikey + @restrict_ip_address + def get(self, id=None): + if id is None or id == "": + data = {"data": [box.to_dict() for box in Box.all()]} + else: + box = Box.by_id(id) + if box is not None: + data = {"data": box.to_dict()} + else: + data = {"message": "Box not found"} + self.write(json.dumps(data)) + + @apikey + @restrict_ip_address + def post(self, *args, **kwargs): + data = json.loads(self.request.body) + logger.info(f"Posted data : {data}") + if "corporation" not in data: + data = { + "data": {"corporation": None}, + "message": "Corporation is required", + } + self.write(json.dumps(data)) + return + if "name" not in data: + data = {"data": {"box": None}, "message": "Name is required"} + self.write(json.dumps(data)) + return + + if Box.by_name(data["name"]) is not None: + data = { + "data": {"box": None}, + "message": "This box already exists", + } + self.write(json.dumps(data)) + return + + if Corporation.by_name(data["corporation"]) is None: + data = { + "data": {"corporation": data["corporation"]}, + "message": "This corporation does not exist", + } + self.write(json.dumps(data)) + return + + new_box = Box() + new_box.name = data["name"] + new_box.description = data["description"] if "description" in data else "" + new_box.corporation_id = ( + Corporation.by_name(data["corporation"]).id + if "corporation" in data + else None + ) + + flag_submission_type = ( + data["flag_submission_type"] + if "flag_submission_type" in data + else "CLASSIC" + ) + if flag_submission_type not in ["CLASSIC", "TOKEN"]: + new_box.flag_submission_type = "CLASSIC" + else: + new_box.flag_submission_type = flag_submission_type + + new_box.game_level_id = GameLevel.by_number(0).id + + dbsession.add(new_box) + dbsession.commit() + data = { + "data": {"box": new_box.to_dict()}, + "message": "This box has been created", + } + self.write(json.dumps(data)) + + @apikey + @restrict_ip_address + def delete(self, id: str): + raise NotImplementedError() + + @apikey + @restrict_ip_address + def put(self, *args, **kwargs): + raise NotImplementedError() + + def check_xsrf_cookie(self): + pass diff --git a/handlers/ApiV2/CorporationApi.py b/handlers/ApiV2/CorporationApi.py new file mode 100644 index 00000000..0d619ecf --- /dev/null +++ b/handlers/ApiV2/CorporationApi.py @@ -0,0 +1,90 @@ +import json +from ..BaseHandlers import BaseHandler +from libs.SecurityDecorators import apikey, restrict_ip_address +from models.Corporation import Corporation +import logging +from models import Team, dbsession + +logger = logging.getLogger() + + +class CorporationApiHandler(BaseHandler): + + @apikey + @restrict_ip_address + def get(self, id: str = None): + if id is None or id == "": + data = { + "data": [corporation.to_dict() for corporation in Corporation.all()] + } + else: + corporation = Corporation.by_id(id) + if corporation is not None: + data = {"data": corporation.to_dict()} + else: + data = {"message": "Corporation not found"} + self.write(json.dumps(data)) + + @apikey + @restrict_ip_address + def post(self, *args, **kwargs): + data = json.loads(self.request.body) + logger.info(f"Post data : {data}") + + if Corporation.by_name(data["name"]) is not None: + data = { + "data": {"corporation": data["name"]}, + "message": "This corporation already exists", + } + self.write(json.dumps(data)) + return + + new_corporation = Corporation() + new_corporation.name = data["name"] + new_corporation.locked = data["locked"] if "locked" in data else False + new_corporation.description = ( + data["description"] if "description" in data else "" + ) + + dbsession.add(new_corporation) + dbsession.commit() + data = { + "data": { + "corporation": new_corporation.to_dict(), + }, + "message": "This corporation has been created", + } + self.write(json.dumps(data)) + + @apikey + @restrict_ip_address + def delete(self, id: str): + corporation = Corporation.by_id(id) + if corporation is not None: + dbsession.delete(corporation) + dbsession.commit() + self.write( + json.dumps( + { + "data": {"corporation": corporation.to_dict()}, + "message": "Corporation deleted", + } + ) + ) + else: + self.write( + json.dumps( + { + "data": {"corporation": None}, + "message": "Corporation not found", + } + ) + ) + + @apikey + @restrict_ip_address + def put(self, *args, **kwargs): + raise NotImplementedError() + + def check_xsrf_cookie(self): + pass diff --git a/handlers/ApiV2/FlagApi.py b/handlers/ApiV2/FlagApi.py new file mode 100644 index 00000000..98ef5e5b --- /dev/null +++ b/handlers/ApiV2/FlagApi.py @@ -0,0 +1,86 @@ +import json +from ..BaseHandlers import BaseHandler +from libs.SecurityDecorators import apikey, restrict_ip_address +from models.Box import Box +from models.Flag import Flag +import logging +from models import dbsession + + +logger = logging.getLogger() + + +class FlagApiHandler(BaseHandler): + + @apikey + @restrict_ip_address + def get(self, id=None): + if id is None or id == "": + data = {"data": [flag.to_dict() for flag in Flag.all()]} + else: + flag = Flag.by_id(id) + if flag is not None: + data = {"data": flag.to_dict()} + else: + data = {"message": "Flag not found"} + self.write(json.dumps(data)) + + @apikey + @restrict_ip_address + def post(self, *args, **kwargs): + data = json.loads(self.request.body) + logger.info(f"Post data : {data}") + + if "box" not in data: + data = { + "data": {"box": None}, + "message": "Box is required", + } + self.write(json.dumps(data)) + return + + if Box.by_name(data["box"]) is None: + data = { + "data": {"box": data["box"]}, + "message": "This box does not exist", + } + self.write(json.dumps(data)) + return + box = Box.by_name(data["box"]) + + if Flag.by_token_and_box_id(data["token"], box.id) is not None: + data = { + "data": { + "flag": data["token"], + "box": data["box"], + }, + "message": "This flag already exists", + } + self.write(json.dumps(data)) + return + + new_flag = Flag() + new_flag.name = data["name"] + new_flag.token = data["token"] + new_flag.value = int(data["value"]) if "value" in data else 1 + new_flag.box_id = box.id + new_flag.type = "static" + new_flag.description = data["description"] if "description" in data else "" + + dbsession.add(new_flag) + dbsession.commit() + data = {"data": new_flag.to_dict(), "message": "This flag has been created"} + self.write(json.dumps(data)) + + @apikey + @restrict_ip_address + def delete(self, id: str): + raise NotImplementedError() + + @apikey + @restrict_ip_address + def put(self, *args, **kwargs): + raise NotImplementedError() + + def check_xsrf_cookie(self): + pass diff --git a/handlers/ApiV2/TeamApi.py b/handlers/ApiV2/TeamApi.py new file mode 100644 index 00000000..54201d65 --- /dev/null +++ b/handlers/ApiV2/TeamApi.py @@ -0,0 +1,43 @@ +import json +from ..BaseHandlers import BaseHandler +from libs.SecurityDecorators import apikey, restrict_ip_address +from models.Corporation import Corporation +import logging +from models import Team, dbsession + +logger = logging.getLogger() + + +class TeamApiHandler(BaseHandler): + + @apikey + @restrict_ip_address + def get(self, id: str = None): + with_flags = self.get_argument("with_flags", "false").lower() == "true" + if id is None or id == "": + data = {"data": [team.to_dict(with_flags) for team in Team.all()]} + else: + team = Team.by_uuid(id) + if team is not None: + data = {"data": team.to_dict(with_flags)} + else: + data = {"message": "Team not found"} + self.write(json.dumps(data)) + + @apikey + @restrict_ip_address + def post(self, *args, **kwargs): + raise NotImplementedError() + + @apikey + @restrict_ip_address + def delete(self, id: str): + raise NotImplementedError() + + @apikey + @restrict_ip_address + def put(self, *args, **kwargs): + raise NotImplementedError() + + def check_xsrf_cookie(self): + pass diff --git a/handlers/ApiV2/__init__.py b/handlers/ApiV2/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/handlers/__init__.py b/handlers/__init__.py index dec46de6..b3bd619c 100644 --- a/handlers/__init__.py +++ b/handlers/__init__.py @@ -56,6 +56,7 @@ from handlers.StaticFileHandler import StaticFileHandler from handlers.UpgradeHandlers import * from handlers.UserHandlers import * +from handlers.ApiV2 import FlagApi, TeamApi, CorporationApi, BoxApi from libs.ConsoleColors import * from libs.DatabaseConnection import DatabaseConnection from libs.Scoreboard import Scoreboard, score_bots @@ -195,6 +196,10 @@ def get_cookie_secret(): (r"/admin/resetdelete", AdminResetDeleteHandler), # API handlers - APIHandlers.py (r"/api/actions", APIActionHandler), + (r"/api/v2/corporation/?(.*)", CorporationApi.CorporationApiHandler), + (r"/api/v2/box/?(.*)", BoxApi.BoxApiHandler), + (r"/api/v2/flag/?(.*)", FlagApi.FlagApiHandler), + (r"/api/v2/team/?(.*)", TeamApi.TeamApiHandler), # Error handlers - ErrorHandlers.py (r"/403", UnauthorizedHandler), (r"/gamestatus", StopHandler), diff --git a/models/Corporation.py b/models/Corporation.py index 5e402dbf..0a5888ee 100644 --- a/models/Corporation.py +++ b/models/Corporation.py @@ -119,6 +119,8 @@ def to_dict(self): "uuid": self.uuid, "name": self.name, "description": self.description, + "id": self.id, + "locked": self.locked, # "boxes": [box.uuid for box in self.boxes], } diff --git a/models/Team.py b/models/Team.py index 9929d785..27ebe8ac 100644 --- a/models/Team.py +++ b/models/Team.py @@ -292,7 +292,7 @@ def file_by_file_name(self, file_name): ls = self.files.filter_by(file_name=file_name) return ls[0] if 0 < len(ls) else None - def to_dict(self): + def to_dict(self, with_flags: bool = False): """Use for JSON related tasks; return public data only""" return { "uuid": self.uuid, @@ -301,6 +301,13 @@ def to_dict(self): "money": self.money, "avatar": self.avatar, "notes": self.notes, + "score": { + "money": self.get_score("money"), + "flags": self.get_score("flag"), + "hints": self.get_score("hint"), + "bots": self.get_score("bot"), + }, + "flags": [flag.to_dict() for flag in self.flags] if with_flags else [], } def to_xml(self, parent): diff --git a/tests/testHandlersApiV2.py b/tests/testHandlersApiV2.py new file mode 100644 index 00000000..9d8c7ad5 --- /dev/null +++ b/tests/testHandlersApiV2.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +""" +Unit tests for everything in handlers/ +""" +import requests +import unittest +import string +import random + +URL = "https://rtb.docker.localhost" # The RTB URL +APIKEY = "TheApiKey" # The Api Key + + +class TestApiV2(unittest.TestCase): + + url = None + + @classmethod + def setUpClass(cls): + print("setUpClass") + cls.url = URL + cls.headers = {"Content-Type": "application/json", "apikey": APIKEY} + corps = cls.get_all_corporations(cls) + corps = [ + x + for x in corps.json()["corporations"] + if "testUT" in x["name"] or "TU_" in x["name"] + ] + for x in corps: + requests.delete( + f"{cls.url}/api/v2/corporation/{x['id']}", + headers=cls.headers, + verify=False, + ) + + def test_1_create_corp(self): + data = {"name": "testUT", "description": "Nothing as test", "locked": False} + rsp = requests.post( + f"{self.url}/api/v2/corporation", + json=data, + headers=self.headers, + verify=False, + ) + self.assertEqual(rsp.status_code, 200) + json_d = rsp.json() + self.assertEqual(json_d["corporation"]["name"], "testUT") + self.assertEqual(json_d["corporation"]["locked"], False) + self.assertEqual(json_d["corporation"]["description"], "Nothing as test") + + def test_2_get_corporation(self): + all = self.get_all_corporations() + corp = [x for x in all.json()["corporations"] if "testUT" in x["name"]] + self.assertEqual(len(corp), 1) + res = requests.get( + f"{self.url}/api/v2/corporation/{corp[0]['id']}", + headers=self.headers, + verify=False, + ) + self.assertEqual(res.status_code, 200) + json_d = res.json() + self.assertEqual(json_d["corporation"]["name"], "testUT") + self.assertEqual(json_d["corporation"]["locked"], False) + self.assertEqual(json_d["corporation"]["description"], "Nothing as test") + + def test_3_delete_corporation(self): + all = self.get_all_corporations() + corp = [x for x in all.json()["corporations"] if "testUT" in x["name"]] + res = requests.delete( + f"{self.url}/api/v2/corporation/{corp[0]['id']}", + headers=self.headers, + verify=False, + ) + self.assertEqual(res.status_code, 200) + + def test_4_complex(self): + self.test_1_create_corp() + + self.get_all_corporations() + data = { + "name": "testUTBox", + "description": "Nothing as a box test", + "corporation": "testUT", + } + rsp_box = requests.post( + f"{self.url}/api/v2/box", json=data, headers=self.headers, verify=False + ) + self.assertEqual(rsp_box.status_code, 200) + json_box = rsp_box.json() + + self.assertEqual(json_box["box"]["name"], "testUTBox") + self.assertEqual(json_box["box"]["description"], "Nothing as a box test") + + data = { + "box": "testUTBox", + "name": "testUTFlag", + "value": 10, + "token": "testUTToken", + "description": "Nothing as a flag test", + } + rsp_flag = requests.post( + f"{self.url}/api/v2/flag", json=data, headers=self.headers, verify=False + ) + self.assertEqual(rsp_flag.status_code, 200) + json_flag = rsp_flag.json() + + self.assertEqual(json_flag["flag"]["name"], "testUTFlag") + self.assertEqual(json_flag["flag"]["token"], "testUTToken") + self.assertEqual(json_flag["flag"]["description"], "Nothing as a flag test") + + self.assertEqual(json_flag["flag"]["value"], 10) + + data = { + "box": "testUTBox", + "name": "testUTFlag2", + "value": 20, + "token": "testUTToken2", + "description": "Nothing as a flag2 test", + } + rsp_flag = requests.post( + f"{self.url}/api/v2/flag", json=data, headers=self.headers, verify=False + ) + self.assertEqual(rsp_flag.status_code, 200) + json_flag = rsp_flag.json() + + self.assertEqual(json_flag["flag"]["name"], "testUTFlag2") + self.assertEqual(json_flag["flag"]["token"], "testUTToken2") + self.assertEqual(json_flag["flag"]["description"], "Nothing as a flag2 test") + self.assertEqual(json_flag["flag"]["value"], 20) + + def get_all_corporations(self): + corps = requests.get( + f"{self.url}/api/v2/corporation", headers=self.headers, verify=False + ) + return corps + + def get_all_boxes(self): + boxes = requests.get( + f"{self.url}/api/v2/box", headers=self.headers, verify=False + ) + return boxes + + def get_all_flags(self): + flags = requests.get( + f"{self.url}/api/v2/flag", headers=self.headers, verify=False + ) + return flags + + def test_5_complex(self): + + corporations = { + "TU_007": { + "description": "Corporation 007", + "boxes": { + "DECOUVERTE": "Box DECOUVERTE", + "ZONE_EDF": "Box ZONE_EDF", + "ZONE_THALES": "Box ZONE_THALES", + "ZONE_LES_ECHOS": "Box ZONE_LES_ECHOS", + "ZONE_FREE": "Box ZONE_FREE", + }, + }, + "TU_CORP_12": { + "description": "Corp 12", + "boxes": { + "DECOUVERTE": "Box DECOUVERTE", + "ZONE_TOTAL": "Box ZONE_TOTAL", + "ZONE_MBDA": "Box ZONE_MBDA", + "ZONE_LE_MONDE": "Box ZONE_LE_MONDE", + "ZONE_SFR": "Box ZONE_SFR", + }, + }, + "TU_OTHER_CORP": { + "description": "Other corp", + "boxes": { + "DECOUVERTE": "Box DECOUVERTE", + "ZONE_WEB": "Box ZONE_WEB", + "ZONE_PHYSIQUE": "Box ZONE_PHYSIQUE", + }, + }, + } + + letters = string.ascii_lowercase + + for k, v in corporations.items(): + self.create_corp(k, v["description"], False) + + for k_box, v_box in v["boxes"].items(): + self.create_box(k_box, v_box, k) + for i in range(random.randint(1, 10)): + self.create_flag( + f"flag_{''.join(random.choices(letters, k=5))}", + "Une fake description", + random.randint(1, 100), + f"CHE-{''.join(random.choices(letters, k=15))}", + k_box, + ) + + def create_corp(self, name, description, locked): + data = {"name": name, "description": description, "locked": locked} + rsp = requests.post( + f"{self.url}/api/v2/corporation", + json=data, + headers=self.headers, + verify=False, + ) + return rsp + + def create_box(self, name, description, corporation): + data = { + "name": name, + "description": description, + "corporation": corporation, + } + rsp = requests.post( + f"{self.url}/api/v2/box", json=data, headers=self.headers, verify=False + ) + + return rsp + + def create_flag(self, name, description, value, token, box): + data = { + "name": name, + "description": description, + "value": value, + "token": token, + "box": box, + } + rsp = requests.post( + f"{self.url}/api/v2/flag", + json=data, + headers=self.headers, + verify=False, + ) + return rsp