Streaming and Batch Processing

Stream large datasets directly to files without keeping all data in memory.

flatten_stream()

import transmog as tm

# Stream to CSV files; returns a list of written paths
files = tm.flatten_stream(
    large_data,
    output_path="output/",
    name="dataset",
    output_format="csv"
)
# files: [PosixPath('output/dataset.csv'), ...]

# Stream to compressed Parquet
files = tm.flatten_stream(
    large_data,
    output_path="output/",
    name="dataset",
    output_format="parquet",
    compression="snappy"
)

# Stream to compressed ORC
files = tm.flatten_stream(
    large_data,
    output_path="output/",
    name="dataset",
    output_format="orc",
    compression="zstd"
)

# Stream to Avro
files = tm.flatten_stream(
    large_data,
    output_path="output/",
    name="dataset",
    output_format="avro",
    codec="snappy"
)

flatten() keeps results in memory and returns a FlattenResult object. flatten_stream() writes directly to disk and returns a list[Path] of written file paths.
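Because flatten_stream() hands back plain paths, the result can be inspected with the standard library alone. The sketch below shows one way to report what was written; summarize_outputs is a hypothetical helper, not part of transmog, and it assumes only that each entry is a path-like object.

```python
from pathlib import Path


def summarize_outputs(paths):
    """Return {file name: size in bytes} for every listed path that exists."""
    summary = {}
    for entry in paths:
        path = Path(entry)
        # Skip anything that was listed but is not present on disk.
        if path.exists():
            summary[path.name] = path.stat().st_size
    return summary
```

Calling summarize_outputs(files) on the list returned by flatten_stream() would then give a quick per-file size report.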

Warning

Before calling flatten_stream(), make sure the volume that holds the output directory has enough free space for the processed data. Large inputs can produce large output files.
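One way to act on this warning is a pre-flight check with the standard library. This is a minimal sketch: has_free_space is a hypothetical helper, and the required size is an estimate you supply, since the final output size depends on the data and the chosen format.

```python
import shutil
from pathlib import Path


def has_free_space(output_dir, required_bytes):
    """Return True if the volume holding output_dir has required_bytes free."""
    path = Path(output_dir)
    # The output directory may not exist yet, so walk up to the
    # nearest existing ancestor before asking for disk usage.
    while not path.exists():
        path = path.parent
    return shutil.disk_usage(path).free >= required_bytes
```

A caller might run has_free_space("output/", 5 * 1024**3) and skip the streaming call when it returns False.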

Batch Size

# Default batch size: 1000 for flatten()
result = tm.flatten(data)

# flatten_stream() defaults to batch_size=100 for memory efficiency
tm.flatten_stream(large_data, "output/")

# Small batches for memory-constrained environments
# (100 is also the flatten_stream() default; lower it further if memory is tight)
config = tm.TransmogConfig(batch_size=100)
tm.flatten_stream(large_data, "output/", config=config)

# Large batches for throughput
config = tm.TransmogConfig(batch_size=10000)
result = tm.flatten(data, config=config)
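To make the trade-off concrete, the sketch below shows the general idea of batching: only batch_size records are held at once, so smaller batches lower peak memory while larger ones reduce per-batch overhead. It illustrates the concept only and is not transmog's internal implementation; iter_batches is a hypothetical name.

```python
from itertools import islice


def iter_batches(records, batch_size):
    """Yield lists of at most batch_size records from any iterable."""
    iterator = iter(records)
    while True:
        # Pull the next batch_size items without materializing the rest.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

With five records and batch_size=2, this produces three batches, the last of which holds a single record.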

Progress Tracking

Track processing progress with a callback:

def on_progress(records_processed, total_records):
    if total_records:
        pct = records_processed / total_records * 100
        print(f"{records_processed}/{total_records} ({pct:.0f}%)")
    else:
        print(f"{records_processed} records processed")

# Works with both flatten() and flatten_stream()
result = tm.flatten(data, progress_callback=on_progress)

tm.flatten_stream(
    large_data,
    output_path="output/",
    progress_callback=on_progress
)

total_records is the input length when known (list or dict input), otherwise None (file paths, byte strings, iterators). The callback fires once per batch, so frequency depends on batch_size.
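Since the callback fires once per batch, a small batch_size can produce a lot of output. The sketch below wraps the reporting in a throttle that also copes with total_records being None. It assumes only the two-argument callback signature shown above; make_progress_logger and its every and emit parameters are hypothetical.

```python
def make_progress_logger(every=10_000, emit=print):
    """Build a callback that reports at most once per `every` records."""
    last_reported = 0

    def on_progress(records_processed, total_records):
        nonlocal last_reported
        # Always report the final call when the total is known.
        finished = total_records is not None and records_processed >= total_records
        if records_processed - last_reported < every and not finished:
            return
        last_reported = records_processed
        suffix = f"/{total_records}" if total_records else ""
        emit(f"{records_processed}{suffix} records processed")

    return on_progress
```

The returned function can be passed as progress_callback in the same way as on_progress above.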

Using with tqdm

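from tqdm import tqdm

data = load_data()  # placeholder: any list of records

def update_bar(processed, total):
    # The callback reports a running total; tqdm expects an increment.
    bar.update(processed - bar.n)

# The context manager closes the bar even if flatten() raises.
with tqdm(total=len(data), unit="rec") as bar:
    result = tm.flatten(data, progress_callback=update_bar)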

File Processing

All file formats supported by flatten() also work with flatten_stream(). JSONL files are processed line by line, which makes them well suited to streaming large datasets. See Working with Files for supported formats and dependency requirements.

tm.flatten_stream("large_file.jsonl", "output/", output_format="parquet")
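The reason line-by-line processing suits large inputs can be seen in a plain-Python reader: each record is parsed and handed on before the next line is read, so memory use does not grow with file size. This is an illustrative sketch, not transmog's own reader; iter_jsonl is a hypothetical name.

```python
import json


def iter_jsonl(path):
    """Yield one parsed record per non-empty line of a JSONL file."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            # Blank lines carry no record, so skip them.
            if line:
                yield json.loads(line)
```

A generator like this can be consumed lazily, for example to count records in a file that would not fit in memory as a single list.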

Output Formats

See Output Formats for full details on each format and its options.

tm.flatten_stream(data, "output/", output_format="csv")
tm.flatten_stream(data, "output/", output_format="parquet", compression="snappy")
tm.flatten_stream(data, "output/", output_format="parquet", row_group_size=50000)
tm.flatten_stream(data, "output/", output_format="orc", compression="zstd")
tm.flatten_stream(data, "output/", output_format="avro", codec="snappy")
tm.flatten_stream(data, "output/", output_format="avro", codec="deflate", sync_interval=32000)

Note

The ORC writer accepts a batch_size format option (e.g., batch_size=50000) that controls how many rows are written per ORC stripe. This is separate from TransmogConfig.batch_size, which controls how many records are processed in each batch.

Examples

Large Dataset

config = tm.TransmogConfig(batch_size=100)
tm.flatten_stream(
    "large_dataset.json",
    "processed/",
    name="records",
    output_format="parquet",
    compression="snappy",
    config=config
)

ETL Pipeline

config = tm.TransmogConfig(batch_size=5000)
tm.flatten_stream(
    "raw_data.jsonl",
    "transformed/",
    name="events",
    output_format="parquet",
    config=config
)