28 changes: 28 additions & 0 deletions .env.example
@@ -98,6 +98,34 @@ QWEN_CODE_OAUTH_1=""
# Path to your iFlow credential file (e.g., ~/.iflow/oauth_creds.json).
IFLOW_OAUTH_1=""

# --- GitHub Copilot ---
# GitHub Copilot uses Device Flow OAuth authentication.
# After first-time setup, the proxy stores credentials in the 'oauth_creds/' directory.
# You can also pre-configure credentials via environment variables:
#
# COPILOT_GITHUB_TOKEN - Your GitHub OAuth token (long-lived, from Device Flow)
# COPILOT_ENTERPRISE_URL - Optional: GitHub Enterprise URL (e.g., company.ghe.com)
#
# For multiple Copilot accounts, use numbered variables:
# COPILOT_1_GITHUB_TOKEN="ghp_..."
# COPILOT_2_GITHUB_TOKEN="ghp_..."
COPILOT_GITHUB_TOKEN=""
COPILOT_ENTERPRISE_URL=""

# --- Copilot X-Initiator Header Control ---
# Controls the X-Initiator header behavior (affects Copilot's response style):
# - COPILOT_FORCE_AGENT_HEADER: Always use "agent" mode (default: false)
# - COPILOT_AGENT_PERCENTAGE: For first messages, % chance of "agent" (0-100, default: 100)
# Set to 0 for always "user", 100 for always "agent", or a value in between for a random choice.
# Based on: https://github.com/Tarquinen/dotfiles/tree/main/.config/opencode/plugin/copilot-force-agent-header
COPILOT_FORCE_AGENT_HEADER=false
COPILOT_AGENT_PERCENTAGE=100

# --- Copilot Available Models ---
# Comma-separated list of Copilot models to expose. Leave empty for defaults.
# Default models: gpt-4o, gpt-4.1, gpt-4.1-mini, claude-3.5-sonnet, claude-sonnet-4, o3-mini, o1, gemini-2.0-flash-001
COPILOT_MODELS=""


# ------------------------------------------------------------------------------
# | [ADVANCED] Provider-Specific Settings |
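The X-Initiator settings above boil down to a small decision: force "agent", or pick it with a configurable probability on the first message of a conversation. The sketch below only illustrates those documented semantics; it is not the code this PR ships, and the function name and placement are hypothetical.

import os
import random

def first_message_initiator() -> str:
    """Illustrative sketch of the documented COPILOT_* header semantics."""
    if os.getenv("COPILOT_FORCE_AGENT_HEADER", "false").lower() == "true":
        return "agent"  # forced on: every request is sent as "agent"
    percentage = int(os.getenv("COPILOT_AGENT_PERCENTAGE", "100"))
    # 0 -> always "user", 100 -> always "agent", anything in between -> weighted random pick
    return "agent" if random.randint(1, 100) <= percentage else "user"
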
83 changes: 54 additions & 29 deletions src/rotator_library/credential_manager.py
@@ -5,7 +5,7 @@
from pathlib import Path
from typing import Dict, List, Optional, Set

lib_logger = logging.getLogger('rotator_library')
lib_logger = logging.getLogger("rotator_library")

OAUTH_BASE_DIR = Path.cwd() / "oauth_creds"
OAUTH_BASE_DIR.mkdir(exist_ok=True)
@@ -16,6 +16,7 @@
"qwen_code": Path.home() / ".qwen",
"iflow": Path.home() / ".iflow",
"antigravity": Path.home() / ".antigravity",
"copilot": Path.home() / ".copilot",
# Add other providers like 'claude' here if they have a standard CLI path
}

@@ -26,45 +27,47 @@
"antigravity": "ANTIGRAVITY",
"qwen_code": "QWEN_CODE",
"iflow": "IFLOW",
"copilot": "COPILOT",
Review comment (author):

Heads up: There's a naming mismatch that might confuse users. The credential manager's _discover_env_oauth_credentials() looks for COPILOT_ACCESS_TOKEN and COPILOT_REFRESH_TOKEN patterns, but copilot_auth_base.py primarily uses COPILOT_GITHUB_TOKEN. This means the automatic env credential discovery won't find Copilot credentials unless users also set the _ACCESS_TOKEN/_REFRESH_TOKEN variants.

Might want to either:

  1. Align the naming conventions, or
  2. Add special handling for Copilot in the discovery logic (see the sketch after this list), or
  3. Document this clearly in .env.example
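
A purely illustrative sketch of option 2, using the names from the surrounding diff; this is not code in this PR, and the helper name is hypothetical. Copilot's Device Flow credential is a single long-lived token, so discovery could accept the GITHUB_TOKEN variables instead of requiring an ACCESS_TOKEN + REFRESH_TOKEN pair:

def _discover_copilot_env_credentials(self) -> Set[str]:
    # Hypothetical helper: accept COPILOT_N_GITHUB_TOKEN / COPILOT_GITHUB_TOKEN
    # instead of the ACCESS_TOKEN + REFRESH_TOKEN pair used by other providers.
    found: Set[str] = set()
    numbered = re.compile(r"^COPILOT_(\d+)_GITHUB_TOKEN$")
    for key, value in self.env_vars.items():
        match = numbered.match(key)
        if match and value:
            found.add(match.group(1))
    if not found and self.env_vars.get("COPILOT_GITHUB_TOKEN"):
        found.add("0")  # legacy single-credential slot, mirroring the existing convention
    return found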

}


class CredentialManager:
"""
Discovers OAuth credential files from standard locations, copies them locally,
and updates the configuration to use the local paths.

Also discovers environment variable-based OAuth credentials for stateless deployments.
Supports two env var formats:

1. Single credential (legacy): PROVIDER_ACCESS_TOKEN, PROVIDER_REFRESH_TOKEN
2. Multiple credentials (numbered): PROVIDER_1_ACCESS_TOKEN, PROVIDER_2_ACCESS_TOKEN, etc.

When env-based credentials are detected, virtual paths like "env://provider/1" are created.
"""

def __init__(self, env_vars: Dict[str, str]):
self.env_vars = env_vars

def _discover_env_oauth_credentials(self) -> Dict[str, List[str]]:
"""
Discover OAuth credentials defined via environment variables.

Supports two formats:
1. Single credential: ANTIGRAVITY_ACCESS_TOKEN + ANTIGRAVITY_REFRESH_TOKEN
2. Multiple credentials: ANTIGRAVITY_1_ACCESS_TOKEN + ANTIGRAVITY_1_REFRESH_TOKEN, etc.

Returns:
Dict mapping provider name to list of virtual paths (e.g., "env://antigravity/1")
"""
env_credentials: Dict[str, Set[str]] = {}

for provider, env_prefix in ENV_OAUTH_PROVIDERS.items():
found_indices: Set[str] = set()

# Check for numbered credentials (PROVIDER_N_ACCESS_TOKEN pattern)
# Pattern: ANTIGRAVITY_1_ACCESS_TOKEN, ANTIGRAVITY_2_ACCESS_TOKEN, etc.
numbered_pattern = re.compile(rf"^{env_prefix}_(\d+)_ACCESS_TOKEN$")

for key in self.env_vars.keys():
match = numbered_pattern.match(key)
if match:
@@ -73,28 +76,34 @@ def _discover_env_oauth_credentials(self) -> Dict[str, List[str]]:
refresh_key = f"{env_prefix}_{index}_REFRESH_TOKEN"
if refresh_key in self.env_vars and self.env_vars[refresh_key]:
found_indices.add(index)

# Check for legacy single credential (PROVIDER_ACCESS_TOKEN pattern)
# Only use this if no numbered credentials exist
if not found_indices:
access_key = f"{env_prefix}_ACCESS_TOKEN"
refresh_key = f"{env_prefix}_REFRESH_TOKEN"
if (access_key in self.env_vars and self.env_vars[access_key] and
refresh_key in self.env_vars and self.env_vars[refresh_key]):
if (
access_key in self.env_vars
and self.env_vars[access_key]
and refresh_key in self.env_vars
and self.env_vars[refresh_key]
):
# Use "0" as the index for legacy single credential
found_indices.add("0")

if found_indices:
env_credentials[provider] = found_indices
lib_logger.info(f"Found {len(found_indices)} env-based credential(s) for {provider}")

lib_logger.info(
f"Found {len(found_indices)} env-based credential(s) for {provider}"
)

# Convert to virtual paths
result: Dict[str, List[str]] = {}
for provider, indices in env_credentials.items():
# Sort indices numerically for consistent ordering
sorted_indices = sorted(indices, key=lambda x: int(x))
result[provider] = [f"env://{provider}/{idx}" for idx in sorted_indices]

return result

def discover_and_prepare(self) -> Dict[str, List[str]]:
@@ -105,7 +114,9 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
# These take priority for stateless deployments
env_oauth_creds = self._discover_env_oauth_credentials()
for provider, virtual_paths in env_oauth_creds.items():
lib_logger.info(f"Using {len(virtual_paths)} env-based credential(s) for {provider}")
lib_logger.info(
f"Using {len(virtual_paths)} env-based credential(s) for {provider}"
)
final_config[provider] = virtual_paths

# Extract OAuth file paths from environment variables
@@ -115,21 +126,29 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
provider = key.split("_OAUTH_")[0].lower()
if provider not in env_oauth_paths:
env_oauth_paths[provider] = []
if value: # Only consider non-empty values
if value: # Only consider non-empty values
env_oauth_paths[provider].append(value)

# PHASE 2: Discover file-based OAuth credentials
for provider, default_dir in DEFAULT_OAUTH_DIRS.items():
# Skip if already discovered from environment variables
if provider in final_config:
lib_logger.debug(f"Skipping file discovery for {provider} - using env-based credentials")
lib_logger.debug(
f"Skipping file discovery for {provider} - using env-based credentials"
)
continue

# Check for existing local credentials first. If found, use them and skip discovery.
local_provider_creds = sorted(list(OAUTH_BASE_DIR.glob(f"{provider}_oauth_*.json")))
local_provider_creds = sorted(
list(OAUTH_BASE_DIR.glob(f"{provider}_oauth_*.json"))
)
if local_provider_creds:
lib_logger.info(f"Found {len(local_provider_creds)} existing local credential(s) for {provider}. Skipping discovery.")
final_config[provider] = [str(p.resolve()) for p in local_provider_creds]
lib_logger.info(
f"Found {len(local_provider_creds)} existing local credential(s) for {provider}. Skipping discovery."
)
final_config[provider] = [
str(p.resolve()) for p in local_provider_creds
]
continue

# If no local credentials exist, proceed with a one-time discovery and copy.
@@ -140,13 +159,13 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
path = Path(path_str).expanduser()
if path.exists():
discovered_paths.add(path)

# 2. If no overrides are provided via .env, scan the default directory
# [MODIFIED] This logic is now disabled to prefer local-first credential management.
# if not discovered_paths and default_dir.exists():
# for json_file in default_dir.glob('*.json'):
# discovered_paths.add(json_file)

if not discovered_paths:
lib_logger.debug(f"No credential files found for provider: {provider}")
continue
@@ -161,13 +180,19 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
try:
# Since we've established no local files exist, we can copy directly.
shutil.copy(source_path, local_path)
lib_logger.info(f"Copied '{source_path.name}' to local pool at '{local_path}'.")
lib_logger.info(
f"Copied '{source_path.name}' to local pool at '{local_path}'."
)
prepared_paths.append(str(local_path.resolve()))
except Exception as e:
lib_logger.error(f"Failed to process OAuth file from '{source_path}': {e}")

lib_logger.error(
f"Failed to process OAuth file from '{source_path}': {e}"
)

if prepared_paths:
lib_logger.info(f"Discovered and prepared {len(prepared_paths)} credential(s) for provider: {provider}")
lib_logger.info(
f"Discovered and prepared {len(prepared_paths)} credential(s) for provider: {provider}"
)
final_config[provider] = prepared_paths

lib_logger.info("OAuth credential discovery complete.")
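As a quick usage sketch of the discovery path (assuming the package is importable as rotator_library; token values are placeholders, and as the review comment notes, env discovery currently keys off the ACCESS_TOKEN/REFRESH_TOKEN pair rather than COPILOT_GITHUB_TOKEN):

from rotator_library.credential_manager import CredentialManager

env = {
    "COPILOT_1_ACCESS_TOKEN": "gho_example",   # placeholder, not a real token
    "COPILOT_1_REFRESH_TOKEN": "ghr_example",  # placeholder, not a real token
}
manager = CredentialManager(env_vars=env)
config = manager.discover_and_prepare()
# Expected shape (illustrative, assuming no local credential files are present):
# {"copilot": ["env://copilot/1"]}
print(config)
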
6 changes: 5 additions & 1 deletion src/rotator_library/provider_factory.py
@@ -4,14 +4,17 @@
from .providers.qwen_auth_base import QwenAuthBase
from .providers.iflow_auth_base import IFlowAuthBase
from .providers.antigravity_auth_base import AntigravityAuthBase
from .providers.copilot_auth_base import CopilotAuthBase

PROVIDER_MAP = {
"gemini_cli": GeminiAuthBase,
"qwen_code": QwenAuthBase,
"iflow": IFlowAuthBase,
"antigravity": AntigravityAuthBase,
"copilot": CopilotAuthBase,
}


def get_provider_auth_class(provider_name: str):
"""
Returns the authentication class for a given provider.
@@ -21,8 +24,9 @@ def get_provider_auth_class(provider_name: str):
raise ValueError(f"Unknown provider: {provider_name}")
return provider_class


def get_available_providers():
"""
Returns a list of available provider names.
"""
return list(PROVIDER_MAP.keys())
return list(PROVIDER_MAP.keys())
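
As a quick usage sketch for the factory change (same import-root assumption as above):

from rotator_library.provider_factory import get_provider_auth_class, get_available_providers

# "copilot" now resolves to CopilotAuthBase; unknown names still raise ValueError.
copilot_auth_cls = get_provider_auth_class("copilot")
print(copilot_auth_cls.__name__)               # -> "CopilotAuthBase"
print("copilot" in get_available_providers())  # -> True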
11 changes: 9 additions & 2 deletions src/rotator_library/providers/__init__.py
@@ -89,7 +89,10 @@ def _register_providers():
provider_name = "nvidia_nim"
PROVIDER_PLUGINS[provider_name] = attribute
import logging
logging.getLogger('rotator_library').debug(f"Registered provider: {provider_name}")

logging.getLogger("rotator_library").debug(
f"Registered provider: {provider_name}"
)

# Then, create dynamic plugins for custom OpenAI-compatible providers
# Use environment variables directly (load_dotenv already called in main.py)
@@ -114,6 +117,7 @@ def _register_providers():
"qwen_code",
"gemini_cli",
"antigravity",
"copilot",
]:
continue

@@ -129,7 +133,10 @@ def __init__(self):
plugin_class = create_plugin_class(provider_name)
PROVIDER_PLUGINS[provider_name] = plugin_class
import logging
logging.getLogger('rotator_library').debug(f"Registered dynamic provider: {provider_name}")

logging.getLogger("rotator_library").debug(
f"Registered dynamic provider: {provider_name}"
)


# Discover and register providers when the package is imported