Building a Financial Hallucination-Suppression Dataset Pipeline


A companion to The Architecture of Financial Intelligence — this post turns the architecture into an executable pipeline that generates a DPO (Direct Preference Optimization) dataset for suppressing hallucinations in financial LLMs.

Because we're teaching the model a constraint (do not fabricate) rather than a capability, we have to deliberately engineer traps. For every tricky prompt we generate a chosen response (factually grounded or a principled refusal) and a rejected response (a plausible-sounding but entirely fabricated financial answer).
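
Concretely, each training row is a JSONL preference record. A minimal illustrative pair (fabricated "ACME" figures, for format only, not real filing data):

```python
import json

# Illustrative DPO preference pair (fabricated figures, format only).
pair = {
    "prompt": "SOURCE: ...\n\nQUESTION: What was ACME's revenue in Q3 2024?",
    "chosen": (
        "Per the filing, ACME reported revenue of $4.2 billion for Q3 2024 "
        "(\"total net revenues of $4.2 billion\")."
    ),
    "rejected": (
        "ACME posted a record $5.8 billion in Q3 2024 revenue, driven by "
        "strong growth in its services segment."
    ),
}

line = json.dumps(pair)          # one JSON object per line = JSONL
restored = json.loads(line)
print(sorted(restored.keys()))   # → ['chosen', 'prompt', 'rejected']
```

The rejected answer is deliberately fluent and specific; the whole point is that only grounding, not style, separates the two.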

The whole pipeline uses the OpenAI-compatible openai Python client, so the same scripts work for DeepSeek and OpenAI by swapping the base URL and API key.


Pipeline Architecture

Phase Script Purpose
0 00_check_setup.py Pre-flight: env vars, EDGAR reachability, LLM smoke calls, JSON mode
1 01_ingest_edgar.py Pull SEC filings with temporal-safe metadata
2 02_seed_and_evolve.py 7 seed templates per chunk → Evol-Instruct expand to ~14k prompts
3 03_generate_chosen.py Grounded generation + numeric/citation verification
4 04_inject_hallucinations.py Four injection strategies → rejected samples
5 05_judge_and_filter.py Sandboxed arithmetic check + LLM-as-Judge scoring
6 06_format_trl.py Export DPO JSONL (TRL-ready) + full-schema + KTO

Expected yield, mirroring the numbers from the architecture post:

Stage 1 → ~100 filings × 10 chunks   = 1,000 source chunks
Stage 2 → 1,000 × 7 seeds × 2 evol = 14,000 prompts
Stage 3 → 75% grounded numerics = ~10,500 chosen
Stage 4 → 4-way injection, dedup = ~10,000 pairs
Stage 5 → judge ≥4 / ≥3 / ≥4   = ~7,500 final DPO pairs

Project Layout

financial-hallucination-pipeline/
├── .env.example
├── .gitignore
├── Makefile
├── run.sh
├── requirements.txt
├── pipeline/
│   ├── shared.py
│   ├── 00_check_setup.py
│   ├── 01_ingest_edgar.py
│   ├── 02_seed_and_evolve.py
│   ├── 03_generate_chosen.py
│   ├── 04_inject_hallucinations.py
│   ├── 05_judge_and_filter.py
│   └── 06_format_trl.py
└── corpus/          # intermediate artifacts

Shared Client (pipeline/shared.py)

Single source of truth for the LLM client and model names. Every script imports from here.

"""Shared OpenAI-compatible client + model names.
Works for both DeepSeek and OpenAI by swapping LLM_BASE_URL + LLM_API_KEY.
"""
import os

try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass

import openai

LLM_API_KEY = os.environ.get("LLM_API_KEY", "")
LLM_BASE_URL = os.environ.get("LLM_BASE_URL", "https://api.deepseek.com/v1")

if not LLM_API_KEY:
raise RuntimeError(
"LLM_API_KEY not set. Copy .env.example to .env and fill it in."
)

client = openai.OpenAI(api_key=LLM_API_KEY, base_url=LLM_BASE_URL)

MODEL_GEN = os.environ.get("MODEL_GEN", "deepseek-chat")
MODEL_REASON = os.environ.get("MODEL_REASON", "deepseek-reasoner")
MODEL_JUDGE = os.environ.get("MODEL_JUDGE", "deepseek-chat")

EDGAR_UA = os.environ.get(
"EDGAR_USER_AGENT",
"yuuee-research research@example.com",
)
CORPUS_DIR = os.environ.get("CORPUS_DIR", "corpus")

Phase 0 — Pre-flight (00_check_setup.py)

Before burning API credits, validate that the environment is ready: deps, env vars, EDGAR reachability, LLM connectivity, and JSON-mode support.

"""Phase 0 — Pre-flight check."""
import json, os, sys, time

PASS = "\033[32m✓\033[0m"
FAIL = "\033[31m✗\033[0m"
WARN = "\033[33m!\033[0m"

def _p(tag, msg): print(f" {tag} {msg}")

def check_imports():
print("[1/5] Python dependencies")
ok = True
for mod in ("openai", "requests", "dotenv"):
try:
__import__(mod); _p(PASS, f"import {mod}")
except ImportError as e:
_p(FAIL, f"import {mod}: {e}"); ok = False
return ok

def check_env():
print("[2/5] Environment variables (.env)")
try:
from dotenv import load_dotenv; load_dotenv()
except ImportError: pass
ok = True
for k in ["LLM_API_KEY", "EDGAR_USER_AGENT"]:
v = os.environ.get(k, "")
if not v or "REPLACE" in v or "example.com" in v:
_p(FAIL, f"{k} unset or placeholder"); ok = False
else:
_p(PASS, f"{k} set")
return ok

def check_edgar():
print("[3/5] SEC EDGAR reachability")
import requests
ua = os.environ.get("EDGAR_USER_AGENT", "")
r = requests.get("https://data.sec.gov/submissions/CIK0000320193.json",
headers={"User-Agent": ua}, timeout=15)
if r.status_code == 200 and "filings" in r.json():
_p(PASS, f"EDGAR 200 ({len(r.content)//1024} KB)"); return True
_p(FAIL, f"EDGAR HTTP {r.status_code} — check EDGAR_USER_AGENT"); return False

def check_llm():
print("[4/5] LLM endpoint smoke calls")
import openai
client = openai.OpenAI(
api_key=os.environ.get("LLM_API_KEY", ""),
base_url=os.environ.get("LLM_BASE_URL", "https://api.deepseek.com/v1"),
)
models = {
"MODEL_GEN": os.environ.get("MODEL_GEN", "deepseek-chat"),
"MODEL_REASON": os.environ.get("MODEL_REASON", "deepseek-reasoner"),
"MODEL_JUDGE": os.environ.get("MODEL_JUDGE", "deepseek-chat"),
}
seen = {}
for role, name in models.items(): seen.setdefault(name, role)
ok = True
for name, role in seen.items():
try:
t0 = time.time()
r = client.chat.completions.create(
model=name,
messages=[{"role": "user", "content": "Reply with: pong"}],
max_tokens=5, temperature=0.0,
)
dt = time.time() - t0
_p(PASS, f"{role}={name} ({dt:.1f}s)")
except Exception as e:
_p(FAIL, f"{role}={name}: {e}"); ok = False
return ok

def check_json_mode():
print("[5/5] response_format=json_object support")
import openai
client = openai.OpenAI(
api_key=os.environ.get("LLM_API_KEY", ""),
base_url=os.environ.get("LLM_BASE_URL", "https://api.deepseek.com/v1"),
)
model = os.environ.get("MODEL_GEN", "deepseek-chat")
try:
r = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "Return strict JSON only."},
{"role": "user", "content": 'Return JSON: {"ok": true}'},
],
response_format={"type": "json_object"},
max_tokens=20, temperature=0.0,
)
parsed = json.loads(r.choices[0].message.content)
_p(PASS, f"{model} JSON mode works"); return True
except Exception as e:
_p(FAIL, f"{model} JSON mode failed: {e}"); return False

def main():
results = [
("deps", check_imports()), ("env", check_env()),
("edgar", check_edgar()), ("llm", check_llm()),
("json_mode", check_json_mode()),
]
passed = sum(1 for _, ok in results if ok)
for name, ok in results:
print(f" {PASS if ok else FAIL} {name}")
print(f"\n{passed}/{len(results)} checks passed.")
return 0 if passed == len(results) else 1

if __name__ == "__main__":
sys.exit(main())

Phase 1 — SEC EDGAR Ingestion (01_ingest_edgar.py)

Pulls 10-K / 10-Q filings for a set of tickers and chunks them into ~2k-char excerpts with full source metadata — critical for temporal safety (avoid future-leakage during backtesting) and license provenance (SEC filings are public domain).

"""Phase 1 — Ground Truth Ingestion from SEC EDGAR."""
import argparse, hashlib, json, re, time
from pathlib import Path
import requests
from shared import EDGAR_UA, CORPUS_DIR

HEADERS = {"User-Agent": EDGAR_UA}

def fetch_submissions(cik):
url = f"https://data.sec.gov/submissions/CIK{str(cik).zfill(10)}.json"
r = requests.get(url, headers=HEADERS, timeout=30)
r.raise_for_status()
return r.json()

def fetch_filing_text(cik, accession, primary_doc):
acc = accession.replace("-", "")
url = f"https://www.sec.gov/Archives/edgar/data/{int(cik)}/{acc}/{primary_doc}"
r = requests.get(url, headers=HEADERS, timeout=60)
r.raise_for_status()
text = re.sub(r"<[^>]+>", " ", r.text)
text = re.sub(r"&[a-z]+;", " ", text)
return re.sub(r"\s+", " ", text).strip()

def ingest(ticker, cik, forms=("10-K", "10-Q"),
max_filings=2, chunks_per_filing=10, chunk_size=2000):
sub = fetch_submissions(cik)
recent = sub["filings"]["recent"]
out, done = [], 0
for i, form in enumerate(recent["form"]):
if form not in forms or done >= max_filings: continue
acc = recent["accessionNumber"][i]
primary = recent["primaryDocument"][i]
filed = recent["filingDate"][i]
try:
text = fetch_filing_text(cik, acc, primary)
except Exception as e:
print(f" ! skip {ticker} {acc}: {e}"); continue
start = min(len(text) // 4, 5000)
end = min(len(text), start + chunk_size * chunks_per_filing)
for j, k in enumerate(range(start, end, chunk_size)):
chunk = text[k:k + chunk_size]
if len(chunk) < 500: continue
doc_id = hashlib.md5(f"{ticker}{acc}{j}".encode()).hexdigest()[:12]
out.append({
"doc_id": doc_id, "text": chunk,
"source_url": f"https://www.sec.gov/Archives/edgar/data/{int(cik)}/{acc.replace('-','')}/{primary}",
"ticker": ticker, "form": form, "filing_date": filed,
"accession": acc, "spdx_license": "public-domain",
"ingestion_ts": int(time.time()),
})
done += 1
time.sleep(0.25) # EDGAR rate-limit courtesy
return out

DEFAULT_TICKERS = [
("AAPL", 320193), ("MSFT", 789019), ("JPM", 19617),
("XOM", 34088), ("NVDA", 1045810), ("BRK-B", 1067983),
]

def main():
ap = argparse.ArgumentParser()
ap.add_argument("--tickers-file")
ap.add_argument("--max-filings", type=int, default=2)
ap.add_argument("--chunks-per-filing", type=int, default=10)
args = ap.parse_args()
tickers = DEFAULT_TICKERS
if args.tickers_file:
tickers = [tuple(x) for x in json.loads(Path(args.tickers_file).read_text())]
Path(CORPUS_DIR).mkdir(parents=True, exist_ok=True)
all_recs = []
for t, cik in tickers:
print(f"Ingesting {t} (CIK {cik})")
recs = ingest(t, cik, args.max_filings, args.chunks_per_filing)
print(f" + {len(recs)} chunks")
all_recs.extend(recs)
out = Path(CORPUS_DIR) / "edgar_chunks.jsonl"
out.write_text("\n".join(json.dumps(r) for r in all_recs))
print(f"\n{len(all_recs)} chunks → {out}")

if __name__ == "__main__":
main()

Phase 2 — Seed & Evol-Instruct (02_seed_and_evolve.py)

Seven seed templates cover the main hallucination surfaces: numeric QA, ratio calcs, YoY changes, regulatory citations, false premises, out-of-bounds asks, and future prediction. Each seed is then evolved by the LLM to add constraints or peer comparisons.

"""Phase 2 — Prompt Seeding + Evol-Instruct expansion."""
import argparse, json
from pathlib import Path
from shared import client, MODEL_GEN, CORPUS_DIR

SEEDS = [
{"type": "numeric_qa", "q": "What was {ticker}'s revenue in {period}?"},
{"type": "ratio_calc", "q": "Compute {ticker}'s operating margin for {period}."},
{"type": "yoy_change", "q": "What was {ticker}'s YoY revenue growth in {period}?"},
{"type": "regulatory", "q": "Cite the SEC rule governing {ticker}'s segment reporting disclosure."},
{"type": "false_premise", "q": "Given {ticker}'s Q2 2025 merger with Acme Corp, what was the combined revenue?"},
{"type": "out_of_bounds", "q": "What is {ticker}'s projected free cash flow for FY2030?"},
{"type": "future_predict", "q": "Guarantee {ticker}'s stock price six months after {period}."},
]

EVOLVE_SYSTEM = """You rewrite financial questions to increase difficulty WITHOUT changing
the factual target. Apply ONE transformation:
- add a sub-calculation
- add a constraint (e.g., 'exclude one-time items')
- introduce a comparison to a named industry peer
- require citing the exact line item / section of the filing
Return strict JSON: {"question": "...", "complexity": "L1|L2|L3"}
Keep the question under 60 words."""

def evolve(question):
r = client.chat.completions.create(
model=MODEL_GEN,
messages=[
{"role": "system", "content": EVOLVE_SYSTEM},
{"role": "user", "content": f"Original: {question}"},
],
response_format={"type": "json_object"}, temperature=0.8,
)
return json.loads(r.choices[0].message.content)

def main():
ap = argparse.ArgumentParser()
ap.add_argument("--evolutions", type=int, default=2)
ap.add_argument("--limit-chunks", type=int, default=None)
args = ap.parse_args()
chunks = [json.loads(l) for l in
(Path(CORPUS_DIR) / "edgar_chunks.jsonl").read_text().splitlines()]
if args.limit_chunks: chunks = chunks[:args.limit_chunks]
out = []
for c in chunks:
period = f"{c['form']} filed {c['filing_date']}"
for seed in SEEDS:
base = seed["q"].format(ticker=c["ticker"], period=period)
for _ in range(args.evolutions):
try:
ev = evolve(base)
except Exception: continue
out.append({
"doc_id": c["doc_id"], "ticker": c["ticker"],
"filing_date": c["filing_date"],
"seed_type": seed["type"], "question": ev["question"],
"complexity": ev.get("complexity", "L1"),
})
p = Path(CORPUS_DIR) / "prompts.jsonl"
p.write_text("\n".join(json.dumps(x) for x in out))
print(f"{len(out)} prompts → {p}")

if __name__ == "__main__":
main()

Phase 3 — Grounded Generation (03_generate_chosen.py)

The LLM plays a strict compliance officer: answer only from the source, or explicitly refuse. Every numeric claim is then verified back against the source text via normalized substring match, and at least one citation must trace to the source.

"""Phase 3 — Generate the 'chosen' (strictly grounded) response."""
import argparse, json, re
from pathlib import Path
from shared import client, MODEL_REASON, CORPUS_DIR

CHOSEN_SYSTEM = """You are a strict, legally compliant financial AI.

RULES (non-negotiable):
1. Answer ONLY using facts explicitly present in the provided SOURCE.
2. If SOURCE lacks the information, respond exactly:
"The provided source does not contain information to answer this question."
3. If the QUESTION contains a false premise, correct the premise and refuse to speculate.
4. Never predict or guarantee future prices, returns, or events.
5. When citing a number, quote the EXACT figure as it appears in SOURCE.

Return strict JSON:
{
"reasoning": "<step-by-step, each step anchored to a SOURCE quote>",
"answer": "<final response to user>",
"numeric_claims": [{"value": <float>, "unit": "USD|pct|...", "source_span": "<quote>"}],
"citations": ["<verbatim quote from SOURCE>", ...]
}"""

def generate_chosen(source, question):
r = client.chat.completions.create(
model=MODEL_REASON,
messages=[
{"role": "system", "content": CHOSEN_SYSTEM},
{"role": "user", "content": f"SOURCE:\n{source}\n\nQUESTION: {question}"},
],
response_format={"type": "json_object"}, temperature=0.0,
)
return json.loads(r.choices[0].message.content)

def _normalize(s): return re.sub(r"[,\s$%]", "", s)

def verify_numerics(chosen, source):
norm_src = _normalize(source)
for nc in chosen.get("numeric_claims", []) or []:
v = _normalize(str(nc.get("value", "")))
if not v: continue
candidates = {v, v.rstrip("0").rstrip("."), v + "0", v + "00"}
if not any(c and c in norm_src for c in candidates):
return False
return True

def verify_citations(chosen, source):
cites = chosen.get("citations") or []
if not cites: return True
norm_src = re.sub(r"\s+", " ", source).lower()
return any(re.sub(r"\s+", " ", c).lower()[:80] in norm_src for c in cites)

def main():
ap = argparse.ArgumentParser()
ap.add_argument("--limit", type=int, default=None)
args = ap.parse_args()
chunks = {json.loads(l)["doc_id"]: json.loads(l)
for l in (Path(CORPUS_DIR) / "edgar_chunks.jsonl").read_text().splitlines()}
prompts = [json.loads(l) for l in
(Path(CORPUS_DIR) / "prompts.jsonl").read_text().splitlines()]
if args.limit: prompts = prompts[:args.limit]
out, kept, dropped = [], 0, 0
for p in prompts:
src = chunks[p["doc_id"]]["text"]
try:
chosen = generate_chosen(src, p["question"])
except Exception: dropped += 1; continue
if not verify_numerics(chosen, src): dropped += 1; continue
if not verify_citations(chosen, src): dropped += 1; continue
out.append({**p, "source_text": src, "chosen": chosen})
kept += 1
path = Path(CORPUS_DIR) / "chosen.jsonl"
path.write_text("\n".join(json.dumps(x) for x in out))
print(f"Kept {kept}, dropped {dropped}{path}")

if __name__ == "__main__":
main()

Phase 4 — Hallucination Injection (04_inject_hallucinations.py)

Four strategies, cycled deterministically so DPO doesn't overfit a single failure mode:

Strategy Method
numeric_perturbation Scale figures by 10–35% while keeping prose natural
temporal_displacement Answer with figures from a different fiscal period
cross_entity Swap in a competitor's figures from the same period
citation_fabrication Invent a non-existent SEC rule / FASB codification
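
The deterministic cycling is just `itertools.cycle` over the strategy keys (insertion-ordered dicts make this reproducible); a minimal sketch of the balance it produces:

```python
from itertools import cycle, islice

strategies = ["numeric_perturbation", "temporal_displacement",
              "cross_entity", "citation_fabrication"]

# Round-robin: row i gets strategy i % 4, so no single failure
# mode can dominate the pair set regardless of corpus size.
assignment = list(islice(cycle(strategies), 6))
print(assignment[0], assignment[4])  # → numeric_perturbation numeric_perturbation
```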
"""Phase 4 — Inject hallucinations to create the 'rejected' response."""
import argparse, itertools, json, random, re
from pathlib import Path
from shared import client, MODEL_GEN, CORPUS_DIR

REJECTED_SYSTEM = """You are an overconfident, highly fluent financial analyst.
Given a QUESTION and a REFERENCE ANSWER, produce a NEW answer that sounds equally
professional and confident but contains the specified HALLUCINATION TYPE.

RULES:
- Never admit uncertainty or refuse.
- Match length, register, and structure of the reference answer.
- Do NOT reuse the reference answer's exact wording verbatim.

Return strict JSON: {"response": "...", "injected_error": "<one-sentence description>"}"""

STRATEGIES = {
"numeric_perturbation": "Scale every monetary or percent figure by a random factor between 10% and 35%.",
"temporal_displacement": "Answer using figures from a DIFFERENT fiscal period while claiming they are the asked period's.",
"cross_entity": "Swap the company's figures with a plausible competitor's figures from the same period.",
"citation_fabrication": "Invent a non-existent SEC rule, FASB codification, or PCAOB standard.",
}

def _perturb_numbers(text):
def swap(m):
raw = m.group()
try:
v = float(raw.replace(",", "").replace("$", "").rstrip("%"))
except ValueError:
return raw
factor = random.choice([1.13, 1.27, 0.84, 0.71, 1.18, 0.89])
new = v * factor
if "%" in raw: return f"{new:.1f}%"
if raw.startswith("$"): return f"${new:,.2f}"
if "," in raw: return f"{new:,.2f}"
return f"{new:.2f}"
return re.sub(r"\$?\d{1,3}(?:,\d{3})+(?:\.\d+)?%?|\$?\d+\.\d+%?|\d+%", swap, text)

def generate_rejected(question, chosen_answer, strategy, source):
if strategy == "numeric_perturbation":
draft = _perturb_numbers(chosen_answer)
user = (f"Polish this perturbed answer naturally. Keep the perturbed numbers.\n\n"
f"QUESTION: {question}\nPERTURBED DRAFT:\n{draft}")
else:
user = (f"QUESTION: {question}\nREFERENCE ANSWER:\n{chosen_answer}\n\n"
f"SOURCE (for realism, do NOT quote correctly):\n{source[:1500]}\n\n"
f"HALLUCINATION TYPE: {strategy}{STRATEGIES[strategy]}")
r = client.chat.completions.create(
model=MODEL_GEN,
messages=[{"role": "system", "content": REJECTED_SYSTEM},
{"role": "user", "content": user}],
response_format={"type": "json_object"}, temperature=0.9,
)
return json.loads(r.choices[0].message.content)

def main():
ap = argparse.ArgumentParser()
ap.add_argument("--limit", type=int, default=None)
args = ap.parse_args()
rows = [json.loads(l) for l in
(Path(CORPUS_DIR) / "chosen.jsonl").read_text().splitlines()]
if args.limit: rows = rows[:args.limit]
cycle = itertools.cycle(STRATEGIES.keys())
out, dropped = [], 0
for row in rows:
strat = next(cycle)
try:
rej = generate_rejected(row["question"], row["chosen"]["answer"],
strat, row["source_text"])
except Exception: dropped += 1; continue
if rej["response"].strip() == row["chosen"]["answer"].strip():
dropped += 1; continue
out.append({**row, "rejected": {
"response": rej["response"],
"hallucination_type": strat,
"injected_error": rej.get("injected_error", ""),
}})
path = Path(CORPUS_DIR) / "pairs.jsonl"
path.write_text("\n".join(json.dumps(x) for x in out))
print(f"{len(out)} pairs ({dropped} dropped) → {path}")

if __name__ == "__main__":
main()

Phase 5 — Dual-Track QC (05_judge_and_filter.py)

  • Track A — Sandboxed arithmetic: every A op B = C statement in the chosen reasoning trace is re-executed with a safe AST evaluator (no eval); divergence beyond tolerance drops the sample.
  • Track B — LLM-as-Judge: independent scorer rates chosen correctness, rejected plausibility (we want this high — low plausibility means trivial preference data), and distinction clarity.
"""Phase 5 — Dual-track QC."""
import argparse, ast, json, operator as op, re
from pathlib import Path
from shared import client, MODEL_JUDGE, CORPUS_DIR

OPS = {ast.Add: op.add, ast.Sub: op.sub, ast.Mult: op.mul,
ast.Div: op.truediv, ast.USub: op.neg, ast.UAdd: op.pos}

def _safe_eval(expr):
node = ast.parse(expr, mode="eval").body
def _w(n):
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.Num): return n.n
if isinstance(n, ast.BinOp): return OPS[type(n.op)](_w(n.left), _w(n.right))
if isinstance(n, ast.UnaryOp): return OPS[type(n.op)](_w(n.operand))
raise ValueError
return _w(node)

ARITH_RE = re.compile(r"([\d\.]+)\s*([+\-*/])\s*([\d\.]+)\s*=\s*([\d\.]+)")

def arithmetic_consistent(chosen, tol=0.02):
for m in ARITH_RE.finditer((chosen or {}).get("reasoning", "") or ""):
a, o, b, c = m.groups()
try:
got, expected = _safe_eval(f"{a}{o}{b}"), float(c)
except Exception: continue
if abs(got - expected) / max(abs(expected), 1e-9) > tol:
return False
return True

JUDGE_SYSTEM = """Score each axis 1-5 (integers).
chosen_correctness — Is the chosen fully grounded in SOURCE?
rejected_plausibility — Would the rejected fool a non-expert? (HIGH is desirable)
distinction_clarity — Would an expert clearly prefer chosen over rejected?
Return JSON: {"chosen_correctness": 1-5, "rejected_plausibility": 1-5,
"distinction_clarity": 1-5, "notes": "..."}"""

def judge(row):
msg = (f"QUESTION:\n{row['question']}\n\nSOURCE:\n{row['source_text'][:1800]}\n\n"
f"CHOSEN:\n{row['chosen']['answer']}\n\n"
f"REJECTED:\n{row['rejected']['response']}")
r = client.chat.completions.create(
model=MODEL_JUDGE,
messages=[{"role": "system", "content": JUDGE_SYSTEM},
{"role": "user", "content": msg}],
response_format={"type": "json_object"}, temperature=0.0,
)
return json.loads(r.choices[0].message.content)

def main():
ap = argparse.ArgumentParser()
ap.add_argument("--min-chosen", type=int, default=4)
ap.add_argument("--min-rejected", type=int, default=3)
ap.add_argument("--min-clarity", type=int, default=4)
ap.add_argument("--limit", type=int, default=None)
args = ap.parse_args()
rows = [json.loads(l) for l in
(Path(CORPUS_DIR) / "pairs.jsonl").read_text().splitlines()]
if args.limit: rows = rows[:args.limit]
out = []
for row in rows:
if not arithmetic_consistent(row["chosen"]): continue
try: s = judge(row)
except Exception: continue
if (s.get("chosen_correctness", 0) < args.min_chosen or
s.get("rejected_plausibility", 0) < args.min_rejected or
s.get("distinction_clarity", 0) < args.min_clarity): continue
out.append({**row, "judge_scores": s})
path = Path(CORPUS_DIR) / "final.jsonl"
path.write_text("\n".join(json.dumps(x) for x in out))
print(f"{len(out)} pairs passed QC → {path}")

if __name__ == "__main__":
main()

Phase 6 — Export (06_format_trl.py)

Writes three files: a minimal TRL-ready DPO JSONL, a full-schema JSONL with provenance and scores, and a KTO file (unpaired label-per-completion) if you want to try KTO alongside DPO.

"""Phase 6 — Export to Hugging Face TRL format."""
import argparse, json
from pathlib import Path
from shared import CORPUS_DIR

def format_all(inp, trl_out, full_out, kto_out):
n_pairs = n_kto = 0
with open(inp) as f, open(trl_out,"w") as fo_trl, \
open(full_out,"w") as fo_full, open(kto_out,"w") as fo_kto:
for i, line in enumerate(f):
row = json.loads(line)
prompt = f"SOURCE:\n{row['source_text']}\n\nQUESTION: {row['question']}"
fo_trl.write(json.dumps({
"prompt": prompt,
"chosen": row["chosen"]["answer"],
"rejected": row["rejected"]["response"],
}) + "\n")
fo_full.write(json.dumps({
"id": f"fin-halluc-{i:05d}",
"prompt": row["question"], "source_doc": row["source_text"],
"source_metadata": {
"ticker": row["ticker"], "filing_date": row["filing_date"],
"doc_id": row["doc_id"], "seed_type": row.get("seed_type"),
"complexity": row.get("complexity"), "spdx_license": "public-domain",
},
"chosen": {
"response": row["chosen"]["answer"],
"reasoning_trace": row["chosen"].get("reasoning", ""),
"numeric_claims": row["chosen"].get("numeric_claims", []),
"citations": row["chosen"].get("citations", []),
"verified": True,
},
"rejected": {
"response": row["rejected"]["response"],
"hallucination_type": row["rejected"]["hallucination_type"],
"injected_error_description": row["rejected"]["injected_error"],
},
"judge_scores": row["judge_scores"],
}) + "\n")
n_pairs += 1
for comp, lbl in [(row["chosen"]["answer"], True),
(row["rejected"]["response"], False)]:
fo_kto.write(json.dumps({
"prompt": prompt, "completion": comp, "label": lbl,
}) + "\n")
n_kto += 1
print(f"DPO: {n_pairs}{trl_out}")
print(f"Full: {n_pairs}{full_out}")
print(f"KTO: {n_kto}{kto_out}")

def main():
ap = argparse.ArgumentParser()
ap.add_argument("--in", dest="inp", default=str(Path(CORPUS_DIR) / "final.jsonl"))
ap.add_argument("--trl", default="financial_hallucination_dpo.jsonl")
ap.add_argument("--full", default="financial_hallucination_dpo_full.jsonl")
ap.add_argument("--kto", default="financial_hallucination_kto.jsonl")
args = ap.parse_args()
format_all(args.inp, args.trl, args.full, args.kto)

if __name__ == "__main__":
main()

Configuration Files

.env.example

# Copy to .env and fill in.
LLM_BASE_URL=https://api.deepseek.com/v1
LLM_API_KEY=sk-REPLACE-ME

# OpenAI alternative:
# LLM_BASE_URL=https://api.openai.com/v1
# LLM_API_KEY=sk-REPLACE-ME

MODEL_GEN=deepseek-chat
MODEL_REASON=deepseek-reasoner
MODEL_JUDGE=deepseek-chat

# SEC requires a real email in the User-Agent
EDGAR_USER_AGENT=your-name your-email@example.com

CORPUS_DIR=corpus

requirements.txt

openai>=1.40.0
requests>=2.31.0
python-dotenv>=1.0.0

Makefile

PY := python3
PIPE := pipeline
CORPUS := corpus

SMOKE_CHUNKS := 3
SMOKE_PROMPTS := 10
SMOKE_PAIRS := 10

.PHONY: help install check smoke all ingest seed chosen reject judge format clean nuke

help:
	@echo "install check smoke all ingest seed chosen reject judge format clean nuke"

install:
	$(PY) -m pip install -r requirements.txt

check:
	cd $(PIPE) && $(PY) 00_check_setup.py

ingest:
	cd $(PIPE) && $(PY) 01_ingest_edgar.py

seed:
	cd $(PIPE) && $(PY) 02_seed_and_evolve.py

chosen:
	cd $(PIPE) && $(PY) 03_generate_chosen.py

reject:
	cd $(PIPE) && $(PY) 04_inject_hallucinations.py

judge:
	cd $(PIPE) && $(PY) 05_judge_and_filter.py

format:
	cd $(PIPE) && $(PY) 06_format_trl.py

all: check ingest seed chosen reject judge format

smoke: check
	cd $(PIPE) && $(PY) 01_ingest_edgar.py --max-filings 1 --chunks-per-filing $(SMOKE_CHUNKS)
	cd $(PIPE) && $(PY) 02_seed_and_evolve.py --evolutions 1 --limit-chunks 2
	cd $(PIPE) && $(PY) 03_generate_chosen.py --limit $(SMOKE_PROMPTS)
	cd $(PIPE) && $(PY) 04_inject_hallucinations.py --limit $(SMOKE_PAIRS)
	cd $(PIPE) && $(PY) 05_judge_and_filter.py --limit $(SMOKE_PAIRS)
	cd $(PIPE) && $(PY) 06_format_trl.py

clean:
	rm -f $(CORPUS)/prompts.jsonl $(CORPUS)/chosen.jsonl $(CORPUS)/pairs.jsonl $(CORPUS)/final.jsonl

nuke: clean
	rm -f $(CORPUS)/edgar_chunks.jsonl financial_hallucination_*.jsonl

run.sh

#!/usr/bin/env bash
set -euo pipefail
cd "$(dirname "$0")"
mkdir -p logs
LOG="logs/run-$(date +%Y%m%d-%H%M%S).log"

if [[ ! -f .env ]]; then
  echo "ERROR: .env not found. Run: cp .env.example .env && edit it." >&2
  exit 1
fi

MODE="${1:-all}"
case "${MODE}" in
  check) make check 2>&1 | tee "${LOG}" ;;
  smoke) make smoke 2>&1 | tee "${LOG}" ;;
  all|"") make all 2>&1 | tee "${LOG}" ;;
  *) echo "Unknown mode: ${MODE}. Use: check | smoke | all" >&2; exit 2 ;;
esac

Running It

# 1. Get the folder onto the server:
scp -r financial-hallucination-pipeline user@server:~/

# 2. Set up
cd financial-hallucination-pipeline
cp .env.example .env
vim .env # fill LLM_API_KEY + EDGAR_USER_AGENT
make install

# 3. Pre-flight
./run.sh check

# 4. Smoke test (~10 pairs, cheap)
./run.sh smoke

# 5. Full run
./run.sh

Final outputs:

  • financial_hallucination_dpo.jsonl — feed directly to trl.DPOTrainer
  • financial_hallucination_dpo_full.jsonl — full provenance (source URL, ticker, filing date, reasoning, citations, judge scores)
  • financial_hallucination_kto.jsonl — unpaired (prompt, completion, label) for trl.KTOTrainer

Design Principles Recap

  1. SFT alone is insufficient. SFT teaches what a good financial answer looks like but does not teach the model to prefer grounded answers over fluent confabulations. DPO/KTO targets that gap directly.
  2. Source grounding is mandatory. Every numeric claim must be traceable to the source; Phase 3's verifier enforces this.
  3. Temporal safety is explicit. Every row carries filing_date, so training and evaluation splits can respect point-in-time correctness.
  4. License clarity for commercial deployment. SEC filings are public domain, so the resulting dataset inherits commercially usable provenance.
  5. Balanced hallucination types. Phase 4 cycles through the four strategies deterministically — no single failure mode dominates.
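
Principle 3 in practice: a point-in-time split needs nothing beyond the `filing_date` already carried on every row. A stdlib sketch (the cutoff date here is an arbitrary example):

```python
from datetime import date

def point_in_time_split(rows, cutoff="2024-01-01"):
    """Train on filings dated before the cutoff, evaluate on the rest,
    so evaluation never contains data a 'past' model could not have seen."""
    cut = date.fromisoformat(cutoff)
    train = [r for r in rows if date.fromisoformat(r["filing_date"]) < cut]
    eval_ = [r for r in rows if date.fromisoformat(r["filing_date"]) >= cut]
    return train, eval_

rows = [
    {"doc_id": "a", "filing_date": "2023-07-28"},
    {"doc_id": "b", "filing_date": "2024-02-02"},
]
train, eval_ = point_in_time_split(rows)
print(len(train), len(eval_))  # → 1 1
```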

Next step in the alignment stack: feed financial_hallucination_dpo.jsonl into trl.DPOTrainer on top of an SFT-tuned base model, and compare held-out refusal precision against an SFT-only baseline.
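
For that comparison, refusal precision can be scored against the exact refusal template from Phase 3. A sketch with hypothetical predictions and labels:

```python
REFUSAL = "The provided source does not contain information to answer this question."

def refusal_precision(predictions, should_refuse):
    """Of all model refusals, the fraction issued on prompts that
    genuinely warranted one (e.g., out-of-bounds or false-premise asks)."""
    refused = [i for i, p in enumerate(predictions) if p.strip() == REFUSAL]
    if not refused:
        return 0.0
    return sum(should_refuse[i] for i in refused) / len(refused)

# Hypothetical eval: the model refuses twice; one refusal was warranted.
preds = [REFUSAL, "Revenue was $4.2B.", REFUSAL]
gold = [True, False, False]
print(refusal_precision(preds, gold))  # → 0.5
```

A matching refusal recall (warranted refusals the model actually made) completes the picture; precision alone rewards a model that never refuses incorrectly but also never refuses at all.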

  • Title: Building a Financial Hallucination-Suppression Dataset Pipeline
  • Author: wy
  • Created at: 2026-04-24 10:00:00
  • Updated at: 2026-04-24 10:26:30
  • Link: https://yue-ruby-w.site/2026/04/24/Financial-Hallucination-Suppression-Dataset-Pipeline/
  • License: This work is licensed under CC BY-NC-SA 4.0.