
Conversation

@tino097 (Member) commented Nov 17, 2025

With this PR we are reorganizing the project structure into separate directories for better separation of concerns.

Each pipeline stage has a single, well-defined responsibility (a minimal sketch of how the stages compose follows the list):

  • DownloadStage: Resource fetching, hash checking, ZIP extraction
  • FormatConverterStage: Spreadsheet/spatial/CSV conversion & normalization
  • ValidationStage: RFC4180 validation, sortcheck, deduplication
  • AnalysisStage: Type inference, statistics, frequency tables, PII screening
  • DatabaseStage: PostgreSQL COPY operations
  • IndexingStage: Automatic index creation based on cardinality
  • FormulaStage: DRUF Jinja2 formula processing (optional, requires scheming)
  • MetadataStage: Auto-aliasing, summary stats, resource metadata updates
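
A minimal sketch of how these stages might be wired together, assuming each stage is a callable that accepts and returns a ProcessingContext. The stage names and ProcessingContext come from this PR; the import paths and the run_pipeline helper below are illustrative assumptions, not the actual pipeline.py code:

```python
# Illustrative sketch only: import paths and run_pipeline are assumptions.
from ckanext.datapusher_plus.jobs.context import ProcessingContext
from ckanext.datapusher_plus.jobs.stages import (
    DownloadStage, FormatConverterStage, ValidationStage, AnalysisStage,
    DatabaseStage, IndexingStage, FormulaStage, MetadataStage,
)

STAGES = [
    DownloadStage(), FormatConverterStage(), ValidationStage(), AnalysisStage(),
    DatabaseStage(), IndexingStage(), FormulaStage(), MetadataStage(),
]

def run_pipeline(context: ProcessingContext):
    """Run each stage in order; a stage returning None stops the pipeline."""
    for stage in STAGES:
        logger = context.logger   # keep a reference in case the stage returns None
        context = stage(context)  # each stage returns the (possibly updated) context
        if context is None:
            logger.info("Pipeline stopped after stage: %s", stage.name)
            return None
    return context
```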

Copilot AI left a comment

Pull Request Overview

This PR reorganizes the DataPusher Plus codebase from a monolithic jobs.py file (1624 lines) into a modular pipeline architecture with distinct stages. The refactoring separates concerns into individual stage classes (Download, FormatConverter, Validation, Analysis, Database, Indexing, Formula, Metadata), each with a single well-defined responsibility. The original monolithic implementation is preserved in jobs_legacy.py for reference, while jobs.py now serves as a backward compatibility wrapper.
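
As a rough illustration of the wrapper pattern (the module path and re-exported name below are assumptions, not taken from the diff), the compatibility layer can be little more than a re-export:

```python
# Illustrative only: backward-compatibility wrapper re-exporting the refactored
# entry point so callers importing the old jobs module keep working.
# The function name push_to_datastore is an assumption, not confirmed here.
from ckanext.datapusher_plus.jobs.pipeline import push_to_datastore  # noqa: F401

__all__ = ["push_to_datastore"]
```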

Key Changes:

  • Introduced a pipeline architecture with 8 distinct processing stages
  • Created a ProcessingContext dataclass to manage shared state between stages (see the sketch below)
  • Split monolithic code into modular, testable components
  • Maintained backward compatibility through wrapper functions
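
A ProcessingContext along these lines could carry the shared state between stages. Field names here are assumptions; the actual definition lives in jobs/context.py:

```python
# Illustrative sketch of a shared-state dataclass; field names are assumptions.
from dataclasses import dataclass, field
from logging import Logger
from typing import Any, Optional


@dataclass
class ProcessingContext:
    resource_id: str
    logger: Logger
    # scratch space that stages read from and write to as the resource
    # moves through download, conversion, validation, analysis, etc.
    data: dict[str, Any] = field(default_factory=dict)
    # path of the working CSV produced/updated by earlier stages
    working_csv: Optional[str] = None
```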

Reviewed Changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 17 comments.

Summary per file:

  • ckanext/datapusher_plus/jobs_legacy.py: Preserved copy of the original monolithic implementation, kept for reference
  • ckanext/datapusher_plus/jobs.py: Backward-compatibility wrapper that re-exports the refactored functions
  • ckanext/datapusher_plus/jobs/__init__.py: Module initialization with exports
  • ckanext/datapusher_plus/jobs/pipeline.py: Main pipeline orchestration and execution logic
  • ckanext/datapusher_plus/jobs/context.py: ProcessingContext dataclass for shared state management
  • ckanext/datapusher_plus/jobs/stages/base.py: Abstract base class for all pipeline stages (sketched after this table)
  • ckanext/datapusher_plus/jobs/stages/download.py: Download stage implementation
  • ckanext/datapusher_plus/jobs/stages/format_converter.py: Format conversion stage implementation
  • ckanext/datapusher_plus/jobs/stages/validation.py: CSV validation and deduplication stage
  • ckanext/datapusher_plus/jobs/stages/analysis.py: Type inference and statistics stage
  • ckanext/datapusher_plus/jobs/stages/database.py: PostgreSQL COPY operations stage
  • ckanext/datapusher_plus/jobs/stages/indexing.py: Automatic index creation stage
  • ckanext/datapusher_plus/jobs/stages/formula.py: DRUF Jinja2 formula processing stage
  • ckanext/datapusher_plus/jobs/stages/metadata.py: Resource metadata updates and aliasing stage
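
A plausible shape for the abstract base class in stages/base.py, inferred from how stages are invoked elsewhere in the PR (stage(context) and stage.name); the method names below are assumptions:

```python
# Hypothetical base class: each concrete stage sets a name and implements run().
from abc import ABC, abstractmethod


class PipelineStage(ABC):
    name: str = "base"

    @abstractmethod
    def run(self, context):
        """Do this stage's work; return the (possibly updated) context,
        or None to stop the pipeline."""

    def __call__(self, context):
        context.logger.info("Starting stage: %s", self.name)
        return self.run(context)
```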


    )
    sortcheck_json = json.loads(str(stdout_content))
except (json.JSONDecodeError, AttributeError) as e:
    raise utils.JobError(f"Failed to parse sortcheck JSONoutput: {e}")

Copilot AI Nov 17, 2025

Corrected spelling of 'JSONoutput' to 'JSON output' (should have a space).

Suggested change
raise utils.JobError(f"Failed to parse sortcheck JSONoutput: {e}")
raise utils.JobError(f"Failed to parse sortcheck JSON output: {e}")

Comment on lines +173 to +174
f'"{len(unsafe_headers)} unsafe" header names found '
f"({unsafe_headers}). Sanitizing...\""

Copilot AI Nov 17, 2025

Mismatched quotation marks: the log message wraps '{len(unsafe_headers)} unsafe' in double quotes but also ends with a stray escaped double quote. Should be consistent: f'"{len(unsafe_headers)} unsafe" header names found ({unsafe_headers}). Sanitizing...'

Suggested change
f'"{len(unsafe_headers)} unsafe" header names found '
f"({unsafe_headers}). Sanitizing...\""
f"{len(unsafe_headers)} unsafe header names found "
f"({unsafe_headers}). Sanitizing..."

Comment on lines +287 to +291
context = stage(context)

# If stage returns None, stop pipeline
if context is None:
    context.logger.info(f"Pipeline stopped after stage: {stage.name}")

Copilot AI Nov 17, 2025

Accessing context.logger after context has been reassigned to None raises an AttributeError: once a stage returns None, the context can no longer be dereferenced. Capture the logger before calling the stage, or store it separately.

Suggested change
context = stage(context)
# If stage returns None, stop pipeline
if context is None:
    context.logger.info(f"Pipeline stopped after stage: {stage.name}")
logger = context.logger
context = stage(context)
# If stage returns None, stop pipeline
if context is None:
    logger.info(f"Pipeline stopped after stage: {stage.name}")

)

# Process resource suggestion formulae
package = self._process_resource_suggestions(

Copilot AI Nov 17, 2025

Variable package is not used.

Suggested change
package = self._process_resource_suggestions(
self._process_resource_suggestions(

else:
    qsv_input_utf_8_encoded_csv = tmp
try:
    qsv.input(tmp, trim_headers=True, output_file=qsv_input_csv)

Copilot AI Nov 17, 2025

Variable qsv_input_utf_8_encoded_csv is not used.

Suggested change
qsv.input(tmp, trim_headers=True, output_file=qsv_input_csv)
qsv.input(qsv_input_utf_8_encoded_csv, trim_headers=True, output_file=qsv_input_csv)

f"Resource too large to download: {DataSize(int(cl)):.2MB} "
f"> max ({DataSize(int(max_content_length)):.2MB})."
)
except ValueError:

Copilot AI Nov 17, 2025

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
except ValueError:
except ValueError:
    # If content-length header is missing or malformed, skip size check and proceed.

raise utils.JobError(
f"Resource too large to download: {DataSize(int(cl)):.2MB} > max ({DataSize(int(max_content_length)):.2MB})."
)
except ValueError:

Copilot AI Nov 17, 2025

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
except ValueError:
except ValueError:
    # If content-length header is missing or malformed, ignore and proceed as file size is unknown.

# to truncate overly long strings from causing issues with
# Python's CSV reader and Postgres's limits with the COPY command
if spatial_format_flag:
    env = os.environ.copy()

Copilot AI Nov 17, 2025

This statement is unreachable.

Comment on lines +598 to +600
f = open(qsv_input_utf_8_encoded_csv, "wb")
f.write(cmd.stdout)
f.close()

Copilot AI Nov 17, 2025

File may not be closed if this operation raises an exception.

Suggested change
f = open(qsv_input_utf_8_encoded_csv, "wb")
f.write(cmd.stdout)
f.close()
with open(qsv_input_utf_8_encoded_csv, "wb") as f:
    f.write(cmd.stdout)

@jqnatividad mentioned this pull request Nov 17, 2025
tino097 and others added 2 commits November 27, 2025 14:46