Lê Duy Khương (Daniel)

Năng suất & công cụ dev

Hướng dẫn Trino

Trino trong Lakehouse: unified SQL, catalogs, labs.

2026-03-1724 phút đọcVI

Mục Lục

  1. Giới Thiệu Tổng Quan
  2. Trino trong Lakehouse Architecture
  3. Data Catalogs và Schemas
  4. Hands-on Labs
  5. Advanced Query Patterns
  6. Performance Optimization
  7. Security và Access Control
  8. Production Best Practices

Giới Thiệu Tổng Quan

Trino trong Lakehouse Platform

Trino (formerly PrestoSQL) được triển khai như distributed SQL query engine trong Lakehouse Platform, đóng vai trò:

  • Unified Query Interface: Single point để query data từ multiple sources
  • Cross-catalog Analytics: Join data từ PostgreSQL, MinIO, và external systems
  • Real-time Analytics: Low-latency queries cho business intelligence
  • Data Federation: Federated queries across heterogeneous data sources
  • Scalable Processing: Distributed processing cho large datasets

Key Benefits cho Lakehouse

🏦 Lakehouse Business Value:
├── Real-time Customer 360 queries
├── Cross-system analytics (Core Banking + Digital)
├── Regulatory reporting với SQL familiar interface
├── Ad-hoc analysis cho business users
└── Cost-effective alternative to traditional DWH

Architecture Components

Lakehouse Trino Deployment:
├── Trino Coordinator (lakehouse-trino:8085)
├── Query Processing Engine
├── Multiple Data Catalogs:
│   ├── PostgreSQL Catalog (Lakehouse metadata store)
│   ├── MinIO Catalog (Data Lake storage) 
│   ├── System Catalog (Trino internals)
│   └── Memory Catalog (Testing)
└── REST API Interface

Trino trong Lakehouse Architecture

Overall Data Flow

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Data Sources  │───▶│  Trino Engine   │───▶│   Analytics     │
│                 │    │                 │    │                 │
│ • PostgreSQL    │    │ • SQL Interface │    │ • Dashboards    │
│ • MinIO S3      │    │ • Query Planner │    │ • Reports       │
│ • Kafka Streams │    │ • Execution     │    │ • ML Pipelines  │
│ • External APIs │    │ • Federation    │    │ • API Responses │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Query Processing Architecture

Lakehouse Trino Query Execution:

