224 changes: 224 additions & 0 deletions backend/app/alembic/versions/058_v1_query_optimization.py
@@ -0,0 +1,224 @@
"""v1.0 query optimization: project_id + composite indexes, drop is_deleted

Revision ID: 058
Revises: 057
Create Date: 2026-05-05 12:00:00.000000

Bundles three coordinated changes for v1.0 lock:

1. Single-column `project_id` btree indexes on every table-mapped model
that filters by project_id (the dominant tenant filter).
organization_id-only access is rare and intentionally deferred.
Tables already covered by a leading-column index are skipped:
- openai_assistant: UNIQUE(project_id, assistant_id) leads with project_id
- batch_job: ix_batch_job_project_id (migration 036)

2. Composite + partial indexes for hot list/pagination paths matching:
WHERE project_id = ? [AND deleted_at IS NULL] ORDER BY <ts> DESC
Plus a small partial index `ix_evaluation_run_processing` for the
cron polling queries that filter by (type, status='processing')
without an organization_id predicate.

3. Drop the redundant `is_deleted` boolean from every table that also
carries `deleted_at`. `deleted_at IS NULL` becomes the single source
of truth for soft-delete: same query cost when paired with a partial
index, preserves audit timestamp, no dual-write drift.
Affected tables: openai_assistant, apikey, document,
openai_conversation, fine_tuning, model_evaluation.

Execution model:
Phase A (transactional): backfill deleted_at where is_deleted was true
but deleted_at was never set, then drop the is_deleted columns.
Phase B (autocommit_block): CREATE INDEX CONCURRENTLY for every index
so no AccessExclusiveLock is taken on hot tables.
"""

import sqlalchemy as sa
from alembic import op


revision = "058"
down_revision = "057"
branch_labels = None
depends_on = None


# Tables that currently carry both `is_deleted` and `deleted_at`.
IS_DELETED_TABLES = [
"openai_assistant",
"apikey",
"document",
"openai_conversation",
"fine_tuning",
"model_evaluation",
]


# Single-column FK / multi-tenant filter indexes (P0).
# (table_name, column_name, index_name)
FK_INDEXES: list[tuple[str, str, str]] = [
# project_id across tables that filter by tenant
("apikey", "project_id", "ix_apikey_project_id"),
("credential", "project_id", "ix_credential_project_id"),
("collection", "project_id", "ix_collection_project_id"),
("collection_jobs", "project_id", "ix_collection_jobs_project_id"),
("document", "project_id", "ix_document_project_id"),
("evaluation_dataset", "project_id", "ix_evaluation_dataset_project_id"),
("evaluation_run", "project_id", "ix_evaluation_run_project_id"),
("file", "project_id", "ix_file_project_id"),
("fine_tuning", "project_id", "ix_fine_tuning_project_id"),
("job", "project_id", "ix_job_project_id"),
("llm_call", "project_id", "ix_llm_call_project_id"),
("llm_chain", "project_id", "ix_llm_chain_project_id"),
("model_evaluation", "project_id", "ix_model_evaluation_project_id"),
("openai_conversation", "project_id", "ix_openai_conversation_project_id"),
("stt_result", "project_id", "ix_stt_result_project_id"),
("stt_sample", "project_id", "ix_stt_sample_project_id"),
("tts_result", "project_id", "ix_tts_result_project_id"),
("user_project", "project_id", "ix_user_project_project_id"),
# Other un-indexed FKs surfaced by the audit
("apikey", "user_id", "ix_apikey_user_id"),
("collection_jobs", "collection_id", "ix_collection_jobs_collection_id"),
(
"doc_transformation_job",
"source_document_id",
"ix_doc_transformation_job_source_document_id",
),
(
"doc_transformation_job",
"transformed_document_id",
"ix_doc_transformation_job_transformed_document_id",
),
("evaluation_run", "dataset_id", "ix_evaluation_run_dataset_id"),
]


# Composite + partial indexes (P1). (index_name, body_after_INDEX_NAME, schema)
# `schema` is the unquoted PG schema for downgrade DROP INDEX, or None for
# the default (public) schema. The upgrade body already names the schema
# inline in its ON clause; the field exists so downgrade doesn't have to
# string-sniff it back out.
COMPOSITE_INDEXES: list[tuple[str, str, str | None]] = [
(
"ix_document_project_inserted_at_active",
'ON "document" ("project_id", "inserted_at" DESC) WHERE "deleted_at" IS NULL',
None,
),
(
"ix_openai_conversation_project_inserted_at_active",
'ON "openai_conversation" ("project_id", "inserted_at" DESC) WHERE "deleted_at" IS NULL',
None,
),
(
"ix_openai_conversation_ancestor_project_inserted_at_active",
'ON "openai_conversation" ("ancestor_response_id", "project_id", "inserted_at" DESC) WHERE "deleted_at" IS NULL',
None,
),
(
"ix_openai_conversation_response_project_active",
'ON "openai_conversation" ("response_id", "project_id") WHERE "deleted_at" IS NULL',
None,
),
(
"ix_collection_jobs_project_status_inserted_at",
'ON "collection_jobs" ("project_id", "status", "inserted_at" DESC)',
None,
),
(
"ix_evaluation_run_org_project_type_inserted_at",
'ON "evaluation_run" ("organization_id", "project_id", "type", "inserted_at" DESC)',
None,
),
# Partial index for cron polling queries that filter by
# (type, status='processing') without an organization_id predicate
# (crud/evaluations/cron_utils.py and crud/evaluations/processing.py).
# The composite above leads with organization_id and does not serve
# these unscoped scans.
(
"ix_evaluation_run_processing",
'ON "evaluation_run" ("type", "batch_job_id") WHERE "status" = \'processing\'',
None,
),
(
"ix_evaluation_dataset_org_project_type_inserted_at",
'ON "evaluation_dataset" ("organization_id", "project_id", "type", "inserted_at" DESC)',
None,
),
(
"ix_model_evaluation_document_project_updated_at",
'ON "model_evaluation" ("document_id", "project_id", "updated_at" DESC) WHERE "deleted_at" IS NULL',
None,
),
(
"ix_model_config_active_provider_name",
'ON "global"."model_config" ("is_active", "provider", "model_name")',
"global",
),
(
"ix_collection_project_active",
'ON "collection" ("project_id") WHERE "deleted_at" IS NULL',
None,
),
# Composite FK indexes that match the actual query shape
(
"ix_fine_tuning_document_project",
'ON "fine_tuning" ("document_id", "project_id")',
None,
),
(
"ix_model_evaluation_fine_tuning_project",
'ON "model_evaluation" ("fine_tuning_id", "project_id")',
None,
),
# Partial index for active-key listing on apikey
(
"ix_apikey_project_active",
'ON "apikey" ("project_id") WHERE "deleted_at" IS NULL',
None,
),
]


def upgrade():
# Phase A (transactional): preserve audit timestamp, drop redundant column.
for table in IS_DELETED_TABLES:
op.execute(
f"UPDATE {table} "
f"SET deleted_at = NOW() "
f"WHERE is_deleted = TRUE AND deleted_at IS NULL"
)
op.drop_column(table, "is_deleted")

# Phase B (autocommit): CONCURRENTLY index creation. Each statement
# runs in its own implicit transaction, required by the CONCURRENTLY
# variant.
with op.get_context().autocommit_block():
for table, column, index in FK_INDEXES:
op.execute(
f'CREATE INDEX CONCURRENTLY IF NOT EXISTS "{index}" '
f'ON "{table}" ("{column}")'
)
for index, body, _schema in COMPOSITE_INDEXES:
op.execute(f'CREATE INDEX CONCURRENTLY IF NOT EXISTS "{index}" {body}')
Comment on lines +182 to +202
⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

find backend/app/alembic/versions -name "058_v1_query_optimization.py" -type f

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 133


🏁 Script executed:

wc -l backend/app/alembic/versions/058_v1_query_optimization.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 137


🏁 Script executed:

cat -n backend/app/alembic/versions/058_v1_query_optimization.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 10237


Add type hints to upgrade() and downgrade() functions, and redesign migration phases to prevent unrecoverable partial failures.

The upgrade() and downgrade() functions are missing return type annotations (-> None), violating the type hints guideline.

More critically, autocommit_block() at line 195 commits Phase A changes (UPDATE + drop_column) before Phase B index creation begins. If any CREATE INDEX CONCURRENTLY fails, the revision is not recorded as complete but the is_deleted columns are already dropped. A rerun fails on drop_column() with a missing column error, leaving the migration unrecoverable without manual intervention. Wrap Phase A operations inside conditional checks (e.g., column_exists() before drop) or consolidate transaction semantics so either all changes commit together or all roll back on any failure.
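
Since CREATE INDEX CONCURRENTLY cannot run inside a transaction block, the existence-guard route is the practical fix. A minimal sketch of the retry-safe variant, reusing the migration's existing `sa`/`op` imports; `column_exists` is a hypothetical helper, not something the diff defines:

def column_exists(table: str, column: str) -> bool:
    # Inspect the live connection, not model metadata, so a rerun after a
    # partial failure sees the real schema state.
    inspector = sa.inspect(op.get_bind())
    return any(col["name"] == column for col in inspector.get_columns(table))


def upgrade() -> None:
    # Phase A, now idempotent: a retry after a Phase B failure skips
    # tables whose is_deleted column is already gone.
    for table in IS_DELETED_TABLES:
        if column_exists(table, "is_deleted"):
            op.execute(
                f"UPDATE {table} "
                f"SET deleted_at = NOW() "
                f"WHERE is_deleted = TRUE AND deleted_at IS NULL"
            )
            op.drop_column(table, "is_deleted")
    # Phase B is unchanged: its CONCURRENTLY statements already use
    # IF NOT EXISTS, so they are rerun-safe on their own.

The same guard, inverted (skip add_column when is_deleted already exists), would make downgrade() retriable as well.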

🧰 Tools
🪛 OpenGrep (1.20.0)

[ERROR] 202-202: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `backend/app/alembic/versions/058_v1_query_optimization.py` around lines 182-202:
add explicit `-> None` return type annotations to the upgrade() and downgrade()
functions, and prevent unrecoverable partial commits by ensuring Phase A (the
UPDATE and drop_column on IS_DELETED_TABLES) cannot be committed before Phase B
index creation fails. Either run Phase A inside the same transactional boundary
as Phase B (avoid op.get_context().autocommit_block() for Phase A), or keep
autocommit for Phase B only and guard each drop with a runtime existence check
(a helper like column_exists(table, "is_deleted") before op.drop_column) so a
retry won't error on missing columns. Locate these changes around upgrade(),
autocommit_block(), IS_DELETED_TABLES, FK_INDEXES, and COMPOSITE_INDEXES, and
apply the same pattern to downgrade().



def downgrade():
with op.get_context().autocommit_block():
for index, _body, schema in COMPOSITE_INDEXES:
qualified = f'"{schema}"."{index}"' if schema else f'"{index}"'
op.execute(f"DROP INDEX CONCURRENTLY IF EXISTS {qualified}")
for _table, _column, index in FK_INDEXES:
op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{index}"')

for table in IS_DELETED_TABLES:
op.add_column(
table,
sa.Column(
"is_deleted",
sa.Boolean(),
nullable=False,
server_default=sa.text("false"),
comment="Soft delete flag",
),
)
op.execute(f"UPDATE {table} SET is_deleted = TRUE WHERE deleted_at IS NOT NULL")
98 changes: 98 additions & 0 deletions backend/app/alembic/versions/059_drop_redundant_indexes.py
@@ -0,0 +1,98 @@
"""drop redundant indexes superseded by 058 composites

Revision ID: 059
Revises: 058
Create Date: 2026-05-05 14:00:00.000000

Drops indexes that are now redundant after migration 058 added the
real composite/partial indexes that match actual query shapes:

ix_project_name
Subsumed by uq_project_name_org_id (name is leading column).
No code path queries Project.name without organization_id.

ix_credential_provider
Subsumed by uq_credential_org_project_provider. All four CRUD
paths in crud/credentials.py filter (org, project, provider) — never
provider alone.

ix_openai_conversation_previous_response_id
Zero query consumers; previous_response_id is read but never
filtered on in any WHERE clause.

ix_openai_conversation_response_id
Superseded by ix_openai_conversation_response_project_active
(project-scoped partial), which exactly matches CRUD predicates
in crud/openai_conversation.py:get_conversation_by_response_id.

ix_openai_conversation_ancestor_response_id
Superseded by
ix_openai_conversation_ancestor_project_inserted_at_active, which
matches the (ancestor_response_id, project_id) + ORDER BY shape
used in crud/openai_conversation.py:get_conversation_by_ancestor_id
and the /responses thread reconstruction path.

idx_file_type
Low cardinality (4 enum values) and the only consumer in
crud/file.py:147 always pairs file_type with (organization_id,
project_id). idx_file_org_project covers the query; an extra
in-memory filter on file_type is cheaper than a second index hit.

idx_eval_run_status_org / idx_eval_run_status_project
Both lead with low-cardinality status. Real CRUD queries lead with
(organization_id, project_id, type), now covered by
ix_evaluation_run_org_project_type_inserted_at.

Uses DROP INDEX CONCURRENTLY so no AccessExclusiveLock is taken.
Downgrade recreates the original indexes (also concurrently) so the
schema can be restored bit-for-bit if needed.
"""

from alembic import op


revision = "059"
down_revision = "058"
branch_labels = None
depends_on = None


# (index_name, recreate_sql_body)
# recreate_sql_body is "ON \"<table>\" (<columns>)" used by downgrade only.
INDEXES_TO_DROP: list[tuple[str, str]] = [
("ix_project_name", 'ON "project" ("name")'),
("ix_credential_provider", 'ON "credential" ("provider")'),
(
"ix_openai_conversation_previous_response_id",
'ON "openai_conversation" ("previous_response_id")',
),
(
"ix_openai_conversation_response_id",
'ON "openai_conversation" ("response_id")',
),
(
"ix_openai_conversation_ancestor_response_id",
'ON "openai_conversation" ("ancestor_response_id")',
),
("idx_file_type", 'ON "file" ("file_type")'),
(
"idx_eval_run_status_org",
'ON "evaluation_run" ("status", "organization_id")',
),
(
"idx_eval_run_status_project",
'ON "evaluation_run" ("status", "project_id")',
),
]


def upgrade():
with op.get_context().autocommit_block():
for index_name, _body in INDEXES_TO_DROP:
op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{index_name}"')


def downgrade():
with op.get_context().autocommit_block():
for index_name, body in INDEXES_TO_DROP:
op.execute(f'CREATE INDEX CONCURRENTLY IF NOT EXISTS "{index_name}" {body}')
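
The "zero query consumers" and "subsumed" claims in the docstring come from a code audit; a belt-and-braces complement is PostgreSQL's own usage counters. A hedged sketch against the standard pg_stat_user_indexes view; the DSN is a placeholder, and idx_scan only counts scans since the last stats reset, so sample a representative window before trusting zeros:

import sqlalchemy as sa

engine = sa.create_engine("postgresql+psycopg2://user:pass@localhost/kaapi")

# Names taken from INDEXES_TO_DROP above; extend to the full list as needed.
CANDIDATES = ["ix_project_name", "ix_credential_provider", "idx_file_type"]

with engine.connect() as conn:
    rows = conn.execute(
        sa.text(
            "SELECT indexrelname, idx_scan "
            "FROM pg_stat_user_indexes "
            "WHERE indexrelname = ANY(:names) "
            "ORDER BY idx_scan DESC"
        ),
        {"names": CANDIDATES},
    )
    for name, scans in rows:
        # A non-zero idx_scan on a "redundant" index means the query audit
        # missed a consumer; re-check before shipping the drop.
        print(f"{name}: {scans} scans")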
Comment on lines +89 to +98
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
python - <<'PY'
import ast
from pathlib import Path

path = Path("backend/app/alembic/versions/059_drop_redundant_indexes.py")
tree = ast.parse(path.read_text())

for node in tree.body:
    if isinstance(node, ast.FunctionDef):
        if node.returns is None:
            print(f"{node.name}: missing return type annotation")
PY

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 157


Add explicit return type hints to migration functions.

The upgrade() (line 89) and downgrade() (line 95) functions must declare -> None to comply with repository typing guidelines for Python files.

♻️ Suggested patch
-def upgrade():
+def upgrade() -> None:
     with op.get_context().autocommit_block():
         for index_name, _body in INDEXES_TO_DROP:
             op.execute(f'DROP INDEX CONCURRENTLY IF EXISTS "{index_name}"')
 
 
-def downgrade():
+def downgrade() -> None:
     with op.get_context().autocommit_block():
         for index_name, body in INDEXES_TO_DROP:
             op.execute(f'CREATE INDEX CONCURRENTLY IF NOT EXISTS "{index_name}" {body}')
🧰 Tools
🪛 OpenGrep (1.20.0)

[ERROR] 92-92: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)


[ERROR] 98-98: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.

(coderabbit.sql-injection.python-fstring-execute)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `backend/app/alembic/versions/059_drop_redundant_indexes.py` around lines 89-98:
the migration functions lack explicit return type hints. Update the signatures of
upgrade() and downgrade() to declare `-> None` so they comply with the repository
typing guidelines, keeping the existing bodies (including
op.get_context().autocommit_block() and the loops over INDEXES_TO_DROP) unchanged.
