Real-Time Data Pipeline Challenge

Build a scalable ETL pipeline inspired by Netflix's and Uber's analytics platforms.

🎯 Challenge Overview

Build a high-throughput, fault-tolerant data pipeline that can ingest, process, and analyze streaming data in real time. This challenge is inspired by the data platforms used by Netflix (viewing analytics), Uber (ride matching and pricing), and Airbnb (booking and pricing optimization), where processing millions of events per second is crucial for business operations.

🏢 Real-World Context

Modern data-driven companies process massive amounts of streaming data:

  • Netflix: Processes 8+ trillion events daily for recommendations and content optimization
  • Uber: Handles 15+ million trips daily with real-time pricing and driver matching
  • Airbnb: Analyzes 150+ million user interactions daily for pricing and search ranking
  • Spotify: Processes 2.5+ billion streaming events daily for music recommendations
  • LinkedIn: Ingests 1+ trillion member actions monthly for feed personalization

Your challenge is to build a system that can handle high-velocity data streams while maintaining low-latency processing and ensuring data consistency.

📋 Requirements

Core Features

  1. Data Ingestion:

    • Handle 100,000+ events per second
    • Support multiple data sources (APIs, databases, files)
    • Schema validation and evolution
    • Dead letter queue for failed messages (see the validation sketch after this list)
  2. Stream Processing:

    • Real-time transformations and aggregations
    • Windowed analytics (sliding/tumbling windows)
    • Stateful processing with exactly-once semantics
    • Complex event processing and pattern detection
  3. Data Storage & Serving:

    • Time-series data for metrics and monitoring
    • OLAP cube for business intelligence queries
    • Real-time dashboards and alerting
    • Data lake for historical analysis
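
To make the ingestion requirements concrete, here is a minimal sketch of schema validation with a dead letter queue, assuming kafka-python; the required-field set and the 'user-events-dlq' topic name are illustrative, not prescribed by the challenge.

# Minimal schema validation + dead letter queue sketch (topic and field names are illustrative)
import json
from kafka import KafkaProducer

REQUIRED_FIELDS = {"event_type", "user_id", "timestamp"}  # assumed minimal schema

dlq_producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def ingest(event: dict):
    """Route valid events to the main topic and invalid ones to the DLQ."""
    missing = REQUIRED_FIELDS - event.keys()
    if missing:
        # Keep the bad payload plus the reason so it can be inspected and replayed later
        dlq_producer.send("user-events-dlq", {"error": f"missing fields: {sorted(missing)}", "payload": event})
    else:
        dlq_producer.send("user-events", event)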

Technical Specifications

  • Process 100k+ events/second with <1 second latency
  • 99.9% uptime with automatic failover
  • Horizontal scaling across multiple nodes
  • Support for both structured and unstructured data
  • Exactly-once processing guarantees

🛠 Implementation Guide

Phase 1: Kafka-Based Data Ingestion

# Kafka producer for high-throughput ingestion
from kafka import KafkaProducer
import json
import time
from typing import Dict, Any

class EventProducer:
    def __init__(self, bootstrap_servers: str, topic: str):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            batch_size=16384,  # Batch size for efficiency
            linger_ms=10,      # Wait time for batching
            compression_type='gzip',
            retries=3,
            acks='all'         # Wait for all replicas
        )
        self.topic = topic
    
    def send_event(self, event: Dict[str, Any]):
        """Send event with timestamp and partition key"""
        enriched_event = {
            **event,
            'timestamp': int(time.time() * 1000),
            'pipeline_id': 'data-pipeline-v1'
        }
        
        # Use user_id as partition key for ordered processing
        partition_key = str(event.get('user_id', 'default'))
        
        future = self.producer.send(
            self.topic,
            value=enriched_event,
            key=partition_key.encode('utf-8')
        )
        
        return future

# Example usage
producer = EventProducer('localhost:9092', 'user-events')

# Simulate Netflix-style viewing events
viewing_event = {
    'event_type': 'video_play',
    'user_id': 'user_12345',
    'content_id': 'movie_67890',
    'device_type': 'smart_tv',
    'quality': '4k',
    'location': 'US-CA-SF'
}

producer.send_event(viewing_event)
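
The send() call above is asynchronous, and kafka-python returns a future for each record. A possible way to confirm delivery and catch failures, reusing the EventProducer defined above (the callbacks and the re-send are illustrative):

# Attach delivery callbacks to the future returned by send()
def on_send_success(record_metadata):
    print(f"delivered to {record_metadata.topic}[{record_metadata.partition}] @ offset {record_metadata.offset}")

def on_send_error(exc):
    # In a full pipeline, this is where the event would be routed to the dead letter queue
    print(f"delivery failed: {exc}")

future = producer.send_event(viewing_event)  # sent again here just to capture the future
future.add_callback(on_send_success)
future.add_errback(on_send_error)

producer.producer.flush()  # block until the wrapped KafkaProducer has flushed its buffer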

Phase 2: Spark Structured Streaming for Real-Time Processing

# Apache Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

def create_spark_session():
    # Note: the Kafka source below requires the spark-sql-kafka connector on the
    # classpath (e.g. passed via --packages when submitting the job)
    return SparkSession.builder \
        .appName("RealTimeDataPipeline") \
        .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .getOrCreate()

def process_user_events(spark):
    # Define schema for incoming events
    event_schema = StructType([
        StructField("event_type", StringType()),
        StructField("user_id", StringType()),
        StructField("content_id", StringType()),
        StructField("device_type", StringType()),
        StructField("timestamp", LongType()),
        StructField("location", StringType())
    ])
    
    # Read from Kafka
    events_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "user-events") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
        .select("data.*") \
        .withColumn("event_time", (col("timestamp") / 1000).cast("timestamp"))  # watermarks/windows need a TimestampType column
    
    # Real-time aggregations
    windowed_stats = events_df \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "5 minutes"),
            col("event_type"),
            col("device_type")
        ) \
        .agg(
            count("*").alias("event_count"),
            countDistinct("user_id").alias("unique_users"),
            avg("timestamp").alias("avg_timestamp")
        )
    
    # Write to multiple sinks
    query = windowed_stats.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .trigger(processingTime='30 seconds') \
        .start()
    
    return query

# Complex Event Processing - Detect user behavior patterns
def detect_binge_watching(events_df):
    """Detect users watching multiple episodes in sequence"""
    
    # Assumes events_df carries the derived 'event_time' TimestampType column from above
    user_sessions = events_df \
        .filter(col("event_type") == "video_complete") \
        .withWatermark("event_time", "2 hours") \
        .groupBy(
            col("user_id"),
            window(col("event_time"), "4 hours", "1 hour")  # 4-hour sliding window, advancing hourly
        ) \
        .agg(
            count("*").alias("videos_watched"),
            collect_list("content_id").alias("content_sequence")
        ) \
        .filter(col("videos_watched") >= 3)  # 3+ completed videos = binge session
    
    return user_sessions
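
The console sink above is only for development. To satisfy the data-lake requirement, finalized windows can also be appended to files; a minimal sketch, with the output and checkpoint paths as placeholders (append mode works here because windowed_stats defines a watermark):

# Persist finalized windows to a Parquet "data lake" (paths are placeholders)
def write_to_data_lake(windowed_stats):
    return windowed_stats.writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("path", "/data/lake/windowed_stats") \
        .option("checkpointLocation", "/data/lake/_checkpoints/windowed_stats") \
        .start()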

Phase 3: Time-Series Analytics and Monitoring

# InfluxDB integration for time-series metrics
from typing import Any, Dict, Optional

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

class MetricsCollector:
    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.bucket = bucket
        self.org = org
    
    def write_pipeline_metrics(self, metrics: Dict[str, Any]):
        """Write pipeline performance metrics"""
        point = Point("pipeline_metrics") \
            .tag("pipeline_id", metrics.get("pipeline_id")) \
            .tag("stage", metrics.get("stage")) \
            .field("throughput_events_per_sec", metrics.get("throughput", 0)) \
            .field("latency_ms", metrics.get("latency_ms", 0)) \
            .field("error_rate", metrics.get("error_rate", 0)) \
            .field("cpu_usage", metrics.get("cpu_usage", 0)) \
            .field("memory_usage_mb", metrics.get("memory_usage_mb", 0))
        
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)
    
    def write_business_metrics(self, metrics: Dict[str, Any]):
        """Write business KPIs"""
        point = Point("business_metrics") \
            .tag("metric_type", metrics.get("type")) \
            .field("active_users", metrics.get("active_users", 0)) \
            .field("conversion_rate", metrics.get("conversion_rate", 0)) \
            .field("revenue_usd", metrics.get("revenue", 0)) \
            .field("engagement_score", metrics.get("engagement", 0))
        
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

# Real-time alerting system
class AlertManager:
    def __init__(self, slack_webhook: Optional[str] = None):
        self.slack_webhook = slack_webhook
        self.alert_thresholds = {
            'high_error_rate': 0.05,  # 5% error rate
            'low_throughput': 1000,   # <1k events/sec
            'high_latency': 5000      # >5 second latency
        }
    
    def check_alerts(self, metrics: Dict[str, Any]):
        """Check metrics against thresholds and send alerts"""
        alerts = []
        
        if metrics.get('error_rate', 0) > self.alert_thresholds['high_error_rate']:
            alerts.append(f"🚨 High error rate: {metrics['error_rate']:.2%}")
        
        if metrics.get('throughput', 0) < self.alert_thresholds['low_throughput']:
            alerts.append(f"⚠️ Low throughput: {metrics['throughput']} events/sec")
        
        if metrics.get('latency_ms', 0) > self.alert_thresholds['high_latency']:
            alerts.append(f"🐌 High latency: {metrics['latency_ms']}ms")
        
        for alert in alerts:
            self.send_alert(alert)
    
    def send_alert(self, message: str):
        """Send alert to monitoring channels"""
        print(f"ALERT: {message}")
        # Implementation for Slack, PagerDuty, etc.
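
A possible way to wire the collector and the alert manager together inside a monitoring loop; the connection settings and metric values below are placeholders:

# Example wiring (credentials and numbers are placeholders)
collector = MetricsCollector(
    url="http://localhost:8086",
    token="my-influx-token",
    org="analytics",
    bucket="pipeline-metrics",
)
alerts = AlertManager()

snapshot = {
    "pipeline_id": "data-pipeline-v1",
    "stage": "spark-aggregation",
    "throughput": 85000,      # events/sec observed in this interval
    "latency_ms": 420,
    "error_rate": 0.002,
    "cpu_usage": 63.5,
    "memory_usage_mb": 2048,
}

collector.write_pipeline_metrics(snapshot)  # persist to InfluxDB
alerts.check_alerts(snapshot)               # fire alerts if thresholds are breached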

📊 Success Metrics

  • Throughput: Process 100k+ events per second
  • Latency: End-to-end processing <1 second (95th percentile)
  • Availability: 99.9% uptime with <30 second recovery
  • Accuracy: 99.99% data accuracy with exactly-once processing
  • Scalability: Linear scaling with cluster size

🎁 Bonus Challenges

  1. Machine Learning Integration: Real-time feature engineering for ML models
  2. Multi-Cloud Deployment: Deploy across AWS, GCP, and Azure
  3. Schema Evolution: Handle schema changes without downtime
  4. Backpressure Handling: Graceful degradation during traffic spikes
  5. Cost Optimization: Dynamic scaling based on usage patterns
  6. Data Lineage: Track data flow and transformations for compliance

📚 Learning Outcomes

After completing this challenge, you'll understand:

  • Event-driven architecture and stream processing concepts
  • Apache Kafka for high-throughput message queuing
  • Apache Spark for distributed stream processing
  • Time-series databases and analytics
  • Monitoring and alerting for data pipelines
  • Scalability patterns for big data systems
  • Data consistency and fault tolerance strategies

🔧 Technical Stack

  • Message Queue: Apache Kafka or Amazon Kinesis
  • Stream Processing: Apache Spark, Apache Flink, or Kafka Streams
  • Time-Series DB: InfluxDB, TimescaleDB, or Prometheus
  • OLAP: Apache Druid, ClickHouse, or BigQuery
  • Monitoring: Grafana, DataDog, or custom dashboards
  • Orchestration: Apache Airflow or Kubernetes

🚀 Getting Started

  1. Set up Kafka cluster with multiple topics and partitions (see the topic-creation sketch after this list)
  2. Implement event producers for different data sources
  3. Build Spark Streaming jobs for real-time processing
  4. Add time-series database for metrics storage
  5. Create monitoring dashboards and alerting rules
  6. Implement data quality checks and error handling
  7. Add automated scaling and deployment
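
For step 1, topics can be created programmatically as well as from the Kafka CLI. A small sketch using kafka-python's admin client; the topic names, partition counts, and replication factor are illustrative:

# Create the pipeline topics programmatically (names and counts are illustrative)
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

topics = [
    NewTopic(name="user-events", num_partitions=12, replication_factor=1),
    NewTopic(name="user-events-dlq", num_partitions=3, replication_factor=1),
]

# More partitions allow more parallel consumers; use replication_factor >= 3 in production
admin.create_topics(new_topics=topics, validate_only=False)
admin.close()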

🎯 Sample Event Schema

{
  "user_event": {
    "event_id": "evt_1234567890",
    "event_type": "video_play",
    "user_id": "user_12345",
    "session_id": "sess_abcdef",
    "timestamp": 1633024800000,
    "properties": {
      "content_id": "movie_67890",
      "device_type": "smart_tv",
      "video_quality": "4k",
      "playback_position": 0,
      "duration_ms": 7200000
    },
    "context": {
      "location": "US-CA-SF",
      "user_agent": "Netflix/1.0",
      "ip_address": "192.168.1.1"
    }
  }
}
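
The producer and Spark code above work with a flat event, so a nested payload like this sample needs to be flattened first. A small helper sketch whose output mirrors the Spark schema from Phase 2 (the field selection is an assumption):

# Flatten the nested sample event into the flat shape used by the Spark schema
from typing import Any, Dict

def flatten_user_event(raw: Dict[str, Any]) -> Dict[str, Any]:
    event = raw["user_event"]
    props = event.get("properties", {})
    ctx = event.get("context", {})
    return {
        "event_type": event["event_type"],
        "user_id": event["user_id"],
        "content_id": props.get("content_id"),
        "device_type": props.get("device_type"),
        "timestamp": event["timestamp"],
        "location": ctx.get("location"),
    }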

Ready to build the data pipeline that powers the next unicorn startup? Start coding and experience the challenges of processing data at Netflix scale! 🚀📊

