Lê Duy Khương (Daniel)

Năng suất & công cụ dev

Apache Spark — Hướng dẫn đào tạo (Phần 2)

Structured Streaming, MLlib, credit scoring, fraud detection.

2026-03-1717 phút đọcVI

Structured Streaming & MLlib for Lakehouse

Đối tượng: Data Engineers, Data Scientists, ML Engineers Thời lượng đào tạo: 3-4 giờ (Phần 2) Cập nhật lần cuối: 26/06/2025 Phiên bản: 1.0.0 Tiền đề: Hoàn thành Phần 1 - Spark Fundamentals


Mục lục

  1. Structured Streaming Fundamentals
  2. Real-time Analytics cho Lakehouse
  3. MLlib Machine Learning
  4. Credit Scoring Models
  5. Fraud Detection System
  6. Advanced Hands-on Labs

Structured Streaming Fundamentals

Structured Streaming trong Lakehouse Real-time Analytics

Structured Streaming là scalable và fault-tolerant stream processing engine được xây dựng trên Spark SQL engine, cho phép Lakehouse xử lý dữ liệu streaming theo cách tương tự như batch processing.

Kiến trúc Stream Processing cho Lakehouse

┌─────────────────────────────────────────────────────────────────┐
│           Lakehouse Real-time Data Processing Architecture            │
├─────────────────────────────────────────────────────────────────┤
│ Data Sources (Streaming)                                        │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│
│ │   Kafka     │ │ Mobile App  │ │   ATM       │ │   Website   ││
│ │ (Core Bus)  │ │ Events      │ │ Transactions│ │   Activity  ││
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘│
│                               ↓                                │
├─────────────────────────────────────────────────────────────────┤
│ Structured Streaming Processing                                 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐            │
│ │ Event        │ │ Aggregation  │ │ ML Inference │            │
│ │ Enrichment   │ │ & Windowing  │ │ (Real-time)  │            │
│ │              │ │              │ │              │            │
│ │ • Customer   │ │ • Tumbling   │ │ • Fraud      │            │
│ │   Profile    │ │   Windows    │ │   Detection  │            │
│ │ • Historical │ │ • Sliding    │ │ • Credit     │            │
│ │   Behavior   │ │   Windows    │ │   Scoring    │            │
│ │ • Rules      │ │ • Session    │ │ • Risk       │            │
│ │   Engine     │ │   Windows    │ │   Assessment │            │
│ └──────────────┘ └──────────────┘ └──────────────┘            │
│                               ↓                                │
├─────────────────────────────────────────────────────────────────┤
│ Real-time Outputs                                               │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐            │
│ │   Alerts     │ │ Dashboards   │ │  Data Lake   │            │
│ │ (Kafka/SMS)  │ │ (WebSocket)  │ │  (MinIO)     │            │
│ └──────────────┘ └──────────────┘ └──────────────┘            │
└─────────────────────────────────────────────────────────────────┘

Core Concepts của Structured Streaming

# Khái niệm cơ bản về Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
 
def structured_streaming_concepts():
    """
    Giải thích các khái niệm cơ bản của Structured Streaming
    """
    spark = SparkSession.builder \
        .appName("StructuredStreaming-Concepts") \
        .master("spark://spark-master:7077") \
        .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
        .getOrCreate()
    
    print("🌊 Structured Streaming Core Concepts")
    print("="*50)
    
    # 1. Input Sources
    print("1. Input Sources cho Lakehouse:")
    print("   📡 Kafka Topics:")
    print("      - customer-events: Hoạt động khách hàng")
    print("      - lakehouse-transaction-events: Giao dịch real-time")
    print("      - lakehouse-loan-events: Sự kiện vay vốn")
    print("      - lakehouse-system-events: Log hệ thống")
    
    # 2. Streaming DataFrame
    print("\n2. Streaming DataFrame - Unbounded Table:")
    print("   📊 Mỗi streaming DataFrame được coi như một bảng vô hạn")
    print("   🔄 Dữ liệu mới được append liên tục")
    print("   ⚡ Query engine tự động xử lý dữ liệu mới")
    
    # 3. Output Modes
    print("\n3. Output Modes:")
    output_modes = {
        "Append": "Chỉ output các records mới (default cho non-aggregation)",
        "Complete": "Output toàn bộ result table (cho aggregation)",
        "Update": "Chỉ output records có update (cho aggregation with watermark)"
    }
    
    for mode, description in output_modes.items():
        print(f"   • {mode}: {description}")
    
    # 4. Triggers
    print("\n4. Processing Triggers:")
    triggers = {
        "Default": "Xử lý micro-batch ngay khi batch trước hoàn thành",
        "Fixed Interval": "Xử lý theo interval cố định (ví dụ: 30 giây)",
        "One-time": "Xử lý một lần rồi dừng (useful for testing)",
        "Continuous": "Low-latency processing (experimental)"
    }
    
    for trigger, description in triggers.items():
        print(f"   • {trigger}: {description}")
    
    spark.stop()
 
# Chạy concepts demo
structured_streaming_concepts()

Thiết lập Kafka Integration

