feat: external-connector adapter#1646
Conversation
WalkthroughAdds an external connector framework: import-time discovery for ChangesExternal Connector Framework
Sequence Diagram(s)sequenceDiagram
participant Client
participant ExternalConnector
participant LibraryBackend
participant ServiceBackend
participant EncryptedFileTokenStore
Client->>ExternalConnector: request (list_files, get_file_content, authenticate)
ExternalConnector->>LibraryBackend: delegate (library mode)
ExternalConnector->>ServiceBackend: delegate (service mode)
ServiceBackend->>EncryptedFileTokenStore: load/persist oauth state
EncryptedFileTokenStore-->>ServiceBackend: oauth state saved/loaded
ServiceBackend->>RemoteService: POST /v1/{connector_type}/{operation}
RemoteService-->>ServiceBackend: response (+ refreshed oauth)
ServiceBackend-->>ExternalConnector: result
ExternalConnector-->>Client: final response
🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers:
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/connectors/connection_manager.py (1)
1-20:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winFix import block ordering to unblock Ruff (I001).
CI is failing on this file’s import block formatting. Reorder/group imports and initialize
loggerafter imports.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/connectors/connection_manager.py` around lines 1 - 20, The import block in connection_manager.py must be reordered into standard library imports first, third-party next (e.g., aiofiles), then local package imports, and the logger variable should be initialized after the get_logger import; specifically, group and sort imports (json, os, uuid, datetime, pathlib, typing/dataclasses) then aiofiles and any other third-party, then from .base and the connector modules, and finally import get_logger from utils.logging_config and call logger = get_logger(__name__) after that import; ensure no logger initialization appears before the imports so Ruff I001 is resolved.
🧹 Nitpick comments (2)
src/connectors/external/service_backend.py (2)
158-166: ⚡ Quick winRedundant base64 encode/decode cycle.
The payload is encoded to base64 on line 159 and immediately decoded back on line 164. This round-trip is unnecessary.
Proposed fix
async def handle_webhook(self, payload: Dict[str, Any]) -> List[str]: - body_b64 = base64.b64encode(json.dumps(payload).encode()).decode() result = await self.process_webhook_full( method="POST", headers=payload.get("_headers", {}), query_params={}, - body_bytes=base64.b64decode(body_b64), + body_bytes=json.dumps(payload).encode(), ) return result.get("file_ids", [])🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/connectors/external/service_backend.py` around lines 158 - 166, In handle_webhook, remove the redundant base64 encode/decode cycle: instead of creating body_b64 and immediately decoding it, pass the payload bytes directly to process_webhook_full as body_bytes (e.g. json.dumps(payload).encode()), keeping the same headers and query_params; update references to body_b64 to use the direct bytes and remove the unused variable so handle_webhook and its call to process_webhook_full are simpler and do not perform unnecessary encoding/decoding.
73-85: ⚖️ Poor tradeoffDynamic
os.environlookups also violate the config guidelines.While the dynamic nature of
CLIENT_ID_ENV_VAR/CLIENT_SECRET_ENV_VARmakes this more complex, these lookups should still be routed throughconfig/settings.py. Consider adding a helper function in settings that accepts the env var name as a parameter, keepingos.environaccess centralized.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/connectors/external/service_backend.py` around lines 73 - 85, The code performs direct os.environ lookups using upstream_cls.CLIENT_ID_ENV_VAR and CLIENT_SECRET_ENV_VAR when setting self.client_id and self.client_secret; move these environment accesses into the centralized config/settings layer by adding a helper like settings.get_env_var(name) (or similar) that accepts the env var name and returns its value, then replace the direct os.environ.get(...) calls in the block that uses get_connector_cls(connector_type)/upstream_cls and assigns client_id/client_secret to call that settings helper instead; ensure you still fall back to config.get("client_id") / config.get("client_secret") before calling the settings helper.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/connectors/connection_manager.py`:
- Around line 22-34: The _external_connector_types() function currently maps
known_types() to get_connector_cls(t) without handling per-plugin failures, so
any exception in get_connector_cls will abort the whole discovery; modify
_external_connector_types() to iterate over known_types(), call
get_connector_cls(t) inside a try-except for each t, on exception log a warning
(including the type name and exception) and skip that entry, and only include
successfully retrieved classes in the returned dict; keep existing ImportError
handling for the package import and use the get_connector_cls and known_types
symbols as the targets for the change.
In `@src/connectors/external/__init__.py`:
- Around line 28-29: The code reads OPENRAG_CONNECTORS_PATH directly from
os.environ (variable candidate) which violates the config boundary; change the
lookup to import and use the centralized setting from config.settings (e.g.
import config.settings and read the connectors path constant there) and replace
all uses of candidate = os.environ.get("OPENRAG_CONNECTORS_PATH") with the
setting (preserving the existing fallback/if-not-candidate behavior). Update any
related variable names if needed (candidate → connectors_path) and ensure no
direct os.environ access remains in src/connectors/external/__init__.py.
- Around line 21-26: The try/except around import openrag_connectors is too
broad and can hide real import errors; modify the exception handling to only
swallow missing-package errors by catching ModuleNotFoundError (or catching
ImportError as e and re-raising unless e.name == "openrag_connectors"), so that
genuine import failures inside openrag_connectors bubble up while still falling
back when the package itself is not installed; update the block surrounding the
import_openrag_connectors import statement in __init__.py accordingly.
In `@src/connectors/external/encrypted_token_store.py`:
- Around line 31-32: The delete() implementation has a TOCTOU race between
os.path.exists and os.remove and can raise FileNotFoundError under concurrency;
replace the exists-then-remove pattern in the EncryptedTokenStore.delete (or
method named delete) with a single remove call wrapped to swallow only
FileNotFoundError and re-raise other exceptions (e.g., try:
os.remove(self.file_path) except FileNotFoundError: pass) so deletion is
idempotent and race-safe, avoid logging any credential data, and let other
errors surface cleanly.
In `@src/connectors/external/service_backend.py`:
- Around line 15-16: Replace the direct stdlib import of logging with get_logger
from utils.logging_config: remove "import logging", add "from
utils.logging_config import get_logger" and create a module logger (e.g., logger
= get_logger(__name__)); then update any calls that use the stdlib logging
module (e.g., logging.info/debug/error) to use the module-level logger
(logger.info/debug/error) so the file-level logging uses the centralized
get_logger behavior—look for occurrences in service_backend.py and symbols like
logger or get_logger to apply the change.
- Around line 47-56: The code reads OPENRAG_CONNECTORS_URL and
OPENRAG_CONNECTORS_TOKEN directly from os.environ to set self.service_url and
self.bearer; instead import OPENRAG_CONNECTORS_URL and OPENRAG_CONNECTORS_TOKEN
from config.settings and use those values to assign self.service_url and
self.bearer (keep the same RuntimeError checks/messages if missing) so no
os.environ access remains in service_backend.py; update the top-level imports to
pull from config.settings and remove any direct os.environ usage.
- Around line 111-126: The current code holds the _lock_for(self.connection_id)
across the entire HTTP call and parsing, blocking other requests for up to the
HTTP timeout; change the structure so the httpx.AsyncClient POST,
r.raise_for_status(), reading r.content, r.headers.get("X-Refreshed-OAuth"), and
r.headers.get("X-Connector-Document") all happen outside the lock, then acquire
async with _lock_for(self.connection_id): only to call await
self._persist_refreshed(refreshed_b64) when refreshed_b64 is present; keep the
RuntimeError for missing meta header and json.loads(base64.b64decode(meta_b64))
outside the lock as well so only _persist_refreshed is serialized for the given
connection_id.
- Around line 219-220: The synchronous wrapper get_auth_url currently uses
asyncio.get_event_loop().run_until_complete which is deprecated; replace it with
asyncio.run(self._get_auth_url()) to avoid DeprecationWarning/RuntimeError on
modern Python, or alternatively convert get_auth_url into async def
get_auth_url(self) and return await self._get_auth_url() if you can change
external callers; locate the method named get_auth_url and the coroutine
_get_auth_url to apply one of these two fixes.
In `@src/connectors/external/shim.py`:
- Around line 19-25: Replace the direct stdlib logging and os.environ access in
shim.py: remove the import logging and any use of os.environ and instead obtain
a logger via get_logger from utils.logging_config (use get_logger(__name__)) and
read configuration values from the centralized settings module (config.settings)
rather than os.environ; update the module-level logger variable and any env
reads (references around the current logger variable and the code at line ~50)
to use get_logger and settings respectively so the file follows project-wide
logging and config conventions.
- Around line 76-83: The shim's async method list_files accepts **kwargs but
doesn't forward them to the backend, which can drop pagination/filter options;
update the call in list_files to pass through the received **kwargs to
self._backend.list_files (i.e., call
self._backend.list_files(page_token=page_token, max_files=max_files, **kwargs))
so all connector options are preserved.
---
Outside diff comments:
In `@src/connectors/connection_manager.py`:
- Around line 1-20: The import block in connection_manager.py must be reordered
into standard library imports first, third-party next (e.g., aiofiles), then
local package imports, and the logger variable should be initialized after the
get_logger import; specifically, group and sort imports (json, os, uuid,
datetime, pathlib, typing/dataclasses) then aiofiles and any other third-party,
then from .base and the connector modules, and finally import get_logger from
utils.logging_config and call logger = get_logger(__name__) after that import;
ensure no logger initialization appears before the imports so Ruff I001 is
resolved.
---
Nitpick comments:
In `@src/connectors/external/service_backend.py`:
- Around line 158-166: In handle_webhook, remove the redundant base64
encode/decode cycle: instead of creating body_b64 and immediately decoding it,
pass the payload bytes directly to process_webhook_full as body_bytes (e.g.
json.dumps(payload).encode()), keeping the same headers and query_params; update
references to body_b64 to use the direct bytes and remove the unused variable so
handle_webhook and its call to process_webhook_full are simpler and do not
perform unnecessary encoding/decoding.
- Around line 73-85: The code performs direct os.environ lookups using
upstream_cls.CLIENT_ID_ENV_VAR and CLIENT_SECRET_ENV_VAR when setting
self.client_id and self.client_secret; move these environment accesses into the
centralized config/settings layer by adding a helper like
settings.get_env_var(name) (or similar) that accepts the env var name and
returns its value, then replace the direct os.environ.get(...) calls in the
block that uses get_connector_cls(connector_type)/upstream_cls and assigns
client_id/client_secret to call that settings helper instead; ensure you still
fall back to config.get("client_id") / config.get("client_secret") before
calling the settings helper.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 19e54f1c-f620-4cb4-9c9a-411c77dbc8bf
📒 Files selected for processing (7)
pyproject.tomlsrc/connectors/connection_manager.pysrc/connectors/external/__init__.pysrc/connectors/external/encrypted_token_store.pysrc/connectors/external/library_backend.pysrc/connectors/external/service_backend.pysrc/connectors/external/shim.py
| def _external_connector_types() -> Dict[str, type]: | ||
| """Connector types discovered via the openrag_connectors registry. | ||
|
|
||
| Returns a mapping of connector_type -> upstream class. Empty if the | ||
| package is not installed (allows OpenRAG to run with only built-ins). | ||
| """ | ||
| try: | ||
| from openrag_connectors import get as get_connector_cls | ||
| from openrag_connectors import known_types | ||
| except ImportError: | ||
| return {} | ||
| return {t: get_connector_cls(t) for t in known_types()} | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat -n src/connectors/connection_manager.py | head -50Repository: langflow-ai/openrag
Length of output: 1994
🏁 Script executed:
cat -n src/connectors/connection_manager.py | sed -n '495,515p'Repository: langflow-ai/openrag
Length of output: 1240
🏁 Script executed:
cat -n src/connectors/connection_manager.py | sed -n '560,570p'Repository: langflow-ai/openrag
Length of output: 801
Harden external connector discovery against per-plugin registry failures.
If get_connector_cls(t) raises for one registered type, _external_connector_types() fails entirely, breaking both availability listing (lines 500–509) and external connector instantiation (lines 563–564).
Wrap individual connector lookups in try-except and log warnings for failures, allowing partial discovery:
Suggested patch
def _external_connector_types() -> Dict[str, type]:
"""Connector types discovered via the openrag_connectors registry.
Returns a mapping of connector_type -> upstream class. Empty if the
package is not installed (allows OpenRAG to run with only built-ins).
"""
try:
from openrag_connectors import get as get_connector_cls
from openrag_connectors import known_types
except ImportError:
return {}
- return {t: get_connector_cls(t) for t in known_types()}
+ discovered: Dict[str, type] = {}
+ for t in known_types():
+ try:
+ discovered[t] = get_connector_cls(t)
+ except Exception as e:
+ logger.warning("Skipping external connector type %s: %s", t, e)
+ return discovered🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/connection_manager.py` around lines 22 - 34, The
_external_connector_types() function currently maps known_types() to
get_connector_cls(t) without handling per-plugin failures, so any exception in
get_connector_cls will abort the whole discovery; modify
_external_connector_types() to iterate over known_types(), call
get_connector_cls(t) inside a try-except for each t, on exception log a warning
(including the type name and exception) and skip that entry, and only include
successfully retrieved classes in the returned dict; keep existing ImportError
handling for the package import and use the get_connector_cls and known_types
symbols as the targets for the change.
| try: | ||
| import openrag_connectors # noqa: F401 | ||
|
|
||
| return | ||
| except ImportError: | ||
| pass |
There was a problem hiding this comment.
Narrow import fallback to only “package not found” cases.
Catching broad ImportError here can hide real import failures inside openrag_connectors (e.g., missing transitive dependency) and silently switch to fallback path logic.
Proposed fix
def _ensure_package_importable() -> None:
try:
import openrag_connectors # noqa: F401
return
- except ImportError:
- pass
+ except ModuleNotFoundError as exc:
+ if exc.name != "openrag_connectors":
+ raise🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/__init__.py` around lines 21 - 26, The try/except
around import openrag_connectors is too broad and can hide real import errors;
modify the exception handling to only swallow missing-package errors by catching
ModuleNotFoundError (or catching ImportError as e and re-raising unless e.name
== "openrag_connectors"), so that genuine import failures inside
openrag_connectors bubble up while still falling back when the package itself is
not installed; update the block surrounding the import_openrag_connectors import
statement in __init__.py accordingly.
| candidate = os.environ.get("OPENRAG_CONNECTORS_PATH") | ||
| if not candidate: |
There was a problem hiding this comment.
Move connector-path config lookup into centralized settings.
Line 28 reads os.environ directly, which bypasses the project’s config boundary. Pull this value from config/settings.py and consume that setting here instead.
As per coding guidelines, Config values must come from config/settings.py (the only place os.environ is read); never access os.environ elsewhere in the codebase.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/__init__.py` around lines 28 - 29, The code reads
OPENRAG_CONNECTORS_PATH directly from os.environ (variable candidate) which
violates the config boundary; change the lookup to import and use the
centralized setting from config.settings (e.g. import config.settings and read
the connectors path constant there) and replace all uses of candidate =
os.environ.get("OPENRAG_CONNECTORS_PATH") with the setting (preserving the
existing fallback/if-not-candidate behavior). Update any related variable names
if needed (candidate → connectors_path) and ensure no direct os.environ access
remains in src/connectors/external/__init__.py.
| if os.path.exists(self.file_path): | ||
| os.remove(self.file_path) |
There was a problem hiding this comment.
Make delete() race-safe and idempotent.
Line 31 + Line 32 has a TOCTOU window (exists then remove) that can still raise FileNotFoundError under concurrent deletion.
Proposed fix
+from contextlib import suppress
+
async def delete(self) -> None:
- if os.path.exists(self.file_path):
- os.remove(self.file_path)
+ with suppress(FileNotFoundError):
+ os.remove(self.file_path)As per coding guidelines, Verify credentials are never logged, errors surface cleanly, and pagination is handled correctly.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/encrypted_token_store.py` around lines 31 - 32, The
delete() implementation has a TOCTOU race between os.path.exists and os.remove
and can raise FileNotFoundError under concurrency; replace the
exists-then-remove pattern in the EncryptedTokenStore.delete (or method named
delete) with a single remove call wrapped to swallow only FileNotFoundError and
re-raise other exceptions (e.g., try: os.remove(self.file_path) except
FileNotFoundError: pass) so deletion is idempotent and race-safe, avoid logging
any credential data, and let other errors surface cleanly.
| import logging | ||
| import os |
There was a problem hiding this comment.
Use get_logger from utils.logging_config instead of stdlib logging.
The coding guidelines require using get_logger from utils.logging_config rather than importing the stdlib logging module directly.
Proposed fix
-import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import httpx
from ..base import ConnectorDocument, DocumentACL
from .encrypted_token_store import EncryptedFileTokenStore
+from utils.logging_config import get_logger
-logger = logging.getLogger(__name__)
+logger = get_logger(__name__)As per coding guidelines: "src/**/*.py: Use get_logger from utils.logging_config — never import stdlib logging directly."
Also applies to: 26-26
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/service_backend.py` around lines 15 - 16, Replace the
direct stdlib import of logging with get_logger from utils.logging_config:
remove "import logging", add "from utils.logging_config import get_logger" and
create a module logger (e.g., logger = get_logger(__name__)); then update any
calls that use the stdlib logging module (e.g., logging.info/debug/error) to use
the module-level logger (logger.info/debug/error) so the file-level logging uses
the centralized get_logger behavior—look for occurrences in service_backend.py
and symbols like logger or get_logger to apply the change.
| self.service_url = os.environ.get("OPENRAG_CONNECTORS_URL") | ||
| self.bearer = os.environ.get("OPENRAG_CONNECTORS_TOKEN") | ||
| if not self.service_url: | ||
| raise RuntimeError( | ||
| "Service mode requires OPENRAG_CONNECTORS_URL (e.g. http://connectors-service:8000)" | ||
| ) | ||
| if not self.bearer: | ||
| raise RuntimeError( | ||
| "Service mode requires OPENRAG_CONNECTORS_TOKEN — shared bearer secret" | ||
| ) |
There was a problem hiding this comment.
Access config values through config/settings.py, not os.environ directly.
The coding guidelines prohibit direct os.environ access outside of config/settings.py. Move OPENRAG_CONNECTORS_URL and OPENRAG_CONNECTORS_TOKEN to the settings module and import them from there.
Suggested approach
In config/settings.py, add:
OPENRAG_CONNECTORS_URL = os.environ.get("OPENRAG_CONNECTORS_URL")
OPENRAG_CONNECTORS_TOKEN = os.environ.get("OPENRAG_CONNECTORS_TOKEN")Then in this file:
-import os
...
+from config.settings import OPENRAG_CONNECTORS_URL, OPENRAG_CONNECTORS_TOKEN
- self.service_url = os.environ.get("OPENRAG_CONNECTORS_URL")
- self.bearer = os.environ.get("OPENRAG_CONNECTORS_TOKEN")
+ self.service_url = OPENRAG_CONNECTORS_URL
+ self.bearer = OPENRAG_CONNECTORS_TOKENAs per coding guidelines: "src/**/*.py: Config values must come from config/settings.py (the only place os.environ is read); never access os.environ elsewhere in the codebase."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/service_backend.py` around lines 47 - 56, The code
reads OPENRAG_CONNECTORS_URL and OPENRAG_CONNECTORS_TOKEN directly from
os.environ to set self.service_url and self.bearer; instead import
OPENRAG_CONNECTORS_URL and OPENRAG_CONNECTORS_TOKEN from config.settings and use
those values to assign self.service_url and self.bearer (keep the same
RuntimeError checks/messages if missing) so no os.environ access remains in
service_backend.py; update the top-level imports to pull from config.settings
and remove any direct os.environ usage.
| async with _lock_for(self.connection_id): | ||
| async with httpx.AsyncClient(timeout=300) as http: | ||
| r = await http.post( | ||
| url, json=body, headers={"Authorization": f"Bearer {self.bearer}"} | ||
| ) | ||
| r.raise_for_status() | ||
| content = r.content | ||
|
|
||
| refreshed_b64 = r.headers.get("X-Refreshed-OAuth") | ||
| if refreshed_b64: | ||
| await self._persist_refreshed(refreshed_b64) | ||
|
|
||
| meta_b64 = r.headers.get("X-Connector-Document") | ||
| if not meta_b64: | ||
| raise RuntimeError("Service response missing X-Connector-Document header") | ||
| meta = json.loads(base64.b64decode(meta_b64)) |
There was a problem hiding this comment.
Lock held too long—blocks concurrent requests for up to 300s.
The lock guards the entire HTTP round-trip (300s timeout) plus response parsing, but only _persist_refreshed needs serialization. Holding it this long blocks all other calls for the same connection_id. Narrow the critical section to just the persist call.
Suggested restructure
async def get_file_content(self, file_id: str) -> ConnectorDocument:
body = await self._build_request_body({"file_id": file_id})
url = f"{self.service_url}/v1/{self.connector_type}/get_file_content"
- async with _lock_for(self.connection_id):
- async with httpx.AsyncClient(timeout=300) as http:
- r = await http.post(
- url, json=body, headers={"Authorization": f"Bearer {self.bearer}"}
- )
- r.raise_for_status()
- content = r.content
-
- refreshed_b64 = r.headers.get("X-Refreshed-OAuth")
- if refreshed_b64:
- await self._persist_refreshed(refreshed_b64)
+ async with httpx.AsyncClient(timeout=300) as http:
+ r = await http.post(
+ url, json=body, headers={"Authorization": f"Bearer {self.bearer}"}
+ )
+ r.raise_for_status()
+ content = r.content
+
+ refreshed_b64 = r.headers.get("X-Refreshed-OAuth")
+ if refreshed_b64:
+ async with _lock_for(self.connection_id):
+ await self._persist_refreshed(refreshed_b64)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async with _lock_for(self.connection_id): | |
| async with httpx.AsyncClient(timeout=300) as http: | |
| r = await http.post( | |
| url, json=body, headers={"Authorization": f"Bearer {self.bearer}"} | |
| ) | |
| r.raise_for_status() | |
| content = r.content | |
| refreshed_b64 = r.headers.get("X-Refreshed-OAuth") | |
| if refreshed_b64: | |
| await self._persist_refreshed(refreshed_b64) | |
| meta_b64 = r.headers.get("X-Connector-Document") | |
| if not meta_b64: | |
| raise RuntimeError("Service response missing X-Connector-Document header") | |
| meta = json.loads(base64.b64decode(meta_b64)) | |
| async with httpx.AsyncClient(timeout=300) as http: | |
| r = await http.post( | |
| url, json=body, headers={"Authorization": f"Bearer {self.bearer}"} | |
| ) | |
| r.raise_for_status() | |
| content = r.content | |
| refreshed_b64 = r.headers.get("X-Refreshed-OAuth") | |
| if refreshed_b64: | |
| async with _lock_for(self.connection_id): | |
| await self._persist_refreshed(refreshed_b64) | |
| meta_b64 = r.headers.get("X-Connector-Document") | |
| if not meta_b64: | |
| raise RuntimeError("Service response missing X-Connector-Document header") | |
| meta = json.loads(base64.b64decode(meta_b64)) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/service_backend.py` around lines 111 - 126, The
current code holds the _lock_for(self.connection_id) across the entire HTTP call
and parsing, blocking other requests for up to the HTTP timeout; change the
structure so the httpx.AsyncClient POST, r.raise_for_status(), reading
r.content, r.headers.get("X-Refreshed-OAuth"), and
r.headers.get("X-Connector-Document") all happen outside the lock, then acquire
async with _lock_for(self.connection_id): only to call await
self._persist_refreshed(refreshed_b64) when refreshed_b64 is present; keep the
RuntimeError for missing meta header and json.loads(base64.b64decode(meta_b64))
outside the lock as well so only _persist_refreshed is serialized for the given
connection_id.
| def get_auth_url(self) -> str: | ||
| return asyncio.get_event_loop().run_until_complete(self._get_auth_url()) |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
Python asyncio get_event_loop deprecation 3.10+
💡 Result:
The function asyncio.get_event_loop() has undergone significant changes since Python 3.10, moving toward deprecation and the removal of implicit event loop creation [1][2][3]. Current Status and Evolution: - Python 3.10–3.11: asyncio.get_event_loop() began emitting a DeprecationWarning in cases where no running event loop existed [1][4]. - Python 3.14: The behavior of get_event_loop() was changed to raise a RuntimeError if no current event loop is set [5][6]. - Broader Context: The entire asyncio policy system (including get_event_loop_policy()) is deprecated as of Python 3.14 and is scheduled for removal in Python 3.16 [5]. Recommended Alternatives: - General Use: For most applications, developers should use asyncio.run(), which manages the lifecycle of the event loop automatically (creating and closing it) [1][6]. - Inside Coroutines/Callbacks: Use asyncio.get_running_loop(), which is the preferred, stable, and non-deprecated way to access the currently running event loop [1][4][6]. - Managing Loop Creation: If you must manually create a loop (e.g., for library-level tasks or specific integration needs), use asyncio.new_event_loop() [7]. While there has been significant discussion in the Python community regarding the removal of implicit loop creation—specifically its impact on "batteries-included" convenience—the official guidance remains to standardize on asyncio.run() and asyncio.get_running_loop() [2][3][8]. Note that asyncio.set_event_loop() has been retained as a necessary mechanism for specific compatibility constraints (such as those required by frameworks like Tornado) [8].
Citations:
- 1: https://docs.python.org/3.10/library/asyncio-eventloop.html
- 2: Removal of implicit event loop creation in
asyncio.get_event_loop()breaks batteries-included promise python/cpython#149160 - 3: Finish deprecation in asyncio.get_event_loop() python/cpython#93453
- 4: https://docs.python.org/3.11/library/asyncio-eventloop.html
- 5: https://docs.python.org/3/library/asyncio-policy.html
- 6: https://docs.python.org/3/library/asyncio-eventloop.html
- 7: https://stackoverflow.com/questions/78577538/replacement-for-deprecated-asyncio-get-event-loop
- 8: asyncio: Re-reverse deprecation of set_event_loop? python/cpython#130322
🏁 Script executed:
# First, check Python version requirements
find . -maxdepth 2 -type f \( -name "setup.py" -o -name "pyproject.toml" -o -name ".python-version" -o -name "setup.cfg" \) | head -20Repository: langflow-ai/openrag
Length of output: 98
🏁 Script executed:
# Examine the file at lines 219-220 and surrounding context
head -250 src/connectors/external/service_backend.py | tail -50Repository: langflow-ai/openrag
Length of output: 2339
🏁 Script executed:
# Search for the get_auth_url function definition and usage
rg -n "get_auth_url" src/connectors/external/service_backend.py -A 3 -B 3Repository: langflow-ai/openrag
Length of output: 591
🏁 Script executed:
# Search for calls to get_auth_url in the codebase
rg -n "\.get_auth_url\(\)" --type pyRepository: langflow-ai/openrag
Length of output: 235
🏁 Script executed:
# Check if there's an async context (check for other async functions in this file)
rg -n "async def" src/connectors/external/service_backend.pyRepository: langflow-ai/openrag
Length of output: 1004
🏁 Script executed:
cat .python-versionRepository: langflow-ai/openrag
Length of output: 68
🏁 Script executed:
# Check Python version requirement in pyproject.toml
head -50 pyproject.toml | grep -A 10 "python"Repository: langflow-ai/openrag
Length of output: 1143
🏁 Script executed:
# Examine the shim.py caller context
head -120 src/connectors/external/shim.py | tail -30Repository: langflow-ai/openrag
Length of output: 1282
🏁 Script executed:
# Examine library_backend.py caller context
head -80 src/connectors/external/library_backend.py | tail -20Repository: langflow-ai/openrag
Length of output: 856
Fix deprecated asyncio.get_event_loop() pattern before Python 3.14 support.
asyncio.get_event_loop().run_until_complete() is deprecated in Python 3.10+ and will raise RuntimeError in Python 3.14+ when no event loop is set. The project requires Python 3.13+, so this is already emitting DeprecationWarning. Replace with asyncio.run() or convert get_auth_url() to async to align with current Python standards.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/service_backend.py` around lines 219 - 220, The
synchronous wrapper get_auth_url currently uses
asyncio.get_event_loop().run_until_complete which is deprecated; replace it with
asyncio.run(self._get_auth_url()) to avoid DeprecationWarning/RuntimeError on
modern Python, or alternatively convert get_auth_url into async def
get_auth_url(self) and return await self._get_auth_url() if you can change
external callers; locate the method named get_auth_url and the coroutine
_get_auth_url to apply one of these two fixes.
| import logging | ||
| import os | ||
| from typing import Any, Dict, List, Optional | ||
|
|
||
| from ..base import BaseConnector, ConnectorDocument | ||
|
|
||
| logger = logging.getLogger(__name__) |
There was a problem hiding this comment.
Replace direct logging/os.environ usage with project logging and settings access.
Line 19/25 and Line 50 introduce new direct stdlib logging and env reads, which violate project-wide config/logging rules.
Suggested patch
-import logging
-import os
+from config.settings import settings
+from utils.logging_config import get_logger
from typing import Any, Dict, List, Optional
@@
-logger = logging.getLogger(__name__)
+logger = get_logger(__name__)
@@
- mode = os.environ.get("OPENRAG_EXTERNAL_CONNECTORS_MODE", "library").lower()
+ mode = settings.OPENRAG_EXTERNAL_CONNECTORS_MODE.lower()As per coding guidelines, src/**/*.py: "Use get_logger from utils.logging_config — never import stdlib logging directly. Config values must come from config/settings.py (the only place os.environ is read); never access os.environ elsewhere in the codebase."
Also applies to: 50-50
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/shim.py` around lines 19 - 25, Replace the direct
stdlib logging and os.environ access in shim.py: remove the import logging and
any use of os.environ and instead obtain a logger via get_logger from
utils.logging_config (use get_logger(__name__)) and read configuration values
from the centralized settings module (config.settings) rather than os.environ;
update the module-level logger variable and any env reads (references around the
current logger variable and the code at line ~50) to use get_logger and settings
respectively so the file follows project-wide logging and config conventions.
| async def list_files( | ||
| self, | ||
| page_token: Optional[str] = None, | ||
| max_files: Optional[int] = None, | ||
| **kwargs, | ||
| ) -> Dict[str, Any]: | ||
| return await self._backend.list_files(page_token=page_token, max_files=max_files) | ||
|
|
There was a problem hiding this comment.
Forward **kwargs to backend list_files to preserve connector options.
Line 80 accepts **kwargs, but Line 82 drops them. This can break external connector pagination/filter arguments silently.
Suggested patch
- return await self._backend.list_files(page_token=page_token, max_files=max_files)
+ return await self._backend.list_files(
+ page_token=page_token,
+ max_files=max_files,
+ **kwargs,
+ )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/external/shim.py` around lines 76 - 83, The shim's async
method list_files accepts **kwargs but doesn't forward them to the backend,
which can drop pagination/filter options; update the call in list_files to pass
through the received **kwargs to self._backend.list_files (i.e., call
self._backend.list_files(page_token=page_token, max_files=max_files, **kwargs))
so all connector options are preserved.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/connectors/external/service_backend.py (1)
73-85:⚠️ Potential issue | 🟠 Major | ⚡ Quick winMove connector credential env lookups behind
config.settings.
self.client_idandself.client_secretstill callos.environ.get(...)directly based on the upstream connector's env var names. That bypasses the repo's config boundary the same way as the service URL/token reads do; please expose this lookup viaconfig/settings.py(or a helper there) and read it from here instead.As per coding guidelines,
src/**/*.py:Config values must come from config/settings.py (the only place os.environ is read); never access os.environ elsewhere in the codebase.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/connectors/external/service_backend.py` around lines 73 - 85, The code in ServiceBackend that sets self.client_id and self.client_secret uses os.environ.get(...) via upstream_cls.CLIENT_ID_ENV_VAR / CLIENT_SECRET_ENV_VAR; move those environment lookups into the repository config boundary by adding or using a helper in config/settings.py (e.g., a getter like get_setting_from_env or lookup_env_var) and replace the direct os.environ access in the ServiceBackend initialization so it reads the values from config/settings.py (use the same upstream_cls.CLIENT_ID_ENV_VAR and CLIENT_SECRET_ENV_VAR identifiers to pass into the settings helper and fall back to config.get("client_id")/config.get("client_secret") as before).src/connectors/connection_manager.py (2)
565-566:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winPass
connection_idinto external connector construction.
ExternalConnectoronly receivesconfig.confighere. Both external backends derive their token-store path and per-connection lock fromconfig["connection_id"]; without it they fall back to"default", so separate external connections can overwrite each other's OAuth state and share the same token file.Suggested patch
elif config.connector_type in _external_connector_types(): - return ExternalConnector(config.connector_type, config.config) + external_config = { + **config.config, + "connection_id": config.connection_id, + "user_id": config.user_id, + } + return ExternalConnector(config.connector_type, external_config)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/connectors/connection_manager.py` around lines 565 - 566, The ExternalConnector is being constructed with only (config.connector_type, config.config) so external backends lack the per-connection identifier; change the call in connection_manager (the branch with _external_connector_types()) to pass the connection_id as well (e.g. ExternalConnector(config.connector_type, config.config, config.config["connection_id"] or config.config.get("connection_id"))), and if needed update the ExternalConnector constructor signature and all callers to accept and propagate that connection_id so token-store paths and locks use the per-connection id instead of defaulting to "default".
495-499:⚠️ Potential issue | 🟠 Major | ⚡ Quick winUse the S3 availability check for
aws_s3.Lines 495-499 currently gate the S3 connector on
IBM_AUTH_ENABLED, so S3 will be marked unavailable even when AWS credentials or a saved S3 connection exist. This should go through_is_connector_available("aws_s3", user_id)instead of the IBM flag.Suggested patch
"aws_s3": { "name": S3Connector.CONNECTOR_NAME, "description": S3Connector.CONNECTOR_DESCRIPTION, "icon": S3Connector.CONNECTOR_ICON, - "available": os.environ.get("IBM_AUTH_ENABLED", "").lower() in ("1", "true", "yes"), + "available": self._is_connector_available("aws_s3", user_id), },As per coding guidelines,
src/**/*.py:Config values must come from config/settings.py (the only place os.environ is read); never access os.environ elsewhere in the codebase.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/connectors/connection_manager.py` around lines 495 - 499, The connector entry incorrectly checks os.environ.get("IBM_AUTH_ENABLED") to set the "available" flag for "aws_s3"; replace that logic to call the connection manager helper _is_connector_available("aws_s3", user_id) (passing the current user_id) and remove any direct os.environ access here; keep the rest of the dictionary using S3Connector.CONNECTOR_NAME, S3Connector.CONNECTOR_DESCRIPTION, and S3Connector.CONNECTOR_ICON so availability is determined by the central availability function and config is read only from settings.py/_is_connector_available.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/connectors/connection_manager.py`:
- Around line 4-21: Move the logger initialization (logger =
get_logger(__name__)) so it comes after all imports to avoid splitting the
import block (fixes E402); also update the typing imports by removing Dict and
List (keep Any and Optional only if actually used) and replace any Dict/List
annotations in this module with built-in dict and list or use PEP 604 union
syntax where applicable; key symbols to change: the logger variable assignment
and the typing import line that currently reads "from typing import Any, Dict,
List, Optional".
---
Outside diff comments:
In `@src/connectors/connection_manager.py`:
- Around line 565-566: The ExternalConnector is being constructed with only
(config.connector_type, config.config) so external backends lack the
per-connection identifier; change the call in connection_manager (the branch
with _external_connector_types()) to pass the connection_id as well (e.g.
ExternalConnector(config.connector_type, config.config,
config.config["connection_id"] or config.config.get("connection_id"))), and if
needed update the ExternalConnector constructor signature and all callers to
accept and propagate that connection_id so token-store paths and locks use the
per-connection id instead of defaulting to "default".
- Around line 495-499: The connector entry incorrectly checks
os.environ.get("IBM_AUTH_ENABLED") to set the "available" flag for "aws_s3";
replace that logic to call the connection manager helper
_is_connector_available("aws_s3", user_id) (passing the current user_id) and
remove any direct os.environ access here; keep the rest of the dictionary using
S3Connector.CONNECTOR_NAME, S3Connector.CONNECTOR_DESCRIPTION, and
S3Connector.CONNECTOR_ICON so availability is determined by the central
availability function and config is read only from
settings.py/_is_connector_available.
In `@src/connectors/external/service_backend.py`:
- Around line 73-85: The code in ServiceBackend that sets self.client_id and
self.client_secret uses os.environ.get(...) via upstream_cls.CLIENT_ID_ENV_VAR /
CLIENT_SECRET_ENV_VAR; move those environment lookups into the repository config
boundary by adding or using a helper in config/settings.py (e.g., a getter like
get_setting_from_env or lookup_env_var) and replace the direct os.environ access
in the ServiceBackend initialization so it reads the values from
config/settings.py (use the same upstream_cls.CLIENT_ID_ENV_VAR and
CLIENT_SECRET_ENV_VAR identifiers to pass into the settings helper and fall back
to config.get("client_id")/config.get("client_secret") as before).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a9288335-8f8f-4ca3-b0c3-139c3b377d20
📒 Files selected for processing (5)
src/connectors/connection_manager.pysrc/connectors/external/encrypted_token_store.pysrc/connectors/external/library_backend.pysrc/connectors/external/service_backend.pysrc/connectors/external/shim.py
| from dataclasses import asdict, dataclass | ||
| from datetime import datetime | ||
| from dataclasses import dataclass, asdict | ||
| from pathlib import Path | ||
| from typing import Any, Dict, List, Optional | ||
|
|
||
| import aiofiles | ||
|
|
||
| from utils.logging_config import get_logger | ||
|
|
||
| logger = get_logger(__name__) | ||
|
|
||
| from .aws_s3 import S3Connector | ||
| from .base import BaseConnector | ||
| from .external import ExternalConnector # generic adapter for openrag_connectors types | ||
| from .google_drive import GoogleDriveConnector | ||
| from .sharepoint import SharePointConnector | ||
| from .onedrive import OneDriveConnector | ||
| from .ibm_cos import IBMCOSConnector | ||
| from .aws_s3 import S3Connector | ||
| from .onedrive import OneDriveConnector | ||
| from .sharepoint import SharePointConnector |
There was a problem hiding this comment.
Fix the import block so Ruff passes.
logger = get_logger(__name__) currently splits the module imports, so Ruff flags Lines 15-21 with E402, and Dict/List/Optional are now stale after the PEP 604 updates.
Suggested patch
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
-from typing import Any, Dict, List, Optional
+from typing import Any
import aiofiles
from utils.logging_config import get_logger
-
-logger = get_logger(__name__)
-
from .aws_s3 import S3Connector
from .base import BaseConnector
from .external import ExternalConnector # generic adapter for openrag_connectors types
from .google_drive import GoogleDriveConnector
from .ibm_cos import IBMCOSConnector
from .onedrive import OneDriveConnector
from .sharepoint import SharePointConnector
+
+logger = get_logger(__name__)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| from dataclasses import asdict, dataclass | |
| from datetime import datetime | |
| from dataclasses import dataclass, asdict | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| import aiofiles | |
| from utils.logging_config import get_logger | |
| logger = get_logger(__name__) | |
| from .aws_s3 import S3Connector | |
| from .base import BaseConnector | |
| from .external import ExternalConnector # generic adapter for openrag_connectors types | |
| from .google_drive import GoogleDriveConnector | |
| from .sharepoint import SharePointConnector | |
| from .onedrive import OneDriveConnector | |
| from .ibm_cos import IBMCOSConnector | |
| from .aws_s3 import S3Connector | |
| from .onedrive import OneDriveConnector | |
| from .sharepoint import SharePointConnector | |
| from dataclasses import asdict, dataclass | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any | |
| import aiofiles | |
| from utils.logging_config import get_logger | |
| from .aws_s3 import S3Connector | |
| from .base import BaseConnector | |
| from .external import ExternalConnector # generic adapter for openrag_connectors types | |
| from .google_drive import GoogleDriveConnector | |
| from .ibm_cos import IBMCOSConnector | |
| from .onedrive import OneDriveConnector | |
| from .sharepoint import SharePointConnector | |
| logger = get_logger(__name__) |
🧰 Tools
🪛 GitHub Actions: autofix.ci / 0_ruff autofix.txt
[warning] 7-7: Ruff UP035: typing.Dict is deprecated, use dict instead.
[warning] 7-7: Ruff UP035: typing.List is deprecated, use list instead.
[error] 15-15: Ruff E402: Module level import not at top of file.
[error] 16-16: Ruff E402: Module level import not at top of file.
[error] 17-17: Ruff E402: Module level import not at top of file.
[error] 18-18: Ruff E402: Module level import not at top of file.
[error] 19-19: Ruff E402: Module level import not at top of file.
[error] 20-20: Ruff E402: Module level import not at top of file.
[error] 21-21: Ruff E402: Module level import not at top of file.
🪛 GitHub Actions: autofix.ci / ruff autofix
[warning] 7-7: UP035 typing.Dict is deprecated, use dict instead.
[warning] 7-7: UP035 typing.List is deprecated, use list instead.
[error] 15-15: E402 Module level import not at top of file (from .aws_s3 import S3Connector).
[error] 16-16: E402 Module level import not at top of file (from .base import BaseConnector).
[error] 17-17: E402 Module level import not at top of file (from .external import ExternalConnector).
[error] 18-18: E402 Module level import not at top of file (from .google_drive import GoogleDriveConnector).
[error] 19-19: E402 Module level import not at top of file (from .ibm_cos import IBMCOSConnector).
[error] 20-20: E402 Module level import not at top of file (from .onedrive import OneDriveConnector).
[error] 21-21: E402 Module level import not at top of file (from .sharepoint import SharePointConnector).
🪛 GitHub Actions: Lint Backend / 0_Ruff and mypy on changed files.txt
[error] 7-7: Ruff check failed: UP035 typing.Dict is deprecated; use dict instead.
🪛 GitHub Actions: Lint Backend / Ruff and mypy on changed files
[error] 7-7: Ruff check failed: UP035 typing.Dict is deprecated, use dict instead.
🪛 GitHub Check: Ruff and mypy on changed files
[failure] 19-19: ruff (E402)
src/connectors/connection_manager.py:19:1: E402 Module level import not at top of file
[failure] 18-18: ruff (E402)
src/connectors/connection_manager.py:18:1: E402 Module level import not at top of file
[failure] 17-17: ruff (E402)
src/connectors/connection_manager.py:17:1: E402 Module level import not at top of file
[failure] 16-16: ruff (E402)
src/connectors/connection_manager.py:16:1: E402 Module level import not at top of file
[failure] 15-15: ruff (E402)
src/connectors/connection_manager.py:15:1: E402 Module level import not at top of file
[failure] 7-7: ruff (F401)
src/connectors/connection_manager.py:7:37: F401 typing.Optional imported but unused
help: Remove unused import
[failure] 7-7: ruff (F401)
src/connectors/connection_manager.py:7:31: F401 typing.List imported but unused
help: Remove unused import
[failure] 7-7: ruff (F401)
src/connectors/connection_manager.py:7:25: F401 typing.Dict imported but unused
help: Remove unused import
[failure] 7-7: ruff (UP035)
src/connectors/connection_manager.py:7:1: UP035 typing.List is deprecated, use list instead
[failure] 7-7: ruff (UP035)
src/connectors/connection_manager.py:7:1: UP035 typing.Dict is deprecated, use dict instead
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/connectors/connection_manager.py` around lines 4 - 21, Move the logger
initialization (logger = get_logger(__name__)) so it comes after all imports to
avoid splitting the import block (fixes E402); also update the typing imports by
removing Dict and List (keep Any and Optional only if actually used) and replace
any Dict/List annotations in this module with built-in dict and list or use PEP
604 union syntax where applicable; key symbols to change: the logger variable
assignment and the typing import line that currently reads "from typing import
Any, Dict, List, Optional".
proposal for external connectors
Summary by CodeRabbit