MinIO Object Storage Guide — Lakehouse
MinIO in the Lakehouse: S3 API, buckets, security.
2026-03-17 · 20 min read
Table of Contents
- Overview
- MinIO Architecture
- Bucket Strategy & Data Organization
- Hands-on Labs
- Data Lake Integration
- Security & Access Control
- Performance Optimization
- Backup & Versioning
- Monitoring & Troubleshooting
- Production Best Practices
Overview
What is MinIO in the Lakehouse?
MinIO serves as the primary object storage layer in the Lakehouse architecture, providing:
- Data Lake Storage: Raw data ingestion and storage for analytics
- Document Storage: Customer documents, loan applications, KYC files
- Backup Storage: Database backups, configuration backups
- ETL Staging: Temporary storage for data transformation processes
- ML/AI Datasets: Training data, model artifacts, feature stores
Key Features for the Lakehouse
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Applications   │    │  Data Sources   │    │    Analytics    │
│  (Upload/API)   │    │   (Kafka/ETL)   │    │  (Trino/Spark)  │
└────────┬────────┘    └────────┬────────┘    └────────┬────────┘
         │                      │                      │
         └──────────────────────┼──────────────────────┘
                                │
                  ┌─────────────▼─────────────┐
                  │           MinIO           │
                  │ - S3 Compatible API       │
                  │ - Erasure Coding          │
                  │ - Versioning              │
                  │ - Encryption              │
                  │ - Multi-tenancy           │
                  └───────────────────────────┘
MinIO Architecture
1. Service Configuration
# From docker-compose.yml
minio:
  image: minio/minio:latest
  container_name: minio
  command: server /data --console-address ":9001"
  environment:
    MINIO_ROOT_USER: lakehouse_admin
    MINIO_ROOT_PASSWORD: CHANGE_ME_PASSWORD
    MINIO_REGION_NAME: lakehouse-region
  ports:
    - "9000:9000" # S3 API
    - "9001:9001" # Web Console
  volumes:
    - ./data/minio:/data
2. Access Configuration
# MinIO Credentials
Access Key: lakehouse_admin
Secret Key: CHANGE_ME_PASSWORD
Endpoint: http://localhost:9000
Console: http://localhost:9001
Region: lakehouse-region
3. S3 API Compatibility
MinIO implements the Amazon S3 API, so standard S3 tooling works against it, including:
- AWS SDK for various languages
- AWS CLI tools
- S3-compatible applications
- Direct REST API calls
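Direct REST calls have to be signed with AWS Signature Version 4. SDKs normally do this for you (boto3's `generate_presigned_url('get_object', Params={...}, ExpiresIn=3600)`, for example), but a small standard-library sketch of a presigned GET URL shows what the signature covers. It assumes path-style addressing, the lab endpoint and region, and signs only the `host` header; `presign_get` is a hypothetical helper, not part of any MinIO or AWS library.

```python
import hashlib
import hmac
from datetime import datetime, timezone
from urllib.parse import quote, urlsplit


def _hmac(key: bytes, msg: str) -> bytes:
    """HMAC-SHA256 of a UTF-8 message."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def presign_get(endpoint, bucket, key, access_key, secret_key,
                region="lakehouse-region", expires=3600, now=None):
    """Return a SigV4 query-signed GET URL (path-style, only 'host' is signed)."""
    if now is None:
        now = datetime.now(timezone.utc)
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = now.strftime("%Y%m%d")
    scope = f"{date_stamp}/{region}/s3/aws4_request"
    host = urlsplit(endpoint).netloc  # includes the port, e.g. localhost:9000

    # Path-style URI /bucket/key: everything except unreserved chars and '/' is encoded
    canonical_uri = "/" + quote(f"{bucket}/{key}", safe="/")
    params = {
        "X-Amz-Algorithm": "AWS4-HMAC-SHA256",
        "X-Amz-Credential": f"{access_key}/{scope}",
        "X-Amz-Date": amz_date,
        "X-Amz-Expires": str(expires),
        "X-Amz-SignedHeaders": "host",
    }
    # Canonical query string: sorted by name, names and values fully encoded
    canonical_query = "&".join(
        f"{quote(k, safe='')}={quote(v, safe='')}" for k, v in sorted(params.items())
    )
    # The canonical headers block ends with '\n', leaving a blank line before "host"
    canonical_request = "\n".join([
        "GET", canonical_uri, canonical_query,
        f"host:{host}\n", "host", "UNSIGNED-PAYLOAD",
    ])
    string_to_sign = "\n".join([
        "AWS4-HMAC-SHA256", amz_date, scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])
    # Signing key: HMAC chain over date, region, service and terminator
    signing_key = _hmac(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    for part in (region, "s3", "aws4_request"):
        signing_key = _hmac(signing_key, part)
    signature = hmac.new(signing_key, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()
    return f"{endpoint.rstrip('/')}{canonical_uri}?{canonical_query}&X-Amz-Signature={signature}"


url = presign_get("http://localhost:9000", "raw-data", "batch/year=2024/a.csv",
                  "lakehouse_admin", "CHANGE_ME_PASSWORD")
print(url)
```

Fetching that URL with any HTTP client (for example `curl "<url>"`) before it expires should return the object without further credentials. In application code, prefer the SDK call, which also handles session tokens and other edge cases this sketch ignores.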
Bucket Strategy & Data Organization
1. Lakehouse Bucket Structure
lakehouse/ # One MinIO deployment; each top-level folder below is a bucket
├── raw-data/ # Raw data ingestion
│ ├── streaming/ # Kafka stream outputs
│ ├── batch/ # Daily/hourly batch imports
│ ├── external/ # Third-party data feeds
│ └── apis/ # API responses and logs
├── processed-data/ # Transformed data
│ ├── clean/ # Cleaned and validated data
│ ├── enriched/ # Data with additional context
│ ├── aggregated/ # Pre-computed aggregations
│ └── curated/ # Business-ready datasets
├── documents/ # Business documents
│ ├── customer-kyc/ # KYC documents
│ ├── loan-applications/ # Loan documents
│ ├── contracts/ # Legal contracts
│ └── reports/ # Generated reports
├── ml-datasets/ # Machine learning data
│ ├── training/ # Training datasets
│ ├── models/ # Model artifacts
│ ├── features/ # Feature stores
│ └── predictions/ # Model outputs
├── backups/ # System backups
│ ├── postgresql/ # Database backups
│ ├── kafka/ # Kafka topic backups
│ └── configurations/ # Config backups
└── temp/ # Temporary storage
├── etl-staging/ # ETL intermediate files
├── uploads/ # Temporary uploads
└── processing/ # Data processing workspace
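Each top-level folder in this layout becomes a bucket in Lab 1, so the names have to satisfy the S3 bucket naming rules, which MinIO applies as well. A minimal sketch of the core checks (3 to 63 characters; lowercase letters, digits, dots and hyphens; starting and ending with a letter or digit; no adjacent dots; not shaped like an IPv4 address). `is_valid_bucket_name` is a hypothetical helper, and AWS has a few extra reserved prefixes and suffixes that it does not check.

```python
import ipaddress
import re

# Core S3 bucket naming rules: 3-63 chars of [a-z0-9.-], alphanumeric at both ends
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def is_valid_bucket_name(name: str) -> bool:
    """Return True if name passes the core S3 bucket naming rules."""
    if not _BUCKET_RE.fullmatch(name) or ".." in name:
        return False
    try:
        ipaddress.IPv4Address(name)
        return False  # names formatted like IP addresses are rejected
    except ValueError:
        return True


LAKEHOUSE_BUCKETS = ["raw-data", "processed-data", "documents",
                     "ml-datasets", "backups", "temp"]
invalid = [b for b in LAKEHOUSE_BUCKETS if not is_valid_bucket_name(b)]
print("invalid bucket names:", invalid)
```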
2. Data Partitioning Strategy
# Date-based partitioning for time-series data
raw-data/streaming/
├── year=2024/
│ ├── month=01/
│ │ ├── day=01/
│ │ │ ├── hour=00/
│ │ │ │ ├── customer_events_000.parquet
│ │ │ │ └── loan_events_000.parquet
│ │ │ └── hour=01/
│ │ └── day=02/
│ └── month=02/
└── year=2025/
# Entity-based partitioning for documents
documents/customer-kyc/
├── customer_type=individual/
│ ├── region=hcm/
│ │ ├── customer_id=12345/
│ │ │ ├── id_card_front.jpg
│ │ │ ├── id_card_back.jpg
│ │ │ └── selfie.jpg
│ │ └── customer_id=12346/
│ └── region=hanoi/
└── customer_type=business/
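Both layouts use Hive-style `name=value` path segments, which engines such as Trino and Spark read back as partition columns. A minimal sketch of building a time-partitioned key and parsing the partition values out of any key; the helper names are hypothetical, and the bucket (`raw-data` or `documents`) is passed separately to the S3 client rather than being part of the key.

```python
from datetime import datetime


def time_partition_key(prefix: str, ts: datetime, filename: str) -> str:
    """Build a key such as streaming/year=2024/month=01/day=01/hour=00/<file>."""
    return (f"{prefix.rstrip('/')}/year={ts:%Y}/month={ts:%m}/"
            f"day={ts:%d}/hour={ts:%H}/{filename}")


def parse_partitions(key: str) -> dict:
    """Extract name=value path segments (the partition columns) from a key."""
    parts = {}
    for segment in key.split("/")[:-1]:  # the last segment is the file name
        name, sep, value = segment.partition("=")
        if sep:
            parts[name] = value
    return parts


key = time_partition_key("streaming", datetime(2024, 1, 1, 0, 15), "customer_events_000.parquet")
print(key)                    # streaming/year=2024/month=01/day=01/hour=00/customer_events_000.parquet
print(parse_partitions(key))  # {'year': '2024', 'month': '01', 'day': '01', 'hour': '00'}
```

Zero-padded values (`month=01`) keep keys sorting chronologically, which makes prefix listings easier to reason about.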
Hands-on Labs
Lab 1: MinIO Setup and Basic Operations
Step 1: Verify MinIO Service
# Check MinIO container status
docker ps | grep minio
# Check MinIO logs
docker logs minio --tail=20
# Test MinIO health
curl -I http://localhost:9000/minio/health/live
Step 2: Install MinIO Client (mc)
# Download the mc client (Windows build shown; on Linux download
# https://dl.min.io/client/mc/release/linux-amd64/mc and run it as ./mc instead of ./mc.exe)
wget https://dl.min.io/client/mc/release/windows-amd64/mc.exe -O mc.exe
# Add MinIO server alias
./mc.exe alias set minio http://localhost:9000 lakehouse_admin CHANGE_ME_PASSWORD
# Test connection
./mc.exe admin info minio
Step 3: Create Initial Bucket Structure
# Create main buckets
./mc.exe mb minio/raw-data
./mc.exe mb minio/processed-data
./mc.exe mb minio/documents
./mc.exe mb minio/ml-datasets
./mc.exe mb minio/backups
./mc.exe mb minio/temp
# List all buckets
./mc.exe ls minio
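# Allow anonymous (public) downloads from processed-data (lab only; this bypasses
# the user-based access control described in Security & Access Control)
./mc.exe anonymous set download minio/processed-data
Lab 2: Data Upload and Management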
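The uploads in this lab place each daily batch file under `batch/year=YYYY/month=MM/day=DD/`, taking the date from the `_YYYYMMDD` suffix of the file name. A minimal sketch of that mapping, assuming the `<dataset>_<YYYYMMDD>.<ext>` naming convention used by the sample files; `batch_key_for` is a hypothetical helper.

```python
import re
from datetime import datetime
from pathlib import Path

# Assumed naming convention: <dataset>_<YYYYMMDD>.<ext>, e.g. customers_20240101.csv
_DATED_FILE = re.compile(r"(?P<dataset>[a-z_]+)_(?P<date>\d{8})\.(?P<ext>\w+)")


def batch_key_for(local_path: str) -> str:
    """Map a dated batch file to its key in the raw-data bucket."""
    name = Path(local_path).name
    match = _DATED_FILE.fullmatch(name)
    if match is None:
        raise ValueError(f"unexpected batch file name: {name}")
    day = datetime.strptime(match["date"], "%Y%m%d")  # also rejects impossible dates
    return f"batch/year={day:%Y}/month={day:%m}/day={day:%d}/{name}"


print(batch_key_for("sample_data/customer_data/customers_20240101.csv"))
# batch/year=2024/month=01/day=01/customers_20240101.csv
```

Prefixing the result with `minio/raw-data/` gives the exact `mc cp` targets used in Step 1.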
Step 1: Upload Sample Data Files
# Create sample data directory
mkdir -p sample_data/customer_data
mkdir -p sample_data/loan_data
# Create sample CSV files
cat > sample_data/customer_data/customers_20240101.csv << 'EOF'
customer_id,customer_code,full_name,phone_number,email,kyc_status,credit_score,created_at
1,CUST001,Nguyen Van An,0901234567,an.nguyen@email.com,APPROVED,750,2024-01-01 10:30:00
2,CUST002,Tran Thi Binh,0912345678,binh.tran@email.com,APPROVED,680,2024-01-01 11:15:00
3,CUST003,Le Van Cuong,0923456789,cuong.le@email.com,PENDING,720,2024-01-01 14:20:00
EOF
cat > sample_data/loan_data/loans_20240101.csv << 'EOF'
loan_id,loan_number,customer_id,product_type,loan_amount,interest_rate,term_months,status,application_date
1,LN2024001,1,PERSONAL_LOAN,50000000,0.1250,24,APPROVED,2024-01-01 10:30:00
2,LN2024002,2,BUSINESS_LOAN,100000000,0.1150,36,DISBURSED,2024-01-01 14:15:00
EOF
# Upload files with proper structure
./mc.exe cp sample_data/customer_data/customers_20240101.csv \
minio/raw-data/batch/year=2024/month=01/day=01/customers_20240101.csv
./mc.exe cp sample_data/loan_data/loans_20240101.csv \
minio/raw-data/batch/year=2024/month=01/day=01/loans_20240101.csv
# Verify uploads
./mc.exe ls minio/raw-data/batch/year=2024/month=01/day=01/
Step 2: Create JSON Data Files
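The event file in this step uses JSON Lines: one complete JSON object per line, which is the shape the ETL script later parses line by line. Checking each line before upload catches malformed records early. A minimal sketch; the required field list is an assumption taken from the sample events, and `validate_jsonl` is a hypothetical helper.

```python
import json

REQUIRED_FIELDS = ("event_id", "customer_id", "event_type", "timestamp")


def validate_jsonl(text: str, required=REQUIRED_FIELDS):
    """Return (records, errors) for a JSON Lines document."""
    records, errors = [], []
    for lineno, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # tolerate blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            errors.append(f"line {lineno}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(record, dict):
            errors.append(f"line {lineno}: expected a JSON object")
            continue
        missing = [f for f in required if f not in record]
        if missing:
            errors.append(f"line {lineno}: missing {', '.join(missing)}")
        else:
            records.append(record)
    return records, errors


sample = "\n".join([
    '{"event_id": "evt_001", "customer_id": "CUST001", "event_type": "LOGIN", "timestamp": "2024-01-01T10:30:00Z"}',
    '{"event_id": "evt_002", "customer_id": "CUST001", "event_type": "PAYMENT"}',
    "not json",
])
records, errors = validate_jsonl(sample)
print(len(records), errors)
```

For the lab file, pass `open('sample_data/customer_events.json', encoding='utf-8').read()` instead of the inline sample.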
# Create JSON sample data
cat > sample_data/customer_events.json << 'EOF'
{"event_id": "evt_001", "customer_id": "CUST001", "event_type": "LOGIN", "timestamp": "2024-01-01T10:30:00Z", "ip_address": "192.168.1.100", "user_agent": "MobileApp/1.0"}
{"event_id": "evt_002", "customer_id": "CUST001", "event_type": "LOAN_APPLICATION", "timestamp": "2024-01-01T10:35:00Z", "loan_amount": 50000000, "product_type": "PERSONAL_LOAN"}
{"event_id": "evt_003", "customer_id": "CUST002", "event_type": "PAYMENT", "timestamp": "2024-01-01T11:00:00Z", "amount": 2500000, "payment_method": "BANK_TRANSFER"}
EOF
# Upload JSON data
./mc.exe cp sample_data/customer_events.json \
minio/raw-data/streaming/year=2024/month=01/day=01/hour=10/customer_events.json
# Create a curated summary sample (CSV here; Lab 4 writes real Parquet)
mkdir -p sample_data/processed
cat > sample_data/processed/customer_summary.csv << 'EOF'
customer_code,total_loans,total_amount,avg_interest_rate,risk_category,last_activity
CUST001,1,50000000,0.1250,LOW_RISK,2024-01-01
CUST002,1,100000000,0.1150,LOW_RISK,2024-01-01
CUST003,0,0,0,MEDIUM_RISK,2024-01-01
EOF
./mc.exe cp sample_data/processed/customer_summary.csv \
minio/processed-data/curated/customer_summary/date=2024-01-01/customer_summary.csv
Lab 3: Document Management
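This lab stores KYC files under entity partitions (`customer_type`, `region`, `customer_id`). Building those keys in code keeps the layout consistent and lets the upload set a Content-Type that matches the file extension. A minimal sketch; the allowed customer types and regions are assumptions taken from the partitioning tree, and `kyc_key` and `content_type_for` are hypothetical helpers.

```python
import mimetypes

ALLOWED_CUSTOMER_TYPES = {"individual", "business"}
ALLOWED_REGIONS = {"hcm", "hanoi"}


def kyc_key(customer_type: str, region: str, customer_id: str, doc_name: str) -> str:
    """Build an entity-partitioned key inside the documents bucket."""
    if customer_type not in ALLOWED_CUSTOMER_TYPES:
        raise ValueError(f"unknown customer_type: {customer_type}")
    if region not in ALLOWED_REGIONS:
        raise ValueError(f"unknown region: {region}")
    if "/" in customer_id or "/" in doc_name:
        raise ValueError("customer_id and doc_name must not contain '/'")
    return (f"customer-kyc/customer_type={customer_type}/region={region}/"
            f"customer_id={customer_id}/{doc_name}")


def content_type_for(key: str) -> str:
    """Guess a Content-Type from the key's extension, defaulting to binary."""
    guessed, _ = mimetypes.guess_type(key)
    return guessed or "application/octet-stream"


key = kyc_key("individual", "hcm", "CUST001", "id_card_front.jpg")
print(key, content_type_for(key))
```

With boto3, pass the result as `ContentType=content_type_for(key)` to `put_object`. Note that the placeholder `.txt` files uploaded below under `.jpg` names would be labeled as images even though they contain text.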
Step 1: Simulate KYC Document Upload
# Create sample document files
mkdir -p sample_data/documents/kyc
# Create placeholder image files (in real scenario, these would be actual images)
echo "ID Card Front Image Data" > sample_data/documents/kyc/id_front_CUST001.txt
echo "ID Card Back Image Data" > sample_data/documents/kyc/id_back_CUST001.txt
echo "Selfie Image Data" > sample_data/documents/kyc/selfie_CUST001.txt
# Upload KYC documents with proper structure
./mc.exe cp sample_data/documents/kyc/id_front_CUST001.txt \
minio/documents/customer-kyc/customer_type=individual/region=hcm/customer_id=CUST001/id_card_front.jpg
./mc.exe cp sample_data/documents/kyc/id_back_CUST001.txt \
minio/documents/customer-kyc/customer_type=individual/region=hcm/customer_id=CUST001/id_card_back.jpg
./mc.exe cp sample_data/documents/kyc/selfie_CUST001.txt \
minio/documents/customer-kyc/customer_type=individual/region=hcm/customer_id=CUST001/selfie.jpg
# Verify document upload
./mc.exe ls minio/documents/customer-kyc/customer_type=individual/region=hcm/customer_id=CUST001/
Step 2: Generate Reports and Store
# Create sample report
cat > sample_data/daily_loan_report_20240101.json << 'EOF'
{
"report_date": "2024-01-01",
"report_type": "DAILY_LOAN_SUMMARY",
"total_applications": 15,
"total_amount": 750000000,
"approved_count": 12,
"rejected_count": 3,
"avg_amount": 50000000,
"top_products": [
{"product": "PERSONAL_LOAN", "count": 8, "amount": 400000000},
{"product": "BUSINESS_LOAN", "count": 4, "amount": 350000000}
],
"generated_at": "2024-01-02T00:30:00Z"
}
EOF
# Upload report
./mc.exe cp sample_data/daily_loan_report_20240101.json \
minio/documents/reports/daily/year=2024/month=01/daily_loan_report_20240101.json
Lab 4: Python Integration
Step 1: Install Required Libraries
# Install Python dependencies
pip install boto3 pandas pyarrow
Step 2: Create Python MinIO Client
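The client in this step hardcodes the lab credentials. Outside a lab, reading them from the environment keeps secrets out of source files. A minimal sketch; the variable names `MINIO_ENDPOINT`, `MINIO_ACCESS_KEY`, `MINIO_SECRET_KEY` and `MINIO_REGION` are assumptions (neither MinIO nor boto3 reads them on its own), and `minio_client_kwargs` is a hypothetical helper whose result maps onto the keyword arguments of `boto3.client('s3', ...)`.

```python
import os
from typing import Mapping


def minio_client_kwargs(env: Mapping[str, str] = os.environ) -> dict:
    """Build boto3.client('s3', ...) keyword arguments from environment variables."""
    missing = [n for n in ("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY") if not env.get(n)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return {
        "endpoint_url": env.get("MINIO_ENDPOINT", "http://localhost:9000"),
        "aws_access_key_id": env["MINIO_ACCESS_KEY"],
        "aws_secret_access_key": env["MINIO_SECRET_KEY"],
        "region_name": env.get("MINIO_REGION", "lakehouse-region"),
    }


# Usage (requires boto3):
#   s3_client = boto3.client('s3', **minio_client_kwargs())
```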
# Create file: scripts/minio_operations.py
import io
import json
from datetime import datetime

import boto3
import pandas as pd


class LakehouseMinIOClient:
    def __init__(self):
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:9000',
            aws_access_key_id='lakehouse_admin',
            aws_secret_access_key='CHANGE_ME_PASSWORD',
            region_name='lakehouse-region'
        )

    def upload_dataframe_as_parquet(self, df, bucket, key):
        """Upload pandas DataFrame as Parquet file"""
        try:
            # With no path argument, to_parquet returns the Parquet file as bytes
            parquet_buffer = df.to_parquet(index=False)
            # Upload to MinIO
            self.s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=parquet_buffer,
                ContentType='application/octet-stream'
            )
            print(f"Successfully uploaded {key} to {bucket}")
            return True
        except Exception as e:
            print(f"Error uploading {key}: {str(e)}")
            return False

    def read_parquet_to_dataframe(self, bucket, key):
        """Read Parquet file from MinIO to pandas DataFrame"""
        try:
            response = self.s3_client.get_object(Bucket=bucket, Key=key)
            # Parquet readers need a seekable file, so buffer the stream in memory
            df = pd.read_parquet(io.BytesIO(response['Body'].read()))
            print(f"Successfully read {key} from {bucket}")
            return df
        except Exception as e:
            print(f"Error reading {key}: {str(e)}")
            return None

    def upload_json_data(self, data, bucket, key):
        """Upload JSON data to MinIO"""
        try:
            json_data = json.dumps(data, indent=2, ensure_ascii=False)
            self.s3_client.put_object(
                Bucket=bucket,
                Key=key,
                Body=json_data.encode('utf-8'),
                ContentType='application/json'
            )
            print(f"Successfully uploaded JSON to {bucket}/{key}")
            return True
        except Exception as e:
            print(f"Error uploading JSON: {str(e)}")
            return False

    def list_objects(self, bucket, prefix=''):
        """List all objects in bucket with optional prefix"""
        try:
            # One list_objects_v2 call returns at most 1000 keys, so paginate
            paginator = self.s3_client.get_paginator('list_objects_v2')
            objects = [
                obj['Key']
                for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
                for obj in page.get('Contents', [])
            ]
            if objects:
                print(f"Found {len(objects)} objects in {bucket}/{prefix}")
            else:
                print(f"No objects found in {bucket}/{prefix}")
            return objects
        except Exception as e:
            print(f"Error listing objects: {str(e)}")
            return []


# Example usage function
def demo_operations():
    """Demonstrate MinIO operations"""
    client = LakehouseMinIOClient()
    # Create sample DataFrame
    customer_data = {
        'customer_id': ['CUST001', 'CUST002', 'CUST003'],
        'customer_name': ['Nguyen Van An', 'Tran Thi Binh', 'Le Van Cuong'],
        'credit_score': [750, 680, 720],
        'total_loans': [1, 1, 0],
        'total_amount': [50000000, 100000000, 0],
        'last_updated': [datetime.now()] * 3
    }
    df = pd.DataFrame(customer_data)
    # Upload DataFrame as Parquet
    parquet_key = 'curated/customer_analytics/date=2024-01-01/customer_summary.parquet'
    client.upload_dataframe_as_parquet(df, 'processed-data', parquet_key)
    # Upload JSON metadata
    metadata = {
        'dataset': 'customer_summary',
        'created_at': datetime.now().isoformat(),
        'record_count': len(df),
        'schema': list(df.columns),
        'data_sources': ['postgresql.core.customers', 'postgresql.core.loans']
    }
    metadata_key = 'curated/customer_analytics/date=2024-01-01/metadata.json'
    client.upload_json_data(metadata, 'processed-data', metadata_key)
    # List objects in processed data
    objects = client.list_objects('processed-data', 'curated/customer_analytics/')
    for obj in objects:
        print(f" - {obj}")
    # Read back the data
    df_read = client.read_parquet_to_dataframe('processed-data', parquet_key)
    if df_read is not None:
        print("\nRead back data:")
        print(df_read.head())


if __name__ == "__main__":
    demo_operations()
Step 3: Run Python Integration
# Create the scripts directory, then save the Python code from above into the file
mkdir -p scripts
cat > scripts/minio_operations.py << 'EOF'
# (Insert the Python code from above)
EOF
# Run the script
python scripts/minio_operations.py
Data Lake Integration
1. Trino Integration with MinIO
Configure Trino Hive Catalog for MinIO
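# platform/trino/catalog/hive.properties
connector.name=hive
hive.metastore.uri=thrift://metastore:9083
hive.s3.endpoint=http://minio:9000
hive.s3.aws-access-key=lakehouse_admin
hive.s3.aws-secret-key=CHANGE_ME_PASSWORD
hive.s3.path-style-access=true
hive.s3.ssl.enabled=false
# Newer Trino releases replace these legacy hive.s3.* properties with the native
# S3 file system (fs.native-s3.enabled=true plus s3.endpoint, s3.region,
# s3.path-style-access, s3.aws-access-key, s3.aws-secret-key); check your version's docs
Query MinIO Data via Trino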
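-- Create external table pointing to MinIO data.
-- Partition columns must be declared last in the column list, and the Hive
-- connector has no JSON column type, so nested payloads are stored as VARCHAR.
-- If your Trino version cannot parse ISO-8601 values such as 2024-01-01T10:30:00Z
-- as TIMESTAMP, declare the column VARCHAR and convert with from_iso8601_timestamp().
CREATE TABLE hive.lakehouse_db.customer_events (
    event_id VARCHAR,
    customer_id VARCHAR,
    event_type VARCHAR,
    "timestamp" TIMESTAMP,
    metadata VARCHAR,
    year VARCHAR,
    month VARCHAR,
    day VARCHAR,
    hour VARCHAR
)
WITH (
    external_location = 's3a://raw-data/streaming/',
    format = 'JSON',
    partitioned_by = ARRAY['year', 'month', 'day', 'hour']
);
-- Register the year=/month=/day=/hour= directories already in MinIO as partitions
CALL hive.system.sync_partition_metadata('lakehouse_db', 'customer_events', 'ADD');
-- Query data from MinIO
SELECT
    event_type,
    COUNT(*) AS event_count,
    DATE("timestamp") AS event_date
FROM hive.lakehouse_db.customer_events
WHERE year = '2024' AND month = '01'
GROUP BY event_type, DATE("timestamp")
ORDER BY event_date DESC, event_count DESC;
2. Kafka to MinIO Pipeline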
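Connector configurations like the one in this section are registered with a Kafka Connect worker through its REST API; `PUT /connectors/<name>/config` creates the connector, or updates it if it already exists, so re-running the call is safe. A standard-library sketch that builds that request; the worker URL `http://localhost:8083` (Connect's default REST port) and the helper name `connector_put_request` are assumptions, and nothing is sent until you call `urlopen`.

```python
import json
import urllib.request


def connector_put_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build an idempotent PUT /connectors/<name>/config request."""
    name = connector["name"]
    # The PUT endpoint takes the bare config map, not the {"name", "config"} wrapper
    body = json.dumps(connector["config"]).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


connector = {
    "name": "lakehouse-s3-sink-connector",
    "config": {"connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "3"},
}
req = connector_put_request("http://localhost:8083", connector)
print(req.get_method(), req.full_url)
# To send it (needs a running Connect worker): urllib.request.urlopen(req)
```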
Create Kafka Connect S3 Sink
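{
  "name": "lakehouse-s3-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "customer-events,lakehouse-loan-events,lakehouse-payment-events",
    "s3.region": "lakehouse-region",
    "s3.bucket.name": "raw-data",
    "s3.part.size": "5242880",
    "flush.size": "10000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "partition.duration.ms": "3600000",
    "locale": "en-US",
    "timezone": "UTC",
    "timestamp.extractor": "Record",
    "store.url": "http://minio:9000",
    "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProvider"
  }
}
The Hive-style path.format writes year=/month=/day=/hour= directories that match the partitioning used elsewhere in this guide (under the connector's topics.dir, which defaults to topics/<topic>/), and TimeBasedPartitioner also requires locale and timezone. DefaultAWSCredentialsProvider reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the Connect worker's environment, so set them to a MinIO access key.
3. ETL Processing Pipeline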
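The ETL script in this section processes a single day (yesterday, by default). Catching up on missed days only requires iterating over a date range and running the same two steps for each date. A minimal sketch; `dates_to_process` is a hypothetical helper, and the commented usage assumes the `LakehouseDataProcessor` class defined in the script.

```python
from datetime import date, timedelta


def dates_to_process(start: date, end: date):
    """Yield each date from start to end inclusive, oldest first."""
    if end < start:
        raise ValueError("end must not be before start")
    for offset in range((end - start).days + 1):
        yield start + timedelta(days=offset)


# Usage with the processor from the script (requires a running MinIO):
# processor = LakehouseDataProcessor()
# for day in dates_to_process(date(2024, 1, 1), date(2024, 1, 7)):
#     if processor.process_daily_customer_events(day):
#         processor.generate_daily_aggregations(day)
print([d.isoformat() for d in dates_to_process(date(2024, 1, 30), date(2024, 2, 2))])
# ['2024-01-30', '2024-01-31', '2024-02-01', '2024-02-02']
```

Because every run writes to date-specific keys (`date=YYYY-MM-DD/`), re-running a day overwrites that day's previous output instead of duplicating it.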
Create ETL Script for Data Processing
# scripts/etl_minio_processing.py
import io
import json
from datetime import datetime, timedelta

import boto3
import pandas as pd


class LakehouseDataProcessor:
    def __init__(self):
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:9000',
            aws_access_key_id='lakehouse_admin',
            aws_secret_access_key='CHANGE_ME_PASSWORD',
            region_name='lakehouse-region'
        )

    def process_daily_customer_events(self, processing_date):
        """Process daily customer events from raw to processed"""
        # Define paths
        date_str = processing_date.strftime('%Y-%m-%d')
        raw_prefix = (f"streaming/year={processing_date.year}/month={processing_date.month:02d}/"
                      f"day={processing_date.day:02d}/")
        processed_key = f"clean/customer_events/date={date_str}/customer_events_processed.parquet"
        # Read raw JSON Lines files for the day (paginated: max 1000 keys per call)
        raw_events = []
        source_files_count = 0
        try:
            paginator = self.s3_client.get_paginator('list_objects_v2')
            for page in paginator.paginate(Bucket='raw-data', Prefix=raw_prefix):
                for obj in page.get('Contents', []):
                    if not obj['Key'].endswith('.json'):
                        continue
                    source_files_count += 1
                    obj_response = self.s3_client.get_object(
                        Bucket='raw-data',
                        Key=obj['Key']
                    )
                    content = obj_response['Body'].read().decode('utf-8')
                    # Parse JSON lines (one event per non-empty line)
                    for line in content.splitlines():
                        if line.strip():
                            raw_events.append(json.loads(line))
            if not raw_events:
                print(f"No events found for {processing_date}")
                return False
            # Convert to DataFrame
            df = pd.DataFrame(raw_events)
            # Data cleaning and enrichment
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['processing_date'] = processing_date
            df['event_hour'] = df['timestamp'].dt.hour
            # Hours 8 through 18 (08:00-18:59) in the timestamps' own timezone (UTC for "...Z")
            df['is_business_hours'] = (df['event_hour'] >= 8) & (df['event_hour'] <= 18)
            # Add data quality flags
            df['has_valid_customer_id'] = df['customer_id'].notna() & (df['customer_id'] != '')
            df['event_type_category'] = df['event_type'].map({
                'LOGIN': 'USER_ACTIVITY',
                'LOGOUT': 'USER_ACTIVITY',
                'LOAN_APPLICATION': 'BUSINESS_EVENT',
                'PAYMENT': 'BUSINESS_EVENT',
                'KYC_UPDATE': 'PROFILE_EVENT'
            }).fillna('OTHER')
            # Save processed data (to_parquet with no path returns bytes)
            self.s3_client.put_object(
                Bucket='processed-data',
                Key=processed_key,
                Body=df.to_parquet(index=False),
                ContentType='application/octet-stream'
            )
            # Create processing metadata; cast NumPy scalars so json.dumps accepts them
            metadata = {
                'processing_date': processing_date.isoformat(),
                'source_files_count': source_files_count,
                'total_events': len(df),
                'valid_events': int(df['has_valid_customer_id'].sum()),
                'event_types': {k: int(v) for k, v in df['event_type'].value_counts().items()},
                'processing_timestamp': datetime.now().isoformat(),
                'data_quality_score': float(df['has_valid_customer_id'].mean())
            }
            metadata_key = f"clean/customer_events/date={date_str}/processing_metadata.json"
            self.s3_client.put_object(
                Bucket='processed-data',
                Key=metadata_key,
                Body=json.dumps(metadata, indent=2).encode('utf-8'),
                ContentType='application/json'
            )
            print(f"Successfully processed {len(df)} events for {processing_date}")
            return True
        except Exception as e:
            print(f"Error processing events for {processing_date}: {str(e)}")
            return False

    def generate_daily_aggregations(self, processing_date):
        """Generate daily aggregations from processed data"""
        date_str = processing_date.strftime('%Y-%m-%d')
        try:
            # Read processed events (buffered: Parquet needs a seekable file)
            processed_key = f"clean/customer_events/date={date_str}/customer_events_processed.parquet"
            response = self.s3_client.get_object(
                Bucket='processed-data',
                Key=processed_key
            )
            df = pd.read_parquet(io.BytesIO(response['Body'].read()))
            # Generate customer daily summary with named aggregations; the per-type
            # breakdown is stored as a JSON string so it writes cleanly to Parquet
            customer_summary = df.groupby('customer_id').agg(
                total_events=('event_id', 'count'),
                event_breakdown=('event_type', lambda x: json.dumps(
                    {k: int(v) for k, v in x.value_counts().items()})),
                first_activity=('timestamp', 'min'),
                last_activity=('timestamp', 'max'),
                business_hours_events=('is_business_hours', 'sum'),
            ).reset_index()
            customer_summary['activity_duration_hours'] = (
                customer_summary['last_activity'] - customer_summary['first_activity']
            ).dt.total_seconds() / 3600
            # Save aggregated data
            agg_key = f"aggregated/customer_daily_summary/date={date_str}/summary.parquet"
            self.s3_client.put_object(
                Bucket='processed-data',
                Key=agg_key,
                Body=customer_summary.to_parquet(index=False),
                ContentType='application/octet-stream'
            )
            print(f"Generated daily aggregations for {len(customer_summary)} customers")
            return True
        except Exception as e:
            print(f"Error generating aggregations for {processing_date}: {str(e)}")
            return False


def run_daily_etl():
    """Run daily ETL process"""
    processor = LakehouseDataProcessor()
    processing_date = datetime.now().date() - timedelta(days=1)  # Process previous day
    print(f"Starting ETL processing for {processing_date}")
    # Step 1: Process raw events
    if processor.process_daily_customer_events(processing_date):
        print("✓ Event processing completed")
        # Step 2: Generate aggregations
        if processor.generate_daily_aggregations(processing_date):
            print("✓ Aggregation generation completed")
            print(f"ETL processing completed successfully for {processing_date}")
        else:
            print("✗ Aggregation generation failed")
    else:
        print("✗ Event processing failed")


if __name__ == "__main__":
    run_daily_etl()
Security & Access Control
1. Bucket Policies and IAM
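MinIO policies use the AWS IAM policy grammar. Object actions such as `s3:GetObject` apply to `arn:aws:s3:::<bucket>/*`, while `s3:ListBucket` applies to the bucket ARN itself, which is why the application policy in this section splits them into two statements. A sketch that generates that shape for any list of buckets; `bucket_policy` and the read/write action sets are hypothetical conveniences mirroring the policies here.

```python
import json

OBJECT_ACTIONS = {
    "read": ["s3:GetObject"],
    "write": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
}


def bucket_policy(buckets, access="read"):
    """Return an IAM policy granting object access plus ListBucket on each bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Object-level actions target the objects inside each bucket
                "Effect": "Allow",
                "Action": list(OBJECT_ACTIONS[access]),
                "Resource": [f"arn:aws:s3:::{b}/*" for b in buckets],
            },
            {   # Listing targets the bucket resource itself
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{b}" for b in buckets],
            },
        ],
    }


policy = bucket_policy(["raw-data", "processed-data", "temp"], access="write")
print(json.dumps(policy, indent=2))
```

Writing the printed JSON to a file produces a policy document equivalent in effect to `lakehouse-app-policy.json`, ready for the `mc admin policy` commands.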
Create Service Account for Applications
# Create service account for Lakehouse applications
./mc.exe admin user add minio lakehouse-app-service lakehouseAppSecure2024
# Create policy for application access
cat > lakehouse-app-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::raw-data/*",
"arn:aws:s3:::processed-data/*",
"arn:aws:s3:::temp/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::raw-data",
"arn:aws:s3:::processed-data",
"arn:aws:s3:::temp"
]
}
]
}
EOF
# Apply policy
# (older mc releases used "admin policy add" and "admin policy set ... user=")
./mc.exe admin policy create minio lakehouse-app-policy lakehouse-app-policy.json
./mc.exe admin policy attach minio lakehouse-app-policy --user lakehouse-app-service
Read-only Access for Analytics
# Create read-only user for analytics
./mc.exe admin user add minio lakehouse-analytics-readonly lakehouseAnalyticsRead2024
# Create read-only policy
cat > lakehouse-readonly-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::processed-data",
"arn:aws:s3:::processed-data/*",
"arn:aws:s3:::documents",
"arn:aws:s3:::documents/*"
]
}
]
}
EOF
./mc.exe admin policy create minio lakehouse-readonly-policy lakehouse-readonly-policy.json
./mc.exe admin policy attach minio lakehouse-readonly-policy --user lakehouse-analytics-readonly
2. Encryption Configuration
Server-Side Encryption
# Enable default encryption for sensitive buckets
# (SSE-S3 requires a KMS configured on the server, e.g. MinIO KES or MINIO_KMS_SECRET_KEY)
./mc.exe encrypt set sse-s3 minio/documents
./mc.exe encrypt set sse-s3 minio/ml-datasets
# Verify encryption settings
./mc.exe encrypt info minio/documents
Client-Side Encryption Example
# Python example with client-side encryption
import boto3
from cryptography.fernet import Fernet


class EncryptedMinIOClient:
    def __init__(self, encryption_key=None):
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:9000',
            aws_access_key_id='lakehouse_admin',
            aws_secret_access_key='CHANGE_ME_PASSWORD',
            region_name='lakehouse-region'
        )
        # Use provided key or generate new one
        if encryption_key:
            self.cipher = Fernet(encryption_key)
        else:
            key = Fernet.generate_key()
            self.cipher = Fernet(key)
            # Store this key securely (e.g. in a secrets manager): data encrypted
            # with a lost key cannot be decrypted
            print(f"Generated encryption key: {key.decode()}")

    def upload_encrypted_data(self, data, bucket, key):
        """Upload data with client-side encryption"""
        # Encrypt data (Fernet works on bytes and holds the whole payload in memory)
        if isinstance(data, str):
            data = data.encode('utf-8')
        encrypted_data = self.cipher.encrypt(data)
        # Upload encrypted data
        self.s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=encrypted_data,
            ContentType='application/octet-stream',
            Metadata={'encrypted': 'true'}
        )
        return True

    def download_encrypted_data(self, bucket, key):
        """Download and decrypt data"""
        response = self.s3_client.get_object(Bucket=bucket, Key=key)
        encrypted_data = response['Body'].read()
        # Decrypt data (raises InvalidToken if the key is wrong or the data was altered)
        decrypted_data = self.cipher.decrypt(encrypted_data)
        return decrypted_data.decode('utf-8')
Performance Optimization
1. Optimal File Formats
Parquet for Analytics
# Optimize Parquet files for analytics
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import fs


def optimize_parquet_storage(df, bucket, base_key):
    """Write a DataFrame to MinIO as a sorted, query-friendly Parquet file"""
    # pyarrow's own S3 filesystem, pointed at MinIO (plain HTTP in this lab)
    minio_fs = fs.S3FileSystem(
        access_key='lakehouse_admin',
        secret_key='CHANGE_ME_PASSWORD',
        endpoint_override='localhost:9000',
        scheme='http',
        region='lakehouse-region'
    )
    # Sort by frequently queried columns so row-group min/max statistics prune well
    df_sorted = df.sort_values(['customer_id', 'timestamp'])
    table = pa.Table.from_pandas(df_sorted, preserve_index=False)
    # write_table splits the table into row groups of row_group_size rows
    pq.write_table(
        table,
        f'{bucket}/{base_key}',    # pyarrow paths are "bucket/key", without s3://
        filesystem=minio_fs,
        compression='snappy',      # Good balance of compression and speed
        row_group_size=100000,     # Rows per row group; tune for query patterns
        use_dictionary=True,       # Efficient for repetitive data
        write_statistics=True      # Enable column statistics
    )
2. Multipart Upload for Large Files
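S3 multipart uploads, which MinIO implements with the same limits, allow at most 10,000 parts; every part except the last must be at least 5 MiB, and no part may exceed 5 GiB. A fixed 5 MiB part size therefore caps an upload at about 48.8 GiB. A sketch that picks the smallest whole-MiB part size that fits a given file; `plan_parts` is a hypothetical helper.

```python
import math

MIB = 1024 ** 2
MIN_PART = 5 * MIB          # minimum size for every part except the last
MAX_PART = 5 * 1024 ** 3    # 5 GiB maximum part size
MAX_PARTS = 10_000          # maximum number of parts per upload


def plan_parts(file_size: int, preferred: int = MIN_PART):
    """Return (part_size, part_count) respecting the multipart limits."""
    part_size = max(preferred, MIN_PART, math.ceil(file_size / MAX_PARTS))
    part_size = math.ceil(part_size / MIB) * MIB  # round up to a whole MiB
    if part_size > MAX_PART:
        raise ValueError("file too large for a single multipart upload")
    part_count = max(1, math.ceil(file_size / part_size))
    return part_size, part_count


print(plan_parts(12 * MIB))         # (5242880, 3)
print(plan_parts(100 * 1024 ** 3))  # (11534336, 9310)
```

The returned size can be passed as `part_size` to `upload_large_file_multipart` in the example that follows.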
Efficient Large File Upload
import boto3


def upload_large_file_multipart(file_path, bucket, key, part_size=5*1024*1024):
    """Upload large files using multipart upload (part_size must be at least 5 MiB)"""
    # A fresh Session per call: clients are thread-safe, but the shared default
    # session behind boto3.client() is not, and this function may run in worker threads
    s3_client = boto3.Session().client(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id='lakehouse_admin',
        aws_secret_access_key='CHANGE_ME_PASSWORD'
    )
    # Initiate multipart upload
    response = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = response['UploadId']
    parts = []
    part_number = 1
    try:
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(part_size)
                if not chunk:
                    break
                # Upload part
                part_response = s3_client.upload_part(
                    Bucket=bucket,
                    Key=key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=chunk
                )
                parts.append({
                    'ETag': part_response['ETag'],
                    'PartNumber': part_number
                })
                print(f"Uploaded part {part_number}")
                part_number += 1
        # Complete multipart upload
        s3_client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )
        print(f"Successfully uploaded {file_path} to {bucket}/{key}")
        return True
    except Exception as e:
        # Abort so the server discards the parts already uploaded
        s3_client.abort_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id
        )
        print(f"Error uploading file: {str(e)}")
        return False
3. Concurrent Operations
Parallel File Processing
import concurrent.futures


def process_files_parallel(file_list, bucket, max_workers=4):
    """Upload (file_path, s3_key) pairs in parallel"""
    def process_single_file(file_info):
        file_path, s3_key = file_info
        # Your processing logic here; reuses upload_large_file_multipart from above
        return upload_large_file_multipart(file_path, bucket, s3_key)

    # Use ThreadPoolExecutor for I/O bound operations
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_single_file, file_info): file_info
                   for file_info in file_list}
        results = []
        for future in concurrent.futures.as_completed(futures):
            file_info = futures[future]
            try:
                result = future.result()
                results.append((file_info, result))
                print(f"Completed: {file_info[1]}")
            except Exception as e:
                print(f"Error processing {file_info[1]}: {str(e)}")
                results.append((file_info, False))
    return results
Backup & Versioning
1. Object Versioning
Enable Versioning
# Enable versioning on critical buckets
./mc.exe version enable minio/processed-data
./mc.exe version enable minio/documents
./mc.exe version enable minio/ml-datasets
# Check versioning status
./mc.exe version info minio/processed-data
Version Management
# List object versions
./mc.exe ls --versions minio/processed-data/curated/customer_summary/
# Restore a specific version by copying it to a new object (mc selects versions with --version-id)
./mc.exe cp --version-id VERSION_ID minio/processed-data/curated/customer_summary/customer_summary.parquet \
minio/processed-data/curated/customer_summary/customer_summary_restored.parquet
2. Cross-Region Replication
Setup Mirror for Backup
# Add backup MinIO instance (simulate with different alias)
./mc.exe alias set lakehouse-backup http://backup-server:9000 backup_user backup_pass
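# Set up continuous mirroring (--watch keeps running in the foreground;
# run the verification below from a second terminal)
./mc.exe mirror minio/processed-data lakehouse-backup/processed-data-backup --watch
# Verify mirror
./mc.exe ls lakehouse-backup/processed-data-backup
3. Lifecycle Management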
Configure Lifecycle Policies
# Create lifecycle policy for the temp bucket (an empty prefix matches every object in it)
cat > temp-lifecycle.json << 'EOF'
{
  "Rules": [
    {
      "ID": "temp-data-cleanup",
      "Status": "Enabled",
      "Filter": {
        "Prefix": ""
      },
      "Expiration": {
        "Days": 7
      }
    },
    {
      "ID": "old-versions-cleanup",
      "Status": "Enabled",
      "Filter": {
        "Prefix": ""
      },
      "NoncurrentVersionExpiration": {
        "NoncurrentDays": 30
      }
    }
  ]
}
EOF
# Apply the JSON lifecycle policy to the temp bucket
./mc.exe ilm import minio/temp < temp-lifecycle.json
# Or add a single rule directly (expire raw data after one year)
./mc.exe ilm rule add --expire-days 365 minio/raw-data
Monitoring & Troubleshooting
1. Performance Monitoring
MinIO Metrics Collection
# Generate a Prometheus scrape configuration (includes a bearer token for the metrics endpoint)
./mc.exe admin prometheus generate minio
# Check server info
./mc.exe admin info minio
# Monitor storage usage of a bucket
./mc.exe du minio/raw-data
Custom Monitoring Script
# scripts/minio_monitoring.py
import json
from datetime import datetime, timezone

import boto3


class MinIOMonitor:
    def __init__(self):
        self.s3_client = boto3.client(
            's3',
            endpoint_url='http://localhost:9000',
            aws_access_key_id='lakehouse_admin',
            aws_secret_access_key='CHANGE_ME_PASSWORD'
        )

    def get_bucket_sizes(self):
        """Get size information for all buckets (current object versions only)"""
        buckets = self.s3_client.list_buckets()['Buckets']
        bucket_sizes = {}
        for bucket in buckets:
            bucket_name = bucket['Name']
            total_size = 0
            object_count = 0
            try:
                paginator = self.s3_client.get_paginator('list_objects_v2')
                for page in paginator.paginate(Bucket=bucket_name):
                    for obj in page.get('Contents', []):
                        total_size += obj['Size']
                        object_count += 1
                bucket_sizes[bucket_name] = {
                    'size_bytes': total_size,
                    'size_mb': total_size / (1024 * 1024),
                    'object_count': object_count,
                    'last_checked': datetime.now().isoformat()
                }
            except Exception as e:
                bucket_sizes[bucket_name] = {
                    'error': str(e),
                    'last_checked': datetime.now().isoformat()
                }
        return bucket_sizes

    def check_data_freshness(self):
        """Check freshness of data in critical buckets"""
        freshness_report = {}
        critical_paths = [
            ('raw-data', 'streaming/'),
            ('processed-data', 'clean/'),
            ('processed-data', 'aggregated/')
        ]
        paginator = self.s3_client.get_paginator('list_objects_v2')
        for bucket, prefix in critical_paths:
            path = f"{bucket}/{prefix}"
            try:
                # Scan every page: keys are listed in lexicographic order, not by
                # age, so the first few keys are not necessarily the newest
                latest_object = None
                for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                    for obj in page.get('Contents', []):
                        if latest_object is None or obj['LastModified'] > latest_object['LastModified']:
                            latest_object = obj
                if latest_object is None:
                    freshness_report[path] = {'status': 'no_objects_found'}
                    continue
                age_hours = (datetime.now(timezone.utc) -
                             latest_object['LastModified']).total_seconds() / 3600
                freshness_report[path] = {
                    'latest_file': latest_object['Key'],
                    'last_modified': latest_object['LastModified'].isoformat(),
                    'age_hours': age_hours,
                    'is_fresh': age_hours < 24  # Consider fresh if < 24 hours
                }
            except Exception as e:
                freshness_report[path] = {'error': str(e)}
        return freshness_report

    def generate_health_report(self):
        """Generate comprehensive health report"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'bucket_sizes': self.get_bucket_sizes(),
            'data_freshness': self.check_data_freshness()
        }
        # Calculate totals
        total_size_mb = sum(
            bucket['size_mb'] for bucket in report['bucket_sizes'].values()
            if 'size_mb' in bucket
        )
        total_objects = sum(
            bucket['object_count'] for bucket in report['bucket_sizes'].values()
            if 'object_count' in bucket
        )
        report['summary'] = {
            'total_size_mb': total_size_mb,
            'total_size_gb': total_size_mb / 1024,
            'total_objects': total_objects,
            'healthy_buckets': len([b for b in report['bucket_sizes'].values()
                                    if 'error' not in b])
        }
        return report


def run_monitoring():
    """Run monitoring and save report"""
    monitor = MinIOMonitor()
    report = monitor.generate_health_report()
    # Print summary
    print("=== MinIO Health Report ===")
    print(f"Total Storage: {report['summary']['total_size_gb']:.2f} GB")
    print(f"Total Objects: {report['summary']['total_objects']}")
    print(f"Healthy Buckets: {report['summary']['healthy_buckets']}")
    # Check data freshness alerts
    print("\n=== Data Freshness ===")
    for path, info in report['data_freshness'].items():
        if 'is_fresh' in info:
            status = "✓ FRESH" if info['is_fresh'] else "⚠ STALE"
            print(f"{path}: {status} (age: {info['age_hours']:.1f}h)")
        elif 'status' in info:
            print(f"{path}: ⚠ {info['status']}")
        else:
            print(f"{path}: ❌ ERROR ({info.get('error')})")
    # Save detailed report
    report_filename = f"minio_health_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(report_filename, 'w') as f:
        json.dump(report, f, indent=2, default=str)
    print(f"\nDetailed report saved to: {report_filename}")


if __name__ == "__main__":
    run_monitoring()
2. Common Issues and Solutions
Connection Issues
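# Test MinIO connectivity
curl -I http://localhost:9000/minio/health/live
# Check container logs
docker logs minio --tail=50
# Verify network connectivity between containers (the minio image is minimal and may not
# include ping or nslookup; if they are missing, run these from another container on the network)
docker exec minio ping trino
docker exec minio nslookup minio
Performance Issues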
# Check disk usage
docker exec minio df -h /data
# Monitor active connections
docker exec minio netstat -an | grep :9000
# Stream live API call traces (press Ctrl+C to stop)
./mc.exe admin trace minio --verbose
Data Corruption Detection
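Comparing a local MD5 with the stored ETag only works for objects uploaded in a single part. For multipart uploads, S3-compatible servers commonly report an ETag built as the MD5 of the concatenated binary MD5 digests of each part, followed by `-<part count>`. This is widely observed behavior rather than a documented contract, and it may not hold under every encryption mode, so treat the sketch below as a heuristic. It assumes you know the part size used at upload time (5 MiB in the multipart example); `expected_etag` and `etag_matches` are hypothetical helpers.

```python
import hashlib


def expected_etag(data: bytes, part_size: int, multipart: bool) -> str:
    """Reproduce the usual S3-style ETag for a single-part or multipart upload."""
    if not multipart:
        return hashlib.md5(data).hexdigest()
    parts = [data[i:i + part_size] for i in range(0, len(data), part_size)] or [b""]
    digests = b"".join(hashlib.md5(p).digest() for p in parts)
    return f"{hashlib.md5(digests).hexdigest()}-{len(parts)}"


def etag_matches(data: bytes, stored_etag: str, part_size: int = 5 * 1024 * 1024) -> bool:
    """Compare with a stored ETag, picking the scheme from the ETag's form."""
    etag = stored_etag.strip('"')
    return expected_etag(data, part_size, multipart="-" in etag) == etag
```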
import hashlib

import boto3


def verify_data_integrity():
    """Verify data integrity by comparing content MD5s with stored ETags"""
    s3_client = boto3.client(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id='lakehouse_admin',
        aws_secret_access_key='CHANGE_ME_PASSWORD'
    )
    integrity_report = []
    # List critical files (paginated: one call returns at most 1000 keys)
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket='processed-data', Prefix='curated/'):
        for obj in page.get('Contents', []):
            try:
                # Download object and calculate checksum
                obj_response = s3_client.get_object(
                    Bucket='processed-data',
                    Key=obj['Key']
                )
                content = obj_response['Body'].read()
                calculated_md5 = hashlib.md5(content).hexdigest()
                # The ETag equals the content MD5 only for single-part uploads;
                # multipart ETags end in "-<part count>" (encrypted objects may differ too)
                stored_etag = obj_response['ETag'].strip('"')
                comparable = '-' not in stored_etag
                integrity_report.append({
                    'key': obj['Key'],
                    'size': obj['Size'],
                    'calculated_md5': calculated_md5,
                    'stored_etag': stored_etag,
                    'integrity_ok': (calculated_md5 == stored_etag) if comparable else None
                })
            except Exception as e:
                integrity_report.append({
                    'key': obj['Key'],
                    'error': str(e),
                    'integrity_ok': False
                })
    return integrity_report
Production Best Practices
1. Deployment Configuration
Production MinIO Configuration
# docker-compose.prod.yml
version: '3.8'
services:
  minio:
    # Pin a specific release tag in production rather than :latest
    image: minio/minio:latest
    container_name: minio-prod
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_ROOT_USER}
      MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD}
      MINIO_REGION_NAME: ${MINIO_REGION}
      # Security settings
      MINIO_SERVER_URL: https://minio.lakehouse.com
      MINIO_BROWSER_REDIRECT_URL: https://minio-console.lakehouse.com
      # Performance settings (legacy disk cache; recent MinIO releases may
      # ignore these variables, so verify them against the release you deploy)
      MINIO_CACHE: "on"
      MINIO_CACHE_DRIVES: "/cache"
      MINIO_CACHE_QUOTA: "80"
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - ./data/minio:/data
      - ./cache/minio:/cache
    networks:
      - lakehouse
    deploy:
      resources:
        limits:
          memory: 4G
          cpus: '2.0'
        reservations:
          memory: 2G
          cpus: '1.0'
    healthcheck:
      # Requires curl inside the image; some recent minio/minio images do not ship it
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
2. Security Hardening
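The production compose file reads `MINIO_ROOT_USER`, `MINIO_ROOT_PASSWORD` and `MINIO_REGION` from the environment, so an unset variable or a leftover placeholder only surfaces when the container starts. A minimal pre-flight sketch; the function name and the forbidden-password list are assumptions, and the 3/8-character minimums reflect MinIO's root credential limits as documented, which are worth re-checking for the release you run:

```python
import os
from typing import List, Mapping, Optional

# Variables that docker-compose.prod.yml expects as ${...}
REQUIRED_VARS = ("MINIO_ROOT_USER", "MINIO_ROOT_PASSWORD", "MINIO_REGION")
# Placeholder/default passwords that must never reach production (illustrative list)
FORBIDDEN_PASSWORDS = {"CHANGE_ME_PASSWORD", "minioadmin"}


def check_prod_env(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return a list of problems found in the MinIO production environment."""
    env = os.environ if env is None else env
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]
    user = env.get("MINIO_ROOT_USER", "")
    password = env.get("MINIO_ROOT_PASSWORD", "")
    # MinIO refuses to start with a shorter root user or password
    if user and len(user) < 3:
        problems.append("MINIO_ROOT_USER must be at least 3 characters")
    if password and len(password) < 8:
        problems.append("MINIO_ROOT_PASSWORD must be at least 8 characters")
    if password in FORBIDDEN_PASSWORDS:
        problems.append("MINIO_ROOT_PASSWORD is a placeholder or default value")
    return problems
```

Running it in CI before `docker compose -f docker-compose.prod.yml up` and failing on a non-empty list keeps development placeholders out of production.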
SSL/TLS Configuration
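# Generate a self-signed certificate (fine for testing; prefer a CA-issued certificate in production)
# -addext needs OpenSSL 1.1.1+; Go-based clients such as mc reject certificates without a SAN
openssl req -new -x509 -days 365 -nodes \
  -newkey rsa:2048 \
  -out minio.crt \
  -keyout minio.key \
  -subj "/C=VN/ST=HCM/L=HoChiMinh/O=Lakehouse/CN=minio.lakehouse.com" \
  -addext "subjectAltName=DNS:minio.lakehouse.com"
# Create certificates directory
mkdir -p platform/minio/certs
# Copy certificates using the file names MinIO looks for
cp minio.crt platform/minio/certs/public.crt
cp minio.key platform/minio/certs/private.key
# MinIO reads certificates from ${HOME}/.minio/certs (/root/.minio/certs when the
# container runs as root); mount this directory there or point --certs-dir at it
Network Security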
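A fixed subnet such as 172.20.0.0/16 only works if it does not collide with networks already routed on the host (Docker's default bridge typically uses 172.17.0.0/16) or reachable over a VPN. A minimal check with Python's `ipaddress` module; the helper name and the list of existing networks are hypothetical inputs you would gather from `docker network inspect` or the host routing table:

```python
import ipaddress
from typing import Iterable, List


def overlapping_networks(candidate: str, existing: Iterable[str]) -> List[str]:
    """Return the CIDR strings in `existing` that overlap `candidate`."""
    cand = ipaddress.ip_network(candidate)
    return [net for net in existing if cand.overlaps(ipaddress.ip_network(net))]


if __name__ == "__main__":
    in_use = ["172.17.0.0/16", "10.8.0.0/24"]  # illustrative values
    print(overlapping_networks("172.20.0.0/16", in_use))
```

An empty list means the planned `lakehouse` subnet is free to use; any returned entry is a network that would shadow or be shadowed by it.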
# Production network configuration
networks:
  lakehouse:
    driver: bridge
    ipam:
      config:
        - subnet: 172.20.0.0/16
  lakehouse-internal:
    driver: bridge
    internal: true  # No external access
3. Operational Procedures
Daily Operations Checklist
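Step 6 of the checklist script relies on `mc mirror --dry-run` to show what would still be copied to the backup. The same idea can be expressed over two bucket listings; this is a hypothetical helper that compares key → (size, ETag) maps and does not describe mc's own comparison rules. The listings could be built from boto3's `list_objects_v2` paginator (its `Key`, `Size` and `ETag` fields). ETags are only comparable when both copies were written the same way, since multipart uploads with different part sizes produce different ETags for identical bytes:

```python
from typing import Dict, List, Tuple

Listing = Dict[str, Tuple[int, str]]  # object key -> (size in bytes, ETag)


def backup_drift(source: Listing, backup: Listing) -> Dict[str, List[str]]:
    """Classify differences between a source bucket listing and its backup."""
    missing = sorted(k for k in source if k not in backup)
    changed = sorted(k for k in source if k in backup and source[k] != backup[k])
    extra = sorted(k for k in backup if k not in source)
    return {"missing": missing, "changed": changed, "extra": extra}
```

`missing` and `changed` are what a mirror run would still need to copy; `extra` lists objects that exist only in the backup.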
#!/bin/bash
# daily_minio_health_check.sh
echo "=== MinIO Health Check - $(date) ==="
# Check container status
echo "1. Container Status:"
docker ps | grep minio
# Check MinIO health endpoint
echo "2. Health Check:"
curl -sf http://localhost:9000/minio/health/live && echo "✓ Live" || echo "✗ Down"
curl -sf http://localhost:9000/minio/health/ready && echo "✓ Ready" || echo "✗ Not Ready"
# Check disk usage
echo "3. Disk Usage:"
docker exec minio df -h /data
# Sample recent activity: mc admin trace streams until interrupted,
# so timeout (coreutils) stops it after 10 seconds
echo "4. Recent Activity:"
timeout 10s ./mc.exe admin trace minio --quiet --json | tail -10
# Run the monitoring report (storage usage and data freshness)
echo "5. Storage & Data Freshness:"
python scripts/minio_monitoring.py
# Check backup status
echo "6. Backup Status:"
./mc.exe mirror --dry-run minio/processed-data lakehouse-backup/processed-data-backup
echo "Health check completed"
Disaster Recovery Procedures
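If the emergency snapshot taken in step 2 of the recovery script lands in the same directory that step 3 scans for the newest `*.tar.gz`, the restore source becomes the failed state that was just archived. The selection rule can be stated explicitly; this is a hypothetical Python helper that ignores files named `emergency_backup_*` and otherwise mirrors `ls -t | head -1`:

```python
from pathlib import Path
from typing import Optional


def latest_restorable_backup(
    backup_dir: str, exclude_prefix: str = "emergency_backup_"
) -> Optional[Path]:
    """Newest *.tar.gz in backup_dir, skipping emergency snapshots of the failed state."""
    candidates = [
        p for p in Path(backup_dir).glob("*.tar.gz")
        if p.is_file() and not p.name.startswith(exclude_prefix)
    ]
    # Same ordering as `ls -t`: most recently modified first
    return max(candidates, key=lambda p: p.stat().st_mtime, default=None)
```

Returning `None` when nothing qualifies lets the caller abort before touching the data directory.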
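#!/bin/bash
# minio_disaster_recovery.sh
set -eu
BACKUP_LOCATION="/backup/minio"
# Emergency snapshots live in a subdirectory so step 3 never restores the failed state
EMERGENCY_LOCATION="$BACKUP_LOCATION/emergency"
TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
echo "Starting MinIO disaster recovery procedure..."
# 1. Stop MinIO service
echo "Stopping MinIO service..."
docker stop minio
# 2. Back up current data before anything is overwritten (set -e aborts if this fails)
echo "Backing up current data..."
mkdir -p "$EMERGENCY_LOCATION"
docker run --rm -v "$(pwd)/data/minio:/source" -v "$EMERGENCY_LOCATION:/backup" \
  alpine tar czf "/backup/emergency_backup_$TIMESTAMP.tar.gz" /source
# 3. Restore from the latest regular backup in $BACKUP_LOCATION
echo "Restoring from backup..."
LATEST_BACKUP=$(ls -t "$BACKUP_LOCATION"/*.tar.gz 2>/dev/null | head -1)
if [ -z "$LATEST_BACKUP" ]; then
  echo "✗ No backup found in $BACKUP_LOCATION" >&2
  exit 1
fi
BACKUP_FILE=$(basename "$LATEST_BACKUP")
echo "Using backup: $BACKUP_FILE"
# Empty the data directory (including the hidden .minio.sys) so no files from the failed state remain
docker run --rm -v "$BACKUP_LOCATION:/backup" -v "$(pwd)/data/minio:/target" \
  alpine sh -c "find /target -mindepth 1 -delete && tar xzf /backup/$BACKUP_FILE -C /target --strip-components=1"
# 4. Restart MinIO
echo "Restarting MinIO..."
docker start minio
# 5. Verify recovery
echo "Verifying recovery..."
sleep 30
curl -sf http://localhost:9000/minio/health/live && echo "✓ Recovery successful" || echo "✗ Recovery failed"
# 6. Run integrity check
echo "Running post-recovery integrity check..."
python scripts/minio_monitoring.py
echo "Disaster recovery completed"
4. Scaling Considerations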
Distributed MinIO Setup
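The compose file below spreads 8 drives (4 nodes × 2 drives) across the cluster. How much of that raw space is usable, and how many drive failures are survivable, depends on the parity setting (for example `MINIO_STORAGE_CLASS_STANDARD=EC:4`). A rough calculator, assuming all 8 drives form a single erasure set of equal-size drives; the function name and the 1 TB drive size are illustrative:

```python
def erasure_summary(nodes: int, drives_per_node: int, drive_tb: float, parity: int) -> dict:
    """Rough capacity and failure tolerance for a single MinIO erasure set.

    Assumes every drive belongs to one erasure set and all drives are the same size.
    """
    drives = nodes * drives_per_node
    if not 0 <= parity <= drives // 2:
        raise ValueError("parity must be between 0 and half the number of drives")
    data_shards = drives - parity
    # Reads need `data_shards` drives; writes need one more when parity == data shards
    write_quorum = data_shards + 1 if data_shards == parity else data_shards
    raw_tb = drives * drive_tb
    return {
        "drives": drives,
        "raw_tb": raw_tb,
        "usable_tb": raw_tb * data_shards / drives,
        "read_failures_tolerated": parity,
        "write_failures_tolerated": drives - write_quorum,
    }
```

For the layout below, `erasure_summary(4, 2, 1.0, parity=4)` gives 4.0 TB usable out of 8.0 TB raw, tolerating 4 failed drives for reads and 3 for writes. The extra write drive at parity N/2 follows MinIO's documented quorum rule, which guards against split-brain writes.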
# For production scaling - distributed MinIO
version: '3.8'
services:
  minio1:
    image: minio/minio:latest
    hostname: minio1
    command: server http://minio{1...4}/data{1...2} --console-address ":9001"
    environment:
      MINIO_ROOT_USER: lakehouse_admin
      MINIO_ROOT_PASSWORD: CHANGE_ME_PASSWORD
    volumes:
      - ./data/minio1/data1:/data1
      - ./data/minio1/data2:/data2
  minio2:
    image: minio/minio:latest
    hostname: minio2
    command: server http://minio{1...4}/data{1...2} --console-address ":9001"
    environment:
      MINIO_ROOT_USER: lakehouse_admin
      MINIO_ROOT_PASSWORD: CHANGE_ME_PASSWORD
    volumes:
      - ./data/minio2/data1:/data1
      - ./data/minio2/data2:/data2
  # minio3 and minio4 use the same command and credentials, with their own hostname and volumes...
  nginx:
    image: nginx:alpine
    ports:
      - "9000:9000"
    volumes:
      - ./platform/nginx/minio-lb.conf:/etc/nginx/nginx.conf
    depends_on:
      - minio1
      - minio2
      - minio3
      - minio4
Conclusion
This MinIO Object Storage guide provides comprehensive coverage of:
- Architecture Understanding: How MinIO serves as the data lake foundation
- Data Organization: Strategic bucket design and partitioning for Lakehouse use cases
- Hands-on Practice: Complete labs from basic operations to advanced integration
- Integration Patterns: Seamless connection with Trino, Kafka, and ETL pipelines
- Security Implementation: IAM, encryption, and access control best practices
- Performance Optimization: Efficient file formats, parallel processing, and caching
- Operations Excellence: Monitoring, backup, disaster recovery procedures
Next Steps
- Complete All Labs: Execute hands-on exercises in sequence
- Data Lake Design: Implement bucket strategy for your specific requirements
- Integration Testing: Test connections with all platform components
- Security Deployment: Implement production-grade security measures
- Performance Tuning: Optimize for your specific workload patterns
- Monitoring Setup: Deploy comprehensive monitoring and alerting
Key Takeaways
- S3 Compatibility: Leverage existing AWS tools and libraries
- Scalable Storage: Design for growth with proper partitioning
- Security First: Implement encryption and access controls from day one
- Integration Ready: Built for seamless lakehouse integration
- Production Ready: Comprehensive operational procedures
MinIO is now ready to serve as the robust object storage foundation for the Lakehouse Platform!