# Cấu hình Kafka integration cho Structured Streaming
def setup_kafka_streaming():
    """
    Thiết lập kết nối Kafka cho Structured Streaming
    """
    spark = SparkSession.builder \
        .appName("Kafka-Streaming") \
        .master("spark://spark-master:7077") \
        .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .getOrCreate()
    
    # Kafka configuration
    kafka_config = {
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "customer-events",
        "startingOffsets": "latest",
        "failOnDataLoss": "false"
    }
    
    print("🔗 Kafka Streaming Configuration:")
    for key, value in kafka_config.items():
        print(f"   {key}: {value}")
    
    # Đọc từ Kafka
    kafka_stream = spark \
        .readStream \
        .format("kafka") \
        .options(**kafka_config) \
        .load()
    
    print("\n📋 Kafka Stream Schema:")
    kafka_stream.printSchema()
    
    # Parse JSON data từ Kafka
    json_schema = StructType([
        StructField("customer_id", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("amount", DoubleType(), True),
        StructField("location", StringType(), True),
        StructField("device_info", StringType(), True)
    ])
    
    parsed_stream = kafka_stream.select(
        col("key").cast("string").alias("message_key"),
        from_json(col("value").cast("string"), json_schema).alias("data"),
        col("timestamp").alias("kafka_timestamp"),
        col("partition"),
        col("offset")
    ).select("message_key", "data.*", "kafka_timestamp", "partition", "offset")
    
    print("\n📊 Parsed Stream Schema:")
    parsed_stream.printSchema()
    
    return spark, parsed_stream
 
# Test Kafka setup
# spark, stream = setup_kafka_streaming()

Real-time Analytics cho Lakehouse

Real-time Customer Activity Monitoring

def realtime_customer_monitoring():
    """
    Giám sát hoạt động khách hàng real-time
    """
    spark, customer_stream = setup_kafka_streaming()
    
    print("👥 Real-time Customer Activity Monitoring")
    print("="*50)
    
    # 1. Basic stream processing - Customer activity count
    activity_counts = customer_stream \
        .groupBy(
            window(col("timestamp"), "1 minute"),
            col("customer_id"),
            col("event_type")
        ) \
        .count() \
        .withColumn("window_start", col("window.start")) \
        .withColumn("window_end", col("window.end")) \
        .drop("window")
    
    # 2. Suspicious activity detection
    suspicious_activity = customer_stream \
        .groupBy(
            window(col("timestamp"), "5 minutes"),
            col("customer_id")
        ) \
        .agg(
            count("*").alias("total_events"),
            countDistinct("location").alias("unique_locations"),
            sum(when(col("event_type") == "LOGIN_FAILED", 1).otherwise(0)).alias("failed_logins"),
            sum(when(col("event_type") == "TRANSACTION", col("amount")).otherwise(0)).alias("total_amount")
        ) \
        .withColumn("risk_score",
            when(col("failed_logins") >= 3, 100)
            .when(col("unique_locations") >= 3, 80)
            .when(col("total_amount") > 10000000, 70)  # > 10M VND
            .when(col("total_events") >= 20, 60)
            .otherwise(0)
        ) \
        .filter(col("risk_score") > 50)
    
    # 3. Customer behavior analytics
    behavior_analytics = customer_stream \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "10 minutes", "5 minutes"),  # Sliding window
            col("customer_id")
        ) \
        .agg(
            count("*").alias("activity_count"),
            countDistinct("event_type").alias("event_diversity"),
            avg("amount").alias("avg_transaction_amount"),
            max("amount").alias("max_transaction_amount"),
            collect_list("location").alias("locations_visited")
        ) \
        .withColumn("behavior_score",
            col("activity_count") * 0.3 +
            col("event_diversity") * 0.4 +
            (col("avg_transaction_amount") / 1000000) * 0.3  # Normalize amount
        )
    
    # Output streams
    print("📊 Setting up output streams...")
    
    # Console output cho development
    query1 = activity_counts.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .trigger(processingTime="30 seconds") \
        .start()
    
    # File output cho persistent storage
    query2 = behavior_analytics.writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("path", "/tmp/lakehouse-customer-behavior") \
        .option("checkpointLocation", "/tmp/lakehouse-behavior-checkpoint") \
        .trigger(processingTime="1 minute") \
        .start()
    
    # Kafka output cho alerts
    alert_stream = suspicious_activity.select(
        col("customer_id").alias("key"),
        to_json(struct("*")).alias("value")
    )
    
    query3 = alert_stream.writeStream \
        .outputMode("update") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "lakehouse-customer-alerts") \
        .option("checkpointLocation", "/tmp/lakehouse-alerts-checkpoint") \
        .start()
    
    return [query1, query2, query3]

Real-time Fraud Detection

