Streaming and Batch Processing¶
Stream large datasets directly to files without keeping all data in memory.
flatten_stream()¶
import transmog as tm
# Stream to CSV files — returns list of written paths
files = tm.flatten_stream(
    large_data,
    output_path="output/",
    name="dataset",
    output_format="csv"
)
# files: [PosixPath('output/dataset.csv'), ...]

# Stream to compressed Parquet
files = tm.flatten_stream(
    large_data,
    output_path="output/",
    name="dataset",
    output_format="parquet",
    compression="snappy"
)

# Stream to compressed ORC
files = tm.flatten_stream(
    large_data,
    output_path="output/",
    name="dataset",
    output_format="orc",
    compression="zstd"
)

# Stream to Avro
files = tm.flatten_stream(
    large_data,
    output_path="output/",
    name="dataset",
    output_format="avro",
    codec="snappy"
)
flatten() keeps results in memory and returns a FlattenResult object.
flatten_stream() writes directly to disk and returns a list[Path] of written file paths.
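Since the returned paths are standard pathlib.Path objects (as the PosixPath output above shows), they can be handed straight to downstream steps; a minimal usage sketch:
files = tm.flatten_stream(large_data, output_path="output/", name="dataset", output_format="csv")
for path in files:
    print(f"wrote {path} ({path.stat().st_size:,} bytes)")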
Warning
When using flatten_stream(), ensure the output directory has sufficient disk
space for the processed data. Large datasets can generate substantial output files.
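One way to honor this is a free-space check before streaming; a rough sketch using the standard library, where the 10 GB threshold is an arbitrary placeholder:
import shutil

# Refuse to start streaming if the output volume looks too small
free_bytes = shutil.disk_usage("output/").free
if free_bytes < 10 * 1024**3:  # arbitrary 10 GB placeholder threshold
    raise RuntimeError("insufficient free disk space for streaming output")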
Batch Size¶
# Default batch size: 1000 for flatten()
result = tm.flatten(data)
# flatten_stream() defaults to batch_size=100 for memory efficiency
tm.flatten_stream(large_data, "output/")
# Small batches for memory-constrained environments
config = tm.TransmogConfig(batch_size=100)
tm.flatten_stream(large_data, "output/", config=config)
# Large batches for throughput
config = tm.TransmogConfig(batch_size=10000)
result = tm.flatten(data, config=config)
Progress Tracking¶
Track processing progress with a callback:
def on_progress(records_processed, total_records):
    if total_records:
        pct = records_processed / total_records * 100
        print(f"{records_processed}/{total_records} ({pct:.0f}%)")
    else:
        print(f"{records_processed} records processed")
# Works with both flatten() and flatten_stream()
result = tm.flatten(data, progress_callback=on_progress)
tm.flatten_stream(
    large_data,
    output_path="output/",
    progress_callback=on_progress
)
total_records is the input length when known (list or dict input), otherwise
None (file paths, byte strings, iterators). The callback fires once per batch, so
frequency depends on batch_size.
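For instance, with a generator input the callback only ever receives total_records=None; a small sketch (the generator below is a stand-in for a real record source):
def record_stream():
    # Stand-in generator; length is unknown to flatten_stream()
    for i in range(1_000_000):
        yield {"id": i, "value": i * 2}

tm.flatten_stream(
    record_stream(),
    output_path="output/",
    progress_callback=on_progress,  # prints "N records processed" with no total
)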
Using with tqdm¶
from tqdm import tqdm

data = load_data()  # list of records
bar = tqdm(total=len(data), unit="rec")

def update_bar(processed, total):
    bar.update(processed - bar.n)

result = tm.flatten(data, progress_callback=update_bar)
bar.close()
File Processing¶
All file formats supported by flatten() work with flatten_stream(). JSONL
files are processed line-by-line, making them ideal for streaming large datasets.
See Working with Files for supported formats
and dependency requirements.
tm.flatten_stream("large_file.jsonl", "output/", output_format="parquet")
Output Formats¶
See Output Formats for full details on each format and its options.
tm.flatten_stream(data, "output/", output_format="csv")
tm.flatten_stream(data, "output/", output_format="parquet", compression="snappy")
tm.flatten_stream(data, "output/", output_format="parquet", row_group_size=50000)
tm.flatten_stream(data, "output/", output_format="orc", compression="zstd")
tm.flatten_stream(data, "output/", output_format="avro", codec="snappy")
tm.flatten_stream(data, "output/", output_format="avro", codec="deflate", sync_interval=32000)
Note
The ORC writer accepts a batch_size format option (e.g., batch_size=50000)
that controls how many rows are written per ORC stripe. This is separate from
TransmogConfig.batch_size, which controls how many records are processed per
batch.
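A sketch combining the two knobs, assuming the ORC batch_size is passed as a keyword format option like the other writer options above:
config = tm.TransmogConfig(batch_size=1000)  # records per processing batch
tm.flatten_stream(
    data,
    "output/",
    output_format="orc",
    batch_size=50000,  # rows per ORC stripe (writer format option)
    config=config,
)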
Examples¶
config = tm.TransmogConfig(batch_size=100)
tm.flatten_stream(
    "large_dataset.json",
    "processed/",
    name="records",
    output_format="parquet",
    compression="snappy",
    config=config
)
ETL Pipeline¶
config = tm.TransmogConfig(batch_size=5000)
tm.flatten_stream("raw_data.jsonl", "transformed/", name="events", output_format="parquet", config=config)