Real-Time Data Pipeline Challenge
Build a scalable ETL pipeline inspired by Netflix's and Uber's analytics platforms.
🎯 Challenge Overview
Build a high-throughput, fault-tolerant data pipeline that can ingest, process, and analyze streaming data in real time. This challenge is inspired by the data platforms used by Netflix (viewing analytics), Uber (ride matching and pricing), and Airbnb (booking and pricing optimization), where processing millions of events per second is crucial for business operations.
🏢 Real-World Context
Modern data-driven companies process massive amounts of streaming data:
- Netflix: Processes 8+ trillion events daily for recommendations and content optimization
- Uber: Handles 15+ million trips daily with real-time pricing and driver matching
- Airbnb: Analyzes 150+ million user interactions daily for pricing and search ranking
- Spotify: Processes 2.5+ billion streaming events daily for music recommendations
- LinkedIn: Ingests 1+ trillion member actions monthly for feed personalization
Your challenge is to build a system that can handle high-velocity data streams while maintaining low-latency processing and ensuring data consistency.
📋 Requirements
Core Features
- Data Ingestion:
  - Handle 100,000+ events per second
  - Support multiple data sources (APIs, databases, files)
  - Schema validation and evolution
  - Dead letter queue for failed messages (a minimal sketch follows the requirements)
- Stream Processing:
  - Real-time transformations and aggregations
  - Windowed analytics (sliding/tumbling windows)
  - Stateful processing with exactly-once semantics
  - Complex event processing and pattern detection
- Data Storage & Serving:
  - Time-series data for metrics and monitoring
  - OLAP cubes for business intelligence queries
  - Real-time dashboards and alerting
  - Data lake for historical analysis
Technical Specifications
- Process 100k+ events/second with <1 second latency
- 99.9% uptime with automatic failover
- Horizontal scaling across multiple nodes
- Support for both structured and unstructured data
- Exactly-once processing guarantees
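The dead letter queue called out above can start out as nothing more than a second Kafka topic that receives events that fail validation or delivery. Below is a minimal sketch using kafka-python; the `user-events-dlq` topic name and the `validate_event` helper are illustrative assumptions, not a prescribed API.

```python
# Minimal dead-letter-queue sketch. Assumptions: a separate 'user-events-dlq'
# topic exists, and validate_event is a hypothetical helper (not a library call).
import json
import time
from kafka import KafkaProducer

REQUIRED_FIELDS = {"event_type", "user_id"}

def validate_event(event: dict) -> bool:
    """Very small schema check: required fields must be present."""
    return REQUIRED_FIELDS.issubset(event.keys())

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def send_with_dlq(event: dict):
    """Route invalid or undeliverable events to the dead letter topic."""
    dlq_record = {"failed_at": int(time.time() * 1000), "original_event": event}

    if not validate_event(event):
        dlq_record["reason"] = "schema_validation_failed"
        producer.send("user-events-dlq", value=dlq_record)
        return

    try:
        # Block on the future so delivery errors surface here
        producer.send("user-events", value=event).get(timeout=10)
    except Exception as exc:  # delivery failed after client-side retries
        dlq_record["reason"] = str(exc)
        producer.send("user-events-dlq", value=dlq_record)
```

Keeping the failed payload alongside a failure reason makes it possible to inspect or replay bad events later.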
🛠 Implementation Guide
Phase 1: Kafka-Based Data Ingestion
```python
# Kafka producer for high-throughput ingestion
from kafka import KafkaProducer
import json
import time
from typing import Dict, Any


class EventProducer:
    def __init__(self, bootstrap_servers: str, topic: str):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            batch_size=16384,        # Batch size for efficiency
            linger_ms=10,            # Wait time for batching
            compression_type='gzip',
            retries=3,
            acks='all'               # Wait for all replicas
        )
        self.topic = topic

    def send_event(self, event: Dict[str, Any]):
        """Send event with timestamp and partition key"""
        enriched_event = {
            **event,
            'timestamp': int(time.time() * 1000),
            'pipeline_id': 'data-pipeline-v1'
        }
        # Use user_id as partition key for ordered processing per user
        partition_key = str(event.get('user_id', 'default'))
        future = self.producer.send(
            self.topic,
            value=enriched_event,
            key=partition_key.encode('utf-8')
        )
        return future

    def flush(self):
        """Block until buffered events are delivered (important before shutdown)."""
        self.producer.flush()


# Example usage
producer = EventProducer('localhost:9092', 'user-events')

# Simulate Netflix-style viewing events
viewing_event = {
    'event_type': 'video_play',
    'user_id': 'user_12345',
    'content_id': 'movie_67890',
    'device_type': 'smart_tv',
    'quality': '4k',
    'location': 'US-CA-SF'
}

producer.send_event(viewing_event)
producer.flush()  # ensure batched events are sent before the script exits
```
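To verify that the producer above is actually landing events in the topic, a throwaway consumer can tail `user-events`. This is a debugging aid rather than a pipeline component; the `debug-tail` group id is an arbitrary choice.

```python
# Quick verification: tail the 'user-events' topic and print incoming events.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers="localhost:9092",
    group_id="debug-tail",           # arbitrary consumer group for ad-hoc inspection
    auto_offset_reset="latest",      # only read events produced from now on
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    event = message.value
    print(f"partition={message.partition} offset={message.offset} "
          f"type={event.get('event_type')} user={event.get('user_id')}")
```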
Phase 2: Spark Structured Streaming for Real-Time Processing
```python
# Apache Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, from_json, window, count, approx_count_distinct, avg, collect_list
)
from pyspark.sql.types import StructType, StructField, StringType, LongType


def create_spark_session():
    return SparkSession.builder \
        .appName("RealTimeDataPipeline") \
        .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .getOrCreate()


def process_user_events(spark):
    # Define schema for incoming events
    event_schema = StructType([
        StructField("event_type", StringType()),
        StructField("user_id", StringType()),
        StructField("content_id", StringType()),
        StructField("device_type", StringType()),
        StructField("timestamp", LongType()),
        StructField("location", StringType())
    ])

    # Read from Kafka and convert the epoch-millis timestamp into a proper
    # TimestampType column, which watermarks and windows require
    events_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "user-events") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
        .select("data.*") \
        .withColumn("event_time", (col("timestamp") / 1000).cast("timestamp"))

    # Real-time aggregations over 5-minute tumbling windows
    windowed_stats = events_df \
        .withWatermark("event_time", "10 minutes") \
        .groupBy(
            window(col("event_time"), "5 minutes"),
            col("event_type"),
            col("device_type")
        ) \
        .agg(
            count("*").alias("event_count"),
            # Exact countDistinct is not supported on streaming DataFrames,
            # so use the approximate version instead
            approx_count_distinct("user_id").alias("unique_users"),
            avg("timestamp").alias("avg_timestamp")
        )

    # Write to the console sink (swap in Kafka, Delta, etc. for production)
    query = windowed_stats.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .trigger(processingTime='30 seconds') \
        .start()

    return query


# Complex Event Processing - Detect user behavior patterns
def detect_binge_watching(events_df):
    """Detect users watching multiple episodes in sequence"""
    user_sessions = events_df \
        .filter(col("event_type") == "video_complete") \
        .withWatermark("event_time", "2 hours") \
        .groupBy(
            col("user_id"),
            window(col("event_time"), "4 hours", "1 hour")
        ) \
        .agg(
            count("*").alias("videos_watched"),
            collect_list("content_id").alias("content_sequence")
        ) \
        .filter(col("videos_watched") >= 3)  # 3+ videos = binge session

    return user_sessions
```
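A small driver script (a sketch; the module layout is an assumption) ties the two functions above together and keeps the streaming query alive:

```python
# Hypothetical driver script for the streaming job defined above.
if __name__ == "__main__":
    spark = create_spark_session()
    spark.sparkContext.setLogLevel("WARN")  # reduce console noise

    query = process_user_events(spark)      # start the windowed aggregation query
    query.awaitTermination()                 # block until the query stops or fails
```

The job is typically launched with spark-submit, adding the spark-sql-kafka connector package that matches your Spark version so the `kafka` source is available.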
Phase 3: Time-Series Analytics and Monitoring
```python
# InfluxDB integration for time-series metrics
from typing import Dict, Any

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS


class MetricsCollector:
    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.bucket = bucket
        self.org = org

    def write_pipeline_metrics(self, metrics: Dict[str, Any]):
        """Write pipeline performance metrics"""
        point = Point("pipeline_metrics") \
            .tag("pipeline_id", metrics.get("pipeline_id")) \
            .tag("stage", metrics.get("stage")) \
            .field("throughput_events_per_sec", metrics.get("throughput", 0)) \
            .field("latency_ms", metrics.get("latency_ms", 0)) \
            .field("error_rate", metrics.get("error_rate", 0)) \
            .field("cpu_usage", metrics.get("cpu_usage", 0)) \
            .field("memory_usage_mb", metrics.get("memory_usage_mb", 0))
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    def write_business_metrics(self, metrics: Dict[str, Any]):
        """Write business KPIs"""
        point = Point("business_metrics") \
            .tag("metric_type", metrics.get("type")) \
            .field("active_users", metrics.get("active_users", 0)) \
            .field("conversion_rate", metrics.get("conversion_rate", 0)) \
            .field("revenue_usd", metrics.get("revenue", 0)) \
            .field("engagement_score", metrics.get("engagement", 0))
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)


# Real-time alerting system
class AlertManager:
    def __init__(self, slack_webhook: str = None):
        self.slack_webhook = slack_webhook
        self.alert_thresholds = {
            'high_error_rate': 0.05,   # 5% error rate
            'low_throughput': 1000,    # <1k events/sec
            'high_latency': 5000       # >5 second latency
        }

    def check_alerts(self, metrics: Dict[str, Any]):
        """Check metrics against thresholds and send alerts"""
        alerts = []

        if metrics.get('error_rate', 0) > self.alert_thresholds['high_error_rate']:
            alerts.append(f"🚨 High error rate: {metrics['error_rate']:.2%}")

        if metrics.get('throughput', 0) < self.alert_thresholds['low_throughput']:
            alerts.append(f"⚠️ Low throughput: {metrics['throughput']} events/sec")

        if metrics.get('latency_ms', 0) > self.alert_thresholds['high_latency']:
            alerts.append(f"🐌 High latency: {metrics['latency_ms']}ms")

        for alert in alerts:
            self.send_alert(alert)

    def send_alert(self, message: str):
        """Send alert to monitoring channels"""
        print(f"ALERT: {message}")
        # Hook in Slack, PagerDuty, etc. here
```
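Here is one way the two classes above might be wired together in the pipeline's monitoring loop. The InfluxDB URL, token, org, bucket, and the metric values themselves are placeholders for illustration:

```python
# Hypothetical wiring of MetricsCollector and AlertManager.
collector = MetricsCollector(
    url="http://localhost:8086",
    token="my-influxdb-token",
    org="analytics",
    bucket="pipeline-metrics",
)
alerts = AlertManager()

# In a real pipeline these numbers would come from the processing layer;
# here they are hard-coded to show the flow.
current_metrics = {
    "pipeline_id": "data-pipeline-v1",
    "stage": "spark-aggregation",
    "throughput": 85_000,      # events/sec
    "latency_ms": 740,
    "error_rate": 0.012,
    "cpu_usage": 63.5,
    "memory_usage_mb": 4096,
}

collector.write_pipeline_metrics(current_metrics)  # persist to InfluxDB
alerts.check_alerts(current_metrics)               # fire alerts if thresholds are breached
```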
📊 Success Metrics
- Throughput: Process 100k+ events per second
- Latency: End-to-end processing <1 second at the 95th percentile (a measurement sketch follows this list)
- Availability: 99.9% uptime with <30 second recovery
- Accuracy: 99.99% data accuracy with exactly-once processing
- Scalability: Linear scaling with cluster size
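One simple way to check the latency target is to record per-event end-to-end latencies (processing time minus the produce timestamp) and compute the 95th percentile. A minimal sketch using only the standard library, with illustrative sample values:

```python
# Minimal p95 latency check over a sample of observed end-to-end latencies (ms).
import statistics

latency_samples_ms = [220, 310, 180, 950, 400, 290, 610, 375, 820, 240]

# statistics.quantiles with n=100 returns the 1st..99th percentiles.
p95 = statistics.quantiles(latency_samples_ms, n=100)[94]
print(f"p95 latency: {p95:.0f} ms "
      f"({'meets' if p95 < 1000 else 'misses'} the <1 second target)")
```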
🎁 Bonus Challenges
- Machine Learning Integration: Real-time feature engineering for ML models
- Multi-Cloud Deployment: Deploy across AWS, GCP, and Azure
- Schema Evolution: Handle schema changes without downtime
- Backpressure Handling: Graceful degradation during traffic spikes
- Cost Optimization: Dynamic scaling based on usage patterns
- Data Lineage: Track data flow and transformations for compliance
📚 Learning Outcomes
After completing this challenge, you'll understand:
- Event-driven architecture and stream processing concepts
- Apache Kafka for high-throughput message queuing
- Apache Spark for distributed stream processing
- Time-series databases and analytics
- Monitoring and alerting for data pipelines
- Scalability patterns for big data systems
- Data consistency and fault tolerance strategies
🔧 Technical Stack
- Message Queue: Apache Kafka or Amazon Kinesis
- Stream Processing: Apache Spark, Apache Flink, or Kafka Streams
- Time-Series DB: InfluxDB, TimescaleDB, or Prometheus
- OLAP: Apache Druid, ClickHouse, or BigQuery
- Monitoring: Grafana, DataDog, or custom dashboards
- Orchestration: Apache Airflow or Kubernetes
🚀 Getting Started
- Set up a Kafka cluster with multiple topics and partitions (a topic-creation sketch follows this list)
- Implement event producers for different data sources
- Build Spark Streaming jobs for real-time processing
- Add time-series database for metrics storage
- Create monitoring dashboards and alerting rules
- Implement data quality checks and error handling
- Add automated scaling and deployment
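For the first step above, topics can also be created programmatically. A sketch using kafka-python's admin client; the partition and replication counts are illustrative and assume a local single-broker setup:

```python
# Create the pipeline topics programmatically (counts are illustrative).
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
topics = [
    NewTopic(name="user-events", num_partitions=6, replication_factor=1),
    NewTopic(name="user-events-dlq", num_partitions=3, replication_factor=1),
]
admin.create_topics(new_topics=topics, validate_only=False)
admin.close()
```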
🔗 Helpful Resources
- Apache Kafka Documentation
- Spark Structured Streaming
- Designing Data-Intensive Applications
- Netflix Data Pipeline Architecture
- Uber's Real-Time Analytics
🎯 Sample Event Schema
```json
{
  "user_event": {
    "event_id": "evt_1234567890",
    "event_type": "video_play",
    "user_id": "user_12345",
    "session_id": "sess_abcdef",
    "timestamp": 1633024800000,
    "properties": {
      "content_id": "movie_67890",
      "device_type": "smart_tv",
      "video_quality": "4k",
      "playback_position": 0,
      "duration_ms": 7200000
    },
    "context": {
      "location": "US-CA-SF",
      "user_agent": "Netflix/1.0",
      "ip_address": "192.168.1.1"
    }
  }
}
```
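Producers can validate events against this schema before ingestion and route failures to the dead letter queue. A sketch using the `jsonschema` package; the schema below is a hand-written, simplified subset of the sample event, and the required-field choices are assumptions:

```python
# Validate an incoming event against a (simplified) JSON Schema before ingestion.
from jsonschema import validate, ValidationError

USER_EVENT_SCHEMA = {
    "type": "object",
    "required": ["event_id", "event_type", "user_id", "timestamp", "properties"],
    "properties": {
        "event_id": {"type": "string"},
        "event_type": {"type": "string"},
        "user_id": {"type": "string"},
        "timestamp": {"type": "integer"},
        "properties": {
            "type": "object",
            "required": ["content_id", "device_type"],
        },
    },
}

def is_valid_event(event: dict) -> bool:
    """Return True if the event matches the schema, logging the first violation."""
    try:
        validate(instance=event, schema=USER_EVENT_SCHEMA)
        return True
    except ValidationError as err:
        print(f"Schema validation failed: {err.message}")
        return False
```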
Ready to build the data pipeline that powers the next unicorn startup? Start coding and experience the challenges of processing data at Netflix scale! 🚀📊