字段向量索引要解决的问题是:怎么让字段信息具备语义检索能力。
字段向量索引的建立:
async def _save_column_info_to_qdrant(self, column_infos: list[ColumnInfo]):"""把字段元数据继续推进成可语义检索的 Qdrant 向量点"""await self.column_qdrant_repository.ensure_collection()points: list[dict] = []for column_info in column_infos:# 一个字段不会只生成一个向量点,而是把名字 描述 别名都拆开建立语义入口points.append({"id": uuid.uuid4(),"embedding_text": column_info.name,"payload": asdict(column_info),})points.append({"id": uuid.uuid4(),"embedding_text": column_info.description,"payload": asdict(column_info),})for alia in column_info.alias:points.append({"id": uuid.uuid4(),"embedding_text": alia,"payload": asdict(column_info),})# 先把待向量化文本抽出来,再分批调用 Embedding 服务# 这样更容易控制单次请求大小embeddings: list[list[float]] = []embedding_texts = [point["embedding_text"] for point in points]embedding_batch_size = 20for i in range(0, len(embedding_texts), embedding_batch_size):batch_embedding_texts = embedding_texts[i : i + embedding_batch_size]batch_embeddings = await self.embedding_client.aembed_documents(batch_embedding_texts)embeddings.extend(batch_embeddings)ids = [point["id"] for point in points]payloads = [point["payload"] for point in points]await self.column_qdrant_repository.upsert(ids, embeddings, payloads)
这段代码做了 4 件事:
- 先确保
Qdrant中用于存字段向量的 collection 存在 - 把每个字段拆成多个待向量化的 point
- 调用
Embedding服务批量生成向量 - 把
id、embedding、payload一起写入Qdrant
为什么一个字段不会生成一个point
字段向量索引的核心设计:一个字段不会只建一个点,而是要围绕这个字段拆出多个语义入口.
例如一个字段:
- 字段名:
order_amount - 描述:订单金额
- 别名:销售额、成交金额、订单总额
所以当前实现采用的是:
- 字段名建一个 point
- 字段描述建一个 point
- 每一个字段别名再各建一个 point
这样做的目的,就是尽量让不同的自然语言表达都能把同一个字段召回来
这个过程写得再具体一点,一个 ColumnInfo 往往会先被展开成这样一组中间数据:
# 同一个字段会被拆成多个语义入口,但 payload 都指向同一个 ColumnInfo
[{# 字段名入口"id": "随机 point id 1","embedding_text": "order_amount","payload": {...完整 column_info...},},{# 字段描述入口"id": "随机 point id 2","embedding_text": "订单金额","payload": {...完整 column_info...},},{# 字段别名入口"id": "随机 point id 3","embedding_text": "销售额","payload": {...完整 column_info...},},{# 另一个字段别名入口"id": "随机 point id 4","embedding_text": "成交金额","payload": {...完整 column_info...},},
]
复习Qdrant的四个概念
- collection: 可以类比成向量库里的一张集合表
- point: 是真正写进
Qdrant的一条记录 - vector: 是某段文本经过向量模型编码后的数值表示
- payload: 则是和向量一起保存的业务数据
如果把这 4 个概念重新串成一句更贴近代码的话,就是:
一个字段-> 拆成多个 point-> 每个 point 先有一段 embedding_text-> embedding_text 被编码成 vector-> vector 和 payload 一起写入同一个 collection
ponit和payload
point 是整条向量记录,payload 是这条向量记录里附带的业务信息。也就是说,payload 是 point 的一部分,而不是和 point 并列的东西。
如果把一个真正写入 Qdrant 的点简化画出来,大致会像这样:
PointStruct(# point 自己的存储 ID,不等同于字段 IDid="0b7f...",# embedding_text 编码后的向量,检索时主要比较它vector=[0.12, -0.08, ...],# 命中后返回给业务层的字段上下文payload={"id": "fact_order.order_amount","name": "order_amount","description": "订单金额","table_id": "fact_order",...},
)
向量检索真正比较的是 vector,但命中之后对业务最有价值的,是 payload 里带回来的字段上下文。
为什么point的id用uuid
原因很简单:一个字段会拆成多个 point。如果你直接用 column_info.id,那同一个字段名、字段描述、字段别名就会冲突
所以更合理的做法是:字段本身保持一个稳定的字段 ID,而每个 point 再单独生成自己的唯一 point ID.这里有两套不同的标识:column_info.id 表示业务字段自己的稳定身份,point.id 表示某一个向量点自己的存储身份
payload为什么直接保存整个ColumnInfo
这里不是只保存字段名,也不是只保存某一个小片段,而是直接保存整个 ColumnInfo 的字典表示。
好处是:后面向量检索命中之后,系统可以直接拿回完整字段上下文,而不需要再专门回别的地方拼装字段描述、字段角色、所属表等信息。
项目明显优先选择了另一件事:查询链路简单。因为后面一旦命中某个 point,系统立刻就能拿到完整字段信息,而不用再做“先查到 point,再根据字段 id 回 MySQL 二次补全”的额外动作。
ColumnQdrantRespostiry负责做什么
class ColumnQdrantRepository:# 字段向量统一写入这个 collectioncollection_name = "column_info_collection"def __init__(self, client: AsyncQdrantClient):self.client = clientasync def ensure_collection(self):if not await self.client.collection_exists(self.collection_name):await self.client.create_collection(collection_name=self.collection_name,vectors_config=VectorParams(# 必须和 Embedding 模型输出维度一致size=app_config.qdrant.embedding_size,# 用余弦距离衡量文本语义相似度# 说明后面做相似度检索时,主要按余弦相似度来比较“两个向量方向是否接近”distance=Distance.COSINE,),)async def upsert(self,ids: list[str],embeddings: list[list[float]],payloads: list[dict],batch_size: int = 10,):# 三个列表必须保持同一顺序,才能正确拼成 Qdrant 需要的向量记录points: list[PointStruct] = [PointStruct(id=id, vector=embedding, payload=payload)for id, embedding, payload in zip(ids, embeddings, payloads)]for i in range(0, len(points), batch_size):await self.client.upsert(collection_name=self.collection_name,# 分批写入,降低单次 upsert 压力points=points[i : i + batch_size],)
这个仓储层主要负责两件事:
- 确保字段 collection 已经建好
- 把已经准备好的向量点稳定写入
Qdrant
这里的职责边界也很清楚:Service 决定一个字段该拆成哪些 point,Repository 负责 collection 创建和批量落库
这条链路按顺序理解会更顺:
中间结构 points-> 抽出 embedding_texts 批量做向量化-> 得到 embeddings-> 再和 ids / payloads 按相同顺序拼回完整 point-> PointStruct-> Qdrant
为什么embedding和upsert要分批
这里有两个“分批”动作:一个发生在向量化阶段,一个发生在向量入库阶段。前者是为了减轻模型服务的调用压力,后者是为了降低向量库的写入压力,两者合在一起能让构建链路更稳定。
字段值全文索引构建:
字段值全文索引要解决的问题是:怎么让系统知道字段里有哪些真实值可以匹配。
async def _save_value_info_to_es(self, meta_config: MetaConfig, column_infos: list[ColumnInfo]
):await self.value_es_repository.ensure_index()# 把配置整理成“字段 id -> 是否同步真实值”的快速查询表column2sync: dict[str, bool] = {}for table in meta_config.tables:for column in table.columns:# 字段 id 要和 ColumnInfo.id 的拼法保持一致column2sync[f"{table.name}.{column.name}"] = column.syncvalue_infos: list[ValueInfo] = []for column_info in column_infos:# 根据字段 id 判断该字段是否需要同步取值sync = column2sync[column_info.id]if sync:# 从数仓查询该字段的 distinct 真实值,作为全文索引的数据来源current_column_values = await self.dw_mysql_repository.get_column_values(column_info.table_id, column_info.name, 100000)current_values_infos = [ValueInfo(# 字段 id + 字段值,组成一条值记录的唯一 idid=f"{column_info.id}.{current_column_value}",# 真正参与 ES 全文检索的文本value=current_column_value,# 记录这个值属于哪个字段,方便命中后反查字段上下文column_id=column_info.id,)for current_column_value in current_column_values]value_infos.extend(current_values_infos)# Repository 负责把 ValueInfo 转成 ES bulk 写入格式await self.value_es_repository.index(value_infos)
这段代码做了 5 件事:
- 先确保 Elasticsearch 中用于存字段值的索引存在
- 读取配置,找出哪些字段允许同步真实取值
- 去
DW查询这些字段的真实值集合 - 把每个真实值封装成
ValueInfo - 批量写入 Elasticsearch
