Output Formats¶
Supported formats: CSV, Parquet, ORC, and Avro.
Saving Results¶
Auto-Detection¶
import transmog as tm
result = tm.flatten(data, name="products")
# Auto-detect format from extension
result.save("output.csv") # CSV
result.save("output.parquet") # Parquet
result.save("output.orc") # ORC
result.save("output.avro") # Avro
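Extension-based detection amounts to a suffix lookup. A minimal sketch of that idea (the mapping and helper name are illustrative, not transmog's internals):

```python
from pathlib import Path

# Illustrative suffix-to-format mapping (not transmog's actual table)
_FORMATS = {".csv": "csv", ".parquet": "parquet", ".orc": "orc", ".avro": "avro"}

def detect_format(path):
    """Return the output format implied by the file extension."""
    suffix = Path(path).suffix.lower()
    try:
        return _FORMATS[suffix]
    except KeyError:
        raise ValueError(f"cannot detect output format from {path!r}")

print(detect_format("output.parquet"))  # parquet
```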
Explicit Format¶
# Specify format explicitly
result.save("output", output_format="csv")
result.save("output", output_format="parquet")
result.save("output", output_format="orc")
result.save("output", output_format="avro")
Multiple Tables¶
When results contain child tables, save to a directory:
# Save to directory (multiple tables)
result.save("output/")
# Creates: output/products.csv, output/products_reviews.csv
# Single table to file
result.save("output/products.csv")
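The parent/child file naming used for directory output follows the pattern shown in the comment above. A sketch of that convention (the helper name is hypothetical):

```python
def table_filenames(name, child_tables, ext="csv"):
    """Build output file names: one for the main table, one per child table."""
    files = [f"{name}.{ext}"]
    files += [f"{name}_{child}.{ext}" for child in child_tables]
    return files

print(table_filenames("products", ["reviews"]))
# ['products.csv', 'products_reviews.csv']
```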
CSV Output¶
result = tm.flatten(data, name="products")
result.save("output.csv")
Tip
For consistent CSV columns across all rows, use include_nulls=True in
TransmogConfig. This ensures fields that are missing in some records appear
as empty strings in the CSV output.
# Include nulls for consistent columns
config = tm.TransmogConfig(include_nulls=True)
result = tm.flatten(data, config=config)
result.save("output.csv")
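Independent of transmog, the standard library's csv.DictWriter shows the same effect: when every row is written against one fixed set of field names, missing fields appear as empty strings (controlled by restval).

```python
import csv
import io

rows = [
    {"name": "A", "price": 10},
    {"name": "B"},  # "price" missing in this record
]

buf = io.StringIO()
# restval="" writes an empty string for fields absent from a row
writer = csv.DictWriter(buf, fieldnames=["name", "price"], restval="")
writer.writeheader()
writer.writerows(rows)
print(buf.getvalue())
# name,price
# A,10
# B,
```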
Custom options:
import csv
# Custom delimiter and quoting
result.save(
    "output.csv",
    delimiter="|",
    quoting=csv.QUOTE_ALL
)
Streaming CSV supports the same options plus schema_drift:
tm.flatten_stream(
    data, "output/",
    output_format="csv",
    delimiter="|",
    quotechar="'",
    include_header=True,  # Include column headers (default: True)
    schema_drift="drop",  # Handle schema drift (default: "strict")
)
Schema Drift¶
When using flatten_stream() with CSV output, the column schema is locked after
the first batch of records. By default, any subsequent batch containing fields
not present in the original schema raises an OutputError.
The schema_drift parameter controls this behavior:
| Mode | Behavior |
|---|---|
| "strict" (default) | Raise an OutputError when a batch contains fields outside the locked schema |
| "drop" | Log a warning and drop unexpected fields; write remaining known fields |
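The locking behavior described above can be sketched as follows (the class and method names are illustrative, not the library's internals):

```python
class LockedCsvSchema:
    """Lock columns on the first batch; handle later drift per `mode`."""

    def __init__(self, mode="strict"):
        self.mode = mode
        self.columns = None

    def filter_batch(self, batch):
        if self.columns is None:
            # The first batch fixes the schema
            self.columns = sorted({k for row in batch for k in row})
            return batch
        drifted = {k for row in batch for k in row} - set(self.columns)
        if drifted and self.mode == "strict":
            raise RuntimeError(f"schema drift: unexpected fields {sorted(drifted)}")
        # mode == "drop": keep only the locked columns
        return [{k: row.get(k) for k in self.columns} for row in batch]

s = LockedCsvSchema(mode="drop")
s.filter_batch([{"a": 1}])
print(s.filter_batch([{"a": 2, "b": 3}]))  # [{'a': 2}]
```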
import transmog as tm
# Drop unexpected fields instead of raising
tm.flatten_stream(data, "output/", name="events", output_format="csv", schema_drift="drop")
Note
An "extend" mode (rewriting headers to add new columns) is not supported.
Streaming CSV headers are already emitted to the destination and cannot be
rewritten for arbitrary outputs (stdout, binary streams, pipes).
Parquet Output¶
result = tm.flatten(data, name="products")
result.save("output.parquet")
# Compression options
result.save("output.parquet", compression="snappy") # Default
result.save("output.parquet", compression="gzip")
result.save("output.parquet", compression="brotli")
result.save("output.parquet", compression=None)
ORC Output¶
result = tm.flatten(data, name="products")
result.save("output.orc")
# Compression options
result.save("output.orc", compression="zstd") # Default
result.save("output.orc", compression="snappy")
result.save("output.orc", compression="lz4")
result.save("output.orc", compression="zlib")
Avro Output¶
result = tm.flatten(data, name="products")
result.save("output.avro")
# Compression options (codec parameter)
result.save("output.avro", codec="snappy") # Default (via cramjam)
result.save("output.avro", codec="deflate") # Built-in compression
result.save("output.avro", codec="null") # No compression
result.save("output.avro", codec="bzip2") # Via cramjam
result.save("output.avro", codec="xz") # Via cramjam
# Additional codecs (require separate package installations):
# codec="zstandard" # Requires: pip install zstandard
# codec="lz4" # Requires: pip install lz4
# Advanced: customize sync interval (bytes between sync markers)
result.save("output.avro", codec="snappy", sync_interval=32000)
Note
Codec Dependencies:
To use additional codecs beyond the defaults:
zstandard: pip install zstandard
lz4: pip install lz4
Technical detail: While cramjam (included by default) bundles these compression algorithms, fastavro’s codec interface requires the standalone packages to expose them.
Avro Schema Inference¶
Avro schemas are automatically inferred from your data:
data = [
    {"name": "Alice", "age": 30, "score": 95.5},
    {"name": "Bob", "age": None, "score": 88.0}
]
result = tm.flatten(data, name="users")
result.save("output.avro")
Schema inference behavior:
- Field types are detected from values (string, long, double, boolean, bytes)
- Nullable fields use Avro union types: ["null", "type"]
- NaN and Infinity float values are automatically converted to null
- Mixed types in a field result in union types with multiple type options
- Schema is locked after the first batch in streaming mode
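A simplified sketch of that inference, mapping observed Python values to Avro types (the function is illustrative; transmog's actual inference covers more cases):

```python
import math

# Python type to Avro primitive (bool must be checked before int in real code;
# dict lookup by exact type handles it here since type(True) is bool)
_AVRO = {str: "string", bool: "boolean", int: "long", float: "double", bytes: "bytes"}

def infer_field(values):
    """Infer an Avro type (or union) for one field from its observed values."""
    types = []
    for v in values:
        if v is None or (isinstance(v, float) and not math.isfinite(v)):
            t = "null"  # NaN/Infinity are treated as null
        else:
            t = _AVRO[type(v)]
        if t not in types:
            types.append(t)
    if "null" in types:  # nullable fields become ["null", ...] unions
        types = ["null"] + [t for t in types if t != "null"]
    return types[0] if len(types) == 1 else types

print(infer_field([30, None]))   # ['null', 'long']
print(infer_field(["a", "b"]))   # string
```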
Warning
When using flatten_stream() with Avro output, the schema is determined from the
first batch of records. If subsequent batches contain new fields not present in
the first batch, a schema drift error will be raised.
Null Handling¶
data = {"name": "Product", "description": None, "notes": ""}
config = tm.TransmogConfig(include_nulls=False)
result = tm.flatten(data, config=config)
print(result.main[0])
# {'name': 'Product'}
config = tm.TransmogConfig(include_nulls=True)
result = tm.flatten(data, config=config)
print(result.main[0])
# {'name': 'Product', 'description': None, 'notes': None}
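Note that in the second output above the empty string in notes also comes back as None: empty values are normalized before the include_nulls decision applies. A rough sketch of that two-step behavior, inferred from the example output (not transmog's internals):

```python
def normalize_record(record, include_nulls):
    """Normalize empty strings to None, then keep or drop null fields."""
    normalized = {k: (None if v == "" else v) for k, v in record.items()}
    if include_nulls:
        return normalized
    return {k: v for k, v in normalized.items() if v is not None}

rec = {"name": "Product", "description": None, "notes": ""}
print(normalize_record(rec, include_nulls=False))
# {'name': 'Product'}
print(normalize_record(rec, include_nulls=True))
# {'name': 'Product', 'description': None, 'notes': None}
```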
Integration Examples¶
PostgreSQL¶
config = tm.TransmogConfig(include_nulls=True)
result = tm.flatten(data, name="customers", config=config)
result.save("import/")
PyArrow¶
result = tm.flatten(data, name="sales")
result.save("analysis.parquet")
import pyarrow.parquet as pq
table = pq.read_table("analysis.parquet")
Polars¶
result = tm.flatten(data, name="sales")
result.save("analysis.parquet")
import polars as pl
df = pl.read_parquet("analysis.parquet")
DuckDB¶
result = tm.flatten(data, name="transactions")
result.save("data.parquet")
import duckdb
df = duckdb.connect().execute("SELECT * FROM 'data.parquet'").df()