RemoYukoff/aqueductus
A data testing framework that executes queries on configurable data providers and validates the results with customizable YAML-defined assertions. Ensure data integrity, consistency, and reliability effortlessly.
Aqueductus
A powerful Python framework for validating data quality across different data sources through SQL queries and customizable assertions. Perfect for data engineers and analysts who need to ensure data consistency and quality.
✨ Key Features
- 🔌 Multiple Data Sources
  - Amazon Athena
  - MySQL
  - PostgreSQL
  - SQLite
  - Extensible architecture for adding more providers
- 🔍 Rich Test Types
  - Row presence validation
  - Negative testing (absence of rows)
  - Column completeness checks
  - Value distribution analysis
  - Row count validation
  - Column existence verification
  - Pattern matching with regex support
  - Comparative operators (>, <, =)
- 🛠️ Advanced Configuration
  - Environment and placeholder variables support
  - Multiple data source configurations
  - CSV file integration
  - Cross-provider testing
- 📊 Flexible Reporting
  - Console output
  - JSON export
  - JUnit XML (CI/CD friendly)
  - Markdown reports
🚀 Quick Start
Installation
To install Aqueductus with SQLite support (default):
pip install aqueductus
To use it with MySQL, PostgreSQL, or Athena, install the corresponding extras:
pip install aqueductus[mysql,postgresql,athena]
Quick Start
- Create a configuration file (e.g., config.yaml):
  providers:
    - name: my_athena
      type: athena
      config:
        region: ${AWS_REGION}
        work_group: ${AWS_WORKGROUP}
        aws_access_key_id: ${AWS_ACCESS_KEY_ID}
        aws_secret_access_key: ${AWS_SECRET_ACCESS_KEY}
  tests:
    - name: user_data_validation
      provider: my_athena
      query: >
        SELECT user_id, status
        FROM <<table>>
        WHERE created_date = <<expected_date>>
      # Verify specific rows exist
      contains_rows:
        source: inline
        rows:
          - column1: "value1"
            column2: "value2"
        ignore_columns:
          - timestamp
      # Verify column completeness
      column_ratio:
        - column: status
          value: "active"
          min_ratio: 0.95
- Run the tests:
  aqueductus config.yaml
🛠️ Using Placeholders
Placeholders allow you to dynamically replace certain parts of your queries or configuration files with values defined in your environment.
How It Works
- Define Placeholders in Your YAML File:
  In your YAML file (or any file you want to use), you can use placeholders in the format <<placeholder>>. For example:
  query: "SELECT * FROM <<table_name>> WHERE <<column_name>> = 'some_value';"
- Define the Corresponding Values in environment.py:
  In the environment.py file, define a dictionary called PLACEHOLDER where you map each placeholder to its corresponding value. For example:
  PLACEHOLDER = { "table_name": "users", "column_name": "id" }
- Aqueductus Automatically Replaces Placeholders:
  When Aqueductus processes the YAML file, it automatically replaces any <<placeholder>> with the value defined in the PLACEHOLDER dictionary from environment.py. In this case, the query becomes:
  query: "SELECT * FROM users WHERE id = 'some_value';"
This allows you to easily reuse queries or configurations with different values based on your environment or specific use case.
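The substitution step described above can be sketched in a few lines of Python. This is only an illustration of the idea, not Aqueductus's actual implementation: the function name `replace_placeholders`, the regular expression, and the choice to raise `KeyError` for unknown placeholders are all assumptions made for this example.

```python
import re

# Hypothetical mapping, mirroring the PLACEHOLDER dictionary from environment.py.
PLACEHOLDER = {"table_name": "users", "column_name": "id"}

# Matches <<name>> where name consists of letters, digits, or underscores.
_PLACEHOLDER_PATTERN = re.compile(r"<<(\w+)>>")


def replace_placeholders(text: str, values: dict[str, str]) -> str:
    """Replace every <<name>> in text with values[name].

    Unknown placeholders raise KeyError so that typos surface early
    (an assumption of this sketch, not documented Aqueductus behavior).
    """

    def substitute(match: re.Match) -> str:
        return str(values[match.group(1)])

    return _PLACEHOLDER_PATTERN.sub(substitute, text)


query = "SELECT * FROM <<table_name>> WHERE <<column_name>> = 'some_value';"
resolved = replace_placeholders(query, PLACEHOLDER)
print(resolved)  # SELECT * FROM users WHERE id = 'some_value';
```

Failing loudly on an undefined placeholder is one possible design choice; leaving the placeholder untouched would be another.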
📚 Test Types
1. Contains Rows
Verifies that specific rows exist in the query results:
contains_rows:
  source: inline
  rows:
    - column1: "value1"
      column2: "value2"
  ignore_columns:
    - timestamp
2. Not Contains Rows
Ensures specific rows do not exist:
not_contains_rows:
  rows:
    - column1: "invalid"
      column2: "invalid"
3. Column Ratio
Validates the ratio of values in a column:
column_ratio:
  - column: status
    value: "active"
    min_ratio: 0.95
    max_ratio: 1.0
4. Row Count
Verifies the exact number of rows:
row_count: 100
5. Columns Exist
Ensures required columns are present:
columns_exists:
  - column1
  - column2
🔄 Data Sources
CSV Integration
Load test data from CSV files:
contains_rows:
  source: csv
  path: tests/expected_data.csv
Cross-Provider Testing
Compare data across different providers:
contains_rows:
  source: provider
  provider: other_athena
  query: SELECT * FROM reference_table
  map: # Optional column mapping
    source_col: target_col
📝 Output Formats
The framework supports multiple output formats:
# Single format
aqueductus config.yaml --format json
# Multiple formats
aqueductus config.yaml --format console,json,junit
Available formats:
- console: Human-readable console output
- json: JSON file output
- junit: JUnit XML for CI/CD integration
- markdown: Markdown report
🛠️ Development
By default, Aqueductus looks for providers.py, testers.py, and reporters.py in the root directory. It automatically detects and loads the subclasses defined in each file for the corresponding provider, test type, or reporter, so new components can be added without additional configuration.
Adding a New Provider
The provider base class, for reference:
from abc import ABC, abstractmethod
from typing import Any


class DataProvider(ABC):
    @abstractmethod
    def __init__(self, config: dict[str, Any]) -> None:
        pass

    @abstractmethod
    def execute_query(self, query: str) -> list[dict[str, Any]]:
        pass
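As an illustration, a custom provider placed in providers.py might look like the sketch below. The class name `InMemorySQLiteProvider` and the `setup_sql` config key are invented for this example, and the base class is repeated only so the snippet runs on its own. How Aqueductus maps a provider `type` in the YAML file to a subclass is not covered here.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Any


class DataProvider(ABC):
    # Base class as documented above, repeated so the example is self-contained.
    @abstractmethod
    def __init__(self, config: dict[str, Any]) -> None:
        pass

    @abstractmethod
    def execute_query(self, query: str) -> list[dict[str, Any]]:
        pass


class InMemorySQLiteProvider(DataProvider):
    """Hypothetical provider backed by an in-memory SQLite database."""

    def __init__(self, config: dict[str, Any]) -> None:
        # "setup_sql" is an invented config key used only by this sketch.
        self.connection = sqlite3.connect(":memory:")
        # sqlite3.Row lets each result row be converted to a dict by column name.
        self.connection.row_factory = sqlite3.Row
        self.connection.executescript(config.get("setup_sql", ""))

    def execute_query(self, query: str) -> list[dict[str, Any]]:
        cursor = self.connection.execute(query)
        return [dict(row) for row in cursor.fetchall()]


provider = InMemorySQLiteProvider(
    {
        "setup_sql": "CREATE TABLE users (id INTEGER, status TEXT);"
        "INSERT INTO users VALUES (1, 'active'), (2, 'inactive');"
    }
)
rows = provider.execute_query("SELECT id, status FROM users ORDER BY id")
print(rows)
```

The key point is the return shape: `execute_query` returns one dictionary per row, keyed by column name, which matches the `list[dict[str, Any]]` signature of the base class.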
Adding a New Test Type
The test type base class, for reference:
from abc import ABC, abstractmethod
from typing import Any


# DataProvider is the provider base class shown under "Adding a New Provider".
class DataTest(ABC):
    def __init__(
        self,
        query_results: list[dict[str, Any]],
        config: Any,
        providers: dict[str, DataProvider],
    ):
        self.query_results = query_results
        self.config = config
        self.providers = providers

    @abstractmethod
    def run(self) -> dict[str, Any]:
        pass
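A custom test type in testers.py could then subclass `DataTest` roughly as follows. This is a sketch under assumptions: the class name `UniqueColumnTest`, the idea that `config` holds a single column name, and the `passed` / `duplicates` keys in the returned dictionary are all hypothetical, since the expected result format is not specified above. The base class is repeated with a loosely typed `providers` argument so the snippet runs on its own.

```python
from abc import ABC, abstractmethod
from collections import Counter
from typing import Any


class DataTest(ABC):
    # Base class as documented above; providers is typed loosely here
    # so that the example does not depend on DataProvider.
    def __init__(
        self,
        query_results: list[dict[str, Any]],
        config: Any,
        providers: dict[str, Any],
    ):
        self.query_results = query_results
        self.config = config
        self.providers = providers

    @abstractmethod
    def run(self) -> dict[str, Any]:
        pass


class UniqueColumnTest(DataTest):
    """Hypothetical test: every value in the configured column is unique."""

    def run(self) -> dict[str, Any]:
        column = self.config  # assumed to be a column name for this sketch
        counts = Counter(row[column] for row in self.query_results)
        duplicates = [value for value, count in counts.items() if count > 1]
        # The result keys below are assumptions, not a documented format.
        return {"passed": not duplicates, "duplicates": duplicates}


results = [{"id": 1}, {"id": 2}, {"id": 2}]
outcome = UniqueColumnTest(results, "id", providers={}).run()
print(outcome)  # {'passed': False, 'duplicates': [2]}
```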
Adding a New Reporter
The reporter base class, for reference:
from abc import ABC, abstractmethod
from typing import Any


class Reporter(ABC):
    @abstractmethod
    def generate_report(self, test_results: list[dict[str, Any]]) -> None:
        pass
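A minimal custom reporter for reporters.py might look like the sketch below. The structure of each entry in `test_results` is not documented above, so the `name` and `passed` keys are assumptions made for illustration, as are the class name `SummaryReporter` and its optional `stream` argument.

```python
import io
import sys
from abc import ABC, abstractmethod
from typing import Any, Optional, TextIO


class Reporter(ABC):
    # Base class as documented above, repeated so the example is self-contained.
    @abstractmethod
    def generate_report(self, test_results: list[dict[str, Any]]) -> None:
        pass


class SummaryReporter(Reporter):
    """Hypothetical reporter that writes one line per test plus a total."""

    def __init__(self, stream: Optional[TextIO] = None) -> None:
        # Write to standard output unless another stream is supplied.
        self.stream = stream if stream is not None else sys.stdout

    def generate_report(self, test_results: list[dict[str, Any]]) -> None:
        passed_count = 0
        for result in test_results:
            # "name" and "passed" are assumed keys, not a documented format.
            status = "PASS" if result["passed"] else "FAIL"
            passed_count += 1 if result["passed"] else 0
            self.stream.write(f"{status} {result['name']}\n")
        self.stream.write(f"{passed_count}/{len(test_results)} tests passed\n")


buffer = io.StringIO()
SummaryReporter(buffer).generate_report(
    [
        {"name": "user_data_validation", "passed": True},
        {"name": "row_count_check", "passed": False},
    ]
)
report_text = buffer.getvalue()
print(report_text, end="")
```

Accepting a stream in the constructor keeps the reporter easy to test; a real reporter would follow whatever construction convention Aqueductus expects.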