Real-Time Fraud Pipelines: From Synthetic Labels to SIEM Alerts
Build a back-pressured fraud alerting pipeline that streams AML labels from VynFi, classifies typologies, and dispatches to Slack/PagerDuty/SIEM without exceeding downstream rate limits.
Production AML systems don't batch-process yesterday's alerts at 9 AM. They stream. A transaction hits the wire, the model scores it, and if the score exceeds the threshold, an alert lands in the compliance queue within seconds. The challenge isn't the model — it's the plumbing: rate-limited downstream APIs (Slack allows roughly 1 post per second, ServiceNow about 5 requests per second), bursty upstream data, and the need for back-pressure so your pipeline doesn't drown the consumer.
This tutorial builds that pipeline using VynFi's NDJSON streaming endpoint and Python. The data is synthetic — fully labeled with ground-truth AML typologies — but the architecture is production-grade. Swap the simulated Slack webhook for a real one and you have a working compliance alert fanout.
**DataSynth 3.1.1 update:** AML typology coverage jumped from 0.000 (a silent failure in 3.1.0 — Debug-cased type names broke the evaluator's lowercase matcher) to **0.857**, passing the ≥0.80 coverage threshold. All seven canonical typologies now appear (structuring / funnel / layering / mule / round_tripping / fraud / spoofing). Network density is up 38×, with `mule_link` and `shell_link` edges now populated from coordinated criminal structures. Plug the pipeline into the regenerated VynFi/vynfi-sar-narratives or VynFi/vynfi-aml-100k datasets for a realistic SIEM playback without running a generation job.
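A minimal sketch of what that silent failure looks like, assuming the evaluator compares raw type strings against a lowercase canonical set (the typology set comes from the release note above; the exact-match logic is our illustration, not the evaluator's actual code):

```python
# Canonical typology set (from the release note). The exact-match comparison
# below is an assumption about the evaluator, shown only to illustrate the
# 3.1.0 failure mode.
CANONICAL = {
    "structuring", "funnel", "layering", "mule",
    "round_tripping", "fraud", "spoofing",
}

def coverage(observed: set[str]) -> float:
    """Fraction of canonical typologies present in the observed type names."""
    return len(CANONICAL & observed) / len(CANONICAL)

print(coverage({"Structuring", "Mule", "Fraud"}))  # 0.0 -- Debug-cased names never match
print(coverage({"structuring", "mule", "fraud"}))  # ~0.43 -- lowercased names match
```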
The Architecture
Three components: (1) VynFi streams AML anomaly labels at up to 500 lines/sec via NDJSON. (2) A Python consumer classifies each label by typology category and applies a filter policy (only Fraud and Statistical anomalies trigger alerts). (3) A token-bucket rate limiter ensures downstream consumers never receive more than their capacity allows.
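Because the whole pipeline is a chain of pull-based iterators, back-pressure falls out naturally: when the rate limiter sleeps, nothing upstream gets pulled. A sketch of stages (2) and (3) as generators (the function names here are illustrative, not SDK calls):

```python
from typing import Iterable, Iterator

def classify(envelopes: Iterable[dict]) -> Iterator[tuple[str, dict]]:
    """Stage 2: tag each envelope with its typology category."""
    for env in envelopes:
        atype = env.get("anomaly_type", {})
        category = next(iter(atype), "unknown") if isinstance(atype, dict) else str(atype)
        yield category, env

def dispatch(tagged: Iterable[tuple[str, dict]], limiter,
             alert_on: tuple[str, ...] = ("Fraud", "Statistical")) -> None:
    """Stage 3: apply the filter policy and alert. A sleep inside
    limiter.take() delays the next pull from stages 1 and 2; that is
    the back-pressure."""
    for category, env in tagged:
        if category in alert_on:
            limiter.take()
            print(f"[ALERT] {category}")
```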
Token-Bucket Rate Limiter
The key abstraction is a simple token bucket that sleeps when the bucket is empty. This is the same pattern used by VynFi's server-side NDJSON streaming — we mirror it on the consumer side to protect downstream APIs.
```python
import time
from collections import Counter  # used later for the category breakdown


class RateLimiter:
    """Token-bucket rate limiter for downstream consumer fan-out."""

    def __init__(self, rate_per_sec: float) -> None:
        self.rate = rate_per_sec
        self.tokens = rate_per_sec  # start with a full bucket
        self.last = time.monotonic()

    def take(self) -> None:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at one second's worth.
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens < 1:
            # Sleep until one token has accrued, then consume it.
            time.sleep((1 - self.tokens) / self.rate)
            self.last = time.monotonic()  # don't re-credit the sleep on the next call
            self.tokens = 0
        else:
            self.tokens -= 1


# Simulate Slack: 2 alerts/sec
downstream = RateLimiter(rate_per_sec=2.0)
```
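A quick illustrative sanity check: with `rate_per_sec=2.0` the bucket starts full, so two takes pass immediately and the rest space out at half-second intervals.

```python
# With rate_per_sec=2.0 the bucket starts with 2 tokens, so the first two
# takes return immediately and later takes arrive ~0.5 s apart: the six
# calls below should span roughly 2 seconds in total.
start = time.monotonic()
demo = RateLimiter(rate_per_sec=2.0)
for i in range(6):
    demo.take()
    print(f"take {i} at t={time.monotonic() - start:.2f}s")
```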
Streaming + Classification + Dispatch
The VynFi Python SDK's `stream_ndjson()` method returns an iterator of self-describing JSON envelopes. We pull from the anomaly-labels file, classify by typology category, and dispatch alerts for Fraud and Statistical anomalies — all while respecting the downstream rate limit.
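The envelope schema isn't reproduced here, but from how the loop below consumes it, a record looks roughly like this (field values are illustrative, not the authoritative VynFi schema):

```python
# Shapes inferred from the consumer code below; the values are made up.
data_envelope = {
    "document_id": "txn_000042",
    "anomaly_type": {"Fraud": {"explanation": "..."}},  # dict keyed by typology category
}
progress_envelope = {"type": "_progress"}  # one per `progress_interval` lines
```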
```python
import os

import vynfi

client = vynfi.VynFi(api_key=os.environ["VYNFI_API_KEY"])

# Stream from the most recent completed generation job.
job = client.jobs.list(status="completed", limit=1).data[0]

categories: Counter = Counter()
alerts_sent = 0

for env in client.jobs.stream_ndjson(
    job.id,
    file="labels/anomaly_labels.jsonl",
    rate=500,
    progress_interval=200,
):
    if env.get("type") == "_progress":
        continue  # progress envelopes are stream metadata, not labels

    # anomaly_type is a dict keyed by typology category; take the first key.
    atype = env.get("anomaly_type", {})
    category = next(iter(atype), "unknown") if isinstance(atype, dict) else str(atype)
    categories[category] += 1

    if category in ("Fraud", "Statistical"):
        downstream.take()  # respect Slack/SIEM rate limit
        doc_id = str(env.get("document_id", "?"))[:30]
        print(f"  [ALERT] {category} doc={doc_id}")
        alerts_sent += 1

print(f"\nDispatched {alerts_sent} alerts")
print("Category breakdown:", dict(categories.most_common()))
```
Why Synthetic Labels Matter
You can't test an alerting pipeline without data that contains known alerts. Real SAR labels are legally restricted and sparse — maybe 0.1% of transactions in a year. VynFi's AML module generates 14 typologies with configurable rates, ground-truth explanations on every suspicious transaction, and false-positive injection so your pipeline also handles near-miss noise.
The streaming pattern shown here works identically whether the data is synthetic (for development) or production (via a Kafka topic). The rate limiter, classification logic, and dispatch code are the same. Only the source iterator changes.
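As a concrete sketch, either source below can feed the same classify-and-dispatch loop (the Kafka side assumes the `kafka-python` package; the topic name is hypothetical):

```python
import json

def synthetic_source(client, job_id: str):
    """Development: the VynFi NDJSON stream used above."""
    yield from client.jobs.stream_ndjson(
        job_id, file="labels/anomaly_labels.jsonl", rate=500
    )

def kafka_source(topic: str = "aml-anomaly-labels"):
    """Production: a Kafka topic (assumes kafka-python; topic name is hypothetical)."""
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(topic, value_deserializer=lambda b: json.loads(b))
    for message in consumer:
        yield message.value  # same dict shape the classifier expects
```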
Production Considerations
- Replace the simulated dispatch (the `print()` call standing in for `send_alert()`) with your real Slack/PagerDuty/ServiceNow webhook.
- Add dead-letter logging for alerts that fail dispatch (network errors, 429s); see the sketch after this list.
- Use the `_progress` envelopes (emitted every `progress_interval` lines) to checkpoint your consumer offset, so a restarted pipeline resumes from the last checkpoint; also sketched below.
- For Kafka fanout, use the `streaming_aggregator.py` pattern instead — it maintains running counters with O(1) memory.
- Scale tier is required for NDJSON streaming. Developer/Team tiers can achieve similar results by downloading the archive and iterating locally.
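A minimal sketch of the dead-letter and checkpoint bullets above (`send_alert`, the file paths, and the offset format are all illustrative names, not part of the VynFi SDK):

```python
import json
import pathlib

DEAD_LETTER = pathlib.Path("dead_letter.jsonl")  # illustrative path
CHECKPOINT = pathlib.Path("consumer.offset")     # illustrative path

def send_alert(env: dict) -> None:
    """Hypothetical stand-in: replace with your Slack/PagerDuty/ServiceNow call."""
    raise NotImplementedError("wire up your real webhook here")

def safe_dispatch(env: dict) -> None:
    """Dispatch one alert; append failures to a dead-letter file for replay."""
    try:
        send_alert(env)
    except Exception as exc:  # network errors, 429s, etc.
        with DEAD_LETTER.open("a") as f:
            f.write(json.dumps({"error": str(exc), "envelope": env}) + "\n")

def maybe_checkpoint(env: dict, lines_seen: int) -> None:
    """On each _progress envelope, persist the offset so a restart can resume."""
    if env.get("type") == "_progress":
        CHECKPOINT.write_text(str(lines_seen))
```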