Streaming and Memory-Efficient Processing

Large datasets require specialized handling to avoid memory exhaustion and optimize processing performance. Transmog provides streaming capabilities and memory optimization features designed for datasets that exceed available system memory.

Memory-Efficient Functions

Stream Processing

The flatten_stream() function processes data in chunks and writes results directly to files, minimizing memory usage:

import transmog as tm

# Process large dataset with streaming
tm.flatten_stream(
    input_path="large_dataset.jsonl",
    output_dir="output/",
    name="dataset",
    low_memory=True,
    batch_size=500
)

Low Memory Mode

The low_memory parameter activates comprehensive memory optimization strategies:

# Memory-optimized processing with adaptive features
result = tm.flatten(
    data=large_data,
    low_memory=True,
    batch_size=1000,        # Starting batch size (adapts automatically)
    skip_empty=True
)

# The system automatically:
# - Monitors memory usage and pressure
# - Adapts batch sizes based on available memory
# - Uses in-place modifications to reduce allocations
# - Applies strategic garbage collection

Batch Processing Configuration

Batch Size Optimization

Batch size trades memory usage against processing speed: smaller batches keep fewer records in memory at once, while larger batches reduce per-batch overhead and increase throughput:

# Small batches for memory-constrained environments
result = tm.flatten(
    data=data,
    batch_size=100,  # Process 100 records at a time
    low_memory=True
)

# Larger batches for performance
result = tm.flatten(
    data=data,
    batch_size=5000,  # Process 5000 records at a time
    low_memory=False
)

Memory Usage Patterns

Different configurations have different memory and speed profiles. The adaptive memory management system adjusts batch parameters automatically based on available memory:

| Configuration | Memory Usage | Processing Speed | Adaptive Features | Best For |
| --- | --- | --- | --- | --- |
| batch_size=100, low_memory=True | Minimal | Moderate | Full optimization | Limited memory |
| batch_size=1000, low_memory=True | Low | High | Adaptive sizing | Balanced approach |
| batch_size=5000, low_memory=False | Variable | Highest | Minimal adaptation | Abundant memory |

The system achieves consistent throughput (13,000+ records/sec) across different memory configurations through adaptive optimization.
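
Transmog performs this adaptation internally, but the idea can be pictured with a short sketch. The function below is not Transmog's implementation; it is an illustration, with arbitrary scaling factors, of how a batch size might be derived from the fraction of free memory reported by psutil:

import psutil

def suggest_batch_size(base_size=1000, min_size=100, max_size=5000):
    """Illustrative only: scale a batch size by the fraction of free memory."""
    memory = psutil.virtual_memory()
    free_fraction = memory.available / memory.total

    # Shrink batches under memory pressure, grow them when memory is plentiful
    scaled = int(base_size * free_fraction * 2)
    return max(min_size, min(max_size, scaled))

# Pick a batch size before calling tm.flatten() or tm.flatten_stream()
result = tm.flatten(data=large_data, batch_size=suggest_batch_size(), low_memory=True)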

Streaming Input Sources

JSONL File Processing

Large JSONL files are processed line by line:

# Stream processing of JSONL file
tm.flatten_stream(
    input_path="data.jsonl",
    output_dir="results/",
    name="records",
    batch_size=1000
)
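
Conceptually, line-by-line processing means only the current record and batch need to be decoded at once. The sketch below shows the general pattern with the standard library; it illustrates the approach rather than Transmog's internal reader:

import json

def iter_jsonl(path):
    """Yield one decoded record per line without loading the whole file."""
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:  # skip blank lines
                yield json.loads(line)

# Only one record is held in memory at a time
for record in iter_jsonl("data.jsonl"):
    pass  # inspect or pre-filter records here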

CSV File Processing

Large CSV files containing JSON-encoded columns can be streamed by naming those columns with json_columns:

# Stream processing of CSV with JSON columns
tm.flatten_stream(
    input_path="data.csv",
    output_dir="results/",
    name="records",
    json_columns=["metadata", "attributes"],
    batch_size=2000
)
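
The json_columns parameter identifies CSV columns that hold serialized JSON. Its effect is roughly equivalent to decoding those columns row by row, as in this standard-library sketch (an illustration only; the column names match the example above):

import csv
import json

def iter_csv_with_json(path, json_columns):
    """Yield CSV rows with the named columns decoded from JSON strings."""
    with open(path, newline="", encoding="utf-8") as handle:
        for row in csv.DictReader(handle):
            for column in json_columns:
                if row.get(column):
                    row[column] = json.loads(row[column])
            yield row

for row in iter_csv_with_json("data.csv", ["metadata", "attributes"]):
    pass  # each row now has nested dicts ready for flattening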

Output Streaming

Direct File Output

Results are written directly to files without intermediate storage:

# Stream to multiple output formats
tm.flatten_stream(
    input_path="input.jsonl",
    output_dir="output/",
    name="data",
    formats=["json", "csv", "parquet"],
    low_memory=True
)
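
The exact file names depend on the name and formats arguments, so the simplest way to confirm what a streaming run produced is to list the output directory afterwards:

from pathlib import Path

# List whatever files the streaming run produced
for path in sorted(Path("output/").rglob("*")):
    if path.is_file():
        print(path, f"{path.stat().st_size / 1024:.1f} KB")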

Incremental Processing

Large datasets are processed incrementally, and progress can be followed through the standard logging module:

# Process with progress tracking
import logging

logging.basicConfig(level=logging.INFO)

tm.flatten_stream(
    input_path="massive_dataset.jsonl",
    output_dir="results/",
    name="dataset",
    batch_size=1000,
    low_memory=True
)
# INFO: Processed batch 1 (1000 records)
# INFO: Processed batch 2 (2000 records)
# ...

Memory Optimization Strategies

Data Type Optimization

Simplify data types and drop unneeded values to reduce memory usage:

result = tm.flatten(
    data=data,
    preserve_types=False,  # Convert all to strings
    skip_null=True,        # Remove null values
    skip_empty=True,       # Remove empty strings/lists
    low_memory=True
)

Field Filtering

Skip unnecessary fields during processing:

# Custom processor for field filtering
from transmog.process import Processor
from transmog.config import TransmogConfig

config = TransmogConfig.memory_optimized()
processor = Processor(config)

# Process with field exclusions
result = processor.process(
    data=data,
    exclude_fields=["debug_info", "temp_data", "cache"]
)
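
If the Processor API is more than you need, a similar effect can be achieved by stripping unwanted top-level keys before flattening. This sketch assumes the input is a list of dictionaries and only filters at the top level:

EXCLUDED_FIELDS = {"debug_info", "temp_data", "cache"}

def strip_fields(records, excluded=EXCLUDED_FIELDS):
    """Drop unwanted top-level keys before handing records to tm.flatten()."""
    return [
        {key: value for key, value in record.items() if key not in excluded}
        for record in records
    ]

result = tm.flatten(
    data=strip_fields(data),
    low_memory=True
)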

Performance Monitoring

Memory Usage Tracking

Monitor memory consumption during processing:

import psutil
import os

def track_memory():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # MB

# Before processing
initial_memory = track_memory()

result = tm.flatten(
    data=large_data,
    low_memory=True,
    batch_size=1000
)

# After processing
final_memory = track_memory()
print(f"Memory used: {final_memory - initial_memory:.2f} MB")

Processing Time Optimization

Balance memory usage with processing time:

import time

start_time = time.time()

tm.flatten_stream(
    input_path="data.jsonl",
    output_dir="output/",
    name="data",
    batch_size=2000,
    low_memory=True
)

processing_time = time.time() - start_time
print(f"Processing completed in {processing_time:.2f} seconds")

Error Handling in Streaming

Resilient Processing

Handle errors gracefully during streaming:

# Continue processing despite errors
tm.flatten_stream(
    input_path="data_with_errors.jsonl",
    output_dir="output/",
    name="data",
    errors="skip",  # Skip problematic records
    batch_size=1000
)
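
To estimate how many records will be skipped, the input file can be pre-scanned with the standard library before streaming. This is an optional diagnostic, not something Transmog requires:

import json

def count_malformed_lines(path):
    """Count lines that fail to parse as JSON."""
    bad = 0
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if not line:
                continue
            try:
                json.loads(line)
            except json.JSONDecodeError:
                bad += 1
    return bad

print(f"Malformed records: {count_malformed_lines('data_with_errors.jsonl')}")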

Error Logging

Track processing issues:

import logging

# Configure error logging
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('processing_errors.log'),
        logging.StreamHandler()
    ]
)

tm.flatten_stream(
    input_path="input.jsonl",
    output_dir="output/",
    name="data",
    errors="warn",  # Log warnings for errors
    batch_size=1000
)

Best Practices

Memory-Efficient Configuration

Recommended settings for large datasets:

# Optimal streaming configuration
tm.flatten_stream(
    input_path="large_dataset.jsonl",
    output_dir="output/",
    name="dataset",
    batch_size=1000,         # Moderate batch size
    low_memory=True,         # Enable memory optimization
    skip_empty=True,         # Reduce output size
    preserve_types=False,    # Minimize memory usage
    errors="skip",           # Continue on errors
    formats=["parquet"]      # Efficient storage format
)

Disk Space Management

Monitor disk usage during streaming operations:

import shutil

def check_disk_space(path):
    total, used, free = shutil.disk_usage(path)
    return free // (1024**3)  # GB free

# Check available space before processing
free_space = check_disk_space("output/")
if free_space < 10:  # Less than 10GB
    print("Warning: Low disk space available")

# Process with space monitoring
tm.flatten_stream(
    input_path="input.jsonl",
    output_dir="output/",
    name="data",
    batch_size=1000
)

Resource Cleanup

Ensure proper resource cleanup after processing:

import gc

try:
    result = tm.flatten(
        data=large_data,
        low_memory=True,
        batch_size=1000
    )

    # Process results
    print(f"Processed {len(result.main)} main records")

finally:
    # Force garbage collection
    gc.collect()
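
The same pattern can be packaged as a small context manager so every processing step gets the cleanup automatically. This is a convenience wrapper, not part of Transmog:

import gc
from contextlib import contextmanager

@contextmanager
def cleanup_after():
    """Run garbage collection when the block exits, even on error."""
    try:
        yield
    finally:
        gc.collect()

with cleanup_after():
    result = tm.flatten(data=large_data, low_memory=True, batch_size=1000)
    print(f"Processed {len(result.main)} main records")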

Integration Examples

With Data Pipelines

Integrate streaming processing into data pipelines:

def process_daily_data(date_str):
    """Process daily data files efficiently."""
    input_file = f"data/daily_{date_str}.jsonl"
    output_dir = f"processed/{date_str}/"

    tm.flatten_stream(
        input_path=input_file,
        output_dir=output_dir,
        name="daily_records",
        batch_size=2000,
        low_memory=True,
        formats=["parquet", "csv"]
    )

    return output_dir

# Process multiple days
for day in ["2024-01-01", "2024-01-02", "2024-01-03"]:
    output_path = process_daily_data(day)
    print(f"Processed {day} data to {output_path}")

With Monitoring Systems

Integrate with monitoring and alerting:

import logging
from datetime import datetime

def monitored_processing(input_path, output_dir):
    """Process data with comprehensive monitoring."""
    start_time = datetime.now()
    initial_memory = track_memory()  # reuses the track_memory() helper defined earlier

    try:
        tm.flatten_stream(
            input_path=input_path,
            output_dir=output_dir,
            name="monitored_data",
            batch_size=1000,
            low_memory=True,
            errors="warn"
        )

        # Log success metrics
        duration = datetime.now() - start_time
        memory_used = track_memory() - initial_memory

        logging.info(f"Processing completed successfully")
        logging.info(f"Duration: {duration}")
        logging.info(f"Memory used: {memory_used:.2f} MB")

    except Exception as e:
        logging.error(f"Processing failed: {e}")
        raise

# Set up monitoring
logging.basicConfig(level=logging.INFO)
monitored_processing("large_file.jsonl", "output/")