From 7b58232c68394169009f4812dbb890b6e0c3a7b1 Mon Sep 17 00:00:00 2001 From: Victor Date: Mon, 12 Jan 2026 14:16:49 +0100 Subject: [PATCH] refactor!: replace dependencies option with required stale_scope --- Gemfile.lock | 2 +- README.md | 87 +- UPGRADE-GUIDE.md | 59 +- lib/etlify/model.rb | 8 +- lib/etlify/stale_records/finder.rb | 469 +-------- lib/etlify/version.rb | 2 +- spec/etlify/model_spec.rb | 27 +- spec/etlify/stale_records/batch_sync_spec.rb | 23 + spec/etlify/stale_records/finder_spec.rb | 955 +++---------------- spec/rails_helper.rb | 11 + 10 files changed, 338 insertions(+), 1305 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 3ce541c..ac3f4a5 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - etlify (0.9.2) + etlify (0.10.0) rails (>= 7.0) GEM diff --git a/README.md b/README.md index 03d41d1..78f1a2b 100644 --- a/README.md +++ b/README.md @@ -114,12 +114,77 @@ class User < ApplicationRecord id_property: :id, # Only sync when an email exists sync_if: ->(user) { user.email.present? }, - # useful if your object serialization includes dependencies - dependencies: [:investments] + # Required: defines which records need to be synced + stale_scope: Users::EtlifyStaleScopeQuery ) end ``` +### Writing a stale_scope + +The `stale_scope` is required and defines which records need to be synced. It must be a Proc or a Query Object responding to `.call(model_class, crm_name)` and returning an `ActiveRecord::Relation`. + +```ruby +# app/queries/users/etlify_stale_scope_query.rb +module Users + class EtlifyStaleScopeQuery + STALE_SQL = <<-SQL.squish + crm_synchronisations.id IS NULL + OR crm_synchronisations.crm_name != ? + OR crm_synchronisations.last_synced_at < users.updated_at + SQL + + def self.call(model, crm_name) + model + .left_joins(:crm_synchronisations) + .where(STALE_SQL, crm_name.to_s) + end + end +end +``` + +You can also use an inline Proc: + +```ruby +hubspot_etlified_with( + serializer: UserSerializer, + crm_object_type: "contacts", + id_property: :id, + stale_scope: ->(model, crm_name) do + model + .left_joins(:crm_synchronisations) + .where(<<-SQL.squish, crm_name.to_s) + crm_synchronisations.id IS NULL + OR crm_synchronisations.crm_name != ? + OR crm_synchronisations.last_synced_at < users.updated_at + SQL + end +) +``` + +For models with dependencies (e.g., sync when investments change): + +```ruby +# app/queries/users/etlify_stale_scope_query.rb +module Users + class EtlifyStaleScopeQuery + STALE_SQL = <<-SQL.squish + crm_synchronisations.id IS NULL + OR crm_synchronisations.crm_name != ? + OR crm_synchronisations.last_synced_at < users.updated_at + OR crm_synchronisations.last_synced_at < investments.updated_at + SQL + + def self.call(model, crm_name) + model + .left_joins(:crm_synchronisations, :investments) + .where(STALE_SQL, crm_name.to_s) + .distinct + end + end +end +``` + ### Writing a serializer ```ruby @@ -254,15 +319,13 @@ Etlify::CRM.register( ### How it works - `Etlify::StaleRecords::Finder` scans all **etlified models** - (those that called `#{crm_name}_etlified_with`) and builds, for each, - a **SQL relation selecting only the PKs** of stale records. -- A record is considered stale if: + (those that called `#{crm_name}_etlified_with`) and calls the `stale_scope` + to get a **SQL relation selecting only the PKs** of stale records. +- The `stale_scope` you define determines which records are considered stale. + Typically, a record is stale if: - it **has no** `crm_synchronisation` row, **or** - - its `last_synced_at` is **older** than the **max** `updated_at` among: - - its own row, - - and its declared dependencies (via `dependencies:` in `etlified_with`, - supporting `belongs_to`, `has_one`, `has_many`, `has_* :through`, - and polymorphic `belongs_to`). + - its `last_synced_at` is **older** than the record's `updated_at` + (or any related model you include in your scope). - `Etlify::StaleRecords::BatchSync` then iterates **by ID batches**: - in **async: true** mode (default): **enqueue** one job per ID without loading full records into memory; @@ -276,8 +339,8 @@ Etlify::CRM.register( via ActiveJob. - **Stable payloads**: ensure your serializers produce deterministic Hashes to benefit from **idempotence**. -- **Dependencies**: declare `dependencies:` accurately in `etlified_with` so - indirect changes trigger resyncs. +- **Stale scope**: write your `stale_scope` to include all related models that + affect your serializer output, so indirect changes trigger resyncs. - **Batch size**: adjust `batch_size` to your DB to balance throughput and memory. --- diff --git a/UPGRADE-GUIDE.md b/UPGRADE-GUIDE.md index 16a75ec..21081e1 100644 --- a/UPGRADE-GUIDE.md +++ b/UPGRADE-GUIDE.md @@ -1,3 +1,60 @@ +# UPGRADING FROM 0.9.x -> 0.10.0 + +⚠️ **Breaking changes ahead.** + +## Overview + +Etlify 0.10.0 replaces the `dependencies:` option with a required `stale_scope:` parameter. +This gives you full control over which records are considered stale. + +## Migration + +Replace `dependencies:` with `stale_scope:` in all your model configurations: + +**Before (0.9.x):** + +```ruby +hubspot_etlified_with( + serializer: UserSerializer, + crm_object_type: "contacts", + id_property: :id, + dependencies: [:company, :investments] +) +``` + +**After (0.10.0):** + +```ruby +hubspot_etlified_with( + serializer: UserSerializer, + crm_object_type: "contacts", + id_property: :id, + stale_scope: Users::EtlifyStaleScopeQuery +) + +# app/queries/users/etlify_stale_scope_query.rb +module Users + class EtlifyStaleScopeQuery + STALE_SQL = <<-SQL.squish + crm_synchronisations.id IS NULL + OR crm_synchronisations.crm_name != ? + OR crm_synchronisations.last_synced_at < users.updated_at + OR crm_synchronisations.last_synced_at < companies.updated_at + OR crm_synchronisations.last_synced_at < investments.updated_at + SQL + + def self.call(model, crm_name) + model + .left_joins(:crm_synchronisations, :company, :investments) + .where(STALE_SQL, crm_name.to_s) + .distinct + end + end +end +``` + +--- + # UPGRADING FROM 0.9.1 -> 0.9.2 - Nothing to do (bugfix) @@ -121,7 +178,7 @@ class User < ApplicationRecord serializer: Etlify::Serializers::UserSerializer, crm_object_type: "contacts", id_property: "email", - dependencies: [:company], + stale_scope: Users::EtlifyStaleScopeQuery, sync_if: ->(record) { record.email.present? } ) end diff --git a/lib/etlify/model.rb b/lib/etlify/model.rb index ee4cdfa..1c9e3a5 100644 --- a/lib/etlify/model.rb +++ b/lib/etlify/model.rb @@ -52,10 +52,14 @@ def define_crm_dsl_on(klass, crm_name) serializer:, crm_object_type:, id_property:, - dependencies: [], + stale_scope:, sync_if: ->(_r) { true }, job_class: nil | + unless stale_scope.respond_to?(:call) + raise ArgumentError, "stale_scope must respond to #call" + end + reg = Etlify::CRM.fetch(crm_name) conf = { @@ -63,7 +67,7 @@ def define_crm_dsl_on(klass, crm_name) guard: sync_if, crm_object_type: crm_object_type, id_property: id_property, - dependencies: Array(dependencies).map(&:to_sym), + stale_scope: stale_scope, adapter: reg.adapter, job_class: job_class || reg.options[:job_class], } diff --git a/lib/etlify/stale_records/finder.rb b/lib/etlify/stale_records/finder.rb index 084082b..a81f1de 100644 --- a/lib/etlify/stale_records/finder.rb +++ b/lib/etlify/stale_records/finder.rb @@ -11,7 +11,6 @@ module StaleRecords # # Each returned relation: # - selects only a single column "id" - # - is safe to `pluck(:id)` (no ambiguous columns) # - is ordered ASC by id (stable batching) class Finder class << self @@ -39,8 +38,6 @@ def call(models: nil, crm_name: nil) private - # ---------- Model discovery / filtering ---------- - def etlified_models(crm_name: nil) ActiveRecord::Base.descendants.select do |m| next false unless m.respond_to?(:table_exists?) && m.table_exists? @@ -64,469 +61,19 @@ def configured_crm_names_for(model, crm_name: nil) end end - # ---------- Core relation builder (Arel-based) ---------- - - # Build the "stale" ids-only relation for one model/CRM. - # - # Strategy: - # - LEFT OUTER JOIN crm_synchronisations scoped to given CRM. - # - WHERE (crm_sync.id IS NULL OR crm_sync.last_synced_at < threshold) - # - threshold = GREATEST(owner.updated_at, deps.updated_at..., epoch) - # - SELECT ". AS id" - # - Wrap in a subquery aliased as "etlify_stale_ids" - # - Outer SELECT projects: etlify_stale_ids.id AS id, ordered ASC + # Call the user-defined stale_scope and ensure proper format. def stale_relation_for(model, crm_name:) - conn = model.connection - owner_tbl = model.table_name - owner_arel = arel_table(model) - crm_arel = CrmSynchronisation.arel_table - - join_on = - crm_arel[:resource_type].eq(model.name) - .and(crm_arel[:resource_id].eq(owner_arel[model.primary_key])) - .and(crm_arel[:crm_name].eq(crm_name.to_s)) - - join_sql = owner_arel.create_join( - crm_arel, owner_arel.create_on(join_on), Arel::Nodes::OuterJoin - ) - - threshold_expr = latest_timestamp_arel(model, crm_name: crm_name, conn: conn) - last_synced_expr = - Arel::Nodes::NamedFunction.new( - "COALESCE", [crm_arel[:last_synced_at], epoch_arel(conn)] - ) - where_pred = crm_arel[:id].eq(nil).or(last_synced_expr.lt(threshold_expr)) - - qualified_pk_sql = - "#{conn.quote_table_name(owner_tbl)}." \ - "#{conn.quote_column_name(model.primary_key)}" - - inner_rel = - model.unscoped - .from(owner_arel) - .joins(join_sql) - .where(where_pred) - .select(Arel.sql("#{qualified_pk_sql} AS id")) - .reorder(Arel.sql("#{qualified_pk_sql} ASC")) - - sub_sql = inner_rel.to_sql - - # Key point: alias the subquery as the real table name so AR's pluck(:id) - # which emits `"#{owner_tbl}"."id"` remains valid. - tbl_alias = conn.quote_table_name(owner_tbl) - sub_from = Arel.sql("(#{sub_sql}) AS #{tbl_alias}") - - # Keep a single id column and stable order. - model.unscoped - .from(sub_from) - .select("id") - .reorder("id ASC") - end - - # ---------- Threshold (owner + dependencies) ---------- - - # Build an Arel expression representing the newest timestamp among: - # - owner's timestamp column (updated_at or created_at) - # - each configured dependency's newest timestamp - # - epoch fallback when values are NULL or missing - def latest_timestamp_arel(model, crm_name:, conn:) - owner_arel = arel_table(model) - parts = [] - - # Owner timestamp (prefer updated_at, fallback created_at, else epoch) - owner_ts_col = timestamp_column_for_model(model) - owner_ts = - if owner_ts_col - Arel::Nodes::NamedFunction.new( - fn_coalesce(conn), - [owner_arel[owner_ts_col], epoch_arel(conn)] - ) - else - epoch_arel(conn) - end - parts << owner_ts - - deps = - Array( - model.etlify_crms.dig(crm_name.to_sym, :dependencies) - ).map(&:to_sym) - - deps.each do |dep_name| - reflection = model.reflect_on_association(dep_name) - next unless reflection + conf = model.etlify_crms.fetch(crm_name) + stale_scope = conf[:stale_scope] - parts << dependency_max_timestamp_arel(model, reflection, conn) - end - - greatest_arel(conn, *parts) - end + relation = stale_scope.call(model, crm_name) - # Choose dependency strategy. - def dependency_max_timestamp_arel(model, reflection, conn) - if reflection.polymorphic? && reflection.macro == :belongs_to - # Safer fallback: do not scan the table for type discovery. - return epoch_arel(conn) + unless relation.is_a?(ActiveRecord::Relation) + raise ArgumentError, + "stale_scope must return an ActiveRecord::Relation, got #{relation.class}" end - if reflection.through_reflection - return Arel.sql( - through_dependency_timestamp_sql(model, reflection, conn) - ) - end - - if reflection.macro == :has_and_belongs_to_many - return Arel.sql( - habtm_dependency_timestamp_sql(model, reflection, conn) - ) - end - - direct_dependency_timestamp_arel(model, reflection, conn) - end - - # ---------- Direct associations (belongs_to / has_one / has_many) --- - - def direct_dependency_timestamp_arel(model, reflection, conn) - owner_arel = arel_table(model) - ts_col = dep_timestamp_column(reflection.klass) - - case reflection.macro - when :belongs_to - return epoch_arel(conn) unless ts_col - - dep_arel = reflection.klass.arel_table - - # Respect custom primary_key on the target. - dep_pk = - reflection.options[:primary_key] || - reflection.klass.primary_key - - fk = reflection.foreign_key - - sub = - dep_arel - .project(dep_arel[ts_col]) - .where(dep_arel[dep_pk].eq(owner_arel[fk])) - .take(1) - - Arel::Nodes::NamedFunction.new( - fn_coalesce(conn), - [Arel::Nodes::Grouping.new(sub), epoch_arel(conn)] - ) - - when :has_one, :has_many - return epoch_arel(conn) unless ts_col - - dep_arel = reflection.klass.arel_table - - # Use foreign_key on dependency pointing to owner primary key. - fk = reflection.foreign_key - - preds = [dep_arel[fk].eq(owner_arel[model.primary_key])] - - # Respect polymorphic :as on the dependency if present. - if (poly_as = reflection.options[:as]) - preds << dep_arel["#{poly_as}_type"].eq(model.name) - end - - sub = - dep_arel - .project( - Arel::Nodes::NamedFunction.new("MAX", [dep_arel[ts_col]]) - ) - .where(preds.reduce(&:and)) - - Arel::Nodes::NamedFunction.new( - fn_coalesce(conn), - [Arel::Nodes::Grouping.new(sub), epoch_arel(conn)] - ) - - else - epoch_arel(conn) - end - end - - # ----------------------------- HABTM ------------------------------- - - # Build MAX(timestamp) over the HABTM source table joined via the - # join table. Respect custom foreign keys when available. - def habtm_dependency_timestamp_sql(model, reflection, conn) - owner_tbl = model.table_name - source_tbl = reflection.klass.table_name - ts_col = dep_timestamp_column(reflection.klass) - return epoch_literal(conn) unless ts_col - - source_pk = - reflection.options[:association_primary_key] || - reflection.klass.primary_key - - source_qt = conn.quote_table_name(source_tbl) - source_tc = conn.quote_column_name(ts_col) - - join_tbl = reflection.join_table.to_s - jt_qt = conn.quote_table_name(join_tbl) - - owner_fk = reflection.foreign_key.to_s - source_fk = reflection.association_foreign_key.to_s - - preds = [] - preds << "#{jt_qt}.#{conn.quote_column_name(owner_fk)} = " \ - "#{conn.quote_table_name(owner_tbl)}." \ - "#{conn.quote_column_name(model.primary_key)}" - preds_sql = preds.map { |p| "(#{p})" }.join(" AND ") - - <<-SQL.squish - COALESCE(( - SELECT MAX(#{source_qt}.#{source_tc}) - FROM #{jt_qt} - INNER JOIN #{source_qt} - ON #{source_qt}.#{conn.quote_column_name(source_pk)} = - #{jt_qt}.#{conn.quote_column_name(source_fk)} - WHERE #{preds_sql} - ), #{epoch_literal(conn)}) - SQL - end - - # Simplified handler for complex has_many :through :through cases - # Uses ActiveRecord's own join building to handle nested :through associations - def through_via_activerecord_sql(model, reflection, ts_col, conn) - owner_tbl = model.table_name - target_tbl = reflection.klass.table_name - - # Build a sample query using ActiveRecord to get the join structure - # We'll use to_sql to get the full SQL with joins - sample_relation = model.unscoped.joins(reflection.name).where("1=0") - full_sql = sample_relation.to_sql - - # Extract the JOIN clauses from the full SQL - # Pattern: FROM "table" INNER JOIN ... WHERE - if full_sql =~ /FROM "#{owner_tbl}"(.+?)WHERE/m - joins_sql = $1.strip - else - # Fallback to epoch if we can't parse - return epoch_literal(conn) - end - - # Extract the target table alias from the joins - # Look for the last occurrence of the target table (it's the final one in the chain) - target_alias = nil - joins_sql.scan(/"#{Regexp.escape(target_tbl)}"(?:\s+(?:AS\s+)?"([^"]+)")?/) do |match| - target_alias = match[0] if match[0] - end - target_alias ||= target_tbl - - # Create an alias for the owner table in the subquery to avoid conflicts - # The outer query references "users"."id", so we need to use a different reference - owner_alias = "#{owner_tbl}_outer" - owner_table_quoted = conn.quote_table_name(owner_tbl) - owner_alias_quoted = conn.quote_table_name(owner_alias) - target_table_quoted = conn.quote_table_name(target_alias) - - # Replace the first occurrence of the owner table in FROM with an aliased version - # FROM "users" becomes FROM "users" AS "users_outer" - joins_with_alias = joins_sql.sub( - /\A(\s*)INNER JOIN "#{Regexp.escape(owner_tbl)}"/, - "\\1INNER JOIN #{owner_table_quoted} AS #{owner_alias_quoted}" - ) - - # Replace all references to the owner table in the joins to use the alias - joins_with_alias.gsub!(/"#{Regexp.escape(owner_tbl)}"\./, "#{owner_alias_quoted}.") - - <<-SQL.squish - COALESCE(( - SELECT MAX(#{target_table_quoted}.#{conn.quote_column_name(ts_col)}) - FROM #{owner_table_quoted} AS #{owner_alias_quoted} - #{joins_with_alias} - WHERE #{owner_alias_quoted}.#{conn.quote_column_name(model.primary_key)} = - #{owner_table_quoted}.#{conn.quote_column_name(model.primary_key)} - ), #{epoch_literal(conn)}) - SQL - end - - # -------------------------- has_* :through ------------------------- - # MAX(timestamp) over the source table joined via the through table. - # Handles nested :through and self-joins by aliasing source/join tables. - def through_dependency_timestamp_sql(model, reflection, conn) - through = reflection.through_reflection - source = reflection.source_reflection - ts_col = dep_timestamp_column(reflection.klass) - return epoch_literal(conn) unless ts_col - - # If the 'through' is itself a has_many :through, use ActiveRecord - # to build the join SQL automatically - if through.through_reflection - return through_via_activerecord_sql(model, reflection, ts_col, conn) - end - - owner_tbl = model.table_name - through_tbl = through.klass.table_name - source_tbl = reflection.klass.table_name - - # Always alias the source table to avoid duplicate-name errors when it is - # the same class/table as the owner (self-joins over :through). - src_alias = "#{source_tbl}_src" - - # Correlated predicate owner -> through (ex: users_profiles.user_id = users.id) - owner_fk = through.foreign_key - owner_to_through_preds = [ - "#{qt(conn, through_tbl)}.#{qc(conn, owner_fk)} = " \ - "#{qt(conn, owner_tbl)}.#{qc(conn, model.primary_key)}", - ] - if (as = through.options[:as]) - owner_to_through_preds << "#{qt(conn, through_tbl)}." \ - "#{qc(conn, "#{as}_type")} = #{conn.quote(model.name)}" - end - - # Nested through on the source side - nested_through = source.try(:through_reflection) - if nested_through - join_tbl = nested_through.klass.table_name - join_pk_to_thr = nested_through.foreign_key - - # Find the real FK from the join model to target klass, or fallback. - join_fk_to_src = - fk_from_join_to_target_klass(nested_through.klass, reflection.klass) || - "#{source.name.to_s.singularize}_id" - - <<-SQL.squish - COALESCE(( - SELECT MAX(#{q_alias(conn, src_alias)}.#{qc(conn, ts_col)}) - FROM #{qt(conn, through_tbl)} - INNER JOIN #{qt(conn, join_tbl)} - ON #{qt(conn, join_tbl)}.#{qc(conn, join_pk_to_thr)} = - #{qt(conn, through_tbl)}.#{qc(conn, through.klass.primary_key)} - INNER JOIN #{aliased_table(conn, source_tbl, src_alias)} - ON #{q_alias(conn, src_alias)}. - #{qc(conn, reflection.klass.primary_key)} = - #{qt(conn, join_tbl)}.#{qc(conn, join_fk_to_src)} - WHERE #{owner_to_through_preds.map { |p| "(#{p})" }.join(" AND ")} - ), #{epoch_literal(conn)}) - SQL - else - # Simple (non-nested) :through - through_pk = - through.options[:primary_key] || through.klass.primary_key - - if source.macro == :belongs_to - src_pk = source.options[:primary_key] || reflection.klass.primary_key - src_fk = source.foreign_key - join_on = - "#{q_alias(conn, src_alias)}.#{qc(conn, src_pk)} = " \ - "#{qt(conn, through_tbl)}.#{qc(conn, src_fk)}" - else - src_fk = - source.foreign_key || - reflection.options[:foreign_key] || - reflection - .klass - .reflections - .dig(source.name.to_s)&.foreign_key || - source.foreign_key - - join_on = - "#{q_alias(conn, src_alias)}.#{qc(conn, src_fk)} = " \ - "#{qt(conn, through_tbl)}.#{qc(conn, through_pk)}" - end - - <<-SQL.squish - COALESCE(( - SELECT MAX(#{q_alias(conn, src_alias)}.#{qc(conn, ts_col)}) - FROM #{qt(conn, through_tbl)} - INNER JOIN #{aliased_table(conn, source_tbl, src_alias)} - ON #{join_on} - WHERE #{owner_to_through_preds.map { |p| "(#{p})" }.join(" AND ")} - ), #{epoch_literal(conn)}) - SQL - end - end - - # ----------------------------- Helpers ----------------------------- - def qt(conn, table_name) - conn.quote_table_name(table_name) - end - - def qc(conn, col_name) - conn.quote_column_name(col_name) - end - - def q_alias(conn, alias_name) - # Quote SQL identifier used as an alias - conn.quote_table_name(alias_name) - end - - def aliased_table(conn, table_name, alias_name) - "#{qt(conn, table_name)} AS #{q_alias(conn, alias_name)}" - end - - def fk_from_join_to_target_klass(join_klass, target_klass) - # Cherche un belongs_to sur le join pointant vers la classe cible, - # retourne sa foreign_key s’il existe (ex: :suitability_questionnaire_id) - refl = join_klass.reflections.values.find do |r| - r.macro == :belongs_to && r.klass == target_klass - end - refl&.foreign_key - end - - # Pick a timestamp column for a given ActiveRecord class. - # Prefer "updated_at", fallback to "created_at", else nil. - def dep_timestamp_column(klass) - return nil unless klass.respond_to?(:column_names) - - cols = klass.column_names - return "updated_at" if cols.include?("updated_at") - return "created_at" if cols.include?("created_at") - nil - end - - def timestamp_column_for_model(model) - dep_timestamp_column(model) - end - - # Adapter-agnostic "greatest": - # - PostgreSQL -> GREATEST(a, b, c) - # - SQLite/others -> MAX(a, b, c) - # Accepts either a variadic list of Arel nodes or an array. - def greatest_arel(conn, *parts) - exprs = parts.flatten.compact - return (exprs.first || epoch_arel(conn)) if exprs.length <= 1 - - Arel::Nodes::NamedFunction.new(greatest_function_name(conn), exprs) - end - - def fn_coalesce(_conn) = "COALESCE" - - # Adapter-agnostic "epoch" as an Arel node. - # - PostgreSQL -> CAST('1970-01-01 00:00:00' AS TIMESTAMP) - # - SQLite (default in tests) -> DATETIME('1970-01-01 00:00:00') - def epoch_arel(conn) - if conn.adapter_name =~ /postgres/i - Arel::Nodes::NamedFunction.new( - "CAST", [Arel.sql("'1970-01-01 00:00:00' AS TIMESTAMP")] - ) - else - Arel::Nodes::NamedFunction.new( - "DATETIME", [Arel.sql("'1970-01-01 00:00:00'")] - ) - end - end - - def greatest_function_name(conn) - adapter = conn.adapter_name.to_s.downcase - adapter.include?("postgres") ? "GREATEST" : "MAX" - end - - # String literal of the epoch for raw SQL fragments. - def epoch_literal(conn) - adapter = conn.adapter_name.to_s.downcase - if adapter.include?("postgres") - "TIMESTAMP '1970-01-01 00:00:00'" - else - "DATETIME('1970-01-01 00:00:00')" - end - end - - # Unscoped arel_table helper. - def arel_table(model) - model.unscoped.arel_table + relation.select(:id).reorder(id: :asc) end end end diff --git a/lib/etlify/version.rb b/lib/etlify/version.rb index 7ad176c..7318d74 100644 --- a/lib/etlify/version.rb +++ b/lib/etlify/version.rb @@ -1,3 +1,3 @@ module Etlify - VERSION = "0.9.2" + VERSION = "0.10.0" end diff --git a/spec/etlify/model_spec.rb b/spec/etlify/model_spec.rb index 58268d4..506ee95 100644 --- a/spec/etlify/model_spec.rb +++ b/spec/etlify/model_spec.rb @@ -95,11 +95,12 @@ def self.hubspot_etlified_with(**) klass = build_including_class described_class.define_crm_dsl_on(klass, :hubspot) + stale_scope_proc = ->(model, _crm) { model.all } klass.hubspot_etlified_with( serializer: dummy_serializer, crm_object_type: :contact, id_property: :external_id, - dependencies: %w[company owner], + stale_scope: stale_scope_proc, sync_if: ->(r) { r.respond_to?(:active?) ? r.active? : true }, job_class: "OverrideJob" ) @@ -109,7 +110,7 @@ def self.hubspot_etlified_with(**) expect(conf[:guard]).to be_a(Proc) expect(conf[:crm_object_type]).to eq(:contact) expect(conf[:id_property]).to eq(:external_id) - expect(conf[:dependencies]).to eq(%i[company owner]) + expect(conf[:stale_scope]).to eq(stale_scope_proc) expect(conf[:adapter]).to eq(dummy_adapter) expect(conf[:job_class]).to eq("OverrideJob") end @@ -121,7 +122,8 @@ def self.hubspot_etlified_with(**) klass.hubspot_etlified_with( serializer: dummy_serializer, crm_object_type: :contact, - id_property: :external_id + id_property: :external_id, + stale_scope: ->(model, _crm) { model.all } ) conf = klass.etlify_crms[:hubspot] @@ -138,7 +140,8 @@ def self.hubspot_etlified_with(**) klass.hubspot_etlified_with( serializer: dummy_serializer, crm_object_type: :contact, - id_property: :external_id + id_property: :external_id, + stale_scope: ->(model, _crm) { model.all } ) expect(klass.etlify_crms.keys).to include(:salesforce, :hubspot) @@ -152,10 +155,24 @@ def self.hubspot_etlified_with(**) klass.hubspot_etlified_with( serializer: dummy_serializer, crm_object_type: :contact, - id_property: :external_id + id_property: :external_id, + stale_scope: ->(model, _crm) { model.all } ) end.to raise_error(RuntimeError, "boom") end + + it "raises when stale_scope does not respond to call" do + klass = build_including_class + described_class.define_crm_dsl_on(klass, :hubspot) + expect do + klass.hubspot_etlified_with( + serializer: dummy_serializer, + crm_object_type: :contact, + id_property: :external_id, + stale_scope: "not_callable" + ) + end.to raise_error(ArgumentError, /stale_scope must respond to #call/) + end end describe ".define_crm_instance_helpers_on" do diff --git a/spec/etlify/stale_records/batch_sync_spec.rb b/spec/etlify/stale_records/batch_sync_spec.rb index fa31f7a..502da18 100644 --- a/spec/etlify/stale_records/batch_sync_spec.rb +++ b/spec/etlify/stale_records/batch_sync_spec.rb @@ -12,6 +12,17 @@ let!(:company) { Company.create!(name: "CapSens", domain: "capsens.eu") } + let(:default_stale_scope) do + ->(model, crm_name) do + stale_sql = <<-SQL.squish + crm_synchronisations.id IS NULL + OR crm_synchronisations.crm_name != ? + OR crm_synchronisations.last_synced_at < #{model.table_name}.updated_at + SQL + model.left_joins(:crm_synchronisations).where(stale_sql, crm_name.to_s) + end + end + def create_user!(index:) User.create!( email: "user#{index}@example.com", @@ -28,11 +39,13 @@ def create_user!(index:) adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, salesforce: { adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, } ) @@ -64,11 +77,13 @@ def create_user!(index:) adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, salesforce: { adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, } ) @@ -99,11 +114,13 @@ def create_user!(index:) adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, salesforce: { adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, } ) @@ -139,6 +156,7 @@ def create_user!(index:) adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, } ) @@ -177,6 +195,7 @@ def create_user!(index:) adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, } ) @@ -232,11 +251,13 @@ def create_user!(index:) adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, salesforce: { adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, } ) @@ -270,11 +291,13 @@ def create_user!(index:) adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, salesforce: { adapter: Etlify::Adapters::NullAdapter.new, id_property: "email", crm_object_type: "contacts", + stale_scope: default_stale_scope, }, } diff --git a/spec/etlify/stale_records/finder_spec.rb b/spec/etlify/stale_records/finder_spec.rb index 041c275..1dfa56b 100644 --- a/spec/etlify/stale_records/finder_spec.rb +++ b/spec/etlify/stale_records/finder_spec.rb @@ -1,183 +1,6 @@ require "rails_helper" RSpec.describe Etlify::StaleRecords::Finder do - # ---------------- Schema bootstrap for dependency scenarios ---------------- - - before(:all) do - ActiveRecord::Schema.define do - create_table :profiles, force: true do |t| - t.integer :user_id - t.timestamps null: true - end - - create_table :notes, force: true do |t| - t.integer :user_id - t.string :body - t.timestamps null: true - end - - create_table :projects, force: true do |t| - t.string :name - t.timestamps null: true - end - - create_table :memberships, force: true do |t| - t.integer :user_id - t.integer :project_id - t.timestamps null: true - end - - create_table :uploads, force: true do |t| - t.string :owner_type - t.integer :owner_id - t.string :path - t.timestamps null: true - end - - create_table :activities, force: true do |t| - t.string :subject_type - t.integer :subject_id - t.timestamps null: true - end - - create_table :follows, force: true do |t| - t.integer :follower_id, null: false - t.integer :followee_id, null: false - t.timestamps null: true - end - - unless ActiveRecord::Base.connection - .column_exists?(:users, :avatarable_type) - add_column :users, :avatarable_type, :string - end - - unless ActiveRecord::Base.connection - .column_exists?(:users, :avatarable_id) - add_column :users, :avatarable_id, :integer - end - - create_table :photos, force: true do |t| - t.timestamps null: true - end - - create_table :documents, force: true do |t| - t.timestamps null: true - end - - create_table :tags, force: true do |t| - t.string :name - t.timestamps null: true - end - - create_table :tags_users, id: false, force: true do |t| - t.integer :tag_id - t.integer :user_id - end - - create_table :linkages, force: true do |t| - t.string :owner_type - t.integer :owner_id - t.integer :project_id - t.timestamps null: true - end - - create_table :subscriptions, force: true do |t| - # FK lives on source table -> references profiles.id - t.integer :users_profile_id - t.timestamps null: true - end - end - - User.reset_column_information - stub_models! - end - - # ----------------------------- Model helpers ------------------------------ - - def define_model_const(name) - Object.send(:remove_const, name) if Object.const_defined?(name) - klass = Class.new(ApplicationRecord) - klass.table_name = name.to_s.underscore.pluralize - yield klass if block_given? - Object.const_set(name, klass) - end - - def stub_models! - define_model_const("Profile") do |klass| - klass.belongs_to :user, optional: true - end - - define_model_const("Note") do |klass| - klass.belongs_to :user, optional: true - end - - define_model_const("Project") do |klass| - klass.has_many :memberships, dependent: :destroy - klass.has_many :users, through: :memberships - end - - define_model_const("Membership") do |klass| - klass.belongs_to :user - klass.belongs_to :project - end - - define_model_const("Upload") do |klass| - klass.belongs_to :owner, polymorphic: true, optional: true - end - - define_model_const("Activity") do |klass| - klass.belongs_to :subject, polymorphic: true, optional: true - end - - define_model_const("Linkage") do |klass| - klass.belongs_to :owner, polymorphic: true - klass.belongs_to :project - end - - define_model_const("Photo") - define_model_const("Document") - define_model_const("Tag") - - define_model_const("Subscription") do |klass| - klass.belongs_to :profile, - foreign_key: "users_profile_id", - optional: true - end - - define_model_const("Follow") do |klass| - klass.belongs_to :follower, class_name: "User", optional: false - klass.belongs_to :followee, class_name: "User", optional: false - end - - Profile.class_eval do - has_many :subscriptions, - class_name: "Subscription", - foreign_key: "users_profile_id", - dependent: :destroy - end - - # Extend User with associations needed by tests - User.class_eval do - has_one :profile, dependent: :destroy - has_many :notes, dependent: :destroy - has_many :memberships, dependent: :destroy - has_many :projects, through: :memberships - has_many :uploads, as: :owner, dependent: :destroy - has_many :activities, as: :subject, dependent: :destroy - belongs_to :avatarable, polymorphic: true, optional: true - has_and_belongs_to_many :tags, join_table: "tags_users" - has_many :linkages, as: :owner, dependent: :destroy - has_many :poly_projects, through: :linkages, source: :project - has_many :subscriptions, through: :profile - has_many :follows, class_name: "Follow", - foreign_key: "follower_id", - dependent: :destroy - has_many :followees, through: :follows, source: :followee - end - end - - # --------------------------------- Utils ---------------------------------- - def create_sync!(resource, crm:, last_synced_at:) CrmSynchronisation.create!( crm_name: crm.to_s, @@ -191,716 +14,204 @@ def now Time.now end - # Small helper to read ids for a given CRM on User def user_ids_for(crm) described_class.call(crm_name: crm)[User][crm].pluck(:id) end - # ---------------- Default multi-CRM config stub for User ------------------ - - before do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [ - :company, :notes, :profile, :projects, :uploads, :activities - ] - }, - salesforce: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "Id", - crm_object_type: "Lead", - dependencies: [:company] - } - } - ) - end - - # --------------------------- A. Model discovery --------------------------- - - describe ".call model discovery" do - it "includes AR descendants with config and existing table" do - user = User.create!(email: "a@b.c") - result = described_class.call - expect(result.keys).to include(User) - expect(result[User].keys).to include(:hubspot, :salesforce) - expect(result[User][:hubspot].arel.projections.size).to eq(1) - expect(result[User][:salesforce].arel.projections.size).to eq(1) - expect(user.id).to be_a(Integer) - end - - it "when crm_name is given, keeps only models configured for it" do - result = described_class.call(crm_name: :hubspot) - expect(result.keys).to include(User) - expect(result[User].keys).to eq([:hubspot]) - end - - it "when models: is given, restricts to that subset" do - result = described_class.call(models: [User]) - expect(result.keys).to eq([User]) - end - end - - # ------------------------------ B. Shape ---------------------------------- - - describe ".call return shape" do - it "returns { Model => { crm => relation } } for single CRM" do - result = described_class.call(crm_name: :hubspot) - expect(result).to be_a(Hash) - expect(result[User]).to be_a(Hash) - expect(result[User][:hubspot]).to be_a(ActiveRecord::Relation) - end - - it "includes one entry per CRM when multiple configured" do - result = described_class.call - expect(result[User].keys).to contain_exactly(:hubspot, :salesforce) - end - - it "relations select only primary key" do - relation = described_class.call[User][:hubspot] - projections = relation.arel.projections - expect(projections.size).to eq(1) - end - end + describe ".call" do + context "model discovery" do + it "includes AR descendants with etlify_crms and existing table" do + user = User.create!(email: "a@b.c") + result = described_class.call - # ----------------------- C. JOIN scoped to crm_name ----------------------- + expect(result.keys).to include(User) + expect(result[User].keys).to include(:hubspot) + expect(user.id).to be_a(Integer) + end - describe "JOIN scoped to crm_name" do - it "treats missing row for given crm as stale" do - user = User.create!(email: "x@x.x") - create_sync!(user, crm: :salesforce, last_synced_at: now) - expect(user_ids_for(:hubspot)).to include(user.id) - end + it "filters by crm_name when provided" do + result = described_class.call(crm_name: :hubspot) - it "stale only for the outdated CRM" do - user = User.create!(email: "x@x.x") - create_sync!(user, crm: :hubspot, last_synced_at: now - 3600) - create_sync!(user, crm: :salesforce, last_synced_at: now + 3600) - all_results = described_class.call - expect(all_results[User][:hubspot].pluck(:id)).to include(user.id) - expect(all_results[User][:salesforce].pluck(:id)) - .not_to include(user.id) - end + expect(result.keys).to include(User) + expect(result[User].keys).to eq([:hubspot]) + end - it "fresh for both CRMs yields no ids" do - user = User.create!(email: "x@x.x", updated_at: now - 10) - create_sync!(user, crm: :hubspot, last_synced_at: now) - create_sync!(user, crm: :salesforce, last_synced_at: now) - results = described_class.call - expect(results[User][:hubspot].pluck(:id)).not_to include(user.id) - expect(results[User][:salesforce].pluck(:id)).not_to include(user.id) - end - end + it "restricts to given models when provided" do + result = described_class.call(models: [User]) - # --------------------------- D. Staleness logic --------------------------- + expect(result.keys).to eq([User]) + end - describe "staleness threshold" do - it "missing crm_synchronisation row => stale" do - user = User.create!(email: "x@x.x") - expect(user_ids_for(:hubspot)).to include(user.id) - end + it "returns empty hash when no model qualifies" do + allow(User).to receive(:etlify_crms).and_return({}) - it "NULL last_synced_at acts like epoch and becomes stale" do - user = User.create!(email: "x@x.x") - CrmSynchronisation.create!( - crm_name: "hubspot", - resource_type: "User", - resource_id: user.id, - last_synced_at: nil - ) - expect(user_ids_for(:hubspot)).to include(user.id) - end + result = described_class.call(models: [User]) - it "compares strictly: < stale, == ok, > ok" do - time_zero = now - user = User.create!(email: "x@x.x", updated_at: time_zero) - create_sync!(user, crm: :hubspot, last_synced_at: time_zero - 1) - expect(user_ids_for(:hubspot)).to include(user.id) - - CrmSynchronisation.where( - resource_id: user.id, crm_name: "hubspot" - ).update_all(last_synced_at: time_zero) - expect(user_ids_for(:hubspot)).not_to include(user.id) - - CrmSynchronisation.where( - resource_id: user.id, crm_name: "hubspot" - ).update_all(last_synced_at: time_zero + 1) - expect(user_ids_for(:hubspot)).not_to include(user.id) + expect(result).to eq({}) + end end - it "no dependencies => threshold is owner's updated_at" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [] - } - } - ) - user = User.create!(email: "x@x.x", updated_at: now) - create_sync!(user, crm: :hubspot, last_synced_at: now - 1) - expect(user_ids_for(:hubspot)).to include(user.id) - end - end + context "return shape" do + it "returns { Model => { crm => relation } }" do + result = described_class.call(crm_name: :hubspot) - # -------------------------- E. Direct dependencies ------------------------ + expect(result).to be_a(Hash) + expect(result[User]).to be_a(Hash) + expect(result[User][:hubspot]).to be_a(ActiveRecord::Relation) + end - describe "dependencies direct associations" do - it "belongs_to: updating company makes user stale" do - company = Company.create!(name: "ACME") - user = User.create!(email: "u@x.x", company: company) - create_sync!(user, crm: :hubspot, last_synced_at: now) - company.update!(updated_at: now + 10) - expect(user_ids_for(:hubspot)).to include(user.id) - end + it "relations select only id column" do + relation = described_class.call[User][:hubspot] - it "belongs_to missing target falls back to epoch safely" do - user = User.create!(email: "u@x.x", company: nil) - create_sync!(user, crm: :hubspot, last_synced_at: now + 10) - expect(user_ids_for(:hubspot)).not_to include(user.id) - end + expect(relation.select_values).to include(:id) + end - it "has_one: updating profile makes user stale" do - user = User.create!(email: "u@x.x") - profile = user.create_profile! - create_sync!(user, crm: :hubspot, last_synced_at: now) - profile.update!(updated_at: now + 10) - expect(user_ids_for(:hubspot)).to include(user.id) - end + it "orders ids ascending for stable batching" do + first_user = User.create!(email: "a@x.x") + second_user = User.create!(email: "b@x.x") - it "has_many: newest note updated makes user stale" do - user = User.create!(email: "u@x.x") - user.notes.create!(body: "a", updated_at: now) - user.notes.create!(body: "b", updated_at: now + 20) - create_sync!(user, crm: :hubspot, last_synced_at: now + 5) - expect(user_ids_for(:hubspot)).to include(user.id) - end + ids = user_ids_for(:hubspot) - it "polymorphic has_many via :as ignores unrelated rows" do - first_user = User.create!(email: "u1@x.x") - second_user = User.create!(email: "u2@x.x") - first_user.uploads.create!(path: "p1", updated_at: now) - second_user.uploads.create!(path: "p2", updated_at: now + 60) - create_sync!(first_user, crm: :hubspot, last_synced_at: now + 10) - expect(user_ids_for(:hubspot)).not_to include(first_user.id) + expect(ids).to eq(ids.sort) + expect(ids).to include(first_user.id, second_user.id) + end end - end - # ----------- F. Through / polymorphic belongs_to (child side) ------------- + context "stale_scope integration" do + it "uses the stale_scope to find stale records" do + user = User.create!(email: "x@x.x", updated_at: now) - describe "through and polymorphic belongs_to" do - it "has_many :through: source newer marks user stale" do - user = User.create!(email: "u@x.x") - project = Project.create!(name: "P") - Membership.create!(user: user, project: project) - create_sync!(user, crm: :hubspot, last_synced_at: now) - project.update!(updated_at: now + 30) - expect(user_ids_for(:hubspot)).to include(user.id) - end + expect(user_ids_for(:hubspot)).to include(user.id) + end - it "polymorphic child: newest concrete subject wins" do - user = User.create!(email: "u@x.x") - activity = Activity.create!(subject: user, updated_at: now) - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:activities] - } - } - ) - create_sync!(user, crm: :hubspot, last_synced_at: now + 1) - expect(user_ids_for(:hubspot)).not_to include(user.id) - activity.update!(updated_at: now + 10) - expect(user_ids_for(:hubspot)).to include(user.id) - end + it "excludes synced records when last_synced_at >= updated_at" do + user = User.create!(email: "x@x.x", updated_at: now - 10) + create_sync!(user, crm: :hubspot, last_synced_at: now) - it "polymorphic with non-constantizable type is ignored safely" do - user = User.create!(email: "u@x.x") - timestamp_str = now.utc.strftime("%Y-%m-%d %H:%M:%S") - Activity.connection.execute( - "INSERT INTO activities (subject_type, subject_id, created_at, " \ - "updated_at) VALUES ('Nope::Missing', 123, '#{timestamp_str}', " \ - "'#{timestamp_str}')" - ) - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:activities] - } - } - ) - create_sync!(user, crm: :hubspot, last_synced_at: now + 5) - expect(user_ids_for(:hubspot)).not_to include(user.id) - end + expect(user_ids_for(:hubspot)).not_to include(user.id) + end - it "has_many :through with polymorphic through adds type predicate" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:poly_projects] - } - } - ) + it "includes records when last_synced_at < updated_at" do + user = User.create!(email: "x@x.x", updated_at: now) + create_sync!(user, crm: :hubspot, last_synced_at: now - 10) - user = User.create!(email: "t@x.x") - project = Project.create!(name: "P", updated_at: now) - Linkage.create!(owner: user, project: project) + expect(user_ids_for(:hubspot)).to include(user.id) + end - create_sync!(user, crm: :hubspot, last_synced_at: now + 1) - expect(user_ids_for(:hubspot)).not_to include(user.id) + it "includes records with no sync record" do + user = User.create!(email: "x@x.x") - project.update!(updated_at: now + 20) - expect(user_ids_for(:hubspot)).to include(user.id) + expect(user_ids_for(:hubspot)).to include(user.id) + end end - describe "has_many :through where FK lives on source table" do - it "marks owner stale when a source row becomes newer" do + context "stale_scope validation" do + it "raises when stale_scope returns non-relation" do allow(User).to receive(:etlify_crms).and_return( { hubspot: { adapter: Etlify::Adapters::NullAdapter.new, id_property: "id", crm_object_type: "contacts", - dependencies: [:subscriptions] - } + stale_scope: ->(_model, _crm) { [1, 2, 3] }, + }, } ) - user = User.create!(email: "x@x.x") - profile = Profile.create!(user: user, updated_at: now) - subscription = Subscription.create!( - users_profile_id: profile.id, - updated_at: now - ) - - create_sync!(user, crm: :hubspot, last_synced_at: now + 1) - expect(user_ids_for(:hubspot)).not_to include(user.id) - - subscription.update!(updated_at: now + 20) - expect(user_ids_for(:hubspot)).to include(user.id) + expect do + described_class.call(crm_name: :hubspot) + end.to raise_error(ArgumentError, /must return an ActiveRecord::Relation/) end end - end - - # --------------- Owner side belongs_to polymorphic (avatarable) ----------- - - describe "owner belongs_to polymorphic dependency" do - it "ignores owner-side polymorphic belongs_to (falls back to epoch)" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:avatarable] - } - } - ) - - user = User.create!(email: "p@x.x") - photo = Photo.create!(updated_at: now) - user.avatarable = photo - user.updated_at = now - user.save! - - # Since owner-side polymorphic belongs_to is ignored (epoch), - # updating the target should NOT make the owner stale. - create_sync!(user, crm: :hubspot, last_synced_at: now + 1) - expect(user_ids_for(:hubspot)).not_to include(user.id) - - photo.update!(updated_at: now + 20) - expect(user_ids_for(:hubspot)).not_to include(user.id) - end - - it "returns epoch when no concrete types exist (parts empty)" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:avatarable] - } - } - ) - user = User.create!(email: "q@x.x") - create_sync!(user, crm: :hubspot, last_synced_at: now + 10) - expect(user_ids_for(:hubspot)).not_to include(user.id) - end - end - - # -------------------------------- HABTM ----------------------------------- - - describe "HABTM dependency" do - it "marks stale when a tag becomes newer than last_sync" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:tags] - } - } - ) - user = User.create!(email: "habtm@x.x", updated_at: now) - tag = Tag.create!(name: "x", updated_at: now) - user.tags << tag - - create_sync!(user, crm: :hubspot, last_synced_at: now + 10) - expect(user_ids_for(:hubspot)).not_to include(user.id) - tag.update!(updated_at: now + 30) - expect(user_ids_for(:hubspot)).to include(user.id) - end - end - - # ---------------- Self-join has_many :through aliasing ------------------- - - describe "self-join has_many :through aliasing" do - it "aliases source table to avoid PG::DuplicateAlias on Postgres" do - # Configure Finder to scan the self-join dependency - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - # The dependency below triggers a users->users self-join - dependencies: [:followees] - } - } - ) - - # Minimal data to build an executable relation - follower = User.create!(email: "f@x.x", updated_at: now) - followee = User.create!(email: "g@x.x", updated_at: now) - Follow.create!( - follower_id: follower.id, - followee_id: followee.id, - updated_at: now - ) - create_sync!(follower, crm: :hubspot, last_synced_at: now + 1) - - relation = described_class.call(crm_name: :hubspot)[User][:hubspot] - sql = relation.to_sql - - # The SQL must alias the source users table (e.g. "users" AS "users_src") - expect(sql).to match(/INNER\s+JOIN\s+"users"\s+AS\s+"users_src"/i) - - # And it must be executable (no PG::DuplicateAlias at runtime) - expect { relation.to_a }.not_to raise_error - end - end - - # ---------------- Nested has_many :through (profile -> join -> questionnaire) -- - - describe "nested has_many :through (Profile -> Join -> SuitabilityQuestionnaire)" do - before(:context) do - conn = ActiveRecord::Base.connection - - unless conn.data_source_exists?("users_profiles_suitability_questionnaires") - ActiveRecord::Schema.define do - create_table :users_profiles_suitability_questionnaires, force: true do |t| - t.integer :users_profile_id, null: false - t.integer :suitability_questionnaire_id, null: false - t.timestamps null: true - end - add_index :users_profiles_suitability_questionnaires, - [:users_profile_id, :suitability_questionnaire_id], - unique: true, - name: "idx_upsq_on_profile_and_questionnaire" - end - end - - unless conn.data_source_exists?("capsens_suitability_questionnaire_questionnaires") - ActiveRecord::Schema.define do - create_table :capsens_suitability_questionnaire_questionnaires, force: true do |t| - t.timestamps null: true + context "with stale_scope as query object" do + let(:query_object) do + Class.new do + def self.call(model, crm_name) + stale_sql = <<-SQL.squish + crm_synchronisations.id IS NULL + OR crm_synchronisations.crm_name != ? + OR crm_synchronisations.last_synced_at < users.updated_at + SQL + model + .left_joins(:crm_synchronisations) + .where(stale_sql, crm_name.to_s) end end end - # === Model stubs (table names alignés sur la prod) ====================== - - Object.send(:remove_const, "SuitabilityQuestionnaire") \ - if Object.const_defined?("SuitabilityQuestionnaire") - class SuitabilityQuestionnaire < ApplicationRecord - self.table_name = "capsens_suitability_questionnaire_questionnaires" - end - - Object.send(:remove_const, "ProfilesSuitabilityQuestionnaire") \ - if Object.const_defined?("ProfilesSuitabilityQuestionnaire") - class ProfilesSuitabilityQuestionnaire < ApplicationRecord - self.table_name = "users_profiles_suitability_questionnaires" - - belongs_to :profile, - class_name: "Profile", - foreign_key: "users_profile_id", - optional: false - belongs_to :suitability_questionnaire, - class_name: "SuitabilityQuestionnaire", - optional: false + before do + allow(User).to receive(:etlify_crms).and_return( + { + hubspot: { + adapter: Etlify::Adapters::NullAdapter.new, + id_property: "id", + crm_object_type: "contacts", + stale_scope: query_object, + }, + } + ) end - # === Missing associations on Profile / User ========================= - - Profile.class_eval do - has_many :profiles_suitability_questionnaires, - class_name: "ProfilesSuitabilityQuestionnaire", - foreign_key: "users_profile_id", - dependent: :destroy + it "works with a query object class responding to .call" do + user = User.create!(email: "query@object.test", updated_at: now) - has_many :suitability_questionnaires, - through: :profiles_suitability_questionnaires, - source: :suitability_questionnaire - end - - User.class_eval do - has_many :suitability_questionnaires, - through: :profile, - source: :suitability_questionnaires + expect(user_ids_for(:hubspot)).to include(user.id) end - end - - it "marks user stale when a nested-through suitability questionnaire updates" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:suitability_questionnaires] - } - } - ) - - t0 = now - user = User.create!(email: "nested@x.x", updated_at: t0) - profile = Profile.create!(user: user, updated_at: t0) - - questionnaire = SuitabilityQuestionnaire.create!(created_at: t0, updated_at: t0) - ProfilesSuitabilityQuestionnaire.create!( - users_profile_id: profile.id, - suitability_questionnaire_id: questionnaire.id, - created_at: t0, - updated_at: t0 - ) - - # Fresh sync at t0 + 1 → not stale - create_sync!(user, crm: :hubspot, last_synced_at: t0 + 1) - expect(user_ids_for(:hubspot)).not_to include(user.id) - - # Update questionnaire at t0 + 20 → user becomes stale - questionnaire.update!(updated_at: t0 + 20) - expect(user_ids_for(:hubspot)).to include(user.id) - end - end - # -------------------------- G. Timestamp edge cases ----------------------- + it "excludes synced records" do + user = User.create!(email: "query@object.test", updated_at: now - 10) + create_sync!(user, crm: :hubspot, last_synced_at: now) - describe "timestamp edge cases" do - it "NULL updated_at are treated as epoch (no crash)" do - user = User.create!(email: "u@x.x") - note = user.notes.create!(body: "n") - Note.where(id: note.id).update_all(updated_at: nil) - create_sync!(user, crm: :hubspot, last_synced_at: now + 10) - expect(user_ids_for(:hubspot)).not_to include(user.id) - end - - it "children NULL updated_at does not mark stale unless owner newer" do - user = User.create!(email: "u@x.x", updated_at: now) - note = user.notes.create!(body: "n") - Note.where(id: note.id).update_all(updated_at: nil) - create_sync!(user, crm: :hubspot, last_synced_at: now + 5) - expect(user_ids_for(:hubspot)).not_to include(user.id) - end - end - - # ---------------- Adapter portability (integration-level) ----------------- - - describe "adapter portability (integration)" do - it "uses GREATEST on Postgres and MAX on SQLite with multiple deps" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:notes, :profile] - } - } - ) - relation = described_class.call(crm_name: :hubspot)[User][:hubspot] - sql_query = relation.to_sql - adapter_name = ActiveRecord::Base.connection.adapter_name.to_s.downcase - if adapter_name.include?("postgres") - expect(sql_query).to match(/GREATEST\(/i) - else - expect(sql_query).to match(/MAX\(/i) + expect(user_ids_for(:hubspot)).not_to include(user.id) end end - it "generates executable SQL (proper quoting) on current DB" do - relation = described_class.call(crm_name: :hubspot)[User][:hubspot] - expect { relation.to_a }.not_to raise_error - end - end - - # -------------------- I. CRM-specific dependencies isolation -------------- - - describe "CRM-specific dependencies isolation" do - it "changing a dep for CRM A does not mark CRM B stale" do - user = User.create!(email: "a@x.x") - company = Company.create!(name: "ACME") - user.update!(company: company) - create_sync!(user, crm: :hubspot, last_synced_at: now) - create_sync!(user, crm: :salesforce, last_synced_at: now) - user.notes.create!(body: "x", updated_at: now + 30) - results = described_class.call - expect(results[User][:hubspot].pluck(:id)).to include(user.id) - expect(results[User][:salesforce].pluck(:id)).not_to include(user.id) - end - - it "changing a dep for CRM B marks stale only for CRM B" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "id", - crm_object_type: "contacts", - dependencies: [:notes] - }, - salesforce: { - adapter: Etlify::Adapters::NullAdapter.new, - id_property: "Id", - crm_object_type: "Lead", - dependencies: [:company] - } - } - ) - user = User.create!(email: "b@x.x") - company = Company.create!(name: "ACME") - user.update!(company: company) - create_sync!(user, crm: :hubspot, last_synced_at: now + 30) - create_sync!(user, crm: :salesforce, last_synced_at: now) - company.update!(updated_at: now + 60) - results = described_class.call - expect(results[User][:salesforce].pluck(:id)).to include(user.id) - expect(results[User][:hubspot].pluck(:id)).not_to include(user.id) - end - end - - # --------------------------- J. Empty / absent CRM ------------------------ - - describe "empty and absent CRM cases" do - it "omits models not configured for targeted crm_name" do - allow(User).to receive(:etlify_crms).and_return( - { hubspot: User.etlify_crms[:hubspot] } - ) - results = described_class.call(crm_name: :salesforce) - expect(results).to eq({}) - end - - it "returns {} when no model qualifies" do - klass = Class.new(ApplicationRecord) do - self.table_name = "projects" - def self.etlify_crms = {} - end - Object.const_set("NopeModel", klass) - results = described_class.call(models: [NopeModel]) - expect(results).to eq({}) - ensure - if Object.const_defined?("NopeModel") - Object.send(:remove_const, "NopeModel") + context "multi-CRM support" do + let(:stale_scope) do + ->(model, crm_name) do + stale_sql = <<-SQL.squish + crm_synchronisations.id IS NULL + OR crm_synchronisations.crm_name != ? + OR crm_synchronisations.last_synced_at < users.updated_at + SQL + model.left_joins(:crm_synchronisations).where(stale_sql, crm_name.to_s) + end end - end - - it "relation exists but can be empty when nothing is stale" do - user = User.create!(email: "ok@x.x", updated_at: now - 1) - create_sync!(user, crm: :hubspot, last_synced_at: now) - relation = described_class.call(crm_name: :hubspot)[User][:hubspot] - expect(relation).to be_a(ActiveRecord::Relation) - expect(relation.pluck(:id)).to be_empty - end - end - - # ------------------------------ Robustness -------------------------------- - - describe "robustness" do - it "ignores unknown dependency names" do - allow(User).to receive(:etlify_crms).and_return( - { - hubspot: User.etlify_crms[:hubspot].merge( - dependencies: [:does_not_exist] - ) - } - ) - user = User.create!(email: "u@x.x", updated_at: now) - create_sync!(user, crm: :hubspot, last_synced_at: now + 10) - expect(user_ids_for(:hubspot)).not_to include(user.id) - end - - it "uses a single LEFT OUTER JOIN and exposes a single id column" do - relation = described_class.call(crm_name: :hubspot)[User][:hubspot] - sql_query = relation.to_sql - - # Inner subquery has exactly one LEFT OUTER JOIN on crm_synchronisations - expect(sql_query.scan(/LEFT OUTER JOIN/i).size).to eq(1) - - # Outer select exposes a single 'id' column from the subquery alias - expect(relation.arel.projections.size).to eq(1) - expect(sql_query).to match(/SELECT\s+"users"\."id"/i) - end - - it "quotes names safely to avoid crashes with reserved words" do - relation = described_class.call(crm_name: :hubspot)[User][:hubspot] - expect { relation.to_a }.not_to raise_error - end - - it "orders ids ascending for stable batching" do - first_user = User.create!(email: "a@x.x") - second_user = User.create!(email: "b@x.x") - user_ids = described_class.call(crm_name: :hubspot)[User][:hubspot] - .pluck(:id) - expect(user_ids).to eq(user_ids.sort) - expect(user_ids).to include(first_user.id, second_user.id) - end - it "skips models that define etlify_crms but have no table" do - klass = Class.new(ApplicationRecord) do - self.table_name = "nope_table" - def self.etlify_crms + before do + allow(User).to receive(:etlify_crms).and_return( { hubspot: { adapter: Etlify::Adapters::NullAdapter.new, id_property: "id", crm_object_type: "contacts", - dependencies: [] - } + stale_scope: stale_scope, + }, + salesforce: { + adapter: Etlify::Adapters::NullAdapter.new, + id_property: "Id", + crm_object_type: "Lead", + stale_scope: stale_scope, + }, } - end + ) end - Object.const_set("PhantomModel", klass) - results = described_class.call(models: [PhantomModel], crm_name: :hubspot) - expect(results).to eq({}) - ensure - if Object.const_defined?("PhantomModel") - Object.send(:remove_const, "PhantomModel") + + it "returns entries for each configured CRM" do + result = described_class.call + + expect(result[User].keys).to contain_exactly(:hubspot, :salesforce) + end + + it "scopes staleness per CRM independently" do + user = User.create!(email: "x@x.x", updated_at: now) + create_sync!(user, crm: :hubspot, last_synced_at: now + 10) + + results = described_class.call + + expect(results[User][:hubspot].pluck(:id)).not_to include(user.id) + expect(results[User][:salesforce].pluck(:id)).to include(user.id) end end end diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 17668de..171d945 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -95,12 +95,23 @@ def build_crm_payload(crm_name:) Etlify::Serializers::UserSerializer.new(self).as_crm_payload end + STALE_SQL = <<-SQL.squish + crm_synchronisations.id IS NULL + OR crm_synchronisations.crm_name != ? + OR crm_synchronisations.last_synced_at < users.updated_at + SQL + def self.etlify_crms { hubspot: { adapter: Etlify::Adapters::NullAdapter.new, id_property: "id", crm_object_type: "contacts", + stale_scope: ->(model, crm_name) do + model + .left_joins(:crm_synchronisations) + .where(STALE_SQL, crm_name.to_s) + end, }, } end