File Processing¶
This guide covers processing files directly with Transmog, including JSON and CSV inputs, along with the options and configurations available for each.
File Processing Basics¶
Transmog provides the flatten_file() function for direct file processing:
import transmog as tm
# Process JSON file
result = tm.flatten_file("data.json", name="records")
# Process with auto-detected name from filename
result = tm.flatten_file("products.json") # name="products"
# Process with custom configuration
result = tm.flatten_file(
    "data.json",
    name="entities",
    separator=".",
    arrays="separate"
)
JSON File Processing¶
Basic JSON Processing¶
# Simple JSON file processing
result = tm.flatten_file("products.json")
# Access results
print(f"Processed {len(result.main)} records")
print(f"Created {len(result.tables)} child tables")
# Save processed results
result.save("output", output_format="csv")
JSON File Structure Support¶
Transmog handles various JSON file structures:
Single Object¶
{
    "company": "TechCorp",
    "employees": ["Alice", "Bob"]
}
Array of Objects¶
[
    {"id": 1, "name": "Product A"},
    {"id": 2, "name": "Product B"}
]
Nested Structure¶
{
    "data": {
        "records": [
            {"nested": {"values": [1, 2, 3]}}
        ]
    }
}
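Whichever shape the file has, flattening produces the same kind of result: a main table, plus child tables for nested arrays. A minimal sketch for the single-object example above (the child table name and field layout in the comments are illustrative, assuming default settings):

data = {"company": "TechCorp", "employees": ["Alice", "Bob"]}
result = tm.flatten(data, name="companies")

# One main record per top-level object
print(result.main)    # e.g. [{"company": "TechCorp", "_id": ...}]

# The employees array becomes a child table linked back via _parent_id
print(result.tables)  # e.g. {"companies_employees": [...]}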
Large JSON Files¶
For large JSON files, consider streaming:
# Stream large JSON files directly to output
tm.flatten_stream(
    "large_data.json",
    output_path="processed/",
    name="large_dataset",
    output_format="parquet",
    batch_size=1000,
    low_memory=True
)
CSV File Processing¶
Basic CSV Processing¶
# Process CSV file
result = tm.flatten_file("employees.csv", name="employees")
# CSV files typically produce only a main table
print(f"Processed {len(result.main)} CSV records")
CSV Configuration Options¶
The flatten_file() function auto-detects CSV format and handles it appropriately:
# Process CSV with custom entity name
result = tm.flatten_file("data.csv", name="custom_name")
# All flatten() options work with CSV files
result = tm.flatten_file(
    "data.csv",
    name="records",
    preserve_types=True,
    skip_null=False
)
CSV File Requirements¶
CSV files should follow these guidelines:

- First row contains column headers
- Consistent column structure throughout
- Standard CSV delimiters (comma, semicolon, or tab)
- UTF-8 encoding preferred
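For example, a minimal CSV file meeting these guidelines:

id,name,department
1,Alice,Engineering
2,Bob,Sales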
File Format Detection¶
Transmog automatically detects file formats based on extensions:
| Extension | Format | Processing |
|---|---|---|
| `.json` | JSON | Full nested processing |
| `.jsonl` | JSON Lines | Line-by-line processing |
| `.csv` | CSV | Tabular processing |
# Automatic format detection
result = tm.flatten_file("data.json") # Processed as JSON
result = tm.flatten_file("data.csv") # Processed as CSV
result = tm.flatten_file("data.jsonl") # Processed as JSON Lines
Advanced File Processing¶
Custom Naming from Filename¶
from pathlib import Path
# Auto-extract entity name from filename
file_path = "customer_orders.json"
name = Path(file_path).stem # "customer_orders"
result = tm.flatten_file(file_path, name=name)
Processing Multiple Files¶
import glob
from pathlib import Path

# Process multiple files with consistent configuration
results = []
for file_path in glob.glob("data/*.json"):
    name = Path(file_path).stem
    result = tm.flatten_file(
        file_path,
        name=name,
        separator="_",
        arrays="separate"
    )
    results.append(result)
# Combine or process results as needed
total_records = sum(len(r.main) for r in results)
print(f"Processed {total_records} total records")
Batch File Processing¶
def process_data_directory(input_dir, output_dir):
    """Process all JSON files in a directory."""
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    for file_path in input_path.glob("*.json"):
        print(f"Processing {file_path.name}...")

        # Process file
        result = tm.flatten_file(file_path, name=file_path.stem)

        # Save results
        output_file = output_path / file_path.stem
        result.save(output_file, output_format="csv")
# Process entire directory
process_data_directory("raw_data/", "processed_data/")
Error Handling with Files¶
Robust File Processing¶
def safe_file_processing(file_path, **options):
    """Process file with comprehensive error handling."""
    try:
        result = tm.flatten_file(file_path, **options)
        return result, None
    except FileNotFoundError:
        return None, f"File not found: {file_path}"
    except tm.ValidationError as e:
        return None, f"Configuration error: {e}"
    except tm.TransmogError as e:
        return None, f"Processing error: {e}"
    except Exception as e:
        return None, f"Unexpected error: {e}"
# Use safe processing
result, error = safe_file_processing("data.json", name="data")
if error:
    print(f"Error: {error}")
else:
    print(f"Success: {len(result.main)} records processed")
Skip Problematic Files¶
def process_files_with_recovery(file_patterns, output_dir):
    """Process files with error recovery."""
    successful = 0
    failed = 0

    for pattern in file_patterns:
        for file_path in glob.glob(pattern):
            try:
                result = tm.flatten_file(
                    file_path,
                    name=Path(file_path).stem,
                    errors="skip"  # Skip problematic records
                )

                # Save successful results
                output_file = Path(output_dir) / f"{Path(file_path).stem}.json"
                result.save(output_file)
                successful += 1
            except Exception as e:
                print(f"Failed to process {file_path}: {e}")
                failed += 1

    print(f"Processed {successful} files successfully, {failed} failed")
# Process with recovery
process_files_with_recovery(["data/*.json", "backup/*.json"], "output/")
Performance Optimization¶
Memory-Efficient File Processing¶
# For large files, use low memory mode
result = tm.flatten_file(
    "large_file.json",
    name="large_data",
    low_memory=True,
    batch_size=500
)
Streaming for Very Large Files¶
# Stream very large files directly to output
tm.flatten_stream(
    "huge_dataset.json",
    output_path="streaming_output/",
    name="huge_data",
    output_format="parquet",
    batch_size=1000,
    low_memory=True,
    errors="skip"
)
File Validation¶
Pre-Processing Validation¶
import json
from pathlib import Path
def validate_json_file(file_path):
    """Validate JSON file before processing."""
    try:
        with open(file_path, 'r') as f:
            data = json.load(f)
        if not data:
            return False, "Empty file"
        if not isinstance(data, (dict, list)):
            return False, "Invalid JSON structure"
        return True, "Valid"
    except json.JSONDecodeError as e:
        return False, f"Invalid JSON: {e}"
    except Exception as e:
        return False, f"Error reading file: {e}"
# Validate before processing
file_path = "data.json"
is_valid, message = validate_json_file(file_path)
if is_valid:
    result = tm.flatten_file(file_path)
else:
    print(f"Cannot process file: {message}")
Post-Processing Validation¶
def validate_results(result, expected_min_records=1):
    """Validate processing results."""
    issues = []

    # Check main table
    if len(result.main) < expected_min_records:
        issues.append(f"Too few main records: {len(result.main)}")

    # Check for orphaned child records
    if result.tables:
        main_ids = {r["_id"] for r in result.main}
        for table_name, records in result.tables.items():
            orphaned = [r for r in records if r["_parent_id"] not in main_ids]
            if orphaned:
                issues.append(f"Orphaned records in {table_name}: {len(orphaned)}")

    return issues
# Validate results
result = tm.flatten_file("data.json")
issues = validate_results(result)
if issues:
    for issue in issues:
        print(f"Warning: {issue}")
Integration with Other Tools¶
Database Loading¶
# Process for database import
result = tm.flatten_file(
    "export.json",
    name="entities",
    id_field="id",
    preserve_types=False,  # Convert to strings for SQL
    skip_null=True
)
# Save as CSV for database import
result.save("db_import", output_format="csv")
Data Pipeline Integration¶
def file_to_pipeline(input_file, pipeline_config):
    """Process file for data pipeline."""
    result = tm.flatten_file(
        input_file,
        name=pipeline_config.get("entity_name", "data"),
        separator=pipeline_config.get("separator", "_"),
        arrays=pipeline_config.get("array_handling", "separate"),
        preserve_types=pipeline_config.get("preserve_types", False)
    )

    # Apply pipeline-specific transformations
    # (apply_pipeline_transforms is a user-defined hook)
    for table_name, records in result.all_tables.items():
        transformed = apply_pipeline_transforms(records, pipeline_config)
        result._result.replace_table(table_name, transformed)

    return result
Best Practices¶
File Organization¶
project/
├── raw_data/          # Original files
│   ├── products.json
│   └── orders.csv
├── processed/         # Flattened results
│   ├── products/
│   └── orders/
└── scripts/           # Processing scripts
    └── process_files.py
Configuration Management¶
# Use configuration files for consistent processing
import yaml
def load_processing_config(config_file):
    """Load processing configuration from YAML."""
    with open(config_file, 'r') as f:
        return yaml.safe_load(f)
# config.yaml
"""
default:
separator: "_"
arrays: "separate"
preserve_types: false
products:
separator: "."
id_field: "product_id"
orders:
arrays: "inline"
preserve_types: true
"""
# Use configuration
config = load_processing_config("config.yaml")
result = tm.flatten_file("products.json", **config["products"])
Next Steps¶
- Array Handling - Advanced array processing options
- Output Formats - Choosing and configuring output formats
- Streaming Guide - Memory-efficient processing for large files