Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All **user-facing**, notable changes will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.18.0] - 2026-01-19
### Added
- `auxiliary_endpoints_v2` - new syntax for defining auxiliary endpoints with more options
### Changed
- now requires Python version 3.10 or higher

## [1.17.0] - 2024-11-05
### Added
- Memory profiler (memray) can be enabled and managed at runtime by endpoints.
Expand Down
24 changes: 24 additions & 0 deletions docs/user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,30 @@ def explain(self, x: float, y: float) -> dict[str, float]:
return {'x_importance': x / result, 'y_importance': y / result}
```

If you need more control over those endpoints, you can define `auxiliary_endpoints_v2` method instead.
At this moment this new syntax is incompatible with original `auxiliary_endpoints` - you can use only one
or the other in the same job.
`auxiliary_endpoints_v2` returns list of `EndpointConfig`s that allows choosing between defining endpoints
as POST and GET as well as full control over where arguments are coming from(body, query, path) and can
pass additional arguments to FastAPI to handle additional use cases such as tagging endpoints.
```python
def auxiliary_endpoints_v2(self) -> List[EndpointConfig]:
"""
Dict of custom endpoint paths (besides "/perform") handled by Entrypoint methods.
EndpointConfig consists of a path to the endpoint, http method(POST or GET only),
handler function for the endpoint and optional parameters dict that is passed to FastAPI.
"""
return [
EndpointConfig('/multiply/{path}', HTTPMethod.POST, self.multiply, other_options=dict(tags=["items"])),
]

