Process Mining with Synthetic Manufacturing Data and OCEL 2.0
Before Six Sigma consultants spend months mapping your processes, let process mining show you where the bottlenecks are. Here is how to do it with VynFi's manufacturing event logs.
Instead of hand-drawn process models that describe how a process is supposed to work, process mining extracts models directly from event logs, showing how the process actually executes, often within hours. The gap between the two is usually where the problems are.
VynFi's manufacturing sector generates OCEL 2.0 (Object-Centric Event Logs) where a single event can reference multiple business objects. This is a richer representation than traditional single-case event logs, because real manufacturing processes rarely follow a single thread — one production order consumes multiple materials, one invoice covers multiple receipts, one payment settles multiple invoices.
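To make "one event, many objects" concrete, here is a sketch of a single object-centric event as a Python dict, loosely following OCEL 2.0's JSON field names (`id`, `type`, `time`, `relationships` with `objectId`/`qualifier`); the specific IDs, qualifiers, and attribute are invented for illustration:

```python
# Illustrative OCEL 2.0-style event: one Goods Receipt that references a
# production order, two materials, and a purchase order. All IDs are made up.
event = {
    "id": "e-1042",
    "type": "Goods Receipt",
    "time": "2024-03-04T09:15:00",
    "attributes": [{"name": "quantity", "value": 250}],
    "relationships": [
        {"objectId": "PROD-ORDER-7", "qualifier": "produces"},
        {"objectId": "MAT-STEEL-01", "qualifier": "consumes"},
        {"objectId": "MAT-BOLT-M8", "qualifier": "consumes"},
        {"objectId": "PO-2024-0031", "qualifier": "fulfills"},
    ],
}

# One event, four objects. A traditional single-case log would have to pick
# exactly one of these as "the" case, flattening the other relationships.
object_ids = [r["objectId"] for r in event["relationships"]]
print(f"Event {event['id']} touches {len(object_ids)} objects: {object_ids}")
```

A traditional XES exporter would have to replicate this event into four traces (one per object), which is exactly the convergence/divergence problem object-centric logs avoid.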
This tutorial walks through the process mining notebook: load an event log from a synthetic manufacturing dataset, discover process variants, detect bottlenecks, analyze organizational handovers, and export to PM4Py-compatible XES format for further analysis.
**DataSynth 3.1.1 update:** Skip the generation step — the regenerated VynFi/vynfi-ocel-manufacturing and VynFi/vynfi-supply-chain-ocel Hugging Face datasets are already native OCEL 2.0 with microsecond timestamps (pandas-safe, 100% row retention) and realistic variant imperfection rates (rework 15% / skip 10% / out-of-order 8%). Load directly via `datasets.load_dataset("VynFi/vynfi-ocel-manufacturing", "events")`.
Generate Manufacturing Process Data
Request the p2p, s2c (source-to-contract), and manufacturing process models to get the full suite of manufacturing artifacts. The manufacturing model generates OCEL event data alongside the standard document flow files.
```python
import os

from vynfi import VynFi

client = VynFi(api_key=os.environ["VYNFI_API_KEY"])

config = {
    "sector": "manufacturing",
    "country": "US",
    "accountingFramework": "us_gaap",
    "rows": 1000,
    "companies": 5,
    "periods": 3,
    "periodLength": "monthly",
    "processModels": ["p2p", "s2c", "manufacturing"],
    "exportFormat": "json",
    "fraudPacks": [],
    "fraudRate": 0.0,
}

job = client.jobs.generate_config(config=config)
completed = client.jobs.wait(job.id)
archive = client.jobs.download_archive(completed.id)

# The archive includes:
# - ocel-event-log table (OCEL 2.0 native) if available
# - document_flows/ for reconstruction when native OCEL is absent
# - events/organizational_events.json
# - events/process_evolution_events.json
print("Archive contents:")
for f in archive.files():
    print(f"  {f}")
```

Build the Directly-Follows Graph
The directly-follows graph (DFG) is the foundation of process discovery. For each pair of consecutive activities in a case, the DFG counts how many times activity A is directly followed by activity B. This produces the transition matrix from which process models are extracted.
```python
import pandas as pd

def build_dfg(df: pd.DataFrame) -> pd.DataFrame:
    """Build a directly-follows graph from an event log.

    For each case, events are sorted by timestamp and consecutive pairs
    (A -> B) are counted. Returns DataFrame with columns: source, target, count.
    """
    transitions = []
    for _case_id, case_events in df.groupby("case_id"):
        sorted_events = case_events.sort_values("timestamp")
        activities = sorted_events["activity"].tolist()
        for i in range(len(activities) - 1):
            transitions.append((activities[i], activities[i + 1]))
    if not transitions:
        return pd.DataFrame(columns=["source", "target", "count"])
    dfg = pd.DataFrame(transitions, columns=["source", "target"])
    dfg = dfg.groupby(["source", "target"]).size().reset_index(name="count")
    return dfg.sort_values("count", ascending=False).reset_index(drop=True)

dfg = build_dfg(events_df)
print("Top 10 transitions in directly-follows graph:")
print(dfg.head(10).to_string(index=False))
```

Extract Process Variants
A process variant is the unique sequence of activities observed for a case. The most common variant is the happy path — the intended process. Everything else is a deviation. Variant analysis answers three questions: what is the happy path, how much deviation exists, and what causes deviations.
```python
from collections import Counter

def extract_variants(df: pd.DataFrame) -> pd.DataFrame:
    """Extract the activity sequence (variant) for each case."""
    case_variants = {}
    for case_id, case_events in df.groupby("case_id"):
        sorted_acts = case_events.sort_values("timestamp")["activity"].tolist()
        case_variants[case_id] = tuple(sorted_acts)

    variant_counts = Counter(case_variants.values())
    total = sum(variant_counts.values())
    rows, cumulative = [], 0.0
    for variant, count in variant_counts.most_common():
        pct = count / total * 100
        cumulative += pct
        rows.append({
            "variant": variant,
            "count": count,
            "pct": round(pct, 1),
            "cumulative_pct": round(cumulative, 1),
            "length": len(variant),
        })
    return pd.DataFrame(rows)

variants_df = extract_variants(events_df)
print(f"Total unique variants: {len(variants_df)}")
print(f"Total cases: {variants_df['count'].sum()}")

# Happy path
happy = variants_df.iloc[0]
print(f"Happy path ({happy['count']} cases, {happy['pct']}%):")
print(f"  {' -> '.join(happy['variant'])}")

# Conformance rates
for n in [1, 3, 5]:
    cases_in_top_n = variants_df.head(n)["count"].sum()
    rate = cases_in_top_n / variants_df["count"].sum() * 100
    print(f"Conformance rate (top {n} variant{'s' if n > 1 else ''}): {rate:.1f}%")
```

Low conformance rates (below 50% in the top 5 variants) suggest an uncontrolled process where many ad-hoc paths exist. In manufacturing, this often indicates bypassed approval steps, rework loops, or exceptions that were handled informally rather than through the standard workflow.
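Rework loops in particular are easy to isolate: they are the variants in which some activity appears more than once. Here is a small sketch of that check (the function name `flag_rework_variants` is my own; the toy table mirrors the `variants_df` structure built above):

```python
from collections import Counter

import pandas as pd

def flag_rework_variants(variants: pd.DataFrame) -> pd.DataFrame:
    """Mark variants whose activity sequence repeats at least one activity.

    A repeated activity inside one case (e.g. Inspect -> Rework -> Inspect)
    is the signature of a rework loop.
    """
    out = variants.copy()
    out["has_rework"] = out["variant"].apply(
        lambda v: any(c > 1 for c in Counter(v).values())
    )
    return out

# Toy variant table: two of the three variants contain a repeated activity
demo = pd.DataFrame({
    "variant": [
        ("Create PO", "Goods Receipt", "Pay"),
        ("Create PO", "Inspect", "Rework", "Inspect", "Pay"),
        ("Create PO", "Goods Receipt", "Goods Receipt", "Pay"),
    ],
    "count": [80, 12, 8],
})
flagged = flag_rework_variants(demo)
rework_share = flagged.loc[flagged["has_rework"], "count"].sum() / flagged["count"].sum()
print(f"Share of cases in rework variants: {rework_share:.0%}")  # 20%
```

Splitting deviations into rework versus skipped or reordered steps narrows down which root cause (quality escapes versus workflow bypasses) to chase first.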
Bottleneck Detection
Bottlenecks are transitions where cases spend disproportionate time waiting. The compute_waiting_times function calculates the sojourn time between each consecutive pair of activities in a case, aggregated across all cases to find the transitions with the highest mean wait.
```python
def compute_waiting_times(df: pd.DataFrame) -> pd.DataFrame:
    """Compute waiting time between consecutive activities in each case.

    Returns a DataFrame with columns: source, target, waiting_hours.
    """
    records = []
    for _case_id, case_events in df.groupby("case_id"):
        sorted_events = case_events.sort_values("timestamp")
        timestamps = sorted_events["timestamp"].tolist()
        activities = sorted_events["activity"].tolist()
        for i in range(len(activities) - 1):
            dt = (timestamps[i + 1] - timestamps[i]).total_seconds() / 3600
            records.append({
                "source": activities[i],
                "target": activities[i + 1],
                "waiting_hours": dt,
            })
    return pd.DataFrame(records)

waiting_df = compute_waiting_times(events_df)
transition_stats = waiting_df.groupby(["source", "target"])["waiting_hours"].agg(
    ["mean", "median", "count"]).reset_index()
transition_stats.columns = ["source", "target", "mean_h", "median_h", "count"]
transition_stats = transition_stats.sort_values("mean_h", ascending=False)
print("Top 10 bottleneck transitions by mean waiting time:")
print(transition_stats.head(10).to_string(index=False, float_format="%.1f"))
```

In manufacturing P2P data, the GR-to-invoice transition is typically the largest bottleneck — goods are received but vendor invoices take days or weeks to arrive. The approval-to-payment transition is often the second largest, reflecting payment terms and treasury cycle times. These are the transitions where process improvement has the highest leverage.
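Mean waiting times can be dominated by a handful of extreme stragglers. A common refinement, sketched here against the same `source`/`target`/`waiting_hours` schema as `waiting_df` above (the function name and toy numbers are my own), ranks transitions by a high quantile instead:

```python
import pandas as pd

def bottlenecks_by_percentile(waiting_df: pd.DataFrame, q: float = 0.95) -> pd.DataFrame:
    """Rank transitions by a high quantile of waiting time instead of the mean.

    The 95th percentile shows what a bad-but-not-freakish case experiences
    on each transition, which is usually the better improvement target.
    """
    col = f"p{round(q * 100)}_h"
    stats = (waiting_df.groupby(["source", "target"])["waiting_hours"]
             .quantile(q)
             .reset_index(name=col))
    return stats.sort_values(col, ascending=False).reset_index(drop=True)

# Toy data: GR -> Invoice has a heavy tail even though its median is modest
demo = pd.DataFrame({
    "source": ["GR"] * 5 + ["Approve"] * 5,
    "target": ["Invoice"] * 5 + ["Pay"] * 5,
    "waiting_hours": [4, 5, 6, 7, 200, 24, 25, 26, 27, 28],
})
print(bottlenecks_by_percentile(demo))
```

On the toy data, GR-to-invoice tops the p95 ranking even though its median wait is the smaller of the two transitions, which is exactly the heavy-tail pattern a mean-only ranking can mask.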
Case Duration Analysis
Case-level statistics — duration from first to last event, event count, and completion rate — reveal outliers at the process instance level rather than the transition level. Cases with unusually long durations are potential bottleneck victims. Cases with unusually high event counts may have rework loops. Cases that never reach a terminal activity represent process leakage.
```python
case_stats = events_df.groupby("case_id").agg(
    first_event=("timestamp", "min"),
    last_event=("timestamp", "max"),
    event_count=("event_id", "count"),
    unique_activities=("activity", "nunique"),
    first_activity=("activity", "first"),
    last_activity=("activity", "last"),
)
case_stats["duration_hours"] = (
    (case_stats["last_event"] - case_stats["first_event"]).dt.total_seconds() / 3600
)

print(f"Total cases: {len(case_stats)}")
print("Duration statistics (hours):")
print(case_stats["duration_hours"].describe().round(2))

print("5 slowest cases:")
for case_id, row in case_stats.nlargest(5, "duration_hours").iterrows():
    print(f"  {case_id}: {row['duration_hours']:.1f}h | "
          f"{row['event_count']} events | "
          f"{row['first_activity']} -> {row['last_activity']}")
```

Organizational Mining
The handover-of-work analysis builds a matrix showing how many times each resource hands work to every other resource in the same case. Dense handover patterns between two resources indicate either a strong working relationship or a dependency that could become a bottleneck when one resource is unavailable.
```python
def build_handover_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Build a resource handover matrix from an event log."""
    handovers = []
    for _case_id, case_events in df.groupby("case_id"):
        sorted_events = case_events.sort_values("timestamp")
        resources = sorted_events["resource"].tolist()
        for i in range(len(resources) - 1):
            if resources[i] != resources[i + 1]:  # only count actual handovers
                handovers.append((resources[i], resources[i + 1]))
    if not handovers:
        return pd.DataFrame(columns=["from_resource", "to_resource", "count"])
    ho_df = pd.DataFrame(handovers, columns=["from_resource", "to_resource"])
    return (ho_df.groupby(["from_resource", "to_resource"])
            .size().reset_index(name="count")
            .sort_values("count", ascending=False))

if "resource" in events_df.columns:
    handovers = build_handover_matrix(events_df)
    print("Top 15 handover patterns:")
    print(handovers.head(15).to_string(index=False))
```

Export for Process Mining Tools
For production process mining, dedicated tools provide algorithms that go beyond what you can build from scratch in pandas. VynFi event data can be exported to three formats: XES (for PM4Py, ProM, and Disco), CSV (for Celonis and ARIS), and OCEL 2.0 JSON (for PM4Py's object-centric mining module).
```python
def to_xes(df: pd.DataFrame, output_path: str) -> None:
    """Export a pandas event log to XES format (IEEE 1849-2016)."""
    lines = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<log xes.version="2.0" xmlns="http://www.xes-standard.org/">',
        '  <extension name="Concept" prefix="concept" uri="http://www.xes-standard.org/concept.xesext"/>',
        '  <extension name="Time" prefix="time" uri="http://www.xes-standard.org/time.xesext"/>',
        '  <extension name="Organizational" prefix="org" uri="http://www.xes-standard.org/org.xesext"/>',
    ]
    for case_id, case_events in df.groupby("case_id"):
        lines.append('  <trace>')
        lines.append(f'    <string key="concept:name" value="{case_id}"/>')
        for _, event in case_events.sort_values("timestamp").iterrows():
            ts = (event["timestamp"].isoformat()
                  if hasattr(event["timestamp"], "isoformat")
                  else str(event["timestamp"]))
            lines.append('    <event>')
            lines.append(f'      <string key="concept:name" value="{event["activity"]}"/>')
            lines.append(f'      <date key="time:timestamp" value="{ts}"/>')
            if "resource" in event.index and pd.notna(event.get("resource")):
                lines.append(f'      <string key="org:resource" value="{event["resource"]}"/>')
            lines.append('    </event>')
        lines.append('  </trace>')
    lines.append('</log>')
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))  # newline-join, not "".join, to keep the XML readable
    print(f"Exported {len(df)} events to {output_path}")

to_xes(events_df, "vynfi_manufacturing.xes")

# If PM4Py is installed, discover a Petri net directly
try:
    import pm4py

    pm4py_df = events_df.rename(columns={
        "case_id": "case:concept:name",
        "activity": "concept:name",
        "timestamp": "time:timestamp",
    })
    net, im, fm = pm4py.discover_petri_net_inductive(pm4py_df)
    fitness = pm4py.fitness_token_based_replay(pm4py_df, net, im, fm)
    print(f"Petri net: {len(net.places)} places, {len(net.transitions)} transitions")
    print(f"Fitness: {fitness['average_trace_fitness']:.3f}")
except ImportError:
    print("Install pm4py for Petri net discovery: pip install pm4py")
```

Set `fraudRate > 0` in your generation config and run process mining on the resulting data. Fraudulent process variants (unusual activity sequences, bypassed approval steps, or entries posted by resources outside their normal role) will appear as low-frequency deviants from the happy path. Process mining surfaces these anomalies without requiring labeled data.
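A minimal sketch of that idea, assuming an event table with `case_id`, `activity`, and `timestamp` columns (the function name `flag_rare_variant_cases` and the threshold are my own):

```python
import pandas as pd

def flag_rare_variant_cases(events_df: pd.DataFrame, max_share: float = 0.01) -> set:
    """Return case IDs whose activity sequence occurs in <= max_share of cases.

    Rare variants are not proof of fraud, but bypassed approvals and
    out-of-role postings concentrate there, so they make a natural
    review queue for investigators.
    """
    seqs = (events_df.sort_values("timestamp")
            .groupby("case_id")["activity"]
            .agg(tuple))
    shares = seqs.value_counts(normalize=True)
    rare = set(shares[shares <= max_share].index)
    return set(seqs[seqs.isin(rare)].index)

# Toy log: c3 pays before the PO exists, an out-of-order variant
demo = pd.DataFrame({
    "case_id":  ["c1", "c1", "c2", "c2", "c3", "c3"],
    "activity": ["PO", "Pay", "PO", "Pay", "Pay", "PO"],
    "timestamp": pd.to_datetime([
        "2024-01-01", "2024-01-02",
        "2024-01-01", "2024-01-02",
        "2024-01-01", "2024-01-02",
    ]),
})
print(flag_rare_variant_cases(demo, max_share=0.34))  # {'c3'}
```

With labeled fraud flags (when `fraudPacks` are enabled), the same output doubles as a precision/recall benchmark for variant-frequency screening.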
Next Steps
The manufacturing notebook covers temporal drift detection (how process behavior changes across the three monthly periods), role discovery (clustering resources by the activities they perform), and object-centric interaction analysis (which object types appear together in the same case). For production deployments, the Celonis EMS and Minit connectors accept the CSV export format directly, with the case_id, activity, and timestamp columns mapping to their standard schema.
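As a taste of the role-discovery step, here is a sketch of the resource-activity profile matrix it starts from; the helper name and toy data are my own, and the clustering step itself (e.g. cosine distance over rows) is left out:

```python
import pandas as pd

def resource_activity_profile(events_df: pd.DataFrame) -> pd.DataFrame:
    """Count how often each resource performs each activity.

    Each row is a resource's behavioral fingerprint; clustering these
    rows groups resources into de facto roles, which can then be
    compared against the official org chart.
    """
    return pd.crosstab(events_df["resource"], events_df["activity"])

demo = pd.DataFrame({
    "resource": ["ana", "ana", "bo", "bo", "cy"],
    "activity": ["Approve", "Approve", "Post GR", "Post GR", "Approve"],
})
profile = resource_activity_profile(demo)
print(profile)
# ana and cy only approve, bo only posts goods receipts: two de facto roles
```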
The full notebook is available at 05_process_mining_ocel.ipynb in the VynFi Python SDK repository. It includes PM4Py integration, XES/OCEL 2.0 export, and organizational mining with handover matrices.