Extending and Customization¶
Transmog provides extension points for customizing data transformation behavior, adding functionality, and integrating with external systems. This guide covers advanced customization techniques and extension patterns.
Advanced API Access¶
For developers requiring access to internal functionality, Transmog provides an advanced API namespace that exposes lower-level components:
import transmog as tm
# Standard API (recommended for most users)
result = tm.flatten(data)
writer = tm.io.create_writer("json")
# Advanced API (for power users and extensions)
from transmog.io import advanced
# Access internal registry for custom format registration
registry = advanced.get_format_registry()
advanced.register_writer("custom_format", CustomWriter)
# Access dependency manager for runtime checks
dependency_manager = advanced.get_dependency_manager()
The advanced API provides access to internal classes like FormatRegistry
and DependencyManager
that are not exposed in the standard public API for cleaner interfaces.
Custom Processors¶
Creating Custom Processors¶
Extend the base processor for specialized functionality:
from transmog.process import Processor
from transmog.config import TransmogConfig
class CustomProcessor(Processor):
"""Custom processor with specialized behavior."""
def __init__(self, config=None):
super().__init__(config or TransmogConfig.default())
self.custom_handlers = {}
def register_field_handler(self, field_pattern, handler):
"""Register custom handler for specific fields."""
self.custom_handlers[field_pattern] = handler
def process_field(self, field_name, value, context):
"""Override field processing with custom logic."""
# Check for custom handlers
for pattern, handler in self.custom_handlers.items():
if self.matches_pattern(field_name, pattern):
return handler(value, context)
# Fall back to default processing
return super().process_field(field_name, value, context)
def matches_pattern(self, field_name, pattern):
"""Check if field matches pattern."""
import re
return re.match(pattern, field_name) is not None
# Usage
processor = CustomProcessor()
# Register custom handler for timestamp fields
def timestamp_handler(value, context):
"""Convert timestamp formats."""
from datetime import datetime
if isinstance(value, str):
try:
dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
return dt.isoformat()
except ValueError:
return value
return value
processor.register_field_handler(r'.*timestamp.*', timestamp_handler)
result = processor.process(data)
Specialized Data Type Processors¶
Create processors for specific data types:
class GeospatialProcessor(Processor):
"""Processor specialized for geospatial data."""
def process_coordinates(self, value, context):
"""Process coordinate data."""
if isinstance(value, dict) and 'lat' in value and 'lon' in value:
return {
'latitude': float(value['lat']),
'longitude': float(value['lon']),
'coordinate_string': f"{value['lat']},{value['lon']}"
}
return value
def process_field(self, field_name, value, context):
"""Override for geospatial field handling."""
if 'location' in field_name.lower() or 'coordinates' in field_name.lower():
return self.process_coordinates(value, context)
return super().process_field(field_name, value, context)
# Usage
geo_processor = GeospatialProcessor()
result = geo_processor.process(geospatial_data)
Custom Transformations¶
Field Transformation Functions¶
Create reusable field transformation functions:
def create_field_transformer(transformation_func):
"""Create a field transformer."""
def transformer(processor):
original_process_field = processor.process_field
def enhanced_process_field(field_name, value, context):
# Apply transformation
transformed_value = transformation_func(field_name, value, context)
# Continue with normal processing
return original_process_field(field_name, transformed_value, context)
processor.process_field = enhanced_process_field
return processor
return transformer
# Example transformations
def normalize_currency(field_name, value, context):
"""Normalize currency values."""
if 'price' in field_name.lower() or 'cost' in field_name.lower():
if isinstance(value, str):
# Remove currency symbols and convert to float
import re
numeric_value = re.sub(r'[^\d.]', '', value)
try:
return float(numeric_value)
except ValueError:
return value
return value
def standardize_dates(field_name, value, context):
"""Standardize date formats."""
if 'date' in field_name.lower() or 'time' in field_name.lower():
if isinstance(value, str):
from datetime import datetime
date_formats = [
'%Y-%m-%d',
'%m/%d/%Y',
'%d-%m-%Y',
'%Y-%m-%d %H:%M:%S'
]
for fmt in date_formats:
try:
dt = datetime.strptime(value, fmt)
return dt.isoformat()
except ValueError:
continue
return value
# Apply transformations
processor = Processor()
processor = create_field_transformer(normalize_currency)(processor)
processor = create_field_transformer(standardize_dates)(processor)
Validation Transformations¶
Add data validation during processing:
class ValidatingProcessor(Processor):
"""Processor with built-in validation."""
def __init__(self, config=None, validation_rules=None):
super().__init__(config)
self.validation_rules = validation_rules or {}
self.validation_errors = []
def validate_field(self, field_name, value, context):
"""Validate field against rules."""
if field_name in self.validation_rules:
rule = self.validation_rules[field_name]
if 'type' in rule:
expected_type = rule['type']
if not isinstance(value, expected_type):
error = f"Field {field_name}: expected {expected_type}, got {type(value)}"
self.validation_errors.append(error)
return False
if 'range' in rule and isinstance(value, (int, float)):
min_val, max_val = rule['range']
if not (min_val <= value <= max_val):
error = f"Field {field_name}: value {value} not in range [{min_val}, {max_val}]"
self.validation_errors.append(error)
return False
if 'pattern' in rule and isinstance(value, str):
import re
if not re.match(rule['pattern'], value):
error = f"Field {field_name}: value does not match pattern"
self.validation_errors.append(error)
return False
return True
def process_field(self, field_name, value, context):
"""Process with validation."""
if self.validate_field(field_name, value, context):
return super().process_field(field_name, value, context)
else:
# Return original value for invalid data
return value
# Usage with validation rules
validation_rules = {
'age': {'type': int, 'range': (0, 150)},
'email': {'pattern': r'^[^@]+@[^@]+\.[^@]+$'},
'price': {'type': float, 'range': (0, 10000)}
}
validator = ValidatingProcessor(validation_rules=validation_rules)
result = validator.process(data)
if validator.validation_errors:
print("Validation errors:", validator.validation_errors)
Plugin System¶
Creating Plugins¶
Develop reusable plugins for common functionality:
class TransmogPlugin:
"""Base class for Transmog plugins."""
def __init__(self, name, version="1.0.0"):
self.name = name
self.version = version
def configure(self, processor):
"""Configure the processor with plugin functionality."""
raise NotImplementedError
def validate_config(self, config):
"""Validate plugin configuration."""
return True
class DataCleaningPlugin(TransmogPlugin):
"""Plugin for data cleaning operations."""
def __init__(self, cleaning_rules=None):
super().__init__("data_cleaning", "1.0.0")
self.cleaning_rules = cleaning_rules or {}
def configure(self, processor):
"""Add data cleaning to processor."""
original_process_field = processor.process_field
def clean_field(field_name, value, context):
# Apply cleaning rules
cleaned_value = self.clean_value(field_name, value)
return original_process_field(field_name, cleaned_value, context)
processor.process_field = clean_field
return processor
def clean_value(self, field_name, value):
"""Clean individual field values."""
if isinstance(value, str):
# Remove leading/trailing whitespace
value = value.strip()
# Apply field-specific cleaning
if field_name in self.cleaning_rules:
rule = self.cleaning_rules[field_name]
if rule == 'uppercase':
value = value.upper()
elif rule == 'lowercase':
value = value.lower()
elif rule == 'title_case':
value = value.title()
return value
# Usage
cleaning_rules = {
'name': 'title_case',
'email': 'lowercase',
'status': 'uppercase'
}
plugin = DataCleaningPlugin(cleaning_rules)
processor = Processor()
processor = plugin.configure(processor)
Plugin Manager¶
Manage multiple plugins:
class PluginManager:
"""Manage Transmog plugins."""
def __init__(self):
self.plugins = {}
def register_plugin(self, plugin):
"""Register a plugin."""
if plugin.name in self.plugins:
raise ValueError(f"Plugin {plugin.name} already registered")
self.plugins[plugin.name] = plugin
def apply_plugins(self, processor, plugin_names=None):
"""Apply plugins to processor."""
plugins_to_apply = plugin_names or list(self.plugins.keys())
for plugin_name in plugins_to_apply:
if plugin_name in self.plugins:
plugin = self.plugins[plugin_name]
processor = plugin.configure(processor)
return processor
def list_plugins(self):
"""List registered plugins."""
return [(name, plugin.version) for name, plugin in self.plugins.items()]
# Usage
manager = PluginManager()
manager.register_plugin(DataCleaningPlugin())
manager.register_plugin(ValidatingProcessor())
processor = Processor()
processor = manager.apply_plugins(processor, ['data_cleaning'])
Custom Output Formats¶
Creating Custom Writers¶
Implement custom output format writers:
from transmog.io import BaseWriter
class XMLWriter(BaseWriter):
"""Custom XML output writer."""
def __init__(self, config=None):
super().__init__(config)
self.format_name = "xml"
def write_table(self, table_name, records, output_path):
"""Write table to XML format."""
import xml.etree.ElementTree as ET
root = ET.Element("table", name=table_name)
for record in records:
record_element = ET.SubElement(root, "record")
for key, value in record.items():
field_element = ET.SubElement(record_element, "field", name=key)
field_element.text = str(value) if value is not None else ""
tree = ET.ElementTree(root)
tree.write(output_path, encoding="utf-8", xml_declaration=True)
def write_result(self, result, output_dir):
"""Write complete result to XML files."""
import os
# Write main table
main_path = os.path.join(output_dir, f"{result.name}.xml")
self.write_table(result.name, result.main, main_path)
# Write child tables
for table_name, records in result.tables.items():
table_path = os.path.join(output_dir, f"{table_name}.xml")
self.write_table(table_name, records, table_path)
# Register custom writer
from transmog.io import WriterRegistry
WriterRegistry.register_writer("xml", XMLWriter)
# Usage
import transmog as tm
result = tm.flatten(data)
xml_writer = XMLWriter()
xml_writer.write_result(result, "output/xml/")
Custom Format Integration¶
Integrate custom formats with main API:
def flatten_to_xml(data, output_dir, **kwargs):
"""Flatten data and output to XML format."""
import transmog as tm
# Process data
result = tm.flatten(data, **kwargs)
# Write to XML
writer = XMLWriter()
writer.write_result(result, output_dir)
return result
# Usage
result = flatten_to_xml(data, "output/", name="custom_data")
Integration Hooks¶
Pre/Post Processing Hooks¶
Add hooks for custom processing stages:
class HookableProcessor(Processor):
"""Processor with hook support."""
def __init__(self, config=None):
super().__init__(config)
self.pre_hooks = []
self.post_hooks = []
self.field_hooks = {}
def add_pre_hook(self, hook_func):
"""Add pre-processing hook."""
self.pre_hooks.append(hook_func)
def add_post_hook(self, hook_func):
"""Add post-processing hook."""
self.post_hooks.append(hook_func)
def add_field_hook(self, field_pattern, hook_func):
"""Add field-specific hook."""
if field_pattern not in self.field_hooks:
self.field_hooks[field_pattern] = []
self.field_hooks[field_pattern].append(hook_func)
def process(self, data, **kwargs):
"""Process with hooks."""
# Execute pre-hooks
for hook in self.pre_hooks:
data = hook(data)
# Normal processing
result = super().process(data, **kwargs)
# Execute post-hooks
for hook in self.post_hooks:
result = hook(result)
return result
# Example hooks
def data_validation_hook(data):
"""Validate input data."""
if not isinstance(data, (dict, list)):
raise ValueError("Data must be dict or list")
return data
def result_enhancement_hook(result):
"""Enhance processing result."""
# Add custom metadata
if hasattr(result, 'metadata'):
result.metadata['enhanced'] = True
return result
# Usage
processor = HookableProcessor()
processor.add_pre_hook(data_validation_hook)
processor.add_post_hook(result_enhancement_hook)
result = processor.process(data)
Event System¶
Implement event-driven processing:
class EventDrivenProcessor(Processor):
"""Processor with event system."""
def __init__(self, config=None):
super().__init__(config)
self.event_handlers = {}
def on(self, event_name, handler):
"""Register event handler."""
if event_name not in self.event_handlers:
self.event_handlers[event_name] = []
self.event_handlers[event_name].append(handler)
def emit(self, event_name, **event_data):
"""Emit event to handlers."""
if event_name in self.event_handlers:
for handler in self.event_handlers[event_name]:
handler(**event_data)
def process_record(self, record, context):
"""Process single record with events."""
self.emit('record_start', record=record, context=context)
try:
result = super().process_record(record, context)
self.emit('record_success', record=record, result=result, context=context)
return result
except Exception as e:
self.emit('record_error', record=record, error=e, context=context)
raise
# Event handlers
def log_record_start(record, context):
print(f"Processing record: {record.get('id', 'unknown')}")
def log_record_error(record, error, context):
print(f"Error processing record {record.get('id', 'unknown')}: {error}")
# Usage
processor = EventDrivenProcessor()
processor.on('record_start', log_record_start)
processor.on('record_error', log_record_error)
result = processor.process(data)
Testing Extensions¶
Extension Testing Framework¶
Create testing utilities for extensions:
import unittest
from transmog.testing import ProcessorTestCase
class CustomProcessorTestCase(ProcessorTestCase):
"""Test case for custom processors."""
def setUp(self):
self.processor = CustomProcessor()
def test_custom_field_handler(self):
"""Test custom field handler functionality."""
# Register test handler
def test_handler(value, context):
return f"processed_{value}"
self.processor.register_field_handler("test_.*", test_handler)
# Test data
data = {"test_field": "value", "other_field": "value"}
result = self.processor.process(data)
# Assertions
main_record = result.main[0]
self.assertEqual(main_record["test_field"], "processed_value")
self.assertEqual(main_record["other_field"], "value")
def test_error_handling(self):
"""Test error handling in custom processor."""
def error_handler(value, context):
raise ValueError("Test error")
self.processor.register_field_handler("error_.*", error_handler)
data = {"error_field": "value"}
with self.assertRaises(ValueError):
self.processor.process(data)
# Run tests
if __name__ == '__main__':
unittest.main()
Plugin Testing¶
Test plugin functionality:
class PluginTestCase(unittest.TestCase):
"""Test case for plugins."""
def test_data_cleaning_plugin(self):
"""Test data cleaning plugin."""
# Configure plugin
cleaning_rules = {'name': 'title_case'}
plugin = DataCleaningPlugin(cleaning_rules)
# Apply to processor
processor = Processor()
processor = plugin.configure(processor)
# Test data
data = {"name": "john doe", "age": 30}
result = processor.process(data)
# Check cleaning was applied
main_record = result.main[0]
self.assertEqual(main_record["name"], "John Doe")
self.assertEqual(main_record["age"], 30)
def test_plugin_manager(self):
"""Test plugin manager functionality."""
manager = PluginManager()
plugin = DataCleaningPlugin()
# Test registration
manager.register_plugin(plugin)
self.assertIn("data_cleaning", manager.plugins)
# Test duplicate registration
with self.assertRaises(ValueError):
manager.register_plugin(plugin)
# Test plugin application
processor = Processor()
configured_processor = manager.apply_plugins(processor)
# Verify processor was modified
self.assertNotEqual(processor.process_field, configured_processor.process_field)
Documentation for Extensions¶
Documenting Custom Processors¶
Create documentation for custom processors:
class DocumentedProcessor(Processor):
"""
Custom processor with comprehensive documentation.
This processor extends the base Transmog processor with specialized
functionality for [specific use case].
Features:
- Custom field handling for [specific field types]
- Enhanced error reporting
- Integration with [external system]
Example:
>>> processor = DocumentedProcessor(config)
>>> result = processor.process(data)
>>> print(result.main)
Args:
config: TransmogConfig instance or None for default config
custom_param: Additional parameter for [specific functionality]
Attributes:
custom_handlers: Dictionary of registered field handlers
statistics: Processing statistics and metrics
"""
def __init__(self, config=None, custom_param=None):
super().__init__(config)
self.custom_param = custom_param
self.custom_handlers = {}
self.statistics = {"processed_fields": 0, "custom_handled": 0}
def register_field_handler(self, pattern, handler):
"""
Register a custom field handler.
Args:
pattern (str): Regex pattern to match field names
handler (callable): Function to process matching fields
Signature: handler(value, context) -> processed_value
Example:
>>> def timestamp_handler(value, context):
... return parse_timestamp(value)
>>> processor.register_field_handler(r'.*_timestamp', timestamp_handler)
"""
self.custom_handlers[pattern] = handler
This completes the major content for the advanced section. Now I’ll continue with the API reference documentation.