揭秘工业级 Text-to-SQL 系统:3800行代码背后的工程智慧
摘要:本文深入剖析一个近4000行的生产级SQL生成与验证系统,揭示其如何通过多轮反思、并行候选、智能裁判、断点续跑等机制,实现从自然语言到可执行SQL的高精度转换。我们将拆解其核心架构、关键技术点和工程实践,为构建可靠的AI数据查询系统提供参考。
一、系统概览:不只是"LLM生成SQL"那么简单
当你看到validate_sql.py这个3869行的文件时,第一反应可能是:"为什么一个简单的Text-to-SQL任务需要这么多代码?"
答案在于:生产环境与Demo的本质区别。
1.1 三大核心模式
该系统支持三种工作模式:
| 模式 | 功能 | 应用场景 |
|---|---|---|
| 模式1 | JSONL批量验证 | 已有SQL数据集的回归测试 |
| 模式2 | Excel生成+验证 | 从问题列表自动生成SQL(主流程) |
| 模式3 | Excel已有SQL评估 | 对历史SQL进行质量审计 |
1.2 核心能力矩阵
plaintext
┌─────────────────────────────────────────┐ │ validate_sql.py 核心能力 │ ├─────────────────────────────────────────┤ │ ✓ 多轮反思纠错(最多N轮迭代) │ │ ✓ n-best并行候选生成 │ │ ✓ 双模型协作(小模型生成 + 大模型裁判) │ │ ✓ 细粒度错误诊断(15种标签体系) │ │ ✓ 数据库断连自动重连 │ │ ✓ 断点续跑与周期落盘 │ │ ✓ LLM服务降级与重试机制 │ │ ✓ SQL清洗与安全防护 │ └─────────────────────────────────────────┘二、架构设计:分层解耦的工程哲学
2.1 三层架构
虽然代码在一个文件中,但逻辑上分为三个层次:
plaintext
┌──────────────────────────────────────┐ │ Layer 1: 流程编排层 │ │ - run_generate_validate_for_xlsx() │ │ - run_judge_existing_sql_for_xlsx()│ │ - main() │ ├──────────────────────────────────────┤ │ Layer 2: 业务逻辑层 │ │ - build_generate_messages() │ │ - build_reflect_messages() │ │ - _call_llm_judge_realistic() │ │ - execute_sql_server() │ ├──────────────────────────────────────┤ │ Layer 3: 工具函数层 │ │ - _strip_code_fences() │ │ - _ensure_single_statement() │ │ - _extract_exec_error_facts() │ │ - _truncate_text() │ └──────────────────────────────────────┘2.2 关键数据结构
python
@dataclass class XlsxRow: """Excel行数据结构""" 表名: str 问题种类: str 问题: str sql: str @dataclass class SqlTask: """SQL验证任务""" idx: int obj: Dict[str, Any] key: str # sql / sql_turn1 / sql_turn2 sql: str三、核心技术深度解析
3.1 多轮反思机制(Reflection Loop)
这是系统的灵魂所在。传统Text-to-SQL是"一次生成",而本系统采用生成-执行-评估-修正的闭环:
python
for r in range(max_rounds): # 默认3轮 if r == 0: # 第1轮:初始生成 msgs = build_generate_messages(...) else: # 后续轮次:基于上一轮反馈反思 fb = _summarize_exec_feedback_for_small(last_exec) tags = last_judge.get("error_tags") advice = last_judge.get("advice") msgs = build_reflect_messages( prev_sql=prev_sql, exec_feedback=fb, error_tags=tags, judge_advice=advice, ... ) gen_sql = call_llm_sql(client=small_client, messages=msgs) exec_ret = execute_sql_server(sql=gen_sql) if exec_ret["success"] and exec_ret["row_count"] > 0: break # 成功则退出 # 失败则进入下一轮反思 last_judge = _call_llm_judge_realistic(...) prev_sql = gen_sql关键洞察:
- 每轮反思都携带结构化反馈(非原始报错)
- 裁判模型输出错误标签和修复建议
- 小模型根据标签定向修正(而非盲目重试)
3.2 n-best并行候选生成
首轮生成不是单条SQL,而是并行生成N个候选(默认5个):
python
n_best = max(1, int(gen_n)) # 默认5 max_workers = min(int(gen_parallel), n_best) with ThreadPoolExecutor(max_workers=max_workers) as ex: futs = {ex.submit(_gen_one, i + 1): (i + 1) for i in range(n_best)} for fut in as_completed(futs): sql_out = fut.result() if sql_out: gen_sqls.append(sql_out)优势:
- 提高首轮命中率(多样性采样)
- 避免串行等待(并行加速)
- 去重后保留唯一候选
3.3 双模型协作架构
系统采用大小模型分工策略:
| 角色 | 模型 | 职责 | 温度 |
|---|---|---|---|
| 生成模型 | qwen3-coder(小模型) | SQL生成/修正 | 0.0-0.3 |
| 裁判模型 | qwen3(大模型) | 结果评估/错误诊断 | 0.01 |
裁判模型的两大功能:
功能1:完整诊断(_call_llm_judge_realistic)
输出JSON结构:
json
{ "is_correct": false, "score": 45, "error_tags": ["COLUMN_NAME_FIX"], "repair_task": "sql_correct", "comment": "列名拼写错误", "advice": "将PART_NM改为PART_NAME_C" }功能2:简化判断(_call_llm_judge_mode3_answer_only)
仅输出:
json
{ "is_correct": true, "comment": "SQL正确回答了问题" }设计考量:
- 完整诊断用于反思阶段(指导修正)
- 简化判断用于最终评估(快速筛选)
3.4 细粒度错误标签体系
系统定义了15种诊断标签,覆盖常见SQL错误:
python
DIAG_TAGS = [ # 语法/拼写 "SYNTAX_FIX_KEYWORD_TYPO", # 关键字拼写错误 "SYNTAX_FIX_PUNCTUATION", # 标点/括号问题 "SYNTAX_FIX_IDENTIFIER_QUOTE", # 标识符引号 # 列/表/别名 "ALIAS_ADD_FOR_EXPR", # 表达式缺别名 "COLUMN_NAME_FIX", # 列名错误 "TABLE_NAME_FIX", # 表名错误 "AMBIGUOUS_COLUMN_ADD_QUALIFIER", # 歧义列 # 类型/空值 "TYPE_CAST_NUMERIC", # 数值类型转换 "TYPE_CAST_DATE", # 日期类型转换 "NULL_COALESCE_ADD", # NULL处理 "DIVIDE_BY_ZERO_GUARD", # 除零保护 # 语义/逻辑 "AGGREGATION_FIX", # 聚合/GROUP BY "FILTER_CONDITION_FIX", # WHERE条件 "TIME_WINDOW_FIX", # 时间窗口 "SELECT_COLUMNS_FIX", # SELECT列选择 "ORDER_BY_FIX", # ORDER BY排序 ]标签映射规则:
- 每个错误只分配1个最相关标签
- 标签驱动修复策略(
repair_task) - 提供针对性建议(
advice字段)
3.5 SQL清洗与安全防护
LLM输出的SQL往往包含噪声,系统实现了8步清洗流水线:
python
def _strip_code_fences(text: str) -> str: # 1. 去除Markdown代码块(```sql ... ```) # 2. 去除<think>推理块 # 3. 识别并剔除推理开场白("好的,我需要...") # 4. 从第一个SQL关键字开始截取 # 5. 去除EOS标记(</s>, <|eot_id|>等) # 6. 去除外层双引号 # 7. 清理末尾分号 # 8. 清除推理结束标记("因此SQL为:") return cleaned_sql安全校验:
python
def _ensure_single_statement(sql: str) -> Optional[str]: """确保只有一条SELECT语句""" # 禁止多条语句(防止注入) # 禁止非SELECT操作(INSERT/UPDATE/DELETE) # 允许安全前缀(SET NOCOUNT ON; DECLARE @var)3.6 数据库容错机制
生产环境数据库连接不稳定,系统实现了三层防护:
python
# 1. 预检机制 cur.execute("SELECT 1") # 启动时验证连接 # 2. 断连检测 def _is_db_disconnected(exec_ret): err = str(exec_ret.get("error")) return any(k in err for k in [ "Not connected to any MS SQL server", "DBPROCESS is dead", "Connection reset", ]) # 3. 自动重连 if _is_db_disconnected(exec_ret): _close_db(conn) conn = _connect_db() exec_ret = execute_sql_server(...) # 重试四、工程实践亮点
4.1 断点续跑(Resume)
处理大规模数据集时,中断后可无缝续跑:
python
# 跳过已成功的行 if resume and existing_sql and _is_true(existing_judge): skipped_cnt += 1 continue # 周期落盘(每N条保存一次) if attempted % save_every == 0: wb.save(target_xlsx) ff.flush()收益:
- 避免重复计算(节省成本)
- 故障恢复(网络中断/服务重启)
- 增量处理(新数据追加)
4.2 LLM服务降级
面对不稳定的LLM服务,系统实现多级回退:
python
# 1. 重试机制(最多N次) for attempt in range(llm_retries + 1): try: resp = client.chat.completions.create(...) break except Exception as e: if _is_retryable_llm_error(e): time.sleep(1.0) continue raise # 2. response_format降级 try: resp = client.chat.completions.create( response_format={"type": "json_object"} ) except Exception: resp = client.chat.completions.create() # 不带format # 3. 裁判失败兜底 if judge_parse_failed: last_judge = _fallback_diagnosis_when_judge_down(...)4.3 执行结果压缩
为避免Prompt过长,系统对执行结果做智能压缩:
python
def _execution_summary(exec_result, max_preview_rows=3): return { "success": True/False, "error": "错误信息(失败时)", "row_count": 10, "columns": ["col1", "col2"], "rows_preview": [前3行数据], # 截断长文本 "elapsed_ms": 125 }压缩策略:
- 成功且行数少 → 全量返回
- 行数过多 → 仅返回摘要(前3行预览)
- 字符串超过120字符 → 截断
4.4 进度跟踪与日志
系统提供实时进度反馈:
python
[PROGRESS] attempted=50 filled=42 skipped=5 fail=3 (rows=200) [JUDGE_FALSE] row=23 row_count=0 truncated=False comment=过滤条件过严 [EXEC_FAIL] row=45 err=Invalid column name 'PART_NAM'五、性能优化技巧
5.1 并行化策略
| 环节 | 并行度 | 说明 |
|---|---|---|
| n-best生成 | 5线程 | 同时生成5个候选SQL |
| Judge评估 | 8线程 | 并行判定多个候选 |
| 数据库执行 | 串行 | 避免DB压力过大 |
5.2 Token控制
python
# 限制Judge输出长度 judge_max_tokens = int(os.getenv("JUDGE_MAX_TOKENS", "256")) # 避免temperature=0触发后端冲突 temp_for_judge = max(0.01, temperature) # 文本截断 def _truncate_text(s, max_chars=2000): if len(s) <= max_chars: return s return s[:max_chars] + "...<truncated>"5.3 缓存机制
python
# 表列名缓存(避免重复查询系统表) cols_cache: Dict[str, List[str]] = {} if table not in cols_cache: cols_cache[table] = get_table_columns(conn, table)六、典型错误案例与修复
案例1:列名拼写错误
错误SQL:
sql
SELECT PART_NAM, PRICE FROM VIEW_PART_USER_HISTORY裁判诊断:
json
{ "error_tags": ["COLUMN_NAME_FIX"], "advice": "将PART_NAM改为PART_NAME_C" }修复后:
sql
SELECT PART_NAME_C, PRICE FROM VIEW_PART_USER_HISTORY案例2:时间函数不兼容
错误SQL:
sql
SELECT * FROM orders WHERE QUARTER(order_date) = 1裁判诊断:
json
{ "error_tags": ["TIME_WINDOW_FIX"], "advice": "SQL Server不支持QUARTER(),改用DATEPART(QUARTER, order_date)" }案例3:GROUP BY缺失
错误SQL:
sql
SELECT category, SUM(amount) FROM orders裁判诊断:
json
{ "error_tags": ["AGGREGATION_FIX"], "advice": "非聚合列category需加入GROUP BY" }七、使用指南
7.1 基本用法
bash
# 模式2:从Excel生成SQL python validate_sql.py \ --questions-xlsx questions.xlsx \ --ref-xlsx sql3.xlsx \ --db-host localhost \ --db-user sa \ --db-password your_password \ --db-name your_database \ --small-base-url http://your-llm-api \ --small-model qwen3-coder \ --judge-model qwen3 \ --max-rounds 3 \ --gen-n 57.2 关键参数
| 参数 | 默认值 | 说明 |
|---|---|---|
--max-rounds | 3 | 最大反思轮次 |
--gen-n | 5 | n-best候选数量 |
--gen-parallel | 5 | 并行生成线程数 |
--judge-parallel | 8 | 并行Judge线程数 |
--save-every | 20 | 每N条落盘一次 |
--resume | True | 启用断点续跑 |
--strict-health | True | 严格健康检查 |
7.3 输出文件
plaintext
questions_result_20260414.xlsx # 带SQL和判定的Excel sql_gen_success.jsonl # 成功轨迹 sql_gen_failures.jsonl # 失败轨迹(含每轮尝试)八、局限性与改进方向
8.1 当前局限
- 单表限制:主要针对单表查询,复杂JOIN场景支持有限
- Schema静态:表结构硬编码在代码中(
STATIC_SCHEMA_COLUMNS) - 依赖pymssql:仅支持SQL Server,未抽象数据库接口
- 单文件臃肿:3869行代码维护成本高(已模块化重构)
8.2 改进建议
- 多表JOIN推理:引入Schema链接(Schema Linking)技术
- 动态Schema加载:从数据库系统表自动获取列信息
- 数据库抽象层:支持MySQL/PostgreSQL等多引擎
- 向量检索增强:集成FAISS实现few-shot样例智能检索
- 执行计划分析:结合EXPLAIN PLAN优化复杂查询
九、总结
validate_sql.py展现了一个工业级Text-to-SQL系统应有的工程素养:
✅鲁棒性:断连重连、服务降级、异常捕获
✅可维护性:分层架构、清晰命名、详细注释
✅可扩展性:模块化设计、配置驱动、插件式标签
✅可观测性:进度跟踪、轨迹记录、详细日志
它告诉我们:优秀的AI应用不仅是算法创新,更是工程艺术的体现。
附录:核心代码统计
| 模块 | 行数 | 占比 |
|---|---|---|
| SQL生成与反思 | ~800 | 21% |
| 裁判模型评估 | ~600 | 16% |
| 数据库执行 | ~200 | 5% |
| 工具函数 | ~500 | 13% |
| 流程编排 | ~1200 | 31% |
| 其他 | ~569 | 14% |
| 总计 | 3869 | 100% |