def realtime_fraud_detection():
    """
    Hệ thống phát hiện gian lận real-time
    """
    spark, transaction_stream = setup_kafka_streaming()
    
    print("🚨 Real-time Fraud Detection System")
    print("="*50)
    
    # Load customer profile data (batch) để enrich stream
    customer_profiles = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
        .option("dbtable", "core.customers") \
        .option("user", "lakehouse_admin") \
        .option("password", "CHANGE_ME_PASSWORD") \
        .load() \
        .select("customer_id", "customer_code", "credit_score", "kyc_status")
    
    # Broadcast customer profiles để join với stream
    broadcast_profiles = broadcast(customer_profiles)
    
    # Enrich stream với customer data
    enriched_stream = transaction_stream \
        .join(broadcast_profiles, "customer_id", "left") \
        .withColumn("hour_of_day", hour(col("timestamp"))) \
        .withColumn("day_of_week", dayofweek(col("timestamp")))
    
    # Fraud detection rules
    fraud_indicators = enriched_stream \
        .withColumn("fraud_indicators", 
            array(
                # Rule 1: Large transaction outside business hours
                when((col("amount") > 5000000) & 
                     ((col("hour_of_day") < 6) | (col("hour_of_day") > 22)), 
                     lit("LARGE_AMOUNT_OFF_HOURS")).otherwise(lit(None)),
                
                # Rule 2: Multiple locations in short time
                when(col("event_type") == "ATM_WITHDRAWAL", 
                     lit("LOCATION_CHECK_NEEDED")).otherwise(lit(None)),
                
                # Rule 3: Transaction amount vs credit score mismatch
                when((col("amount") > 10000000) & (col("credit_score") < 600), 
                     lit("AMOUNT_CREDIT_MISMATCH")).otherwise(lit(None)),
                
                # Rule 4: Pending KYC with large transaction
                when((col("kyc_status") == "PENDING") & (col("amount") > 2000000), 
                     lit("PENDING_KYC_LARGE_AMOUNT")).otherwise(lit(None))
            )
        ) \
        .withColumn("fraud_indicators_clean", 
                   filter(col("fraud_indicators"), lambda x: x.isNotNull())) \
        .withColumn("fraud_score", size(col("fraud_indicators_clean")) * 25) \
        .filter(col("fraud_score") > 0)
    
    # Aggregate potential fraud by customer
    fraud_summary = fraud_indicators \
        .withWatermark("timestamp", "5 minutes") \
        .groupBy(
            window(col("timestamp"), "10 minutes"),
            col("customer_id"),
            col("customer_code")
        ) \
        .agg(
            count("*").alias("suspicious_transactions"),
            sum("amount").alias("total_suspicious_amount"),
            max("fraud_score").alias("max_fraud_score"),
            collect_set("fraud_indicators_clean").alias("all_fraud_indicators")
        ) \
        .withColumn("final_fraud_score",
            col("max_fraud_score") + 
            (col("suspicious_transactions") * 10) +
            when(col("total_suspicious_amount") > 50000000, 50).otherwise(0)
        ) \
        .filter(col("final_fraud_score") >= 75)  # High-confidence fraud
    
    # Real-time ML inference (simplified)
    ml_fraud_detection = enriched_stream \
        .withColumn("ml_features", 
            array(
                col("amount").cast("double"),
                col("hour_of_day").cast("double"),
                col("credit_score").cast("double"),
                when(col("kyc_status") == "APPROVED", 1.0).otherwise(0.0)
            )
        ) \
        .withColumn("ml_fraud_probability",
            # Simplified ML model (trong thực tế sẽ load trained model)
            when(
                (col("amount") > 10000000) & 
                (col("credit_score") < 650) & 
                ((col("hour_of_day") < 6) | (col("hour_of_day") > 22)),
                0.95
            ).when(
                (col("amount") > 5000000) & (col("credit_score") < 700),
                0.75
            ).when(
                col("amount") > 20000000,
                0.85
            ).otherwise(0.1)
        ) \
        .filter(col("ml_fraud_probability") > 0.7)
    
    # Output cho fraud alerts
    fraud_alerts = fraud_summary.select(
        col("customer_id"),
        col("customer_code"),
        col("final_fraud_score"),
        col("total_suspicious_amount"),
        col("all_fraud_indicators"),
        current_timestamp().alias("alert_time")
    )
    
    # Write fraud alerts to multiple sinks
    # 1. Database cho investigation team
    fraud_db_query = fraud_alerts.writeStream \
        .outputMode("append") \
        .foreachBatch(lambda batch_df, batch_id: 
            batch_df.write \
                .format("jdbc") \
                .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
                .option("dbtable", "lakehouse_alerts.fraud_alerts") \
                .option("user", "lakehouse_admin") \
                .option("password", "CHANGE_ME_PASSWORD") \
                .mode("append") \
                .save()
        ) \
        .option("checkpointLocation", "/tmp/lakehouse-fraud-db-checkpoint") \
        .start()
    
    # 2. Kafka cho real-time notifications
    fraud_kafka_query = fraud_alerts.select(
        col("customer_id").alias("key"),
        to_json(struct("*")).alias("value")
    ).writeStream \
        .outputMode("append") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "lakehouse-fraud-alerts") \
        .option("checkpointLocation", "/tmp/lakehouse-fraud-kafka-checkpoint") \
        .start()
    
    # 3. File system cho audit trail
    fraud_audit_query = fraud_alerts.writeStream \
        .outputMode("append") \
        .format("delta") \
        .option("path", "s3a://processed-data/fraud-alerts/") \
        .option("checkpointLocation", "/tmp/lakehouse-fraud-audit-checkpoint") \
        .trigger(processingTime="2 minutes") \
        .start()
    
    return [fraud_db_query, fraud_kafka_query, fraud_audit_query]