1. Query Submission
   ├── REST API (http://localhost:8085)
   ├── Authentication & Authorization
   └── Query Parsing & Validation

2. Query Planning  
   ├── Catalog Metadata Discovery
   ├── Cost-based Optimization
   ├── Cross-catalog Join Planning
   └── Execution Plan Generation

3. Distributed Execution
   ├── Task Distribution
   ├── Data Source Connectors
   ├── In-memory Processing
   └── Result Aggregation

4. Result Delivery
   ├── Streaming Results
   ├── Pagination Support
   └── Multiple Output Formats

Lakehouse Data Federation Model

-- Lakehouse Federated Query Example
SELECT 
    c.customer_id,
    c.full_name,
    c.customer_tier,
    t.total_transactions,
    t.total_amount,
    s.current_balance
FROM postgresql.customer_domain.customers c
JOIN postgresql.transaction_domain.customer_summary t 
    ON c.customer_id = t.customer_id
JOIN minio.datalake.customer_snapshots s
    ON c.customer_id = s.customer_id
WHERE c.customer_tier IN ('Gold', 'Platinum')
    AND t.total_amount > 100000000  -- 100M VND

Data Catalogs và Schemas

Available Catalogs

1. PostgreSQL Catalog

Purpose: Lakehouse operational data và metadata store

Connection: postgresql.{schema}.{table}

Schemas:

-- Customer Domain Schemas
postgresql.customer_domain.customers
postgresql.customer_domain.customer_segments  
postgresql.customer_domain.customer_analytics
 
-- Transaction Domain
postgresql.transaction_domain.transactions
postgresql.transaction_domain.payments
postgresql.transaction_domain.transfers
 
-- Product Domain  
postgresql.product_domain.loan_products
postgresql.product_domain.applications
postgresql.product_domain.approvals
 
-- Risk Domain
postgresql.risk_domain.credit_scores
postgresql.risk_domain.risk_assessments
postgresql.risk_domain.fraud_alerts
 
-- Analytics Schemas
postgresql.analytics_domain.customer_360
postgresql.analytics_domain.transaction_summary
postgresql.analytics_domain.risk_metrics

2. MinIO Catalog (S3-compatible)

Purpose: Lakehouse data lake storage for large datasets

Connection: minio.{bucket}.{path}

Structure:

-- Data Lake Structure
minio.datalake.customer_profiles/year=2025/month=06/
minio.datalake.transaction_logs/year=2025/month=06/day=26/
minio.datalake.risk_models/version=v2.1/
minio.datalake.ml_features/customer_segments/
 
-- Backup & Archive
minio.backup.postgresql_dumps/date=2025-06-26/
minio.archive.historical_data/year=2024/
 
-- ML Model Storage
minio.models.risk_scoring/model_v1.2.pkl
minio.models.customer_segmentation/lgb_model.pkl

3. System Catalog

Purpose: Trino metadata và system information

-- Query monitoring
system.runtime.queries
system.runtime.tasks
system.runtime.nodes
 
-- Catalog information  
system.metadata.catalogs
system.metadata.schemas
system.metadata.tables

4. Memory Catalog

Purpose: Testing và temporary data

-- Temporary calculations
memory.default.temp_calculations
memory.default.query_results

Hands-on Labs

Lab 1: Connecting to Lakehouse Trino

Basic Connection Test

# Tạo script: scripts/lab_trino_connection.py
import requests
import json
import time
 
class TrinoClient:
    def __init__(self, host='localhost', port=8085):
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()
        self.session.headers.update({
            'X-Trino-User': 'lakehouse_analyst',
            'X-Trino-Source': 'lakehouse_training_lab',
            'X-Trino-Catalog': 'postgresql',
            'X-Trino-Schema': 'customer_domain'
        })
    
    def execute_query(self, sql):
        """Execute SQL query on Lakehouse Trino"""
        try:
            # Submit query
            response = self.session.post(
                f"{self.base_url}/v1/statement",
                data=sql,
                headers={'Content-Type': 'text/plain'}
            )
            
            if response.status_code != 200:
                print(f"❌ Query submission failed: {response.status_code}")
                return None
            
            # Poll for results
            query_info = response.json()
            next_uri = query_info.get('nextUri')
            
            all_data = []
            columns = None
            
            while next_uri:
                response = self.session.get(next_uri)
                if response.status_code != 200:
                    break
                    
                result = response.json()
                
                if 'columns' in result and columns is None:
                    columns = result['columns']
                
                if 'data' in result and result['data']:
                    all_data.extend(result['data'])
                
                state = result.get('stats', {}).get('state')
                if state == 'FINISHED':
                    return {
                        'columns': columns,
                        'data': all_data,
                        'stats': result.get('stats', {})
                    }
                elif state == 'FAILED':
                    error = result.get('error', {})
                    print(f"❌ Query failed: {error.get('message', 'Unknown error')}")
                    return None
                
                next_uri = result.get('nextUri')
                time.sleep(0.1)
            
            return {'columns': columns, 'data': all_data}
            
        except Exception as e:
            print(f"❌ Error executing query: {e}")
            return None
    
    def show_catalogs(self):
        """Show all available catalogs"""
        result = self.execute_query("SHOW CATALOGS")
        if result and result['data']:
            print("📊 Available Data Catalogs:")
            for row in result['data']:
                catalog = row[0]
                print(f"   • {catalog}")
                
                # Show schemas for main catalogs
                if catalog in ['postgresql', 'minio']:
                    schemas_result = self.execute_query(f"SHOW SCHEMAS FROM {catalog}")
                    if schemas_result and schemas_result['data']:
                        print(f"     Schemas ({len(schemas_result['data'])}):")
                        for schema_row in schemas_result['data'][:5]:  # Show first 5
                            print(f"       - {schema_row[0]}")
                        if len(schemas_result['data']) > 5:
                            print(f"       - ... and {len(schemas_result['data']) - 5} more")
                    print()
        return result
 
# Test connection
client = TrinoClient()
print("🔗 Testing Lakehouse Trino Connection...")
print("=" * 50)
 
# Test basic connectivity
info_response = requests.get("http://localhost:8085/v1/info")
if info_response.status_code == 200:
    info = info_response.json()
    print(f"✅ Trino Coordinator: {info.get('coordinator', False)}")
    print(f"✅ Version: {info.get('nodeVersion', {}).get('version', 'Unknown')}")
    print(f"✅ Environment: {info.get('environment', 'Unknown')}")
else:
    print("❌ Failed to connect to Trino")
    exit(1)
 
# Show catalogs
print("\n" + "=" * 50)
client.show_catalogs()

Verify Lakehouse Schemas

# Tạo script: scripts/lab_lakehouse_schemas.py
from lab_trino_connection import TrinoClient
 
client = TrinoClient()
 
def explore_lakehouse_data_structure():
    """Explore Lakehouse data structure in detail"""
    
    print("🏦 Lakehouse Data Structure")
    print("=" * 60)
    
    # PostgreSQL schemas exploration
    print("\n📊 PostgreSQL Catalog (Operational Data):")
    postgres_schemas = client.execute_query("SHOW SCHEMAS FROM postgresql")
    
    if postgres_schemas and postgres_schemas['data']:
        lakehouse_schemas = [schema[0] for schema in postgres_schemas['data'] 
                      if 'domain' in schema[0] or schema[0] in ['public', 'analytics']]
        
        for schema in lakehouse_schemas:
            print(f"\n   📁 Schema: {schema}")
            
            # Show tables in each domain schema
            tables_query = f"SHOW TABLES FROM postgresql.{schema}"
            tables_result = client.execute_query(tables_query)
            
            if tables_result and tables_result['data']:
                for table_row in tables_result['data']:
                    table_name = table_row[0]
                    print(f"      📋 Table: {table_name}")
                    
                    # Show sample data for key tables
                    if any(keyword in table_name.lower() 
                          for keyword in ['customer', 'transaction', 'profile']):
                        sample_query = f"SELECT COUNT(*) as record_count FROM postgresql.{schema}.{table_name}"
                        count_result = client.execute_query(sample_query)
                        if count_result and count_result['data']:
                            count = count_result['data'][0][0]
                            print(f"         📊 Records: {count:,}")
 
# Run exploration
explore_lakehouse_data_structure()

Lab 2: Lakehouse Customer Analytics Queries

Customer 360 Queries

# Tạo script: scripts/lab_customer_360.py
from lab_trino_connection import TrinoClient
 
client = TrinoClient()
 
def lakehouse_customer_360_analytics():
    """Lakehouse Customer 360 analytics using Trino"""
    
    print("👥 Lakehouse Customer 360 Analytics")
    print("=" * 50)
    
    # Query 1: Customer Tier Distribution
    print("\n📊 Customer Tier Distribution:")
    tier_query = """
    SELECT 
        customer_tier,
        COUNT(*) as customer_count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM postgresql.public.customers_test
    GROUP BY customer_tier
    ORDER BY customer_count DESC
    """
    
    result = client.execute_query(tier_query)
    if result and result['data']:
        print("   Tier          | Count | Percentage")
        print("   --------------|-------|----------")
        for row in result['data']:
            tier, count, pct = row
            print(f"   {tier:<13} | {count:>5} | {pct:>7}%")
    
    # Query 2: High Value Customers
    print("\n💰 High Value Customers (>15M VND):")
    high_value_query = """
    SELECT 
        customer_id,
        full_name,
        customer_tier,
        total_balance,
        city
    FROM postgresql.public.customers_test
    WHERE total_balance > 15000000
    ORDER BY total_balance DESC
    """
    
    result = client.execute_query(high_value_query)
    if result and result['data']:
        print("   ID     | Name              | Tier      | Balance      | City")
        print("   -------|-------------------|-----------|--------------|----------")
        for row in result['data']:
            cust_id, name, tier, balance, city = row
            balance_formatted = f"{float(balance):,.0f}"
            print(f"   {cust_id} | {name:<17} | {tier:<9} | {balance_formatted:>12} | {city}")
    
    # Query 3: City-wise Customer Analytics
    print("\n🏙️ City-wise Customer Analytics:")
    city_query = """
    SELECT 
        city,
        COUNT(*) as customers,
        AVG(total_balance) as avg_balance,
        MAX(total_balance) as max_balance,
        COUNT(CASE WHEN customer_tier IN ('Gold', 'Platinum') THEN 1 END) as premium_customers
    FROM postgresql.public.customers_test
    GROUP BY city
    ORDER BY avg_balance DESC
    """
    
    result = client.execute_query(city_query)
    if result and result['data']:
        print("   City         | Customers | Avg Balance  | Max Balance  | Premium")
        print("   -------------|-----------|--------------|--------------|--------")
        for row in result['data']:
            city, customers, avg_bal, max_bal, premium = row
            avg_formatted = f"{float(avg_bal):,.0f}"
            max_formatted = f"{float(max_bal):,.0f}"
            print(f"   {city:<12} | {customers:>9} | {avg_formatted:>12} | {max_formatted:>12} | {premium:>7}")
 
# Run analytics
lakehouse_customer_360_analytics()

Risk Analytics Queries

# Tạo script: scripts/lab_risk_analytics.py
from lab_trino_connection import TrinoClient
 
client = TrinoClient()
 
def lakehouse_risk_analytics():
    """Lakehouse Risk analytics and credit scoring"""
    
    print("⚠️ Lakehouse Risk Analytics")
    print("=" * 50)
    
    # Query 1: Credit Score Distribution
    print("\n📈 Credit Score Distribution:")
    credit_query = """
    SELECT 
        CASE 
            WHEN credit_score >= 800 THEN 'Excellent (800+)'
            WHEN credit_score >= 740 THEN 'Very Good (740-799)'
            WHEN credit_score >= 670 THEN 'Good (670-739)'
            WHEN credit_score >= 580 THEN 'Fair (580-669)'
            ELSE 'Poor (<580)'
        END as credit_category,
        COUNT(*) as customers,
        AVG(total_balance) as avg_balance,
        AVG(credit_score) as avg_credit_score
    FROM postgresql.public.customers_test
    GROUP BY 
        CASE 
            WHEN credit_score >= 800 THEN 'Excellent (800+)'
            WHEN credit_score >= 740 THEN 'Very Good (740-799)'
            WHEN credit_score >= 670 THEN 'Good (670-739)'
            WHEN credit_score >= 580 THEN 'Fair (580-669)'
            ELSE 'Poor (<580)'
        END
    ORDER BY avg_credit_score DESC
    """
    
    result = client.execute_query(credit_query)
    if result and result['data']:
        print("   Category            | Customers | Avg Balance  | Avg Score")
        print("   --------------------|-----------|--------------|----------")
        for row in result['data']:
            category, customers, avg_balance, avg_score = row
            balance_formatted = f"{float(avg_balance):,.0f}"
            score_formatted = f"{float(avg_score):,.0f}"
            print(f"   {category:<19} | {customers:>9} | {balance_formatted:>12} | {score_formatted:>9}")
    
    # Query 2: Risk Tier Matrix
    print("\n🎯 Customer Tier vs Credit Score Matrix:")
    matrix_query = """
    SELECT 
        customer_tier,
        AVG(credit_score) as avg_credit_score,
        MIN(credit_score) as min_credit_score,
        MAX(credit_score) as max_credit_score,
        COUNT(*) as customer_count
    FROM postgresql.public.customers_test
    GROUP BY customer_tier
    ORDER BY avg_credit_score DESC
    """
    
    result = client.execute_query(matrix_query)
    if result and result['data']:
        print("   Tier      | Avg Score | Min Score | Max Score | Count")
        print("   ----------|-----------|-----------|-----------|------")
        for row in result['data']:
            tier, avg_score, min_score, max_score, count = row
            print(f"   {tier:<9} | {avg_score:>9.0f} | {min_score:>9.0f} | {max_score:>9.0f} | {count:>5}")
    
    # Query 3: Age vs Credit Risk Analysis
    print("\n👥 Age vs Credit Risk Analysis:")
    age_risk_query = """
    SELECT 
        CASE 
            WHEN age < 25 THEN 'Young (18-24)'
            WHEN age < 35 THEN 'Young Adult (25-34)'
            WHEN age < 45 THEN 'Middle Age (35-44)'
            WHEN age < 55 THEN 'Mature (45-54)'
            ELSE 'Senior (55+)'
        END as age_group,
        COUNT(*) as customers,
        AVG(credit_score) as avg_credit_score,
        AVG(total_balance) as avg_balance,
        COUNT(CASE WHEN customer_tier IN ('Gold', 'Platinum') THEN 1 END) as premium_count
    FROM postgresql.public.customers_test
    GROUP BY 
        CASE 
            WHEN age < 25 THEN 'Young (18-24)'
            WHEN age < 35 THEN 'Young Adult (25-34)'
            WHEN age < 45 THEN 'Middle Age (35-44)'
            WHEN age < 55 THEN 'Mature (45-54)'
            ELSE 'Senior (55+)'
        END
    ORDER BY avg_credit_score DESC
    """
    
    result = client.execute_query(age_risk_query)
    if result and result['data']:
        print("   Age Group        | Customers | Avg Score | Avg Balance  | Premium")
        print("   -----------------|-----------|-----------|--------------|--------")
        for row in result['data']:
            age_group, customers, avg_score, avg_balance, premium = row
            balance_formatted = f"{float(avg_balance):,.0f}"
            print(f"   {age_group:<16} | {customers:>9} | {avg_score:>9.0f} | {balance_formatted:>12} | {premium:>7}")
 
# Run risk analytics
lakehouse_risk_analytics()

Lab 3: Cross-Catalog Queries

PostgreSQL + System Catalog Joins

# Tạo script: scripts/lab_cross_catalog.py
from lab_trino_connection import TrinoClient
 
client = TrinoClient()
 
def lakehouse_cross_catalog_analytics():
    """Lakehouse Cross-catalog analytics demonstrating data federation"""
    
    print("🔄 Lakehouse Cross-Catalog Analytics")
    print("=" * 50)
    
    # Query 1: Data Source Summary
    print("\n📊 Data Source Summary:")
    cross_query = """
    SELECT 
        'PostgreSQL - Customer Data' as data_source,
        COUNT(*) as record_count,
        'postgresql' as catalog_name
    FROM postgresql.public.customers_test
    
    UNION ALL
    
    SELECT 
        'System - Active Queries' as data_source,
        COUNT(*) as record_count,
        'system' as catalog_name  
    FROM system.runtime.queries
    WHERE state = 'RUNNING'
    
    UNION ALL
    
    SELECT 
        'System - Query History' as data_source,
        COUNT(*) as record_count,
        'system' as catalog_name
    FROM system.runtime.queries
    WHERE created > current_timestamp - interval '1' hour
    """
    
    result = client.execute_query(cross_query)
    if result and result['data']:
        print("   Data Source                | Records | Catalog")
        print("   ---------------------------|---------|----------")
        for row in result['data']:
            source, count, catalog = row
            print(f"   {source:<26} | {count:>7} | {catalog}")
    
    # Query 2: Query Performance Analysis
    print("\n⚡ Recent Query Performance Analysis:")
    perf_query = """
    SELECT 
        query_id,
        state,
        query_type,
        ROUND(execution_time_ms / 1000.0, 2) as execution_seconds,
        processed_rows,
        ROUND(processed_bytes / 1024.0 / 1024.0, 2) as processed_mb
    FROM system.runtime.queries 
    WHERE created > current_timestamp - interval '10' minute
        AND query NOT LIKE '%system.runtime.queries%'  -- Exclude meta queries
    ORDER BY execution_time_ms DESC
    LIMIT 10
    """
    
    result = client.execute_query(perf_query)
    if result and result['data']:
        print("   Query ID         | State    | Type | Exec(s) | Rows    | MB")
        print("   -----------------|----------|------|---------|---------|--------")
        for row in result['data']:
            query_id, state, query_type, exec_sec, rows, mb = row
            short_id = query_id[-8:] if query_id else "N/A"
            state_short = (state or "N/A")[:8]
            type_short = (query_type or "N/A")[:6]
            rows_val = rows or 0
            mb_val = mb or 0.0
            print(f"   ...{short_id:<13} | {state_short:<8} | {type_short:<4} | {exec_sec:>7} | {rows_val:>7} | {mb_val:>6}")
    
    # Query 3: Catalog Metadata Comparison
    print("\n📋 Catalog Metadata Comparison:")
    metadata_query = """
    SELECT 
        catalog_name,
        COUNT(DISTINCT schema_name) as schema_count,
        COUNT(table_name) as table_count
    FROM system.metadata.tables
    WHERE catalog_name IN ('postgresql', 'memory', 'system')
    GROUP BY catalog_name
    ORDER BY table_count DESC
    """
    
    result = client.execute_query(metadata_query)
    if result and result['data']:
        print("   Catalog    | Schemas | Tables")
        print("   -----------|---------|--------")
        for row in result['data']:
            catalog, schemas, tables = row
            print(f"   {catalog:<10} | {schemas:>7} | {tables:>6}")
 
# Run cross-catalog analytics
lakehouse_cross_catalog_analytics()

Lab 4: Performance Monitoring và Optimization

Query Performance Analysis

# Tạo script: scripts/lab_performance_monitoring.py
from lab_trino_connection import TrinoClient
import time
 
client = TrinoClient()
 
def lakehouse_performance_benchmarks():
    """Lakehouse Performance benchmarks and optimization analysis"""
    
    print("⚡ Lakehouse Trino Performance Benchmarks")
    print("=" * 50)
    
    # Benchmark queries với different complexity levels
    benchmark_queries = [
        {
            'name': 'Simple Count',
            'description': 'Basic aggregation query',
            'sql': 'SELECT COUNT(*) FROM postgresql.public.customers_test'
        },
        {
            'name': 'Group By Analysis', 
            'description': 'Grouping and aggregation',
            'sql': '''
                SELECT customer_tier, COUNT(*), AVG(total_balance)
                FROM postgresql.public.customers_test 
                GROUP BY customer_tier
            '''
        },
        {
            'name': 'Complex Analytics',
            'description': 'Window functions and calculations',
            'sql': '''
                SELECT 
                    customer_id,
                    customer_tier,
                    total_balance,
                    AVG(total_balance) OVER (PARTITION BY customer_tier) as tier_avg_balance,
                    RANK() OVER (PARTITION BY customer_tier ORDER BY total_balance DESC) as tier_rank
                FROM postgresql.public.customers_test
            '''
        },
        {
            'name': 'Cross-Schema Join',
            'description': 'Cross-schema data federation',
            'sql': '''
                SELECT 
                    c.customer_tier,
                    COUNT(*) as customer_count,
                    AVG(c.total_balance) as avg_balance
                FROM postgresql.public.customers_test c
                GROUP BY c.customer_tier
                HAVING COUNT(*) > 0
            '''
        }
    ]
    
    benchmark_results = []
    
    for i, benchmark in enumerate(benchmark_queries):
        print(f"\n🏃 Benchmark {i+1}: {benchmark['name']}")
        print(f"   Description: {benchmark['description']}")
        
        # Execute with timing
        start_time = time.time()
        result = client.execute_query(benchmark['sql'])
        end_time = time.time()
        
        execution_time = end_time - start_time
        
        if result:
            row_count = len(result['data']) if result['data'] else 0
            stats = result.get('stats', {})
            
            print(f"   ⏱️  Execution Time: {execution_time:.3f}s")
            print(f"   📊 Rows Returned: {row_count:,}")
            
            if stats:
                processed_rows = stats.get('processedRows', 0)
                cpu_time = stats.get('cpuTimeMillis', 0) / 1000.0
                print(f"   🔄 Processed Rows: {processed_rows:,}")
                print(f"   💻 CPU Time: {cpu_time:.3f}s")
            
            benchmark_results.append({
                'name': benchmark['name'],
                'execution_time': execution_time,
                'rows_returned': row_count,
                'processed_rows': stats.get('processedRows', 0),
                'cpu_time': cpu_time
            })
            
            print("   ✅ Success")
        else:
            print("   ❌ Failed")
            benchmark_results.append({
                'name': benchmark['name'],
                'execution_time': execution_time,
                'rows_returned': 0,
                'processed_rows': 0,
                'cpu_time': 0,
                'status': 'failed'
            })
        
        time.sleep(1)  # Brief pause between benchmarks
    
    # Performance Summary
    print(f"\n📈 Performance Summary:")
    print("   " + "="*60)
    successful_benchmarks = [b for b in benchmark_results if b.get('status') != 'failed']
    
    if successful_benchmarks:
        avg_time = sum(b['execution_time'] for b in successful_benchmarks) / len(successful_benchmarks)
        fastest = min(successful_benchmarks, key=lambda x: x['execution_time'])
        slowest = max(successful_benchmarks, key=lambda x: x['execution_time'])
        
        print(f"   📊 Total Benchmarks: {len(benchmark_queries)}")
        print(f"   ✅ Successful: {len(successful_benchmarks)}")
        print(f"   ⏱️  Average Execution Time: {avg_time:.3f}s")
        print(f"   🚀 Fastest Query: {fastest['name']} ({fastest['execution_time']:.3f}s)")
        print(f"   🐌 Slowest Query: {slowest['name']} ({slowest['execution_time']:.3f}s)")
    
    return benchmark_results
 
def monitor_active_queries():
    """Monitor currently active queries"""
    
    print("\n🔍 Active Query Monitoring:")
    print("=" * 40)
    
    active_queries_sql = """
    SELECT 
        query_id,
        state,
        query_type,
        user_name,
        source,
        ROUND((current_timestamp - created) * 86400, 1) as running_seconds,
        processed_rows,
        queued_splits,
        running_splits
    FROM system.runtime.queries
    WHERE state IN ('PLANNING', 'STARTING', 'RUNNING', 'FINISHING')
    ORDER BY created DESC
    LIMIT 10
    """
    
    result = client.execute_query(active_queries_sql)
    if result and result['data']:
        print("   Query ID (suffix) | State     | Type   | User     | Running(s)")
        print("   ------------------|-----------|--------|----------|----------")
        for row in result['data']:
            query_id, state, qtype, user, source, runtime, rows, queued, running = row
            short_id = query_id[-12:] if query_id else "N/A"
            state_short = (state or "N/A")[:9]
            type_short = (qtype or "N/A")[:6]
            user_short = (user or "N/A")[:8]
            print(f"   ...{short_id:<13} | {state_short:<9} | {type_short:<6} | {user_short:<8} | {runtime:>8}")
    else:
        print("   📭 No active queries found")
 
# Run performance monitoring
benchmark_results = lakehouse_performance_benchmarks()
monitor_active_queries()

Advanced Query Patterns

1. Lakehouse Customer Segmentation Queries

RFM Analysis với Trino

-- Lakehouse RFM (Recency, Frequency, Monetary) Analysis
WITH customer_rfm AS (
    SELECT 
        c.customer_id,
        c.full_name,
        c.customer_tier,
        
        -- Recency: Days since last transaction (simulated)
        COALESCE(
            DATE_DIFF('day', c.last_login_date, CURRENT_DATE), 
            365
        ) as recency_days,
        
        -- Frequency: Transaction frequency (based on tier)
        CASE c.customer_tier
            WHEN 'Platinum' THEN 50
            WHEN 'Gold' THEN 30
            WHEN 'Silver' THEN 15
            ELSE 5
        END as frequency_score,
        
        -- Monetary: Customer value
        c.total_balance as monetary_value
        
    FROM postgresql.public.customers_test c
),
rfm_scores AS (
    SELECT *,
        -- RFM Scoring (1-5 scale)
        CASE 
            WHEN recency_days <= 30 THEN 5
            WHEN recency_days <= 60 THEN 4
            WHEN recency_days <= 90 THEN 3
            WHEN recency_days <= 180 THEN 2
            ELSE 1
        END as recency_score,
        
        CASE 
            WHEN frequency_score >= 40 THEN 5
            WHEN frequency_score >= 25 THEN 4
            WHEN frequency_score >= 15 THEN 3
            WHEN frequency_score >= 8 THEN 2
            ELSE 1
        END as frequency_final_score,
        
        CASE 
            WHEN monetary_value >= 20000000 THEN 5  -- 20M+
            WHEN monetary_value >= 15000000 THEN 4  -- 15M+
            WHEN monetary_value >= 10000000 THEN 3  -- 10M+
            WHEN monetary_value >= 5000000 THEN 2   -- 5M+
            ELSE 1
        END as monetary_score
    FROM customer_rfm
)
SELECT 
    customer_id,
    full_name,
    customer_tier,
    recency_score,
    frequency_final_score,
    monetary_score,
    CONCAT(recency_score, frequency_final_score, monetary_score) as rfm_score,
    
    -- RFM Segmentation
    CASE 
        WHEN recency_score >= 4 AND frequency_final_score >= 4 AND monetary_score >= 4 
        THEN 'Champions'
        
        WHEN recency_score >= 3 AND frequency_final_score >= 3 AND monetary_score >= 3
        THEN 'Loyal Customers'
        
        WHEN recency_score >= 4 AND frequency_final_score <= 2
        THEN 'New Customers'
        
        WHEN recency_score <= 2 AND frequency_final_score >= 3 AND monetary_score >= 3
        THEN 'At Risk'
        
        WHEN recency_score <= 2 AND frequency_final_score <= 2
        THEN 'Lost Customers'
        
        ELSE 'Potential Loyalists'
    END as customer_segment,
    
    monetary_value
FROM rfm_scores
ORDER BY monetary_score DESC, frequency_final_score DESC, recency_score DESC;

2. Time Series Analysis

-- Lakehouse Customer Balance Trend Analysis (Simulated time series)
WITH date_spine AS (
    SELECT DATE_ADD('day', -seq, CURRENT_DATE) as analysis_date
    FROM (
        SELECT sequence(0, 29) as date_sequence
    ) t(date_sequence)
    CROSS JOIN UNNEST(date_sequence) as t(seq)
),
customer_daily_balances AS (
    SELECT 
        c.customer_id,
        c.customer_tier,
        d.analysis_date,
        
        -- Simulate balance fluctuation based on customer tier
        c.total_balance + 
        (RANDOM() - 0.5) * 
        CASE c.customer_tier
            WHEN 'Platinum' THEN 2000000  -- Higher volatility
            WHEN 'Gold' THEN 1000000
            WHEN 'Silver' THEN 500000
            ELSE 200000
        END as daily_balance
        
    FROM postgresql.public.customers_test c
    CROSS JOIN date_spine d
),
balance_analytics AS (
    SELECT 
        customer_id,
        customer_tier,
        analysis_date,
        daily_balance,
        
        -- 7-day moving average
        AVG(daily_balance) OVER (
            PARTITION BY customer_id 
            ORDER BY analysis_date 
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) as balance_7day_avg,
        
        -- Balance change from previous day
        daily_balance - LAG(daily_balance) OVER (
            PARTITION BY customer_id 
            ORDER BY analysis_date
        ) as daily_change,
        
        -- Volatility (standard deviation over 7 days)
        STDDEV(daily_balance) OVER (
            PARTITION BY customer_id 
            ORDER BY analysis_date 
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) as volatility_7day
        
    FROM customer_daily_balances
)
SELECT 
    customer_tier,
    COUNT(DISTINCT customer_id) as customers,
    AVG(daily_balance) as avg_balance,
    AVG(ABS(daily_change)) as avg_daily_change,
    AVG(volatility_7day) as avg_volatility,
    
    -- Risk indicators
    COUNT(CASE WHEN ABS(daily_change) > 1000000 THEN 1 END) as high_volatility_days,
    COUNT(CASE WHEN daily_balance < 0 THEN 1 END) as negative_balance_days
    
FROM balance_analytics
WHERE analysis_date >= CURRENT_DATE - INTERVAL '30' DAY
    AND volatility_7day IS NOT NULL
GROUP BY customer_tier
ORDER BY avg_balance DESC;

3. Advanced Analytics Functions

Customer Cohort Analysis

-- Lakehouse Customer Cohort Analysis
WITH customer_cohorts AS (
    SELECT 
        customer_id,
        customer_tier,
        DATE_TRUNC('month', registration_date) as cohort_month,
        registration_date,
        total_balance,
        age
    FROM postgresql.public.customers_test
),
cohort_analysis AS (
    SELECT 
        cohort_month,
        customer_tier,
        COUNT(*) as cohort_customers,
        AVG(total_balance) as avg_initial_balance,
        AVG(age) as avg_age,
        
        -- Customer value distribution
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY total_balance) as p25_balance,
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY total_balance) as median_balance, 
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY total_balance) as p75_balance,
        
        -- Age demographics
        COUNT(CASE WHEN age < 30 THEN 1 END) as young_customers,
        COUNT(CASE WHEN age BETWEEN 30 AND 45 THEN 1 END) as middle_age_customers,
        COUNT(CASE WHEN age > 45 THEN 1 END) as mature_customers
        
    FROM customer_cohorts
    GROUP BY cohort_month, customer_tier
)
SELECT 
    cohort_month,
    customer_tier,
    cohort_customers,
    ROUND(avg_initial_balance, 0) as avg_initial_balance,
    ROUND(avg_age, 1) as avg_age,
    ROUND(p25_balance, 0) as p25_balance,
    ROUND(median_balance, 0) as median_balance,
    ROUND(p75_balance, 0) as p75_balance,
    
    -- Demographics percentages
    ROUND(young_customers * 100.0 / cohort_customers, 1) as pct_young,
    ROUND(middle_age_customers * 100.0 / cohort_customers, 1) as pct_middle,
    ROUND(mature_customers * 100.0 / cohort_customers, 1) as pct_mature,
    
    -- Tier performance ranking
    RANK() OVER (PARTITION BY cohort_month ORDER BY avg_initial_balance DESC) as tier_rank
    
