Dev Productivity & Tools
PostgreSQL Database Guide — Lakehouse
PostgreSQL trong Lakehouse: schema, labs, integration.
2026-03-1716 min read
Table of Contents
- Overview
- PostgreSQL Architecture
- Database Schema Design
- Hands-on Labs
- Integration with Lakehouse
- Performance Optimization
- Security & Access Control
- Backup & Recovery
- Monitoring & Troubleshooting
- Production Best Practices
Overview
What is PostgreSQL in Lakehouse?
PostgreSQL serves as the primary relational database in the Lakehouse architecture, handling:
- Transactional Data: Customer transactions, loan applications, payment records
- Operational Data: User accounts, product catalogs, configuration data
- Analytics Support: Pre-aggregated metrics, dimension tables, lookup data
- Metadata Storage: Data lineage, catalog information, job execution logs
Key Features for Lakehouse
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Applications │ │ Analytics │ │ Data Science │
│ (OLTP) │ │ (OLAP) │ │ (ML/AI) │
└─────────┬───────┘ └─────────┬───────┘ └─────────┬───────┘
│ │ │
└──────────────────────┼──────────────────────┘
│
┌─────────────▼─────────────┐
│ PostgreSQL │
│ - ACID Compliance │
│ - JSON/JSONB Support │
│ - Full-text Search │
│ - Partitioning │
│ - Replication │
└───────────────────────────┘
PostgreSQL Architecture
1. Database Structure
-- Lakehouse Database Organization
lakehouse_db/
├── core/ -- Core business data
│ ├── customers -- Customer master data
│ ├── loans -- Loan applications & status
│ ├── payments -- Payment transactions
│ └── products -- Financial products
├── analytics/ -- Analytics & reporting
│ ├── customer_metrics
│ ├── loan_analytics
│ └── risk_indicators
├── metadata/ -- System metadata
│ ├── data_lineage
│ ├── job_execution
│ └── catalog_info
└── staging/ -- ETL staging area
├── raw_imports
└── transformation_temp2. Container Configuration
# From docker-compose.yml
postgresql:
image: postgres:15
container_name: postgresql
environment:
POSTGRES_DB: lakehouse_db
POSTGRES_USER: lakehouse_admin
POSTGRES_PASSWORD: CHANGE_ME_PASSWORD
ports:
- "5432:5432"
volumes:
- ./data/postgresql:/var/lib/postgresql/data
- ./platform/postgresql/init:/docker-entrypoint-initdb.d3. Connection Details
# Connection Parameters
Host: localhost
Port: 5432
Database: lakehouse_db
Username: lakehouse_admin
Password: CHANGE_ME_PASSWORD
# Connection String
postgresql://lakehouse_admin:CHANGE_ME_PASSWORD@localhost:5432/lakehouse_dbDatabase Schema Design
1. Core Business Tables
Customer Management
-- Customer master table
CREATE TABLE core.customers (
customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_code VARCHAR(20) UNIQUE NOT NULL,
full_name VARCHAR(255) NOT NULL,
id_number VARCHAR(20) UNIQUE NOT NULL,
phone_number VARCHAR(15),
email VARCHAR(255),
date_of_birth DATE,
address JSONB,
kyc_status VARCHAR(20) DEFAULT 'PENDING',
credit_score INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Indexing for performance (customer_code/id_number already indexed by UNIQUE)
CREATE INDEX idx_customer_kyc_status ON core.customers (kyc_status);
CREATE INDEX idx_customer_created_at ON core.customers (created_at);Loan Management
-- Loan applications and lifecycle
CREATE TABLE core.loans (
loan_id UUID DEFAULT gen_random_uuid(),
loan_number VARCHAR(30) NOT NULL,
customer_id UUID REFERENCES core.customers(customer_id),
product_type VARCHAR(50) NOT NULL,
loan_amount DECIMAL(15,2) NOT NULL,
interest_rate DECIMAL(5,4) NOT NULL,
term_months INTEGER NOT NULL,
status VARCHAR(20) DEFAULT 'APPLIED',
application_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
approval_date TIMESTAMP,
disbursement_date TIMESTAMP,
maturity_date DATE,
collateral_info JSONB,
risk_assessment JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Partitioned table: PK + UNIQUE must include the partition key
PRIMARY KEY (loan_id, application_date),
UNIQUE (loan_number, application_date)
) PARTITION BY RANGE (application_date);
-- Partitions for better performance
CREATE TABLE core.loans_2024 PARTITION OF core.loans
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
CREATE INDEX idx_loans_customer ON core.loans (customer_id);Payment Transactions
-- Payment and transaction records
CREATE TABLE core.payments (
payment_id UUID DEFAULT gen_random_uuid(),
transaction_id VARCHAR(50) NOT NULL,
loan_id UUID,
customer_id UUID REFERENCES core.customers(customer_id),
payment_type VARCHAR(20) NOT NULL, -- 'DISBURSEMENT', 'REPAYMENT', 'FEE'
amount DECIMAL(15,2) NOT NULL,
payment_method VARCHAR(30),
payment_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
due_date DATE,
status VARCHAR(20) DEFAULT 'PENDING',
channel VARCHAR(30), -- 'MOBILE_APP', 'BRANCH', 'BANK_TRANSFER'
metadata JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Partitioned table: PK must include the partition key
PRIMARY KEY (payment_id, payment_date)
) PARTITION BY RANGE (payment_date);
-- Indexing for queries (loan_id kept as plain column: FK to a partitioned parent
-- needs the full partition key, omitted here for tutorial simplicity)
CREATE INDEX idx_payments_loan ON core.payments (loan_id);
CREATE INDEX idx_payments_customer ON core.payments (customer_id);
CREATE INDEX idx_payments_payment_date ON core.payments (payment_date);
CREATE INDEX idx_payments_status ON core.payments (status);2. Analytics Tables
Customer Analytics
-- Pre-aggregated customer metrics
CREATE TABLE analytics.customer_metrics (
metric_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
customer_id UUID REFERENCES core.customers(customer_id),
metric_date DATE NOT NULL,
total_loans INTEGER DEFAULT 0,
active_loans INTEGER DEFAULT 0,
total_borrowed DECIMAL(15,2) DEFAULT 0,
total_repaid DECIMAL(15,2) DEFAULT 0,
current_outstanding DECIMAL(15,2) DEFAULT 0,
days_past_due INTEGER DEFAULT 0,
risk_score DECIMAL(5,2),
ltv_ratio DECIMAL(5,4), -- Loan-to-Value
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Unique constraint for daily metrics
UNIQUE(customer_id, metric_date)
);Risk Analytics
-- Risk assessment and monitoring
CREATE TABLE analytics.risk_indicators (
indicator_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_type VARCHAR(20) NOT NULL, -- 'CUSTOMER', 'LOAN', 'PORTFOLIO'
entity_id UUID NOT NULL,
indicator_type VARCHAR(50) NOT NULL,
indicator_value DECIMAL(10,4),
threshold_value DECIMAL(10,4),
alert_level VARCHAR(20), -- 'LOW', 'MEDIUM', 'HIGH', 'CRITICAL'
calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP,
metadata JSONB,
-- Composite index for queries
INDEX idx_entity_type_id (entity_type, entity_id),
INDEX idx_alert_level (alert_level),
INDEX idx_calculated_at (calculated_at)
);3. Metadata Tables
Data Lineage
-- Track data flow and transformations
CREATE TABLE metadata.data_lineage (
lineage_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_system VARCHAR(100) NOT NULL,
source_table VARCHAR(200) NOT NULL,
target_system VARCHAR(100) NOT NULL,
target_table VARCHAR(200) NOT NULL,
transformation_type VARCHAR(50),
transformation_logic TEXT,
pipeline_id VARCHAR(100),
execution_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
record_count BIGINT,
success_flag BOOLEAN DEFAULT TRUE,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);Hands-on Labs
Lab 1: Database Setup and Initial Schema
Step 1: Connect to PostgreSQL
# Connect using psql
docker exec -it postgresql psql -U lakehouse_admin -d lakehouse_db
# Or using connection string
psql "postgresql://lakehouse_admin:CHANGE_ME_PASSWORD@localhost:5432/lakehouse_db"Step 2: Create Schema Structure
-- Create schemas
CREATE SCHEMA IF NOT EXISTS core;
CREATE SCHEMA IF NOT EXISTS analytics;
CREATE SCHEMA IF NOT EXISTS metadata;
CREATE SCHEMA IF NOT EXISTS staging;
-- Verify schema creation
\dnStep 3: Create Core Tables
-- Execute the customer table creation
-- (Use the customer table DDL from above)
-- Verify table creation
\dt core.*
-- Check table structure
\d core.customersLab 2: Sample Data Creation
Step 1: Insert Sample Customers
-- Insert test customers
INSERT INTO core.customers (
customer_code, full_name, id_number, phone_number, email,
date_of_birth, address, kyc_status, credit_score
) VALUES
('CUST001', 'Nguyen Van An', '123456789', '0901234567', 'an.nguyen@email.com',
'1985-05-15', '{"city": "Ho Chi Minh", "district": "District 1", "ward": "Ben Nghe"}',
'APPROVED', 750),
('CUST002', 'Tran Thi Binh', '987654321', '0912345678', 'binh.tran@email.com',
'1990-08-22', '{"city": "Ha Noi", "district": "Dong Da", "ward": "Lang Thuong"}',
'APPROVED', 680),
('CUST003', 'Le Van Cuong', '456789123', '0923456789', 'cuong.le@email.com',
'1988-12-10', '{"city": "Da Nang", "district": "Hai Chau", "ward": "Thuan Phuoc"}',
'PENDING', 720);
-- Verify data insertion
SELECT customer_code, full_name, kyc_status, credit_score
FROM core.customers;Step 2: Create Sample Loans
-- First, get customer IDs
SELECT customer_id, customer_code FROM core.customers;
-- Insert sample loans (replace with actual customer_ids)
INSERT INTO core.loans (
loan_number, customer_id, product_type, loan_amount, interest_rate,
term_months, status, application_date, collateral_info
) VALUES
('LN2024001',
(SELECT customer_id FROM core.customers WHERE customer_code = 'CUST001'),
'PERSONAL_LOAN', 50000000.00, 0.1250, 24, 'APPROVED',
'2024-01-15 10:30:00', '{"type": "MOTORBIKE", "value": 30000000}'),
('LN2024002',
(SELECT customer_id FROM core.customers WHERE customer_code = 'CUST002'),
'BUSINESS_LOAN', 100000000.00, 0.1150, 36, 'DISBURSED',
'2024-02-20 14:15:00', '{"type": "REAL_ESTATE", "value": 150000000}');
-- Check loan data
SELECT l.loan_number, c.customer_code, l.product_type,
l.loan_amount, l.status
FROM core.loans l
JOIN core.customers c ON l.customer_id = c.customer_id;Lab 3: Advanced Queries and Analytics
Step 1: Customer Portfolio Analysis
-- Customer portfolio summary
SELECT
c.customer_code,
c.full_name,
COUNT(l.loan_id) as total_loans,
SUM(l.loan_amount) as total_borrowed,
AVG(l.interest_rate) as avg_interest_rate,
string_agg(DISTINCT l.product_type, ', ') as product_types
FROM core.customers c
LEFT JOIN core.loans l ON c.customer_id = l.customer_id
GROUP BY c.customer_id, c.customer_code, c.full_name
ORDER BY total_borrowed DESC NULLS LAST;Step 2: Risk Assessment Queries
-- High-risk loan identification
SELECT
l.loan_number,
c.customer_code,
l.loan_amount,
l.interest_rate,
c.credit_score,
CASE
WHEN c.credit_score < 600 THEN 'HIGH_RISK'
WHEN c.credit_score < 700 THEN 'MEDIUM_RISK'
ELSE 'LOW_RISK'
END as risk_category,
l.collateral_info->>'type' as collateral_type,
l.collateral_info->>'value' as collateral_value
FROM core.loans l
JOIN core.customers c ON l.customer_id = c.customer_id
WHERE l.status IN ('APPROVED', 'DISBURSED')
ORDER BY c.credit_score ASC;Step 3: JSON Data Operations
-- Working with address data (JSONB)
SELECT
customer_code,
full_name,
address->>'city' as city,
address->>'district' as district,
address->>'ward' as ward
FROM core.customers
WHERE address->>'city' = 'Ho Chi Minh';
-- Update address information
UPDATE core.customers
SET address = jsonb_set(address, '{postal_code}', '"70000"')
WHERE address->>'city' = 'Ho Chi Minh';Lab 4: Performance Optimization
Step 1: Index Creation and Analysis
-- Create performance indexes
CREATE INDEX CONCURRENTLY idx_customers_credit_score
ON core.customers (credit_score DESC);
CREATE INDEX CONCURRENTLY idx_loans_amount_status
ON core.loans (loan_amount DESC, status);
-- Analyze query performance
EXPLAIN ANALYZE
SELECT c.customer_code, l.loan_amount, l.status
FROM core.customers c
JOIN core.loans l ON c.customer_id = l.customer_id
WHERE c.credit_score > 700 AND l.loan_amount > 50000000;Step 2: Table Partitioning Demo
-- Create monthly partitions for payments
CREATE TABLE core.payments_2024_01 PARTITION OF core.payments
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE core.payments_2024_02 PARTITION OF core.payments
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- Insert test payment data
INSERT INTO core.payments (
transaction_id, loan_id, customer_id, payment_type, amount,
payment_method, payment_date, status, channel
) VALUES
('TXN2024001',
(SELECT loan_id FROM core.loans WHERE loan_number = 'LN2024001'),
(SELECT customer_id FROM core.customers WHERE customer_code = 'CUST001'),
'REPAYMENT', 2500000.00, 'BANK_TRANSFER', '2024-01-31 16:30:00',
'COMPLETED', 'MOBILE_APP');Integration with Lakehouse
1. Trino Integration
The PostgreSQL database is already configured as a catalog in Trino:
# platform/trino/catalog/postgresql.properties
connector.name=postgresql
connection-url=jdbc:postgresql://postgresql:5432/lakehouse_db
connection-user=lakehouse_admin
connection-password=CHANGE_ME_PASSWORDQuery PostgreSQL via Trino
-- Connect to Trino and query PostgreSQL data
SELECT
p.customer_code,
p.full_name,
p.credit_score,
COUNT(l.loan_id) as loan_count,
SUM(l.loan_amount) as total_amount
FROM postgresql.core.customers p
LEFT JOIN postgresql.core.loans l ON p.customer_id = l.customer_id
GROUP BY p.customer_code, p.full_name, p.credit_score
ORDER BY total_amount DESC;2. Kafka Integration
Stream Changes to Kafka
-- Create a trigger function for change data capture
CREATE OR REPLACE FUNCTION notify_loan_status_change()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('loan_status_changed',
json_build_object(
'loan_id', NEW.loan_id,
'loan_number', NEW.loan_number,
'old_status', OLD.status,
'new_status', NEW.status,
'changed_at', CURRENT_TIMESTAMP
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create trigger
CREATE TRIGGER loan_status_change_trigger
AFTER UPDATE OF status ON core.loans
FOR EACH ROW
EXECUTE FUNCTION notify_loan_status_change();3. ETL Pipeline Integration
Staging Area for Data Loading
-- Create staging table for bulk data imports
CREATE TABLE staging.customer_imports (
import_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
batch_id VARCHAR(50),
source_system VARCHAR(100),
raw_data JSONB,
processed BOOLEAN DEFAULT FALSE,
error_message TEXT,
imported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- ETL procedure for processing staged data
CREATE OR REPLACE FUNCTION process_customer_staging()
RETURNS INTEGER AS $$
DECLARE
processed_count INTEGER := 0;
staging_record RECORD;
BEGIN
FOR staging_record IN
SELECT * FROM staging.customer_imports
WHERE NOT processed
LOOP
BEGIN
-- Insert into core table
INSERT INTO core.customers (
customer_code, full_name, id_number,
phone_number, email, date_of_birth, address
) VALUES (
staging_record.raw_data->>'customer_code',
staging_record.raw_data->>'full_name',
staging_record.raw_data->>'id_number',
staging_record.raw_data->>'phone_number',
staging_record.raw_data->>'email',
(staging_record.raw_data->>'date_of_birth')::DATE,
staging_record.raw_data->'address'
);
-- Mark as processed
UPDATE staging.customer_imports
SET processed = TRUE
WHERE import_id = staging_record.import_id;
processed_count := processed_count + 1;
EXCEPTION WHEN OTHERS THEN
-- Log error
UPDATE staging.customer_imports
SET error_message = SQLERRM
WHERE import_id = staging_record.import_id;
END;
END LOOP;
RETURN processed_count;
END;
$$ LANGUAGE plpgsql;Performance Optimization
1. Query Optimization
Effective Indexing Strategy
-- Composite indexes for common query patterns
CREATE INDEX idx_loans_customer_status_date
ON core.loans (customer_id, status, application_date DESC);
CREATE INDEX idx_payments_loan_date_amount
ON core.payments (loan_id, payment_date DESC, amount DESC);
-- Partial indexes for specific conditions
CREATE INDEX idx_active_loans
ON core.loans (customer_id, loan_amount)
WHERE status IN ('APPROVED', 'DISBURSED');
-- GIN index for JSONB queries
CREATE INDEX idx_customer_address_gin
ON core.customers USING GIN (address);Query Optimization Examples
-- Optimized customer loan summary
WITH customer_summaries AS (
SELECT
customer_id,
COUNT(*) as loan_count,
SUM(loan_amount) as total_amount,
AVG(interest_rate) as avg_rate
FROM core.loans
WHERE status IN ('APPROVED', 'DISBURSED')
GROUP BY customer_id
)
SELECT
c.customer_code,
c.full_name,
c.credit_score,
COALESCE(cs.loan_count, 0) as loan_count,
COALESCE(cs.total_amount, 0) as total_amount,
COALESCE(cs.avg_rate, 0) as avg_rate
FROM core.customers c
LEFT JOIN customer_summaries cs ON c.customer_id = cs.customer_id
WHERE c.kyc_status = 'APPROVED'
ORDER BY cs.total_amount DESC NULLS LAST;2. Connection Pooling
PgBouncer Configuration
# pgbouncer.ini
[databases]
lakehouse_db = host=postgresql port=5432 dbname=lakehouse_db
[pgbouncer]
listen_port = 6432
listen_addr = *
auth_type = md5
auth_file = userlist.txt
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 25
server_reset_query = DISCARD ALL3. Maintenance Operations
Regular Maintenance Tasks
-- Update table statistics
ANALYZE core.customers;
ANALYZE core.loans;
ANALYZE core.payments;
-- Vacuum operations
VACUUM ANALYZE core.customers;
-- Reindex operations
REINDEX INDEX CONCURRENTLY idx_customers_credit_score;
-- Check table sizes
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname LIKE 'lakehouse_%'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;Security & Access Control
1. Role-Based Access Control
Create Application Roles
-- Create roles for different access levels
CREATE ROLE lakehouse_read_only NOLOGIN;
CREATE ROLE lakehouse_analyst NOLOGIN;
CREATE ROLE lakehouse_app_user NOLOGIN;
CREATE ROLE lakehouse_etl_user NOLOGIN;
-- Grant permissions to roles
GRANT USAGE ON SCHEMA core TO lakehouse_read_only;
GRANT SELECT ON ALL TABLES IN SCHEMA core TO lakehouse_read_only;
GRANT lakehouse_read_only TO lakehouse_analyst;
GRANT USAGE ON SCHEMA analytics TO lakehouse_analyst;
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA analytics TO lakehouse_analyst;
-- Create specific users
CREATE USER lakehouse_app_reader WITH PASSWORD 'reader_password_2024';
GRANT lakehouse_read_only TO lakehouse_app_reader;
CREATE USER lakehouse_data_analyst WITH PASSWORD 'analyst_password_2024';
GRANT lakehouse_analyst TO lakehouse_data_analyst;2. Row Level Security
Implement Customer Data Protection
-- Enable RLS on sensitive tables
ALTER TABLE core.customers ENABLE ROW LEVEL SECURITY;
-- Create policy for customer access
CREATE POLICY customer_own_data ON core.customers
FOR ALL TO lakehouse_app_user
USING (customer_id = current_setting('app.current_customer_id')::UUID);
-- Policy for analysts (aggregate access only)
CREATE POLICY analyst_aggregate_access ON core.customers
FOR SELECT TO lakehouse_analyst
USING (true); -- Allow all for analytics, but sensitive fields can be masked3. Data Encryption
Column-Level Encryption
-- Create extension for encryption
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Encrypt sensitive data
ALTER TABLE core.customers
ADD COLUMN encrypted_id_number TEXT;
UPDATE core.customers
SET encrypted_id_number = pgp_sym_encrypt(id_number, 'lakehouse_encryption_key_2024');
-- Query encrypted data
SELECT
customer_code,
pgp_sym_decrypt(encrypted_id_number::bytea, 'lakehouse_encryption_key_2024') as id_number
FROM core.customers
WHERE customer_code = 'CUST001';Backup & Recovery
1. Backup Strategy
Automated Backup Script
#!/bin/bash
# backup_postgresql.sh
BACKUP_DIR="/backup/postgresql"
DB_NAME="lakehouse_db"
DB_USER="lakehouse_admin"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
# Create backup directory
mkdir -p $BACKUP_DIR
# Full database backup
docker exec postgresql pg_dump -U $DB_USER -d $DB_NAME \
--format=custom --compress=9 \
> $BACKUP_DIR/lakehouse_db_$TIMESTAMP.backup
# Schema-only backup
docker exec postgresql pg_dump -U $DB_USER -d $DB_NAME \
--schema-only --format=plain \
> $BACKUP_DIR/lakehouse_schema_$TIMESTAMP.sql
# Data-only backup for specific tables
docker exec postgresql pg_dump -U $DB_USER -d $DB_NAME \
--data-only --table=core.customers --table=core.loans \
--format=custom \
> $BACKUP_DIR/core_data_$TIMESTAMP.backup
echo "Backup completed: $TIMESTAMP"Point-in-Time Recovery Setup
# Enable WAL archiving in postgresql.conf
wal_level = replica
archive_mode = on
archive_command = 'cp %p /backup/postgresql/wal/%f'
max_wal_senders = 32. Recovery Procedures
Database Restore
# Restore full database
docker exec -i postgresql pg_restore -U lakehouse_admin -d postgres \
--create --clean --if-exists \
< /backup/postgresql/lakehouse_db_20241201_120000.backup
# Restore specific schema
docker exec -i postgresql psql -U lakehouse_admin -d lakehouse_db \
< /backup/postgresql/lakehouse_schema_20241201_120000.sqlMonitoring & Troubleshooting
1. Performance Monitoring
Key Metrics to Monitor
-- Database size and growth
SELECT
pg_database.datname,
pg_size_pretty(pg_database_size(pg_database.datname)) AS size
FROM pg_database
ORDER BY pg_database_size(pg_database.datname) DESC;
-- Active connections
SELECT
state,
COUNT(*) as connection_count
FROM pg_stat_activity
WHERE datname = 'lakehouse_db'
GROUP BY state;
-- Slow queries identification
SELECT
query,
state,
now() - query_start as duration,
pid
FROM pg_stat_activity
WHERE now() - query_start > interval '5 minutes'
AND state != 'idle'
ORDER BY duration DESC;
-- Index usage statistics
SELECT
schemaname,
tablename,
indexname,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
WHERE schemaname LIKE 'lakehouse_%'
ORDER BY idx_scan DESC;2. Log Analysis
Important Log Patterns
# Monitor PostgreSQL logs
docker logs postgresql --tail=100 -f
# Check for errors
docker logs postgresql 2>&1 | grep -i error
# Monitor connection issues
docker logs postgresql 2>&1 | grep -i "connection"
# Check slow queries (if log_min_duration_statement is set)
docker logs postgresql 2>&1 | grep -i "duration"3. Common Issues and Solutions
Connection Issues
-- Check connection limits
SHOW max_connections;
-- Current connection count
SELECT count(*) FROM pg_stat_activity;
-- Kill long-running queries
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE state != 'idle'
AND now() - query_start > interval '1 hour';Performance Issues
-- Identify missing indexes
SELECT
schemaname,
tablename,
seq_scan,
seq_tup_read,
seq_tup_read / seq_scan as avg_tup_per_scan
FROM pg_stat_user_tables
WHERE seq_scan > 0
ORDER BY seq_tup_read DESC;
-- Lock analysis
SELECT
blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement,
blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;Production Best Practices
1. Configuration Optimization
PostgreSQL Configuration
# postgresql.conf optimizations for Lakehouse workload
# Memory settings
shared_buffers = 256MB # 25% of RAM
effective_cache_size = 1GB # 75% of RAM
work_mem = 4MB # Per operation
maintenance_work_mem = 64MB # For maintenance operations
# Checkpoint settings
checkpoint_completion_target = 0.9
wal_buffers = 16MB
random_page_cost = 1.1 # For SSD storage
# Connection settings
max_connections = 200
shared_preload_libraries = 'pg_stat_statements'
# Logging
log_statement = 'mod' # Log modifications
log_min_duration_statement = 1000 # Log slow queries (1s+)
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '2. Operational Procedures
Daily Operations Checklist
#!/bin/bash
# daily_postgresql_health_check.sh
echo "=== PostgreSQL Health Check - $(date) ==="
# Check container status
echo "1. Container Status:"
docker ps | grep postgresql
# Check database connectivity
echo "2. Database Connectivity:"
docker exec postgresql pg_isready -U lakehouse_admin
# Check database size
echo "3. Database Size:"
docker exec postgresql psql -U lakehouse_admin -d lakehouse_db -c "
SELECT pg_size_pretty(pg_database_size('lakehouse_db'));"
# Check active connections
echo "4. Active Connections:"
docker exec postgresql psql -U lakehouse_admin -d lakehouse_db -c "
SELECT count(*) as active_connections FROM pg_stat_activity WHERE state = 'active';"
# Check replication lag (if applicable)
echo "5. Recent Activity:"
docker exec postgresql psql -U lakehouse_admin -d lakehouse_db -c "
SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
WHERE schemaname LIKE 'lakehouse_%'
ORDER BY n_tup_ins + n_tup_upd + n_tup_del DESC LIMIT 10;"3. Disaster Recovery Plan
Recovery Time Objectives (RTO)
- Critical Systems: < 1 hour
- Analytics Systems: < 4 hours
- Reporting Systems: < 24 hours
Recovery Point Objectives (RPO)
- Transactional Data: < 15 minutes
- Analytics Data: < 1 hour
- Archive Data: < 24 hours
Emergency Procedures
# Emergency recovery script
#!/bin/bash
# emergency_recovery.sh
BACKUP_LOCATION="/backup/postgresql"
LATEST_BACKUP=$(ls -t $BACKUP_LOCATION/*.backup | head -1)
echo "Starting emergency recovery with backup: $LATEST_BACKUP"
# Stop current container
docker stop postgresql
# Backup current data (if possible)
docker run --rm -v $(pwd)/data/postgresql:/backup alpine tar czf /backup/emergency_backup_$(date +%Y%m%d_%H%M%S).tar.gz /backup/
# Restore from backup
docker run --rm -v $BACKUP_LOCATION:/backup -v $(pwd)/data/postgresql:/var/lib/postgresql/data \
postgres:15 pg_restore -U lakehouse_admin -d lakehouse_db --create --clean /backup/$(basename $LATEST_BACKUP)
# Restart container
docker start postgresql
echo "Emergency recovery completed"Conclusion
This PostgreSQL guide provides comprehensive coverage of:
- Architecture Understanding: How PostgreSQL fits in the Lakehouse
- Schema Design: Production-ready table structures for financial data
- Hands-on Practice: Step-by-step labs for real-world scenarios
- Integration: Seamless connection with Trino, Kafka, and other components
- Performance: Optimization techniques for large-scale operations
- Security: Enterprise-grade access control and data protection
- Operations: Backup, monitoring, and troubleshooting procedures
Next Steps
- Practice Labs: Execute all hands-on labs in sequence
- Schema Customization: Adapt schemas to specific Lakehouse requirements
- Integration Testing: Test connections with other platform components
- Performance Tuning: Optimize for your specific workload patterns
- Security Implementation: Deploy role-based access and encryption
- Monitoring Setup: Implement comprehensive monitoring and alerting
Additional Resources
- PostgreSQL Documentation: https://www.postgresql.org/docs/
- Performance Tuning: https://wiki.postgresql.org/wiki/Performance_Optimization
- Security Guide: https://www.postgresql.org/docs/current/security.html
- Backup Best Practices: https://www.postgresql.org/docs/current/backup.html
Ready for production deployment with enterprise-grade PostgreSQL management for Lakehouse Platform!
