Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ OPENAI_API_KEY=""
KAAPI_GUARDRAILS_AUTH=""
KAAPI_GUARDRAILS_URL=""

OTEL_ENABLED=true
OTEL_SERVICE_NAME=kaapi-backend

SMTP_HOST=
SMTP_PORT=
SMTP_TLS=True
Expand Down
23 changes: 21 additions & 2 deletions backend/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
from opentelemetry import trace
from pydantic import ValidationError
from sqlmodel import Session

Expand Down Expand Up @@ -42,6 +43,19 @@ def get_db() -> Generator[Session, None, None]:
TokenDep = Annotated[str, Depends(reusable_oauth2)]


def _set_tenant_span_attributes(auth_context: AuthContext) -> None:
    """Annotate the current OpenTelemetry span with the caller's identity.

    Writes ``user.id`` (stringified) and, when the auth context carries
    them, ``tenant.org_id`` and ``tenant.project_id``, so that traces in
    Sentry can be filtered by user / org / project IDs.

    Does nothing when the current span is not recording.
    """
    current_span = trace.get_current_span()
    if current_span.is_recording():
        user_id = str(auth_context.user.id)
        current_span.set_attribute("user.id", user_id)

        # Organization and project are optional on the auth context; only
        # tag the span with the ones that are actually present.
        organization = auth_context.organization
        if organization:
            current_span.set_attribute("tenant.org_id", organization.id)

        project = auth_context.project
        if project:
            current_span.set_attribute("tenant.project_id", project.id)


def _authenticate_with_jwt(session: Session, token: str) -> AuthContext:
"""Validate a JWT token and return the authenticated user context."""
try:
Expand Down Expand Up @@ -147,16 +161,21 @@ def get_auth_context(
if not auth_context.project.is_active:
raise HTTPException(status_code=403, detail="Inactive Project")

_set_tenant_span_attributes(auth_context)
return auth_context

# 2. Try Authorization: Bearer <token> header
if token:
return _authenticate_with_jwt(session, token)
auth_context = _authenticate_with_jwt(session, token)
_set_tenant_span_attributes(auth_context)
return auth_context

# 3. Try access_token cookie
cookie_token = request.cookies.get("access_token")
if cookie_token:
return _authenticate_with_jwt(session, cookie_token)
auth_context = _authenticate_with_jwt(session, cookie_token)
_set_tenant_span_attributes(auth_context)
return auth_context

raise HTTPException(status_code=401, detail="Invalid Authorization format")

Expand Down
149 changes: 84 additions & 65 deletions backend/app/api/routes/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from app.api.deps import SessionDep, AuthContextDep
from app.api.permissions import Permission, require_permission
from app.core.telemetry import log_context
from app.crud import (
CollectionCrud,
CollectionJobCrud,
Expand Down Expand Up @@ -89,51 +90,60 @@ def create_collection(
current_user: AuthContextDep,
request: CreationRequest,
):
if request.callback_url:
validate_callback_url(str(request.callback_url))

if request.name:
ensure_unique_name(session, current_user.project_.id, request.name)
with log_context(
tag="collection",
system="collection",
lifecycle="api.collection.create",
action="create",
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
):
if request.callback_url:
validate_callback_url(str(request.callback_url))

if request.name:
ensure_unique_name(session, current_user.project_.id, request.name)

unique_documents = list(dict.fromkeys(request.documents))

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.CREATE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(unique_documents),
documents=[str(doc_id) for doc_id in unique_documents],
)
)

unique_documents = list(dict.fromkeys(request.documents))
# True if both model and instructions were provided in the request body
with_assistant = bool(
getattr(request, "model", None) and getattr(request, "instructions", None)
)

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.CREATE,
create_service.start_job(
db=session,
request=request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
docs_num=len(unique_documents),
documents=[str(doc_id) for doc_id in unique_documents],
organization_id=current_user.organization_.id,
with_assistant=with_assistant,
)
)

# True iff both model and instructions were provided in the request body
with_assistant = bool(
getattr(request, "model", None) and getattr(request, "instructions", None)
)

create_service.start_job(
db=session,
request=request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
with_assistant=with_assistant,
)

metadata = None
if not with_assistant:
metadata = {
"note": (
"This job will create a vector store only (no Assistant). "
"Assistant creation happens when both 'model' and 'instructions' are included."
)
}

return APIResponse.success_response(
CollectionJobImmediatePublic.model_validate(collection_job), metadata=metadata
)
metadata = None
if not with_assistant:
metadata = {
"note": (
"This job will create a vector store only (no Assistant). "
"Assistant creation happens when both 'model' and 'instructions' are included."
)
}

return APIResponse.success_response(
CollectionJobImmediatePublic.model_validate(collection_job),
metadata=metadata,
)


@router.delete(
Expand All @@ -149,37 +159,46 @@ def delete_collection(
collection_id: UUID = FastPath(description="Collection to delete"),
request: CallbackRequest | None = Body(default=None),
):
if request and request.callback_url:
validate_callback_url(str(request.callback_url))

_ = CollectionCrud(session, current_user.project_.id).read_one(collection_id)

deletion_request = DeletionRequest(
with log_context(
tag="collection",
system="collection",
lifecycle="api.collection.delete",
action="delete",
collection_id=collection_id,
callback_url=request.callback_url if request else None,
)
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
):
if request and request.callback_url:
validate_callback_url(str(request.callback_url))

collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.DELETE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
_ = CollectionCrud(session, current_user.project_.id).read_one(collection_id)

deletion_request = DeletionRequest(
collection_id=collection_id,
callback_url=request.callback_url if request else None,
)
)

delete_service.start_job(
db=session,
request=deletion_request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
)
collection_job_crud = CollectionJobCrud(session, current_user.project_.id)
collection_job = collection_job_crud.create(
CollectionJobCreate(
action_type=CollectionActionType.DELETE,
project_id=current_user.project_.id,
status=CollectionJobStatus.PENDING,
collection_id=collection_id,
)
)

delete_service.start_job(
db=session,
request=deletion_request,
collection_job_id=collection_job.id,
project_id=current_user.project_.id,
organization_id=current_user.organization_.id,
)

return APIResponse.success_response(
CollectionJobImmediatePublic.model_validate(collection_job)
)
return APIResponse.success_response(
CollectionJobImmediatePublic.model_validate(collection_job)
)


@router.get(
Expand Down
Loading