class BaseCSVProcessor(ABC):
    """Shared plumbing for CSV-driven migration jobs.

    The base class reads the input CSV, creates the output CSV with a header
    row, appends result rows one at a time and configures logging. Concrete
    processors implement ``validate_csv`` and ``process_rows``.
    """

    # Name given to the console handler installed by ``_setup_logging`` so that
    # later instances can recognise it and avoid attaching a second copy.
    _CONSOLE_HANDLER_NAME = "glific_migration_console"

    def __init__(self, input_file: str, output_file: str, headers: List[str]):
        """Store the paths, configure logging and create the output CSV.

        Args:
            input_file: Path of the CSV to read.
            output_file: Path of the CSV to (re)create; it is truncated here.
            headers: Column names written as the output header row.

        Raises:
            Exception: Whatever ``_init_output_csv`` raises when the output
                file cannot be written.
        """
        self.input_file = Path(input_file)
        self.output_file = Path(output_file)
        self.headers = headers
        # Both the log file and the output CSV are created in this directory,
        # so make sure it exists before either is opened.
        self.output_file.parent.mkdir(parents=True, exist_ok=True)
        self._setup_logging()
        self._init_output_csv()

    def _setup_logging(self) -> None:
        """Configure file and console logging on the root logger.

        The log file is ``<classname>.logs`` next to the output CSV. Calling
        this more than once (one call per processor instance) must not stack
        console handlers, otherwise every message is printed several times.
        """
        log_file = self.output_file.parent / f"{self.__class__.__name__.lower()}.logs"
        log_format = "%(asctime)s - %(levelname)s - %(message)s"

        # basicConfig does nothing when the root logger already has handlers,
        # so repeated calls are harmless.
        logging.basicConfig(
            filename=str(log_file),
            level=logging.INFO,
            format=log_format,
        )

        root_logger = logging.getLogger()
        already_attached = any(
            handler.name == self._CONSOLE_HANDLER_NAME
            for handler in root_logger.handlers
        )
        if already_attached:
            return

        console_handler = logging.StreamHandler()
        console_handler.set_name(self._CONSOLE_HANDLER_NAME)
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(logging.Formatter(log_format))
        root_logger.addHandler(console_handler)

    def _init_output_csv(self) -> None:
        """Create (or truncate) the output CSV and write the header row.

        Raises:
            Exception: Re-raised after logging if the file cannot be written.
        """
        try:
            with open(self.output_file, "w", newline="", encoding="utf-8") as f:
                csv.DictWriter(f, fieldnames=self.headers).writeheader()
        except Exception as e:
            logger.error("Error initializing output file %s: %s", self.output_file, e)
            raise

    def load_csv(self) -> List[Dict[str, str]]:
        """Read the input CSV into a list of dictionaries keyed by header.

        Returns:
            One dict per data row, in file order.

        Raises:
            FileNotFoundError: If the input file does not exist.
            Exception: Any other read/parse error, re-raised after logging.
        """
        try:
            # "utf-8-sig" reads plain UTF-8 unchanged but also strips a leading
            # BOM, which would otherwise become part of the first header name.
            with open(self.input_file, newline="", encoding="utf-8-sig") as f:
                return list(csv.DictReader(f))
        except FileNotFoundError:
            logger.error("Input file not found: %s", self.input_file)
            raise
        except Exception as e:
            logger.error("Error reading CSV file %s: %s", self.input_file, e)
            raise

    def append_to_csv(self, row: Dict[str, str]) -> None:
        """Append a single result row to the output CSV.

        The file is reopened for every row so that results already obtained
        are on disk even if a later row fails.

        Args:
            row: Mapping whose keys are a subset of ``self.headers``.

        Raises:
            Exception: Re-raised after logging if the row cannot be written.
        """
        try:
            with open(self.output_file, "a", newline="", encoding="utf-8") as f:
                csv.DictWriter(f, fieldnames=self.headers).writerow(row)
        except Exception as e:
            logger.error("Error appending to output file %s: %s", self.output_file, e)
            raise

    @abstractmethod
    def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
        """Return True when ``rows`` are acceptable for processing."""

    @abstractmethod
    def process_rows(self, rows: List[Dict[str, str]]) -> None:
        """Process ``rows`` and write results incrementally."""

    def run(self) -> None:
        """Load, validate and process the input CSV.

        Returns without processing (and without raising) when validation
        fails; any exception from loading or processing is logged with its
        traceback and re-raised.
        """
        logger.info("Starting %s...", self.__class__.__name__)
        try:
            rows = self.load_csv()
            if not self.validate_csv(rows):
                logger.error("Validation failed. Aborting processing.")
                return
            self.process_rows(rows)
            logger.info("%s completed successfully.", self.__class__.__name__)
        except Exception as e:
            logger.error("Processing failed: %s", e, exc_info=True)
            raise