def multiply(self, body: Annotated[float, Body(examples=[1.2])], query: Annotated[float, Query(example=2.4)], path: Annotated[float, Path(example=234.21)]) -> float:
"""
Standard FastAPI methods of documenting and configuring parameters between body, query and path work for auxiliary endpoints.
"""
return body * query * path
```

If you want to define example data for your auxiliary endpoints,
you can implement `docs_input_examples` method returning
mapping of Job's endpoints to corresponding exemplary inputs.
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "racetrack_job_runner"
version = "1.17.0" # should be in sync with src/racetrack_job_wrapper/__init__.py
version = "1.18.0" # should be in sync with src/racetrack_job_wrapper/__init__.py
description = "Racetrack Job Runner"
license = {text = "Apache License 2.0"}
authors = [
Expand All @@ -12,7 +12,7 @@ classifiers = [
"License :: OSI Approved :: Apache Software License",
]
readme = "README.md"
requires-python = ">=3.8"
requires-python = ">=3.10"
dynamic = ["dependencies"]

[project.urls]
Expand Down
50 changes: 50 additions & 0 deletions sample/python-auxiliary-endpoints-v2/entrypoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from http import HTTPMethod
import random
from typing import Annotated, Dict, List

from fastapi import Body, Path, Query

from racetrack_job_wrapper.endpoint_config import EndpointConfig


class Job:
def perform(self, x: float, y: float) -> float:
"""
Add numbers.
:param x: First element to add.
:param y: Second element to add.
:return: Sum of the numbers.
"""
return x + y

def auxiliary_endpoints_v2(self) -> List[EndpointConfig]:
"""
Dict of custom endpoint paths (besides "/perform") handled by Entrypoint methods.
EndpointConfig consists of a path to the endpoint, http method(POST or GET only),
handler function for the endpoint and optional parameters dict that is passed to FastAPI.
"""
return [
EndpointConfig('/multiply/{path}', HTTPMethod.POST, self.multiply, other_options=dict(tags=["items"])),
EndpointConfig('/random', HTTPMethod.GET, self.random, other_options=dict(tags=["items"])),
]

def multiply(self, body: Annotated[float, Body(examples=[1.2])], query: Annotated[float, Query(example=2.4)], path: Annotated[float, Path(example=234.21)]) -> float:
"""
Standard FastAPI methods of documenting and configuring parameters between body, query and path work for auxiliary endpoints.
"""
return body * query * path

def random(self, start: float, end: float) -> float:
"""Return random number within a range"""
return random.uniform(start, end)

def docs_input_examples(self) -> Dict[str, Dict]:
"""Return mapping of Job's endpoints to corresponding exemplary inputs."""
return {
'/perform': {
'x': 40,
'y': 2,
}
}


2 changes: 1 addition & 1 deletion src/racetrack_job_wrapper/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
name = 'racetrack_job_wrapper'
__version__ = "1.17.0" # should be in sync with pyproject.toml
__version__ = "1.18.0" # should be in sync with pyproject.toml
20 changes: 20 additions & 0 deletions src/racetrack_job_wrapper/endpoint_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dataclasses import dataclass, field
from http import HTTPMethod
from typing import Any, Callable, Dict


@dataclass
class EndpointConfig:
"""Definition of a single HTTP endpoint.

Attributes:
path: HTTP path at which the endpoint is registered.
method: HTTP verb used to bind the handler - only GET and POST are supported at moment.
handler: Callable invoked when the endpoint is hit.
other_options: Additional FastAPI route options.
"""

path: str
method: HTTPMethod
handler: Callable
other_options: Dict[str, Any] = field(default_factory=dict)
7 changes: 7 additions & 0 deletions src/racetrack_job_wrapper/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from inspect import getfullargspec
from typing import Dict, List, Optional, Tuple, Iterable, Any, Callable, Mapping, Union

from racetrack_job_wrapper.endpoint_config import EndpointConfig

WsgiApplication = Callable[
[
Mapping[str, object], # environ
Expand Down Expand Up @@ -140,6 +142,11 @@ def list_auxiliary_endpoints(entrypoint: JobEntrypoint) -> Dict[str, Callable]:
return {}
return getattr(entrypoint, 'auxiliary_endpoints')()

def list_auxiliary_endpoints_v2(entrypoint: JobEntrypoint) -> List[EndpointConfig]:
if not hasattr(entrypoint, 'auxiliary_endpoints_v2'):
return []
return getattr(entrypoint, 'auxiliary_endpoints_v2')()


def list_static_endpoints(entrypoint: JobEntrypoint) -> Dict[str, Union[Tuple, str]]:
if not hasattr(entrypoint, 'static_endpoints'):
Expand Down
82 changes: 79 additions & 3 deletions src/racetrack_job_wrapper/wrapper_api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import functools
from http import HTTPMethod
import inspect
import mimetypes
import os
Expand All @@ -6,12 +8,13 @@

import time
from pathlib import Path
from typing import Any, Callable, Dict, Tuple, Union, Optional
from typing import Annotated, Any, Callable, Dict, List, Tuple, Union, Optional
from contextvars import ContextVar

from fastapi import Body, FastAPI, APIRouter, Request, Response, HTTPException
from fastapi import Body, FastAPI, APIRouter, Query, Request, Response, HTTPException
from fastapi.responses import RedirectResponse

from racetrack_job_wrapper.endpoint_config import EndpointConfig
from racetrack_job_wrapper.profiler import MemoryProfiler
from racetrack_job_wrapper.webview import setup_webview_endpoints
from racetrack_job_wrapper.concurrency import AtomicInteger
Expand All @@ -20,6 +23,7 @@
JobEntrypoint,
list_entrypoint_parameters,
list_auxiliary_endpoints,
list_auxiliary_endpoints_v2,
list_static_endpoints,
)
from racetrack_job_wrapper.health import setup_health_endpoints, HealthState
Expand Down Expand Up @@ -127,7 +131,12 @@ def _setup_api_endpoints(
options: EndpointOptions,
):
_setup_perform_endpoint(options)
_setup_auxiliary_endpoints(options)

if hasattr(entrypoint, 'auxiliary_endpoints'):
_setup_auxiliary_endpoints(options)
else:
_setup_auxiliary_endpoints_v2(options)

_setup_static_endpoints(api, entrypoint)
if MemoryProfiler.is_enabled():
_setup_profiler_endpoints(api)
Expand Down Expand Up @@ -164,6 +173,7 @@ def _get_parameters():
def _setup_auxiliary_endpoints(options: EndpointOptions):
"""Configure custom auxiliary endpoints defined by user in an entypoint"""
auxiliary_endpoints = list_auxiliary_endpoints(options.entrypoint)

for endpoint_path in sorted(auxiliary_endpoints.keys()):

endpoint_method: Callable = auxiliary_endpoints[endpoint_path]
Expand Down Expand Up @@ -192,6 +202,72 @@ def _auxiliary_endpoint(payload: Dict[str, Any] = Body(default=example_input)) -
_add_endpoint(endpoint_path, endpoint_method)
logger.info(f'configured auxiliary endpoint: {endpoint_path}')

def _setup_auxiliary_endpoints_v2(options: EndpointOptions):
"""Configure custom auxiliary endpoints defined by user in an entypoint"""
auxiliary_endpoints: List[EndpointConfig] = list_auxiliary_endpoints_v2(options.entrypoint)

for endpoint_config in auxiliary_endpoints:
endpoint_path = endpoint_config.path
endpoint_name = endpoint_path.replace('/', '_')
if not endpoint_path.startswith('/'):
endpoint_path = '/' + endpoint_path

# keep these variables inside closure as next loop cycle will overwrite it
def _add_endpoint(_endpoint_path: str, _endpoint_handler: Callable, _endpoint_method: HTTPMethod, _other_options: Dict[str, Any]):
summary = f"Call auxiliary endpoint: {_endpoint_path}"
description = "Call auxiliary endpoint"
endpoint_docs = inspect.getdoc(_endpoint_handler)
if endpoint_docs:
description = f"Call auxiliary endpoint: {endpoint_docs}"

def forwarder(func):
@functools.wraps(func)
def forward(*args, **kwargs):
metric_requests_started.inc()
metric_endpoint_requests_started.labels(endpoint=endpoint_path).inc()
start_time = time.time()
try:
def _endpoint_caller() -> Any:
return func(*args, **kwargs)

result = options.concurrency_runner(_endpoint_caller)
return to_json_serializable(result)

except TypeError as e:
metric_request_internal_errors.labels(endpoint=endpoint_path).inc()
raise ValueError(f'failed to call a function: {e}')
except BaseException as e:
metric_request_internal_errors.labels(endpoint=endpoint_path).inc()
raise e
finally:
metric_request_duration.labels(endpoint=endpoint_path).observe(time.time() - start_time)
metric_requests_done.inc()
metric_last_call_timestamp.set(time.time())

return forward

match _endpoint_method:
case HTTPMethod.POST:
options.api.post(
_endpoint_path,
operation_id=f'auxiliary_endpoint_{endpoint_name}',
summary=summary,
description=description,
**_other_options,
)(forwarder(_endpoint_handler))
case HTTPMethod.GET:
options.api.get(
_endpoint_path,
operation_id=f'auxiliary_endpoint_{endpoint_name}',
summary=summary,
description=description,
**_other_options,
)(forwarder(_endpoint_handler))
case _:
logger.error(f"method {_endpoint_method} chosen for path {_endpoint_path} is not supported")

_add_endpoint(endpoint_path, endpoint_config.handler, endpoint_config.method, endpoint_config.other_options)
logger.info(f'configured auxiliary endpoint: {endpoint_path}')

def _call_job_endpoint(
endpoint_method: Callable,
Expand Down
78 changes: 78 additions & 0 deletions tests/wrap/test_auxiliary_endpoints_v2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from http import HTTPMethod
from typing import Annotated, Callable, Dict, List

from fastapi import Body
from racetrack_job_wrapper.endpoint_config import EndpointConfig
from racetrack_job_wrapper.wrapper_api import create_api_app
from racetrack_job_wrapper.health import HealthState
from fastapi.testclient import TestClient


def test_auxiliary_endpoints_v2():
class TestEntrypoint:
def perform(self, x: float, y: float) -> float:
"""
Add numbers.
:param x: First element to add.
:param y: Second element to add.
:return: Sum of the numbers.
"""
return x + y

def auxiliary_endpoints_v2(self) -> List[EndpointConfig]:
return [
EndpointConfig('/explain', HTTPMethod.POST, self.explain),
EndpointConfig('/random', HTTPMethod.GET, self.random),
]

def explain(self, x: Annotated[float, Body()], y: Annotated[float, Body()]) -> Dict[str, float]:
"""
Explain feature importance of a model result.
:param x: First element to add.
:param y: Second element to add.
:return: Dict of feature importance.
"""
result = self.perform(x, y)
return {'x_importance': x / result, 'y_importance': y / result}

def random(self) -> float:
"""Return random number"""
return 4 # chosen by fair dice roll

def docs_input_examples(self) -> Dict[str, Dict]:
return {
'/perform': {
'x': 40,
'y': 2,
},
'/explain': {
'x': 1,
'y': 2,
},
'/random': {},
}

entrypoint = TestEntrypoint()
fastapi_app = create_api_app(entrypoint, HealthState(live=True, ready=True))

client = TestClient(fastapi_app)

response = client.post(
"/api/v1/perform",
json={"x": 40, "y": 2},
)
assert response.status_code == 200
assert response.json() == 42

response = client.post(
"/api/v1/explain",
json={"x": 2, "y": 8},
)
assert response.status_code == 200
assert response.json() == {'x_importance': 0.2, 'y_importance': 0.8}

response = client.get(
"/api/v1/random",
)
assert response.status_code == 200
assert response.json() == 4