Real-time Risk Assessment

def realtime_risk_assessment():
    """
    Đánh giá rủi ro real-time cho quyết định vay vốn
    """
    spark, loan_application_stream = setup_kafka_streaming()
    
    print("⚖️ Real-time Risk Assessment System")
    print("="*50)
    
    # Load historical data cho risk scoring
    historical_loans = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
        .option("dbtable", """
            (SELECT 
                customer_id,
                COUNT(*) as historical_loan_count,
                SUM(loan_amount) as total_historical_amount,
                AVG(CASE WHEN status = 'DEFAULTED' THEN 1.0 ELSE 0.0 END) as default_rate,
                MAX(loan_amount) as max_loan_amount
             FROM core.loans 
             GROUP BY customer_id) as historical_data
        """) \
        .option("user", "lakehouse_admin") \
        .option("password", "CHANGE_ME_PASSWORD") \
        .load()
    
    # Enrich loan applications với historical data
    enriched_applications = loan_application_stream \
        .join(broadcast(historical_loans), "customer_id", "left") \
        .fillna({
            "historical_loan_count": 0,
            "total_historical_amount": 0.0,
            "default_rate": 0.0,
            "max_loan_amount": 0.0
        })
    
    # Real-time risk scoring
    risk_scored_applications = enriched_applications \
        .withColumn("debt_to_income_ratio", 
                   col("total_historical_amount") / col("declared_income")) \
        .withColumn("loan_to_max_ratio", 
                   col("requested_amount") / greatest(col("max_loan_amount"), lit(1000000))) \
        .withColumn("risk_factors",
            array(
                # Credit score factor (0-40 points)
                when(col("credit_score") >= 750, 40)
                .when(col("credit_score") >= 700, 30)
                .when(col("credit_score") >= 650, 20)
                .when(col("credit_score") >= 600, 10)
                .otherwise(0).alias("credit_score_points"),
                
                # Historical performance factor (0-25 points)
                when(col("default_rate") == 0, 25)
                .when(col("default_rate") <= 0.1, 20)
                .when(col("default_rate") <= 0.2, 10)
                .otherwise(0).alias("history_points"),
                
                # Debt burden factor (0-20 points)
                when(col("debt_to_income_ratio") <= 0.3, 20)
                .when(col("debt_to_income_ratio") <= 0.5, 15)
                .when(col("debt_to_income_ratio") <= 0.7, 10)
                .otherwise(0).alias("debt_burden_points"),
                
                # Loan size factor (0-15 points)
                when(col("loan_to_max_ratio") <= 1.0, 15)
                .when(col("loan_to_max_ratio") <= 1.5, 10)
                .when(col("loan_to_max_ratio") <= 2.0, 5)
                .otherwise(0).alias("loan_size_points")
            )
        ) \
        .withColumn("total_risk_score", 
                   col("risk_factors")[0] + col("risk_factors")[1] + 
                   col("risk_factors")[2] + col("risk_factors")[3]) \
        .withColumn("risk_category",
            when(col("total_risk_score") >= 80, "LOW_RISK")
            .when(col("total_risk_score") >= 60, "MEDIUM_RISK")
            .when(col("total_risk_score") >= 40, "HIGH_RISK")
            .otherwise("VERY_HIGH_RISK")
        ) \
        .withColumn("recommendation",
            when(col("risk_category") == "LOW_RISK", "AUTO_APPROVE")
            .when(col("risk_category") == "MEDIUM_RISK", "MANUAL_REVIEW")
            .otherwise("AUTO_REJECT")
        ) \
        .withColumn("assessment_timestamp", current_timestamp())
    
    # Real-time decision making
    auto_decisions = risk_scored_applications \
        .filter(col("recommendation").isin("AUTO_APPROVE", "AUTO_REJECT"))
    
    manual_review_cases = risk_scored_applications \
        .filter(col("recommendation") == "MANUAL_REVIEW")
    
    # Market conditions adjustment (real-time)
    market_adjusted_scores = risk_scored_applications \
        .withColumn("market_adjustment",
            # Giả lập điều kiện thị trường real-time
            when(hour(current_timestamp()).between(9, 17), 5)  # Business hours bonus
            .when(month(current_timestamp()).isin(11, 12), -10)  # Year-end caution
            .otherwise(0)
        ) \
        .withColumn("final_risk_score", 
                   col("total_risk_score") + col("market_adjustment")) \
        .withColumn("final_recommendation",
            when(col("final_risk_score") >= 85, "AUTO_APPROVE")
            .when(col("final_risk_score") >= 65, "MANUAL_REVIEW")
            .otherwise("AUTO_REJECT")
        )
    
    # Output processing
    # 1. Auto-approved loans -> loan origination system
    auto_approved_query = market_adjusted_scores \
        .filter(col("final_recommendation") == "AUTO_APPROVE") \
        .select(
            "customer_id", "loan_application_id", "requested_amount",
            "final_risk_score", "risk_category", "assessment_timestamp"
        ) \
        .writeStream \
        .outputMode("append") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "lakehouse-auto-approved-loans") \
        .option("checkpointLocation", "/tmp/lakehouse-auto-approved-checkpoint") \
        .start()
    
    # 2. Manual review cases -> review queue
    manual_review_query = market_adjusted_scores \
        .filter(col("final_recommendation") == "MANUAL_REVIEW") \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(lambda batch_df, batch_id:
            batch_df.write \
                .format("jdbc") \
                .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
                .option("dbtable", "lakehouse_workflow.manual_review_queue") \
                .option("user", "lakehouse_admin") \
                .option("password", "CHANGE_ME_PASSWORD") \
                .mode("append") \
                .save()
        ) \
        .option("checkpointLocation", "/tmp/lakehouse-manual-review-checkpoint") \
        .start()
    
    # 3. All assessments -> data lake for analysis
    all_assessments_query = market_adjusted_scores \
        .writeStream \
        .outputMode("append") \
        .format("delta") \
        .option("path", "s3a://processed-data/risk-assessments/") \
        .option("checkpointLocation", "/tmp/lakehouse-risk-assessments-checkpoint") \
        .partitionBy("risk_category") \
        .trigger(processingTime="1 minute") \
        .start()
    
    return [auto_approved_query, manual_review_query, all_assessments_query]