FROM cohort_analysis
ORDER BY cohort_month DESC, avg_initial_balance DESC;

Performance Optimization

1. Query Optimization Strategies

Cost-Based Optimization

-- Before optimization: Full table scan
SELECT customer_id, total_balance
FROM postgresql.public.customers_test
WHERE customer_tier = 'Gold'
    AND total_balance > 10000000;
 
-- After optimization: Use partitioning hints và selective predicates
SELECT customer_id, total_balance
FROM postgresql.public.customers_test
WHERE customer_tier = 'Gold'          -- High selectivity predicate first
    AND total_balance > 10000000       -- Numeric range filter
ORDER BY customer_id                  -- Use indexed column for ordering
LIMIT 100;                           -- Limit results for pagination

Join Optimization

-- Optimized join order và predicate pushdown
SELECT 
    c.customer_id,
    c.customer_tier,
    c.total_balance
FROM postgresql.public.customers_test c
WHERE c.customer_tier IN ('Gold', 'Platinum')  -- Filter early
    AND c.total_balance > 5000000              -- Additional filter
    AND c.city IN ('Ho Chi Minh', 'Ha Noi')    -- Geographic filter
ORDER BY c.total_balance DESC
LIMIT 50;

2. Partitioning và Data Layout

Partition-aware Queries

