Le Duy Khuong (Daniel)

MinIO Object Storage Guide — Lakehouse

MinIO in the Lakehouse: S3 API, buckets, security.

2026-03-17 · 20 min read

Table of Contents

  1. Overview
  2. MinIO Architecture
  3. Bucket Strategy & Data Organization
  4. Hands-on Labs
  5. Data Lake Integration
  6. Security & Access Control
  7. Performance Optimization
  8. Backup & Versioning
  9. Monitoring & Troubleshooting
  10. Production Best Practices

Overview

What is MinIO in Lakehouse?

MinIO serves as the primary object storage layer in the Lakehouse architecture, providing:

  • Data Lake Storage: Raw data ingestion and storage for analytics
  • Document Storage: Customer documents, loan applications, KYC files
  • Backup Storage: Database backups, configuration backups
  • ETL Staging: Temporary storage for data transformation processes
  • ML/AI Datasets: Training data, model artifacts, feature stores

Key Features for Lakehouse

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Applications  │    │   Data Sources  │    │   Analytics     │
│   (Upload/API)  │    │   (Kafka/ETL)   │    │   (Trino/Spark) │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
          │                      │                      │
          └──────────────────────┼──────────────────────┘
                                 │
                    ┌─────────────▼─────────────┐
                    │         MinIO             │
                    │   - S3 Compatible API     │
                    │   - Erasure Coding        │
                    │   - Versioning            │
                    │   - Encryption            │
                    │   - Multi-tenancy         │
                    └───────────────────────────┘

MinIO Architecture

1. Service Configuration

# From docker-compose.yml
minio:
  image: minio/minio:latest
  container_name: minio
  command: server /data --console-address ":9001"
  environment:
    MINIO_ROOT_USER: lakehouse_admin
    MINIO_ROOT_PASSWORD: CHANGE_ME_PASSWORD
    MINIO_REGION_NAME: lakehouse-region
  ports:
    - "9000:9000"    # S3 API
    - "9001:9001"    # Web Console
  volumes:
    - ./data/minio:/data

2. Access Configuration

# MinIO Credentials
Access Key: lakehouse_admin
Secret Key: CHANGE_ME_PASSWORD
Endpoint: http://localhost:9000
Console: http://localhost:9001
Region: lakehouse-region
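
Hard-coding the root credentials in every script makes them awkward to rotate. The following is a minimal sketch, not part of the original setup, that collects the settings listed above into keyword arguments suitable for `boto3.client('s3', **kwargs)`. The environment variable names (`MINIO_ENDPOINT`, `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY`, `MINIO_REGION`) are hypothetical and chosen only for illustration.

```python
import os

# Defaults mirror the values listed above; the variable names are hypothetical.
DEFAULTS = {
    "MINIO_ENDPOINT": "http://localhost:9000",
    "MINIO_REGION": "lakehouse-region",
}


def load_minio_settings(env=None):
    """Return boto3-style client kwargs built from environment variables."""
    env = os.environ if env is None else env
    required = ("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing required settings: {', '.join(missing)}")
    return {
        "endpoint_url": env.get("MINIO_ENDPOINT", DEFAULTS["MINIO_ENDPOINT"]),
        "aws_access_key_id": env["MINIO_ACCESS_KEY"],
        "aws_secret_access_key": env["MINIO_SECRET_KEY"],
        "region_name": env.get("MINIO_REGION", DEFAULTS["MINIO_REGION"]),
    }
```

Passing a plain dictionary instead of `os.environ` keeps the function easy to test without touching the real environment.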

3. S3 API Compatibility

MinIO implements the Amazon S3 API, so most S3 tooling works against it once the endpoint is pointed at the MinIO server. This allows use of:

  • AWS SDK for various languages
  • AWS CLI tools
  • S3-compatible applications
  • Direct REST API calls

Bucket Strategy & Data Organization

1. Lakehouse Bucket Structure

lakehouse/
├── raw-data/           # Raw data ingestion
│   ├── streaming/          # Kafka stream outputs
│   ├── batch/             # Daily/hourly batch imports
│   ├── external/          # Third-party data feeds
│   └── apis/              # API responses and logs
├── processed-data/     # Transformed data
│   ├── clean/             # Cleaned and validated data
│   ├── enriched/          # Data with additional context
│   ├── aggregated/        # Pre-computed aggregations
│   └── curated/           # Business-ready datasets
├── documents/          # Business documents
│   ├── customer-kyc/      # KYC documents
│   ├── loan-applications/ # Loan documents
│   ├── contracts/         # Legal contracts
│   └── reports/           # Generated reports
├── ml-datasets/        # Machine learning data
│   ├── training/          # Training datasets
│   ├── models/            # Model artifacts
│   ├── features/          # Feature stores
│   └── predictions/       # Model outputs
├── backups/           # System backups
│   ├── postgresql/        # Database backups
│   ├── kafka/             # Kafka topic backups
│   └── configurations/    # Config backups
└── temp/              # Temporary storage
    ├── etl-staging/       # ETL intermediate files
    ├── uploads/           # Temporary uploads
    └── processing/        # Data processing workspace

2. Data Partitioning Strategy

# Date-based partitioning for time-series data
raw-data/streaming/
├── year=2024/
│   ├── month=01/
│   │   ├── day=01/
│   │   │   ├── hour=00/
│   │   │   │   ├── customer_events_000.parquet
│   │   │   │   └── loan_events_000.parquet
│   │   │   └── hour=01/
│   │   └── day=02/
│   └── month=02/
└── year=2025/

# Entity-based partitioning for documents
documents/customer-kyc/
├── customer_type=individual/
│   ├── region=hcm/
│   │   ├── customer_id=12345/
│   │   │   ├── id_card_front.jpg
│   │   │   ├── id_card_back.jpg
│   │   │   └── selfie.jpg
│   │   └── customer_id=12346/
│   └── region=hanoi/
└── customer_type=business/
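
The date-based layout above can be generated rather than typed by hand. This is a small sketch, assuming timestamps are in UTC (naive values are treated as UTC); the function name `build_time_partitioned_key` is hypothetical.

```python
from datetime import datetime, timezone


def build_time_partitioned_key(prefix, ts, filename):
    """Build a Hive-style key: prefix/year=YYYY/month=MM/day=DD/hour=HH/filename."""
    # Convert timezone-aware timestamps to UTC; naive ones are assumed to be UTC already.
    if ts.tzinfo is not None:
        ts = ts.astimezone(timezone.utc)
    return (
        f"{prefix.strip('/')}/year={ts.year:04d}/month={ts.month:02d}/"
        f"day={ts.day:02d}/hour={ts.hour:02d}/{filename}"
    )


key = build_time_partitioned_key(
    "streaming", datetime(2024, 1, 1, 0, 30), "customer_events_000.parquet"
)
print(key)
```

Zero-padding the month, day, and hour keeps keys sorted lexicographically in time order, which is what the directory tree above relies on.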

Hands-on Labs

Lab 1: MinIO Setup and Basic Operations

Step 1: Verify MinIO Service

# Check MinIO container status
docker ps | grep minio
 
# Check MinIO logs
docker logs minio --tail=20
 
# Test MinIO health
curl -I http://localhost:9000/minio/health/live

Step 2: Install MinIO Client (mc)

# Download and install mc client
wget https://dl.min.io/client/mc/release/windows-amd64/mc.exe -O mc.exe
 
# Add MinIO server alias
./mc.exe alias set minio http://localhost:9000 lakehouse_admin CHANGE_ME_PASSWORD
 
# Test connection
./mc.exe admin info minio

Step 3: Create Initial Bucket Structure

# Create main buckets
./mc.exe mb minio/raw-data
./mc.exe mb minio/processed-data
./mc.exe mb minio/documents
./mc.exe mb minio/ml-datasets
./mc.exe mb minio/backups
./mc.exe mb minio/temp
 
# List all buckets
./mc.exe ls minio
 
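# Allow anonymous (public) read on processed data; skip this step if the data is sensitive.
# Recent mc releases use `mc anonymous`; older releases used `mc policy set`.
./mc.exe anonymous set download minio/processed-data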
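
Bucket creation fails when a name breaks the S3 naming rules, so it can help to check names before calling `mc mb`. The sketch below implements only a simplified subset of those rules (3 to 63 characters, lowercase letters, digits, and hyphens, starting and ending with a letter or digit); dots and the other edge cases are deliberately left out, and the helper name is hypothetical.

```python
import re

# Simplified subset of the S3 bucket naming rules (dots are legal in S3 but omitted here).
_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9-]{1,61}[a-z0-9]")


def is_valid_bucket_name(name):
    """Return True if name passes the simplified bucket-name check."""
    return _BUCKET_NAME.fullmatch(name) is not None


lakehouse_buckets = [
    "raw-data", "processed-data", "documents", "ml-datasets", "backups", "temp",
]
invalid = [name for name in lakehouse_buckets if not is_valid_bucket_name(name)]
print(invalid)
```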

Lab 2: Data Upload and Management

Step 1: Upload Sample Data Files

# Create sample data directory
mkdir -p sample_data/customer_data
mkdir -p sample_data/loan_data
 
# Create sample CSV files
cat > sample_data/customer_data/customers_20240101.csv << 'EOF'
customer_id,customer_code,full_name,phone_number,email,kyc_status,credit_score,created_at
1,CUST001,Nguyen Van An,0901234567,an.nguyen@email.com,APPROVED,750,2024-01-01 10:30:00
2,CUST002,Tran Thi Binh,0912345678,binh.tran@email.com,APPROVED,680,2024-01-01 11:15:00
3,CUST003,Le Van Cuong,0923456789,cuong.le@email.com,PENDING,720,2024-01-01 14:20:00
EOF
 
cat > sample_data/loan_data/loans_20240101.csv << 'EOF'
loan_id,loan_number,customer_id,product_type,loan_amount,interest_rate,term_months,status,application_date
1,LN2024001,1,PERSONAL_LOAN,50000000,0.1250,24,APPROVED,2024-01-01 10:30:00
2,LN2024002,2,BUSINESS_LOAN,100000000,0.1150,36,DISBURSED,2024-01-01 14:15:00
EOF
 
# Upload files with proper structure
./mc.exe cp sample_data/customer_data/customers_20240101.csv \
    minio/raw-data/batch/year=2024/month=01/day=01/customers_20240101.csv
 
./mc.exe cp sample_data/loan_data/loans_20240101.csv \
    minio/raw-data/batch/year=2024/month=01/day=01/loans_20240101.csv
 
# Verify uploads
./mc.exe ls minio/raw-data/batch/year=2024/month=01/day=01/

Step 2: Create JSON Data Files

# Create JSON sample data
cat > sample_data/customer_events.json << 'EOF'
{"event_id": "evt_001", "customer_id": "CUST001", "event_type": "LOGIN", "timestamp": "2024-01-01T10:30:00Z", "ip_address": "192.168.1.100", "user_agent": "MobileApp/1.0"}
{"event_id": "evt_002", "customer_id": "CUST001", "event_type": "LOAN_APPLICATION", "timestamp": "2024-01-01T10:35:00Z", "loan_amount": 50000000, "product_type": "PERSONAL_LOAN"}
{"event_id": "evt_003", "customer_id": "CUST002", "event_type": "PAYMENT", "timestamp": "2024-01-01T11:00:00Z", "amount": 2500000, "payment_method": "BANK_TRANSFER"}
EOF
 
# Upload JSON data
./mc.exe cp sample_data/customer_events.json \
    minio/raw-data/streaming/year=2024/month=01/day=01/hour=10/customer_events.json
 
# Create a curated summary sample (CSV is used here as a stand-in for Parquet output)
mkdir -p sample_data/processed
cat > sample_data/processed/customer_summary.csv << 'EOF'
customer_code,total_loans,total_amount,avg_interest_rate,risk_category,last_activity
CUST001,1,50000000,0.1250,LOW_RISK,2024-01-01
CUST002,1,100000000,0.1150,LOW_RISK,2024-01-01
CUST003,0,0,0,MEDIUM_RISK,2024-01-01
EOF
 
./mc.exe cp sample_data/processed/customer_summary.csv \
    minio/processed-data/curated/customer_summary/date=2024-01-01/customer_summary.csv

Lab 3: Document Management

Step 1: Simulate KYC Document Upload

# Create sample document files
mkdir -p sample_data/documents/kyc
 
# Create placeholder files (in a real scenario these would be actual images, not text)
echo "ID Card Front Image Data" > sample_data/documents/kyc/id_front_CUST001.txt
echo "ID Card Back Image Data" > sample_data/documents/kyc/id_back_CUST001.txt
echo "Selfie Image Data" > sample_data/documents/kyc/selfie_CUST001.txt
 
# Upload KYC documents with proper structure
./mc.exe cp sample_data/documents/kyc/id_front_CUST001.txt \
    minio/documents/customer-kyc/customer_type=individual/region=hcm/customer_id=CUST001/id_card_front.jpg
 
./mc.exe cp sample_data/documents/kyc/id_back_CUST001.txt \
    minio/documents/customer-kyc/customer_type=individual/region=hcm/customer_id=CUST001/id_card_back.jpg
 
./mc.exe cp sample_data/documents/kyc/selfie_CUST001.txt \
    minio/documents/customer-kyc/customer_type=individual/region=hcm/customer_id=CUST001/selfie.jpg
 
# Verify document upload
./mc.exe ls minio/documents/customer-kyc/customer_type=individual/region=hcm/customer_id=CUST001/

Step 2: Generate Reports and Store

# Create sample report
cat > sample_data/daily_loan_report_20240101.json << 'EOF'
{
    "report_date": "2024-01-01",
    "report_type": "DAILY_LOAN_SUMMARY",
    "total_applications": 15,
    "total_amount": 750000000,
    "approved_count": 12,
    "rejected_count": 3,
    "avg_amount": 50000000,
    "top_products": [
        {"product": "PERSONAL_LOAN", "count": 8, "amount": 400000000},
        {"product": "BUSINESS_LOAN", "count": 4, "amount": 350000000}
    ],
    "generated_at": "2024-01-02T00:30:00Z"
}
EOF
 
# Upload report
./mc.exe cp sample_data/daily_loan_report_20240101.json \
    minio/documents/reports/daily/year=2024/month=01/daily_loan_report_20240101.json

Lab 4: Python Integration

Step 1: Install Required Libraries

# Install Python dependencies
pip install boto3 pandas pyarrow

Step 2: Create Python MinIO Client

# Create file: scripts/minio_operations.py
import boto3
import pandas as pd
import json
from datetime import datetime
from botocore.exceptions import ClientError
 
class LakehouseMinIOClient:
    def __init__(self):
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:9000',
            aws_access_key_id='lakehouse_admin',
            aws_secret_access_key='CHANGE_ME_PASSWORD',
            region_name='lakehouse-region'
        )
    
    def upload_dataframe_as_parquet(self, df, bucket, key):
        """Upload pandas DataFrame as Parquet file"""
        try:
            # Convert DataFrame to Parquet in memory
            parquet_buffer = df.to_parquet(index=False)
            
            # Upload to MinIO
            self.s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=parquet_buffer,
                ContentType='application/octet-stream'
            )
            print(f"Successfully uploaded {key} to {bucket}")
            return True
        except Exception as e:
            print(f"Error uploading {key}: {str(e)}")
            return False
    
    def read_parquet_to_dataframe(self, bucket, key):
        """Read Parquet file from MinIO to pandas DataFrame"""
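        import io  # local import so this method stays self-contained
        try:
            response = self.s3_client.get_object(Bucket=bucket, Key=key)
            # Parquet readers need a seekable file; the streaming body is not, so buffer it first
            df = pd.read_parquet(io.BytesIO(response['Body'].read()))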
            print(f"Successfully read {key} from {bucket}")
            return df
        except Exception as e:
            print(f"Error reading {key}: {str(e)}")
            return None
    
    def upload_json_data(self, data, bucket, key):
        """Upload JSON data to MinIO"""
        try:
            json_data = json.dumps(data, indent=2, ensure_ascii=False)
            self.s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json_data.encode('utf-8'),
                ContentType='application/json'
            )
            print(f"Successfully uploaded JSON to {bucket}/{key}")
            return True
        except Exception as e:
            print(f"Error uploading JSON: {str(e)}")
            return False
    
    def list_objects(self, bucket, prefix=''):
        """List objects in bucket with optional prefix"""
        try:
            # list_objects_v2 returns at most 1000 keys per call, so paginate
            paginator = self.s3_client.get_paginator('list_objects_v2')
            objects = [
                obj['Key']
                for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
                for obj in page.get('Contents', [])
            ]
            if objects:
                print(f"Found {len(objects)} objects in {bucket}/{prefix}")
            else:
                print(f"No objects found in {bucket}/{prefix}")
            return objects
        except Exception as e:
            print(f"Error listing objects: {str(e)}")
            return []
 
# Example usage function
def demo_operations():
    """Demonstrate MinIO operations"""
    client = LakehouseMinIOClient()
    
    # Create sample DataFrame
    customer_data = {
        'customer_id': ['CUST001', 'CUST002', 'CUST003'],
        'customer_name': ['Nguyen Van An', 'Tran Thi Binh', 'Le Van Cuong'],
        'credit_score': [750, 680, 720],
        'total_loans': [1, 1, 0],
        'total_amount': [50000000, 100000000, 0],
        'last_updated': [datetime.now()] * 3
    }
    df = pd.DataFrame(customer_data)
    
    # Upload DataFrame as Parquet
    parquet_key = 'curated/customer_analytics/date=2024-01-01/customer_summary.parquet'
    client.upload_dataframe_as_parquet(df, 'processed-data', parquet_key)
    
    # Upload JSON metadata
    metadata = {
        'dataset': 'customer_summary',
        'created_at': datetime.now().isoformat(),
        'record_count': len(df),
        'schema': list(df.columns),
        'data_sources': ['postgresql.core.customers', 'postgresql.core.loans']
    }
    
    metadata_key = 'curated/customer_analytics/date=2024-01-01/metadata.json'
    client.upload_json_data(metadata, 'processed-data', metadata_key)
    
    # List objects in processed data
    objects = client.list_objects('processed-data', 'curated/customer_analytics/')
    for obj in objects:
        print(f"  - {obj}")
    
    # Read back the data
    df_read = client.read_parquet_to_dataframe('processed-data', parquet_key)
    if df_read is not None:
        print("\nRead back data:")
        print(df_read.head())
 
if __name__ == "__main__":
    demo_operations()

Step 3: Run Python Integration

# Save the Python code from Step 2 as scripts/minio_operations.py
mkdir -p scripts
 
# Run the script
python scripts/minio_operations.py

Data Lake Integration

1. Trino Integration with MinIO

Configure Trino Hive Catalog for MinIO

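# platform/trino/catalog/hive.properties
# Note: hive.s3.* are the legacy S3 settings. Newer Trino releases configure S3 through
# the native file system properties (fs.native-s3.enabled, s3.endpoint, s3.path-style-access, ...).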
connector.name=hive
hive.metastore.uri=thrift://metastore:9083
hive.s3.endpoint=http://minio:9000
hive.s3.aws-access-key=lakehouse_admin
hive.s3.aws-secret-key=CHANGE_ME_PASSWORD
hive.s3.path-style-access=true
hive.s3.ssl.enabled=false

Query MinIO Data via Trino

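-- Create external table pointing to MinIO data.
-- Partition columns must be declared last and match the key=value directory names.
CREATE TABLE hive.lakehouse_db.customer_events (
    event_id VARCHAR,
    customer_id VARCHAR,
    event_type VARCHAR,
    timestamp TIMESTAMP,
    metadata VARCHAR,  -- the Hive connector has no JSON column type; parse with json_parse() when querying
    year VARCHAR,
    month VARCHAR,
    day VARCHAR,
    hour VARCHAR
)
WITH (
    external_location = 's3a://raw-data/streaming/',
    format = 'JSON',
    partitioned_by = ARRAY['year', 'month', 'day', 'hour']
);

-- Register the partitions that already exist in MinIO
CALL hive.system.sync_partition_metadata('lakehouse_db', 'customer_events', 'ADD');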
 
-- Query data from MinIO
SELECT 
    event_type,
    COUNT(*) as event_count,
    DATE(timestamp) as event_date
FROM hive.lakehouse_db.customer_events
WHERE year = '2024' AND month = '01'
GROUP BY event_type, DATE(timestamp)
ORDER BY event_date DESC, event_count DESC;
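
The partition columns in the query above are not stored inside the files; they come from the `name=value` segments of each object key. A minimal sketch of that mapping (the helper name `parse_partitions` is hypothetical, and this is an illustration rather than how Trino implements it):

```python
def parse_partitions(key):
    """Extract Hive-style name=value path segments from an object key."""
    partitions = {}
    for segment in key.split("/")[:-1]:  # the last segment is the file name
        name, sep, value = segment.partition("=")
        if sep and name and value:
            partitions[name] = value
    return partitions


example = parse_partitions(
    "streaming/year=2024/month=01/day=01/hour=10/customer_events.json"
)
print(example)
```

The values come back as strings, which is why the query filters with `year = '2024'` and `month = '01'` rather than with numbers.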

2. Kafka to MinIO Pipeline

Create Kafka Connect S3 Sink

{
    "name": "lakehouse-s3-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "3",
        "topics": "customer-events,lakehouse-loan-events,lakehouse-payment-events",
        "s3.region": "lakehouse-region",
        "s3.bucket.name": "raw-data",
        "s3.part.size": "5242880",
        "flush.size": "10000",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "path.format": "'year'=yyyy/'month'=MM/'day'=dd/'hour'=HH",
        "partition.duration.ms": "3600000",
        "locale": "en-US",
        "timezone": "UTC",
        "timestamp.extractor": "Record",
        "store.url": "http://minio:9000",
        "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
    }
}
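
A connector definition like the one above is registered by sending it to the Kafka Connect REST interface with `POST /connectors`. The sketch below only builds the request, so it can be inspected before anything is sent; the Connect address used in the usage note is an assumption, and the helper name is hypothetical.

```python
import json
import urllib.request


def build_connector_request(connect_url, connector):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trimmed-down connector definition, for illustration only.
connector = {
    "name": "lakehouse-s3-sink-connector",
    "config": {"connector.class": "io.confluent.connect.s3.S3SinkConnector"},
}
request = build_connector_request("http://localhost:8083/", connector)
print(request.get_method(), request.full_url)
```

To actually register the connector, pass the request to `urllib.request.urlopen(request)` once the Connect worker is reachable at the address you configured.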

3. ETL Processing Pipeline

Create ETL Script for Data Processing

# scripts/etl_minio_processing.py
import boto3
import pandas as pd
import json
from datetime import datetime, timedelta
import pyarrow as pa
import pyarrow.parquet as pq
 
class LakehouseDataProcessor:
    def __init__(self):
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:9000',
            aws_access_key_id='lakehouse_admin',
            aws_secret_access_key='CHANGE_ME_PASSWORD',
            region_name='lakehouse-region'
        )
    
    def process_daily_customer_events(self, processing_date):
        """Process daily customer events from raw to processed"""
        
        # Define paths
        raw_prefix = f"streaming/year={processing_date.year}/month={processing_date.month:02d}/day={processing_date.day:02d}/"
        processed_key = f"clean/customer_events/date={processing_date.strftime('%Y-%m-%d')}/customer_events_processed.parquet"
        
        # Read raw data
        raw_events = []
        try:
            response = self.s3_client.list_objects_v2(
                Bucket='raw-data',
                Prefix=raw_prefix
            )
            
            if 'Contents' in response:
                for obj in response['Contents']:
                    if obj['Key'].endswith('.json'):
                        obj_response = self.s3_client.get_object(
                            Bucket='raw-data',
                            Key=obj['Key']
                        )
                        content = obj_response['Body'].read().decode('utf-8')
                        
                        # Parse JSON lines
                        for line in content.strip().split('\n'):
                            if line.strip():
                                raw_events.append(json.loads(line))
            
            if not raw_events:
                print(f"No events found for {processing_date}")
                return False
            
            # Convert to DataFrame
            df = pd.DataFrame(raw_events)
            
            # Data cleaning and enrichment
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['processing_date'] = processing_date
            df['event_hour'] = df['timestamp'].dt.hour
            df['is_business_hours'] = (df['event_hour'] >= 8) & (df['event_hour'] <= 18)
            
            # Add data quality flags
            df['has_valid_customer_id'] = df['customer_id'].notna() & (df['customer_id'] != '')
            df['event_type_category'] = df['event_type'].map({
                'LOGIN': 'USER_ACTIVITY',
                'LOGOUT': 'USER_ACTIVITY',
                'LOAN_APPLICATION': 'BUSINESS_EVENT',
                'PAYMENT': 'BUSINESS_EVENT',
                'KYC_UPDATE': 'PROFILE_EVENT'
            }).fillna('OTHER')
            
            # Save processed data
            parquet_buffer = df.to_parquet(index=False)
            self.s3_client.put_object(
                Bucket='processed-data',
                Key=processed_key,
                Body=parquet_buffer,
                ContentType='application/octet-stream'
            )
            
            # Create processing metadata
            metadata = {
                'processing_date': processing_date.isoformat(),
                'source_files_count': len([obj for obj in response['Contents'] if obj['Key'].endswith('.json')]),
                'total_events': len(df),
                'valid_events': int(df['has_valid_customer_id'].sum()),  # cast NumPy scalar so json.dumps accepts it
                'event_types': df['event_type'].value_counts().to_dict(),
                'processing_timestamp': datetime.now().isoformat(),
                'data_quality_score': float(df['has_valid_customer_id'].mean())
            }
            
            metadata_key = f"clean/customer_events/date={processing_date.strftime('%Y-%m-%d')}/processing_metadata.json"
            self.s3_client.put_object(
                Bucket='processed-data',
                Key=metadata_key,
                Body=json.dumps(metadata, indent=2).encode('utf-8'),
                ContentType='application/json'
            )
            
            print(f"Successfully processed {len(df)} events for {processing_date}")
            return True
            
        except Exception as e:
            print(f"Error processing events for {processing_date}: {str(e)}")
            return False
    
    def generate_daily_aggregations(self, processing_date):
        """Generate daily aggregations from processed data"""
        
        try:
            # Read processed events
            processed_key = f"clean/customer_events/date={processing_date.strftime('%Y-%m-%d')}/customer_events_processed.parquet"
            response = self.s3_client.get_object(
                Bucket='processed-data',
                Key=processed_key
            )
            
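            # Buffer the non-seekable streaming body before parsing it as Parquet
            from io import BytesIO
            df = pd.read_parquet(BytesIO(response['Body'].read()))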
            
            # Generate customer daily summary
            customer_summary = df.groupby('customer_id').agg({
                'event_id': 'count',
                'event_type': lambda x: x.value_counts().to_dict(),
                'timestamp': ['min', 'max'],
                'is_business_hours': 'sum'
            }).reset_index()
            
            customer_summary.columns = [
                'customer_id', 'total_events', 'event_breakdown', 
                'first_activity', 'last_activity', 'business_hours_events'
            ]
            
            customer_summary['activity_duration_hours'] = (
                pd.to_datetime(customer_summary['last_activity']) - 
                pd.to_datetime(customer_summary['first_activity'])
            ).dt.total_seconds() / 3600
            
            # Save aggregated data
            agg_key = f"aggregated/customer_daily_summary/date={processing_date.strftime('%Y-%m-%d')}/summary.parquet"
            parquet_buffer = customer_summary.to_parquet(index=False)
            
            self.s3_client.put_object(
                Bucket='processed-data',
                Key=agg_key,
                Body=parquet_buffer,
                ContentType='application/octet-stream'
            )
            
            print(f"Generated daily aggregations for {len(customer_summary)} customers")
            return True
            
        except Exception as e:
            print(f"Error generating aggregations for {processing_date}: {str(e)}")
            return False
 
def run_daily_etl():
    """Run daily ETL process"""
    processor = LakehouseDataProcessor()
    processing_date = datetime.now().date() - timedelta(days=1)  # Process previous day
    
    print(f"Starting ETL processing for {processing_date}")
    
    # Step 1: Process raw events
    if processor.process_daily_customer_events(processing_date):
        print("✓ Event processing completed")
        
        # Step 2: Generate aggregations
        if processor.generate_daily_aggregations(processing_date):
            print("✓ Aggregation generation completed")
            print(f"ETL processing completed successfully for {processing_date}")
        else:
            print("✗ Aggregation generation failed")
    else:
        print("✗ Event processing failed")
 
if __name__ == "__main__":
    run_daily_etl()
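
In the script above, a single malformed line makes `json.loads` raise and the whole day's processing fails. One possible alternative, sketched here with a hypothetical helper name, is to collect bad lines separately so the valid events can still be processed and the rejects reported.

```python
import json


def parse_json_lines(text):
    """Parse newline-delimited JSON, collecting bad lines instead of raising."""
    records, errors = [], []
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # skip blank lines
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            errors.append((line_number, str(exc)))
    return records, errors


records, errors = parse_json_lines('{"event_id": "evt_001"}\n\nnot json\n{"event_id": "evt_002"}\n')
print(len(records), "valid,", len(errors), "rejected")
```

Whether rejected lines should fail the run or only be counted in the processing metadata is a design decision for the pipeline owner.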

Security & Access Control

1. Bucket Policies and IAM

Create Service Account for Applications

# Create service account for Lakehouse applications
./mc.exe admin user add minio lakehouse-app-service lakehouseAppSecure2024
 
# Create policy for application access
cat > lakehouse-app-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::raw-data/*",
                "arn:aws:s3:::processed-data/*",
                "arn:aws:s3:::temp/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::raw-data",
                "arn:aws:s3:::processed-data",
                "arn:aws:s3:::temp"
            ]
        }
    ]
}
EOF
 
# Apply policy (recent mc releases; older ones used `mc admin policy add` and `mc admin policy set`)
./mc.exe admin policy create minio lakehouse-app-policy lakehouse-app-policy.json
./mc.exe admin policy attach minio lakehouse-app-policy --user lakehouse-app-service

Read-only Access for Analytics

# Create read-only user for analytics
./mc.exe admin user add minio lakehouse-analytics-readonly lakehouseAnalyticsRead2024
 
# Create read-only policy
cat > lakehouse-readonly-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::processed-data",
                "arn:aws:s3:::processed-data/*",
                "arn:aws:s3:::documents",
                "arn:aws:s3:::documents/*"
            ]
        }
    ]
}
EOF
 
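# Apply policy (recent mc releases; older ones used `mc admin policy add` and `mc admin policy set`)
./mc.exe admin policy create minio lakehouse-readonly-policy lakehouse-readonly-policy.json
./mc.exe admin policy attach minio lakehouse-readonly-policy --user lakehouse-analytics-readonly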
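
Both policy documents above follow the same shape: object-level actions on `bucket/*` plus `s3:ListBucket` on the bucket itself. A small sketch that generates such a document, assuming that two-statement layout is what you want (the function name is hypothetical):

```python
import json


def build_bucket_policy(buckets, object_actions):
    """Return a policy granting object_actions on objects plus s3:ListBucket on the buckets."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": sorted(object_actions),
                "Resource": [f"arn:aws:s3:::{bucket}/*" for bucket in buckets],
            },
            {
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}" for bucket in buckets],
            },
        ],
    }


policy = build_bucket_policy(["processed-data", "documents"], ["s3:GetObject"])
print(json.dumps(policy, indent=4))
```

The printed JSON can be saved to a file and passed to the `mc admin policy` commands in place of a hand-written document.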

2. Encryption Configuration

Server-Side Encryption

# Enable default encryption for sensitive buckets (SSE-S3 requires a KMS key to be configured on the MinIO server)
./mc.exe encrypt set sse-s3 minio/documents
./mc.exe encrypt set sse-s3 minio/ml-datasets
 
# Verify encryption settings
./mc.exe encrypt info minio/documents

Client-Side Encryption Example

# Python example with client-side encryption
import boto3
from cryptography.fernet import Fernet
 
class EncryptedMinIOClient:
    def __init__(self, encryption_key=None):
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:9000',
            aws_access_key_id='lakehouse_admin',
            aws_secret_access_key='CHANGE_ME_PASSWORD',
            region_name='lakehouse-region'
        )
        
        # Use provided key or generate new one
        if encryption_key:
            self.cipher = Fernet(encryption_key)
        else:
            key = Fernet.generate_key()
            self.cipher = Fernet(key)
            print(f"Generated encryption key: {key.decode()}")
    
    def upload_encrypted_data(self, data, bucket, key):
        """Upload data with client-side encryption"""
        # Encrypt data
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        encrypted_data = self.cipher.encrypt(data)
        
        # Upload encrypted data
        self.s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=encrypted_data,
            ContentType='application/octet-stream',
            Metadata={'encrypted': 'true'}
        )
        
        return True
    
    def download_encrypted_data(self, bucket, key):
        """Download and decrypt data"""
        response = self.s3_client.get_object(Bucket=bucket, Key=key)
        encrypted_data = response['Body'].read()
        
        # Decrypt data
        decrypted_data = self.cipher.decrypt(encrypted_data)
        return decrypted_data.decode('utf-8')
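
With client-side encryption, losing the key means losing the data, so the key should come from configuration rather than being generated and printed on each run. The sketch below reads a key from a hypothetical environment variable, `LAKEHOUSE_FERNET_KEY`, and checks that it has the shape of a Fernet key (32 bytes, URL-safe base64 encoded) before it is handed to `EncryptedMinIOClient`.

```python
import base64
import os


def load_fernet_key(env=None, var_name="LAKEHOUSE_FERNET_KEY"):
    """Read a Fernet key from the environment and validate its shape."""
    env = os.environ if env is None else env
    raw = env.get(var_name)
    if not raw:
        raise KeyError(f"{var_name} is not set")
    try:
        key = raw.encode("ascii")
        decoded = base64.urlsafe_b64decode(key)
    except ValueError as exc:  # covers non-ASCII input and malformed base64
        raise ValueError(f"{var_name} is not valid URL-safe base64") from exc
    if len(decoded) != 32:
        raise ValueError(f"{var_name} must decode to 32 bytes, got {len(decoded)}")
    return key
```

The returned bytes can be passed as `encryption_key` to the class above; in production the value would typically come from a secrets manager rather than a plain environment variable.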

Performance Optimization

1. Optimal File Formats

Parquet for Analytics

# Optimize Parquet files for analytics
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs

def optimize_parquet_storage(df, bucket, base_key):
    """Write a DataFrame to MinIO as a sorted, compressed Parquet file"""
    
    # Sort by frequently queried columns so row-group statistics prune well
    df_sorted = df.sort_values(['customer_id', 'timestamp'])
    table = pa.Table.from_pandas(df_sorted, preserve_index=False)
    
    # Point pyarrow's S3 filesystem at MinIO instead of AWS
    minio_fs = fs.S3FileSystem(
        endpoint_override='localhost:9000',
        scheme='http',
        access_key='lakehouse_admin',
        secret_key='CHANGE_ME_PASSWORD',
        region='lakehouse-region'
    )
    
    # Write the table in row groups of 100,000 rows
    pq.write_table(
        table,
        f'{bucket}/{base_key}',
        filesystem=minio_fs,
        compression='snappy',   # Good balance of compression and speed
        row_group_size=100000,  # Rows per row group; tune for query patterns
        use_dictionary=True,    # Efficient for repetitive data
        write_statistics=True   # Enable column statistics
    )

2. Multipart Upload for Large Files

Efficient Large File Upload

def upload_large_file_multipart(file_path, bucket, key, part_size=5*1024*1024):
    """Upload large files using multipart upload"""
    
    s3_client = boto3.client(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id='lakehouse_admin',
        aws_secret_access_key='CHANGE_ME_PASSWORD'
    )
    
    # Initiate multipart upload
    response = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = response['UploadId']
    
    parts = []
    part_number = 1
    
    try:
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(part_size)
                if not chunk:
                    break
                
                # Upload part
                part_response = s3_client.upload_part(
                    Bucket=bucket,
                    Key=key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=chunk
                )
                
                parts.append({
                    'ETag': part_response['ETag'],
                    'PartNumber': part_number
                })
                
                part_number += 1
                print(f"Uploaded part {part_number-1}")
        
        # Complete multipart upload
        s3_client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )
        
        print(f"Successfully uploaded {file_path} to {bucket}/{key}")
        return True
        
    except Exception as e:
        # Abort multipart upload on error
        s3_client.abort_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id
        )
        print(f"Error uploading file: {str(e)}")
        return False
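
The upload loop above uses a fixed 5 MiB part size. The S3 multipart rules require every part except the last to be at least 5 MiB and allow at most 10,000 parts per upload, so very large files need a larger part size. A sketch of how the parts could be planned up front (the helper name `plan_parts` is hypothetical):

```python
MIN_PART_SIZE = 5 * 1024 * 1024  # minimum size for every part except the last
MAX_PARTS = 10_000               # maximum number of parts per multipart upload


def plan_parts(file_size, part_size=MIN_PART_SIZE):
    """Return a list of (part_number, offset, length) tuples covering the file."""
    if file_size <= 0:
        raise ValueError("file_size must be positive")
    if part_size < MIN_PART_SIZE:
        raise ValueError("part_size is below the 5 MiB multipart minimum")
    # Grow the part size if the file would otherwise need more than MAX_PARTS parts.
    min_size_for_limit = -(-file_size // MAX_PARTS)  # ceiling division
    part_size = max(part_size, min_size_for_limit)
    parts = []
    offset = 0
    while offset < file_size:
        length = min(part_size, file_size - offset)
        parts.append((len(parts) + 1, offset, length))
        offset += length
    return parts


plan = plan_parts(12 * 1024 * 1024)
print(plan)
```

In practice, boto3's `upload_file` already switches to multipart uploads for large files, so a manual plan like this is mainly useful when you need control over each part.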

3. Concurrent Operations

Parallel File Processing

import concurrent.futures
 
def process_files_parallel(file_list, bucket, max_workers=4):
    """Process multiple files in parallel"""
    
    def process_single_file(file_info):
        file_path, s3_key = file_info
        # Your processing logic here
        return upload_large_file_multipart(file_path, bucket, s3_key)
    
    # Use ThreadPoolExecutor for I/O bound operations
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_single_file, file_info): file_info 
                  for file_info in file_list}
        
        results = []
        for future in concurrent.futures.as_completed(futures):
            file_info = futures[future]
            try:
                result = future.result()
                results.append((file_info, result))
                print(f"Completed: {file_info[1]}")
            except Exception as e:
                print(f"Error processing {file_info[1]}: {str(e)}")
                results.append((file_info, False))
    
    return results
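
Handling failed uploads

`process_files_parallel` returns a list of `(file_info, result)` pairs, where `file_info` is the `(file_path, s3_key)` tuple and `result` is falsy on failure. A small follow-up step can separate the failures so they can be retried. The sketch below assumes exactly that result shape; the helper names are hypothetical.

```python
def failed_files(results):
    """Return the (file_path, s3_key) tuples whose upload did not succeed."""
    return [file_info for file_info, ok in results if not ok]


def summarize_results(results):
    """Count successes and failures in a process_files_parallel result list."""
    failed = failed_files(results)
    return {
        "total": len(results),
        "succeeded": len(results) - len(failed),
        "failed_keys": [s3_key for _, s3_key in failed],
    }
```

The list returned by `failed_files` has the same shape as the original `file_list`, so it can be passed straight back into `process_files_parallel` for a second attempt.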

Backup & Versioning

1. Object Versioning

Enable Versioning

# Enable versioning on critical buckets
./mc.exe version enable minio/processed-data
./mc.exe version enable minio/documents
./mc.exe version enable minio/ml-datasets
 
# Check versioning status
./mc.exe version info minio/processed-data

Version Management

# List object versions
./mc.exe ls --versions minio/processed-data/curated/customer_summary/
 
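# Restore a specific version by copying it to a new key
# (the version is selected with --version-id, not a query string on the path)
./mc.exe cp --version-id VERSION_ID \
    minio/processed-data/curated/customer_summary/customer_summary.parquet \
    minio/processed-data/curated/customer_summary/customer_summary_restored.parquet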
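
The same restore can be scripted from Python. The sketch below assumes a boto3 S3 client is passed in and that `versions` is the `Versions` list returned by `list_object_versions`; the function names are hypothetical. It picks the newest version at or before a cutoff time and copies it to a new key with `copy_object`.

```python
def latest_version_before(versions, key, cutoff):
    """Pick the newest version of `key` last modified at or before `cutoff`.

    `versions` is the 'Versions' list from list_object_versions;
    `cutoff` must be a timezone-aware datetime.
    """
    candidates = [
        v for v in versions
        if v["Key"] == key and v["LastModified"] <= cutoff
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda v: v["LastModified"])


def restore_version(s3_client, bucket, key, version_id, restored_key):
    """Copy one stored version of `key` to `restored_key` in the same bucket."""
    return s3_client.copy_object(
        Bucket=bucket,
        Key=restored_key,
        CopySource={"Bucket": bucket, "Key": key, "VersionId": version_id},
    )
```

Copying to a separate key, as the `mc` example does, leaves the current version untouched until the restored file has been checked.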

2. Cross-Region Replication

Setup Mirror for Backup

# Add backup MinIO instance (simulate with different alias)
./mc.exe alias set lakehouse-backup http://backup-server:9000 backup_user backup_pass
 
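# Set up mirroring. Note: mc mirror is a client-side sync, and --watch keeps the
# command running to copy new objects as they appear. For server-side bucket
# replication between deployments, see `mc replicate add`.
./mc.exe mirror minio/processed-data lakehouse-backup/processed-data-backup --watch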
 
# Verify mirror
./mc.exe ls lakehouse-backup/processed-data-backup
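
Listing the backup bucket shows that objects exist, but not that the copy is complete. One way to check is to compare key and size listings from both sides. The sketch below assumes each listing has already been reduced to a `{key: size}` dictionary (for example from the `Key` and `Size` fields of `list_objects_v2`); `diff_listings` is a hypothetical helper.

```python
def diff_listings(source, backup):
    """Compare two {key: size} mappings and report where the backup differs."""
    missing = sorted(k for k in source if k not in backup)
    size_mismatch = sorted(
        k for k in source if k in backup and source[k] != backup[k]
    )
    extra = sorted(k for k in backup if k not in source)
    return {
        "missing_in_backup": missing,
        "size_mismatch": size_mismatch,
        "extra_in_backup": extra,
        "in_sync": not missing and not size_mismatch,
    }
```

Extra objects in the backup do not count against `in_sync` here, since a mirror without removal enabled keeps objects that were deleted from the source.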

3. Lifecycle Management

Configure Lifecycle Policies

# Create lifecycle policy for temporary data
cat > temp-lifecycle.json << 'EOF'
{
    "Rules": [
        {
            "ID": "temp-data-cleanup",
            "Status": "Enabled",
            "Filter": {
                "Prefix": "temp/"
            },
            "Expiration": {
                "Days": 7
            }
        },
        {
            "ID": "old-versions-cleanup",
            "Status": "Enabled",
            "Filter": {
                "Prefix": ""
            },
            "NoncurrentVersionExpiration": {
                "NoncurrentDays": 30
            }
        }
    ]
}
EOF
 
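# Option A: apply the JSON policy above (mc reads the policy from stdin)
./mc.exe ilm import minio/temp < temp-lifecycle.json

# Option B: add simple expiry rules directly with flags
# (newer mc releases group these commands under `mc ilm rule`)
./mc.exe ilm add minio/temp --expiry-days 7
./mc.exe ilm add minio/raw-data --expiry-days 365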
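
Before enabling an expiry rule, it helps to preview which objects it would remove. The sketch below is a simplified dry run, assuming `objects` is a list of dictionaries with `Key` and timezone-aware `LastModified` fields, as found in a `list_objects_v2` response. It ignores the server's own scheduling of expiry, so treat the result as an estimate; `expired_keys` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def expired_keys(objects, prefix, days, now=None):
    """Return keys under `prefix` that are at least `days` days old."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    return [
        obj["Key"]
        for obj in objects
        if obj["Key"].startswith(prefix) and obj["LastModified"] <= cutoff
    ]
```

Running this against the `temp/` prefix with `days=7` mirrors the first rule in the policy above.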

Monitoring & Troubleshooting

1. Performance Monitoring

MinIO Metrics Collection

# Print current Prometheus metrics
# (use `mc admin prometheus generate minio` to produce a scrape configuration)
./mc.exe admin prometheus metrics minio
 
# Check server info
./mc.exe admin info minio
 
# Monitor storage usage per bucket
./mc.exe du --depth 1 minio

Custom Monitoring Script

# scripts/minio_monitoring.py
import boto3
import json
from datetime import datetime
 
class MinIOMonitor:
    def __init__(self):
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:9000',
            aws_access_key_id='lakehouse_admin',
            aws_secret_access_key='CHANGE_ME_PASSWORD'
        )
    
    def get_bucket_sizes(self):
        """Get size information for all buckets"""
        buckets = self.s3_client.list_buckets()['Buckets']
        bucket_sizes = {}
        
        for bucket in buckets:
            bucket_name = bucket['Name']
            total_size = 0
            object_count = 0
            
            try:
                paginator = self.s3_client.get_paginator('list_objects_v2')
                pages = paginator.paginate(Bucket=bucket_name)
                
                for page in pages:
                    if 'Contents' in page:
                        for obj in page['Contents']:
                            total_size += obj['Size']
                            object_count += 1
                
                bucket_sizes[bucket_name] = {
                    'size_bytes': total_size,
                    'size_mb': total_size / (1024 * 1024),
                    'object_count': object_count,
                    'last_checked': datetime.now().isoformat()
                }
            except Exception as e:
                bucket_sizes[bucket_name] = {
                    'error': str(e),
                    'last_checked': datetime.now().isoformat()
                }
        
        return bucket_sizes
    
    def check_data_freshness(self):
        """Check freshness of data in critical buckets"""
        freshness_report = {}
        critical_paths = [
            ('raw-data', 'streaming/'),
            ('processed-data', 'clean/'),
            ('processed-data', 'aggregated/')
        ]
        
        for bucket, prefix in critical_paths:
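            try:
                # Scan every page: listings are ordered by key, not by time,
                # so the newest object can be anywhere under the prefix
                paginator = self.s3_client.get_paginator('list_objects_v2')
                latest_object = None
                for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                    for obj in page.get('Contents', []):
                        if (latest_object is None or
                                obj['LastModified'] > latest_object['LastModified']):
                            latest_object = obj
                
                if latest_object is not None:
                    age_hours = (datetime.now(latest_object['LastModified'].tzinfo) - 
                               latest_object['LastModified']).total_seconds() / 3600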
                    
                    freshness_report[f"{bucket}/{prefix}"] = {
                        'latest_file': latest_object['Key'],
                        'last_modified': latest_object['LastModified'].isoformat(),
                        'age_hours': age_hours,
                        'is_fresh': age_hours < 24  # Consider fresh if < 24 hours
                    }
                else:
                    freshness_report[f"{bucket}/{prefix}"] = {
                        'status': 'no_objects_found'
                    }
                    
            except Exception as e:
                freshness_report[f"{bucket}/{prefix}"] = {
                    'error': str(e)
                }
        
        return freshness_report
    
    def generate_health_report(self):
        """Generate comprehensive health report"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'bucket_sizes': self.get_bucket_sizes(),
            'data_freshness': self.check_data_freshness()
        }
        
        # Calculate totals
        total_size_mb = sum(
            bucket['size_mb'] for bucket in report['bucket_sizes'].values()
            if 'size_mb' in bucket
        )
        total_objects = sum(
            bucket['object_count'] for bucket in report['bucket_sizes'].values()
            if 'object_count' in bucket
        )
        
        report['summary'] = {
            'total_size_mb': total_size_mb,
            'total_size_gb': total_size_mb / 1024,
            'total_objects': total_objects,
            'healthy_buckets': len([b for b in report['bucket_sizes'].values() 
                                  if 'error' not in b])
        }
        
        return report
 
def run_monitoring():
    """Run monitoring and save report"""
    monitor = MinIOMonitor()
    report = monitor.generate_health_report()
    
    # Print summary
    print("=== MinIO Health Report ===")
    print(f"Total Storage: {report['summary']['total_size_gb']:.2f} GB")
    print(f"Total Objects: {report['summary']['total_objects']}")
    print(f"Healthy Buckets: {report['summary']['healthy_buckets']}")
    
    # Check data freshness alerts
    print("\n=== Data Freshness ===")
    for path, info in report['data_freshness'].items():
        if 'is_fresh' in info:
            status = "✓ FRESH" if info['is_fresh'] else "⚠ STALE"
            print(f"{path}: {status} (age: {info['age_hours']:.1f}h)")
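        elif info.get('status') == 'no_objects_found':
            print(f"{path}: ⚠ NO OBJECTS FOUND")
        else:
            print(f"{path}: ❌ ERROR ({info.get('error', 'unknown')})")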
    
    # Save detailed report
    report_filename = f"minio_health_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(report_filename, 'w') as f:
        json.dump(report, f, indent=2, default=str)
    
    print(f"\nDetailed report saved to: {report_filename}")
 
if __name__ == "__main__":
    run_monitoring()
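
Turning the report into alerts

The JSON report is easier to act on when it is reduced to a short list of problems. The sketch below assumes the report has the shape produced by `generate_health_report` above; `collect_alerts` and the `max_total_gb` threshold are hypothetical additions for illustration.

```python
def collect_alerts(report, max_total_gb=500.0):
    """Return human-readable alert strings for a MinIO health report."""
    alerts = []
    for name, info in report.get("bucket_sizes", {}).items():
        if "error" in info:
            alerts.append(f"bucket {name}: listing failed ({info['error']})")
    for path, info in report.get("data_freshness", {}).items():
        if "error" in info:
            alerts.append(f"{path}: freshness check failed ({info['error']})")
        elif info.get("status") == "no_objects_found":
            alerts.append(f"{path}: no objects found")
        elif info.get("is_fresh") is False:
            alerts.append(f"{path}: stale ({info['age_hours']:.1f}h old)")
    total_gb = report.get("summary", {}).get("total_size_gb", 0.0)
    if total_gb > max_total_gb:
        alerts.append(f"total storage {total_gb:.1f} GB exceeds {max_total_gb:.1f} GB")
    return alerts
```

An empty list means nothing needs attention; a non-empty list can be printed, mailed, or used to set a non-zero exit code in a scheduled job.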

2. Common Issues and Solutions

Connection Issues

# Test MinIO connectivity
curl -I http://localhost:9000/minio/health/live
 
# Check container logs
docker logs minio --tail=50
 
# Verify network connectivity (requires ping and nslookup inside the image;
# minimal MinIO images may not include them)
docker exec minio ping -c 3 trino
docker exec minio nslookup minio

Performance Issues

# Check disk usage
docker exec minio df -h /data
 
# Monitor active connections
docker exec minio netstat -an | grep :9000
 
# Trace live API calls to spot slow or failing requests (Ctrl+C to stop)
./mc.exe admin trace minio --verbose

Data Corruption Detection

import hashlib

import boto3


def verify_data_integrity(bucket='processed-data', prefix='curated/'):
    """Verify data integrity by comparing MD5 checksums with stored ETags"""
    s3_client = boto3.client(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id='lakehouse_admin',
        aws_secret_access_key='CHANGE_ME_PASSWORD'
    )
    
    integrity_report = []
    
    # Paginate so prefixes with more than 1000 objects are fully covered
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            try:
                # Download the object and hash it in 1 MiB chunks to bound memory use
                obj_response = s3_client.get_object(Bucket=bucket, Key=obj['Key'])
                md5 = hashlib.md5()
                for chunk in obj_response['Body'].iter_chunks(chunk_size=1024 * 1024):
                    md5.update(chunk)
                calculated_md5 = md5.hexdigest()
                
                stored_etag = obj_response['ETag'].strip('"')
                
                # Multipart ETags look like "<hash>-<part count>" and are not the
                # MD5 of the content, so they cannot be compared directly
                if '-' in stored_etag:
                    integrity_ok = None  # not verifiable with a plain MD5
                else:
                    integrity_ok = calculated_md5 == stored_etag
                
                integrity_report.append({
                    'key': obj['Key'],
                    'size': obj['Size'],
                    'calculated_md5': calculated_md5,
                    'stored_etag': stored_etag,
                    'integrity_ok': integrity_ok
                })
                
            except Exception as e:
                integrity_report.append({
                    'key': obj['Key'],
                    'error': str(e),
                    'integrity_ok': False
                })
    
    return integrity_report
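
Objects written with a multipart upload carry an ETag that ends in `-N`, where N is the number of parts, so it never equals the plain MD5 of the content. A widely observed convention on S3-compatible stores is to take the MD5 of the concatenated binary MD5 digests of each part and append the part count. The sketch below reproduces that convention under the assumption that the object was uploaded unencrypted with a known, fixed part size; it is not a documented guarantee, and `multipart_etag` is a hypothetical helper.

```python
import hashlib


def multipart_etag(data, part_size):
    """Compute the conventional multipart ETag for `data` split into parts."""
    if part_size <= 0:
        raise ValueError("part_size must be positive")
    # An empty payload is still treated as a single (empty) part here
    parts = [data[i:i + part_size] for i in range(0, len(data), part_size)] or [b""]
    # Concatenate the raw 16-byte digests of the parts, then hash once more
    combined = b"".join(hashlib.md5(part).digest() for part in parts)
    return f"{hashlib.md5(combined).hexdigest()}-{len(parts)}"
```

The result only matches the stored ETag when `part_size` equals the chunk size used during the original upload.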

Production Best Practices

1. Deployment Configuration

Production MinIO Configuration

# docker-compose.prod.yml
version: '3.8'
services:
  minio:
    image: minio/minio:latest
    container_name: minio-prod
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT_USER}
      MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD}
      MINIO_REGION_NAME: ${MINIO_REGION}
      # Security settings
      MINIO_SERVER_URL: https://minio.lakehouse.com
      MINIO_BROWSER_REDIRECT_URL: https://minio-console.lakehouse.com
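      # Performance settings (disk cache). Recent MinIO releases may no longer
      # honor these variables; check the release notes for the image tag you deploy.
      MINIO_CACHE: "on"
      MINIO_CACHE_DRIVES: "/cache"
      MINIO_CACHE_QUOTA: "80"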
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - ./data/minio:/data
      - ./cache/minio:/cache
    networks:
      - lakehouse
    deploy:
      resources:
        limits:
          memory: 4G
          cpus: '2.0'
        reservations:
          memory: 2G
          cpus: '1.0'
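    healthcheck:
      # Assumes curl exists in the image; if your image tag omits it,
      # a check based on `mc ready local` is a possible alternative
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]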
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
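
Deployment scripts outside Docker often need the same wait-for-healthy logic as the `healthcheck` block. The sketch below polls a probe function with the same retry count and interval as the configuration above. `http_probe` and `wait_until_ready` are hypothetical helpers, and the `sleep` parameter exists only so the loop can be exercised without real delays.

```python
import time
import urllib.error
import urllib.request


def http_probe(url, timeout=5):
    """Return True when `url` answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_until_ready(probe, retries=3, interval=30, sleep=time.sleep):
    """Call `probe()` up to `retries` times, sleeping between failed attempts.

    Returns the attempt number that succeeded, or None if every attempt failed.
    """
    for attempt in range(1, retries + 1):
        if probe():
            return attempt
        if attempt < retries:
            sleep(interval)
    return None
```

A typical call would be `wait_until_ready(lambda: http_probe("http://localhost:9000/minio/health/live"))`, using the liveness endpoint from the health check.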

2. Security Hardening

SSL/TLS Configuration

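# Generate a self-signed certificate (suitable for testing; use a CA-issued
# certificate in production). -addext requires OpenSSL 1.1.1 or newer and adds
# the subjectAltName that modern TLS clients expect.
openssl req -new -x509 -days 365 -nodes \
    -out minio.crt \
    -keyout minio.key \
    -subj "/C=VN/ST=HCM/L=HoChiMinh/O=Lakehouse/CN=minio.lakehouse.com" \
    -addext "subjectAltName=DNS:minio.lakehouse.com"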
 
# Create certificates directory
mkdir -p platform/minio/certs
 
# Copy certificates
cp minio.crt platform/minio/certs/public.crt
cp minio.key platform/minio/certs/private.key

Network Security

# Production network configuration
networks:
  lakehouse:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16
  lakehouse-internal:
    driver: bridge
    internal: true  # No external access

3. Operational Procedures

Daily Operations Checklist

#!/bin/bash
# daily_minio_health_check.sh
 
echo "=== MinIO Health Check - $(date) ==="
 
# Check container status
echo "1. Container Status:"
docker ps | grep minio
 
# Check MinIO health endpoint
echo "2. Health Check:"
curl -sf http://localhost:9000/minio/health/live && echo "✓ Live" || echo "✗ Down"
curl -sf http://localhost:9000/minio/health/ready && echo "✓ Ready" || echo "✗ Not Ready"
 
# Check disk usage
echo "3. Disk Usage:"
docker exec minio df -h /data
 
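# Check recent activity (trace streams until interrupted, so cap it at 10 seconds)
echo "4. Recent Activity:"
timeout 10 ./mc.exe admin trace minio --quiet --json | tail -10
 
# Run the storage health report (bucket sizes and data freshness)
echo "5. Storage Health Report:"
python scripts/minio_monitoring.py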
 
# Check backup status
echo "6. Backup Status:"
./mc.exe mirror --dry-run minio/processed-data lakehouse-backup/processed-data-backup
 
echo "Health check completed"

Disaster Recovery Procedures

#!/bin/bash
# minio_disaster_recovery.sh
 
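BACKUP_LOCATION="/backup/minio"
EMERGENCY_DIR="$BACKUP_LOCATION/emergency"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
 
echo "Starting MinIO disaster recovery procedure..."
 
# 0. Pick the backup to restore before writing anything new to the backup location
LATEST_BACKUP=$(ls -t "$BACKUP_LOCATION"/*.tar.gz 2>/dev/null | head -1)
if [ -z "$LATEST_BACKUP" ]; then
    echo "✗ No backup archive found in $BACKUP_LOCATION" >&2
    exit 1
fi
 
# 1. Stop MinIO service
echo "Stopping MinIO service..."
docker stop minio
 
# 2. Backup current data (if possible) into a separate directory so it is
#    never mistaken for the latest good backup
echo "Backing up current data..."
mkdir -p "$EMERGENCY_DIR"
docker run --rm -v "$(pwd)/data/minio:/source" -v "$EMERGENCY_DIR:/backup" \
    alpine tar czf "/backup/emergency_backup_$TIMESTAMP.tar.gz" /source
 
# 3. Restore from latest backup
echo "Restoring from backup: $LATEST_BACKUP"
docker run --rm -v "$BACKUP_LOCATION:/backup" -v "$(pwd)/data/minio:/target" \
    alpine tar xzf "/backup/$(basename "$LATEST_BACKUP")" -C /target --strip-components=1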
 
# 4. Restart MinIO
echo "Restarting MinIO..."
docker start minio
 
# 5. Verify recovery
echo "Verifying recovery..."
sleep 30
curl -sf http://localhost:9000/minio/health/live && echo "✓ Recovery successful" || echo "✗ Recovery failed"
 
# 6. Run post-recovery health report (bucket sizes and data freshness)
echo "Running post-recovery health report..."
python scripts/minio_monitoring.py
 
echo "Disaster recovery completed"
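
A recovery script has to be careful about which archive it restores: an emergency backup taken moments earlier is the newest file on disk, but it holds the data being replaced. One way to guard against that is to filter emergency archives out by name before picking the newest file. The sketch below assumes backups are `.tar.gz` files in a single directory and that emergency archives use the `emergency_backup_` prefix from the script above; `pick_latest_backup` is a hypothetical helper.

```python
from pathlib import Path


def pick_latest_backup(backup_dir, exclude_prefix="emergency_backup_"):
    """Return the newest .tar.gz in `backup_dir`, skipping emergency archives."""
    candidates = [
        path for path in Path(backup_dir).glob("*.tar.gz")
        if not path.name.startswith(exclude_prefix)
    ]
    if not candidates:
        return None
    # Newest by modification time, matching `ls -t | head -1`
    return max(candidates, key=lambda path: path.stat().st_mtime)
```

Returning `None` when nothing qualifies lets the caller stop before shutting the service down.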

4. Scaling Considerations

Distributed MinIO Setup

# For production scaling - distributed MinIO
version: '3.8'
services:
  minio1:
    image: minio/minio:latest
    hostname: minio1
    command: server http://minio{1...4}/data{1...2} --console-address ":9001"
    environment:
      MINIO_ROOT_USER: lakehouse_admin
      MINIO_ROOT_PASSWORD: CHANGE_ME_PASSWORD
    volumes:
      - ./data/minio1/data1:/data1
      - ./data/minio1/data2:/data2
  
  minio2:
    image: minio/minio:latest
    hostname: minio2
    command: server http://minio{1...4}/data{1...2} --console-address ":9001"
    environment:
      MINIO_ROOT_USER: lakehouse_admin
      MINIO_ROOT_PASSWORD: CHANGE_ME_PASSWORD
    volumes:
      - ./data/minio2/data1:/data1
      - ./data/minio2/data2:/data2
  
  # minio3 and minio4 similar configuration...
  
  nginx:
    image: nginx:alpine
    ports:
      - "9000:9000"
    volumes:
      - ./platform/nginx/minio-lb.conf:/etc/nginx/nginx.conf
    depends_on:
      - minio1
      - minio2
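
The `server` command uses MinIO's `{a...b}` range notation, so `http://minio{1...4}/data{1...2}` describes 4 nodes with 2 drives each, 8 drives in total. The sketch below expands that notation and estimates usable capacity for a given erasure-coding parity. Both helpers are hypothetical, the parity value is an input you must take from your own deployment rather than a default asserted here, and the calculation assumes all drives form a single erasure set of equal-size drives.

```python
import itertools
import re

RANGE = re.compile(r"\{(\d+)\.\.\.(\d+)\}")


def expand_ellipsis(pattern):
    """Expand every {a...b} range in `pattern` into concrete endpoints."""
    bounds = RANGE.findall(pattern)
    template = RANGE.sub("{}", pattern)
    choices = [range(int(low), int(high) + 1) for low, high in bounds]
    return [template.format(*combo) for combo in itertools.product(*choices)]


def usable_capacity(drive_count, drive_size, parity):
    """Usable capacity when `parity` of `drive_count` shards hold parity data."""
    if not 0 <= parity <= drive_count // 2:
        raise ValueError("parity must be between 0 and half the drive count")
    return (drive_count - parity) * drive_size
```

For example, 8 drives of 1000 GB with a parity of 4 leave 4000 GB usable, while a parity of 2 leaves 6000 GB at the cost of tolerating fewer drive failures.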

Conclusion

This MinIO Object Storage guide provides comprehensive coverage of:

  1. Architecture Understanding: How MinIO serves as the data lake foundation
  2. Data Organization: Strategic bucket design and partitioning for Lakehouse use cases
  3. Hands-on Practice: Complete labs from basic operations to advanced integration
  4. Integration Patterns: Seamless connection with Trino, Kafka, and ETL pipelines
  5. Security Implementation: IAM, encryption, and access control best practices
  6. Performance Optimization: Efficient file formats, parallel processing, and caching
  7. Operations Excellence: Monitoring, backup, disaster recovery procedures

Next Steps

  1. Complete All Labs: Execute hands-on exercises in sequence
  2. Data Lake Design: Implement bucket strategy for your specific requirements
  3. Integration Testing: Test connections with all platform components
  4. Security Deployment: Implement production-grade security measures
  5. Performance Tuning: Optimize for your specific workload patterns
  6. Monitoring Setup: Deploy comprehensive monitoring and alerting

Key Takeaways

  • S3 Compatibility: Leverage existing AWS tools and libraries
  • Scalable Storage: Design for growth with proper partitioning
  • Security First: Implement encryption and access controls from day one
  • Integration Ready: Built for seamless lakehouse integration
  • Production Ready: Comprehensive operational procedures

With these pieces in place, MinIO is ready to serve as the object storage foundation for the Lakehouse platform.
