Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions backend/alembic/versions/add_impersonation_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# SPDX-FileCopyrightText: 2025 Weibo, Inc.
#
# SPDX-License-Identifier: Apache-2.0

"""Add impersonation tables

Revision ID: add_impersonation_tables
Revises: add_user_preferences
Create Date: 2025-12-09 00:00:00.000000

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "add_impersonation_tables"
down_revision: Union[str, None] = "add_user_preferences"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# Create impersonation_requests table
op.create_table(
"impersonation_requests",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("admin_user_id", sa.Integer(), nullable=False),
sa.Column("target_user_id", sa.Integer(), nullable=False),
sa.Column("token", sa.String(64), nullable=False),
sa.Column("status", sa.String(20), nullable=False, server_default="pending"),
sa.Column("expires_at", sa.DateTime(), nullable=False),
sa.Column("approved_at", sa.DateTime(), nullable=True),
sa.Column("session_expires_at", sa.DateTime(), nullable=True),
sa.Column(
"created_at", sa.DateTime(), nullable=True, server_default=sa.func.now()
),
sa.Column(
"updated_at",
sa.DateTime(),
nullable=True,
server_default=sa.func.now(),
onupdate=sa.func.now(),
),
sa.ForeignKeyConstraint(
["admin_user_id"],
["users.id"],
name="fk_impersonation_requests_admin_user",
),
sa.ForeignKeyConstraint(
["target_user_id"],
["users.id"],
name="fk_impersonation_requests_target_user",
),
sa.PrimaryKeyConstraint("id"),
mysql_engine="InnoDB",
mysql_charset="utf8mb4",
mysql_collate="utf8mb4_unicode_ci",
)

# Create indexes for impersonation_requests
op.create_index(
"ix_impersonation_requests_id", "impersonation_requests", ["id"], unique=False
)
op.create_index(
"ix_impersonation_requests_admin_user_id",
"impersonation_requests",
["admin_user_id"],
unique=False,
)
op.create_index(
"ix_impersonation_requests_target_user_id",
"impersonation_requests",
["target_user_id"],
unique=False,
)
op.create_index(
"ix_impersonation_requests_token",
"impersonation_requests",
["token"],
unique=True,
)

# Create impersonation_audit_logs table
op.create_table(
"impersonation_audit_logs",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("impersonation_request_id", sa.Integer(), nullable=False),
sa.Column("admin_user_id", sa.Integer(), nullable=False),
sa.Column("target_user_id", sa.Integer(), nullable=False),
sa.Column("action", sa.String(50), nullable=False),
sa.Column("method", sa.String(10), nullable=False),
sa.Column("path", sa.String(500), nullable=False),
sa.Column("request_body", sa.Text(), nullable=True),
sa.Column("ip_address", sa.String(50), nullable=True),
sa.Column("user_agent", sa.String(500), nullable=True),
sa.Column(
"created_at", sa.DateTime(), nullable=True, server_default=sa.func.now()
),
sa.ForeignKeyConstraint(
["impersonation_request_id"],
["impersonation_requests.id"],
name="fk_impersonation_audit_logs_request",
),
sa.ForeignKeyConstraint(
["admin_user_id"],
["users.id"],
name="fk_impersonation_audit_logs_admin_user",
),
sa.ForeignKeyConstraint(
["target_user_id"],
["users.id"],
name="fk_impersonation_audit_logs_target_user",
),
sa.PrimaryKeyConstraint("id"),
mysql_engine="InnoDB",
mysql_charset="utf8mb4",
mysql_collate="utf8mb4_unicode_ci",
)

# Create indexes for impersonation_audit_logs
op.create_index(
"ix_impersonation_audit_logs_id", "impersonation_audit_logs", ["id"], unique=False
)
op.create_index(
"ix_impersonation_audit_logs_request_id",
"impersonation_audit_logs",
["impersonation_request_id"],
unique=False,
)
op.create_index(
"ix_impersonation_audit_logs_admin_user_id",
"impersonation_audit_logs",
["admin_user_id"],
unique=False,
)
op.create_index(
"ix_impersonation_audit_logs_target_user_id",
"impersonation_audit_logs",
["target_user_id"],
unique=False,
)


def downgrade() -> None:
# Drop impersonation_audit_logs table and indexes
op.drop_index("ix_impersonation_audit_logs_target_user_id", "impersonation_audit_logs")
op.drop_index("ix_impersonation_audit_logs_admin_user_id", "impersonation_audit_logs")
op.drop_index("ix_impersonation_audit_logs_request_id", "impersonation_audit_logs")
op.drop_index("ix_impersonation_audit_logs_id", "impersonation_audit_logs")
op.drop_table("impersonation_audit_logs")

# Drop impersonation_requests table and indexes
op.drop_index("ix_impersonation_requests_token", "impersonation_requests")
op.drop_index("ix_impersonation_requests_target_user_id", "impersonation_requests")
op.drop_index("ix_impersonation_requests_admin_user_id", "impersonation_requests")
op.drop_index("ix_impersonation_requests_id", "impersonation_requests")
op.drop_table("impersonation_requests")
17 changes: 16 additions & 1 deletion backend/app/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@
#
# SPDX-License-Identifier: Apache-2.0

from app.api.endpoints import admin, auth, oidc, quota, repository, users
from app.api.endpoints import (
admin,
auth,
impersonate_confirm,
impersonation,
oidc,
quota,
repository,
users,
)
from app.api.endpoints.adapter import (
agents,
attachments,
Expand All @@ -22,6 +31,12 @@
api_router.include_router(oidc.router, prefix="/auth/oidc", tags=["auth", "oidc"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(admin.router, prefix="/admin", tags=["admin"])
api_router.include_router(
impersonation.router, prefix="/admin", tags=["admin", "impersonation"]
)
api_router.include_router(
impersonate_confirm.router, prefix="/impersonate", tags=["impersonation"]
)
api_router.include_router(bots.router, prefix="/bots", tags=["bots"])
api_router.include_router(models.router, prefix="/models", tags=["public-models"])
api_router.include_router(shells.router, prefix="/shells", tags=["shells"])
Expand Down
115 changes: 115 additions & 0 deletions backend/app/api/endpoints/impersonate_confirm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# SPDX-FileCopyrightText: 2025 Weibo, Inc.
#
# SPDX-License-Identifier: Apache-2.0

"""
Public API routes for impersonation confirmation (user-facing).
"""

from datetime import datetime, timezone

from fastapi import APIRouter, Depends, HTTPException, Path, status
from sqlalchemy.orm import Session

from app.api.dependencies import get_db
from app.core.security import get_current_user
from app.models.user import User
from app.schemas.impersonation import ImpersonationConfirmInfo
from app.services.impersonation_service import impersonation_service

router = APIRouter()


@router.get("/confirm/{token}", response_model=ImpersonationConfirmInfo)
async def get_impersonation_confirm_info(
token: str = Path(..., description="Impersonation request token"),
db: Session = Depends(get_db),
):
"""
Get impersonation request information for the confirmation page.