-- Time-based partitioning example (for future MinIO integration)
-- Query optimization with partition elimination
SELECT 
    customer_id,
    transaction_amount,
    transaction_date
FROM minio.datalake.customer_transactions
WHERE transaction_date >= DATE '2025-06-01'     -- Partition elimination
    AND transaction_date < DATE '2025-07-01'    -- Range scan
    AND customer_tier = 'Gold'                  -- Additional filter
    AND transaction_amount > 1000000;           -- Value filter

3. Memory và Resource Management

Memory Configuration

-- Check query memory usage
SELECT 
    query_id,
    query_type,
    state,
    memory_pool,
    peak_user_memory_bytes / 1024.0 / 1024.0 as peak_memory_mb,
    peak_total_memory_bytes / 1024.0 / 1024.0 as peak_total_mb
FROM system.runtime.queries
WHERE created > current_timestamp - interval '1' hour
    AND peak_user_memory_bytes > 0
ORDER BY peak_total_memory_bytes DESC
LIMIT 10;

Security và Access Control

1. User Management

Lakehouse User Roles

-- Lakehouse User role structure (for future implementation)
CREATE ROLE lakehouse_analyst;           -- Business analysts
CREATE ROLE lakehouse_data_engineer;     -- Data engineering team  
CREATE ROLE lakehouse_compliance;        -- Compliance và audit
CREATE ROLE lakehouse_executive;         -- Executive dashboard access
CREATE ROLE lakehouse_ml_engineer;       -- ML engineering team

