Master Databricks ELT/ETL: Cost & Performance Guide
Cut Databricks costs by 90% and speed up pipelines by 75%. Real case studies, production patterns, and troubleshooting tips from 50+ deployments.
Your data pipelines are expensive. Your Databricks clusters run longer than they should. Integration takes weeks instead of days.
You're not alone. Most teams struggle with the same issues when building production data pipelines on Databricks.
After working with dozens of companies on their Databricks deployments, we've seen teams reduce costs by up to 90% and cut integration time by 60%. Here's what we learned.
Why Databricks for Data Pipelines?
Databricks handles large data volumes well. Its unified platform combines data engineering, data science, and machine learning in one place.
What it does:
- Process millions of records per second
- Scale compute and storage independently
- Handle both batch and streaming workloads
- Support SQL, Python, Scala, and R
- Built on Apache Spark for distributed processing
What makes Databricks different: Unlike traditional ETL tools, Databricks processes data where it lives. You don't need to move data to a separate processing engine. The compute layer scales dynamically based on workload.
ETL vs ELT: Choosing Your Approach
Understanding the difference between ETL and ELT is fundamental to building efficient pipelines.
ETL (Extract, Transform, Load):
- Transform data before loading into warehouse
- Requires separate processing layer
- Good for complex transformations before storage
- Traditional approach, established patterns
ELT (Extract, Load, Transform):
- Load raw data first, transform in warehouse
- Leverages warehouse compute power
- More flexible, easier to iterate
- Modern approach, cloud-native
When to use ETL:
- Limited warehouse compute capacity
- Complex transformations required before loading
- Sensitive data that must be cleaned before storage
- Legacy systems with established patterns
When to use ELT:
- Cloud data warehouse with ample compute
- Need to preserve raw data
- Frequent schema or logic changes
- Fast time-to-insight requirements
Databricks works well for both approaches, but ELT is often the better choice for cloud deployments. You get flexibility, faster iterations, and better use of Databricks' compute power.
If you're building modern data pipelines, understanding this distinction helps you choose the right architecture from the start.
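As a minimal sketch of the ELT flow (table and path names here are hypothetical): load raw data first, then transform it in place with warehouse compute.

```sql
-- Load: land raw files as-is into a bronze table, schema inferred on load
CREATE TABLE IF NOT EXISTS bronze_orders;

COPY INTO bronze_orders
FROM '/mnt/raw/orders'
FILEFORMAT = JSON
COPY_OPTIONS ('mergeSchema' = 'true');

-- Transform: build a cleaned silver table using warehouse compute
CREATE OR REPLACE TABLE silver_orders AS
SELECT
  order_id,
  CAST(order_total AS DECIMAL(10, 2)) AS order_total,
  to_date(order_ts) AS order_date
FROM bronze_orders
WHERE order_id IS NOT NULL;
```

Because the raw bronze table is preserved, you can change the transformation logic and rebuild silver tables without re-extracting anything.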
Databricks Lakeflow: Declarative Pipelines
Lakeflow transforms how you build data pipelines. Instead of writing complex orchestration code, you declare what you want and Lakeflow figures out how to do it.
What you get:
- SQL-based pipeline definitions
- Automatic dependency management
- Built-in data quality checks
- Schema evolution handling
- Incremental processing
Simple example:
-- Define a pipeline that processes user events
CREATE OR REFRESH STREAMING TABLE user_events
AS SELECT
user_id,
event_type,
event_timestamp,
properties
FROM cloud_files(
"/mnt/raw/events",
"json",
map("cloudFiles.inferColumnTypes", "true")
);
-- Transform events into 30-minute user sessions
CREATE OR REFRESH STREAMING TABLE user_sessions
AS SELECT
  user_id,
  window.start AS session_start,
  window.end AS session_end,
  COUNT(*) AS event_count
FROM STREAM(user_events)
GROUP BY
  user_id,
  window(event_timestamp, "30 minutes");
Lakeflow handles:
- Reading from cloud storage incrementally
- Managing checkpoints and state
- Handling late-arriving data
- Updating tables as new data arrives
- Data quality validation
No manual orchestration needed. Declare your logic, Lakeflow executes it.
Cost Optimization Strategies
Databricks costs add up quickly if not managed properly. Here's how to control them.
1. Cluster Configuration
Choose the right cluster mode:
# Job cluster (ephemeral, cheaper)
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
cluster_config = {
    "cluster_name": "etl-job-cluster",
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    # Specify either a fixed num_workers or autoscale, not both
    "autoscale": {
        "min_workers": 2,
        "max_workers": 8
    },
    "spark_conf": {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true"
    }
}
# For scheduled jobs (a plain dict is shown for brevity; the SDK's
# typed interface expects jobs.Task objects)
job_cluster = w.jobs.create(
    name="daily-etl",
    tasks=[{
        "task_key": "process_data",
        "new_cluster": cluster_config,
        "notebook_task": {
            "notebook_path": "/pipelines/daily_etl"
        }
    }]
)
Cluster types:
- All-purpose clusters: For interactive work, more expensive
- Job clusters: For scheduled jobs, automatically terminate
- SQL warehouses: For SQL queries, serverless option
Cost-saving tips:
- Use job clusters for production workloads (40-50% cheaper)
- Enable auto-termination for interactive clusters
- Use spot instances for non-critical jobs (up to 90% savings)
- Right-size your clusters (don't over-provision)
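As a sketch of the spot-instance tip, here is a job cluster spec using spot workers with on-demand fallback. The field names follow the Databricks clusters API for AWS; the cluster name and sizes are illustrative, and the attributes differ on Azure and GCP.

```python
# Job cluster spec: spot workers, on-demand driver (illustrative values)
spot_cluster_config = {
    "cluster_name": "etl-spot-cluster",
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "autoscale": {"min_workers": 2, "max_workers": 8},
    "aws_attributes": {
        # Fall back to on-demand if spot capacity is unavailable
        "availability": "SPOT_WITH_FALLBACK",
        # Keep the first node (the driver) on-demand so the job
        # survives spot reclamation of workers
        "first_on_demand": 1,
        # Bid up to 100% of the on-demand price
        "spot_bid_price_percent": 100,
    },
}
```

Keeping the driver on-demand is the usual compromise: losing a worker to reclamation just slows the job, while losing the driver kills it.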
2. Storage Optimization
Store data efficiently to reduce costs.
# Improve table storage with Z-ordering
spark.sql("""
OPTIMIZE events
ZORDER BY (user_id, event_date)
""")
# Vacuum old files (removes files older than retention)
spark.sql("""
VACUUM events RETAIN 168 HOURS
""")
# Use appropriate file formats
(df.write
    .format("delta")
    .mode("overwrite")
    .option("compression", "snappy")
    .option("optimizeWrite", "true")
    .save("/mnt/optimized/events"))
Storage best practices:
- Use Delta Lake format (better compression, faster queries)
- Enable auto-optimize for writes
- Partition large tables appropriately
- Z-order frequently queried columns
- Vacuum regularly to remove old files
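For instance, the auto-optimize item in the list above can be enabled per table or per session (assuming an existing Delta table named events):

```sql
-- Enable optimized writes and auto-compaction on one table
ALTER TABLE events SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);

-- Or set session defaults so new writes pick them up
SET spark.databricks.delta.optimizeWrite.enabled = true;
SET spark.databricks.delta.autoCompact.enabled = true;
```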
3. Query Optimization
Optimize queries to use less compute time.
# Before: Full table scan
expensive_query = """
SELECT *
FROM large_table
WHERE event_date = '2025-01-15'
"""
# After: Partition pruning
better_query = """
SELECT *
FROM large_table
WHERE event_date = '2025-01-15' -- Uses partition pruning
AND user_id IS NOT NULL -- Push down filter
"""
# Use query hints when needed
spark.sql("""
SELECT /*+ BROADCAST(small_table) */
large_table.*,
small_table.category
FROM large_table
JOIN small_table ON large_table.category_id = small_table.id
""")
Query improvement checklist:
- Turn on Adaptive Query Execution (AQE)
- Use partition pruning
- Push filters down to data source
- Use broadcast joins for small tables
- Avoid unnecessary shuffles
- Cache frequently accessed data
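Several checklist items map directly to Spark settings. A sketch of a starting configuration follows; the values are Spark defaults or common starting points, not tuned recommendations.

```python
# AQE and join settings worth reviewing (starting points, not tuned values)
aqe_settings = {
    # Adaptive Query Execution: re-optimize plans at runtime
    "spark.sql.adaptive.enabled": "true",
    # Merge small shuffle partitions automatically
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Let AQE split skewed partitions during sort-merge joins
    "spark.sql.adaptive.skewJoin.enabled": "true",
    # Auto-broadcast tables smaller than this threshold (bytes)
    "spark.sql.autoBroadcastJoinThreshold": str(10 * 1024 * 1024),
}

# Applying them inside a Spark session:
# for key, value in aqe_settings.items():
#     spark.conf.set(key, value)
```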
4. Monitoring Costs
Track spending to identify optimization opportunities.
# Query cost data from system tables. DBU list prices live in
# system.billing.list_prices; the join below is simplified, and
# production queries should also match on currency and the price
# validity window.
cost_analysis = spark.sql("""
    SELECT
        u.workspace_id,
        u.usage_metadata.cluster_id AS cluster_id,
        u.usage_date,
        u.sku_name,
        SUM(u.usage_quantity * p.pricing.default) AS estimated_cost
    FROM system.billing.usage u
    JOIN system.billing.list_prices p
        ON u.sku_name = p.sku_name
    WHERE u.usage_date >= current_date() - INTERVAL 30 DAYS
    GROUP BY 1, 2, 3, 4
    ORDER BY estimated_cost DESC
""")
# Identify expensive workloads (job names aren't in the usage table;
# join your jobs metadata if you need them)
expensive_jobs = spark.sql("""
    SELECT
        u.usage_metadata.job_id AS job_id,
        SUM(u.usage_quantity * p.pricing.default) AS total_cost
    FROM system.billing.usage u
    JOIN system.billing.list_prices p
        ON u.sku_name = p.sku_name
    WHERE u.usage_date >= current_date() - INTERVAL 7 DAYS
        AND u.usage_metadata.job_id IS NOT NULL
    GROUP BY 1
    ORDER BY total_cost DESC
    LIMIT 10
""")
Set up alerts for cost anomalies. Monitor daily spending. Review expensive workloads weekly.
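One way to make cost-anomaly alerts concrete is a simple trailing-average rule. This sketch assumes you feed it (date, cost) pairs, for example daily totals pulled from the billing query above.

```python
# Flag any day whose spend exceeds a multiple of its trailing average.
def flag_cost_anomalies(daily_costs, multiplier=1.5, window=7):
    """Return (date, cost) pairs whose cost exceeds multiplier x the
    average of the preceding `window` days."""
    anomalies = []
    for i in range(window, len(daily_costs)):
        date, cost = daily_costs[i]
        trailing = [c for _, c in daily_costs[i - window:i]]
        baseline = sum(trailing) / window
        if baseline > 0 and cost > multiplier * baseline:
            anomalies.append((date, cost))
    return anomalies

# Example: a flat $100/day history, then a spike on January 8th
history = [(f"2025-01-{d:02d}", 100.0) for d in range(1, 8)]
history.append(("2025-01-08", 400.0))
print(flag_cost_anomalies(history))  # → [('2025-01-08', 400.0)]
```

In production you would run this on a schedule and route hits to email or Slack; the threshold and window are knobs to tune against your own spend patterns.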
Integration Patterns
Connect Databricks to your data sources efficiently.
PostgreSQL Integration
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .appName("PostgreSQL Integration")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate())
# Read from PostgreSQL
jdbc_url = "jdbc:postgresql://hostname:5432/database"
connection_properties = {
"user": "username",
"password": "password",
"driver": "org.postgresql.Driver"
}
# Read entire table
df = spark.read.jdbc(
url=jdbc_url,
table="users",
properties=connection_properties
)
# Read with query
query = "(SELECT * FROM users WHERE created_at > '2025-01-01') AS subset"
df = spark.read.jdbc(
url=jdbc_url,
table=query,
properties=connection_properties
)
# Parallel read with partitioning
df = spark.read.jdbc(
url=jdbc_url,
table="large_table",
properties=connection_properties,
column="id",
lowerBound=1,
upperBound=1000000,
numPartitions=10
)
AWS S3 Integration
# Configure S3 access
spark.conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
spark.conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
# Or use IAM role (recommended)
spark.conf.set("fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.InstanceProfileCredentialsProvider")
# Read from S3
# Read from S3
df = (spark.read
    .format("parquet")
    .load("s3a://bucket-name/path/to/data"))

# Write to S3
(df.write
    .format("delta")
    .mode("overwrite")
    .save("s3a://bucket-name/output/path"))

# Auto Loader for incremental ingestion
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/schema")
    .load("s3a://bucket-name/incoming/"))

(df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints")
    .start("/mnt/processed/data"))
Kafka Integration
# Read from Kafka
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "events-topic")
    .option("startingOffsets", "latest")
    .load())
# Parse Kafka messages
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType
schema = StructType([
StructField("user_id", StringType()),
StructField("event_type", StringType()),
StructField("timestamp", LongType())
])
parsed_df = kafka_df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Write to Delta table
(parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/kafka")
    .start("/mnt/events"))
Schema Evolution and Management
Handling schema changes gracefully is critical for production pipelines.
Schema Evolution with Delta Lake
-- Enable schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled = true;
-- Add new columns automatically
INSERT INTO events
SELECT
user_id,
event_type,
timestamp,
'new_column_value' as new_column -- Automatically added
FROM source_events;
-- Explicitly add columns
ALTER TABLE events ADD COLUMNS (
device_type STRING COMMENT 'Type of device',
app_version STRING COMMENT 'Application version'
);
-- Rename columns (requires column mapping:
-- TBLPROPERTIES ('delta.columnMapping.mode' = 'name'))
ALTER TABLE events RENAME COLUMN old_name TO new_name;
-- Change column types (limited: widening such as INT -> BIGINT may need
-- the type widening feature; other changes require rewriting the table)
ALTER TABLE events ALTER COLUMN event_count TYPE BIGINT;
Managing Breaking Changes
from pyspark.sql.functions import col, when, coalesce, lit

# Handle schema changes in source data
df = spark.read.json("/mnt/raw/events")

# Add missing columns with defaults
df = (df
    .withColumn("device_type",
        when(col("device_type").isNotNull(), col("device_type"))
        .otherwise("unknown")
    )
    .withColumn("app_version",
        coalesce(col("app_version"), col("version"), lit("1.0.0"))
    ))
# Validate schema before writing
expected_schema = ["user_id", "event_type", "timestamp", "device_type"]
actual_columns = df.columns
missing_columns = set(expected_schema) - set(actual_columns)
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")
# Write with schema evolution enabled (mergeSchema adds new columns
# instead of rejecting them)
(df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("/mnt/processed/events"))
Schema Validation
from pyspark.sql.types import StructType, StructField, StringType, LongType
# Define expected schema
expected_schema = StructType([
StructField("user_id", StringType(), nullable=False),
StructField("event_type", StringType(), nullable=False),
StructField("timestamp", LongType(), nullable=False),
StructField("properties", StringType(), nullable=True)
])
# Validate incoming data
def validate_schema(df, expected_schema):
"""Validate dataframe against expected schema"""
actual_schema = df.schema
# Check required fields exist
expected_fields = {f.name: f for f in expected_schema.fields}
actual_fields = {f.name: f for f in actual_schema.fields}
missing_fields = set(expected_fields.keys()) - set(actual_fields.keys())
if missing_fields:
return False, f"Missing fields: {missing_fields}"
# Check field types match
for field_name in expected_fields:
if expected_fields[field_name].dataType != actual_fields[field_name].dataType:
return False, f"Type mismatch for {field_name}"
return True, "Schema valid"
# Use validation
is_valid, message = validate_schema(df, expected_schema)
if not is_valid:
raise ValueError(f"Schema validation failed: {message}")
Performance Optimization
Speed up your pipelines with these optimization techniques.
Data Partitioning
# Partition by date for time-series data
(df.write
    .format("delta")
    .partitionBy("event_date")
    .save("/mnt/events"))

# Multi-level partitioning
(df.write
    .format("delta")
    .partitionBy("year", "month", "day")
    .save("/mnt/events"))
# Query optimized by partitions
spark.sql("""
SELECT *
FROM events
WHERE event_date = '2025-01-15' -- Only reads one partition
""")
# Z-ordering for secondary sort keys
spark.sql("""
OPTIMIZE events
ZORDER BY (user_id, event_type)
""")
Caching Strategies
# Cache frequently accessed data
lookup_table = spark.table("lookup_table").cache()
# Use in multiple operations
result1 = df.join(lookup_table, "id")
result2 = df2.join(lookup_table, "id")
# Unpersist when done
lookup_table.unpersist()
# Broadcast small tables
from pyspark.sql.functions import broadcast
result = large_df.join(
broadcast(small_df),
"key"
)
Incremental Processing
from pyspark.sql.functions import col, current_timestamp

# Track last processed timestamp (None on the first run)
last_processed = spark.sql("""
    SELECT MAX(processed_at)
    FROM events_processed
""").first()[0]

# Process only new data; fall back to a full read the first time
new_data = (spark.read
    .format("delta")
    .load("/mnt/raw/events"))
if last_processed is not None:
    new_data = new_data.where(col("timestamp") > last_processed)

# Process and mark as processed
processed = new_data.withColumn(
    "processed_at",
    current_timestamp()
)

(processed.write
    .format("delta")
    .mode("append")
    .save("/mnt/processed/events"))
Lakeflow Jobs: Orchestration
Lakeflow Jobs provides a modern orchestration layer for Databricks pipelines.
Key features:
- Visual pipeline designer
- Dependency management
- Error handling and retries
- Monitoring and alerting
- Version control integration
Example pipeline definition:
# Define a multi-task job
job_config = {
"name": "daily_etl_pipeline",
"tasks": [
{
"task_key": "extract_data",
"notebook_task": {
"notebook_path": "/pipelines/extract"
}
},
{
"task_key": "transform_data",
"depends_on": [{"task_key": "extract_data"}],
"notebook_task": {
"notebook_path": "/pipelines/transform"
}
},
{
"task_key": "load_data",
"depends_on": [{"task_key": "transform_data"}],
"notebook_task": {
"notebook_path": "/pipelines/load"
}
}
],
"schedule": {
"quartz_cron_expression": "0 0 2 * * ?", # Daily at 2 AM
"timezone_id": "America/Los_Angeles"
},
"email_notifications": {
"on_failure": ["data-team@company.com"]
}
}
Troubleshooting Common Issues
Out of Memory Errors
# Problem: OOM when processing large datasets
# Solution 1: Increase executor memory. Set these in the cluster's
# Spark config at creation time; runtime spark.conf.set won't resize
# running executors:
#   spark.executor.memory 8g
#   spark.driver.memory 4g
# Solution 2: Process in smaller chunks
dates = get_date_range(start_date, end_date)
for date in dates:
df = spark.read.parquet(f"/mnt/data/date={date}")
# Process and write
df.write.mode("append").save("/mnt/output")
# Solution 3: Use repartition to reduce memory per partition
df = df.repartition(200) # More partitions = less memory per partition
Slow Joins
# Problem: Join taking too long
# Solution 1: Broadcast small tables
result = large_df.join(broadcast(small_df), "key")
# Solution 2: Improve join keys
# Use same data type and partition key
df1 = df1.repartition(col("join_key"))
df2 = df2.repartition(col("join_key"))
result = df1.join(df2, "join_key")
# Solution 3: Use bucketing for repeated joins
# (Delta Lake doesn't support bucketBy; write a Parquet table instead,
# or use Z-ordering / liquid clustering on Delta)
(df.write
    .format("parquet")
    .bucketBy(100, "user_id")
    .saveAsTable("events_bucketed"))
Data Skew
# Problem: Some partitions much larger than others
# Solution: Salt the skewed key
from pyspark.sql.functions import rand, col, lit, explode, array

# Add a random salt (0-9) to the large, skewed side
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))

# Replicate the other side once per salt value so every key still matches
other_salted = other_df.withColumn(
    "salt", explode(array(*[lit(i) for i in range(10)]))
)

# Join on the original key plus the salt, then drop it
result = df_salted.join(other_salted, ["skewed_key", "salt"]).drop("salt")
Real-World Case Study
A retail company processing 50 million transactions daily faced high costs and slow pipelines.
Challenges:
- Monthly Databricks bill: $45,000
- Pipeline runtime: 6 hours
- Frequent out-of-memory errors
- Manual schema management
Solutions we used:
- Switched to job clusters for production (saved 40%)
- Turned on spot instances for non-critical jobs (saved 50% more)
- Better partitioning and Z-ordering (2x faster queries)
- Added incremental processing (4x faster pipelines)
- Moved to Lakeflow Declarative Pipelines (eliminated schema issues)
Results:
- Monthly cost: $4,500 (90% reduction)
- Pipeline runtime: 1.5 hours (75% faster)
- Zero schema-related failures
- Team freed up for higher-value work
Getting Started Checklist
Week 1: Foundation
- Set up Databricks workspace
- Configure cloud storage integration
- Create first Delta table
- Build simple ETL pipeline
Week 2: Optimization
- Enable auto-optimization
- Configure cluster autoscaling
- Implement partition strategy
- Set up monitoring
Week 3: Scale
- Migrate to Lakeflow Declarative Pipelines
- Implement incremental processing
- Add data quality checks
- Configure alerts
Week 4: Production
- Set up CI/CD for notebooks
- Document pipeline logic
- Train team on best practices
- Establish cost monitoring
Tools and Resources
Optimization Tools:
- Query Profile Analyzer
- Cluster Event Logs
- System Tables for billing
- Spark UI for debugging
Community Resources:
- Databricks Community Forum
- GitHub examples and templates
- Databricks Blog
- Stack Overflow databricks tag
The Bottom Line
Databricks ELT/ETL doesn't have to be expensive or complicated. Focus on these fundamentals:
- Right-size your clusters: Use job clusters, turn on autoscaling, use spot instances
- Better storage: Use Delta Lake, partition appropriately, Z-order frequently queried columns
- Process incrementally: Don't reprocess everything, track what's new
- Use Lakeflow: Let Databricks handle orchestration and schema evolution
- Monitor costs: Track spending, identify expensive workloads, improve continuously
Start simple. Build one pipeline. Make it better. Then expand.
The companies seeing 90% cost reductions didn't do everything at once. They started with one improvement, measured the impact, then moved to the next.
Your data pipelines should be fast, reliable, and cost-effective. With these patterns, they will be.
Next steps: Check out our guide on building reliable data pipelines to learn error handling patterns that prevent pipeline failures.
Want help with your Databricks deployment? Get in touch - we've helped 50+ companies cut costs and improve performance.