-
Notifications
You must be signed in to change notification settings - Fork 10
Job Management: Preserve llm_call details #823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8cc7b37
cff973b
013e841
0ce7dad
07f4fc6
486af51
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
"""make llm_call input_type, provider, model nullable

Revision ID: 058
Revises: 057
Create Date: 2026-05-11 00:00:00.000000

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "058"
down_revision = "057"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Relax NOT NULL so a pending llm_call row can be created before
    config resolution fills in input_type, provider, and model."""
    op.alter_column("llm_call", "input_type", nullable=True)
    op.alter_column("llm_call", "provider", nullable=True)
    op.alter_column("llm_call", "model", nullable=True)


def downgrade() -> None:
    """Restore NOT NULL on the relaxed columns.

    Rows created while the columns were nullable may still hold NULL
    (pending calls that were never resolved). Backfill a sentinel value
    first; otherwise ALTER ... SET NOT NULL fails on rollback.
    """
    op.execute("UPDATE llm_call SET model = 'unknown' WHERE model IS NULL")
    op.execute("UPDATE llm_call SET provider = 'unknown' WHERE provider IS NULL")
    op.execute("UPDATE llm_call SET input_type = 'unknown' WHERE input_type IS NULL")
    op.alter_column("llm_call", "model", nullable=False)
    op.alter_column("llm_call", "provider", nullable=False)
    op.alter_column("llm_call", "input_type", nullable=False)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def serialize_input(query_input: QueryInput | str) -> str: | ||
