Skip to content

Conversation

@aaronsteers
Copy link
Contributor

@aaronsteers aaronsteers commented Dec 9, 2025

Summary

Fixes #772 - When force_full_refresh=True was specified, the configured catalog was still being built with sync_mode=SyncMode.incremental, causing connectors like Postgres to expect cursor fields even during full refresh operations.

The root cause was that get_configured_catalog() always hardcoded sync_mode=SyncMode.incremental regardless of the force_full_refresh parameter. The parameter only affected whether state was passed to the connector, not the actual sync mode in the catalog.

Changes:

  • Added force_full_refresh parameter to get_configured_catalog()
  • Added logic to select appropriate sync mode based on stream support and refresh mode
  • Updated all call sites (read(), _get_airbyte_message_iterator(), Destination.write()) to pass the parameter

Review & Testing Checklist for Human

  • Verify sync mode logic: When force_full_refresh=True, streams should use full_refresh if supported by the stream, otherwise fall back to incremental. Confirm this matches expected Airbyte protocol behavior.
  • Test with a real connector: Run a sync with force_full_refresh=True against a connector that supports both sync modes (e.g., Postgres) and verify it no longer requests cursor fields
  • Check for missed call sites: Verify there are no other places calling get_configured_catalog() that should also pass force_full_refresh (grep shows caches/base.py:409 also calls it but doesn't have access to force_full_refresh context)

Notes

Summary by CodeRabbit

  • New Features
    • Added a "force full refresh" option that triggers a full data resynchronization; streams automatically adjust their sync mode when enabled.
    • Read operations now consistently respect the forced full-refresh intent across sources so prior incremental state is ignored and a complete refresh is performed.

✏️ Tip: You can customize this high-level summary in your review settings.

When force_full_refresh=True, the configured catalog now uses
SyncMode.full_refresh for streams that support it, instead of
always using SyncMode.incremental.

This fixes issue #772 where the Postgres source would default to
incremental sync even when force_full_refresh=True was specified.

The fix updates get_configured_catalog() to accept a force_full_refresh
parameter and properly set the sync mode based on:
- If force_full_refresh=True: prefer full_refresh if supported
- If force_full_refresh=False: prefer incremental if supported

Fixes #772

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration
Copy link
Contributor

Original prompt from AJ Steers
Received message in Slack channel #ask-devin-ai:

@Devin - Triage the latest 10 issues in the PyAirbyte repo that are from external community members / contributors. Don't comment on them directly but look into them and prepare for me a set of suggestions on how to disposition each.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1765247038245949

@devin-ai-integration
Copy link
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link

github-actions bot commented Dec 9, 2025

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765247774-fix-force-full-refresh' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765247774-fix-force-full-refresh'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.

📝 Edit this welcome message.

@github-actions
Copy link

github-actions bot commented Dec 9, 2025

PyTest Results (Fast Tests Only, No Creds)

348 tests  ±0   348 ✅ ±0   5m 42s ⏱️ -26s
  1 suites ±0     0 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 475ffa3. ± Comparison against base commit 1b48476.

♻️ This comment has been updated with latest results.

Make _get_sync_mode defensive by using getattr to handle mock objects
or older stream objects that don't have the supported_sync_modes
attribute. This preserves backward compatibility with existing tests
while still fixing the force_full_refresh behavior for real streams.

Co-Authored-By: AJ Steers <aj@airbyte.io>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 9, 2025

📝 Walkthrough

Walkthrough

Propagates a new keyword-only parameter force_full_refresh: bool through source catalog configuration and read flows; adds a per-stream _get_sync_mode helper to set sync modes based on that flag; destinations forward the flag when constructing catalogs from a Source.

Changes

Cohort / File(s) Summary
Source catalog configuration
airbyte/sources/base.py
Added keyword-only force_full_refresh: bool to Source.get_configured_catalog(). Introduced helper _get_sync_mode(stream: AirbyteStream) -> SyncMode and use it to set each ConfiguredAirbyteStream.sync_mode. Propagated force_full_refresh through _get_airbyte_message_iterator and read(); when force_full_refresh=True, iterator is invoked without state.
Destination catalog construction
airbyte/destinations/base.py
When constructing a CatalogProvider from a Source instance, forwards force_full_refresh into source.get_configured_catalog(..., force_full_refresh=force_full_refresh) when applicable.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Could you verify _get_sync_mode() handles streams without incremental support so it never selects incremental when force_full_refresh=True? wdyt?
  • Could you check all call sites (including tests and any CatalogProvider consumers) accept and pass the new keyword-only parameter? wdyt?
  • Could you validate that suppressing state during reads (caching/resume paths) does not unintentionally drop resume state? wdyt?

Pre-merge checks and finishing touches

✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: adding logic to honor force_full_refresh when determining sync mode in the configured catalog.
Linked Issues check ✅ Passed The PR directly addresses issue #772 by implementing force_full_refresh parameter in get_configured_catalog and selecting sync modes based on stream capabilities and refresh intent.
Out of Scope Changes check ✅ Passed All changes are scoped to fixing force_full_refresh handling in catalog sync mode configuration across source, destination, and message iterator paths.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch devin/1765247774-fix-force-full-refresh

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9abd32a and 475ffa3.

📒 Files selected for processing (1)
  • airbyte/sources/base.py (5 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: Suraj-Vishwakarma70
Repo: airbytehq/PyAirbyte PR: 285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.

Applied to files:

  • airbyte/sources/base.py
📚 Learning: 2024-10-06T17:43:09.319Z
Learnt from: aaronsteers
Repo: airbytehq/PyAirbyte PR: 411
File: airbyte/cli.py:26-26
Timestamp: 2024-10-06T17:43:09.319Z
Learning: In the PyAirbyte project, when reviewing Python code and encountering a TODO comment without an issue link, I should post a friendly reminder to resolve it before merging, instead of suggesting to add an issue link.

Applied to files:

  • airbyte/sources/base.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
airbyte/sources/base.py (6)

420-425: Nice use of keyword-only parameter!

Adding force_full_refresh as a keyword-only parameter with a default value of False maintains backward compatibility nicely. This ensures existing call sites continue to work without modification. 👍


449-462: Logic looks solid, but a heads-up on an edge case

The implementation correctly prefers full_refresh when force_full_refresh=True and falls back to incremental otherwise. The defensive getattr for handling mocks is a nice touch!

One minor consideration: if a stream only supports full_refresh (i.e., SyncMode.incremental is not in supported_sync_modes) and force_full_refresh=False, this would still return SyncMode.incremental, which could potentially cause issues with such streams. I realize this preserves the previous behavior, so it's not a regression introduced here.

Would it be worth logging a warning in that scenario, or is this intentionally left as-is for now to keep the scope focused? Just wanted to flag it for awareness, wdyt?


464-469: Clean integration of the sync mode helper

Swapping the hardcoded SyncMode.incremental with _get_sync_mode(stream) is the right approach. The stream object is correctly available from the list comprehension context.


748-756: Correctly wired up!

The force_full_refresh parameter is properly passed to get_configured_catalog, and the existing logic at line 753 ensures state is set to None when full refresh is requested. This ensures both the catalog sync mode and state handling are aligned.


891-896: This is the core fix for issue #772 - looks good!

By passing force_full_refresh to get_configured_catalog when constructing the CatalogProvider, the configured catalog will now have the correct sync mode per stream. This should resolve the "No cursor field specified for stream" error when using Postgres (or similar connectors) with force_full_refresh=True.


932-943: Warning message is now consistent with the parameter name

The warning correctly references force_full_refresh=True to match the actual method parameter. Thanks for addressing the earlier feedback!


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte/sources/base.py (1)

537-547: get_records docstring talks about full refresh, but catalog is configured as incremental

The get_records docstring says:

“Generate a configured catalog that syncs the given stream in full_refresh mode”

But the implementation calls:

configured_catalog = self.get_configured_catalog(streams=[stream])

and, with the current _get_sync_mode, that will always set sync_mode=SyncMode.incremental when force_full_refresh isn’t passed.

Given the new force_full_refresh support, would it be clearer to either:

  • Align behavior with the docs by forcing full refresh here:
-        configured_catalog = self.get_configured_catalog(streams=[stream])
+        configured_catalog = self.get_configured_catalog(
+            streams=[stream],
+            force_full_refresh=True,
+        )
  • Or, if you prefer incremental semantics for get_records, update the docstring to no longer promise full refresh?

Curious which behavior you’d like to standardize on here, wdyt?

🧹 Nitpick comments (1)
airbyte/sources/base.py (1)

420-435: Sync mode selection vs supported_sync_modes and docstring seem slightly misaligned

The new force_full_refresh plumbing and _get_sync_mode helper look like they’ll fix the Postgres case where full refresh was requested but the catalog still said incremental. One thing that stood out though:

  • The docstring says: “If force_full_refresh is True, streams will be configured with full_refresh sync mode when supported by the stream. Otherwise, incremental sync mode is used when supported.”
  • In _get_sync_mode, the non‑forced branch always returns SyncMode.incremental, even if the stream only advertises full_refresh (or has no supported_sync_modes), so we’re not actually “using incremental when supported” — we’re just always using incremental.

This preserves previous behavior, but it means:

  • Streams that only support full_refresh (or don’t list modes) will still be configured as incremental.
  • The docstring oversells what the default branch does.

Would you consider either:

  1. Making the default path respect supported_sync_modes (while keeping a final incremental fallback for full backward compatibility), e.g.:
-        def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
-            """Determine the sync mode for a stream based on force_full_refresh and support."""
-            # Use getattr to handle mocks or streams without supported_sync_modes attribute
-            supported_modes = getattr(stream, "supported_sync_modes", None)
-
-            if force_full_refresh:
-                # When force_full_refresh is True, prefer full_refresh if supported
-                if supported_modes and SyncMode.full_refresh in supported_modes:
-                    return SyncMode.full_refresh
-                # Fall back to incremental if full_refresh is not supported
-                return SyncMode.incremental
-
-            # Default behavior: preserve previous semantics (always incremental)
-            return SyncMode.incremental
+        def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
+            """Determine the sync mode for a stream based on force_full_refresh and support."""
+            # Use getattr to handle mocks or streams without supported_sync_modes attribute
+            supported_modes = getattr(stream, "supported_sync_modes", None) or []
+
+            if force_full_refresh:
+                # When force_full_refresh is True, prefer full_refresh if supported
+                if SyncMode.full_refresh in supported_modes:
+                    return SyncMode.full_refresh
+                if SyncMode.incremental in supported_modes:
+                    return SyncMode.incremental
+
+                # If the stream doesn’t advertise modes, keep historical behaviour.
+                return SyncMode.incremental
+
+            # Default behaviour when not forcing full refresh:
+            if SyncMode.incremental in supported_modes:
+                return SyncMode.incremental
+            if SyncMode.full_refresh in supported_modes:
+                return SyncMode.full_refresh
+
+            # Fallback for streams that don’t advertise modes (keeps old semantics).
+            return SyncMode.incremental
  1. Or, if you explicitly want to avoid changing default semantics in this PR, tightening the docstring/comment to say that the non‑forced branch always uses incremental (regardless of supported_sync_modes) so readers aren’t surprised?

Wdyt?

Also applies to: 449-462, 469-488

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f9c090d and 9abd32a.

📒 Files selected for processing (1)
  • airbyte/sources/base.py (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: Suraj-Vishwakarma70
Repo: airbytehq/PyAirbyte PR: 285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.

Applied to files:

  • airbyte/sources/base.py
🧬 Code graph analysis (1)
airbyte/sources/base.py (2)
airbyte/strategies.py (1)
  • destination_sync_mode (80-92)
airbyte/shared/catalog_providers.py (1)
  • CatalogProvider (31-226)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte/sources/base.py (1)

738-755: Propagation of force_full_refresh into iterator and state handling looks solid

Wiring force_full_refresh through to get_configured_catalog(...) and explicitly passing state=None when it’s True matches the intended semantics (connector sees full_refresh where supported and no state). This should address the Postgres “incremental without cursor” case for both multi‑stream and single‑stream reads from this iterator.

I don’t see any issues in this block; keeping the flag keyword‑only also preserves backwards compatibility for existing callers. 👍 Wdyt?

@github-actions
Copy link

github-actions bot commented Dec 9, 2025

PyTest Results (Full)

416 tests  ±0   399 ✅  - 1   26m 27s ⏱️ + 1m 12s
  1 suites ±0    16 💤 ±0 
  1 files   ±0     1 ❌ +1 

For more details on these failures, see this check.

Results for commit 475ffa3. ± Comparison against base commit 1b48476.

♻️ This comment has been updated with latest results.

…force_full_refresh)

Co-Authored-By: AJ Steers <aj@airbyte.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Postgres source defaults to incremental sync even when full refresh is requested in PyAirbyte

2 participants