# traffic_processing_pipeline

A real-time data ingestion and processing pipeline for traffic events.
The system will:
- Ingest live traffic sensor data from different road sensors using Kafka.
- Process the data in real time using PySpark Structured Streaming.
- Perform data quality validation to ensure that:
  - no missing or corrupted records exist;
  - the sensor data is within valid ranges;
  - duplicates are handled properly.
- Aggregate traffic patterns to analyze trends (e.g., sudden speed drops, high congestion).
- Write the final processed data back to Kafka topics for real-time dashboards.
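As a sketch of what an ingested event might look like, here is a minimal generator for one synthetic sensor reading. The field names (`sensor_id`, `timestamp`, `vehicle_count`, `average_speed`, `congestion_level`) are assumptions inferred from the queries described below, not the repository's actual schema; `producer.py` would serialize records like this and publish them to the `traffic_data` topic.

```python
import json
import random
import time

def make_traffic_event():
    """Generate one synthetic traffic sensor reading (hypothetical schema)."""
    return {
        "sensor_id": f"sensor_{random.randint(1, 10)}",
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "vehicle_count": random.randint(0, 50),
        "average_speed": round(random.uniform(10.0, 120.0), 1),
        "congestion_level": random.choice(["low", "medium", "high"]),
    }

event = make_traffic_event()
# producer.py would hand this payload to a Kafka producer client
payload = json.dumps(event).encode("utf-8")
print(payload)
```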
## Project Setup

### Kafka Setup
1. Launch the Docker containers:

   ```bash
   docker-compose -f docker-compose.yml up -d
   ```

2. Verify that the containers are running:

   ```bash
   docker-compose -f docker-compose.yml ps
   ```

3. Open a shell inside the container running the Kafka broker and create the Kafka topics for traffic events and analysis:

   ```bash
   docker exec -it kafka1 bash
   unset KAFKA_OPTS
   kafka-topics --create --topic traffic_data --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
   kafka-topics --create --topic traffic_analysis --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
   ```

4. Confirm the topics were created successfully:

   ```bash
   kafka-topics --list --bootstrap-server localhost:9092
   ```

5. Exit the container, then create a virtual environment and install the required dependencies:

   ```bash
   python3 -m venv venv
   source venv/bin/activate
   pip install -r requirements.txt
   ```

6. Run the producer to start sending traffic data to the Kafka topic:

   ```bash
   python producer.py
   ```

7. Run the consumer to start processing the traffic data:

   ```bash
   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5 streaming.py
   ```

8. Monitor the console output to see the processed data.
### Prometheus and Grafana Setup

1. Open http://localhost:9090/targets in your web browser to verify that Prometheus is scraping metrics from the Kafka broker and the Spark job.

2. Ensure that the `kafka1:7071` and `host.docker.internal:8000` targets are up.

3. You can query Kafka metrics from the "Graph" tab in Prometheus, for example:

   ```
   vehicle_count_total
   ```

4. Open Grafana in your web browser at http://localhost:3000 (default login: admin/admin).

5. Go to Configuration > Data Sources > Add data source > Prometheus:
   - URL: http://prometheus:9090
   - Click "Save & Test" to verify the connection.

6. Create a new dashboard and add a visualization.

7. Use the following query to create visualizations:

   ```
   vehicle_count_total
   ```

8. Save the dashboard and view the real-time traffic data.
## Project Architecture

## Query Analysis Implementation Notes
### Compute Real-Time Traffic Volume

To compute real-time traffic volume, we grouped the data by `sensor_id` over a 5-minute window, then used the `agg` and `sum` functions to compute the total vehicle count per sensor.
Relevant Docs:
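The PySpark query does this with `groupBy` on `sensor_id` and a 5-minute `window`, followed by `agg(sum(...))`. The same logic, sketched in plain Python over a small batch of events for clarity (field names and epoch-second timestamps are assumptions):

```python
from collections import defaultdict

WINDOW = 5 * 60  # 5-minute tumbling window, in seconds

def traffic_volume(events):
    """Sum vehicle_count per (sensor_id, 5-minute window start)."""
    totals = defaultdict(int)
    for e in events:
        window_start = e["ts"] - e["ts"] % WINDOW  # bucket the timestamp into its window
        totals[(e["sensor_id"], window_start)] += e["vehicle_count"]
    return dict(totals)

events = [
    {"sensor_id": "s1", "ts": 0,   "vehicle_count": 10},
    {"sensor_id": "s1", "ts": 120, "vehicle_count": 5},   # same 5-minute window as above
    {"sensor_id": "s1", "ts": 360, "vehicle_count": 7},   # next window
    {"sensor_id": "s2", "ts": 30,  "vehicle_count": 2},
]
print(traffic_volume(events))  # {('s1', 0): 15, ('s1', 300): 7, ('s2', 0): 2}
```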
### Detect Congestion Hotspots

To detect congestion hotspots, we used the `lag` function to look one and two steps back, then filtered for records where `congestion_level` remained high across all three intervals.
Relevant Docs:
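In PySpark this is `lag(col, 1)` and `lag(col, 2)` over a window partitioned by sensor and ordered by time, followed by a filter. A plain-Python illustration of that lag-and-filter logic over one sensor's time-ordered congestion levels:

```python
def congestion_hotspots(levels):
    """Return indices where congestion_level is 'high' for 3 consecutive intervals.

    `levels` is one sensor's congestion levels in timestamp order; checking
    levels[i-1] and levels[i-2] mirrors lag(col, 1) and lag(col, 2).
    """
    hot = []
    for i in range(2, len(levels)):
        if levels[i] == levels[i - 1] == levels[i - 2] == "high":
            hot.append(i)
    return hot

print(congestion_hotspots(["low", "high", "high", "high", "medium", "high"]))  # [3]
```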
### Calculate Average Speed per Sensor

To compute the average speed per sensor, we again used the `groupBy`, `agg`, and `avg` methods, but this time with an overlapping 10-minute window that slides every 5 minutes, giving a smoother average.
Relevant Docs:
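The overlapping window corresponds to Spark's `window(col, "10 minutes", "5 minutes")`. A simplified plain-Python sketch of that sliding aggregation for one sensor (window starts are aligned to the slide from time zero, a simplification of Spark's window generation; field names assumed):

```python
def sliding_avg_speed(events, window=600, slide=300):
    """Average `speed` in overlapping windows: 10-minute span, sliding every 5 minutes."""
    if not events:
        return {}
    last_ts = max(e["ts"] for e in events)
    averages = {}
    start = 0
    while start <= last_ts:
        in_window = [e["speed"] for e in events if start <= e["ts"] < start + window]
        if in_window:
            averages[start] = sum(in_window) / len(in_window)
        start += slide  # windows overlap because slide < window
    return averages

events = [{"ts": 0, "speed": 60.0}, {"ts": 400, "speed": 40.0}, {"ts": 700, "speed": 80.0}]
print(sliding_avg_speed(events))  # {0: 50.0, 300: 60.0, 600: 80.0}
```

Note that the event at `ts=400` contributes to two windows (starting at 0 and at 300), which is exactly the smoothing effect the overlap provides.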
### Identify Sudden Speed Drops

To identify sudden speed drops, we used the `lag` function to compare the speed in the previous interval with the speed in the current interval. If the change in `average_speed` was greater than or equal to 0.5, we flagged it as a sudden speed drop.
Relevant Docs:
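A plain-Python illustration of the lag-and-compare logic. Interpreting the 0.5 threshold as a relative drop (half the previous interval's average speed) is an assumption on our part; if the project treats it as an absolute difference, the comparison line changes accordingly:

```python
def sudden_speed_drops(avg_speeds, threshold=0.5):
    """Flag intervals whose average speed fell by >= `threshold` (as a fraction
    of the previous interval's value) -- mirrors lag(col, 1) plus a filter.

    ASSUMPTION: the 0.5 threshold is a relative drop, not an absolute one.
    """
    drops = []
    for i in range(1, len(avg_speeds)):
        prev, curr = avg_speeds[i - 1], avg_speeds[i]
        if prev > 0 and (prev - curr) / prev >= threshold:
            drops.append(i)
    return drops

print(sudden_speed_drops([80.0, 75.0, 30.0, 28.0]))  # [2]: 75 -> 30 is a 60% drop
```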
### Find the 3 Busiest Sensors

To identify the busiest sensors, we grouped the data into 30-minute windows using the timestamp and `sensor_id`, and took the sum of the vehicle count per sensor. We then ordered the results in descending order and picked the top 3.
Relevant Docs:
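The group-sum-order-limit pipeline, sketched in plain Python over a small batch (field names and epoch-second timestamps assumed, as above):

```python
from collections import defaultdict

def busiest_sensors(events, window=1800, top_n=3):
    """Sum vehicle_count per (30-minute window, sensor), then return the top-N
    (window_start, sensor_id, total) rows ordered by total, descending."""
    totals = defaultdict(int)
    for e in events:
        window_start = e["ts"] - e["ts"] % window  # 30-minute tumbling window
        totals[(window_start, e["sensor_id"])] += e["vehicle_count"]
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    return [(w, s, total) for (w, s), total in ranked[:top_n]]

events = [
    {"sensor_id": "s1", "ts": 10,   "vehicle_count": 40},
    {"sensor_id": "s2", "ts": 20,   "vehicle_count": 55},
    {"sensor_id": "s3", "ts": 30,   "vehicle_count": 25},
    {"sensor_id": "s4", "ts": 40,   "vehicle_count": 10},
    {"sensor_id": "s1", "ts": 2000, "vehicle_count": 70},  # next 30-minute window
]
print(busiest_sensors(events))  # [(1800, 's1', 70), (0, 's2', 55), (0, 's1', 40)]
```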