MLlib Machine Learning

MLlib trong Lakehouse Context

MLlib cung cấp scalable machine learning library cho Lakehouse để xây dựng các mô hình:

  • Credit Scoring: Đánh giá khả năng trả nợ
  • Fraud Detection: Phát hiện gian lận
  • Customer Segmentation: Phân khúc khách hàng
  • Churn Prediction: Dự đoán khách hàng rời bỏ
  • Recommendation Systems: Gợi ý sản phẩm

ML Pipeline Architecture

from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import *
 
def lakehouse_ml_pipeline_architecture():
    """
    Kiến trúc ML Pipeline cho Lakehouse
    """
    print("🤖 Lakehouse ML Pipeline Architecture")
    print("="*50)
    
    architecture = {
        "Data Ingestion": {
            "Sources": ["PostgreSQL", "Kafka Streams", "MinIO", "External APIs"],
            "Formats": ["JSON", "Parquet", "CSV", "Avro"],
            "Volume": "1M+ records/day"
        },
        "Feature Engineering": {
            "Techniques": ["StringIndexer", "VectorAssembler", "StandardScaler", "PCA"],
            "Custom Features": ["Debt-to-Income", "Payment History", "Behavioral Patterns"],
            "Feature Store": "Delta Lake với versioning"
        },
        "Model Training": {
            "Algorithms": ["Random Forest", "Gradient Boosting", "Logistic Regression"],
            "Validation": ["Cross-validation", "Train/Validation/Test split"],
            "Hyperparameter Tuning": "Grid Search với MLlib"
        },
        "Model Serving": {
            "Batch Inference": "Spark jobs chạy hàng ngày",
            "Real-time Inference": "Structured Streaming",
            "Model Registry": "MLflow tracking"
        },
        "Monitoring": {
            "Metrics": ["Accuracy", "Precision", "Recall", "AUC-ROC"],
            "Drift Detection": "Statistical tests on predictions",
            "Performance": "Latency và throughput monitoring"
        }
    }
    
    for stage, details in architecture.items():
        print(f"\n📋 {stage}:")
        for key, value in details.items():
            if isinstance(value, list):
                print(f"   • {key}: {', '.join(value)}")
            else:
                print(f"   • {key}: {value}")

Feature Engineering Framework

