Performance Optimization

Transmog performance can be optimized through configuration tuning, data preparation, and processing strategies. This guide covers techniques for maximizing throughput and minimizing processing time.

Configuration Optimization

Batch Size Tuning

Batch size significantly impacts processing performance:

import transmog as tm
import time

def benchmark_batch_sizes(data, sizes=(100, 500, 1000, 2000, 5000)):
    """Benchmark different batch sizes."""
    results = {}

    for size in sizes:
        start = time.time()
        result = tm.flatten(data, batch_size=size)
        duration = time.time() - start
        results[size] = duration

    return results

# Find optimal batch size
data = [{"nested": {"field": i}} for i in range(10000)]
timings = benchmark_batch_sizes(data)

for size, duration in timings.items():
    print(f"Batch size {size}: {duration:.2f}s")

Memory vs Speed Trade-offs

Balance memory usage with processing speed. The system includes adaptive memory management that automatically adjusts processing parameters based on available memory:

# High performance configuration
fast_config = {
    "batch_size": 5000,
    "low_memory": False,
    "preserve_types": True
}

# Memory-efficient configuration (uses adaptive batch sizing)
efficient_config = {
    "batch_size": 1000,
    "low_memory": True,
    "preserve_types": False
}

# Benchmark both approaches
data = load_large_dataset()

start = time.time()
fast_result = tm.flatten(data, **fast_config)
fast_time = time.time() - start

start = time.time()
efficient_result = tm.flatten(data, **efficient_config)
efficient_time = time.time() - start

print(f"Fast config: {fast_time:.2f}s")
print(f"Efficient config: {efficient_time:.2f}s")

Data Preparation Optimization

Input Data Structure

Optimize input data structure for better performance:

# Inefficient: Deeply nested with many empty fields
inefficient_data = {
    "level1": {
        "level2": {
            "level3": {
                "level4": {
                    "value": "data",
                    "empty1": None,
                    "empty2": "",
                    "empty3": []
                }
            }
        }
    }
}

# Efficient: Flatter structure, minimal empty fields
efficient_data = {
    "level1_level2_value": "data",
    "metadata": {"timestamp": "2024-01-01"}
}

# Process with optimized settings
result = tm.flatten(
    efficient_data,
    skip_empty=True,
    skip_null=True,
    nested_threshold=3
)
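
When the source data cannot be restructured upstream, a small pre-processing pass can strip empty values before flattening. skip_empty and skip_null already drop these fields during flattening; pruning first simply reduces the volume of data handed to the flattener. A hedged sketch (prune_empty is a hypothetical helper, not part of the Transmog API):

def prune_empty(value):
    """Recursively drop None, empty strings, and empty containers."""
    if isinstance(value, dict):
        cleaned = {k: prune_empty(v) for k, v in value.items()}
        return {k: v for k, v in cleaned.items() if v not in (None, "", [], {})}
    if isinstance(value, list):
        return [prune_empty(v) for v in value if v not in (None, "", [], {})]
    return value

result = tm.flatten(prune_empty(inefficient_data), nested_threshold=3)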

Array Handling Optimization

Choose optimal array handling strategy:

data_with_arrays = {
    "users": [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"}
    ]
}

# Fastest: Skip arrays if not needed
result_skip = tm.flatten(data_with_arrays, arrays="skip")

# Moderate: Inline small arrays
result_inline = tm.flatten(data_with_arrays, arrays="inline")

# Slowest: Separate tables (most flexible)
result_separate = tm.flatten(data_with_arrays, arrays="separate")
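
The relative cost of the three strategies depends on how array-heavy the data is, so it is worth timing them on a representative sample (a minimal sketch reusing the benchmark pattern from above):

import time

for mode in ("skip", "inline", "separate"):
    start = time.time()
    tm.flatten(data_with_arrays, arrays=mode)
    print(f"arrays={mode}: {time.time() - start:.4f}s")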

Processing Strategies

Parallel Processing

Process multiple datasets concurrently:

import concurrent.futures
import transmog as tm

def process_file(file_path):
    """Process a single file."""
    return tm.flatten_file(
        file_path,
        batch_size=2000,
        low_memory=True
    )

# Process multiple files in parallel
file_paths = ["data1.json", "data2.json", "data3.json"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(process_file, path) for path in file_paths]
    results = [future.result() for future in futures]

print(f"Processed {len(results)} files")

Incremental Processing

Process large datasets incrementally:

def process_incrementally(large_dataset, chunk_size=10000):
    """Process dataset in chunks."""
    results = []

    for i in range(0, len(large_dataset), chunk_size):
        chunk = large_dataset[i:i + chunk_size]
        result = tm.flatten(
            chunk,
            name=f"chunk_{i//chunk_size}",
            batch_size=2000
        )
        results.append(result)

        # Optional: drop the local reference so the raw chunk can be
        # garbage-collected before the next iteration (the flattened
        # results kept in `results` are unaffected)
        del chunk

    return results

# Process 100k records in 10k chunks
large_data = generate_large_dataset(100000)
chunk_results = process_incrementally(large_data)
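
If downstream code can consume chunks as they are produced, yielding each result instead of accumulating a list keeps only one chunk's output in memory at a time (a minimal sketch; handle() is a placeholder for whatever consumes each chunk, such as writing it to disk):

def iter_flattened_chunks(large_dataset, chunk_size=10000):
    """Yield flattened results one chunk at a time."""
    for i in range(0, len(large_dataset), chunk_size):
        chunk = large_dataset[i:i + chunk_size]
        yield tm.flatten(chunk, name=f"chunk_{i // chunk_size}", batch_size=2000)

for chunk_result in iter_flattened_chunks(large_data):
    handle(chunk_result)  # placeholder consumer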

Advanced Configuration

Memory Optimization Features

Transmog includes adaptive memory management that automatically adjusts processing parameters:

import transmog as tm

# Enable memory tracking and adaptive sizing
result = tm.flatten(
    large_dataset,
    batch_size=1000,           # Starting batch size
    low_memory=True,           # Enable memory-efficient mode
    # Memory optimization is automatic when low_memory=True
)

# The system will:
# - Monitor memory usage during processing
# - Adapt batch sizes based on available memory
# - Use strategic garbage collection to reduce pressure
# - Apply in-place modifications to reduce allocations

Memory Optimization Strategies

The memory optimization system reduces memory usage through several techniques (a concept sketch of the adaptive batch-sizing idea follows the list):

  • In-place modifications: 60-70% reduction in object allocations

  • Efficient path building: 40-50% reduction in string operations

  • Adaptive caching: Memory-aware cache sizing that responds to pressure

  • Strategic garbage collection: Intelligent timing of memory cleanup
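
These optimizations are internal to the library and require no configuration beyond low_memory=True. The adaptive batch-sizing idea itself is easy to illustrate from the outside: shrink the batch when memory pressure is high, grow it when there is headroom. A hypothetical concept sketch, not Transmog's actual implementation:

import psutil

def adapt_batch_size(batch_size, threshold=0.8, minimum=100, maximum=10000):
    """Halve the batch under memory pressure, otherwise double it (concept only)."""
    used_fraction = psutil.virtual_memory().percent / 100
    if used_fraction > threshold:
        return max(minimum, batch_size // 2)
    return min(maximum, batch_size * 2)

batch_size = 1000
for chunk in chunks:  # chunks: an iterable of record batches (placeholder)
    tm.flatten(chunk, batch_size=batch_size, low_memory=True)
    batch_size = adapt_batch_size(batch_size)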

Custom Processor Configuration

Use advanced processor settings for optimal performance:

from transmog.process import Processor
from transmog.config import TransmogConfig

# Performance-optimized configuration with memory awareness
config = (
    TransmogConfig.performance_optimized()
    .with_memory_optimization(
        memory_tracking_enabled=True,
        adaptive_batch_sizing=True,
        memory_pressure_threshold=0.8
    )
    .with_processing(
        batch_size=5000,
        memory_limit="2GB"
    )
    .with_naming(
        separator="_",
        max_depth=5
    )
)

processor = Processor(config)
result = processor.process(data)

Type Preservation Optimization

Optimize type handling based on use case:

# Fast: Convert all to strings
result_strings = tm.flatten(
    data,
    preserve_types=False,
    batch_size=5000
)

# Slower but preserves data integrity
result_typed = tm.flatten(
    data,
    preserve_types=True,
    batch_size=3000
)

Performance Monitoring

Execution Time Profiling

Profile processing performance:

import time
import cProfile

def profile_processing():
    """Profile flatten operation."""
    data = generate_test_data(10000)

    # Profile with cProfile
    profiler = cProfile.Profile()
    profiler.enable()

    result = tm.flatten(data, batch_size=2000)

    profiler.disable()
    profiler.print_stats(sort='tottime')

    return result

# Run profiling
profile_processing()
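
Inside profile_processing, the pstats module can limit the report to the slowest entries instead of printing every function (a minimal sketch using only the standard library):

import pstats

# Replace profiler.print_stats(sort='tottime') with:
stats = pstats.Stats(profiler)
stats.sort_stats("tottime").print_stats(10)  # show only the 10 costliest functions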

Memory Usage Monitoring

Track memory consumption:

import psutil
import os

class MemoryMonitor:
    def __init__(self):
        self.process = psutil.Process(os.getpid())
        self.initial_memory = self.current_memory()

    def current_memory(self):
        return self.process.memory_info().rss / 1024 / 1024  # MB

    def memory_used(self):
        return self.current_memory() - self.initial_memory

# Monitor processing
monitor = MemoryMonitor()

result = tm.flatten(
    large_data,
    batch_size=2000,
    low_memory=True
)

print(f"Memory used: {monitor.memory_used():.2f} MB")
print(f"Records processed: {len(result.main)}")

Throughput Measurement

Measure processing throughput:

def measure_throughput(data, config):
    """Measure records processed per second."""
    start_time = time.time()
    result = tm.flatten(data, **config)
    end_time = time.time()

    duration = end_time - start_time
    record_count = len(result.main)
    throughput = record_count / duration

    return {
        "duration": duration,
        "records": record_count,
        "throughput": throughput
    }

# Test different configurations
configs = [
    {"batch_size": 1000, "low_memory": True},
    {"batch_size": 2000, "low_memory": True},
    {"batch_size": 5000, "low_memory": False}
]

data = generate_test_data(50000)

for i, config in enumerate(configs):
    metrics = measure_throughput(data, config)
    print(f"Config {i+1}: {metrics['throughput']:.0f} records/sec")

Optimization Guidelines

Choosing Optimal Settings

Recommended configurations for different scenarios:

# High-volume, simple data
high_volume_config = {
    "batch_size": 5000,
    "low_memory": False,
    "preserve_types": False,
    "skip_empty": True,
    "arrays": "inline"
}

# Complex nested data
complex_data_config = {
    "batch_size": 2000,
    "low_memory": True,
    "preserve_types": True,
    "nested_threshold": 6,
    "arrays": "separate"
}

# Memory-constrained environment
memory_constrained_config = {
    "batch_size": 500,
    "low_memory": True,
    "preserve_types": False,
    "skip_empty": True,
    "skip_null": True
}

Performance Testing Framework

Create systematic performance tests:

class PerformanceTest:
    def __init__(self, data_generator):
        self.data_generator = data_generator
        self.results = []

    def test_config(self, config, data_size=10000):
        """Test a specific configuration."""
        data = self.data_generator(data_size)

        start_time = time.time()
        initial_memory = self.get_memory()

        result = tm.flatten(data, **config)

        end_time = time.time()
        final_memory = self.get_memory()

        metrics = {
            "config": config,
            "duration": end_time - start_time,
            "memory_used": final_memory - initial_memory,
            "records_processed": len(result.main),
            "throughput": len(result.main) / (end_time - start_time)
        }

        self.results.append(metrics)
        return metrics

    def get_memory(self):
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024

    def best_config(self, metric="throughput", maximize=True):
        """Find best configuration by metric (set maximize=False for
        lower-is-better metrics such as duration or memory_used)."""
        selector = max if maximize else min
        return selector(self.results, key=lambda x: x[metric])

# Usage
def generate_nested_data(size):
    return [{"nested": {"field": i}} for i in range(size)]

tester = PerformanceTest(generate_nested_data)

# Test multiple configurations
configs = [
    {"batch_size": 1000},
    {"batch_size": 2000},
    {"batch_size": 5000}
]

for config in configs:
    metrics = tester.test_config(config)
    print(f"Config {config}: {metrics['throughput']:.0f} records/sec")

best = tester.best_config("throughput")
print(f"Best config: {best['config']}")

Real-World Optimization Examples

E-commerce Data Processing

Optimize for typical e-commerce data structures:

# E-commerce product data
ecommerce_data = {
    "products": [
        {
            "id": "prod_123",
            "name": "Widget",
            "price": 19.99,
            "attributes": {
                "color": "red",
                "size": "large",
                "materials": ["plastic", "metal"]
            },
            "reviews": [
                {"rating": 5, "comment": "Great!"},
                {"rating": 4, "comment": "Good"}
            ]
        }
    ]
}

# Optimized processing
result = tm.flatten(
    ecommerce_data,
    batch_size=3000,
    arrays="separate",  # Extract reviews as separate table
    id_field={"products": "id"},  # Use product ID
    preserve_types=True  # Keep price as number
)

Log Data Processing

Optimize for log file processing:

# Log entry structure
log_entries = [
    {
        "timestamp": "2024-01-01T12:00:00Z",
        "level": "INFO",
        "message": "User action",
        "context": {
            "user_id": "user_123",
            "session_id": "sess_456",
            "metadata": {
                "ip": "192.168.1.1",
                "user_agent": "Mozilla/5.0..."
            }
        }
    }
]

# Optimized for log processing
result = tm.flatten(
    log_entries,
    batch_size=10000,  # Large batches for simple structures
    preserve_types=False,  # Everything as strings
    skip_empty=True,  # Remove empty fields
    arrays="skip",  # Skip arrays in logs
    separator="."  # Use dot notation
)

Sensor Data Processing

Optimize for IoT sensor data:

# Sensor readings
sensor_data = [
    {
        "device_id": "sensor_001",
        "timestamp": "2024-01-01T12:00:00Z",
        "readings": {
            "temperature": 23.5,
            "humidity": 65.2,
            "pressure": 1013.25
        },
        "location": {
            "lat": 40.7128,
            "lon": -74.0060
        }
    }
]

# Optimized for numerical data
result = tm.flatten(
    sensor_data,
    batch_size=5000,
    preserve_types=True,  # Keep numerical precision
    arrays="inline",  # Simple structure
    id_field="device_id",  # Use device ID
    add_timestamp=True  # Add processing timestamp
)