Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
416 changes: 134 additions & 282 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ models:

vars:
segment_product_analytics:
segment_schema: "test_segment_export" # This is where your Segment tables are located
segment_schema: "SEGMENT_EXPORT_SAMPLE" # This is where your Segment tables are located
99 changes: 0 additions & 99 deletions macros/build_point_in_time_user_properties.sql

This file was deleted.

1 change: 1 addition & 0 deletions macros/get_event_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
{% set relations = [] %}

{% if execute %}
{{ log("Found tables: " ~ results.rows, info=True) }}
{% for row in results %}
{% set relation = api.Relation.create(
database=database,
Expand Down
74 changes: 49 additions & 25 deletions macros/union_event_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,42 @@
database: The database to search in (defaults to target.database)

Returns:
A SQL query that unions all event tables with consistent column names and types
A dict with:
- sql: SQL query that unions all event tables with consistent column names and types
- user_columns: list of user property columns from identifies table
#}
{% macro union_event_tables(schema, database=target.database) %}
{%- set event_relations = get_event_tables(schema=schema) -%}
{# Prevent querying of db in parsing mode. This works because this macro does not create any new refs. -#}
{%- if not execute -%}
{{ return('') }}
{%- set custom_event_relations = get_event_tables(schema=schema) -%}
{%- if not execute -%}
{{ return({'sql': '', 'user_columns': []}) }}
{%- endif -%}

{{ log("Custom event relations: " ~ custom_event_relations, info=True) }}

{%- set user_columns = [] -%}
{%- set union_sqls = [] -%}

{# Always include standard tables #}
{%- set standard_table_names = ['tracks', 'pages', 'identifies'] -%}
{%- set standard_event_relations = [] -%}
{%- for table_name in standard_table_names -%}
{%- set relation = api.Relation.create(
database=database,
schema=schema,
identifier=table_name if target.type == 'bigquery' else table_name.upper()
) -%}
{%- do standard_event_relations.append(relation) -%}
{%- endfor -%}

{{ log("Standard event relations: " ~ standard_event_relations, info=True) }}

{# Combine standard and custom event tables #}
{%- set event_relations = standard_event_relations + custom_event_relations -%}

{%- for relation in event_relations -%}
{%- set cols = adapter.get_columns_in_relation(relation) -%}
{%- set context_columns = [] -%}
{%- set custom_columns = [] -%}
{%- set user_columns = [] -%}
{%- set has_event_text = false -%}
{%- set excluded_columns = ['ANONYMOUS_ID', 'USER_ID', 'ID', 'TIMESTAMP', 'SENT_AT', 'RECEIVED_AT', 'ORIGINAL_TIMESTAMP', 'UUID_TS', 'EVENT', 'EVENT_TEXT'] -%}
{%- for col in cols -%}
Expand All @@ -34,35 +56,37 @@
{%- if col.column.startswith('CONTEXT_') -%}
{%- do context_columns.append(col.column) -%}
{%- elif not col.column.startswith('CONTEXT_') and col.column not in excluded_columns -%}
{%- if relation.identifier == 'identifies' -%}
{%- if relation.identifier | lower == 'identifies' -%}
{%- do user_columns.append(col.column) -%}
{%- else -%}
{%- do custom_columns.append(col.column) -%}
{%- endif -%}
{%- endif -%}
{%- endfor %}

SELECT
{{ log("Processing table: " ~ relation.identifier ~ " with event_text: " ~ has_event_text, info=True) }}

{%- set sql -%}
(SELECT
ANONYMOUS_ID as device_id,
USER_ID as user_id,
{% if relation.identifier == 'tracks' %}
{% if relation.identifier | lower == 'tracks' %}
{% if has_event_text %}
EVENT_TEXT
{% else %}
EVENT
{% endif %}
{% elif relation.identifier == 'pages' %}
{% elif relation.identifier | lower == 'pages' %}
'Page Viewed'
{% elif relation.identifier == 'identifies' %}
{% elif relation.identifier | lower == 'identifies' %}
'User Identified'
{% else %}
COALESCE(EVENT, '{{ relation.identifier }}')
{% endif %} as event_name,
ID as event_id,
MESSAGE_ID as event_id,
TIMESTAMP as server_ts,
SENT_AT as device_ts,
{% if target.type == 'bigquery' %}
-- Merge context fields and custom properties into properties JSON
TO_JSON(
STRUCT(
{% for column in context_columns %}
Expand All @@ -75,8 +99,7 @@
{% endfor %}
)
) as properties,
-- For identifies table, collect user traits into user_properties
{% if relation.identifier == 'identifies' %}
{% if relation.identifier | lower == 'identifies' %}
TO_JSON(
STRUCT(
{% for column in user_columns %}
Expand All @@ -89,32 +112,33 @@
JSON '{}' as user_properties
{% endif %}
{% else %}
-- Snowflake version
object_construct(
{%- for column in context_columns -%}
'{{ column[8:] | lower }}', {{ column }}
{%- if not loop.last -%}, {% endif -%}
{%- if not loop.last or custom_columns -%}, {% endif -%}
{%- endfor -%}
{%- if context_columns and custom_columns -%}, {% endif -%}
{%- for column in custom_columns -%}
'{{ column | lower }}', {{ column }}
{%- if not loop.last -%}, {% endif -%}
{%- endfor -%}
) as properties,
{% if relation.identifier == 'identifies' %}
)::VARIANT as properties,
{% if relation.identifier | lower == 'identifies' %}
object_construct(
{%- for column in user_columns -%}
'{{ column | lower }}', {{ column }}
{%- if not loop.last -%}, {% endif -%}
{%- endfor -%}
) as user_properties
)::VARIANT as user_properties
{% else %}
object_construct() as user_properties
object_construct()::VARIANT as user_properties
{% endif %}
{% endif %}
FROM {{ relation }}
{% if not loop.last -%}
UNION ALL
{%- endif -%}
LIMIT 1000)
{%- endset %}
{%- do union_sqls.append(sql) -%}
{%- endfor -%}

{%- set union_sql = union_sqls | join('\nUNION ALL\n') -%}
{{ return({'sql': union_sql, 'user_columns': user_columns}) }}
{%- endmacro -%}
12 changes: 8 additions & 4 deletions models/allevents.sql
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
{{ config(materialized='table') }}

{% set union_result = union_event_tables(schema=var('segment_schema')) %}
{% set user_columns = union_result['user_columns'] %}

with union_event_stream as (
{{ union_event_tables(schema=var('segment_schema'))}}
{{ union_result['sql'] }}
),

user_properties_timeline as (
SELECT
USER_ID as user_id,
ANONYMOUS_ID as anonymous_id,
TIMESTAMP as event_ts,
{% if target.type == 'bigquery' %}
TO_JSON(
Expand All @@ -25,8 +29,8 @@ user_properties_timeline as (
{%- endfor -%}
) as user_properties
{% endif %}
FROM {{ source(var('segment_schema'), 'identifies') }}
WHERE USER_ID IS NOT NULL
FROM {{ source('segment_schema', 'identifies') }}
WHERE USER_ID IS NOT NULL OR ANONYMOUS_ID IS NOT NULL
)

SELECT
Expand All @@ -50,5 +54,5 @@ SELECT
END as user_properties
FROM union_event_stream e
LEFT JOIN user_properties_timeline up
ON up.user_id = e.user_id
ON (up.user_id = e.user_id OR up.anonymous_id = e.device_id)
AND up.event_ts <= e.server_ts
12 changes: 0 additions & 12 deletions models/schema.yml

This file was deleted.

6 changes: 3 additions & 3 deletions models/staging/segment_sources.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
version: 2

sources:
- name: segment_export
database: ld-product-analytics-dev
schema: test_segment_export
- name: segment_schema
database: LD_PRODUCT_ANALYTICS
schema: SEGMENT_EXPORT_SAMPLE
tables:
- name: tracks
identifier: "{% if target.type == 'bigquery' %}tracks{% else %}TRACKS{% endif %}"
Expand Down
12 changes: 0 additions & 12 deletions ~/.dbt/profiles.yml

This file was deleted.