Trino Guide
Trino in the Lakehouse: unified SQL, catalogs, and labs.
2026-03-17 | 24 min read
Table of Contents
- Overview
- Trino in the Lakehouse Architecture
- Data Catalogs and Schemas
- Hands-on Labs
- Advanced Query Patterns
- Performance Optimization
- Security and Access Control
- Production Best Practices
Overview
Trino in the Lakehouse Platform
Trino (formerly PrestoSQL) is deployed as the distributed SQL query engine in the Lakehouse Platform, where it serves as:
- Unified Query Interface: A single entry point for querying data from multiple sources
- Cross-catalog Analytics: Joins across PostgreSQL, MinIO, and external systems
- Real-time Analytics: Low-latency queries for business intelligence
- Data Federation: Federated queries across heterogeneous data sources
- Scalable Processing: Distributed processing for large datasets
Key Benefits for the Lakehouse
🏦 Lakehouse Business Value:
├── Real-time Customer 360 queries
├── Cross-system analytics (Core Banking + Digital)
├── Regulatory reporting through a familiar SQL interface
├── Ad-hoc analysis for business users
└── Cost-effective alternative to traditional DWH
Architecture Components
Lakehouse Trino Deployment:
├── Trino Coordinator (lakehouse-trino:8085)
├── Query Processing Engine
├── Multiple Data Catalogs:
│ ├── PostgreSQL Catalog (Lakehouse metadata store)
│ ├── MinIO Catalog (Data Lake storage)
│ ├── System Catalog (Trino internals)
│ └── Memory Catalog (Testing)
└── REST API Interface
Trino in the Lakehouse Architecture
Overall Data Flow
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Data Sources │───▶│ Trino Engine │───▶│ Analytics │
│ │ │ │ │ │
│ • PostgreSQL │ │ • SQL Interface │ │ • Dashboards │
│ • MinIO S3 │ │ • Query Planner │ │ • Reports │
│ • Kafka Streams │ │ • Execution │ │ • ML Pipelines │
│ • External APIs │ │ • Federation │ │ • API Responses │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Query Processing Architecture
Lakehouse Trino Query Execution:
1. Query Submission
├── REST API (http://localhost:8085)
├── Authentication & Authorization
└── Query Parsing & Validation
2. Query Planning
├── Catalog Metadata Discovery
├── Cost-based Optimization
├── Cross-catalog Join Planning
└── Execution Plan Generation
3. Distributed Execution
├── Task Distribution
├── Data Source Connectors
├── In-memory Processing
└── Result Aggregation
4. Result Delivery
├── Streaming Results
├── Pagination Support
└── Multiple Output Formats
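The submission and result-delivery steps above rest on one mechanism: the server answers with a JSON page, and the client keeps following that page's `nextUri` until none is returned. The sketch below shows that loop in isolation. The `fetch` callable is injected so the logic can be exercised without a running cluster; `collect_results` and `max_pages` are hypothetical names chosen for illustration, not part of any Trino client library.

```python
from typing import Callable


def collect_results(first_page: dict,
                    fetch: Callable[[str], dict],
                    max_pages: int = 10_000) -> dict:
    """Follow nextUri links until the server stops returning one."""
    columns = None
    rows = []
    page = first_page
    for _ in range(max_pages):
        # A failed query carries an "error" object in the page.
        if "error" in page:
            raise RuntimeError(page["error"].get("message", "unknown error"))
        # Column metadata appears once the query has been planned.
        if columns is None and "columns" in page:
            columns = page["columns"]
        # Any page may carry a batch of rows (or none at all).
        rows.extend(page.get("data") or [])
        next_uri = page.get("nextUri")
        if not next_uri:
            state = page.get("stats", {}).get("state")
            return {"columns": columns, "rows": rows, "state": state}
        page = fetch(next_uri)
    raise RuntimeError("gave up after max_pages pages")


# Usage with canned pages standing in for HTTP responses
pages = {
    "page-1": {"columns": [{"name": "n"}], "data": [[1]],
               "nextUri": "page-2", "stats": {"state": "RUNNING"}},
    "page-2": {"data": [[2]], "stats": {"state": "FINISHED"}},
}
first = {"nextUri": "page-1", "stats": {"state": "QUEUED"}}
print(collect_results(first, pages.__getitem__))
```

In a real client, `fetch` would be something like `lambda uri: session.get(uri).json()`, with a short sleep between polls.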
Lakehouse Data Federation Model
-- Lakehouse Federated Query Example
SELECT
c.customer_id,
c.full_name,
c.customer_tier,
t.total_transactions,
t.total_amount,
s.current_balance
FROM postgresql.customer_domain.customers c
JOIN postgresql.transaction_domain.customer_summary t
ON c.customer_id = t.customer_id
JOIN minio.datalake.customer_snapshots s
ON c.customer_id = s.customer_id
WHERE c.customer_tier IN ('Gold', 'Platinum')
AND t.total_amount > 100000000 -- 100M VND
Data Catalogs and Schemas
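Every table in this guide is addressed by a three-part name of the form catalog.schema.table. When such names are assembled in application code rather than typed by hand, quoting each part avoids surprises with reserved words or unusual characters. The helper below is a minimal sketch of that idea; `quote_ident` and `qualified_name` are hypothetical names, not functions from a Trino client library.

```python
def quote_ident(name: str) -> str:
    """Wrap one identifier in double quotes, doubling any embedded quote."""
    if not name:
        raise ValueError("identifier must not be empty")
    return '"' + name.replace('"', '""') + '"'


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Build a quoted catalog.schema.table reference."""
    return ".".join(quote_ident(part) for part in (catalog, schema, table))


print(qualified_name("postgresql", "customer_domain", "customers"))
print(qualified_name("minio", "datalake", "2025-logs"))
```

Always quoting is a deliberate simplification: it is slightly noisier to read, but it removes the need to maintain a list of reserved words.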
Available Catalogs
1. PostgreSQL Catalog
Purpose: Lakehouse operational data and metadata store
Connection: postgresql.{schema}.{table}
Schemas:
-- Customer Domain Schemas
postgresql.customer_domain.customers
postgresql.customer_domain.customer_segments
postgresql.customer_domain.customer_analytics
-- Transaction Domain
postgresql.transaction_domain.transactions
postgresql.transaction_domain.payments
postgresql.transaction_domain.transfers
-- Product Domain
postgresql.product_domain.loan_products
postgresql.product_domain.applications
postgresql.product_domain.approvals
-- Risk Domain
postgresql.risk_domain.credit_scores
postgresql.risk_domain.risk_assessments
postgresql.risk_domain.fraud_alerts
-- Analytics Schemas
postgresql.analytics_domain.customer_360
postgresql.analytics_domain.transaction_summary
postgresql.analytics_domain.risk_metrics
2. MinIO Catalog (S3-compatible)
Purpose: Lakehouse data lake storage for large datasets
Connection: minio.{schema}.{table} in SQL; the listing below shows the underlying bucket and path layout
Structure:
-- Data Lake Structure
minio.datalake.customer_profiles/year=2025/month=06/
minio.datalake.transaction_logs/year=2025/month=06/day=26/
minio.datalake.risk_models/version=v2.1/
minio.datalake.ml_features/customer_segments/
-- Backup & Archive
minio.backup.postgresql_dumps/date=2025-06-26/
minio.archive.historical_data/year=2024/
-- ML Model Storage
minio.models.risk_scoring/model_v1.2.pkl
minio.models.customer_segmentation/lgb_model.pkl
3. System Catalog
Purpose: Trino metadata and system information
-- Query monitoring
system.runtime.queries
system.runtime.tasks
system.runtime.nodes
-- Catalog information
system.metadata.catalogs
system.jdbc.schemas
system.jdbc.tables
4. Memory Catalog
Purpose: Testing and temporary data
-- Temporary calculations
memory.default.temp_calculations
memory.default.query_results
Hands-on Labs
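The lab scripts in this section unpack result rows by position, which breaks silently when a column is added or reordered. Pairing each row with the column names from the response is a small safeguard. The sketch below assumes a result dictionary with the `columns` and `data` keys that the lab's `execute_query` returns; `rows_as_dicts` is a hypothetical helper name.

```python
def rows_as_dicts(result):
    """Turn {'columns': [...], 'data': [[...], ...]} into a list of dicts."""
    if not result or not result.get("columns"):
        return []
    # Each column descriptor in the response carries a "name" field.
    names = [column["name"] for column in result["columns"]]
    return [dict(zip(names, row)) for row in result.get("data") or []]


# Usage with a canned result shaped like the lab client's return value
sample = {
    "columns": [{"name": "customer_tier"}, {"name": "customer_count"}],
    "data": [["Gold", 12], ["Silver", 30]],
}
for record in rows_as_dicts(sample):
    print(record["customer_tier"], record["customer_count"])
```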
Lab 1: Connecting to Lakehouse Trino
Basic Connection Test
# Create script: scripts/lab_trino_connection.py
import sys
import time

import requests


class TrinoClient:
    def __init__(self, host='localhost', port=8085):
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()
        self.session.headers.update({
            'X-Trino-User': 'lakehouse_analyst',
            'X-Trino-Source': 'lakehouse_training_lab',
            'X-Trino-Catalog': 'postgresql',
            'X-Trino-Schema': 'customer_domain'
        })

    def execute_query(self, sql):
        """Execute SQL query on Lakehouse Trino"""
        try:
            # Submit query (encode explicitly so non-ASCII text survives)
            response = self.session.post(
                f"{self.base_url}/v1/statement",
                data=sql.encode('utf-8'),
                headers={'Content-Type': 'text/plain'}
            )
            if response.status_code != 200:
                print(f"❌ Query submission failed: {response.status_code}")
                return None
            # Poll until the server stops returning nextUri
            result = response.json()
            all_data = []
            columns = None
            while True:
                if 'error' in result:
                    error = result['error']
                    print(f"❌ Query failed: {error.get('message', 'Unknown error')}")
                    return None
                if 'columns' in result and columns is None:
                    columns = result['columns']
                if result.get('data'):
                    all_data.extend(result['data'])
                next_uri = result.get('nextUri')
                if not next_uri:
                    break
                time.sleep(0.1)
                response = self.session.get(next_uri)
                if response.status_code != 200:
                    print(f"❌ Polling failed: {response.status_code}")
                    return None
                result = response.json()
            return {
                'columns': columns,
                'data': all_data,
                'stats': result.get('stats', {})
            }
        except Exception as e:
            print(f"❌ Error executing query: {e}")
            return None

    def show_catalogs(self):
        """Show all available catalogs"""
        result = self.execute_query("SHOW CATALOGS")
        if result and result['data']:
            print("📊 Available Data Catalogs:")
            for row in result['data']:
                catalog = row[0]
                print(f" • {catalog}")
                # Show schemas for main catalogs
                if catalog in ['postgresql', 'minio']:
                    schemas_result = self.execute_query(f"SHOW SCHEMAS FROM {catalog}")
                    if schemas_result and schemas_result['data']:
                        print(f" Schemas ({len(schemas_result['data'])}):")
                        for schema_row in schemas_result['data'][:5]:  # Show first 5
                            print(f" - {schema_row[0]}")
                        if len(schemas_result['data']) > 5:
                            print(f" - ... and {len(schemas_result['data']) - 5} more")
                print()
        return result


def main():
    # Test connection
    client = TrinoClient()
    print("🔗 Testing Lakehouse Trino Connection...")
    print("=" * 50)
    # Test basic connectivity
    info_response = requests.get(f"{client.base_url}/v1/info")
    if info_response.status_code == 200:
        info = info_response.json()
        print(f"✅ Trino Coordinator: {info.get('coordinator', False)}")
        print(f"✅ Version: {info.get('nodeVersion', {}).get('version', 'Unknown')}")
        print(f"✅ Environment: {info.get('environment', 'Unknown')}")
    else:
        print("❌ Failed to connect to Trino")
        sys.exit(1)
    # Show catalogs
    print("\n" + "=" * 50)
    client.show_catalogs()


# Run the test only when executed directly, so the other lab scripts
# can import TrinoClient without triggering it
if __name__ == "__main__":
    main()
Verify Lakehouse Schemas
# Create script: scripts/lab_lakehouse_schemas.py
from lab_trino_connection import TrinoClient

client = TrinoClient()


def explore_lakehouse_data_structure():
    """Explore Lakehouse data structure in detail"""
    print("🏦 Lakehouse Data Structure")
    print("=" * 60)
    # PostgreSQL schemas exploration
    print("\n📊 PostgreSQL Catalog (Operational Data):")
    postgres_schemas = client.execute_query("SHOW SCHEMAS FROM postgresql")
    if postgres_schemas and postgres_schemas['data']:
        lakehouse_schemas = [schema[0] for schema in postgres_schemas['data']
                             if 'domain' in schema[0] or schema[0] in ['public', 'analytics']]
        for schema in lakehouse_schemas:
            print(f"\n 📁 Schema: {schema}")
            # Show tables in each domain schema
            tables_query = f"SHOW TABLES FROM postgresql.{schema}"
            tables_result = client.execute_query(tables_query)
            if tables_result and tables_result['data']:
                for table_row in tables_result['data']:
                    table_name = table_row[0]
                    print(f" 📋 Table: {table_name}")
                    # Show record counts for key tables
                    if any(keyword in table_name.lower()
                           for keyword in ['customer', 'transaction', 'profile']):
                        sample_query = f"SELECT COUNT(*) as record_count FROM postgresql.{schema}.{table_name}"
                        count_result = client.execute_query(sample_query)
                        if count_result and count_result['data']:
                            count = count_result['data'][0][0]
                            print(f" 📊 Records: {count:,}")


# Run exploration
explore_lakehouse_data_structure()
Lab 2: Lakehouse Customer Analytics Queries
Customer 360 Queries
# Create script: scripts/lab_customer_360.py
from lab_trino_connection import TrinoClient

client = TrinoClient()


def lakehouse_customer_360_analytics():
    """Lakehouse Customer 360 analytics using Trino"""
    print("👥 Lakehouse Customer 360 Analytics")
    print("=" * 50)
    # Query 1: Customer Tier Distribution
    print("\n📊 Customer Tier Distribution:")
    tier_query = """
        SELECT
            customer_tier,
            COUNT(*) as customer_count,
            ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
        FROM postgresql.public.customers_test
        GROUP BY customer_tier
        ORDER BY customer_count DESC
    """
    result = client.execute_query(tier_query)
    if result and result['data']:
        print(" Tier | Count | Percentage")
        print(" --------------|-------|----------")
        for row in result['data']:
            tier, count, pct = row
            print(f" {tier:<13} | {count:>5} | {pct:>7}%")
    # Query 2: High Value Customers
    print("\n💰 High Value Customers (>15M VND):")
    high_value_query = """
        SELECT
            customer_id,
            full_name,
            customer_tier,
            total_balance,
            city
        FROM postgresql.public.customers_test
        WHERE total_balance > 15000000
        ORDER BY total_balance DESC
    """
    result = client.execute_query(high_value_query)
    if result and result['data']:
        print(" ID | Name | Tier | Balance | City")
        print(" -------|-------------------|-----------|--------------|----------")
        for row in result['data']:
            cust_id, name, tier, balance, city = row
            balance_formatted = f"{float(balance):,.0f}"
            print(f" {cust_id} | {name:<17} | {tier:<9} | {balance_formatted:>12} | {city}")
    # Query 3: City-wise Customer Analytics
    print("\n🏙️ City-wise Customer Analytics:")
    city_query = """
        SELECT
            city,
            COUNT(*) as customers,
            AVG(total_balance) as avg_balance,
            MAX(total_balance) as max_balance,
            COUNT(CASE WHEN customer_tier IN ('Gold', 'Platinum') THEN 1 END) as premium_customers
        FROM postgresql.public.customers_test
        GROUP BY city
        ORDER BY avg_balance DESC
    """
    result = client.execute_query(city_query)
    if result and result['data']:
        print(" City | Customers | Avg Balance | Max Balance | Premium")
        print(" -------------|-----------|--------------|--------------|--------")
        for row in result['data']:
            city, customers, avg_bal, max_bal, premium = row
            avg_formatted = f"{float(avg_bal):,.0f}"
            max_formatted = f"{float(max_bal):,.0f}"
            print(f" {city:<12} | {customers:>9} | {avg_formatted:>12} | {max_formatted:>12} | {premium:>7}")


# Run analytics
lakehouse_customer_360_analytics()
Risk Analytics Queries
# Create script: scripts/lab_risk_analytics.py
from lab_trino_connection import TrinoClient

client = TrinoClient()


def lakehouse_risk_analytics():
    """Lakehouse Risk analytics and credit scoring"""
    print("⚠️ Lakehouse Risk Analytics")
    print("=" * 50)
    # Query 1: Credit Score Distribution
    # GROUP BY 1 groups on the first select expression (the CASE bucket)
    print("\n📈 Credit Score Distribution:")
    credit_query = """
        SELECT
            CASE
                WHEN credit_score >= 800 THEN 'Excellent (800+)'
                WHEN credit_score >= 740 THEN 'Very Good (740-799)'
                WHEN credit_score >= 670 THEN 'Good (670-739)'
                WHEN credit_score >= 580 THEN 'Fair (580-669)'
                ELSE 'Poor (<580)'
            END as credit_category,
            COUNT(*) as customers,
            AVG(total_balance) as avg_balance,
            AVG(credit_score) as avg_credit_score
        FROM postgresql.public.customers_test
        GROUP BY 1
        ORDER BY avg_credit_score DESC
    """
    result = client.execute_query(credit_query)
    if result and result['data']:
        print(" Category | Customers | Avg Balance | Avg Score")
        print(" --------------------|-----------|--------------|----------")
        for row in result['data']:
            category, customers, avg_balance, avg_score = row
            balance_formatted = f"{float(avg_balance):,.0f}"
            score_formatted = f"{float(avg_score):,.0f}"
            print(f" {category:<19} | {customers:>9} | {balance_formatted:>12} | {score_formatted:>9}")
    # Query 2: Risk Tier Matrix
    print("\n🎯 Customer Tier vs Credit Score Matrix:")
    matrix_query = """
        SELECT
            customer_tier,
            AVG(credit_score) as avg_credit_score,
            MIN(credit_score) as min_credit_score,
            MAX(credit_score) as max_credit_score,
            COUNT(*) as customer_count
        FROM postgresql.public.customers_test
        GROUP BY customer_tier
        ORDER BY avg_credit_score DESC
    """
    result = client.execute_query(matrix_query)
    if result and result['data']:
        print(" Tier | Avg Score | Min Score | Max Score | Count")
        print(" ----------|-----------|-----------|-----------|------")
        for row in result['data']:
            tier, avg_score, min_score, max_score, count = row
            print(f" {tier:<9} | {avg_score:>9.0f} | {min_score:>9.0f} | {max_score:>9.0f} | {count:>5}")
    # Query 3: Age vs Credit Risk Analysis
    print("\n👥 Age vs Credit Risk Analysis:")
    age_risk_query = """
        SELECT
            CASE
                WHEN age < 25 THEN 'Young (18-24)'
                WHEN age < 35 THEN 'Young Adult (25-34)'
                WHEN age < 45 THEN 'Middle Age (35-44)'
                WHEN age < 55 THEN 'Mature (45-54)'
                ELSE 'Senior (55+)'
            END as age_group,
            COUNT(*) as customers,
            AVG(credit_score) as avg_credit_score,
            AVG(total_balance) as avg_balance,
            COUNT(CASE WHEN customer_tier IN ('Gold', 'Platinum') THEN 1 END) as premium_count
        FROM postgresql.public.customers_test
        GROUP BY 1
        ORDER BY avg_credit_score DESC
    """
    result = client.execute_query(age_risk_query)
    if result and result['data']:
        print(" Age Group | Customers | Avg Score | Avg Balance | Premium")
        print(" -----------------|-----------|-----------|--------------|--------")
        for row in result['data']:
            age_group, customers, avg_score, avg_balance, premium = row
            balance_formatted = f"{float(avg_balance):,.0f}"
            print(f" {age_group:<16} | {customers:>9} | {avg_score:>9.0f} | {balance_formatted:>12} | {premium:>7}")


# Run risk analytics
lakehouse_risk_analytics()
Lab 3: Cross-Catalog Queries
PostgreSQL + System Catalog Joins
# Create script: scripts/lab_cross_catalog.py
from lab_trino_connection import TrinoClient

client = TrinoClient()


def lakehouse_cross_catalog_analytics():
    """Lakehouse Cross-catalog analytics demonstrating data federation"""
    print("🔄 Lakehouse Cross-Catalog Analytics")
    print("=" * 50)
    # Query 1: Data Source Summary
    print("\n📊 Data Source Summary:")
    cross_query = """
        SELECT
            'PostgreSQL - Customer Data' as data_source,
            COUNT(*) as record_count,
            'postgresql' as catalog_name
        FROM postgresql.public.customers_test
        UNION ALL
        SELECT
            'System - Active Queries' as data_source,
            COUNT(*) as record_count,
            'system' as catalog_name
        FROM system.runtime.queries
        WHERE state = 'RUNNING'
        UNION ALL
        SELECT
            'System - Query History' as data_source,
            COUNT(*) as record_count,
            'system' as catalog_name
        FROM system.runtime.queries
        WHERE created > current_timestamp - interval '1' hour
    """
    result = client.execute_query(cross_query)
    if result and result['data']:
        print(" Data Source | Records | Catalog")
        print(" ---------------------------|---------|----------")
        for row in result['data']:
            source, count, catalog = row
            print(f" {source:<26} | {count:>7} | {catalog}")
    # Query 2: Query Performance Analysis
    # Uses only the timing columns of system.runtime.queries; the column set
    # can differ between Trino versions (check DESCRIBE system.runtime.queries).
    print("\n⚡ Recent Query Performance Analysis:")
    perf_query = """
        SELECT
            query_id,
            state,
            "user",
            planning_time_ms,
            ROUND(date_diff('millisecond', started, COALESCE("end", current_timestamp)) / 1000.0, 2) as elapsed_seconds
        FROM system.runtime.queries
        WHERE created > current_timestamp - interval '10' minute
          AND query NOT LIKE '%system.runtime.queries%' -- Exclude meta queries
        ORDER BY elapsed_seconds DESC
        LIMIT 10
    """
    result = client.execute_query(perf_query)
    if result and result['data']:
        print(" Query ID | State | User | Plan(ms) | Elapsed(s)")
        print(" -----------------|----------|----------|----------|-----------")
        for row in result['data']:
            query_id, state, user, plan_ms, elapsed = row
            short_id = query_id[-8:] if query_id else "N/A"
            state_short = (state or "N/A")[:8]
            user_short = (user or "N/A")[:8]
            plan_val = plan_ms if plan_ms is not None else 0
            elapsed_val = elapsed if elapsed is not None else "N/A"
            print(f" ...{short_id:<13} | {state_short:<8} | {user_short:<8} | {plan_val:>8} | {elapsed_val:>10}")
    # Query 3: Catalog Metadata Comparison
    # system.jdbc.tables lists tables across catalogs using JDBC-style column names
    print("\n📋 Catalog Metadata Comparison:")
    metadata_query = """
        SELECT
            table_cat as catalog_name,
            COUNT(DISTINCT table_schem) as schema_count,
            COUNT(table_name) as table_count
        FROM system.jdbc.tables
        WHERE table_cat IN ('postgresql', 'memory', 'system')
        GROUP BY table_cat
        ORDER BY table_count DESC
    """
    result = client.execute_query(metadata_query)
    if result and result['data']:
        print(" Catalog | Schemas | Tables")
        print(" -----------|---------|--------")
        for row in result['data']:
            catalog, schemas, tables = row
            print(f" {catalog:<10} | {schemas:>7} | {tables:>6}")


# Run cross-catalog analytics
lakehouse_cross_catalog_analytics()
Lab 4: Performance Monitoring and Optimization
Query Performance Analysis
# Create script: scripts/lab_performance_monitoring.py
import time

from lab_trino_connection import TrinoClient

client = TrinoClient()


def lakehouse_performance_benchmarks():
    """Lakehouse Performance benchmarks and optimization analysis"""
    print("⚡ Lakehouse Trino Performance Benchmarks")
    print("=" * 50)
    # Benchmark queries with different complexity levels
    benchmark_queries = [
        {
            'name': 'Simple Count',
            'description': 'Basic aggregation query',
            'sql': 'SELECT COUNT(*) FROM postgresql.public.customers_test'
        },
        {
            'name': 'Group By Analysis',
            'description': 'Grouping and aggregation',
            'sql': '''
                SELECT customer_tier, COUNT(*), AVG(total_balance)
                FROM postgresql.public.customers_test
                GROUP BY customer_tier
            '''
        },
        {
            'name': 'Complex Analytics',
            'description': 'Window functions and calculations',
            'sql': '''
                SELECT
                    customer_id,
                    customer_tier,
                    total_balance,
                    AVG(total_balance) OVER (PARTITION BY customer_tier) as tier_avg_balance,
                    RANK() OVER (PARTITION BY customer_tier ORDER BY total_balance DESC) as tier_rank
                FROM postgresql.public.customers_test
            '''
        },
        {
            # Single-table query: it filters groups with HAVING and contains no join
            'name': 'Filtered Aggregation',
            'description': 'Aggregation with a HAVING filter',
            'sql': '''
                SELECT
                    c.customer_tier,
                    COUNT(*) as customer_count,
                    AVG(c.total_balance) as avg_balance
                FROM postgresql.public.customers_test c
                GROUP BY c.customer_tier
                HAVING COUNT(*) > 0
            '''
        }
    ]
    benchmark_results = []
    for i, benchmark in enumerate(benchmark_queries):
        print(f"\n🏃 Benchmark {i+1}: {benchmark['name']}")
        print(f" Description: {benchmark['description']}")
        # Execute with timing
        start_time = time.time()
        result = client.execute_query(benchmark['sql'])
        end_time = time.time()
        execution_time = end_time - start_time
        if result:
            row_count = len(result['data']) if result['data'] else 0
            stats = result.get('stats', {})
            # Read stats up front so both values exist even when stats is empty
            processed_rows = stats.get('processedRows', 0)
            cpu_time = stats.get('cpuTimeMillis', 0) / 1000.0
            print(f" ⏱️ Execution Time: {execution_time:.3f}s")
            print(f" 📊 Rows Returned: {row_count:,}")
            if stats:
                print(f" 🔄 Processed Rows: {processed_rows:,}")
                print(f" 💻 CPU Time: {cpu_time:.3f}s")
            benchmark_results.append({
                'name': benchmark['name'],
                'execution_time': execution_time,
                'rows_returned': row_count,
                'processed_rows': processed_rows,
                'cpu_time': cpu_time
            })
            print(" ✅ Success")
        else:
            print(" ❌ Failed")
            benchmark_results.append({
                'name': benchmark['name'],
                'execution_time': execution_time,
                'rows_returned': 0,
                'processed_rows': 0,
                'cpu_time': 0,
                'status': 'failed'
            })
        time.sleep(1)  # Brief pause between benchmarks
    # Performance Summary
    print("\n📈 Performance Summary:")
    print(" " + "=" * 60)
    successful_benchmarks = [b for b in benchmark_results if b.get('status') != 'failed']
    if successful_benchmarks:
        avg_time = sum(b['execution_time'] for b in successful_benchmarks) / len(successful_benchmarks)
        fastest = min(successful_benchmarks, key=lambda x: x['execution_time'])
        slowest = max(successful_benchmarks, key=lambda x: x['execution_time'])
        print(f" 📊 Total Benchmarks: {len(benchmark_queries)}")
        print(f" ✅ Successful: {len(successful_benchmarks)}")
        print(f" ⏱️ Average Execution Time: {avg_time:.3f}s")
        print(f" 🚀 Fastest Query: {fastest['name']} ({fastest['execution_time']:.3f}s)")
        print(f" 🐌 Slowest Query: {slowest['name']} ({slowest['execution_time']:.3f}s)")
    return benchmark_results


def monitor_active_queries():
    """Monitor currently active queries"""
    print("\n🔍 Active Query Monitoring:")
    print("=" * 40)
    # date_diff returns whole seconds; subtracting timestamps yields an
    # interval, which cannot be passed to ROUND
    active_queries_sql = """
        SELECT
            query_id,
            state,
            "user",
            source,
            date_diff('second', created, current_timestamp) as running_seconds
        FROM system.runtime.queries
        WHERE state IN ('PLANNING', 'STARTING', 'RUNNING', 'FINISHING')
        ORDER BY created DESC
        LIMIT 10
    """
    result = client.execute_query(active_queries_sql)
    if result and result['data']:
        print(" Query ID (suffix) | State | User | Source | Running(s)")
        print(" ------------------|-----------|----------|----------|----------")
        for row in result['data']:
            query_id, state, user, source, runtime = row
            short_id = query_id[-12:] if query_id else "N/A"
            state_short = (state or "N/A")[:9]
            user_short = (user or "N/A")[:8]
            source_short = (source or "N/A")[:8]
            print(f" ...{short_id:<13} | {state_short:<9} | {user_short:<8} | {source_short:<8} | {runtime:>8}")
    else:
        print(" 📭 No active queries found")


# Run performance monitoring
benchmark_results = lakehouse_performance_benchmarks()
monitor_active_queries()
Advanced Query Patterns
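The RFM segmentation rules used in this section's SQL can also be mirrored in plain Python, which makes the thresholds easy to unit-test before they are committed to a query. The sketch below copies the CASE thresholds from the RFM query; the function names are hypothetical, and the tier-based frequency values are the same simulated placeholders the SQL uses.

```python
def _score(value, steps, ascending):
    """Return the first matching score from (threshold, score) steps, else 1."""
    for threshold, score in steps:
        if (value <= threshold) if ascending else (value >= threshold):
            return score
    return 1


def recency_score(days_since_last_login):
    # Fewer days since the last login earns a higher score.
    return _score(days_since_last_login, ((30, 5), (60, 4), (90, 3), (180, 2)), True)


def frequency_score(customer_tier):
    # Simulated frequency derived from the tier, as in the SQL.
    raw = {"Platinum": 50, "Gold": 30, "Silver": 15}.get(customer_tier, 5)
    return _score(raw, ((40, 5), (25, 4), (15, 3), (8, 2)), False)


def monetary_score(total_balance):
    steps = ((20_000_000, 5), (15_000_000, 4), (10_000_000, 3), (5_000_000, 2))
    return _score(total_balance, steps, False)


def segment(r, f, m):
    """Apply the segmentation rules in the same order as the SQL CASE."""
    if r >= 4 and f >= 4 and m >= 4:
        return "Champions"
    if r >= 3 and f >= 3 and m >= 3:
        return "Loyal Customers"
    if r >= 4 and f <= 2:
        return "New Customers"
    if r <= 2 and f >= 3 and m >= 3:
        return "At Risk"
    if r <= 2 and f <= 2:
        return "Lost Customers"
    return "Potential Loyalists"


# Usage: a recently active Platinum customer with a 25M VND balance
r, f, m = recency_score(10), frequency_score("Platinum"), monetary_score(25_000_000)
print(r, f, m, segment(r, f, m))
```

Rule order matters here exactly as it does in the CASE expression: a customer matching several conditions receives the first segment listed.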
1. Lakehouse Customer Segmentation Queries
RFM Analysis with Trino
-- Lakehouse RFM (Recency, Frequency, Monetary) Analysis
WITH customer_rfm AS (
SELECT
c.customer_id,
c.full_name,
c.customer_tier,
-- Recency: Days since last transaction (simulated)
COALESCE(
DATE_DIFF('day', c.last_login_date, CURRENT_DATE),
365
) as recency_days,
-- Frequency: Transaction frequency (based on tier)
CASE c.customer_tier
WHEN 'Platinum' THEN 50
WHEN 'Gold' THEN 30
WHEN 'Silver' THEN 15
ELSE 5
END as frequency_score,
-- Monetary: Customer value
c.total_balance as monetary_value
FROM postgresql.public.customers_test c
),
rfm_scores AS (
SELECT *,
-- RFM Scoring (1-5 scale)
CASE
WHEN recency_days <= 30 THEN 5
WHEN recency_days <= 60 THEN 4
WHEN recency_days <= 90 THEN 3
WHEN recency_days <= 180 THEN 2
ELSE 1
END as recency_score,
CASE
WHEN frequency_score >= 40 THEN 5
WHEN frequency_score >= 25 THEN 4
WHEN frequency_score >= 15 THEN 3
WHEN frequency_score >= 8 THEN 2
ELSE 1
END as frequency_final_score,
CASE
WHEN monetary_value >= 20000000 THEN 5 -- 20M+
WHEN monetary_value >= 15000000 THEN 4 -- 15M+
WHEN monetary_value >= 10000000 THEN 3 -- 10M+
WHEN monetary_value >= 5000000 THEN 2 -- 5M+
ELSE 1
END as monetary_score
FROM customer_rfm
)
SELECT
customer_id,
full_name,
customer_tier,
recency_score,
frequency_final_score,
monetary_score,
CONCAT(CAST(recency_score AS varchar), CAST(frequency_final_score AS varchar), CAST(monetary_score AS varchar)) as rfm_score, -- concat needs varchar arguments
-- RFM Segmentation
CASE
WHEN recency_score >= 4 AND frequency_final_score >= 4 AND monetary_score >= 4
THEN 'Champions'
WHEN recency_score >= 3 AND frequency_final_score >= 3 AND monetary_score >= 3
THEN 'Loyal Customers'
WHEN recency_score >= 4 AND frequency_final_score <= 2
THEN 'New Customers'
WHEN recency_score <= 2 AND frequency_final_score >= 3 AND monetary_score >= 3
THEN 'At Risk'
WHEN recency_score <= 2 AND frequency_final_score <= 2
THEN 'Lost Customers'
ELSE 'Potential Loyalists'
END as customer_segment,
monetary_value
FROM rfm_scores
ORDER BY monetary_score DESC, frequency_final_score DESC, recency_score DESC;
2. Time Series Analysis
Customer Balance Trends
-- Lakehouse Customer Balance Trend Analysis (Simulated time series)
WITH date_spine AS (
-- One row per day for the last 30 days
SELECT DATE_ADD('day', -seq, CURRENT_DATE) as analysis_date
FROM UNNEST(sequence(0, 29)) AS t(seq)
),
customer_daily_balances AS (
SELECT
c.customer_id,
c.customer_tier,
d.analysis_date,
-- Simulate balance fluctuation based on customer tier
c.total_balance +
(RANDOM() - 0.5) *
CASE c.customer_tier
WHEN 'Platinum' THEN 2000000 -- Higher volatility
WHEN 'Gold' THEN 1000000
WHEN 'Silver' THEN 500000
ELSE 200000
END as daily_balance
FROM postgresql.public.customers_test c
CROSS JOIN date_spine d
),
balance_analytics AS (
SELECT
customer_id,
customer_tier,
analysis_date,
daily_balance,
-- 7-day moving average
AVG(daily_balance) OVER (
PARTITION BY customer_id
ORDER BY analysis_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as balance_7day_avg,
-- Balance change from previous day
daily_balance - LAG(daily_balance) OVER (
PARTITION BY customer_id
ORDER BY analysis_date
) as daily_change,
-- Volatility (standard deviation over 7 days)
STDDEV(daily_balance) OVER (
PARTITION BY customer_id
ORDER BY analysis_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as volatility_7day
FROM customer_daily_balances
)
SELECT
customer_tier,
COUNT(DISTINCT customer_id) as customers,
AVG(daily_balance) as avg_balance,
AVG(ABS(daily_change)) as avg_daily_change,
AVG(volatility_7day) as avg_volatility,
-- Risk indicators
COUNT(CASE WHEN ABS(daily_change) > 1000000 THEN 1 END) as high_volatility_days,
COUNT(CASE WHEN daily_balance < 0 THEN 1 END) as negative_balance_days
FROM balance_analytics
WHERE analysis_date >= CURRENT_DATE - INTERVAL '30' DAY
AND volatility_7day IS NOT NULL
GROUP BY customer_tier
ORDER BY avg_balance DESC;
3. Advanced Analytics Functions
Customer Cohort Analysis
-- Lakehouse Customer Cohort Analysis
WITH customer_cohorts AS (
SELECT
customer_id,
customer_tier,
DATE_TRUNC('month', registration_date) as cohort_month,
registration_date,
total_balance,
age
FROM postgresql.public.customers_test
),
cohort_analysis AS (
SELECT
cohort_month,
customer_tier,
COUNT(*) as cohort_customers,
AVG(total_balance) as avg_initial_balance,
AVG(age) as avg_age,
-- Customer value distribution (approximate; approx_percentile replaces PERCENTILE_CONT, which Trino does not provide)
approx_percentile(total_balance, 0.25) as p25_balance,
approx_percentile(total_balance, 0.50) as median_balance,
approx_percentile(total_balance, 0.75) as p75_balance,
-- Age demographics
COUNT(CASE WHEN age < 30 THEN 1 END) as young_customers,
COUNT(CASE WHEN age BETWEEN 30 AND 45 THEN 1 END) as middle_age_customers,
COUNT(CASE WHEN age > 45 THEN 1 END) as mature_customers
FROM customer_cohorts
GROUP BY cohort_month, customer_tier
)
SELECT
cohort_month,
customer_tier,
cohort_customers,
ROUND(avg_initial_balance, 0) as avg_initial_balance,
ROUND(avg_age, 1) as avg_age,
ROUND(p25_balance, 0) as p25_balance,
ROUND(median_balance, 0) as median_balance,
ROUND(p75_balance, 0) as p75_balance,
-- Demographics percentages
ROUND(young_customers * 100.0 / cohort_customers, 1) as pct_young,
ROUND(middle_age_customers * 100.0 / cohort_customers, 1) as pct_middle,
ROUND(mature_customers * 100.0 / cohort_customers, 1) as pct_mature,
-- Tier performance ranking
RANK() OVER (PARTITION BY cohort_month ORDER BY avg_initial_balance DESC) as tier_rank
FROM cohort_analysis
ORDER BY cohort_month DESC, avg_initial_balance DESC;
Performance Optimization
1. Query Optimization Strategies
Cost-Based Optimization
-- Baseline: filter on tier and balance, unbounded result set
SELECT customer_id, total_balance
FROM postgresql.public.customers_test
WHERE customer_tier = 'Gold'
AND total_balance > 10000000;
-- Bounded variant: same predicates plus ORDER BY and LIMIT
-- (the order of predicates in WHERE does not change the plan; the optimizer decides evaluation order)
SELECT customer_id, total_balance
FROM postgresql.public.customers_test
WHERE customer_tier = 'Gold' -- Selective equality predicate
AND total_balance > 10000000 -- Numeric range filter
ORDER BY customer_id -- Deterministic ordering, ideally on an indexed column
LIMIT 100; -- Limit results for pagination
Join Optimization
-- Filter early so predicates can be pushed down to PostgreSQL
SELECT
c.customer_id,
c.customer_tier,
c.total_balance
FROM postgresql.public.customers_test c
WHERE c.customer_tier IN ('Gold', 'Platinum') -- Filter early
AND c.total_balance > 5000000 -- Additional filter
AND c.city IN ('Ho Chi Minh', 'Ha Noi') -- Geographic filter
ORDER BY c.total_balance DESC
LIMIT 50;
2. Partitioning and Data Layout
Partition-aware Queries
-- Time-based partitioning example (for future MinIO integration)
-- Query optimization with partition elimination
SELECT
customer_id,
transaction_amount,
transaction_date
FROM minio.datalake.customer_transactions
WHERE transaction_date >= DATE '2025-06-01' -- Partition elimination
AND transaction_date < DATE '2025-07-01' -- Range scan
AND customer_tier = 'Gold' -- Additional filter
AND transaction_amount > 1000000; -- Value filter
3. Memory and Resource Management
Memory Configuration
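-- Check query memory usage
-- Note: the memory columns below may not exist in your Trino version;
-- confirm with DESCRIBE system.runtime.queries before relying on them.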
SELECT
query_id,
query_type,
state,
memory_pool,
peak_user_memory_bytes / 1024.0 / 1024.0 as peak_memory_mb,
peak_total_memory_bytes / 1024.0 / 1024.0 as peak_total_mb
FROM system.runtime.queries
WHERE created > current_timestamp - interval '1' hour
AND peak_user_memory_bytes > 0
ORDER BY peak_total_memory_bytes DESC
LIMIT 10;
Security and Access Control
1. User Management
Lakehouse User Roles
-- Lakehouse User role structure (for future implementation)
CREATE ROLE lakehouse_analyst; -- Business analysts
CREATE ROLE lakehouse_data_engineer; -- Data engineering team
CREATE ROLE lakehouse_compliance; -- Compliance and audit
CREATE ROLE lakehouse_executive; -- Executive dashboard access
CREATE ROLE lakehouse_ml_engineer; -- ML engineering team
Access Control Patterns
# Lakehouse Access control configuration
# Illustrative policy map for application code; Trino itself enforces
# access through its own access-control configuration, not this dict.
ACCESS_CONTROL = {
    'lakehouse_analyst': {
        'catalogs': ['postgresql'],
        'schemas': ['customer_domain', 'analytics_domain'],
        'tables': ['customers', 'customer_segments', 'customer_analytics'],
        'operations': ['SELECT'],
        'row_filters': "customer_tier IN ('Gold', 'Platinum')"  # Only high-value customers
    },
    'lakehouse_compliance': {
        'catalogs': ['postgresql', 'system'],
        'schemas': ['ALL'],
        'tables': ['ALL'],
        'operations': ['SELECT'],
        'audit_required': True
    },
    'lakehouse_data_engineer': {
        'catalogs': ['postgresql', 'minio', 'system'],
        'schemas': ['ALL'],
        'tables': ['ALL'],
        'operations': ['SELECT', 'INSERT', 'CREATE', 'DROP'],
        'admin_access': True
    }
}
2. Data Privacy and Masking
PII Data Masking
-- Lakehouse PII masking for non-privileged users
SELECT
customer_id,
-- Mask personal information
CASE
WHEN CURRENT_USER IN ('lakehouse_compliance', 'lakehouse_executive')
THEN full_name
ELSE CONCAT(SUBSTR(full_name, 1, 1), '***')
END as masked_name,
-- Mask phone numbers
CASE
WHEN CURRENT_USER = 'lakehouse_compliance'
THEN phone
ELSE CONCAT('***', SUBSTR(phone, -4))
END as masked_phone,
-- Show tier and balance (business data)
customer_tier,
total_balance,
city -- Geographic data is OK
FROM postgresql.public.customers_test
WHERE customer_tier IN ('Gold', 'Platinum');
Production Best Practices
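Some of the query guidelines in this section (no `SELECT *`, no `ORDER BY RANDOM()`, always a `LIMIT`) are mechanical enough to check automatically before a query reaches the cluster. The sketch below is a deliberately simple regex-based check, not a SQL parser, so it can miss or misreport edge cases such as keywords inside string literals; `lint_query` and the rule names are hypothetical.

```python
import re

# (rule name, pattern) pairs for patterns the guidelines advise against
_RULES = [
    ("select-star", re.compile(r"\bselect\s+\*", re.IGNORECASE)),
    ("order-by-random", re.compile(r"\border\s+by\s+random\s*\(", re.IGNORECASE)),
]
_LIMIT = re.compile(r"\blimit\s+\d+", re.IGNORECASE)
_LINE_COMMENT = re.compile(r"--[^\n]*")


def lint_query(sql: str) -> list:
    """Return the names of the guideline rules that the query violates."""
    # Drop "--" comments so remarks like "-- Avoid SELECT *" are not flagged.
    code = _LINE_COMMENT.sub("", sql)
    findings = [name for name, pattern in _RULES if pattern.search(code)]
    if not _LIMIT.search(code):
        findings.append("missing-limit")
    return findings


print(lint_query("SELECT * FROM customers ORDER BY RANDOM()"))
print(lint_query("SELECT customer_id FROM customers LIMIT 100"))
```

A check like this fits best as a warning in code review or CI rather than a hard gate, since aggregate queries legitimately run without a `LIMIT`.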
1. Query Performance Guidelines
Lakehouse Query Standards
-- ✅ Good: Optimized Lakehouse query pattern
SELECT
    customer_id,
    customer_tier,
    total_balance
FROM postgresql.customer_domain.customers
WHERE customer_tier = 'Gold'                          -- Use equality predicates
  AND registration_date >= DATE '2025-01-01'          -- Date range filtering
  AND total_balance BETWEEN 10000000 AND 50000000     -- Range filtering
ORDER BY customer_id                                  -- Use indexed columns
LIMIT 100;                                            -- Always limit large result sets
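The guidelines in this subsection lend themselves to a quick automated check before a query is submitted. The sketch below is a rough, regex-based heuristic and not a SQL parser; the rule list and the function names `strip_comments` and `lint_query` are hypothetical and would need tuning for real workloads.

```python
import re
from typing import List

# Each rule pairs a human-readable message with a regex that detects the pattern.
RULES = [
    ("avoid SELECT *", re.compile(r"\bselect\s+\*", re.IGNORECASE)),
    ("avoid ORDER BY RANDOM()",
     re.compile(r"\border\s+by\s+random\s*\(", re.IGNORECASE)),
    ("avoid functions on filtered columns (UPPER/LOWER in WHERE)",
     re.compile(r"\bwhere\b.*\b(upper|lower)\s*\(", re.IGNORECASE | re.DOTALL)),
]
LIMIT_RE = re.compile(r"\blimit\s+\d+", re.IGNORECASE)


def strip_comments(sql: str) -> str:
    """Remove '--' line comments so they cannot trigger or hide a rule.

    Heuristic only: a '--' inside a string literal would also be stripped.
    """
    return re.sub(r"--[^\n]*", "", sql)


def lint_query(sql: str) -> List[str]:
    """Return the guideline violations found in `sql` (empty list if none)."""
    text = strip_comments(sql)
    findings = [message for message, pattern in RULES if pattern.search(text)]
    if not LIMIT_RE.search(text):
        findings.append("add a LIMIT for large result sets")
    return findings


if __name__ == "__main__":
    query = "SELECT * FROM postgresql.customer_domain.customers ORDER BY RANDOM()"
    for finding in lint_query(query):
        print("-", finding)
```

A check like this could run in code review or in a thin wrapper around the query client; it only flags patterns and leaves the decision to the author.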
-- ❌ Bad: Inefficient query pattern
SELECT *                                              -- Avoid SELECT *
FROM postgresql.customer_domain.customers
WHERE UPPER(customer_tier) LIKE '%GOLD%'              -- Avoid functions in WHERE
   OR total_balance > (                               -- Avoid complex subqueries
        SELECT AVG(total_balance) * 1.5
        FROM postgresql.customer_domain.customers
   )
ORDER BY RANDOM();                                    -- Avoid non-deterministic ordering
2. Resource Management
Connection Pooling
# Lakehouse Trino connection pooling configuration
import requests
from requests.adapters import HTTPAdapter

class TrinoConnectionManager:
    def __init__(self, max_connections=20):
        self.session = requests.Session()
        # Configure connection pooling
        adapter = HTTPAdapter(
            max_retries=3,
            pool_connections=5,
            pool_maxsize=max_connections,
            pool_block=True
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        # Lakehouse standard headers
        self.session.headers.update({
            'X-Trino-User': 'lakehouse_application',
            'X-Trino-Source': 'lakehouse_production_app',
            'X-Trino-Client-Info': 'lakehouse-v1.0'
        })
3. Monitoring and Alerting
Performance Monitoring
# Create script: scripts/lakehouse_trino_monitoring.py
import requests
from datetime import datetime

class TrinoMonitor:
    def __init__(self):
        self.base_url = "http://localhost:8085"
        self.session = requests.Session()
    def get_cluster_health(self):
        """Get Lakehouse Trino cluster health metrics"""
        try:
            # Cluster info (a timeout keeps the monitor from hanging on a dead cluster)
            info_response = self.session.get(f"{self.base_url}/v1/info", timeout=10)
            info_response.raise_for_status()
            cluster_info = info_response.json()
            # Node status
            nodes_response = self.session.get(f"{self.base_url}/v1/node", timeout=10)
            nodes_response.raise_for_status()
            nodes_info = nodes_response.json()
            health_metrics = {
                'timestamp': datetime.now().isoformat(),
                'coordinator': cluster_info.get('coordinator'),
                'version': cluster_info.get('nodeVersion', {}).get('version'),
                'environment': cluster_info.get('environment'),
                'active_nodes': len(nodes_info),
                'uptime_check': True
            }
            return health_metrics
        except Exception as e:
            # Any network, HTTP, or JSON error is reported as "offline"
            return {
                'timestamp': datetime.now().isoformat(),
                'error': str(e),
                'uptime_check': False
            }
    def get_query_metrics(self):
        """Get Lakehouse query performance metrics"""
        # TrinoClient comes from lab_trino_connection (imported in the __main__ block)
        client = TrinoClient()
        # Recent query performance.
        # Column names are as given in this lab; confirm them with
        # DESCRIBE system.runtime.queries on your Trino version before relying on them.
        query_metrics_sql = """
            SELECT
                COUNT(*) as total_queries,
                COUNT(CASE WHEN state = 'FINISHED' THEN 1 END) as successful_queries,
                COUNT(CASE WHEN state = 'FAILED' THEN 1 END) as failed_queries,
                AVG(execution_time_ms) / 1000.0 as avg_execution_seconds,
                MAX(execution_time_ms) / 1000.0 as max_execution_seconds,
                SUM(processed_rows) as total_processed_rows,
                SUM(processed_bytes) / 1024.0 / 1024.0 as total_processed_mb
            FROM system.runtime.queries
            WHERE created > current_timestamp - interval '1' hour
        """
        result = client.execute_query(query_metrics_sql)
        if result and result['data']:
            row = result['data'][0]
            return {
                'timestamp': datetime.now().isoformat(),
                'total_queries': row[0] or 0,
                'successful_queries': row[1] or 0,
                'failed_queries': row[2] or 0,
                'avg_execution_seconds': round(float(row[3] or 0), 3),
                'max_execution_seconds': round(float(row[4] or 0), 3),
                'total_processed_rows': row[5] or 0,
                'total_processed_mb': round(float(row[6] or 0), 2)
            }
        return {}
    def generate_health_report(self):
        """Generate comprehensive Lakehouse Trino health report"""
        print("🏥 Lakehouse Trino Health Report")
        print("=" * 50)
        print(f"📅 Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        # Cluster health
        health = self.get_cluster_health()
        print("\n🖥️ Cluster Health:")
        if health.get('uptime_check'):
            print("   ✅ Status: Online")
            print(f"   📊 Version: {health.get('version', 'Unknown')}")
            print(f"   🏠 Environment: {health.get('environment', 'Unknown')}")
            print(f"   🔗 Coordinator: {health.get('coordinator', False)}")
            print(f"   📡 Active Nodes: {health.get('active_nodes', 0)}")
        else:
            print("   ❌ Status: Offline or Error")
            print(f"   🚨 Error: {health.get('error', 'Unknown')}")
        # Query metrics
        metrics = self.get_query_metrics()
        # Defined up front so the assessment below also works when no metrics are returned
        success_rate = 0
        if metrics:
            print("\n📊 Query Performance (Last Hour):")
            print(f"   📈 Total Queries: {metrics.get('total_queries', 0):,}")
            print(f"   ✅ Successful: {metrics.get('successful_queries', 0):,}")
            print(f"   ❌ Failed: {metrics.get('failed_queries', 0):,}")
            if metrics.get('total_queries', 0) > 0:
                success_rate = (metrics.get('successful_queries', 0) / metrics.get('total_queries', 1)) * 100
            print(f"   📊 Success Rate: {success_rate:.1f}%")
            print(f"   ⏱️ Avg Execution: {metrics.get('avg_execution_seconds', 0):.3f}s")
            print(f"   🐌 Max Execution: {metrics.get('max_execution_seconds', 0):.3f}s")
            print(f"   📋 Processed Rows: {metrics.get('total_processed_rows', 0):,}")
            print(f"   💾 Processed Data: {metrics.get('total_processed_mb', 0):.2f} MB")
        # Health assessment
        print("\n🎯 Health Assessment:")
        # Define health thresholds
        if health.get('uptime_check') and success_rate >= 95:
            health_status = "🟢 Excellent"
        elif health.get('uptime_check') and success_rate >= 90:
            health_status = "🟡 Good"
        elif health.get('uptime_check'):
            health_status = "🟠 Warning"
        else:
            health_status = "🔴 Critical"
        print(f"   Overall Status: {health_status}")
        # Recommendations
        print("\n💡 Recommendations:")
        if not health.get('uptime_check'):
            print("   🚨 Critical: Trino cluster is down - immediate attention required")
        elif success_rate < 90:
            print("   ⚠️ Warning: High query failure rate - investigate failed queries")
        elif metrics.get('avg_execution_seconds', 0) > 5:
            print("   📈 Performance: Average query time is high - consider optimization")
        else:
            print("   ✅ System is performing well")
        return {
            'health': health,
            'metrics': metrics,
            'health_status': health_status,
            'success_rate': success_rate
        }
# Run monitoring
if __name__ == "__main__":
    from lab_trino_connection import TrinoClient  # Import our client
    monitor = TrinoMonitor()
    report = monitor.generate_health_report()
4. Error Handling and Recovery
Retry Logic and Circuit Breaker
import time
import random
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1.0):
    """Lakehouse retry decorator with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise  # Out of attempts: re-raise with the original traceback
                    # Exponential backoff with jitter
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
                    time.sleep(delay)
            return None  # Only reached when max_retries <= 0
        return wrapper
    return decorator
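The decorator above retries on every exception, which also repeats queries that can never succeed (for example a SQL syntax error). A possible variation, sketched here under assumptions, retries only exception types marked as transient. `TransientQueryError`, `retry_on`, and the injectable `sleep` parameter are hypothetical names introduced for illustration; they are not part of the lab code or of any Trino client library.

```python
import random
import time
from functools import wraps


class TransientQueryError(Exception):
    """Hypothetical marker for errors worth retrying (timeouts, overload, ...)."""


def retry_on(retryable=(TransientQueryError,), max_retries=3, base_delay=1.0,
             sleep=time.sleep):
    """Retry only the listed exception types, with exponential backoff and jitter.

    `sleep` is injectable so tests can skip the real delay.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except retryable:
                    if attempt == max_retries - 1:
                        raise  # out of attempts: surface the last error
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    sleep(delay)
                # Any other exception type propagates immediately, without retry.
        return wrapper
    return decorator


if __name__ == "__main__":
    calls = {"n": 0}

    @retry_on(max_retries=3, base_delay=0.1)
    def flaky_query():
        # Fails twice with a transient error, then succeeds.
        calls["n"] += 1
        if calls["n"] < 3:
            raise TransientQueryError("temporary failure")
        return "ok"

    print(flaky_query(), "after", calls["n"], "attempts")
```

Which errors count as transient depends on the client in use, so the mapping from real client errors to a marker like `TransientQueryError` is left open here.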
class TrinoCircuitBreaker:
    """Circuit breaker pattern for Lakehouse Trino connections"""
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            # After the recovery timeout, let one trial call through
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self.success()
            return result
        except Exception:
            self.failure()
            raise  # Re-raise with the original traceback

    def success(self):
        self.failure_count = 0
        self.state = 'CLOSED'

    def failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
Conclusion
Key Takeaways for the Data Engineering Team:
- Unified Analytics: Trino provides a single SQL interface for all Lakehouse data sources
- Performance: Optimized queries can deliver sub-second response times
- Scalability: Distributed architecture scales with Lakehouse data growth
- Flexibility: Support for complex analytics and cross-system joins
- Production Ready: Comprehensive monitoring and error handling
Lakehouse Implementation Roadmap:
Phase 1: Foundation (✅ Completed)
- ✅ Trino cluster deployment
- ✅ PostgreSQL catalog integration
- ✅ Basic query capabilities
- ✅ Performance benchmarking
Phase 2: Data Lake Integration
- 🔄 MinIO catalog configuration
- 🔄 Parquet file format optimization
- 🔄 Partition strategy implementation
- 🔄 Data lifecycle management
Phase 3: Advanced Analytics
- 📋 ML feature store integration
- 📋 Real-time analytics dashboards
- 📋 Advanced security implementation
- 📋 Multi-tenant access control
Phase 4: Production Operations
- 📋 Automated monitoring and alerting
- 📋 Performance optimization
- 📋 Disaster recovery procedures
- 📋 Team training and documentation
Next Steps:
- Practice with the provided lab exercises
- Implement MinIO catalog integration
- Setup production monitoring
- Train business users on SQL analytics
- Prepare for advanced Spark integration
This document is part of the Lakehouse Platform Training Series. For questions or feedback, contact the Data Engineering Team.