class OnboardProcessor(BaseCSVProcessor):
    """Processor that onboards one organization/project/user per CSV row.

    Each input row is posted to the onboarding endpoint and the outcome is
    appended to the output CSV using the ``HEADERS`` columns.
    """

    HEADERS = [
        "organization_name",
        "organization_id",
        "project_name",
        "project_id",
        "user_name",
        "user_id",
        "api_key",
        "success",
        "response_from_endpoint",
    ]
    REQUIRED_FIELDS = {
        "organization_name",
        "project_name",
        "email",
        "password",
        "user_name",
    }

    def __init__(self, input_file: str, output_file: str, api_url: str, api_key: str):
        """Create the processor.

        Args:
            input_file: CSV with the ``REQUIRED_FIELDS`` columns.
            output_file: CSV that receives one result row per input row.
            api_url: Full URL of the onboarding endpoint.
            api_key: Key sent by ``APIClient`` with every request.
        """
        super().__init__(input_file, output_file, self.HEADERS)
        self.client = APIClient(api_key)
        self.api_url = api_url

    @staticmethod
    def _text_fields(row: Dict[str, str]) -> Dict[str, str]:
        """Return ``row`` restricted to string keys, with non-string values as ''.

        ``csv.DictReader`` yields ``None`` for cells missing from a short row
        and stores surplus cells as a list under the key ``None``; neither can
        be handed to validators that call string methods.
        """
        return {
            key: value if isinstance(value, str) else ""
            for key, value in row.items()
            if isinstance(key, str)
        }

    def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
        """Check every row and log all problems found.

        A row is rejected when a required field is missing or blank, when its
        project name repeats an earlier row's, when the email is malformed or
        when the password is shorter than the validator allows.

        Returns:
            True when no row produced an error, otherwise False.
        """
        seen_projects: Set[str] = set()
        validation_errors: List[str] = []

        for idx, row in enumerate(rows, start=1):
            fields = self._text_fields(row)
            row_errors = []

            missing = validate_required_fields(fields, self.REQUIRED_FIELDS)
            if missing:
                row_errors.append(f"Missing fields: {', '.join(missing)}")

            project_name = fields.get("project_name", "")
            # A blank name is already reported as missing; tracking it would
            # additionally flag every later blank row as a "duplicate".
            if project_name:
                if project_name in seen_projects:
                    row_errors.append(f"Duplicate project name '{project_name}'")
                else:
                    seen_projects.add(project_name)

            ok, msg = validate_email_format(fields.get("email", ""))
            if not ok:
                row_errors.append(f"Invalid email: {msg}")

            if not validate_password(fields.get("password", "")):
                row_errors.append("Password must be at least 8 characters")

            validation_errors.extend(f"Row {idx}: {error}" for error in row_errors)

        if validation_errors:
            logger.error("CSV validation failed with the following issues:")
            for error in validation_errors:
                logger.error(" - %s", error)
            return False

        logger.info("CSV validation passed.")
        return True

    def process_rows(self, rows: List[Dict[str, str]]) -> None:
        """Post each row to the onboarding endpoint and record the outcome.

        The result row is written immediately after each request, so partial
        progress survives a later failure.
        """
        for idx, row in enumerate(rows, start=1):
            logger.info(
                "Sending API request for row %s (project: %s)...",
                idx,
                row.get("project_name", ""),
            )
            success, resp = self.client.post(self.api_url, data=row)
            logger.info("Row %s processed. Success: %s", idx, success)

            row_result = dict(row)
            # Merge the response first: the status columns below must win even
            # if the endpoint's JSON happens to contain keys with those names.
            if success and isinstance(resp, dict):
                row_result.update(resp)
            row_result["success"] = "yes" if success else "no"
            row_result["response_from_endpoint"] = str(resp)

            # Only HEADERS columns are written, which keeps the password and
            # any other input-only column out of the output file.
            self.append_to_csv({k: row_result.get(k, "") for k in self.HEADERS})