This endpoint is publicly accessible so users can view the request
before logging in to approve/reject it.
"""
request = impersonation_service.get_request_by_token(db, token)

# Calculate remaining time
now = datetime.now(timezone.utc)
if request.expires_at.tzinfo is None:
expires_at = request.expires_at.replace(tzinfo=timezone.utc)
else:
expires_at = request.expires_at

remaining_seconds = max(0, int((expires_at - now).total_seconds()))

# Check if expired
if remaining_seconds == 0 and request.status == "pending":
request.status = "expired"
db.commit()

return ImpersonationConfirmInfo(
id=request.id,
admin_user_name=request.admin_user.user_name,
target_user_name=request.target_user.user_name,
status=request.status,
expires_at=request.expires_at,
remaining_seconds=remaining_seconds,
created_at=request.created_at,
)


@router.post("/confirm/{token}/approve", response_model=ImpersonationConfirmInfo)
async def approve_impersonation_request(
token: str = Path(..., description="Impersonation request token"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Approve an impersonation request.

The target user must be logged in to approve the request.
"""
request = impersonation_service.approve_request(db, token, current_user)

# Calculate remaining time for session
now = datetime.now(timezone.utc)
if request.session_expires_at.tzinfo is None:
session_expires_at = request.session_expires_at.replace(tzinfo=timezone.utc)
else:
session_expires_at = request.session_expires_at

remaining_seconds = max(0, int((session_expires_at - now).total_seconds()))

return ImpersonationConfirmInfo(
id=request.id,
admin_user_name=request.admin_user.user_name,
target_user_name=request.target_user.user_name,
status=request.status,
expires_at=request.session_expires_at, # Use session expiry after approval
remaining_seconds=remaining_seconds,
created_at=request.created_at,
)


@router.post("/confirm/{token}/reject", response_model=ImpersonationConfirmInfo)
async def reject_impersonation_request(
token: str = Path(..., description="Impersonation request token"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Reject an impersonation request.

The target user must be logged in to reject the request.
"""
request = impersonation_service.reject_request(db, token, current_user)

return ImpersonationConfirmInfo(
id=request.id,
admin_user_name=request.admin_user.user_name,
target_user_name=request.target_user.user_name,
status=request.status,
expires_at=request.expires_at,
remaining_seconds=0,
created_at=request.created_at,
)
Loading
Loading