
olivierbenard/cdc-data-platform-reference

Reference implementation of a CDC-driven data platform focusing on event ordering, idempotency, and append-only ingestion with explicit trade-offs in data modeling and privacy boundaries.

CDC Data Platform - DuckDB + Airflow Reference Implementation


Why this project exists

This project is a reference implementation of a Change Data Capture (CDC) pipeline designed to explore correctness, ordering, and auditability in event-driven data systems.

In CDC-based architectures, data arrives as a stream of log events that may be duplicated, delayed, or delivered out of order. Building reliable analytical datasets on top of such inputs requires handling these characteristics explicitly.

This repository was created to make those challenges and their solutions explicit.

It focuses on:

  • preserving the raw event log as a complete and immutable source of truth
  • handling out-of-order and duplicated events deterministically
  • enforcing idempotency across pipeline re-executions
  • separating ingestion, parsing, and business transformations into clear layers (raw -> staging_pii -> staging -> marts)
  • introducing privacy boundaries as part of the data model

The goal is not to optimize for minimal implementation, but to surface the architectural decisions required to build a correct and auditable CDC pipeline.

For a complementary perspective focused on batch-oriented analytics pipelines and transformation modelling, see the
analytics-batch-pipeline-reference project.

Together, both projects illustrate how ingestion design shifts depending on whether the system is event-driven (CDC) or batch-oriented.

What this demonstrates

This project focuses on the design principles required to build reliable CDC-driven data platforms.

Immutable raw ingestion

  • The raw layer is strictly append-only, with no primary key or uniqueness constraints
  • All events are preserved as received, ensuring full auditability and replay capability
  • Data quality issues are not rejected at ingestion but made observable downstream
  • Additional metadata columns are added to the table's schema so that every ingestion event is fully auditable

This contrasts with the batch pipeline reference, where ingestion-time constraints (e.g. payload-hash-based deduplication) are introduced to simplify downstream processing.

Deterministic event processing

  • Deduplication is performed using CDC-native identifiers (event_uuid)
  • Ordering is explicitly defined (source_timestamp → read_timestamp → event_uuid)
  • The pipeline produces consistent results regardless of ingestion order
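This deterministic ordering and deduplication rule can be sketched in a few lines of Python (field names mirror the event envelope; the project's actual implementation lives in SQL):

```python
from operator import itemgetter

def dedupe_events(events):
    # Deterministic total order: source_timestamp -> read_timestamp -> event_uuid,
    # then keep the first record seen for each event_uuid.
    ordered = sorted(events, key=itemgetter("source_timestamp", "read_timestamp", "event_uuid"))
    seen, result = set(), []
    for ev in ordered:
        if ev["event_uuid"] not in seen:
            seen.add(ev["event_uuid"])
            result.append(ev)
    return result

# The same logical event ("a") delivered twice, interleaved with another event
events = [
    {"event_uuid": "b", "source_timestamp": "2026-02-05T16:24:00Z", "read_timestamp": "2026-02-05T16:52:00Z"},
    {"event_uuid": "a", "source_timestamp": "2026-02-05T16:23:00Z", "read_timestamp": "2026-02-05T16:52:41Z"},
    {"event_uuid": "a", "source_timestamp": "2026-02-05T16:23:00Z", "read_timestamp": "2026-02-05T16:53:00Z"},
]
deduped = dedupe_events(events)
assert [e["event_uuid"] for e in deduped] == ["a", "b"]   # same result for any input order
```

Because the sort key is a total order, shuffling the input list never changes the output.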

Idempotency by design

  • Reprocessing the same input produces identical outputs
  • MERGE-based transformations ensure safe incremental updates
  • Duplicate events do not lead to duplicated business records

Layered data architecture

  • Clear separation between:
    • raw (immutable event log)
    • staging_pii (typed and deduplicated events)
    • staging (privacy-safe transformations)
    • marts (business-facing datasets)
  • Each layer has an explicit contract and responsibility

Data is a liability before it is an asset

  • Parse and expose only what is needed downstream.
  • Keeping every raw field forever in the analytical layer is an anti-pattern: it creates unnecessary re-identification risk, schema fragility, and cognitive overhead for consumers.
  • Instead, the raw layer stores the complete payload envelope, and each downstream layer explicitly selects what it promotes.

Handling of CDC-specific challenges

  • Out-of-order events
  • Late-arriving data
  • Duplicate delivery from upstream systems
  • Logical deletes and state reconstruction

Data privacy as a structural concern

  • Explicit separation between PII and privacy-safe layers
  • Controlled field promotion across layers
  • Transformation of sensitive attributes into analytical representations

Trade-off driven ingestion design

  • No constraints at ingestion time maximizes auditability and resilience
  • Deduplication and correctness are enforced in transformation layers rather than at write time

This illustrates a different design point compared to batch-oriented pipelines, where constraints may be introduced earlier in the pipeline to reduce downstream complexity.

Debuggability over strict constraints

  • Raw data ingestion never fails due to schema constraints.
  • If a record is malformed, it lands in raw and is handled (flagged, quarantined, or skipped) in the transformation layers.
  • This prevents silent data loss and makes anomalies inspectable.
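A minimal sketch of this flag-don't-fail policy, with illustrative status labels (the project surfaces these outcomes via marts.pipeline_audit rather than return values):

```python
import json

def classify_raw_line(line: str):
    # Classify instead of reject: malformed input still lands in raw,
    # and the transformation layers decide what to do with it.
    try:
        event = json.loads(line)
    except json.JSONDecodeError:
        return "quarantined", None      # unparseable: keep the raw bytes, skip promotion
    if not event.get("uuid"):
        return "flagged", event         # parseable but cannot be reliably deduplicated
    return "ok", event

assert classify_raw_line('{not json')[0] == "quarantined"
assert classify_raw_line('{"payload": {}}')[0] == "flagged"
assert classify_raw_line('{"uuid": "abc", "payload": {}}')[0] == "ok"
```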

Overview

This repository is a production-grade reference implementation of a local-first analytical data platform. It ingests, processes, and exposes Change Data Capture (CDC) events originating from a MongoDB source, transforming them into analytics-ready tables using DuckDB as the embedded OLAP engine and Apache Airflow for orchestration.

The platform supports three execution modes, each targeting a different stage of the development lifecycle:

| Mode | Command | Use Case |
|---|---|---|
| Python CLI (Poetry) | make run_python_script | Local development, step-by-step debugging |
| Docker Service | make run_python_service | Reproducible execution, CI-like environment |
| Airflow DAG | make airflow | Orchestrated, production-like scheduling |

What the pipeline does

  1. Ingests partitioned JSONL CDC event files (organized by date/hour partitions)
  2. Applies CDC ordering semantics: correctly sequencing INSERT, UPDATE, and DELETE operations even when events arrive out-of-order or duplicated
  3. Enforces data privacy transformations: isolating and removing PII at explicit layer boundaries
  4. Produces a current-state user snapshot: a clean, analytics-ready marts.users_current table with one row per user

Prerequisites

| Tool | Version | Notes |
|---|---|---|
| Docker | any recent | Container execution |
| Docker Compose | V2 | docker compose (not docker-compose) |
| GNU Make | any | Entry points |
| Python | >= 3.12 | Required |

For local development:

| Tool | Version | Notes |
|---|---|---|
| Poetry | >= 1.8.3 | Dependency management |

Quick validation:

docker --version
docker compose version
make --version
python --version
poetry --version

Testing & Branch Coverage (Development)

To run the test suite:

make checks

The fail_under = 80 threshold acts as a quality gate. It prevents merging code that significantly reduces test coverage without a deliberate decision to accept it.

How it runs in CI

The GitHub Actions workflow runs the full test suite with branch coverage on every push and pull request.

Source Data

The input reference dataset is provided as a .tar archive under data/. It contains partitioned JSONL files organised by ingestion timestamp, following the layout data/users/YYYY/MM/DD/HH/.

To extract it:

make unzip_data

Each JSONL file contains one CDC event per line. A single event envelope looks like this:

{
    "uuid": "3af1d1af-1de1-4e33-91cd-7c2bf250ff65",
    "read_timestamp": "2026-02-05T16:52:41.017737Z",
    "source_timestamp": "2026-02-05T16:23:00.017737Z",
    "object": "users",
    "read_method": "mongodb-change-stream",
    "stream_name": "mongodb-test-stream",
    "sort_keys": [1770304980, 2796],
    "source_metadata": {
        "database": "app",
        "collection": "users",
        "primary_keys": ["_id"],
        "change_type": "INSERT",
        "is_deleted": false,
        "cluster_time": {"t": 1770304980, "i": 2796},
        "txn_number": null,
        "lsid": null
    },
    "payload": {
        "_id": "2d85cdb5399741789e67e27b",
        "firstname": "Taylor",
        "lastname": "Gordon",
        "email": "erin5615@example.com",
        "phone": "001-811-254-4899x659",
        "birthday": "1982-04-12",
        "gender": "female",
        "address": {
            "street": "41569 Hayley Rapid Suite 516",
            "streetName": "Sawyer Unions",
            "buildingNumber": "25430",
            "city": "Livingstonside",
            "zipcode": "17064",
            "country": "Italy",
            "country_code": "PG",
            "latitude": -45.864413,
            "longitude": 38.167421
        },
        "website": "https://brooks-carlson.com/",
        "image": "http://placeimg.com/640/480/people?21029"
    }
}

Key fields the pipeline relies on:

| Field | Type | Description |
|---|---|---|
| uuid | string | Logical event identity (primary deduplication key) |
| source_timestamp | ISO 8601 | When the change occurred in MongoDB |
| read_timestamp | ISO 8601 | When the CDC connector read the event |
| source_metadata.change_type | enum | INSERT, UPDATE, or DELETE |
| payload._id | string | User identifier (primary key in the snapshot) |
| update_description | object | Diff of changed fields (present on update events only) |

Note: The read_timestamp is always >= source_timestamp. The gap between them is the CDC ingestion latency, surfaced in the business queries (see Business Logic).
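A quick standard-library illustration of extracting these key fields and the ingestion latency from an envelope (abridged from the sample above):

```python
import json
from datetime import datetime

# Abridged version of the sample envelope above
envelope = '''{
    "uuid": "3af1d1af-1de1-4e33-91cd-7c2bf250ff65",
    "read_timestamp": "2026-02-05T16:52:41.017737Z",
    "source_timestamp": "2026-02-05T16:23:00.017737Z",
    "source_metadata": {"change_type": "INSERT"},
    "payload": {"_id": "2d85cdb5399741789e67e27b"}
}'''

event = json.loads(envelope)
read_ts = datetime.fromisoformat(event["read_timestamp"].replace("Z", "+00:00"))
source_ts = datetime.fromisoformat(event["source_timestamp"].replace("Z", "+00:00"))

# CDC ingestion latency: read_timestamp is always >= source_timestamp
latency_s = (read_ts - source_ts).total_seconds()
assert latency_s == 1781.0   # 29 minutes 41 seconds for this sample
```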

Project Structure

.
├── src/                        # Core pipeline logic (Python)
│   ├── pipeline/               # Ingestion, discovery, transformation logic
│   └── duckdb_client/          # DB bootstrap, CLI, helpers
├── analysis/                   # Business interpretation of results
├── sql/                        # SQL transformations (layered)
│   ├── init/                   # Schema initialization scripts
│   ├── 1*.sql                  # Transformative SQL queries
│   └── 20_*.sql                # Business-facing snapshot tables
├── airflow/                    # Airflow DAG definition + setup scripts
├── data/                       # Input CDC dataset (partitioned JSONL)
│   └── users/YYYY/MM/DD/HH/    # Hourly partition layout
├── db/                         # DuckDB database (generated at runtime)
├── tests/                      # Unit + integration tests
├── docker-compose.yml          # Orchestration (Airflow + services)
├── Makefile                    # All entry points
└── .env.template               # Environment variable template

Key Modules

  • src/pipeline/
    • ingestion, discovery, transformation logic
  • src/duckdb_client/
    • DB bootstrap, CLI, helpers
  • sql/init/
    • schema initialization
  • sql/10-19/
    • transformation layers (raw to marts)

Execution Paths (Python Script, Docker Service and Airflow)

This section is structured to illustrate how data processing can evolve from:

  • simple script, to
  • containerized batch, to
  • orchestrated data pipeline running on a modern data platform

Notes:

  • The results produced by each execution path were used as reference points for comparison and validation.
  • A dual implementation costs more time and energy, but it creates a comparison surface.
    • That surface is valuable because mismatches often reveal hidden assumptions, undefined semantics, or accidental behaviour in one path.
    • Here, the Airflow implementation (which could later be extended with dbt and a closer-to-production database such as Postgres) adds operational semantics, shifting the question from "is the logic correct?" to "is the platform robust?".

Python (Reference Implementation)

make run_python_script

Uses the Poetry-managed virtual environment to run the full pipeline end-to-end. This is the recommended mode for local development. It gives direct control over each pipeline step, exposes Python-level stack traces, and avoids the overhead of container orchestration.

Docker (Reproducible Execution)

make run_python_service

Packages the pipeline in a Docker container. All dependencies are pinned, no Poetry installation is required on the host. Mirrors what a production execution environment would look like, making it ideal for CI/CD pipelines or peer review.

Note: Bootstrapping the local DuckDB database is done with a Python script. To avoid local dependency hassles (Poetry, Python), the init script is executed via a one-off Docker container:

run_bootstrap_image:
	docker compose --profile batch run --rm --build duckdb-bootstrap

Airflow (Orchestrated Pipeline)

Step 1: Generate the Airflow Fernet Key

Airflow requires a Fernet key to encrypt sensitive metadata stored in its database.

Generate a fresh key:

make fernet_key

Copy the output and set it in your docker-compose.yml:

AIRFLOW_FERNET_KEY=<your_generated_key>

Step 2: Configure your environment

cp .env.template .env
# then fill in required values

Step 3: Start Airflow

make airflow

Access the Airflow UI at http://localhost:8080.

You can trigger the DAG manually from the Airflow UI.

Why sequential rather than parallel?

Each layer has a strict dependency on the layer above it. staging_pii cannot be populated until the raw events exist, and marts cannot be built until the privacy-safe staging table is ready. Parallelism would be appropriate when processing independent partitions (e.g., multiple source tables running simultaneously), not for dependent transformation layers.

Audit table and UUID tracking:

  • audit results are written to marts.pipeline_audit.
  • this makes every pipeline run observable and traceable without relying on log scraping.

Why Airflow Initialisation Runs Separately

For local reproducibility, Airflow is run as a single unified container (combining webserver + scheduler) rather than the standard multi-service decomposition. This simplifies setup significantly with no need to manage separate webserver, scheduler, worker, and triggerer services.

The trade-off is that Airflow metadata DB initialisation and default admin user creation cannot happen at container start (which would create a race condition). Instead, a dedicated init script runs after the containers are healthy:

make init_airflow

This can safely be re-run at any time to reset the Airflow metadata state.

Inspecting the Data

Connect to the DuckDB file (db/analytics.duckdb) using DBeaver or the built-in CLI:

make duckdb_cli

Schemas available after a successful pipeline run:

| Schema | Purpose |
|---|---|
| raw | Immutable append-only event envelope |
| staging_pii | Typed, deduplicated events (PII retained) |
| staging | Privacy-safe analytical events |
| marts | Business-facing snapshot tables |

Example exploration queries:

-- How many raw events were ingested?
SELECT COUNT(*) FROM raw.raw_user_cdc_events;

-- Inspect the schema without loading data
SELECT * FROM raw.raw_user_cdc_events WHERE 1 = 0;

-- Check for events with missing UUIDs (data quality signal)
SELECT COUNT(*) FROM raw.raw_user_cdc_events WHERE event_uuid IS NULL;

-- How many unique users are in the current snapshot?
SELECT COUNT(*) FROM marts.users_current;

-- Verify DELETE handling — no deleted users should appear in the snapshot
SELECT COUNT(*) FROM marts.users_current WHERE is_deleted = TRUE;

Data Platform Implementation

Why DuckDB?

DuckDB was selected as the analytical engine for several reasons:

  • Native JSON support (efficient parsing of CDC payloads)
  • Embedded (no infrastructure overhead, unlike e.g. Postgres, whose overhead is manageable but not zero)
  • Strong OLAP capabilities
  • SQL-first (aligns with analytical workflows)

Example of DuckDB's native JSON power:

-- read directly from JSONL without Python
SELECT *
FROM read_json_auto('data/users/2026/02/05/16/25/events-20260223_143220.jsonl');

DuckDB CLI wrapper: Rather than installing DuckDB globally, a Python-based CLI wrapper (src/duckdb_client/cli.py) is provided. It exposes an interactive SQL REPL backed by the project's DuckDB file:

make duckdb_cli

# Connected to DuckDB database: db/analytics.duckdb
# Type SQL and press Enter. Type 'exit' or 'quit' to leave.

duckdb> SELECT COUNT(*) FROM marts.users_current;
duckdb> exit

Layered Architecture - In Depth

Raw Layer: raw.raw_user_cdc_events

This is the immutable event log — the foundation of the entire platform. Every CDC event ingested, regardless of quality, lands here.

Why no primary key?

The raw layer intentionally has no primary key constraint. A primary key would mean DuckDB rejects duplicate inserts, which creates a silent data loss risk: if the same event file is ingested twice (e.g., after a failed pipeline run), the second run would silently skip records instead of flagging the issue. By allowing duplicates at the raw layer, deduplication becomes an explicit, observable transformation in staging_pii and not an implicit side effect of a constraint.

Key columns:

| Column | Description |
|---|---|
| run_id | UUID identifying the pipeline execution that produced this row |
| file_source | Path to the source JSONL file |
| partition | Logical partition (e.g., 2026/02/05/16) |
| ingestion_context | Execution mode: python_script, python_service, or airflow_dag |
| payload_json | Full raw JSON envelope, never modified |
| read_timestamp | When the event was read by the pipeline |
| event_uuid | Extracted CDC event identifier (may be null for unreliable events) |

Design rationale: Data loss at ingestion is categorically worse than dirty data in the raw layer. Dirty data can be cleaned or filtered downstream. Lost data cannot be recovered without re-ingesting from the source.

Staging PII Layer: staging_pii.stg_user_events_typed

This layer parses, types, and deduplicates the raw JSON envelope into structured, column-oriented records while retaining all PII fields for completeness.

Field promotion strategy:

Only fields with clear downstream business rationale are extracted as typed columns. For example, structured address fields (street, city, postcode) are intentionally not flattened here — they are not needed by any downstream analytics query, and flattening them would expose PII unnecessarily while adding schema complexity with no benefit.

Deduplication using event_uuid:

SELECT *,
  ROW_NUMBER() OVER (
    PARTITION BY event_uuid
    ORDER BY source_timestamp ASC, read_timestamp ASC
  ) AS row_num
FROM parsed_events
QUALIFY row_num = 1;

The deduplication uses event_uuid — not a payload_hash. Here is why:

  • event_uuid = logical event identity: the CDC system assigned this identifier to represent a specific change event
  • payload_hash = byte-level identity: two payloads are identical only if every byte matches

In a CDC context, the same logical event (same UUID) can arrive with slightly different payload bytes (e.g., field ordering differences, whitespace normalization by the CDC connector). Using payload hash for deduplication would generate false duplicates — retaining two records for what is logically the same event. event_uuid is the semantically correct deduplication key.
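The failure mode is easy to demonstrate: two byte-wise different serializations of the same logical payload hash differently, which is why the UUID, not the hash, is the semantically correct key. A sketch:

```python
import hashlib
import json

def payload_hash(raw: str) -> str:
    return hashlib.sha256(raw.encode()).hexdigest()

# The same logical payload, delivered twice with different byte-level formatting
a = '{"_id": "u1", "email": "erin5615@example.com"}'
b = '{"email": "erin5615@example.com", "_id": "u1"}'   # only the field order differs

assert json.loads(a) == json.loads(b)        # logically identical
assert payload_hash(a) != payload_hash(b)    # but a hash-based key sees two "distinct" events
```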

Events with null event_uuid are not promoted to staging_pii — they are flagged in marts.pipeline_audit instead. These represent unreliable events that cannot be reliably deduplicated.

Why MERGE instead of full refresh?

MERGE INTO staging_pii.stg_user_events_typed AS target
USING new_events AS source
ON target.event_uuid = source.event_uuid
WHEN NOT MATCHED THEN INSERT ...
WHEN MATCHED THEN UPDATE ...;

Using MERGE (upsert) in the staging layer provides incremental processing — only new or changed events are processed. This is important because:

  1. The raw layer is append-only and grows continuously
  2. Re-processing the entire raw history on every run would become prohibitively expensive at scale
  3. Idempotency is preserved: re-running with the same events produces the same staging_pii state

Staging (Privacy-Safe) Layer: staging.stg_user_events_privacy_safe

This layer removes direct PII while preserving analytical utility. It is the primary layer consumed by analysts and BI tools.

Privacy transformations applied:

| Field | Raw Value | Transformed Value | Rationale |
|---|---|---|---|
| email | alice@gmail.com | Removed | Direct PII |
| email_hashed | (derived from email) | sha256('alice@gmail.com') | Identity tracking without exposure |
| email_domain | (derived from email) | gmail.com | Business analytics (provider distribution) |
| name | Alice Müller | Removed | Direct PII |
| date_of_birth | 1989-04-12 | Removed | Direct PII |
| age_group | (derived from date_of_birth) | [30-40] | Re-identification-safe bucketing |
| country | US / USA / United States | US | Normalized ISO code |
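The email transformations above can be sketched in a few lines of Python (an illustrative helper; the project implements these in SQL):

```python
import hashlib

def privacy_safe_email(email: str) -> dict:
    normalized = email.strip().lower()
    return {
        "email_hashed": hashlib.sha256(normalized.encode()).hexdigest(),  # identity without exposure
        "email_domain": normalized.split("@", 1)[1],                      # provider analytics
    }

row = privacy_safe_email("Alice@Gmail.com ")
assert row["email_domain"] == "gmail.com"
assert len(row["email_hashed"]) == 64   # only the digest, never the raw address, is promoted
```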

Age group calculation:

CONCAT(
  '[',
  CAST(FLOOR(date_diff('year', date_of_birth, CURRENT_DATE) / 10) * 10 AS INT),
  '-',
  CAST(FLOOR(date_diff('year', date_of_birth, CURRENT_DATE) / 10) * 10 + 10 AS INT),
  ']'
) AS age_group

Non-determinism caveat: Using CURRENT_DATE means a user born in 1989 will shift from age group [30-40] to [40-50] after their 40th birthday, even if the underlying raw data never changes. This breaks backfill reproducibility — running the pipeline on the same input two years apart will produce different age groups.

Recommended fix: Use a fixed reference date tied to the pipeline run:

-- Option A: Use the pipeline snapshot date
date_diff('year', date_of_birth, DATE '2026-01-01')

-- Option B: Use the max source_timestamp in the dataset
date_diff('year', date_of_birth, (SELECT MAX(source_timestamp)::DATE FROM staging_pii.stg_user_events_typed))

The current implementation uses CURRENT_DATE as a pragmatic choice for this context. In a production system, this should be parameterised as a pipeline input.
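As a sketch of the parameterised variant: an age bucket computed against an explicit reference date is reproducible by construction (function name and exact-age semantics are illustrative; DuckDB's date_diff('year', ...) counts calendar-year boundaries instead):

```python
from datetime import date

def age_group(date_of_birth: date, as_of: date) -> str:
    # Exact age at the reference date, bucketed into decades
    years = as_of.year - date_of_birth.year - (
        (as_of.month, as_of.day) < (date_of_birth.month, date_of_birth.day)
    )
    lower = years // 10 * 10
    return f"[{lower}-{lower + 10}]"

dob = date(1989, 4, 12)
assert age_group(dob, date(2026, 1, 1)) == "[30-40]"
assert age_group(dob, date(2030, 1, 1)) == "[40-50]"   # same input, later snapshot date
```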

MARTS Layer: marts.users_current

The business-facing snapshot table — one row per user, representing the latest known state.

How DELETEs are handled:

CDC delete events carry change_type = 'DELETE'. The marts build applies a two-stage approach:

  1. The full event history (ordered by source_timestamp -> read_timestamp -> event_uuid) is ranked per user_id using ROW_NUMBER().
  2. The latest event per user is selected. If that latest event is a DELETE, the user is excluded from the snapshot.
-- Simplified marts build logic
WITH ranked_events AS (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY user_id
      ORDER BY source_timestamp DESC, read_timestamp DESC, event_uuid DESC
    ) AS rn
  FROM staging.stg_user_events_privacy_safe
),
latest AS (
  SELECT * EXCLUDE (rn) FROM ranked_events WHERE rn = 1
)
INSERT OR REPLACE INTO marts.users_current
SELECT * FROM latest
WHERE change_type != 'DELETE'
  AND is_deleted IS NOT TRUE;

The is_deleted flag provides a defensive double-check against CDC connectors that may emit delete events with incorrect change_type values.
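The same latest-event-wins logic, including DELETE exclusion, can be sketched in Python (field names follow the envelope; the actual build is the SQL in this section):

```python
def build_current_snapshot(events):
    # Process events in the deterministic order, so the last write per
    # user_id is the latest event. Users whose latest event is a DELETE
    # (or carries is_deleted) are excluded from the snapshot.
    ordering = lambda e: (e["source_timestamp"], e["read_timestamp"], e["event_uuid"])
    latest = {}
    for ev in sorted(events, key=ordering):
        latest[ev["user_id"]] = ev
    return {
        uid: ev for uid, ev in latest.items()
        if ev["change_type"] != "DELETE" and not ev.get("is_deleted")
    }

events = [
    {"user_id": "u1", "change_type": "INSERT", "source_timestamp": 1, "read_timestamp": 1, "event_uuid": "a"},
    {"user_id": "u1", "change_type": "DELETE", "source_timestamp": 2, "read_timestamp": 2, "event_uuid": "b"},
    {"user_id": "u2", "change_type": "UPDATE", "source_timestamp": 3, "read_timestamp": 3, "event_uuid": "c"},
]
snapshot = build_current_snapshot(events)
assert "u1" not in snapshot                       # latest event for u1 is a DELETE
assert snapshot["u2"]["change_type"] == "UPDATE"  # u2 survives with its latest state
```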

Why full refresh for marts (not MERGE)?

Unlike staging_pii which uses incremental MERGE, the marts layer applies a full refresh on every run:

  • The current snapshot is truncated and rebuilt from the full staging history
  • This is simpler, deterministic, and avoids accumulating stale state
  • At the scale of this dataset (tens of thousands to low millions of users), this is computationally acceptable
  • At 50M+ users, this would need to transition to an incremental model (see Scaling)

Pipeline Steps - End to End

1. Discover JSONL partition files
          ↓
2. Ingest raw events → raw.raw_user_cdc_events (append-only)
          ↓
3. Parse JSON + type fields → staging_pii.stg_user_events_typed (MERGE, dedup by event_uuid)
          ↓
4. Apply privacy transformations → staging.stg_user_events_privacy_safe (MERGE)
          ↓
5. Build current-state snapshot → marts.users_current (full refresh)
          ↓
6. Run audit checks → marts.pipeline_audit (append)

Architectural Decisions & Trade-offs

Preserving the Raw JSON Envelope

Decision: Store the complete raw JSON payload as a single payload_json column in the raw layer, rather than parsing it at ingestion time.

| Pro | Con |
|---|---|
| Full lineage: any field can be re-extracted downstream without re-ingestion | Storage overhead (raw JSON is verbose) |
| Schema evolution: new source fields are automatically captured | Parsing cost deferred to transformation layer |
| Replay: any transformation bug can be fixed by re-running SQL, not re-ingesting | Raw layer queries require JSON extraction |
| Audit: the exact bytes that arrived from the CDC source are preserved | |

At analytical scale, the storage overhead is offset by columnar compression in DuckDB's native format. The replay and auditability benefits are worth it.

UUID vs. Payload Hash for Deduplication

| Property | event_uuid | payload_hash |
|---|---|---|
| Semantics | Logical event identity (assigned by CDC system) | Byte-level content identity |
| CDC-appropriate | Yes | Fragile (field order, whitespace) |
| False duplicate risk | Low | High |
| Business meaning | Matches CDC contract | Arbitrary |

Decision: Use event_uuid as the primary deduplication key. The payload_hash is computed and stored as a lineage/debugging aid but is not used for deduplication logic.

Deduplication Strategy: First-Seen Wins

When multiple records share the same event_uuid, the ordering is:

source_timestamp ASC -> read_timestamp ASC -> event_uuid ASC (tie-breaker)

Since duplicate deliveries of the same logical event normally share a source_timestamp, the copy with the earliest read_timestamp (the first one the pipeline observed) is retained. This is a deliberate choice: it preserves the original event as first observed, before any potential re-delivery artifacts.

Merge vs. Full Refresh by Layer

| Layer | Strategy | Rationale |
|---|---|---|
| raw | Append-only | Immutable event log |
| staging_pii | MERGE (incremental) | Grows incrementally; idempotent upsert |
| staging | MERGE (incremental) | Same as above |
| marts | Full refresh | Simple, deterministic snapshot; acceptable at this scale |

Age Group Calculation Trade-off

Current (pragmatic): FLOOR(date_diff('year', date_of_birth, CURRENT_DATE) / 10)

Production recommendation: Parameterise with a fixed snapshot_date passed as a pipeline argument:

# In pipeline orchestration
snapshot_date = datetime.date(2026, 1, 1)
-- In the transformation SQL
FLOOR(date_diff('year', date_of_birth, DATE '{{ snapshot_date }}') / 10)

This makes every pipeline run reproducible against the same snapshot date, which is essential for backfills and historical comparisons.

No Hard Constraints in Raw — By Design

The raw layer has no NOT NULL, UNIQUE, or CHECK constraints. This is intentional:

  • Prevents ingestion failures from blocking the pipeline when upstream data quality degrades
  • Makes anomalies inspectable — a record with a null UUID lands in raw and can be queried
  • Downstream layers handle quality: staging_pii filters null-UUID records; marts.pipeline_audit surfaces them

The risk of "data contamination" propagating to analytics is mitigated by the explicit contract of each downstream layer, not by raw-layer constraints.

Idempotency & Determinism

The pipeline is fully idempotent. Running it N times on the same input produces identical output. This is guaranteed by:

  1. Deduplication by event_uuid — duplicate raw events resolve to a single staging record
  2. MERGE-based upserts — re-inserting the same data does not change the result
  3. Deterministic ordering: source_timestamp -> read_timestamp -> event_uuid provides a total order for tie-breaking
  4. Full refresh on marts — the snapshot is always rebuilt from scratch, eliminating accumulated state drift
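Guarantee 2 can be sketched as a property: a MERGE-style upsert keyed on event_uuid is idempotent (an illustrative in-memory model, not the project's SQL):

```python
def merge_upsert(target: dict, batch: list[dict]) -> dict:
    # MERGE-style semantics keyed on event_uuid: unseen keys insert,
    # existing keys update. Re-applying the same batch changes nothing.
    merged = dict(target)
    for ev in batch:
        merged[ev["event_uuid"]] = ev
    return merged

batch = [
    {"event_uuid": "a", "email_domain": "gmail.com"},
    {"event_uuid": "b", "email_domain": "outlook.com"},
]
once = merge_upsert({}, batch)
twice = merge_upsert(once, batch)
assert once == twice   # re-running with the same input produces identical output
```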

Business Logic

All analytical business queries are implemented in sql/20_business_queries.sql. Examples:

-- distinct active users in the snapshot (not deleted)
-- Answer: 18,997
SELECT COUNT(DISTINCT user_id) AS active_user_count
FROM marts.users_current;

-- percentage of active users using Gmail
-- Answer: 30.38 %
SELECT
    ROUND(
        100.0 * SUM(
            CASE
                WHEN email_domain IN ('gmail.com', 'googlemail.com') THEN 1 -- Legacy German Gmail
                ELSE 0
            END
        ) / NULLIF(COUNT(*), 0), -- n / NULL = NULL
        2
    ) AS gmail_user_percentage
FROM marts.users_current
WHERE email_domain IS NOT NULL;

-- top 3 countries by number of Gmail users
-- Answer:
-- 1. Germany (1,172)
-- 2. Switzerland (235)
-- 3. Spain (228)
select country, COUNT(*) AS gmail_user_count
from marts.users_current
where email_domain IN ('gmail.com', 'googlemail.com')
group by country
order by gmail_user_count DESC
limit 3;

-- how many users changed their email at least once?
-- Answer: 90
with ordered_events AS (
	select
		user_id,
		lower(trim(email)) as email,
		source_timestamp,
		read_timestamp,
		event_uuid,
		LAG(lower(trim(email))) OVER (
			PARTITION BY user_id
			ORDER BY -- different fallbacks
				source_timestamp ASC NULLS LAST,
				read_timestamp ASC NULLS LAST,
				event_uuid ASC NULLS LAST
		) as previous_email
	from staging_pii.stg_user_events_typed
	where
		change_type IN ('INSERT', 'UPDATE')
		and user_id is not null
		and email is not null
)
select count(distinct user_id) as users_who_changed_email
from ordered_events
where
	previous_email is not null -- LAG() replaces with NULL if user_id has just 1 record
	and email <> previous_email; -- email = previous_email if user_id has multiple records but no UPDATE events


-- top 5 email domain transitions
-- Answer:
-- gmail.com -> icloud.com	  4
-- yahoo.com -> outlook.com	  4
-- example.com -> gmail.com   3
-- example.com -> outlook.com 3
-- gmail.com -> example.io	  3
with ordered_events AS (
	select
		user_id,
		email_domain,
		source_timestamp,
		read_timestamp,
		event_uuid,
		LAG(lower(trim(email_domain))) OVER (
			PARTITION BY user_id
			ORDER BY -- different fallbacks
				source_timestamp ASC NULLS LAST,
				read_timestamp ASC NULLS LAST,
				event_uuid ASC NULLS LAST
		) as previous_email_domain
	from staging.stg_user_events_privacy_safe
	where
		change_type IN ('INSERT', 'UPDATE')
		and user_id is not null
		and email_domain is not null
)
select
	previous_email_domain || ' -> ' || email_domain as domain_transition,
	COUNT(*) as transition_count
from ordered_events
where
	previous_email_domain is not null
	and email_domain is not null
	and previous_email_domain <> email_domain
group by domain_transition
order by
	transition_count desc,
	domain_transition asc
limit 5;

-- average time span in minutes between first and last CDC event for users with more than one event
-- Answer: 2.86 minutes
with user_event_spans AS (
	select
		user_id,
		MIN(source_timestamp) as first_event_ts,
        MAX(source_timestamp) AS last_event_ts,
        COUNT(*) AS event_count
	from staging_pii.stg_user_events_typed
	where
		user_id is not NULL
	group by user_id
	having count(*) > 1
)
select
	ROUND(AVG(date_diff('minute', first_event_ts, last_event_ts)), 2) AS avg_event_span_minutes
from user_event_spans;

Production Architecture

This section addresses the follow-up questions that would typically be raised before deployment and productionisation.

Orchestration

The current implementation uses Airflow as a local orchestrator. In a production environment, the choice would depend on team maturity and scale:

| Orchestrator | Best For |
|---|---|
| Apache Airflow | Large teams, complex dependency graphs, mature ecosystem |
| Dagster | Asset-centric workflows, built-in data lineage |
| Prefect | Pythonic API, fast iteration, hybrid deployments |
| dlt (Data Load Tool) | Lightweight ingestion pipelines, schema inference |

Late and Out-of-Order Events

CDC streams are inherently unreliable with respect to ordering. Events from the same user may arrive in any sequence. The platform handles this at two levels:

At deduplication time: Deterministic ordering (source_timestamp -> read_timestamp -> event_uuid) ensures the pipeline always produces the same result regardless of ingestion order.

For late-arriving events (watermarking): A watermarking strategy defines a maximum lateness tolerance. Any event arriving after the watermark for its partition is either:

  • Reprocessed in a dedicated late-data window
  • Flagged in pipeline_audit and surfaced for manual review

Conceptually:

Watermark = max(source_timestamp) - allowed_lateness_interval
Late event = source_timestamp < Watermark at time of ingestion

In a streaming context (Kafka + Flink), this is handled natively by the framework's event-time processing. In this batch pipeline, it is approximated by partition-level reprocessing.
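The watermark arithmetic above, sketched in Python (epoch-second timestamps and the lateness parameter are illustrative):

```python
def is_late(source_timestamp: float, max_source_timestamp: float, allowed_lateness_s: float) -> bool:
    # Watermark = max(source_timestamp) - allowed lateness interval;
    # anything older than the watermark is treated as late
    watermark = max_source_timestamp - allowed_lateness_s
    return source_timestamp < watermark

high = 1_770_304_980   # max source_timestamp observed so far (epoch seconds)
assert not is_late(1_770_304_900, high, allowed_lateness_s=300)  # within tolerance
assert is_late(1_770_304_000, high, allowed_lateness_s=300)      # behind the watermark
```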

Reprocessing window: The pipeline could be re-run for a specific partition range without affecting other partitions (left for implementation):

# Re-run only the partition for a specific date/hour
make run_python_script PARTITION=2026/02/05/16

Schema Evolution

Because the raw layer preserves the full JSON envelope, schema evolution is handled gracefully:

  • A new field appearing in the CDC payload is automatically captured in payload_json
  • It can be promoted to a typed column in staging_pii by adding a single json_extract expression
  • No re-ingestion is required: the field can be backfilled by re-running the staging transformation over existing raw data

This is a significant operational advantage over systems that parse at ingestion time.
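The promotion step can be illustrated in plain Python using a hypothetical `loyalty_tier` field (in the pipeline itself this is a `json_extract` expression in the staging SQL):

```python
import json

# Raw rows keep the full CDC envelope as a JSON string.
raw_rows = [
    {"event_uuid": "e1", "payload_json": json.dumps(
        {"_id": "u1", "email": "a@example.com"})},
    # A new field, "loyalty_tier", appears later with no schema change:
    {"event_uuid": "e2", "payload_json": json.dumps(
        {"_id": "u2", "email": "b@example.com", "loyalty_tier": "gold"})},
]

def stage(rows):
    """Promote selected JSON fields to typed columns."""
    staged = []
    for row in rows:
        payload = json.loads(row["payload_json"])
        staged.append({
            "user_id": payload["_id"],
            # .get() tolerates rows ingested before the field existed, so a
            # backfill is just a re-run of this step over existing raw data.
            "loyalty_tier": payload.get("loyalty_tier"),
        })
    return staged

staged = stage(raw_rows)
```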

Backfills

The pipeline supports targeted backfills via run_id isolation. Every row in raw carries a run_id (a UUID generated at pipeline start time). To backfill a specific date range:

  1. Identify the run_id values covering the target partitions
  2. Re-run the staging transformation filtered to those run_id values
  3. Re-run the marts build

-- Identify run_ids for a target partition
SELECT DISTINCT run_id, partition, MIN(read_timestamp) AS first_seen
FROM raw.raw_user_cdc_events
WHERE partition LIKE '2026/02/%'
GROUP BY 1, 2
ORDER BY 3;

Because the staging layer uses MERGE and the marts layer uses full refresh, backfilling specific partitions is safe and will not corrupt existing data outside the target range.

Scaling

At 50M users / 10M events per day, the following changes would be required:

| Component | Current | At Scale |
| --- | --- | --- |
| Storage | DuckDB file | BigQuery / Snowflake / Iceberg |
| Compute | Single-node DuckDB | Distributed SQL engine |
| Partitioning | JSONL files by date/hour | Partitioned tables (by event_date) |
| Transformation | Full refresh marts | Incremental models (partition pruning) |
| Deduplication | DuckDB MERGE | Streaming deduplication (Kafka / Flink) |

The SQL transformations are largely portable to BigQuery or Snowflake with minor dialect adjustments. The architectural layers (raw -> staging_pii -> staging -> marts) would remain unchanged.

Latency

Current: Batch (minutes to hours depending on file arrival frequency)

Path to lower latency:

  1. Micro-batching (near real-time, minutes): Replace scheduled Airflow runs with short-interval triggers (every 1–5 minutes) via the DAG's schedule parameter (schedule_interval in older Airflow versions). The pipeline already supports partial ingestion of new partition files.

  2. Streaming ingestion (seconds): Replace JSONL file drops with a Kafka topic as the CDC event stream. A Kafka consumer reads events in real-time and writes them to DuckDB (or a cloud warehouse) as micro-batches. The staging_pii MERGE logic is unchanged — only the ingestion mechanism changes.

    MongoDB -> Debezium CDC Connector -> Kafka Topic -> Kafka Consumer -> raw layer -> transformations
    
  3. Incremental DAGs: Break the single cdc_pipeline DAG into partition-level tasks, enabling parallel processing of multiple hourly partitions simultaneously.

Data Privacy & AI Readiness

Implemented Measures

| Measure | Implementation |
| --- | --- |
| Pseudonymization | SHA-256 hash of email address |
| Data minimization | PII removed at the staging layer boundary |
| Layer separation | staging_pii access restricted; analysts query staging only |
| Aggregation | Date of birth -> 10-year age bucket |
| Normalization | Country variants -> ISO codes |
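The first and fourth measures can be sketched as follows (illustrative helper names, not the pipeline's actual module):

```python
import hashlib
from datetime import date

def pseudonymize_email(email: str) -> str:
    """SHA-256 hash of the normalized email (pseudonymization, not anonymization)."""
    return hashlib.sha256(email.strip().lower().encode()).hexdigest()

def age_bucket(dob: date, as_of: date) -> str:
    """Collapse a date of birth into a 10-year bucket, e.g. '30-39'."""
    age = as_of.year - dob.year - ((as_of.month, as_of.day) < (dob.month, dob.day))
    low = (age // 10) * 10
    return f"{low}-{low + 9}"

hashed = pseudonymize_email("Jane.Doe@Example.com")
bucket = age_bucket(date(1991, 6, 1), as_of=date(2026, 2, 5))
```

Normalizing the email before hashing ensures the same address always maps to the same pseudonym, which is what makes hash-based joins across tables possible.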

Right to be Forgotten (GDPR)

Handling a deletion request for user_id = X:

  1. Raw layer: Insert a logical tombstone event with change_type = 'FORGET' and user_id = X. The original raw events are not physically deleted (audit log). A scheduled compaction job can physically purge after the legally required retention period.

  2. Staging layers: The next pipeline run will process the tombstone event and remove all rows for user_id = X from staging_pii and staging via a MERGE DELETE operation.

  3. Marts: The full refresh will exclude the deleted user from users_current on the next run.

This propagation is event-driven — no manual intervention required for each deletion request. The tombstone event drives the entire cascade.
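The MERGE-style cascade can be sketched with an in-memory table (a toy model: the real pipeline does this in SQL against staging_pii):

```python
# Hypothetical staging_pii state, keyed by user_id.
staging_pii = {
    "u1": {"email": "a@example.com"},
    "u2": {"email": "b@example.com"},
}

events = [
    {"user_id": "u1", "change_type": "update",
     "payload": {"email": "a2@example.com"}},
    {"user_id": "u2", "change_type": "FORGET"},  # GDPR tombstone
]

def apply_events(table, events):
    """MERGE-style apply: updates upsert, FORGET tombstones delete."""
    for e in events:
        if e["change_type"] == "FORGET":
            table.pop(e["user_id"], None)   # idempotent delete
        else:
            table[e["user_id"]] = e["payload"]
    return table

apply_events(staging_pii, events)
```

Because the delete is idempotent, re-running the same batch (e.g. during a backfill) leaves the table in the same state.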

Differential Privacy (for AI/ML use cases)

If the dataset is used to train ML models, raw aggregates may leak individual-level information even after pseudonymization. Differential privacy adds calibrated statistical noise to query results, providing a mathematical guarantee that no individual's presence or absence in the dataset can be inferred.

Example: Instead of exposing COUNT(*) directly in an analytical API, the system returns COUNT(*) + Laplace(sensitivity/epsilon), where epsilon is the privacy budget. The noise bounds how much the output distribution can change when any single row is added or removed, so an observer cannot confidently infer whether a given individual is in the dataset.

In practice, frameworks such as Google's differential-privacy library or OpenDP implement these mechanisms and can be applied at the marts query layer.
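The Laplace mechanism for COUNT(*) can be sketched with the standard library (an educational toy, not a production DP implementation — real systems also track the cumulative privacy budget):

```python
import math
import random

def laplace_noise(sensitivity: float, epsilon: float, rng: random.Random) -> float:
    """Sample Laplace(0, b) with scale b = sensitivity / epsilon via inverse CDF."""
    b = sensitivity / epsilon
    u = rng.random() - 0.5                       # uniform on [-0.5, 0.5)
    sign = -1.0 if u < 0 else 1.0
    return -b * sign * math.log(1.0 - 2.0 * abs(u))

def dp_count(true_count: int, epsilon: float, rng: random.Random) -> float:
    # COUNT(*) has sensitivity 1: one row changes the count by at most 1.
    return true_count + laplace_noise(1.0, epsilon, rng)

noisy = dp_count(10_000, epsilon=0.5, rng=random.Random(7))
```

A smaller epsilon means a larger scale `b` and therefore noisier (more private, less accurate) answers; the noise averages out to zero over many queries.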

Potential Enhancements

dbt Integration

dbt (data build tool) would replace the raw SQL files in sql/ with a managed transformation framework providing:

  • Lineage graph: visual DAG of table dependencies
  • Built-in tests: not_null, unique, accepted_values, relationships assertions on every model
  • Documentation: auto-generated data catalog from model YAML
  • Version control: each model is a .sql file with a clear contract

The layered architecture (raw -> staging_pii -> staging -> marts) maps directly to dbt's model organization conventions.

Data Contracts

A data contract is a formal, machine-readable agreement between a data producer (e.g., the CDC source team) and a data consumer (e.g., this pipeline). It specifies:

  • Expected fields and their types
  • Nullable / required constraints
  • Acceptable value ranges
  • SLA on freshness and completeness

Example (YAML contract for the CDC source):

contract:
  name: user_cdc_events
  version: "1.2.0"
  fields:
    - name: uuid
      type: string
      required: true
      description: "Unique event identifier assigned by the CDC system"
    - name: payload._id
      type: string
      required: true
      description: "MongoDB document ID (user identifier)"
    - name: operationType
      type: enum
      values: [insert, update, delete]
      required: true
  sla:
    max_latency_minutes: 5
    min_completeness_pct: 99.5

Tools such as CI checks in the producer's repository or a schema registry can enforce the contract on the producer side, rejecting non-conforming events before they are published.
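A minimal consumer-side check can be sketched in Python, assuming the YAML contract above has been parsed into a dict (simplified: only type, required, and enum rules):

```python
# Simplified contract derived from the YAML above.
CONTRACT = {
    "uuid": {"type": str, "required": True},
    "operationType": {"type": str, "required": True,
                      "values": {"insert", "update", "delete"}},
}

def validate(event: dict, contract: dict) -> list[str]:
    """Return a list of contract violations (empty means the event conforms)."""
    errors = []
    for field, rules in contract.items():
        if field not in event:
            if rules.get("required"):
                errors.append(f"missing required field: {field}")
            continue
        value = event[field]
        if not isinstance(value, rules["type"]):
            errors.append(f"{field}: expected {rules['type'].__name__}")
        elif "values" in rules and value not in rules["values"]:
            errors.append(f"{field}: {value!r} not in allowed values")
    return errors

ok = validate({"uuid": "e1", "operationType": "insert"}, CONTRACT)
bad = validate({"operationType": "upsert"}, CONTRACT)
```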

Schema Registry

A schema registry is a centralized service that stores and versions the schema of every event type in the system. When a producer publishes an event, the registry validates it against the registered schema before it reaches consumers.

In a Kafka-based architecture:

Producer -> Schema Registry (validate against Avro/Protobuf schema) -> Kafka Topic -> Consumer

Benefits for this pipeline:

  • Breaking change detection: If the CDC source adds a required field or changes a type, the registry rejects the event before it contaminates the raw layer
  • Schema evolution governance: Backward/forward compatibility rules are enforced centrally
  • Consumer contract: Consumers (this pipeline) can retrieve the schema from the registry at runtime instead of hardcoding field names
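One simplified backward-compatibility rule can be sketched as follows (a toy model of what a registry like Confluent's enforces; real registries also handle defaults, unions, and transitive checks):

```python
def is_backward_compatible(old: dict, new: dict) -> bool:
    """Can a consumer of the new schema still read data written with the old one?"""
    for field, spec in new.items():
        if field not in old and spec["required"]:
            return False  # old data lacks this required field -> breaking
        if field in old and spec["type"] != old[field]["type"]:
            return False  # type change is breaking in this toy model
    return True

v1 = {"uuid": {"type": "string", "required": True}}
# Adding an optional field is a compatible evolution:
v2 = {"uuid": {"type": "string", "required": True},
      "loyalty_tier": {"type": "string", "required": False}}
# Adding a required field without a default is breaking:
v3 = {"uuid": {"type": "string", "required": True},
      "region": {"type": "string", "required": True}}
```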

Great Expectations

Great Expectations is a data quality framework that lets you define expectations (assertions about your data) and run them as validation checkpoints in your pipeline.

Example expectations for staging.stg_user_events_privacy_safe:

# No null user_ids
expect_column_values_to_not_be_null("user_id")

# email_domain is present whenever email_hashed is present
expect_column_values_to_not_be_null("email_domain", row_condition="email_hashed IS NOT NULL")

# Country codes are valid ISO 2-letter codes
expect_column_values_to_be_in_set("country", valid_iso_countries)

# Snapshot freshness — latest event is recent
expect_column_max_to_be_between("source_timestamp", min_value=yesterday, max_value=now)

These run as a pipeline step after each transformation layer and write results to a validation report. Failed expectations block promotion to the next layer or trigger alerts.

Created March 22, 2026
Updated March 22, 2026