olivierbenard/cdc-data-platform-reference
Reference implementation of a CDC-driven data platform focusing on event ordering, idempotency, and append-only ingestion with explicit trade-offs in data modeling and privacy boundaries.
CDC Data Platform - DuckDB + Airflow Reference Implementation
Why this project exists
This project is a reference implementation of a Change Data Capture (CDC) pipeline designed to explore correctness, ordering, and auditability in event-driven data systems.
In CDC-based architectures, data (log events) arrives as a stream of events that may be duplicated, delayed, or delivered out of order. Building reliable analytical datasets on top of such inputs requires explicit handling of these characteristics.
This repository was created to make those challenges and their solutions explicit.
It focuses on:
- preserving the raw event log as a complete and immutable source of truth
- handling out-of-order and duplicated events deterministically
- enforcing idempotency across pipeline re-executions
- separating ingestion, parsing, and business transformations into clear layers (`raw -> staging_pii -> staging -> marts`)
- introducing privacy boundaries as part of the data model
The goal is not to optimize for minimal implementation, but to surface the architectural decisions required to build a correct and auditable CDC pipeline.
For a complementary perspective focused on batch-oriented analytics pipelines and transformation modelling, see the
analytics-batch-pipeline-reference project.
Together, both projects illustrate how ingestion design shifts depending on whether the system is event-driven (CDC) or batch-oriented.
What this demonstrates
This project focuses on the design principles required to build reliable CDC-driven data platforms.
Immutable raw ingestion
- The raw layer is strictly append-only, with no primary key or uniqueness constraints
- All events are preserved as received, ensuring full auditability and replay capability
- Data quality issues are not rejected at ingestion but made observable downstream
- Additional metadata fields are added to the table schema so every ingestion event is fully auditable
This contrasts with the batch pipeline reference, where ingestion-time constraints (e.g. payload hash-based deduplication) are introduced to simplify downstream processing.
Deterministic event processing
- Deduplication is performed using CDC-native identifiers (`event_uuid`)
- Ordering is explicitly defined (`source_timestamp -> read_timestamp -> event_uuid`)
- The pipeline produces consistent results regardless of ingestion order
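The first-seen-wins semantics can be sketched in a few lines of Python (a minimal illustration, not code from this repository; the `dedup_events` helper and its event dicts are hypothetical):

```python
from operator import itemgetter

def dedup_events(events):
    """Deduplicate CDC events by event_uuid, keeping the first record
    under the ordering source_timestamp -> read_timestamp -> event_uuid."""
    # ISO 8601 strings in the same format sort lexicographically = chronologically.
    ordered = sorted(events, key=itemgetter("source_timestamp", "read_timestamp", "event_uuid"))
    seen, result = set(), []
    for event in ordered:
        if event["event_uuid"] not in seen:
            seen.add(event["event_uuid"])
            result.append(event)
    return result

# The same logical event ("a") delivered twice, arriving out of order:
events = [
    {"event_uuid": "b", "source_timestamp": "2026-02-05T16:23:05Z", "read_timestamp": "2026-02-05T16:52:41Z"},
    {"event_uuid": "a", "source_timestamp": "2026-02-05T16:23:00Z", "read_timestamp": "2026-02-05T16:52:41Z"},
    {"event_uuid": "a", "source_timestamp": "2026-02-05T16:23:00Z", "read_timestamp": "2026-02-05T16:55:00Z"},
]
deduped = dedup_events(events)
# Two unique events survive, in a stable order independent of input order.
```

Because the sort key is a total order, shuffling the input list never changes the output.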
Idempotency by design
- Reprocessing the same input produces identical outputs
- MERGE-based transformations ensure safe incremental updates
- Duplicate events do not lead to duplicated business records
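The MERGE-by-key idea behind these guarantees can be modelled with a plain dictionary (an illustrative sketch, not the repository's implementation; `merge_upsert` is a hypothetical name):

```python
def merge_upsert(target, new_events):
    """MERGE-style upsert keyed by event_uuid: insert when not matched,
    overwrite when matched. Re-applying the same batch is a no-op."""
    for event in new_events:
        target[event["event_uuid"]] = event
    return target

batch = [
    {"event_uuid": "e1", "email": "a@example.com"},
    {"event_uuid": "e2", "email": "b@example.com"},
    {"event_uuid": "e1", "email": "a@example.com"},  # duplicate delivery
]

state = merge_upsert({}, batch)
state_again = merge_upsert(dict(state), batch)  # re-run on the same input
# state == state_again: duplicates collapse and re-runs change nothing.
```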
Layered data architecture
- Clear separation between:
  - `raw` (immutable event log)
  - `staging_pii` (typed and deduplicated events)
  - `staging` (privacy-safe transformations)
  - `marts` (business-facing datasets)
- Each layer has an explicit contract and responsibility
Data is a liability before it is an asset
- Parsing and exposing only what is needed downstream is critical.
- Keeping all raw fields forever in the analytical layer is an anti-pattern because it creates unnecessary re-identification risk, schema fragility, and cognitive overhead for consumers.
- Instead, the raw layer stores the complete payload envelope, and each downstream layer explicitly selects what it promotes.
Handling of CDC-specific challenges
- Out-of-order events
- Late-arriving data
- Duplicate delivery from upstream systems
- Logical deletes and state reconstruction
Data privacy as a structural concern
- Explicit separation between PII and privacy-safe layers
- Controlled field promotion across layers
- Transformation of sensitive attributes into analytical representations
Trade-off driven ingestion design
- No constraints at ingestion time maximizes auditability and resilience
- Deduplication and correctness are enforced in transformation layers rather than at write time
This illustrates a different design point compared to batch-oriented pipelines, where constraints may be introduced earlier in the pipeline to reduce downstream complexity.
Debuggability over strict constraints
- Raw data ingestion never fails due to schema constraints.
- If a record is malformed, it lands in `raw` and is handled (flagged, quarantined, or skipped) in the transformation layers.
- This prevents silent data loss and makes anomalies inspectable.
Overview
This repository is a production-grade reference implementation of a local-first analytical data platform built to ingest, process, and expose Change Data Capture (CDC) events originating from a MongoDB source, transforming them into analytics-ready tables using DuckDB as the embedded OLAP engine and Apache Airflow for orchestration.
The platform supports three execution modes, each targeting a different stage of the development lifecycle:
| Mode | Command | Use Case |
|---|---|---|
| Python CLI (Poetry) | `make run_python_script` | Local development, step-by-step debugging |
| Docker Service | `make run_python_service` | Reproducible execution, CI-like environment |
| Airflow DAG | `make airflow` | Orchestrated, production-like scheduling |
What the pipeline does
- Ingests partitioned JSONL CDC event files (organized by date/hour partitions)
- Applies CDC ordering semantics: correctly sequencing `INSERT`, `UPDATE`, and `DELETE` operations even when events arrive out of order or duplicated
- Enforces data privacy transformations: isolating and removing PII at explicit layer boundaries
- Produces a current-state user snapshot: a clean, analytics-ready `marts.users_current` table with one row per user
Prerequisites
| Tool | Version | Notes |
|---|---|---|
| Docker | any recent | Container execution |
| Docker Compose | V2 | docker compose (not docker-compose) |
| GNU Make | any | Entry points |
| Python | >= 3.12 | Required |
For local development:
| Tool | Version | Notes |
|---|---|---|
| Poetry | >= 1.8.3 | Dependency management |
Quick validation:

```shell
docker --version
docker compose version
make --version
python --version
poetry --version
```

Testing & Branch Coverage (Development)
To run the test suite:
```shell
make checks
```

The `fail_under = 80` threshold acts as a quality gate. It prevents merging code that significantly reduces test coverage without a deliberate decision to accept it.
How it runs in CI
The GitHub Actions workflow runs the full test suite with branch coverage on every push and pull request.
Source Data
The input reference dataset is provided as a .tar archive under data/. It contains partitioned JSONL files organised by ingestion timestamp, following the layout data/users/YYYY/MM/DD/HH/.
To extract it:

```shell
make unzip_data
```

Each JSONL file contains one CDC event per line. A single event envelope looks like this:

```json
{
  "uuid": "3af1d1af-1de1-4e33-91cd-7c2bf250ff65",
  "read_timestamp": "2026-02-05T16:52:41.017737Z",
  "source_timestamp": "2026-02-05T16:23:00.017737Z",
  "object": "users",
  "read_method": "mongodb-change-stream",
  "stream_name": "mongodb-test-stream",
  "sort_keys": [1770304980, 2796],
  "source_metadata": {
    "database": "app",
    "collection": "users",
    "primary_keys": ["_id"],
    "change_type": "INSERT",
    "is_deleted": false,
    "cluster_time": {"t": 1770304980, "i": 2796},
    "txn_number": null,
    "lsid": null
  },
  "payload": {
    "_id": "2d85cdb5399741789e67e27b",
    "firstname": "Taylor",
    "lastname": "Gordon",
    "email": "erin5615@example.com",
    "phone": "001-811-254-4899x659",
    "birthday": "1982-04-12",
    "gender": "female",
    "address": {
      "street": "41569 Hayley Rapid Suite 516",
      "streetName": "Sawyer Unions",
      "buildingNumber": "25430",
      "city": "Livingstonside",
      "zipcode": "17064",
      "country": "Italy",
      "country_code": "PG",
      "latitude": -45.864413,
      "longitude": 38.167421
    },
    "website": "https://brooks-carlson.com/",
    "image": "http://placeimg.com/640/480/people?21029"
  }
}
```

Key fields the pipeline relies on:
| Field | Type | Description |
|---|---|---|
| `uuid` | string | Logical event identity (primary deduplication key) |
| `source_timestamp` | ISO 8601 | When the change occurred in MongoDB |
| `read_timestamp` | ISO 8601 | When the CDC connector read the event |
| `source_metadata.change_type` | enum | `INSERT`, `UPDATE`, or `DELETE` |
| `payload._id` | string | User identifier (primary key in the snapshot) |
| `update_description` | object | Diff of changed fields (present on update events only) |
Note: The read_timestamp is always >= source_timestamp. The gap between them is the CDC ingestion latency, surfaced in the business queries (see Business Logic).
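The latency computation is a simple timestamp difference; for the sample envelope above it can be checked in a few lines of Python (an illustrative sketch; `cdc_latency_seconds` is a hypothetical helper, and the `Z` suffix is normalised for `datetime.fromisoformat` compatibility on older Python versions):

```python
from datetime import datetime

def cdc_latency_seconds(event):
    """CDC ingestion latency = read_timestamp - source_timestamp."""
    def parse(ts):
        return datetime.fromisoformat(ts.replace("Z", "+00:00"))
    return (parse(event["read_timestamp"]) - parse(event["source_timestamp"])).total_seconds()

event = {
    "read_timestamp": "2026-02-05T16:52:41.017737Z",
    "source_timestamp": "2026-02-05T16:23:00.017737Z",
}
# Latency for the sample envelope: 29 minutes 41 seconds (1781.0 s).
```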
Project Structure
```text
.
├── src/                     # Core pipeline logic (Python)
│   ├── pipeline/            # Ingestion, discovery, transformation logic
│   └── duckdb_client/       # DB bootstrap, CLI, helpers
├── analysis/                # Business interpretation of results
├── sql/                     # SQL transformations (layered)
│   ├── init/                # Schema initialization scripts
│   ├── 1*.sql               # Transformative SQL queries
│   └── 20_*.sql             # Business-facing snapshot tables
├── airflow/                 # Airflow DAG definition + setup scripts
├── data/                    # Input CDC dataset (partitioned JSONL)
│   └── users/YYYY/MM/DD/HH/ # Hourly partition layout
├── db/                      # DuckDB database (generated at runtime)
├── tests/                   # Unit + integration tests
├── docker-compose.yml       # Orchestration (Airflow + services)
├── Makefile                 # All entry points
└── .env.template            # Environment variable template
```

Key Modules
- `src/pipeline/` - ingestion, discovery, transformation logic
- `src/duckdb_client/` - DB bootstrap, CLI, helpers
- `sql/init/` - schema initialization
- `sql/10-19/` - transformation layers (raw to marts)
Execution Paths (Python Script, Docker Service and Airflow)
This section is structured to illustrate how data processing can evolve from:
- simple script, to
- containerized batch, to
- orchestrated data pipeline running on a modern data platform
Notes:
- The results produced by each execution path were used as a reference point for comparison and validation.
- A dual implementation costs more time and energy, but it creates a comparison surface.
- That surface is valuable because mismatches often reveal hidden assumptions, undefined semantics, or accidental behaviour in one path.
- Here, the Airflow implementation (which can later be extended with dbt and a close-to-production database like Postgres) adds operational semantics, shifting the thinking from "is the logic correct?" to "is the platform robust?".
Python (Reference Implementation)
```shell
make run_python_script
```

Uses the Poetry-managed virtual environment to run the full pipeline end-to-end. This is the recommended mode for local development. It gives direct control over each pipeline step, exposes Python-level stack traces, and avoids the overhead of container orchestration.
Docker (Reproducible Execution)
```shell
make run_python_service
```

Packages the pipeline in a Docker container. All dependencies are pinned, and no Poetry installation is required on the host. Mirrors what a production execution environment would look like, making it ideal for CI/CD pipelines or peer review.
Note: Bootstrapping the local DuckDB database is done using a Python script. To avoid local dependency hassle (Poetry, Python), the init script is executed via a one-off Docker container:

```makefile
run_bootstrap_image:
	docker compose --profile batch run --rm --build duckdb-bootstrap
```

Airflow (Orchestrated Pipeline)
Step 1: Generate the Airflow Fernet Key
Airflow requires a Fernet key to encrypt sensitive metadata stored in its database.
Generate a fresh key:
```shell
make fernet_key
```

Copy the output and set it in your docker-compose.yml:

```shell
AIRFLOW_FERNET_KEY=<your_generated_key>
```

Step 2: Configure your environment
```shell
cp .env.template .env
# then fill in required values
```

Step 3: Start Airflow

```shell
make airflow
```

Access the Airflow UI at http://localhost:8080.
You can trigger the DAG manually from the UI:
Why sequential rather than parallel?
Each layer has a strict dependency on the layer above it. staging_pii cannot be populated until the raw events exist, and marts cannot be built until the privacy-safe staging table is ready. Parallelism would be appropriate when processing independent partitions (e.g., multiple source tables running simultaneously), not for dependent transformation layers.
Audit table and UUID tracking:
- Audit results are written to `marts.pipeline_audit`.
- This makes every pipeline run observable and traceable without relying on log scraping.
Why Airflow Initialisation Runs Separately
For local reproducibility, Airflow is run as a single unified container (combining webserver + scheduler) rather than the standard multi-service decomposition. This simplifies setup significantly with no need to manage separate webserver, scheduler, worker, and triggerer services.
The trade-off is that Airflow metadata DB initialisation and default admin user creation cannot happen at container start (which would create a race condition). Instead, a dedicated init script runs after the containers are healthy:
```shell
make init_airflow
```

This can safely be re-run at any time to reset the Airflow metadata state.
Inspecting the Data
Connect to the DuckDB file (db/analytics.duckdb) using DBeaver or the built-in CLI:
```shell
make duckdb_cli
```

Schemas available after a successful pipeline run:
| Schema | Purpose |
|---|---|
| `raw` | Immutable append-only event envelope |
| `staging_pii` | Typed, deduplicated events (PII retained) |
| `staging` | Privacy-safe analytical events |
| `marts` | Business-facing snapshot tables |
Example exploration queries:

```sql
-- How many raw events were ingested?
SELECT COUNT(*) FROM raw.raw_user_cdc_events;

-- Inspect the schema without loading data
SELECT * FROM raw.raw_user_cdc_events WHERE 1 = 0;

-- Check for events with missing UUIDs (data quality signal)
SELECT COUNT(*) FROM raw.raw_user_cdc_events WHERE event_uuid IS NULL;

-- How many unique users are in the current snapshot?
SELECT COUNT(*) FROM marts.users_current;

-- Verify DELETE handling — no deleted users should appear in the snapshot
SELECT COUNT(*) FROM marts.users_current WHERE is_deleted = TRUE;
```

Data Platform Implementation
Why DuckDB?
DuckDB was selected as the analytical engine for several reasons:
- Native JSON support (efficient parsing of CDC payloads)
- Embedded (no infrastructure overhead compared to e.g. Postgres, however manageable that overhead is)
- Strong OLAP capabilities
- SQL-first (aligns with analytical workflows)
Example of DuckDB's native JSON power:

```sql
-- read directly from JSONL without Python
SELECT *
FROM read_json_auto('data/users/2026/02/05/16/25/events-20260223_143220.jsonl');
```

DuckDB CLI wrapper: Rather than installing DuckDB globally, a Python-based CLI wrapper (src/duckdb/cli.py) is provided. It exposes an interactive SQL REPL backed by the project's DuckDB file:
```shell
make duckdb_cli
# Connected to DuckDB database: db/analytics.duckdb
# Type SQL and press Enter. Type 'exit' or 'quit' to leave.
duckdb> SELECT COUNT(*) FROM marts.users_current;
duckdb> exit
```

Layered Architecture - In Depth
Raw Layer: raw.raw_user_cdc_events
This is the immutable event log — the foundation of the entire platform. Every CDC event ingested, regardless of quality, lands here.
Why no primary key?
The raw layer intentionally has no primary key constraint. A primary key would mean DuckDB rejects duplicate inserts, which creates a silent data loss risk: if the same event file is ingested twice (e.g., after a failed pipeline run), the second run would silently skip records instead of flagging the issue. By allowing duplicates at the raw layer, deduplication becomes an explicit, observable transformation in staging_pii and not an implicit side effect of a constraint.
Key columns:
| Column | Description |
|---|---|
| `run_id` | UUID identifying the pipeline execution that produced this row |
| `file_source` | Path to the source JSONL file |
| `partition` | Logical partition (e.g., 2026/02/05/16) |
| `ingestion_context` | Execution mode: `python_script`, `python_service`, or `airflow_dag` |
| `payload_json` | Full raw JSON envelope — never modified |
| `read_timestamp` | When the event was read by the pipeline |
| `event_uuid` | Extracted CDC event identifier (may be null for unreliable events) |
Design rationale: Data loss at ingestion is categorically worse than dirty data in the raw layer. Dirty data can be cleaned or filtered downstream. Lost data cannot be recovered without re-ingesting from the source.
Staging PII Layer: staging_pii.stg_user_events_typed
This layer parses, types, and deduplicates the raw JSON envelope into structured, column-oriented records while retaining all PII fields for completeness.
Field promotion strategy:
Only fields with clear downstream business rationale are extracted as typed columns. For example, structured address fields (street, city, postcode) are intentionally not flattened here — they are not needed by any downstream analytics query, and flattening them would expose PII unnecessarily while adding schema complexity with no benefit.
Deduplication using event_uuid:
```sql
SELECT *,
    ROW_NUMBER() OVER (
        PARTITION BY event_uuid
        ORDER BY read_timestamp ASC, source_timestamp ASC
    ) AS row_num
FROM parsed_events
QUALIFY row_num = 1;
```

The deduplication uses `event_uuid` — not a `payload_hash`. Here is why:
- `event_uuid` = logical event identity: the CDC system assigned this identifier to represent a specific change event
- `payload_hash` = byte-level identity: two payloads are identical only if every byte matches
In a CDC context, the same logical event (same UUID) can arrive with slightly different payload bytes (e.g., field ordering differences, whitespace normalization by the CDC connector). Using payload hash for deduplication would generate false duplicates — retaining two records for what is logically the same event. event_uuid is the semantically correct deduplication key.
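The false-duplicate risk is easy to demonstrate in Python (an illustrative sketch, not repository code; the event dicts and `payload_hash` helper are hypothetical). Two deliveries of the same logical event differ only in payload key ordering, yet their byte-level hashes diverge:

```python
import hashlib
import json

event_v1 = {"uuid": "evt-001", "payload": {"_id": "u1", "email": "a@example.com"}}
# Same logical event re-delivered, with different key ordering in the payload:
event_v2 = {"uuid": "evt-001", "payload": {"email": "a@example.com", "_id": "u1"}}

def payload_hash(event):
    # Hash the serialized payload bytes exactly as delivered (no key sorting),
    # mimicking a connector that hashes the raw wire bytes.
    return hashlib.sha256(json.dumps(event["payload"]).encode()).hexdigest()

different_bytes = payload_hash(event_v1) != payload_hash(event_v2)  # False duplicate under hashing
same_logical_event = event_v1["uuid"] == event_v2["uuid"]           # Correct dedup key agrees
```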
Events with null event_uuid are not promoted to staging_pii — they are flagged in marts.pipeline_audit instead. These represent unreliable events that cannot be reliably deduplicated.
Why MERGE instead of full refresh?
```sql
MERGE INTO staging_pii.stg_user_events_typed AS target
USING new_events AS source
ON target.event_uuid = source.event_uuid
WHEN NOT MATCHED THEN INSERT ...
WHEN MATCHED THEN UPDATE ...;
```

Using MERGE (upsert) in the staging layer provides incremental processing — only new or changed events are processed. This is important because:
- The raw layer is append-only and grows continuously
- Re-processing the entire raw history on every run would become prohibitively expensive at scale
- Idempotency is preserved: re-running with the same events produces the same `staging_pii` state
Staging (Privacy-Safe) Layer: staging.stg_user_events_privacy_safe
This layer removes direct PII while preserving analytical utility. It is the primary layer consumed by analysts and BI tools.
Privacy transformations applied:
| Field | Raw Value | Transformed Value | Rationale |
|---|---|---|---|
| `email` | alice@gmail.com | Removed | Direct PII |
| `email_hashed` | — | sha256('alice@gmail.com') | Identity tracking without exposure |
| `email_domain` | — | gmail.com | Business analytics (provider distribution) |
| `name` | Alice Müller | Removed | Direct PII |
| `date_of_birth` | 1989-04-12 | Removed | Direct PII |
| `age_group` | — | [30-40] | Re-identification-safe bucketing |
| `country` | US / USA / United States | US | Normalized ISO code |
Age group calculation:

```sql
CONCAT(
    '[',
    CAST(FLOOR(date_diff('year', date_of_birth, CURRENT_DATE) / 10) * 10 AS INT),
    '-',
    CAST(FLOOR(date_diff('year', date_of_birth, CURRENT_DATE) / 10) * 10 + 10 AS INT),
    ']'
) AS age_group
```

Non-determinism caveat: Using CURRENT_DATE means a user born in 1989 will shift from age group [30-40] to [40-50] after their 40th birthday, even if the underlying raw data never changes. This breaks backfill reproducibility — running the pipeline on the same input two years apart will produce different age groups.
Recommended fix: Use a fixed reference date tied to the pipeline run:

```sql
-- Option A: Use the pipeline snapshot date
date_diff('year', date_of_birth, DATE '2026-01-01')

-- Option B: Use the max source_timestamp in the dataset
date_diff('year', date_of_birth, (SELECT MAX(source_timestamp)::DATE FROM staging_pii.stg_user_events_typed))
```

The current implementation uses CURRENT_DATE as a pragmatic choice for this context. In a production system, this should be parameterised as a pipeline input.
MARTS Layer: marts.users_current
The business-facing snapshot table — one row per user, representing the latest known state.
How DELETEs are handled:
CDC delete events carry change_type = 'DELETE'. The marts build applies a two-stage approach:
1. The full event history (ordered by `source_timestamp -> read_timestamp -> event_uuid`) is ranked per `user_id` using `ROW_NUMBER()`.
2. The latest event per user is selected. If that latest event is a DELETE, the user is excluded from the snapshot.
```sql
-- Simplified marts build logic
WITH ranked_events AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY user_id
            ORDER BY source_timestamp DESC, read_timestamp DESC, event_uuid DESC
        ) AS rn
    FROM staging.stg_user_events_privacy_safe
),
latest AS (
    SELECT * FROM ranked_events WHERE rn = 1
)
INSERT OR REPLACE INTO marts.users_current
SELECT * FROM latest
WHERE change_type != 'DELETE'
    AND is_deleted IS NOT TRUE;
```

The `is_deleted` flag provides a defensive double-check against CDC connectors that may emit delete events with incorrect `change_type` values.
Why full refresh for marts (not MERGE)?
Unlike staging_pii which uses incremental MERGE, the marts layer applies a full refresh on every run:
- The current snapshot is truncated and rebuilt from the full staging history
- This is simpler, deterministic, and avoids accumulating stale state
- At the scale of this dataset (tens of thousands to low millions of users), this is computationally acceptable
- At 50M+ users, this would need to transition to an incremental model (see Scaling)
Pipeline Steps - End to End
```text
1. Discover JSONL partition files
        ↓
2. Ingest raw events → raw.raw_user_cdc_events (append-only)
        ↓
3. Parse JSON + type fields → staging_pii.stg_user_events_typed (MERGE, dedup by event_uuid)
        ↓
4. Apply privacy transformations → staging.stg_user_events_privacy_safe (MERGE)
        ↓
5. Build current-state snapshot → marts.users_current (full refresh)
        ↓
6. Run audit checks → marts.pipeline_audit (append)
```
Architectural Decisions & Trade-offs
Preserving the Raw JSON Envelope
Decision: Store the complete raw JSON payload as a single payload_json column in the raw layer, rather than parsing it at ingestion time.
| Pro | Con |
|---|---|
| Full lineage: any field can be re-extracted downstream without re-ingestion | Storage overhead (raw JSON is verbose) |
| Schema evolution: new source fields are automatically captured | Parsing cost deferred to transformation layer |
| Replay: any transformation bug can be fixed by re-running SQL, not re-ingesting | Raw layer queries require JSON extraction |
| Audit: the exact bytes that arrived from the CDC source are preserved | |
At analytical scale, the storage overhead is offset by columnar compression in DuckDB's native format. The replay and auditability benefits are worth it.
UUID vs. Payload Hash for Deduplication
| Property | `event_uuid` | `payload_hash` |
|---|---|---|
| Semantics | Logical event identity (assigned by CDC system) | Byte-level content identity |
| CDC-appropriate | Yes | Fragile (field order, whitespace) |
| False duplicate risk | Low | High |
| Business meaning | Matches CDC contract | Arbitrary |
Decision: Use event_uuid as the primary deduplication key. The payload_hash is computed and stored as a lineage/debugging aid but is not used for deduplication logic.
Deduplication Strategy: First-Seen Wins
When multiple records share the same event_uuid, the ordering is:
source_timestamp ASC -> read_timestamp ASC -> event_uuid ASC (tie-breaker)
The first-seen record (earliest read_timestamp) is retained. This is a deliberate choice: it preserves the original event as it was first observed by the pipeline, before any potential re-delivery artifacts.
Merge vs. Full Refresh by Layer
| Layer | Strategy | Rationale |
|---|---|---|
| `raw` | Append-only | Immutable event log |
| `staging_pii` | MERGE (incremental) | Grow incrementally; idempotent upsert |
| `staging` | MERGE (incremental) | Same as above |
| `marts` | Full refresh | Simple, deterministic snapshot; acceptable at this scale |
Age Group Calculation Trade-off
Current (pragmatic): FLOOR(date_diff('year', date_of_birth, CURRENT_DATE) / 10)
Production recommendation: Parameterise with a fixed snapshot_date passed as a pipeline argument:

```python
# In pipeline orchestration
snapshot_date = datetime.date(2026, 1, 1)
```

```sql
-- In the transformation SQL
FLOOR(date_diff('year', date_of_birth, DATE '{{ snapshot_date }}') / 10)
```

This makes every pipeline run reproducible against the same snapshot date, which is essential for backfills and historical comparisons.
No Hard Constraints in Raw — By Design
The raw layer has no NOT NULL, UNIQUE, or CHECK constraints. This is intentional:
- Prevents ingestion failures from blocking the pipeline when upstream data quality degrades
- Makes anomalies inspectable — a record with a null UUID lands in `raw` and can be queried
- Downstream layers handle quality — `staging_pii` filters null-UUID records; `marts.pipeline_audit` surfaces them
The risk of "data contamination" propagating to analytics is mitigated by the explicit contract of each downstream layer, not by raw-layer constraints.
Idempotency & Determinism
The pipeline is fully idempotent. Running it N times on the same input produces identical output. This is guaranteed by:
- Deduplication by `event_uuid` — duplicate raw events resolve to a single staging record
- MERGE-based upserts — re-inserting the same data does not change the result
- Deterministic ordering — `source_timestamp -> read_timestamp -> event_uuid` provides a total order for tie-breaking
- Full refresh on marts — the snapshot is always rebuilt from scratch, eliminating accumulated state drift
Business Logic
All analytical business queries are implemented in sql/20_business_queries.sql. Examples:
```sql
-- distinct active users in the snapshot (not deleted)
-- Answer: 18,997
SELECT COUNT(DISTINCT user_id) AS active_user_count
FROM marts.users_current;
```
```sql
-- percentage of active users using Gmail
-- Answer: 30.38 %
SELECT
    ROUND(
        100.0 * SUM(
            CASE
                WHEN email_domain IN ('gmail.com', 'googlemail.com') THEN 1 -- Legacy German Gmail
                ELSE 0
            END
        ) / NULLIF(COUNT(*), 0), -- n / NULL = NULL
        2
    ) AS gmail_user_percentage
FROM marts.users_current
WHERE email_domain IS NOT NULL;
```
```sql
-- top 3 countries by number of Gmail users
-- Answer:
-- 1. Germany (1,172)
-- 2. Switzerland (235)
-- 3. Spain (228)
select country, COUNT(*) AS gmail_user_count
from marts.users_current
where email_domain IN ('gmail.com', 'googlemail.com')
group by country
order by gmail_user_count DESC
limit 3;
```
```sql
-- how many users changed their email at least once?
-- Answer: 90
with ordered_events AS (
    select
        user_id,
        lower(trim(email)) as email,
        source_timestamp,
        read_timestamp,
        event_uuid,
        LAG(lower(trim(email))) OVER (
            PARTITION BY user_id
            ORDER BY -- different fallbacks
                source_timestamp ASC NULLS LAST,
                read_timestamp ASC NULLS LAST,
                event_uuid ASC NULLS LAST
        ) as previous_email
    from staging_pii.stg_user_events_typed
    where
        change_type IN ('INSERT', 'UPDATE')
        and user_id is not null
        and email is not null
)
select count(distinct user_id) as users_who_changed_email
from ordered_events
where
    previous_email is not null -- LAG() yields NULL for a user's first record
    and email <> previous_email; -- email = previous_email when a user has multiple records but no UPDATE events
```
```sql
-- top 5 email domain transitions
-- Answer:
-- gmail.com -> icloud.com 4
-- yahoo.com -> outlook.com 4
-- example.com -> gmail.com 3
-- example.com -> outlook.com 3
-- gmail.com -> example.io 3
with ordered_events AS (
    select
        user_id,
        email_domain,
        source_timestamp,
        read_timestamp,
        event_uuid,
        LAG(lower(trim(email_domain))) OVER (
            PARTITION BY user_id
            ORDER BY -- different fallbacks
                source_timestamp ASC NULLS LAST,
                read_timestamp ASC NULLS LAST,
                event_uuid ASC NULLS LAST
        ) as previous_email_domain
    from staging.stg_user_events_privacy_safe
    where
        change_type IN ('INSERT', 'UPDATE')
        and user_id is not null
        and email_domain is not null
)
select
    previous_email_domain || ' -> ' || email_domain as domain_transition,
    COUNT(*) as transition_count
from ordered_events
where
    previous_email_domain is not null
    and email_domain is not null
    and previous_email_domain <> email_domain
group by domain_transition
order by
    transition_count desc,
    domain_transition asc
limit 5;
```
```sql
-- average time span in minutes between first and last CDC event for users with more than one event
-- Answer: 2.86 minutes
with user_event_spans AS (
    select
        user_id,
        MIN(source_timestamp) as first_event_ts,
        MAX(source_timestamp) AS last_event_ts,
        COUNT(*) AS event_count
    from staging_pii.stg_user_events_typed
    where
        user_id is not NULL
    group by user_id
    having count(*) > 1
)
select
    ROUND(AVG(date_diff('minute', first_event_ts, last_event_ts)), 2) AS avg_event_span_minutes
from user_event_spans;
```

Production Architecture
This section addresses the follow-up questions that would typically be raised before deployment and productionisation.
Orchestration
The current implementation uses Airflow as a local orchestrator. In a production environment, the choice would depend on team maturity and scale:
| Orchestrator | Best For |
|---|---|
| Apache Airflow | Large teams, complex dependency graphs, mature ecosystem |
| Dagster | Asset-centric workflows, built-in data lineage |
| Prefect | Pythonic API, fast iteration, hybrid deployments |
| dlt (Data Load Tool) | Lightweight ingestion pipelines, schema inference |
Late and Out-of-Order Events
CDC streams are inherently unreliable with respect to ordering. Events from the same user may arrive in any sequence. The platform handles this at two levels:
At deduplication time: Deterministic ordering (source_timestamp -> read_timestamp -> event_uuid) ensures the pipeline always produces the same result regardless of ingestion order.
For late-arriving events (watermarking): A watermarking strategy defines a maximum lateness tolerance. Any event arriving after the watermark for its partition is either:
- Reprocessed in a dedicated late-data window
- Flagged in `pipeline_audit` and surfaced for manual review
Conceptually:

```text
Watermark  = max(source_timestamp) - allowed_lateness_interval
Late event = source_timestamp < Watermark at time of ingestion
```
In a streaming context (Kafka + Flink), this is handled natively by the framework's event-time processing. In this batch pipeline, it is approximated by partition-level reprocessing.
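The watermark check itself is a one-liner; a minimal Python sketch (illustrative names, not repository code):

```python
from datetime import datetime, timedelta

def is_late(event_source_ts, max_source_ts, allowed_lateness):
    """Watermark = max(source_timestamp) - allowed_lateness.
    Events whose source_timestamp falls before the watermark are late."""
    watermark = max_source_ts - allowed_lateness
    return event_source_ts < watermark

max_seen = datetime(2026, 2, 5, 17, 0)   # highest source_timestamp observed so far
lateness = timedelta(hours=1)            # tolerance; watermark sits at 16:00

late = is_late(datetime(2026, 2, 5, 15, 30), max_seen, lateness)     # True: beyond tolerance
on_time = is_late(datetime(2026, 2, 5, 16, 30), max_seen, lateness)  # False: within tolerance
```

Events flagged `True` would be routed to the late-data window or to `pipeline_audit` for review.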
Reprocessing window: The pipeline could be re-run for a specific partition range without affecting other partitions (left for implementation):
```shell
# Re-run only the partition for a specific date/hour
make run_python_script PARTITION=2026/02/05/16
```

Schema Evolution
Because the raw layer preserves the full JSON envelope, schema evolution is handled gracefully:
- A new field appearing in the CDC payload is automatically captured in `payload_json`
- It can be promoted to a typed column in `staging_pii` by adding a single `json_extract` expression
- No re-ingestion is required: the field can be backfilled by re-running the staging transformation over existing raw data
This is a significant operational advantage over systems that parse at ingestion time.
Backfills
The pipeline supports targeted backfills via run_id isolation. Every row in raw carries a run_id (a UUID generated at pipeline start time). To backfill a specific date range:
1. Identify the `run_id` values covering the target partitions
2. Re-run the staging transformation filtered to those `run_id` values
3. Re-run the marts build
```sql
-- Identify run_ids for a target partition
SELECT DISTINCT run_id, partition, MIN(read_timestamp) AS first_seen
FROM raw.raw_user_cdc_events
WHERE partition LIKE '2026/02/%'
GROUP BY 1, 2
ORDER BY 3;
```

Because the staging layer uses MERGE and the marts layer uses full refresh, backfilling specific partitions is safe and will not corrupt existing data outside the target range.
Scaling
At 50M users / 10M events per day, the following changes would be required:
| Component | Current | At Scale |
|---|---|---|
| Storage | DuckDB file | BigQuery / Snowflake / Iceberg |
| Compute | Single-node DuckDB | Distributed SQL engine |
| Partitioning | JSONL files by date/hour | Partitioned tables (by event_date) |
| Transformation | Full refresh marts | Incremental models (partition pruning) |
| Deduplication | DuckDB MERGE | Streaming deduplication (Kafka / Flink) |
The SQL transformations are largely portable to BigQuery or Snowflake with minor dialect adjustments. The architectural layers (raw -> staging_pii -> staging -> marts) would remain unchanged.
Latency
Current: Batch (minutes to hours depending on file arrival frequency)
Path to lower latency:
- Micro-batching (near real-time, minutes): Replace scheduled Airflow runs with short-interval triggers (every 1–5 minutes). Airflow's `schedule_interval` handles this. The pipeline already supports partial ingestion of new partition files.
- Streaming ingestion (seconds): Replace JSONL file drops with a Kafka topic as the CDC event stream. A Kafka consumer reads events in real time and writes them to DuckDB (or a cloud warehouse) as micro-batches. The `staging_pii` MERGE logic is unchanged; only the ingestion mechanism changes.

  MongoDB -> Debezium CDC Connector -> Kafka Topic -> Kafka Consumer -> raw layer -> transformations

- Incremental DAGs: Break the single `cdc_pipeline` DAG into partition-level tasks, enabling parallel processing of multiple hourly partitions.
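The micro-batching idea reduces to grouping a continuous stream into bounded appends. A minimal sketch (a plain iterable stands in for the Kafka consumer; the function name is an assumption):

```python
import itertools

def micro_batches(stream, batch_size=100):
    """Group a CDC event stream (e.g. a Kafka consumer) into micro-batches.

    Each yielded batch would be appended to the raw layer as-is; the
    downstream staging_pii MERGE is unchanged, only the ingestion
    cadence differs.
    """
    it = iter(stream)
    while batch := list(itertools.islice(it, batch_size)):
        yield batch
```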
Data Privacy & AI Readiness
Implemented Measures
| Measure | Implementation |
|---|---|
| Pseudonymization | SHA-256 hash of email address |
| Data minimization | PII removed at staging layer boundary |
| Layer separation | staging_pii access restricted; analysts query staging only |
| Aggregation | Date of birth -> 10-year age bucket |
| Normalization | Country variants -> ISO codes |
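The measures in the table compose into one record-level transformation. A hedged sketch (the function name, the alias table, and the decade-based bucketing are illustrative choices, not the repository's exact implementation):

```python
import hashlib
from datetime import date

# Illustrative subset of country-name variants -> ISO 3166-1 alpha-2 codes
COUNTRY_ALIASES = {"france": "FR", "fr": "FR", "germany": "DE", "de": "DE"}

def privacy_safe(record):
    """Apply the table's measures to one user record:
    hash the email (pseudonymization), drop the raw value (minimization),
    bucket date of birth into a 10-year band (aggregation), and map
    country spellings to ISO codes (normalization)."""
    email = record["email"].strip().lower()
    decade = (record["date_of_birth"].year // 10) * 10
    return {
        "user_id": record["user_id"],
        "email_hashed": hashlib.sha256(email.encode()).hexdigest(),
        "email_domain": email.split("@", 1)[1],
        "age_bucket": f"{decade}-{decade + 9}",
        "country": COUNTRY_ALIASES.get(record["country"].strip().lower()),
    }
```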
Right to be Forgotten (GDPR)
Handling a deletion request for user_id = X:
- Raw layer: Insert a logical tombstone event with `change_type = 'FORGET'` and `user_id = X`. The original raw events are not physically deleted (they remain in the audit log). A scheduled compaction job can physically purge them after the legally required retention period.
- Staging layers: The next pipeline run processes the tombstone event and removes all rows for `user_id = X` from `staging_pii` and `staging` via a MERGE DELETE operation.
- Marts: The full refresh excludes the deleted user from `users_current` on the next run.
This propagation is event-driven — no manual intervention required for each deletion request. The tombstone event drives the entire cascade.
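The cascade can be modelled in a few lines. This is a simplification (dicts stand in for the staging tables, `apply_events` is a hypothetical name, and only the email field is stripped for the privacy-safe layer):

```python
def apply_events(staging_pii, staging, events):
    """Propagate CDC events into both staging layers.

    A FORGET tombstone removes every row for that user from both layers
    on the next run (the MERGE DELETE analogue); other events upsert.
    """
    for event in events:
        uid = event["user_id"]
        if event["change_type"] == "FORGET":
            staging_pii.pop(uid, None)
            staging.pop(uid, None)
        else:
            staging_pii[uid] = event
            # privacy-safe layer keeps everything except raw PII fields
            staging[uid] = {k: v for k, v in event.items() if k != "email"}
    return staging_pii, staging
```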
Differential Privacy (for AI/ML use cases)
If the dataset is used to train ML models, raw aggregates may leak individual-level information even after pseudonymization. Differential privacy adds calibrated statistical noise to query results, providing a mathematical guarantee that no individual's presence or absence in the dataset can be inferred.
Example: Instead of exposing COUNT(*) directly in an analytical API, the system returns COUNT(*) + Laplace(sensitivity/epsilon), where epsilon is the privacy budget. This makes the output statistically indistinguishable from a result computed on a dataset with any single row added or removed.
In practice, frameworks (Google's DP Library, OpenDP) implement these mechanisms and can be applied at the marts query layer.
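The `COUNT(*) + Laplace(sensitivity/epsilon)` mechanism described above can be sketched with the standard inverse-CDF sampling of the Laplace distribution. This is a toy illustration of the mechanism, not a production-grade DP implementation (real deployments should use an audited library and track the cumulative privacy budget):

```python
import math
import random

def dp_count(true_count, epsilon, sensitivity=1.0, rng=random):
    """Return a differentially private count: COUNT(*) + Laplace(sensitivity/epsilon).

    A single row changes a count by at most 1, so sensitivity = 1;
    a smaller epsilon means more noise and a stronger guarantee.
    """
    b = sensitivity / epsilon                     # Laplace scale parameter
    u = rng.random() - 0.5                        # uniform on (-0.5, 0.5)
    noise = -b * math.copysign(1.0, u) * math.log(1 - 2 * abs(u))
    return true_count + noise
```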
Potential Enhancements
dbt Integration
dbt (data build tool) would replace the raw SQL files in sql/ with a managed transformation framework providing:
- Lineage graph: visual DAG of table dependencies
- Built-in tests: `not_null`, `unique`, `accepted_values`, `relationships` assertions on every model
- Documentation: auto-generated data catalog from model YAML
- Version control: each model is a `.sql` file with a clear contract
The layered architecture (raw -> staging_pii -> staging -> marts) maps directly to dbt's model organization conventions.
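A hypothetical model YAML shows how the built-in tests above would attach to one staging model (model and column names mirror this pipeline; the accepted-values list is an illustrative subset):

```yaml
# models/staging/schema.yml (hypothetical)
version: 2
models:
  - name: stg_user_events_privacy_safe
    description: "Privacy-safe user events (no raw PII)"
    columns:
      - name: user_id
        tests:
          - not_null
          - unique
      - name: country
        tests:
          - accepted_values:
              values: ["FR", "DE", "US"]  # illustrative subset
```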
Data Contracts
A data contract is a formal, machine-readable agreement between a data producer (e.g., the CDC source team) and a data consumer (e.g., this pipeline). It specifies:
- Expected fields and their types
- Nullable / required constraints
- Acceptable value ranges
- SLA on freshness and completeness
Example (YAML contract for the CDC source):
```yaml
contract:
  name: user_cdc_events
  version: "1.2.0"
  fields:
    - name: uuid
      type: string
      required: true
      description: "Unique event identifier assigned by the CDC system"
    - name: payload._id
      type: string
      required: true
      description: "MongoDB document ID (user identifier)"
    - name: operationType
      type: enum
      values: [insert, update, delete]
      required: true
  sla:
    max_latency_minutes: 5
    min_completeness_pct: 99.5
```

Tools can enforce contracts at the producer side.
Schema Registry
A schema registry is a centralized service that stores and versions the schema of every event type in the system. When a producer publishes an event, the registry validates it against the registered schema before it reaches consumers.
In a Kafka-based architecture:
Producer -> Schema Registry (validate against Avro/Protobuf schema) -> Kafka Topic -> Consumer
Benefits for this pipeline:
- Breaking change detection: If the CDC source adds a required field or changes a type, the registry rejects the event before it contaminates the raw layer
- Schema evolution governance: Backward/forward compatibility rules are enforced centrally
- Consumer contract: Consumers (this pipeline) can retrieve the schema from the registry at runtime instead of hardcoding field names
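The compatibility rules a registry enforces can be illustrated with a deliberately simplified check (real registries such as Confluent's implement richer Avro/Protobuf rules; the function name and schema shape here are assumptions):

```python
def is_backward_compatible(old_schema, new_schema):
    """Simplified BACKWARD-mode check: consumers on the new schema must
    still decode data written with the old one. Here that means a newly
    added field must be optional, and no existing field may change type.
    """
    old_fields = {f["name"]: f["type"] for f in old_schema["fields"]}
    for field in new_schema["fields"]:
        old_type = old_fields.get(field["name"])
        if old_type is None and field.get("required", False):
            return False  # new required field: old data carries no value for it
        if old_type is not None and old_type != field["type"]:
            return False  # type change: old data can no longer be decoded
    return True
```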
Great Expectations
Great Expectations is a data quality framework that lets you define expectations (assertions about your data) and run them as validation checkpoints in your pipeline.
Example expectations for staging.stg_user_events_privacy_safe:
```python
# No null user_ids
expect_column_values_to_not_be_null("user_id")

# email_domain is present exactly when email_hashed is present
expect_column_pair_values_to_be_equal("email_hashed is null", "email_domain is null")

# Country codes are valid ISO 2-letter codes
expect_column_values_to_be_in_set("country", valid_iso_countries)

# Snapshot freshness: latest event is recent
expect_column_max_to_be_between("source_timestamp", min_value=yesterday, max_value=now)
```

These run as a pipeline step after each transformation layer and write results to a validation report. Failed expectations block promotion to the next layer or trigger alerts.