Feature/reorganize project structure #264
base: main
Conversation
Pull Request Overview
This PR reorganizes the DataPusher Plus codebase from a monolithic `jobs.py` file (1624 lines) into a modular pipeline architecture with distinct stages. The refactoring separates concerns into individual stage classes (Download, FormatConverter, Validation, Analysis, Database, Indexing, Formula, Metadata), each with a single, well-defined responsibility. The original monolithic implementation is preserved in `jobs_legacy.py` for reference, while `jobs.py` now serves as a backward-compatibility wrapper.
Key Changes:
- Introduced a pipeline architecture with 8 distinct processing stages
- Created a `ProcessingContext` dataclass to manage shared state between stages (see the sketch below)
- Split monolithic code into modular, testable components
- Maintained backward compatibility through wrapper functions
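For orientation, here is a minimal sketch of how these pieces could fit together. The actual implementation lives in `jobs/pipeline.py`, `jobs/context.py`, and `jobs/stages/base.py`; everything beyond the names listed above (field names, the `run_pipeline` helper) is an assumption, not the real code:

```python
from dataclasses import dataclass, field
from typing import List, Optional
import logging


@dataclass
class ProcessingContext:
    """Shared state handed from stage to stage (fields are illustrative)."""
    resource_id: str
    logger: logging.Logger
    data: dict = field(default_factory=dict)


class PipelineStage:
    """Abstract base class: a stage transforms the context, or returns None to stop."""
    name = "base"

    def __call__(self, context: ProcessingContext) -> Optional[ProcessingContext]:
        raise NotImplementedError


def run_pipeline(stages: List[PipelineStage],
                 context: ProcessingContext) -> Optional[ProcessingContext]:
    """Run each stage in order; a stage returning None stops the pipeline."""
    for stage in stages:
        logger = context.logger  # capture before the call, in case the stage stops us
        context = stage(context)
        if context is None:
            logger.info(f"Pipeline stopped after stage: {stage.name}")
            return None
    return context
```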
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| ckanext/datapusher_plus/jobs_legacy.py | Preserved copy of original monolithic implementation for reference |
| ckanext/datapusher_plus/jobs.py | Backward compatibility wrapper that re-exports refactored functions |
| ckanext/datapusher_plus/jobs/__init__.py | Module initialization with exports |
| ckanext/datapusher_plus/jobs/pipeline.py | Main pipeline orchestration and execution logic |
| ckanext/datapusher_plus/jobs/context.py | ProcessingContext dataclass for shared state management |
| ckanext/datapusher_plus/jobs/stages/base.py | Abstract base class for all pipeline stages |
| ckanext/datapusher_plus/jobs/stages/download.py | Download stage implementation |
| ckanext/datapusher_plus/jobs/stages/format_converter.py | Format conversion stage implementation |
| ckanext/datapusher_plus/jobs/stages/validation.py | CSV validation and deduplication stage |
| ckanext/datapusher_plus/jobs/stages/analysis.py | Type inference and statistics stage |
| ckanext/datapusher_plus/jobs/stages/database.py | PostgreSQL COPY operations stage |
| ckanext/datapusher_plus/jobs/stages/indexing.py | Automatic index creation stage |
| ckanext/datapusher_plus/jobs/stages/formula.py | DRUF Jinja2 formula processing stage |
| ckanext/datapusher_plus/jobs/stages/metadata.py | Resource metadata updates and aliasing stage |
```python
    )
    sortcheck_json = json.loads(str(stdout_content))
except (json.JSONDecodeError, AttributeError) as e:
    raise utils.JobError(f"Failed to parse sortcheck JSONoutput: {e}")
```
Copilot AI · Nov 17, 2025
Corrected spelling of 'JSONoutput' to 'JSON output' (should have a space).
Suggested change:

```diff
-raise utils.JobError(f"Failed to parse sortcheck JSONoutput: {e}")
+raise utils.JobError(f"Failed to parse sortcheck JSON output: {e}")
```
| f'"{len(unsafe_headers)} unsafe" header names found ' | ||
| f"({unsafe_headers}). Sanitizing...\"" |
Copilot AI · Nov 17, 2025
Mismatched quotation marks: the log message starts with a single quote but ends with an escaped double quote followed by a single quote. It should be consistent: `f'"{len(unsafe_headers)} unsafe" header names found ({unsafe_headers}). Sanitizing...'`
| f'"{len(unsafe_headers)} unsafe" header names found ' | |
| f"({unsafe_headers}). Sanitizing...\"" | |
| f"{len(unsafe_headers)} unsafe header names found " | |
| f"({unsafe_headers}). Sanitizing..." |
```python
context = stage(context)

# If stage returns None, stop pipeline
if context is None:
    context.logger.info(f"Pipeline stopped after stage: {stage.name}")
```
Copilot AI · Nov 17, 2025
Attempting to access `context.logger` after `context` is `None` will cause an `AttributeError`. When a stage returns `None`, the context is `None` and cannot be accessed; the logger should be captured before the `None` check, or stored separately.
Suggested change:

```diff
-context = stage(context)
-# If stage returns None, stop pipeline
-if context is None:
-    context.logger.info(f"Pipeline stopped after stage: {stage.name}")
+logger = context.logger
+context = stage(context)
+# If stage returns None, stop pipeline
+if context is None:
+    logger.info(f"Pipeline stopped after stage: {stage.name}")
```
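As a quick sanity check of the suggested fix, a stage that returns `None` should now log and stop cleanly rather than raise. The names here are illustrative and build on the sketch near the top of this page:

```python
class StopHere(PipelineStage):
    name = "stop_here"

    def __call__(self, context):
        return None  # signal the pipeline to stop


ctx = ProcessingContext(resource_id="r1", logger=logging.getLogger("dpp"))
# Logs "Pipeline stopped after stage: stop_here" and returns None, no AttributeError:
assert run_pipeline([StopHere()], ctx) is None
```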
```python
)

# Process resource suggestion formulae
package = self._process_resource_suggestions(
```
Copilot AI · Nov 17, 2025
The variable `package` is not used.
Suggested change:

```diff
-package = self._process_resource_suggestions(
+self._process_resource_suggestions(
```
```python
else:
    qsv_input_utf_8_encoded_csv = tmp
try:
    qsv.input(tmp, trim_headers=True, output_file=qsv_input_csv)
```
Copilot AI · Nov 17, 2025
The variable `qsv_input_utf_8_encoded_csv` is not used.
Suggested change:

```diff
-qsv.input(tmp, trim_headers=True, output_file=qsv_input_csv)
+qsv.input(qsv_input_utf_8_encoded_csv, trim_headers=True, output_file=qsv_input_csv)
```
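For context, the surrounding logic appears intended to normalize the download to UTF-8 first and then feed that normalized file to `qsv input`. A sketch of that flow, where `needs_transcoding` and `transcoded_bytes` are hypothetical stand-ins for the branch the diff does not show:

```python
if needs_transcoding:  # hypothetical: the downloaded file is not valid UTF-8
    with open(qsv_input_utf_8_encoded_csv, "wb") as f:
        f.write(transcoded_bytes)  # hypothetical re-encoded content
else:
    qsv_input_utf_8_encoded_csv = tmp  # already UTF-8, reuse the downloaded file

# Read from the UTF-8 file, not the raw tmp file, so both branches agree:
qsv.input(qsv_input_utf_8_encoded_csv, trim_headers=True, output_file=qsv_input_csv)
```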
| f"Resource too large to download: {DataSize(int(cl)):.2MB} " | ||
| f"> max ({DataSize(int(max_content_length)):.2MB})." | ||
| ) | ||
| except ValueError: |
Copilot AI · Nov 17, 2025
The `except` clause does nothing but `pass`, and there is no explanatory comment.
Suggested change:

```diff
 except ValueError:
+    # If content-length header is missing or malformed, skip size check and proceed.
```
```python
raise utils.JobError(
    f"Resource too large to download: {DataSize(int(cl)):.2MB} > max ({DataSize(int(max_content_length)):.2MB})."
)
except ValueError:
```
Copilot AI · Nov 17, 2025
The `except` clause does nothing but `pass`, and there is no explanatory comment.
Suggested change:

```diff
 except ValueError:
+    # If content-length header is missing or malformed, ignore and proceed as file size is unknown.
```
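Taken together with the suggested comment, the guarded size check could read like this. This is a sketch assuming the module's existing `utils.JobError` and the `datasize` package's `DataSize` (the apparent source of the `:.2MB` format above), with `resp` and `max_content_length` taken from the surrounding code:

```python
from datasize import DataSize  # assumed source of the DataSize formatter used above

cl = resp.headers.get("content-length")
if cl:
    try:
        if int(cl) > max_content_length:
            raise utils.JobError(
                f"Resource too large to download: {DataSize(int(cl)):.2MB} "
                f"> max ({DataSize(int(max_content_length)):.2MB})."
            )
    except ValueError:
        # Content-Length is malformed (int() failed), so the size is unknown;
        # skip the check and let the download proceed.
        pass
```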
```python
# to truncate overly long strings from causing issues with
# Python's CSV reader and Postgres's limits with the COPY command
if spatial_format_flag:
    env = os.environ.copy()
```
Copilot AI · Nov 17, 2025
This statement is unreachable.
```python
f = open(qsv_input_utf_8_encoded_csv, "wb")
f.write(cmd.stdout)
f.close()
```
Copilot AI · Nov 17, 2025
The file may not be closed if the write raises an exception.
Suggested change:

```diff
-f = open(qsv_input_utf_8_encoded_csv, "wb")
-f.write(cmd.stdout)
-f.close()
+with open(qsv_input_utf_8_encoded_csv, "wb") as f:
+    f.write(cmd.stdout)
```
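The `with` form guarantees the file is closed (and its buffer flushed) even if the write raises.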
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
With this PR, we are reorganizing the project structure into different directories for better separation of concerns.
Each pipeline stage has a single, well-defined responsibility:
- Download: downloads the resource
- FormatConverter: format conversion
- Validation: CSV validation and deduplication
- Analysis: type inference and statistics
- Database: PostgreSQL COPY operations
- Indexing: automatic index creation
- Formula: DRUF Jinja2 formula processing
- Metadata: resource metadata updates and aliasing