Access Control Patterns

# Lakehouse Access control configuration
ACCESS_CONTROL = {
    'lakehouse_analyst': {
        'catalogs': ['postgresql'],
        'schemas': ['customer_domain', 'analytics_domain'],
        'tables': ['customers', 'customer_segments', 'customer_analytics'],
        'operations': ['SELECT'],
        'row_filters': "customer_tier IN ('Gold', 'Platinum')"  # Only high-value customers
    },
    'lakehouse_compliance': {
        'catalogs': ['postgresql', 'system'],
        'schemas': ['ALL'],
        'tables': ['ALL'],
        'operations': ['SELECT'],
        'audit_required': True
    },
    'lakehouse_data_engineer': {
        'catalogs': ['postgresql', 'minio', 'system'],
        'schemas': ['ALL'],
        'tables': ['ALL'], 
        'operations': ['SELECT', 'INSERT', 'CREATE', 'DROP'],
        'admin_access': True
    }
}

2. Data Privacy và Masking

PII Data Masking

-- Lakehouse PII masking for non-privileged users
SELECT 
    customer_id,
    
    -- Mask personal information
    CASE 
        WHEN CURRENT_USER IN ('lakehouse_compliance', 'lakehouse_executive') 
        THEN full_name
        ELSE CONCAT(SUBSTR(full_name, 1, 1), '***')
    END as masked_name,
    
    -- Mask phone numbers
    CASE 
        WHEN CURRENT_USER = 'lakehouse_compliance'
        THEN phone
        ELSE CONCAT('***', SUBSTR(phone, -4))
    END as masked_phone,
    
    -- Show tier và balance (business data)
    customer_tier,
    total_balance,
    city  -- Geographic data is OK
    
