Lê Duy Khương (Daniel)

Productivity & dev tools

Apache Spark: Training Guide (Part 1)

Spark in the Lakehouse: RDD, DataFrame, Spark SQL.

2026-03-17 · 26 min read

Spark Fundamentals & Lakehouse Integration

Audience: Data Engineers, Analytics Engineers, Data Scientists
Training duration: 2-3 hours (Part 1)
Last updated: 26/06/2025
Version: 1.0.0


Table of Contents

  1. Apache Spark Overview
  2. Spark Architecture in the Lakehouse
  3. Installation and Configuration
  4. Spark Core & RDD Fundamentals
  5. Spark SQL & DataFrames
  6. Hands-on Labs: Basics

Apache Spark Overview

What is Apache Spark in the Lakehouse context?

Apache Spark serves as the unified analytics engine of the Lakehouse Platform. It provides:

  • Big data processing: analyzing millions of customer transactions
  • Batch processing: producing daily and monthly business reports
  • Stream processing: real-time analysis of transaction and customer login events
  • Machine learning: building credit scoring and fraud detection models
  • ETL/ELT operations: transforming raw source data into analytics-ready data

Why does the Lakehouse use Apache Spark?

  • vs MapReduce: up to 100x faster than traditional batch processing for workloads that fit in memory, thanks to in-memory computing
  • Customer records: 1M+, with scalable financial data processing
  • Languages supported: 4 (Python, SQL, Scala, Java)

Spark in the Lakehouse Data Ecosystem

[Diagram: Spark in the Lakehouse data ecosystem]

Spark Architecture in the Lakehouse

Cluster Architecture Configuration

# Spark configuration in the Lakehouse docker-compose.yml
version: '3.8'
services:
  spark-master:
    image: bitnami/spark:3.5.0
    container_name: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT_NUMBER=7077
      - SPARK_MASTER_WEBUI_PORT=8080
    ports:
      - "8080:8080"  # Spark Master Web UI
      - "7077:7077"  # Spark Master Port
    volumes:
      - ./data/spark:/opt/bitnami/spark/data
      - ./platform/spark/conf:/opt/bitnami/spark/conf
    networks:
      - lakehouse
 
  spark-worker-1:
    image: bitnami/spark:3.5.0
    container_name: spark-worker-1
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2g
      - SPARK_WORKER_CORES=2
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - ./data/spark:/opt/bitnami/spark/data
    depends_on:
      - spark-master
    networks:
      - lakehouse
 
  spark-worker-2:
    image: bitnami/spark:3.5.0
    container_name: spark-worker-2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2g
      - SPARK_WORKER_CORES=2
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - ./data/spark:/opt/bitnami/spark/data
    depends_on:
      - spark-master
    networks:
      - lakehouse
 
  jupyter-spark:
    image: jupyter/pyspark-notebook:latest
    container_name: jupyter-spark
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - JUPYTER_ENABLE_LAB=yes
    ports:
      - "8888:8888"  # Jupyter Lab
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./data:/home/jovyan/data
    depends_on:
      - spark-master
    networks:
      - lakehouse
 
networks:
  lakehouse:
    external: true

Spark Configuration for Lakehouse Workloads

# platform/spark/conf/spark-defaults.conf
# Configuration tuned for Lakehouse financial data processing
 
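# Memory Management
# Note: with the 2-core / 2g workers defined in docker-compose.yml, at most one
# executor of this size fits on each worker.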
spark.executor.memory              2g
spark.executor.cores               2
spark.executor.instances           4
spark.driver.memory                1g
spark.driver.cores                 1
 
# Performance Tuning
spark.sql.adaptive.enabled                true
spark.sql.adaptive.coalescePartitions.enabled  true
spark.sql.adaptive.skewJoin.enabled       true
spark.serializer                   org.apache.spark.serializer.KryoSerializer
 
# Delta Lake Integration
spark.sql.extensions               io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog    org.apache.spark.sql.delta.catalog.DeltaCatalog
 
# S3/MinIO Configuration
spark.hadoop.fs.s3a.endpoint              http://minio:9000
spark.hadoop.fs.s3a.access.key            lakehouse_admin
spark.hadoop.fs.s3a.secret.key            CHANGE_ME_PASSWORD
spark.hadoop.fs.s3a.path.style.access     true
spark.hadoop.fs.s3a.impl                  org.apache.hadoop.fs.s3a.S3AFileSystem
 
# Hive Metastore Integration
spark.sql.warehouse.dir            s3a://lakehouse/warehouse
spark.sql.catalogImplementation    hive
spark.hadoop.hive.metastore.uris   thrift://hive-metastore:9083
 
# Logging
spark.eventLog.enabled             true
spark.eventLog.dir                 s3a://lakehouse/spark-logs
spark.history.fs.logDirectory      s3a://lakehouse/spark-logs
 
# Security (authentication and encryption are disabled; use only for local development)
spark.authenticate                 false
spark.network.crypto.enabled       false
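
The sizing above can be sanity-checked without a running cluster. The sketch below parses `spark-defaults.conf`-style text and estimates how many executors of the requested size fit on the two workers from `docker-compose.yml`. The helper names (`parse_spark_defaults`, `to_mib`, `executors_that_fit`) are hypothetical, and the fit rule is a simplification: it compares only cores and memory and ignores other scheduler details.

```python
import re

def parse_spark_defaults(text):
    """Parse 'key<whitespace>value' lines, skipping blanks and # comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        if len(parts) == 2:
            conf[parts[0]] = parts[1].strip()
    return conf

def to_mib(size):
    """Convert sizes such as '2g' or '512m' to MiB."""
    match = re.fullmatch(r"(\d+)([gm])", size.lower())
    if match is None:
        raise ValueError(f"unsupported size: {size!r}")
    number, unit = int(match.group(1)), match.group(2)
    return number * 1024 if unit == "g" else number

def executors_that_fit(conf, workers):
    """Estimate how many executors fit, given (cores, memory) per worker."""
    exec_cores = int(conf["spark.executor.cores"])
    exec_mem = to_mib(conf["spark.executor.memory"])
    total = 0
    for cores, memory in workers:
        total += min(cores // exec_cores, to_mib(memory) // exec_mem)
    return total

SAMPLE = """
# Memory Management
spark.executor.memory              2g
spark.executor.cores               2
spark.executor.instances           4
"""

conf = parse_spark_defaults(SAMPLE)
workers = [(2, "2g"), (2, "2g")]  # spark-worker-1 and spark-worker-2
fit = executors_that_fit(conf, workers)
requested = int(conf["spark.executor.instances"])
print(f"requested={requested}, fit={fit}")  # prints: requested=4, fit=2
```

Under these assumptions only two of the four requested executors can be placed, so either the executor size or the worker resources would need adjusting for the request to be met.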

Installation and Configuration

Checking and Starting the Spark Cluster

# List the Spark containers
docker ps | grep spark
 
# Start the Spark cluster if it is not already running
docker-compose up -d spark-master spark-worker-1 spark-worker-2
 
# Check the Spark Master Web UI
curl http://localhost:8080
 
# Start Jupyter with PySpark
docker-compose up -d jupyter-spark
 
echo "Spark Cluster URLs:"
echo "- Master Web UI: http://localhost:8080"
echo "- Jupyter Lab: http://localhost:8888"

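Installing Python Dependencies for PySpark

# Create the requirements file for the PySpark environment.
# It is written under ./notebooks because that directory is mounted
# at /home/jovyan/work inside the jupyter-spark container.
cat > notebooks/requirements-spark.txt << 'EOF'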
pyspark==3.5.0
delta-spark==3.0.0
py4j==0.10.9.7
pandas==2.0.3
numpy==1.24.3
matplotlib==3.7.1
seaborn==0.12.2
plotly==5.15.0
jupyter==1.0.0
findspark==2.0.1
boto3==1.28.25
psycopg2-binary==2.9.7
kafka-python==2.0.2
EOF
 
# Install the dependencies inside the Jupyter container
docker exec jupyter-spark pip install -r /home/jovyan/work/requirements-spark.txt

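Creating a Spark Session Template for the Lakehouse

# scripts/spark_session_template.py
"""
Template for initializing a Spark session for the Lakehouse.
Includes the configuration needed to connect to the platform services.
"""
 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Note: this wildcard import shadows Python built-ins such as sum, min, and max
from pyspark.sql.functions import *
import os
 
def create_lakehouse_spark_session(app_name="Lakehouse-Analytics"):
    """
    Create a Spark session configured for the Lakehouse.
    """
    
    # Spark configuration.
    # Assumption: the Delta Lake and hadoop-aws JARs are already on the
    # classpath; if they are not, they can be pulled in via spark.jars.packages.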
    spark = SparkSession.builder \
        .appName(app_name) \
        .master("spark://spark-master:7077") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
        .config("spark.hadoop.fs.s3a.access.key", "lakehouse_admin") \
        .config("spark.hadoop.fs.s3a.secret.key", "CHANGE_ME_PASSWORD") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()
    
    # Lower the log level to reduce noise
    spark.sparkContext.setLogLevel("WARN")
    
    print(f"✅ Spark session created: {app_name}")
    print(f"🔗 Spark Master: {spark.conf.get('spark.master')}")
    print(f"📊 Default parallelism: {spark.sparkContext.defaultParallelism}")
    
    return spark
 
def get_lakehouse_database_config():
    """
    Database connection settings for the Lakehouse.
    """
    return {
        "postgresql": {
            "url": "jdbc:postgresql://postgresql:5432/lakehouse_db",
            "driver": "org.postgresql.Driver",
            "user": "lakehouse_admin",
            "password": "CHANGE_ME_PASSWORD"
        },
        "minio": {
            "endpoint": "s3a://processed-data/",
            "raw_data_path": "s3a://raw-data/",
            "backup_path": "s3a://backups/"
        }
    }
 
# Test function
def test_spark_setup():
    """
    Check the Spark connection and its components.
    """
    spark = create_lakehouse_spark_session("Lakehouse-Setup-Test")
    all_passed = True
    
    try:
        # Test 1: create a simple DataFrame
        data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
        columns = ["name", "age"]
        df = spark.createDataFrame(data, columns)
        
        print("Test 1 - DataFrame creation: ✅")
        df.show()
        
        # Test 2: check the MinIO connection
        try:
            # Write test data to MinIO
            test_path = "s3a://temp/spark-test/test_data.parquet"
            df.write.mode("overwrite").parquet(test_path)
            
            # Read it back and force the read with an action
            df_read = spark.read.parquet(test_path)
            df_read.count()
            print("Test 2 - MinIO integration: ✅")
            
        except Exception as e:
            all_passed = False
            print(f"Test 2 - MinIO integration: ❌ {str(e)}")
        
        # Test 3: check SQL capabilities
        df.createOrReplaceTempView("test_table")
        result = spark.sql("SELECT name, age * 2 as double_age FROM test_table WHERE age > 25")
        print("Test 3 - SQL execution: ✅")
        result.show()
        
        if all_passed:
            print("🎉 All tests passed!")
        else:
            print("⚠️  Some tests failed; see the messages above.")
        
    except Exception as e:
        print(f"❌ Error during testing: {str(e)}")
    finally:
        # Always release cluster resources, whether or not the tests passed
        spark.stop()
 
if __name__ == "__main__":
    test_spark_setup()
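
The template above hard-codes the MinIO credentials. As one possible alternative, the sketch below builds the same `fs.s3a.*` settings from environment variables. The variable names (`MINIO_ENDPOINT`, `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY`) and the helper `s3a_settings_from_env` are assumptions made for illustration; they are not defined anywhere in the platform configuration shown here.

```python
import os

def s3a_settings_from_env(env=None):
    """Build the fs.s3a.* Spark settings from environment variables."""
    env = os.environ if env is None else env
    secret = env.get("MINIO_SECRET_KEY")  # hypothetical variable name
    if not secret:
        raise RuntimeError("MINIO_SECRET_KEY is not set")
    return {
        "spark.hadoop.fs.s3a.endpoint": env.get("MINIO_ENDPOINT", "http://minio:9000"),
        "spark.hadoop.fs.s3a.access.key": env.get("MINIO_ACCESS_KEY", "lakehouse_admin"),
        "spark.hadoop.fs.s3a.secret.key": secret,
        "spark.hadoop.fs.s3a.path.style.access": "true",
    }

# Demo with an explicit mapping instead of the real environment
settings = s3a_settings_from_env({"MINIO_SECRET_KEY": "example-only"})
for key in sorted(settings):
    # Never print the secret itself
    shown = "***" if key.endswith("secret.key") else settings[key]
    print(f"{key} = {shown}")
```

Each key/value pair can then be passed to `SparkSession.builder.config(key, value)` in place of the literal strings.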

Spark Core & RDD Fundamentals

Understanding RDDs (Resilient Distributed Datasets)

The RDD is the most basic abstraction in Spark: a collection of data distributed across the cluster.

RDD characteristics

# Example illustrating RDD characteristics
# Assumption: scripts/spark_session_template.py (shown earlier) is importable
from spark_session_template import create_lakehouse_spark_session
 
def demonstrate_rdd_characteristics():
    """
    Illustrate the main characteristics of an RDD.
    """
    spark = create_lakehouse_spark_session("RDD-Demo")
    sc = spark.sparkContext
    
    # 1. Immutable: an RDD cannot be changed in place
    original_rdd = sc.parallelize([1, 2, 3, 4, 5])
    print("Original RDD:", original_rdd.collect())
    
    # Every transformation produces a new RDD
    doubled_rdd = original_rdd.map(lambda x: x * 2)
    print("Doubled RDD:", doubled_rdd.collect())
    print("Original RDD is unchanged:", original_rdd.collect())
    
    # 2. Distributed: data is spread across the cluster
    print(f"Number of partitions: {original_rdd.getNumPartitions()}")
    print(f"Partitions data: {original_rdd.glom().collect()}")
    
    # 3. Fault-tolerant: lost data can be recomputed
    # An RDD keeps its lineage so it can be rebuilt when needed.
    # In PySpark, toDebugString() returns bytes, so decode it for printing.
    print(f"RDD lineage: {doubled_rdd.toDebugString().decode('utf-8')}")
    
    # 4. Lazy evaluation: nothing runs until an action is called
    print("Creating transformation (not executed yet)...")
    filtered_rdd = doubled_rdd.filter(lambda x: x > 5)
    
    print("Calling action (execution happens now)...")
    result = filtered_rdd.collect()
    print("Filtered result:", result)
    
    spark.stop()
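
Immutability, lineage, and lazy evaluation can also be seen in a toy model that needs no cluster. The class below (`LazyPipeline`, a hypothetical name) is only a teaching sketch in plain Python and says nothing about how Spark implements RDDs internally: it records each transformation as a step and runs the steps only when `collect()` is called.

```python
class LazyPipeline:
    """Toy stand-in for an RDD: records transformations, runs them on collect()."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # the "lineage": an ordered list of steps

    def map(self, fn):
        # Return a new pipeline; the current one is left untouched
        return LazyPipeline(self._data, self._steps + (("map", fn),))

    def filter(self, fn):
        return LazyPipeline(self._data, self._steps + (("filter", fn),))

    def lineage(self):
        return [name for name, _ in self._steps]

    def collect(self):
        # The "action": replay every recorded step over the source data
        items = iter(self._data)
        for name, fn in self._steps:
            items = map(fn, items) if name == "map" else filter(fn, items)
        return list(items)

calls = []

def double(x):
    calls.append(x)  # record every time the function actually runs
    return x * 2

base = LazyPipeline([1, 2, 3, 4, 5])
pipeline = base.map(double).filter(lambda x: x > 5)

print("calls before collect:", len(calls))   # 0: nothing has run yet
result = pipeline.collect()
print("result:", result)                     # [6, 8, 10]
print("lineage:", pipeline.lineage())        # ['map', 'filter']
print("base unchanged:", base.collect())     # [1, 2, 3, 4, 5]
```

Because the steps are stored rather than the results, the output can always be recomputed from the source data, which is the idea behind lineage-based recovery.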

RDD Operations: Transformations vs Actions

def demonstrate_transformations_actions():
    """
    Distinguish transformations (lazy) from actions (eager).
    """
    spark = create_lakehouse_spark_session("Transformations-Actions")
    sc = spark.sparkContext
    
    # Sample Lakehouse transaction data
    transactions = [
        ("TRANS001", "CUST001", 1000000, "LOAN_PAYMENT"),
        ("TRANS002", "CUST002", 500000, "DEPOSIT"),
        ("TRANS003", "CUST001", 2000000, "LOAN_DISBURSEMENT"),
        ("TRANS004", "CUST003", 750000, "LOAN_PAYMENT"),
        ("TRANS005", "CUST002", 300000, "WITHDRAWAL")
    ]
    
    rdd = sc.parallelize(transactions)
    print("📊 Original transaction data:")
    print(rdd.collect())
    
    # TRANSFORMATIONS (lazy: not executed immediately)
    print("\n🔄 Applying transformations (lazy)...")
    
    # Map: transform each record
    amount_rdd = rdd.map(lambda x: (x[1], x[2]))  # (customer_id, amount)
    print("✓ Map transformation applied")
    
    # Filter: keep only the matching records
    large_amounts = amount_rdd.filter(lambda x: x[1] > 600000)
    print("✓ Filter transformation applied")
    
    # ReduceByKey: aggregate by key
    customer_totals = large_amounts.reduceByKey(lambda a, b: a + b)
    print("✓ ReduceByKey transformation applied")
    
    print("⚠️  No computation has been performed yet!")
    
    # ACTIONS (eager: executed immediately)
    print("\n⚡ Calling actions (eager execution)...")
    
    # Collect: bring all the data back to the driver
    results = customer_totals.collect()
    print(f"Customer totals (large transactions): {results}")
    
    # Count: count the records
    count = customer_totals.count()
    print(f"Number of customers with large transactions: {count}")
    
    # Take: fetch the first n records
    sample = customer_totals.take(2)
    print(f"Sample results: {sample}")
    
    # First: fetch the first record
    first_result = customer_totals.first()
    print(f"First result: {first_result}")
    
    spark.stop()
 
# Run the demo
demonstrate_transformations_actions()
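
To check what the pipeline above should return, the same map, filter, and reduce-by-key steps can be reproduced in plain Python on the five sample transactions. This is a worked example for verification only; the order in which Spark returns the keys from `collect()` may differ.

```python
from collections import defaultdict

transactions = [
    ("TRANS001", "CUST001", 1000000, "LOAN_PAYMENT"),
    ("TRANS002", "CUST002", 500000, "DEPOSIT"),
    ("TRANS003", "CUST001", 2000000, "LOAN_DISBURSEMENT"),
    ("TRANS004", "CUST003", 750000, "LOAN_PAYMENT"),
    ("TRANS005", "CUST002", 300000, "WITHDRAWAL"),
]

# map: keep (customer_id, amount)
pairs = [(customer, amount) for _, customer, amount, _ in transactions]

# filter: keep amounts above 600,000
large = [(customer, amount) for customer, amount in pairs if amount > 600000]

# reduceByKey: sum the amounts per customer
totals = defaultdict(int)
for customer, amount in large:
    totals[customer] += amount

print(dict(totals))  # {'CUST001': 3000000, 'CUST003': 750000}
```

CUST002 disappears from the result because both of its transactions are below the 600,000 threshold, so `count()` in the Spark version should report 2 customers.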

RDD Partitioning for Performance

def demonstrate_partitioning():
    """
    Illustrate why partitioning matters in Spark.
    """
    spark = create_lakehouse_spark_session("Partitioning-Demo")
    sc = spark.sparkContext
    
    # Generate a larger dataset
    large_transactions = []
    customers = [f"CUST{i:04d}" for i in range(1, 1001)]  # 1000 customers
    
    import random
    for i in range(10000):  # 10K transactions
        customer = random.choice(customers)
        amount = random.randint(100000, 5000000)
        transaction_type = random.choice(["LOAN_PAYMENT", "DEPOSIT", "WITHDRAWAL"])
        large_transactions.append((f"TRANS{i:06d}", customer, amount, transaction_type))
    
    # Create an RDD with the default number of partitions
    rdd_default = sc.parallelize(large_transactions)
    print(f"Default partitions: {rdd_default.getNumPartitions()}")
    
    # Create an RDD with a custom number of partitions
    rdd_custom = sc.parallelize(large_transactions, numSlices=8)
    print(f"Custom partitions: {rdd_custom.getNumPartitions()}")
    
    # Key the data by customer_id to optimize joins and group-bys
    customer_amounts = rdd_custom.map(lambda x: (x[1], x[2]))  # (customer_id, amount)
    
    # Hash partitioning: records are assigned to partitions by the hash of the key.
    # Cache and materialize the result so the shuffle is paid once, before timing.
    hash_partitioned = customer_amounts.partitionBy(4).cache()
    hash_partitioned.count()
    print(f"Hash partitioned: {hash_partitioned.getNumPartitions()}")
    
    # Compare performance
    import builtins  # the built-in sum may be shadowed by pyspark.sql.functions.sum
    import time
    
    print("\n⏱️  Performance comparison:")
    
    # Test 1: groupByKey without pre-partitioning
    start_time = time.time()
    result1 = customer_amounts.groupByKey().mapValues(builtins.sum).count()
    time1 = time.time() - start_time
    print(f"GroupBy without partitioning: {time1:.2f}s")
    
    # Test 2: groupByKey on the pre-partitioned RDD, which can reuse the existing partitioning
    start_time = time.time()
    result2 = hash_partitioned.groupByKey().mapValues(builtins.sum).count()
    time2 = time.time() - start_time
    print(f"GroupBy with partitioning: {time2:.2f}s")
    
    # With only 10K records the timings are noisy; treat the figure as indicative
    improvement = ((time1 - time2) / time1) * 100
    print(f"Performance improvement: {improvement:.1f}%")
    
    spark.stop()
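
The reason key-based partitioning helps group-bys is that every record with the same key lands in the same partition. The plain-Python sketch below illustrates that property. It uses CRC32 as a stand-in hash purely for illustration (PySpark uses its own hash function by default), and the helper names `partition_for` and `hash_partition` are hypothetical.

```python
import zlib
from collections import defaultdict

def partition_for(key, num_partitions):
    """Pick a partition from a stable hash of the key (CRC32, for illustration only)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def hash_partition(records, num_partitions):
    """Group (key, value) records into partitions by the hash of the key."""
    partitions = defaultdict(list)
    for key, value in records:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return dict(partitions)

records = [
    ("CUST0001", 100), ("CUST0002", 250), ("CUST0001", 300),
    ("CUST0003", 50), ("CUST0002", 75),
]
partitions = hash_partition(records, num_partitions=4)

# Record which partitions each key ended up in
key_locations = defaultdict(set)
for index, part in partitions.items():
    for key, _ in part:
        key_locations[key].add(index)

co_located = all(len(indexes) == 1 for indexes in key_locations.values())
print("every key lives in exactly one partition:", co_located)
```

Since each key is confined to one partition, a per-key aggregation can be computed inside each partition without moving records between them.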

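Spark SQL & DataFrames

DataFrame API - High-level abstraction

A DataFrame is a structured data abstraction with a schema. Because Spark can optimize queries against a known schema, DataFrames outperform RDDs for most use cases.

def demonstrate_dataframes():
    """
    Illustrate the DataFrame API on Lakehouse financial data.
    """
    spark = create_lakehouse_spark_session("DataFrame-Demo")
    
    # Create sample Lakehouse data with a schema
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
    # Column functions used below (these shadow the built-in min/max inside this function)
    from pyspark.sql.functions import col, when, datediff, current_date, avg, min, max, count
    from datetime import datetime, timedelta
    import random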
    
    # Define the schema for the customer data
    customer_schema = StructType([
        StructField("customer_id", StringType(), False),
        StructField("customer_code", StringType(), False),
        StructField("full_name", StringType(), False),
        StructField("phone_number", StringType(), True),
        StructField("email", StringType(), True),
        StructField("credit_score", IntegerType(), True),
        StructField("registration_date", TimestampType(), False)
    ])
    
    # Sample customer data
    customers_data = [
        ("1", "CUST001", "Nguyen Van An", "0901234567", "an.nguyen@email.com", 750, datetime.now() - timedelta(days=365)),
        ("2", "CUST002", "Tran Thi Binh", "0912345678", "binh.tran@email.com", 680, datetime.now() - timedelta(days=300)),
        ("3", "CUST003", "Le Van Cuong", "0923456789", "cuong.le@email.com", 720, datetime.now() - timedelta(days=200)),
        ("4", "CUST004", "Pham Thi Dung", "0934567890", "dung.pham@email.com", 800, datetime.now() - timedelta(days=150)),
        ("5", "CUST005", "Hoang Van Duc", "0945678901", "duc.hoang@email.com", 650, datetime.now() - timedelta(days=100))
    ]
    
    customers_df = spark.createDataFrame(customers_data, customer_schema)
    
    print("📊 Customer DataFrame:")
    customers_df.show()
    customers_df.printSchema()
    
    # Basic DataFrame operations
    print("\n🔍 DataFrame operations:")
    
    # Select specific columns
    print("Select columns:")
    customers_df.select("customer_code", "full_name", "credit_score").show()
    
    # Filter data
    print("High credit score customers:")
    high_credit = customers_df.filter(col("credit_score") > 700)
    high_credit.show()
    
    # Add computed columns
    print("Add computed columns:")
    customers_enhanced = customers_df.withColumn(
        "credit_category",
        when(col("credit_score") >= 750, "EXCELLENT")
        .when(col("credit_score") >= 700, "GOOD")
        .when(col("credit_score") >= 650, "FAIR")
        .otherwise("POOR")
    ).withColumn(
        "days_since_registration",
        datediff(current_date(), col("registration_date"))
    )
    
    customers_enhanced.select(
        "customer_code", "credit_score", "credit_category", 
        "days_since_registration"
    ).show()
    
    # Aggregations
    print("Credit score statistics:")
    customers_df.agg(
        avg("credit_score").alias("avg_credit_score"),
        min("credit_score").alias("min_credit_score"),
        max("credit_score").alias("max_credit_score"),
        count("customer_id").alias("total_customers")
    ).show()
    
    # Group by operations
    print("Customers by credit category:")
    customers_enhanced.groupBy("credit_category") \
        .agg(count("customer_id").alias("customer_count"),
             avg("credit_score").alias("avg_score")) \
        .orderBy("avg_score", ascending=False) \
        .show()
    
    spark.stop()

Spark SQL - Familiar SQL Interface

def demonstrate_spark_sql():
    """
    Illustrate using Spark SQL to analyze Lakehouse data.
    """
    spark = create_lakehouse_spark_session("SparkSQL-Demo")
    
    # Create the customers table (temporary view)
    customers_data = [
        ("CUST001", "Nguyen Van An", 750, "Ho Chi Minh", "Individual"),
        ("CUST002", "Tran Thi Binh", 680, "Ha Noi", "Individual"),
        ("CUST003", "Le Van Cuong", 720, "Da Nang", "Individual"),
        ("CUST004", "ABC Company Ltd", 800, "Ho Chi Minh", "Business"),
        ("CUST005", "XYZ Trading Co", 650, "Ha Noi", "Business")
    ]
    
    customers_df = spark.createDataFrame(
        customers_data, 
        ["customer_code", "name", "credit_score", "city", "customer_type"]
    )
    customers_df.createOrReplaceTempView("customers")
    
    # Create the loans table
    loans_data = [
        ("LOAN001", "CUST001", 50000000, 0.12, "PERSONAL", "APPROVED"),
        ("LOAN002", "CUST002", 30000000, 0.13, "PERSONAL", "APPROVED"),
        ("LOAN003", "CUST003", 75000000, 0.11, "PERSONAL", "DISBURSED"),
        ("LOAN004", "CUST004", 200000000, 0.10, "BUSINESS", "APPROVED"),
        ("LOAN005", "CUST005", 150000000, 0.11, "BUSINESS", "DISBURSED"),
        ("LOAN006", "CUST001", 25000000, 0.13, "PERSONAL", "PENDING")
    ]
    
    loans_df = spark.createDataFrame(
        loans_data,
        ["loan_number", "customer_code", "amount", "interest_rate", "loan_type", "status"]
    )
    loans_df.createOrReplaceTempView("loans")
    
    print("📋 Available tables:")
    spark.sql("SHOW TABLES").show()
    
    # SQL analytics for the Lakehouse business
    print("\n📊 Lakehouse Business Analytics with SQL:")
    
    # 1. Customer portfolio analysis
    print("1. Customer Portfolio Analysis:")
    portfolio_sql = """
    SELECT 
        c.customer_code,
        c.name,
        c.credit_score,
        c.city,
        COUNT(l.loan_number) as total_loans,
        SUM(l.amount) as total_borrowed,
        AVG(l.interest_rate) as avg_interest_rate,
        MAX(CASE WHEN l.status = 'DISBURSED' THEN l.amount ELSE 0 END) as max_disbursed_amount
    FROM customers c
    LEFT JOIN loans l ON c.customer_code = l.customer_code
    GROUP BY c.customer_code, c.name, c.credit_score, c.city
    ORDER BY total_borrowed DESC NULLS LAST
    """
    spark.sql(portfolio_sql).show()
    
    # 2. Risk assessment by city
    print("2. Risk Assessment by City:")
    risk_sql = """
    SELECT 
        c.city,
        COUNT(DISTINCT c.customer_code) as total_customers,
        -- The join repeats a customer once per loan, so this average is weighted by loan count
        AVG(c.credit_score) as avg_credit_score,
        SUM(l.amount) as total_loan_amount,
        AVG(l.interest_rate) as avg_interest_rate,
        COUNT(CASE WHEN l.status = 'APPROVED' THEN 1 END) as approved_loans,
        COUNT(CASE WHEN l.status = 'DISBURSED' THEN 1 END) as disbursed_loans
    FROM customers c
    LEFT JOIN loans l ON c.customer_code = l.customer_code
    GROUP BY c.city
    ORDER BY total_loan_amount DESC
    """
    spark.sql(risk_sql).show()
    
    # 3. Loan type performance
    print("3. Loan Type Performance:")
    performance_sql = """
    SELECT 
        l.loan_type,
        COUNT(*) as loan_count,
        SUM(l.amount) as total_amount,
        AVG(l.amount) as avg_amount,
        AVG(l.interest_rate) as avg_interest_rate,
        COUNT(CASE WHEN l.status = 'APPROVED' THEN 1 END) * 100.0 / COUNT(*) as approval_rate,
        COUNT(CASE WHEN l.status = 'DISBURSED' THEN 1 END) * 100.0 / COUNT(*) as disbursement_rate
    FROM loans l
    GROUP BY l.loan_type
    ORDER BY total_amount DESC
    """
    spark.sql(performance_sql).show()
    
    # 4. Advanced window functions
    print("4. Customer Ranking by Credit Score (Window Functions):")
    ranking_sql = """
    SELECT 
        customer_code,
        name,
        city,
        credit_score,
        ROW_NUMBER() OVER (PARTITION BY city ORDER BY credit_score DESC) as city_rank,
        RANK() OVER (ORDER BY credit_score DESC) as overall_rank,
        NTILE(3) OVER (ORDER BY credit_score DESC) as credit_tier
    FROM customers
    ORDER BY credit_score DESC
    """
    spark.sql(ranking_sql).show()
    
    spark.stop()
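
The three window functions in the ranking query behave differently, and it can help to work them out by hand. The sketch below recomputes `ROW_NUMBER() OVER (PARTITION BY city ...)`, `RANK()`, and `NTILE(3)` in plain Python for the five sample customers. It is a worked example of the semantics under the usual SQL definitions, not of how Spark evaluates them.

```python
from itertools import groupby

customers = [
    ("CUST001", "Ho Chi Minh", 750),
    ("CUST002", "Ha Noi", 680),
    ("CUST003", "Da Nang", 720),
    ("CUST004", "Ho Chi Minh", 800),
    ("CUST005", "Ha Noi", 650),
]

# ORDER BY credit_score DESC
by_score = sorted(customers, key=lambda c: c[2], reverse=True)

# RANK(): position of the first row with the same score (ties share a rank)
scores = [c[2] for c in by_score]
overall_rank = {c[0]: scores.index(c[2]) + 1 for c in by_score}

def ntile(n_rows, buckets):
    """NTILE: split ordered rows into buckets; earlier buckets take the extra rows."""
    base, extra = divmod(n_rows, buckets)
    tiers = []
    for bucket in range(1, buckets + 1):
        tiers += [bucket] * (base + (1 if bucket <= extra else 0))
    return tiers

credit_tier = dict(zip((c[0] for c in by_score), ntile(len(by_score), 3)))

# ROW_NUMBER() OVER (PARTITION BY city ORDER BY credit_score DESC)
# sorted() is stable, so rows keep their score order within each city.
city_rank = {}
for city, rows in groupby(sorted(by_score, key=lambda c: c[1]), key=lambda c: c[1]):
    for position, row in enumerate(rows, start=1):
        city_rank[row[0]] = position

for code, city, score in by_score:
    print(code, city, score, city_rank[code], overall_rank[code], credit_tier[code])
```

With five rows and three tiles, the tiers come out as sizes 2, 2, and 1, so the two highest scores share tier 1 and the lowest score is alone in tier 3.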

Hands-on Labs: Basics

Lab 1: Setting Up the Spark Environment

#!/bin/bash
# lab1_setup_spark.sh
 
echo "=== Lab 1: Setting Up the Spark Environment ==="
 
# Step 1: check the Docker containers
echo "🔍 Checking Spark containers..."
docker ps | grep spark
 
# Step 2: start the Spark cluster if needed
echo "🚀 Starting Spark cluster..."
docker-compose up -d spark-master spark-worker-1 spark-worker-2
 
# Wait for the containers to start
sleep 30
 
# Step 3: check the Spark Master UI
echo "🌐 Checking Spark Master UI..."
curl -s http://localhost:8080 > /dev/null && echo "✅ Spark Master UI accessible" || echo "❌ Cannot access Spark Master UI"
 
# Step 4: start Jupyter
echo "📓 Starting Jupyter Notebook..."
docker-compose up -d jupyter-spark
 
# Wait for Jupyter to start
sleep 20
 
# Step 5: create the working directories
echo "📁 Creating working directories..."
mkdir -p notebooks/labs
mkdir -p data/sample
 
echo "✅ Setup complete!"
echo "🔗 Spark Master UI: http://localhost:8080"
echo "📚 Jupyter Lab: http://localhost:8888"
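
The script above waits a fixed 30 and 20 seconds, which can be too short on a slow machine and wasteful on a fast one. One alternative is to poll until the service answers. The sketch below shows that pattern in Python using only the standard library; `http_ok` and `wait_until_ready` are hypothetical helper names, and the timeout and interval values are arbitrary choices.

```python
import time
import urllib.error
import urllib.request

def http_ok(url, timeout=2.0):
    """Return True if the URL answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        return False

def wait_until_ready(probe, timeout=60.0, interval=2.0):
    """Call probe() until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        if probe():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)

# Demo with a fake probe that succeeds on the third attempt
attempts = iter([False, False, True])
print(wait_until_ready(lambda: next(attempts), timeout=5.0, interval=0.01))  # True
```

Against the real cluster the call would look like `wait_until_ready(lambda: http_ok("http://localhost:8080"))`, using the Spark Master UI address from this lab.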

Lab 2: First Spark Application

# notebooks/labs/lab2_first_spark_app.py
"""
Lab 2: A first Spark application for the Lakehouse
Goal: get familiar with the Spark session, RDDs, and DataFrames
"""
 
# Import the required libraries
import findspark
findspark.init()
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from datetime import datetime, timedelta
 
def lab2_first_spark_app():
    """
    Lab 2: Build a first Spark application
    """
    print("🎯 Lab 2: First Spark Application")
    print("="*50)
    
    # Step 1: create the Spark session
    print("Step 1: Create Spark Session")
    spark = SparkSession.builder \
        .appName("Lakehouse-Lab2-FirstApp") \
        .master("spark://spark-master:7077") \
        .config("spark.executor.memory", "1g") \
        .config("spark.executor.cores", "1") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("WARN")
    print(f"✅ Spark Session created: {spark.version}")
    print(f"🖥️  Spark UI: {spark.sparkContext.uiWebUrl}")
    
    # Step 2: create sample data
    print("\nStep 2: Create sample data")
    
    # Sample customer data for the Lakehouse
    customers_data = [
        ("C001", "Nguyen Van An", "an.nguyen@lakehouse.vn", 750, "HCM"),
        ("C002", "Tran Thi Binh", "binh.tran@lakehouse.vn", 680, "HN"),
        ("C003", "Le Van Cuong", "cuong.le@lakehouse.vn", 720, "DN"),
        ("C004", "Pham Thi Dung", "dung.pham@lakehouse.vn", 800, "HCM"),
        ("C005", "Hoang Van Duc", "duc.hoang@lakehouse.vn", 650, "HN")
    ]
    
    schema = StructType([
        StructField("customer_id", StringType(), False),
        StructField("name", StringType(), False),
        StructField("email", StringType(), False),
        StructField("credit_score", IntegerType(), False),
        StructField("city", StringType(), False)
    ])
    
    df = spark.createDataFrame(customers_data, schema)
    print("✅ Sample DataFrame created")
    
    # Step 3: basic DataFrame operations
    print("\nStep 3: Basic DataFrame Operations")
    
    # Show data
    print("📊 Customer Data:")
    df.show()
    
    # Show schema
    print("📋 Schema:")
    df.printSchema()
    
    # Basic statistics
    print("📈 Basic Statistics:")
    df.describe("credit_score").show()
    
    # Step 4: data filtering and transformation
    print("\nStep 4: Data Filtering & Transformation")
    
    # Filter high credit score customers
    high_credit_df = df.filter(col("credit_score") > 700)
    print("🎯 High Credit Score Customers (>700):")
    high_credit_df.show()
    
    # Add computed columns
    enhanced_df = df.withColumn(
        "credit_category",
        when(col("credit_score") >= 750, "EXCELLENT")
        .when(col("credit_score") >= 700, "GOOD")
        .when(col("credit_score") >= 650, "FAIR")
        .otherwise("POOR")
    ).withColumn(
        "email_domain",
        split(col("email"), "@").getItem(1)
    )
    
    print("🔄 Enhanced Data with Computed Columns:")
    enhanced_df.select("customer_id", "name", "credit_score", 
                      "credit_category", "email_domain").show()
    
    # Step 5: aggregations
    print("\nStep 5: Aggregations")
    
    # Group by city
    city_stats = enhanced_df.groupBy("city") \
        .agg(count("customer_id").alias("customer_count"),
             avg("credit_score").alias("avg_credit_score"),
             max("credit_score").alias("max_credit_score"),
             min("credit_score").alias("min_credit_score")) \
        .orderBy("avg_credit_score", ascending=False)
    
    print("🏙️  Statistics by City:")
    city_stats.show()
    
    # Credit category distribution
    category_dist = enhanced_df.groupBy("credit_category") \
        .agg(count("customer_id").alias("count"),
             avg("credit_score").alias("avg_score")) \
        .orderBy("avg_score", ascending=False)
    
    print("📊 Credit Category Distribution:")
    category_dist.show()
    
    # Step 6: SQL interface
    print("\nStep 6: SQL Interface")
    
    # Register as temporary view
    enhanced_df.createOrReplaceTempView("customers")
    
    # SQL query
    sql_result = spark.sql("""
        SELECT 
            city,
            credit_category,
            COUNT(*) as customer_count,
            AVG(credit_score) as avg_score
        FROM customers
        GROUP BY city, credit_category
        ORDER BY city, avg_score DESC
    """)
    
    print("💾 SQL Query Result:")
    sql_result.show()
    
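    # Step 7: save results
    print("\nStep 7: Save Results")
    
    # Save in different formats.
    # Note: on a multi-node cluster a local path is resolved on each executor,
    # so it must be shared storage visible to all nodes (or use an s3a:// path).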
    try:
        # Parquet format (recommended for analytics)
        output_path = "/home/jovyan/data/lab2_output"
        city_stats.coalesce(1).write.mode("overwrite").parquet(f"{output_path}/city_stats.parquet")
        print(f"✅ Data saved to {output_path}/city_stats.parquet")
        
        # CSV format
        category_dist.coalesce(1).write.mode("overwrite") \
            .option("header", "true").csv(f"{output_path}/category_dist.csv")
        print(f"✅ Data saved to {output_path}/category_dist.csv")
        
    except Exception as e:
        print(f"⚠️  Save error: {e}")
    
    # Step 8: performance insights
    print("\nStep 8: Performance Insights")
    print(f"🔢 DataFrame partitions: {df.rdd.getNumPartitions()}")
    print(f"📊 Records count: {df.count()}")
    
    # Clean up
    spark.stop()
    print("\n✅ Lab 2 completed successfully!")
 
# Run the lab
if __name__ == "__main__":
    lab2_first_spark_app()

Lab 3: Working with Real Lakehouse Data

# notebooks/labs/lab3_real_lakehouse_data.py
"""
Lab 3: Working with real Lakehouse data
Goal: connect to the database and apply business logic to the data
"""
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, count, sum, avg, max, countDistinct, row_number, percent_rank
)
 
def lab3_real_lakehouse_data():
    """
    Lab 3: Process real Lakehouse data
    """
    print("🎯 Lab 3: Working with Real Lakehouse Data")
    print("="*50)
    
    # Create a Spark session with the database connector
    spark = SparkSession.builder \
        .appName("Lakehouse-Lab3-RealData") \
        .master("spark://spark-master:7077") \
        .config("spark.jars", "/opt/bitnami/spark/jars/postgresql-42.6.0.jar") \
        .getOrCreate()
    
    print("✅ Spark Session with database support created")
    
    # Database connection properties
    db_properties = {
        "user": "lakehouse_admin",
        "password": "CHANGE_ME_PASSWORD",
        "driver": "org.postgresql.Driver"
    }
    
    db_url = "jdbc:postgresql://postgresql:5432/lakehouse_db"
    
    try:
        # Step 1: read data from PostgreSQL
        print("\nStep 1: Load data from PostgreSQL")
        
        # Load customers table
        customers_df = spark.read.jdbc(
            url=db_url,
            table="core.customers",
            properties=db_properties
        )
        print(f"✅ Loaded {customers_df.count()} customers")
        
        # Load loans table
        loans_df = spark.read.jdbc(
            url=db_url,
            table="core.loans", 
            properties=db_properties
        )
        print(f"✅ Loaded {loans_df.count()} loans")
        
        # Step 2: data exploration
        print("\nStep 2: Data Exploration")
        
        print("👥 Customers sample:")
        customers_df.select("customer_code", "full_name", "credit_score", "kyc_status").show(5)
        
        print("💰 Loans sample:")
        loans_df.select("loan_number", "customer_id", "loan_amount", "status").show(5)
        
        # Step 3: business analytics
        print("\nStep 3: Business Analytics")
        
        # Join customers and loans, referring to columns through the aliases
        customer_loans = customers_df.alias("c") \
            .join(loans_df.alias("l"), 
                  col("c.customer_id") == col("l.customer_id"), 
                  "left")
        
        # Customer portfolio analysis
        portfolio_analysis = customer_loans.groupBy(
            "c.customer_code", "c.full_name", "c.credit_score", "c.kyc_status"
        ).agg(
            count("l.loan_id").alias("total_loans"),
            sum("l.loan_amount").alias("total_borrowed"),
            avg("l.loan_amount").alias("avg_loan_amount"),
            max("l.loan_amount").alias("max_loan_amount"),
            countDistinct("l.product_type").alias("product_diversity")
        ).filter(col("total_loans") > 0)
        
        print("📊 Customer Portfolio Analysis:")
        portfolio_analysis.orderBy("total_borrowed", ascending=False).show(10)
        
        # Risk assessment
        risk_analysis = portfolio_analysis.withColumn(
            "risk_category",
            when(col("credit_score") >= 750, "LOW_RISK")
            .when(col("credit_score") >= 650, "MEDIUM_RISK")
            .otherwise("HIGH_RISK")
        ).withColumn(
            "exposure_level",
            when(col("total_borrowed") >= 100000000, "HIGH_EXPOSURE")
            .when(col("total_borrowed") >= 50000000, "MEDIUM_EXPOSURE")
            .otherwise("LOW_EXPOSURE")
        )
        
        print("⚠️  Risk Assessment Summary:")
        risk_analysis.groupBy("risk_category", "exposure_level") \
            .agg(count("customer_code").alias("customer_count"),
                 sum("total_borrowed").alias("total_exposure"),
                 avg("credit_score").alias("avg_credit_score")) \
            .orderBy("risk_category", "exposure_level") \
            .show()
        
        # Step 4: Advanced analytics
        print("\nStep 4: Advanced Analytics")
        
        # Window functions for ranking
        from pyspark.sql.window import Window
        
        # Rank customers by total borrowed within each risk category
        window_spec = Window.partitionBy("risk_category").orderBy(col("total_borrowed").desc())
        
        ranked_customers = risk_analysis.withColumn(
            "rank_within_risk_category", 
            row_number().over(window_spec)
        ).withColumn(
            "percentile_within_category",
            percent_rank().over(window_spec)
        )
        
        print("🏆 Top customers by risk category:")
        ranked_customers.filter(col("rank_within_risk_category") <= 3) \
            .select("customer_code", "full_name", "risk_category", 
                   "total_borrowed", "rank_within_risk_category") \
            .orderBy("risk_category", "rank_within_risk_category") \
            .show()
        
        # Cohort analysis based on KYC status.
        # The left join repeats a customer once per loan, so customers are
        # counted with countDistinct; avg_credit_score is still computed over
        # joined rows, so customers with several loans weigh more.
        kyc_performance = customer_loans.groupBy("c.kyc_status") \
            .agg(countDistinct("c.customer_id").alias("total_customers"),
                 countDistinct(when(col("l.loan_id").isNotNull(), col("c.customer_id"))).alias("customers_with_loans"),
                 sum("l.loan_amount").alias("total_loan_amount"),
                 avg("c.credit_score").alias("avg_credit_score")) \
            .withColumn("loan_penetration_rate", 
                       col("customers_with_loans") * 100.0 / col("total_customers"))
        
        print("📈 Performance by KYC Status:")
        kyc_performance.show()
        
        # Step 5: Save insights to data lake
        print("\nStep 5: Save Insights")
        
        # Save to MinIO/S3 (if configured)
        try:
            # Portfolio analysis
            portfolio_analysis.coalesce(1) \
                .write.mode("overwrite") \
                .parquet("s3a://processed-data/analytics/customer_portfolio/")
            print("✅ Portfolio analysis saved to data lake")
            
            # Risk analysis  
            risk_analysis.coalesce(1) \
                .write.mode("overwrite") \
                .parquet("s3a://processed-data/analytics/risk_assessment/")
            print("✅ Risk analysis saved to data lake")
            
        except Exception as e:
            print(f"⚠️  Could not save to data lake: {e}")
            
            # Fallback: save locally
            portfolio_analysis.coalesce(1) \
                .write.mode("overwrite") \
                .parquet("/home/jovyan/data/customer_portfolio")
            print("✅ Portfolio analysis saved locally")
        
    except Exception as e:
        print(f"❌ Error: {e}")
        print("💡 Make sure PostgreSQL is running and contains sample data")
    
    finally:
        spark.stop()
        print("\n✅ Lab 3 completed!")
 
# Run the lab
if __name__ == "__main__":
    lab3_real_lakehouse_data()
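The KYC cohort step in Lab 3 divides customers with loans by total customers. Because a left join emits one row per (customer, loan) pair, that ratio is only meaningful when both sides count distinct customers. The sketch below reproduces the calculation in plain Python on a few hypothetical joined rows; the customer IDs, KYC statuses, and loan IDs are made up for illustration and do not come from `core.customers` or `core.loans`, and `penetration_by_status` is a name chosen here.

```python
# Hypothetical rows as a LEFT JOIN of customers to loans would produce them.
# Each tuple is (customer_id, kyc_status, loan_id); loan_id is None when the
# customer has no loan.
joined_rows = [
    ("1", "VERIFIED", "L001"),
    ("1", "VERIFIED", "L005"),  # same customer, second loan
    ("2", "VERIFIED", None),    # customer without loans
    ("3", "PENDING", "L003"),
]


def penetration_by_status(rows):
    """Return {kyc_status: percent of distinct customers with at least one loan}."""
    customers = {}  # status -> set of all customer ids
    borrowers = {}  # status -> set of customer ids that hold a loan
    for customer_id, status, loan_id in rows:
        customers.setdefault(status, set()).add(customer_id)
        if loan_id is not None:
            borrowers.setdefault(status, set()).add(customer_id)
    return {
        status: 100.0 * len(borrowers.get(status, set())) / len(ids)
        for status, ids in customers.items()
    }


rates = penetration_by_status(joined_rows)
print(rates)
```

In this toy data, one of the two VERIFIED customers has a loan, so the rate is 50%. Dividing by the three joined VERIFIED rows instead would understate it as one in three.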

Part 1 Summary

✅ What you have learned

  1. Apache Spark Fundamentals

    • Spark architecture and its role in the Lakehouse
    • RDD concepts: immutable, distributed, fault-tolerant
    • Transformations vs. actions, and lazy evaluation
  2. Spark SQL & DataFrames

# Lab 3.2: Customer Loan Applications Analysis
# =============================================
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, date
 
# Create SparkSession
spark = SparkSession.builder \
    .appName("Lakehouse_Customer_Loan_Analysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()
 
# Define schemas
customer_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("customer_code", StringType(), False),
    StructField("full_name", StringType(), False),
    StructField("phone", StringType(), True),
    StructField("email", StringType(), True),
    StructField("credit_score", IntegerType(), True),
    StructField("registration_date", DateType(), False)
])
 
loan_schema = StructType([
    StructField("loan_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("loan_amount", DoubleType(), False),
    StructField("interest_rate", DoubleType(), False),
    StructField("term_months", IntegerType(), False),
    StructField("application_date", DateType(), False),
    StructField("status", StringType(), False)
])
 
# Sample customer data
customers_data = [
    ("1", "CUST001", "Nguyen Van An", "0901234567", "an.nguyen@email.com", 750, date(2023, 1, 15)),
    ("2", "CUST002", "Tran Thi Binh", "0912345678", "binh.tran@email.com", 680, date(2023, 3, 20)),
    ("3", "CUST003", "Le Van Cuong", "0923456789", "cuong.le@email.com", 720, date(2023, 6, 10)),
    ("4", "CUST004", "Pham Thi Dung", "0934567890", "dung.pham@email.com", 800, date(2023, 8, 5)),
    ("5", "CUST005", "Hoang Van Duc", "0945678901", "duc.hoang@email.com", 650, date(2023, 9, 25))
]
 
# Sample loan data (amounts are floats because loan_amount is DoubleType;
# with schema verification on, createDataFrame rejects Python ints there)
loans_data = [
    ("L001", "1", 50000000.0, 0.12, 12, date(2024, 1, 15), "APPROVED"),
    ("L002", "2", 30000000.0, 0.15, 18, date(2024, 1, 20), "APPROVED"),
    ("L003", "3", 75000000.0, 0.10, 24, date(2024, 2, 1), "APPROVED"),
    ("L004", "4", 100000000.0, 0.08, 36, date(2024, 2, 10), "PENDING"),
    ("L005", "1", 25000000.0, 0.14, 6, date(2024, 2, 15), "REJECTED"),
    ("L006", "5", 40000000.0, 0.16, 12, date(2024, 3, 1), "APPROVED")
]
 
# Create DataFrames
customers_df = spark.createDataFrame(customers_data, customer_schema)
loans_df = spark.createDataFrame(loans_data, loan_schema)
 
print("📊 Customer and Loan DataFrames:")
print("\nCustomers:")
customers_df.show()
 
print("\nLoans:")
loans_df.show()
 
# Register as temporary views
customers_df.createOrReplaceTempView("customers")
loans_df.createOrReplaceTempView("loans")
 
# Complex SQL queries
print("\n🔍 Complex Spark SQL Queries:")
 
# 1. Customer loan summary
print("1. Customer Loan Summary:")
loan_summary = spark.sql("""
    SELECT 
        c.customer_code,
        c.full_name,
        c.credit_score,
        COUNT(l.loan_id) as total_loans,
        SUM(CASE WHEN l.status = 'APPROVED' THEN l.loan_amount ELSE 0 END) as approved_amount,
        AVG(l.interest_rate) as avg_interest_rate,
        MAX(l.application_date) as latest_application
    FROM customers c
    LEFT JOIN loans l ON c.customer_id = l.customer_id
    GROUP BY c.customer_id, c.customer_code, c.full_name, c.credit_score
    ORDER BY approved_amount DESC
""")
loan_summary.show()
 
# 2. Risk analysis by credit score
print("2. Risk Analysis by Credit Score:")
risk_analysis = spark.sql("""
    SELECT 
        CASE 
            WHEN c.credit_score >= 750 THEN 'LOW_RISK'
            WHEN c.credit_score >= 700 THEN 'MEDIUM_RISK'
            WHEN c.credit_score >= 650 THEN 'HIGH_RISK'
            ELSE 'VERY_HIGH_RISK'
        END as risk_category,
        COUNT(DISTINCT c.customer_id) as customer_count,
        COUNT(l.loan_id) as loan_applications,
        SUM(CASE WHEN l.status = 'APPROVED' THEN 1 ELSE 0 END) as approved_loans,
        ROUND(
            SUM(CASE WHEN l.status = 'APPROVED' THEN 1 ELSE 0 END) * 100.0 / COUNT(l.loan_id), 2
        ) as approval_rate_percent,
        ROUND(AVG(l.loan_amount), 0) as avg_loan_amount
    FROM customers c
    JOIN loans l ON c.customer_id = l.customer_id
    GROUP BY 
        CASE 
            WHEN c.credit_score >= 750 THEN 'LOW_RISK'
            WHEN c.credit_score >= 700 THEN 'MEDIUM_RISK'
            WHEN c.credit_score >= 650 THEN 'HIGH_RISK'
            ELSE 'VERY_HIGH_RISK'
        END
    ORDER BY approval_rate_percent DESC
""")
risk_analysis.show()
 
# 3. Monthly loan trend
print("3. Monthly Loan Trends:")
monthly_trend = spark.sql("""
    SELECT 
        YEAR(application_date) as year,
        MONTH(application_date) as month,
        COUNT(*) as applications,
        SUM(CASE WHEN status = 'APPROVED' THEN 1 ELSE 0 END) as approved,
        SUM(CASE WHEN status = 'APPROVED' THEN loan_amount ELSE 0 END) as total_approved_amount,
        AVG(loan_amount) as avg_loan_amount
    FROM loans
    GROUP BY YEAR(application_date), MONTH(application_date)
    ORDER BY year, month
""")
monthly_trend.show()
 
# DataFrame API equivalent operations
print("\n📈 DataFrame API Operations:")
 
# Window functions
from pyspark.sql.window import Window
 
# Calculate running total per customer
window_spec = Window.partitionBy("customer_id").orderBy("application_date")
 
loans_with_running_total = loans_df.withColumn(
    "running_loan_total",
    sum("loan_amount").over(window_spec)
).withColumn(
    "loan_rank",
    row_number().over(window_spec)
)
 
print("Running loan totals per customer:")
loans_with_running_total.select(
    "customer_id", "loan_id", "loan_amount", 
    "running_loan_total", "loan_rank"
).show()
 
# Advanced aggregations
print("Advanced loan statistics:")
loans_df.agg(
    count("*").alias("total_applications"),
    sum(when(col("status") == "APPROVED", 1).otherwise(0)).alias("approved_count"),
    avg("loan_amount").alias("avg_loan_amount"),
    stddev("loan_amount").alias("loan_amount_stddev"),
    percentile_approx("loan_amount", 0.5).alias("median_loan_amount"),
    max("loan_amount").alias("max_loan_amount"),
    min("loan_amount").alias("min_loan_amount")
).show()
 
spark.stop()
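The window in the DataFrame API section partitions by `customer_id` and orders by `application_date`. As a rough mental model, the plain-Python sketch below does the same grouping and accumulation on the lab's sample loans. It accumulates row by row, which matches Spark here because each customer's application dates are unique; with tied dates, Spark's default frame for an ordered window (`RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`) would give the tied rows the same total. `running_totals` is a name chosen for this illustration.

```python
from datetime import date
from itertools import groupby
from operator import itemgetter

# (loan_id, customer_id, loan_amount, application_date) from the lab's sample data
loans = [
    ("L001", "1", 50000000.0, date(2024, 1, 15)),
    ("L002", "2", 30000000.0, date(2024, 1, 20)),
    ("L003", "3", 75000000.0, date(2024, 2, 1)),
    ("L004", "4", 100000000.0, date(2024, 2, 10)),
    ("L005", "1", 25000000.0, date(2024, 2, 15)),
    ("L006", "5", 40000000.0, date(2024, 3, 1)),
]


def running_totals(rows):
    """Return (customer_id, loan_id, running_total, rank) for every loan."""
    # Sort by the partition key, then by the ordering column, mirroring
    # Window.partitionBy("customer_id").orderBy("application_date").
    ordered = sorted(rows, key=itemgetter(1, 3))
    result = []
    for customer_id, group in groupby(ordered, key=itemgetter(1)):
        total = 0.0
        for rank, (loan_id, _cid, amount, _applied) in enumerate(group, start=1):
            total += amount
            result.append((customer_id, loan_id, total, rank))
    return result


for row in running_totals(loans):
    print(row)
```

Customer 1 is the only one with two applications, so it is the only partition where the running total and the rank change between rows.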

What this means in the Lakehouse business context:

  • Customer Segmentation: Segment customers by credit score
  • Risk Assessment: Assess risk and approval rates
  • Trend Analysis: Analyze borrowing trends over time
  • Portfolio Management: Manage the loan portfolio
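To make the risk assessment point concrete, here is a small plain-Python sketch of what query 2 in Lab 3.2 computes: the approval rate per credit-score bucket. It uses the lab's sample customers and loans and the same score thresholds; the function names `risk_category` and `approval_rates` are chosen for this illustration only.

```python
# customer_id -> credit_score, from the lab's sample customers
credit_scores = {"1": 750, "2": 680, "3": 720, "4": 800, "5": 650}

# (loan_id, customer_id, status), from the lab's sample loans
loans = [
    ("L001", "1", "APPROVED"),
    ("L002", "2", "APPROVED"),
    ("L003", "3", "APPROVED"),
    ("L004", "4", "PENDING"),
    ("L005", "1", "REJECTED"),
    ("L006", "5", "APPROVED"),
]


def risk_category(score):
    """Same thresholds as the CASE expression in the SQL query."""
    if score >= 750:
        return "LOW_RISK"
    if score >= 700:
        return "MEDIUM_RISK"
    if score >= 650:
        return "HIGH_RISK"
    return "VERY_HIGH_RISK"


def approval_rates(scores, loan_rows):
    """Return {risk_category: approved applications as a percent of all applications}."""
    totals, approved = {}, {}
    for _loan_id, customer_id, status in loan_rows:
        category = risk_category(scores[customer_id])
        totals[category] = totals.get(category, 0) + 1
        if status == "APPROVED":
            approved[category] = approved.get(category, 0) + 1
    return {
        category: round(100.0 * approved.get(category, 0) / count, 2)
        for category, count in totals.items()
    }


print(approval_rates(credit_scores, loans))
```

With only six sample applications the rates are not statistically meaningful. The LOW_RISK bucket has the lowest approval rate simply because its three applications include one pending and one rejected loan.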
  3. Hands-on Experience
# Lab 3.3: Production-Ready Spark Application for Lakehouse
# ==================================================
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import logging
from datetime import date, datetime  # used by the sample rows in load_customer_data
 
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
def create_spark_session(app_name: str):
    """Create optimized Spark session for Lakehouse workloads"""
    return SparkSession.builder \
        .appName(app_name) \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.skewJoin.enabled", "true") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .getOrCreate()
 
def load_customer_data(spark, data_path=None):
    """Load customer data from various sources"""
    if data_path:
        # In production, load from parquet/delta files
        return spark.read.parquet(data_path)
    else:
        # Sample data for demo
        schema = StructType([
            StructField("customer_id", StringType(), False),
            StructField("customer_code", StringType(), False),
            StructField("full_name", StringType(), False),
            StructField("phone", StringType(), True),
            StructField("email", StringType(), True),
            StructField("credit_score", IntegerType(), True),
            StructField("monthly_income", DoubleType(), True),
            StructField("employment_type", StringType(), True),
            StructField("registration_date", DateType(), False),
            StructField("last_login", TimestampType(), True)
        ])
        
        # monthly_income values are floats because the column is DoubleType;
        # with schema verification on, createDataFrame rejects Python ints there
        data = [
            ("1", "CUST001", "Nguyen Van An", "0901234567", "an.nguyen@email.com", 
             750, 25000000.0, "FULL_TIME", date(2023, 1, 15), datetime.now()),
            ("2", "CUST002", "Tran Thi Binh", "0912345678", "binh.tran@email.com", 
             680, 18000000.0, "PART_TIME", date(2023, 3, 20), datetime.now()),
            ("3", "CUST003", "Le Van Cuong", "0923456789", "cuong.le@email.com", 
             720, 30000000.0, "FULL_TIME", date(2023, 6, 10), datetime.now()),
            ("4", "CUST004", "Pham Thi Dung", "0934567890", "dung.pham@email.com", 
             800, 45000000.0, "BUSINESS", date(2023, 8, 5), datetime.now()),
            ("5", "CUST005", "Hoang Van Duc", "0945678901", "duc.hoang@email.com", 
             650, 15000000.0, "PART_TIME", date(2023, 9, 25), datetime.now())
        ]
        
        return spark.createDataFrame(data, schema)
 
def calculate_loan_eligibility(customers_df):
    """Calculate loan eligibility based on Lakehouse business rules"""
    
    # Business rules for Lakehouse
    eligibility_df = customers_df.withColumn(
        "max_loan_amount",
        when(col("employment_type") == "BUSINESS", col("monthly_income") * 8)
        .when(col("employment_type") == "FULL_TIME", col("monthly_income") * 6)
        .when(col("employment_type") == "PART_TIME", col("monthly_income") * 3)
        .otherwise(0)
    ).withColumn(
        "interest_rate",
        when(col("credit_score") >= 750, 0.08)
        .when(col("credit_score") >= 700, 0.10)
        .when(col("credit_score") >= 650, 0.12)
        .otherwise(0.15)
    ).withColumn(
        "risk_category",
        when(col("credit_score") >= 750, "LOW")
        .when(col("credit_score") >= 700, "MEDIUM")
        .when(col("credit_score") >= 650, "HIGH")
        .otherwise("VERY_HIGH")
    ).withColumn(
        "loan_eligible",
        (col("credit_score") >= 600) & (col("monthly_income") >= 10000000)
    )
    
    return eligibility_df
 
def generate_loan_recommendations(eligibility_df):
    """Generate personalized loan recommendations"""
    
    recommendations = eligibility_df.filter(col("loan_eligible")) \
        .withColumn(
            "recommended_amount",
            col("max_loan_amount") * 0.8  # Conservative recommendation
        ).withColumn(
            "recommended_term",
            when(col("risk_category") == "LOW", 36)
            .when(col("risk_category") == "MEDIUM", 24)
            .otherwise(12)
        ).withColumn(
            "monthly_payment",
            (col("recommended_amount") * (col("interest_rate") / 12)) /
            (1 - pow(1 + (col("interest_rate") / 12), -col("recommended_term")))
        ).withColumn(
            "payment_to_income_ratio",
            col("monthly_payment") / col("monthly_income")
        )
    
    return recommendations
 
def main():
    """Main application logic"""
    logger.info("Starting Lakehouse Loan Eligibility Application")
    
    # Create Spark session
    spark = create_spark_session("Lakehouse_Loan_Eligibility_Engine")
    
    try:
        # Load customer data
        logger.info("Loading customer data...")
        customers_df = load_customer_data(spark)
        
        # Cache frequently used data
        customers_df.cache()
        
        print("📊 Customer Data Overview:")
        customers_df.show()
        
        # Calculate eligibility
        logger.info("Calculating loan eligibility...")
        eligibility_df = calculate_loan_eligibility(customers_df)
        
        print("\n💰 Loan Eligibility Analysis:")
        eligibility_df.select(
            "customer_code", "full_name", "credit_score", 
            "max_loan_amount", "interest_rate", "risk_category", "loan_eligible"
        ).show()
        
        # Generate recommendations
        logger.info("Generating loan recommendations...")
        recommendations_df = generate_loan_recommendations(eligibility_df)
        
        print("\n🎯 Loan Recommendations:")
        recommendations_df.select(
            "customer_code", "recommended_amount", "recommended_term",
            "monthly_payment", "payment_to_income_ratio"
        ).show()
        
        # Business insights
        print("\n📈 Business Analytics:")
        
        # Eligibility statistics
        print("Eligibility by Risk Category:")
        eligibility_df.groupBy("risk_category", "loan_eligible") \
            .agg(count("*").alias("customer_count"),
                 avg("max_loan_amount").alias("avg_max_loan")) \
            .orderBy("risk_category") \
            .show()
        
        # Revenue potential
        print("Revenue Potential Analysis:")
        revenue_analysis = recommendations_df.agg(
            sum("recommended_amount").alias("total_loan_portfolio"),
            avg("recommended_amount").alias("avg_loan_amount"),
            sum("monthly_payment").alias("total_monthly_revenue"),
            avg("payment_to_income_ratio").alias("avg_payment_ratio")
        )
        revenue_analysis.show()
        
        # Save results (in production, write to data lake)
        logger.info("Saving results...")
        # recommendations_df.write.mode("overwrite").parquet("/path/to/output")
        
        logger.info("Application completed successfully")
        
    except Exception as e:
        logger.exception("Application failed: %s", e)  # also logs the traceback
        raise
    finally:
        spark.stop()
 
if __name__ == "__main__":
    main()

Key Production Features:

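  • Error Handling: try/except/finally around the pipeline, so failures are logged and the Spark session is always stopped
  • Logging: Python logging for monitoring
  • Configuration: Adaptive Query Execution, Kryo serialization, and Arrow set in one place
  • Caching: The customer DataFrame is cached because several steps reuse it
  • Business Logic: Lakehouse-specific loan rules
  • Performance: Arrow is enabled, which speeds up pandas conversions and pandas UDFs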
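The `monthly_payment` column in `generate_loan_recommendations` follows the standard fixed-payment (annuity) formula, `P * r / (1 - (1 + r) ** -n)`, where `P` is the principal, `r` the monthly rate, and `n` the term in months. The plain-Python sketch below restates it and checks that the payment actually pays the loan off. The `remaining_balance` helper and the zero-rate branch are additions for this illustration; the lab's formula is undefined at a 0% rate, which the lab's rates never reach.

```python
def monthly_payment(principal, annual_rate, term_months):
    """Fixed monthly payment for a fully amortizing loan."""
    if term_months <= 0:
        raise ValueError("term_months must be positive")
    monthly_rate = annual_rate / 12
    if monthly_rate == 0:
        # Assumption for this sketch: with no interest, repay in equal parts.
        return principal / term_months
    return principal * monthly_rate / (1 - (1 + monthly_rate) ** -term_months)


def remaining_balance(principal, annual_rate, term_months):
    """Simulate the schedule month by month; the result should be close to zero."""
    payment = monthly_payment(principal, annual_rate, term_months)
    balance = principal
    for _ in range(term_months):
        # Interest accrues on the outstanding balance, then the payment is applied.
        balance = balance * (1 + annual_rate / 12) - payment
    return balance


# CUST001 in the sample data: FULL_TIME with income 25,000,000 gives a maximum
# of 150,000,000, recommended at 80% = 120,000,000; score 750 gives 8% over 36 months.
payment = monthly_payment(120_000_000, 0.08, 36)
print(round(payment))
print(abs(remaining_balance(120_000_000, 0.08, 36)) < 1.0)
```

Dividing the resulting payment by the customer's monthly income gives the `payment_to_income_ratio` column from the lab.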

🎯 Preparing for Part 2

Part 2 will focus on:

  • Structured Streaming for real-time analytics
  • MLlib for machine learning workflows
  • Performance tuning and optimization techniques
  • Advanced integrations with Kafka and Delta Lake

📝 Homework

  1. DataFrame operations practice: Create a DataFrame from a CSV file of Lakehouse customer data and apply basic transformations.

  2. SQL query challenge: Write complex SQL queries to analyze the loan portfolio and compute metrics such as NPL ratio and average loan amount by region.

  3. Performance optimization: Compare the performance of RDD and DataFrame operations on the same dataset.

🔗 Resources

📞 Support

If you run into problems while working through the material:

  • Slack channel: #lakehouse-data-platform
  • Email: data-platform@lakehouse.vn
  • Internal wiki: confluence.lakehouse.vn/data-platform

🎉 Congratulations on completing Apache Spark Fundamentals! Continue with Part 2 to explore more advanced features.

  1. Run all the labs and take screenshots of the results
  2. Experiment with a larger Lakehouse dataset
  3. Write more complex SQL queries for business analysis
  4. Explore the Spark UI to understand execution plans

Ready for review and feedback before moving on to Part 2! 🚀

Le Duy Khuong

AI Transformation & Digital Strategy. Writing about agentic systems, engineering leadership, and building in public.