Le Duy Khuong (Daniel)

Dev Productivity & Tools

PostgreSQL Database Guide — Lakehouse

PostgreSQL in the Lakehouse: schema, labs, integration.

2026-03-17 · 16 min read

Table of Contents

  1. Overview
  2. PostgreSQL Architecture
  3. Database Schema Design
  4. Hands-on Labs
  5. Integration with Lakehouse
  6. Performance Optimization
  7. Security & Access Control
  8. Backup & Recovery
  9. Monitoring & Troubleshooting
  10. Production Best Practices

Overview

What is PostgreSQL in Lakehouse?

PostgreSQL serves as the primary relational database in the Lakehouse architecture, handling:

  • Transactional Data: Customer transactions, loan applications, payment records
  • Operational Data: User accounts, product catalogs, configuration data
  • Analytics Support: Pre-aggregated metrics, dimension tables, lookup data
  • Metadata Storage: Data lineage, catalog information, job execution logs

Key Features for Lakehouse

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Applications  │    │   Analytics     │    │   Data Science  │
│   (OLTP)        │    │   (OLAP)        │    │   (ML/AI)       │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
          │                      │                      │
          └──────────────────────┼──────────────────────┘
                                 │
                    ┌─────────────▼─────────────┐
                    │      PostgreSQL           │
                    │   - ACID Compliance       │
                    │   - JSON/JSONB Support    │
                    │   - Full-text Search      │
                    │   - Partitioning          │
                    │   - Replication           │
                    └───────────────────────────┘

PostgreSQL Architecture

1. Database Structure

-- Lakehouse Database Organization
lakehouse_db/
├── core/           -- Core business data
│   ├── customers       -- Customer master data
│   ├── loans          -- Loan applications & status
│   ├── payments       -- Payment transactions
│   └── products       -- Financial products
├── analytics/     -- Analytics & reporting
│   ├── customer_metrics
│   ├── loan_analytics
│   └── risk_indicators
├── metadata/      -- System metadata
│   ├── data_lineage
│   ├── job_execution
│   └── catalog_info
└── staging/       -- ETL staging area
    ├── raw_imports
    └── transformation_temp

2. Container Configuration

# From docker-compose.yml
postgresql:
  image: postgres:15
  container_name: postgresql
  environment:
    POSTGRES_DB: lakehouse_db
    POSTGRES_USER: lakehouse_admin
    POSTGRES_PASSWORD: CHANGE_ME_PASSWORD
  ports:
    - "5432:5432"
  volumes:
    - ./data/postgresql:/var/lib/postgresql/data
    - ./platform/postgresql/init:/docker-entrypoint-initdb.d

3. Connection Details

# Connection Parameters
Host: localhost
Port: 5432
Database: lakehouse_db
Username: lakehouse_admin
Password: CHANGE_ME_PASSWORD
 
# Connection String
postgresql://lakehouse_admin:CHANGE_ME_PASSWORD@localhost:5432/lakehouse_db
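
A hand-written connection string breaks as soon as the real password contains characters such as `@` or `/`. The following is a minimal sketch, using only the Python standard library, of assembling the URI with percent-encoding. The `build_dsn` helper and the sample password are hypothetical and not part of any driver.

```python
from urllib.parse import quote, unquote, urlsplit


def build_dsn(user: str, password: str, host: str, port: int, dbname: str) -> str:
    """Assemble a postgresql:// URI, percent-encoding user, password and dbname."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(dbname, safe='')}"
    )


# Hypothetical password with reserved characters, for illustration only.
dsn = build_dsn("lakehouse_admin", "p@ss/word", "localhost", 5432, "lakehouse_db")
parts = urlsplit(dsn)
print(dsn)
print(parts.hostname, parts.port, unquote(parts.password))
```

With the placeholder password from this guide the result is identical to the connection string above, because `CHANGE_ME_PASSWORD` contains no reserved characters.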

Database Schema Design

1. Core Business Tables

Customer Management

-- Customer master table
CREATE TABLE core.customers (
    customer_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_code VARCHAR(20) UNIQUE NOT NULL,
    full_name VARCHAR(255) NOT NULL,
    id_number VARCHAR(20) UNIQUE NOT NULL,
    phone_number VARCHAR(15),
    email VARCHAR(255),
    date_of_birth DATE,
    address JSONB,
    kyc_status VARCHAR(20) DEFAULT 'PENDING',
    credit_score INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
-- Indexing for performance (customer_code/id_number already indexed by UNIQUE)
CREATE INDEX idx_customer_kyc_status ON core.customers (kyc_status);
CREATE INDEX idx_customer_created_at ON core.customers (created_at);

Loan Management

-- Loan applications and lifecycle
CREATE TABLE core.loans (
    loan_id UUID DEFAULT gen_random_uuid(),
    loan_number VARCHAR(30) NOT NULL,
    customer_id UUID REFERENCES core.customers(customer_id),
    product_type VARCHAR(50) NOT NULL,
    loan_amount DECIMAL(15,2) NOT NULL,
    interest_rate DECIMAL(5,4) NOT NULL,
    term_months INTEGER NOT NULL,
    status VARCHAR(20) DEFAULT 'APPLIED',
    application_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    approval_date TIMESTAMP,
    disbursement_date TIMESTAMP,
    maturity_date DATE,
    collateral_info JSONB,
    risk_assessment JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Partitioned table: PK + UNIQUE must include the partition key
    PRIMARY KEY (loan_id, application_date),
    UNIQUE (loan_number, application_date)
) PARTITION BY RANGE (application_date);
 
-- Partitions for better performance
CREATE TABLE core.loans_2024 PARTITION OF core.loans
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
CREATE INDEX idx_loans_customer ON core.loans (customer_id);

Payment Transactions

-- Payment and transaction records
CREATE TABLE core.payments (
    payment_id UUID DEFAULT gen_random_uuid(),
    transaction_id VARCHAR(50) NOT NULL,
    loan_id UUID,
    customer_id UUID REFERENCES core.customers(customer_id),
    payment_type VARCHAR(20) NOT NULL, -- 'DISBURSEMENT', 'REPAYMENT', 'FEE'
    amount DECIMAL(15,2) NOT NULL,
    payment_method VARCHAR(30),
    payment_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    due_date DATE,
    status VARCHAR(20) DEFAULT 'PENDING',
    channel VARCHAR(30), -- 'MOBILE_APP', 'BRANCH', 'BANK_TRANSFER'
    metadata JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Partitioned table: PK must include the partition key
    PRIMARY KEY (payment_id, payment_date)
) PARTITION BY RANGE (payment_date);
 
-- Indexing for queries (loan_id kept as plain column: FK to a partitioned parent
-- needs the full partition key, omitted here for tutorial simplicity)
CREATE INDEX idx_payments_loan ON core.payments (loan_id);
CREATE INDEX idx_payments_customer ON core.payments (customer_id);
CREATE INDEX idx_payments_payment_date ON core.payments (payment_date);
CREATE INDEX idx_payments_status ON core.payments (status);

2. Analytics Tables

Customer Analytics

-- Pre-aggregated customer metrics
CREATE TABLE analytics.customer_metrics (
    metric_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    customer_id UUID REFERENCES core.customers(customer_id),
    metric_date DATE NOT NULL,
    total_loans INTEGER DEFAULT 0,
    active_loans INTEGER DEFAULT 0,
    total_borrowed DECIMAL(15,2) DEFAULT 0,
    total_repaid DECIMAL(15,2) DEFAULT 0,
    current_outstanding DECIMAL(15,2) DEFAULT 0,
    days_past_due INTEGER DEFAULT 0,
    risk_score DECIMAL(5,2),
    ltv_ratio DECIMAL(5,4), -- Loan-to-Value
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Unique constraint for daily metrics
    UNIQUE(customer_id, metric_date)
);

Risk Analytics

-- Risk assessment and monitoring
CREATE TABLE analytics.risk_indicators (
    indicator_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    entity_type VARCHAR(20) NOT NULL, -- 'CUSTOMER', 'LOAN', 'PORTFOLIO'
    entity_id UUID NOT NULL,
    indicator_type VARCHAR(50) NOT NULL,
    indicator_value DECIMAL(10,4),
    threshold_value DECIMAL(10,4),
    alert_level VARCHAR(20), -- 'LOW', 'MEDIUM', 'HIGH', 'CRITICAL'
    calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP,
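    metadata JSONB
);

-- Indexes for common lookups
-- (PostgreSQL has no inline INDEX clause in CREATE TABLE; use CREATE INDEX)
CREATE INDEX idx_entity_type_id ON analytics.risk_indicators (entity_type, entity_id);
CREATE INDEX idx_alert_level ON analytics.risk_indicators (alert_level);
CREATE INDEX idx_calculated_at ON analytics.risk_indicators (calculated_at);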

3. Metadata Tables

Data Lineage

-- Track data flow and transformations
CREATE TABLE metadata.data_lineage (
    lineage_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    source_system VARCHAR(100) NOT NULL,
    source_table VARCHAR(200) NOT NULL,
    target_system VARCHAR(100) NOT NULL,
    target_table VARCHAR(200) NOT NULL,
    transformation_type VARCHAR(50),
    transformation_logic TEXT,
    pipeline_id VARCHAR(100),
    execution_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    record_count BIGINT,
    success_flag BOOLEAN DEFAULT TRUE,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Hands-on Labs

Lab 1: Database Setup and Initial Schema

Step 1: Connect to PostgreSQL

# Connect using psql
docker exec -it postgresql psql -U lakehouse_admin -d lakehouse_db
 
# Or using connection string
psql "postgresql://lakehouse_admin:CHANGE_ME_PASSWORD@localhost:5432/lakehouse_db"

Step 2: Create Schema Structure

-- Create schemas
CREATE SCHEMA IF NOT EXISTS core;
CREATE SCHEMA IF NOT EXISTS analytics;
CREATE SCHEMA IF NOT EXISTS metadata;
CREATE SCHEMA IF NOT EXISTS staging;
 
-- Verify schema creation
\dn

Step 3: Create Core Tables

-- Execute the customer table creation
-- (Use the customer table DDL from above)
 
-- Verify table creation
\dt core.*
 
-- Check table structure
\d core.customers

Lab 2: Sample Data Creation

Step 1: Insert Sample Customers

-- Insert test customers
INSERT INTO core.customers (
    customer_code, full_name, id_number, phone_number, email,
    date_of_birth, address, kyc_status, credit_score
) VALUES 
('CUST001', 'Nguyen Van An', '123456789', '0901234567', 'an.nguyen@email.com',
 '1985-05-15', '{"city": "Ho Chi Minh", "district": "District 1", "ward": "Ben Nghe"}',
 'APPROVED', 750),
('CUST002', 'Tran Thi Binh', '987654321', '0912345678', 'binh.tran@email.com',
 '1990-08-22', '{"city": "Ha Noi", "district": "Dong Da", "ward": "Lang Thuong"}',
 'APPROVED', 680),
('CUST003', 'Le Van Cuong', '456789123', '0923456789', 'cuong.le@email.com',
 '1988-12-10', '{"city": "Da Nang", "district": "Hai Chau", "ward": "Thuan Phuoc"}',
 'PENDING', 720);
 
-- Verify data insertion
SELECT customer_code, full_name, kyc_status, credit_score 
FROM core.customers;

Step 2: Create Sample Loans

-- First, get customer IDs
SELECT customer_id, customer_code FROM core.customers;
 
-- Insert sample loans (customer_id is resolved by a subquery on customer_code)
INSERT INTO core.loans (
    loan_number, customer_id, product_type, loan_amount, interest_rate, 
    term_months, status, application_date, collateral_info
) VALUES 
('LN2024001', 
 (SELECT customer_id FROM core.customers WHERE customer_code = 'CUST001'),
 'PERSONAL_LOAN', 50000000.00, 0.1250, 24, 'APPROVED', 
 '2024-01-15 10:30:00', '{"type": "MOTORBIKE", "value": 30000000}'),
('LN2024002',
 (SELECT customer_id FROM core.customers WHERE customer_code = 'CUST002'),
 'BUSINESS_LOAN', 100000000.00, 0.1150, 36, 'DISBURSED',
 '2024-02-20 14:15:00', '{"type": "REAL_ESTATE", "value": 150000000}');
 
-- Check loan data
SELECT l.loan_number, c.customer_code, l.product_type, 
       l.loan_amount, l.status
FROM core.loans l
JOIN core.customers c ON l.customer_id = c.customer_id;

Lab 3: Advanced Queries and Analytics

Step 1: Customer Portfolio Analysis

-- Customer portfolio summary
SELECT 
    c.customer_code,
    c.full_name,
    COUNT(l.loan_id) as total_loans,
    SUM(l.loan_amount) as total_borrowed,
    AVG(l.interest_rate) as avg_interest_rate,
    string_agg(DISTINCT l.product_type, ', ') as product_types
FROM core.customers c
LEFT JOIN core.loans l ON c.customer_id = l.customer_id
GROUP BY c.customer_id, c.customer_code, c.full_name
ORDER BY total_borrowed DESC NULLS LAST;
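
The summary counts `l.loan_id` rather than `*` so that customers without loans report zero. A small sketch of that difference, using Python's built-in `sqlite3` as a stand-in for PostgreSQL; the simplified tables and integer ids are assumptions made only so the example runs without a server:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, customer_code TEXT);
CREATE TABLE loans (loan_id INTEGER PRIMARY KEY, customer_id INTEGER, loan_amount NUMERIC);
INSERT INTO customers VALUES (1, 'CUST001'), (2, 'CUST002'), (3, 'CUST003');
INSERT INTO loans VALUES (10, 1, 50000000), (11, 2, 100000000);
""")

rows = conn.execute("""
SELECT c.customer_code,
       COUNT(l.loan_id)   AS total_loans,    -- counts only matched loan rows
       COUNT(*)           AS joined_rows,    -- also counts the NULL-extended row
       SUM(l.loan_amount) AS total_borrowed  -- NULL when there are no loans
FROM customers c
LEFT JOIN loans l ON c.customer_id = l.customer_id
GROUP BY c.customer_id, c.customer_code
ORDER BY c.customer_code
""").fetchall()

for row in rows:
    print(row)
```

CUST003 has no loans, so `total_loans` is 0 while `joined_rows` is 1, and `total_borrowed` comes back as NULL, which is why the query sorts with `NULLS LAST`.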

Step 2: Risk Assessment Queries

-- High-risk loan identification
SELECT 
    l.loan_number,
    c.customer_code,
    l.loan_amount,
    l.interest_rate,
    c.credit_score,
    CASE 
        WHEN c.credit_score < 600 THEN 'HIGH_RISK'
        WHEN c.credit_score < 700 THEN 'MEDIUM_RISK'
        ELSE 'LOW_RISK'
    END as risk_category,
    l.collateral_info->>'type' as collateral_type,
    l.collateral_info->>'value' as collateral_value
FROM core.loans l
JOIN core.customers c ON l.customer_id = c.customer_id
WHERE l.status IN ('APPROVED', 'DISBURSED')
ORDER BY c.credit_score ASC;
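
The same banding is sometimes needed in application code. A sketch of the `CASE` thresholds in Python follows; the `UNSCORED` label is an assumption added here. In the SQL, a NULL `credit_score` fails both `<` comparisons and falls through to `ELSE`, so an unscored customer is reported as `LOW_RISK`.

```python
from typing import Optional


def risk_category(credit_score: Optional[int]) -> str:
    """Mirror the CASE thresholds, but keep missing scores separate."""
    if credit_score is None:
        return "UNSCORED"  # hypothetical label; the SQL CASE would yield LOW_RISK
    if credit_score < 600:
        return "HIGH_RISK"
    if credit_score < 700:
        return "MEDIUM_RISK"
    return "LOW_RISK"


# Scores of the sample customers from Lab 2, plus a missing score.
for score in (750, 680, 720, None):
    print(score, risk_category(score))
```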

Step 3: JSON Data Operations

-- Working with address data (JSONB)
SELECT 
    customer_code,
    full_name,
    address->>'city' as city,
    address->>'district' as district,
    address->>'ward' as ward
FROM core.customers
WHERE address->>'city' = 'Ho Chi Minh';
 
-- Update address information
UPDATE core.customers 
SET address = jsonb_set(address, '{postal_code}', '"70000"')
WHERE address->>'city' = 'Ho Chi Minh';

Lab 4: Performance Optimization

Step 1: Index Creation and Analysis

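-- Create performance indexes
-- CONCURRENTLY avoids blocking writes, but cannot run inside a transaction block
CREATE INDEX CONCURRENTLY idx_customers_credit_score 
ON core.customers (credit_score DESC);

-- core.loans is partitioned: CREATE INDEX CONCURRENTLY is not supported on a
-- partitioned table, so the index is created without it
CREATE INDEX idx_loans_amount_status 
ON core.loans (loan_amount DESC, status);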
 
-- Analyze query performance
EXPLAIN ANALYZE
SELECT c.customer_code, l.loan_amount, l.status
FROM core.customers c
JOIN core.loans l ON c.customer_id = l.customer_id
WHERE c.credit_score > 700 AND l.loan_amount > 50000000;

Step 2: Table Partitioning Demo

-- Create monthly partitions for payments
CREATE TABLE core.payments_2024_01 PARTITION OF core.payments
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
 
CREATE TABLE core.payments_2024_02 PARTITION OF core.payments
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
 
-- Insert test payment data
INSERT INTO core.payments (
    transaction_id, loan_id, customer_id, payment_type, amount, 
    payment_method, payment_date, status, channel
) VALUES 
('TXN2024001',
 (SELECT loan_id FROM core.loans WHERE loan_number = 'LN2024001'),
 (SELECT customer_id FROM core.customers WHERE customer_code = 'CUST001'),
 'REPAYMENT', 2500000.00, 'BANK_TRANSFER', '2024-01-31 16:30:00', 
 'COMPLETED', 'MOBILE_APP');
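
Writing one partition statement per month by hand is error-prone. A minimal sketch of generating the statements for a whole year follows; `monthly_partition_ddl` is a hypothetical helper, and `IF NOT EXISTS` is added so the output can be re-run safely.

```python
from datetime import date


def monthly_partition_ddl(parent: str, year: int) -> list[str]:
    """Return one CREATE TABLE ... PARTITION OF statement per month of `year`."""
    statements = []
    for month in range(1, 13):
        start = date(year, month, 1)
        # The upper bound is exclusive: the first day of the following month.
        end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        statements.append(
            f"CREATE TABLE IF NOT EXISTS {parent}_{year}_{month:02d} "
            f"PARTITION OF {parent}\n"
            f"    FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
        )
    return statements


for stmt in monthly_partition_ddl("core.payments", 2024)[:2]:
    print(stmt)
```

The first two statements match the January and February partitions created above, apart from the added `IF NOT EXISTS`.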

Integration with Lakehouse

1. Trino Integration

The PostgreSQL database is already configured as a catalog in Trino:

# platform/trino/catalog/postgresql.properties
connector.name=postgresql
connection-url=jdbc:postgresql://postgresql:5432/lakehouse_db
connection-user=lakehouse_admin
connection-password=CHANGE_ME_PASSWORD

Query PostgreSQL via Trino

-- Connect to Trino and query PostgreSQL data
SELECT 
    p.customer_code,
    p.full_name,
    p.credit_score,
    COUNT(l.loan_id) as loan_count,
    SUM(l.loan_amount) as total_amount
FROM postgresql.core.customers p
LEFT JOIN postgresql.core.loans l ON p.customer_id = l.customer_id
GROUP BY p.customer_code, p.full_name, p.credit_score
ORDER BY total_amount DESC;

2. Kafka Integration

Stream Changes to Kafka

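-- Create a trigger function for change data capture.
-- pg_notify only publishes to a PostgreSQL channel: a separate listener process
-- must LISTEN on 'loan_status_changed' and forward each payload to Kafka.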
CREATE OR REPLACE FUNCTION notify_loan_status_change()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('loan_status_changed', 
        json_build_object(
            'loan_id', NEW.loan_id,
            'loan_number', NEW.loan_number,
            'old_status', OLD.status,
            'new_status', NEW.status,
            'changed_at', CURRENT_TIMESTAMP
        )::text
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
 
-- Create trigger (fires only when the status value actually changes)
CREATE TRIGGER loan_status_change_trigger
    AFTER UPDATE OF status ON core.loans
    FOR EACH ROW
    WHEN (OLD.status IS DISTINCT FROM NEW.status)
    EXECUTE FUNCTION notify_loan_status_change();
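
The notification still has to be forwarded to Kafka by a listener process. The sketch below covers only the mapping step, turning one notification payload into a topic, key and value. The topic name, the sample UUID and the choice of `loan_id` as the message key are assumptions. The `LISTEN` loop and the Kafka producer are left out because they depend on the driver and client library in use.

```python
import json

TOPIC = "loan-status-changes"  # hypothetical topic name


def to_kafka_record(payload: str) -> tuple[str, bytes, bytes]:
    """Map one pg_notify payload to (topic, key, value) for a Kafka producer."""
    event = json.loads(payload)
    # Keying by loan_id keeps all events for one loan in the same partition.
    key = event["loan_id"].encode("utf-8")
    value = json.dumps(event, separators=(",", ":"), sort_keys=True).encode("utf-8")
    return TOPIC, key, value


# Sample payload in the shape produced by json_build_object(...)::text.
sample = (
    '{"loan_id" : "00000000-0000-0000-0000-000000000001", '
    '"loan_number" : "LN2024001", "old_status" : "APPROVED", '
    '"new_status" : "DISBURSED", "changed_at" : "2024-03-01T09:00:00+00:00"}'
)
topic, key, value = to_kafka_record(sample)
print(topic, key, value)
```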

3. ETL Pipeline Integration

Staging Area for Data Loading

-- Create staging table for bulk data imports
CREATE TABLE staging.customer_imports (
    import_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    batch_id VARCHAR(50),
    source_system VARCHAR(100),
    raw_data JSONB,
    processed BOOLEAN DEFAULT FALSE,
    error_message TEXT,
    imported_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
-- ETL procedure for processing staged data
CREATE OR REPLACE FUNCTION process_customer_staging()
RETURNS INTEGER AS $$
DECLARE
    processed_count INTEGER := 0;
    staging_record RECORD;
BEGIN
    FOR staging_record IN 
        SELECT * FROM staging.customer_imports 
        WHERE NOT processed
    LOOP
        BEGIN
            -- Insert into core table
            INSERT INTO core.customers (
                customer_code, full_name, id_number, 
                phone_number, email, date_of_birth, address
            ) VALUES (
                staging_record.raw_data->>'customer_code',
                staging_record.raw_data->>'full_name',
                staging_record.raw_data->>'id_number',
                staging_record.raw_data->>'phone_number',
                staging_record.raw_data->>'email',
                (staging_record.raw_data->>'date_of_birth')::DATE,
                staging_record.raw_data->'address'
            );
            
            -- Mark as processed
            UPDATE staging.customer_imports 
            SET processed = TRUE 
            WHERE import_id = staging_record.import_id;
            
            processed_count := processed_count + 1;
            
        EXCEPTION WHEN OTHERS THEN
            -- Log error
            UPDATE staging.customer_imports 
            SET error_message = SQLERRM 
            WHERE import_id = staging_record.import_id;
        END;
    END LOOP;
    
    RETURN processed_count;
END;
$$ LANGUAGE plpgsql;
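
Rows that fail inside the function stay unprocessed and are retried on every run, so it helps to reject obviously bad records before they reach staging. This sketch applies the NOT NULL and VARCHAR limits from the `core.customers` DDL to a `raw_data` record. `validate_customer_record` is a hypothetical helper and the sample records are made up.

```python
from datetime import date

# Columns declared NOT NULL (without a default) in core.customers.
REQUIRED_FIELDS = ("customer_code", "full_name", "id_number")
# VARCHAR limits taken from the core.customers DDL.
MAX_LENGTHS = {"customer_code": 20, "full_name": 255, "id_number": 20,
               "phone_number": 15, "email": 255}


def validate_customer_record(raw: dict) -> list[str]:
    """Return a list of problems; an empty list means the record can be staged."""
    errors = []
    for field in REQUIRED_FIELDS:
        if not raw.get(field):
            errors.append(f"missing {field}")
    for field, limit in MAX_LENGTHS.items():
        value = raw.get(field)
        if value is not None and len(str(value)) > limit:
            errors.append(f"{field} longer than {limit} characters")
    dob = raw.get("date_of_birth")
    if dob is not None:
        try:
            date.fromisoformat(dob)  # the function casts this field to DATE
        except (TypeError, ValueError):
            errors.append(f"invalid date_of_birth: {dob!r}")
    return errors


good = {"customer_code": "CUST004", "full_name": "Pham Thi Dung",
        "id_number": "321654987", "date_of_birth": "1992-03-04"}
bad = {"customer_code": "CUST005", "full_name": "", "date_of_birth": "04/03/1992"}
print(validate_customer_record(good))
print(validate_customer_record(bad))
```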

Performance Optimization

1. Query Optimization

Effective Indexing Strategy

-- Composite indexes for common query patterns
CREATE INDEX idx_loans_customer_status_date 
ON core.loans (customer_id, status, application_date DESC);
 
CREATE INDEX idx_payments_loan_date_amount 
ON core.payments (loan_id, payment_date DESC, amount DESC);
 
-- Partial indexes for specific conditions
CREATE INDEX idx_active_loans 
ON core.loans (customer_id, loan_amount) 
WHERE status IN ('APPROVED', 'DISBURSED');
 
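-- GIN index for JSONB containment queries, e.g. address @> '{"city": "Ho Chi Minh"}'
-- (a filter written as address->>'city' = '...' does not use this index)
CREATE INDEX idx_customer_address_gin 
ON core.customers USING GIN (address);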

Query Optimization Examples

-- Optimized customer loan summary
WITH customer_summaries AS (
    SELECT 
        customer_id,
        COUNT(*) as loan_count,
        SUM(loan_amount) as total_amount,
        AVG(interest_rate) as avg_rate
    FROM core.loans 
    WHERE status IN ('APPROVED', 'DISBURSED')
    GROUP BY customer_id
)
SELECT 
    c.customer_code,
    c.full_name,
    c.credit_score,
    COALESCE(cs.loan_count, 0) as loan_count,
    COALESCE(cs.total_amount, 0) as total_amount,
    COALESCE(cs.avg_rate, 0) as avg_rate
FROM core.customers c
LEFT JOIN customer_summaries cs ON c.customer_id = cs.customer_id
WHERE c.kyc_status = 'APPROVED'
ORDER BY cs.total_amount DESC NULLS LAST;

2. Connection Pooling

PgBouncer Configuration

# pgbouncer.ini
[databases]
lakehouse_db = host=postgresql port=5432 dbname=lakehouse_db
 
[pgbouncer]
listen_port = 6432
listen_addr = *
auth_type = md5
auth_file = userlist.txt
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 25
# Applied in session pooling only; not used when pool_mode = transaction
server_reset_query = DISCARD ALL
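
PgBouncer opens up to `default_pool_size` server connections per user/database pair, so the total on the PostgreSQL side grows with the number of distinct users. Below is a rough sizing check, assuming the `max_connections = 200` used later in this guide and PostgreSQL's default of 3 reserved superuser connections. The pair counts are hypothetical, and reserve pools and direct connections are ignored.

```python
def max_server_connections(default_pool_size: int, user_db_pairs: int) -> int:
    """Upper bound on connections PgBouncer may open to PostgreSQL."""
    return default_pool_size * user_db_pairs


def fits(default_pool_size: int, user_db_pairs: int,
         max_connections: int, reserved: int = 3) -> bool:
    """True when the worst case stays within the non-reserved connection slots."""
    worst_case = max_server_connections(default_pool_size, user_db_pairs)
    return worst_case <= max_connections - reserved


print(max_server_connections(25, 4), fits(25, 4, 200))
print(max_server_connections(25, 8), fits(25, 8, 200))
```

With four user/database pairs the worst case is 100 server connections. With eight it reaches 200, which exceeds the 197 non-reserved slots.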

3. Maintenance Operations

Regular Maintenance Tasks

-- Update table statistics
ANALYZE core.customers;
ANALYZE core.loans;
ANALYZE core.payments;
 
-- Vacuum operations
VACUUM ANALYZE core.customers;
 
-- Reindex operations (schema-qualify the index unless core is on the search_path)
REINDEX INDEX CONCURRENTLY core.idx_customers_credit_score;

-- Check table sizes (the Lakehouse schemas are core, analytics, metadata, staging)
SELECT 
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables 
WHERE schemaname IN ('core', 'analytics', 'metadata', 'staging')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;

Security & Access Control

1. Role-Based Access Control

Create Application Roles

-- Create roles for different access levels
CREATE ROLE lakehouse_read_only NOLOGIN;
CREATE ROLE lakehouse_analyst NOLOGIN;
CREATE ROLE lakehouse_app_user NOLOGIN;
CREATE ROLE lakehouse_etl_user NOLOGIN;
 
-- Grant permissions to roles
GRANT USAGE ON SCHEMA core TO lakehouse_read_only;
GRANT SELECT ON ALL TABLES IN SCHEMA core TO lakehouse_read_only;
 
GRANT lakehouse_read_only TO lakehouse_analyst;
GRANT USAGE ON SCHEMA analytics TO lakehouse_analyst;
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA analytics TO lakehouse_analyst;
 
-- Create specific users
CREATE USER lakehouse_app_reader WITH PASSWORD 'reader_password_2024';
GRANT lakehouse_read_only TO lakehouse_app_reader;
 
CREATE USER lakehouse_data_analyst WITH PASSWORD 'analyst_password_2024';
GRANT lakehouse_analyst TO lakehouse_data_analyst;

2. Row Level Security

Implement Customer Data Protection

-- Enable RLS on sensitive tables
ALTER TABLE core.customers ENABLE ROW LEVEL SECURITY;
 
-- Create policy for customer access
-- (the second argument makes current_setting return NULL instead of raising an
-- error when app.current_customer_id is unset, so no rows match)
CREATE POLICY customer_own_data ON core.customers
    FOR ALL TO lakehouse_app_user
    USING (customer_id = current_setting('app.current_customer_id', true)::UUID);

-- Policy for analysts: every row is visible. RLS filters rows, not columns;
-- hide sensitive fields with column-level GRANTs or a view.
CREATE POLICY analyst_aggregate_access ON core.customers
    FOR SELECT TO lakehouse_analyst
    USING (true);

3. Data Encryption

Column-Level Encryption

-- Create extension for encryption
CREATE EXTENSION IF NOT EXISTS pgcrypto;
 
-- Encrypt sensitive data (pgp_sym_encrypt returns bytea, so store it as BYTEA)
ALTER TABLE core.customers 
ADD COLUMN encrypted_id_number BYTEA;

-- The key is inlined for the demo only; in practice supply it from a secret store
UPDATE core.customers 
SET encrypted_id_number = pgp_sym_encrypt(id_number, 'lakehouse_encryption_key_2024');

-- Query encrypted data
SELECT 
    customer_code,
    pgp_sym_decrypt(encrypted_id_number, 'lakehouse_encryption_key_2024') as id_number
FROM core.customers 
WHERE customer_code = 'CUST001';

Backup & Recovery

1. Backup Strategy

Automated Backup Script

#!/bin/bash
# backup_postgresql.sh
set -euo pipefail   # stop on the first failed command or unset variable

BACKUP_DIR="/backup/postgresql"
DB_NAME="lakehouse_db"
DB_USER="lakehouse_admin"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")

# Create backup directory
mkdir -p "$BACKUP_DIR"

# Full database backup
docker exec postgresql pg_dump -U "$DB_USER" -d "$DB_NAME" \
    --format=custom --compress=9 \
    > "$BACKUP_DIR/lakehouse_db_$TIMESTAMP.backup"

# Schema-only backup
docker exec postgresql pg_dump -U "$DB_USER" -d "$DB_NAME" \
    --schema-only --format=plain \
    > "$BACKUP_DIR/lakehouse_schema_$TIMESTAMP.sql"

# Data-only backup for specific tables
docker exec postgresql pg_dump -U "$DB_USER" -d "$DB_NAME" \
    --data-only --table=core.customers --table=core.loans \
    --format=custom \
    > "$BACKUP_DIR/core_data_$TIMESTAMP.backup"

echo "Backup completed: $TIMESTAMP"
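
The script keeps every dump it writes, so the backup directory grows without bound. The sketch below selects full dumps older than a retention window, based on the `lakehouse_db_<timestamp>.backup` naming used above. The 7-day window is an assumption, since this guide does not state a retention policy, and `expired_backups` is a hypothetical helper that only returns names and deletes nothing.

```python
from datetime import datetime, timedelta

PREFIX, SUFFIX = "lakehouse_db_", ".backup"


def expired_backups(filenames: list[str], now: datetime, keep_days: int = 7) -> list[str]:
    """Return full-dump filenames whose embedded timestamp is older than keep_days."""
    cutoff = now - timedelta(days=keep_days)
    expired = []
    for name in filenames:
        if not (name.startswith(PREFIX) and name.endswith(SUFFIX)):
            continue  # ignore schema-only and data-only dumps
        stamp = name[len(PREFIX):-len(SUFFIX)]
        try:
            taken = datetime.strptime(stamp, "%Y%m%d_%H%M%S")
        except ValueError:
            continue  # unexpected name: leave the file alone
        if taken < cutoff:
            expired.append(name)
    return sorted(expired)


files = [
    "lakehouse_db_20241201_120000.backup",
    "lakehouse_db_20241210_120000.backup",
    "lakehouse_schema_20241201_120000.sql",
    "core_data_20241201_120000.backup",
]
print(expired_backups(files, now=datetime(2024, 12, 12)))
```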

Point-in-Time Recovery Setup

# Enable WAL archiving in postgresql.conf
wal_level = replica
archive_mode = on
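# Refuse to overwrite an existing segment; the target must be a path mounted into the container
archive_command = 'test ! -f /backup/postgresql/wal/%f && cp %p /backup/postgresql/wal/%f'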
max_wal_senders = 3

2. Recovery Procedures

Database Restore

# Restore full database
docker exec -i postgresql pg_restore -U lakehouse_admin -d postgres \
    --create --clean --if-exists \
    < /backup/postgresql/lakehouse_db_20241201_120000.backup
 
# Restore schema definitions only (DDL from the schema-only dump)
docker exec -i postgresql psql -U lakehouse_admin -d lakehouse_db \
    < /backup/postgresql/lakehouse_schema_20241201_120000.sql

Monitoring & Troubleshooting

1. Performance Monitoring

Key Metrics to Monitor

-- Database size and growth
SELECT 
    pg_database.datname,
    pg_size_pretty(pg_database_size(pg_database.datname)) AS size
FROM pg_database
ORDER BY pg_database_size(pg_database.datname) DESC;
 
-- Active connections
SELECT 
    state,
    COUNT(*) as connection_count
FROM pg_stat_activity 
WHERE datname = 'lakehouse_db'
GROUP BY state;
 
-- Slow queries identification
SELECT 
    query,
    state,
    now() - query_start as duration,
    pid
FROM pg_stat_activity 
WHERE now() - query_start > interval '5 minutes'
    AND state != 'idle'
ORDER BY duration DESC;
 
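-- Index usage statistics
-- (pg_stat_user_indexes exposes relname and indexrelname, not tablename/indexname)
SELECT 
    schemaname,
    relname AS tablename,
    indexrelname AS indexname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes 
WHERE schemaname IN ('core', 'analytics', 'metadata', 'staging')
ORDER BY idx_scan DESC;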

2. Log Analysis

Important Log Patterns

# Monitor PostgreSQL logs
docker logs postgresql --tail=100 -f
 
# Check for errors
docker logs postgresql 2>&1 | grep -i error
 
# Monitor connection issues
docker logs postgresql 2>&1 | grep -i "connection"
 
# Check slow queries (if log_min_duration_statement is set)
docker logs postgresql 2>&1 | grep -i "duration"

3. Common Issues and Solutions

Connection Issues

-- Check connection limits
SHOW max_connections;
 
-- Current connection count
SELECT count(*) FROM pg_stat_activity;
 
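-- Terminate backends whose query has been running for over an hour
-- (pg_cancel_backend(pid) cancels only the query and keeps the connection)
SELECT pg_terminate_backend(pid) 
FROM pg_stat_activity 
WHERE state != 'idle' 
    AND pid <> pg_backend_pid()
    AND now() - query_start > interval '1 hour';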

Performance Issues

-- Tables with heavy sequential scans (candidates for a missing index)
SELECT 
    schemaname,
    relname AS tablename,
    seq_scan,
    seq_tup_read,
    seq_tup_read / seq_scan as avg_tup_per_scan
FROM pg_stat_user_tables 
WHERE seq_scan > 0
ORDER BY seq_tup_read DESC;
 
-- Lock analysis
SELECT 
    blocked_locks.pid AS blocked_pid,
    blocked_activity.usename AS blocked_user,
    blocking_locks.pid AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_statement,
    blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.database IS NOT DISTINCT FROM blocked_locks.database
    AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
    AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
    AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
    AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
    AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
    AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
    AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
    AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted;

Production Best Practices

1. Configuration Optimization

PostgreSQL Configuration

# postgresql.conf optimizations for Lakehouse workload

# Memory settings (sized for a small container; scale with available RAM)
shared_buffers = 256MB                    # Rule of thumb: ~25% of RAM
effective_cache_size = 1GB                # Rule of thumb: ~75% of RAM (planner hint, allocates nothing)
work_mem = 4MB                           # Per sort/hash operation, not per connection
maintenance_work_mem = 64MB              # For VACUUM, CREATE INDEX and similar

# Checkpoint and WAL settings
checkpoint_completion_target = 0.9
wal_buffers = 16MB

# Planner settings
random_page_cost = 1.1                   # For SSD storage

# Connection settings
max_connections = 200

# Libraries loaded at server start (changing this requires a restart)
shared_preload_libraries = 'pg_stat_statements'

# Logging
log_statement = 'mod'                    # Log DDL and data-modifying statements
log_min_duration_statement = 1000        # Log statements that run for 1000 ms or longer
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
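
The memory percentages can be turned into concrete values for a given host. The sketch below applies the 25% and 75% rules of thumb to a RAM size in megabytes. `memory_settings` is a hypothetical helper, and the percentages are starting points rather than tuned values.

```python
def memory_settings(ram_mb: int) -> dict[str, str]:
    """Derive starting values from total RAM using the 25% / 75% rules of thumb."""
    return {
        "shared_buffers": f"{ram_mb // 4}MB",            # ~25% of RAM
        "effective_cache_size": f"{ram_mb * 3 // 4}MB",  # ~75% of RAM
    }


for ram_mb in (1024, 16384):
    print(ram_mb, memory_settings(ram_mb))
```

For a 1 GB host this gives `shared_buffers = 256MB` and `effective_cache_size = 768MB`.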

2. Operational Procedures

Daily Operations Checklist

#!/bin/bash
# daily_postgresql_health_check.sh
 
echo "=== PostgreSQL Health Check - $(date) ==="
 
# Check container status
echo "1. Container Status:"
docker ps | grep postgresql
 
# Check database connectivity
echo "2. Database Connectivity:"
docker exec postgresql pg_isready -U lakehouse_admin
 
# Check database size
echo "3. Database Size:"
docker exec postgresql psql -U lakehouse_admin -d lakehouse_db -c "
SELECT pg_size_pretty(pg_database_size('lakehouse_db'));"
 
# Check active connections
echo "4. Active Connections:"
docker exec postgresql psql -U lakehouse_admin -d lakehouse_db -c "
SELECT count(*) as active_connections FROM pg_stat_activity WHERE state = 'active';"
 
# Check recent write activity per table
echo "5. Recent Activity:"
docker exec postgresql psql -U lakehouse_admin -d lakehouse_db -c "
SELECT schemaname, relname, n_tup_ins, n_tup_upd, n_tup_del 
FROM pg_stat_user_tables 
WHERE schemaname IN ('core', 'analytics', 'metadata', 'staging') 
ORDER BY n_tup_ins + n_tup_upd + n_tup_del DESC LIMIT 10;"

3. Disaster Recovery Plan

Recovery Time Objectives (RTO)

  • Critical Systems: < 1 hour
  • Analytics Systems: < 4 hours
  • Reporting Systems: < 24 hours

Recovery Point Objectives (RPO)

  • Transactional Data: < 15 minutes
  • Analytics Data: < 1 hour
  • Archive Data: < 24 hours
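
An RPO is only met if data is captured more often than the objective allows. A small sketch of checking a capture interval against the three tiers follows. The interval can be the gap between `pg_dump` runs or, with WAL archiving, the `archive_timeout` that bounds how long a segment waits before it is archived. The tier keys and the `tiers_met` helper are hypothetical.

```python
from datetime import timedelta

# RPO targets from the list above.
RPO = {
    "transactional": timedelta(minutes=15),
    "analytics": timedelta(hours=1),
    "archive": timedelta(hours=24),
}


def tiers_met(capture_interval: timedelta) -> list[str]:
    """Tiers whose RPO holds when the worst-case loss equals capture_interval."""
    return [tier for tier, rpo in RPO.items() if capture_interval < rpo]


print(tiers_met(timedelta(hours=24)))    # a nightly dump on its own
print(tiers_met(timedelta(minutes=30)))
print(tiers_met(timedelta(minutes=5)))
```

A nightly dump alone meets none of the tiers, which is why the transactional target depends on continuous WAL archiving.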

Emergency Procedures

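#!/bin/bash
# emergency_recovery.sh - restore the latest full backup
set -eu

BACKUP_LOCATION="/backup/postgresql"
# Use full dumps only; core_data_*.backup files are data-only and cannot rebuild the database
LATEST_BACKUP=$(ls -t "$BACKUP_LOCATION"/lakehouse_db_*.backup | head -1)

echo "Starting emergency recovery with backup: $LATEST_BACKUP"

# Stop the container so the data directory is not changing
docker stop postgresql

# Snapshot the current data directory (if possible), writing the archive
# outside the directory that is being archived
docker run --rm -v "$(pwd)/data/postgresql:/pgdata:ro" -v "$BACKUP_LOCATION:/backup" alpine \
    tar czf "/backup/emergency_backup_$(date +%Y%m%d_%H%M%S).tar.gz" -C /pgdata .

# pg_restore needs a running server: start the container and wait for it
docker start postgresql
until docker exec postgresql pg_isready -U lakehouse_admin; do sleep 2; done

# Restore via the maintenance database; --clean --create drops and recreates
# lakehouse_db, which fails while other clients are still connected to it
docker exec -i postgresql pg_restore -U lakehouse_admin -d postgres \
    --create --clean --if-exists < "$LATEST_BACKUP"

echo "Emergency recovery completed"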

Conclusion

This PostgreSQL guide provides comprehensive coverage of:

  1. Architecture Understanding: How PostgreSQL fits in the Lakehouse
  2. Schema Design: Production-ready table structures for financial data
  3. Hands-on Practice: Step-by-step labs for real-world scenarios
  4. Integration: Seamless connection with Trino, Kafka, and other components
  5. Performance: Optimization techniques for large-scale operations
  6. Security: Enterprise-grade access control and data protection
  7. Operations: Backup, monitoring, and troubleshooting procedures

Next Steps

  1. Practice Labs: Execute all hands-on labs in sequence
  2. Schema Customization: Adapt schemas to specific Lakehouse requirements
  3. Integration Testing: Test connections with other platform components
  4. Performance Tuning: Optimize for your specific workload patterns
  5. Security Implementation: Deploy role-based access and encryption
  6. Monitoring Setup: Implement comprehensive monitoring and alerting


Ready for production deployment with enterprise-grade PostgreSQL management for Lakehouse Platform!

Le Duy Khuong

AI Transformation & Digital Strategy. Writing about agentic systems, engineering leadership, and building in public.