| def serialize_input(query_input: QueryInput | str | list) -> str: | ||
| """Serialize query input for database storage. | ||
|
|
||
| For text: stores the actual content value | ||
|
|
@@ -31,6 +31,15 @@ def serialize_input(query_input: QueryInput | str) -> str: | |
| elif isinstance(query_input, TextInput): | ||
| return query_input.content.value | ||
| elif isinstance(query_input, AudioInput): | ||
| if query_input.content.format == "url": | ||
| return json.dumps( | ||
| { | ||
| "type": "audio", | ||
| "format": "url", | ||
| "mime_type": query_input.content.mime_type, | ||
| "url": query_input.content.value, | ||
| } | ||
| ) | ||
| return json.dumps( | ||
| { | ||
| "type": "audio", | ||
|
|
@@ -187,11 +196,12 @@ def update_llm_call_response( | |
| db_llm_call.provider_response_id = provider_response_id | ||
|
|
||
| if content is not None: | ||
| # For audio outputs (AudioOutput model): calculate size metadata from base64 content | ||
| # AudioOutput serializes as: {"type": "audio", "content": {"format": "base64", "value": "...", "mime_type": "..."}} | ||
| # For audio outputs: calculate size only when content is still base64 (not a URI) | ||
| if content.get("type") == "audio": | ||
| audio_value = content.get("content", {}).get("value") | ||
| if audio_value: | ||
| audio_content = content.get("content", {}) | ||
| audio_format = audio_content.get("format") | ||
| audio_value = audio_content.get("value") | ||
| if audio_value and audio_format == "base64": | ||
| try: | ||
| audio_data = base64.b64decode(audio_value) | ||
| content["audio_size_bytes"] = len(audio_data) | ||
|
|
@@ -218,6 +228,27 @@ def update_llm_call_response( | |
| return db_llm_call | ||
|
|
||
|
|
||
def update_llm_call_input(
    session: Session,
    llm_call_id: UUID,
    s3_uri: str,
) -> None:
    """Overwrite llm_call.input with an S3 URI after uploading STT audio."""
    record = session.get(LlmCall, llm_call_id)
    if not record:
        # Best-effort update: missing row is logged, not raised.
        logger.warning(
            f"[update_llm_call_input] LLM call not found | llm_call_id={llm_call_id}"
        )
        return

    # Replace the stored input payload with the uploaded object's URI
    # and bump the modification timestamp.
    record.input = s3_uri
    record.updated_at = now()

    session.add(record)
    session.commit()

    logger.info(
        f"[update_llm_call_input] Updated input URI | llm_call_id={llm_call_id}"
    )
|
|
||
|
|
||
| def get_llm_call_by_id( | ||
| session: Session, | ||
| llm_call_id: UUID, | ||
|
|
@@ -248,3 +279,96 @@ def get_llm_calls_by_job_id( | |
| ) | ||
|
|
||
| return list(session.exec(statement).all()) | ||
|
|
||
|
|
||
def get_llm_call_by_job_id(session: Session, job_id: UUID) -> LlmCall | None:
    """Return the single active LlmCall for a standalone job (no chain_id).

    Returns None when no matching row exists. If more than one row
    matches (should not happen, but is not enforced by a constraint),
    the most recently created one is returned so the result is
    deterministic rather than dependent on database row order.
    """
    statement = (
        select(LlmCall)
        .where(
            LlmCall.job_id == job_id,
            LlmCall.chain_id.is_(None),
            LlmCall.deleted_at.is_(None),
        )
        # Deterministic pick when duplicates exist.
        .order_by(LlmCall.created_at.desc())
    )
    return session.exec(statement).first()
|
Comment on lines
+284
to
+291
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make Using Suggested query tightening def get_llm_call_by_job_id(session: Session, job_id: UUID) -> LlmCall | None:
"""Return the single active LlmCall for a standalone job (no chain_id)."""
statement = select(LlmCall).where(
LlmCall.job_id == job_id,
LlmCall.chain_id.is_(None),
LlmCall.deleted_at.is_(None),
+ LlmCall.provider.is_(None),
+ LlmCall.model.is_(None),
+ LlmCall.content.is_(None),
)
- return session.exec(statement).first()
+ statement = statement.order_by(LlmCall.created_at.desc())
+ return session.exec(statement).first()🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
def create_llm_call_pending(
    session: Session,
    *,
    job_id: UUID,
    project_id: int,
    organization_id: int,
    request: LLMCallRequest,
    chain_id: UUID | None = None,
) -> LlmCall:
    """Create a minimal LlmCall row at job-creation time.

    Only fields available before config resolution are populated.
    input_type, output_type, provider, model, content, usage stay NULL
    and are filled in by the Celery task via update_llm_call_resolved_fields().
    """
    # Record a pointer to the stored config, if one was referenced;
    # inline configs are persisted later, once resolved.
    stored_config: dict[str, Any] | None = None
    if request.config.is_stored_config:
        stored_config = {
            "config_id": str(request.config.id),
            "config_version": request.config.version,
        }

    # Conversation linkage is optional on the request.
    conversation = request.query.conversation
    conversation_id = conversation.id if conversation else None
    auto_create = conversation.auto_create if conversation else None

    pending_call = LlmCall(
        job_id=job_id,
        project_id=project_id,
        organization_id=organization_id,
        chain_id=chain_id,
        input=serialize_input(request.query.input),
        conversation_id=conversation_id,
        auto_create=auto_create,
        config=stored_config,
    )

    session.add(pending_call)
    session.commit()
    session.refresh(pending_call)

    logger.info(
        f"[create_llm_call_pending] Created pending LLM call id={pending_call.id}, job_id={job_id}"
    )

    return pending_call
|
|
||
|
|
||
def update_llm_call_resolved_fields(
    session: Session,
    *,
    llm_call_id: UUID,
    input_type: str,
    output_type: str | None,
    provider: str | None,
    model: str | None,
    config: dict[str, Any],
) -> LlmCall:
    """Populate config-resolved fields on a pending LlmCall row."""
    record = session.get(LlmCall, llm_call_id)
    if not record:
        raise ValueError(f"LLM call not found with id={llm_call_id}")

    # Fill in everything that required config resolution, then bump the
    # modification timestamp.
    record.input_type = input_type
    record.output_type = output_type
    record.provider = provider
    record.model = model
    record.config = config
    record.updated_at = now()

    session.add(record)
    session.commit()
    session.refresh(record)

    logger.info(
        f"[update_llm_call_resolved_fields] Updated resolved fields | llm_call_id={llm_call_id}, provider={provider}, model={model}"
    )

    return record
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Downgrade can fail once NULL rows exist.
Setting these columns back to `nullable=False` without backfilling will fail on rollback if any row has NULL in `input_type`, `provider`, or `model`.

Suggested migration hardening
🤖 Prompt for AI Agents