class AssistantIngestProcessor(BaseCSVProcessor):
    """Processor that triggers assistant ingestion for every CSV row.

    Each row carries its own ``api_key``; the row's assistant is ingested by
    posting to ``<base_url>/assistant/<assistant_id>/ingest`` with that key.
    """

    HEADERS = ["assistant_id", "api_key", "success", "response_from_endpoint"]
    REQUIRED_FIELDS = {"assistant_id", "api_key"}

    def __init__(self, input_file: str, output_file: str, base_url: str):
        """Create the processor.

        Args:
            input_file: CSV with ``assistant_id`` and ``api_key`` columns.
            output_file: CSV that receives one result row per input row.
            base_url: API root; a trailing slash is removed.
        """
        super().__init__(input_file, output_file, self.HEADERS)
        self.base_url = base_url.rstrip("/")

    @staticmethod
    def _mask(secret: str) -> str:
        """Return a loggable stand-in for ``secret`` showing at most 4 chars."""
        return f"***{secret[-4:]}" if len(secret) > 8 else "***"

    def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
        """Check every row and log all problems found.

        A row is rejected when ``assistant_id`` or ``api_key`` is missing or
        blank, or when a present value does not match the expected format.

        Returns:
            True when no row produced an error, otherwise False.
        """
        validation_errors: List[str] = []

        for idx, row in enumerate(rows, start=1):
            # csv.DictReader yields None for cells missing from a short row
            # (and a list under the None key for surplus cells); normalise so
            # the string-based checks below cannot crash.
            fields = {
                key: value if isinstance(value, str) else ""
                for key, value in row.items()
                if isinstance(key, str)
            }
            row_errors = []

            # Blank values are reported here, so no separate "empty" check is
            # needed for the two required columns.
            missing = validate_required_fields(fields, self.REQUIRED_FIELDS)
            if missing:
                row_errors.append(f"Missing fields: {', '.join(missing)}")

            assistant_id = fields.get("assistant_id", "")
            if assistant_id.strip() and not is_valid_assistant_id(assistant_id):
                row_errors.append(f"Invalid assistant_id format: {assistant_id}")

            api_key = fields.get("api_key", "")
            if api_key.strip() and not is_valid_api_key(api_key):
                # Never write the key itself to the log file.
                row_errors.append(f"Invalid api_key format: {self._mask(api_key)}")

            validation_errors.extend(f"Row {idx}: {err}" for err in row_errors)

        if validation_errors:
            logger.error("CSV validation failed with the following issues:")
            for error in validation_errors:
                logger.error(" - %s", error)
            return False

        logger.info("CSV validation passed.")
        return True

    def process_rows(self, rows: List[Dict[str, str]]) -> None:
        """Ingest each row's assistant and record the outcome.

        The result row is written immediately after each request. One client
        is kept per distinct API key and every client's HTTP session is closed
        when the loop ends, whether it finished or raised.
        """
        clients: Dict[str, APIClient] = {}
        try:
            for idx, row in enumerate(rows, start=1):
                assistant_id = row["assistant_id"]
                api_key = row["api_key"]

                logger.info(
                    "Ingesting assistant for row %s (assistant_id: %s)",
                    idx,
                    assistant_id,
                )
                url = f"{self.base_url}/assistant/{assistant_id}/ingest"

                client = clients.get(api_key)
                if client is None:
                    client = clients[api_key] = APIClient(api_key=api_key)

                success, resp = client.post(url)
                logger.info("Row %s processed. Success: %s", idx, success)

                self.append_to_csv(
                    {
                        "assistant_id": assistant_id,
                        "api_key": api_key,
                        "success": "yes" if success else "no",
                        "response_from_endpoint": str(resp),
                    }
                )
        finally:
            for client in clients.values():
                client.session.close()