def lakehouse_feature_engineering():
    """
    Feature Engineering framework cho Lakehouse ML models
    """
    spark = SparkSession.builder \
        .appName("Lakehouse-Feature-Engineering") \
        .master("spark://spark-master:7077") \
        .getOrCreate()
    
    print("⚙️ Lakehouse Feature Engineering Framework")
    print("="*50)
    
    # Load raw data
    customers_df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
        .option("dbtable", "core.customers") \
        .option("user", "lakehouse_admin") \
        .option("password", "CHANGE_ME_PASSWORD") \
        .load()
    
    loans_df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
        .option("dbtable", "core.loans") \
        .option("user", "lakehouse_admin") \
        .option("password", "CHANGE_ME_PASSWORD") \
        .load()
    
    # 1. Customer Demographics Features
    customer_features = customers_df \
        .withColumn("age", floor(datediff(current_date(), col("date_of_birth")) / 365.25)) \
        .withColumn("account_age_days", datediff(current_date(), col("created_at"))) \
        .withColumn("has_email", col("email").isNotNull().cast("int")) \
        .withColumn("has_phone", col("phone_number").isNotNull().cast("int")) \
        .withColumn("kyc_approved", (col("kyc_status") == "APPROVED").cast("int"))
    
    # 2. Loan History Features
    loan_history = loans_df \
        .groupBy("customer_id") \
        .agg(
            count("loan_id").alias("total_loans"),
            sum("loan_amount").alias("total_borrowed"),
            avg("loan_amount").alias("avg_loan_amount"),
            max("loan_amount").alias("max_loan_amount"),
            min("loan_amount").alias("min_loan_amount"),
            avg("interest_rate").alias("avg_interest_rate"),
            countDistinct("product_type").alias("product_diversity"),
            sum(when(col("status") == "APPROVED", 1).otherwise(0)).alias("approved_loans"),
            sum(when(col("status") == "DISBURSED", 1).otherwise(0)).alias("disbursed_loans"),
            sum(when(col("status") == "DEFAULTED", 1).otherwise(0)).alias("defaulted_loans"),
            max(col("application_date")).alias("last_loan_date")
        ) \
        .withColumn("approval_rate", col("approved_loans") / col("total_loans")) \
        .withColumn("disbursement_rate", col("disbursed_loans") / col("total_loans")) \
        .withColumn("default_rate", col("defaulted_loans") / col("total_loans")) \
        .withColumn("days_since_last_loan", 
                   datediff(current_date(), col("last_loan_date")))
    
    # 3. Financial Behavior Features
    financial_features = loan_history \
        .withColumn("debt_burden_category",
            when(col("total_borrowed") <= 50000000, "LOW")
            .when(col("total_borrowed") <= 200000000, "MEDIUM")
            .otherwise("HIGH")
        ) \
        .withColumn("loan_frequency_category",
            when(col("total_loans") <= 2, "OCCASIONAL")
            .when(col("total_loans") <= 5, "REGULAR")
            .otherwise("FREQUENT")
        ) \
        .withColumn("risk_indicators", 
            col("default_rate") * 50 + 
            (1 - col("approval_rate")) * 30 + 
            when(col("days_since_last_loan") < 30, 20).otherwise(0)
        )
    
    # 4. Combine all features
    ml_features = customer_features \
        .join(financial_features, "customer_id", "left") \
        .fillna({
            "total_loans": 0,
            "total_borrowed": 0.0,
            "avg_loan_amount": 0.0,
            "max_loan_amount": 0.0,
            "min_loan_amount": 0.0,
            "avg_interest_rate": 0.0,
            "product_diversity": 0,
            "approved_loans": 0,
            "disbursed_loans": 0,
            "defaulted_loans": 0,
            "approval_rate": 0.0,
            "disbursement_rate": 0.0,
            "default_rate": 0.0,
            "days_since_last_loan": 9999,
            "debt_burden_category": "LOW",
            "loan_frequency_category": "OCCASIONAL",
            "risk_indicators": 0.0
        })
    
    # 5. Feature preprocessing pipeline
    # Categorical encoding
    string_indexers = [
        StringIndexer(inputCol="debt_burden_category", outputCol="debt_burden_idx"),
        StringIndexer(inputCol="loan_frequency_category", outputCol="loan_frequency_idx")
    ]
    
    # One-hot encoding
    one_hot_encoders = [
        OneHotEncoder(inputCol="debt_burden_idx", outputCol="debt_burden_vec"),
        OneHotEncoder(inputCol="loan_frequency_idx", outputCol="loan_frequency_vec")
    ]
    
    # Feature assembly
    feature_cols = [
        "age", "account_age_days", "has_email", "has_phone", "kyc_approved",
        "credit_score", "total_loans", "total_borrowed", "avg_loan_amount",
        "product_diversity", "approval_rate", "default_rate", "risk_indicators",
        "debt_burden_vec", "loan_frequency_vec"
    ]
    
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
    
    # Feature scaling
    scaler = StandardScaler(inputCol="raw_features", outputCol="scaled_features")
    
    # Create preprocessing pipeline
    preprocessing_pipeline = Pipeline(stages=string_indexers + one_hot_encoders + [assembler, scaler])
    
    print("✅ Feature engineering pipeline created")
    print(f"📊 Input features: {len(feature_cols)} columns")
    print(f"🎯 Feature types: Demographics, Loan History, Financial Behavior")
    
    return ml_features, preprocessing_pipeline, spark
 
# Run feature engineering
# features_df, pipeline, spark = lakehouse_feature_engineering()

Credit Scoring Models

Credit Scoring Model Development

