Streaming and Memory-Efficient Processing¶
Large datasets require specialized handling to avoid memory exhaustion and optimize processing performance. Transmog provides streaming capabilities and memory optimization features designed for datasets that exceed available system memory.
Memory-Efficient Functions¶
Stream Processing¶
The flatten_stream() function processes data in chunks and writes results directly to files, minimizing memory usage:
import transmog as tm
# Process large dataset with streaming
tm.flatten_stream(
    input_path="large_dataset.jsonl",
    output_dir="output/",
    name="dataset",
    low_memory=True,
    batch_size=500
)
Low Memory Mode¶
The low_memory parameter activates comprehensive memory optimization strategies:
# Memory-optimized processing with adaptive features
result = tm.flatten(
    data=large_data,
    low_memory=True,
    batch_size=1000,  # Starting batch size (adapts automatically)
    skip_empty=True
)
# The system automatically:
# - Monitors memory usage and pressure
# - Adapts batch sizes based on available memory
# - Uses in-place modifications to reduce allocations
# - Applies strategic garbage collection
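The adaptive logic itself is internal to Transmog, but the general shape of such a controller is easy to sketch. The outline below is illustrative only, not the library's implementation; the thresholds and the use of psutil are assumptions made for the example:

import gc
import psutil

def adaptive_batch_size(current_size, min_size=100, max_size=10000):
    """Illustrative sketch: shrink batches under memory pressure, grow them with headroom."""
    mem = psutil.virtual_memory()
    available = mem.available / mem.total
    if available < 0.1:
        # Heavy pressure: halve the batch and reclaim what we can
        gc.collect()
        return max(min_size, current_size // 2)
    if available > 0.5:
        # Plenty of headroom: let batches grow for throughput
        return min(max_size, current_size * 2)
    return current_size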
Batch Processing Configuration¶
Batch Size Optimization¶
Batch size affects memory usage and processing speed:
# Small batches for memory-constrained environments
result = tm.flatten(
    data=data,
    batch_size=100,  # Process 100 records at a time
    low_memory=True
)
# Larger batches for performance
result = tm.flatten(
    data=data,
    batch_size=5000,  # Process 5000 records at a time
    low_memory=False
)
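When in doubt, measure: a short timing loop makes the memory/speed trade-off concrete for your own data. This sketch assumes large_data is already loaded in memory:

import time
import transmog as tm

# Time tm.flatten() at several batch sizes to find a good trade-off
for size in (100, 1000, 5000):
    start = time.perf_counter()
    tm.flatten(data=large_data, batch_size=size, low_memory=True)
    print(f"batch_size={size}: {time.perf_counter() - start:.2f}s")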
Memory Usage Patterns¶
Different configurations impact memory consumption. The adaptive memory management system automatically adjusts parameters based on available memory:
| Configuration | Memory Usage | Processing Speed | Adaptive Features | Best For |
|---|---|---|---|---|
| low_memory=True, small batches (e.g. 100) | Minimal | Moderate | Full optimization | Limited memory |
| low_memory=True, default batches (e.g. 1000) | Low | High | Adaptive sizing | Balanced approach |
| low_memory=False, large batches (e.g. 5000) | Variable | Highest | Minimal adaptation | Abundant memory |
The system achieves consistent throughput (13,000+ records/sec) across different memory configurations through adaptive optimization.
Streaming Input Sources¶
JSONL File Processing¶
Large JSONL files are processed line by line:
# Stream processing of JSONL file
tm.flatten_stream(
    input_path="data.jsonl",
    output_dir="results/",
    name="records",
    batch_size=1000
)
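Each input line is one standalone JSON document. For illustration, a made-up two-record data.jsonl might look like this:

{"id": 1, "user": {"name": "Ada"}, "events": [{"type": "login"}]}
{"id": 2, "user": {"name": "Grace"}, "events": []}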
CSV File Processing¶
Large CSV files containing JSON-encoded columns are streamed the same way; name those columns with json_columns:
# Stream processing of CSV with JSON columns
tm.flatten_stream(
    input_path="data.csv",
    output_dir="results/",
    name="records",
    json_columns=["metadata", "attributes"],
    batch_size=2000
)
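For illustration, a made-up data.csv matching the call above could look like this; the metadata and attributes columns hold serialized JSON that is parsed and flattened per record:

id,name,metadata,attributes
1,sensor-a,"{""region"": ""eu"", ""tier"": 2}","{""active"": true}"
2,sensor-b,"{""region"": ""us"", ""tier"": 1}","{""active"": false}"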
Output Streaming¶
Direct File Output¶
Results are written directly to files without intermediate storage:
# Stream to multiple output formats
tm.flatten_stream(
    input_path="input.jsonl",
    output_dir="output/",
    name="data",
    formats=["json", "csv", "parquet"],
    low_memory=True
)
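The exact file names written depend on the name and formats arguments. A quick, library-agnostic way to see what a run produced is to list the output directory:

from pathlib import Path

# List everything flatten_stream() wrote to output/
for path in sorted(Path("output/").rglob("*")):
    if path.is_file():
        print(path.relative_to("output/"), path.stat().st_size, "bytes")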
Incremental Processing¶
Large datasets are processed incrementally:
# Process with progress tracking
import logging
logging.basicConfig(level=logging.INFO)
tm.flatten_stream(
    input_path="massive_dataset.jsonl",
    output_dir="results/",
    name="dataset",
    batch_size=1000,
    low_memory=True
)
# INFO: Processed batch 1 (1000 records)
# INFO: Processed batch 2 (2000 records)
# ...
Memory Optimization Strategies¶
Data Type Optimization¶
Relax type preservation and drop null or empty values to reduce memory usage:
result = tm.flatten(
    data=data,
    preserve_types=False,  # Convert all to strings
    skip_null=True,        # Remove null values
    skip_empty=True,       # Remove empty strings/lists
    low_memory=True
)
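To make the effect of these flags concrete, here is a hedged before-and-after sketch. The record is invented, and the output shape assumes the options behave as their names suggest (None values and empty strings/lists dropped, remaining values rendered as strings when preserve_types=False):

import transmog as tm

record = {"id": 42, "note": None, "tags": [], "name": "sensor-a"}
result = tm.flatten(
    data=record,
    preserve_types=False,
    skip_null=True,
    skip_empty=True,
    low_memory=True
)
# Illustrative expectation: note and tags are dropped, id becomes a string,
# e.g. result.main resembles [{"id": "42", "name": "sensor-a"}]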
Field Filtering¶
Skip unnecessary fields during processing:
# Custom processor for field filtering
from transmog.process import Processor
from transmog.config import TransmogConfig
config = TransmogConfig.memory_optimized()
processor = Processor(config)
# Process with field exclusions
result = processor.process(
    data=data,
    exclude_fields=["debug_info", "temp_data", "cache"]
)
Performance Monitoring¶
Memory Usage Tracking¶
Monitor memory consumption during processing:
import psutil
import os
def track_memory():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # MB
# Before processing
initial_memory = track_memory()
result = tm.flatten(
    data=large_data,
    low_memory=True,
    batch_size=1000
)
# After processing
final_memory = track_memory()
print(f"Memory used: {final_memory - initial_memory:.2f} MB")
Processing Time Optimization¶
Balance memory usage with processing time:
import time
start_time = time.time()
tm.flatten_stream(
    input_path="data.jsonl",
    output_dir="output/",
    name="data",
    batch_size=2000,
    low_memory=True
)
processing_time = time.time() - start_time
print(f"Processing completed in {processing_time:.2f} seconds")
Error Handling in Streaming¶
Resilient Processing¶
Handle errors gracefully during streaming:
# Continue processing despite errors
tm.flatten_stream(
    input_path="data_with_errors.jsonl",
    output_dir="output/",
    name="data",
    errors="skip",  # Skip problematic records
    batch_size=1000
)
Error Logging¶
Track processing issues:
import logging
# Configure error logging
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('processing_errors.log'),
        logging.StreamHandler()
    ]
)
tm.flatten_stream(
    input_path="input.jsonl",
    output_dir="output/",
    name="data",
    errors="warn",  # Log warnings for errors
    batch_size=1000
)
Best Practices¶
Memory-Efficient Configuration¶
Recommended settings for large datasets:
# Optimal streaming configuration
tm.flatten_stream(
    input_path="large_dataset.jsonl",
    output_dir="output/",
    name="dataset",
    batch_size=1000,       # Moderate batch size
    low_memory=True,       # Enable memory optimization
    skip_empty=True,       # Reduce output size
    preserve_types=False,  # Minimize memory usage
    errors="skip",         # Continue on errors
    formats=["parquet"]    # Efficient storage format
)
Disk Space Management¶
Monitor disk usage during streaming operations:
import shutil
def check_disk_space(path):
    total, used, free = shutil.disk_usage(path)
    return free // (1024**3)  # GB free

# Check available space before processing
free_space = check_disk_space("output/")
if free_space < 10:  # Less than 10GB
    print("Warning: Low disk space available")

# Process with space monitoring
tm.flatten_stream(
    input_path="input.jsonl",
    output_dir="output/",
    name="data",
    batch_size=1000
)
Resource Cleanup¶
Ensure proper resource cleanup after processing:
import gc
try:
    result = tm.flatten(
        data=large_data,
        low_memory=True,
        batch_size=1000
    )
    # Process results
    print(f"Processed {len(result.main)} main records")
finally:
    # Force garbage collection
    gc.collect()
Integration Examples¶
With Data Pipelines¶
Integrate streaming processing into data pipelines:
def process_daily_data(date_str):
    """Process daily data files efficiently."""
    input_file = f"data/daily_{date_str}.jsonl"
    output_dir = f"processed/{date_str}/"
    tm.flatten_stream(
        input_path=input_file,
        output_dir=output_dir,
        name="daily_records",
        batch_size=2000,
        low_memory=True,
        formats=["parquet", "csv"]
    )
    return output_dir

# Process multiple days
for day in ["2024-01-01", "2024-01-02", "2024-01-03"]:
    output_path = process_daily_data(day)
    print(f"Processed {day} data to {output_path}")
With Monitoring Systems¶
Integrate with monitoring and alerting:
import logging
from datetime import datetime
def monitored_processing(input_path, output_dir):
    """Process data with comprehensive monitoring."""
    start_time = datetime.now()
    initial_memory = track_memory()  # defined in Performance Monitoring above
    try:
        tm.flatten_stream(
            input_path=input_path,
            output_dir=output_dir,
            name="monitored_data",
            batch_size=1000,
            low_memory=True,
            errors="warn"
        )
        # Log success metrics
        duration = datetime.now() - start_time
        memory_used = track_memory() - initial_memory
        logging.info("Processing completed successfully")
        logging.info(f"Duration: {duration}")
        logging.info(f"Memory used: {memory_used:.2f} MB")
    except Exception as e:
        logging.error(f"Processing failed: {e}")
        raise
# Set up monitoring
logging.basicConfig(level=logging.INFO)
monitored_processing("large_file.jsonl", "output/")