diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3df9535..9795346 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -4,6 +4,12 @@ All **user-facing**, notable changes will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.18.0] - 2026-01-19 +### Added +- `auxiliary_endpoints_v2` - new syntax for defining auxiliary endpoints with more options +### Changed +- now requires Python version 3.10 or higher + ## [1.17.0] - 2024-11-05 ### Added - Memory profiler (memray) can be enabled and managed at runtime by endpoints. diff --git a/docs/user_guide.md b/docs/user_guide.md index 5fa4a2d..f18ea28 100644 --- a/docs/user_guide.md +++ b/docs/user_guide.md @@ -145,6 +145,30 @@ def explain(self, x: float, y: float) -> dict[str, float]: return {'x_importance': x / result, 'y_importance': y / result} ``` +If you need more control over those endpoints, you can define `auxiliary_endpoints_v2` method instead. +At this moment this new syntax is incompatible with original `auxiliary_endpoints` - you can use only one +or the other in the same job. +`auxiliary_endpoints_v2` returns list of `EndpointConfig`s that allows choosing between defining endpoints +as POST and GET as well as full control over where arguments are coming from(body, query, path) and can +pass additional arguments to FastAPI to handle additional use cases such as tagging endpoints. +```python +def auxiliary_endpoints_v2(self) -> List[EndpointConfig]: + """ + Dict of custom endpoint paths (besides "/perform") handled by Entrypoint methods. + EndpointConfig consists of a path to the endpoint, http method(POST or GET only), + handler function for the endpoint and optional parameters dict that is passed to FastAPI. + """ + return [ + EndpointConfig('/multiply/{path}', HTTPMethod.POST, self.multiply, other_options=dict(tags=["items"])), + ] + + def multiply(self, body: Annotated[float, Body(examples=[1.2])], query: Annotated[float, Query(example=2.4)], path: Annotated[float, Path(example=234.21)]) -> float: + """ + Standard FastAPI methods of documenting and configuring parameters between body, query and path work for auxiliary endpoints. + """ + return body * query * path +``` + If you want to define example data for your auxiliary endpoints, you can implement `docs_input_examples` method returning mapping of Job's endpoints to corresponding exemplary inputs. diff --git a/pyproject.toml b/pyproject.toml index 2b1b6dc..1dbd2f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "racetrack_job_runner" -version = "1.17.0" # should be in sync with src/racetrack_job_wrapper/__init__.py +version = "1.18.0" # should be in sync with src/racetrack_job_wrapper/__init__.py description = "Racetrack Job Runner" license = {text = "Apache License 2.0"} authors = [ @@ -12,7 +12,7 @@ classifiers = [ "License :: OSI Approved :: Apache Software License", ] readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.10" dynamic = ["dependencies"] [project.urls] diff --git a/sample/python-auxiliary-endpoints-v2/entrypoint.py b/sample/python-auxiliary-endpoints-v2/entrypoint.py new file mode 100644 index 0000000..ec15ef5 --- /dev/null +++ b/sample/python-auxiliary-endpoints-v2/entrypoint.py @@ -0,0 +1,50 @@ +from http import HTTPMethod +import random +from typing import Annotated, Dict, List + +from fastapi import Body, Path, Query + +from racetrack_job_wrapper.endpoint_config import EndpointConfig + + +class Job: + def perform(self, x: float, y: float) -> float: + """ + Add numbers. + :param x: First element to add. + :param y: Second element to add. + :return: Sum of the numbers. + """ + return x + y + + def auxiliary_endpoints_v2(self) -> List[EndpointConfig]: + """ + Dict of custom endpoint paths (besides "/perform") handled by Entrypoint methods. + EndpointConfig consists of a path to the endpoint, http method(POST or GET only), + handler function for the endpoint and optional parameters dict that is passed to FastAPI. + """ + return [ + EndpointConfig('/multiply/{path}', HTTPMethod.POST, self.multiply, other_options=dict(tags=["items"])), + EndpointConfig('/random', HTTPMethod.GET, self.random, other_options=dict(tags=["items"])), + ] + + def multiply(self, body: Annotated[float, Body(examples=[1.2])], query: Annotated[float, Query(example=2.4)], path: Annotated[float, Path(example=234.21)]) -> float: + """ + Standard FastAPI methods of documenting and configuring parameters between body, query and path work for auxiliary endpoints. + """ + return body * query * path + + def random(self, start: float, end: float) -> float: + """Return random number within a range""" + return random.uniform(start, end) + + def docs_input_examples(self) -> Dict[str, Dict]: + """Return mapping of Job's endpoints to corresponding exemplary inputs.""" + return { + '/perform': { + 'x': 40, + 'y': 2, + } + } + + diff --git a/src/racetrack_job_wrapper/__init__.py b/src/racetrack_job_wrapper/__init__.py index 707b74f..9f45204 100644 --- a/src/racetrack_job_wrapper/__init__.py +++ b/src/racetrack_job_wrapper/__init__.py @@ -1,2 +1,2 @@ name = 'racetrack_job_wrapper' -__version__ = "1.17.0" # should be in sync with pyproject.toml +__version__ = "1.18.0" # should be in sync with pyproject.toml diff --git a/src/racetrack_job_wrapper/endpoint_config.py b/src/racetrack_job_wrapper/endpoint_config.py new file mode 100644 index 0000000..024d2ae --- /dev/null +++ b/src/racetrack_job_wrapper/endpoint_config.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass, field +from http import HTTPMethod +from typing import Any, Callable, Dict + + +@dataclass +class EndpointConfig: + """Definition of a single HTTP endpoint. + + Attributes: + path: HTTP path at which the endpoint is registered. + method: HTTP verb used to bind the handler - only GET and POST are supported at moment. + handler: Callable invoked when the endpoint is hit. + other_options: Additional FastAPI route options. + """ + + path: str + method: HTTPMethod + handler: Callable + other_options: Dict[str, Any] = field(default_factory=dict) diff --git a/src/racetrack_job_wrapper/entrypoint.py b/src/racetrack_job_wrapper/entrypoint.py index 89f6627..eecaec5 100644 --- a/src/racetrack_job_wrapper/entrypoint.py +++ b/src/racetrack_job_wrapper/entrypoint.py @@ -2,6 +2,8 @@ from inspect import getfullargspec from typing import Dict, List, Optional, Tuple, Iterable, Any, Callable, Mapping, Union +from racetrack_job_wrapper.endpoint_config import EndpointConfig + WsgiApplication = Callable[ [ Mapping[str, object], # environ @@ -140,6 +142,11 @@ def list_auxiliary_endpoints(entrypoint: JobEntrypoint) -> Dict[str, Callable]: return {} return getattr(entrypoint, 'auxiliary_endpoints')() +def list_auxiliary_endpoints_v2(entrypoint: JobEntrypoint) -> List[EndpointConfig]: + if not hasattr(entrypoint, 'auxiliary_endpoints_v2'): + return [] + return getattr(entrypoint, 'auxiliary_endpoints_v2')() + def list_static_endpoints(entrypoint: JobEntrypoint) -> Dict[str, Union[Tuple, str]]: if not hasattr(entrypoint, 'static_endpoints'): diff --git a/src/racetrack_job_wrapper/wrapper_api.py b/src/racetrack_job_wrapper/wrapper_api.py index fef906e..25b417c 100644 --- a/src/racetrack_job_wrapper/wrapper_api.py +++ b/src/racetrack_job_wrapper/wrapper_api.py @@ -1,3 +1,5 @@ +import functools +from http import HTTPMethod import inspect import mimetypes import os @@ -6,12 +8,13 @@ import time from pathlib import Path -from typing import Any, Callable, Dict, Tuple, Union, Optional +from typing import Annotated, Any, Callable, Dict, List, Tuple, Union, Optional from contextvars import ContextVar -from fastapi import Body, FastAPI, APIRouter, Request, Response, HTTPException +from fastapi import Body, FastAPI, APIRouter, Query, Request, Response, HTTPException from fastapi.responses import RedirectResponse +from racetrack_job_wrapper.endpoint_config import EndpointConfig from racetrack_job_wrapper.profiler import MemoryProfiler from racetrack_job_wrapper.webview import setup_webview_endpoints from racetrack_job_wrapper.concurrency import AtomicInteger @@ -20,6 +23,7 @@ JobEntrypoint, list_entrypoint_parameters, list_auxiliary_endpoints, + list_auxiliary_endpoints_v2, list_static_endpoints, ) from racetrack_job_wrapper.health import setup_health_endpoints, HealthState @@ -127,7 +131,12 @@ def _setup_api_endpoints( options: EndpointOptions, ): _setup_perform_endpoint(options) - _setup_auxiliary_endpoints(options) + + if hasattr(entrypoint, 'auxiliary_endpoints'): + _setup_auxiliary_endpoints(options) + else: + _setup_auxiliary_endpoints_v2(options) + _setup_static_endpoints(api, entrypoint) if MemoryProfiler.is_enabled(): _setup_profiler_endpoints(api) @@ -164,6 +173,7 @@ def _get_parameters(): def _setup_auxiliary_endpoints(options: EndpointOptions): """Configure custom auxiliary endpoints defined by user in an entypoint""" auxiliary_endpoints = list_auxiliary_endpoints(options.entrypoint) + for endpoint_path in sorted(auxiliary_endpoints.keys()): endpoint_method: Callable = auxiliary_endpoints[endpoint_path] @@ -192,6 +202,72 @@ def _auxiliary_endpoint(payload: Dict[str, Any] = Body(default=example_input)) - _add_endpoint(endpoint_path, endpoint_method) logger.info(f'configured auxiliary endpoint: {endpoint_path}') +def _setup_auxiliary_endpoints_v2(options: EndpointOptions): + """Configure custom auxiliary endpoints defined by user in an entypoint""" + auxiliary_endpoints: List[EndpointConfig] = list_auxiliary_endpoints_v2(options.entrypoint) + + for endpoint_config in auxiliary_endpoints: + endpoint_path = endpoint_config.path + endpoint_name = endpoint_path.replace('/', '_') + if not endpoint_path.startswith('/'): + endpoint_path = '/' + endpoint_path + + # keep these variables inside closure as next loop cycle will overwrite it + def _add_endpoint(_endpoint_path: str, _endpoint_handler: Callable, _endpoint_method: HTTPMethod, _other_options: Dict[str, Any]): + summary = f"Call auxiliary endpoint: {_endpoint_path}" + description = "Call auxiliary endpoint" + endpoint_docs = inspect.getdoc(_endpoint_handler) + if endpoint_docs: + description = f"Call auxiliary endpoint: {endpoint_docs}" + + def forwarder(func): + @functools.wraps(func) + def forward(*args, **kwargs): + metric_requests_started.inc() + metric_endpoint_requests_started.labels(endpoint=endpoint_path).inc() + start_time = time.time() + try: + def _endpoint_caller() -> Any: + return func(*args, **kwargs) + + result = options.concurrency_runner(_endpoint_caller) + return to_json_serializable(result) + + except TypeError as e: + metric_request_internal_errors.labels(endpoint=endpoint_path).inc() + raise ValueError(f'failed to call a function: {e}') + except BaseException as e: + metric_request_internal_errors.labels(endpoint=endpoint_path).inc() + raise e + finally: + metric_request_duration.labels(endpoint=endpoint_path).observe(time.time() - start_time) + metric_requests_done.inc() + metric_last_call_timestamp.set(time.time()) + + return forward + + match _endpoint_method: + case HTTPMethod.POST: + options.api.post( + _endpoint_path, + operation_id=f'auxiliary_endpoint_{endpoint_name}', + summary=summary, + description=description, + **_other_options, + )(forwarder(_endpoint_handler)) + case HTTPMethod.GET: + options.api.get( + _endpoint_path, + operation_id=f'auxiliary_endpoint_{endpoint_name}', + summary=summary, + description=description, + **_other_options, + )(forwarder(_endpoint_handler)) + case _: + logger.error(f"method {_endpoint_method} chosen for path {_endpoint_path} is not supported") + + _add_endpoint(endpoint_path, endpoint_config.handler, endpoint_config.method, endpoint_config.other_options) + logger.info(f'configured auxiliary endpoint: {endpoint_path}') def _call_job_endpoint( endpoint_method: Callable, diff --git a/tests/wrap/test_auxiliary_endpoints_v2.py b/tests/wrap/test_auxiliary_endpoints_v2.py new file mode 100644 index 0000000..1237370 --- /dev/null +++ b/tests/wrap/test_auxiliary_endpoints_v2.py @@ -0,0 +1,78 @@ +from http import HTTPMethod +from typing import Annotated, Callable, Dict, List + +from fastapi import Body +from racetrack_job_wrapper.endpoint_config import EndpointConfig +from racetrack_job_wrapper.wrapper_api import create_api_app +from racetrack_job_wrapper.health import HealthState +from fastapi.testclient import TestClient + + +def test_auxiliary_endpoints_v2(): + class TestEntrypoint: + def perform(self, x: float, y: float) -> float: + """ + Add numbers. + :param x: First element to add. + :param y: Second element to add. + :return: Sum of the numbers. + """ + return x + y + + def auxiliary_endpoints_v2(self) -> List[EndpointConfig]: + return [ + EndpointConfig('/explain', HTTPMethod.POST, self.explain), + EndpointConfig('/random', HTTPMethod.GET, self.random), + ] + + def explain(self, x: Annotated[float, Body()], y: Annotated[float, Body()]) -> Dict[str, float]: + """ + Explain feature importance of a model result. + :param x: First element to add. + :param y: Second element to add. + :return: Dict of feature importance. + """ + result = self.perform(x, y) + return {'x_importance': x / result, 'y_importance': y / result} + + def random(self) -> float: + """Return random number""" + return 4 # chosen by fair dice roll + + def docs_input_examples(self) -> Dict[str, Dict]: + return { + '/perform': { + 'x': 40, + 'y': 2, + }, + '/explain': { + 'x': 1, + 'y': 2, + }, + '/random': {}, + } + + entrypoint = TestEntrypoint() + fastapi_app = create_api_app(entrypoint, HealthState(live=True, ready=True)) + + client = TestClient(fastapi_app) + + response = client.post( + "/api/v1/perform", + json={"x": 40, "y": 2}, + ) + assert response.status_code == 200 + assert response.json() == 42 + + response = client.post( + "/api/v1/explain", + json={"x": 2, "y": 8}, + ) + assert response.status_code == 200 + assert response.json() == {'x_importance': 0.2, 'y_importance': 0.8} + + response = client.get( + "/api/v1/random", + ) + assert response.status_code == 200 + assert response.json() == 4