Productivity & dev tools
Apache Spark: Training Guide (Part 2)
Structured Streaming, MLlib, credit scoring, fraud detection.
2026-03-17 · 17 min read
Structured Streaming & MLlib for Lakehouse
Audience: Data Engineers, Data Scientists, ML Engineers
Training time: 3-4 hours (Part 2)
Last updated: 26/06/2025
Version: 1.0.0
Prerequisite: Complete Part 1 - Spark Fundamentals
Table of Contents
- Structured Streaming Fundamentals
- Real-time Analytics cho Lakehouse
- MLlib Machine Learning
- Credit Scoring Models
- Fraud Detection System
- Advanced Hands-on Labs
Structured Streaming Fundamentals
Structured Streaming in Lakehouse Real-time Analytics
Structured Streaming is a scalable, fault-tolerant stream processing engine built on the Spark SQL engine. It lets the Lakehouse express a streaming computation the same way as a batch computation on static data, while the engine runs it incrementally as new data arrives.
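A minimal plain-Python sketch of that model (no Spark needed): each micro-batch appends rows to a conceptual unbounded input table, and a running `groupBy(...).count()` is updated from saved state instead of rescanning everything, so after every trigger it matches the equivalent batch query. `IncrementalCount` and `batch_count` are hypothetical names for illustration, not Spark APIs.

```python
from collections import Counter


class IncrementalCount:
    """Toy model of a streaming groupBy(key).count() whose state survives triggers."""

    def __init__(self):
        self.state = Counter()  # aggregation state carried across micro-batches
        self.seen_rows = []     # the conceptual unbounded input table (kept only for comparison)

    def process_micro_batch(self, rows):
        """Append new rows and update the counts using only the new data."""
        self.seen_rows.extend(rows)
        for key in rows:
            self.state[key] += 1
        return dict(self.state)  # what 'complete' output mode would emit


def batch_count(rows):
    """The equivalent batch query over every row seen so far."""
    return dict(Counter(rows))


query = IncrementalCount()
for batch in [["LOGIN", "TRANSACTION"], ["TRANSACTION"], ["LOGIN_FAILED", "LOGIN"]]:
    streaming_result = query.process_micro_batch(batch)
    # After every trigger the incremental result equals a full recomputation.
    assert streaming_result == batch_count(query.seen_rows)

print(streaming_result)
```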
Stream Processing Architecture for the Lakehouse
┌─────────────────────────────────────────────────────────────────┐
│ Lakehouse Real-time Data Processing Architecture │
├─────────────────────────────────────────────────────────────────┤
│ Data Sources (Streaming) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐│
│ │ Kafka │ │ Mobile App │ │ ATM │ │ Website ││
│ │ (Core Bus) │ │ Events │ │ Transactions│ │ Activity ││
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘│
│ ↓ │
├─────────────────────────────────────────────────────────────────┤
│ Structured Streaming Processing │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Event │ │ Aggregation │ │ ML Inference │ │
│ │ Enrichment │ │ & Windowing │ │ (Real-time) │ │
│ │ │ │ │ │ │ │
│ │ • Customer │ │ • Tumbling │ │ • Fraud │ │
│ │ Profile │ │ Windows │ │ Detection │ │
│ │ • Historical │ │ • Sliding │ │ • Credit │ │
│ │ Behavior │ │ Windows │ │ Scoring │ │
│ │ • Rules │ │ • Session │ │ • Risk │ │
│ │ Engine │ │ Windows │ │ Assessment │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ ↓ │
├─────────────────────────────────────────────────────────────────┤
│ Real-time Outputs │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Alerts │ │ Dashboards │ │ Data Lake │ │
│ │ (Kafka/SMS) │ │ (WebSocket) │ │ (MinIO) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Core Concepts of Structured Streaming
# Core Structured Streaming concepts
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # note: shadows Python built-ins such as sum, max, min and filter
from pyspark.sql.types import *


def structured_streaming_concepts():
    """
    Explain the core concepts of Structured Streaming
    """
    spark = SparkSession.builder \
        .appName("StructuredStreaming-Concepts") \
        .master("spark://spark-master:7077") \
        .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
        .getOrCreate()

    print("🌊 Structured Streaming Core Concepts")
    print("=" * 50)

    # 1. Input sources
    print("1. Input Sources for the Lakehouse:")
    print("   📡 Kafka Topics:")
    print("   - customer-events: customer activity")
    print("   - lakehouse-transaction-events: real-time transactions")
    print("   - lakehouse-loan-events: loan events")
    print("   - lakehouse-system-events: system logs")

    # 2. Streaming DataFrame
    print("\n2. Streaming DataFrame - Unbounded Table:")
    print("   📊 Every streaming DataFrame is treated as an unbounded table")
    print("   🔄 New data is appended continuously")
    print("   ⚡ The query engine processes the new data incrementally")

    # 3. Output modes
    print("\n3. Output Modes:")
    output_modes = {
        "Append": "Emit only new, final rows (default; aggregations need a watermark)",
        "Complete": "Emit the entire result table on every trigger (aggregations only)",
        "Update": "Emit only the rows that changed since the last trigger"
    }
    for mode, description in output_modes.items():
        print(f"   • {mode}: {description}")

    # 4. Triggers
    print("\n4. Processing Triggers:")
    triggers = {
        "Default": "Start the next micro-batch as soon as the previous one finishes",
        "Fixed Interval": "Run micro-batches at a fixed interval (e.g. 30 seconds)",
        "One-time": "Process the available data once, then stop (useful for testing; "
                    "Spark 3.3+ prefers availableNow)",
        "Continuous": "Low-latency continuous processing (experimental)"
    }
    for trigger, description in triggers.items():
        print(f"   • {trigger}: {description}")

    spark.stop()


# Run the concepts demo
structured_streaming_concepts()
Setting up Kafka Integration
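The Kafka source always returns the same fixed columns (`key` and `value` as binary, plus `topic`, `partition`, `offset`, `timestamp`, `timestampType`), which is why the code below casts `value` to string before calling `from_json`. The following plain-Python sketch roughly approximates what `from_json(...)` followed by `select("data.*")` yields per record under simplifying assumptions: fields missing from the JSON become null, extra fields are ignored, and an unparseable payload produces a row of nulls rather than an error. `parse_kafka_value` and the simplified `SCHEMA` are hypothetical; real Spark also converts `timestamp` to `TimestampType`, and its handling of mistyped values depends on the parser mode.

```python
import json

# Simplified stand-in for json_schema in setup_kafka_streaming() (assumption):
# field name -> "string" or "double". "timestamp" stays text in this sketch.
SCHEMA = {
    "customer_id": "string",
    "event_type": "string",
    "timestamp": "string",
    "amount": "double",
    "location": "string",
    "device_info": "string",
}


def parse_kafka_value(value: bytes) -> dict:
    """Approximate value.cast("string") + from_json(...) + select("data.*")."""
    try:
        doc = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        doc = None
    if not isinstance(doc, dict):
        # Unparseable payload: every schema field is null after "data.*".
        return {field: None for field in SCHEMA}

    row = {}
    for field, kind in SCHEMA.items():
        v = doc.get(field)  # fields missing from the JSON become null
        if v is None:
            row[field] = None
        elif kind == "double":
            # Numbers are widened to float; this sketch nulls anything else.
            is_number = isinstance(v, (int, float)) and not isinstance(v, bool)
            row[field] = float(v) if is_number else None
        else:
            # String fields keep strings; other JSON values become compact JSON text.
            row[field] = v if isinstance(v, str) else json.dumps(v, separators=(",", ":"))
    return row
```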
# Configure Kafka integration for Structured Streaming
# (requires the spark-sql-kafka-0-10 connector package on the classpath)
def setup_kafka_streaming():
    """
    Set up the Kafka connection for Structured Streaming
    """
    spark = SparkSession.builder \
        .appName("Kafka-Streaming") \
        .master("spark://spark-master:7077") \
        .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints") \
        .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
        .getOrCreate()

    # Kafka configuration
    kafka_config = {
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "customer-events",
        "startingOffsets": "latest",
        "failOnDataLoss": "false"
    }

    print("🔗 Kafka Streaming Configuration:")
    for key, value in kafka_config.items():
        print(f"   {key}: {value}")

    # Read from Kafka
    kafka_stream = spark \
        .readStream \
        .format("kafka") \
        .options(**kafka_config) \
        .load()

    print("\n📋 Kafka Stream Schema:")
    kafka_stream.printSchema()

    # Parse the JSON payload carried in the Kafka value column
    json_schema = StructType([
        StructField("customer_id", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("timestamp", TimestampType(), True),
        StructField("amount", DoubleType(), True),
        StructField("location", StringType(), True),
        StructField("device_info", StringType(), True)
    ])

    parsed_stream = kafka_stream.select(
        col("key").cast("string").alias("message_key"),
        from_json(col("value").cast("string"), json_schema).alias("data"),
        col("timestamp").alias("kafka_timestamp"),
        col("partition"),
        col("offset")
    ).select("message_key", "data.*", "kafka_timestamp", "partition", "offset")

    print("\n📊 Parsed Stream Schema:")
    parsed_stream.printSchema()

    return spark, parsed_stream


# Test the Kafka setup
# spark, stream = setup_kafka_streaming()
Real-time Analytics for the Lakehouse
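The analytics in this section group events with `window()`. As a plain-Python sketch (an illustration, not Spark's implementation), this shows which windows an event lands in, assuming epoch-aligned windows with no `startTime` offset: a tumbling `"1 minute"` window yields exactly one window, while the sliding `"10 minutes", "5 minutes"` window yields two overlapping ones, so each event is counted in both. `window_is_final` is a rough model of when append mode emits a window: once the watermark (the maximum event time seen minus the `withWatermark` delay) has reached the window end. Both function names are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def assign_windows(event_time, duration, slide=None):
    """Return the sorted (start, end) windows that contain event_time.

    Windows are aligned to the Unix epoch; a tumbling window is the
    special case slide == duration.
    """
    slide = slide or duration
    # Latest window start that is <= event_time.
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    # [start, start + duration) contains event_time while start > event_time - duration.
    while start > event_time - duration:
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)


def window_is_final(window_end, max_event_time, delay):
    """Rough model of append mode: emit the window (and drop its state)
    once the watermark has reached the window end."""
    watermark = max_event_time - delay
    return watermark >= window_end


t = datetime(2025, 6, 26, 10, 7, 30)
print(assign_windows(t, timedelta(minutes=1)))
print(assign_windows(t, timedelta(minutes=10), timedelta(minutes=5)))
```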
Real-time Customer Activity Monitoring
def realtime_customer_monitoring():
    """
    Monitor customer activity in real time
    """
    spark, customer_stream = setup_kafka_streaming()

    print("👥 Real-time Customer Activity Monitoring")
    print("=" * 50)

    # 1. Basic stream processing: customer activity counts per minute
    #    (the watermark bounds how long aggregation state is kept for late events)
    activity_counts = customer_stream \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "1 minute"),
            col("customer_id"),
            col("event_type")
        ) \
        .count() \
        .withColumn("window_start", col("window.start")) \
        .withColumn("window_end", col("window.end")) \
        .drop("window")

    # 2. Suspicious activity detection
    #    (exact countDistinct is not supported in streaming aggregations,
    #    so approx_count_distinct is used instead)
    suspicious_activity = customer_stream \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "5 minutes"),
            col("customer_id")
        ) \
        .agg(
            count("*").alias("total_events"),
            approx_count_distinct("location").alias("unique_locations"),
            sum(when(col("event_type") == "LOGIN_FAILED", 1).otherwise(0)).alias("failed_logins"),
            sum(when(col("event_type") == "TRANSACTION", col("amount")).otherwise(0)).alias("total_amount")
        ) \
        .withColumn("risk_score",
                    when(col("failed_logins") >= 3, 100)
                    .when(col("unique_locations") >= 3, 80)
                    .when(col("total_amount") > 10000000, 70)  # > 10M VND
                    .when(col("total_events") >= 20, 60)
                    .otherwise(0)
                    ) \
        .filter(col("risk_score") > 50)

    # 3. Customer behavior analytics
    behavior_analytics = customer_stream \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "10 minutes", "5 minutes"),  # sliding window
            col("customer_id")
        ) \
        .agg(
            count("*").alias("activity_count"),
            approx_count_distinct("event_type").alias("event_diversity"),
            avg("amount").alias("avg_transaction_amount"),
            max("amount").alias("max_transaction_amount"),
            collect_list("location").alias("locations_visited")
        ) \
        .withColumn("behavior_score",
                    col("activity_count") * 0.3 +
                    col("event_diversity") * 0.4 +
                    (col("avg_transaction_amount") / 1000000) * 0.3  # normalize amount to millions
                    )

    # Output streams
    print("📊 Setting up output streams...")

    # Console output for development
    query1 = activity_counts.writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .trigger(processingTime="30 seconds") \
        .start()

    # File output for persistent storage (append emits each window once the watermark passes it)
    query2 = behavior_analytics.writeStream \
        .outputMode("append") \
        .format("parquet") \
        .option("path", "/tmp/lakehouse-customer-behavior") \
        .option("checkpointLocation", "/tmp/lakehouse-behavior-checkpoint") \
        .trigger(processingTime="1 minute") \
        .start()

    # Kafka output for alerts
    alert_stream = suspicious_activity.select(
        col("customer_id").alias("key"),
        to_json(struct("*")).alias("value")
    )

    query3 = alert_stream.writeStream \
        .outputMode("update") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "lakehouse-customer-alerts") \
        .option("checkpointLocation", "/tmp/lakehouse-alerts-checkpoint") \
        .start()

    return [query1, query2, query3]
Real-time Fraud Detection
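Before the Spark version, here is a plain-Python sketch of the same rule-based scoring, with thresholds copied from the code below: each triggered rule adds 25 points to a transaction, only flagged transactions enter the 10-minute window, and the window score is the highest transaction score plus 10 per flagged transaction plus 50 if the flagged total exceeds 50M VND; an alert fires at 75 or more. The dict-based transaction format and the function names are assumptions for illustration. Run it in a fresh interpreter, because the wildcard `pyspark.sql.functions` import shadows Python's `max` and `sum`.

```python
ALERT_THRESHOLD = 75


def fraud_indicators(txn: dict) -> list:
    """Per-transaction rules mirroring the fraud_indicators column."""
    amount, hour = txn["amount"], txn["hour_of_day"]
    credit, kyc = txn.get("credit_score"), txn.get("kyc_status")
    hits = []
    if amount > 5_000_000 and (hour < 6 or hour > 22):
        hits.append("LARGE_AMOUNT_OFF_HOURS")
    if txn.get("event_type") == "ATM_WITHDRAWAL":
        hits.append("LOCATION_CHECK_NEEDED")
    # A missing profile (null credit_score) never triggers this rule, as in Spark.
    if credit is not None and amount > 10_000_000 and credit < 600:
        hits.append("AMOUNT_CREDIT_MISMATCH")
    if kyc == "PENDING" and amount > 2_000_000:
        hits.append("PENDING_KYC_LARGE_AMOUNT")
    return hits


def fraud_score(txn: dict) -> int:
    return 25 * len(fraud_indicators(txn))


def window_fraud_score(transactions: list) -> int:
    """final_fraud_score for one customer and window; unflagged transactions are dropped first."""
    flagged = [t for t in transactions if fraud_score(t) > 0]
    if not flagged:
        return 0
    total = sum(t["amount"] for t in flagged)
    return (max(fraud_score(t) for t in flagged)
            + 10 * len(flagged)
            + (50 if total > 50_000_000 else 0))
```

With one late-night 6M VND ATM withdrawal the window scores 60 (below the threshold); three such withdrawals in the same window score 80 and raise an alert.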
def realtime_fraud_detection():
    """
    Real-time fraud detection system
    """
    spark, transaction_stream = setup_kafka_streaming()

    print("🚨 Real-time Fraud Detection System")
    print("=" * 50)

    # Load customer profile data (batch) to enrich the stream
    customer_profiles = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
        .option("dbtable", "core.customers") \
        .option("user", "lakehouse_admin") \
        .option("password", "CHANGE_ME_PASSWORD") \
        .load() \
        .select("customer_id", "customer_code", "credit_score", "kyc_status")

    # Broadcast the customer profiles for the stream-static join
    broadcast_profiles = broadcast(customer_profiles)

    # Enrich the stream with customer data
    enriched_stream = transaction_stream \
        .join(broadcast_profiles, "customer_id", "left") \
        .withColumn("hour_of_day", hour(col("timestamp"))) \
        .withColumn("day_of_week", dayofweek(col("timestamp")))

    # Fraud detection rules: each rule yields an indicator name or null
    fraud_indicators = enriched_stream \
        .withColumn("fraud_indicators",
                    array(
                        # Rule 1: large transaction outside business hours
                        when((col("amount") > 5000000) &
                             ((col("hour_of_day") < 6) | (col("hour_of_day") > 22)),
                             lit("LARGE_AMOUNT_OFF_HOURS")).otherwise(lit(None)),
                        # Rule 2: ATM withdrawals are flagged for a multi-location check
                        when(col("event_type") == "ATM_WITHDRAWAL",
                             lit("LOCATION_CHECK_NEEDED")).otherwise(lit(None)),
                        # Rule 3: transaction amount vs credit score mismatch
                        when((col("amount") > 10000000) & (col("credit_score") < 600),
                             lit("AMOUNT_CREDIT_MISMATCH")).otherwise(lit(None)),
                        # Rule 4: pending KYC with a large transaction
                        when((col("kyc_status") == "PENDING") & (col("amount") > 2000000),
                             lit("PENDING_KYC_LARGE_AMOUNT")).otherwise(lit(None))
                    )
                    ) \
        .withColumn("fraud_indicators_clean",  # higher-order filter(), Spark 3.1+
                    filter(col("fraud_indicators"), lambda x: x.isNotNull())) \
        .withColumn("fraud_score", size(col("fraud_indicators_clean")) * 25) \
        .filter(col("fraud_score") > 0)

    # Aggregate potential fraud per customer in 10-minute windows
    fraud_summary = fraud_indicators \
        .withWatermark("timestamp", "5 minutes") \
        .groupBy(
            window(col("timestamp"), "10 minutes"),
            col("customer_id"),
            col("customer_code")
        ) \
        .agg(
            count("*").alias("suspicious_transactions"),
            sum("amount").alias("total_suspicious_amount"),
            max("fraud_score").alias("max_fraud_score"),
            # Flatten the per-transaction arrays into one distinct list (a flat
            # array<string> can also be written to PostgreSQL over JDBC)
            array_distinct(flatten(collect_set("fraud_indicators_clean"))).alias("all_fraud_indicators")
        ) \
        .withColumn("final_fraud_score",
                    col("max_fraud_score") +
                    (col("suspicious_transactions") * 10) +
                    when(col("total_suspicious_amount") > 50000000, 50).otherwise(0)
                    ) \
        .filter(col("final_fraud_score") >= 75)  # high-confidence fraud

    # Real-time ML inference (simplified; not written to any sink in this example)
    ml_fraud_detection = enriched_stream \
        .withColumn("ml_features",
                    array(
                        col("amount").cast("double"),
                        col("hour_of_day").cast("double"),
                        col("credit_score").cast("double"),
                        when(col("kyc_status") == "APPROVED", 1.0).otherwise(0.0)
                    )
                    ) \
        .withColumn("ml_fraud_probability",
                    # Simplified stand-in for a model (in practice, load a trained model)
                    when(
                        (col("amount") > 10000000) &
                        (col("credit_score") < 650) &
                        ((col("hour_of_day") < 6) | (col("hour_of_day") > 22)),
                        0.95
                    ).when(
                        (col("amount") > 5000000) & (col("credit_score") < 700),
                        0.75
                    ).when(
                        col("amount") > 20000000,
                        0.85
                    ).otherwise(0.1)
                    ) \
        .filter(col("ml_fraud_probability") > 0.7)

    # Fraud alert records
    fraud_alerts = fraud_summary.select(
        col("customer_id"),
        col("customer_code"),
        col("final_fraud_score"),
        col("total_suspicious_amount"),
        col("all_fraud_indicators"),
        current_timestamp().alias("alert_time")
    )

    # Write fraud alerts to multiple sinks
    # 1. Database for the investigation team
    def write_alerts_to_db(batch_df, batch_id):
        batch_df.write \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
            .option("dbtable", "lakehouse_alerts.fraud_alerts") \
            .option("user", "lakehouse_admin") \
            .option("password", "CHANGE_ME_PASSWORD") \
            .mode("append") \
            .save()

    fraud_db_query = fraud_alerts.writeStream \
        .outputMode("append") \
        .foreachBatch(write_alerts_to_db) \
        .option("checkpointLocation", "/tmp/lakehouse-fraud-db-checkpoint") \
        .start()

    # 2. Kafka for real-time notifications
    fraud_kafka_query = fraud_alerts.select(
        col("customer_id").alias("key"),
        to_json(struct("*")).alias("value")
    ).writeStream \
        .outputMode("append") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "lakehouse-fraud-alerts") \
        .option("checkpointLocation", "/tmp/lakehouse-fraud-kafka-checkpoint") \
        .start()

    # 3. Object storage for the audit trail (requires the Delta Lake package)
    fraud_audit_query = fraud_alerts.writeStream \
        .outputMode("append") \
        .format("delta") \
        .option("path", "s3a://processed-data/fraud-alerts/") \
        .option("checkpointLocation", "/tmp/lakehouse-fraud-audit-checkpoint") \
        .trigger(processingTime="2 minutes") \
        .start()

    return [fraud_db_query, fraud_kafka_query, fraud_audit_query]
Real-time Risk Assessment
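The scoring below adds four point-based factors (credit score up to 40, repayment history up to 25, debt burden up to 20, loan size up to 15), where a higher total means lower risk. A plain-Python sketch of that arithmetic, with thresholds copied from the code and hypothetical function names, makes the bands easy to check by hand; as in Spark's `when(...).otherwise(0)`, a missing (null) input earns 0 points.

```python
import operator


def band_points(value, bands, compare):
    """Points of the first (threshold, points) band that matches, else 0."""
    if value is None:  # null comparisons fall through to otherwise(0) in Spark
        return 0
    for threshold, points in bands:
        if compare(value, threshold):
            return points
    return 0


def total_risk_score(credit_score, default_rate, debt_to_income, loan_to_max):
    return (
        band_points(credit_score, [(750, 40), (700, 30), (650, 20), (600, 10)], operator.ge)
        # default_rate is never negative, so "<= 0" matches the "== 0" band
        + band_points(default_rate, [(0, 25), (0.1, 20), (0.2, 10)], operator.le)
        + band_points(debt_to_income, [(0.3, 20), (0.5, 15), (0.7, 10)], operator.le)
        + band_points(loan_to_max, [(1.0, 15), (1.5, 10), (2.0, 5)], operator.le)
    )


def risk_category(score):
    if score >= 80:
        return "LOW_RISK"
    if score >= 60:
        return "MEDIUM_RISK"
    if score >= 40:
        return "HIGH_RISK"
    return "VERY_HIGH_RISK"


def recommendation(category):
    return {"LOW_RISK": "AUTO_APPROVE", "MEDIUM_RISK": "MANUAL_REVIEW"}.get(category, "AUTO_REJECT")
```

For example, credit score 680, default rate 0.15, ratio 0.6 and loan-to-max 1.8 give 20 + 10 + 10 + 5 = 45 points, which is HIGH_RISK and therefore AUTO_REJECT before any market adjustment.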
def realtime_risk_assessment():
    """
    Real-time risk assessment for lending decisions
    """
    spark = SparkSession.builder \
        .appName("Lakehouse-Risk-Assessment") \
        .master("spark://spark-master:7077") \
        .getOrCreate()

    print("⚖️ Real-time Risk Assessment System")
    print("=" * 50)

    # Loan applications need their own schema: the customer-events schema used by
    # setup_kafka_streaming() has no requested_amount, declared_income or credit_score.
    # The topic name and fields below are assumptions for this example.
    loan_application_schema = StructType([
        StructField("loan_application_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("requested_amount", DoubleType(), True),
        StructField("declared_income", DoubleType(), True),
        StructField("credit_score", IntegerType(), True),
        StructField("timestamp", TimestampType(), True)
    ])

    loan_application_stream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "lakehouse-loan-events") \
        .option("startingOffsets", "latest") \
        .load() \
        .select(from_json(col("value").cast("string"), loan_application_schema).alias("application")) \
        .select("application.*")

    # Load historical loan data for risk scoring
    historical_loans = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
        .option("dbtable", """
            (SELECT
                customer_id,
                COUNT(*) AS historical_loan_count,
                SUM(loan_amount) AS total_historical_amount,
                AVG(CASE WHEN status = 'DEFAULTED' THEN 1.0 ELSE 0.0 END) AS default_rate,
                MAX(loan_amount) AS max_loan_amount
            FROM core.loans
            GROUP BY customer_id) AS historical_data
        """) \
        .option("user", "lakehouse_admin") \
        .option("password", "CHANGE_ME_PASSWORD") \
        .load()

    # Enrich loan applications with historical data
    enriched_applications = loan_application_stream \
        .join(broadcast(historical_loans), "customer_id", "left") \
        .fillna({
            "historical_loan_count": 0,
            "total_historical_amount": 0.0,
            "default_rate": 0.0,
            "max_loan_amount": 0.0
        })

    # Real-time risk scoring: four point-based factors, 0-100 in total (higher = lower risk)
    risk_scored_applications = enriched_applications \
        .withColumn("debt_to_income_ratio",
                    col("total_historical_amount") / col("declared_income")) \
        .withColumn("loan_to_max_ratio",
                    col("requested_amount") / greatest(col("max_loan_amount"), lit(1000000))) \
        .withColumn("credit_score_points",  # credit score factor (0-40 points)
                    when(col("credit_score") >= 750, 40)
                    .when(col("credit_score") >= 700, 30)
                    .when(col("credit_score") >= 650, 20)
                    .when(col("credit_score") >= 600, 10)
                    .otherwise(0)) \
        .withColumn("history_points",  # historical performance factor (0-25 points)
                    when(col("default_rate") == 0, 25)
                    .when(col("default_rate") <= 0.1, 20)
                    .when(col("default_rate") <= 0.2, 10)
                    .otherwise(0)) \
        .withColumn("debt_burden_points",  # debt burden factor (0-20 points)
                    when(col("debt_to_income_ratio") <= 0.3, 20)
                    .when(col("debt_to_income_ratio") <= 0.5, 15)
                    .when(col("debt_to_income_ratio") <= 0.7, 10)
                    .otherwise(0)) \
        .withColumn("loan_size_points",  # loan size factor (0-15 points)
                    when(col("loan_to_max_ratio") <= 1.0, 15)
                    .when(col("loan_to_max_ratio") <= 1.5, 10)
                    .when(col("loan_to_max_ratio") <= 2.0, 5)
                    .otherwise(0)) \
        .withColumn("total_risk_score",
                    col("credit_score_points") + col("history_points") +
                    col("debt_burden_points") + col("loan_size_points")) \
        .withColumn("risk_category",
                    when(col("total_risk_score") >= 80, "LOW_RISK")
                    .when(col("total_risk_score") >= 60, "MEDIUM_RISK")
                    .when(col("total_risk_score") >= 40, "HIGH_RISK")
                    .otherwise("VERY_HIGH_RISK")
                    ) \
        .withColumn("recommendation",
                    when(col("risk_category") == "LOW_RISK", "AUTO_APPROVE")
                    .when(col("risk_category") == "MEDIUM_RISK", "MANUAL_REVIEW")
                    .otherwise("AUTO_REJECT")
                    ) \
        .withColumn("assessment_timestamp", current_timestamp())

    # Real-time decision split (kept for reference; the sinks below use market-adjusted scores)
    auto_decisions = risk_scored_applications \
        .filter(col("recommendation").isin("AUTO_APPROVE", "AUTO_REJECT"))

    manual_review_cases = risk_scored_applications \
        .filter(col("recommendation") == "MANUAL_REVIEW")

    # Market-conditions adjustment (simulated). The business-hours bonus and the
    # year-end caution are applied independently, so both can apply at once.
    market_adjusted_scores = risk_scored_applications \
        .withColumn("market_adjustment",
                    when(hour(current_timestamp()).between(9, 17), 5).otherwise(0) +
                    when(month(current_timestamp()).isin(11, 12), -10).otherwise(0)
                    ) \
        .withColumn("final_risk_score",
                    col("total_risk_score") + col("market_adjustment")) \
        .withColumn("final_recommendation",
                    when(col("final_risk_score") >= 85, "AUTO_APPROVE")
                    .when(col("final_risk_score") >= 65, "MANUAL_REVIEW")
                    .otherwise("AUTO_REJECT")
                    )

    # Output processing
    # 1. Auto-approved loans -> loan origination system
    auto_approved_query = market_adjusted_scores \
        .filter(col("final_recommendation") == "AUTO_APPROVE") \
        .select(
            "customer_id", "loan_application_id", "requested_amount",
            "final_risk_score", "risk_category", "assessment_timestamp"
        ) \
        .select(
            col("loan_application_id").alias("key"),
            to_json(struct("*")).alias("value")  # the Kafka sink needs a value column
        ) \
        .writeStream \
        .outputMode("append") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "lakehouse-auto-approved-loans") \
        .option("checkpointLocation", "/tmp/lakehouse-auto-approved-checkpoint") \
        .start()

    # 2. Manual review cases -> review queue
    def write_review_queue(batch_df, batch_id):
        batch_df.write \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
            .option("dbtable", "lakehouse_workflow.manual_review_queue") \
            .option("user", "lakehouse_admin") \
            .option("password", "CHANGE_ME_PASSWORD") \
            .mode("append") \
            .save()

    manual_review_query = market_adjusted_scores \
        .filter(col("final_recommendation") == "MANUAL_REVIEW") \
        .writeStream \
        .outputMode("append") \
        .foreachBatch(write_review_queue) \
        .option("checkpointLocation", "/tmp/lakehouse-manual-review-checkpoint") \
        .start()

    # 3. All assessments -> data lake for analysis (requires the Delta Lake package)
    all_assessments_query = market_adjusted_scores \
        .writeStream \
        .outputMode("append") \
        .format("delta") \
        .option("path", "s3a://processed-data/risk-assessments/") \
        .option("checkpointLocation", "/tmp/lakehouse-risk-assessments-checkpoint") \
        .partitionBy("risk_category") \
        .trigger(processingTime="1 minute") \
        .start()

    return [auto_approved_query, manual_review_query, all_assessments_query]
MLlib Machine Learning
MLlib in the Lakehouse Context
MLlib is Spark's scalable machine learning library; in the Lakehouse it is used to build models for:
- Credit Scoring: assessing a customer's ability to repay
- Fraud Detection: detecting fraudulent activity
- Customer Segmentation: grouping customers into segments
- Churn Prediction: predicting which customers will leave
- Recommendation Systems: recommending products
ML Pipeline Architecture
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import *


def lakehouse_ml_pipeline_architecture():
    """
    ML pipeline architecture for the Lakehouse
    """
    print("🤖 Lakehouse ML Pipeline Architecture")
    print("=" * 50)

    architecture = {
        "Data Ingestion": {
            "Sources": ["PostgreSQL", "Kafka Streams", "MinIO", "External APIs"],
            "Formats": ["JSON", "Parquet", "CSV", "Avro"],
            "Volume": "1M+ records/day"
        },
        "Feature Engineering": {
            "Techniques": ["StringIndexer", "VectorAssembler", "StandardScaler", "PCA"],
            "Custom Features": ["Debt-to-Income", "Payment History", "Behavioral Patterns"],
            "Feature Store": "Delta Lake with versioning"
        },
        "Model Training": {
            "Algorithms": ["Random Forest", "Gradient Boosting", "Logistic Regression"],
            "Validation": ["Cross-validation", "Train/Validation/Test split"],
            "Hyperparameter Tuning": "Grid search with MLlib (ParamGridBuilder + CrossValidator)"
        },
        "Model Serving": {
            "Batch Inference": "Daily Spark jobs",
            "Real-time Inference": "Structured Streaming",
            "Model Registry": "MLflow tracking"
        },
        "Monitoring": {
            "Metrics": ["Accuracy", "Precision", "Recall", "AUC-ROC"],
            "Drift Detection": "Statistical tests on predictions",
            "Performance": "Latency and throughput monitoring"
        }
    }

    for stage, details in architecture.items():
        print(f"\n📋 {stage}:")
        for key, value in details.items():
            if isinstance(value, list):
                print(f"   • {key}: {', '.join(value)}")
            else:
                print(f"   • {key}: {value}")
Feature Engineering Framework
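To make the preprocessing stages used in `lakehouse_feature_engineering()` concrete, here is a plain-Python sketch of what they compute with their default parameters: `StringIndexer` orders labels by descending frequency (ties broken alphabetically), `OneHotEncoder` uses `dropLast=True` so the last index encodes as all zeros, and `StandardScaler` defaults to `withStd=True`, `withMean=False`, dividing by the sample standard deviation without centering (a zero-variance feature becomes 0.0). The helper names are hypothetical; this is an illustration, not MLlib's code.

```python
from collections import Counter
from statistics import stdev


def string_index(values):
    """StringIndexer default (frequencyDesc): most frequent label -> 0.0, ties alphabetical."""
    counts = Counter(values)
    labels = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(labels)}
    return [mapping[v] for v in values], labels


def one_hot(index, num_categories, drop_last=True):
    """OneHotEncoder default dropLast=True: the last index encodes as all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector


def standard_scale(column):
    """StandardScaler defaults: divide by the sample std, no mean centering."""
    sd = stdev(column)
    return [x / sd if sd else 0.0 for x in column]


indices, labels = string_index(["LOW", "HIGH", "LOW", "MEDIUM", "LOW", "HIGH"])
print(labels, indices)
print([one_hot(i, len(labels)) for i in indices])
```

This is also why `debt_burden_vec` and `loan_frequency_vec` each contribute two dimensions (three categories minus the dropped one) to the assembled feature vector.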
def lakehouse_feature_engineering():
    """
    Feature engineering framework for Lakehouse ML models
    """
    spark = SparkSession.builder \
        .appName("Lakehouse-Feature-Engineering") \
        .master("spark://spark-master:7077") \
        .getOrCreate()

    print("⚙️ Lakehouse Feature Engineering Framework")
    print("=" * 50)

    # Load raw data
    customers_df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
        .option("dbtable", "core.customers") \
        .option("user", "lakehouse_admin") \
        .option("password", "CHANGE_ME_PASSWORD") \
        .load()

    loans_df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
        .option("dbtable", "core.loans") \
        .option("user", "lakehouse_admin") \
        .option("password", "CHANGE_ME_PASSWORD") \
        .load()

    # 1. Customer demographics features
    customer_features = customers_df \
        .withColumn("age", floor(datediff(current_date(), col("date_of_birth")) / 365.25)) \
        .withColumn("account_age_days", datediff(current_date(), col("created_at"))) \
        .withColumn("has_email", col("email").isNotNull().cast("int")) \
        .withColumn("has_phone", col("phone_number").isNotNull().cast("int")) \
        .withColumn("kyc_approved", (col("kyc_status") == "APPROVED").cast("int"))

    # 2. Loan history features
    loan_history = loans_df \
        .groupBy("customer_id") \
        .agg(
            count("loan_id").alias("total_loans"),
            sum("loan_amount").alias("total_borrowed"),
            avg("loan_amount").alias("avg_loan_amount"),
            max("loan_amount").alias("max_loan_amount"),
            min("loan_amount").alias("min_loan_amount"),
            avg("interest_rate").alias("avg_interest_rate"),
            countDistinct("product_type").alias("product_diversity"),
            sum(when(col("status") == "APPROVED", 1).otherwise(0)).alias("approved_loans"),
            sum(when(col("status") == "DISBURSED", 1).otherwise(0)).alias("disbursed_loans"),
            sum(when(col("status") == "DEFAULTED", 1).otherwise(0)).alias("defaulted_loans"),
            max(col("application_date")).alias("last_loan_date")
        ) \
        .withColumn("approval_rate", col("approved_loans") / col("total_loans")) \
        .withColumn("disbursement_rate", col("disbursed_loans") / col("total_loans")) \
        .withColumn("default_rate", col("defaulted_loans") / col("total_loans")) \
        .withColumn("days_since_last_loan",
                    datediff(current_date(), col("last_loan_date")))

    # 3. Financial behavior features
    financial_features = loan_history \
        .withColumn("debt_burden_category",
                    when(col("total_borrowed") <= 50000000, "LOW")
                    .when(col("total_borrowed") <= 200000000, "MEDIUM")
                    .otherwise("HIGH")
                    ) \
        .withColumn("loan_frequency_category",
                    when(col("total_loans") <= 2, "OCCASIONAL")
                    .when(col("total_loans") <= 5, "REGULAR")
                    .otherwise("FREQUENT")
                    ) \
        .withColumn("risk_indicators",
                    col("default_rate") * 50 +
                    (1 - col("approval_rate")) * 30 +
                    when(col("days_since_last_loan") < 30, 20).otherwise(0)
                    )

    # 4. Combine all features (customers without loans get neutral defaults)
    ml_features = customer_features \
        .join(financial_features, "customer_id", "left") \
        .fillna({
            "total_loans": 0,
            "total_borrowed": 0.0,
            "avg_loan_amount": 0.0,
            "max_loan_amount": 0.0,
            "min_loan_amount": 0.0,
            "avg_interest_rate": 0.0,
            "product_diversity": 0,
            "approved_loans": 0,
            "disbursed_loans": 0,
            "defaulted_loans": 0,
            "approval_rate": 0.0,
            "disbursement_rate": 0.0,
            "default_rate": 0.0,
            "days_since_last_loan": 9999,
            "debt_burden_category": "LOW",
            "loan_frequency_category": "OCCASIONAL",
            "risk_indicators": 0.0
        })

    # 5. Feature preprocessing pipeline
    # Categorical encoding
    string_indexers = [
        StringIndexer(inputCol="debt_burden_category", outputCol="debt_burden_idx"),
        StringIndexer(inputCol="loan_frequency_category", outputCol="loan_frequency_idx")
    ]

    # One-hot encoding
    one_hot_encoders = [
        OneHotEncoder(inputCol="debt_burden_idx", outputCol="debt_burden_vec"),
        OneHotEncoder(inputCol="loan_frequency_idx", outputCol="loan_frequency_vec")
    ]

    # Feature assembly; rows with null/NaN inputs (e.g. a missing date_of_birth or
    # credit_score) are skipped instead of failing the whole job
    feature_cols = [
        "age", "account_age_days", "has_email", "has_phone", "kyc_approved",
        "credit_score", "total_loans", "total_borrowed", "avg_loan_amount",
        "product_diversity", "approval_rate", "default_rate", "risk_indicators",
        "debt_burden_vec", "loan_frequency_vec"
    ]

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features",
                                handleInvalid="skip")

    # Feature scaling
    scaler = StandardScaler(inputCol="raw_features", outputCol="scaled_features")

    # Build the preprocessing pipeline
    preprocessing_pipeline = Pipeline(stages=string_indexers + one_hot_encoders + [assembler, scaler])

    print("✅ Feature engineering pipeline created")
    print(f"📊 Input features: {len(feature_cols)} columns")
    print("🎯 Feature types: Demographics, Loan History, Financial Behavior")

    return ml_features, preprocessing_pipeline, spark


# Run feature engineering
# features_df, pipeline, spark = lakehouse_feature_engineering()
Credit Scoring Models
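The model comparison in `develop_credit_scoring_model()` reports precision and recall for the positive (risky) class and AUC-ROC. A plain-Python sketch of those metrics, with hypothetical helper names: precision and recall come from the confusion counts exactly as in that evaluation loop, and AUC-ROC is written in its rank form (the fraction of positive/negative pairs in which the positive example scores higher, ties counting half), which equals the area under the ROC curve. It uses `len` rather than the built-in `sum`, which the wildcard `pyspark.sql.functions` import would shadow.

```python
def precision_recall(labels, predictions):
    """Precision and recall for label 1, from confusion-matrix counts."""
    pairs = list(zip(labels, predictions))
    tp = len([1 for y, p in pairs if p == 1 and y == 1])
    fp = len([1 for y, p in pairs if p == 1 and y == 0])
    fn = len([1 for y, p in pairs if p == 0 and y == 1])
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


def auc_roc(labels, scores):
    """Rank form of ROC AUC: P(score of a random positive > score of a random negative)."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            wins += 1.0 if p > n else 0.5 if p == n else 0.0
    return wins / (len(positives) * len(negatives))


labels = [1, 0, 1, 0]
scores = [0.9, 0.4, 0.35, 0.1]
predictions = [1 if s >= 0.5 else 0 for s in scores]
print(precision_recall(labels, predictions), auc_roc(labels, scores))
```

Here one risky customer is ranked below a safe one, so AUC is 0.75, while at a 0.5 cut-off the model is precise (1.0) but misses half of the risky customers (recall 0.5): the two metrics answer different questions.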
Credit Scoring Model Development
def develop_credit_scoring_model():
    """
    Develop the credit scoring model for the Lakehouse
    """
    # Reuse the features from the previous function
    features_df, preprocessing_pipeline, spark = lakehouse_feature_engineering()

    print("💳 Credit Scoring Model Development")
    print("=" * 50)

    # Build the target variable (credit_risk_label) from default rate and credit score.
    # Caution: both inputs are also model features, so this label leaks the target;
    # in practice, use an outcome observed after the feature snapshot (e.g. a later default).
    labeled_data = features_df \
        .withColumn("credit_risk_label",
                    when((col("default_rate") > 0.2) | (col("credit_score") < 600), 1.0)
                    .otherwise(0.0)
                    ) \
        .filter(col("total_loans") > 0)  # keep only customers with loan history

    print(f"📊 Training dataset size: {labeled_data.count()} customers")

    # Check the class distribution
    class_distribution = labeled_data.groupBy("credit_risk_label").count()
    print("📈 Class distribution:")
    class_distribution.show()

    # Split the data
    train_data, test_data = labeled_data.randomSplit([0.8, 0.2], seed=42)

    print(f"🎯 Training set: {train_data.count()} samples")
    print(f"🧪 Test set: {test_data.count()} samples")

    # Fit the preprocessing pipeline on the training data only
    preprocessing_model = preprocessing_pipeline.fit(train_data)
    train_processed = preprocessing_model.transform(train_data)
    test_processed = preprocessing_model.transform(test_data)

    # Train several algorithms
    models = {}

    # 1. Logistic Regression
    lr = LogisticRegression(
        featuresCol="scaled_features",
        labelCol="credit_risk_label",
        maxIter=100,
        regParam=0.01
    )

    # 2. Random Forest
    rf = RandomForestClassifier(
        featuresCol="scaled_features",
        labelCol="credit_risk_label",
        numTrees=100,
        maxDepth=10,
        seed=42
    )

    # 3. Gradient-Boosted Trees
    gbt = GBTClassifier(
        featuresCol="scaled_features",
        labelCol="credit_risk_label",
        maxIter=50,
        maxDepth=8,
        seed=42
    )

    # Train models
    print("\n🚀 Training models...")

    models["Logistic_Regression"] = lr.fit(train_processed)
    print("✅ Logistic Regression trained")

    models["Random_Forest"] = rf.fit(train_processed)
    print("✅ Random Forest trained")

    models["Gradient_Boosting"] = gbt.fit(train_processed)
    print("✅ Gradient Boosting trained")

    # Model evaluation
    evaluator_binary = BinaryClassificationEvaluator(
        labelCol="credit_risk_label",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )

    evaluator_multiclass = MulticlassClassificationEvaluator(
        labelCol="credit_risk_label",
        predictionCol="prediction",
        metricName="accuracy"
    )

    print("\n📊 Model Performance Evaluation:")
    print("-" * 60)
    print(f"{'Model':<20} {'Accuracy':<10} {'AUC-ROC':<10} {'Precision':<10} {'Recall':<10}")
    print("-" * 60)

    best_model = None
    best_auc = 0

    for name, model in models.items():
        # Predictions
        predictions = model.transform(test_processed)

        # Metrics
        accuracy = evaluator_multiclass.evaluate(predictions)
        auc_roc = evaluator_binary.evaluate(predictions)

        # Precision and recall for the positive (risky) class
        tp = predictions.filter((col("prediction") == 1.0) & (col("credit_risk_label") == 1.0)).count()
        fp = predictions.filter((col("prediction") == 1.0) & (col("credit_risk_label") == 0.0)).count()
        fn = predictions.filter((col("prediction") == 0.0) & (col("credit_risk_label") == 1.0)).count()

        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0

        print(f"{name:<20} {accuracy:<10.4f} {auc_roc:<10.4f} {precision:<10.4f} {recall:<10.4f}")

        # Track the best model by AUC-ROC
        if auc_roc > best_auc:
            best_auc = auc_roc
            best_model = (name, model)

    print("-" * 60)
    print(f"🏆 Best model: {best_model[0]} (AUC-ROC: {best_auc:.4f})")

    # Feature importance (Random Forest)
    if "Random_Forest" in models:
        rf_model = models["Random_Forest"]
        feature_importance = rf_model.featureImportances.toArray()

        # Map importances to feature names. The assembled vector starts with these
        # 13 numeric columns in assembler order; zip() leaves out the trailing
        # one-hot dimensions.
        feature_cols = [
            "age", "account_age_days", "has_email", "has_phone", "kyc_approved",
            "credit_score", "total_loans", "total_borrowed", "avg_loan_amount",
            "product_diversity", "approval_rate", "default_rate", "risk_indicators"
        ]

        importance_df = spark.createDataFrame(
            [(feature, float(importance)) for feature, importance in zip(feature_cols, feature_importance)],
            ["feature", "importance"]
        ).orderBy(col("importance").desc())

        print("\n🎯 Top 10 Feature Importance (Random Forest):")
        importance_df.show(10)

    # Save the best model
    model_path = "/tmp/lakehouse-credit-scoring-model"
    best_model[1].write().overwrite().save(model_path)
    print(f"💾 Best model saved to: {model_path}")

    # Scoring function
    def score_customer(customer_features):
        """Score a single customer for credit risk"""
        processed_features = preprocessing_model.transform(customer_features)
        prediction = best_model[1].transform(processed_features)
        return prediction.select("customer_id", "prediction", "probability").collect()[0]

    return best_model[1], preprocessing_model, score_customer, spark


# Run credit scoring model development
# model, preprocessor, scoring_func, spark = develop_credit_scoring_model()
Real-time Credit Scoring
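The streaming scorer below does not call the trained model yet; it applies a simplified rule-based risk score. A plain-Python sketch of that formula, with weights and cut-offs copied from `enrich_and_score` and hypothetical function names, shows how the pieces add up: up to 0.4 from a low credit score (normalized by 850), 0.3 if the requested amount exceeds five times declared income, 0.2 if KYC is not approved, and 0.1 for applicants under 25. It assumes non-null inputs; in Spark a null `risk_score` matches neither `when` branch and falls through to `REJECTED`.

```python
def heuristic_risk_score(credit_score, amount_to_income_ratio, kyc_approved, age):
    """Simplified risk score in [0, 1]; higher means riskier."""
    score = (1.0 - credit_score / 850.0) * 0.4
    score += 0.3 if amount_to_income_ratio > 5 else 0.0
    score += 0.2 if not kyc_approved else 0.0
    score += 0.1 if age < 25 else 0.0
    return score


def credit_decision(risk_score):
    if risk_score <= 0.3:
        return "APPROVED"
    if risk_score <= 0.6:
        return "MANUAL_REVIEW"
    return "REJECTED"


print(credit_decision(heuristic_risk_score(600, 6.0, True, 30)))
```

A customer with a 600 score asking for six times their income lands at about 0.42 and goes to manual review; the same request from a 22-year-old with a 500 score and pending KYC reaches about 0.76 and is rejected.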
def realtime_credit_scoring():
    """
    Real-time credit scoring for loan applications
    """
    # Location of the trained model
    model_path = "/tmp/lakehouse-credit-scoring-model"

    spark = SparkSession.builder \
        .appName("Lakehouse-Realtime-CreditScoring") \
        .master("spark://spark-master:7077") \
        .getOrCreate()

    # Load the model (in practice, from a model registry). The saved best model may be a
    # LogisticRegressionModel, RandomForestClassificationModel or GBTClassificationModel,
    # so load it with the matching class. The scoring below uses simplified rules instead.
    from pyspark.ml.classification import RandomForestClassificationModel
    # credit_model = RandomForestClassificationModel.load(model_path)

    print("⚡ Real-time Credit Scoring System")
    print("=" * 50)

    # Kafka stream of loan applications
    loan_applications = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "lakehouse-loan-applications") \
        .option("startingOffsets", "latest") \
        .load()

    # Parse application data
    application_schema = StructType([
        StructField("application_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("requested_amount", DoubleType(), True),
        StructField("loan_term", IntegerType(), True),
        StructField("product_type", StringType(), True),
        StructField("declared_income", DoubleType(), True),
        StructField("application_timestamp", TimestampType(), True)
    ])

    parsed_applications = loan_applications.select(
        from_json(col("value").cast("string"), application_schema).alias("application")
    ).select("application.*")

    # Enrich with customer data and score each micro-batch
    def enrich_and_score(batch_df, batch_id):
        """
        Enrich and score one micro-batch of applications
        """
        application_count = batch_df.count()
        if application_count == 0:
            return

        print(f"Processing batch {batch_id} with {application_count} applications")

        # Customer IDs present in this batch
        customer_ids = [row.customer_id
                        for row in batch_df.select("customer_id").distinct().collect()
                        if row.customer_id is not None]
        if not customer_ids:
            return

        # Look up only these customers. Spark pushes the isin() filter down to
        # PostgreSQL as WHERE customer_id IN (...), so no SQL string is built by hand.
        customer_data = spark.read \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
            .option("dbtable", "core.customers") \
            .option("user", "lakehouse_admin") \
            .option("password", "CHANGE_ME_PASSWORD") \
            .load() \
            .filter(col("customer_id").isin(customer_ids))

        # Join applications with customer data
        enriched_batch = batch_df.join(customer_data, "customer_id")

        # Feature engineering (simplified)
        features_batch = enriched_batch \
            .withColumn("age", floor(datediff(current_date(), col("date_of_birth")) / 365.25)) \
            .withColumn("amount_to_income_ratio", col("requested_amount") / col("declared_income")) \
            .withColumn("kyc_approved", (col("kyc_status") == "APPROVED").cast("int"))

        # Score applications (simplified rule-based logic)
        scored_batch = features_batch \
            .withColumn("credit_score_normalized", col("credit_score") / 850.0) \
            .withColumn("risk_score",
                        (1.0 - col("credit_score_normalized")) * 0.4 +
                        when(col("amount_to_income_ratio") > 5, 0.3).otherwise(0.0) +
                        when(col("kyc_approved") == 0, 0.2).otherwise(0.0) +
                        when(col("age") < 25, 0.1).otherwise(0.0)
                        ) \
            .withColumn("credit_decision",
                        when(col("risk_score") <= 0.3, "APPROVED")
                        .when(col("risk_score") <= 0.6, "MANUAL_REVIEW")
                        .otherwise("REJECTED")
                        ) \
            .withColumn("scoring_timestamp", current_timestamp())

        # Decision records, cached because they are written to two sinks
        decision_output = scored_batch.select(
            "application_id", "customer_id", "requested_amount",
            "credit_score", "risk_score", "credit_decision", "scoring_timestamp"
        ).persist()

        # Write to the decision table
        decision_output.write \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://postgresql:5432/lakehouse_db") \
            .option("dbtable", "lakehouse_decisions.credit_decisions") \
            .option("user", "lakehouse_admin") \
            .option("password", "CHANGE_ME_PASSWORD") \
            .mode("append") \
            .save()

        # Publish decisions to Kafka
        decision_kafka = decision_output.select(
            col("application_id").alias("key"),
            to_json(struct("*")).alias("value")
        )

        decision_kafka.write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "kafka:9092") \
            .option("topic", "lakehouse-credit-decisions") \
            .save()

        decision_output.unpersist()
        print(f"✅ Batch {batch_id} processed and saved")

    # Start the streaming query
    query = parsed_applications.writeStream \
        .foreachBatch(enrich_and_score) \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/lakehouse-credit-scoring-checkpoint") \
        .trigger(processingTime="10 seconds") \
        .start()

    return query


# Start real-time credit scoring
# scoring_query = realtime_credit_scoring()
Part 2 Summary
✅ What you learned
- Structured Streaming
  - Real-time stream processing with Kafka integration
  - Windowing operations for time-based analytics
  - Stream enrichment and complex event processing
- Real-time Analytics for the Lakehouse
  - Customer activity monitoring
  - Fraud detection system
  - Risk assessment automation
- MLlib Machine Learning
  - Feature engineering framework
  - Credit scoring model development
  - Real-time model inference
🎯 Preparing for Part 3
Part 3 will focus on:
- Performance tuning and optimization
- Production deployment strategies
- Monitoring & troubleshooting
- Advanced integration patterns
📝 Practice
- Deploy streaming pipelines with Kafka
- Build ML models for credit scoring
- Test real-time inference
- Monitor stream processing performance
Next part: performance tuning and running Structured Streaming in production. 🚀
