diff --git a/apps/web-backend/api/models/workspace.py b/apps/web-backend/api/models/workspace.py new file mode 100644 index 000000000..b3f63ecb1 --- /dev/null +++ b/apps/web-backend/api/models/workspace.py @@ -0,0 +1,91 @@ +""" +Workspace and membership models for cloud-hosted Auto Code (team mode). + +A Workspace is the tenant boundary that owns specs, runs, and repositories. +In single-user mode one "Personal" workspace is bootstrapped; in team mode there +are many, each with role-based members (see WorkspaceUser). Roles are checked in +core/permissions.py. +""" + +from datetime import UTC, datetime + +from core.database import Base +from sqlalchemy import ( + CheckConstraint, + Column, + DateTime, + ForeignKey, + Integer, + String, + UniqueConstraint, +) +from sqlalchemy.orm import relationship + + +class Workspace(Base): + """A tenant boundary owning specs/runs/repositories, with role-based members.""" + + __tablename__ = "workspaces" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String(255), nullable=False) + # RESTRICT, not CASCADE: deleting a user must not silently destroy a whole + # workspace (and its specs/runs). Ownership transfer/deletion is explicit + # application logic (Track C). + owner_id = Column( + Integer, + ForeignKey("users.id", ondelete="RESTRICT"), + nullable=False, + index=True, + ) + + created_at = Column(DateTime, default=lambda: datetime.now(UTC), nullable=False) + updated_at = Column( + DateTime, + default=lambda: datetime.now(UTC), + onupdate=lambda: datetime.now(UTC), + nullable=False, + ) + + owner = relationship("User") + members = relationship( + "WorkspaceUser", back_populates="workspace", cascade="all, delete-orphan" + ) + + def __repr__(self) -> str: + return f"" + + +class WorkspaceUser(Base): + """Membership linking a user to a workspace with a role (owner/editor/viewer).""" + + __tablename__ = "workspace_users" + __table_args__ = ( + UniqueConstraint("workspace_id", "user_id", name="uq_workspace_user"), + # Mirror the WorkspaceRole closed set (core/permissions.py) at the DB level. + CheckConstraint( + "role IN ('owner', 'editor', 'viewer')", name="ck_workspace_user_role" + ), + ) + + id = Column(Integer, primary_key=True, index=True) + workspace_id = Column( + Integer, + ForeignKey("workspaces.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + user_id = Column( + Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True + ) + role = Column(String(20), nullable=False, default="viewer") + created_at = Column(DateTime, default=lambda: datetime.now(UTC), nullable=False) + + workspace = relationship("Workspace", back_populates="members") + user = relationship("User") + + def __repr__(self) -> str: + return ( + f"" + ) diff --git a/apps/web-backend/core/permissions.py b/apps/web-backend/core/permissions.py new file mode 100644 index 000000000..de1e24b4b --- /dev/null +++ b/apps/web-backend/core/permissions.py @@ -0,0 +1,110 @@ +""" +Workspace-scoped role checks for cloud-hosted Auto Code (team mode). + +Roles are hierarchical: owner > editor > viewer. ``check_workspace_access`` is +the pure, unit-testable core; ``require_workspace_access`` wraps it as a FastAPI +dependency for routes that carry a ``workspace_id``. In single-user mode the lone +"Personal" workspace owner satisfies every check. +""" + +from enum import Enum + +from api.models.workspace import Workspace, WorkspaceUser +from fastapi import Depends, HTTPException, status +from sqlalchemy.orm import Session + +from core.database import get_db +from core.security import require_auth + + +class WorkspaceRole(str, Enum): + OWNER = "owner" + EDITOR = "editor" + VIEWER = "viewer" + + +# Higher rank = more privilege. +_ROLE_RANK: dict[WorkspaceRole, int] = { + WorkspaceRole.VIEWER: 0, + WorkspaceRole.EDITOR: 1, + WorkspaceRole.OWNER: 2, +} + + +def role_satisfies(actual: "WorkspaceRole | str", required: "WorkspaceRole | str") -> bool: + """True when ``actual`` grants at least the privilege of ``required``.""" + try: + return _ROLE_RANK[WorkspaceRole(actual)] >= _ROLE_RANK[WorkspaceRole(required)] + except (ValueError, KeyError): + return False + + +def user_role_in_workspace( + db: Session, user_id: int, workspace_id: int +) -> WorkspaceRole | None: + """Return the user's effective role in the workspace, or None if no access. + + The workspace owner (``Workspace.owner_id``) always has owner-level access, + even without an explicit membership row — this is what makes the single-user + "Personal" workspace owner satisfy every check. Everyone else gets the role + from their ``WorkspaceUser`` membership. + """ + workspace = db.query(Workspace).filter(Workspace.id == workspace_id).first() + if workspace is not None and workspace.owner_id == user_id: + return WorkspaceRole.OWNER + + membership = ( + db.query(WorkspaceUser) + .filter( + WorkspaceUser.workspace_id == workspace_id, + WorkspaceUser.user_id == user_id, + ) + .first() + ) + if membership is None: + return None + try: + return WorkspaceRole(membership.role) + except ValueError: + return None + + +def check_workspace_access( + db: Session, + user_id: int, + workspace_id: int, + required_role: WorkspaceRole = WorkspaceRole.VIEWER, +) -> bool: + """True when the user has at least ``required_role`` (owner or membership).""" + role = user_role_in_workspace(db, user_id, workspace_id) + return role is not None and role_satisfies(role, required_role) + + +def require_workspace_access(required_role: WorkspaceRole = WorkspaceRole.VIEWER): + """FastAPI dependency factory. + + Returns a dependency that raises 403 unless the authenticated user has at + least ``required_role`` in the route's ``workspace_id`` path/query param. + On success it returns the resolved WorkspaceRole. + """ + + def dependency( + workspace_id: int, + auth: dict = Depends(require_auth), + db: Session = Depends(get_db), + ) -> WorkspaceRole: + sub = auth.get("sub") + user_id = int(sub) if sub is not None and str(sub).isdigit() else None + role = ( + user_role_in_workspace(db, user_id, workspace_id) + if user_id is not None + else None + ) + if role is None or not role_satisfies(role, required_role): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Insufficient workspace permissions", + ) + return role + + return dependency diff --git a/apps/web-backend/migrations/versions/003_create_workspaces.py b/apps/web-backend/migrations/versions/003_create_workspaces.py new file mode 100644 index 000000000..ebcf1ea53 --- /dev/null +++ b/apps/web-backend/migrations/versions/003_create_workspaces.py @@ -0,0 +1,108 @@ +"""Create workspaces and workspace_users tables + +Revision ID: 003 +Revises: 002 +Create Date: 2026-06-24 00:00:00.000000 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# Alembic requires these module-level identifiers +__all__ = [ + "revision", + "down_revision", + "branch_labels", + "depends_on", + "upgrade", + "downgrade", +] + +# revision identifiers, used by Alembic. +revision: str = "003" +down_revision: str | None = "002" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Create workspaces and workspace_users (membership + role).""" + op.create_table( + "workspaces", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("name", sa.String(length=255), nullable=False), + sa.Column("owner_id", sa.Integer(), nullable=False), + sa.Column( + "created_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.Column( + "updated_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.PrimaryKeyConstraint("id"), + # RESTRICT: deleting a user must not cascade-delete their workspaces; + # ownership transfer/deletion is handled in application logic. + sa.ForeignKeyConstraint(["owner_id"], ["users.id"], ondelete="RESTRICT"), + ) + op.create_index( + op.f("ix_workspaces_owner_id"), "workspaces", ["owner_id"], unique=False + ) + + op.create_table( + "workspace_users", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("workspace_id", sa.Integer(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column( + "role", sa.String(length=20), nullable=False, server_default="viewer" + ), + sa.Column( + "created_at", + sa.DateTime(), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint( + ["workspace_id"], ["workspaces.id"], ondelete="CASCADE" + ), + sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"), + sa.UniqueConstraint("workspace_id", "user_id", name="uq_workspace_user"), + # Mirror the WorkspaceRole closed set (core/permissions.py) at the DB level. + sa.CheckConstraint( + "role IN ('owner', 'editor', 'viewer')", name="ck_workspace_user_role" + ), + ) + op.create_index( + op.f("ix_workspace_users_workspace_id"), + "workspace_users", + ["workspace_id"], + unique=False, + ) + op.create_index( + op.f("ix_workspace_users_user_id"), + "workspace_users", + ["user_id"], + unique=False, + ) + + +def downgrade() -> None: + """Drop workspace_users and workspaces.""" + op.drop_index( + op.f("ix_workspace_users_user_id"), table_name="workspace_users" + ) + op.drop_index( + op.f("ix_workspace_users_workspace_id"), table_name="workspace_users" + ) + op.drop_table("workspace_users") + op.drop_index(op.f("ix_workspaces_owner_id"), table_name="workspaces") + op.drop_table("workspaces") diff --git a/apps/web-backend/tests/conftest.py b/apps/web-backend/tests/conftest.py index 9178ce768..885b8f806 100644 --- a/apps/web-backend/tests/conftest.py +++ b/apps/web-backend/tests/conftest.py @@ -19,6 +19,7 @@ # Import all models so Base.metadata knows about all tables # These imports register models with SQLAlchemy Base.metadata from api.models.user import User # noqa: F401 +from api.models.workspace import Workspace, WorkspaceUser # noqa: F401 # Import application components from core.database import Base, get_db @@ -29,7 +30,7 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.pool import StaticPool -__all__ = ["User", "GitRepository"] +__all__ = ["User", "GitRepository", "Workspace", "WorkspaceUser"] @pytest.fixture(scope="session", autouse=True) diff --git a/apps/web-backend/tests/test_workspaces.py b/apps/web-backend/tests/test_workspaces.py new file mode 100644 index 000000000..2c719ea89 --- /dev/null +++ b/apps/web-backend/tests/test_workspaces.py @@ -0,0 +1,136 @@ +"""Tests for workspace models and role-based access (C1). + +Relies on the shared `test_db` fixture (in-memory SQLite with all tables created). +""" + +import secrets + +import pytest +from api.models.user import User +from api.models.workspace import Workspace, WorkspaceUser +from core.permissions import ( + WorkspaceRole, + check_workspace_access, + role_satisfies, + user_role_in_workspace, +) +from sqlalchemy.exc import IntegrityError + + +def _make_user(db, email: str) -> User: + # Use a random throwaway hash (not a literal) directly: these tests exercise + # workspace logic, not auth, so we avoid the bcrypt backend entirely. + user = User(email=email, hashed_password=secrets.token_hex(16)) + db.add(user) + db.commit() + db.refresh(user) + return user + + +def test_workspace_membership_relationships(test_db): + owner = _make_user(test_db, "owner@test.com") + workspace = Workspace(name="Team A", owner_id=owner.id) + test_db.add(workspace) + test_db.commit() + test_db.refresh(workspace) + + member = _make_user(test_db, "dev@test.com") + membership = WorkspaceUser( + workspace_id=workspace.id, user_id=member.id, role="editor" + ) + test_db.add(membership) + test_db.commit() + test_db.refresh(membership) + + assert workspace.owner.email == "owner@test.com" + assert len(workspace.members) == 1 + assert workspace.members[0].user.email == "dev@test.com" + assert membership.workspace.name == "Team A" + + +def test_membership_is_unique_per_user(test_db): + owner = _make_user(test_db, "o2@test.com") + workspace = Workspace(name="WS", owner_id=owner.id) + test_db.add(workspace) + test_db.commit() + test_db.refresh(workspace) + + test_db.add( + WorkspaceUser(workspace_id=workspace.id, user_id=owner.id, role="owner") + ) + test_db.commit() + + # A second membership for the same (workspace, user) violates the unique constraint. + test_db.add( + WorkspaceUser(workspace_id=workspace.id, user_id=owner.id, role="viewer") + ) + + with pytest.raises(IntegrityError): + test_db.commit() + test_db.rollback() + + +def test_role_hierarchy(): + assert role_satisfies(WorkspaceRole.OWNER, WorkspaceRole.EDITOR) + assert role_satisfies(WorkspaceRole.EDITOR, WorkspaceRole.VIEWER) + assert role_satisfies(WorkspaceRole.VIEWER, WorkspaceRole.VIEWER) + assert not role_satisfies(WorkspaceRole.VIEWER, WorkspaceRole.EDITOR) + assert not role_satisfies(WorkspaceRole.EDITOR, WorkspaceRole.OWNER) + # Unknown roles never satisfy a requirement. + assert not role_satisfies("bogus", WorkspaceRole.VIEWER) + + +def test_check_workspace_access(test_db): + owner = _make_user(test_db, "owner3@test.com") + workspace = Workspace(name="WS3", owner_id=owner.id) + test_db.add(workspace) + test_db.commit() + test_db.refresh(workspace) + + member = _make_user(test_db, "editor@test.com") + test_db.add( + WorkspaceUser(workspace_id=workspace.id, user_id=member.id, role="editor") + ) + test_db.commit() + + # An editor satisfies viewer + editor, but not owner. + assert check_workspace_access(test_db, member.id, workspace.id, WorkspaceRole.VIEWER) + assert check_workspace_access(test_db, member.id, workspace.id, WorkspaceRole.EDITOR) + assert not check_workspace_access( + test_db, member.id, workspace.id, WorkspaceRole.OWNER + ) + + # A non-member has no role and no access. + outsider = _make_user(test_db, "outsider@test.com") + assert user_role_in_workspace(test_db, outsider.id, workspace.id) is None + assert not check_workspace_access(test_db, outsider.id, workspace.id) + + +def test_workspace_owner_has_owner_access_without_membership(test_db): + # The single-user "Personal" workspace has an owner but no membership row; + # access must still derive from Workspace.owner_id at owner level. + owner = _make_user(test_db, "soleowner@test.com") + workspace = Workspace(name="Personal", owner_id=owner.id) + test_db.add(workspace) + test_db.commit() + test_db.refresh(workspace) + + assert user_role_in_workspace(test_db, owner.id, workspace.id) == WorkspaceRole.OWNER + assert check_workspace_access(test_db, owner.id, workspace.id, WorkspaceRole.OWNER) + + +def test_invalid_role_is_rejected_by_db(test_db): + # The CHECK constraint mirrors the WorkspaceRole closed set, so roles outside + # {owner, editor, viewer} cannot be persisted. + owner = _make_user(test_db, "ck@test.com") + workspace = Workspace(name="WS-CK", owner_id=owner.id) + test_db.add(workspace) + test_db.commit() + test_db.refresh(workspace) + + test_db.add( + WorkspaceUser(workspace_id=workspace.id, user_id=owner.id, role="superadmin") + ) + with pytest.raises(IntegrityError): + test_db.commit() + test_db.rollback()