Migration Script to Add Glific NGO to AI Platform: Onboarding, Credential and Assistant Sync #291
.gitignore
@@ -6,3 +6,5 @@ app.egg-info
 htmlcov
 .cache
 .venv
+*.logs
+*.log
glific_migration/base_processor.py
@@ -0,0 +1,90 @@
import csv
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Dict

logger = logging.getLogger(__name__)


class BaseCSVProcessor(ABC):
    """Base class for CSV processing with common functionality."""

    def __init__(self, input_file: str, output_file: str, headers: List[str]):
        self.input_file = Path(input_file)
        self.output_file = Path(output_file)
        self.headers = headers
        self._setup_logging()
        self._init_output_csv()

    def _setup_logging(self) -> None:
        """Configure logging for the processor."""
        log_file = self.output_file.parent / f"{self.__class__.__name__.lower()}.logs"
        logging.basicConfig(
            filename=str(log_file),
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
        )
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        )

        logging.getLogger().addHandler(console_handler)

    def _init_output_csv(self) -> None:
        """Initialize CSV file with headers."""
        try:
            with open(self.output_file, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=self.headers)
                writer.writeheader()
        except Exception as e:
            logger.error(f"Error initializing output file {self.output_file}: {str(e)}")
            raise

    def load_csv(self) -> List[Dict[str, str]]:
        """Load CSV file into list of dictionaries."""
        try:
            with open(self.input_file, newline="", encoding="utf-8") as f:
                return list(csv.DictReader(f))
        except FileNotFoundError:
            logger.error(f"Input file not found: {self.input_file}")
            raise
        except Exception as e:
            logger.error(f"Error reading CSV file {self.input_file}: {str(e)}")
            raise

    def append_to_csv(self, row: Dict[str, str]) -> None:
        """Append a single row to the output CSV."""
        try:
            with open(self.output_file, "a", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=self.headers)
                writer.writerow(row)
        except Exception as e:
            logger.error(f"Error appending to output file {self.output_file}: {str(e)}")
            raise

    @abstractmethod
    def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
        """Validate CSV data before processing."""
        pass

    @abstractmethod
    def process_rows(self, rows: List[Dict[str, str]]) -> None:
        """Process CSV rows and write results incrementally."""
        pass

    def run(self) -> None:
        """Execute the complete processing pipeline."""
        logger.info(f"Starting {self.__class__.__name__}...")
        try:
            rows = self.load_csv()
            if not self.validate_csv(rows):
                logger.error("Validation failed. Aborting processing.")
                return
            self.process_rows(rows)
            logger.info(f"{self.__class__.__name__} completed successfully.")
        except Exception as e:
            logger.error(f"Processing failed: {str(e)}", exc_info=True)
            raise
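For orientation, a concrete subclass only has to supply the two abstract hooks; the run() template method drives loading, validation, and processing. A minimal sketch (class and file names here are illustrative, not part of this PR):

# Illustrative subclass sketch (hypothetical names; not part of this diff):
class PassthroughProcessor(BaseCSVProcessor):
    """Toy processor that copies validated rows straight to the output CSV."""

    def validate_csv(self, rows):
        # Reject an empty input; real processors add per-row checks here.
        return bool(rows)

    def process_rows(self, rows):
        for row in rows:
            # Keep only the configured header columns, blank-filling the rest.
            self.append_to_csv({k: row.get(k, "") for k in self.headers})


# PassthroughProcessor("in.csv", "out.csv", headers=["a", "b"]).run()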
glific_migration/client.py
@@ -0,0 +1,50 @@
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Tuple, Dict, Optional

logger = logging.getLogger(__name__)


class APIClient:
    """Client for making API requests with retry and error handling."""

    def __init__(self, api_key: str):
        self.headers = {
            "accept": "application/json",
            "Content-Type": "application/json",
            "X-API-KEY": api_key,
        }
        self.session = requests.Session()
        retries = Retry(
            total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retries)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def post(self, url: str, data: Optional[Dict] = None) -> Tuple[bool, Dict]:
        """Make a POST request to the specified URL."""
        try:
            response = self.session.post(
                url, headers=self.headers, json=data, timeout=10
            )
            response.raise_for_status()
            return True, response.json()
        except requests.exceptions.HTTPError as http_err:
            try:
                error_detail = response.json().get("error", "No error detail provided.")
            except Exception:
                error_detail = "Unable to parse error response."
            logger.error(
                "HTTP error while posting to %s: %s | Response error: %s",
                url,
                str(http_err),
                error_detail,
                exc_info=True,
            )
            return False, {"error": error_detail}
        except requests.exceptions.RequestException as e:
            logger.error("Request to %s failed: %s", url, str(e), exc_info=True)
            return False, {"error": str(e)}
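Callers consume the (success, payload) pair instead of catching exceptions. A usage sketch against the local onboarding endpoint from config.json (the key and request body are placeholders):

# Illustrative usage (URL, key, and body are placeholders from the sample config):
client = APIClient(api_key="SuperUserApiKey")
success, payload = client.post(
    "http://localhost:8000/api/v1/onboard",
    data={"organization_name": "Acme NGO", "project_name": "SurveyBot"},
)
if success:
    print(payload)            # parsed JSON body of a 2xx response
else:
    print(payload["error"])   # error detail extracted by the client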
glific_migration/config.json
@@ -0,0 +1,17 @@
{
  "base_url": "http://localhost:8000/api/v1",
  "api_key": "SuperUserApiKey",
  "openai_key": "openai_api_key_example",
  "assistant_ingest": {
    "input_csv": "sample_input.csv",
    "output_csv": "assistants_output.csv"
  },
  "organization_onboarding": {
    "input_csv": "sample_input.csv",
    "output_csv": "orgs_output.csv"
  },
  "sync_credentials": {
    "input_csv": "sample_input.csv",
    "output_csv": "credentials_output.csv"
  }
}
glific_migration/organization_onboarding/processor.py
@@ -0,0 +1,93 @@
import logging
from typing import List, Dict
from glific_migration.base_processor import BaseCSVProcessor
from glific_migration.client import APIClient
from glific_migration.validator import (
    validate_required_fields,
    validate_email_format,
    validate_password,
)

logger = logging.getLogger(__name__)


class OnboardProcessor(BaseCSVProcessor):
    """Processor for handling organization onboarding."""

    HEADERS = [
        "organization_name",
        "organization_id",
        "project_name",
        "project_id",
        "user_name",
        "user_id",
        "api_key",
        "success",
        "response_from_endpoint",
    ]
    REQUIRED_FIELDS = {
        "organization_name",
        "project_name",
        "email",
        "password",
        "user_name",
    }

    def __init__(self, input_file: str, output_file: str, api_url: str, api_key: str):
        super().__init__(input_file, output_file, self.HEADERS)
        self.client = APIClient(api_key)
        self.api_url = api_url

    def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
        """Validate CSV data for organization onboarding."""
        seen_projects = set()
        validation_errors = []

        for idx, row in enumerate(rows, start=1):
            row_errors = []

            missing = validate_required_fields(row, self.REQUIRED_FIELDS)
            if missing:
                row_errors.append(f"Missing fields: {', '.join(missing)}")

            project_name = row.get("project_name", "")
            if project_name in seen_projects:
                row_errors.append(f"Duplicate project name '{project_name}'")
            else:
                seen_projects.add(project_name)

            ok, msg = validate_email_format(row.get("email", ""))
            if not ok:
                row_errors.append(f"Invalid email: {msg}")

            if not validate_password(row.get("password", "")):
                row_errors.append("Password must be at least 8 characters")

            if row_errors:
                validation_errors.extend(f"Row {idx}: {error}" for error in row_errors)

        if validation_errors:
            logger.error("CSV validation failed with the following issues:")
            for error in validation_errors:
                logger.error(" - %s", error)
            return False

        logger.info("CSV validation passed.")
        return True

    def process_rows(self, rows: List[Dict[str, str]]) -> None:
        """Process rows for organization onboarding and write to CSV after each request."""
        for idx, row in enumerate(rows, start=1):
            logger.info(
                f"Sending API request for row {idx} (project: {row.get('project_name', '')})..."
            )
            success, resp = self.client.post(self.api_url, data=row)
            logger.info(f"Row {idx} processed. Success: {success}")

            row_result = {
                **row,
                "success": "yes" if success else "no",
                "response_from_endpoint": str(resp),
            }
            row_result.update(resp if success else {})
            self.append_to_csv({k: row_result.get(k, "") for k in self.HEADERS})
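To make the merge in process_rows concrete: on success, the top-level keys of the JSON response overwrite the input row, and the final projection onto HEADERS then drops columns like email and password from the output. A worked example, assuming a response shape that this diff does not actually show:

# Worked example of the merge (the resp shape below is an assumption):
row = {
    "organization_name": "Acme NGO", "project_name": "SurveyBot",
    "email": "onboard3@acme.org", "password": "TestPass789", "user_name": "survey_lead",
}
resp = {"organization_id": "12", "project_id": "34", "user_id": "56", "api_key": "ApiKey xyz"}
row_result = {**row, "success": "yes", "response_from_endpoint": str(resp)}
row_result.update(resp)  # response IDs overwrite/extend the input fields
final = {k: row_result.get(k, "") for k in OnboardProcessor.HEADERS}
# final carries the IDs and api_key, but no email/password columns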
glific_migration/organization_onboarding/main.py
@@ -0,0 +1,27 @@
from pathlib import Path
import json
from glific_migration.organization_onboarding.processor import OnboardProcessor

base_dir = Path(__file__).parent.resolve()


def main():
    with open(base_dir / "../config.json", "r") as file:
        config = json.load(file)

    input_file = base_dir / config["organization_onboarding"]["input_csv"]
Collaborator comment: Would be good to avoid magic strings like "organization_onboarding", "input_csv". Libraries like dynaconf can help; they support key traversing.
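A sketch of what that suggestion might look like (dynaconf is a third-party library; the attribute-style traversal of nested keys shown here is illustrative, not part of the PR):

# Hypothetical refactor using dynaconf instead of raw dict indexing:
from dynaconf import Dynaconf

settings = Dynaconf(settings_files=[str(base_dir / "../config.json")])
input_file = base_dir / settings.organization_onboarding.input_csv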
    output_file = base_dir / config["organization_onboarding"]["output_csv"]
    api_url = config["base_url"] + "/onboard"
Collaborator comment: API endpoints like "/onboard" may also be kept in a string constant somewhere in a common file. Not an urgent change; it can be picked up in a separate PR where all such routes and other constant strings are refactored.
    api_key = config["api_key"]

(avirajsingh7 marked this conversation as resolved.)

    processor = OnboardProcessor(
        input_file=str(input_file),
        output_file=str(output_file),
        api_url=api_url,
        api_key=api_key,
    )
    processor.run()


if __name__ == "__main__":
    main()
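Assuming the glific_migration package is importable (installed or on PYTHONPATH), the script can presumably be invoked as `python -m glific_migration.organization_onboarding.main`; it reads config.json from the package root and writes orgs_output.csv, along with the processor's .logs file, alongside main.py.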
glific_migration/organization_onboarding/sample_input.csv
@@ -0,0 +1,4 @@
organization_name,project_name,email,password,user_name
Glific Foundation,Chatbot Project,onboard1@glific.org,TestPass123,admin1
TestOrg,EducationBot,onboard2@test.org,TestPass456,edubot
Acme NGO,SurveyBot,onboard3@acme.org,TestPass789,survey_lead
glific_migration/assistant_ingest/processor.py
@@ -0,0 +1,82 @@
import logging
from typing import List, Dict
from glific_migration.base_processor import BaseCSVProcessor
from glific_migration.client import APIClient
from glific_migration.validator import (
    validate_required_fields,
    is_valid_api_key,
    is_valid_assistant_id,
)

logger = logging.getLogger(__name__)


class AssistantIngestProcessor(BaseCSVProcessor):
    """Processor for handling assistant ingestion."""

    HEADERS = ["assistant_id", "api_key", "success", "response_from_endpoint"]
    REQUIRED_FIELDS = {"assistant_id", "api_key"}

    def __init__(self, input_file: str, output_file: str, base_url: str):
        super().__init__(input_file, output_file, self.HEADERS)
        self.base_url = base_url.rstrip("/")

    def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
        """Validate CSV data for assistant ingestion."""
        validation_errors = []

        for idx, row in enumerate(rows, start=1):
            row_errors = []

            missing = validate_required_fields(row, self.REQUIRED_FIELDS)
            if missing:
                row_errors.append(f"Missing fields: {', '.join(missing)}")

            if not row.get("assistant_id", "").strip():
                row_errors.append("Empty assistant_id")

            if not row.get("api_key", "").strip():
                row_errors.append("Empty api_key")

            if row.get("assistant_id") and not is_valid_assistant_id(
                row["assistant_id"]
            ):
                row_errors.append(f"Invalid assistant_id format: {row['assistant_id']}")

            if row.get("api_key") and not is_valid_api_key(row["api_key"]):
                row_errors.append(f"Invalid api_key format: {row['api_key']}")

            if row_errors:
                validation_errors.extend(f"Row {idx}: {err}" for err in row_errors)

        if validation_errors:
            logger.error("CSV validation failed with the following issues:")
            for error in validation_errors:
                logger.error(" - %s", error)
            return False

        logger.info("CSV validation passed.")
        return True

    def process_rows(self, rows: List[Dict[str, str]]) -> None:
        """Process rows for assistant ingestion and write to CSV after each request."""
        for idx, row in enumerate(rows, start=1):
            assistant_id = row["assistant_id"]
            api_key = row["api_key"]

            logger.info(
                f"Ingesting assistant for row {idx} (assistant_id: {assistant_id})"
            )
            url = f"{self.base_url}/assistant/{assistant_id}/ingest"
            client = APIClient(api_key=api_key)
            success, resp = client.post(url)
            logger.info(f"Row {idx} processed. Success: {success}")

            result = {
                "assistant_id": assistant_id,
                "api_key": api_key,
                "success": "yes" if success else "no",
                "response_from_endpoint": str(resp),
            }
            self.append_to_csv(result)
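The glific_migration.validator helpers used by both processors are not part of the visible hunks. A sketch of implementations consistent with the call sites (formats are inferred from the sample CSVs, so treat all of this as assumption):

# Hypothetical validator.py sketch; signatures inferred from the call sites above.
import re
from typing import Dict, List, Set, Tuple


def validate_required_fields(row: Dict[str, str], required: Set[str]) -> List[str]:
    """Return the names of required fields that are missing or blank."""
    return [f for f in sorted(required) if not row.get(f, "").strip()]


def validate_email_format(email: str) -> Tuple[bool, str]:
    """Return (ok, message) for a loose email syntax check."""
    if re.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", email):
        return True, ""
    return False, f"'{email}' is not a valid email address"


def validate_password(password: str) -> bool:
    """Match the error text in OnboardProcessor: at least 8 characters."""
    return len(password) >= 8


def is_valid_assistant_id(assistant_id: str) -> bool:
    """Sample rows use OpenAI-style 'asst_' prefixes (an assumption)."""
    return assistant_id.startswith("asst_")


def is_valid_api_key(api_key: str) -> bool:
    """Sample rows use an 'ApiKey <token>' format (an assumption)."""
    return bool(re.fullmatch(r"ApiKey \S+", api_key))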
glific_migration/assistant_ingest/sample_input.csv
@@ -0,0 +1,4 @@
assistant_id,api_key
asst_1,ApiKey abc123
asst_2,ApiKey def456
asst_3,ApiKey ghi789
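After a run, assistants_output.csv would hold one row per input, with the raw response captured as text; schematically (placeholders stand in for the endpoint's actual responses):

assistant_id,api_key,success,response_from_endpoint
asst_1,ApiKey abc123,yes,<response JSON as text>
asst_2,ApiKey def456,no,{'error': '<detail extracted by APIClient>'}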