def develop_credit_scoring_model():
    """
    Phát triển mô hình Credit Scoring cho Lakehouse
    """
    # Sử dụng features từ function trước
    features_df, preprocessing_pipeline, spark = lakehouse_feature_engineering()
    
    print("💳 Credit Scoring Model Development")
    print("="*50)
    
    # Tạo target variable (credit_risk_label)
    # Dựa trên default rate và credit score
    labeled_data = features_df \
        .withColumn("credit_risk_label",
            when((col("default_rate") > 0.2) | (col("credit_score") < 600), 1.0)
            .otherwise(0.0)
        ) \
        .filter(col("total_loans") > 0)  # Chỉ lấy customers có history
    
    print(f"📊 Training dataset size: {labeled_data.count()} customers")
    
    # Check class distribution
    class_distribution = labeled_data.groupBy("credit_risk_label").count()
    print("📈 Class distribution:")
    class_distribution.show()
    
    # Split data
    train_data, test_data = labeled_data.randomSplit([0.8, 0.2], seed=42)
    
    print(f"🎯 Training set: {train_data.count()} samples")
    print(f"🧪 Test set: {test_data.count()} samples")
    
    # Fit preprocessing pipeline
    preprocessing_model = preprocessing_pipeline.fit(train_data)
    train_processed = preprocessing_model.transform(train_data)
    test_processed = preprocessing_model.transform(test_data)
    
    # Model training với multiple algorithms
    models = {}
    
    # 1. Logistic Regression
    lr = LogisticRegression(
        featuresCol="scaled_features",
        labelCol="credit_risk_label",
        maxIter=100,
        regParam=0.01
    )
    
    # 2. Random Forest
    rf = RandomForestClassifier(
        featuresCol="scaled_features",
        labelCol="credit_risk_label",
        numTrees=100,
        maxDepth=10,
        seed=42
    )
    
    # 3. Gradient Boosted Trees
    gbt = GBTClassifier(
        featuresCol="scaled_features",
        labelCol="credit_risk_label",
        maxIter=50,
        maxDepth=8,
        seed=42
    )
    
    # Train models
    print("\n🚀 Training models...")
    
    models["Logistic_Regression"] = lr.fit(train_processed)
    print("✅ Logistic Regression trained")
    
    models["Random_Forest"] = rf.fit(train_processed)
    print("✅ Random Forest trained")
    
    models["Gradient_Boosting"] = gbt.fit(train_processed)
    print("✅ Gradient Boosting trained")
    
    # Model evaluation
    evaluator_binary = BinaryClassificationEvaluator(
        labelCol="credit_risk_label",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )
    
    evaluator_multiclass = MulticlassClassificationEvaluator(
        labelCol="credit_risk_label",
        predictionCol="prediction",
        metricName="accuracy"
    )
    
    print("\n📊 Model Performance Evaluation:")
    print("-" * 60)
    print(f"{'Model':<20} {'Accuracy':<10} {'AUC-ROC':<10} {'Precision':<10} {'Recall':<10}")
    print("-" * 60)
    
    best_model = None
    best_auc = 0
    
    for name, model in models.items():
        # Predictions
        predictions = model.transform(test_processed)
        
        # Metrics
        accuracy = evaluator_multiclass.evaluate(predictions)
        auc_roc = evaluator_binary.evaluate(predictions)
        
        # Precision and Recall
        tp = predictions.filter((col("prediction") == 1.0) & (col("credit_risk_label") == 1.0)).count()
        fp = predictions.filter((col("prediction") == 1.0) & (col("credit_risk_label") == 0.0)).count()
        fn = predictions.filter((col("prediction") == 0.0) & (col("credit_risk_label") == 1.0)).count()
        
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        
        print(f"{name:<20} {accuracy:<10.4f} {auc_roc:<10.4f} {precision:<10.4f} {recall:<10.4f}")
        
        # Track best model
        if auc_roc > best_auc:
            best_auc = auc_roc
            best_model = (name, model)
    
    print("-" * 60)
    print(f"🏆 Best model: {best_model[0]} (AUC-ROC: {best_auc:.4f})")
    
    # Feature importance (for Random Forest)
    if "Random_Forest" in models:
        rf_model = models["Random_Forest"]
        feature_importance = rf_model.featureImportances.toArray()
        
        # Map importance to feature names
        feature_cols = [
            "age", "account_age_days", "has_email", "has_phone", "kyc_approved",
            "credit_score", "total_loans", "total_borrowed", "avg_loan_amount",
            "product_diversity", "approval_rate", "default_rate", "risk_indicators"
        ]
        
        importance_df = spark.createDataFrame(
            [(feature, float(importance)) for feature, importance in zip(feature_cols, feature_importance)],
            ["feature", "importance"]
        ).orderBy(col("importance").desc())
        
        print("\n🎯 Top 10 Feature Importance (Random Forest):")
        importance_df.show(10)
    
    # Save best model
    model_path = "/tmp/lakehouse-credit-scoring-model"
    best_model[1].write().overwrite().save(model_path)
    print(f"💾 Best model saved to: {model_path}")
    
    # Create scoring function
    def score_customer(customer_features):
        """Score a single customer for credit risk"""
        processed_features = preprocessing_model.transform(customer_features)
        prediction = best_model[1].transform(processed_features)
        return prediction.select("customer_id", "prediction", "probability").collect()[0]
    
    return best_model[1], preprocessing_model, score_customer, spark
 
# Run credit scoring model development
# model, preprocessor, scoring_func, spark = develop_credit_scoring_model()

Real-time Credit Scoring