FROM postgresql.public.customers_test
WHERE customer_tier IN ('Gold', 'Platinum');

Production Best Practices

1. Query Performance Guidelines

Lakehouse Query Standards

-- ✅ Good: Optimized Lakehouse query pattern
SELECT 
    customer_id,
    customer_tier,
    total_balance
FROM postgresql.customer_domain.customers
WHERE customer_tier = 'Gold'                    -- Use equality predicates
    AND registration_date >= DATE '2025-01-01'  -- Date range filtering
    AND total_balance BETWEEN 10000000 AND 50000000  -- Range filtering
ORDER BY customer_id                            -- Use indexed columns
LIMIT 100;                                     -- Always limit large result sets
 
-- ❌ Bad: Inefficient query pattern
SELECT *                                       -- Avoid SELECT *
FROM postgresql.customer_domain.customers
WHERE UPPER(customer_tier) LIKE '%GOLD%'      -- Avoid functions in WHERE
    OR total_balance > (                       -- Avoid complex subqueries
        SELECT AVG(total_balance) * 1.5 
        FROM postgresql.customer_domain.customers
    )
ORDER BY RANDOM();                             -- Avoid non-deterministic ordering

2. Resource Management

Connection Pooling

# Lakehouse Trino connection pooling configuration
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager
 
