别再死记硬背HMM了!用Python手搓一个中文分词器,从BMES标注到Viterbi解码全流程
从零构建HMM中文分词器:BMES标注与Viterbi解码实战指南
当处理中文文本时,分词是第一个关键步骤。与英文不同,中文没有天然的空格分隔,这使得分词成为NLP任务中的基础挑战。本文将带你用Python实现一个基于隐马尔可夫模型(HMM)的中文分词器,从理论到代码,一步步揭开统计分词的神秘面纱。
1. 理解BMES标注体系
在HMM分词中,我们采用BMES标注方案对汉字进行标记:
- B (Begin):表示词的首字
- M (Middle):表示词的中间字
- E (End):表示词的尾字
- S (Single):表示单字词
例如,"人工智能技术"会被标注为:
人/B 工/M 能/E 技/B 术/E这种标注方案的优势在于:
- 能够清晰表示词的边界
- 适用于不同长度的词语
- 简化了分词任务的建模过程
为什么选择BMES而不是其他标注方案?
- 相比BIOS标注,BMES更简洁
- 四类标签足以覆盖所有中文分词场景
- 实践证明在中文分词任务中效果良好
2. HMM模型的三要素
要实现HMM分词器,我们需要计算三个核心概率矩阵:
2.1 初始状态概率(π)
表示句子第一个字处于各状态的概率:
start_p = { 'B': 0.6, # 句子开头是词首的概率 'M': 0.1, 'E': 0.1, 'S': 0.2 # 句子开头是单字词的概率 }2.2 状态转移概率(A)
表示从一个状态转移到另一个状态的概率:
trans_p = { 'B': {'B': 0.1, 'M': 0.6, 'E': 0.2, 'S': 0.1}, 'M': {'B': 0.1, 'M': 0.5, 'E': 0.3, 'S': 0.1}, # ...其他状态转移概率 }2.3 发射概率(B)
表示在某个状态下观察到特定汉字的概率:
emit_p = { 'B': {'中': 0.05, '文': 0.03, ...}, 'M': {'中': 0.01, '文': 0.02, ...}, # ...其他状态的发射概率 }3. 从语料库训练HMM参数
我们需要一个已分词的中文语料库来统计这些概率。以下是训练过程的关键步骤:
- 预处理语料:将分好词的文本转换为BMES标注序列
- 统计频数:
- 统计每个状态的初始出现次数
- 统计状态间的转移次数
- 统计每个状态下各汉字的出现次数
- 计算概率:将频数转换为概率,使用加一平滑处理未登录词
def train_hmm(corpus_path): # 初始化统计字典 start_count = {'B':0, 'M':0, 'E':0, 'S':0} trans_count = {s: {t:0 for t in 'BMES'} for s in 'BMES'} emit_count = {s: {} for s in 'BMES'} state_count = {'B':0, 'M':0, 'E':0, 'S':0} with open(corpus_path, 'r', encoding='utf-8') as f: for line in f: words = line.strip().split() # 生成BMES标签序列 tags = [] for word in words: if len(word) == 1: tags.append('S') else: tags.append('B') tags.extend(['M']*(len(word)-2)) tags.append('E') # 统计频数 for i, tag in enumerate(tags): state_count[tag] += 1 if i == 0: start_count[tag] += 1 else: trans_count[tags[i-1]][tag] += 1 # 统计发射频数 char = line[i] emit_count[tag][char] = emit_count[tag].get(char, 0) + 1 # 计算概率(加一平滑) total_lines = sum(start_count.values()) start_p = {s: (start_count[s]+1)/(total_lines+4) for s in 'BMES'} trans_p = {} for s in 'BMES': trans_p[s] = {} total = sum(trans_count[s].values()) for t in 'BMES': trans_p[s][t] = (trans_count[s][t]+1)/(total+4) emit_p = {} for s in 'BMES': emit_p[s] = {} total = state_count[s] for char in emit_count[s]: emit_p[s][char] = (emit_count[s][char]+1)/(total+len(emit_count[s])) return start_p, trans_p, emit_p4. 实现Viterbi算法解码
Viterbi算法是HMM分词的核心,它能在O(TN²)时间内找到最可能的状态序列(T为序列长度,N为状态数)。
4.1 Viterbi算法步骤
- 初始化:计算第一个字在各状态的概率
- 递推:对于每个后续的字,计算到达每个状态的最大概率路径
- 终止:找到最后一个字的最可能状态
- 回溯:从最后一个字回溯,得到完整的状态序列
def viterbi(obs, states, start_p, trans_p, emit_p): V = [{}] # 存储每个时间步的概率 path = {} # 初始化 for y in states: V[0][y] = start_p[y] * emit_p[y].get(obs[0], 1e-10) path[y] = [y] # 递推 for t in range(1, len(obs)): V.append({}) newpath = {} for y in states: (prob, state) = max( (V[t-1][y0] * trans_p[y0].get(y, 1e-10) * emit_p[y].get(obs[t], 1e-10), y0) for y0 in states ) V[t][y] = prob newpath[y] = path[state] + [y] path = newpath # 终止 (prob, state) = max((V[-1][y], y) for y in states) return (prob, path[state])4.2 处理未登录词
当遇到训练集中未出现的汉字时,我们的模型可能会失效。解决方法包括:
- 使用加一平滑(已经在训练代码中实现)
- 引入汉字部首或笔画数等特征
- 对低频词进行特殊处理
5. 完整分词器实现
现在我们将所有组件整合成一个完整的HMM分词器类:
import pickle from collections import defaultdict class HMMSegmenter: def __init__(self): self.states = ['B', 'M', 'E', 'S'] self.start_p = {} # 初始概率 self.trans_p = {} # 转移概率 self.emit_p = {} # 发射概率 self.trained = False def train(self, corpus_path, model_path='hmm_model.pkl'): # 初始化统计字典 start_count = defaultdict(int) trans_count = {s: defaultdict(int) for s in self.states} emit_count = {s: defaultdict(int) for s in self.states} state_count = defaultdict(int) # 统计频数 with open(corpus_path, 'r', encoding='utf-8') as f: line_num = 0 for line in f: line = line.strip() if not line: continue line_num += 1 words = line.split() # 生成BMES标签序列 tags = [] for word in words: if len(word) == 1: tags.append('S') else: tags.append('B') tags.extend(['M']*(len(word)-2)) tags.append('E') # 确保标签序列与字符序列长度一致 assert len(tags) == len(''.join(words)) # 统计频数 for i, (char, tag) in enumerate(zip(''.join(words), tags)): state_count[tag] += 1 if i == 0: start_count[tag] += 1 else: trans_count[tags[i-1]][tag] += 1 emit_count[tag][char] += 1 # 计算概率(加一平滑) self.start_p = { s: (start_count.get(s, 0)+1)/(line_num+4) for s in self.states } self.trans_p = { s: { t: (trans_count[s].get(t, 0)+1)/(sum(trans_count[s].values())+4) for t in self.states } for s in self.states } self.emit_p = { s: { char: (emit_count[s].get(char, 0)+1)/(state_count.get(s, 1)+len(emit_count[s])) for char in emit_count[s] } for s in self.states } # 保存模型 with open(model_path, 'wb') as f: pickle.dump({ 'start_p': self.start_p, 'trans_p': self.trans_p, 'emit_p': self.emit_p }, f) self.trained = True print(f'模型训练完成,已保存到 {model_path}') def load_model(self, model_path): with open(model_path, 'rb') as f: model = pickle.load(f) self.start_p = model['start_p'] self.trans_p = model['trans_p'] self.emit_p = model['emit_p'] self.trained = True print('模型加载完成') def __viterbi(self, text): V = [{}] path = {} # 初始化 for y in self.states: V[0][y] = self.start_p[y] * self.emit_p[y].get(text[0], 1e-10) path[y] = [y] # 递推 for t in range(1, len(text)): V.append({}) new_path = {} for y in self.states: (prob, state) = max( (V[t-1][y0] * self.trans_p[y0].get(y, 1e-10) * self.emit_p[y].get(text[t], 1e-10), y0) for y0 in self.states ) V[t][y] = prob new_path[y] = path[state] + [y] path = new_path # 终止 (prob, state) = max((V[-1][y], y) for y in self.states) return path[state] def segment(self, text): if not self.trained: raise RuntimeError('请先训练或加载模型') if not text: return [] tags = self.__viterbi(text) # 根据标签序列进行分词 result = [] start = 0 for i, tag in enumerate(tags): if tag == 'B': start = i elif tag == 'E': result.append(text[start:i+1]) elif tag == 'S': result.append(text[i]) return result6. 使用示例与性能优化
6.1 基本使用方法
# 训练模型 segmenter = HMMSegmenter() segmenter.train('pku_training.utf8', 'my_hmm_model.pkl') # 或者加载已有模型 # segmenter.load_model('my_hmm_model.pkl') # 分词 text = "自然语言处理是人工智能的重要方向" result = segmenter.segment(text) print('/'.join(result)) # 输出:自然/语言/处理/是/人工/智能/的/重要/方向6.2 性能优化技巧
- 对数概率:避免浮点数下溢,使用对数概率计算
- 剪枝:在Viterbi算法中保留前N个最优路径,而不是全部
- 并行化:对长文本分段处理
- 模型压缩:只保留高频字的发射概率
优化后的Viterbi实现(对数版本):
import math def viterbi_log(obs, states, start_p, trans_p, emit_p): V = [{}] path = {} # 初始化(取对数) for y in states: V[0][y] = math.log(start_p.get(y, 1e-10)) + math.log(emit_p[y].get(obs[0], 1e-10)) path[y] = [y] # 递推 for t in range(1, len(obs)): V.append({}) new_path = {} for y in states: (prob, state) = max( (V[t-1][y0] + math.log(trans_p[y0].get(y, 1e-10)) + math.log(emit_p[y].get(obs[t], 1e-10)), y0) for y0 in states ) V[t][y] = prob new_path[y] = path[state] + [y] path = new_path # 终止 (prob, state) = max((V[-1][y], y) for y in states) return path[state]7. 评估与改进方向
7.1 评估指标
- 准确率(Precision):正确分出的词数/分词器分出的总词数
- 召回率(Recall):正确分出的词数/标准答案中的总词数
- F1值:准确率和召回率的调和平均
7.2 可能的改进方向
- 结合词典:将基于规则的方法与统计方法结合
- 特征扩展:考虑汉字的结构特征(偏旁、笔画数等)
- 模型融合:结合CRF、神经网络等更强大的模型
- 领域适应:针对特定领域(如医疗、法律)训练专用模型
在实际项目中,HMM分词器虽然简单,但仍然是许多工业级分词系统的基础组件。理解它的原理和实现,能为学习更复杂的NLP模型打下坚实基础。
