File Processing

This guide covers processing files directly with Transmog, including JSON and CSV input and the configuration options available for each format.

File Processing Basics

Transmog provides the flatten_file() function for direct file processing:

import transmog as tm

# Process JSON file
result = tm.flatten_file("data.json", name="records")

# Process with auto-detected name from filename
result = tm.flatten_file("products.json")  # name="products"

# Process with custom configuration
result = tm.flatten_file(
    "data.json",
    name="entities",
    separator=".",
    arrays="separate"
)

JSON File Processing

Basic JSON Processing

# Simple JSON file processing
result = tm.flatten_file("products.json")

# Access results
print(f"Processed {len(result.main)} records")
print(f"Created {len(result.tables)} child tables")

# Save processed results
result.save("output", output_format="csv")

JSON File Structure Support

Transmog handles various JSON file structures:

Single Object

{
  "company": "TechCorp",
  "employees": ["Alice", "Bob"]
}

Array of Objects

[
  {"id": 1, "name": "Product A"},
  {"id": 2, "name": "Product B"}
]

Nested Structure

{
  "data": {
    "records": [
      {"nested": {"values": [1, 2, 3]}}
    ]
  }
}
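
Each of these shapes can be passed to flatten_file() without special handling. As a quick sketch, assuming the nested example above is saved as nested.json (the exact child table names depend on the entity name and nesting path):

import transmog as tm

# Flatten the nested example above (saved as nested.json)
result = tm.flatten_file("nested.json", name="records")

# Top-level fields land in the main table; nested arrays become child tables
print(f"Main records: {len(result.main)}")
print(f"Child tables: {list(result.tables.keys())}")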

Large JSON Files

For large JSON files, consider streaming:

# Stream large JSON files directly to output
tm.flatten_stream(
    "large_data.json",
    output_path="processed/",
    name="large_dataset",
    output_format="parquet",
    batch_size=1000,
    low_memory=True
)

CSV File Processing

Basic CSV Processing

# Process CSV file
result = tm.flatten_file("employees.csv", name="employees")

# CSV files typically produce only a main table
print(f"Processed {len(result.main)} CSV records")

CSV Configuration Options

The flatten_file() function auto-detects CSV format and handles it appropriately:

# Process CSV with custom entity name
result = tm.flatten_file("data.csv", name="custom_name")

# All flatten() options work with CSV files
result = tm.flatten_file(
    "data.csv",
    name="records",
    preserve_types=True,
    skip_null=False
)

CSV File Requirements

CSV files should follow these guidelines (a pre-check sketch follows the list):

  • First row contains column headers

  • Consistent column structure throughout

  • Standard CSV delimiters (comma, semicolon, tab)

  • UTF-8 encoding preferred
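
A lightweight pre-check along these lines can catch header and column-count problems before calling flatten_file(). This is a sketch using only the standard library; check_csv_file is illustrative and not part of Transmog:

import csv

def check_csv_file(file_path):
    """Lightweight sanity check for a CSV file before processing."""
    with open(file_path, "r", encoding="utf-8", newline="") as f:
        sample = f.read(4096)
        if not sample:
            return False, "Empty file"

        # Detect the delimiter and confirm a header row is present
        dialect = csv.Sniffer().sniff(sample, delimiters=",;\t")
        if not csv.Sniffer().has_header(sample):
            return False, "No header row detected"

        # Verify every row matches the header's column count
        f.seek(0)
        reader = csv.reader(f, dialect)
        header = next(reader)
        for line_number, row in enumerate(reader, start=2):
            if len(row) != len(header):
                return False, f"Row {line_number} has {len(row)} columns, expected {len(header)}"

    return True, "Valid"

ok, message = check_csv_file("employees.csv")
if not ok:
    print(f"Cannot process file: {message}")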

File Format Detection

Transmog automatically detects file formats based on extensions:

Extension          Format        Processing
.json              JSON          Full nested processing
.jsonl, .ndjson    JSON Lines    Line-by-line processing
.csv               CSV           Tabular processing

# Automatic format detection
result = tm.flatten_file("data.json")    # Processed as JSON
result = tm.flatten_file("data.csv")     # Processed as CSV
result = tm.flatten_file("data.jsonl")   # Processed as JSON Lines

Advanced File Processing

Custom Naming from Filename

from pathlib import Path

# Auto-extract entity name from filename
file_path = "customer_orders.json"
name = Path(file_path).stem  # "customer_orders"
result = tm.flatten_file(file_path, name=name)

Processing Multiple Files

import glob
from pathlib import Path

# Process multiple files with consistent configuration
results = []
for file_path in glob.glob("data/*.json"):
    name = Path(file_path).stem
    result = tm.flatten_file(
        file_path,
        name=name,
        separator="_",
        arrays="separate"
    )
    results.append(result)

# Combine or process results as needed
total_records = sum(len(r.main) for r in results)
print(f"Processed {total_records} total records")

Batch File Processing

def process_data_directory(input_dir, output_dir):
    """Process all JSON files in a directory."""
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    for file_path in input_path.glob("*.json"):
        print(f"Processing {file_path.name}...")

        # Process file
        result = tm.flatten_file(file_path, name=file_path.stem)

        # Save results
        output_file = output_path / file_path.stem
        result.save(output_file, output_format="csv")

# Process entire directory
process_data_directory("raw_data/", "processed_data/")

Error Handling with Files

Robust File Processing

def safe_file_processing(file_path, **options):
    """Process file with comprehensive error handling."""
    try:
        result = tm.flatten_file(file_path, **options)
        return result, None
    except FileNotFoundError:
        return None, f"File not found: {file_path}"
    except tm.ValidationError as e:
        return None, f"Configuration error: {e}"
    except tm.TransmogError as e:
        return None, f"Processing error: {e}"
    except Exception as e:
        return None, f"Unexpected error: {e}"

# Use safe processing
result, error = safe_file_processing("data.json", name="data")
if error:
    print(f"Error: {error}")
else:
    print(f"Success: {len(result.main)} records processed")

Skip Problematic Files

def process_files_with_recovery(file_patterns, output_dir):
    """Process files with error recovery."""
    successful = 0
    failed = 0

    for pattern in file_patterns:
        for file_path in glob.glob(pattern):
            try:
                result = tm.flatten_file(
                    file_path,
                    name=Path(file_path).stem,
                    errors="skip"  # Skip problematic records
                )

                # Save successful results
                output_file = Path(output_dir) / f"{Path(file_path).stem}.json"
                result.save(output_file)
                successful += 1

            except Exception as e:
                print(f"Failed to process {file_path}: {e}")
                failed += 1

    print(f"Processed {successful} files successfully, {failed} failed")

# Process with recovery
process_files_with_recovery(["data/*.json", "backup/*.json"], "output/")

Performance Optimization

Memory-Efficient File Processing

# For large files, use low memory mode
result = tm.flatten_file(
    "large_file.json",
    name="large_data",
    low_memory=True,
    batch_size=500
)

Streaming for Very Large Files

# Stream very large files directly to output
tm.flatten_stream(
    "huge_dataset.json",
    output_path="streaming_output/",
    name="huge_data",
    output_format="parquet",
    batch_size=1000,
    low_memory=True,
    errors="skip"
)

File Validation

Pre-Processing Validation

import json
from pathlib import Path

def validate_json_file(file_path):
    """Validate JSON file before processing."""
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)

        if not isinstance(data, (dict, list)):
            return False, "Invalid JSON structure"

        if not data:
            return False, "Empty file"

        return True, "Valid"

    except json.JSONDecodeError as e:
        return False, f"Invalid JSON: {e}"
    except Exception as e:
        return False, f"Error reading file: {e}"

# Validate before processing
file_path = "data.json"
is_valid, message = validate_json_file(file_path)

if is_valid:
    result = tm.flatten_file(file_path)
else:
    print(f"Cannot process file: {message}")

Post-Processing Validation

def validate_results(result, expected_min_records=1):
    """Validate processing results."""
    issues = []

    # Check main table
    if len(result.main) < expected_min_records:
        issues.append(f"Too few main records: {len(result.main)}")

    # Check for orphaned child records
    if result.tables:
        main_ids = {r["_id"] for r in result.main}
        for table_name, records in result.tables.items():
            orphaned = [r for r in records if r["_parent_id"] not in main_ids]
            if orphaned:
                issues.append(f"Orphaned records in {table_name}: {len(orphaned)}")

    return issues

# Validate results
result = tm.flatten_file("data.json")
issues = validate_results(result)
if issues:
    for issue in issues:
        print(f"Warning: {issue}")

Integration with Other Tools

Database Loading

# Process for database import
result = tm.flatten_file(
    "export.json",
    name="entities",
    id_field="id",
    preserve_types=False,    # Convert to strings for SQL
    skip_null=True
)

# Save as CSV for database import
result.save("db_import", output_format="csv")

Data Pipeline Integration

def file_to_pipeline(input_file, pipeline_config):
    """Process file for data pipeline."""
    result = tm.flatten_file(
        input_file,
        name=pipeline_config.get("entity_name", "data"),
        separator=pipeline_config.get("separator", "_"),
        arrays=pipeline_config.get("array_handling", "separate"),
        preserve_types=pipeline_config.get("preserve_types", False)
    )

    # Apply pipeline-specific transformations to every table
    for table_name, records in result.all_tables.items():
        # apply_pipeline_transforms is a user-defined hook that applies
        # whatever record-level transformations the pipeline requires
        transformed = apply_pipeline_transforms(records, pipeline_config)
        result._result.replace_table(table_name, transformed)

    return result

Best Practices

File Organization

project/
├── raw_data/           # Original files
│   ├── products.json
│   └── orders.csv
├── processed/          # Flattened results
│   ├── products/
│   └── orders/
└── scripts/           # Processing scripts
    └── process_files.py

Configuration Management

# Use configuration files for consistent processing
import yaml

def load_processing_config(config_file):
    """Load processing configuration from YAML."""
    with open(config_file, 'r') as f:
        return yaml.safe_load(f)

# config.yaml
"""
default:
  separator: "_"
  arrays: "separate"
  preserve_types: false

products:
  separator: "."
  id_field: "product_id"

orders:
  arrays: "inline"
  preserve_types: true
"""

# Use configuration (merge defaults with entity-specific overrides)
config = load_processing_config("config.yaml")
options = {**config["default"], **config["products"]}
result = tm.flatten_file("products.json", **options)

Next Steps