class TrinoConnectionManager:
    def __init__(self, max_connections=20):
        self.session = requests.Session()
        
        # Configure connection pooling
        adapter = HTTPAdapter(
            max_retries=3,
            pool_connections=5,
            pool_maxsize=max_connections,
            pool_block=True
        )
        
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        
        # Lakehouse standard headers
        self.session.headers.update({
            'X-Trino-User': 'lakehouse_application',
            'X-Trino-Source': 'lakehouse_production_app',
            'X-Trino-Client-Info': 'lakehouse-v1.0'
        })

3. Monitoring và Alerting

Performance Monitoring

# Tạo script: scripts/lakehouse_trino_monitoring.py
import requests
import time
import json
from datetime import datetime, timedelta
 
class TrinoMonitor:
    def __init__(self):
        self.base_url = "http://localhost:8085"
        self.session = requests.Session()
        
    def get_cluster_health(self):
        """Get Lakehouse Trino cluster health metrics"""
        try:
            # Cluster info
            info_response = self.session.get(f"{self.base_url}/v1/info")
            cluster_info = info_response.json()
            
            # Node status
            nodes_response = self.session.get(f"{self.base_url}/v1/node")
            nodes_info = nodes_response.json()
            
            health_metrics = {
                'timestamp': datetime.now().isoformat(),
                'coordinator': cluster_info.get('coordinator'),
                'version': cluster_info.get('nodeVersion', {}).get('version'),
                'environment': cluster_info.get('environment'),
                'active_nodes': len(nodes_info),
                'uptime_check': True
            }
            
            return health_metrics
            
        except Exception as e:
            return {
                'timestamp': datetime.now().isoformat(),
                'error': str(e),
                'uptime_check': False
            }
    
    def get_query_metrics(self):
        """Get Lakehouse query performance metrics"""
        client = TrinoClient()
        
        # Recent query performance
        query_metrics_sql = """
        SELECT 
            COUNT(*) as total_queries,
            COUNT(CASE WHEN state = 'FINISHED' THEN 1 END) as successful_queries,
            COUNT(CASE WHEN state = 'FAILED' THEN 1 END) as failed_queries,
            AVG(execution_time_ms) / 1000.0 as avg_execution_seconds,
            MAX(execution_time_ms) / 1000.0 as max_execution_seconds,
            SUM(processed_rows) as total_processed_rows,
            SUM(processed_bytes) / 1024.0 / 1024.0 as total_processed_mb
        FROM system.runtime.queries
        WHERE created > current_timestamp - interval '1' hour
        """
        
        result = client.execute_query(query_metrics_sql)
        if result and result['data']:
            row = result['data'][0]
            return {
                'timestamp': datetime.now().isoformat(),
                'total_queries': row[0] or 0,
                'successful_queries': row[1] or 0,
                'failed_queries': row[2] or 0,
                'avg_execution_seconds': round(float(row[3] or 0), 3),
                'max_execution_seconds': round(float(row[4] or 0), 3),
                'total_processed_rows': row[5] or 0,
                'total_processed_mb': round(float(row[6] or 0), 2)
            }
        return {}
    
    def generate_health_report(self):
        """Generate comprehensive Lakehouse Trino health report"""
        
        print("🏥 Lakehouse Trino Health Report")
        print("=" * 50)
        print(f"📅 Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        
        # Cluster health
        health = self.get_cluster_health()
        print(f"\n🖥️  Cluster Health:")
        if health.get('uptime_check'):
            print(f"   ✅ Status: Online")
            print(f"   📊 Version: {health.get('version', 'Unknown')}")
            print(f"   🏠 Environment: {health.get('environment', 'Unknown')}")
            print(f"   🔗 Coordinator: {health.get('coordinator', False)}")
            print(f"   📡 Active Nodes: {health.get('active_nodes', 0)}")
        else:
            print(f"   ❌ Status: Offline or Error")
            print(f"   🚨 Error: {health.get('error', 'Unknown')}")
        
        # Query metrics
        metrics = self.get_query_metrics()
        if metrics:
            print(f"\n📊 Query Performance (Last Hour):")
            print(f"   📈 Total Queries: {metrics.get('total_queries', 0):,}")
            print(f"   ✅ Successful: {metrics.get('successful_queries', 0):,}")
            print(f"   ❌ Failed: {metrics.get('failed_queries', 0):,}")
            
            success_rate = 0
            if metrics.get('total_queries', 0) > 0:
                success_rate = (metrics.get('successful_queries', 0) / metrics.get('total_queries', 1)) * 100
            print(f"   📊 Success Rate: {success_rate:.1f}%")
            
            print(f"   ⏱️  Avg Execution: {metrics.get('avg_execution_seconds', 0):.3f}s")
            print(f"   🐌 Max Execution: {metrics.get('max_execution_seconds', 0):.3f}s")
            print(f"   📋 Processed Rows: {metrics.get('total_processed_rows', 0):,}")
            print(f"   💾 Processed Data: {metrics.get('total_processed_mb', 0):.2f} MB")
        
        # Health assessment
        print(f"\n🎯 Health Assessment:")
        
        # Define health thresholds
        if health.get('uptime_check') and success_rate >= 95:
            health_status = "🟢 Excellent"
        elif health.get('uptime_check') and success_rate >= 90:
            health_status = "🟡 Good"
        elif health.get('uptime_check'):
            health_status = "🟠 Warning"
        else:
            health_status = "🔴 Critical"
        
        print(f"   Overall Status: {health_status}")
        
        # Recommendations
        print(f"\n💡 Recommendations:")
        if not health.get('uptime_check'):
            print("   🚨 Critical: Trino cluster is down - immediate attention required")
        elif success_rate < 90:
            print("   ⚠️  Warning: High query failure rate - investigate failed queries")
        elif metrics.get('avg_execution_seconds', 0) > 5:
            print("   📈 Performance: Average query time is high - consider optimization")
        else:
            print("   ✅ System is performing well")
        
        return {
            'health': health,
            'metrics': metrics,
            'health_status': health_status,
            'success_rate': success_rate
        }
 
# Run monitoring
if __name__ == "__main__":
    from lab_trino_connection import TrinoClient  # Import our client
    
    monitor = TrinoMonitor()
    report = monitor.generate_health_report()

4. Error Handling và Recovery

Retry Logic và Circuit Breaker

import time
import random
from functools import wraps
 
def retry_with_backoff_with_backoff(max_retries=3, base_delay=1.0):
    """Lakehouse retry decorator with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    
                    # Exponential backoff with jitter
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
                    time.sleep(delay)
            
            return None
        return wrapper
    return decorator
 
class TrinoCircuitBreaker:
    """Circuit breaker pattern for Lakehouse Trino connections"""
    
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self.success()
            return result
        except Exception as e:
            self.failure()
            raise e
    
    def success(self):
        self.failure_count = 0
        self.state = 'CLOSED'
    
    def failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

Kết Luận

Key Takeaways cho Data Engineering Team:

  1. Unified Analytics: Trino provides single SQL interface cho all Lakehouse data sources
  2. Performance: Optimized queries deliver sub-second response times
  3. Scalability: Distributed architecture scales với Lakehouse data growth
  4. Flexibility: Support for complex analytics và cross-system joins
  5. Production Ready: Comprehensive monitoring và error handling

Lakehouse Implementation Roadmap:

Phase 1: Foundation (✅ Completed)

  • ✅ Trino cluster deployment
  • ✅ PostgreSQL catalog integration
  • ✅ Basic query capabilities
  • ✅ Performance benchmarking

Phase 2: Data Lake Integration

  • 🔄 MinIO catalog configuration
  • 🔄 Parquet file format optimization
  • 🔄 Partition strategy implementation
  • 🔄 Data lifecycle management

Phase 3: Advanced Analytics

  • 📋 ML feature store integration
  • 📋 Real-time analytics dashboards
  • 📋 Advanced security implementation
  • 📋 Multi-tenant access control

Phase 4: Production Operations

  • 📋 Automated monitoring và alerting
  • 📋 Performance optimization
  • 📋 Disaster recovery procedures
  • 📋 Team training và documentation

Next Steps:

  1. Practice với provided lab exercises
  2. Implement MinIO catalog integration
  3. Setup production monitoring
  4. Train business users on SQL analytics
  5. Prepare for advanced Spark integration

Tài liệu này là phần của Lakehouse Platform Training Series. Để có câu hỏi hoặc feedback, liên hệ Data Engineering Team.

LDK

Le Duy Khuong

AI Transformation & Digital Strategy. Writing about agentic systems, engineering leadership, and building in public.