class CredentialProcessor(BaseCSVProcessor):
    """Processor that creates OpenAI credentials for each organization/project row."""

    HEADERS = ["organization_id", "project_id", "success", "response_from_endpoint"]
    REQUIRED_FIELDS = {"organization_id", "project_id"}

    def __init__(
        self,
        input_file: str,
        output_file: str,
        api_url: str,
        api_key: str,
        openai_key: str,
    ):
        """Create the processor.

        Args:
            input_file: CSV with ``organization_id`` and ``project_id`` columns.
            output_file: CSV that receives one result row per input row.
            api_url: Full URL of the credentials endpoint.
            api_key: Key sent by ``APIClient`` with every request.
            openai_key: OpenAI key placed in every credential payload.
        """
        super().__init__(input_file, output_file, self.HEADERS)
        self.client = APIClient(api_key)
        self.api_url = api_url
        self.openai_key = openai_key

    @staticmethod
    def _is_int(text: str) -> bool:
        """Return True when ``int(text)`` succeeds."""
        try:
            int(text)
        except ValueError:
            return False
        return True

    @staticmethod
    def _mask(secret: str) -> str:
        """Return a loggable stand-in for ``secret`` showing at most 4 chars."""
        return f"***{secret[-4:]}" if len(secret) > 8 else "***"

    def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
        """Check every row and log all problems found.

        A row is rejected when an id is missing or blank, when a present id is
        not an integer, or when an optional ``api_key`` column holds a value
        in the wrong format.

        Returns:
            True when no row produced an error, otherwise False.
        """
        validation_errors: List[str] = []

        for idx, row in enumerate(rows, start=1):
            # csv.DictReader yields None for cells missing from a short row;
            # int(None) raises TypeError rather than ValueError, so normalise
            # every value to a string before checking it.
            fields = {
                key: value if isinstance(value, str) else ""
                for key, value in row.items()
                if isinstance(key, str)
            }
            row_errors = []

            missing = validate_required_fields(fields, self.REQUIRED_FIELDS)
            if missing:
                row_errors.append(f"Missing fields: {', '.join(missing)}")

            org_raw = fields.get("organization_id", "")
            proj_raw = fields.get("project_id", "")
            # Blank ids are already reported as missing; only values that are
            # present are checked for being integers.
            non_integer = [
                raw for raw in (org_raw, proj_raw) if raw.strip() and not self._is_int(raw)
            ]
            if non_integer:
                row_errors.append(
                    f"organization_id or project_id is not an integer: org_id='{org_raw}', proj_id='{proj_raw}'"
                )

            api_key = fields.get("api_key", "")
            if api_key and not is_valid_api_key(api_key):
                # Reported with the row number like every other problem, and
                # without writing the key itself to the log.
                row_errors.append(f"Invalid api_key format: {self._mask(api_key)}")

            validation_errors.extend(f"Row {idx}: {err}" for err in row_errors)

        if validation_errors:
            logger.error("CSV validation failed with the following issues:")
            for error in validation_errors:
                logger.error(" - %s", error)
            return False

        logger.info("CSV validation passed.")
        return True

    def process_rows(self, rows: List[Dict[str, str]]) -> None:
        """Post one credential payload per row and record the outcome.

        The result row is written immediately after each request, so partial
        progress survives a later failure.
        """
        for idx, row in enumerate(rows, start=1):
            org_id = int(row["organization_id"])
            proj_id = int(row["project_id"])

            payload = {
                "organization_id": org_id,
                "project_id": proj_id,
                "is_active": True,
                "credential": {"openai": {"api_key": self.openai_key}},
            }

            logger.info(
                "Sending credential request for row %s (org: %s, project: %s)...",
                idx,
                org_id,
                proj_id,
            )
            success, resp = self.client.post(self.api_url, data=payload)
            logger.info("Row %s processed. Success: %s", idx, success)

            self.append_to_csv(
                {
                    "organization_id": org_id,
                    "project_id": proj_id,
                    "success": "yes" if success else "no",
                    "response_from_endpoint": str(resp),
                }
            )
# Compiled once at import time; both patterns are applied with fullmatch().
_API_KEY_RE = re.compile(r"ApiKey [A-Za-z0-9_-]{43}")
_ASSISTANT_ID_RE = re.compile(r"asst_[a-zA-Z0-9]{15,}")


def _is_blank(value) -> bool:
    """Return True when ``value`` is not a string or is empty/whitespace-only."""
    return not isinstance(value, str) or not value.strip()


def validate_required_fields(row: Dict[str, str], fields: Set[str]) -> List[str]:
    """Return the names in ``fields`` that are absent or blank in ``row``.

    A value counts as blank when it is missing, is not a string (for example
    the ``None`` that ``csv.DictReader`` yields for a short row) or contains
    only whitespace. The result is sorted so messages built from it are
    stable from run to run.
    """
    return sorted(f for f in fields if _is_blank(row.get(f)))


def validate_email_format(email: str) -> Tuple[bool, str]:
    """Check the syntax of ``email`` without any deliverability lookup.

    Returns:
        ``(True, "")`` when the address is accepted, otherwise ``(False,
        reason)``.
    """
    if not isinstance(email, str):
        return False, "Email is missing."
    try:
        validate_email(email, check_deliverability=False)
        return True, ""
    except EmailNotValidError as e:
        return False, str(e)


def validate_password(password: str) -> bool:
    """Return True when ``password`` is a string of at least 8 characters."""
    return isinstance(password, str) and len(password) >= 8


def is_valid_api_key(api_key: str) -> bool:
    """
    Validates that the API key is in the format:
    'ApiKey <43-character base64url-like token>'

    Non-string input is rejected rather than raising.
    """
    return isinstance(api_key, str) and bool(_API_KEY_RE.fullmatch(api_key))


def is_valid_assistant_id(assistant_id: str) -> bool:
    """
    Validates OpenAI assistant ID. Should start with 'asst_' followed by 15+ alphanumeric chars.

    Non-string input is rejected rather than raising.
    """
    return isinstance(assistant_id, str) and bool(
        _ASSISTANT_ID_RE.fullmatch(assistant_id)
    )