Kafka Streaming Training Guide: Lakehouse
Kafka in the Lakehouse: the streaming layer, topic design, hands-on labs, producers and consumers, fraud detection, and production configuration.
2026-03-17 · 13 min read
Table of Contents
- Overview
- Kafka in the Lakehouse Architecture
- Customer Domain Streaming
- Hands-on Labs
- Production Best Practices
- Troubleshooting
- Monitoring và Performance
Overview
Kafka in the Lakehouse Platform
Apache Kafka is deployed as the backbone of the Lakehouse Platform's streaming layer, where it serves as:
- Real-time Data Ingestion: collects real-time data from core banking systems, the mobile app, and the web portal
- Event-driven Architecture: supports an event-driven architecture for the microservices
- Data Pipeline Hub: the central distribution point for data flowing to the data lake, analytics, and ML pipelines
- Domain Segregation: separates data by business domain (Customer, Transaction, Product, etc.)
Deployed Key Components
Kafka cluster:
├── Kafka Broker (kafka:9092)
├── Zookeeper (zookeeper:2181)
├── Schema Registry (schema-registry:8081)
└── Kafka UI (kafka-ui:8084)
Kafka in the Lakehouse Architecture
Architecture Overview
```text
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ Data Sources    │───▶│ Kafka Cluster   │───▶│ Data Consumers  │
│                 │    │                 │    │                 │
│ • Core Banking  │    │ • Topics        │    │ • Spark Jobs    │
│ • Mobile App    │    │ • Partitions    │    │ • Trino Queries │
│ • Web Portal    │    │ • Replication   │    │ • ML Pipelines  │
│ • ATM/POS       │    │ • Schema Reg    │    │ • Analytics     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
```
Domain-based Topic Structure
The Lakehouse organizes topics by business domain (the lab topics listed below omit the {type} part):
{domain}-{entity}-{type}
Examples:
• customer-profiles # Customer 360 data
• customer-transactions # Customer transaction events
• customer-interactions # Customer touchpoint events
• product-applications # Loan/product applications
• risk-assessments # Risk scoring events
• marketing-campaigns # Marketing event tracking
Customer Domain Streaming
Customer Domain Topics
1. customer-profiles
Purpose: Customer 360 profile updates and changes to personal information
Schema:
```json
{
  "customer_id": "string",
  "customer_uuid": "string",
  "event_type": "profile_update|new_customer|status_change",
  "timestamp": "timestamp",
  "data": {
    "personal_info": {
      "full_name": "string",
      "date_of_birth": "date",
      "id_number": "string",
      "phone": "string",
      "email": "string"
    },
    "address": {
      "street": "string",
      "district": "string",
      "city": "string",
      "province": "string"
    },
    "financial_profile": {
      "income_range": "string",
      "employment_status": "string",
      "customer_tier": "Bronze|Silver|Gold|Platinum"
    }
  },
  "metadata": {
    "source_system": "string",
    "data_version": "string",
    "processing_timestamp": "timestamp"
  }
}
```
Use Cases:
- Real-time customer 360 view updates
- Trigger customer tier recalculation
- Compliance and KYC updates
- Marketing segmentation updates
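The schema above encodes the allowed values for event_type and customer_tier as pipe-separated strings. A consumer can check an event against them before acting on it. Below is a minimal sketch that assumes events arrive already deserialized as dicts. The function validate_profile_event is a hypothetical helper, not part of any Lakehouse library.

```python
# Hypothetical validator for customer-profiles events (a sketch, not a platform API)
ALLOWED_EVENT_TYPES = {"profile_update", "new_customer", "status_change"}
ALLOWED_TIERS = {"Bronze", "Silver", "Gold", "Platinum"}
REQUIRED_FIELDS = ("customer_id", "customer_uuid", "event_type", "timestamp", "data", "metadata")


def validate_profile_event(event: dict) -> list:
    """Return the problems found in a customer-profiles event (an empty list means valid)."""
    errors = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in event]

    event_type = event.get("event_type")
    if event_type is not None and event_type not in ALLOWED_EVENT_TYPES:
        errors.append(f"unknown event_type: {event_type}")

    # customer_tier is optional: a partial update may omit financial_profile entirely
    tier = event.get("data", {}).get("financial_profile", {}).get("customer_tier")
    if tier is not None and tier not in ALLOWED_TIERS:
        errors.append(f"unknown customer_tier: {tier}")
    return errors


event = {
    "customer_id": "CUST001",
    "customer_uuid": "c0ffee00-0000-4000-8000-000000000001",
    "event_type": "profile_update",
    "timestamp": "2026-03-17T09:00:00",
    "data": {"financial_profile": {"customer_tier": "Diamond"}},
    "metadata": {},
}
print(validate_profile_event(event))  # ['unknown customer_tier: Diamond']
```

Returning a list of problems, rather than raising on the first one, lets a consumer log every issue with an event in one pass.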
2. customer-transactions
Purpose: Real-time customer transaction events
Schema:
```json
{
  "transaction_id": "string",
  "customer_id": "string",
  "event_type": "loan_disbursement|payment|transfer|deposit",
  "timestamp": "timestamp",
  "data": {
    "amount": "decimal",
    "currency": "VND",
    "transaction_type": "string",
    "channel": "branch|mobile|web|atm",
    "branch_id": "string",
    "product_id": "string",
    "status": "pending|completed|failed"
  },
  "metadata": {
    "source_system": "core_banking",
    "processing_timestamp": "timestamp"
  }
}
```
Use Cases:
- Real-time fraud detection
- Customer behavior analytics
- Risk scoring updates
- Commission calculations
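The schema declares amount as decimal, but JSON has a single number type, and Python's json module refuses to serialize Decimal. One option is to send decimal amounts as strings and turn them back into Decimal on the consumer side. The lab producers' default=str already produces that string encoding. This is offered as a sketch of one approach, not as the platform's agreed encoding, and the amounts below are illustrative only.

```python
import json
from decimal import Decimal

# Producer side: default=str turns a Decimal into a JSON string (the same trick the labs use)
payload = json.dumps({"amount": Decimal("1500000.50"), "currency": "VND"}, default=str)
print(payload)  # {"amount": "1500000.50", "currency": "VND"}

# Consumer side: convert the string back to Decimal before doing arithmetic
event = json.loads(payload)
amount = Decimal(event["amount"])
print(amount + Decimal("0.25"))  # 1500000.75

# If a producer sends a bare JSON number instead, parse_float keeps it exact
exact = json.loads('{"amount": 1500000.10}', parse_float=Decimal)
print(repr(exact["amount"]))  # Decimal('1500000.10')
```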
3. customer-interactions
Purpose: Customer touchpoint and interaction events
Schema:
```json
{
  "interaction_id": "string",
  "customer_id": "string",
  "event_type": "login|page_view|call|branch_visit|application",
  "timestamp": "timestamp",
  "data": {
    "channel": "mobile|web|call_center|branch",
    "interaction_details": {
      "session_id": "string",
      "duration_seconds": "integer",
      "pages_viewed": "array",
      "actions_taken": "array"
    },
    "device_info": {
      "device_type": "mobile|desktop|tablet",
      "os": "string",
      "app_version": "string"
    }
  }
}
```
Use Cases:
- Digital engagement scoring
- Customer journey tracking
- Personalization engines
- Next best action recommendations
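Customer journey tracking usually starts by reassembling interaction events into ordered per-session sequences. The sketch below makes three assumptions:
- Events are dicts shaped like the schema above.
- Timestamps are ISO-8601 strings in one format and timezone, so they sort chronologically as strings.
- Session IDs are only unique per customer, which is why the grouping key includes customer_id.

The helper name build_journeys is hypothetical.

```python
from collections import defaultdict


def build_journeys(events):
    """Group interaction events by (customer_id, session_id), each ordered by timestamp."""
    journeys = defaultdict(list)
    for event in events:
        session_id = event["data"]["interaction_details"]["session_id"]
        journeys[(event["customer_id"], session_id)].append(event)
    # Same-format ISO-8601 strings sort chronologically, so no datetime parsing is needed
    return {
        key: [e["event_type"] for e in sorted(group, key=lambda e: e["timestamp"])]
        for key, group in journeys.items()
    }


def make_event(customer_id, session_id, event_type, timestamp):
    """Build a minimal interaction event (illustrative test data only)."""
    return {
        "customer_id": customer_id,
        "event_type": event_type,
        "timestamp": timestamp,
        "data": {"interaction_details": {"session_id": session_id}},
    }


events = [
    make_event("CUST001", "SES1", "page_view", "2026-03-17T09:00:05"),
    make_event("CUST001", "SES1", "login", "2026-03-17T09:00:00"),
    make_event("CUST002", "SES2", "login", "2026-03-17T09:01:00"),
]
print(build_journeys(events))
# {('CUST001', 'SES1'): ['login', 'page_view'], ('CUST002', 'SES2'): ['login']}
```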
Hands-on Labs
Lab 1: Connecting to Kafka cluster
Environment Setup
```bash
# Go to the Lakehouse development environment
cd /f/Workstation-dev/<project-root>
# Verify Kafka cluster status
docker-compose ps kafka zookeeper
# Check cluster health from the broker logs (last 10 lines)
docker-compose logs --tail=10 kafka
```
Python Connection
```python
from kafka import KafkaAdminClient

# Lakehouse Kafka configuration (local development)
KAFKA_CONFIG = {
    'bootstrap_servers': ['localhost:9092'],
    'client_id': 'training-client',
    'api_version': (2, 8, 1),
}

# Test the connection
admin_client = KafkaAdminClient(**KAFKA_CONFIG)
# describe_cluster() returns a dict with 'brokers', 'cluster_id' and 'controller_id'
cluster_metadata = admin_client.describe_cluster()
print(f"Cluster ID: {cluster_metadata['cluster_id']}")
print(f"Controller: {cluster_metadata['controller_id']}")
admin_client.close()
```
Lab 2: Working with Customer Domain Topics
List Lakehouse Topics
```python
# Create script: scripts/lab_kafka_topics.py
from kafka.admin import KafkaAdminClient

admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# list_topics() returns the names of all topics in the cluster
topics = admin_client.list_topics()
customer_topics = sorted(topic for topic in topics if topic.startswith('customer-'))

print("Customer Domain Topics:")
for topic in customer_topics:
    print(f"  - {topic}")

admin_client.close()
```
Customer Profile Producer Example
```python
# Create script: scripts/lab_customer_producer.py
from kafka import KafkaProducer
import json
from datetime import datetime
import uuid

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8'),
)


def send_customer_profile_update(customer_id, update_data):
    """Send a customer profile update to Kafka."""
    message = {
        "customer_id": customer_id,
        # Lab placeholder: a real producer would reuse the customer's existing UUID
        "customer_uuid": str(uuid.uuid4()),
        "event_type": "profile_update",
        "timestamp": datetime.now().isoformat(),
        "data": update_data,
        "metadata": {
            "source_system": "training_lab",
            "data_version": "1.0",
            "processing_timestamp": datetime.now().isoformat(),
        },
    }

    # Keying by customer_id sends every update for a customer to the same partition
    future = producer.send('customer-profiles', key=customer_id, value=message)

    # Block until the broker acknowledges the write (raises on error or timeout)
    record_metadata = future.get(timeout=10)
    print(f"Message sent to {record_metadata.topic}[{record_metadata.partition}] "
          f"at offset {record_metadata.offset}")


# Example usage
customer_update = {
    "personal_info": {
        "full_name": "Nguyen Van A (Updated)",
        "phone": "0901234567",
        "email": "nguyenvana.new@email.com",
    },
    "address": {
        "street": "123 Nguyen Trai Street",
        "district": "District 1",
        "city": "Ho Chi Minh",
        "province": "Ho Chi Minh",
    },
    "financial_profile": {
        "income_range": "20-50M",
        "employment_status": "employed",
        "customer_tier": "Gold",
    },
}

send_customer_profile_update("CUST001", customer_update)
producer.close()
```
Customer Profile Consumer Example
```python
# Create script: scripts/lab_customer_consumer.py
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'customer-profiles',
    bootstrap_servers=['localhost:9092'],
    group_id='training-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest',  # A new group starts from the oldest available message
)

print("Listening for Lakehouse Customer Profile updates...")

# Runs until interrupted (Ctrl+C)
for message in consumer:
    customer_data = message.value
    customer_id = message.key
    data = customer_data.get('data', {})

    print("\n🔔 Customer Update Received:")
    print(f"   Customer ID: {customer_id}")
    print(f"   Event Type: {customer_data['event_type']}")
    print(f"   Timestamp: {customer_data['timestamp']}")

    # Process customer tier changes
    tier = data.get('financial_profile', {}).get('customer_tier')
    if tier:
        print(f"   🏆 Customer Tier: {tier}")
        # Trigger tier-based actions
        if tier in ['Gold', 'Platinum']:
            print("   💰 High-value customer detected - trigger VIP services")

    # Process contact info changes
    contact = data.get('personal_info', {})
    if 'email' in contact or 'phone' in contact:
        print("   📞 Contact info updated - sync with CRM systems")
```
Lab 3: Real-time Customer Transaction Processing
Transaction Producer
```python
# Create script: scripts/lab_transaction_producer.py
from kafka import KafkaProducer
import json
from datetime import datetime
import random
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8'),
)


def generate_transaction_event(customer_id):
    """Generate a realistic Lakehouse transaction event."""
    # (event_type, transaction_type, [min_amount, max_amount] in VND)
    transaction_types = [
        ("loan_disbursement", "disbursement", [5000000, 50000000]),
        ("payment", "payment", [100000, 2000000]),
        ("transfer", "transfer", [50000, 1000000]),
        ("deposit", "deposit", [100000, 5000000]),
    ]
    event_type, transaction_type, amount_range = random.choice(transaction_types)

    return {
        "transaction_id": f"TXN{int(time.time())}{random.randint(1000, 9999)}",
        "customer_id": customer_id,
        "event_type": event_type,
        "timestamp": datetime.now().isoformat(),
        "data": {
            "amount": random.randint(amount_range[0], amount_range[1]),
            "currency": "VND",
            "transaction_type": transaction_type,
            "channel": random.choice(["branch", "mobile", "web", "atm"]),
            "branch_id": f"BR{random.randint(100, 999)}",
            "product_id": f"PRD{random.randint(10, 99)}",
            # Weighted 3:1 toward "completed"
            "status": random.choice(["completed", "completed", "completed", "pending"]),
        },
        "metadata": {
            "source_system": "core_banking_simulator",
            "processing_timestamp": datetime.now().isoformat(),
        },
    }


# Simulate a continuous stream of transactions
customer_ids = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]

print("🏦 Starting Lakehouse Transaction Simulation...")
for i in range(20):
    customer_id = random.choice(customer_ids)
    transaction = generate_transaction_event(customer_id)
    # Keying by customer_id keeps each customer's transactions in order within one partition
    producer.send('customer-transactions', key=customer_id, value=transaction)
    print(f"💳 Transaction {i+1}: {customer_id} - {transaction['data']['amount']:,} VND")
    time.sleep(2)

producer.close()
print("\n✅ Transaction simulation completed")
```
Real-time Fraud Detection Consumer
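Kafka consumer groups can redeliver records. Records processed after the last committed offset are delivered again after a crash or rebalance. A detector that keeps per-customer running totals, like the one in this lab, would then count those transactions twice. Below is a minimal sketch of a bounded in-memory duplicate filter keyed on transaction_id. It assumes IDs are unique per transaction. The class name RecentIdFilter and its capacity are hypothetical.

```python
from collections import OrderedDict


class RecentIdFilter:
    """Remember the last `capacity` transaction IDs and flag repeats (in-memory sketch)."""

    def __init__(self, capacity: int = 100_000):
        self.capacity = capacity
        self._seen = OrderedDict()

    def is_duplicate(self, transaction_id: str) -> bool:
        if transaction_id in self._seen:
            self._seen.move_to_end(transaction_id)  # Refresh so recently repeated IDs survive eviction
            return True
        self._seen[transaction_id] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # Evict the least recently seen ID
        return False


dedup = RecentIdFilter(capacity=2)
results = [dedup.is_duplicate(t) for t in ["TXN1", "TXN2", "TXN1", "TXN3", "TXN2"]]
print(results)  # [False, False, True, False, False]
```

In the consumer loop, skip a record when dedup.is_duplicate(transaction['transaction_id']) returns True, before calling fraud_detector.analyze_transaction. The last TXN2 in the example is not caught because capacity=2 had already evicted it. That is the trade-off of bounded memory. The filter's state is also lost on restart, so durable deduplication needs an external store.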
```python
# Create script: scripts/lab_fraud_detection.py
from kafka import KafkaConsumer
import json
from collections import defaultdict, deque
from datetime import datetime, timedelta

# Fraud detection rules
FRAUD_RULES = {
    'max_amount_per_transaction': 100000000,  # 100M VND
    'max_transactions_per_hour': 10,
    'max_amount_per_hour': 200000000,         # 200M VND
    'suspicious_channels': ['atm'],           # High-risk channels
    'off_hours_start': 22,                    # 10 PM (inclusive)
    'off_hours_end': 6,                       # 6 AM (exclusive)
}


class FraudDetector:
    def __init__(self):
        # Per-customer sliding window over the last hour of transactions
        self.customer_activity = defaultdict(lambda: {
            'recent_transactions': deque(),
            'hourly_amount': 0,
            'risk_score': 0,
        })

    def analyze_transaction(self, transaction):
        """Score a transaction against the fraud rules; return (risk_score, risk_factors)."""
        customer_id = transaction['customer_id']
        amount = transaction['data']['amount']
        channel = transaction['data']['channel']
        timestamp = datetime.fromisoformat(transaction['timestamp'])

        risk_factors = []
        risk_score = 0

        # Rule 1: High-amount transaction
        if amount > FRAUD_RULES['max_amount_per_transaction']:
            risk_factors.append(f"High amount: {amount:,} VND")
            risk_score += 30

        # Rule 2: Off-hours transaction (22:00 to 05:59)
        hour = timestamp.hour
        if hour >= FRAUD_RULES['off_hours_start'] or hour < FRAUD_RULES['off_hours_end']:
            risk_factors.append(f"Off-hours transaction: {hour}:00")
            risk_score += 15

        # Rule 3: High-risk channel
        if channel in FRAUD_RULES['suspicious_channels']:
            risk_factors.append(f"High-risk channel: {channel}")
            risk_score += 20

        # Rule 4: Frequency analysis over a 1-hour window.
        # Assumes each customer's events arrive in time order, which keying by customer_id provides.
        customer_data = self.customer_activity[customer_id]
        recent_transactions = customer_data['recent_transactions']

        # Evict transactions older than 1 hour (event time, not wall-clock time)
        cutoff_time = timestamp - timedelta(hours=1)
        while recent_transactions and recent_transactions[0]['timestamp'] < cutoff_time:
            old_tx = recent_transactions.popleft()
            customer_data['hourly_amount'] -= old_tx['amount']

        # Add the current transaction to the window
        recent_transactions.append({'timestamp': timestamp, 'amount': amount})
        customer_data['hourly_amount'] += amount

        # Check frequency rules
        if len(recent_transactions) > FRAUD_RULES['max_transactions_per_hour']:
            risk_factors.append(f"High frequency: {len(recent_transactions)} transactions/hour")
            risk_score += 25

        if customer_data['hourly_amount'] > FRAUD_RULES['max_amount_per_hour']:
            risk_factors.append(f"High hourly amount: {customer_data['hourly_amount']:,} VND")
            risk_score += 35

        return risk_score, risk_factors


# Start the fraud detection consumer
consumer = KafkaConsumer(
    'customer-transactions',
    bootstrap_servers=['localhost:9092'],
    group_id='fraud-detection',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest',
)

fraud_detector = FraudDetector()

print("🔍 Lakehouse Real-time Fraud Detection Started...")
print("⚠️  Monitoring customer transactions for suspicious activity\n")

for message in consumer:
    transaction = message.value
    customer_id = message.key

    # Analyze for fraud
    risk_score, risk_factors = fraud_detector.analyze_transaction(transaction)

    # Display the transaction
    amount = transaction['data']['amount']
    channel = transaction['data']['channel']
    status = transaction['data']['status']
    print(f"💳 Transaction: {customer_id} - {amount:,} VND via {channel} ({status})")

    # Alert on high-risk transactions
    if risk_score >= 30:
        print(f"🚨 FRAUD ALERT - Risk Score: {risk_score}")
        print(f"   Customer: {customer_id}")
        print(f"   Amount: {amount:,} VND")
        print("   Risk Factors:")
        for factor in risk_factors:
            print(f"   - {factor}")
        print("   🔄 Action: Block transaction & notify security team")
    elif risk_score >= 15:
        print(f"⚠️  Medium Risk - Score: {risk_score}")
        print("   🔄 Action: Additional verification required")

    print("─" * 50)
```
Lab 4: Customer Journey Tracking
Interaction Events Producer
```python
# Create script: scripts/lab_customer_journey.py
from kafka import KafkaProducer
import json
from datetime import datetime
import random
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8'),
)


def generate_customer_journey():
    """Simulate one complete Lakehouse customer journey (a loan application)."""
    customer_id = f"CUST{random.randint(1000, 9999)}"
    session_id = f"SES{int(time.time())}"

    # Journey stages for a Lakehouse loan application
    journey_steps = [
        {"event_type": "login", "channel": "mobile",
         "details": {"login_method": "fingerprint", "app_version": "2.1.5"}},
        {"event_type": "page_view", "channel": "mobile",
         "details": {"page": "loan_products", "duration_seconds": 45}},
        {"event_type": "page_view", "channel": "mobile",
         "details": {"page": "loan_calculator", "duration_seconds": 120}},
        {"event_type": "application", "channel": "mobile",
         "details": {"product_type": "personal_loan", "amount_requested": 15000000}},
        {"event_type": "call", "channel": "call_center",
         "details": {"call_duration": 480, "purpose": "document_verification"}},
        {"event_type": "branch_visit", "channel": "branch",
         "details": {"branch_id": "BR123", "purpose": "document_submission", "duration_minutes": 30}},
    ]

    print(f"👤 Starting journey for customer {customer_id}")

    for i, step in enumerate(journey_steps):
        interaction = {
            "interaction_id": f"INT{int(time.time())}{i}",
            "customer_id": customer_id,
            "event_type": step["event_type"],
            "timestamp": datetime.now().isoformat(),
            "data": {
                "channel": step["channel"],
                "interaction_details": {
                    "session_id": session_id,
                    "step_number": i + 1,
                    "total_steps": len(journey_steps),
                    **step["details"],  # Merge in the step-specific fields
                },
                "device_info": {
                    "device_type": "mobile" if step["channel"] == "mobile" else "other",
                    "os": "iOS 15.6",
                    "app_version": "2.1.5",
                },
            },
        }

        # Key by customer_id so the journey's events stay in order within one partition
        producer.send('customer-interactions', key=customer_id, value=interaction)
        print(f"   📱 Step {i+1}: {step['event_type']} via {step['channel']}")
        time.sleep(3)  # Realistic timing between steps

    print(f"✅ Journey completed for {customer_id}\n")


# Generate several customer journeys
for _ in range(3):
    generate_customer_journey()
    time.sleep(5)

producer.close()
```
Production Best Practices
1. Topic Design
Naming Convention
{domain}-{entity}-{type}
Domains: customer, product, transaction, risk, marketing, operational
Types: events, snapshots, commands, queries
Examples:
- customer-profiles-events # Profile change events
- customer-profiles-snapshots # Current state snapshots
- risk-assessments-events # Risk scoring events
- marketing-campaigns-commands # Campaign execution commands
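The convention can be enforced in code, for example in a CI check that runs before topics are created. The sketch below rests on two assumptions:
- An entity is a single lowercase token (letters, digits, underscores), because "-" separates the parts.
- The domain and type lists above are exhaustive.

The name parse_topic_name is hypothetical. Note that this strict check rejects the two-part lab topic names such as customer-profiles.

```python
import re

DOMAINS = {"customer", "product", "transaction", "risk", "marketing", "operational"}
TYPES = {"events", "snapshots", "commands", "queries"}
# Assumption: entity is one lowercase token, since '-' separates domain, entity, and type
TOPIC_PATTERN = re.compile(r"^(?P<domain>[a-z]+)-(?P<entity>[a-z][a-z0-9_]*)-(?P<type>[a-z]+)$")


def parse_topic_name(name):
    """Split a topic name into (domain, entity, type); raise ValueError if it breaks the convention."""
    match = TOPIC_PATTERN.match(name)
    if match is None:
        raise ValueError(f"{name!r} does not match {{domain}}-{{entity}}-{{type}}")
    domain, entity, kind = match.group("domain", "entity", "type")
    if domain not in DOMAINS:
        raise ValueError(f"unknown domain {domain!r} in {name!r}")
    if kind not in TYPES:
        raise ValueError(f"unknown type {kind!r} in {name!r}")
    return domain, entity, kind


print(parse_topic_name("customer-profiles-events"))  # ('customer', 'profiles', 'events')
try:
    parse_topic_name("customer-profiles")
except ValueError as exc:
    print(exc)  # 'customer-profiles' does not match {domain}-{entity}-{type}
```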
Partitioning Strategy
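Keyed partitioning relies on one property: the producer maps a given key to the same partition every time. All events for CUST001 therefore share one partition and are read back in order. The sketch below illustrates that property with zlib.crc32. This hash is a stand-in chosen for readability, not the murmur2 hash that Kafka's default partitioner actually uses, so the partition numbers it prints will not match a real cluster.

```python
import zlib


def illustrative_partition(key: str, num_partitions: int) -> int:
    """Map a key to a partition deterministically (crc32 stand-in, NOT Kafka's murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 12  # Same count as the customer-transactions partitioning in this section
for customer_id in ["CUST001", "CUST002", "CUST001"]:
    print(customer_id, "->", illustrative_partition(customer_id, NUM_PARTITIONS))
# CUST001 appears twice and lands on the same partition both times
```

Because the mapping depends on num_partitions, adding partitions later sends existing keys to different partitions. Per-key ordering is then only guaranteed for messages on the same side of the change.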
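```python
# Customer domain: partition by customer_id
def get_partition_key(customer_id):
    """Use customer_id as the message key so related customer events go to the same partition."""
    return customer_id


# Transactions are also keyed by customer_id to preserve per-customer ordering
CUSTOMER_TRANSACTION_PARTITIONS = 12  # Sized for customer volume

# Risk assessments: route by risk_level to fixed partitions for processing priority,
# e.g. producer.send(topic, value, partition=RISK_PARTITIONS[level]).
# Kafka does not prioritize partitions itself; priority comes from how consumers are assigned.
RISK_PARTITIONS = {
    'high': 0,    # High-priority partition
    'medium': 1,  # Medium priority
    'low': 2,     # Low priority
}
```
Retention Policies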
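Kafka's retention.ms topic setting is expressed in milliseconds. Shorthand periods such as '30d' or '7y' therefore have to be converted before they are applied to a topic. Below is a minimal sketch that assumes a day is 24 hours and a year is 365 days, ignoring leap days. The helper name retention_to_ms is hypothetical.

```python
import re

# Assumed unit lengths in milliseconds (a "year" is 365 days; leap days are ignored)
UNIT_MS = {
    "h": 60 * 60 * 1000,
    "d": 24 * 60 * 60 * 1000,
    "y": 365 * 24 * 60 * 60 * 1000,
}


def retention_to_ms(period: str) -> int:
    """Convert a shorthand period such as '30d' or '7y' into a retention.ms value."""
    match = re.fullmatch(r"(\d+)([hdy])", period)
    if match is None:
        raise ValueError(f"unsupported retention period: {period!r}")
    amount, unit = match.groups()
    return int(amount) * UNIT_MS[unit]


print(retention_to_ms("7d"))   # 604800000
print(retention_to_ms("30d"))  # 2592000000
```

The conversion also shows that retention.ms=604800000, which appears in the topic tuning example later in this guide, is 7 days rather than 7 years.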
```python
# Shorthand retention periods; Kafka's retention.ms topic setting expects milliseconds
RETENTION_CONFIG = {
    'customer-profiles': '30d',      # 30 days (GDPR data-minimization policy)
    'customer-transactions': '7y',   # 7 years for financial records
    'customer-interactions': '90d',  # 90 days for analytics
    'risk-assessments': '5y',        # 5 years for regulatory requirements
    'marketing-events': '1y',        # 1 year for campaign analysis
}
```
2. Producer Configuration
```python
import os

# Lakehouse production producer configuration (kafka-python keyword arguments)
PRODUCER_CONFIG = {
    'bootstrap_servers': ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    'client_id': 'lakehouse-producer-v1',
    'acks': 'all',                 # Wait for all in-sync replicas
    'retries': 3,                  # Retry failed sends
    'batch_size': 16384,           # Batch size in bytes
    'linger_ms': 5,                # Wait up to 5 ms to fill a batch
    'buffer_memory': 33554432,     # 32 MB buffer
    'compression_type': 'gzip',    # Compress batches
    'max_in_flight_requests_per_connection': 1,  # Preserve ordering across retries
    # Idempotent producer: retries cannot write duplicates. Only newer kafka-python
    # releases accept this key (2.0.x rejects it). End-to-end exactly-once also needs transactions.
    'enable_idempotence': True,
    'security_protocol': 'SASL_SSL',  # Production security
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'lakehouse-producer',
    # Read from the environment; a '${KAFKA_PASSWORD}' string literal is not expanded by Python
    'sasl_plain_password': os.environ['KAFKA_PASSWORD'],
}
```
3. Consumer Configuration
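Consumer group membership depends on three timeouts: heartbeat_interval_ms, session_timeout_ms, and max_poll_interval_ms. The Kafka consumer documentation advises keeping the heartbeat interval at no more than one third of the session timeout. In addition, a full poll() batch must be processed before max_poll_interval_ms runs out, or the consumer leaves the group and a rebalance starts. The check below is a minimal sketch of both rules. It uses the same values as the configuration in this section, and est_record_ms is a hypothetical per-record processing-time estimate.

```python
def check_consumer_timeouts(config: dict, est_record_ms: float) -> list:
    """Return warnings about consumer timeout settings (an empty list means no issues found)."""
    warnings = []
    heartbeat = config["heartbeat_interval_ms"]
    session = config["session_timeout_ms"]
    # Kafka docs guideline: heartbeat.interval.ms typically <= 1/3 of session.timeout.ms
    if heartbeat * 3 > session:
        warnings.append(f"heartbeat_interval_ms={heartbeat} is more than 1/3 of session_timeout_ms={session}")
    # A whole batch of max_poll_records must be processed before max_poll_interval_ms expires
    batch_ms = config["max_poll_records"] * est_record_ms
    if batch_ms >= config["max_poll_interval_ms"]:
        warnings.append(f"a full batch may take {batch_ms:.0f} ms, not below "
                        f"max_poll_interval_ms={config['max_poll_interval_ms']}")
    return warnings


settings = {"heartbeat_interval_ms": 3000, "session_timeout_ms": 30000,
            "max_poll_records": 500, "max_poll_interval_ms": 300000}
print(check_consumer_timeouts(settings, est_record_ms=100))   # []
print(check_consumer_timeouts(settings, est_record_ms=1000))  # One warning: 500,000 ms per batch
```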
```python
import os

# Lakehouse production consumer configuration (kafka-python keyword arguments)
CONSUMER_CONFIG = {
    'bootstrap_servers': ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    'group_id': 'analytics-processors',
    'client_id': 'lakehouse-consumer-v1',
    'auto_offset_reset': 'earliest',  # Where a group with no committed offset starts reading
    'enable_auto_commit': False,      # Call consumer.commit() only after processing succeeds
    'max_poll_records': 500,          # Max records returned by one poll()
    'max_poll_interval_ms': 300000,   # Max 5 min between polls before the consumer leaves the group
    'session_timeout_ms': 30000,      # Group drops the consumer after 30 s without heartbeats
    'heartbeat_interval_ms': 3000,    # Send a heartbeat every 3 s
    'fetch_min_bytes': 1024,          # Broker waits for at least 1 KB of data...
    'fetch_max_wait_ms': 500,         # ...or up to 500 ms, whichever comes first
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'lakehouse-consumer',
    'sasl_plain_password': os.environ['KAFKA_PASSWORD'],  # Read from the environment
}
```
4. Error Handling and Retry Logic
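An immediate retry often fails for the same reason as the first attempt, for instance a downstream service that is still unavailable. A common complement to a bounded retry count, like the one in the handler that follows, is exponential backoff with jitter. Below is a minimal sketch of the "full jitter" variant. The base and cap values are illustrative assumptions, not platform settings.

```python
import random


def backoff_delay_s(attempt: int, base_s: float = 0.5, cap_s: float = 30.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap_s, base_s * 2**attempt)] seconds."""
    ceiling = min(cap_s, base_s * (2 ** attempt))
    return random.uniform(0, ceiling)


for attempt in range(4):
    print(f"attempt {attempt}: sleep {backoff_delay_s(attempt):.2f} s")
```

Any sleep inside the poll loop counts against max_poll_interval_ms (5 minutes in the configuration above). Long waits are therefore better handled by parking the record, for example in the dead-letter queue, than by sleeping in the consumer.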
```python
from collections import defaultdict
from datetime import datetime


class KafkaErrorHandler:
    def __init__(self, dlq_topic='dead-letter-queue'):
        self.dlq_topic = dlq_topic
        # Count retries per record (topic, partition, offset): message keys are not unique
        self.retry_counts = defaultdict(int)
        self.max_retries = 3

    def handle_processing_error(self, message, error, producer):
        """Decide whether to retry a failed record or route it to the dead-letter queue.

        `producer` must be configured with a JSON value_serializer.
        """
        record_id = (message.topic, message.partition, message.offset)
        retry_count = self.retry_counts[record_id]

        if retry_count < self.max_retries:
            # Retry processing
            self.retry_counts[record_id] += 1
            print(f"Retrying record {record_id} (attempt {retry_count + 1})")
            return 'retry'

        # Retries exhausted: send to the dead-letter queue
        dlq_message = {
            'original_topic': message.topic,
            'original_partition': message.partition,
            'original_offset': message.offset,
            'original_value': message.value,
            'error_message': str(error),
            'failed_timestamp': datetime.now().isoformat(),
            'retry_count': retry_count,
        }
        producer.send(self.dlq_topic, value=dlq_message)
        self.retry_counts.pop(record_id, None)  # Free the counter once the record is parked
        print(f"Record {record_id} sent to DLQ after {retry_count} retries")
        return 'dlq'
```
Troubleshooting
Common Issues and Solutions
1. Consumer Lag Issues
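A partition's consumer lag is the distance between its log-end offset and the offset the consumer group has committed. kafka-consumer-groups.sh --describe reports these as LOG-END-OFFSET, CURRENT-OFFSET, and LAG. The sketch below does the same arithmetic in Python and assumes both offset maps have already been fetched, for example with kafka-python's KafkaConsumer.end_offsets() and KafkaAdminClient.list_consumer_group_offsets(). Plain (topic, partition) tuples stand in for TopicPartition objects.

```python
def partition_lag(end_offsets: dict, committed: dict) -> dict:
    """Lag per partition = log-end offset - committed offset.

    Partitions with no committed offset are counted from offset 0 (an assumption that
    overstates lag if retention has already deleted the oldest records).
    """
    return {tp: end - committed.get(tp, 0) for tp, end in end_offsets.items()}


end_offsets = {("customer-transactions", 0): 1500, ("customer-transactions", 1): 980}
committed = {("customer-transactions", 0): 1420}
lag = partition_lag(end_offsets, committed)
print(lag)               # {('customer-transactions', 0): 80, ('customer-transactions', 1): 980}
print(sum(lag.values()))  # 1060
```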
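```bash
# Check consumer lag (columns include CURRENT-OFFSET, LOG-END-OFFSET, LAG)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group analytics-processors --describe

# Solutions:
# - Scale out consumer instances (consumers beyond the partition count stay idle)
# - Optimize processing logic
# - Increase the partition count (this changes the key-to-partition mapping for new messages)
# - Tune consumer configuration
```
2. Connection Issues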
```python
# Test Kafka connectivity
from kafka import KafkaAdminClient


def test_kafka_connection():
    """Return True if cluster metadata can be fetched from the bootstrap server."""
    try:
        admin_client = KafkaAdminClient(
            bootstrap_servers=['localhost:9092'],
            request_timeout_ms=5000,
        )
        cluster_metadata = admin_client.describe_cluster()  # Returns a dict
        print(f"✅ Connected to cluster: {cluster_metadata['cluster_id']}")
        admin_client.close()
        return True
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return False
```
3. Schema Evolution Issues
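Some changes go beyond filling defaults. One common pattern is to record metadata.schema_version in each event and upgrade older versions one step at a time, so consumers only ever handle the current shape. Below is a minimal sketch of that pattern. The version numbers, the UPGRADERS table, and the old_field/new_field rename are hypothetical placeholders, matching the placeholder names used in the deserializer that follows.

```python
CURRENT_VERSION = "2.0"  # Hypothetical current schema version


def _v1_to_v2(event: dict) -> dict:
    """Hypothetical 1.0 -> 2.0 change: 'old_field' was renamed to 'new_field'."""
    event = dict(event)  # Do not mutate the caller's dict
    if "old_field" in event:
        event["new_field"] = event.pop("old_field")
    event["metadata"] = {**event.get("metadata", {}), "schema_version": "2.0"}
    return event


UPGRADERS = {"1.0": _v1_to_v2}  # Maps each old version to its one-step upgrade


def upgrade_event(event: dict) -> dict:
    """Apply one-step upgraders until the event reaches CURRENT_VERSION (missing version = 1.0)."""
    version = event.get("metadata", {}).get("schema_version", "1.0")
    while version != CURRENT_VERSION:
        if version not in UPGRADERS:
            raise ValueError(f"no upgrade path from schema_version {version!r}")
        event = UPGRADERS[version](event)
        version = event["metadata"]["schema_version"]
    return event


print(upgrade_event({"customer_id": "CUST001", "old_field": 42}))
```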
```python
# Handle schema changes gracefully
import json
import logging

logger = logging.getLogger(__name__)


def safe_deserialize_customer_event(raw_message):
    """Deserialize a customer event (bytes or str), staying backward compatible with older schemas."""
    try:
        data = json.loads(raw_message)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse message: {e}")
        return None

    # Fill in fields that older producers did not send
    data.setdefault('metadata', {})
    data['metadata'].setdefault('schema_version', '1.0')

    # Map deprecated fields to their replacements ('old_field'/'new_field' are placeholders)
    if 'old_field' in data:
        data['new_field'] = data.pop('old_field')

    return data
```
Monitoring and Performance
1. Key Metrics to Monitor
Cluster Health Metrics
```python
# JMX MBean names exposed by brokers and Java clients
# (kafka-python clients report their own metrics via .metrics() instead of JMX)
KAFKA_METRICS_TO_MONITOR = {
    # Broker metrics
    'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec': 'Messages/sec',
    'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec': 'Bytes in/sec',
    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec': 'Bytes out/sec',
    # Consumer metrics
    'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*': 'Consumer lag (e.g. records-lag-max)',
    'kafka.consumer:type=consumer-coordinator-metrics,client-id=*': 'Rebalance metrics',
    # Producer metrics
    'kafka.producer:type=producer-metrics,client-id=*': 'Producer throughput',
    'kafka.producer:type=producer-topic-metrics,client-id=*,topic=*': 'Per-topic metrics',
}
```
Lakehouse Business Metrics
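Business metrics such as customer_journey_completion_rate come from event payloads, not from broker JMX. As one example, the rate can be derived from the step_number and total_steps fields that the Lab 4 producer writes into interaction_details. The sketch below assumes a journey counts as complete once any of its events reports step_number == total_steps. The function name journey_completion_rate is hypothetical.

```python
def journey_completion_rate(events) -> float:
    """Share of (customer_id, session_id) journeys whose final step was observed."""
    sessions, completed = set(), set()
    for event in events:
        details = event["data"]["interaction_details"]
        key = (event["customer_id"], details["session_id"])
        sessions.add(key)
        if details["step_number"] == details["total_steps"]:
            completed.add(key)
    return len(completed) / len(sessions) if sessions else 0.0


def step(customer_id, session_id, n, total=6):
    """Build a minimal interaction event carrying only the fields used above (test data)."""
    return {"customer_id": customer_id,
            "data": {"interaction_details": {"session_id": session_id,
                                             "step_number": n, "total_steps": total}}}


# CUST1001 finishes all 6 steps; CUST1002 stops after step 3
events = [step("CUST1001", "SES1", n) for n in range(1, 7)] + \
         [step("CUST1002", "SES2", n) for n in range(1, 4)]
print(journey_completion_rate(events))  # 0.5
```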
```python
# Custom metrics for the Lakehouse domain
BUSINESS_METRICS = {
    'customer_events_per_minute': 'Rate of customer events',
    'transaction_volume_vnd': 'Transaction volume in VND',
    'fraud_alerts_triggered': 'Number of fraud alerts',
    'customer_journey_completion_rate': 'Journey completion percentage',
    'high_value_customer_activity': 'Gold/Platinum customer activity',
}
```
2. Monitoring Setup
```python
# Create script: scripts/kafka_monitor.py
import time
from kafka import KafkaAdminClient


class KafkaMonitor:
    def __init__(self):
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=['localhost:9092']
        )

    def get_topic_metrics(self, topic_name):
        """Collect basic metadata for one topic (partition count, replication factor)."""
        try:
            # describe_topics() returns a list of dicts, one per requested topic
            topic_metadata = self.admin_client.describe_topics([topic_name])[0]
            if topic_metadata.get('error_code'):
                print(f"Topic {topic_name} returned error code {topic_metadata['error_code']}")
                return None

            partitions = topic_metadata['partitions']

            # Note: this counts every consumer group in the cluster, not only this topic's
            consumer_groups = self.admin_client.list_consumer_groups()

            return {
                'topic_name': topic_name,
                'partition_count': len(partitions),
                'replication_factor': len(partitions[0]['replicas']) if partitions else 0,
                'consumer_groups': len(consumer_groups),
                'timestamp': time.time(),
            }
        except Exception as e:
            print(f"Error getting metrics for {topic_name}: {e}")
            return None

    def monitor_customer_topics(self):
        """Monitor the customer-domain topics (missing topics are reported as errors)."""
        customer_topics = [
            'customer-profiles',
            'customer-transactions',
            'customer-interactions',
            'customer-segments',
            'customer-events',
            'customer-analytics',
        ]

        print("🔍 Lakehouse Kafka Topics Monitoring")
        print("=" * 50)

        for topic in customer_topics:
            metrics = self.get_topic_metrics(topic)
            if metrics:
                print(f"📊 {topic}:")
                print(f"   Partitions: {metrics['partition_count']}")
                print(f"   Replication: {metrics['replication_factor']}")
                print(f"   Consumer Groups (cluster-wide): {metrics['consumer_groups']}")
                print()


# Run monitoring
if __name__ == "__main__":
    monitor = KafkaMonitor()
    monitor.monitor_customer_topics()
    monitor.admin_client.close()
```
3. Performance Tuning Guidelines
Topic Configuration
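```bash
# Example configuration for a high-throughput Lakehouse topic.
# Altering topic configs through kafka-topics.sh is deprecated in favor of kafka-configs.sh,
# and kafka-topics.sh dropped the --zookeeper option in Kafka 3.0.
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name customer-transactions \
  --alter --add-config segment.ms=86400000,retention.ms=604800000,compression.type=gzip,min.insync.replicas=2
# segment.ms=86400000 rolls a new log segment daily. retention.ms=604800000 is 7 days,
# which is far shorter than the 7-year retention listed for customer-transactions in RETENTION_CONFIG.
```
JVM Tuning for Kafka Brokers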
```bash
# Kafka broker JVM settings for Lakehouse production
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
```
Conclusion
Key Takeaways for the Data Engineering Team:
- Domain-centric Design: topics are organized around the Lakehouse business domains
- Real-time Processing: Kafka enables real-time customer insights and fraud detection
- Scalability: the architecture can scale as the organization grows, by adding partitions and consumer instances
- Reliability: production configuration with acks=all, manual offset commits, and a dead-letter queue for failed records
- Monitoring: cluster, client, and business metrics for day-to-day operations
Next Steps:
- Review this document and work through the labs
- Implement a monitoring dashboard
- Set up the production security configuration
- Train the team on advanced Kafka patterns
- Prepare for integration with Spark Streaming
This document is part of the Lakehouse Platform Training Series. For questions or feedback, contact the Data Engineering Team.
