2 changes: 2 additions & 0 deletions backend/.gitignore
@@ -6,3 +6,5 @@ app.egg-info
htmlcov
.cache
.venv
*.logs
*.log
Empty file.
90 changes: 90 additions & 0 deletions backend/glific_migration/base_processor.py
@@ -0,0 +1,90 @@
import csv
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Dict

logger = logging.getLogger(__name__)


class BaseCSVProcessor(ABC):
"""Base class for CSV processing with common functionality."""

def __init__(self, input_file: str, output_file: str, headers: List[str]):
self.input_file = Path(input_file)
self.output_file = Path(output_file)
self.headers = headers
self._setup_logging()
self._init_output_csv()

def _setup_logging(self) -> None:
"""Configure logging for the processor."""
log_file = self.output_file.parent / f"{self.__class__.__name__.lower()}.logs"
logging.basicConfig(
filename=str(log_file),
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(
logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
)

logging.getLogger().addHandler(console_handler)

def _init_output_csv(self) -> None:
"""Initialize CSV file with headers."""
try:
with open(self.output_file, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=self.headers)
writer.writeheader()
except Exception as e:
logger.error(f"Error initializing output file {self.output_file}: {str(e)}")
raise

def load_csv(self) -> List[Dict[str, str]]:
"""Load CSV file into list of dictionaries."""
try:
with open(self.input_file, newline="", encoding="utf-8") as f:
return list(csv.DictReader(f))
except FileNotFoundError:
logger.error(f"Input file not found: {self.input_file}")
raise
except Exception as e:
logger.error(f"Error reading CSV file {self.input_file}: {str(e)}")
raise

def append_to_csv(self, row: Dict[str, str]) -> None:
"""Append a single row to the output CSV."""
try:
with open(self.output_file, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=self.headers)
writer.writerow(row)
except Exception as e:
logger.error(f"Error appending to output file {self.output_file}: {str(e)}")
raise

@abstractmethod
def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
"""Validate CSV data before processing."""
pass

@abstractmethod
def process_rows(self, rows: List[Dict[str, str]]) -> None:
"""Process CSV rows and write results incrementally."""
pass

def run(self) -> None:
"""Execute the complete processing pipeline."""
logger.info(f"Starting {self.__class__.__name__}...")
try:
rows = self.load_csv()
if not self.validate_csv(rows):
logger.error("Validation failed. Aborting processing.")
return
self.process_rows(rows)
logger.info(f"{self.__class__.__name__} completed successfully.")
except Exception as e:
logger.error(f"Processing failed: {str(e)}", exc_info=True)
raise
50 changes: 50 additions & 0 deletions backend/glific_migration/client.py
@@ -0,0 +1,50 @@
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from typing import Tuple, Dict, Optional

logger = logging.getLogger(__name__)


class APIClient:
"""Client for making API requests with retry and error handling."""

def __init__(self, api_key: str):
self.headers = {
"accept": "application/json",
"Content-Type": "application/json",
"X-API-KEY": api_key,
}
self.session = requests.Session()
retries = Retry(
total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retries)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)

def post(self, url: str, data: Optional[Dict] = None) -> Tuple[bool, Dict]:
"""Make a POST request to the specified URL."""
try:
response = self.session.post(
url, headers=self.headers, json=data, timeout=10
)
response.raise_for_status()
return True, response.json()
except requests.exceptions.HTTPError as http_err:
try:
error_detail = response.json().get("error", "No error detail provided.")
except Exception:
error_detail = "Unable to parse error response."
logger.error(
"HTTP error while posting to %s: %s | Response error: %s",
url,
str(http_err),
error_detail,
exc_info=True,
)
return False, {"error": error_detail}
except requests.exceptions.RequestException as e:
logger.error("Request to %s failed: %s", url, str(e), exc_info=True)
return False, {"error": str(e)}
17 changes: 17 additions & 0 deletions backend/glific_migration/config.json
@@ -0,0 +1,17 @@
{
"base_url": "http://localhost:8000/api/v1",
"api_key": "SuperUserApiKey",
"openai_key": "openai_api_key_example",
"assistant_ingest": {
"input_csv": "sample_input.csv",
"output_csv": "assistants_output.csv"
},
"organization_onboarding": {
"input_csv": "sample_input.csv",
"output_csv": "orgs_output.csv"
},
"sync_credentials": {
"input_csv": "sample_input.csv",
"output_csv": "credentials_output.csv"
}
}
93 changes: 93 additions & 0 deletions backend/glific_migration/organization_onboarding/processor.py
@@ -0,0 +1,93 @@
import logging
from typing import List, Dict, Set
from glific_migration.base_processor import BaseCSVProcessor
from glific_migration.client import APIClient
from glific_migration.validator import (
validate_required_fields,
validate_email_format,
validate_password,
)

logger = logging.getLogger(__name__)


class OnboardProcessor(BaseCSVProcessor):
"""Processor for handling organization onboarding."""

HEADERS = [
"organization_name",
"organization_id",
"project_name",
"project_id",
"user_name",
"user_id",
"api_key",
"success",
"response_from_endpoint",
]
REQUIRED_FIELDS = {
"organization_name",
"project_name",
"email",
"password",
"user_name",
}

def __init__(self, input_file: str, output_file: str, api_url: str, api_key: str):
super().__init__(input_file, output_file, self.HEADERS)
self.client = APIClient(api_key)
self.api_url = api_url

def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
"""Validate CSV data for organization onboarding."""
seen_projects = set()
validation_errors = []

for idx, row in enumerate(rows, start=1):
row_errors = []

missing = validate_required_fields(row, self.REQUIRED_FIELDS)
if missing:
row_errors.append(f"Missing fields: {', '.join(missing)}")

project_name = row.get("project_name", "")
if project_name in seen_projects:
row_errors.append(f"Duplicate project name '{project_name}'")
else:
seen_projects.add(project_name)

ok, msg = validate_email_format(row.get("email", ""))
if not ok:
row_errors.append(f"Invalid email: {msg}")

if not validate_password(row.get("password", "")):
row_errors.append("Password must be at least 8 characters")

if row_errors:
validation_errors.extend(f"Row {idx}: {error}" for error in row_errors)

if validation_errors:
logger.error("CSV validation failed with the following issues:")
for error in validation_errors:
logger.error(" - %s", error)
return False

logger.info("CSV validation passed.")
return True

def process_rows(self, rows: List[Dict[str, str]]) -> None:
"""Process rows for organization onboarding and write to CSV after each request."""
for idx, row in enumerate(rows, start=1):
logger.info(
f"Sending API request for row {idx} (project: {row.get('project_name', '')})..."
)
success, resp = self.client.post(self.api_url, data=row)
logger.info(f"Row {idx} processed. Success: {success}")

row_result = {
**row,
"success": "yes" if success else "no",
"response_from_endpoint": str(resp),
}
row_result.update(resp if success else {})
self.append_to_csv({k: row_result.get(k, "") for k in self.HEADERS})
27 changes: 27 additions & 0 deletions backend/glific_migration/organization_onboarding/run_onboarding.py
@@ -0,0 +1,27 @@
from pathlib import Path
import json
from glific_migration.organization_onboarding.processor import OnboardProcessor

base_dir = Path(__file__).parent.resolve()


def main():
with open(base_dir / "../config.json", "r") as file:
config = json.load(file)

input_file = base_dir / config["organization_onboarding"]["input_csv"]
It would be good to avoid magic strings like "organization_onboarding" and "input_csv".

Libraries like dynaconf can help; they support key traversal, e.g. settings.organization_onboarding.input_csv.
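A minimal sketch of how that could look (assuming dynaconf reads the existing config.json; the settings_files path below is illustrative, not part of this PR):

from dynaconf import Dynaconf

# load the same JSON config, but expose nested keys as attributes
settings = Dynaconf(settings_files=["config.json"])

input_file = base_dir / settings.organization_onboarding.input_csv
output_file = base_dir / settings.organization_onboarding.output_csv
api_url = settings.base_url + "/onboard"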

output_file = base_dir / config["organization_onboarding"]["output_csv"]
api_url = config["base_url"] + "/onboard"
API endpoints like "/onboard" could also be kept as a string constant in a common module.

The routes in backend/app/api/routes/onboarding.py can then use the same constant:

@router.post(
    "/onboard",
    dependencies=[Depends(get_current_active_superuser)],
    response_model=OnboardingResponse,
)

Not an urgent change; it can be picked up in a separate PR where all such routes and other constant strings are refactored.
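A minimal sketch of the shared-constant idea (the module path and constant name below are assumptions, not existing code):

# e.g. backend/app/core/routes.py (hypothetical shared module)
ONBOARD_ROUTE = "/onboard"

# the FastAPI router and this migration script would both import it:
#   @router.post(ONBOARD_ROUTE, ...)
#   api_url = config["base_url"] + ONBOARD_ROUTE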

api_key = config["api_key"]

processor = OnboardProcessor(
input_file=str(input_file),
output_file=str(output_file),
api_url=api_url,
api_key=api_key,
)
processor.run()


if __name__ == "__main__":
main()
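Since the script imports glific_migration as an absolute package, it presumably needs to be run as a module from the backend/ directory; the command below is an assumption, not documented in the PR:

python -m glific_migration.organization_onboarding.run_onboarding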
4 changes: 4 additions & 0 deletions backend/glific_migration/organization_onboarding/sample_input.csv
@@ -0,0 +1,4 @@
organization_name,project_name,email,password,user_name
Glific Foundation,Chatbot Project,onboard1@glific.org,TestPass123,admin1
TestOrg,EducationBot,onboard2@test.org,TestPass456,edubot
Acme NGO,SurveyBot,onboard3@acme.org,TestPass789,survey_lead
82 changes: 82 additions & 0 deletions backend/glific_migration/sync_assistant/processor.py
@@ -0,0 +1,82 @@
import logging
from typing import List, Dict, Set
from glific_migration.base_processor import BaseCSVProcessor
from glific_migration.client import APIClient
from glific_migration.validator import (
validate_required_fields,
is_valid_api_key,
is_valid_assistant_id,
)


logger = logging.getLogger(__name__)


class AssistantIngestProcessor(BaseCSVProcessor):
"""Processor for handling assistant ingestion."""

HEADERS = ["assistant_id", "api_key", "success", "response_from_endpoint"]
REQUIRED_FIELDS = {"assistant_id", "api_key"}

def __init__(self, input_file: str, output_file: str, base_url: str):
super().__init__(input_file, output_file, self.HEADERS)
self.base_url = base_url.rstrip("/")

def validate_csv(self, rows: List[Dict[str, str]]) -> bool:
"""Validate CSV data for assistant ingestion."""
validation_errors = []

for idx, row in enumerate(rows, start=1):
row_errors = []

missing = validate_required_fields(row, self.REQUIRED_FIELDS)
if missing:
row_errors.append(f"Missing fields: {', '.join(missing)}")

if not row.get("assistant_id", "").strip():
row_errors.append("Empty assistant_id")

if not row.get("api_key", "").strip():
row_errors.append("Empty api_key")

if row.get("assistant_id") and not is_valid_assistant_id(
row["assistant_id"]
):
row_errors.append(f"Invalid assistant_id format: {row['assistant_id']}")

if row.get("api_key") and not is_valid_api_key(row["api_key"]):
row_errors.append(f"Invalid api_key format: {row['api_key']}")

if row_errors:
validation_errors.extend(f"Row {idx}: {err}" for err in row_errors)

if validation_errors:
logger.error("CSV validation failed with the following issues:")
for error in validation_errors:
logger.error(" - %s", error)
return False

logger.info("CSV validation passed.")
return True

def process_rows(self, rows: List[Dict[str, str]]) -> None:
"""Process rows for assistant ingestion and write to CSV after each request."""
for idx, row in enumerate(rows, start=1):
assistant_id = row["assistant_id"]
api_key = row["api_key"]

logger.info(
f"Ingesting assistant for row {idx} (assistant_id: {assistant_id})"
)
url = f"{self.base_url}/assistant/{assistant_id}/ingest"
client = APIClient(api_key=api_key)
success, resp = client.post(url)
logger.info(f"Row {idx} processed. Success: {success}")

result = {
"assistant_id": assistant_id,
"api_key": api_key,
"success": "yes" if success else "no",
"response_from_endpoint": str(resp),
}
self.append_to_csv(result)
4 changes: 4 additions & 0 deletions backend/glific_migration/sync_assistant/sample_input.csv
@@ -0,0 +1,4 @@
assistant_id,api_key
asst_1,ApiKey abc123
asst_2,ApiKey def456
asst_3,ApiKey ghi789