AML · fraud detection · streaming · SIEM · real-time · Python · compliance · DataSynth 3.1.1

Real-Time Fraud Pipelines: From Synthetic Labels to SIEM Alerts

Build a back-pressured fraud alerting pipeline that streams AML labels from VynFi, classifies typologies, and dispatches to Slack/PagerDuty/SIEM without overwhelming downstream rate limits.

VynFi Team · Engineering · April 13, 2026 · 11 min read

Production AML systems don't batch-process yesterday's alerts at 9 AM. They stream. A transaction hits the wire, the model scores it, and if the score exceeds the threshold, an alert lands in the compliance queue within seconds. The challenge isn't the model — it's the plumbing: rate-limited downstream APIs (Slack posts 1/sec, ServiceNow 5/sec), bursty upstream data, and the need for back-pressure so your pipeline doesn't drown the consumer.

This tutorial builds that pipeline using VynFi's NDJSON streaming endpoint and Python. The data is synthetic — fully labeled with ground-truth AML typologies — but the architecture is production-grade. Swap the simulated Slack webhook for a real one and you have a working compliance alert fanout.

**DataSynth 3.1.1 update:** AML typology coverage jumped from 0.000 (silent failure in 3.1.0 — Debug-cased type names broke the evaluator's lowercase matcher) to **0.857**, passing the ≥0.80 coverage threshold. All seven canonical typologies now appear (structuring / funnel / layering / mule / round_tripping / fraud / spoofing). Network density is up 38× with mule_link and shell_link edges populating from coordinated criminal structures. Plug the pipeline into the regenerated VynFi/vynfi-sar-narratives or VynFi/vynfi-aml-100k for a realistic SIEM playback without a generation job.

The Architecture

Three components: (1) VynFi streams AML anomaly labels at up to 500 lines/sec via NDJSON. (2) A Python consumer classifies each label by typology category and applies a filter policy (only Fraud and Statistical anomalies trigger alerts). (3) A token-bucket rate limiter ensures downstream consumers never receive more than their capacity allows.
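A rough back-of-the-envelope calculation shows how the three stages interact. The sketch below assumes alerts are spread evenly through the stream and that the consumer blocks on the rate limiter for every alert; the function name and `alert_fraction` are illustrative and not part of VynFi's API.

```python
def sustained_lines_per_sec(
    upstream_rate: float, downstream_rate: float, alert_fraction: float
) -> float:
    """Approximate steady-state ingest rate of a consumer that blocks on each alert."""
    if not 0.0 <= alert_fraction <= 1.0:
        raise ValueError("alert_fraction must be between 0 and 1")
    if alert_fraction == 0.0:
        return upstream_rate  # nothing is dispatched, so nothing throttles the reader
    # Each alert costs 1/downstream_rate seconds, so the reader can only
    # sustain downstream_rate / alert_fraction lines per second.
    return min(upstream_rate, downstream_rate / alert_fraction)


# 500 lines/sec upstream, 2 alerts/sec downstream, varying alert share.
for fraction in (0.001, 0.01, 0.1):
    rate = sustained_lines_per_sec(500, 2, fraction)
    print(f"alert fraction {fraction:.1%}: ~{rate:.0f} lines/sec sustained")
```

Under these assumptions the downstream limit, not the 500 lines/sec stream, sets the pace as soon as more than a small share of labels alert. That slowdown is the back-pressure the pipeline relies on.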

Token-Bucket Rate Limiter

The key abstraction is a simple token-bucket that sleeps when the bucket is empty. This is the same pattern used by VynFi's server-side NDJSON streaming — we mirror it on the consumer side to protect downstream APIs.

Python
import time
from collections import Counter


class RateLimiter:
    """Token-bucket rate limiter for downstream consumer fan-out."""

    def __init__(self, rate_per_sec: float) -> None:
        self.rate = rate_per_sec
        self.tokens = rate_per_sec  # start with a full bucket
        self.last = time.monotonic()

    def take(self) -> None:
        # Refill in proportion to elapsed time, capped at the bucket capacity.
        now = time.monotonic()
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens < 1:
            # Sleep until one token has accrued, then spend it.
            time.sleep((1 - self.tokens) / self.rate)
            self.tokens = 0
            # Reset the clock so the sleep is not credited again as refill time.
            self.last = time.monotonic()
        else:
            self.tokens -= 1


# Simulated downstream limit: 2 alerts/sec
downstream = RateLimiter(rate_per_sec=2.0)
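To check the limiter's pacing without waiting on real time, one option is a variant that accepts an injectable clock and sleep function. This is a sketch, not part of the VynFi SDK: `InjectableRateLimiter` and `FakeClock` are hypothetical names, and the variant resets its timestamp after sleeping so the sleep is not counted a second time as refill.

```python
import time
from typing import Callable


class InjectableRateLimiter:
    """Token bucket whose clock and sleep can be swapped out for tests."""

    def __init__(
        self,
        rate_per_sec: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.rate = rate_per_sec
        self.tokens = rate_per_sec
        self.clock = clock
        self.sleep = sleep
        self.last = clock()

    def take(self) -> None:
        now = self.clock()
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens < 1:
            self.sleep((1 - self.tokens) / self.rate)
            self.tokens = 0.0
            self.last = self.clock()  # the sleep already paid for this token
        else:
            self.tokens -= 1


class FakeClock:
    """Simulated time source: sleeping just advances a counter."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
limiter = InjectableRateLimiter(2.0, clock=clock.time, sleep=clock.sleep)
for _ in range(6):
    limiter.take()
elapsed = clock.now
print(f"6 alerts took {elapsed:.1f} simulated seconds")
```

The first two calls drain the initial burst of two tokens, and each of the remaining four waits half a simulated second, so six alerts take 2.0 simulated seconds at 2 alerts/sec.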

Streaming + Classification + Dispatch

The VynFi Python SDK's `stream_ndjson()` method returns an iterator of self-describing JSON envelopes. We pull from the anomaly-labels file, classify by typology category, and dispatch alerts for Fraud and Statistical anomalies — all while respecting the downstream rate limit.

Python
import os

import vynfi

# Reuses Counter and the `downstream` RateLimiter from the previous block.
client = vynfi.VynFi(api_key=os.environ["VYNFI_API_KEY"])
job = client.jobs.list(status="completed", limit=1).data[0]  # first completed job returned

categories: Counter = Counter()
alerts_sent = 0

for env in client.jobs.stream_ndjson(
    job.id,
    file="labels/anomaly_labels.jsonl",
    rate=500,
    progress_interval=200,
):
    if env.get("type") == "_progress":
        continue  # skip progress envelopes
    # anomaly_type may be a dict keyed by category, or a plain string.
    atype = env.get("anomaly_type", {})
    category = next(iter(atype), "unknown") if isinstance(atype, dict) else str(atype)
    categories[category] += 1
    if category in ("Fraud", "Statistical"):
        downstream.take()  # respect Slack/SIEM rate limit
        doc_id = str(env.get("document_id", "?"))[:30]
        print(f"  [ALERT] {category} doc={doc_id}")
        alerts_sent += 1

print(f"\nDispatched {alerts_sent} alerts")
print("Category breakdown:", dict(categories.most_common()))
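The inline classification in the loop can be pulled into a small pure function, which makes the filter policy easier to unit-test. This is a sketch: `classify`, `should_alert`, and the sample envelopes are made up for illustration, and the two payload shapes (a dict keyed by category, or a plain string) are inferred from the loop rather than from a documented schema.

```python
ALERT_CATEGORIES = frozenset({"Fraud", "Statistical"})


def classify(env: dict) -> str:
    """Return the typology category of a label envelope, or 'unknown'."""
    atype = env.get("anomaly_type", {})
    if isinstance(atype, dict):
        # Take the first key of the dict; an empty or missing dict is 'unknown'.
        return next(iter(atype), "unknown")
    return str(atype)


def should_alert(category: str) -> bool:
    """Filter policy: only the configured categories trigger alerts."""
    return category in ALERT_CATEGORIES


# Hypothetical envelopes covering the shapes the loop handles.
samples = [
    {"anomaly_type": {"Fraud": "DuplicatePayment"}, "document_id": "DOC-1"},
    {"anomaly_type": "Statistical", "document_id": "DOC-2"},
    {"document_id": "DOC-3"},
]
for env in samples:
    category = classify(env)
    print(env["document_id"], category, should_alert(category))
```

Keeping the policy in one `frozenset` means adding or removing an alerting category is a one-line change.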

Why Synthetic Labels Matter

You can't test an alerting pipeline without data that contains known alerts. Real SAR labels are legally restricted and sparse — maybe 0.1% of transactions in a year. VynFi's AML module generates 14 typologies with configurable rates, ground-truth explanations on every suspicious transaction, and false-positive injection so your pipeline also handles near-miss noise.

The streaming pattern shown here works identically whether the data is synthetic (for development) or production (via a Kafka topic). The rate limiter, classification logic, and dispatch code are the same. Only the source iterator changes.
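One way to make the source swap concrete is to write the loop against any iterable of envelopes. The sketch below is an illustration under assumptions: `run_pipeline`, `replay`, the `throttle` and `send` callables, and the "Process" category are hypothetical, and a Kafka consumer would need its own small generator that yields decoded messages.

```python
from collections import Counter
from typing import Callable, Iterable, Iterator, Tuple

ALERT_CATEGORIES = frozenset({"Fraud", "Statistical"})


def category_of(env: dict) -> str:
    atype = env.get("anomaly_type", {})
    return next(iter(atype), "unknown") if isinstance(atype, dict) else str(atype)


def run_pipeline(
    source: Iterable[dict],
    throttle: Callable[[], None],
    send: Callable[[str, dict], None],
) -> Tuple[int, Counter]:
    """Classify every envelope from `source` and dispatch the alerting ones."""
    categories: Counter = Counter()
    sent = 0
    for env in source:
        if env.get("type") == "_progress":
            continue
        category = category_of(env)
        categories[category] += 1
        if category in ALERT_CATEGORIES:
            throttle()  # e.g. a rate limiter's take()
            send(category, env)
            sent += 1
    return sent, categories


def replay(records: Iterable[dict]) -> Iterator[dict]:
    """Stand-in source: replays in-memory records instead of a live stream."""
    yield from records


sample = [
    {"type": "_progress"},
    {"anomaly_type": {"Fraud": "x"}, "document_id": "DOC-1"},
    {"anomaly_type": "Process", "document_id": "DOC-2"},
    {"anomaly_type": "Statistical", "document_id": "DOC-3"},
]
outbox = []
sent, cats = run_pipeline(
    replay(sample),
    throttle=lambda: None,  # no rate limiting in this demo
    send=lambda category, env: outbox.append((category, env["document_id"])),
)
print(sent, dict(cats), outbox)
```

In development `source` can be the VynFi stream or a replayed file; in production it would be whatever iterator wraps the message bus, with `throttle` and `send` left unchanged.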

Production Considerations

  • Replace the `print()` placeholder in the dispatch loop with a call to your real Slack/PagerDuty/ServiceNow webhook.
  • Add dead-letter logging for alerts that fail dispatch (network errors, 429s).
  • Use the `_progress` envelopes (their frequency is set by `progress_interval`) to checkpoint your consumer offset, so that a restarted pipeline can resume from the last checkpoint.
  • For Kafka fanout, use the streaming_aggregator.py pattern instead — it maintains running counters with O(1) memory.
  • Scale tier is required for NDJSON streaming. Developer/Team tiers can achieve similar results by downloading the archive and iterating locally.
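As an illustration of the dead-letter item in the list above, here is a minimal sketch of bounded retries with a dead-letter record. Everything in it is an assumption for demonstration: `DispatchError`, `dispatch_with_dead_letter`, and the stand-in senders are hypothetical, and a real deployment would likely write dead letters to a file or queue rather than an in-memory list.

```python
import json
import time
from typing import Callable


class DispatchError(RuntimeError):
    """Hypothetical error a sender raises on network failures or HTTP 429s."""


def dispatch_with_dead_letter(
    alert: dict,
    send: Callable[[dict], None],
    dead_letters: list,
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Try to send an alert; after max_attempts failures, record a dead letter."""
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        try:
            send(alert)
            return True
        except DispatchError as exc:
            last_error = str(exc)
            if attempt < max_attempts:
                sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
    dead_letters.append(
        json.dumps({"alert": alert, "error": last_error, "attempts": max_attempts})
    )
    return False


calls = {"n": 0}


def flaky_send(alert: dict) -> None:
    """Stand-in sender: rate limited twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] <= 2:
        raise DispatchError("429 Too Many Requests")


def always_failing_send(alert: dict) -> None:
    raise DispatchError("connection reset")


def no_sleep(seconds: float) -> None:
    """Skip real delays in this demo."""


dead_letters: list = []
ok = dispatch_with_dead_letter(
    {"category": "Fraud", "doc": "DOC-1"}, flaky_send, dead_letters, sleep=no_sleep
)
failed = dispatch_with_dead_letter(
    {"category": "Statistical", "doc": "DOC-2"}, always_failing_send, dead_letters, sleep=no_sleep
)
print(ok, failed, dead_letters)
```

The first alert goes through on its third attempt; the second exhausts its retries and lands in `dead_letters` as a JSON line that can be replayed later.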
