Streaming TB-Scale Financial ETL to Parquet
Stream journal entries from VynFi's NDJSON endpoint, flatten header+lines into one row per line item, and write chunk-batched Parquet files — all without loading the full dataset into memory.
A 10-million-row financial dataset with 5 companies and 12 periods produces roughly 40 GB of JSON. Load that into pandas and you need 80+ GB of RAM, since parsing JSON into Python objects roughly doubles its footprint. For a TB-scale job (50 companies, 100M rows) you need a different approach entirely.
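The arithmetic is easy to sanity-check; a back-of-envelope sketch (the ~4 KB/row and 2x-overhead figures are assumptions derived from the numbers above, not measurements):

```python
rows = 10_000_000
json_bytes_per_row = 4_000           # ~40 GB / 10M rows, per the figures above
on_disk_gb = rows * json_bytes_per_row / 1e9
in_memory_gb = on_disk_gb * 2        # assumed 2x Python object overhead
print(f"{on_disk_gb:.0f} GB NDJSON on disk -> ~{in_memory_gb:.0f} GB in pandas")
# 40 GB NDJSON on disk -> ~80 GB in pandas
```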
This tutorial shows how to stream data from VynFi's NDJSON endpoint and write chunk-batched Parquet files with constant memory. Each chunk is a self-contained Parquet file; at the end, you concatenate them or point your query engine (DuckDB, Spark, Athena) at the directory.
The Pattern: Stream, Flatten, Batch, Write
```python
import os
import tempfile
from decimal import Decimal, InvalidOperation
from pathlib import Path

import pandas as pd
import vynfi

client = vynfi.VynFi(api_key=os.environ["VYNFI_API_KEY"])
job = client.jobs.list(status="completed", limit=1).data[0]

def to_float(x):
    # Amounts arrive as decimal strings; parse defensively, None on bad input.
    if x is None or x == "":
        return None
    try:
        return float(Decimal(str(x)))
    except InvalidOperation:
        return None

CHUNK_SIZE = 200
output_dir = Path(tempfile.mkdtemp(prefix="vynfi_etl_"))
chunk, chunks_written, total_rows = [], 0, 0

for env in client.jobs.stream_ndjson(
    job.id,
    file="journal_entries.json",
    rate=1000,
    progress_interval=500,
):
    if env.get("type") == "_progress":
        print(f"  [progress] {env['lines_emitted']} lines")
        continue

    # Nested envelopes carry a header plus line items; a flat row is both.
    header = env.get("header", env)
    lines = env.get("lines", [env])

    for line in lines:
        chunk.append({
            "document_id": header.get("document_id"),
            "company_code": header.get("company_code"),
            "posting_date": header.get("posting_date"),
            "is_fraud": header.get("is_fraud", False),
            "gl_account": line.get("gl_account"),
            "debit_amount": to_float(line.get("debit_amount")),
            "credit_amount": to_float(line.get("credit_amount")),
        })
        if len(chunk) >= CHUNK_SIZE:
            out = output_dir / f"chunk_{chunks_written:04d}.parquet"
            pd.DataFrame(chunk).to_parquet(out, index=False)
            total_rows += len(chunk)
            chunks_written += 1
            chunk = []

# Final partial chunk
if chunk:
    pd.DataFrame(chunk).to_parquet(
        output_dir / f"chunk_{chunks_written:04d}.parquet", index=False
    )
    total_rows += len(chunk)
    chunks_written += 1

print(f"\nETL: {chunks_written} chunks, {total_rows:,} rows")
```

Reading It Back
```python
# DuckDB can query the directory directly:
#   SELECT * FROM read_parquet('/tmp/vynfi_etl_*/chunk_*.parquet')

# Or concatenate with pandas:
files = sorted(output_dir.glob("chunk_*.parquet"))
combined = pd.concat([pd.read_parquet(f) for f in files], ignore_index=True)

print(f"Total rows: {len(combined):,}")
print(f"Total debits: ${combined['debit_amount'].sum():,.2f}")
balanced = abs(combined["debit_amount"].sum() - combined["credit_amount"].sum()) < 0.01
print(f"Balanced: {balanced}")
```

Why Chunk-Batched Parquet?
- Constant memory: each chunk is flushed to disk independently. A 200-row chunk is ~50 KB in RAM.
- Columnar compression: Parquet compresses financial data 3-5x vs JSON. Your 40 GB JSON becomes ~8 GB Parquet.
- Query-engine friendly: DuckDB, Spark, and Athena read Parquet directories natively, with no import step (see the DuckDB sketch after this list).
- Resumable: if the stream breaks, you lose at most one chunk. The files on disk are complete.
- Parallelizable: nothing stops you from running N consumers on N different files (banking_transactions.json, subledger_ap.json, etc.) concurrently.
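To make the query-engine point concrete, here is a minimal DuckDB sketch. It assumes `duckdb` is installed and reuses `output_dir` from the ETL script above; the query itself is illustrative, not part of the VynFi API:

```python
import duckdb

# Aggregate across every chunk file without loading them into pandas;
# DuckDB scans the Parquet directory and pushes the GROUP BY down.
totals = duckdb.sql(f"""
    SELECT company_code,
           SUM(debit_amount)  AS debits,
           SUM(credit_amount) AS credits
    FROM read_parquet('{output_dir}/chunk_*.parquet')
    GROUP BY company_code
    ORDER BY company_code
""").df()
print(totals)
```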
v2.3.1: Native Numbers Eliminate Conversion
With DataSynth 2.3.1, set `output.numericMode: native` in your generation config. Decimal fields arrive as JSON numbers (`1729237.30`) instead of strings (`"1729237.30"`). The `to_float()` helper above becomes unnecessary — pandas reads the column directly as float64. This alone can cut ETL code by 30% for numeric-heavy pipelines.
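As a sketch of what that buys you, here is what the inner-loop row mapping from the ETL script could shrink to once amounts arrive as numbers (`to_row` is a hypothetical helper for illustration, not part of the VynFi SDK):

```python
def to_row(header: dict, line: dict) -> dict:
    # With output.numericMode: native, amounts are already JSON numbers,
    # so they pass straight through: no Decimal round-trip, no to_float().
    return {
        "document_id": header.get("document_id"),
        "company_code": header.get("company_code"),
        "posting_date": header.get("posting_date"),
        "is_fraud": header.get("is_fraud", False),
        "gl_account": line.get("gl_account"),
        "debit_amount": line.get("debit_amount"),    # float, not "1729237.30"
        "credit_amount": line.get("credit_amount"),  # float
    }
```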