fix: Honor force_full_refresh in configured catalog sync mode #899
base: main
When `force_full_refresh=True`, the configured catalog now uses `SyncMode.full_refresh` for streams that support it, instead of always using `SyncMode.incremental`.

This fixes issue #772, where the Postgres source would default to incremental sync even when `force_full_refresh=True` was specified.

The fix updates `get_configured_catalog()` to accept a `force_full_refresh` parameter and set the sync mode based on:
- If `force_full_refresh=True`: prefer `full_refresh` if supported
- If `force_full_refresh=False`: prefer `incremental` if supported

Fixes #772

Co-Authored-By: AJ Steers <aj@airbyte.io>
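The selection rule in this commit message can be sketched as below. This is a minimal illustration, not the code from the PR: `SyncMode` is a local stand-in enum, `pick_sync_mode` is a hypothetical helper name, and both the fall-through to the other mode and the final incremental fallback for streams that advertise no modes are assumptions.

```python
from enum import Enum


class SyncMode(str, Enum):
    """Local stand-in for the Airbyte protocol's SyncMode enum (assumption)."""

    full_refresh = "full_refresh"
    incremental = "incremental"


def pick_sync_mode(supported_modes, *, force_full_refresh=False):
    """Hypothetical helper: pick a sync mode following the rule described above."""
    supported = list(supported_modes or [])
    if force_full_refresh:
        preferred, other = SyncMode.full_refresh, SyncMode.incremental
    else:
        preferred, other = SyncMode.incremental, SyncMode.full_refresh

    if preferred in supported:
        return preferred
    # Assumption: fall through to the other mode if it is the only one advertised.
    if other in supported:
        return other
    # Assumption: streams that advertise nothing keep the historical default.
    return SyncMode.incremental


both = [SyncMode.full_refresh, SyncMode.incremental]
print(pick_sync_mode(both, force_full_refresh=True).value)
print(pick_sync_mode(both, force_full_refresh=False).value)
```

For a stream that advertises both modes, the flag alone decides the outcome; the fallbacks only matter for streams that advertise one mode or none.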
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

    # Run PyAirbyte CLI from this branch:
    uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765247774-fix-force-full-refresh' pyairbyte --help

    # Install PyAirbyte from this branch for development:
    pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765247774-fix-force-full-refresh'

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.
Make `_get_sync_mode` defensive by using `getattr` to handle mock objects or older stream objects that don't have the `supported_sync_modes` attribute. This preserves backward compatibility with existing tests while still fixing the `force_full_refresh` behavior for real streams.

Co-Authored-By: AJ Steers <aj@airbyte.io>
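The defensive lookup described in this commit can be illustrated with a small, self-contained sketch. The helper name `supports_full_refresh` and the `SimpleNamespace` stand-ins for stream objects are hypothetical; only the `getattr(..., "supported_sync_modes", None)` pattern comes from the PR discussion.

```python
from types import SimpleNamespace


def supports_full_refresh(stream) -> bool:
    """Return True only if the stream explicitly advertises full_refresh."""
    # getattr with a default avoids AttributeError on objects lacking the attribute.
    supported_modes = getattr(stream, "supported_sync_modes", None)
    # A missing, None, or empty value is treated as "nothing advertised".
    return bool(supported_modes) and "full_refresh" in supported_modes


# Stand-ins for stream objects (assumptions, not real AirbyteStream instances).
modern = SimpleNamespace(name="users", supported_sync_modes=["full_refresh", "incremental"])
legacy = SimpleNamespace(name="orders")  # attribute missing entirely
unset = SimpleNamespace(name="events", supported_sync_modes=None)

for stream in (modern, legacy, unset):
    print(stream.name, supports_full_refresh(stream))
```

Direct attribute access (`stream.supported_sync_modes`) would raise `AttributeError` for the `legacy` object, which is the failure mode the commit guards against.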
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte/sources/base.py (1)
537-547: `get_records` docstring talks about full refresh, but catalog is configured as incremental

The `get_records` docstring says:

“Generate a configured catalog that syncs the given stream in `full_refresh` mode”

But the implementation calls:

    configured_catalog = self.get_configured_catalog(streams=[stream])

and, with the current `_get_sync_mode`, that will always set `sync_mode=SyncMode.incremental` when `force_full_refresh` isn’t passed.

Given the new `force_full_refresh` support, would it be clearer to either:

- Align behavior with the docs by forcing full refresh here:

    - configured_catalog = self.get_configured_catalog(streams=[stream])
    + configured_catalog = self.get_configured_catalog(
    +     streams=[stream],
    +     force_full_refresh=True,
    + )

- Or, if you prefer incremental semantics for `get_records`, update the docstring to no longer promise full refresh?

Curious which behavior you’d like to standardize on here, wdyt?
🧹 Nitpick comments (1)
airbyte/sources/base.py (1)
420-435: Sync mode selection vs `supported_sync_modes` and docstring seem slightly misaligned

The new `force_full_refresh` plumbing and `_get_sync_mode` helper look like they’ll fix the Postgres case where full refresh was requested but the catalog still said incremental. One thing that stood out though:

- The docstring says: “If `force_full_refresh` is True, streams will be configured with `full_refresh` sync mode when supported by the stream. Otherwise, incremental sync mode is used when supported.”
- In `_get_sync_mode`, the non-forced branch always returns `SyncMode.incremental`, even if the stream only advertises `full_refresh` (or has no `supported_sync_modes`), so we’re not actually “using incremental when supported”; we’re just always using incremental.

This preserves previous behavior, but it means:

- Streams that only support `full_refresh` (or don’t list modes) will still be configured as incremental.
- The docstring oversells what the default branch does.

Would you consider either:

- Making the default path respect `supported_sync_modes` (while keeping a final incremental fallback for full backward compatibility), e.g.:

    - def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
    -     """Determine the sync mode for a stream based on force_full_refresh and support."""
    -     # Use getattr to handle mocks or streams without supported_sync_modes attribute
    -     supported_modes = getattr(stream, "supported_sync_modes", None)
    -
    -     if force_full_refresh:
    -         # When force_full_refresh is True, prefer full_refresh if supported
    -         if supported_modes and SyncMode.full_refresh in supported_modes:
    -             return SyncMode.full_refresh
    -         # Fall back to incremental if full_refresh is not supported
    -         return SyncMode.incremental
    -
    -     # Default behavior: preserve previous semantics (always incremental)
    -     return SyncMode.incremental
    + def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
    +     """Determine the sync mode for a stream based on force_full_refresh and support."""
    +     # Use getattr to handle mocks or streams without supported_sync_modes attribute
    +     supported_modes = getattr(stream, "supported_sync_modes", None) or []
    +
    +     if force_full_refresh:
    +         # When force_full_refresh is True, prefer full_refresh if supported
    +         if SyncMode.full_refresh in supported_modes:
    +             return SyncMode.full_refresh
    +         if SyncMode.incremental in supported_modes:
    +             return SyncMode.incremental
    +
    +         # If the stream doesn’t advertise modes, keep historical behaviour.
    +         return SyncMode.incremental
    +
    +     # Default behaviour when not forcing full refresh:
    +     if SyncMode.incremental in supported_modes:
    +         return SyncMode.incremental
    +     if SyncMode.full_refresh in supported_modes:
    +         return SyncMode.full_refresh
    +
    +     # Fallback for streams that don’t advertise modes (keeps old semantics).
    +     return SyncMode.incremental

- Or, if you explicitly want to avoid changing default semantics in this PR, tightening the docstring/comment to say that the non-forced branch always uses incremental (regardless of `supported_sync_modes`) so readers aren’t surprised?

Wdyt?
Also applies to: 449-462, 469-488
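To make the difference between the two variants in this comment concrete, here is a small standalone comparison. It is a sketch under assumptions: `SyncMode` is a local stand-in enum, the streams are `SimpleNamespace` objects, the function names `current_sync_mode` and `proposed_sync_mode` are hypothetical, and `force_full_refresh` is passed as an argument instead of being captured from an enclosing scope as in the PR.

```python
from enum import Enum
from types import SimpleNamespace


class SyncMode(str, Enum):
    """Local stand-in for the Airbyte protocol's SyncMode enum (assumption)."""

    full_refresh = "full_refresh"
    incremental = "incremental"


def current_sync_mode(stream, force_full_refresh=False):
    """Condensed form of the removed ('-') side of the diff above."""
    supported = getattr(stream, "supported_sync_modes", None)
    if force_full_refresh and supported and SyncMode.full_refresh in supported:
        return SyncMode.full_refresh
    # Non-forced branch (and unsupported forced branch): always incremental.
    return SyncMode.incremental


def proposed_sync_mode(stream, force_full_refresh=False):
    """Condensed form of the added ('+') side of the diff above."""
    supported = getattr(stream, "supported_sync_modes", None) or []
    if force_full_refresh:
        order = [SyncMode.full_refresh, SyncMode.incremental]
    else:
        order = [SyncMode.incremental, SyncMode.full_refresh]
    for mode in order:
        if mode in supported:
            return mode
    # Streams that advertise no modes keep the historical default.
    return SyncMode.incremental


full_only = SimpleNamespace(supported_sync_modes=[SyncMode.full_refresh])
no_modes = SimpleNamespace()

print(current_sync_mode(full_only).value, proposed_sync_mode(full_only).value)
print(current_sync_mode(no_modes).value, proposed_sync_mode(no_modes).value)
```

The two variants only disagree for a stream that advertises `full_refresh` alone and is read without the flag; streams that advertise nothing behave the same under both.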
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/sources/base.py (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: Suraj-Vishwakarma70
Repo: airbytehq/PyAirbyte PR: 285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.
Applied to files:
airbyte/sources/base.py
🧬 Code graph analysis (1)
airbyte/sources/base.py (2)
airbyte/strategies.py (1)
destination_sync_mode (80-92)
airbyte/shared/catalog_providers.py (1)
CatalogProvider (31-226)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte/sources/base.py (1)
738-755: Propagation of `force_full_refresh` into iterator and state handling looks solid

Wiring `force_full_refresh` through to `get_configured_catalog(...)` and explicitly passing `state=None` when it’s True matches the intended semantics (connector sees `full_refresh` where supported and no state). This should address the Postgres “incremental without cursor” case for both multi-stream and single-stream reads from this iterator.

I don’t see any issues in this block; keeping the flag keyword-only also preserves backwards compatibility for existing callers. 👍 Wdyt?
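The semantics described in this comment (full refresh where supported, and no state sent to the connector) can be sketched in isolation. Everything here is hypothetical: `plan_read`, its arguments, and the plain-string sync modes are illustrative stand-ins, not PyAirbyte APIs.

```python
def plan_read(stream_modes, saved_state, *, force_full_refresh=False):
    """Hypothetical planner: return (sync mode per stream, state to send).

    `stream_modes` maps a stream name to the list of modes it advertises.
    """
    sync_modes = {}
    for name, supported in stream_modes.items():
        if force_full_refresh and "full_refresh" in supported:
            sync_modes[name] = "full_refresh"
        else:
            sync_modes[name] = "incremental"
    # When forcing a full refresh, deliberately send no state at all.
    state = None if force_full_refresh else saved_state
    return sync_modes, state


streams = {
    "users": ["full_refresh", "incremental"],
    "events": ["incremental"],
}
saved = {"users": {"cursor": "2024-01-01"}}

print(plan_read(streams, saved, force_full_refresh=True))
print(plan_read(streams, saved))
```

Making the flag keyword-only, as the comment notes for the real code, means existing positional callers are unaffected.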
PyTest Results (Full): 416 tests ±0, 399 ✅ -1, 26m 27s ⏱️ +1m 12s. For more details on these failures, see this check. Results for commit 475ffa3. ± Comparison against base commit 1b48476.
…force_full_refresh) Co-Authored-By: AJ Steers <aj@airbyte.io>
Summary
Fixes #772 - When `force_full_refresh=True` was specified, the configured catalog was still being built with `sync_mode=SyncMode.incremental`, causing connectors like Postgres to expect cursor fields even during full refresh operations.

The root cause was that `get_configured_catalog()` always hardcoded `sync_mode=SyncMode.incremental` regardless of the `force_full_refresh` parameter. The parameter only affected whether state was passed to the connector, not the actual sync mode in the catalog.

Changes:
- Added a `force_full_refresh` parameter to `get_configured_catalog()`
- Updated callers (`read()`, `_get_airbyte_message_iterator()`, `Destination.write()`) to pass the parameter

Review & Testing Checklist for Human
- When `force_full_refresh=True`, streams should use `full_refresh` if supported by the stream, otherwise fall back to `incremental`. Confirm this matches expected Airbyte protocol behavior.
- Test `force_full_refresh=True` against a connector that supports both sync modes (e.g., Postgres) and verify it no longer requests cursor fields.
- Check for other callers of `get_configured_catalog()` that should also pass `force_full_refresh` (grep shows `caches/base.py:409` also calls it but doesn't have access to `force_full_refresh` context).