Lê Duy Khương (Daniel)

Productivity & dev tools

Kafka Streaming Training Guide: Lakehouse

Kafka in the Lakehouse: streaming layer, topic design, hands-on labs, producer/consumer, fraud detection, and production configuration.

2026-03-17 · 13 min read

Table of Contents

  1. Overview
  2. Kafka in the Lakehouse Architecture
  3. Customer Domain Streaming
  4. Hands-on Labs
  5. Production Best Practices
  6. Troubleshooting
  7. Monitoring and Performance

Overview

Kafka in the Lakehouse Platform

Apache Kafka is deployed as the backbone of the streaming layer in the Lakehouse Platform, where it serves as:

  • Real-time Data Ingestion: collects real-time data from core banking systems, the mobile app, and the web
  • Event-driven Architecture: supports an event-driven architecture for microservices
  • Data Pipeline Hub: the central distribution point for data feeding the data lake, analytics, and ML pipelines
  • Domain Segregation: separates data by business domain (Customer, Transaction, Product, etc.)

Deployed Key Components

Kafka cluster:
├── Kafka Broker (kafka:9092)
├── Zookeeper (zookeeper:2181)  
├── Schema Registry (schema-registry:8081)
└── Kafka UI (kafka-ui:8084)

Kafka in the Lakehouse Architecture

Architecture Overview

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Data Sources  │───▶│  Kafka Cluster  │───▶│  Data Consumers │
│                 │    │                 │    │                 │
│ • Core Banking  │    │ • Topics        │    │ • Spark Jobs    │
│ • Mobile App    │    │ • Partitions    │    │ • Trino Queries │
│ • Web Portal    │    │ • Replication   │    │ • ML Pipelines  │
│ • ATM/POS       │    │ • Schema Reg    │    │ • Analytics     │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Domain-based Topic Structure

The Lakehouse organizes its topics by business domain:

{domain}-{entity}-{type}

Examples (the lab topics in this guide use the short {domain}-{entity} form, without the {type} suffix):
• customer-profiles        # Customer 360 data
• customer-transactions    # Customer transaction events
• customer-interactions    # Customer touchpoint events
• product-applications     # Loan/product applications
• risk-assessments         # Risk scoring events
• marketing-campaigns      # Marketing event tracking

Customer Domain Streaming

Customer Domain Topics

1. customer-profiles

Purpose: Customer 360 profile updates and changes to personal information

Schema:

{
  "customer_id": "string",
  "customer_uuid": "string", 
  "event_type": "profile_update|new_customer|status_change",
  "timestamp": "timestamp",
  "data": {
    "personal_info": {
      "full_name": "string",
      "date_of_birth": "date",
      "id_number": "string",
      "phone": "string",
      "email": "string"
    },
    "address": {
      "street": "string",
      "district": "string", 
      "city": "string",
      "province": "string"
    },
    "financial_profile": {
      "income_range": "string",
      "employment_status": "string",
      "customer_tier": "Bronze|Silver|Gold|Platinum"
    }
  },
  "metadata": {
    "source_system": "string",
    "data_version": "string",
    "processing_timestamp": "timestamp"
  }
}

Use Cases:

  • Real-time customer 360 view updates
  • Trigger customer tier recalculation
  • Compliance and KYC updates
  • Marketing segmentation updates
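
The customer-profiles schema above can be checked before an event is produced. The following is a minimal sketch, not the platform's validator: the function name `validate_profile_event` and the set of required top-level fields are assumptions made for illustration, while the allowed `event_type` and `customer_tier` values are taken from the schema.

```python
# Allowed values copied from the customer-profiles schema.
ALLOWED_EVENT_TYPES = {"profile_update", "new_customer", "status_change"}
ALLOWED_TIERS = {"Bronze", "Silver", "Gold", "Platinum"}

# Assumption: these top-level fields are treated as mandatory.
REQUIRED_FIELDS = ("customer_id", "event_type", "timestamp", "data")


def validate_profile_event(event):
    """Return a list of problems; an empty list means the event looks valid."""
    problems = []
    for field in REQUIRED_FIELDS:
        if field not in event:
            problems.append(f"missing field: {field}")

    if "event_type" in event and event["event_type"] not in ALLOWED_EVENT_TYPES:
        problems.append(f"unknown event_type: {event['event_type']!r}")

    # customer_tier is optional, but must be a known tier when present.
    tier = event.get("data", {}).get("financial_profile", {}).get("customer_tier")
    if tier is not None and tier not in ALLOWED_TIERS:
        problems.append(f"unknown customer_tier: {tier!r}")

    return problems


good_event = {
    "customer_id": "CUST001",
    "event_type": "profile_update",
    "timestamp": "2026-03-17T10:00:00",
    "data": {"financial_profile": {"customer_tier": "Gold"}},
}
print(validate_profile_event(good_event))
```

Returning a list of problems rather than raising on the first one makes it easier to log every issue with a rejected event in one pass.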

2. customer-transactions

Purpose: Real-time customer transaction events

Schema:

{
  "transaction_id": "string",
  "customer_id": "string",
  "event_type": "loan_disbursement|payment|transfer|deposit",
  "timestamp": "timestamp", 
  "data": {
    "amount": "decimal",
    "currency": "VND",
    "transaction_type": "string",
    "channel": "branch|mobile|web|atm",
    "branch_id": "string",
    "product_id": "string",
    "status": "pending|completed|failed"
  },
  "metadata": {
    "source_system": "core_banking",
    "processing_timestamp": "timestamp"
  }
}

Use Cases:

  • Real-time fraud detection
  • Customer behavior analytics
  • Risk scoring updates
  • Commission calculations

3. customer-interactions

Purpose: Customer touchpoint and interaction events

Schema:

{
  "interaction_id": "string",
  "customer_id": "string", 
  "event_type": "login|page_view|call|branch_visit|application",
  "timestamp": "timestamp",
  "data": {
    "channel": "mobile|web|call_center|branch",
    "interaction_details": {
      "session_id": "string",
      "duration_seconds": "integer",
      "pages_viewed": "array",
      "actions_taken": "array"
    },
    "device_info": {
      "device_type": "mobile|desktop|tablet",
      "os": "string",
      "app_version": "string"
    }
  }
}

Use Cases:

  • Digital engagement scoring
  • Customer journey tracking
  • Personalization engines
  • Next best action recommendations

Hands-on Labs

Lab 1: Connecting to Kafka cluster

Environment Setup

# Access Lakehouse development environment
cd /f/Workstation-dev/<project-root>
 
# Verify Kafka cluster status
docker-compose ps kafka zookeeper
 
# Check cluster health
docker-compose logs --tail=10 kafka

Python Connection

from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
from kafka.admin import ConfigResource, ConfigResourceType
import json
 
# Lakehouse Kafka Configuration
KAFKA_CONFIG = {
    'bootstrap_servers': ['localhost:9092'],
    'client_id': 'training-client',
    'api_version': (2, 8, 1)
}
 
# Test connection
admin_client = KafkaAdminClient(**KAFKA_CONFIG)
# describe_cluster() returns a dict in kafka-python, so use key access
cluster_metadata = admin_client.describe_cluster()
print(f"Cluster ID: {cluster_metadata['cluster_id']}")
print(f"Controller: {cluster_metadata['controller_id']}")

Lab 2: Working with Customer Domain Topics

List Lakehouse Topics

# Create script: scripts/lab_kafka_topics.py
from kafka.admin import KafkaAdminClient, NewTopic
 
admin_client = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
 
# List all Lakehouse topics
topics = admin_client.list_topics()
customer_topics = [topic for topic in topics if topic.startswith('customer-')]
 
print("Customer Domain Topics:")
for topic in customer_topics:
    print(f"  - {topic}")

Customer Profile Producer Example

# Create script: scripts/lab_customer_producer.py
from kafka import KafkaProducer
import json
from datetime import datetime
import uuid
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)
 
def send_customer_profile_update(customer_id, update_data):
    """Send customer profile update to Kafka"""
    
    message = {
        "customer_id": customer_id,
        "customer_uuid": str(uuid.uuid4()),
        "event_type": "profile_update",
        "timestamp": datetime.now().isoformat(),
        "data": update_data,
        "metadata": {
            "source_system": "training_lab",
            "data_version": "1.0",
            "processing_timestamp": datetime.now().isoformat()
        }
    }
    
    # Send to topic
    future = producer.send(
        'customer-profiles',
        key=customer_id,
        value=message
    )
    
    # Wait for confirmation
    record_metadata = future.get(timeout=10)
    print(f"Message sent to {record_metadata.topic}[{record_metadata.partition}] at offset {record_metadata.offset}")
 
# Example usage
customer_update = {
    "personal_info": {
        "full_name": "Nguyen Van A (Updated)",
        "phone": "0901234567",
        "email": "nguyenvana.new@email.com"
    },
    "address": {
        "street": "123 Nguyen Trai Street",
        "district": "District 1", 
        "city": "Ho Chi Minh",
        "province": "Ho Chi Minh"
    },
    "financial_profile": {
        "income_range": "20-50M",
        "employment_status": "employed",
        "customer_tier": "Gold"
    }
}
 
send_customer_profile_update("CUST001", customer_update)
producer.close()
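
The producer above serializes values with `json.dumps(v, default=str)`. The sketch below replays that serializer and the consumer's matching deserializer without a broker, to show what the consumer actually receives for non-JSON types. The helper names `serialize_value` and `deserialize_value` and the sample event are assumptions made for illustration.

```python
import json
from datetime import datetime
from decimal import Decimal


def serialize_value(value):
    """Same logic as the producer's value_serializer lambda."""
    return json.dumps(value, default=str).encode("utf-8")


def deserialize_value(raw):
    """Same logic as the consumer's value_deserializer lambda."""
    return json.loads(raw.decode("utf-8"))


# Hypothetical event carrying types that JSON cannot encode natively.
event = {
    "customer_id": "CUST001",
    "timestamp": datetime(2026, 3, 17, 9, 30),
    "data": {"amount": Decimal("1500000.50")},
}

raw = serialize_value(event)
decoded = deserialize_value(raw)

# default=str turned both values into plain strings.
print(decoded["timestamp"])        # 2026-03-17 09:30:00
print(decoded["data"]["amount"])   # 1500000.50
```

Note that `str()` on a `datetime` uses a space separator, unlike `isoformat()`, which is one reason the lab code calls `isoformat()` explicitly before sending. Consumers also get the amount back as a string and need to convert it if they do arithmetic.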

Customer Profile Consumer Example

# Create script: scripts/lab_customer_consumer.py
from kafka import KafkaConsumer
import json
 
consumer = KafkaConsumer(
    'customer-profiles',
    bootstrap_servers=['localhost:9092'],
    group_id='training-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest'
)
 
print("Listening for Lakehouse Customer Profile updates...")
 
for message in consumer:
    customer_data = message.value
    customer_id = message.key
    
    print(f"\n🔔 Customer Update Received:")
    print(f"   Customer ID: {customer_id}")
    print(f"   Event Type: {customer_data['event_type']}")
    print(f"   Timestamp: {customer_data['timestamp']}")
    
    # Process customer tier changes
    if 'financial_profile' in customer_data['data']:
        tier = customer_data['data']['financial_profile'].get('customer_tier')
        if tier:
            print(f"   🏆 Customer Tier: {tier}")
            
            # Trigger tier-based actions
            if tier in ['Gold', 'Platinum']:
                print(f"   💰 High-value customer detected - trigger VIP services")
    
    # Process contact info changes  
    if 'personal_info' in customer_data['data']:
        contact = customer_data['data']['personal_info']
        if 'email' in contact or 'phone' in contact:
            print(f"   📞 Contact info updated - sync with CRM systems")

Lab 3: Real-time Customer Transaction Processing

Transaction Producer

# Create script: scripts/lab_transaction_producer.py
from kafka import KafkaProducer
import json
from datetime import datetime
import random
import time
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)
 
def generate_transaction_event(customer_id):
    """Generate realistic Lakehouse transaction event"""
    
    transaction_types = [
        ("loan_disbursement", "disbursement", [5000000, 50000000]),
        ("payment", "payment", [100000, 2000000]), 
        ("transfer", "transfer", [50000, 1000000]),
        ("deposit", "deposit", [100000, 5000000])
    ]
    
    event_type, transaction_type, amount_range = random.choice(transaction_types)
    
    transaction = {
        "transaction_id": f"TXN{int(time.time())}{random.randint(1000,9999)}",
        "customer_id": customer_id,
        "event_type": event_type,
        "timestamp": datetime.now().isoformat(),
        "data": {
            "amount": random.randint(amount_range[0], amount_range[1]),
            "currency": "VND",
            "transaction_type": transaction_type,
            "channel": random.choice(["branch", "mobile", "web", "atm"]),
            "branch_id": f"BR{random.randint(100,999)}",
            "product_id": f"PRD{random.randint(10,99)}",
            "status": random.choice(["completed", "completed", "completed", "pending"])
        },
        "metadata": {
            "source_system": "core_banking_simulator",
            "processing_timestamp": datetime.now().isoformat()
        }
    }
    
    return transaction
 
# Simulate continuous transactions
customer_ids = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]
 
print("🏦 Starting Lakehouse Transaction Simulation...")
for i in range(20):
    customer_id = random.choice(customer_ids)
    transaction = generate_transaction_event(customer_id)
    
    producer.send(
        'customer-transactions',
        key=customer_id,
        value=transaction
    )
    
    print(f"💳 Transaction {i+1}: {customer_id} - {transaction['data']['amount']:,} VND")
    time.sleep(2)
 
producer.close()
print("\n✅ Transaction simulation completed")

Real-time Fraud Detection Consumer

# Create script: scripts/lab_fraud_detection.py
from kafka import KafkaConsumer
import json
from collections import defaultdict, deque
from datetime import datetime, timedelta
 
# Fraud detection rules
FRAUD_RULES = {
    'max_amount_per_transaction': 100000000,  # 100M VND
    'max_transactions_per_hour': 10,
    'max_amount_per_hour': 200000000,  # 200M VND
    'suspicious_channels': ['atm'],  # High-risk channels
    'off_hours_start': 22,  # 10 PM
    'off_hours_end': 6     # 6 AM
}
 
class FraudDetector:
    def __init__(self):
        self.customer_activity = defaultdict(lambda: {
            'recent_transactions': deque(),
            'hourly_amount': 0,
            'risk_score': 0
        })
    
    def analyze_transaction(self, transaction):
        """Analyze transaction for fraud indicators"""
        customer_id = transaction['customer_id']
        amount = transaction['data']['amount']
        channel = transaction['data']['channel']
        timestamp = datetime.fromisoformat(transaction['timestamp'])
        
        risk_factors = []
        risk_score = 0
        
        # Rule 1: High amount transaction
        if amount > FRAUD_RULES['max_amount_per_transaction']:
            risk_factors.append(f"High amount: {amount:,} VND")
            risk_score += 30
        
        # Rule 2: Off-hours transaction (22:00 up to, but not including, 06:00)
        hour = timestamp.hour
        if hour >= FRAUD_RULES['off_hours_start'] or hour < FRAUD_RULES['off_hours_end']:
            risk_factors.append(f"Off-hours transaction: {hour}:00")
            risk_score += 15
        
        # Rule 3: High-risk channel
        if channel in FRAUD_RULES['suspicious_channels']:
            risk_factors.append(f"High-risk channel: {channel}")
            risk_score += 20
        
        # Rule 4: Frequency analysis
        customer_data = self.customer_activity[customer_id]
        recent_transactions = customer_data['recent_transactions']
        
        # Clean old transactions (older than 1 hour)
        cutoff_time = timestamp - timedelta(hours=1)
        while recent_transactions and recent_transactions[0]['timestamp'] < cutoff_time:
            old_tx = recent_transactions.popleft()
            customer_data['hourly_amount'] -= old_tx['amount']
        
        # Add current transaction
        recent_transactions.append({
            'timestamp': timestamp,
            'amount': amount
        })
        customer_data['hourly_amount'] += amount
        
        # Check frequency rules
        if len(recent_transactions) > FRAUD_RULES['max_transactions_per_hour']:
            risk_factors.append(f"High frequency: {len(recent_transactions)} transactions/hour")
            risk_score += 25
        
        if customer_data['hourly_amount'] > FRAUD_RULES['max_amount_per_hour']:
            risk_factors.append(f"High hourly amount: {customer_data['hourly_amount']:,} VND")
            risk_score += 35
        
        return risk_score, risk_factors
 
# Start fraud detection consumer
consumer = KafkaConsumer(
    'customer-transactions',
    bootstrap_servers=['localhost:9092'],
    group_id='fraud-detection',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    auto_offset_reset='earliest'
)
 
fraud_detector = FraudDetector()
 
print("🔍 Lakehouse Real-time Fraud Detection Started...")
print("⚠️  Monitoring customer transactions for suspicious activity\n")
 
for message in consumer:
    transaction = message.value
    customer_id = message.key
    
    # Analyze for fraud
    risk_score, risk_factors = fraud_detector.analyze_transaction(transaction)
    
    # Display transaction
    amount = transaction['data']['amount']
    channel = transaction['data']['channel']
    status = transaction['data']['status']
    
    print(f"💳 Transaction: {customer_id} - {amount:,} VND via {channel}")
    
    # Alert on high-risk transactions
    if risk_score >= 30:
        print(f"🚨 FRAUD ALERT - Risk Score: {risk_score}")
        print(f"   Customer: {customer_id}")
        print(f"   Amount: {amount:,} VND")
        print(f"   Risk Factors:")
        for factor in risk_factors:
            print(f"   - {factor}")
        print("   🔄 Action: Block transaction & notify security team")
    elif risk_score >= 15:
        print(f"⚠️  Medium Risk - Score: {risk_score}")
        print("   🔄 Action: Additional verification required")
    
    print("─" * 50)
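
Rule 4 in the detector relies on a one-hour sliding window per customer. The sketch below isolates that technique so it can be tested without Kafka. The class name `SlidingWindow` is hypothetical, and the sketch assumes that timestamps arrive in non-decreasing order, which is not guaranteed for real event streams.

```python
from collections import deque
from datetime import datetime, timedelta


class SlidingWindow:
    """Tracks the count and total amount of events inside a time window."""

    def __init__(self, window=timedelta(hours=1)):
        self.window = window
        self.items = deque()  # (timestamp, amount) pairs, oldest first
        self.total = 0

    def add(self, timestamp, amount):
        """Evict expired entries, record the new one, return (count, total)."""
        cutoff = timestamp - self.window
        while self.items and self.items[0][0] < cutoff:
            _, old_amount = self.items.popleft()
            self.total -= old_amount
        self.items.append((timestamp, amount))
        self.total += amount
        return len(self.items), self.total


start = datetime(2026, 3, 17, 10, 0)
window = SlidingWindow()
first = window.add(start, 100)
second = window.add(start + timedelta(minutes=30), 50)
# 61 minutes after the first event, the first event has left the window.
third = window.add(start + timedelta(minutes=61), 10)
print(first, second, third)
```

Keeping a running total alongside the deque avoids re-summing the window on every event, which is the same choice the detector makes with `hourly_amount`.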

Lab 4: Customer Journey Tracking

Interaction Events Producer

# Create script: scripts/lab_customer_journey.py
from kafka import KafkaProducer
import json
from datetime import datetime
import random
import time
 
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)
 
def generate_customer_journey():
    """Simulate a complete Lakehouse customer journey"""
    
    customer_id = f"CUST{random.randint(1000,9999)}"
    session_id = f"SES{int(time.time())}"
    
    # Journey stages for Lakehouse loan application
    journey_steps = [
        {
            "event_type": "login",
            "channel": "mobile",
            "details": {"login_method": "fingerprint", "app_version": "2.1.5"}
        },
        {
            "event_type": "page_view", 
            "channel": "mobile",
            "details": {"page": "loan_products", "duration_seconds": 45}
        },
        {
            "event_type": "page_view",
            "channel": "mobile", 
            "details": {"page": "loan_calculator", "duration_seconds": 120}
        },
        {
            "event_type": "application",
            "channel": "mobile",
            "details": {"product_type": "personal_loan", "amount_requested": 15000000}
        },
        {
            "event_type": "call",
            "channel": "call_center",
            "details": {"call_duration": 480, "purpose": "document_verification"}
        },
        {
            "event_type": "branch_visit",
            "channel": "branch",
            "details": {"branch_id": "BR123", "purpose": "document_submission", "duration_minutes": 30}
        }
    ]
    
    print(f"👤 Starting journey for customer {customer_id}")
    
    for i, step in enumerate(journey_steps):
        interaction = {
            "interaction_id": f"INT{int(time.time())}{i}",
            "customer_id": customer_id,
            "event_type": step["event_type"],
            "timestamp": datetime.now().isoformat(),
            "data": {
                "channel": step["channel"],
                "interaction_details": {
                    "session_id": session_id,
                    "step_number": i + 1,
                    "total_steps": len(journey_steps),
                    **step["details"]
                },
                "device_info": {
                    "device_type": "mobile" if step["channel"] == "mobile" else "other",
                    "os": "iOS 15.6",
                    "app_version": "2.1.5"
                }
            }
        }
        
        producer.send(
            'customer-interactions',
            key=customer_id,
            value=interaction
        )
        
        print(f"   📱 Step {i+1}: {step['event_type']} via {step['channel']}")
        time.sleep(3)  # Realistic timing between steps
    
    print(f"✅ Journey completed for {customer_id}\n")
 
# Generate multiple customer journeys
for _ in range(3):
    generate_customer_journey()
    time.sleep(5)
 
producer.close()

Production Best Practices

1. Topic Design

Naming Convention

{domain}-{entity}-{type}

Domains: customer, product, transaction, risk, marketing, operational
Types: events, snapshots, commands, queries

Examples:
- customer-profiles-events      # Profile change events
- customer-profiles-snapshots   # Current state snapshots  
- risk-assessments-events       # Risk scoring events
- marketing-campaigns-commands  # Campaign execution commands
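
The naming convention can be enforced in code when topics are created. This is a minimal sketch under stated assumptions: the helper names `build_topic_name` and `parse_topic_name` are hypothetical, and the entity is restricted to a single lowercase word without hyphens so that the three parts stay unambiguous.

```python
import re

# Domains and types listed in the naming convention.
DOMAINS = {"customer", "product", "transaction", "risk", "marketing", "operational"}
TYPES = {"events", "snapshots", "commands", "queries"}

# Assumption: an entity is one lowercase word (letters and digits, no hyphens).
ENTITY_PATTERN = re.compile(r"[a-z][a-z0-9]*")


def build_topic_name(domain, entity, type_):
    """Build a {domain}-{entity}-{type} topic name or raise ValueError."""
    if domain not in DOMAINS:
        raise ValueError(f"unknown domain: {domain!r}")
    if type_ not in TYPES:
        raise ValueError(f"unknown type: {type_!r}")
    if not ENTITY_PATTERN.fullmatch(entity):
        raise ValueError(f"invalid entity: {entity!r}")
    return f"{domain}-{entity}-{type_}"


def parse_topic_name(name):
    """Return (domain, entity, type) or None if the name breaks the convention."""
    parts = name.split("-")
    if len(parts) != 3:
        return None
    domain, entity, type_ = parts
    if domain in DOMAINS and type_ in TYPES and ENTITY_PATTERN.fullmatch(entity):
        return domain, entity, type_
    return None


print(build_topic_name("customer", "profiles", "events"))
print(parse_topic_name("risk-assessments-events"))
```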

Partitioning Strategy

# Customer domain partitioning by customer_id
def get_partition_key(customer_id):
    """Ensure related customer events go to same partition"""
    return customer_id
 
# Transaction partitioning by customer_id to maintain per-customer order
CUSTOMER_TRANSACTION_PARTITIONS = 12  # Based on customer volume
 
# Risk assessment partitioning by risk_level for processing priority
RISK_PARTITIONS = {
    'high': 0,     # High priority partition
    'medium': 1,   # Medium priority  
    'low': 2       # Low priority
}
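
Key-based partitioning works because the partition is derived from a hash of the key modulo the partition count, so the same `customer_id` always maps to the same partition. The sketch below illustrates the idea only: it uses CRC32 as a stand-in hash, whereas Kafka's default partitioner uses murmur2, so the partition numbers it prints will not match a real cluster. The function name `choose_partition` is hypothetical.

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a key to a partition: hash(key) mod partition count.

    Illustration only: CRC32 stands in for Kafka's murmur2 hash.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


CUSTOMER_TRANSACTION_PARTITIONS = 12

for customer_id in ["CUST001", "CUST002", "CUST001"]:
    partition = choose_partition(customer_id, CUSTOMER_TRANSACTION_PARTITIONS)
    print(f"{customer_id} -> partition {partition}")
```

Because the result depends on the partition count, increasing the number of partitions later changes where existing keys land, which breaks per-key ordering across the change.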

Retention Policies

RETENTION_CONFIG = {
    'customer-profiles': '30d',        # 30 days for GDPR compliance
    'customer-transactions': '7y',     # 7 years for financial records
    'customer-interactions': '90d',    # 90 days for analytics
    'risk-assessments': '5y',          # 5 years for regulatory
    'marketing-events': '1y'           # 1 year for campaign analysis
}
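
Kafka topics take their retention as `retention.ms`, in milliseconds, rather than as strings such as '30d'. The sketch below converts the shorthand used in `RETENTION_CONFIG` into that form. The function name `retention_to_ms` is hypothetical, and a year is assumed to be 365 days.

```python
DAY_MS = 24 * 60 * 60 * 1000

# Assumption: 'y' means a 365-day year.
UNIT_MS = {"d": DAY_MS, "y": 365 * DAY_MS}


def retention_to_ms(spec):
    """Convert a spec such as '30d' or '7y' into milliseconds."""
    value, unit = spec[:-1], spec[-1]
    if unit not in UNIT_MS or not value.isdigit():
        raise ValueError(f"unsupported retention spec: {spec!r}")
    return int(value) * UNIT_MS[unit]


RETENTION_CONFIG = {
    "customer-profiles": "30d",
    "customer-transactions": "7y",
    "customer-interactions": "90d",
}

for topic, spec in RETENTION_CONFIG.items():
    print(f"{topic}: retention.ms={retention_to_ms(spec)}")
```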

2. Producer Configuration

import os

# Lakehouse Production Producer Configuration
PRODUCER_CONFIG = {
    'bootstrap_servers': ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    'client_id': 'lakehouse-producer-v1',
    'acks': 'all',                    # Wait for all in-sync replicas
    'retries': 3,                     # Retry failed sends
    'batch_size': 16384,              # Batch size in bytes
    'linger_ms': 5,                   # Wait for batching
    'buffer_memory': 33554432,        # 32MB buffer
    'compression_type': 'gzip',       # Compress messages
    'max_in_flight_requests_per_connection': 1,  # Ensure ordering
    # Idempotent producer: avoids duplicates caused by retries. On its own this
    # is not end-to-end exactly-once. Assumption: a kafka-python release that
    # accepts 'enable_idempotence'; older releases reject unknown keys.
    'enable_idempotence': True,
    'security_protocol': 'SASL_SSL',  # Production security
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'lakehouse-producer',
    'sasl_plain_password': os.environ['KAFKA_PASSWORD']  # Read from the environment, never hard-code
}

3. Consumer Configuration

import os

# Lakehouse Production Consumer Configuration
CONSUMER_CONFIG = {
    'bootstrap_servers': ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],
    'group_id': 'analytics-processors',
    'client_id': 'lakehouse-consumer-v1',
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': False,      # Manual commit for reliability
    'max_poll_records': 500,          # Process in batches
    'max_poll_interval_ms': 300000,   # 5 minutes processing time
    'session_timeout_ms': 30000,      # 30 seconds session timeout
    'heartbeat_interval_ms': 3000,    # 3 seconds heartbeat
    'fetch_min_bytes': 1024,          # Minimum fetch size
    'fetch_max_wait_ms': 500,         # Maximum wait for fetch
    'security_protocol': 'SASL_SSL',
    'sasl_mechanism': 'PLAIN',
    'sasl_plain_username': 'lakehouse-consumer',
    'sasl_plain_password': os.environ['KAFKA_PASSWORD']  # Read from the environment, never hard-code
}
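
With `enable_auto_commit` set to False, the application decides when offsets are committed. The sketch below shows one common pattern, committing only after a whole polled batch has been handled, which gives at-least-once processing. It assumes a consumer object exposing `poll(timeout_ms=...)` and `commit()` as kafka-python's `KafkaConsumer` does. `process_once` and `StubConsumer` are hypothetical names, and the stub exists only so the example runs without a broker.

```python
def process_once(consumer, handle_record, timeout_ms=1000):
    """Poll one batch, handle every record, then commit.

    If handle_record raises, commit() is never reached, so the batch is
    read again after a restart or rebalance (at-least-once delivery).
    """
    batches = consumer.poll(timeout_ms=timeout_ms)  # {partition: [records]}
    processed = 0
    for records in batches.values():
        for record in records:
            handle_record(record)
            processed += 1
    if processed:
        consumer.commit()
    return processed


class StubConsumer:
    """Hypothetical in-memory stand-in for KafkaConsumer, for illustration."""

    def __init__(self, batches):
        self.batches = list(batches)
        self.commits = 0

    def poll(self, timeout_ms=0):
        return self.batches.pop(0) if self.batches else {}

    def commit(self):
        self.commits += 1


stub = StubConsumer([{"partition-0": ["event-a", "event-b"]}])
handled = []
count = process_once(stub, handled.append)
print(count, handled, stub.commits)
```

Because a failed batch is replayed, the handler should be idempotent, for example by keying writes on `transaction_id`.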

4. Error Handling and Retry Logic

from collections import defaultdict
from datetime import datetime

class KafkaErrorHandler:
    def __init__(self, dlq_topic='dead-letter-queue'):
        self.dlq_topic = dlq_topic
        self.retry_counts = defaultdict(int)
        self.max_retries = 3
    
    def handle_processing_error(self, message, error, producer):
        """Handle processing errors with retry logic"""
        # Count retries per record, not per key: many records share a customer key
        message_id = (message.topic, message.partition, message.offset)
        retry_count = self.retry_counts[message_id]
        
        if retry_count < self.max_retries:
            # Retry processing
            self.retry_counts[message_id] += 1
            print(f"Retrying message {message_id} (attempt {retry_count + 1})")
            return 'retry'
        else:
            # Send to dead letter queue
            dlq_message = {
                'original_topic': message.topic,
                'original_partition': message.partition,
                'original_offset': message.offset,
                'original_value': message.value,
                'error_message': str(error),
                'failed_timestamp': datetime.now().isoformat(),
                'retry_count': retry_count
            }
            
            producer.send(self.dlq_topic, value=dlq_message)
            # Drop the counter so the dict does not grow without bound
            del self.retry_counts[message_id]
            print(f"Message {message_id} sent to DLQ after {retry_count} retries")
            return 'dlq'

Troubleshooting

Common Issues and Solutions

1. Consumer Lag Issues

# Check consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group analytics-processors --describe
 
# Solutions:
# - Scale consumer instances
# - Optimize processing logic  
# - Increase partition count
# - Tune consumer configuration
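
The lag reported by the describe command is, per partition, the log end offset minus the group's committed offset. The sketch below reproduces that arithmetic on plain dictionaries so that alert thresholds can be tested offline. The function name `compute_lag` and the sample offsets are assumptions, and a partition without a committed offset is treated as committed at 0, which is a simplification.

```python
def compute_lag(end_offsets, committed_offsets):
    """Return per-partition lag: log end offset minus committed offset.

    Simplification: a partition with no committed offset counts from 0.
    """
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in end_offsets.items()
    }


# Hypothetical offsets for a three-partition topic.
end_offsets = {0: 1200, 1: 950, 2: 400}
committed_offsets = {0: 1150, 1: 950}

lag = compute_lag(end_offsets, committed_offsets)
total_lag = sum(lag.values())
print(lag, total_lag)
```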

2. Connection Issues

# Test Kafka connectivity
from kafka import KafkaAdminClient

def test_kafka_connection():
    try:
        admin_client = KafkaAdminClient(
            bootstrap_servers=['localhost:9092'],
            request_timeout_ms=5000
        )
        # describe_cluster() returns a dict in kafka-python, so use key access
        cluster_metadata = admin_client.describe_cluster()
        print(f"✅ Connected to cluster: {cluster_metadata['cluster_id']}")
        admin_client.close()
        return True
    except Exception as e:
        print(f"❌ Connection failed: {e}")
        return False

3. Schema Evolution Issues

# Handle schema changes gracefully
import json
import logging

logger = logging.getLogger(__name__)

def safe_deserialize_customer_event(raw_message):
    """Safely deserialize with backward compatibility"""
    try:
        data = json.loads(raw_message)
        
        # Handle missing fields with defaults
        data.setdefault('metadata', {})
        data['metadata'].setdefault('schema_version', '1.0')
        
        # Handle deprecated fields
        if 'old_field' in data:
            data['new_field'] = data.pop('old_field')
        
        return data
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse message: {e}")
        return None

Monitoring and Performance

1. Key Metrics to Monitor

Cluster Health Metrics

KAFKA_METRICS_TO_MONITOR = {
    # Broker metrics
    'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec': 'Messages/sec',
    'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec': 'Bytes in/sec',
    'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec': 'Bytes out/sec',
    
    # Consumer metrics  
    'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*': 'Consumer lag',
    'kafka.consumer:type=consumer-coordinator-metrics,client-id=*': 'Rebalance metrics',
    
    # Producer metrics
    'kafka.producer:type=producer-metrics,client-id=*': 'Producer throughput',
    'kafka.producer:type=producer-topic-metrics,client-id=*,topic=*': 'Per-topic metrics'
}

Lakehouse Business Metrics

# Custom metrics for Lakehouse domain
BUSINESS_METRICS = {
    'customer_events_per_minute': 'Rate of customer events',
    'transaction_volume_vnd': 'Transaction volume in VND',
    'fraud_alerts_triggered': 'Number of fraud alerts',
    'customer_journey_completion_rate': 'Journey completion percentage',
    'high_value_customer_activity': 'Gold/Platinum customer activity'
}
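
A metric such as `customer_events_per_minute` can be derived from the event timestamps alone. The sketch below buckets ISO-8601 timestamps by minute; the function name `events_per_minute` and the sample timestamps are assumptions, and a production setup would more likely export counters to a metrics system than compute them in a script.

```python
from collections import Counter
from datetime import datetime


def events_per_minute(timestamps):
    """Count events per minute bucket from ISO-8601 timestamp strings."""
    buckets = Counter()
    for ts in timestamps:
        # Truncate to the start of the minute to form the bucket key.
        minute = datetime.fromisoformat(ts).replace(second=0, microsecond=0)
        buckets[minute.isoformat()] += 1
    return dict(buckets)


# Hypothetical event timestamps, in the format the lab producers emit.
sample = [
    "2026-03-17T10:00:05",
    "2026-03-17T10:00:59.500000",
    "2026-03-17T10:01:00",
]
rates = events_per_minute(sample)
print(rates)
```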

2. Monitoring Setup

# Create script: scripts/kafka_monitor.py
import time
from kafka import KafkaAdminClient
 
class KafkaMonitor:
    def __init__(self):
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=['localhost:9092']
        )
        
    def get_topic_metrics(self, topic_name):
        """Get comprehensive topic metrics"""
        try:
            # Get topic metadata (describe_topics returns a list of dicts in kafka-python)
            metadata = self.admin_client.describe_topics([topic_name])
            topic_metadata = metadata[0]
            
            # Get partition info
            partitions = topic_metadata['partitions']
            partition_count = len(partitions)
            
            # Count consumer groups (cluster-wide, not only those reading this topic)
            consumer_groups = self.admin_client.list_consumer_groups()
            
            metrics = {
                'topic_name': topic_name,
                'partition_count': partition_count,
                'replication_factor': len(partitions[0]['replicas']) if partitions else 0,
                'consumer_groups': len(consumer_groups),
                'timestamp': time.time()
            }
            
            return metrics
            
        except Exception as e:
            print(f"Error getting metrics for {topic_name}: {e}")
            return None
    
    def monitor_customer_topics(self):
        """Monitor all Lakehouse topics"""
        customer_topics = [
            'customer-profiles',
            'customer-transactions', 
            'customer-interactions',
            'customer-segments',
            'customer-events',
            'customer-analytics'
        ]
        
        print("🔍 Lakehouse Kafka Topics Monitoring")
        print("=" * 50)
        
        for topic in customer_topics:
            metrics = self.get_topic_metrics(topic)
            if metrics:
                print(f"📊 {topic}:")
                print(f"   Partitions: {metrics['partition_count']}")
                print(f"   Replication: {metrics['replication_factor']}")
                print(f"   Consumer Groups: {metrics['consumer_groups']}")
                print()
 
# Run monitoring
if __name__ == "__main__":
    monitor = KafkaMonitor()
    monitor.monitor_customer_topics()

3. Performance Tuning Guidelines

Topic Configuration

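# Example configuration for Lakehouse high-throughput topics
# (kafka-configs.sh replaces the deprecated kafka-topics.sh --zookeeper --alter --config form)
# Note: retention.ms=604800000 is 7 days; set it to match the topic's retention policy
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name customer-transactions \
  --alter --add-config segment.ms=86400000,retention.ms=604800000,compression.type=gzip,min.insync.replicas=2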

JVM Tuning for Kafka Brokers

# Kafka broker JVM settings for Lakehouse production
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"

Conclusion

Key Takeaways for the Data Engineering Team:

  1. Domain-centric Design: topics are designed around the Lakehouse business domains
  2. Real-time Processing: Kafka enables real-time customer insights and fraud detection
  3. Scalability: the architecture can scale with the organization's growth
  4. Reliability: production-ready configuration with error handling
  5. Monitoring: comprehensive monitoring for operational excellence

Next Steps:

  1. Review this document and work through the labs
  2. Implement a monitoring dashboard
  3. Set up the production security configuration
  4. Train the team on advanced Kafka patterns
  5. Prepare for integration with Spark Streaming

This document is part of the Lakehouse Platform Training Series. For questions or feedback, contact the Data Engineering Team.
