Building Reliable Data Pipelines: Error Handling Patterns
How to handle errors in data pipelines without breaking everything. Retry strategies, dead letter queues, and monitoring patterns that work in production.
Data pipelines fail. Files arrive late. APIs return errors. Databases timeout. Network connections drop.
The question isn't whether your pipeline will fail. It's how you handle failures when they happen.
After building pipelines for dozens of companies, we've seen the same mistakes repeated. Teams focus on the happy path, then scramble when things break. Here's how to handle errors from the start.
Why Error Handling Matters
A pipeline that fails silently is worse than one that crashes loudly. At least crashes get noticed.
Common problems:
- Pipeline fails, no one knows for hours
- Partial data gets written, causing inconsistencies
- Errors cascade, breaking downstream processes
- No way to recover without manual intervention
What good error handling gives you:
- Visibility into what's failing and why
- Ability to recover automatically when possible
- Clear alerts when manual intervention is needed
- Audit trail for debugging and compliance
Error Categories
Not all errors are created equal. Categorize them so you can handle each type appropriately:
Transient errors:
- Network timeouts
- Temporary API rate limits
- Database connection pool exhaustion
- These might succeed on retry
Permanent errors:
- Invalid data format
- Missing required fields
- Authentication failures
- These won't succeed on retry
Partial failures:
- Some records succeed, others fail
- File partially processed
- Batch job partially completed
- Need careful handling to avoid data loss
Retry Strategies
Exponential backoff: Wait longer between each retry attempt instead of retrying immediately.
import time
import random

# TransientError is a project-specific exception for failures that may
# succeed on retry (timeouts, rate limits, dropped connections).
class TransientError(Exception):
    pass

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except TransientError:
            if attempt == max_retries - 1:
                raise  # Out of retries; let the caller handle the failure
            # Exponential backoff with jitter spreads retries out over time
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            time.sleep(delay)
When to retry:
- Network errors (timeouts, connection refused)
- Rate limit errors (429 status codes)
- Service unavailable (503 errors)
- Database deadlocks
When not to retry:
- Authentication errors (401, 403)
- Invalid input (400 errors)
- Not found errors (404)
- Validation failures
Retry limits: Set maximum retries. Infinite retries can cause cascading failures. Three attempts usually works for transient errors.
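To make the retry/no-retry split concrete, here is a minimal sketch that classifies HTTP status codes along the lines above (the function name and the exact set of codes are illustrative; extend them for the APIs you call):

RETRYABLE_STATUS_CODES = {429, 502, 503, 504}   # rate limits, gateway and service errors
PERMANENT_STATUS_CODES = {400, 401, 403, 404}   # bad input, auth failures, missing resources

def should_retry(status_code):
    """Decide whether an HTTP response is worth retrying."""
    if status_code in RETRYABLE_STATUS_CODES:
        return True
    if status_code in PERMANENT_STATUS_CODES:
        return False
    # Default: treat other 5xx responses as transient, everything else as permanent
    return 500 <= status_code < 600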
Dead Letter Queues
Some records will never succeed. Don't let them block your pipeline.
What is a dead letter queue: A storage location for records that failed after all retries. Investigate them later without stopping the pipeline.
Implementation pattern:
# transform_and_load, send_to_dlq, and PermanentError are pipeline-specific;
# TransientError is the exception used by the retry sketch above.
def process_record(record):
    try:
        # Process record
        transform_and_load(record)
    except PermanentError as e:
        # Permanent failure: send to the dead letter queue
        # and continue processing other records
        send_to_dlq(record, error=str(e))
        return
    except TransientError:
        # Transient failure: re-raise so the retry logic can handle it
        raise
Dead letter queue storage:
- S3 bucket for failed records
- Database table with error details
- Message queue (SQS, RabbitMQ)
- Cloud storage with metadata
What to store:
- Original record
- Error message
- Timestamp
- Retry count
- Context (file name, batch ID, etc.)
Review process: Check dead letter queues regularly. Some errors reveal data quality issues. Others need code fixes. Don't let failed records pile up.
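As one possible implementation, here is a minimal send_to_dlq sketch that writes failed records to an S3 bucket as JSON, capturing the fields listed above. The bucket name and key layout are assumptions; swap in whichever storage you use.

import json
import uuid
import boto3
from datetime import datetime, timezone

s3 = boto3.client("s3")
DLQ_BUCKET = "my-pipeline-dlq"  # assumed bucket name

def send_to_dlq(record, error, retry_count=0, context=None):
    """Store a failed record plus error metadata for later review."""
    payload = {
        "record": record,
        "error": error,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "retry_count": retry_count,
        "context": context or {},  # e.g. file name, batch ID
    }
    key = f"dlq/{datetime.now(timezone.utc):%Y/%m/%d}/{uuid.uuid4()}.json"
    s3.put_object(Bucket=DLQ_BUCKET, Key=key, Body=json.dumps(payload).encode("utf-8"))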
Monitoring and Alerting
What to monitor:
- Pipeline success/failure rates
- Processing latency
- Error rates by type
- Dead letter queue size
- Retry counts
Alert thresholds:
- Pipeline failure (immediate alert)
- Error rate spike (alert if more than 5% of records fail)
- Dead letter queue growth (alert if more than 100 records per hour)
- Latency increase (alert if latency doubles)
Alert channels:
- PagerDuty for critical failures
- Slack for warnings
- Email for daily summaries
- Dashboard for real-time visibility
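As a rough illustration of the error-rate threshold above, here is a sketch that checks a batch's failure rate and fires an alert when it crosses 5%. The send_alert callback is a placeholder for whichever channel you wire it to (PagerDuty, Slack, email):

import logging

logger = logging.getLogger(__name__)

ERROR_RATE_THRESHOLD = 0.05  # alert if more than 5% of records fail

def check_error_rate(total_records, failed_records, send_alert):
    """Compare a batch's failure rate against the alert threshold."""
    if total_records == 0:
        return
    error_rate = failed_records / total_records
    logger.info("Batch error rate: %.2f%%", error_rate * 100)
    if error_rate > ERROR_RATE_THRESHOLD:
        # send_alert is a placeholder for your alerting integration
        send_alert(f"Error rate {error_rate:.1%} exceeds the {ERROR_RATE_THRESHOLD:.0%} threshold")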
Error Handling by Pipeline Stage
Ingestion:
- Validate file format before processing
- Check file completeness (expected row count, file size)
- Handle missing files gracefully
- Log all ingestion attempts
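A minimal sketch of ingestion checks along these lines, assuming CSV input (the size threshold and expected header are illustrative):

import csv
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

def validate_file(path, min_size_bytes=1, expected_columns=None):
    """Basic completeness checks before a file enters the pipeline."""
    file_path = Path(path)
    logger.info("Ingestion attempt: %s", file_path)
    if not file_path.exists():
        # Handle missing files gracefully instead of crashing the run
        logger.warning("File not found: %s", file_path)
        return False
    if file_path.stat().st_size < min_size_bytes:
        logger.warning("File smaller than expected, possibly truncated: %s", file_path)
        return False
    if expected_columns:
        with file_path.open(newline="") as f:
            header = next(csv.reader(f), [])
        if header != expected_columns:
            logger.warning("Unexpected header in %s: %s", file_path, header)
            return False
    return True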
Transformation:
- Validate data types
- Check required fields
- Handle null values explicitly
- Log transformation errors with context
Loading:
- Use transactions for atomic writes
- Handle duplicate key errors
- Check foreign key constraints
- Validate data before writing
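As a small illustration of atomic writes with duplicate handling, here is a sketch using SQLite (chosen only to keep the example self-contained; the same pattern applies to your warehouse or OLTP database, and the measurements table is assumed to exist):

import sqlite3

def load_batch(conn, records):
    """Write a batch atomically; duplicate keys update the existing row instead of failing."""
    with conn:  # one transaction: commits on success, rolls back on any error
        conn.executemany(
            """
            INSERT INTO measurements (id, timestamp, value)
            VALUES (:id, :timestamp, :value)
            ON CONFLICT(id) DO UPDATE SET
                timestamp = excluded.timestamp,
                value = excluded.value
            """,
            records,  # an iterable of dicts with id, timestamp and value keys
        )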
Common Patterns
Pattern 1: Fail fast on invalid data
# ValidationError is a custom exception for records that can never be processed
def validate_record(record):
    required_fields = ['id', 'timestamp', 'value']
    for field in required_fields:
        if field not in record:
            raise ValidationError(f"Missing required field: {field}")
    if not isinstance(record['value'], (int, float)):
        raise ValidationError(f"Invalid value type: {type(record['value'])}")
Pattern 2: Continue on partial failures
import logging

logger = logging.getLogger(__name__)

def process_batch(records):
    successful = []
    failed = []
    for record in records:
        try:
            process_record(record)
            successful.append(record)
        except Exception as e:
            failed.append((record, str(e)))
    # Log summary
    logger.info(f"Processed {len(successful)}/{len(records)} records")
    # Send failures to DLQ
    if failed:
        send_to_dlq(failed)
    return successful, failed
Pattern 3: Circuit breaker. Stop retrying when a service keeps failing.
import time

class CircuitBreakerOpenError(Exception):
    """Raised when the breaker is open and calls are being rejected."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func):
        if self.state == 'open':
            if time.time() - self.last_failure_time > self.timeout:
                # Timeout elapsed: allow one trial call through
                self.state = 'half-open'
            else:
                raise CircuitBreakerOpenError()
        try:
            result = func()
            # Success: reset the breaker
            self.failure_count = 0
            self.state = 'closed'
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'open'
            raise
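In use, the breaker wraps each call to the flaky dependency; once it opens, calls fail fast with CircuitBreakerOpenError until the timeout elapses and a trial request is allowed through (fetch_from_api stands in for your own call):

breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def fetch_with_breaker(url):
    # fetch_from_api is a placeholder for the pipeline's real API client
    return breaker.call(lambda: fetch_from_api(url))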
Testing Error Handling
Test scenarios:
- Network failures (simulate timeouts)
- Invalid data (malformed records)
- Service outages (mock API failures)
- Partial failures (some records succeed)
Test approach:
def test_retry_logic():
    call_count = 0

    def failing_function():
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise TransientError("Service unavailable")
        return "success"

    result = retry_with_backoff(failing_function, max_retries=3)
    assert result == "success"
    assert call_count == 3
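For the invalid-data scenario, a similar sketch using pytest against the validate_record helper from Pattern 1:

import pytest

def test_validation_rejects_missing_fields():
    # A record missing the required 'value' field should fail fast
    bad_record = {"id": 1, "timestamp": "2024-01-01T00:00:00Z"}
    with pytest.raises(ValidationError):
        validate_record(bad_record)

def test_validation_accepts_valid_record():
    good_record = {"id": 1, "timestamp": "2024-01-01T00:00:00Z", "value": 3.14}
    validate_record(good_record)  # should not raise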
Best Practices
Do:
- Log errors with full context
- Use structured logging (JSON format)
- Include correlation IDs for tracing
- Set up alerts for critical failures
- Review dead letter queues regularly
- Document error handling decisions
Don't:
- Swallow errors silently
- Retry forever
- Ignore partial failures
- Skip validation
- Assume external services are reliable
- Skip testing error paths
Real-World Example
A financial data pipeline processes daily transaction files:
- Ingestion: Validates file format, checks file size
- Transformation: Validates each record, handles missing fields
- Loading: Uses transactions, handles duplicates
- Error handling: Retries transient errors, sends permanent failures to DLQ
- Monitoring: Alerts on failures, tracks success rates
When a file arrives with invalid records:
- Valid records process successfully
- Invalid records go to dead letter queue
- Pipeline completes without manual intervention
- Data team reviews DLQ the next day
- Fixes are applied, records reprocessed
The Bottom Line
Error handling isn't optional. It's part of building reliable pipelines.
Start simple: Add retries for transient errors. Log failures clearly. Set up basic alerts.
Then improve: Add dead letter queues. Implement circuit breakers. Build dashboards.
The goal isn't zero failures. It's handling failures gracefully so they don't break your pipeline or your team's workflow.
Remember: A pipeline that fails loudly and recovers automatically is better than one that fails silently and corrupts data.