Apache Spark: Training Guide (Part 1)
Spark in the Lakehouse: RDD, DataFrame, Spark SQL.
2026-03-17 · 26 min read
Spark Fundamentals & Lakehouse Integration
Audience: Data Engineers, Analytics Engineers, Data Scientists
Training duration: 2-3 hours (Part 1)
Last updated: 26/06/2025
Version: 1.0.0
Table of Contents
- Apache Spark Overview
- Spark Architecture in the Lakehouse
- Installation and Configuration
- Spark Core & RDD Fundamentals
- Spark SQL & DataFrames
- Hands-on Labs: Basics
Apache Spark Overview
What is Apache Spark in the Lakehouse context?
Apache Spark serves as the unified analytics engine of the Lakehouse Platform. It provides:
- Big data processing: analyzing millions of customer transactions
- Batch processing: producing daily and monthly business reports
- Stream processing: real-time analysis of transaction and customer login events
- Machine learning: building credit scoring and fraud detection models
- ETL/ELT operations: transforming raw source data into analysis-ready data
Why does the Lakehouse use Apache Spark?
- vs MapReduce: faster than traditional batch processing
- Customer records: scalable financial data processing
- Languages supported: Python, SQL, Scala, Java
Spark in the Lakehouse Data Ecosystem
Spark Architecture in the Lakehouse
Cluster Architecture Configuration
# Spark configuration in the Lakehouse docker-compose.yml
version: '3.8'
services:
  spark-master:
    image: bitnami/spark:3.5.0
    container_name: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_MASTER_HOST=spark-master
      - SPARK_MASTER_PORT_NUMBER=7077
      - SPARK_MASTER_WEBUI_PORT=8080
    ports:
      - "8080:8080"  # Spark Master Web UI
      - "7077:7077"  # Spark Master port
    volumes:
      - ./data/spark:/opt/bitnami/spark/data
      - ./platform/spark/conf:/opt/bitnami/spark/conf
    networks:
      - lakehouse
  spark-worker-1:
    image: bitnami/spark:3.5.0
    container_name: spark-worker-1
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2g
      - SPARK_WORKER_CORES=2
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - ./data/spark:/opt/bitnami/spark/data
    depends_on:
      - spark-master
    networks:
      - lakehouse
  spark-worker-2:
    image: bitnami/spark:3.5.0
    container_name: spark-worker-2
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2g
      - SPARK_WORKER_CORES=2
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - ./data/spark:/opt/bitnami/spark/data
    depends_on:
      - spark-master
    networks:
      - lakehouse
  jupyter-spark:
    image: jupyter/pyspark-notebook:latest
    container_name: jupyter-spark
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - JUPYTER_ENABLE_LAB=yes
    ports:
      - "8888:8888"  # Jupyter Lab
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./data:/home/jovyan/data
    depends_on:
      - spark-master
    networks:
      - lakehouse
networks:
  lakehouse:
    external: true
Spark Configuration for Lakehouse Workloads
# platform/spark/conf/spark-defaults.conf
# Tuned configuration for Lakehouse financial data processing
# Memory Management
spark.executor.memory 2g
spark.executor.cores 2
spark.executor.instances 4
spark.driver.memory 1g
spark.driver.cores 1
# Performance Tuning
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.skewJoin.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
# Delta Lake Integration
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
# S3/MinIO Configuration
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.access.key lakehouse_admin
spark.hadoop.fs.s3a.secret.key CHANGE_ME_PASSWORD
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
# Hive Metastore Integration
spark.sql.warehouse.dir s3a://lakehouse/warehouse
spark.sql.catalogImplementation hive
spark.hadoop.hive.metastore.uris thrift://hive-metastore:9083
# Logging
spark.eventLog.enabled true
spark.eventLog.dir s3a://lakehouse/spark-logs
spark.history.fs.logDirectory s3a://lakehouse/spark-logs
# Security
spark.authenticate false
spark.network.crypto.enabled false
Installation and Configuration
Checking and Starting the Spark Cluster
# Check the Spark containers
docker ps | grep spark
# Start the Spark cluster if it is not running yet
docker-compose up -d spark-master spark-worker-1 spark-worker-2
# Check the Spark Master Web UI
curl http://localhost:8080
# Start Jupyter with PySpark
docker-compose up -d jupyter-spark
echo "Spark Cluster URLs:"
echo "- Master Web UI: http://localhost:8080"
echo "- Jupyter Lab: http://localhost:8888"
Installing Python Dependencies for PySpark
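# Create the requirements file for the PySpark environment.
# It is written under ./notebooks because that directory is mounted at
# /home/jovyan/work in the jupyter-spark container.
cat > notebooks/requirements-spark.txt << 'EOF'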
pyspark==3.5.0
delta-spark==3.0.0
py4j==0.10.9.7
pandas==2.0.3
numpy==1.24.3
matplotlib==3.7.1
seaborn==0.12.2
plotly==5.15.0
jupyter==1.0.0
findspark==2.0.1
boto3==1.28.25
psycopg2-binary==2.9.7
kafka-python==2.0.2
EOF
# Install the dependencies inside the Jupyter container
docker exec jupyter-spark pip install -r /home/jovyan/work/requirements-spark.txt
Creating a Spark Session Template for the Lakehouse
# scripts/spark_session_template.py
"""
Spark Session template for the Lakehouse.
Includes the configuration needed to connect to the platform services.
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os


def create_lakehouse_spark_session(app_name="Lakehouse-Analytics"):
    """
    Create a Spark Session configured for the Lakehouse.
    """
    # Note: the Delta Lake and S3A settings below only take effect if the
    # Delta and hadoop-aws JARs are already on the Spark classpath.
    spark = SparkSession.builder \
        .appName(app_name) \
        .master("spark://spark-master:7077") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
        .config("spark.hadoop.fs.s3a.access.key", "lakehouse_admin") \
        .config("spark.hadoop.fs.s3a.secret.key", "CHANGE_ME_PASSWORD") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()
    # Lower the log level to reduce noise
    spark.sparkContext.setLogLevel("WARN")
    print(f"✅ Spark Session created: {app_name}")
    print(f"🔗 Spark Master: {spark.conf.get('spark.master')}")
    print(f"📊 Available cores: {spark.sparkContext.defaultParallelism}")
    return spark


def get_lakehouse_database_config():
    """
    Database connection settings for the Lakehouse.
    """
    return {
        "postgresql": {
            "url": "jdbc:postgresql://postgresql:5432/lakehouse_db",
            "driver": "org.postgresql.Driver",
            "user": "lakehouse_admin",
            "password": "CHANGE_ME_PASSWORD"
        },
        "minio": {
            "endpoint": "s3a://processed-data/",
            "raw_data_path": "s3a://raw-data/",
            "backup_path": "s3a://backups/"
        }
    }


# Test function
def test_spark_setup():
    """
    Check the Spark connection and its components.
    """
    spark = create_lakehouse_spark_session("Lakehouse-Setup-Test")
    all_passed = True
    try:
        # Test 1: create a simple DataFrame
        data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
        columns = ["name", "age"]
        df = spark.createDataFrame(data, columns)
        print("Test 1 - DataFrame creation: ✅")
        df.show()
        # Test 2: check the MinIO connection
        try:
            # Write test data to MinIO
            test_path = "s3a://temp/spark-test/test_data.parquet"
            df.write.mode("overwrite").parquet(test_path)
            # Read it back from MinIO
            df_read = spark.read.parquet(test_path)
            print("Test 2 - MinIO integration: ✅")
        except Exception as e:
            all_passed = False
            print(f"Test 2 - MinIO integration: ❌ {str(e)}")
        # Test 3: check SQL capabilities
        df.createOrReplaceTempView("test_table")
        result = spark.sql("SELECT name, age * 2 as double_age FROM test_table WHERE age > 25")
        print("Test 3 - SQL execution: ✅")
        result.show()
        if all_passed:
            print("🎉 All tests passed!")
        else:
            print("⚠️ Some tests failed, see the messages above")
    except Exception as e:
        print(f"❌ Error during testing: {str(e)}")
    finally:
        # Always release the session, whether the tests passed or failed
        spark.stop()


if __name__ == "__main__":
    test_spark_setup()
Spark Core & RDD Fundamentals
Understanding RDDs (Resilient Distributed Datasets)
The RDD is the most basic abstraction in Spark: it represents a collection of data distributed across the cluster.
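Before looking at real RDDs, it can help to see the core idea in plain Python. The sketch below is a toy, single-machine stand-in: `MiniRDD` is a hypothetical class invented for this illustration, not a Spark API. It only shows how transformations can be recorded as lineage and executed later, when an action is called.

```python
class MiniRDD:
    """Toy stand-in for an RDD (hypothetical, single machine, not a Spark API)."""

    def __init__(self, source, lineage=()):
        self._source = list(source)     # base data, never modified
        self._lineage = tuple(lineage)  # recorded (kind, function) steps

    def map(self, fn):
        # Transformation: record the step and return a NEW MiniRDD.
        return MiniRDD(self._source, self._lineage + (("map", fn),))

    def filter(self, fn):
        # Transformation: also lazy, nothing is computed here.
        return MiniRDD(self._source, self._lineage + (("filter", fn),))

    def collect(self):
        # Action: replay the recorded lineage over the source data.
        data = self._source
        for kind, fn in self._lineage:
            if kind == "map":
                data = [fn(x) for x in data]
            else:
                data = [x for x in data if fn(x)]
        return data


calls = []


def double(x):
    calls.append(x)  # side effect, so we can observe when the work happens
    return x * 2


base = MiniRDD([1, 2, 3, 4, 5])
doubled = base.map(double)
large = doubled.filter(lambda x: x > 5)

work_before_action = len(calls)  # still 0: only lineage has been recorded
result = large.collect()         # the action triggers the computation
print(work_before_action, result)
```

Because the lineage is kept alongside the untouched source data, any result can be recomputed from scratch; that is the same idea Spark relies on to rebuild lost partitions, although a real RDD does this per partition across many machines.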
RDD Characteristics
# Example illustrating RDD characteristics
def demonstrate_rdd_characteristics():
    """
    Illustrate the main characteristics of an RDD.
    """
    spark = create_lakehouse_spark_session("RDD-Demo")
    sc = spark.sparkContext
    # 1. Immutable: an RDD cannot be changed
    original_rdd = sc.parallelize([1, 2, 3, 4, 5])
    print("Original RDD:", original_rdd.collect())
    # Every transformation produces a new RDD
    doubled_rdd = original_rdd.map(lambda x: x * 2)
    print("Doubled RDD:", doubled_rdd.collect())
    print("Original RDD is unchanged:", original_rdd.collect())
    # 2. Distributed: spread across the cluster
    print(f"Number of partitions: {original_rdd.getNumPartitions()}")
    print(f"Partitions data: {original_rdd.glom().collect()}")
    # 3. Fault-tolerant: recovers from failures
    # An RDD keeps its lineage so it can be rebuilt when needed.
    # toDebugString() returns bytes in PySpark, so decode it before printing.
    print(f"RDD lineage: {doubled_rdd.toDebugString().decode('utf-8')}")
    # 4. Lazy evaluation: nothing runs until an action is called
    print("Creating transformation (not executed yet)...")
    filtered_rdd = doubled_rdd.filter(lambda x: x > 5)
    print("Calling action (execution happens now)...")
    result = filtered_rdd.collect()
    print("Filtered result:", result)
    spark.stop()
RDD Operations: Transformations vs Actions
def demonstrate_transformations_actions():
    """
    Distinguish Transformations (lazy) from Actions (eager).
    """
    spark = create_lakehouse_spark_session("Transformations-Actions")
    sc = spark.sparkContext
    # Sample Lakehouse transaction data
    transactions = [
        ("TRANS001", "CUST001", 1000000, "LOAN_PAYMENT"),
        ("TRANS002", "CUST002", 500000, "DEPOSIT"),
        ("TRANS003", "CUST001", 2000000, "LOAN_DISBURSEMENT"),
        ("TRANS004", "CUST003", 750000, "LOAN_PAYMENT"),
        ("TRANS005", "CUST002", 300000, "WITHDRAWAL")
    ]
    rdd = sc.parallelize(transactions)
    print("📊 Original transaction data:")
    print(rdd.collect())
    # TRANSFORMATIONS (lazy: not executed immediately)
    print("\n🔄 Applying transformations (lazy)...")
    # Map: transform each record
    amount_rdd = rdd.map(lambda x: (x[1], x[2]))  # (customer_id, amount)
    print("✓ Map transformation applied")
    # Filter: keep only matching records
    large_amounts = amount_rdd.filter(lambda x: x[1] > 600000)
    print("✓ Filter transformation applied")
    # ReduceByKey: aggregate by key
    customer_totals = large_amounts.reduceByKey(lambda a, b: a + b)
    print("✓ ReduceByKey transformation applied")
    print("⚠️ None of these transformations has been computed yet!")
    # ACTIONS (eager: executed immediately)
    print("\n⚡ Calling actions (eager execution)...")
    # Collect: bring all data back to the driver
    results = customer_totals.collect()
    print(f"Customer totals (large transactions): {results}")
    # Count: count the records
    count = customer_totals.count()
    print(f"Number of customers with large transactions: {count}")
    # Take: fetch the first n records
    sample = customer_totals.take(2)
    print(f"Sample results: {sample}")
    # First: fetch the first record
    first_result = customer_totals.first()
    print(f"First result: {first_result}")
    spark.stop()


# Run the demo
demonstrate_transformations_actions()
RDD Partitioning for Performance
def demonstrate_partitioning():
    """
    Illustrate why partitioning matters in Spark.
    """
    import builtins  # a star import of pyspark.sql.functions shadows the built-in sum()
    import random
    import time

    spark = create_lakehouse_spark_session("Partitioning-Demo")
    sc = spark.sparkContext
    # Generate a larger data set
    large_transactions = []
    customers = [f"CUST{i:04d}" for i in range(1, 1001)]  # 1000 customers
    for i in range(10000):  # 10K transactions
        customer = random.choice(customers)
        amount = random.randint(100000, 5000000)
        transaction_type = random.choice(["LOAN_PAYMENT", "DEPOSIT", "WITHDRAWAL"])
        large_transactions.append((f"TRANS{i:06d}", customer, amount, transaction_type))
    # RDD with the default number of partitions
    rdd_default = sc.parallelize(large_transactions)
    print(f"Default partitions: {rdd_default.getNumPartitions()}")
    # RDD with a custom number of partitions
    rdd_custom = sc.parallelize(large_transactions, numSlices=8)
    print(f"Custom partitions: {rdd_custom.getNumPartitions()}")
    # Key by customer_id to optimize join/groupBy
    customer_amounts = rdd_custom.map(lambda x: (x[1], x[2]))  # (customer_id, amount)
    # Hash partitioning: distribute records by the hash of the key.
    # Cache and materialize it so the timing below does not include this shuffle.
    hash_partitioned = customer_amounts.partitionBy(4).cache()
    hash_partitioned.count()
    print(f"Hash partitioned: {hash_partitioned.getNumPartitions()}")
    # Compare performance. With only 10K records the timings are mostly
    # scheduling overhead, so treat the numbers as illustrative.
    print("\n⏱️ Performance comparison:")
    # Test 1: groupByKey without pre-partitioning
    start_time = time.time()
    result1 = customer_amounts.groupByKey().mapValues(builtins.sum).count()
    time1 = time.time() - start_time
    print(f"GroupBy without partitioning: {time1:.2f}s")
    # Test 2: groupByKey on the pre-partitioned RDD
    start_time = time.time()
    result2 = hash_partitioned.groupByKey().mapValues(builtins.sum).count()
    time2 = time.time() - start_time
    print(f"GroupBy with partitioning: {time2:.2f}s")
    improvement = ((time1 - time2) / time1) * 100
    print(f"Performance improvement: {improvement:.1f}%")
    spark.stop()
Spark SQL & DataFrames
DataFrame API - High-level abstraction
A DataFrame is a structured data abstraction with a schema; for most use cases it is better optimized than an RDD.
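To make "with a schema" concrete, here is a minimal pure-Python sketch of schema checking. `LOAN_SCHEMA` and `validate_row` are hypothetical names invented for this illustration; they are not PySpark APIs. PySpark's `createDataFrame` performs a comparable check when it is given an explicit `StructType` (schema verification is on by default), and it likewise rejects a Python `int` for a `DoubleType` field.

```python
from datetime import date

# Hypothetical mini-schema: (column name, accepted Python type, nullable)
LOAN_SCHEMA = [
    ("loan_id", str, False),
    ("loan_amount", float, False),
    ("term_months", int, False),
    ("application_date", date, False),
    ("note", str, True),
]


def validate_row(row, schema):
    """Return a list of problems found in `row`; an empty list means valid."""
    if len(row) != len(schema):
        return [f"expected {len(schema)} fields, got {len(row)}"]
    problems = []
    for value, (name, expected, nullable) in zip(row, schema):
        if value is None:
            if not nullable:
                problems.append(f"{name}: null not allowed")
        # Types are compared exactly, so an int is not accepted as a float
        elif type(value) is not expected:
            problems.append(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return problems


good = ("L001", 50000000.0, 12, date(2024, 1, 15), None)
bad = ("L002", 30000000, 18, date(2024, 1, 20), None)  # amount is an int

good_problems = validate_row(good, LOAN_SCHEMA)
bad_problems = validate_row(bad, LOAN_SCHEMA)
print(good_problems)
print(bad_problems)
```

The practical takeaway for the examples in this guide: when a column is declared as a floating-point type, write the sample values as floats (`50000000.0`), not integers.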
def demonstrate_dataframes():
    """
    Illustrate the DataFrame API on Lakehouse financial data.
    """
    from datetime import datetime, timedelta
    from pyspark.sql.functions import avg, col, count, current_date, datediff, max, min, when
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

    spark = create_lakehouse_spark_session("DataFrame-Demo")
    # Define the schema for the customer data
    customer_schema = StructType([
        StructField("customer_id", StringType(), False),
        StructField("customer_code", StringType(), False),
        StructField("full_name", StringType(), False),
        StructField("phone_number", StringType(), True),
        StructField("email", StringType(), True),
        StructField("credit_score", IntegerType(), True),
        StructField("registration_date", TimestampType(), False)
    ])
    # Sample customer data
    customers_data = [
        ("1", "CUST001", "Nguyen Van An", "0901234567", "an.nguyen@email.com", 750, datetime.now() - timedelta(days=365)),
        ("2", "CUST002", "Tran Thi Binh", "0912345678", "binh.tran@email.com", 680, datetime.now() - timedelta(days=300)),
        ("3", "CUST003", "Le Van Cuong", "0923456789", "cuong.le@email.com", 720, datetime.now() - timedelta(days=200)),
        ("4", "CUST004", "Pham Thi Dung", "0934567890", "dung.pham@email.com", 800, datetime.now() - timedelta(days=150)),
        ("5", "CUST005", "Hoang Van Duc", "0945678901", "duc.hoang@email.com", 650, datetime.now() - timedelta(days=100))
    ]
    customers_df = spark.createDataFrame(customers_data, customer_schema)
    print("📊 Customer DataFrame:")
    customers_df.show()
    customers_df.printSchema()
    # Basic DataFrame operations
    print("\n🔍 DataFrame operations:")
    # Select specific columns
    print("Select columns:")
    customers_df.select("customer_code", "full_name", "credit_score").show()
    # Filter data
    print("High credit score customers:")
    high_credit = customers_df.filter(col("credit_score") > 700)
    high_credit.show()
    # Add computed columns
    print("Add computed columns:")
    customers_enhanced = customers_df.withColumn(
        "credit_category",
        when(col("credit_score") >= 750, "EXCELLENT")
        .when(col("credit_score") >= 700, "GOOD")
        .when(col("credit_score") >= 650, "FAIR")
        .otherwise("POOR")
    ).withColumn(
        "days_since_registration",
        datediff(current_date(), col("registration_date"))
    )
    customers_enhanced.select(
        "customer_code", "credit_score", "credit_category",
        "days_since_registration"
    ).show()
    # Aggregations
    print("Credit score statistics:")
    customers_df.agg(
        avg("credit_score").alias("avg_credit_score"),
        min("credit_score").alias("min_credit_score"),
        max("credit_score").alias("max_credit_score"),
        count("customer_id").alias("total_customers")
    ).show()
    # Group by operations
    print("Customers by credit category:")
    customers_enhanced.groupBy("credit_category") \
        .agg(count("customer_id").alias("customer_count"),
             avg("credit_score").alias("avg_score")) \
        .orderBy("avg_score", ascending=False) \
        .show()
    spark.stop()
Spark SQL - Familiar SQL Interface
def demonstrate_spark_sql():
    """
    Illustrate Spark SQL for analyzing Lakehouse data.
    """
    spark = create_lakehouse_spark_session("SparkSQL-Demo")
    # Create the customers table (temporary view)
    customers_data = [
        ("CUST001", "Nguyen Van An", 750, "Ho Chi Minh", "Individual"),
        ("CUST002", "Tran Thi Binh", 680, "Ha Noi", "Individual"),
        ("CUST003", "Le Van Cuong", 720, "Da Nang", "Individual"),
        ("CUST004", "ABC Company Ltd", 800, "Ho Chi Minh", "Business"),
        ("CUST005", "XYZ Trading Co", 650, "Ha Noi", "Business")
    ]
    customers_df = spark.createDataFrame(
        customers_data,
        ["customer_code", "name", "credit_score", "city", "customer_type"]
    )
    customers_df.createOrReplaceTempView("customers")
    # Create the loans table
    loans_data = [
        ("LOAN001", "CUST001", 50000000, 0.12, "PERSONAL", "APPROVED"),
        ("LOAN002", "CUST002", 30000000, 0.13, "PERSONAL", "APPROVED"),
        ("LOAN003", "CUST003", 75000000, 0.11, "PERSONAL", "DISBURSED"),
        ("LOAN004", "CUST004", 200000000, 0.10, "BUSINESS", "APPROVED"),
        ("LOAN005", "CUST005", 150000000, 0.11, "BUSINESS", "DISBURSED"),
        ("LOAN006", "CUST001", 25000000, 0.13, "PERSONAL", "PENDING")
    ]
    loans_df = spark.createDataFrame(
        loans_data,
        ["loan_number", "customer_code", "amount", "interest_rate", "loan_type", "status"]
    )
    loans_df.createOrReplaceTempView("loans")
    print("📋 Available tables:")
    spark.sql("SHOW TABLES").show()
    # SQL analytics for the Lakehouse business
    print("\n📊 Lakehouse Business Analytics with SQL:")
    # 1. Customer portfolio analysis
    print("1. Customer Portfolio Analysis:")
    portfolio_sql = """
        SELECT
            c.customer_code,
            c.name,
            c.credit_score,
            c.city,
            COUNT(l.loan_number) as total_loans,
            SUM(l.amount) as total_borrowed,
            AVG(l.interest_rate) as avg_interest_rate,
            MAX(CASE WHEN l.status = 'DISBURSED' THEN l.amount ELSE 0 END) as max_disbursed_amount
        FROM customers c
        LEFT JOIN loans l ON c.customer_code = l.customer_code
        GROUP BY c.customer_code, c.name, c.credit_score, c.city
        ORDER BY total_borrowed DESC NULLS LAST
    """
    spark.sql(portfolio_sql).show()
    # 2. Risk assessment by city
    # Note: AVG(c.credit_score) is computed over the joined rows, so a customer
    # with several loans is weighted once per loan.
    print("2. Risk Assessment by City:")
    risk_sql = """
        SELECT
            c.city,
            COUNT(DISTINCT c.customer_code) as total_customers,
            AVG(c.credit_score) as avg_credit_score,
            SUM(l.amount) as total_loan_amount,
            AVG(l.interest_rate) as avg_interest_rate,
            COUNT(CASE WHEN l.status = 'APPROVED' THEN 1 END) as approved_loans,
            COUNT(CASE WHEN l.status = 'DISBURSED' THEN 1 END) as disbursed_loans
        FROM customers c
        LEFT JOIN loans l ON c.customer_code = l.customer_code
        GROUP BY c.city
        ORDER BY total_loan_amount DESC
    """
    spark.sql(risk_sql).show()
    # 3. Loan type performance
    print("3. Loan Type Performance:")
    performance_sql = """
        SELECT
            l.loan_type,
            COUNT(*) as loan_count,
            SUM(l.amount) as total_amount,
            AVG(l.amount) as avg_amount,
            AVG(l.interest_rate) as avg_interest_rate,
            COUNT(CASE WHEN l.status = 'APPROVED' THEN 1 END) * 100.0 / COUNT(*) as approval_rate,
            COUNT(CASE WHEN l.status = 'DISBURSED' THEN 1 END) * 100.0 / COUNT(*) as disbursement_rate
        FROM loans l
        GROUP BY l.loan_type
        ORDER BY total_amount DESC
    """
    spark.sql(performance_sql).show()
    # 4. Advanced window functions
    print("4. Customer Ranking by Credit Score (Window Functions):")
    ranking_sql = """
        SELECT
            customer_code,
            name,
            city,
            credit_score,
            ROW_NUMBER() OVER (PARTITION BY city ORDER BY credit_score DESC) as city_rank,
            RANK() OVER (ORDER BY credit_score DESC) as overall_rank,
            NTILE(3) OVER (ORDER BY credit_score DESC) as credit_tier
        FROM customers
        ORDER BY credit_score DESC
    """
    spark.sql(ranking_sql).show()
    spark.stop()
Hands-on Labs: Basics
Lab 1: Setting Up the Spark Environment
#!/bin/bash
# lab1_setup_spark.sh
echo "=== Lab 1: Setting Up the Spark Environment ==="
# Step 1: check the Docker containers
echo "🔍 Checking Spark containers..."
docker ps | grep spark
# Step 2: start the Spark cluster if needed
echo "🚀 Starting Spark cluster..."
docker-compose up -d spark-master spark-worker-1 spark-worker-2
# Wait for the containers to start
sleep 30
# Step 3: check the Spark Master UI
echo "🌐 Checking Spark Master UI..."
curl -s http://localhost:8080 > /dev/null && echo "✅ Spark Master UI accessible" || echo "❌ Cannot access Spark Master UI"
# Step 4: start Jupyter
echo "📓 Starting Jupyter Notebook..."
docker-compose up -d jupyter-spark
# Wait for Jupyter to start
sleep 20
# Step 5: create the working directories
echo "📁 Creating working directories..."
mkdir -p notebooks/labs
mkdir -p data/sample
echo "✅ Setup complete!"
echo "🔗 Spark Master UI: http://localhost:8080"
echo "📚 Jupyter Lab: http://localhost:8888"
Lab 2: First Spark Application
# notebooks/labs/lab2_first_spark_app.py
"""
Lab 2: First Spark application for the Lakehouse
Goal: get familiar with the Spark Session, RDDs, and DataFrames
"""
# Import the required libraries
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


def lab2_first_spark_app():
    """
    Lab 2: build a first Spark application.
    """
    print("🎯 Lab 2: First Spark Application")
    print("="*50)
    # Step 1: create the Spark Session
    print("Step 1: Create Spark Session")
    spark = SparkSession.builder \
        .appName("Lakehouse-Lab2-FirstApp") \
        .master("spark://spark-master:7077") \
        .config("spark.executor.memory", "1g") \
        .config("spark.executor.cores", "1") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    print(f"✅ Spark Session created: {spark.version}")
    print(f"🖥️ Spark UI: {spark.sparkContext.uiWebUrl}")
    # Step 2: create sample data
    print("\nStep 2: Create sample data")
    # Sample customer data for the Lakehouse
    customers_data = [
        ("C001", "Nguyen Van An", "an.nguyen@lakehouse.vn", 750, "HCM"),
        ("C002", "Tran Thi Binh", "binh.tran@lakehouse.vn", 680, "HN"),
        ("C003", "Le Van Cuong", "cuong.le@lakehouse.vn", 720, "DN"),
        ("C004", "Pham Thi Dung", "dung.pham@lakehouse.vn", 800, "HCM"),
        ("C005", "Hoang Van Duc", "duc.hoang@lakehouse.vn", 650, "HN")
    ]
    schema = StructType([
        StructField("customer_id", StringType(), False),
        StructField("name", StringType(), False),
        StructField("email", StringType(), False),
        StructField("credit_score", IntegerType(), False),
        StructField("city", StringType(), False)
    ])
    df = spark.createDataFrame(customers_data, schema)
    print("✅ Sample DataFrame created")
    # Step 3: basic DataFrame operations
    print("\nStep 3: Basic DataFrame Operations")
    # Show data
    print("📊 Customer Data:")
    df.show()
    # Show schema
    print("📋 Schema:")
    df.printSchema()
    # Basic statistics
    print("📈 Basic Statistics:")
    df.describe("credit_score").show()
    # Step 4: data filtering and transformation
    print("\nStep 4: Data Filtering & Transformation")
    # Filter high credit score customers
    high_credit_df = df.filter(col("credit_score") > 700)
    print("🎯 High Credit Score Customers (>700):")
    high_credit_df.show()
    # Add computed columns
    enhanced_df = df.withColumn(
        "credit_category",
        when(col("credit_score") >= 750, "EXCELLENT")
        .when(col("credit_score") >= 700, "GOOD")
        .when(col("credit_score") >= 650, "FAIR")
        .otherwise("POOR")
    ).withColumn(
        "email_domain",
        split(col("email"), "@").getItem(1)
    )
    print("🔄 Enhanced Data with Computed Columns:")
    enhanced_df.select("customer_id", "name", "credit_score",
                       "credit_category", "email_domain").show()
    # Step 5: aggregations
    print("\nStep 5: Aggregations")
    # Group by city
    city_stats = enhanced_df.groupBy("city") \
        .agg(count("customer_id").alias("customer_count"),
             avg("credit_score").alias("avg_credit_score"),
             max("credit_score").alias("max_credit_score"),
             min("credit_score").alias("min_credit_score")) \
        .orderBy("avg_credit_score", ascending=False)
    print("🏙️ Statistics by City:")
    city_stats.show()
    # Credit category distribution
    category_dist = enhanced_df.groupBy("credit_category") \
        .agg(count("customer_id").alias("count"),
             avg("credit_score").alias("avg_score")) \
        .orderBy("avg_score", ascending=False)
    print("📊 Credit Category Distribution:")
    category_dist.show()
    # Step 6: SQL interface
    print("\nStep 6: SQL Interface")
    # Register as temporary view
    enhanced_df.createOrReplaceTempView("customers")
    # SQL query
    sql_result = spark.sql("""
        SELECT
            city,
            credit_category,
            COUNT(*) as customer_count,
            AVG(credit_score) as avg_score
        FROM customers
        GROUP BY city, credit_category
        ORDER BY city, avg_score DESC
    """)
    print("💾 SQL Query Result:")
    sql_result.show()
    # Step 7: save results
    print("\nStep 7: Save Results")
    # Note: on a standalone cluster, a local path like the one below must be a
    # shared mount visible at the same location on the driver and on every
    # worker; otherwise the output files are written on the workers.
    try:
        # Parquet format (recommended for analytics)
        output_path = "/home/jovyan/data/lab2_output"
        city_stats.coalesce(1).write.mode("overwrite").parquet(f"{output_path}/city_stats.parquet")
        print(f"✅ Data saved to {output_path}/city_stats.parquet")
        # CSV format
        category_dist.coalesce(1).write.mode("overwrite") \
            .option("header", "true").csv(f"{output_path}/category_dist.csv")
        print(f"✅ Data saved to {output_path}/category_dist.csv")
    except Exception as e:
        print(f"⚠️ Save error: {e}")
    # Step 8: performance insights
    print("\nStep 8: Performance Insights")
    print(f"🔢 DataFrame partitions: {df.rdd.getNumPartitions()}")
    print(f"📊 Records count: {df.count()}")
    # Clean up
    spark.stop()
    print("\n✅ Lab 2 completed successfully!")


# Run the lab
if __name__ == "__main__":
    lab2_first_spark_app()
Lab 3: Working with Real Lakehouse Data
# notebooks/labs/lab3_real_lakehouse_data.py
"""
Lab 3: Working with real Lakehouse data
Goal: connect to the database and apply business logic to the data
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg, col, count, countDistinct, max, percent_rank, row_number, sum, when,
)
from pyspark.sql.window import Window


def lab3_real_lakehouse_data():
    """
    Lab 3: process real Lakehouse data.
    """
    print("🎯 Lab 3: Working with Real Lakehouse Data")
    print("="*50)
    # Create a Spark Session with the database connector
    spark = SparkSession.builder \
        .appName("Lakehouse-Lab3-RealData") \
        .master("spark://spark-master:7077") \
        .config("spark.jars", "/opt/bitnami/spark/jars/postgresql-42.6.0.jar") \
        .getOrCreate()
    print("✅ Spark Session with database support created")
    # Database connection properties
    db_properties = {
        "user": "lakehouse_admin",
        "password": "CHANGE_ME_PASSWORD",
        "driver": "org.postgresql.Driver"
    }
    db_url = "jdbc:postgresql://postgresql:5432/lakehouse_db"
    try:
        # Step 1: read data from PostgreSQL
        print("\nStep 1: Load data from PostgreSQL")
        # Load customers table
        customers_df = spark.read.jdbc(
            url=db_url,
            table="core.customers",
            properties=db_properties
        )
        print(f"✅ Loaded {customers_df.count()} customers")
        # Load loans table
        loans_df = spark.read.jdbc(
            url=db_url,
            table="core.loans",
            properties=db_properties
        )
        print(f"✅ Loaded {loans_df.count()} loans")
        # Step 2: data exploration
        print("\nStep 2: Data Exploration")
        print("👥 Customers sample:")
        customers_df.select("customer_code", "full_name", "credit_score", "kyc_status").show(5)
        print("💰 Loans sample:")
        loans_df.select("loan_number", "customer_id", "loan_amount", "status").show(5)
        # Step 3: business analytics
        print("\nStep 3: Business Analytics")
        # Join customers and loans
        customer_loans = customers_df.alias("c") \
            .join(loans_df.alias("l"),
                  col("c.customer_id") == col("l.customer_id"),
                  "left")
        # Customer portfolio analysis
        portfolio_analysis = customer_loans.groupBy(
            "c.customer_code", "c.full_name", "c.credit_score", "c.kyc_status"
        ).agg(
            count("l.loan_id").alias("total_loans"),
            sum("l.loan_amount").alias("total_borrowed"),
            avg("l.loan_amount").alias("avg_loan_amount"),
            max("l.loan_amount").alias("max_loan_amount"),
            countDistinct("l.product_type").alias("product_diversity")
        ).filter(col("total_loans") > 0)
        print("📊 Customer Portfolio Analysis:")
        portfolio_analysis.orderBy("total_borrowed", ascending=False).show(10)
        # Risk assessment
        risk_analysis = portfolio_analysis.withColumn(
            "risk_category",
            when(col("credit_score") >= 750, "LOW_RISK")
            .when(col("credit_score") >= 650, "MEDIUM_RISK")
            .otherwise("HIGH_RISK")
        ).withColumn(
            "exposure_level",
            when(col("total_borrowed") >= 100000000, "HIGH_EXPOSURE")
            .when(col("total_borrowed") >= 50000000, "MEDIUM_EXPOSURE")
            .otherwise("LOW_EXPOSURE")
        )
        print("⚠️ Risk Assessment Summary:")
        risk_analysis.groupBy("risk_category", "exposure_level") \
            .agg(count("customer_code").alias("customer_count"),
                 sum("total_borrowed").alias("total_exposure"),
                 avg("credit_score").alias("avg_credit_score")) \
            .orderBy("risk_category", "exposure_level") \
            .show()
        # Step 4: advanced analytics
        print("\nStep 4: Advanced Analytics")
        # Window functions for ranking:
        # rank customers by total borrowed within each risk category
        window_spec = Window.partitionBy("risk_category").orderBy(col("total_borrowed").desc())
        ranked_customers = risk_analysis.withColumn(
            "rank_within_risk_category",
            row_number().over(window_spec)
        ).withColumn(
            "percentile_within_category",
            percent_rank().over(window_spec)
        )
        print("🏆 Top customers by risk category:")
        ranked_customers.filter(col("rank_within_risk_category") <= 3) \
            .select("customer_code", "full_name", "risk_category",
                    "total_borrowed", "rank_within_risk_category") \
            .orderBy("risk_category", "rank_within_risk_category") \
            .show()
        # Cohort analysis based on KYC status.
        # The join yields one row per loan, so customers are counted with
        # countDistinct; a plain count would count a customer once per loan.
        kyc_performance = customer_loans.groupBy("c.kyc_status") \
            .agg(countDistinct("c.customer_id").alias("total_customers"),
                 countDistinct(when(col("l.loan_id").isNotNull(), col("c.customer_id"))).alias("customers_with_loans"),
                 sum("l.loan_amount").alias("total_loan_amount"),
                 avg("c.credit_score").alias("avg_credit_score")) \
            .withColumn("loan_penetration_rate",
                        col("customers_with_loans") * 100.0 / col("total_customers"))
        print("📈 Performance by KYC Status:")
        kyc_performance.show()
        # Step 5: save insights to the data lake
        print("\nStep 5: Save Insights")
        # Save to MinIO/S3 (if the configuration is available)
        try:
            # Portfolio analysis
            portfolio_analysis.coalesce(1) \
                .write.mode("overwrite") \
                .parquet("s3a://processed-data/analytics/customer_portfolio/")
            print("✅ Portfolio analysis saved to data lake")
            # Risk analysis
            risk_analysis.coalesce(1) \
                .write.mode("overwrite") \
                .parquet("s3a://processed-data/analytics/risk_assessment/")
            print("✅ Risk analysis saved to data lake")
        except Exception as e:
            print(f"⚠️ Could not save to data lake: {e}")
            # Fallback: save locally
            portfolio_analysis.coalesce(1) \
                .write.mode("overwrite") \
                .parquet("/home/jovyan/data/customer_portfolio")
            print("✅ Portfolio analysis saved locally")
    except Exception as e:
        print(f"❌ Error: {e}")
        print("💡 Make sure PostgreSQL is running and contains sample data")
    finally:
        spark.stop()
    print("\n✅ Lab 3 completed!")


# Run the lab
if __name__ == "__main__":
    lab3_real_lakehouse_data()
Part 1 Summary
✅ What You Have Learned
- Apache Spark Fundamentals
  - Understanding the Spark architecture and its role in the Lakehouse
  - RDD concepts: immutable, distributed, fault-tolerant
  - Transformations vs Actions and lazy evaluation
- Spark SQL & DataFrames
# Lab 3.2: Customer Loan Applications Analysis
# =============================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, date

# Create SparkSession
spark = SparkSession.builder \
    .appName("Lakehouse_Customer_Loan_Analysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Define schemas
customer_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("customer_code", StringType(), False),
    StructField("full_name", StringType(), False),
    StructField("phone", StringType(), True),
    StructField("email", StringType(), True),
    StructField("credit_score", IntegerType(), True),
    StructField("registration_date", DateType(), False)
])
loan_schema = StructType([
    StructField("loan_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("loan_amount", DoubleType(), False),
    StructField("interest_rate", DoubleType(), False),
    StructField("term_months", IntegerType(), False),
    StructField("application_date", DateType(), False),
    StructField("status", StringType(), False)
])
# Sample customer data
customers_data = [
    ("1", "CUST001", "Nguyen Van An", "0901234567", "an.nguyen@email.com", 750, date(2023, 1, 15)),
    ("2", "CUST002", "Tran Thi Binh", "0912345678", "binh.tran@email.com", 680, date(2023, 3, 20)),
    ("3", "CUST003", "Le Van Cuong", "0923456789", "cuong.le@email.com", 720, date(2023, 6, 10)),
    ("4", "CUST004", "Pham Thi Dung", "0934567890", "dung.pham@email.com", 800, date(2023, 8, 5)),
    ("5", "CUST005", "Hoang Van Duc", "0945678901", "duc.hoang@email.com", 650, date(2023, 9, 25))
]

# Sample loan data
# loan_amount is DoubleType, so the values must be Python floats:
# createDataFrame rejects an int for a DoubleType field when a schema is given.
loans_data = [
    ("L001", "1", 50000000.0, 0.12, 12, date(2024, 1, 15), "APPROVED"),
    ("L002", "2", 30000000.0, 0.15, 18, date(2024, 1, 20), "APPROVED"),
    ("L003", "3", 75000000.0, 0.10, 24, date(2024, 2, 1), "APPROVED"),
    ("L004", "4", 100000000.0, 0.08, 36, date(2024, 2, 10), "PENDING"),
    ("L005", "1", 25000000.0, 0.14, 6, date(2024, 2, 15), "REJECTED"),
    ("L006", "5", 40000000.0, 0.16, 12, date(2024, 3, 1), "APPROVED")
]
# Create DataFrames
customers_df = spark.createDataFrame(customers_data, customer_schema)
loans_df = spark.createDataFrame(loans_data, loan_schema)
print("📊 Customer and Loan DataFrames:")
print("\nCustomers:")
customers_df.show()
print("\nLoans:")
loans_df.show()
# Register as temporary views
customers_df.createOrReplaceTempView("customers")
loans_df.createOrReplaceTempView("loans")
# Complex SQL queries
print("\n🔍 Complex Spark SQL Queries:")

# 1. Customer loan summary
print("1. Customer Loan Summary:")
loan_summary = spark.sql("""
    SELECT
        c.customer_code,
        c.full_name,
        c.credit_score,
        COUNT(l.loan_id) as total_loans,
        SUM(CASE WHEN l.status = 'APPROVED' THEN l.loan_amount ELSE 0 END) as approved_amount,
        AVG(l.interest_rate) as avg_interest_rate,
        MAX(l.application_date) as latest_application
    FROM customers c
    LEFT JOIN loans l ON c.customer_id = l.customer_id
    GROUP BY c.customer_id, c.customer_code, c.full_name, c.credit_score
    ORDER BY approved_amount DESC
""")
loan_summary.show()

# 2. Risk analysis by credit score
print("2. Risk Analysis by Credit Score:")
risk_analysis = spark.sql("""
    SELECT
        CASE
            WHEN c.credit_score >= 750 THEN 'LOW_RISK'
            WHEN c.credit_score >= 700 THEN 'MEDIUM_RISK'
            WHEN c.credit_score >= 650 THEN 'HIGH_RISK'
            ELSE 'VERY_HIGH_RISK'
        END as risk_category,
        COUNT(DISTINCT c.customer_id) as customer_count,
        COUNT(l.loan_id) as loan_applications,
        SUM(CASE WHEN l.status = 'APPROVED' THEN 1 ELSE 0 END) as approved_loans,
        ROUND(
            SUM(CASE WHEN l.status = 'APPROVED' THEN 1 ELSE 0 END) * 100.0 / COUNT(l.loan_id), 2
        ) as approval_rate_percent,
        ROUND(AVG(l.loan_amount), 0) as avg_loan_amount
    FROM customers c
    JOIN loans l ON c.customer_id = l.customer_id
    GROUP BY
        CASE
            WHEN c.credit_score >= 750 THEN 'LOW_RISK'
            WHEN c.credit_score >= 700 THEN 'MEDIUM_RISK'
            WHEN c.credit_score >= 650 THEN 'HIGH_RISK'
            ELSE 'VERY_HIGH_RISK'
        END
    ORDER BY approval_rate_percent DESC
""")
risk_analysis.show()

# 3. Monthly loan trend
print("3. Monthly Loan Trends:")
monthly_trend = spark.sql("""
    SELECT
        YEAR(application_date) as year,
        MONTH(application_date) as month,
        COUNT(*) as applications,
        SUM(CASE WHEN status = 'APPROVED' THEN 1 ELSE 0 END) as approved,
        SUM(CASE WHEN status = 'APPROVED' THEN loan_amount ELSE 0 END) as total_approved_amount,
        AVG(loan_amount) as avg_loan_amount
    FROM loans
    GROUP BY YEAR(application_date), MONTH(application_date)
    ORDER BY year, month
""")
monthly_trend.show()
# DataFrame API equivalent operations
print("\n📈 DataFrame API Operations:")

# Window functions
from pyspark.sql.window import Window

# Calculate running total per customer
window_spec = Window.partitionBy("customer_id").orderBy("application_date")
loans_with_running_total = loans_df.withColumn(
    "running_loan_total",
    sum("loan_amount").over(window_spec)
).withColumn(
    "loan_rank",
    row_number().over(window_spec)
)
print("Running loan totals per customer:")
loans_with_running_total.select(
    "customer_id", "loan_id", "loan_amount",
    "running_loan_total", "loan_rank"
).show()

# Advanced aggregations
print("Advanced loan statistics:")
loans_df.agg(
    count("*").alias("total_applications"),
    sum(when(col("status") == "APPROVED", 1).otherwise(0)).alias("approved_count"),
    avg("loan_amount").alias("avg_loan_amount"),
    stddev("loan_amount").alias("loan_amount_stddev"),
    percentile_approx("loan_amount", 0.5).alias("median_loan_amount"),
    max("loan_amount").alias("max_loan_amount"),
    min("loan_amount").alias("min_loan_amount")
).show()
spark.stop()
Business significance in the Lakehouse context:
- Customer Segmentation: segment customers by credit score
- Risk Assessment: assess risk and approval rates per segment
- Trend Analysis: analyze borrowing trends over time
- Portfolio Management: manage the loan portfolio
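To make the segmentation and approval-rate logic concrete, the following is a plain-Python cross-check of the risk-category query from Lab 3.2, using the same thresholds and the same sample rows. It is only a sketch for verifying the SQL output by hand; the helper name `risk_category` and the dictionary layout are choices made here, not part of the lab.

```python
# Plain-Python cross-check of the risk-category query in Lab 3.2.
from collections import defaultdict


def risk_category(credit_score):
    """Same thresholds as the SQL CASE expression in Lab 3.2."""
    if credit_score >= 750:
        return "LOW_RISK"
    if credit_score >= 700:
        return "MEDIUM_RISK"
    if credit_score >= 650:
        return "HIGH_RISK"
    return "VERY_HIGH_RISK"


# customer_id -> credit_score (sample data from Lab 3.2)
credit_scores = {"1": 750, "2": 680, "3": 720, "4": 800, "5": 650}

# (customer_id, status) for each loan application in Lab 3.2
applications = [
    ("1", "APPROVED"), ("2", "APPROVED"), ("3", "APPROVED"),
    ("4", "PENDING"), ("1", "REJECTED"), ("5", "APPROVED"),
]

totals = defaultdict(int)    # applications per risk bucket
approved = defaultdict(int)  # approved applications per risk bucket
for customer_id, status in applications:
    bucket = risk_category(credit_scores[customer_id])
    totals[bucket] += 1
    if status == "APPROVED":
        approved[bucket] += 1

# Mirrors ROUND(approved * 100.0 / COUNT(loan_id), 2) in the SQL query
approval_rate_percent = {
    bucket: round(approved[bucket] * 100.0 / totals[bucket], 2)
    for bucket in totals
}
```

On this sample the LOW_RISK bucket ends up with the lowest approval rate, because it happens to contain the one pending and the one rejected application. With only six rows that is an artifact of the sample data rather than a finding.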
- Hands-on Experience
# Lab 3.3: Production-Ready Spark Application for Lakehouse
# ==================================================
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, datetime  # needed by the sample data in load_customer_data
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_spark_session(app_name: str):
    """Create optimized Spark session for Lakehouse workloads"""
    return SparkSession.builder \
        .appName(app_name) \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.skewJoin.enabled", "true") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .getOrCreate()
def load_customer_data(spark, data_path=None):
    """Load customer data from various sources"""
    if data_path:
        # In production, load from parquet/delta files
        return spark.read.parquet(data_path)
    else:
        # Sample data for demo
        schema = StructType([
            StructField("customer_id", StringType(), False),
            StructField("customer_code", StringType(), False),
            StructField("full_name", StringType(), False),
            StructField("phone", StringType(), True),
            StructField("email", StringType(), True),
            StructField("credit_score", IntegerType(), True),
            StructField("monthly_income", DoubleType(), True),
            StructField("employment_type", StringType(), True),
            StructField("registration_date", DateType(), False),
            StructField("last_login", TimestampType(), True)
        ])
        # monthly_income is DoubleType, so the values must be Python floats
        data = [
            ("1", "CUST001", "Nguyen Van An", "0901234567", "an.nguyen@email.com",
             750, 25000000.0, "FULL_TIME", date(2023, 1, 15), datetime.now()),
            ("2", "CUST002", "Tran Thi Binh", "0912345678", "binh.tran@email.com",
             680, 18000000.0, "PART_TIME", date(2023, 3, 20), datetime.now()),
            ("3", "CUST003", "Le Van Cuong", "0923456789", "cuong.le@email.com",
             720, 30000000.0, "FULL_TIME", date(2023, 6, 10), datetime.now()),
            ("4", "CUST004", "Pham Thi Dung", "0934567890", "dung.pham@email.com",
             800, 45000000.0, "BUSINESS", date(2023, 8, 5), datetime.now()),
            ("5", "CUST005", "Hoang Van Duc", "0945678901", "duc.hoang@email.com",
             650, 15000000.0, "PART_TIME", date(2023, 9, 25), datetime.now())
        ]
        return spark.createDataFrame(data, schema)
def calculate_loan_eligibility(customers_df):
    """Calculate loan eligibility based on Lakehouse business rules"""
    # Business rules for Lakehouse
    eligibility_df = customers_df.withColumn(
        "max_loan_amount",
        when(col("employment_type") == "BUSINESS", col("monthly_income") * 8)
        .when(col("employment_type") == "FULL_TIME", col("monthly_income") * 6)
        .when(col("employment_type") == "PART_TIME", col("monthly_income") * 3)
        .otherwise(0)
    ).withColumn(
        "interest_rate",
        when(col("credit_score") >= 750, 0.08)
        .when(col("credit_score") >= 700, 0.10)
        .when(col("credit_score") >= 650, 0.12)
        .otherwise(0.15)
    ).withColumn(
        "risk_category",
        when(col("credit_score") >= 750, "LOW")
        .when(col("credit_score") >= 700, "MEDIUM")
        .when(col("credit_score") >= 650, "HIGH")
        .otherwise("VERY_HIGH")
    ).withColumn(
        "loan_eligible",
        (col("credit_score") >= 600) & (col("monthly_income") >= 10000000)
    )
    return eligibility_df
def generate_loan_recommendations(eligibility_df):
    """Generate personalized loan recommendations"""
    recommendations = eligibility_df.filter(col("loan_eligible")) \
        .withColumn(
            "recommended_amount",
            col("max_loan_amount") * 0.8  # Conservative recommendation
        ).withColumn(
            "recommended_term",
            when(col("risk_category") == "LOW", 36)
            .when(col("risk_category") == "MEDIUM", 24)
            .otherwise(12)
        ).withColumn(
            # Standard annuity formula: P * r / (1 - (1 + r)^-n), with r the monthly rate
            "monthly_payment",
            (col("recommended_amount") * (col("interest_rate") / 12)) /
            (1 - pow(1 + (col("interest_rate") / 12), -col("recommended_term")))
        ).withColumn(
            "payment_to_income_ratio",
            col("monthly_payment") / col("monthly_income")
        )
    return recommendations
def main():
    """Main application logic"""
    logger.info("Starting Lakehouse Loan Eligibility Application")
    # Create Spark session
    spark = create_spark_session("Lakehouse_Loan_Eligibility_Engine")
    try:
        # Load customer data
        logger.info("Loading customer data...")
        customers_df = load_customer_data(spark)
        # Cache frequently used data
        customers_df.cache()
        print("📊 Customer Data Overview:")
        customers_df.show()

        # Calculate eligibility
        logger.info("Calculating loan eligibility...")
        eligibility_df = calculate_loan_eligibility(customers_df)
        print("\n💰 Loan Eligibility Analysis:")
        eligibility_df.select(
            "customer_code", "full_name", "credit_score",
            "max_loan_amount", "interest_rate", "risk_category", "loan_eligible"
        ).show()

        # Generate recommendations
        logger.info("Generating loan recommendations...")
        recommendations_df = generate_loan_recommendations(eligibility_df)
        print("\n🎯 Loan Recommendations:")
        recommendations_df.select(
            "customer_code", "recommended_amount", "recommended_term",
            "monthly_payment", "payment_to_income_ratio"
        ).show()

        # Business insights
        print("\n📈 Business Analytics:")
        # Eligibility statistics
        print("Eligibility by Risk Category:")
        eligibility_df.groupBy("risk_category", "loan_eligible") \
            .agg(count("*").alias("customer_count"),
                 avg("max_loan_amount").alias("avg_max_loan")) \
            .orderBy("risk_category") \
            .show()

        # Revenue potential
        print("Revenue Potential Analysis:")
        revenue_analysis = recommendations_df.agg(
            sum("recommended_amount").alias("total_loan_portfolio"),
            avg("recommended_amount").alias("avg_loan_amount"),
            sum("monthly_payment").alias("total_monthly_revenue"),
            avg("payment_to_income_ratio").alias("avg_payment_ratio")
        )
        revenue_analysis.show()

        # Save results (in production, write to data lake)
        logger.info("Saving results...")
        # recommendations_df.write.mode("overwrite").parquet("/path/to/output")
        logger.info("Application completed successfully")
    except Exception as e:
        logger.error(f"Application failed: {str(e)}")
        raise
    finally:
        spark.stop()
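if __name__ == "__main__":
    main()
Key Production Features:
- Error Handling: a top-level try/except that logs the failure and re-raises, with spark.stop() in finally
- Logging: structured logging for monitoring
- Configuration: adaptive query execution, Kryo serialization, and Arrow set on the session
- Caching: the customer DataFrame is cached because several steps reuse it
- Business Logic: Lakehouse-specific loan rules
- Performance: Arrow speeds up conversions between Spark and pandas (this lab does not call pandas, so the setting only matters once you add toPandas() or pandas UDFs)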
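The loan rules in Lab 3.3 are easier to review when the arithmetic is checked outside Spark. The sketch below reproduces the `monthly_payment` calculation in plain Python for one sample customer (CUST004), assuming the column implements the standard annuity formula, which is how the expression in `generate_loan_recommendations` reads. The helper names `monthly_payment` and `remaining_balance` are introduced here for illustration only.

```python
# Worked example of the annuity formula behind the monthly_payment column.

def monthly_payment(principal, annual_rate, term_months):
    """Fixed monthly instalment for a fully amortising loan."""
    r = annual_rate / 12  # monthly interest rate
    return principal * r / (1 - (1 + r) ** -term_months)


def remaining_balance(principal, annual_rate, term_months):
    """Simulate the repayment schedule; the balance should end near zero."""
    r = annual_rate / 12
    payment = monthly_payment(principal, annual_rate, term_months)
    balance = principal
    for _ in range(term_months):
        balance = balance * (1 + r) - payment  # accrue interest, then pay
    return balance


# CUST004 in Lab 3.3: BUSINESS, monthly income 45,000,000, credit score 800
monthly_income = 45_000_000
max_loan = monthly_income * 8                    # BUSINESS multiplier
principal = max_loan * 0.8                       # conservative 80% recommendation
payment = monthly_payment(principal, 0.08, 36)   # LOW risk: 8% over 36 months
ratio = payment / monthly_income                 # payment_to_income_ratio
```

For this customer the instalment comes out at roughly 9 million per month, about a fifth of the stated income. Simulating the schedule with `remaining_balance` is a cheap sanity check that the formula was transcribed correctly, since the balance should reach approximately zero after the final payment.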
🎯 Preparing for Part 2
Part 2 focuses on:
- Structured Streaming for real-time analytics
- MLlib for machine learning workflows
- Performance tuning and optimization techniques
- Advanced integrations with Kafka and Delta Lake
📝 Homework
- DataFrame Operations practice: create a DataFrame from a CSV file of Lakehouse customer data and apply the basic transformations.
- SQL Query Challenge: write complex SQL queries to analyze the loan portfolio and compute metrics such as the NPL ratio and the average loan amount by region.
- Performance Optimization: compare the performance of RDD and DataFrame operations on the same dataset.
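For the SQL Query Challenge, it can help to have a small reference implementation to compare query results against. The sketch below defines the two metrics in plain Python under loudly stated assumptions: the tuple layout, the region names, the balances, and the 90-days-past-due cut-off for "non-performing" are all hypothetical and should be replaced with the definitions your institution uses.

```python
# Reference definitions for the homework metrics, in plain Python.
# Field layout, toy data, and the 90-day threshold are assumptions.
from collections import defaultdict

NPL_DAYS_PAST_DUE = 90  # assumed cut-off for a non-performing loan

# Hypothetical loan book: (region, outstanding_balance, days_past_due)
loan_book = [
    ("North", 100.0, 0),
    ("North", 50.0, 120),
    ("South", 200.0, 30),
    ("South", 50.0, 95),
]


def npl_ratio(loans):
    """Share of the outstanding balance that is non-performing."""
    total = sum(balance for _, balance, _ in loans)
    non_performing = sum(
        balance for _, balance, dpd in loans if dpd >= NPL_DAYS_PAST_DUE
    )
    return non_performing / total if total else 0.0


def average_amount_by_region(loans):
    """Mean outstanding balance per region."""
    grouped = defaultdict(list)
    for region, balance, _ in loans:
        grouped[region].append(balance)
    return {region: sum(v) / len(v) for region, v in grouped.items()}


portfolio_npl = npl_ratio(loan_book)
avg_by_region = average_amount_by_region(loan_book)
```

Running the same toy rows through your Spark SQL query and comparing against these values is one way to catch grouping or filtering mistakes before moving to a larger dataset.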
🔗 Resources
- Apache Spark Documentation
- PySpark API Reference
- Spark SQL Guide
- Lakehouse Data Platform Architecture
📞 Support
If you run into problems while working through the material:
- Slack channel: #lakehouse-data-platform
- Email: data-platform@lakehouse.vn
- Internal wiki: confluence.lakehouse.vn/data-platform
🎉 Congratulations on completing Apache Spark Fundamentals! Continue with Part 2 to explore the more advanced features.
- Run all the labs and capture screenshots of the results
- Experiment with a larger Lakehouse dataset
- Write more complex SQL queries for business analysis
- Explore the Spark UI to understand execution plans
Ready for review and feedback before moving on to Part 2! 🚀