def realtime_credit_scoring():
    """
    Real-time credit scoring cho loan applications
    """
    # Load trained model
    model_path = "/tmp/lakehouse-credit-scoring-model"
    
    spark = SparkSession.builder \
        .appName("Lakehouse-Realtime-CreditScoring") \
        .master("spark://spark-master:7077") \
        .getOrCreate()
    
    # Load model (trong thực tế sẽ từ model registry)
    from pyspark.ml.classification import RandomForestClassificationModel
    # credit_model = RandomForestClassificationModel.load(model_path)
    
    print("⚡ Real-time Credit Scoring System")
    print("="*50)
    
    # Kafka stream cho loan applications
    loan_applications = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "lakehouse-loan-applications") \
        .option("startingOffsets", "latest") \
        .load()
    
    # Parse application data
    application_schema = StructType([
        StructField("application_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("requested_amount", DoubleType(), True),
        StructField("loan_term", IntegerType(), True),
        StructField("product_type", StringType(), True),
        StructField("declared_income", DoubleType(), True),
        StructField("application_timestamp", TimestampType(), True)
    ])
    
    parsed_applications = loan_applications.select(
        from_json(col("value").cast("string"), application_schema).alias("application")
    ).select("application.*")
    
    # Enrich với customer data trong real-time
    def enrich_and_score(batch_df, batch_id):
        """
        Function để enrich và score từng batch
        """
        if batch_df.count() == 0:
            return
        
        print(f"Processing batch {batch_id} with {batch_df.count()} applications")
        
        # Load customer data cho batch
        customer_ids = [row.customer_id for row in batch_df.select("customer_id").distinct().collect()]
        
        if not customer_ids:
            return
        
        # Batch lookup customer data
        customer_data = spark.read \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
            .option("dbtable", f"""
                (SELECT * FROM core.customers 
                 WHERE customer_id IN ({','.join([f"'{id}'" for id in customer_ids])})) as customer_subset
            """) \
            .option("user", "lakehouse_admin") \
            .option("password", "CHANGE_ME_PASSWORD") \
            .load()
        
        # Join application với customer data
        enriched_batch = batch_df.join(customer_data, "customer_id")
        
        # Feature engineering (simplified version)
        features_batch = enriched_batch \
            .withColumn("age", floor(datediff(current_date(), col("date_of_birth")) / 365.25)) \
            .withColumn("amount_to_income_ratio", col("requested_amount") / col("declared_income")) \
            .withColumn("kyc_approved", (col("kyc_status") == "APPROVED").cast("int"))
        
        # Score applications (simplified scoring logic)
        scored_batch = features_batch \
            .withColumn("credit_score_normalized", col("credit_score") / 850.0) \
            .withColumn("risk_score",
                (1.0 - col("credit_score_normalized")) * 0.4 +
                when(col("amount_to_income_ratio") > 5, 0.3).otherwise(0.0) +
                when(col("kyc_approved") == 0, 0.2).otherwise(0.0) +
                when(col("age") < 25, 0.1).otherwise(0.0)
            ) \
            .withColumn("credit_decision",
                when(col("risk_score") <= 0.3, "APPROVED")
                .when(col("risk_score") <= 0.6, "MANUAL_REVIEW")
                .otherwise("REJECTED")
            ) \
            .withColumn("scoring_timestamp", current_timestamp())
        
        # Save results
        decision_output = scored_batch.select(
            "application_id", "customer_id", "requested_amount",
            "credit_score", "risk_score", "credit_decision", "scoring_timestamp"
        )
        
        # Write to decision table
        decision_output.write \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
            .option("dbtable", "lakehouse_decisions.credit_decisions") \
            .option("user", "lakehouse_admin") \
            .option("password", "CHANGE_ME_PASSWORD") \
            .mode("append") \
            .save()
        
        # Send decisions to Kafka
        decision_kafka = decision_output.select(
            col("application_id").alias("key"),
            to_json(struct("*")).alias("value")
        )
        
        decision_kafka.write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "kafka:9092") \
            .option("topic", "lakehouse-credit-decisions") \
            .save()
        
        print(f"✅ Batch {batch_id} processed and saved")
    
    # Start streaming query
    query = parsed_applications.writeStream \
        .foreachBatch(enrich_and_score) \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/lakehouse-credit-scoring-checkpoint") \
        .trigger(processingTime="10 seconds") \
        .start()
    
    return query
 
# Start real-time credit scoring
# scoring_query = realtime_credit_scoring()

Tổng kết Phần 2

✅ Kiến thức đã học

  1. Structured Streaming

    • Real-time stream processing với Kafka integration
    • Windowing operations cho time-based analytics
    • Stream enrichment và complex event processing
  2. Real-time Analytics cho Lakehouse

    • Customer activity monitoring
    • Fraud detection system
    • Risk assessment automation
  3. MLlib Machine Learning

    • Feature engineering framework
    • Credit scoring model development
    • Real-time model inference

🎯 Chuẩn bị cho Phần 3

Phần 3 sẽ tập trung vào:

  • Performance Tuning và optimization
  • Production Deployment strategies
  • Monitoring & Troubleshooting
  • Advanced Integration patterns

📝 Thực hành

  1. Triển khai streaming pipelines với Kafka
  2. Xây dựng ML models cho credit scoring
  3. Testing real-time inference
  4. Monitor stream processing performance

Phần tiếp theo: tối ưu hiệu năng và vận hành Structured Streaming trong production. 🚀

LDK

Le Duy Khuong

AI Transformation & Digital Strategy. Writing about agentic systems, engineering leadership, and building in public.